"""Tests for transformation execution, lineage persistence, and policy gating."""

from pathlib import Path
from typing import Any

from kontextual_engine import (
    Actor,
    ActorType,
    AssetRegistryService,
    AssetRepresentation,
    AuditOutcome,
    Classification,
    InMemoryAssetRegistryRepository,
    OperationContext,
    PolicyDecision,
    RepresentationKind,
    Sensitivity,
    SQLiteAssetRegistryRepository,
    TransformationRequest,
    TransformationRunStatus,
    TransformationService,
    VersionChangeType,
    default_transformation_registry,
)


def test_default_transformation_registry_declares_engine_and_adapter_boundaries() -> None:
    """The default registry exposes one engine-handled op and adapter-only markdown ops."""
    catalog = default_transformation_registry()
    by_id = {entry.operation_id: entry for entry in catalog.list_operations()}

    # "structured_view" is expected to have a handler and no adapter reference.
    assert "structured_view" in by_id
    structured = by_id["structured_view"]
    assert structured.adapter_ref is None
    assert catalog.handler_for("structured_view") is not None
    assert structured.required_permissions == ("asset.retrieve", "asset.create")

    # The markdown operations are expected to be declared but not handled here.
    adapter_backed_ids = (
        "markdown_compose",
        "markdown_include",
        "markdown_transform",
        "markdown_validate",
    )
    for adapter_op_id in adapter_backed_ids:
        declared = by_id[adapter_op_id]
        assert declared.adapter_ref == "markitect-tool"
        assert catalog.handler_for(adapter_op_id) is None
        assert declared.metadata["boundary"] == "adapter-backed; do not reimplement markdown syntax in engine"


def test_structured_view_transformation_persists_run_output_lineage_and_audit() -> None:
    """A successful structured_view run records run, output, lineage, version and audit."""
    repo = InMemoryAssetRegistryRepository()
    asset_registry = AssetRegistryService(repo)
    context = operation_context()

    classification = Classification(
        asset_type="document",
        sensitivity=Sensitivity.CONFIDENTIAL,
        owner="Platform Knowledge",
    )
    source_representation = AssetRepresentation.from_content(
        "asset-intent",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Intent\n\nBuild a context engine.\n",
        storage_ref="object://intent-source",
    )
    source = asset_registry.create_asset(
        "Intent",
        classification,
        context,
        asset_id="asset-intent",
        representations=[source_representation],
    )

    transformer = TransformationService(repo, asset_service=asset_registry)
    request = TransformationRequest(
        operation_id="structured_view",
        source_asset_ids=("asset-intent",),
        parameters={"shape": "asset-summary"},
        output_asset_id="asset-intent-structured",
        output_title="Intent Structured View",
    )
    result = transformer.execute_transformation(request, context)

    # Run record, both on the result and as stored in the repository.
    assert result.success is True
    assert result.run is not None
    run = result.run
    assert run.status == TransformationRunStatus.COMPLETED
    assert run.output_asset_ids == ("asset-intent-structured",)
    stored_run = repo.get_transformation_run(run.run_id)
    assert stored_run.status == TransformationRunStatus.COMPLETED

    # Output asset and its derived representation.
    assert result.output_asset is not None
    assert result.output_asset.classification.sensitivity == Sensitivity.CONFIDENTIAL
    assert result.output_representation is not None
    derived = result.output_representation
    assert derived.kind == RepresentationKind.DERIVED
    assert derived.media_type == "application/json"
    assert derived.metadata["transformation_run_id"] == run.run_id

    # Lineage linking source to output, including the recorded policy context.
    assert result.lineage is not None
    lineage = result.lineage
    assert lineage.source_asset_ids == ("asset-intent",)
    assert lineage.source_version_ids == (source.version.version_id,)
    assert lineage.output_asset_id == "asset-intent-structured"
    assert lineage.policy_context["run_execute"]["effect"] == "allow"
    assert lineage.policy_context["source_reads"][0]["action"] == "asset.retrieve"
    assert repo.get_derived_lineage(lineage.lineage_id) == lineage
    assert repo.list_derived_lineage(source_asset_id="asset-intent") == [lineage]

    # First listed version of the output asset.
    output_versions = repo.list_versions("asset-intent-structured")
    version = output_versions[0]
    assert version.change_type == VersionChangeType.DERIVED_OUTPUT
    assert version.operation_id == run.run_id
    assert version.parent_version_id == source.version.version_id

    metadata_keys = [
        entry.key for entry in repo.list_metadata_records("asset-intent-structured")
    ]
    assert metadata_keys == [
        "lineage",
        "transformation_run_id",
    ]

    run_target = f"transformation_run:{result.run.run_id}"
    audit_operations = [
        audit.operation for audit in repo.list_audit_events(target=run_target)
    ]
    assert audit_operations == [
        "transformation.run.queued",
        "transformation.run.started",
        "transformation.run.completed",
    ]


def test_sqlite_transformation_runs_and_lineage_survive_reinstantiation(tmp_path: Path) -> None:
    """Runs, lineage and derived representations are readable from a fresh SQLite repo."""
    db_path = tmp_path / "registry.sqlite"
    repo = SQLiteAssetRegistryRepository(db_path)
    asset_registry = AssetRegistryService(repo)
    context = operation_context()

    classification = Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL)
    source_representation = AssetRepresentation.from_content(
        "asset-architecture",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Architecture\n",
    )
    asset_registry.create_asset(
        "Architecture",
        classification,
        context,
        asset_id="asset-architecture",
        representations=[source_representation],
    )

    transformer = TransformationService(repo, asset_service=asset_registry)
    request = TransformationRequest(
        operation_id="structured_view",
        source_asset_ids=("asset-architecture",),
        output_asset_id="asset-architecture-structured",
    )
    result = transformer.execute_transformation(request, context)
    assert result.run is not None
    assert result.lineage is not None
    run_id = result.run.run_id
    lineage_id = result.lineage.lineage_id

    # A second repository instance over the same database file.
    reloaded = SQLiteAssetRegistryRepository(db_path)

    reloaded_run = reloaded.get_transformation_run(run_id)
    assert reloaded_run.status == TransformationRunStatus.COMPLETED

    matching_runs = reloaded.list_transformation_runs(operation_id="structured_view")
    assert matching_runs[0].run_id == run_id

    reloaded_lineage = reloaded.get_derived_lineage(lineage_id)
    assert reloaded_lineage.output_asset_id == "asset-architecture-structured"

    lineage_by_output = reloaded.list_derived_lineage(
        output_asset_id="asset-architecture-structured"
    )
    assert lineage_by_output[0].lineage_id == lineage_id

    output_representations = reloaded.list_representations(
        asset_id="asset-architecture-structured"
    )
    assert output_representations[0].kind == RepresentationKind.DERIVED


def test_adapter_backed_operation_returns_capability_diagnostic_without_reimplementation() -> None:
    """Running an adapter-backed operation fails the run with a not-executable diagnostic."""
    repo = InMemoryAssetRegistryRepository()
    asset_registry = AssetRegistryService(repo)
    context = operation_context()
    asset_registry.create_asset(
        "Markdown Source",
        Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-markdown",
    )

    transformer = TransformationService(repo, asset_service=asset_registry)
    request = TransformationRequest(
        operation_id="markdown_transform",
        source_asset_ids=("asset-markdown",),
        output_asset_id="asset-markdown-output",
    )
    result = transformer.execute_transformation(request, context)

    assert result.success is False
    assert result.run is not None
    assert result.run.status == TransformationRunStatus.FAILED

    first_diagnostic = result.diagnostics[0]
    assert first_diagnostic.code == "transformation.operation_not_executable"
    assert result.diagnostics[0].details["adapter_ref"] == "markitect-tool"

    failed_runs = repo.list_transformation_runs(status=TransformationRunStatus.FAILED)
    assert failed_runs[0].run_id == result.run.run_id


def test_unknown_operation_reports_supported_operations_without_creating_run() -> None:
    """An unknown operation id yields a diagnostic and no run record."""
    repo = InMemoryAssetRegistryRepository()
    transformer = TransformationService(repo)

    request = TransformationRequest(operation_id="unknown_operation")
    result = transformer.execute_transformation(request, operation_context())

    assert result.success is False
    assert result.run is None

    first_diagnostic = result.diagnostics[0]
    assert first_diagnostic.code == "transformation.operation_unsupported"
    assert "structured_view" in result.diagnostics[0].details["supported"]
    assert repo.list_transformation_runs() == []


def test_source_read_policy_denial_fails_run_before_handler_execution() -> None:
    """A denied source read fails the run, creates no derived asset and audits the denial."""
    repo = InMemoryAssetRegistryRepository()
    permissive_registry = AssetRegistryService(repo)
    context = operation_context()

    # The source asset is created through a registry without the deny policy.
    restricted_representation = AssetRepresentation.from_content(
        "asset-restricted",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Restricted\n",
    )
    permissive_registry.create_asset(
        "Restricted Source",
        Classification(asset_type="document", sensitivity=Sensitivity.RESTRICTED),
        context,
        asset_id="asset-restricted",
        representations=[restricted_representation],
    )

    # Both the asset service and the transformation service get a deny policy.
    guarded_registry = AssetRegistryService(repo, policy_gateway=DenySourceReadPolicy())
    transformer = TransformationService(
        repo,
        asset_service=guarded_registry,
        policy_gateway=DenySourceReadPolicy(),
    )
    request = TransformationRequest(
        operation_id="structured_view",
        source_asset_ids=("asset-restricted",),
        output_asset_id="asset-denied-output",
    )
    result = transformer.execute_transformation(request, context)

    assert result.success is False
    assert result.run is not None
    assert result.run.status == TransformationRunStatus.FAILED
    assert result.diagnostics[0].code == "transformation.permission_denied"
    assert result.policy_decision is not None
    assert result.policy_decision.resource == "asset:asset-restricted"
    assert repo.list_assets(asset_type="derived_artifact") == []

    # The last audit event recorded for the run should be the denial.
    run_events = repo.list_audit_events(target=f"transformation_run:{result.run.run_id}")
    denied_event = run_events[-1]
    assert denied_event.outcome == AuditOutcome.DENIED
    assert denied_event.operation == "transformation.run.execute"


def operation_context() -> OperationContext:
    """Build the operation context shared by the tests: a human actor with a fixed correlation id."""
    test_user = Actor.create(
        ActorType.HUMAN,
        actor_id="user-test",
        display_name="Test User",
        groups=["engineering"],
    )
    return OperationContext.create(test_user, correlation_id="corr-test")


class DenySourceReadPolicy:
    """Policy stub that denies ``asset.retrieve`` and allows every other action."""

    def authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict[str, Any] | None = None,
    ) -> PolicyDecision:
        """Return a deny decision for ``asset.retrieve``, otherwise an allow decision.

        The decision context always carries ``resource_metadata``, with an
        empty dict substituted when none (or an empty mapping) is supplied.
        """
        actor_id = context.actor.id
        decision_context = {"resource_metadata": resource_metadata or {}}

        if action != "asset.retrieve":
            return PolicyDecision.allow(
                actor_id,
                action,
                resource,
                context=decision_context,
            )

        return PolicyDecision.deny(
            actor_id,
            action,
            resource,
            reason="source reads require review",
            context=decision_context,
        )