"""Tests for the asset registry service over in-memory and SQLite repositories."""

from pathlib import Path

import pytest

from kontextual_engine import (
    Actor,
    ActorType,
    AssetRegistryService,
    AssetRepresentation,
    AuthorizationError,
    Classification,
    ContextEntity,
    ContextEntityType,
    InMemoryAssetRegistryRepository,
    LifecycleState,
    MetadataRecord,
    OperationContext,
    PolicyDecision,
    RepresentationKind,
    Sensitivity,
    SourceReference,
    SQLiteAssetRegistryRepository,
    ValidationError,
)


def test_asset_registry_service_creates_assets_with_versions_and_audit() -> None:
    """Creating an asset yields a first version, an audit event, and stored records."""
    registry = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(registry)
    ctx = operation_context()
    origin = SourceReference(
        source_system="repo",
        path="docs/intent.md",
        checksum="sha256:source",
    )
    source_repr = AssetRepresentation.from_content(
        "asset-intent",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Intent\n",
        storage_ref="object://intent-source",
        source_ref_id=origin.id,
    )
    topic = MetadataRecord(
        "topic",
        "architecture",
        provenance={"producer": "human"},
        confirmed=True,
    )
    classification = Classification(
        asset_type="document",
        sensitivity=Sensitivity.INTERNAL,
        owner="Platform Knowledge",
    )

    outcome = service.create_asset(
        "Intent",
        classification,
        ctx,
        asset_id="asset-intent",
        source_refs=[origin],
        representations=[source_repr],
        metadata_records=[topic],
    )

    # The returned result describes the freshly created asset.
    assert outcome.asset.id == "asset-intent"
    assert outcome.asset.current_version_id == outcome.version.version_id
    assert outcome.version.sequence == 1
    assert outcome.audit_event.outcome.value == "success"
    assert outcome.policy_decision.allowed is True

    # Everything passed to create_asset can be read back from the repository.
    stored_asset = registry.get_asset("asset-intent")
    assert stored_asset.source_refs[0].path == "docs/intent.md"
    stored_reprs = registry.list_representations(asset_id="asset-intent")
    assert stored_reprs[0].storage_ref == "object://intent-source"
    stored_metadata = registry.list_metadata_records("asset-intent")
    assert stored_metadata[0].confirmed is True
    audit_trail = registry.list_audit_events(target="asset:asset-intent")
    assert audit_trail[0].operation == "asset.create"


def test_asset_registry_lifecycle_policy_denial_fails_closed_and_audits() -> None:
    """A denied lifecycle transition raises, is audited, and leaves the asset ACTIVE."""
    registry = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(registry, policy_gateway=DenyLifecyclePolicy())
    ctx = operation_context()
    governed = service.create_asset(
        "Governed Asset",
        Classification(asset_type="document", sensitivity=Sensitivity.CONFIDENTIAL),
        ctx,
        asset_id="asset-governed",
    )

    with pytest.raises(AuthorizationError) as denied:
        service.transition_lifecycle(governed.asset.id, LifecycleState.RETIRED, ctx)

    audit_trail = registry.list_audit_events(target="asset:asset-governed")
    assert denied.value.details["correlation_id"] == "corr-test"
    assert denied.value.details["policy_decision"]["effect"] == "fail_closed"
    outcomes = [event.outcome.value for event in audit_trail]
    assert outcomes == ["success", "denied"]
    assert registry.get_asset("asset-governed").lifecycle == LifecycleState.ACTIVE


def test_asset_registry_create_is_idempotent_for_same_key_and_payload() -> None:
    """Replaying a create with the same key returns the first result; a changed payload is rejected."""
    registry = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(registry)
    ctx = operation_context()
    classification = Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC)
    origin = SourceReference(source_system="repo", path="README.md")
    source_repr = AssetRepresentation.from_content(
        "asset-readme",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Readme\n",
    )

    def create(name: str):
        # Only the asset name varies between calls; fresh lists are built each time.
        return service.create_asset(
            name,
            classification,
            ctx,
            asset_id="asset-readme",
            source_refs=[origin],
            representations=[source_repr],
            idempotency_key="create-readme",
        )

    original = create("Readme")
    replay = create("Readme")

    assert replay.asset.id == original.asset.id
    assert replay.version.version_id == original.version.version_id
    assert replay.audit_event.event_id == original.audit_event.event_id
    assert len(registry.list_versions("asset-readme")) == 1
    assert len(registry.list_audit_events(target="asset:asset-readme")) == 1

    # Same idempotency key with a different name must be refused.
    with pytest.raises(ValidationError, match="Idempotency key"):
        create("Readme renamed")


def test_asset_registry_relationships_create_versions_and_audit() -> None:
    """Linking two assets versions and audits the source asset only."""
    registry = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(registry)
    ctx = operation_context()
    classification = Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL)
    upstream = service.create_asset("Source", classification, ctx, asset_id="asset-source")
    downstream = service.create_asset("Target", classification, ctx, asset_id="asset-target")

    link = service.link_asset_to_asset(
        upstream.asset.id,
        downstream.asset.id,
        "depends_on",
        ctx,
        confidence=0.91,
        provenance={"producer": "test"},
    )

    stored_links = registry.list_relationships(source_id=upstream.asset.id)
    history = registry.list_versions(upstream.asset.id)
    assert stored_links == [link.relationship]
    assert link.relationship.target_id == downstream.asset.id
    assert link.relationship.confidence == 0.91

    newest = history[-1]
    assert newest.change_type.value == "relationship_changed"
    assert newest.relationship_delta["added"]["predicate"] == "depends_on"

    # The source asset advances to the new version; the target keeps its original one.
    assert registry.get_asset(upstream.asset.id).current_version_id == link.version.version_id
    assert registry.get_asset(downstream.asset.id).current_version_id == downstream.version.version_id
    source_audit = registry.list_audit_events(target=f"asset:{upstream.asset.id}")
    assert source_audit[-1].operation == "asset.relationship.add"


def test_sqlite_asset_registry_survives_reinstantiation(tmp_path: Path) -> None:
    """State written through one SQLite repository is visible from a new instance on the same file."""
    db_path = tmp_path / "registry.sqlite"
    registry = SQLiteAssetRegistryRepository(db_path)
    service = AssetRegistryService(registry)
    ctx = operation_context()
    origin = SourceReference(source_system="repo", path="README.md", checksum="sha256:readme")
    source_repr = AssetRepresentation.from_content(
        "asset-readme",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Readme\n",
        storage_ref="object://readme-source",
    )
    readme = service.create_asset(
        "Readme",
        Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC),
        ctx,
        asset_id="asset-readme",
        source_refs=[origin],
        representations=[source_repr],
    )
    service.add_metadata_record(
        readme.asset.id,
        MetadataRecord("owner", "Platform Knowledge", confirmed=True),
        ctx,
    )
    service.request_delete(readme.asset.id, ctx)

    # Open a second repository over the same database file.
    reopened = SQLiteAssetRegistryRepository(db_path)
    asset = reopened.get_asset("asset-readme")

    assert asset.lifecycle == LifecycleState.DELETE_REQUESTED
    assert asset.source_refs[0].path == "README.md"
    kinds = [item.kind for item in reopened.list_representations(asset_id=asset.id)]
    assert kinds == [RepresentationKind.SOURCE]
    metadata_keys = [item.key for item in reopened.list_metadata_records(asset.id)]
    assert metadata_keys == ["owner"]
    sequences = [version.sequence for version in reopened.list_versions(asset.id)]
    assert sequences == [1, 2, 3]
    operations = [
        event.operation
        for event in reopened.list_audit_events(target="asset:asset-readme")
    ]
    assert operations == [
        "asset.create",
        "asset.metadata.add",
        "asset.lifecycle.transition",
    ]


def test_sqlite_registry_persists_context_entities_relationships_and_idempotency(tmp_path: Path) -> None:
    """Context entities, relationships, and idempotency records are readable after reopening."""
    db_path = tmp_path / "registry.sqlite"
    registry = SQLiteAssetRegistryRepository(db_path)
    service = AssetRegistryService(registry)
    ctx = operation_context()
    policy_asset = service.create_asset(
        "Knowledge Policy",
        Classification(asset_type="policy", sensitivity=Sensitivity.INTERNAL),
        ctx,
        asset_id="asset-policy",
        idempotency_key="create-policy",
    )
    project = ContextEntity(
        entity_type=ContextEntityType.PROJECT,
        name="Kontextual Engine",
        entity_id="entity-kontextual",
    )
    link = service.link_asset_to_context_entity(
        policy_asset.asset.id,
        project,
        "about_project",
        ctx,
    )

    # Open a second repository over the same database file.
    reopened = SQLiteAssetRegistryRepository(db_path)

    idempotency = reopened.get_idempotency_record("create-policy")
    assert idempotency.result_refs["asset_id"] == "asset-policy"
    assert reopened.list_context_entities()[0].entity_id == "entity-kontextual"
    stored_links = reopened.list_relationships(source_id="asset-policy")
    assert stored_links[0].relationship_id == link.relationship.relationship_id
    newest = reopened.list_versions("asset-policy")[-1]
    assert newest.relationship_delta["added"]["target_kind"] == "context_entity"
    operations = [
        event.operation
        for event in reopened.list_audit_events(target="asset:asset-policy")
    ]
    assert operations == [
        "asset.create",
        "asset.relationship.add",
    ]


def test_sqlite_registry_enforces_representation_asset_reference(tmp_path: Path) -> None:
    """Saving a representation for an asset id that was never created raises ValidationError."""
    registry = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite")
    orphan = AssetRepresentation.from_content(
        "missing-asset",
        RepresentationKind.NORMALIZED,
        "text/plain",
        "normalized",
    )

    with pytest.raises(ValidationError, match="unknown asset"):
        registry.save_representation(orphan)


def operation_context() -> OperationContext:
    """Build the shared operation context: a human test actor with correlation id ``corr-test``."""
    test_user = Actor.create(
        ActorType.HUMAN,
        actor_id="user-test",
        display_name="Test User",
        groups=["engineering"],
    )
    return OperationContext.create(test_user, correlation_id="corr-test")


class DenyLifecyclePolicy:
    """Policy gateway stub that fails closed on lifecycle transitions and allows any other action."""

    def authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict[str, str] | None = None,
    ) -> PolicyDecision:
        """Return a fail-closed decision for ``asset.lifecycle.transition``, otherwise an allow."""
        if action != "asset.lifecycle.transition":
            return PolicyDecision.allow(context.actor.id, action, resource)

        return PolicyDecision.fail_closed(
            context.actor.id,
            action,
            resource,
            reason="lifecycle transitions require review",
            context={"resource_metadata": resource_metadata or {}},
        )