# Files: kontextual-engine/tests/test_service_api.py
#
# 744 lines
# 29 KiB
# Python

import pytest
from kontextual_engine import (
AuthorizationError,
OperationContext,
PolicyDecision,
PolicyEffect,
ServiceRuntime,
ValidationError,
create_app,
)
from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository
from kontextual_engine.api.app import _authorization_error_payload
def test_service_runtime_health_readiness_and_version_are_importable_without_fastapi() -> None:
    """Probe health, readiness and version directly on an in-memory ServiceRuntime."""
    service = ServiceRuntime(repository=InMemoryAssetRegistryRepository())

    health_status = service.health()["status"]
    assert health_status == "ok"

    ready_flag = service.readiness()["ready"]
    assert ready_flag is True

    # Readiness reports which repository implementation backs the registry.
    registry_check = service.readiness()["checks"]["asset_registry"]
    assert registry_check["repository"] == "InMemoryAssetRegistryRepository"

    api_version = service.version()["api_version"]
    assert api_version == "v1"
def test_service_runtime_exposes_asset_metadata_relationship_audit_and_policy_operations() -> None:
    """Drive asset, metadata, lifecycle, relationship, audit and policy calls in sequence.

    Two assets are created; the first gains a metadata record, is retired and
    is linked to the second. The audit trail and a policy evaluation are then
    checked against the results.
    """
    service = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
    ctx = service.operation_context(actor_id="user-api", correlation_id="corr-api")

    primary_payload = {
        "asset_id": "asset-api",
        "title": "API Asset",
        "classification": {
            "asset_type": "document",
            "sensitivity": "internal",
            "owner": "Platform Knowledge",
        },
        "metadata_records": [
            {
                "key": "status",
                "value": "draft",
                "confirmed": True,
                "provenance": {"producer": "api-test"},
            }
        ],
    }
    successor_payload = {
        "asset_id": "asset-successor",
        "title": "Successor",
        "classification": {"asset_type": "document", "sensitivity": "public"},
    }
    reviewer_record = {"key": "reviewer", "value": "codex", "confirmed": True}
    relationship_payload = {
        "source_asset_id": "asset-api",
        "target_id": "asset-successor",
        "predicate": "superseded_by",
        "target_kind": "asset",
        "confidence": 1.0,
    }
    policy_request = {"action": "asset.retrieve", "resource": "asset:asset-api"}

    # The mutations run in this exact order; the audit assertion below relies on it.
    created = service.create_asset(primary_payload, ctx)
    successor = service.create_asset(successor_payload, ctx)
    metadata_result = service.add_metadata_record("asset-api", reviewer_record, ctx)
    lifecycle_result = service.transition_lifecycle("asset-api", {"lifecycle": "retired"}, ctx)
    relationship_result = service.create_relationship(relationship_payload, ctx)
    audit_trail = service.list_audit_events(target="asset:asset-api")
    policy_result = service.evaluate_policy(policy_request, ctx)

    assert created["asset"]["id"] == "asset-api"
    assert successor["asset"]["id"] == "asset-successor"
    assert service.get_asset("asset-api")["lifecycle"] == "retired"
    assert service.list_assets(asset_type="document")["count"] == 2
    assert metadata_result["version"]["change_type"] == "metadata_changed"
    assert lifecycle_result["asset"]["lifecycle"] == "retired"

    metadata_keys = [
        record["key"] for record in service.list_metadata_records("asset-api")["items"]
    ]
    assert metadata_keys == ["status", "reviewer"]

    assert relationship_result["relationship"]["predicate"] == "superseded_by"
    assert service.list_relationships(source_id="asset-api")["count"] == 1

    recorded_operations = [event["operation"] for event in audit_trail["items"]]
    assert recorded_operations == [
        "asset.create",
        "asset.metadata.add",
        "asset.lifecycle.transition",
        "asset.relationship.add",
    ]
    assert policy_result["effect"] == "allow"
def test_service_runtime_policy_denial_blocks_protected_asset_operation() -> None:
    """A denying policy gateway makes create_asset raise and leaves no asset behind."""
    service = ServiceRuntime(
        repository=InMemoryAssetRegistryRepository(),
        policy_gateway=DenyCreatePolicy(),
    )
    denied_payload = {
        "asset_id": "asset-denied",
        "title": "Denied",
        "classification": {"asset_type": "document"},
    }

    with pytest.raises(AuthorizationError) as raised:
        service.create_asset(
            denied_payload,
            service.operation_context(actor_id="user-denied"),
        )

    decision = raised.value.details["policy_decision"]
    assert decision["effect"] == "deny"
    assert service.list_assets()["count"] == 0

    # The first audit event for the denied asset carries the "denied" outcome.
    denied_events = service.list_audit_events(target="asset:asset-denied")["items"]
    assert denied_events[0]["outcome"] == "denied"
def test_service_runtime_exposes_ingestion_retrieval_transformation_and_workflow_operations(
    tmp_path,
) -> None:
    """Ingest a file, query it, transform it and run a one-step workflow over it.

    Each stage's response envelope is checked, followed by the reconstruction
    of the workflow run.
    """
    service = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
    ctx = service.operation_context(actor_id="user-api", correlation_id="corr-jobs")

    source_file = tmp_path / "kontextual-source.txt"
    source_file.write_text(
        "Kontextual engine captures markdown proxy context, system boundaries, and source-grounded retrieval.",
        encoding="utf-8",
    )

    ingestion_request = {
        "mode": "file",
        "path": str(source_file),
        "asset_id": "asset-ingested",
        "classification": {
            "asset_type": "document",
            "sensitivity": "internal",
            "owner": "Platform Knowledge",
            "topics": ["retrieval", "workflow"],
        },
    }
    retrieval_request = {
        "text": "proxy context",
        "include_snippets": True,
        "representation_kind": "normalized",
        "limit": 5,
    }
    transformation_request = {
        "operation_id": "structured_view",
        "source_asset_ids": ["asset-ingested"],
        "output_asset_id": "asset-derived",
        "output_title": "Structured API View",
        "metadata": {"request": "service-api-test"},
    }
    build_step = {
        "step_id": "build",
        "kind": "transformation",
        "operation_id": "structured_view",
        "inputs": {"source_asset_ids": "$inputs.source_asset_ids"},
        "outputs": {
            "asset_id": "asset-workflow-structured",
            "title": "Workflow Structured View",
            "asset_type": "derived_artifact",
            "media_type": "application/json",
        },
    }
    template_request = {
        "template_id": "workflow-structured",
        "name": "Structured Workflow",
        "inputs": [{"name": "source_asset_ids", "kind": "asset"}],
        "steps": [build_step],
    }

    ingestion = service.start_ingestion_job(ingestion_request, ctx)
    retrieval = service.query_assets(retrieval_request, ctx)
    transformation = service.execute_transformation(transformation_request, ctx)
    template = service.register_workflow_template(template_request, ctx)
    workflow = service.invoke_workflow_run(
        {
            "template_id": template["template_id"],
            "inputs": {"source_asset_ids": ["asset-ingested"]},
        },
        ctx,
    )
    workflow_run_id = workflow["run"]["run_id"]
    reconstruction = service.reconstruct_workflow_run(workflow_run_id)

    # Ingestion envelope.
    assert ingestion["status"] == "completed"
    assert ingestion["correlation_id"] == "corr-jobs"
    assert ingestion["output_asset_ids"] == ["asset-ingested"]
    assert ingestion["retry_options"]["retryable"] is False
    assert ingestion["job"]["partial_results"]["action"] == "created"
    assert service.ingestion_capabilities()["connectors"][0]["supports_directories"] is True

    # Retrieval envelope.
    top_hit = retrieval["results"][0]
    assert retrieval["success"] is True
    assert retrieval["correlation_id"] == "corr-jobs"
    assert top_hit["asset_id"] == "asset-ingested"
    assert top_hit["source_refs"]
    assert top_hit["snippets"][0]["source_ref_id"]
    assert retrieval["metadata"]["policy_enforced"] is True

    # Transformation envelope.
    transformation_run = transformation["run"]
    assert transformation["success"] is True
    assert transformation_run["status"] == "completed"
    assert transformation_run["output_asset_ids"] == ["asset-derived"]
    assert transformation["lineage"]["source_asset_ids"] == ["asset-ingested"]
    assert transformation["audit_event"]["operation"] == "transformation.run.completed"
    assert transformation["retry_options"]["retryable"] is False
    assert service.get_transformation_run(transformation_run["run_id"])["status"] == "completed"
    assert service.list_transformation_operations()["count"] >= 1
    assert service.list_transformation_runs(status="completed")["count"] >= 2

    # Workflow envelope.
    assert workflow["success"] is True
    assert workflow["run"]["status"] == "completed"
    assert workflow["run"]["output_asset_ids"] == ["asset-workflow-structured"]
    assert workflow["retry_options"]["retryable"] is False
    assert service.get_workflow_run(workflow_run_id)["status"] == "completed"
    assert service.list_workflow_runs(status="completed")["count"] == 1
    assert service.list_workflow_review_tasks(workflow_run_id=workflow_run_id)["count"] == 0
    assert service.list_workflow_exceptions(workflow_run_id=workflow_run_id)["count"] == 0

    # Reconstruction of the workflow run.
    assert reconstruction["run"]["run_id"] == workflow_run_id
    assert reconstruction["template"]["template_id"] == "workflow-structured"
    assert reconstruction["transformation_runs"][0]["status"] == "completed"
    assert reconstruction["derived_lineage"][0]["output_asset_id"] == "asset-workflow-structured"
def test_service_runtime_exposes_failed_ingestion_job_state_and_recovery_envelope(tmp_path) -> None:
    """Ingesting a nonexistent file yields a failed job that can be fetched and listed."""
    service = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
    ctx = service.operation_context(actor_id="user-api", correlation_id="corr-failed-job")

    # The path is never created, so the ingestion job has nothing to read.
    missing_path = tmp_path / "missing.txt"
    failed_job = service.start_ingestion_job({"mode": "file", "path": str(missing_path)}, ctx)
    failed_listing = service.list_ingestion_jobs(status="failed")
    job_id = failed_job["job_id"]

    assert failed_job["status"] == "failed"
    assert failed_job["correlation_id"] == "corr-failed-job"
    assert failed_job["failures"][0]["code"] == "kontextual.not_found"
    assert failed_job["retry_options"]["retryable"] is False
    assert service.get_ingestion_job(job_id)["job_id"] == job_id
    assert failed_listing["count"] == 1
    assert failed_listing["items"][0]["job_id"] == job_id
def test_service_runtime_operation_context_represents_delegation_and_agent_identity() -> None:
    """The serialized operation context carries actor, delegate, scope and agent data."""
    service = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
    ctx = service.operation_context(
        actor_id="agent-codex",
        actor_type="ai_agent",
        display_name="Codex",
        external_ref="agent://codex",
        groups=["automation", "engineering"],
        delegated_actor_id="user-owner",
        delegated_actor_type="human",
        delegated_actor_display_name="Owner",
        delegated_actor_groups=["knowledge-owners"],
        request_scope={"tenant": "tenant-a", "surface": "service-api"},
        policy_scope={"allowed_sensitivity": "internal"},
        agent_id="agent-codex",
        agent_name="Codex",
        agent_run_id="run-123",
        agent_tool="service-api",
        correlation_id="corr-delegated",
    )

    serialized = ctx.to_dict()
    actor = serialized["actor"]
    delegate = serialized["delegated_actor"]
    extra = serialized["metadata"]

    assert actor["id"] == "agent-codex"
    assert actor["actor_type"] == "ai_agent"
    assert actor["groups"] == ["automation", "engineering"]
    assert delegate["id"] == "user-owner"
    assert delegate["actor_type"] == "human"
    assert serialized["request_scope"]["tenant"] == "tenant-a"
    assert serialized["policy_scope"]["allowed_sensitivity"] == "internal"
    assert extra["agent"]["agent_run_id"] == "run-123"
    assert extra["delegation"]["delegated_actor_id"] == "user-owner"
def test_service_api_authorization_error_payload_redacts_resource_metadata() -> None:
    """The API error payload for a denied create must not echo the asset's metadata."""
    service = ServiceRuntime(
        repository=InMemoryAssetRegistryRepository(),
        policy_gateway=DenyCreatePolicy(),
    )
    secret_payload = {
        "asset_id": "asset-secret",
        "title": "Protected Title",
        "classification": {
            "asset_type": "document",
            "sensitivity": "restricted",
            "metadata": {"secret": "do-not-echo"},
        },
    }

    with pytest.raises(AuthorizationError) as raised:
        service.create_asset(
            secret_payload,
            service.operation_context(actor_id="user-denied"),
        )

    error_payload = _authorization_error_payload(raised.value)
    decision = error_payload["details"]["policy_decision"]

    assert decision["effect"] == "deny"
    assert "resource_metadata" not in decision.get("context", {})
    # Neither the title nor the secret value may appear anywhere in the payload.
    assert "Protected Title" not in str(error_payload)
    assert "do-not-echo" not in str(error_payload)
def test_service_runtime_exposes_bounded_agent_operation_catalog_and_dispatch() -> None:
    """Agent operations are listed in a catalog and dispatched by name.

    Covers a dry run, a search, a retrieval and a report, and finally checks
    that an operation name outside the catalog ("shell") is rejected.
    """
    service = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
    ctx = service.operation_context(
        actor_id="agent-codex",
        actor_type="ai_agent",
        agent_id="agent-codex",
        agent_run_id="run-agent-1",
        correlation_id="corr-agent",
    )
    asset_payload = {
        "asset_id": "asset-agent",
        "title": "Agent Visible Asset",
        "classification": {"asset_type": "document", "sensitivity": "internal"},
        "source_refs": [{"source_system": "test", "path": "agent.md"}],
    }
    enrich_request = {
        "dry_run": True,
        "payload": {"asset_id": "asset-agent", "metadata": {"key": "agent_note", "value": "planned"}},
    }
    search_request = {"query": {"asset_type": "document", "limit": 5}}
    report_request = {
        "summary": "Agent operation completed",
        "result_ref": {"asset_id": "asset-agent"},
    }

    service.create_asset(asset_payload, ctx)
    catalog = service.list_agent_operations()
    dry_run_result = service.execute_agent_operation("enrich_metadata", enrich_request, ctx)
    search_result = service.execute_agent_operation("search_assets", search_request, ctx)
    bundle_result = service.execute_agent_operation(
        "retrieve_asset", {"asset_id": "asset-agent"}, ctx
    )
    report_result = service.execute_agent_operation("report_result", report_request, ctx)

    assert catalog["count"] >= 10
    assert (
        service.get_agent_operation("search_assets")["audit_operation"]
        == "agent.operation.search_assets"
    )
    assert all(entry["required_permissions"] for entry in catalog["items"])
    assert all(entry["failure_modes"] for entry in catalog["items"])

    # The dry run must not have written the planned metadata record.
    assert dry_run_result["dry_run"] is True
    assert service.list_metadata_records("asset-agent")["count"] == 0

    assert search_result["success"] is True
    assert search_result["result"]["results"][0]["asset_id"] == "asset-agent"
    assert search_result["audit_event"]["operation"] == "agent.operation.search_assets"
    assert bundle_result["result"]["asset"]["id"] == "asset-agent"
    assert bundle_result["result"]["source_grounded"] is True
    assert report_result["result"]["audit_event"]["operation"] == "agent.report.recorded"
    assert service.list_audit_events(target="agent_operation:search_assets")["count"] == 2

    with pytest.raises(ValidationError):
        service.execute_agent_operation("shell", {"command": "echo no"}, ctx)
def test_service_runtime_assembles_source_grounded_context_package_with_opaque_memory_refs(
    tmp_path,
) -> None:
    """Assemble a markitect-format context package over one ingested asset.

    External memory references must come back as opaque entries without the
    "content" supplied in the request.
    """
    service = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
    ctx = service.operation_context(
        actor_id="agent-codex", actor_type="ai_agent", correlation_id="corr-context"
    )

    source_file = tmp_path / "context-source.txt"
    source_file.write_text(
        "Markdown-backed context packages should remain source grounded.",
        encoding="utf-8",
    )

    ingestion_request = {
        "mode": "file",
        "path": str(source_file),
        "asset_id": "asset-context",
        "classification": {"asset_type": "markdown", "sensitivity": "internal"},
    }
    package_request = {
        "package_id": "ctxpkg-test",
        "title": "Context Package",
        "intent": "Support implementation planning.",
        "format": "markitect",
        "query": {
            "text": "source grounded",
            "representation_kind": "normalized",
            "include_snippets": True,
        },
        "constraints": {"max_sensitivity": "internal", "no_external_publish": True},
        "external_memory_refs": [
            {
                "ref_id": "phase-memory:episode-1",
                "system": "phase-memory",
                "content": "memory graph detail must not be embedded",
            }
        ],
    }
    expected_memory_refs = [
        {
            "ref_id": "phase-memory:episode-1",
            "system": "phase-memory",
            "kind": "memory_ref",
            "opaque": True,
            "metadata": {},
        }
    ]

    service.start_ingestion_job(ingestion_request, ctx)
    package = service.assemble_context_package(package_request, ctx)

    assert service.context_package_schema()["formats"] == ["kontextual", "markitect"]
    assert package["package_id"] == "ctxpkg-test"
    assert package["source_grounded"] is True
    assert package["result_count"] == 1

    first_item = package["items"][0]
    assert first_item["asset_id"] == "asset-context"
    assert first_item["source_refs"]
    assert first_item["snippets"]

    memory_refs = package["external_memory_refs"]
    assert memory_refs == expected_memory_refs
    assert "memory graph detail" not in str(memory_refs)

    markitect_payload = package["markitect_payload"]
    assert markitect_payload["kind"] == "markitect.context_package"
    assert markitect_payload["adapter_boundary"] == (
        "markdown rendering and selector semantics are delegated to markitect-tool"
    )
    assert package["audit_event"]["operation"] == "context_package.assemble"
    assert package["policy_decision"]["effect"] == "allow"
def test_service_runtime_agent_operations_support_review_required_and_dry_run_only_policy() -> None:
    """Agent operations surface require-review and dry-run-only policy decisions."""
    review_service = ServiceRuntime(
        repository=InMemoryAssetRegistryRepository(),
        policy_gateway=ReviewRequiredAgentPolicy(),
    )
    preview_service = ServiceRuntime(
        repository=InMemoryAssetRegistryRepository(),
        policy_gateway=DryRunOnlyAgentPolicy(),
    )
    review_ctx = review_service.operation_context(
        actor_id="agent-codex",
        actor_type="ai_agent",
        correlation_id="corr-review",
    )
    preview_ctx = preview_service.operation_context(
        actor_id="agent-codex",
        actor_type="ai_agent",
        correlation_id="corr-dry-run",
    )

    enrich_request = {
        "payload": {"asset_id": "asset-review", "metadata": {"key": "risk", "value": "high"}}
    }
    live_transform_request = {"payload": {"transformation": {"operation_id": "structured_view"}}}
    preview_transform_request = {
        "dry_run": True,
        "payload": {"transformation": {"operation_id": "structured_view"}},
    }

    review_outcome = review_service.execute_agent_operation(
        "enrich_metadata", enrich_request, review_ctx
    )
    # Same operation twice: once without dry_run, once with it.
    blocked_outcome = preview_service.execute_agent_operation(
        "transform_asset", live_transform_request, preview_ctx
    )
    preview_outcome = preview_service.execute_agent_operation(
        "transform_asset", preview_transform_request, preview_ctx
    )

    assert review_outcome["success"] is False
    assert review_outcome["review_required"] is True
    assert review_outcome["audit_event"]["outcome"] == "review_required"
    assert review_outcome["policy_decision"]["effect"] == "require_review"

    assert blocked_outcome["success"] is False
    assert blocked_outcome["dry_run_required"] is True
    assert blocked_outcome["audit_event"]["outcome"] == "dry_run"
    assert blocked_outcome["policy_decision"]["effect"] == "dry_run_only"

    assert preview_outcome["success"] is True
    assert preview_outcome["dry_run"] is True
    assert preview_outcome["audit_event"]["outcome"] == "dry_run"
def test_service_runtime_ingestion_directory_reports_partial_failures(tmp_path) -> None:
    """Directory ingestion over one text file and one binary file is partially completed."""
    service = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
    ctx = service.operation_context(actor_id="user-api", correlation_id="corr-partial")

    text_file = tmp_path / "good.txt"
    text_file.write_text("source grounded text", encoding="utf-8")
    binary_file = tmp_path / "unsupported.bin"
    binary_file.write_bytes(b"\x00\x01\x02")

    job = service.start_ingestion_job(
        {"mode": "directory", "path": str(tmp_path), "recursive": False},
        ctx,
    )
    tallies = job["job"]["partial_results"]

    assert job["status"] == "partially_completed"
    assert job["output_asset_ids"]
    assert job["failures"][0]["code"] == "kontextual.adapter_unavailable"
    assert tallies["files_total"] == 2
    assert tallies["failed"] == 1
    assert tallies["succeeded"] == 1
    assert job["retry_options"]["retryable"] is True
def test_service_runtime_exposes_operator_metrics_recovery_export_governance_and_quality(
    tmp_path,
) -> None:
    """Walk the operator-facing surface after one ingestion, query and transformation.

    Covers metrics, job and event inspection, recovery actions, export
    creation and validation (including a tampered manifest), the governance
    report, extension events, quality/cost signals and the smoke and
    compliance reports.
    """
    service = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
    ctx = service.operation_context(
        actor_id="operator", actor_type="service_account", correlation_id="corr-ops"
    )

    source_file = tmp_path / "ops-source.txt"
    source_file.write_text(
        "Operational export and governance signals should be source grounded.",
        encoding="utf-8",
    )

    ingestion_request = {
        "mode": "file",
        "path": str(source_file),
        "asset_id": "asset-ops",
        "classification": {
            "asset_type": "document",
            "sensitivity": "confidential",
        },
    }
    transformation_request = {
        "operation_id": "structured_view",
        "source_asset_ids": ["asset-ops"],
        "output_asset_id": "asset-ops-derived",
    }
    quality_signal_request = {
        "signal_type": "ai_usage",
        "target": "workflow:ops",
        "asset_id": "asset-ops",
        "agent_id": "agent-ops",
        "metrics": {"confidence": 0.91},
        "ai_usage": {"provider": "test", "model": "deterministic", "tokens": 123},
        "cost": {"estimated": 0.42, "currency": "EUR"},
    }

    ingestion = service.start_ingestion_job(ingestion_request, ctx)
    service.query_assets({"text": "governance", "include_snippets": True}, ctx)
    transformation = service.execute_transformation(transformation_request, ctx)

    metrics = service.operational_metrics()
    jobs = service.inspect_jobs(correlation_id="corr-ops")
    events = service.operational_events(correlation_id="corr-ops", operation_prefix="asset.")
    recovery_catalog = service.recovery_actions()
    failure_inspection = service.execute_recovery_action(
        "inspect_failure",
        {"kind": "ingestion", "id": ingestion["job_id"]},
        ctx,
    )

    export = service.create_export_package(
        {"scope": {"asset_ids": ["asset-ops", "asset-ops-derived"]}}, ctx
    )
    validation = service.validate_export_package({"package": export}, ctx)

    # Copy the package and its manifest so the original export stays untouched,
    # then falsify the asset count.
    tampered = {**export, "manifest": {**export["manifest"], "asset_count": 999}}
    tampered_validation = service.validate_export_package({"package": tampered}, ctx)

    governance = service.governance_report({"scope": {"asset_ids": ["asset-ops"]}}, ctx)
    extension_catalog = service.extension_catalog()
    extension_event = service.emit_extension_event(
        {"event_type": "derived_artifact.created", "target": "asset:asset-ops-derived"},
        ctx,
    )
    quality_signal = service.record_quality_signal(quality_signal_request, ctx)
    quality_cost = service.quality_cost_signals()
    smoke_report = service.performance_smoke_report()
    compliance_report = service.mvp_compliance_report()

    assert metrics["ingestion"]["job_count"] >= 1
    assert metrics["retrieval"]["query_events"] >= 1
    assert metrics["transformations"]["completed"] >= 1
    assert jobs["count"] >= 2
    assert events["items"][0]["correlation_id"] == "corr-ops"

    offered_actions = [entry["action"] for entry in recovery_catalog["items"]]
    assert "retry_ingestion_job" in offered_actions
    assert failure_inspection["result"]["job_id"] == ingestion["job_id"]

    assert export["manifest"]["asset_count"] == 2
    assert export["manifest"]["export_hash"].startswith("sha256:")
    assert export["adapter_sections"]["markitect_tool"]["included"] is False
    assert validation["valid"] is True
    assert tampered_validation["valid"] is False
    issue_codes = [issue["code"] for issue in tampered_validation["issues"]]
    assert "export.count_mismatch" in issue_codes

    assert governance["redaction"]["content_included"] is False
    assert "governance.sensitive_without_review_metadata" in governance["summary"]
    assert "markitect-tool" in extension_catalog["markitect_boundary"]
    assert extension_event["event"]["operation"] == "extension.derived_artifact.created"
    assert quality_signal["event"]["operation"] == "quality.signal.recorded"
    assert quality_cost["ai_usage"]["tokens"] == 123
    assert quality_cost["cost"]["estimated_total"] == 0.42
    assert smoke_report["smoke_targets"] == ["ingestion", "retrieval", "workflow", "export"]
    assert compliance_report["implemented_capabilities"]["exports"] is True
    assert transformation["lineage"]["output_asset_id"] == "asset-ops-derived"
def test_create_app_reports_missing_optional_dependency_when_fastapi_is_absent() -> None:
    """Without FastAPI, create_app() raises a RuntimeError naming the service extra.

    The test is skipped when FastAPI can be imported.
    """
    try:
        import fastapi  # noqa: F401
    except ImportError:
        fastapi_available = False
    else:
        fastapi_available = True

    if fastapi_available:
        pytest.skip("FastAPI is installed; missing-dependency path is not active")

    with pytest.raises(RuntimeError, match=r"kontextual-engine\[service\]"):
        create_app()
@pytest.fixture
def client():
    """Yield a FastAPI TestClient for an app built on an in-memory runtime.

    Tests using this fixture are skipped when fastapi or httpx is not importable.
    """
    for optional_module in ("fastapi", "httpx"):
        pytest.importorskip(optional_module)

    # Imported only after the skip checks above have passed.
    from fastapi.testclient import TestClient

    runtime = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
    application = create_app(runtime)
    with TestClient(application) as test_client:
        yield test_client
def test_service_health_readiness_version_and_openapi_contracts(client) -> None:
    """Probe endpoints answer 200 and the OpenAPI document lists the expected routes."""
    health_response = client.get("/health")
    ready_response = client.get("/ready")
    version_response = client.get("/version")
    versioned_health_response = client.get("/api/v1/health")
    openapi_response = client.get("/openapi.json")

    assert health_response.status_code == 200
    assert health_response.json()["status"] == "ok"
    assert ready_response.status_code == 200
    assert ready_response.json()["ready"] is True
    assert ready_response.json()["checks"]["asset_registry"]["status"] == "ok"
    assert version_response.status_code == 200
    assert version_response.json()["api_version"] == "v1"
    assert versioned_health_response.status_code == 200
    assert versioned_health_response.json()["api_version"] == "v1"
    assert openapi_response.status_code == 200

    documented_paths = openapi_response.json()["paths"]
    expected_paths = (
        "/health",
        "/ready",
        "/version",
        "/api/v1/health",
        "/api/v1/ready",
        "/api/v1/version",
        "/api/v1/context",
        "/api/v1/assets",
        "/api/v1/relationships",
        "/api/v1/audit/events",
        "/api/v1/policy/evaluate",
        "/api/v1/ingestion/jobs",
        "/api/v1/retrieval/assets",
        "/api/v1/transformations/runs",
        "/api/v1/workflows/templates",
        "/api/v1/workflows/runs",
        "/api/v1/workflows/reviews",
        "/api/v1/agents/operations",
        "/api/v1/context-packages",
        "/api/v1/context-packages/schema",
        "/api/v1/operations/metrics",
        "/api/v1/operations/recovery/actions",
        "/api/v1/exports",
        "/api/v1/governance/report",
        "/api/v1/extensions/catalog",
        "/api/v1/quality/signals",
        "/api/v1/compliance/mvp",
    )
    # Checked one by one so the first missing route fails the test.
    for route in expected_paths:
        assert route in documented_paths
def test_create_app_attaches_runtime_to_application_state(client) -> None:
    """The app built by create_app exposes its runtime as state.kontextual_runtime."""
    attached_runtime = client.app.state.kontextual_runtime
    assert attached_runtime.api_version == "v1"
class DenyCreatePolicy:
    """Test policy gateway that denies ``asset.create`` and allows every other action."""

    def authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict[str, str] | None = None,
    ) -> PolicyDecision:
        """Return a deny decision for ``asset.create``, otherwise an allow decision.

        The deny decision carries the supplied resource metadata (or an empty
        dict) in its ``context`` under the ``resource_metadata`` key.
        """
        if action != "asset.create":
            return PolicyDecision.allow(context.actor.id, action, resource)

        decision_context = {"resource_metadata": resource_metadata or {}}
        return PolicyDecision.deny(
            context.actor.id,
            action,
            resource,
            reason="asset creation disabled",
            context=decision_context,
        )
class ReviewRequiredAgentPolicy:
    """Test policy gateway that requires review for the ``enrich_metadata`` agent operation."""

    def authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict[str, str] | None = None,
    ) -> PolicyDecision:
        """Return REQUIRE_REVIEW for ``agent.operation.enrich_metadata``, else allow.

        The review decision names the ``knowledge-review`` queue as an
        obligation and carries the supplied resource metadata (or an empty
        dict) in its ``context``.
        """
        if action != "agent.operation.enrich_metadata":
            return PolicyDecision.allow(context.actor.id, action, resource)

        decision_context = {"resource_metadata": resource_metadata or {}}
        return PolicyDecision(
            PolicyEffect.REQUIRE_REVIEW,
            context.actor.id,
            action,
            resource,
            reason="metadata enrichment requires review",
            obligations={"queue": "knowledge-review"},
            context=decision_context,
        )
class DryRunOnlyAgentPolicy:
    """Test policy gateway that limits the ``transform_asset`` agent operation to dry runs."""

    def authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict[str, str] | None = None,
    ) -> PolicyDecision:
        """Return DRY_RUN_ONLY for ``agent.operation.transform_asset``, else allow.

        The dry-run decision carries the supplied resource metadata (or an
        empty dict) in its ``context``.
        """
        if action != "agent.operation.transform_asset":
            return PolicyDecision.allow(context.actor.id, action, resource)

        decision_context = {"resource_metadata": resource_metadata or {}}
        return PolicyDecision(
            PolicyEffect.DRY_RUN_ONLY,
            context.actor.id,
            action,
            resource,
            reason="transformation requires dry-run preview",
            context=decision_context,
        )