"""Tests for storing, reading, auditing and cleaning up representation content bytes."""

from __future__ import annotations

import pytest

from kontextual_engine import (
    Actor,
    ActorType,
    AssetRegistryService,
    AuthorizationError,
    Classification,
    InMemoryAssetRegistryRepository,
    InMemoryBlobStorage,
    LocalBlobStorage,
    OperationContext,
    PolicyDecision,
    RepresentationContentService,
    RepresentationKind,
    Sensitivity,
    ValidationError,
)


def _register_document(
    registry: InMemoryAssetRegistryRepository,
    context: OperationContext,
    title: str,
    asset_id: str,
) -> None:
    """Create an INTERNAL ``document`` asset named *title* under the id *asset_id*."""
    AssetRegistryService(registry).create_asset(
        title,
        Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id=asset_id,
    )


def test_content_service_adds_representation_bytes_with_deduplicated_blob() -> None:
    """Identical payloads stored as two representations share a single blob."""
    asset_id = "asset-content"
    payload = b"same bytes"
    registry = InMemoryAssetRegistryRepository()
    blob_store = InMemoryBlobStorage()
    context = operation_context()
    _register_document(registry, context, "Content Asset", asset_id)

    content_service = RepresentationContentService(registry, blob_store)
    # The generator is consumed left to right, so SOURCE is added before DERIVED.
    first, second = (
        content_service.add_representation_from_bytes(
            asset_id, kind, "text/plain", payload, context
        )
        for kind in (RepresentationKind.SOURCE, RepresentationKind.DERIVED)
    )

    stored = registry.list_representations(asset_id=asset_id)
    assert len(stored) == 2
    assert stored[0].storage_ref == stored[1].storage_ref
    # Successive additions report version sequences 2 and then 3.
    assert (first.version.sequence, second.version.sequence) == (2, 3)
    assert len(blob_store.iter_blobs()) == 1


def test_content_service_reads_bytes_with_policy_and_audit() -> None:
    """Whole and chunked reads both return the stored bytes and are audited."""
    asset_id = "asset-readable"
    read_operation = "asset.content_stream.read"
    registry = InMemoryAssetRegistryRepository()
    blob_store = InMemoryBlobStorage()
    context = operation_context()
    _register_document(registry, context, "Readable", asset_id)

    content_service = RepresentationContentService(registry, blob_store)
    content_service.add_representation_from_bytes(
        asset_id, RepresentationKind.SOURCE, "text/plain", b"read me", context
    )

    whole = content_service.get_content_stream(asset_id, context)
    chunked = content_service.stream_content(asset_id, context, chunk_size=3)

    assert whole.content == b"read me"
    assert b"".join(chunked.chunks) == b"read me"
    assert whole.representation.media_type == "text/plain"
    assert whole.blob.storage_ref == whole.representation.storage_ref
    assert whole.audit_event.operation == read_operation
    latest_event = registry.list_audit_events(target="asset:asset-readable")[-1]
    assert latest_event.operation == read_operation


def test_content_service_reads_source_normalized_and_derived_by_kind() -> None:
    """Passing ``kind=`` selects the bytes stored for that representation kind."""
    asset_id = "asset-kinds"
    expected_by_kind = (
        (RepresentationKind.SOURCE, b"source"),
        (RepresentationKind.NORMALIZED, b"normalized"),
        (RepresentationKind.DERIVED, b"derived"),
    )
    registry = InMemoryAssetRegistryRepository()
    blob_store = InMemoryBlobStorage()
    context = operation_context()
    _register_document(registry, context, "Kinds", asset_id)

    content_service = RepresentationContentService(registry, blob_store)
    for kind, data in expected_by_kind:
        content_service.add_representation_from_bytes(
            asset_id, kind, "text/plain", data, context
        )

    for kind, data in expected_by_kind:
        read_back = content_service.get_content_stream(asset_id, context, kind=kind)
        assert read_back.content == data


def test_content_service_denies_bytes_before_exposure() -> None:
    """A denying policy gateway makes content reads raise ``AuthorizationError``."""
    asset_id = "asset-denied-stream"
    registry = InMemoryAssetRegistryRepository()
    blob_store = InMemoryBlobStorage()
    context = operation_context()
    _register_document(registry, context, "Denied", asset_id)

    # Bytes are written through an ungated service first.
    RepresentationContentService(registry, blob_store).add_representation_from_bytes(
        asset_id, RepresentationKind.SOURCE, "text/plain", b"secret", context
    )

    guarded_reader = RepresentationContentService(
        registry, blob_store, policy_gateway=DenyContentPolicy()
    )
    with pytest.raises(AuthorizationError):
        guarded_reader.get_content_stream(asset_id, context)


def test_content_service_cleanup_uses_repository_references() -> None:
    """A dry-run cleanup reports the orphan blob and not the referenced one."""
    asset_id = "asset-cleanup"
    orphan_payload = b"orphan"
    registry = InMemoryAssetRegistryRepository()
    blob_store = InMemoryBlobStorage()
    context = operation_context()
    _register_document(registry, context, "Cleanup", asset_id)

    content_service = RepresentationContentService(registry, blob_store)
    content_service.add_representation_from_bytes(
        asset_id, RepresentationKind.SOURCE, "text/plain", b"kept", context
    )
    # Written straight to the blob store, so no representation points at it.
    orphan = blob_store.put_bytes(orphan_payload).blob

    report = content_service.cleanup_unreferenced_blobs(dry_run=True)

    assert report.deleted_count == 1
    assert report.deleted_storage_refs == (orphan.storage_ref,)
    assert report.reclaimable_bytes == len(orphan_payload)


def test_content_service_detects_corrupted_stored_content(tmp_path) -> None:
    """Tampered on-disk blob bytes make both read paths raise ``ValidationError``."""
    asset_id = "asset-corrupt"
    registry = InMemoryAssetRegistryRepository()
    blob_store = LocalBlobStorage(tmp_path / "blobs")
    context = operation_context()
    _register_document(registry, context, "Corrupt", asset_id)

    content_service = RepresentationContentService(registry, blob_store)
    content_service.add_representation_from_bytes(
        asset_id, RepresentationKind.SOURCE, "text/plain", b"expected", context
    )

    stored = registry.list_representations(asset_id=asset_id)[0]
    # Overwrite the blob file behind the service's back to simulate corruption.
    blob_file = blob_store.root / stored.storage_ref.removeprefix("blob://local/")
    blob_file.write_bytes(b"corrupted")

    with pytest.raises(ValidationError):
        content_service.get_content_stream(asset_id, context)
    # Joining inside the ``raises`` block covers validation that fails either when
    # the stream is opened or while its chunks are consumed.
    with pytest.raises(ValidationError):
        b"".join(content_service.stream_content(asset_id, context).chunks)


def operation_context() -> OperationContext:
    """Return a new operation context for the human ``content-test`` actor."""
    actor = Actor.create(ActorType.HUMAN, actor_id="content-test")
    return OperationContext.create(actor, correlation_id="corr-content")


class DenyContentPolicy:
    """Policy gateway stub that rejects content-stream reads and allows all else."""

    _BLOCKED_ACTION = "asset.content_stream.read"

    def authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict[str, str] | None = None,
    ) -> PolicyDecision:
        """Deny ``asset.content_stream.read``; allow any other action.

        ``resource_metadata`` is accepted for interface compatibility but ignored.
        """
        actor_id = context.actor.id
        if action != self._BLOCKED_ACTION:
            return PolicyDecision.allow(actor_id, action, resource)
        return PolicyDecision.deny(
            actor_id,
            action,
            resource,
            reason="content reads disabled",
        )