"""Tests for AssetRegistryService against in-memory and SQLite repositories.

Each test builds its own repository and service, drives one service
operation (create, lifecycle transition, restore, supersede, relationship
link, metadata add / batch add, schema assignment, filtering), and then
asserts on the returned result and on what the repository reports back.
The SQLite tests additionally re-open the database file to check that the
state is still readable from a fresh repository instance.
"""

from pathlib import Path

import pytest

from kontextual_engine import (
    Actor,
    ActorType,
    AssetRegistryService,
    AssetRepresentation,
    AssetVersion,
    AuthorizationError,
    AuditEvent,
    AuditOutcome,
    Classification,
    ContextEntity,
    ContextEntityType,
    InMemoryAssetRegistryRepository,
    IngestionJob,
    LifecycleState,
    MetadataFieldDefinition,
    MetadataRecord,
    MetadataSchema,
    MetadataSchemaAssignment,
    MetadataValueType,
    OperationContext,
    PolicyDecision,
    RepresentationKind,
    Sensitivity,
    SourceReference,
    SQLiteAssetRegistryRepository,
    ValidationError,
    VersionChangeType,
)


def test_asset_registry_service_creates_assets_with_versions_and_audit() -> None:
    """Create an asset with a source ref, representation and metadata record.

    Checks the returned result (asset id, version sequence 1, successful
    audit outcome, allowed policy decision) and that the repository returns
    the source ref, representation, metadata record and audit event.
    """
    repo = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(repo)
    context = operation_context()
    source_ref = SourceReference(
        source_system="repo",
        path="docs/intent.md",
        checksum="sha256:source",
    )
    representation = AssetRepresentation.from_content(
        "asset-intent",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Intent\n",
        storage_ref="object://intent-source",
        source_ref_id=source_ref.id,
    )
    metadata = MetadataRecord(
        "topic",
        "architecture",
        provenance={"producer": "human"},
        confirmed=True,
    )

    result = service.create_asset(
        "Intent",
        Classification(
            asset_type="document",
            sensitivity=Sensitivity.INTERNAL,
            owner="Platform Knowledge",
        ),
        context,
        asset_id="asset-intent",
        source_refs=[source_ref],
        representations=[representation],
        metadata_records=[metadata],
    )

    assert result.asset.id == "asset-intent"
    assert result.asset.current_version_id == result.version.version_id
    assert result.version.sequence == 1
    assert result.audit_event.outcome.value == "success"
    assert result.policy_decision.allowed is True
    assert repo.get_asset("asset-intent").source_refs[0].path == "docs/intent.md"
    assert repo.list_representations(asset_id="asset-intent")[0].storage_ref == "object://intent-source"
    assert repo.list_metadata_records("asset-intent")[0].confirmed is True
    assert repo.list_audit_events(target="asset:asset-intent")[0].operation == "asset.create"


def test_asset_registry_lifecycle_policy_denial_fails_closed_and_audits() -> None:
    """A denied lifecycle transition raises, is audited, and changes nothing.

    Uses DenyLifecyclePolicy as the policy gateway. The transition must raise
    AuthorizationError carrying the correlation id and a "fail_closed" policy
    decision, the audit trail must read success (create) then denied, and the
    asset must still be ACTIVE.
    """
    repo = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(repo, policy_gateway=DenyLifecyclePolicy())
    context = operation_context()
    created = service.create_asset(
        "Governed Asset",
        Classification(asset_type="document", sensitivity=Sensitivity.CONFIDENTIAL),
        context,
        asset_id="asset-governed",
    )

    with pytest.raises(AuthorizationError) as exc_info:
        service.transition_lifecycle(created.asset.id, LifecycleState.RETIRED, context)

    events = repo.list_audit_events(target="asset:asset-governed")
    assert exc_info.value.details["correlation_id"] == "corr-test"
    assert exc_info.value.details["policy_decision"]["effect"] == "fail_closed"
    assert [event.outcome.value for event in events] == ["success", "denied"]
    assert repo.get_asset("asset-governed").lifecycle == LifecycleState.ACTIVE


def test_asset_registry_create_is_idempotent_for_same_key_and_payload() -> None:
    """Repeating create_asset with the same idempotency key and payload is a no-op.

    The second call must return the same asset, version and audit event ids
    and leave exactly one version and one audit event. Reusing the key with a
    different title must raise ValidationError mentioning "Idempotency key".
    """
    repo = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(repo)
    context = operation_context()
    classification = Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC)
    source_ref = SourceReference(source_system="repo", path="README.md")
    representation = AssetRepresentation.from_content(
        "asset-readme",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Readme\n",
    )

    first = service.create_asset(
        "Readme",
        classification,
        context,
        asset_id="asset-readme",
        source_refs=[source_ref],
        representations=[representation],
        idempotency_key="create-readme",
    )
    second = service.create_asset(
        "Readme",
        classification,
        context,
        asset_id="asset-readme",
        source_refs=[source_ref],
        representations=[representation],
        idempotency_key="create-readme",
    )

    assert second.asset.id == first.asset.id
    assert second.version.version_id == first.version.version_id
    assert second.audit_event.event_id == first.audit_event.event_id
    assert len(repo.list_versions("asset-readme")) == 1
    assert len(repo.list_audit_events(target="asset:asset-readme")) == 1

    # Same key, different payload (only the title differs) must be rejected.
    with pytest.raises(ValidationError, match="Idempotency key"):
        service.create_asset(
            "Readme renamed",
            classification,
            context,
            asset_id="asset-readme",
            source_refs=[source_ref],
            representations=[representation],
            idempotency_key="create-readme",
        )


def test_asset_registry_rejects_stale_expected_current_version_for_mutations() -> None:
    """A mutation carrying an outdated expected_current_version_id is rejected.

    After a metadata add has produced version 2, a lifecycle transition that
    still expects version 1 must raise ValidationError with code
    "asset.version_conflict", and the asset must stay ACTIVE with versions
    [1, 2] only.
    """
    repo = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(repo)
    context = operation_context()
    created = service.create_asset(
        "Conflict Guard",
        Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-conflict",
    )
    updated = service.add_metadata_record(
        created.asset.id,
        MetadataRecord("owner", "Platform Knowledge", confirmed=True),
        context,
        expected_current_version_id=created.version.version_id,
    )

    # The expected version below is the creation version, which is now stale.
    with pytest.raises(ValidationError) as exc_info:
        service.transition_lifecycle(
            created.asset.id,
            LifecycleState.RETIRED,
            context,
            expected_current_version_id=created.version.version_id,
        )

    assert exc_info.value.details["code"] == "asset.version_conflict"
    assert exc_info.value.details["expected_current_version_id"] == created.version.version_id
    assert exc_info.value.details["current_version_id"] == updated.version.version_id
    assert repo.get_asset(created.asset.id).lifecycle == LifecycleState.ACTIVE
    assert [version.sequence for version in repo.list_versions(created.asset.id)] == [1, 2]


def test_asset_registry_restore_creates_new_version_without_erasing_history() -> None:
    """Restoring an earlier version appends a new version instead of rewinding.

    Create -> retire -> restore(created version) must yield versions
    [1, 2, 3] with change types created / lifecycle_changed / restored, the
    restored version parented on the retired one, and the asset ACTIVE again.
    """
    repo = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(repo)
    context = operation_context()
    created = service.create_asset(
        "Restorable Asset",
        Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-restorable",
    )
    retired = service.transition_lifecycle(
        created.asset.id,
        LifecycleState.RETIRED,
        context,
        expected_current_version_id=created.version.version_id,
    )

    restored = service.restore_asset_version(
        created.asset.id,
        created.version.version_id,
        context,
        expected_current_version_id=retired.version.version_id,
    )

    versions = repo.list_versions(created.asset.id)
    assert restored.version.change_type.value == "restored"
    assert restored.version.parent_version_id == retired.version.version_id
    assert restored.version.metadata_delta["restored_from_version_id"] == created.version.version_id
    assert restored.version.metadata_delta["restored_from_sequence"] == 1
    assert restored.asset.lifecycle == LifecycleState.ACTIVE
    assert repo.get_asset(created.asset.id).current_version_id == restored.version.version_id
    assert [version.sequence for version in versions] == [1, 2, 3]
    assert [version.change_type.value for version in versions] == [
        "created",
        "lifecycle_changed",
        "restored",
    ]
    assert [event.operation for event in repo.list_audit_events(target="asset:asset-restorable")] == [
        "asset.create",
        "asset.lifecycle.transition",
        "asset.version.restore",
    ]


def test_asset_registry_supersede_creates_relationship_version_and_audit() -> None:
    """Superseding an asset retires it and links it to its successor.

    Checks the "superseded_by" relationship, the RETIRED lifecycle and
    supersession metadata on the source asset, the version deltas, that the
    successor's current version is untouched, and the final audit operation.
    """
    repo = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(repo)
    context = operation_context()
    source = service.create_asset(
        "Old Guide",
        Classification(asset_type="guide", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-old-guide",
    )
    successor = service.create_asset(
        "New Guide",
        Classification(asset_type="guide", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-new-guide",
    )

    superseded = service.supersede_asset(
        source.asset.id,
        successor.asset.id,
        context,
        reason="New canonical guide",
        expected_current_version_id=source.version.version_id,
    )

    source_asset = repo.get_asset(source.asset.id)
    relationships = repo.list_relationships(source_id=source.asset.id)
    assert superseded.version.change_type.value == "superseded"
    assert superseded.relationship.predicate == "superseded_by"
    assert superseded.relationship.target_id == successor.asset.id
    assert relationships == [superseded.relationship]
    assert source_asset.lifecycle == LifecycleState.RETIRED
    assert source_asset.metadata["superseded_by"] == successor.asset.id
    assert source_asset.metadata["supersession_reason"] == "New canonical guide"
    assert superseded.version.relationship_delta["added"]["relationship_id"] == superseded.relationship.relationship_id
    assert superseded.version.metadata_delta["superseded_by"] == successor.asset.id
    # The successor must not receive a new version from the supersede call.
    assert repo.get_asset(successor.asset.id).current_version_id == successor.version.version_id
    assert repo.list_audit_events(target="asset:asset-old-guide")[-1].operation == "asset.supersede"


def test_asset_registry_relationships_create_versions_and_audit() -> None:
    """Linking asset -> asset versions the source only and audits the link.

    The source asset gets a "relationship_changed" version whose delta names
    the "depends_on" predicate; the target asset keeps its creation version.
    """
    repo = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(repo)
    context = operation_context()
    classification = Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL)
    source = service.create_asset("Source", classification, context, asset_id="asset-source")
    target = service.create_asset("Target", classification, context, asset_id="asset-target")

    result = service.link_asset_to_asset(
        source.asset.id,
        target.asset.id,
        "depends_on",
        context,
        confidence=0.91,
        provenance={"producer": "test"},
    )

    relationships = repo.list_relationships(source_id=source.asset.id)
    versions = repo.list_versions(source.asset.id)
    assert relationships == [result.relationship]
    assert result.relationship.target_id == target.asset.id
    assert result.relationship.confidence == 0.91
    assert versions[-1].change_type.value == "relationship_changed"
    assert versions[-1].relationship_delta["added"]["predicate"] == "depends_on"
    assert repo.get_asset(source.asset.id).current_version_id == result.version.version_id
    assert repo.get_asset(target.asset.id).current_version_id == target.version.version_id
    assert repo.list_audit_events(target=f"asset:{source.asset.id}")[-1].operation == "asset.relationship.add"


def test_asset_registry_validates_metadata_schema_before_writes() -> None:
    """Schema violations are rejected before anything is stored.

    Covers both create (unconfirmed "owner" -> metadata.confirmation_required,
    no asset stored) and update (priority 9 above max 5 ->
    metadata.value_too_large, metadata records unchanged).
    """
    repo = InMemoryAssetRegistryRepository()
    schema = MetadataSchema(
        schema_id="schema-note-v1",
        name="Note Metadata",
        asset_types=("note",),
        allow_unknown=False,
        fields=(
            MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True),
            MetadataFieldDefinition("priority", MetadataValueType.INTEGER, min_value=1, max_value=5),
        ),
    )
    service = AssetRegistryService(repo, metadata_schemas=[schema])
    context = operation_context()

    with pytest.raises(ValidationError) as exc_info:
        service.create_asset(
            "Note",
            Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL),
            context,
            asset_id="asset-invalid-note",
            metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=False)],
        )

    assert {issue["code"] for issue in exc_info.value.details["issues"]} == {
        "metadata.confirmation_required"
    }
    assert repo.list_assets() == []

    created = service.create_asset(
        "Note",
        Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-note",
        metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=True)],
    )
    with pytest.raises(ValidationError) as update_exc:
        service.add_metadata_record(created.asset.id, MetadataRecord("priority", 9), context)

    assert {issue["code"] for issue in update_exc.value.details["issues"]} == {
        "metadata.value_too_large"
    }
    assert [record.key for record in repo.list_metadata_records(created.asset.id)] == ["owner"]


def test_asset_registry_metadata_batch_reports_partial_failures() -> None:
    """A batch with one valid and two invalid records reports a partial outcome.

    Checks the per-item results and error details, that only the valid record
    is stored (one new version with delta {"priority": 3}), and that the
    trailing "asset.metadata.batch_add" audit event records the totals and
    the failed record ids.
    """
    repo = InMemoryAssetRegistryRepository()
    schema = MetadataSchema(
        schema_id="schema-batch-note-v1",
        name="Batch Note Metadata",
        asset_types=("batch-note",),
        allow_unknown=False,
        fields=(
            MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True),
            MetadataFieldDefinition("priority", MetadataValueType.INTEGER, allow_multiple=True, min_value=1, max_value=5),
        ),
    )
    service = AssetRegistryService(repo, metadata_schemas=[schema])
    context = operation_context()
    created = service.create_asset(
        "Batch Note",
        Classification(asset_type="batch-note", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-batch-note",
        metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=True)],
    )

    # Item 0 is valid; item 1 exceeds max_value; item 2 uses a key the schema
    # does not define (allow_unknown=False).
    result = service.add_metadata_records_batch(
        created.asset.id,
        (
            MetadataRecord("priority", 3, record_id="meta-priority-ok"),
            MetadataRecord("priority", 9, record_id="meta-priority-too-large"),
            MetadataRecord("phase", "beta", record_id="meta-phase-unknown"),
        ),
        context,
        expected_current_version_id=created.version.version_id,
    )

    assert result.total == 3
    assert result.succeeded == 1
    assert result.failed == 2
    assert result.partial is True
    assert result.outcome == "partial"
    assert [item.success for item in result.items] == [True, False, False]
    assert result.items[0].result_ref["record_id"] == "meta-priority-ok"
    assert result.items[1].error is not None
    assert result.items[1].error.code == "kontextual.validation"
    assert result.items[1].error.correlation_id == "corr-test"
    assert "metadata schema" in result.items[1].error.remediation
    assert {issue["code"] for issue in result.items[1].error.details["issues"]} == {
        "metadata.value_too_large"
    }
    assert result.items[2].error is not None
    assert {issue["code"] for issue in result.items[2].error.details["issues"]} == {
        "metadata.unknown_field"
    }
    assert result.to_dict()["audit_event_id"] == result.audit_event_id

    metadata_records = repo.list_metadata_records(created.asset.id)
    versions = repo.list_versions(created.asset.id)
    events = repo.list_audit_events(target=f"asset:{created.asset.id}")
    assert [record.key for record in metadata_records] == ["owner", "priority"]
    assert metadata_records[1].record_id == "meta-priority-ok"
    assert [version.sequence for version in versions] == [1, 2]
    assert versions[-1].metadata_delta == {"priority": 3}
    # The successful item is audited as "asset.metadata.add" ahead of the
    # batch-level "asset.metadata.batch_add" event.
    assert [event.operation for event in events] == [
        "asset.create",
        "asset.metadata.add",
        "asset.metadata.batch_add",
    ]
    assert events[-1].outcome.value == "partial"
    assert events[-1].correlation_id == "corr-test"
    assert events[-1].details["total"] == 3
    assert events[-1].details["succeeded"] == 1
    assert events[-1].details["failed"] == 2
    assert events[-1].details["failed_item_ids"] == [
        "meta-priority-too-large",
        "meta-phase-unknown",
    ]


def test_asset_registry_metadata_batch_rejects_stale_expected_version_before_writes() -> None:
    """A batch add with a stale expected version raises and writes nothing.

    The error must carry code "asset.version_conflict" and operation
    "asset.metadata.batch_add"; no metadata record and no audit event beyond
    the earlier create / single add may appear.
    """
    repo = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(repo)
    context = operation_context()
    created = service.create_asset(
        "Batch Conflict",
        Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-batch-conflict",
    )
    # Advance the asset past its creation version so that version is stale.
    service.add_metadata_record(
        created.asset.id,
        MetadataRecord("owner", "Platform Knowledge", confirmed=True),
        context,
        expected_current_version_id=created.version.version_id,
    )

    with pytest.raises(ValidationError) as exc_info:
        service.add_metadata_records_batch(
            created.asset.id,
            (MetadataRecord("topic", "architecture"),),
            context,
            expected_current_version_id=created.version.version_id,
        )

    assert exc_info.value.details["code"] == "asset.version_conflict"
    assert exc_info.value.details["operation"] == "asset.metadata.batch_add"
    assert [record.key for record in repo.list_metadata_records(created.asset.id)] == ["owner"]
    assert [event.operation for event in repo.list_audit_events(target=f"asset:{created.asset.id}")] == [
        "asset.create",
        "asset.metadata.add",
    ]


def test_asset_registry_applies_persisted_metadata_schema_assignments() -> None:
    """A schema registered and assigned at runtime is enforced on create.

    The schema itself declares no asset types; the assignment binds it to
    "policy-note" / INTERNAL. An invalid create must report both the missing
    required "owner" and the disallowed "state" value and store nothing; a
    valid create must succeed.
    """
    repo = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(repo)
    context = operation_context()
    schema = MetadataSchema(
        schema_id="schema-policy-note-v1",
        name="Policy Note Metadata",
        allow_unknown=False,
        fields=(
            MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True),
            MetadataFieldDefinition("state", MetadataValueType.STRING, allowed_values=("draft", "approved")),
        ),
    )
    assignment = MetadataSchemaAssignment(
        assignment_id="assignment-policy-note",
        schema_id=schema.schema_id,
        asset_types=("policy-note",),
        sensitivities=(Sensitivity.INTERNAL,),
        policy_ref="local://metadata-policy/policy-note",
    )

    service.register_metadata_schema(schema, context)
    service.assign_metadata_schema(assignment, context)

    with pytest.raises(ValidationError) as exc_info:
        service.create_asset(
            "Policy Note",
            Classification(asset_type="policy-note", sensitivity=Sensitivity.INTERNAL),
            context,
            asset_id="asset-policy-note-invalid",
            metadata_records=[MetadataRecord("state", "published")],
        )

    assert {issue["code"] for issue in exc_info.value.details["issues"]} == {
        "metadata.required_missing",
        "metadata.value_not_allowed",
    }
    assert repo.list_assets() == []

    created = service.create_asset(
        "Policy Note",
        Classification(asset_type="policy-note", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-policy-note",
        metadata_records=[
            MetadataRecord("owner", "Platform Knowledge", confirmed=True),
            MetadataRecord("state", "approved", confirmed=True),
        ],
    )

    assert created.asset.id == "asset-policy-note"
    assert service.list_metadata_schema_assignments()[0].policy_ref == "local://metadata-policy/policy-note"


def test_asset_registry_filters_assets_by_standard_metadata_and_records() -> None:
    """list_assets filters by classification fields and by metadata records.

    Seeds three assets and checks filtering by sensitivity, owner, topic,
    review state, a metadata key/value, the confirmed-only variant, and a
    single value matched against a list-valued metadata record ("tags").
    """
    repo = InMemoryAssetRegistryRepository()
    service = AssetRegistryService(repo)
    context = operation_context()
    service.create_asset(
        "Architecture ADR",
        Classification(
            asset_type="document",
            sensitivity=Sensitivity.PUBLIC,
            topics=("architecture", "adr"),
            owner="Platform Knowledge",
            review_state="approved",
        ),
        context,
        asset_id="asset-adr",
        metadata_records=[
            MetadataRecord("status", "accepted", confirmed=True),
            MetadataRecord("tags", ["governance", "markdown"], confirmed=True),
        ],
    )
    service.create_asset(
        "Internal Risk Note",
        Classification(
            asset_type="note",
            sensitivity=Sensitivity.CONFIDENTIAL,
            topics=("risk",),
            owner="Security",
            review_state="draft",
        ),
        context,
        asset_id="asset-risk",
        metadata_records=[MetadataRecord("status", "accepted", confirmed=False)],
    )
    service.create_asset(
        "Architecture Brief",
        Classification(
            asset_type="document",
            sensitivity=Sensitivity.INTERNAL,
            topics=("architecture",),
            owner="Platform Knowledge",
            review_state="draft",
        ),
        context,
        asset_id="asset-brief",
        metadata_records=[MetadataRecord("status", "draft", confirmed=True)],
    )

    # NOTE(review): the expected orderings below differ from creation order
    # (brief was created after risk); the repository's sort key is not
    # visible here — confirm before relying on it.
    assert [asset.id for asset in service.list_assets(sensitivity=Sensitivity.PUBLIC)] == ["asset-adr"]
    assert [asset.id for asset in service.list_assets(owner="Security")] == ["asset-risk"]
    assert [asset.id for asset in service.list_assets(topic="architecture")] == [
        "asset-adr",
        "asset-brief",
    ]
    assert [asset.id for asset in service.list_assets(review_state="draft")] == [
        "asset-brief",
        "asset-risk",
    ]
    assert [asset.id for asset in service.list_assets(metadata_filters={"status": "accepted"})] == [
        "asset-adr",
        "asset-risk",
    ]
    # asset-risk's "status" record is unconfirmed, so it drops out here.
    assert [
        asset.id
        for asset in service.list_assets(
            metadata_filters={"status": "accepted"},
            confirmed_metadata_only=True,
        )
    ] == ["asset-adr"]
    assert [asset.id for asset in service.list_assets(metadata_filters={"tags": "markdown"})] == [
        "asset-adr"
    ]


def test_sqlite_asset_registry_survives_reinstantiation(tmp_path: Path) -> None:
    """State written through one SQLite repository is readable from another.

    After create, metadata add and delete request, a second repository opened
    on the same file must return the lifecycle, source ref, representation,
    metadata record, three versions and three audit events.
    """
    db_path = tmp_path / "registry.sqlite"
    repo = SQLiteAssetRegistryRepository(db_path)
    service = AssetRegistryService(repo)
    context = operation_context()
    source_ref = SourceReference(source_system="repo", path="README.md", checksum="sha256:readme")
    source = AssetRepresentation.from_content(
        "asset-readme",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Readme\n",
        storage_ref="object://readme-source",
    )

    created = service.create_asset(
        "Readme",
        Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC),
        context,
        asset_id="asset-readme",
        source_refs=[source_ref],
        representations=[source],
    )
    service.add_metadata_record(
        created.asset.id,
        MetadataRecord("owner", "Platform Knowledge", confirmed=True),
        context,
    )
    service.request_delete(created.asset.id, context)

    reloaded = SQLiteAssetRegistryRepository(db_path)
    asset = reloaded.get_asset("asset-readme")
    assert asset.lifecycle == LifecycleState.DELETE_REQUESTED
    assert asset.source_refs[0].path == "README.md"
    assert [item.kind for item in reloaded.list_representations(asset_id=asset.id)] == [
        RepresentationKind.SOURCE
    ]
    assert [item.key for item in reloaded.list_metadata_records(asset.id)] == ["owner"]
    assert [version.sequence for version in reloaded.list_versions(asset.id)] == [1, 2, 3]
    # request_delete is audited under the lifecycle-transition operation name.
    assert [event.operation for event in reloaded.list_audit_events(target="asset:asset-readme")] == [
        "asset.create",
        "asset.metadata.add",
        "asset.lifecycle.transition",
    ]


def test_sqlite_registry_persists_context_entities_relationships_and_idempotency(tmp_path: Path) -> None:
    """Idempotency records, context entities and relationships persist in SQLite.

    Creates an asset with an idempotency key, links it to a context entity,
    then reads everything back through a fresh repository on the same file.
    """
    db_path = tmp_path / "registry.sqlite"
    repo = SQLiteAssetRegistryRepository(db_path)
    service = AssetRegistryService(repo)
    context = operation_context()
    created = service.create_asset(
        "Knowledge Policy",
        Classification(asset_type="policy", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-policy",
        idempotency_key="create-policy",
    )
    entity = ContextEntity(
        entity_type=ContextEntityType.PROJECT,
        name="Kontextual Engine",
        entity_id="entity-kontextual",
    )
    linked = service.link_asset_to_context_entity(
        created.asset.id,
        entity,
        "about_project",
        context,
    )

    reloaded = SQLiteAssetRegistryRepository(db_path)
    assert reloaded.get_idempotency_record("create-policy").result_refs["asset_id"] == "asset-policy"
    assert reloaded.list_context_entities()[0].entity_id == "entity-kontextual"
    assert reloaded.list_relationships(source_id="asset-policy")[0].relationship_id == linked.relationship.relationship_id
    assert reloaded.list_versions("asset-policy")[-1].relationship_delta["added"]["target_kind"] == "context_entity"
    assert [event.operation for event in reloaded.list_audit_events(target="asset:asset-policy")] == [
        "asset.create",
        "asset.relationship.add",
    ]


def test_sqlite_registry_persists_metadata_schemas_and_assignments(tmp_path: Path) -> None:
    """A schema and assignment stored in SQLite are enforced by a new service.

    The second service is built on a fresh repository with no schemas passed
    in, so the validation errors it raises show the persisted schema and
    assignment were loaded from the database file.
    """
    db_path = tmp_path / "registry.sqlite"
    repo = SQLiteAssetRegistryRepository(db_path)
    service = AssetRegistryService(repo)
    context = operation_context()
    schema = MetadataSchema(
        schema_id="schema-review-v1",
        name="Review Metadata",
        allow_unknown=False,
        fields=(
            MetadataFieldDefinition("reviewer", MetadataValueType.STRING, required=True, require_confirmed=True),
            MetadataFieldDefinition("score", MetadataValueType.NUMBER, min_value=0, max_value=1),
        ),
    )
    service.register_metadata_schema(schema, context)
    service.assign_metadata_schema(
        MetadataSchemaAssignment(
            assignment_id="assignment-review-documents",
            schema_id=schema.schema_id,
            asset_types=("review",),
        ),
        context,
    )

    reloaded_service = AssetRegistryService(SQLiteAssetRegistryRepository(db_path))
    with pytest.raises(ValidationError) as exc_info:
        reloaded_service.create_asset(
            "Review",
            Classification(asset_type="review", sensitivity=Sensitivity.INTERNAL),
            context,
            asset_id="asset-review-invalid",
            metadata_records=[
                MetadataRecord("reviewer", "Ada", confirmed=False),
                MetadataRecord("score", 1.7),
            ],
        )

    assert {issue["code"] for issue in exc_info.value.details["issues"]} == {
        "metadata.confirmation_required",
        "metadata.value_too_large",
    }

    created = reloaded_service.create_asset(
        "Review",
        Classification(asset_type="review", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-review",
        metadata_records=[
            MetadataRecord("reviewer", "Ada", confirmed=True),
            MetadataRecord("score", 0.92),
        ],
    )

    reloaded_repo = SQLiteAssetRegistryRepository(db_path)
    assert created.asset.id == "asset-review"
    assert reloaded_repo.get_metadata_schema("schema-review-v1").name == "Review Metadata"
    assert reloaded_repo.get_metadata_schema_assignment("assignment-review-documents").schema_id == "schema-review-v1"


def test_sqlite_registry_filters_assets_after_reload(tmp_path: Path) -> None:
    """list_assets filters work on a service built over a re-opened database.

    Checks a combined asset_type + owner filter, a topic filter, and a
    confirmed-only metadata filter against two seeded guides.
    """
    db_path = tmp_path / "registry.sqlite"
    repo = SQLiteAssetRegistryRepository(db_path)
    service = AssetRegistryService(repo)
    context = operation_context()
    service.create_asset(
        "Public Guide",
        Classification(
            asset_type="guide",
            sensitivity=Sensitivity.PUBLIC,
            topics=("markdown", "proxy"),
            owner="Docs",
            review_state="approved",
        ),
        context,
        asset_id="asset-guide",
        metadata_records=[MetadataRecord("channel", "public", confirmed=True)],
    )
    service.create_asset(
        "Internal Guide",
        Classification(
            asset_type="guide",
            sensitivity=Sensitivity.INTERNAL,
            topics=("markdown",),
            owner="Docs",
            review_state="draft",
        ),
        context,
        asset_id="asset-internal-guide",
        metadata_records=[MetadataRecord("channel", "public", confirmed=False)],
    )

    reloaded = AssetRegistryService(SQLiteAssetRegistryRepository(db_path))
    # NOTE(review): "asset-internal-guide" is expected before "asset-guide",
    # which is neither creation order nor ascending id order; presumably the
    # listing is ordered by title — confirm against the repository.
    assert [asset.id for asset in reloaded.list_assets(asset_type="guide", owner="Docs")] == [
        "asset-internal-guide",
        "asset-guide",
    ]
    assert [asset.id for asset in reloaded.list_assets(topic="proxy")] == ["asset-guide"]
    assert [
        asset.id
        for asset in reloaded.list_assets(
            metadata_filters={"channel": "public"},
            confirmed_metadata_only=True,
        )
    ] == ["asset-guide"]


def test_sqlite_registry_persists_metadata_batch_partial_audit_after_reload(tmp_path: Path) -> None:
    """A partial batch result and its audit event are readable after re-open.

    One of two "severity" records exceeds max_value. A fresh repository must
    show only the valid record, two versions, and an audit event with outcome
    PARTIAL listing the failed record id.
    """
    db_path = tmp_path / "registry.sqlite"
    repo = SQLiteAssetRegistryRepository(db_path)
    schema = MetadataSchema(
        schema_id="schema-batch-ticket-v1",
        name="Batch Ticket Metadata",
        asset_types=("ticket",),
        allow_unknown=False,
        fields=(
            MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True),
            MetadataFieldDefinition("severity", MetadataValueType.INTEGER, allow_multiple=True, min_value=1, max_value=5),
        ),
    )
    service = AssetRegistryService(repo, metadata_schemas=[schema])
    context = operation_context()
    created = service.create_asset(
        "Batch Ticket",
        Classification(asset_type="ticket", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-batch-ticket",
        metadata_records=[MetadataRecord("owner", "Operations", confirmed=True)],
    )

    result = service.add_metadata_records_batch(
        created.asset.id,
        (
            MetadataRecord("severity", 4, record_id="meta-severity-ok"),
            MetadataRecord("severity", 9, record_id="meta-severity-too-large"),
        ),
        context,
        expected_current_version_id=created.version.version_id,
    )

    reloaded = SQLiteAssetRegistryRepository(db_path)
    assert result.outcome == "partial"
    assert result.audit_event_id is not None
    assert [record.key for record in reloaded.list_metadata_records(created.asset.id)] == [
        "owner",
        "severity",
    ]
    assert reloaded.list_metadata_records(created.asset.id)[1].record_id == "meta-severity-ok"
    assert [version.sequence for version in reloaded.list_versions(created.asset.id)] == [1, 2]
    assert reloaded.get_audit_event(result.audit_event_id).outcome == AuditOutcome.PARTIAL
    assert reloaded.get_audit_event(result.audit_event_id).details["failed_item_ids"] == [
        "meta-severity-too-large"
    ]


def test_sqlite_registry_enforces_representation_asset_reference(tmp_path: Path) -> None:
    """Saving a representation for an asset id that was never stored must fail.

    The repository is expected to raise ValidationError matching
    "unknown asset".
    """
    repo = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite")
    representation = AssetRepresentation.from_content(
        "missing-asset",
        RepresentationKind.NORMALIZED,
        "text/plain",
        "normalized",
    )

    with pytest.raises(ValidationError, match="unknown asset"):
        repo.save_representation(representation)


def test_sqlite_registry_enforces_durable_reference_integrity(tmp_path: Path) -> None:
    """Direct repository writes that reference missing rows are rejected.

    On an empty database: a version for an unknown asset, an audit event for
    an unknown actor, and an ingestion job for an unknown actor must each
    raise ValidationError with the matching message.
    """
    repo = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite")

    with pytest.raises(ValidationError, match="unknown asset"):
        repo.save_version(
            AssetVersion(
                asset_id="asset-missing",
                sequence=1,
                change_type=VersionChangeType.CREATED,
            )
        )
    with pytest.raises(ValidationError, match="unknown actor"):
        repo.save_audit_event(
            AuditEvent(
                operation="asset.create",
                target="asset:asset-missing",
                outcome=AuditOutcome.SUCCESS,
                actor_id="actor-missing",
                correlation_id="corr-missing",
            )
        )
    with pytest.raises(ValidationError, match="unknown actor"):
        repo.save_ingestion_job(
            IngestionJob.create(
                input={"connector": "local_file", "source_uri": "missing.txt"},
                actor_id="actor-missing",
                correlation_id="corr-missing",
            )
        )


def operation_context() -> OperationContext:
    """Build the OperationContext shared by the tests in this module.

    Returns:
        A context for a human actor "user-test" (group "engineering") with
        correlation id "corr-test", which several tests assert on.
    """
    actor = Actor.create(
        ActorType.HUMAN,
        actor_id="user-test",
        display_name="Test User",
        groups=["engineering"],
    )
    return OperationContext.create(actor, correlation_id="corr-test")


class DenyLifecyclePolicy:
    """Policy gateway stub that blocks lifecycle transitions only.

    Passed to AssetRegistryService as ``policy_gateway`` so that the denial
    path can be exercised while asset creation is still allowed.
    """

    def authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict[str, str] | None = None,
    ) -> PolicyDecision:
        """Return a policy decision for ``action`` on ``resource``.

        Args:
            context: Operation context; only ``context.actor.id`` is read.
            action: Operation name being authorized.
            resource: Identifier of the resource being acted on.
            resource_metadata: Optional metadata echoed into the decision
                context when the action is denied; ``None`` becomes ``{}``.

        Returns:
            A fail-closed decision when ``action`` is
            "asset.lifecycle.transition", otherwise an allow decision.
        """
        if action == "asset.lifecycle.transition":
            return PolicyDecision.fail_closed(
                context.actor.id,
                action,
                resource,
                reason="lifecycle transitions require review",
                context={"resource_metadata": resource_metadata or {}},
            )
        return PolicyDecision.allow(context.actor.id, action, resource)