diff --git a/docs/architecture-blueprint.md b/docs/architecture-blueprint.md index 0da6492..ac9439a 100644 --- a/docs/architecture-blueprint.md +++ b/docs/architecture-blueprint.md @@ -190,7 +190,7 @@ Required MVP ports: - Repository port for assets, representations, metadata, relationships, versions, runs, audit events, and exports. -- Object/content store port for source, normalized, and derived content payloads. +- Blob/content store port for source, normalized, and derived content payloads. - Search index port for lexical search and later semantic/hybrid retrieval. - Extractor port for format-specific normalization. - Connector port for source systems. @@ -211,6 +211,10 @@ Adapter rules: Markitect where useful, but they are not the canonical engine identity or storage model. The canonical layer remains asset, representation, metadata, lifecycle, policy, lineage, and audit state. +- Blob storage is infrastructure behind `AssetRepresentation.storage_ref`. + Whole-object content addressing, digest verification, and chunked byte + streaming belong behind the blob port. Local filesystem and S3 are adapters, + not different domain models. - `llm-connect` or equivalent is an adapter for LLM providers. - `phase-memory` is an adjacent memory runtime; this engine may exchange opaque memory references or context packages but should not implement memory phases. @@ -251,6 +255,9 @@ Recommended storage style: adapter-specific payloads. - Separate content/object references for large source, normalized, or derived payloads. +- Store blob bytes outside repository rows when content is non-trivial. Keep + representation digest, size, media type, kind, producer, and storage ref in + the repository, and let blob adapters handle byte persistence and dedupe. - Append-only audit events and change records. - Deterministic ordering fields for pagination and tests. 
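Reviewer note on the blueprint change above: the split it describes (representation digest, size, media type, kind, and storage ref in the repository; bytes and dedupe behind the blob port) can be illustrated with a minimal sketch. Every name here (`SketchBlobStore`, `RepresentationRow`, `register`, the `blob://sketch/` prefix) is hypothetical and chosen only for illustration; it is not the engine's adapter code, and the key layout is an assumption rather than the layout the real adapters use.

```python
import hashlib
from collections.abc import Iterator
from dataclasses import dataclass


@dataclass(frozen=True)
class RepresentationRow:
    """Hypothetical repository row: metadata only, never the content bytes."""

    digest: str
    size_bytes: int
    media_type: str
    kind: str
    storage_ref: str


class SketchBlobStore:
    """Hypothetical content-addressed store keyed by a sha256 digest."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def put_bytes(self, content: bytes) -> tuple[str, str, bool]:
        # The digest of the whole object is the storage key, so identical
        # content always maps to the same entry (deduplicating write).
        digest = "sha256:" + hashlib.sha256(content).hexdigest()
        created = digest not in self._blobs
        if created:
            self._blobs[digest] = bytes(content)
        return digest, f"blob://sketch/{digest}", created

    def iter_bytes(self, storage_ref: str, chunk_size: int = 65536) -> Iterator[bytes]:
        # Chunked read: callers never need the adapter's internal layout.
        content = self._blobs[storage_ref.removeprefix("blob://sketch/")]
        size = max(int(chunk_size), 1)
        for start in range(0, len(content), size):
            yield content[start : start + size]


def register(store: SketchBlobStore, content: bytes, media_type: str, kind: str) -> RepresentationRow:
    """Write bytes through the store and keep only metadata for the repository."""
    digest, storage_ref, _created = store.put_bytes(content)
    return RepresentationRow(digest, len(content), media_type, kind, storage_ref)


store = SketchBlobStore()
first = register(store, b"hello", "text/plain", "source")
second = register(store, b"hello", "text/plain", "normalized")
```

In this sketch both rows share one `storage_ref` while the store holds a single copy of the bytes, which is the property that lets backend selection stay a deployment concern rather than a domain-model difference.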
diff --git a/docs/blob-storage-content-streaming-workplan.md b/docs/blob-storage-content-streaming-workplan.md index 6e9db7f..d7a9391 100644 --- a/docs/blob-storage-content-streaming-workplan.md +++ b/docs/blob-storage-content-streaming-workplan.md @@ -2,7 +2,7 @@ Date: 2026-05-07 -Status: planned. +Status: implemented. ## Purpose @@ -11,23 +11,25 @@ normalized, and derived representations can reference real content bytes without duplicating storage. Expose those bytes through engine-native interfaces and CMIS content stream routes. -## Current State +## Implemented State -The engine already records representation metadata: +The engine records representation metadata: - digest, - size, - media type, - representation kind, -- opaque `storage_ref`. +- `storage_ref`. -It does not yet provide: +It now provides: -- a content-addressed blob store, -- deduplicating writes, -- blob read/stream interfaces, -- reference accounting or garbage collection, -- CMIS byte-stream download semantics. +- a content-addressed blob storage port, +- in-memory, local filesystem, and optional S3 adapters, +- deduplicating writes by `sha256:` digest, +- whole-byte reads plus chunked `iter_bytes(...)` streaming, +- representation-level content service governance, +- reference accounting and dry-run/active cleanup, +- CMIS Browser Binding content stream byte routes. ## Target Architecture @@ -35,7 +37,7 @@ It does not yet provide: bytes -> digest/size verification -> BlobStoragePort - -> content-addressed adapter + -> content-addressed adapter (memory/local/S3) -> AssetRepresentation storage_ref -> governed representation service -> service API / CMIS content stream @@ -60,29 +62,52 @@ justifies the complexity. 
## Interfaces -Planned engine-native interfaces: +Engine-native interfaces: - `BlobStoragePort.put_bytes(...)` -- `BlobStoragePort.open_bytes(...)` +- `BlobStoragePort.read_bytes(...)` +- `BlobStoragePort.iter_bytes(...)` - `BlobStoragePort.stat(...)` - `BlobStoragePort.exists(...)` - `BlobStoragePort.delete_unreferenced(...)` - `RepresentationContentService.add_representation_from_bytes(...)` - `RepresentationContentService.get_content_stream(...)` +- `RepresentationContentService.stream_content(...)` -Planned CMIS integration: +CMIS integration: - `getContentStream` returns actual bytes/stream with content headers, - `setContentStream` stores through deduplicating representation service, - content stream changes produce versions and audit events, - descriptors remain available for clients that only need metadata. +## Storage Backends + +- `InMemoryBlobStorage` supports deterministic unit tests and default runtime + wiring. +- `LocalBlobStorage` stores content under digest-derived paths and uses atomic + temporary writes. +- `S3BlobStorage` is available through the optional `kontextual-engine[s3]` + extra and keeps S3 concerns behind the same blob port. It uses digest-derived + object keys and streams object bodies in chunks. + +The engine stores only the returned `storage_ref` on representations. Backend +selection is therefore a deployment concern, not a domain-model fork. + +## Migration Posture + +Existing opaque `storage_ref` values remain valid metadata, but content bytes +can only be streamed when the configured blob adapter can resolve the reference. +Migration should import external content through +`RepresentationContentService.add_representation_from_bytes(...)` so dedupe, +digest verification, policy, versions, and audit events are preserved. + ## Risks -- Large files may require streaming APIs rather than in-memory bytes. +- Very large files may require upload-side streaming beyond the current + byte-based write API. 
- Local filesystem adapters need atomic writes and digest verification. - Garbage collection must never delete referenced blobs. - Security must treat blob bytes as governed content, not public storage. - Existing `storage_ref` values may point to external sources and should remain valid as opaque references. - diff --git a/pyproject.toml b/pyproject.toml index 4fe2d92..3ecd589 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,9 @@ service = [ storage = [ "sqlalchemy>=2.0", ] +s3 = [ + "boto3>=1.34", +] markdown = [ "markitect-tool @ file:///home/worsch/markitect-tool", ] diff --git a/src/kontextual_engine/__init__.py b/src/kontextual_engine/__init__.py index 21197ab..620bc22 100644 --- a/src/kontextual_engine/__init__.py +++ b/src/kontextual_engine/__init__.py @@ -12,6 +12,9 @@ from .artifacts import ( content_digest, ) from .adapters.memory import InMemoryAssetRegistryRepository +from .adapters.memory import InMemoryBlobStorage +from .adapters.local_files import LocalBlobStorage +from .adapters.s3 import S3BlobStorage from .adapters.sqlite import SQLiteAssetRegistryRepository from .api import ServiceRuntime, create_app from .context import ContextAssembler, ContextItem, ContextPackage @@ -98,6 +101,10 @@ from .ingestion import IngestionRequest, IngestionResult, IngestionService from .ports import ( AllowAllPolicyGateway, AssetRegistryRepository, + BlobCleanupResult, + BlobRef, + BlobStorage, + BlobWriteResult, DirectorySourceConnector, FormatExtractor, PolicyGateway, @@ -122,6 +129,9 @@ from .services import ( RelationshipQueryItem, RelationshipQueryRequest, RelationshipQueryResult, + RepresentationContentResult, + RepresentationContentStream, + RepresentationContentService, RetrievalFeedbackRequest, RetrievalFeedbackResult, RetrievalQualityMetrics, @@ -168,6 +178,10 @@ __all__ = [ "AssetRegistryRepository", "AssetRegistryService", "AssetRetrievalService", + "BlobCleanupResult", + "BlobRef", + "BlobStorage", + "BlobWriteResult", "AssetVersion", 
"AuditEvent", "AuditOutcome", @@ -202,6 +216,7 @@ __all__ = [ "ExtractorCapability", "FormatExtractor", "InMemoryAssetRegistryRepository", + "InMemoryBlobStorage", "InMemoryKnowledgeRepository", "IngestionRequest", "IngestionResult", @@ -217,6 +232,7 @@ __all__ = [ "KontextualError", "LexicalIndexRefreshResult", "LifecycleState", + "LocalBlobStorage", "MetadataFieldDefinition", "MetadataRecord", "MetadataSchema", @@ -240,6 +256,9 @@ __all__ = [ "RelationshipQueryItem", "RelationshipQueryRequest", "RelationshipQueryResult", + "RepresentationContentResult", + "RepresentationContentStream", + "RepresentationContentService", "RelationshipTargetKind", "RelationshipType", "RepresentationKind", @@ -256,6 +275,7 @@ __all__ = [ "SourceReference", "SourceConnector", "SourcePayload", + "S3BlobStorage", "SQLiteAssetRegistryRepository", "TransformationExecutionContext", "TransformationOperation", diff --git a/src/kontextual_engine/adapters/local_files/__init__.py b/src/kontextual_engine/adapters/local_files/__init__.py index cc87846..0dc8e45 100644 --- a/src/kontextual_engine/adapters/local_files/__init__.py +++ b/src/kontextual_engine/adapters/local_files/__init__.py @@ -1,5 +1,6 @@ """Local filesystem ingestion connector.""" +from .blob_storage import LocalBlobStorage from .connector import LocalFileConnector -__all__ = ["LocalFileConnector"] +__all__ = ["LocalBlobStorage", "LocalFileConnector"] diff --git a/src/kontextual_engine/adapters/local_files/blob_storage.py b/src/kontextual_engine/adapters/local_files/blob_storage.py new file mode 100644 index 0000000..a12846e --- /dev/null +++ b/src/kontextual_engine/adapters/local_files/blob_storage.py @@ -0,0 +1,141 @@ +"""Local filesystem content-addressed blob storage.""" + +from __future__ import annotations + +from collections.abc import Iterator +from pathlib import Path + +from kontextual_engine.core import new_id +from kontextual_engine.errors import NotFoundError, ValidationError +from kontextual_engine.ports import 
BlobCleanupResult, BlobRef, BlobWriteResult, blob_digest, digest_storage_key + + +class LocalBlobStorage: + adapter_name = "local" + + def __init__(self, root: str | Path) -> None: + self.root = Path(root).expanduser().resolve() + + def put_bytes(self, content: bytes, *, media_type: str | None = None) -> BlobWriteResult: + digest = blob_digest(content) + storage_key = digest_storage_key(digest) + path = self._path(storage_key) + storage_ref = self._storage_ref(storage_key) + created = not path.exists() + if created: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.parent / f".{path.name}.{new_id('tmp')}" + tmp.write_bytes(content) + actual = blob_digest(tmp.read_bytes()) + if actual != digest: + tmp.unlink(missing_ok=True) + raise ValidationError("Blob digest verification failed", details={"expected": digest, "actual": actual}) + tmp.replace(path) + else: + existing = path.read_bytes() + actual = blob_digest(existing) + if actual != digest or len(existing) != len(content): + raise ValidationError( + "Existing blob digest mismatch", + details={"storage_ref": storage_ref, "expected": digest, "actual": actual}, + ) + return BlobWriteResult( + BlobRef( + digest=digest, + size_bytes=len(content), + storage_key=storage_key, + storage_ref=storage_ref, + adapter=self.adapter_name, + media_type=media_type, + ), + created=created, + ) + + def read_bytes(self, storage_ref: str) -> bytes: + path = self._path(self._storage_key(storage_ref)) + if not path.exists(): + raise NotFoundError("Blob not found", details={"storage_ref": storage_ref}) + return path.read_bytes() + + def iter_bytes(self, storage_ref: str, *, chunk_size: int = 65536) -> Iterator[bytes]: + path = self._path(self._storage_key(storage_ref)) + if not path.exists(): + raise NotFoundError("Blob not found", details={"storage_ref": storage_ref}) + size = max(int(chunk_size), 1) + with path.open("rb") as handle: + while chunk := handle.read(size): + yield chunk + + def stat(self, storage_ref: str) -> 
BlobRef: + storage_key = self._storage_key(storage_ref) + path = self._path(storage_key) + if not path.exists(): + raise NotFoundError("Blob not found", details={"storage_ref": storage_ref}) + return BlobRef( + digest=_digest_from_storage_key(storage_key), + size_bytes=path.stat().st_size, + storage_key=storage_key, + storage_ref=self._storage_ref(storage_key), + adapter=self.adapter_name, + ) + + def exists(self, storage_ref_or_digest: str) -> bool: + try: + storage_key = self._storage_key(storage_ref_or_digest) + except ValueError: + storage_key = digest_storage_key(storage_ref_or_digest) + return self._path(storage_key).exists() + + def iter_blobs(self) -> list[BlobRef]: + root = self.root / "sha256" + if not root.exists(): + return [] + refs = [] + for path in sorted(root.glob("*/*/*")): + if path.is_file() and not path.name.startswith("."): + storage_key = str(path.relative_to(self.root)).replace("\\", "/") + refs.append(self.stat(self._storage_ref(storage_key))) + return refs + + def delete_unreferenced( + self, + referenced_storage_refs: set[str], + *, + dry_run: bool = True, + ) -> BlobCleanupResult: + referenced = {self._storage_key(ref) for ref in referenced_storage_refs if ref.startswith("blob://local/")} + deleted: list[str] = [] + reclaimable = 0 + retained = 0 + for blob in self.iter_blobs(): + if blob.storage_key in referenced: + retained += 1 + continue + reclaimable += blob.size_bytes + deleted.append(blob.storage_ref) + if not dry_run: + self._path(blob.storage_key).unlink(missing_ok=True) + return BlobCleanupResult( + dry_run=dry_run, + deleted_count=len(deleted), + retained_count=retained, + reclaimable_bytes=reclaimable, + deleted_storage_refs=tuple(deleted), + ) + + def _path(self, storage_key: str) -> Path: + return self.root / storage_key + + def _storage_ref(self, storage_key: str) -> str: + return f"blob://local/{storage_key}" + + def _storage_key(self, storage_ref_or_digest: str) -> str: + if 
storage_ref_or_digest.startswith("blob://local/"): + return storage_ref_or_digest.removeprefix("blob://local/") + if storage_ref_or_digest.startswith("sha256:"): + return digest_storage_key(storage_ref_or_digest) + raise ValueError(f"Unsupported local blob reference: {storage_ref_or_digest}") + + +def _digest_from_storage_key(storage_key: str) -> str: + return "sha256:" + storage_key.rsplit("/", 1)[-1] diff --git a/src/kontextual_engine/adapters/memory/__init__.py b/src/kontextual_engine/adapters/memory/__init__.py index 86c66f2..6f7a629 100644 --- a/src/kontextual_engine/adapters/memory/__init__.py +++ b/src/kontextual_engine/adapters/memory/__init__.py @@ -1,6 +1,6 @@ """In-memory adapters for deterministic tests.""" from .asset_registry import InMemoryAssetRegistryRepository +from .blob_storage import InMemoryBlobStorage -__all__ = ["InMemoryAssetRegistryRepository"] - +__all__ = ["InMemoryAssetRegistryRepository", "InMemoryBlobStorage"] diff --git a/src/kontextual_engine/adapters/memory/blob_storage.py b/src/kontextual_engine/adapters/memory/blob_storage.py new file mode 100644 index 0000000..e0b7ad7 --- /dev/null +++ b/src/kontextual_engine/adapters/memory/blob_storage.py @@ -0,0 +1,109 @@ +"""In-memory content-addressed blob storage for tests.""" + +from __future__ import annotations + +from collections.abc import Iterator + +from kontextual_engine.errors import NotFoundError +from kontextual_engine.ports import BlobCleanupResult, BlobRef, BlobWriteResult, blob_digest, digest_storage_key + + +class InMemoryBlobStorage: + adapter_name = "memory" + + def __init__(self) -> None: + self._blobs: dict[str, bytes] = {} + self._media_types: dict[str, str | None] = {} + + def put_bytes(self, content: bytes, *, media_type: str | None = None) -> BlobWriteResult: + digest = blob_digest(content) + storage_key = digest_storage_key(digest) + storage_ref = self._storage_ref(storage_key) + created = storage_key not in self._blobs + if created: + self._blobs[storage_key] = 
bytes(content) + self._media_types[storage_key] = media_type + return BlobWriteResult( + BlobRef( + digest=digest, + size_bytes=len(content), + storage_key=storage_key, + storage_ref=storage_ref, + adapter=self.adapter_name, + media_type=media_type or self._media_types.get(storage_key), + ), + created=created, + ) + + def read_bytes(self, storage_ref: str) -> bytes: + storage_key = self._storage_key(storage_ref) + try: + return self._blobs[storage_key] + except KeyError as exc: + raise NotFoundError("Blob not found", details={"storage_ref": storage_ref}) from exc + + def iter_bytes(self, storage_ref: str, *, chunk_size: int = 65536) -> Iterator[bytes]: + content = self.read_bytes(storage_ref) + size = max(int(chunk_size), 1) + for index in range(0, len(content), size): + yield content[index : index + size] + + def stat(self, storage_ref: str) -> BlobRef: + content = self.read_bytes(storage_ref) + storage_key = self._storage_key(storage_ref) + return BlobRef( + digest=blob_digest(content), + size_bytes=len(content), + storage_key=storage_key, + storage_ref=self._storage_ref(storage_key), + adapter=self.adapter_name, + media_type=self._media_types.get(storage_key), + ) + + def exists(self, storage_ref_or_digest: str) -> bool: + try: + storage_key = self._storage_key(storage_ref_or_digest) + except ValueError: + storage_key = digest_storage_key(storage_ref_or_digest) + return storage_key in self._blobs + + def iter_blobs(self) -> list[BlobRef]: + return [self.stat(self._storage_ref(storage_key)) for storage_key in sorted(self._blobs)] + + def delete_unreferenced( + self, + referenced_storage_refs: set[str], + *, + dry_run: bool = True, + ) -> BlobCleanupResult: + referenced = {self._storage_key(ref) for ref in referenced_storage_refs if ref.startswith("blob://memory/")} + deleted: list[str] = [] + reclaimable = 0 + retained = 0 + for storage_key, content in list(self._blobs.items()): + if storage_key in referenced: + retained += 1 + continue + reclaimable += 
len(content) + storage_ref = self._storage_ref(storage_key) + deleted.append(storage_ref) + if not dry_run: + self._blobs.pop(storage_key, None) + self._media_types.pop(storage_key, None) + return BlobCleanupResult( + dry_run=dry_run, + deleted_count=len(deleted), + retained_count=retained, + reclaimable_bytes=reclaimable, + deleted_storage_refs=tuple(deleted), + ) + + def _storage_ref(self, storage_key: str) -> str: + return f"blob://memory/{storage_key}" + + def _storage_key(self, storage_ref_or_digest: str) -> str: + if storage_ref_or_digest.startswith("blob://memory/"): + return storage_ref_or_digest.removeprefix("blob://memory/") + if storage_ref_or_digest.startswith("sha256:"): + return digest_storage_key(storage_ref_or_digest) + raise ValueError(f"Unsupported memory blob reference: {storage_ref_or_digest}") diff --git a/src/kontextual_engine/adapters/s3/__init__.py b/src/kontextual_engine/adapters/s3/__init__.py new file mode 100644 index 0000000..bd001b1 --- /dev/null +++ b/src/kontextual_engine/adapters/s3/__init__.py @@ -0,0 +1,6 @@ +"""S3-backed blob storage adapter.""" + +from .blob_storage import S3BlobStorage + +__all__ = ["S3BlobStorage"] + diff --git a/src/kontextual_engine/adapters/s3/blob_storage.py b/src/kontextual_engine/adapters/s3/blob_storage.py new file mode 100644 index 0000000..0f4b676 --- /dev/null +++ b/src/kontextual_engine/adapters/s3/blob_storage.py @@ -0,0 +1,198 @@ +"""S3 content-addressed blob storage adapter.""" + +from __future__ import annotations + +from collections.abc import Iterator +from typing import Any + +from kontextual_engine.errors import NotFoundError +from kontextual_engine.ports import BlobCleanupResult, BlobRef, BlobWriteResult, blob_digest, digest_storage_key + + +class S3BlobStorage: + adapter_name = "s3" + + def __init__( + self, + *, + bucket: str, + prefix: str = "", + client: Any | None = None, + ) -> None: + self.bucket = bucket + self.prefix = prefix.strip("/") + if client is None: + import boto3 # type: 
ignore[import-not-found] + + client = boto3.client("s3") + self.client = client + + def put_bytes(self, content: bytes, *, media_type: str | None = None) -> BlobWriteResult: + digest = blob_digest(content) + storage_key = self._key(digest_storage_key(digest)) + storage_ref = self._storage_ref(storage_key) + created = not self.exists(storage_ref) + if created: + kwargs: dict[str, Any] = { + "Bucket": self.bucket, + "Key": storage_key, + "Body": content, + "Metadata": {"digest": digest, "size-bytes": str(len(content))}, + } + if media_type: + kwargs["ContentType"] = media_type + self.client.put_object(**kwargs) + return BlobWriteResult( + BlobRef( + digest=digest, + size_bytes=len(content), + storage_key=storage_key, + storage_ref=storage_ref, + adapter=self.adapter_name, + media_type=media_type, + ), + created=created, + ) + + def read_bytes(self, storage_ref: str) -> bytes: + storage_key = self._storage_key(storage_ref) + try: + result = self.client.get_object(Bucket=self.bucket, Key=storage_key) + except Exception as exc: + if _is_not_found(exc): + raise NotFoundError("Blob not found", details={"storage_ref": storage_ref}) from exc + raise + body = result["Body"] + return body.read() if hasattr(body, "read") else bytes(body) + + def iter_bytes(self, storage_ref: str, *, chunk_size: int = 65536) -> Iterator[bytes]: + storage_key = self._storage_key(storage_ref) + try: + result = self.client.get_object(Bucket=self.bucket, Key=storage_key) + except Exception as exc: + if _is_not_found(exc): + raise NotFoundError("Blob not found", details={"storage_ref": storage_ref}) from exc + raise + body = result["Body"] + size = max(int(chunk_size), 1) + try: + if hasattr(body, "iter_chunks"): + for chunk in body.iter_chunks(chunk_size=size): + if chunk: + yield chunk + return + while True: + chunk = body.read(size) if hasattr(body, "read") else bytes(body) + if not chunk: + break + yield chunk + if not hasattr(body, "read"): + break + finally: + close = getattr(body, "close", 
None) + if close: + close() + + def stat(self, storage_ref: str) -> BlobRef: + storage_key = self._storage_key(storage_ref) + try: + result = self.client.head_object(Bucket=self.bucket, Key=storage_key) + except Exception as exc: + if _is_not_found(exc): + raise NotFoundError("Blob not found", details={"storage_ref": storage_ref}) from exc + raise + metadata = dict(result.get("Metadata", {})) + digest = metadata.get("digest") or _digest_from_key(storage_key) + return BlobRef( + digest=digest, + size_bytes=int(result.get("ContentLength", metadata.get("size-bytes", 0))), + storage_key=storage_key, + storage_ref=self._storage_ref(storage_key), + adapter=self.adapter_name, + media_type=result.get("ContentType"), + ) + + def exists(self, storage_ref_or_digest: str) -> bool: + try: + self.stat(storage_ref_or_digest) + return True + except NotFoundError: + return False + + def iter_blobs(self) -> list[BlobRef]: + prefix = f"{self.prefix}/sha256/" if self.prefix else "sha256/" + refs: list[BlobRef] = [] + token: str | None = None + while True: + kwargs: dict[str, Any] = {"Bucket": self.bucket, "Prefix": prefix} + if token: + kwargs["ContinuationToken"] = token + result = self.client.list_objects_v2(**kwargs) + for item in result.get("Contents", []): + key = item["Key"] + refs.append( + BlobRef( + digest=_digest_from_key(key), + size_bytes=int(item.get("Size", 0)), + storage_key=key, + storage_ref=self._storage_ref(key), + adapter=self.adapter_name, + ) + ) + if not result.get("IsTruncated"): + return refs + token = result.get("NextContinuationToken") + + def delete_unreferenced( + self, + referenced_storage_refs: set[str], + *, + dry_run: bool = True, + ) -> BlobCleanupResult: + referenced = {self._storage_key(ref) for ref in referenced_storage_refs if ref.startswith(f"s3://{self.bucket}/")} + deleted: list[str] = [] + reclaimable = 0 + retained = 0 + for blob in self.iter_blobs(): + if blob.storage_key in referenced: + retained += 1 + continue + 
deleted.append(blob.storage_ref) + reclaimable += blob.size_bytes + if not dry_run: + self.client.delete_object(Bucket=self.bucket, Key=blob.storage_key) + return BlobCleanupResult( + dry_run=dry_run, + deleted_count=len(deleted), + retained_count=retained, + reclaimable_bytes=reclaimable, + deleted_storage_refs=tuple(deleted), + ) + + def _key(self, storage_key: str) -> str: + return f"{self.prefix}/{storage_key}" if self.prefix else storage_key + + def _storage_ref(self, storage_key: str) -> str: + return f"s3://{self.bucket}/{storage_key}" + + def _storage_key(self, storage_ref_or_digest: str) -> str: + if storage_ref_or_digest.startswith(f"s3://{self.bucket}/"): + return storage_ref_or_digest.removeprefix(f"s3://{self.bucket}/") + if storage_ref_or_digest.startswith("sha256:"): + return self._key(digest_storage_key(storage_ref_or_digest)) + if storage_ref_or_digest.startswith("blob://"): + raise ValueError(f"Unsupported S3 blob reference: {storage_ref_or_digest}") + return storage_ref_or_digest + + +def _is_not_found(exc: Exception) -> bool: + response = getattr(exc, "response", None) + if isinstance(response, dict): + code = str(response.get("Error", {}).get("Code", "")) + status = str(response.get("ResponseMetadata", {}).get("HTTPStatusCode", "")) + return code in {"404", "NoSuchKey", "NotFound"} or status == "404" + return False + + +def _digest_from_key(key: str) -> str: + return "sha256:" + key.rsplit("/", 1)[-1] diff --git a/src/kontextual_engine/api/app.py b/src/kontextual_engine/api/app.py index 53d159e..bf58e74 100644 --- a/src/kontextual_engine/api/app.py +++ b/src/kontextual_engine/api/app.py @@ -12,7 +12,7 @@ from datetime import datetime from importlib import metadata from typing import Any -from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository +from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository, InMemoryBlobStorage from kontextual_engine.core import ( Actor, ActorType, @@ -52,7 +52,7 @@ from 
kontextual_engine.core import ( utc_now, ) from kontextual_engine.errors import AuthorizationError, KontextualError, NotFoundError, ValidationError -from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, PolicyGateway +from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, BlobStorage, PolicyGateway from kontextual_engine.services import ( AssetIngestionService, AssetQueryRequest, @@ -60,6 +60,7 @@ from kontextual_engine.services import ( AssetRetrievalService, ContextEntityQueryRequest, RelationshipQueryRequest, + RepresentationContentService, RetrievalFeedbackRequest, TransformationRequest, TransformationService, @@ -179,6 +180,7 @@ AGENT_OPERATION_CATALOG: tuple[dict[str, Any], ...] = ( @dataclass class ServiceRuntime: repository: AssetRegistryRepository = field(default_factory=InMemoryAssetRegistryRepository) + blob_storage: BlobStorage = field(default_factory=InMemoryBlobStorage) policy_gateway: PolicyGateway = field(default_factory=AllowAllPolicyGateway) api_version: str = API_VERSION service_name: str = "kontextual-engine" @@ -193,6 +195,14 @@ class ServiceRuntime: def retrieval_service(self) -> AssetRetrievalService: return AssetRetrievalService(self.repository, policy_gateway=self.policy_gateway) + def content_service(self) -> RepresentationContentService: + return RepresentationContentService( + self.repository, + self.blob_storage, + policy_gateway=self.policy_gateway, + asset_service=self.asset_service(), + ) + def transformation_service(self) -> TransformationService: return TransformationService( self.repository, @@ -407,6 +417,25 @@ class ServiceRuntime: ) return content_stream + def cmis_content_stream_bytes( + self, + access_point_id: str, + object_id: str, + context: OperationContext, + ): + mapper = self._cmis_mapper(access_point_id) + decision = mapper.access_point.decide_action(CMISAction.GET_CONTENT_STREAM, context, resource=object_id) + if not decision.allowed: + raise 
_cmis_authorization_error(decision, "getContentStream") + asset_id = _cmis_asset_id(object_id) + asset = self.repository.get_asset(asset_id) + if not mapper.access_point.exposes_asset(asset, context): + raise NotFoundError( + "CMIS object not found", + details={"object_id": object_id, "access_point_id": access_point_id}, + ) + return self.content_service().stream_content(asset_id, context) + def cmis_acl( self, access_point_id: str, @@ -466,23 +495,23 @@ class ServiceRuntime: "metadata": dict(payload.get("classification_metadata", {})), } ) + asset_id = payload.get("asset_id") or new_id("asset") content = payload.get("content") representations = [] if content is not None: - representations.append( - AssetRepresentation.from_content( - payload.get("asset_id") or "cmis-new-document", - RepresentationKind.SOURCE, - payload.get("media_type", "text/plain"), - content, - storage_ref=payload.get("storage_ref"), - ) + representation, _blob, _created = self.content_service().build_representation_from_bytes( + asset_id, + RepresentationKind.SOURCE, + payload.get("media_type", "text/plain"), + content, + metadata={"cmis": {"operation": "createDocument"}}, ) + representations.append(representation) result = self.asset_service().create_asset( payload["name"], classification, context, - asset_id=payload.get("asset_id"), + asset_id=asset_id, representations=representations, metadata_records=[_metadata_record(item) for item in payload.get("metadata_records", [])], idempotency_key=payload.get("idempotency_key"), @@ -527,21 +556,29 @@ class ServiceRuntime: if not decision.allowed: raise _cmis_authorization_error(decision, "setContentStream") asset_id = _cmis_asset_id(object_id) - representation = AssetRepresentation.from_content( + self.content_service().add_representation_from_bytes( asset_id, payload.get("kind", RepresentationKind.SOURCE.value), payload.get("media_type", "text/plain"), payload.get("content", ""), - storage_ref=payload.get("storage_ref"), - ) - 
self.asset_service().add_representation( - asset_id, - representation, context, expected_current_version_id=payload.get("expected_current_version_id"), + metadata={"cmis": {"operation": "setContentStream"}}, ) return self.cmis_object(access_point_id, object_id, context) + def representation_content_stream( + self, + asset_id: str, + representation_id: str, + context: OperationContext, + ): + return self.content_service().stream_content( + asset_id, + context, + representation_id=representation_id, + ) + def cmis_delete_object( self, access_point_id: str, @@ -2031,7 +2068,7 @@ class ServiceRuntime: def create_app(runtime: ServiceRuntime | None = None): try: from fastapi import Depends, FastAPI, Header, HTTPException, Query - from fastapi.responses import JSONResponse + from fastapi.responses import JSONResponse, StreamingResponse except ImportError as exc: # pragma: no cover - exercised when optional extra is absent raise RuntimeError( "FastAPI service dependencies are not installed. Install kontextual-engine[service]." 
@@ -2202,6 +2239,25 @@ def create_app(runtime: ServiceRuntime | None = None): ) -> dict[str, Any]: return response(runtime.cmis_content_stream, access_point_id, object_id, context) + @app.get("/cmis/{access_point_id}/browser/content-bytes/{object_id:path}", tags=["cmis"]) + def cmis_content_stream_bytes( + access_point_id: str, + object_id: str, + context: OperationContext = Depends(context_from_headers), + ) -> StreamingResponse: + result = response(runtime.cmis_content_stream_bytes, access_point_id, object_id, context) + representation = result.representation + return StreamingResponse( + result.chunks, + media_type=representation.media_type, + headers={ + "Content-Length": str(representation.size_bytes), + "ETag": representation.digest, + "X-Kontextual-Representation-Id": representation.representation_id, + "X-Kontextual-Storage-Ref": representation.storage_ref or "", + }, + ) + @app.get("/cmis/{access_point_id}/browser/acl/{object_id:path}", tags=["cmis"]) def cmis_acl( access_point_id: str, @@ -2323,6 +2379,25 @@ def create_app(runtime: ServiceRuntime | None = None): def get_asset(asset_id: str) -> dict[str, Any]: return response(runtime.get_asset, asset_id) + @app.get(f"{prefix}/assets/{{asset_id}}/representations/{{representation_id}}/content", tags=["assets"]) + def get_representation_content( + asset_id: str, + representation_id: str, + context: OperationContext = Depends(context_from_headers), + ) -> StreamingResponse: + result = response(runtime.representation_content_stream, asset_id, representation_id, context) + representation = result.representation + return StreamingResponse( + result.chunks, + media_type=representation.media_type, + headers={ + "Content-Length": str(representation.size_bytes), + "ETag": representation.digest, + "X-Kontextual-Representation-Id": representation.representation_id, + "X-Kontextual-Storage-Ref": representation.storage_ref or "", + }, + ) + @app.post(f"{prefix}/assets/{{asset_id}}/metadata", tags=["metadata"]) def 
add_metadata( asset_id: str, diff --git a/src/kontextual_engine/core/cmis.py b/src/kontextual_engine/core/cmis.py index b380493..e96c7a2 100644 --- a/src/kontextual_engine/core/cmis.py +++ b/src/kontextual_engine/core/cmis.py @@ -858,7 +858,9 @@ def _preferred_representation( RepresentationKind.NORMALIZED: 1, RepresentationKind.DERIVED: 2, } - return sorted(representations, key=lambda item: priority.get(item.kind, 99))[0] + best_priority = min(priority.get(item.kind, 99) for item in representations) + candidates = [item for item in representations if priority.get(item.kind, 99) == best_priority] + return sorted(candidates, key=lambda item: (item.created_at, item.representation_id), reverse=True)[0] def _normalize_path(path: str) -> str: diff --git a/src/kontextual_engine/ports/__init__.py b/src/kontextual_engine/ports/__init__.py index b9f21d8..98ec100 100644 --- a/src/kontextual_engine/ports/__init__.py +++ b/src/kontextual_engine/ports/__init__.py @@ -1,5 +1,13 @@ """Stable ports owned by the engine.""" +from .blob_storage import ( + BlobCleanupResult, + BlobRef, + BlobStorage, + BlobWriteResult, + blob_digest, + digest_storage_key, +) from .ingestion import DirectorySourceConnector, FormatExtractor, SourceConnector from .policy import AllowAllPolicyGateway, PolicyGateway from .repositories import AssetRegistryRepository @@ -7,6 +15,12 @@ from .repositories import AssetRegistryRepository __all__ = [ "AllowAllPolicyGateway", "AssetRegistryRepository", + "BlobCleanupResult", + "BlobRef", + "BlobStorage", + "BlobWriteResult", + "blob_digest", + "digest_storage_key", "DirectorySourceConnector", "FormatExtractor", "PolicyGateway", diff --git a/src/kontextual_engine/ports/blob_storage.py b/src/kontextual_engine/ports/blob_storage.py new file mode 100644 index 0000000..ca2678b --- /dev/null +++ b/src/kontextual_engine/ports/blob_storage.py @@ -0,0 +1,95 @@ +"""Blob storage port and content-addressed reference models.""" + +from __future__ import annotations + +from 
dataclasses import dataclass +from collections.abc import Iterator +from typing import Protocol + +from kontextual_engine.core import content_digest +from kontextual_engine.errors import ValidationError + + +@dataclass(frozen=True) +class BlobRef: + digest: str + size_bytes: int + storage_key: str + storage_ref: str + adapter: str + media_type: str | None = None + + def to_dict(self) -> dict[str, object]: + data: dict[str, object] = { + "digest": self.digest, + "size_bytes": self.size_bytes, + "storage_key": self.storage_key, + "storage_ref": self.storage_ref, + "adapter": self.adapter, + } + if self.media_type: + data["media_type"] = self.media_type + return data + + +@dataclass(frozen=True) +class BlobWriteResult: + blob: BlobRef + created: bool + + def to_dict(self) -> dict[str, object]: + return {"blob": self.blob.to_dict(), "created": self.created} + + +@dataclass(frozen=True) +class BlobCleanupResult: + dry_run: bool + deleted_count: int + retained_count: int + reclaimable_bytes: int + deleted_storage_refs: tuple[str, ...] = () + + def to_dict(self) -> dict[str, object]: + return { + "dry_run": self.dry_run, + "deleted_count": self.deleted_count, + "retained_count": self.retained_count, + "reclaimable_bytes": self.reclaimable_bytes, + "deleted_storage_refs": list(self.deleted_storage_refs), + } + + +class BlobStorage(Protocol): + adapter_name: str + + def put_bytes(self, content: bytes, *, media_type: str | None = None) -> BlobWriteResult: ... + + def read_bytes(self, storage_ref: str) -> bytes: ... + + def iter_bytes(self, storage_ref: str, *, chunk_size: int = 65536) -> Iterator[bytes]: ... + + def stat(self, storage_ref: str) -> BlobRef: ... + + def exists(self, storage_ref_or_digest: str) -> bool: ... + + def iter_blobs(self) -> list[BlobRef]: ... + + def delete_unreferenced( + self, + referenced_storage_refs: set[str], + *, + dry_run: bool = True, + ) -> BlobCleanupResult: ... 
+ + +def blob_digest(content: bytes) -> str: + return content_digest(content) + + +def digest_storage_key(digest: str) -> str: + if not digest.startswith("sha256:"): + raise ValidationError("Unsupported blob digest", details={"digest": digest}) + value = digest.removeprefix("sha256:") + if len(value) < 4: + raise ValidationError("Invalid blob digest", details={"digest": digest}) + return f"sha256/{value[:2]}/{value[2:4]}/{value}" diff --git a/src/kontextual_engine/services/__init__.py b/src/kontextual_engine/services/__init__.py index 66a8839..0f26c55 100644 --- a/src/kontextual_engine/services/__init__.py +++ b/src/kontextual_engine/services/__init__.py @@ -5,6 +5,7 @@ from .asset_service import ( AssetRegistryService, RelationshipChangeResult, ) +from .content_service import RepresentationContentResult, RepresentationContentStream, RepresentationContentService from .ingestion_service import AssetIngestionResult, AssetIngestionService from .retrieval_service import ( AssetQueryItem, @@ -53,6 +54,9 @@ __all__ = [ "ContextEntityQueryResult", "LexicalIndexRefreshResult", "RelationshipChangeResult", + "RepresentationContentResult", + "RepresentationContentStream", + "RepresentationContentService", "RelationshipQueryItem", "RelationshipQueryRequest", "RelationshipQueryResult", diff --git a/src/kontextual_engine/services/content_service.py b/src/kontextual_engine/services/content_service.py new file mode 100644 index 0000000..7222112 --- /dev/null +++ b/src/kontextual_engine/services/content_service.py @@ -0,0 +1,383 @@ +"""Governed representation byte storage and streaming service.""" + +from __future__ import annotations + +import hashlib +from collections.abc import Iterable, Iterator +from dataclasses import dataclass +from typing import Any + +from kontextual_engine.core import ( + AssetRepresentation, + AuditEvent, + AuditOutcome, + OperationContext, + PolicyDecision, + RepresentationKind, + new_id, +) +from kontextual_engine.errors import AuthorizationError, 
NotFoundError, ValidationError +from kontextual_engine.ports import ( + AllowAllPolicyGateway, + AssetRegistryRepository, + BlobCleanupResult, + BlobRef, + BlobStorage, + PolicyGateway, +) +from kontextual_engine.services.asset_service import AssetChangeResult, AssetRegistryService + + +@dataclass(frozen=True) +class RepresentationContentResult: + representation: AssetRepresentation + content: bytes + blob: BlobRef + policy_decision: PolicyDecision + audit_event: AuditEvent + + def to_dict(self, *, include_content: bool = False) -> dict[str, Any]: + data = { + "representation": self.representation.to_dict(), + "content_stream": { + "representation_id": self.representation.representation_id, + "asset_id": self.representation.asset_id, + "media_type": self.representation.media_type, + "digest": self.representation.digest, + "size_bytes": self.representation.size_bytes, + "storage_ref": self.representation.storage_ref, + "blob": self.blob.to_dict(), + }, + "policy_decision": self.policy_decision.to_dict(), + "audit_event": self.audit_event.to_dict(), + } + if include_content: + data["content"] = self.content + return data + + +@dataclass(frozen=True) +class RepresentationContentStream: + representation: AssetRepresentation + chunks: Iterable[bytes] + blob: BlobRef + policy_decision: PolicyDecision + audit_event: AuditEvent + + +class RepresentationContentService: + def __init__( + self, + repository: AssetRegistryRepository, + blob_storage: BlobStorage, + *, + policy_gateway: PolicyGateway | None = None, + asset_service: AssetRegistryService | None = None, + ) -> None: + self.repository = repository + self.blob_storage = blob_storage + self.policy_gateway = policy_gateway or AllowAllPolicyGateway() + self.asset_service = asset_service or AssetRegistryService( + repository, + policy_gateway=self.policy_gateway, + ) + + def build_representation_from_bytes( + self, + asset_id: str, + kind: RepresentationKind | str, + media_type: str, + content: str | bytes, + *, + 
producer: str | None = None, + source_ref_id: str | None = None, + metadata: dict[str, Any] | None = None, + representation_id: str | None = None, + ) -> tuple[AssetRepresentation, BlobRef, bool]: + data = content.encode("utf-8") if isinstance(content, str) else bytes(content) + write = self.blob_storage.put_bytes(data, media_type=media_type) + blob = write.blob + representation = AssetRepresentation( + asset_id=asset_id, + kind=RepresentationKind(kind), + media_type=media_type, + digest=blob.digest, + size_bytes=blob.size_bytes, + storage_ref=blob.storage_ref, + producer=producer, + source_ref_id=source_ref_id, + metadata={"blob_adapter": blob.adapter, **dict(metadata or {})}, + representation_id=representation_id or new_id("repr"), + ) + return representation, blob, write.created + + def add_representation_from_bytes( + self, + asset_id: str, + kind: RepresentationKind | str, + media_type: str, + content: str | bytes, + context: OperationContext, + *, + expected_current_version_id: str | None = None, + producer: str | None = None, + source_ref_id: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> AssetChangeResult: + representation, _blob, _created = self.build_representation_from_bytes( + asset_id, + kind, + media_type, + content, + producer=producer, + source_ref_id=source_ref_id, + metadata=metadata, + ) + return self.asset_service.add_representation( + asset_id, + representation, + context, + expected_current_version_id=expected_current_version_id, + ) + + def get_content_stream( + self, + asset_id: str, + context: OperationContext, + *, + representation_id: str | None = None, + kind: RepresentationKind | str | None = None, + ) -> RepresentationContentResult: + asset = self.repository.get_asset(asset_id) + representation = self._representation(asset_id, representation_id=representation_id, kind=kind) + if not representation.storage_ref: + raise NotFoundError( + "Representation content is not available in blob storage", + details={"asset_id": 
asset_id, "representation_id": representation.representation_id}, + ) + decision = self._authorize( + context, + "asset.content_stream.read", + f"asset:{asset.id}", + resource_metadata={ + "representation_id": representation.representation_id, + "digest": representation.digest, + "media_type": representation.media_type, + }, + ) + try: + blob = self.blob_storage.stat(representation.storage_ref) + content = self.blob_storage.read_bytes(representation.storage_ref) + except ValueError as exc: + raise NotFoundError( + "Representation content is not available in configured blob storage", + details={ + "asset_id": asset_id, + "representation_id": representation.representation_id, + "storage_ref": representation.storage_ref, + }, + ) from exc + if blob.digest != representation.digest: + raise ValidationError( + "Representation digest does not match stored blob", + details={ + "representation_id": representation.representation_id, + "representation_digest": representation.digest, + "blob_digest": blob.digest, + }, + ) + actual_digest = "sha256:" + hashlib.sha256(content).hexdigest() + if actual_digest != representation.digest: + raise ValidationError( + "Representation content does not match expected digest", + details={ + "representation_id": representation.representation_id, + "representation_digest": representation.digest, + "actual_digest": actual_digest, + }, + ) + event = self._audit( + "asset.content_stream.read", + f"asset:{asset.id}", + AuditOutcome.SUCCESS, + context, + decision, + details={"representation_id": representation.representation_id, "digest": representation.digest}, + ) + return RepresentationContentResult(representation, content, blob, decision, event) + + def stream_content( + self, + asset_id: str, + context: OperationContext, + *, + representation_id: str | None = None, + kind: RepresentationKind | str | None = None, + chunk_size: int = 65536, + ) -> RepresentationContentStream: + asset = self.repository.get_asset(asset_id) + representation = 
self._representation(asset_id, representation_id=representation_id, kind=kind) + if not representation.storage_ref: + raise NotFoundError( + "Representation content is not available in blob storage", + details={"asset_id": asset_id, "representation_id": representation.representation_id}, + ) + decision = self._authorize( + context, + "asset.content_stream.read", + f"asset:{asset.id}", + resource_metadata={ + "representation_id": representation.representation_id, + "digest": representation.digest, + "media_type": representation.media_type, + }, + ) + try: + blob = self.blob_storage.stat(representation.storage_ref) + except ValueError as exc: + raise NotFoundError( + "Representation content is not available in configured blob storage", + details={ + "asset_id": asset_id, + "representation_id": representation.representation_id, + "storage_ref": representation.storage_ref, + }, + ) from exc + if blob.digest != representation.digest: + raise ValidationError( + "Representation digest does not match stored blob", + details={ + "representation_id": representation.representation_id, + "representation_digest": representation.digest, + "blob_digest": blob.digest, + }, + ) + event = self._audit( + "asset.content_stream.read", + f"asset:{asset.id}", + AuditOutcome.SUCCESS, + context, + decision, + details={"representation_id": representation.representation_id, "digest": representation.digest}, + ) + return RepresentationContentStream( + representation, + self._verified_chunks(representation, chunk_size=chunk_size), + blob, + decision, + event, + ) + + def referenced_storage_refs(self) -> set[str]: + return { + representation.storage_ref + for representation in self.repository.list_representations() + if representation.storage_ref + } + + def cleanup_unreferenced_blobs(self, *, dry_run: bool = True) -> BlobCleanupResult: + return self.blob_storage.delete_unreferenced(self.referenced_storage_refs(), dry_run=dry_run) + + def _representation( + self, + asset_id: str, + *, + 
representation_id: str | None, + kind: RepresentationKind | str | None, + ) -> AssetRepresentation: + if representation_id: + representation = self.repository.get_representation(representation_id) + if representation.asset_id != asset_id: + raise NotFoundError( + "Representation not found for asset", + details={"asset_id": asset_id, "representation_id": representation_id}, + ) + return representation + parsed_kind = RepresentationKind(kind) if kind else None + representations = self.repository.list_representations(asset_id=asset_id, kind=parsed_kind) + if not representations: + raise NotFoundError("Representation not found", details={"asset_id": asset_id, "kind": kind}) + priority = { + RepresentationKind.SOURCE: 0, + RepresentationKind.NORMALIZED: 1, + RepresentationKind.DERIVED: 2, + } + best_priority = min(priority.get(item.kind, 99) for item in representations) + candidates = [item for item in representations if priority.get(item.kind, 99) == best_priority] + return sorted(candidates, key=lambda item: (item.created_at, item.representation_id), reverse=True)[0] + + def _verified_chunks( + self, + representation: AssetRepresentation, + *, + chunk_size: int, + ) -> Iterator[bytes]: + if not representation.storage_ref: + raise NotFoundError( + "Representation content is not available in blob storage", + details={"asset_id": representation.asset_id, "representation_id": representation.representation_id}, + ) + hasher = hashlib.sha256() + for chunk in self.blob_storage.iter_bytes(representation.storage_ref, chunk_size=chunk_size): + hasher.update(chunk) + yield chunk + actual_digest = "sha256:" + hasher.hexdigest() + if actual_digest != representation.digest: + raise ValidationError( + "Representation content does not match expected digest", + details={ + "representation_id": representation.representation_id, + "representation_digest": representation.digest, + "actual_digest": actual_digest, + }, + ) + + def _authorize( + self, + context: OperationContext, + action: 
str, + resource: str, + *, + resource_metadata: dict[str, str] | None = None, + ) -> PolicyDecision: + self.repository.save_actor(context.actor) + decision = self.policy_gateway.authorize( + context, + action, + resource, + resource_metadata=resource_metadata, + ) + if not decision.allowed: + event = self._audit(action, resource, AuditOutcome.DENIED, context, decision) + raise AuthorizationError( + "Operation denied by policy", + details={ + "action": action, + "resource": resource, + "correlation_id": context.correlation_id, + "audit_event_id": event.event_id, + "policy_decision": decision.to_dict(), + }, + ) + return decision + + def _audit( + self, + operation: str, + target: str, + outcome: AuditOutcome, + context: OperationContext, + policy_decision: PolicyDecision, + *, + details: dict[str, Any] | None = None, + ) -> AuditEvent: + return self.repository.save_audit_event( + AuditEvent.from_context( + operation, + target, + outcome, + context, + policy_decision=policy_decision, + details=details, + ) + ) diff --git a/tests/cmis/test_cmis_browser_binding_api.py b/tests/cmis/test_cmis_browser_binding_api.py index 41c58c0..e6e5b8d 100644 --- a/tests/cmis/test_cmis_browser_binding_api.py +++ b/tests/cmis/test_cmis_browser_binding_api.py @@ -83,6 +83,7 @@ def test_cmis_browser_binding_routes_are_advertised_in_openapi(cmis_client) -> N assert "/cmis/{access_point_id}/browser/children" in paths assert "/cmis/{access_point_id}/browser/object/{object_id}" in paths assert "/cmis/{access_point_id}/browser/content/{object_id}" in paths + assert "/cmis/{access_point_id}/browser/content-bytes/{object_id}" in paths assert "/cmis/{access_point_id}/browser/acl/{object_id}" in paths assert "/cmis/{access_point_id}/browser/parents/{object_id}" in paths assert "/cmis/{access_point_id}/browser/query" in paths @@ -184,6 +185,9 @@ def test_cmis_governed_authoring_routes_allow_selected_mutations(cmis_client) -> 
"/cmis/governed-authoring/browser/object/cmis:asset:asset-api-authored/content", json={"content": "# Updated", "media_type": "text/markdown"}, ) + byte_stream = cmis_client.get( + "/cmis/governed-authoring/browser/content-bytes/cmis:asset:asset-api-authored", + ) deleted = cmis_client.post( "/cmis/governed-authoring/browser/object/cmis:asset:asset-api-authored/delete", json={}, @@ -192,6 +196,8 @@ def test_cmis_governed_authoring_routes_allow_selected_mutations(cmis_client) -> assert created.status_code == 200 assert updated.json()["properties"]["kontextual:metadata:status"] == "draft" assert streamed.json()["content_stream"]["mime_type"] == "text/markdown" + assert byte_stream.content == b"# Updated" + assert byte_stream.headers["etag"].startswith("sha256:") assert deleted.json()["lifecycle"] == "delete_requested" diff --git a/tests/cmis/test_cmis_runtime_browser_binding.py b/tests/cmis/test_cmis_runtime_browser_binding.py index 316cabd..ac1761a 100644 --- a/tests/cmis/test_cmis_runtime_browser_binding.py +++ b/tests/cmis/test_cmis_runtime_browser_binding.py @@ -164,6 +164,11 @@ def test_runtime_cmis_governed_authoring_allows_selected_mutations(cmis_runtime) {"content": "# Authored\n\nUpdated stream.", "media_type": "text/markdown"}, context, ) + stream_bytes = runtime.cmis_content_stream_bytes( + "governed-authoring", + "cmis:asset:asset-authored", + context, + ) deleted = runtime.cmis_delete_object( "governed-authoring", "cmis:asset:asset-authored", @@ -174,6 +179,8 @@ def test_runtime_cmis_governed_authoring_allows_selected_mutations(cmis_runtime) assert created["object_id"] == "cmis:asset:asset-authored" assert updated["properties"]["kontextual:metadata:reviewer"] == "codex" assert streamed["content_stream"]["mime_type"] == "text/markdown" + assert b"".join(stream_bytes.chunks) == b"# Authored\n\nUpdated stream." 
+ assert stream_bytes.representation.storage_ref.startswith("blob://memory/") assert deleted["deleted"] is False assert deleted["lifecycle"] == "delete_requested" diff --git a/tests/test_blob_storage.py b/tests/test_blob_storage.py new file mode 100644 index 0000000..263195f --- /dev/null +++ b/tests/test_blob_storage.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from io import BytesIO + +from kontextual_engine import InMemoryBlobStorage, LocalBlobStorage, S3BlobStorage, content_digest + + +def test_memory_blob_storage_deduplicates_by_digest() -> None: + storage = InMemoryBlobStorage() + first = storage.put_bytes(b"same content", media_type="text/plain") + second = storage.put_bytes(b"same content", media_type="text/plain") + + assert first.created is True + assert second.created is False + assert first.blob.storage_ref == second.blob.storage_ref + assert storage.read_bytes(first.blob.storage_ref) == b"same content" + assert b"".join(storage.iter_bytes(first.blob.storage_ref, chunk_size=4)) == b"same content" + assert storage.exists(first.blob.digest) is True + assert len(storage.iter_blobs()) == 1 + + +def test_local_blob_storage_stores_one_file_for_duplicate_content(tmp_path) -> None: + storage = LocalBlobStorage(tmp_path / "blobs") + first = storage.put_bytes(b"local content", media_type="text/plain") + second = storage.put_bytes(b"local content", media_type="text/plain") + + assert first.created is True + assert second.created is False + assert first.blob.storage_ref == second.blob.storage_ref + assert storage.read_bytes(first.blob.storage_ref) == b"local content" + assert b"".join(storage.iter_bytes(first.blob.storage_ref, chunk_size=5)) == b"local content" + assert len(storage.iter_blobs()) == 1 + assert len([path for path in (tmp_path / "blobs").rglob("*") if path.is_file()]) == 1 + + +def test_blob_cleanup_dry_run_and_delete(tmp_path) -> None: + storage = LocalBlobStorage(tmp_path / "blobs") + kept = storage.put_bytes(b"kept").blob + orphan = 
storage.put_bytes(b"orphan").blob + + dry_run = storage.delete_unreferenced({kept.storage_ref}, dry_run=True) + deleted = storage.delete_unreferenced({kept.storage_ref}, dry_run=False) + + assert dry_run.deleted_count == 1 + assert dry_run.reclaimable_bytes == len(b"orphan") + assert orphan.storage_ref in dry_run.deleted_storage_refs + assert deleted.deleted_storage_refs == (orphan.storage_ref,) + assert storage.exists(kept.storage_ref) is True + assert storage.exists(orphan.storage_ref) is False + + +def test_s3_blob_storage_uses_content_addressed_keys_with_fake_client() -> None: + client = FakeS3Client() + storage = S3BlobStorage(bucket="test-bucket", prefix="kontextual", client=client) + + first = storage.put_bytes(b"s3 content", media_type="text/plain") + second = storage.put_bytes(b"s3 content", media_type="text/plain") + readback = storage.read_bytes(first.blob.storage_ref) + listed = storage.iter_blobs() + + assert first.created is True + assert second.created is False + assert first.blob.storage_ref == second.blob.storage_ref + assert first.blob.digest == content_digest(b"s3 content") + assert first.blob.storage_key.startswith("kontextual/sha256/") + assert readback == b"s3 content" + assert b"".join(storage.iter_bytes(first.blob.storage_ref, chunk_size=2)) == b"s3 content" + assert [item.storage_ref for item in listed] == [first.blob.storage_ref] + assert client.put_count == 1 + + +class FakeS3Client: + def __init__(self) -> None: + self.objects: dict[tuple[str, str], dict] = {} + self.put_count = 0 + + def head_object(self, *, Bucket: str, Key: str) -> dict: + try: + item = self.objects[(Bucket, Key)] + except KeyError as exc: + raise FakeS3NotFound() from exc + return { + "ContentLength": len(item["Body"]), + "ContentType": item.get("ContentType"), + "Metadata": item.get("Metadata", {}), + } + + def put_object(self, **kwargs) -> None: + self.put_count += 1 + self.objects[(kwargs["Bucket"], kwargs["Key"])] = kwargs + + def get_object(self, *, Bucket: str, 
Key: str) -> dict: + try: + item = self.objects[(Bucket, Key)] + except KeyError as exc: + raise FakeS3NotFound() from exc + return {"Body": BytesIO(item["Body"])} + + def list_objects_v2(self, *, Bucket: str, Prefix: str, ContinuationToken: str | None = None) -> dict: + contents = [ + {"Key": key, "Size": len(item["Body"])} + for (bucket, key), item in sorted(self.objects.items()) + if bucket == Bucket and key.startswith(Prefix) + ] + return {"Contents": contents, "IsTruncated": False} + + def delete_object(self, *, Bucket: str, Key: str) -> None: + self.objects.pop((Bucket, Key), None) + + +class FakeS3NotFound(Exception): + response = {"Error": {"Code": "NoSuchKey"}, "ResponseMetadata": {"HTTPStatusCode": 404}} diff --git a/tests/test_representation_content_service.py b/tests/test_representation_content_service.py new file mode 100644 index 0000000..3935a62 --- /dev/null +++ b/tests/test_representation_content_service.py @@ -0,0 +1,216 @@ +from __future__ import annotations + +import pytest + +from kontextual_engine import ( + Actor, + ActorType, + AssetRegistryService, + AuthorizationError, + Classification, + InMemoryAssetRegistryRepository, + InMemoryBlobStorage, + LocalBlobStorage, + OperationContext, + PolicyDecision, + RepresentationContentService, + RepresentationKind, + Sensitivity, + ValidationError, +) + + +def test_content_service_adds_representation_bytes_with_deduplicated_blob() -> None: + repo = InMemoryAssetRegistryRepository() + blobs = InMemoryBlobStorage() + context = operation_context() + AssetRegistryService(repo).create_asset( + "Content Asset", + Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-content", + ) + service = RepresentationContentService(repo, blobs) + + first = service.add_representation_from_bytes( + "asset-content", + RepresentationKind.SOURCE, + "text/plain", + b"same bytes", + context, + ) + second = service.add_representation_from_bytes( + "asset-content", + 
RepresentationKind.DERIVED, + "text/plain", + b"same bytes", + context, + ) + + representations = repo.list_representations(asset_id="asset-content") + assert len(representations) == 2 + assert representations[0].storage_ref == representations[1].storage_ref + assert first.version.sequence == 2 + assert second.version.sequence == 3 + assert len(blobs.iter_blobs()) == 1 + + +def test_content_service_reads_bytes_with_policy_and_audit() -> None: + repo = InMemoryAssetRegistryRepository() + blobs = InMemoryBlobStorage() + context = operation_context() + AssetRegistryService(repo).create_asset( + "Readable", + Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-readable", + ) + service = RepresentationContentService(repo, blobs) + service.add_representation_from_bytes( + "asset-readable", + RepresentationKind.SOURCE, + "text/plain", + b"read me", + context, + ) + + content = service.get_content_stream("asset-readable", context) + streamed = service.stream_content("asset-readable", context, chunk_size=3) + + assert content.content == b"read me" + assert b"".join(streamed.chunks) == b"read me" + assert content.representation.media_type == "text/plain" + assert content.blob.storage_ref == content.representation.storage_ref + assert content.audit_event.operation == "asset.content_stream.read" + assert repo.list_audit_events(target="asset:asset-readable")[-1].operation == "asset.content_stream.read" + + +def test_content_service_reads_source_normalized_and_derived_by_kind() -> None: + repo = InMemoryAssetRegistryRepository() + blobs = InMemoryBlobStorage() + context = operation_context() + AssetRegistryService(repo).create_asset( + "Kinds", + Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-kinds", + ) + service = RepresentationContentService(repo, blobs) + service.add_representation_from_bytes("asset-kinds", RepresentationKind.SOURCE, "text/plain", b"source", context) + 
service.add_representation_from_bytes( + "asset-kinds", + RepresentationKind.NORMALIZED, + "text/plain", + b"normalized", + context, + ) + service.add_representation_from_bytes("asset-kinds", RepresentationKind.DERIVED, "text/plain", b"derived", context) + + assert service.get_content_stream("asset-kinds", context, kind=RepresentationKind.SOURCE).content == b"source" + assert service.get_content_stream("asset-kinds", context, kind=RepresentationKind.NORMALIZED).content == b"normalized" + assert service.get_content_stream("asset-kinds", context, kind=RepresentationKind.DERIVED).content == b"derived" + + +def test_content_service_denies_bytes_before_exposure() -> None: + repo = InMemoryAssetRegistryRepository() + blobs = InMemoryBlobStorage() + context = operation_context() + AssetRegistryService(repo).create_asset( + "Denied", + Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-denied-stream", + ) + writer = RepresentationContentService(repo, blobs) + writer.add_representation_from_bytes( + "asset-denied-stream", + RepresentationKind.SOURCE, + "text/plain", + b"secret", + context, + ) + reader = RepresentationContentService(repo, blobs, policy_gateway=DenyContentPolicy()) + + with pytest.raises(AuthorizationError): + reader.get_content_stream("asset-denied-stream", context) + + +def test_content_service_cleanup_uses_repository_references() -> None: + repo = InMemoryAssetRegistryRepository() + blobs = InMemoryBlobStorage() + context = operation_context() + AssetRegistryService(repo).create_asset( + "Cleanup", + Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-cleanup", + ) + service = RepresentationContentService(repo, blobs) + service.add_representation_from_bytes( + "asset-cleanup", + RepresentationKind.SOURCE, + "text/plain", + b"kept", + context, + ) + orphan = blobs.put_bytes(b"orphan").blob + + cleanup = service.cleanup_unreferenced_blobs(dry_run=True) + + 
assert cleanup.deleted_count == 1 + assert cleanup.deleted_storage_refs == (orphan.storage_ref,) + assert cleanup.reclaimable_bytes == len(b"orphan") + + +def test_content_service_detects_corrupted_stored_content(tmp_path) -> None: + repo = InMemoryAssetRegistryRepository() + blobs = LocalBlobStorage(tmp_path / "blobs") + context = operation_context() + AssetRegistryService(repo).create_asset( + "Corrupt", + Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-corrupt", + ) + service = RepresentationContentService(repo, blobs) + service.add_representation_from_bytes( + "asset-corrupt", + RepresentationKind.SOURCE, + "text/plain", + b"expected", + context, + ) + representation = repo.list_representations(asset_id="asset-corrupt")[0] + path = blobs.root / representation.storage_ref.removeprefix("blob://local/") + path.write_bytes(b"corrupted") + + with pytest.raises(ValidationError): + service.get_content_stream("asset-corrupt", context) + with pytest.raises(ValidationError): + b"".join(service.stream_content("asset-corrupt", context).chunks) + + +def operation_context() -> OperationContext: + return OperationContext.create( + Actor.create(ActorType.HUMAN, actor_id="content-test"), + correlation_id="corr-content", + ) + + +class DenyContentPolicy: + def authorize( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict[str, str] | None = None, + ) -> PolicyDecision: + if action == "asset.content_stream.read": + return PolicyDecision.deny( + context.actor.id, + action, + resource, + reason="content reads disabled", + ) + return PolicyDecision.allow(context.actor.id, action, resource) diff --git a/tests/test_service_api.py b/tests/test_service_api.py index c0975b8..42121da 100644 --- a/tests/test_service_api.py +++ b/tests/test_service_api.py @@ -658,7 +658,9 @@ def test_service_health_readiness_version_and_openapi_contracts(client) -> None: assert 
"/cmis/{access_point_id}/browser" in paths assert "/cmis/{access_point_id}/browser/children" in paths assert "/cmis/{access_point_id}/browser/acl/{object_id}" in paths + assert "/cmis/{access_point_id}/browser/content-bytes/{object_id}" in paths assert "/cmis/{access_point_id}/browser/parents/{object_id}" in paths + assert "/api/v1/assets/{asset_id}/representations/{representation_id}/content" in paths assert "/cmis/{access_point_id}/browser/document" in paths assert "/cmis/{access_point_id}/browser/object/{object_id}/properties" in paths assert "/api/v1/assets" in paths diff --git a/workplans/KONT-WP-0013-blob-storage-content-streaming.md b/workplans/KONT-WP-0013-blob-storage-content-streaming.md index 67adb72..156cb23 100644 --- a/workplans/KONT-WP-0013-blob-storage-content-streaming.md +++ b/workplans/KONT-WP-0013-blob-storage-content-streaming.md @@ -4,7 +4,7 @@ type: workplan title: "Blob Storage Deduplication And Content Streaming" domain: markitect repo: kontextual-engine -status: active +status: completed owner: codex topic_slug: markitect planning_priority: high @@ -37,6 +37,10 @@ This workplan adds content-addressed blob infrastructure and stream interfaces. It does not introduce AtomPub, SOAP/Web Services, chunk-level deduplication, or a general document-management storage model. +It includes an optional S3 backend as an infrastructure adapter behind the same +blob storage port. S3 object keys are digest-derived, so object storage can be +used without changing engine semantics or CMIS profile governance. + ## Architecture Constraint Blob bytes are infrastructure state. Engine semantics remain attached to @@ -48,7 +52,7 @@ engine-native content services instead of bypassing governance. 
 ```task
 id: KONT-WP-0013-T001
-status: todo
+status: done
 priority: high
 state_hub_task_id: "6bb5b49a-cf9f-47ce-86d3-24b47a20a2c6"
 ```
@@ -66,7 +70,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T002
-status: todo
+status: done
 priority: high
 state_hub_task_id: "661386c7-8094-4f0f-928c-c17f5b3a9132"
 ```
@@ -82,7 +86,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T003
-status: todo
+status: done
 priority: high
 state_hub_task_id: "00bc34c5-0f79-47b6-b305-f47311edd3a7"
 ```
@@ -98,7 +102,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T004
-status: todo
+status: done
 priority: medium
 state_hub_task_id: "cc4445d9-f773-4337-afd4-aeccc743dc1e"
 ```
@@ -113,7 +117,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T005
-status: todo
+status: done
 priority: high
 state_hub_task_id: "db0e8a2d-50ce-439c-8393-d65e2fc4bc9e"
 ```
@@ -129,7 +133,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T006
-status: todo
+status: done
 priority: high
 state_hub_task_id: "2f1da1fb-9634-4ba6-931a-3e29394efd37"
 ```
@@ -146,7 +150,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T007
-status: todo
+status: done
 priority: medium
 state_hub_task_id: "987ad4f6-8658-4e93-82c2-b9fa0a3a2270"
 ```
@@ -168,3 +172,15 @@ Acceptance:
 - Existing tests continue to pass.
 - Focused dedupe/content-stream tests cover duplicate content, readback, policy
   denial, cleanup dry-run, and CMIS integration.
+
+## Completion Notes
+
+- Implemented `BlobStorage` port with `put_bytes`, `read_bytes`, `iter_bytes`,
+  `stat`, `exists`, and `delete_unreferenced`.
+- Added in-memory, local filesystem, and optional S3 content-addressed adapters.
+- Added governed representation content service for byte-backed
+  representations, chunked streams, policy checks, audit events, and cleanup.
+- Wired CMIS `setContentStream` and byte stream routes through the content
+  service; repeated content updates now expose the latest source representation.
+- Added tests for dedupe, local/S3 adapter behavior, content-kind reads, policy
+  denial, cleanup dry-run, and CMIS stream integration.
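
Reviewer sketch (not part of the patch): the completion notes name the port methods but this excerpt does not show their signatures, so the following is only a minimal illustration of how a content-addressed store with digest-keyed dedupe, verify-on-read, chunked iteration, and dry-run cleanup could behave. `SketchBlobStore`, `BlobStat`, the `blob://memory/` ref prefix, every parameter shape, and the use of `ValueError` (the tests above expect the engine's `ValidationError`) are assumptions made for illustration, not the engine's actual API.

```python
import hashlib
from dataclasses import dataclass
from typing import Iterable, Iterator

REF_PREFIX = "blob://memory/"  # hypothetical ref scheme, chosen for this sketch only


@dataclass(frozen=True)
class BlobStat:
    digest: str       # "sha256:<hex>"
    size: int
    storage_ref: str


class SketchBlobStore:
    """Illustrative in-memory content-addressed store (not the engine's adapter)."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def _key(self, storage_ref: str) -> str:
        if not storage_ref.startswith(REF_PREFIX):
            raise ValueError(f"unsupported storage ref: {storage_ref}")
        return storage_ref[len(REF_PREFIX):]

    def put_bytes(self, data: bytes) -> BlobStat:
        key = hashlib.sha256(data).hexdigest()
        # Identical content hashes to the same key, so a repeat write stores nothing new.
        self._blobs.setdefault(key, data)
        return BlobStat(f"sha256:{key}", len(data), REF_PREFIX + key)

    def exists(self, storage_ref: str) -> bool:
        return self._key(storage_ref) in self._blobs

    def stat(self, storage_ref: str) -> BlobStat:
        key = self._key(storage_ref)
        return BlobStat(f"sha256:{key}", len(self._blobs[key]), storage_ref)

    def read_bytes(self, storage_ref: str) -> bytes:
        key = self._key(storage_ref)
        data = self._blobs[key]
        # Re-hash on read so silently corrupted content is rejected, not served.
        if hashlib.sha256(data).hexdigest() != key:
            raise ValueError("stored content does not match its digest")
        return data

    def iter_bytes(self, storage_ref: str, chunk_size: int = 65536) -> Iterator[bytes]:
        # Verification happens when the first chunk is requested.
        data = self.read_bytes(storage_ref)
        for start in range(0, len(data), chunk_size):
            yield data[start:start + chunk_size]

    def delete_unreferenced(
        self, referenced: Iterable[str], *, dry_run: bool = True
    ) -> tuple[str, ...]:
        keep = {self._key(ref) for ref in referenced}
        orphans = tuple(sorted(key for key in self._blobs if key not in keep))
        if not dry_run:
            for key in orphans:
                del self._blobs[key]
        # Report what was (or would be) removed, as storage refs.
        return tuple(REF_PREFIX + key for key in orphans)
```

Keying blobs by the whole-object digest is what makes dedupe fall out of the write path: a second `put_bytes` with the same bytes returns the same ref. The cleanup method defaults to a dry run in this sketch so that deletion has to be requested explicitly; whether the real port makes the same choice is not visible in this excerpt.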