content-addressed blob storage: blob_storage.py, memory, local, and S3 adapters

2026-05-07 03:51:25 +02:00
parent c2bc7071d7
commit ebace73761
22 changed files with 1489 additions and 47 deletions


@@ -190,7 +190,7 @@ Required MVP ports:
 - Repository port for assets, representations, metadata, relationships,
   versions, runs, audit events, and exports.
-- Object/content store port for source, normalized, and derived content payloads.
+- Blob/content store port for source, normalized, and derived content payloads.
 - Search index port for lexical search and later semantic/hybrid retrieval.
 - Extractor port for format-specific normalization.
 - Connector port for source systems.
@@ -211,6 +211,10 @@ Adapter rules:
   Markitect where useful, but they are not the canonical engine identity or
   storage model. The canonical layer remains asset, representation, metadata,
   lifecycle, policy, lineage, and audit state.
+- Blob storage is infrastructure behind `AssetRepresentation.storage_ref`.
+  Whole-object content addressing, digest verification, and chunked byte
+  streaming belong behind the blob port. Local filesystem and S3 are adapters,
+  not different domain models.
 - `llm-connect` or equivalent is an adapter for LLM providers.
 - `phase-memory` is an adjacent memory runtime; this engine may exchange opaque
   memory references or context packages but should not implement memory phases.
@@ -251,6 +255,9 @@ Recommended storage style:
   adapter-specific payloads.
 - Separate content/object references for large source, normalized, or derived
   payloads.
+- Store blob bytes outside repository rows when content is non-trivial. Keep
+  representation digest, size, media type, kind, producer, and storage ref in
+  the repository, and let blob adapters handle byte persistence and dedupe.
 - Append-only audit events and change records.
 - Deterministic ordering fields for pagination and tests.
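
The adapter rule above places content addressing, digest verification, and chunked streaming behind a single blob port. As an illustration only, the method surface shared by the adapters in this commit could be written as a `typing.Protocol`. The names `BlobRefSketch` and `BlobPortSketch` are hypothetical: the real `BlobRef` and `BlobStorage` definitions live in `kontextual_engine.ports`, which is not shown in this diff and may differ.

```python
from __future__ import annotations

from collections.abc import Iterator
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass(frozen=True)
class BlobRefSketch:
    """Hypothetical stand-in for BlobRef, with the fields the adapters populate."""

    digest: str
    size_bytes: int
    storage_key: str
    storage_ref: str
    adapter: str
    media_type: str | None = None


@runtime_checkable
class BlobPortSketch(Protocol):
    """Assumed port surface, inferred from the adapter method names in this commit."""

    def put_bytes(self, content: bytes, *, media_type: str | None = None) -> object: ...

    def read_bytes(self, storage_ref: str) -> bytes: ...

    def iter_bytes(self, storage_ref: str, *, chunk_size: int = 65536) -> Iterator[bytes]: ...

    def stat(self, storage_ref: str) -> object: ...

    def exists(self, storage_ref_or_digest: str) -> bool: ...

    def delete_unreferenced(self, referenced_storage_refs: set[str], *, dry_run: bool = True) -> object: ...
```

Since the domain layer only ever holds a `storage_ref` string, swapping one adapter for another should not require changes outside runtime wiring, which is the point of the rule.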


@@ -2,7 +2,7 @@
 Date: 2026-05-07
-Status: planned.
+Status: implemented.
 ## Purpose
@@ -11,23 +11,25 @@ normalized, and derived representations can reference real content bytes
 without duplicating storage. Expose those bytes through engine-native
 interfaces and CMIS content stream routes.
-## Current State
+## Implemented State
-The engine already records representation metadata:
+The engine records representation metadata:
 - digest,
 - size,
 - media type,
 - representation kind,
-- opaque `storage_ref`.
+- `storage_ref`.
-It does not yet provide:
+It now provides:
-- a content-addressed blob store,
-- deduplicating writes,
-- blob read/stream interfaces,
-- reference accounting or garbage collection,
-- CMIS byte-stream download semantics.
+- a content-addressed blob storage port,
+- in-memory, local filesystem, and optional S3 adapters,
+- deduplicating writes by `sha256:<hex>` digest,
+- whole-byte reads plus chunked `iter_bytes(...)` streaming,
+- representation-level content service governance,
+- reference accounting and dry-run/active cleanup,
+- CMIS Browser Binding content stream byte routes.
 ## Target Architecture
@@ -35,7 +37,7 @@ It does not yet provide:
 bytes
 -> digest/size verification
 -> BlobStoragePort
--> content-addressed adapter
+-> content-addressed adapter (memory/local/S3)
 -> AssetRepresentation storage_ref
 -> governed representation service
 -> service API / CMIS content stream
@@ -60,29 +62,52 @@ justifies the complexity.
 ## Interfaces
-Planned engine-native interfaces:
+Engine-native interfaces:
 - `BlobStoragePort.put_bytes(...)`
-- `BlobStoragePort.open_bytes(...)`
+- `BlobStoragePort.read_bytes(...)`
+- `BlobStoragePort.iter_bytes(...)`
 - `BlobStoragePort.stat(...)`
 - `BlobStoragePort.exists(...)`
 - `BlobStoragePort.delete_unreferenced(...)`
 - `RepresentationContentService.add_representation_from_bytes(...)`
 - `RepresentationContentService.get_content_stream(...)`
+- `RepresentationContentService.stream_content(...)`
-Planned CMIS integration:
+CMIS integration:
 - `getContentStream` returns actual bytes/stream with content headers,
 - `setContentStream` stores through deduplicating representation service,
 - content stream changes produce versions and audit events,
 - descriptors remain available for clients that only need metadata.
+## Storage Backends
+- `InMemoryBlobStorage` supports deterministic unit tests and default runtime
+  wiring.
+- `LocalBlobStorage` stores content under digest-derived paths and uses atomic
+  temporary writes.
+- `S3BlobStorage` is available through the optional `kontextual-engine[s3]`
+  extra and keeps S3 concerns behind the same blob port. It uses digest-derived
+  object keys and streams object bodies in chunks.
+The engine stores only the returned `storage_ref` on representations. Backend
+selection is therefore a deployment concern, not a domain-model fork.
+## Migration Posture
+Existing opaque `storage_ref` values remain valid metadata, but content bytes
+can only be streamed when the configured blob adapter can resolve the reference.
+Migration should import external content through
+`RepresentationContentService.add_representation_from_bytes(...)` so dedupe,
+digest verification, policy, versions, and audit events are preserved.
 ## Risks
-- Large files may require streaming APIs rather than in-memory bytes.
+- Very large files may require upload-side streaming beyond the current
+  byte-based write API.
 - Local filesystem adapters need atomic writes and digest verification.
 - Garbage collection must never delete referenced blobs.
 - Security must treat blob bytes as governed content, not public storage.
 - Existing `storage_ref` values may point to external sources and should remain
   valid as opaque references.
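
The deduplicating writes listed under "Implemented State" follow from deriving the storage key from the content digest: identical bytes map to the same key, so a second write stores nothing new. Below is a minimal sketch of that idea, assuming a `sha256:<hex>` digest and a two-level sharded key of the form `sha256/<aa>/<bb>/<hex>`. The helper names echo `blob_digest` and `digest_storage_key` from this commit, but their real implementations are not part of the diff shown here, so the sharding layout in particular is an assumption.

```python
import hashlib


def blob_digest_sketch(content: bytes) -> str:
    """Return a digest in the `sha256:<hex>` form used throughout the diff."""
    return "sha256:" + hashlib.sha256(content).hexdigest()


def digest_storage_key_sketch(digest: str) -> str:
    """Map a digest to a sharded key; the two-level layout is an assumption."""
    algorithm, _, hex_value = digest.partition(":")
    if algorithm != "sha256" or len(hex_value) != 64:
        raise ValueError(f"Unsupported digest: {digest}")
    return f"sha256/{hex_value[:2]}/{hex_value[2:4]}/{hex_value}"


class DedupStoreSketch:
    """Toy store that keeps each distinct payload once, keyed by its digest."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def put_bytes(self, content: bytes) -> tuple[str, bool]:
        """Store content and return (storage_key, created)."""
        key = digest_storage_key_sketch(blob_digest_sketch(content))
        created = key not in self._blobs
        if created:
            self._blobs[key] = bytes(content)
        return key, created

    def blob_count(self) -> int:
        return len(self._blobs)


store = DedupStoreSketch()
first_key, first_created = store.put_bytes(b"hello")
second_key, second_created = store.put_bytes(b"hello")
```

Both calls return the same key, and only the first reports `created=True`, which is the signal the adapters in this commit expose through `BlobWriteResult.created`.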


@@ -25,6 +25,9 @@ service = [
 storage = [
     "sqlalchemy>=2.0",
 ]
+s3 = [
+    "boto3>=1.34",
+]
 markdown = [
     "markitect-tool @ file:///home/worsch/markitect-tool",
 ]


@@ -12,6 +12,9 @@ from .artifacts import (
     content_digest,
 )
 from .adapters.memory import InMemoryAssetRegistryRepository
+from .adapters.memory import InMemoryBlobStorage
+from .adapters.local_files import LocalBlobStorage
+from .adapters.s3 import S3BlobStorage
 from .adapters.sqlite import SQLiteAssetRegistryRepository
 from .api import ServiceRuntime, create_app
 from .context import ContextAssembler, ContextItem, ContextPackage
@@ -98,6 +101,10 @@ from .ingestion import IngestionRequest, IngestionResult, IngestionService
 from .ports import (
     AllowAllPolicyGateway,
     AssetRegistryRepository,
+    BlobCleanupResult,
+    BlobRef,
+    BlobStorage,
+    BlobWriteResult,
     DirectorySourceConnector,
     FormatExtractor,
     PolicyGateway,
@@ -122,6 +129,9 @@ from .services import (
     RelationshipQueryItem,
     RelationshipQueryRequest,
     RelationshipQueryResult,
+    RepresentationContentResult,
+    RepresentationContentStream,
+    RepresentationContentService,
     RetrievalFeedbackRequest,
     RetrievalFeedbackResult,
     RetrievalQualityMetrics,
@@ -168,6 +178,10 @@ __all__ = [
     "AssetRegistryRepository",
     "AssetRegistryService",
     "AssetRetrievalService",
+    "BlobCleanupResult",
+    "BlobRef",
+    "BlobStorage",
+    "BlobWriteResult",
     "AssetVersion",
     "AuditEvent",
     "AuditOutcome",
@@ -202,6 +216,7 @@ __all__ = [
     "ExtractorCapability",
     "FormatExtractor",
     "InMemoryAssetRegistryRepository",
+    "InMemoryBlobStorage",
     "InMemoryKnowledgeRepository",
     "IngestionRequest",
     "IngestionResult",
@@ -217,6 +232,7 @@ __all__ = [
     "KontextualError",
     "LexicalIndexRefreshResult",
     "LifecycleState",
+    "LocalBlobStorage",
     "MetadataFieldDefinition",
     "MetadataRecord",
     "MetadataSchema",
@@ -240,6 +256,9 @@ __all__ = [
     "RelationshipQueryItem",
     "RelationshipQueryRequest",
     "RelationshipQueryResult",
+    "RepresentationContentResult",
+    "RepresentationContentStream",
+    "RepresentationContentService",
     "RelationshipTargetKind",
     "RelationshipType",
     "RepresentationKind",
@@ -256,6 +275,7 @@ __all__ = [
     "SourceReference",
     "SourceConnector",
     "SourcePayload",
+    "S3BlobStorage",
     "SQLiteAssetRegistryRepository",
     "TransformationExecutionContext",
     "TransformationOperation",


@@ -1,5 +1,6 @@
 """Local filesystem ingestion connector."""

+from .blob_storage import LocalBlobStorage
 from .connector import LocalFileConnector

-__all__ = ["LocalFileConnector"]
+__all__ = ["LocalBlobStorage", "LocalFileConnector"]


@@ -0,0 +1,141 @@
+"""Local filesystem content-addressed blob storage."""
+
+from __future__ import annotations
+
+from collections.abc import Iterator
+from pathlib import Path
+
+from kontextual_engine.core import new_id
+from kontextual_engine.errors import NotFoundError, ValidationError
+from kontextual_engine.ports import BlobCleanupResult, BlobRef, BlobWriteResult, blob_digest, digest_storage_key
+
+
+class LocalBlobStorage:
+    adapter_name = "local"
+
+    def __init__(self, root: str | Path) -> None:
+        self.root = Path(root).expanduser().resolve()
+
+    def put_bytes(self, content: bytes, *, media_type: str | None = None) -> BlobWriteResult:
+        digest = blob_digest(content)
+        storage_key = digest_storage_key(digest)
+        path = self._path(storage_key)
+        storage_ref = self._storage_ref(storage_key)
+        created = not path.exists()
+        if created:
+            path.parent.mkdir(parents=True, exist_ok=True)
+            tmp = path.parent / f".{path.name}.{new_id('tmp')}"
+            tmp.write_bytes(content)
+            actual = blob_digest(tmp.read_bytes())
+            if actual != digest:
+                tmp.unlink(missing_ok=True)
+                raise ValidationError("Blob digest verification failed", details={"expected": digest, "actual": actual})
+            tmp.replace(path)
+        else:
+            existing = path.read_bytes()
+            actual = blob_digest(existing)
+            if actual != digest or len(existing) != len(content):
+                raise ValidationError(
+                    "Existing blob digest mismatch",
+                    details={"storage_ref": storage_ref, "expected": digest, "actual": actual},
+                )
+        return BlobWriteResult(
+            BlobRef(
+                digest=digest,
+                size_bytes=len(content),
+                storage_key=storage_key,
+                storage_ref=storage_ref,
+                adapter=self.adapter_name,
+                media_type=media_type,
+            ),
+            created=created,
+        )
+
+    def read_bytes(self, storage_ref: str) -> bytes:
+        path = self._path(self._storage_key(storage_ref))
+        if not path.exists():
+            raise NotFoundError("Blob not found", details={"storage_ref": storage_ref})
+        return path.read_bytes()
+
+    def iter_bytes(self, storage_ref: str, *, chunk_size: int = 65536) -> Iterator[bytes]:
+        path = self._path(self._storage_key(storage_ref))
+        if not path.exists():
+            raise NotFoundError("Blob not found", details={"storage_ref": storage_ref})
+        size = max(int(chunk_size), 1)
+        with path.open("rb") as handle:
+            while chunk := handle.read(size):
+                yield chunk
+
+    def stat(self, storage_ref: str) -> BlobRef:
+        storage_key = self._storage_key(storage_ref)
+        path = self._path(storage_key)
+        if not path.exists():
+            raise NotFoundError("Blob not found", details={"storage_ref": storage_ref})
+        return BlobRef(
+            digest=_digest_from_storage_key(storage_key),
+            size_bytes=path.stat().st_size,
+            storage_key=storage_key,
+            storage_ref=self._storage_ref(storage_key),
+            adapter=self.adapter_name,
+        )
+
+    def exists(self, storage_ref_or_digest: str) -> bool:
+        try:
+            storage_key = self._storage_key(storage_ref_or_digest)
+        except ValueError:
+            storage_key = digest_storage_key(storage_ref_or_digest)
+        return self._path(storage_key).exists()
+
+    def iter_blobs(self) -> list[BlobRef]:
+        root = self.root / "sha256"
+        if not root.exists():
+            return []
+        refs = []
+        for path in sorted(root.glob("*/*/*")):
+            if path.is_file() and not path.name.startswith("."):
+                storage_key = str(path.relative_to(self.root)).replace("\\", "/")
+                refs.append(self.stat(self._storage_ref(storage_key)))
+        return refs
+
+    def delete_unreferenced(
+        self,
+        referenced_storage_refs: set[str],
+        *,
+        dry_run: bool = True,
+    ) -> BlobCleanupResult:
+        referenced = {self._storage_key(ref) for ref in referenced_storage_refs if ref.startswith("blob://local/")}
+        deleted: list[str] = []
+        reclaimable = 0
+        retained = 0
+        for blob in self.iter_blobs():
+            if blob.storage_key in referenced:
+                retained += 1
+                continue
+            reclaimable += blob.size_bytes
+            deleted.append(blob.storage_ref)
+            if not dry_run:
+                self._path(blob.storage_key).unlink(missing_ok=True)
+        return BlobCleanupResult(
+            dry_run=dry_run,
+            deleted_count=len(deleted),
+            retained_count=retained,
+            reclaimable_bytes=reclaimable,
+            deleted_storage_refs=tuple(deleted),
+        )
+
+    def _path(self, storage_key: str) -> Path:
+        return self.root / storage_key
+
+    def _storage_ref(self, storage_key: str) -> str:
+        return f"blob://local/{storage_key}"
+
+    def _storage_key(self, storage_ref_or_digest: str) -> str:
+        if storage_ref_or_digest.startswith("blob://local/"):
+            return storage_ref_or_digest.removeprefix("blob://local/")
+        if storage_ref_or_digest.startswith("sha256:"):
+            return digest_storage_key(storage_ref_or_digest)
+        raise ValueError(f"Unsupported local blob reference: {storage_ref_or_digest}")
+
+
+def _digest_from_storage_key(storage_key: str) -> str:
+    return "sha256:" + storage_key.rsplit("/", 1)[-1]
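
`LocalBlobStorage.put_bytes` above writes to a temporary sibling file, re-reads it to verify the digest, and only then renames it into place. The same pattern in isolation, using only the standard library, might look like the sketch below. `atomic_put` and the flat `<hex>` file layout are hypothetical simplifications for illustration, not the adapter's actual code.

```python
import hashlib
import uuid
from pathlib import Path


def atomic_put(root: Path, content: bytes) -> tuple[Path, bool]:
    """Write content under its SHA-256 hex name and return (path, created)."""
    expected = hashlib.sha256(content).hexdigest()
    path = root / expected
    if path.exists():
        # Content-addressed: a file with this name is taken to hold these bytes.
        return path, False
    root.mkdir(parents=True, exist_ok=True)
    # Dot-prefixed temp name in the same directory, so the rename stays on one filesystem.
    tmp = root / f".{expected}.{uuid.uuid4().hex}"
    tmp.write_bytes(content)
    # Re-read what actually landed on disk before publishing it under the final name.
    actual = hashlib.sha256(tmp.read_bytes()).hexdigest()
    if actual != expected:
        tmp.unlink(missing_ok=True)
        raise ValueError(f"digest mismatch: expected {expected}, got {actual}")
    # Path.replace wraps os.replace, which swaps the name in a single step on POSIX.
    tmp.replace(path)
    return path, True
```

Because readers only ever look up the final name, an interrupted write leaves at most a dot-prefixed temporary file behind, and `iter_blobs` in the adapter skips names that start with a dot.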


@@ -1,6 +1,6 @@
 """In-memory adapters for deterministic tests."""

 from .asset_registry import InMemoryAssetRegistryRepository
+from .blob_storage import InMemoryBlobStorage

-__all__ = ["InMemoryAssetRegistryRepository"]
+__all__ = ["InMemoryAssetRegistryRepository", "InMemoryBlobStorage"]


@@ -0,0 +1,109 @@
+"""In-memory content-addressed blob storage for tests."""
+
+from __future__ import annotations
+
+from collections.abc import Iterator
+
+from kontextual_engine.errors import NotFoundError
+from kontextual_engine.ports import BlobCleanupResult, BlobRef, BlobWriteResult, blob_digest, digest_storage_key
+
+
+class InMemoryBlobStorage:
+    adapter_name = "memory"
+
+    def __init__(self) -> None:
+        self._blobs: dict[str, bytes] = {}
+        self._media_types: dict[str, str | None] = {}
+
+    def put_bytes(self, content: bytes, *, media_type: str | None = None) -> BlobWriteResult:
+        digest = blob_digest(content)
+        storage_key = digest_storage_key(digest)
+        storage_ref = self._storage_ref(storage_key)
+        created = storage_key not in self._blobs
+        if created:
+            self._blobs[storage_key] = bytes(content)
+            self._media_types[storage_key] = media_type
+        return BlobWriteResult(
+            BlobRef(
+                digest=digest,
+                size_bytes=len(content),
+                storage_key=storage_key,
+                storage_ref=storage_ref,
+                adapter=self.adapter_name,
+                media_type=media_type or self._media_types.get(storage_key),
+            ),
+            created=created,
+        )
+
+    def read_bytes(self, storage_ref: str) -> bytes:
+        storage_key = self._storage_key(storage_ref)
+        try:
+            return self._blobs[storage_key]
+        except KeyError as exc:
+            raise NotFoundError("Blob not found", details={"storage_ref": storage_ref}) from exc
+
+    def iter_bytes(self, storage_ref: str, *, chunk_size: int = 65536) -> Iterator[bytes]:
+        content = self.read_bytes(storage_ref)
+        size = max(int(chunk_size), 1)
+        for index in range(0, len(content), size):
+            yield content[index : index + size]
+
+    def stat(self, storage_ref: str) -> BlobRef:
+        content = self.read_bytes(storage_ref)
+        storage_key = self._storage_key(storage_ref)
+        return BlobRef(
+            digest=blob_digest(content),
+            size_bytes=len(content),
+            storage_key=storage_key,
+            storage_ref=self._storage_ref(storage_key),
+            adapter=self.adapter_name,
+            media_type=self._media_types.get(storage_key),
+        )
+
+    def exists(self, storage_ref_or_digest: str) -> bool:
+        try:
+            storage_key = self._storage_key(storage_ref_or_digest)
+        except ValueError:
+            storage_key = digest_storage_key(storage_ref_or_digest)
+        return storage_key in self._blobs
+
+    def iter_blobs(self) -> list[BlobRef]:
+        return [self.stat(self._storage_ref(storage_key)) for storage_key in sorted(self._blobs)]
+
+    def delete_unreferenced(
+        self,
+        referenced_storage_refs: set[str],
+        *,
+        dry_run: bool = True,
+    ) -> BlobCleanupResult:
+        referenced = {self._storage_key(ref) for ref in referenced_storage_refs if ref.startswith("blob://memory/")}
+        deleted: list[str] = []
+        reclaimable = 0
+        retained = 0
+        for storage_key, content in list(self._blobs.items()):
+            if storage_key in referenced:
+                retained += 1
+                continue
+            reclaimable += len(content)
+            storage_ref = self._storage_ref(storage_key)
+            deleted.append(storage_ref)
+            if not dry_run:
+                self._blobs.pop(storage_key, None)
+                self._media_types.pop(storage_key, None)
+        return BlobCleanupResult(
+            dry_run=dry_run,
+            deleted_count=len(deleted),
+            retained_count=retained,
+            reclaimable_bytes=reclaimable,
+            deleted_storage_refs=tuple(deleted),
+        )
+
+    def _storage_ref(self, storage_key: str) -> str:
+        return f"blob://memory/{storage_key}"
+
+    def _storage_key(self, storage_ref_or_digest: str) -> str:
+        if storage_ref_or_digest.startswith("blob://memory/"):
+            return storage_ref_or_digest.removeprefix("blob://memory/")
+        if storage_ref_or_digest.startswith("sha256:"):
+            return digest_storage_key(storage_ref_or_digest)
+        raise ValueError(f"Unsupported memory blob reference: {storage_ref_or_digest}")
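
The local and in-memory adapters implement `delete_unreferenced` as a set difference between stored keys and the keys still referenced by representations, with `dry_run=True` as the default so a cleanup can be inspected before anything is removed. Below is a minimal sketch of that accounting over a plain dictionary. `plan_cleanup` and `CleanupReport` are hypothetical names, and the report fields only loosely follow `BlobCleanupResult`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CleanupReport:
    """Hypothetical result type, loosely modelled on BlobCleanupResult."""

    dry_run: bool
    deleted_keys: tuple[str, ...]
    retained_count: int
    reclaimable_bytes: int


def plan_cleanup(blobs: dict[str, bytes], referenced: set[str], *, dry_run: bool = True) -> CleanupReport:
    """Report unreferenced blobs and remove them only when dry_run is False."""
    deleted: list[str] = []
    retained = 0
    reclaimable = 0
    # sorted() materializes a list first, so deleting entries does not disturb the loop.
    for key, content in sorted(blobs.items()):
        if key in referenced:
            retained += 1
            continue
        deleted.append(key)
        reclaimable += len(content)
        if not dry_run:
            del blobs[key]
    return CleanupReport(dry_run, tuple(deleted), retained, reclaimable)


blobs = {"sha256/aa": b"kept", "sha256/bb": b"orphaned"}
preview = plan_cleanup(blobs, {"sha256/aa"})
size_after_preview = len(blobs)
applied = plan_cleanup(blobs, {"sha256/aa"}, dry_run=False)
```

The preview and the applied run report the same keys; only the second call mutates the store. A caller that builds the `referenced` set from repository rows must do so from a consistent snapshot, since a blob missing from that set is treated as garbage.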


@@ -0,0 +1,6 @@
+"""S3-backed blob storage adapter."""
+
+from .blob_storage import S3BlobStorage
+
+__all__ = ["S3BlobStorage"]


@@ -0,0 +1,198 @@
+"""S3 content-addressed blob storage adapter."""
+
+from __future__ import annotations
+
+from collections.abc import Iterator
+from typing import Any
+
+from kontextual_engine.errors import NotFoundError
+from kontextual_engine.ports import BlobCleanupResult, BlobRef, BlobWriteResult, blob_digest, digest_storage_key
+
+
+class S3BlobStorage:
+    adapter_name = "s3"
+
+    def __init__(
+        self,
+        *,
+        bucket: str,
+        prefix: str = "",
+        client: Any | None = None,
+    ) -> None:
+        self.bucket = bucket
+        self.prefix = prefix.strip("/")
+        if client is None:
+            import boto3  # type: ignore[import-not-found]
+
+            client = boto3.client("s3")
+        self.client = client
+
+    def put_bytes(self, content: bytes, *, media_type: str | None = None) -> BlobWriteResult:
+        digest = blob_digest(content)
+        storage_key = self._key(digest_storage_key(digest))
+        storage_ref = self._storage_ref(storage_key)
+        created = not self.exists(storage_ref)
+        if created:
+            kwargs: dict[str, Any] = {
+                "Bucket": self.bucket,
+                "Key": storage_key,
+                "Body": content,
+                "Metadata": {"digest": digest, "size-bytes": str(len(content))},
+            }
+            if media_type:
+                kwargs["ContentType"] = media_type
+            self.client.put_object(**kwargs)
+        return BlobWriteResult(
+            BlobRef(
+                digest=digest,
+                size_bytes=len(content),
+                storage_key=storage_key,
+                storage_ref=storage_ref,
+                adapter=self.adapter_name,
+                media_type=media_type,
+            ),
+            created=created,
+        )
+
+    def read_bytes(self, storage_ref: str) -> bytes:
+        storage_key = self._storage_key(storage_ref)
+        try:
+            result = self.client.get_object(Bucket=self.bucket, Key=storage_key)
+        except Exception as exc:
+            if _is_not_found(exc):
+                raise NotFoundError("Blob not found", details={"storage_ref": storage_ref}) from exc
+            raise
+        body = result["Body"]
+        return body.read() if hasattr(body, "read") else bytes(body)
+
+    def iter_bytes(self, storage_ref: str, *, chunk_size: int = 65536) -> Iterator[bytes]:
+        storage_key = self._storage_key(storage_ref)
+        try:
+            result = self.client.get_object(Bucket=self.bucket, Key=storage_key)
+        except Exception as exc:
+            if _is_not_found(exc):
+                raise NotFoundError("Blob not found", details={"storage_ref": storage_ref}) from exc
+            raise
+        body = result["Body"]
+        size = max(int(chunk_size), 1)
+        try:
+            if hasattr(body, "iter_chunks"):
+                for chunk in body.iter_chunks(chunk_size=size):
+                    if chunk:
+                        yield chunk
+                return
+            while True:
+                chunk = body.read(size) if hasattr(body, "read") else bytes(body)
+                if not chunk:
+                    break
+                yield chunk
+                if not hasattr(body, "read"):
+                    break
+        finally:
+            close = getattr(body, "close", None)
+            if close:
+                close()
+
+    def stat(self, storage_ref: str) -> BlobRef:
+        storage_key = self._storage_key(storage_ref)
+        try:
+            result = self.client.head_object(Bucket=self.bucket, Key=storage_key)
+        except Exception as exc:
+            if _is_not_found(exc):
+                raise NotFoundError("Blob not found", details={"storage_ref": storage_ref}) from exc
+            raise
+        metadata = dict(result.get("Metadata", {}))
+        digest = metadata.get("digest") or _digest_from_key(storage_key)
+        return BlobRef(
+            digest=digest,
+            size_bytes=int(result.get("ContentLength", metadata.get("size-bytes", 0))),
+            storage_key=storage_key,
+            storage_ref=self._storage_ref(storage_key),
+            adapter=self.adapter_name,
+            media_type=result.get("ContentType"),
+        )
+
+    def exists(self, storage_ref_or_digest: str) -> bool:
+        try:
+            self.stat(storage_ref_or_digest)
+            return True
+        except NotFoundError:
+            return False
+
+    def iter_blobs(self) -> list[BlobRef]:
+        prefix = f"{self.prefix}/sha256/" if self.prefix else "sha256/"
+        refs: list[BlobRef] = []
+        token: str | None = None
+        while True:
+            kwargs: dict[str, Any] = {"Bucket": self.bucket, "Prefix": prefix}
+            if token:
+                kwargs["ContinuationToken"] = token
+            result = self.client.list_objects_v2(**kwargs)
+            for item in result.get("Contents", []):
+                key = item["Key"]
+                refs.append(
+                    BlobRef(
+                        digest=_digest_from_key(key),
+                        size_bytes=int(item.get("Size", 0)),
+                        storage_key=key,
+                        storage_ref=self._storage_ref(key),
+                        adapter=self.adapter_name,
+                    )
+                )
+            if not result.get("IsTruncated"):
+                return refs
+            token = result.get("NextContinuationToken")
+
+    def delete_unreferenced(
+        self,
+        referenced_storage_refs: set[str],
+        *,
+        dry_run: bool = True,
+    ) -> BlobCleanupResult:
+        referenced = {self._storage_key(ref) for ref in referenced_storage_refs if ref.startswith(f"s3://{self.bucket}/")}
+        deleted: list[str] = []
+        reclaimable = 0
+        retained = 0
+        for blob in self.iter_blobs():
+            if blob.storage_key in referenced:
+                retained += 1
+                continue
+            deleted.append(blob.storage_ref)
+            reclaimable += blob.size_bytes
+            if not dry_run:
+                self.client.delete_object(Bucket=self.bucket, Key=blob.storage_key)
+        return BlobCleanupResult(
+            dry_run=dry_run,
+            deleted_count=len(deleted),
+            retained_count=retained,
+            reclaimable_bytes=reclaimable,
+            deleted_storage_refs=tuple(deleted),
+        )
+
+    def _key(self, storage_key: str) -> str:
+        return f"{self.prefix}/{storage_key}" if self.prefix else storage_key
+
+    def _storage_ref(self, storage_key: str) -> str:
+        return f"s3://{self.bucket}/{storage_key}"
+
+    def _storage_key(self, storage_ref_or_digest: str) -> str:
+        if storage_ref_or_digest.startswith(f"s3://{self.bucket}/"):
+            return storage_ref_or_digest.removeprefix(f"s3://{self.bucket}/")
+        if storage_ref_or_digest.startswith("sha256:"):
+            return self._key(digest_storage_key(storage_ref_or_digest))
+        if storage_ref_or_digest.startswith("blob://"):
+            raise ValueError(f"Unsupported S3 blob reference: {storage_ref_or_digest}")
+        return storage_ref_or_digest
+
+
+def _is_not_found(exc: Exception) -> bool:
+    response = getattr(exc, "response", None)
+    if isinstance(response, dict):
+        code = str(response.get("Error", {}).get("Code", ""))
+        status = str(response.get("ResponseMetadata", {}).get("HTTPStatusCode", ""))
+        return code in {"404", "NoSuchKey", "NotFound"} or status == "404"
+    return False
+
+
+def _digest_from_key(key: str) -> str:
+    return "sha256:" + key.rsplit("/", 1)[-1]
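
Because `S3BlobStorage` accepts an injected `client`, its not-found handling can be exercised without AWS. The sketch below pairs a copy of the `_is_not_found` decision rule with a hypothetical in-memory stub. `FakeS3Error`, `FakeS3Client`, and `object_exists` are illustrative names, and the stub only imitates the `response` dictionary shape that botocore's `ClientError` exposes.

```python
from typing import Any


class FakeS3Error(Exception):
    """Stub error carrying a botocore-style `response` dictionary."""

    def __init__(self, code: str, status: int) -> None:
        super().__init__(code)
        self.response = {"Error": {"Code": code}, "ResponseMetadata": {"HTTPStatusCode": status}}


class FakeS3Client:
    """In-memory stub covering the two calls used by this sketch."""

    def __init__(self) -> None:
        self._objects: dict[tuple[str, str], bytes] = {}

    def put_object(self, *, Bucket: str, Key: str, Body: bytes, **_: Any) -> None:
        self._objects[(Bucket, Key)] = bytes(Body)

    def head_object(self, *, Bucket: str, Key: str) -> dict[str, Any]:
        if (Bucket, Key) not in self._objects:
            raise FakeS3Error("404", 404)
        return {"ContentLength": len(self._objects[(Bucket, Key)])}


def is_not_found(exc: Exception) -> bool:
    """Same decision rule as `_is_not_found` in the adapter above."""
    response = getattr(exc, "response", None)
    if isinstance(response, dict):
        code = str(response.get("Error", {}).get("Code", ""))
        status = str(response.get("ResponseMetadata", {}).get("HTTPStatusCode", ""))
        return code in {"404", "NoSuchKey", "NotFound"} or status == "404"
    return False


def object_exists(client: FakeS3Client, bucket: str, key: str) -> bool:
    """Translate a not-found error into False and re-raise anything else."""
    try:
        client.head_object(Bucket=bucket, Key=key)
    except Exception as exc:
        if is_not_found(exc):
            return False
        raise
    return True
```

A stub along these lines could be passed to the adapter's `client` parameter once it also implements `get_object`, `list_objects_v2`, and `delete_object`. Errors that are not recognised as not-found, such as an access-denied response, propagate unchanged rather than being reported as a missing blob.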


@@ -12,7 +12,7 @@ from datetime import datetime
from importlib import metadata from importlib import metadata
from typing import Any from typing import Any
from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository, InMemoryBlobStorage
from kontextual_engine.core import ( from kontextual_engine.core import (
Actor, Actor,
ActorType, ActorType,
@@ -52,7 +52,7 @@ from kontextual_engine.core import (
utc_now, utc_now,
) )
from kontextual_engine.errors import AuthorizationError, KontextualError, NotFoundError, ValidationError from kontextual_engine.errors import AuthorizationError, KontextualError, NotFoundError, ValidationError
from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, PolicyGateway from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, BlobStorage, PolicyGateway
from kontextual_engine.services import ( from kontextual_engine.services import (
AssetIngestionService, AssetIngestionService,
AssetQueryRequest, AssetQueryRequest,
@@ -60,6 +60,7 @@ from kontextual_engine.services import (
AssetRetrievalService, AssetRetrievalService,
ContextEntityQueryRequest, ContextEntityQueryRequest,
RelationshipQueryRequest, RelationshipQueryRequest,
RepresentationContentService,
RetrievalFeedbackRequest, RetrievalFeedbackRequest,
TransformationRequest, TransformationRequest,
TransformationService, TransformationService,
@@ -179,6 +180,7 @@ AGENT_OPERATION_CATALOG: tuple[dict[str, Any], ...] = (
@dataclass @dataclass
class ServiceRuntime: class ServiceRuntime:
repository: AssetRegistryRepository = field(default_factory=InMemoryAssetRegistryRepository) repository: AssetRegistryRepository = field(default_factory=InMemoryAssetRegistryRepository)
blob_storage: BlobStorage = field(default_factory=InMemoryBlobStorage)
policy_gateway: PolicyGateway = field(default_factory=AllowAllPolicyGateway) policy_gateway: PolicyGateway = field(default_factory=AllowAllPolicyGateway)
api_version: str = API_VERSION api_version: str = API_VERSION
service_name: str = "kontextual-engine" service_name: str = "kontextual-engine"
@@ -193,6 +195,14 @@ class ServiceRuntime:
def retrieval_service(self) -> AssetRetrievalService: def retrieval_service(self) -> AssetRetrievalService:
return AssetRetrievalService(self.repository, policy_gateway=self.policy_gateway) return AssetRetrievalService(self.repository, policy_gateway=self.policy_gateway)
def content_service(self) -> RepresentationContentService:
return RepresentationContentService(
self.repository,
self.blob_storage,
policy_gateway=self.policy_gateway,
asset_service=self.asset_service(),
)
def transformation_service(self) -> TransformationService: def transformation_service(self) -> TransformationService:
return TransformationService( return TransformationService(
self.repository, self.repository,
@@ -407,6 +417,25 @@ class ServiceRuntime:
) )
return content_stream return content_stream
def cmis_content_stream_bytes(
self,
access_point_id: str,
object_id: str,
context: OperationContext,
):
mapper = self._cmis_mapper(access_point_id)
decision = mapper.access_point.decide_action(CMISAction.GET_CONTENT_STREAM, context, resource=object_id)
if not decision.allowed:
raise _cmis_authorization_error(decision, "getContentStream")
asset_id = _cmis_asset_id(object_id)
asset = self.repository.get_asset(asset_id)
if not mapper.access_point.exposes_asset(asset, context):
raise NotFoundError(
"CMIS object not found",
details={"object_id": object_id, "access_point_id": access_point_id},
)
return self.content_service().stream_content(asset_id, context)
def cmis_acl( def cmis_acl(
self, self,
access_point_id: str, access_point_id: str,
@@ -466,23 +495,23 @@ class ServiceRuntime:
"metadata": dict(payload.get("classification_metadata", {})), "metadata": dict(payload.get("classification_metadata", {})),
} }
) )
asset_id = payload.get("asset_id") or new_id("asset")
content = payload.get("content") content = payload.get("content")
representations = [] representations = []
if content is not None: if content is not None:
representations.append( representation, _blob, _created = self.content_service().build_representation_from_bytes(
AssetRepresentation.from_content( asset_id,
payload.get("asset_id") or "cmis-new-document", RepresentationKind.SOURCE,
RepresentationKind.SOURCE, payload.get("media_type", "text/plain"),
payload.get("media_type", "text/plain"), content,
content, metadata={"cmis": {"operation": "createDocument"}},
storage_ref=payload.get("storage_ref"),
)
) )
representations.append(representation)
result = self.asset_service().create_asset( result = self.asset_service().create_asset(
payload["name"], payload["name"],
classification, classification,
context, context,
asset_id=payload.get("asset_id"), asset_id=asset_id,
representations=representations, representations=representations,
metadata_records=[_metadata_record(item) for item in payload.get("metadata_records", [])], metadata_records=[_metadata_record(item) for item in payload.get("metadata_records", [])],
idempotency_key=payload.get("idempotency_key"), idempotency_key=payload.get("idempotency_key"),
@@ -527,21 +556,29 @@ class ServiceRuntime:
         if not decision.allowed:
             raise _cmis_authorization_error(decision, "setContentStream")
         asset_id = _cmis_asset_id(object_id)
-        representation = AssetRepresentation.from_content(
+        self.content_service().add_representation_from_bytes(
             asset_id,
             payload.get("kind", RepresentationKind.SOURCE.value),
             payload.get("media_type", "text/plain"),
             payload.get("content", ""),
-            storage_ref=payload.get("storage_ref"),
-        )
-        self.asset_service().add_representation(
-            asset_id,
-            representation,
             context,
             expected_current_version_id=payload.get("expected_current_version_id"),
+            metadata={"cmis": {"operation": "setContentStream"}},
         )
         return self.cmis_object(access_point_id, object_id, context)

+    def representation_content_stream(
+        self,
+        asset_id: str,
+        representation_id: str,
+        context: OperationContext,
+    ):
+        return self.content_service().stream_content(
+            asset_id,
+            context,
+            representation_id=representation_id,
+        )
+
     def cmis_delete_object(
         self,
         access_point_id: str,
@@ -2031,7 +2068,7 @@ class ServiceRuntime:
 def create_app(runtime: ServiceRuntime | None = None):
     try:
         from fastapi import Depends, FastAPI, Header, HTTPException, Query
-        from fastapi.responses import JSONResponse
+        from fastapi.responses import JSONResponse, StreamingResponse
     except ImportError as exc:  # pragma: no cover - exercised when optional extra is absent
         raise RuntimeError(
             "FastAPI service dependencies are not installed. Install kontextual-engine[service]."
@@ -2202,6 +2239,25 @@ def create_app(runtime: ServiceRuntime | None = None):
     ) -> dict[str, Any]:
         return response(runtime.cmis_content_stream, access_point_id, object_id, context)

+    @app.get("/cmis/{access_point_id}/browser/content-bytes/{object_id:path}", tags=["cmis"])
+    def cmis_content_stream_bytes(
+        access_point_id: str,
+        object_id: str,
+        context: OperationContext = Depends(context_from_headers),
+    ) -> StreamingResponse:
+        result = response(runtime.cmis_content_stream_bytes, access_point_id, object_id, context)
+        representation = result.representation
+        return StreamingResponse(
+            result.chunks,
+            media_type=representation.media_type,
+            headers={
+                "Content-Length": str(representation.size_bytes),
+                "ETag": representation.digest,
+                "X-Kontextual-Representation-Id": representation.representation_id,
+                "X-Kontextual-Storage-Ref": representation.storage_ref or "",
+            },
+        )
+
     @app.get("/cmis/{access_point_id}/browser/acl/{object_id:path}", tags=["cmis"])
     def cmis_acl(
         access_point_id: str,
@@ -2323,6 +2379,25 @@ def create_app(runtime: ServiceRuntime | None = None):
     def get_asset(asset_id: str) -> dict[str, Any]:
         return response(runtime.get_asset, asset_id)

+    @app.get(f"{prefix}/assets/{{asset_id}}/representations/{{representation_id}}/content", tags=["assets"])
+    def get_representation_content(
+        asset_id: str,
+        representation_id: str,
+        context: OperationContext = Depends(context_from_headers),
+    ) -> StreamingResponse:
+        result = response(runtime.representation_content_stream, asset_id, representation_id, context)
+        representation = result.representation
+        return StreamingResponse(
+            result.chunks,
+            media_type=representation.media_type,
+            headers={
+                "Content-Length": str(representation.size_bytes),
+                "ETag": representation.digest,
+                "X-Kontextual-Representation-Id": representation.representation_id,
+                "X-Kontextual-Storage-Ref": representation.storage_ref or "",
+            },
+        )
+
     @app.post(f"{prefix}/assets/{{asset_id}}/metadata", tags=["metadata"])
     def add_metadata(
         asset_id: str,


@@ -858,7 +858,9 @@ def _preferred_representation(
         RepresentationKind.NORMALIZED: 1,
         RepresentationKind.DERIVED: 2,
     }
-    return sorted(representations, key=lambda item: priority.get(item.kind, 99))[0]
+    best_priority = min(priority.get(item.kind, 99) for item in representations)
+    candidates = [item for item in representations if priority.get(item.kind, 99) == best_priority]
+    return sorted(candidates, key=lambda item: (item.created_at, item.representation_id), reverse=True)[0]


 def _normalize_path(path: str) -> str:
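
The tie-break introduced in this hunk can be tried in isolation. The following is a minimal sketch, not the engine's code: `Rep` is a hypothetical stand-in for `AssetRepresentation`, and it assumes string kinds and ISO-8601 `created_at` strings, neither of which is confirmed by this diff. It shows that the lowest-priority-number kind wins first, and that among representations of that kind the newest one is chosen, with `representation_id` as the final tie-break.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rep:
    """Hypothetical stand-in for AssetRepresentation (illustration only)."""

    representation_id: str
    kind: str
    created_at: str  # assumed ISO-8601, so string order matches time order


# Assumed string kinds; the real code keys this mapping by RepresentationKind.
PRIORITY = {"source": 0, "normalized": 1, "derived": 2}


def preferred(representations: list[Rep]) -> Rep:
    # Step 1: find the best (lowest) priority present among the candidates.
    best = min(PRIORITY.get(item.kind, 99) for item in representations)
    candidates = [item for item in representations if PRIORITY.get(item.kind, 99) == best]
    # Step 2: newest first; representation_id breaks ties on equal timestamps.
    return sorted(candidates, key=lambda item: (item.created_at, item.representation_id), reverse=True)[0]


reps = [
    Rep("r1", "source", "2026-01-01T00:00:00Z"),
    Rep("r2", "source", "2026-02-01T00:00:00Z"),
    Rep("r3", "derived", "2026-03-01T00:00:00Z"),
]
chosen = preferred(reps)
print(chosen.representation_id)
```

Before this change the selection sorted by kind priority alone, so which of two `source` representations came back depended on input order.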


@@ -1,5 +1,13 @@
 """Stable ports owned by the engine."""

+from .blob_storage import (
+    BlobCleanupResult,
+    BlobRef,
+    BlobStorage,
+    BlobWriteResult,
+    blob_digest,
+    digest_storage_key,
+)
 from .ingestion import DirectorySourceConnector, FormatExtractor, SourceConnector
 from .policy import AllowAllPolicyGateway, PolicyGateway
 from .repositories import AssetRegistryRepository
@@ -7,6 +15,12 @@ from .repositories import AssetRegistryRepository
 __all__ = [
     "AllowAllPolicyGateway",
     "AssetRegistryRepository",
+    "BlobCleanupResult",
+    "BlobRef",
+    "BlobStorage",
+    "BlobWriteResult",
+    "blob_digest",
+    "digest_storage_key",
     "DirectorySourceConnector",
     "FormatExtractor",
     "PolicyGateway",


@@ -0,0 +1,95 @@
"""Blob storage port and content-addressed reference models."""

from __future__ import annotations

from dataclasses import dataclass
from collections.abc import Iterator
from typing import Protocol

from kontextual_engine.core import content_digest
from kontextual_engine.errors import ValidationError


@dataclass(frozen=True)
class BlobRef:
    digest: str
    size_bytes: int
    storage_key: str
    storage_ref: str
    adapter: str
    media_type: str | None = None

    def to_dict(self) -> dict[str, object]:
        data: dict[str, object] = {
            "digest": self.digest,
            "size_bytes": self.size_bytes,
            "storage_key": self.storage_key,
            "storage_ref": self.storage_ref,
            "adapter": self.adapter,
        }
        if self.media_type:
            data["media_type"] = self.media_type
        return data


@dataclass(frozen=True)
class BlobWriteResult:
    blob: BlobRef
    created: bool

    def to_dict(self) -> dict[str, object]:
        return {"blob": self.blob.to_dict(), "created": self.created}


@dataclass(frozen=True)
class BlobCleanupResult:
    dry_run: bool
    deleted_count: int
    retained_count: int
    reclaimable_bytes: int
    deleted_storage_refs: tuple[str, ...] = ()

    def to_dict(self) -> dict[str, object]:
        return {
            "dry_run": self.dry_run,
            "deleted_count": self.deleted_count,
            "retained_count": self.retained_count,
            "reclaimable_bytes": self.reclaimable_bytes,
            "deleted_storage_refs": list(self.deleted_storage_refs),
        }


class BlobStorage(Protocol):
    adapter_name: str

    def put_bytes(self, content: bytes, *, media_type: str | None = None) -> BlobWriteResult: ...

    def read_bytes(self, storage_ref: str) -> bytes: ...

    def iter_bytes(self, storage_ref: str, *, chunk_size: int = 65536) -> Iterator[bytes]: ...

    def stat(self, storage_ref: str) -> BlobRef: ...

    def exists(self, storage_ref_or_digest: str) -> bool: ...

    def iter_blobs(self) -> list[BlobRef]: ...

    def delete_unreferenced(
        self,
        referenced_storage_refs: set[str],
        *,
        dry_run: bool = True,
    ) -> BlobCleanupResult: ...


def blob_digest(content: bytes) -> str:
    return content_digest(content)


def digest_storage_key(digest: str) -> str:
    if not digest.startswith("sha256:"):
        raise ValidationError("Unsupported blob digest", details={"digest": digest})
    value = digest.removeprefix("sha256:")
    if len(value) < 4:
        raise ValidationError("Invalid blob digest", details={"digest": digest})
    return f"sha256/{value[:2]}/{value[2:4]}/{value}"
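
The key layout produced by `digest_storage_key` can be seen with a small standalone sketch. `sketch_storage_key` is a hypothetical helper written for illustration. It assumes `content_digest` yields `"sha256:"` followed by the hex digest, which is how the content service in this commit recomputes digests for comparison. It mirrors only the key derivation, not the validation errors.

```python
import hashlib


def sketch_storage_key(content: bytes) -> str:
    """Hypothetical helper: derive a sharded, content-addressed key."""
    # Assumed digest format: "sha256:" + lowercase hex.
    digest = "sha256:" + hashlib.sha256(content).hexdigest()
    value = digest.removeprefix("sha256:")
    # Two levels of two-character fan-out, then the full hex digest as the leaf.
    return f"sha256/{value[:2]}/{value[2:4]}/{value}"


key = sketch_storage_key(b"same content")
parts = key.split("/")
print(key)
```

Because the key depends only on the bytes, writing identical content twice resolves to the same key, which is what lets the adapters report `created=False` on the second write.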


@@ -5,6 +5,7 @@ from .asset_service import (
     AssetRegistryService,
     RelationshipChangeResult,
 )
+from .content_service import RepresentationContentResult, RepresentationContentStream, RepresentationContentService
 from .ingestion_service import AssetIngestionResult, AssetIngestionService
 from .retrieval_service import (
     AssetQueryItem,
@@ -53,6 +54,9 @@ __all__ = [
     "ContextEntityQueryResult",
     "LexicalIndexRefreshResult",
     "RelationshipChangeResult",
+    "RepresentationContentResult",
+    "RepresentationContentStream",
+    "RepresentationContentService",
     "RelationshipQueryItem",
     "RelationshipQueryRequest",
     "RelationshipQueryResult",


@@ -0,0 +1,383 @@
"""Governed representation byte storage and streaming service."""

from __future__ import annotations

import hashlib
from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from typing import Any

from kontextual_engine.core import (
    AssetRepresentation,
    AuditEvent,
    AuditOutcome,
    OperationContext,
    PolicyDecision,
    RepresentationKind,
    new_id,
)
from kontextual_engine.errors import AuthorizationError, NotFoundError, ValidationError
from kontextual_engine.ports import (
    AllowAllPolicyGateway,
    AssetRegistryRepository,
    BlobCleanupResult,
    BlobRef,
    BlobStorage,
    PolicyGateway,
)
from kontextual_engine.services.asset_service import AssetChangeResult, AssetRegistryService


@dataclass(frozen=True)
class RepresentationContentResult:
    representation: AssetRepresentation
    content: bytes
    blob: BlobRef
    policy_decision: PolicyDecision
    audit_event: AuditEvent

    def to_dict(self, *, include_content: bool = False) -> dict[str, Any]:
        data = {
            "representation": self.representation.to_dict(),
            "content_stream": {
                "representation_id": self.representation.representation_id,
                "asset_id": self.representation.asset_id,
                "media_type": self.representation.media_type,
                "digest": self.representation.digest,
                "size_bytes": self.representation.size_bytes,
                "storage_ref": self.representation.storage_ref,
                "blob": self.blob.to_dict(),
            },
            "policy_decision": self.policy_decision.to_dict(),
            "audit_event": self.audit_event.to_dict(),
        }
        if include_content:
            data["content"] = self.content
        return data


@dataclass(frozen=True)
class RepresentationContentStream:
    representation: AssetRepresentation
    chunks: Iterable[bytes]
    blob: BlobRef
    policy_decision: PolicyDecision
    audit_event: AuditEvent


class RepresentationContentService:
    def __init__(
        self,
        repository: AssetRegistryRepository,
        blob_storage: BlobStorage,
        *,
        policy_gateway: PolicyGateway | None = None,
        asset_service: AssetRegistryService | None = None,
    ) -> None:
        self.repository = repository
        self.blob_storage = blob_storage
        self.policy_gateway = policy_gateway or AllowAllPolicyGateway()
        self.asset_service = asset_service or AssetRegistryService(
            repository,
            policy_gateway=self.policy_gateway,
        )

    def build_representation_from_bytes(
        self,
        asset_id: str,
        kind: RepresentationKind | str,
        media_type: str,
        content: str | bytes,
        *,
        producer: str | None = None,
        source_ref_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        representation_id: str | None = None,
    ) -> tuple[AssetRepresentation, BlobRef, bool]:
        data = content.encode("utf-8") if isinstance(content, str) else bytes(content)
        write = self.blob_storage.put_bytes(data, media_type=media_type)
        blob = write.blob
        representation = AssetRepresentation(
            asset_id=asset_id,
            kind=RepresentationKind(kind),
            media_type=media_type,
            digest=blob.digest,
            size_bytes=blob.size_bytes,
            storage_ref=blob.storage_ref,
            producer=producer,
            source_ref_id=source_ref_id,
            metadata={"blob_adapter": blob.adapter, **dict(metadata or {})},
            representation_id=representation_id or new_id("repr"),
        )
        return representation, blob, write.created

    def add_representation_from_bytes(
        self,
        asset_id: str,
        kind: RepresentationKind | str,
        media_type: str,
        content: str | bytes,
        context: OperationContext,
        *,
        expected_current_version_id: str | None = None,
        producer: str | None = None,
        source_ref_id: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> AssetChangeResult:
        representation, _blob, _created = self.build_representation_from_bytes(
            asset_id,
            kind,
            media_type,
            content,
            producer=producer,
            source_ref_id=source_ref_id,
            metadata=metadata,
        )
        return self.asset_service.add_representation(
            asset_id,
            representation,
            context,
            expected_current_version_id=expected_current_version_id,
        )

    def get_content_stream(
        self,
        asset_id: str,
        context: OperationContext,
        *,
        representation_id: str | None = None,
        kind: RepresentationKind | str | None = None,
    ) -> RepresentationContentResult:
        asset = self.repository.get_asset(asset_id)
        representation = self._representation(asset_id, representation_id=representation_id, kind=kind)
        if not representation.storage_ref:
            raise NotFoundError(
                "Representation content is not available in blob storage",
                details={"asset_id": asset_id, "representation_id": representation.representation_id},
            )
        decision = self._authorize(
            context,
            "asset.content_stream.read",
            f"asset:{asset.id}",
            resource_metadata={
                "representation_id": representation.representation_id,
                "digest": representation.digest,
                "media_type": representation.media_type,
            },
        )
        try:
            blob = self.blob_storage.stat(representation.storage_ref)
            content = self.blob_storage.read_bytes(representation.storage_ref)
        except ValueError as exc:
            raise NotFoundError(
                "Representation content is not available in configured blob storage",
                details={
                    "asset_id": asset_id,
                    "representation_id": representation.representation_id,
                    "storage_ref": representation.storage_ref,
                },
            ) from exc
        if blob.digest != representation.digest:
            raise ValidationError(
                "Representation digest does not match stored blob",
                details={
                    "representation_id": representation.representation_id,
                    "representation_digest": representation.digest,
                    "blob_digest": blob.digest,
                },
            )
        actual_digest = "sha256:" + hashlib.sha256(content).hexdigest()
        if actual_digest != representation.digest:
            raise ValidationError(
                "Representation content does not match expected digest",
                details={
                    "representation_id": representation.representation_id,
                    "representation_digest": representation.digest,
                    "actual_digest": actual_digest,
                },
            )
        event = self._audit(
            "asset.content_stream.read",
            f"asset:{asset.id}",
            AuditOutcome.SUCCESS,
            context,
            decision,
            details={"representation_id": representation.representation_id, "digest": representation.digest},
        )
        return RepresentationContentResult(representation, content, blob, decision, event)

    def stream_content(
        self,
        asset_id: str,
        context: OperationContext,
        *,
        representation_id: str | None = None,
        kind: RepresentationKind | str | None = None,
        chunk_size: int = 65536,
    ) -> RepresentationContentStream:
        asset = self.repository.get_asset(asset_id)
        representation = self._representation(asset_id, representation_id=representation_id, kind=kind)
        if not representation.storage_ref:
            raise NotFoundError(
                "Representation content is not available in blob storage",
                details={"asset_id": asset_id, "representation_id": representation.representation_id},
            )
        decision = self._authorize(
            context,
            "asset.content_stream.read",
            f"asset:{asset.id}",
            resource_metadata={
                "representation_id": representation.representation_id,
                "digest": representation.digest,
                "media_type": representation.media_type,
            },
        )
        try:
            blob = self.blob_storage.stat(representation.storage_ref)
        except ValueError as exc:
            raise NotFoundError(
                "Representation content is not available in configured blob storage",
                details={
                    "asset_id": asset_id,
                    "representation_id": representation.representation_id,
                    "storage_ref": representation.storage_ref,
                },
            ) from exc
        if blob.digest != representation.digest:
            raise ValidationError(
                "Representation digest does not match stored blob",
                details={
                    "representation_id": representation.representation_id,
                    "representation_digest": representation.digest,
                    "blob_digest": blob.digest,
                },
            )
        event = self._audit(
            "asset.content_stream.read",
            f"asset:{asset.id}",
            AuditOutcome.SUCCESS,
            context,
            decision,
            details={"representation_id": representation.representation_id, "digest": representation.digest},
        )
        return RepresentationContentStream(
            representation,
            self._verified_chunks(representation, chunk_size=chunk_size),
            blob,
            decision,
            event,
        )

    def referenced_storage_refs(self) -> set[str]:
        return {
            representation.storage_ref
            for representation in self.repository.list_representations()
            if representation.storage_ref
        }

    def cleanup_unreferenced_blobs(self, *, dry_run: bool = True) -> BlobCleanupResult:
        return self.blob_storage.delete_unreferenced(self.referenced_storage_refs(), dry_run=dry_run)

    def _representation(
        self,
        asset_id: str,
        *,
        representation_id: str | None,
        kind: RepresentationKind | str | None,
    ) -> AssetRepresentation:
        if representation_id:
            representation = self.repository.get_representation(representation_id)
            if representation.asset_id != asset_id:
                raise NotFoundError(
                    "Representation not found for asset",
                    details={"asset_id": asset_id, "representation_id": representation_id},
                )
            return representation
        parsed_kind = RepresentationKind(kind) if kind else None
        representations = self.repository.list_representations(asset_id=asset_id, kind=parsed_kind)
        if not representations:
            raise NotFoundError("Representation not found", details={"asset_id": asset_id, "kind": kind})
        priority = {
            RepresentationKind.SOURCE: 0,
            RepresentationKind.NORMALIZED: 1,
            RepresentationKind.DERIVED: 2,
        }
        best_priority = min(priority.get(item.kind, 99) for item in representations)
        candidates = [item for item in representations if priority.get(item.kind, 99) == best_priority]
        return sorted(candidates, key=lambda item: (item.created_at, item.representation_id), reverse=True)[0]

    def _verified_chunks(
        self,
        representation: AssetRepresentation,
        *,
        chunk_size: int,
    ) -> Iterator[bytes]:
        if not representation.storage_ref:
            raise NotFoundError(
                "Representation content is not available in blob storage",
                details={"asset_id": representation.asset_id, "representation_id": representation.representation_id},
            )
        # Hash incrementally so the whole blob never has to be held in memory.
        hasher = hashlib.sha256()
        for chunk in self.blob_storage.iter_bytes(representation.storage_ref, chunk_size=chunk_size):
            hasher.update(chunk)
            yield chunk
        # The digest is only known after the final chunk has been yielded, so a
        # mismatch is raised once the consumer has already received the bytes.
        actual_digest = "sha256:" + hasher.hexdigest()
        if actual_digest != representation.digest:
            raise ValidationError(
                "Representation content does not match expected digest",
                details={
                    "representation_id": representation.representation_id,
                    "representation_digest": representation.digest,
                    "actual_digest": actual_digest,
                },
            )

    def _authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict[str, str] | None = None,
    ) -> PolicyDecision:
        self.repository.save_actor(context.actor)
        decision = self.policy_gateway.authorize(
            context,
            action,
            resource,
            resource_metadata=resource_metadata,
        )
        if not decision.allowed:
            event = self._audit(action, resource, AuditOutcome.DENIED, context, decision)
            raise AuthorizationError(
                "Operation denied by policy",
                details={
                    "action": action,
                    "resource": resource,
                    "correlation_id": context.correlation_id,
                    "audit_event_id": event.event_id,
                    "policy_decision": decision.to_dict(),
                },
            )
        return decision

    def _audit(
        self,
        operation: str,
        target: str,
        outcome: AuditOutcome,
        context: OperationContext,
        policy_decision: PolicyDecision,
        *,
        details: dict[str, Any] | None = None,
    ) -> AuditEvent:
        return self.repository.save_audit_event(
            AuditEvent.from_context(
                operation,
                target,
                outcome,
                context,
                policy_decision=policy_decision,
                details=details,
            )
        )
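
The verify-while-streaming pattern in `_verified_chunks` has one consequence worth seeing in action: the digest check can only fail after every chunk has been handed to the consumer. The following is a minimal standalone sketch of that pattern. `verified_chunks` is a hypothetical free function, and `ValueError` stands in for the engine's `ValidationError`.

```python
import hashlib
from collections.abc import Iterable, Iterator


def verified_chunks(chunks: Iterable[bytes], expected_digest: str) -> Iterator[bytes]:
    """Hypothetical sketch: yield chunks while hashing, verify at the end."""
    hasher = hashlib.sha256()
    for chunk in chunks:
        hasher.update(chunk)
        yield chunk
    # Reached only after the last chunk was yielded to the consumer.
    actual = "sha256:" + hasher.hexdigest()
    if actual != expected_digest:
        raise ValueError(f"digest mismatch: expected {expected_digest}, got {actual}")


expected = "sha256:" + hashlib.sha256(b"read me").hexdigest()

# Intact content streams through without error.
intact = b"".join(verified_chunks([b"rea", b"d m", b"e"], expected))

# Tampered content: every chunk is delivered before the mismatch is raised.
received: list[bytes] = []
mismatch_detected = False
try:
    for chunk in verified_chunks([b"rea", b"d m", b"X"], expected):
        received.append(chunk)
except ValueError:
    mismatch_detected = True
```

A caller that forwards chunks to a client as they arrive therefore cannot retract bytes already sent when the final check fails. The whole-object read path in `get_content_stream` avoids this by hashing before returning.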


@@ -83,6 +83,7 @@ def test_cmis_browser_binding_routes_are_advertised_in_openapi(cmis_client) -> N
     assert "/cmis/{access_point_id}/browser/children" in paths
     assert "/cmis/{access_point_id}/browser/object/{object_id}" in paths
     assert "/cmis/{access_point_id}/browser/content/{object_id}" in paths
+    assert "/cmis/{access_point_id}/browser/content-bytes/{object_id}" in paths
     assert "/cmis/{access_point_id}/browser/acl/{object_id}" in paths
     assert "/cmis/{access_point_id}/browser/parents/{object_id}" in paths
     assert "/cmis/{access_point_id}/browser/query" in paths
@@ -184,6 +185,9 @@ def test_cmis_governed_authoring_routes_allow_selected_mutations(cmis_client) ->
         "/cmis/governed-authoring/browser/object/cmis:asset:asset-api-authored/content",
         json={"content": "# Updated", "media_type": "text/markdown"},
     )
+    byte_stream = cmis_client.get(
+        "/cmis/governed-authoring/browser/content-bytes/cmis:asset:asset-api-authored",
+    )
     deleted = cmis_client.post(
         "/cmis/governed-authoring/browser/object/cmis:asset:asset-api-authored/delete",
         json={},
@@ -192,6 +196,8 @@ def test_cmis_governed_authoring_routes_allow_selected_mutations(cmis_client) ->
     assert created.status_code == 200
     assert updated.json()["properties"]["kontextual:metadata:status"] == "draft"
     assert streamed.json()["content_stream"]["mime_type"] == "text/markdown"
+    assert byte_stream.content == b"# Updated"
+    assert byte_stream.headers["etag"].startswith("sha256:")
     assert deleted.json()["lifecycle"] == "delete_requested"


@@ -164,6 +164,11 @@ def test_runtime_cmis_governed_authoring_allows_selected_mutations(cmis_runtime)
         {"content": "# Authored\n\nUpdated stream.", "media_type": "text/markdown"},
         context,
     )
+    stream_bytes = runtime.cmis_content_stream_bytes(
+        "governed-authoring",
+        "cmis:asset:asset-authored",
+        context,
+    )
     deleted = runtime.cmis_delete_object(
         "governed-authoring",
         "cmis:asset:asset-authored",
@@ -174,6 +179,8 @@ def test_runtime_cmis_governed_authoring_allows_selected_mutations(cmis_runtime)
     assert created["object_id"] == "cmis:asset:asset-authored"
     assert updated["properties"]["kontextual:metadata:reviewer"] == "codex"
     assert streamed["content_stream"]["mime_type"] == "text/markdown"
+    assert b"".join(stream_bytes.chunks) == b"# Authored\n\nUpdated stream."
+    assert stream_bytes.representation.storage_ref.startswith("blob://memory/")
     assert deleted["deleted"] is False
     assert deleted["lifecycle"] == "delete_requested"

tests/test_blob_storage.py (new file)

@@ -0,0 +1,112 @@
from __future__ import annotations

from io import BytesIO

from kontextual_engine import InMemoryBlobStorage, LocalBlobStorage, S3BlobStorage, content_digest


def test_memory_blob_storage_deduplicates_by_digest() -> None:
    storage = InMemoryBlobStorage()

    first = storage.put_bytes(b"same content", media_type="text/plain")
    second = storage.put_bytes(b"same content", media_type="text/plain")

    assert first.created is True
    assert second.created is False
    assert first.blob.storage_ref == second.blob.storage_ref
    assert storage.read_bytes(first.blob.storage_ref) == b"same content"
    assert b"".join(storage.iter_bytes(first.blob.storage_ref, chunk_size=4)) == b"same content"
    assert storage.exists(first.blob.digest) is True
    assert len(storage.iter_blobs()) == 1


def test_local_blob_storage_stores_one_file_for_duplicate_content(tmp_path) -> None:
    storage = LocalBlobStorage(tmp_path / "blobs")

    first = storage.put_bytes(b"local content", media_type="text/plain")
    second = storage.put_bytes(b"local content", media_type="text/plain")

    assert first.created is True
    assert second.created is False
    assert first.blob.storage_ref == second.blob.storage_ref
    assert storage.read_bytes(first.blob.storage_ref) == b"local content"
    assert b"".join(storage.iter_bytes(first.blob.storage_ref, chunk_size=5)) == b"local content"
    assert len(storage.iter_blobs()) == 1
    assert len([path for path in (tmp_path / "blobs").rglob("*") if path.is_file()]) == 1


def test_blob_cleanup_dry_run_and_delete(tmp_path) -> None:
    storage = LocalBlobStorage(tmp_path / "blobs")
    kept = storage.put_bytes(b"kept").blob
    orphan = storage.put_bytes(b"orphan").blob

    dry_run = storage.delete_unreferenced({kept.storage_ref}, dry_run=True)
    deleted = storage.delete_unreferenced({kept.storage_ref}, dry_run=False)

    assert dry_run.deleted_count == 1
    assert dry_run.reclaimable_bytes == len(b"orphan")
    assert orphan.storage_ref in dry_run.deleted_storage_refs
    assert deleted.deleted_storage_refs == (orphan.storage_ref,)
    assert storage.exists(kept.storage_ref) is True
    assert storage.exists(orphan.storage_ref) is False


def test_s3_blob_storage_uses_content_addressed_keys_with_fake_client() -> None:
    client = FakeS3Client()
    storage = S3BlobStorage(bucket="test-bucket", prefix="kontextual", client=client)

    first = storage.put_bytes(b"s3 content", media_type="text/plain")
    second = storage.put_bytes(b"s3 content", media_type="text/plain")
    readback = storage.read_bytes(first.blob.storage_ref)
    listed = storage.iter_blobs()

    assert first.created is True
    assert second.created is False
    assert first.blob.storage_ref == second.blob.storage_ref
    assert first.blob.digest == content_digest(b"s3 content")
    assert first.blob.storage_key.startswith("kontextual/sha256/")
    assert readback == b"s3 content"
    assert b"".join(storage.iter_bytes(first.blob.storage_ref, chunk_size=2)) == b"s3 content"
    assert [item.storage_ref for item in listed] == [first.blob.storage_ref]
    assert client.put_count == 1


class FakeS3Client:
    def __init__(self) -> None:
        self.objects: dict[tuple[str, str], dict] = {}
        self.put_count = 0

    def head_object(self, *, Bucket: str, Key: str) -> dict:
        try:
            item = self.objects[(Bucket, Key)]
        except KeyError as exc:
            raise FakeS3NotFound() from exc
        return {
            "ContentLength": len(item["Body"]),
            "ContentType": item.get("ContentType"),
            "Metadata": item.get("Metadata", {}),
        }

    def put_object(self, **kwargs) -> None:
        self.put_count += 1
        self.objects[(kwargs["Bucket"], kwargs["Key"])] = kwargs

    def get_object(self, *, Bucket: str, Key: str) -> dict:
        try:
            item = self.objects[(Bucket, Key)]
        except KeyError as exc:
            raise FakeS3NotFound() from exc
        return {"Body": BytesIO(item["Body"])}

    def list_objects_v2(self, *, Bucket: str, Prefix: str, ContinuationToken: str | None = None) -> dict:
        contents = [
            {"Key": key, "Size": len(item["Body"])}
            for (bucket, key), item in sorted(self.objects.items())
            if bucket == Bucket and key.startswith(Prefix)
        ]
        return {"Contents": contents, "IsTruncated": False}

    def delete_object(self, *, Bucket: str, Key: str) -> None:
        self.objects.pop((Bucket, Key), None)


class FakeS3NotFound(Exception):
    response = {"Error": {"Code": "NoSuchKey"}, "ResponseMetadata": {"HTTPStatusCode": 404}}
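
The behaviour these tests pin down (identical bytes stored once, `created` false on the second write, dry-run cleanup that reports orphans without deleting) can be reproduced with a very small store. The following is a hedged sketch for illustration only: `TinyBlobStore` is hypothetical, it is not the `InMemoryBlobStorage` adapter from this commit, and it keys blobs by digest rather than by a `blob://` storage ref.

```python
import hashlib


class TinyBlobStore:
    """Hypothetical minimal content-addressed store (illustration only)."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def put_bytes(self, content: bytes) -> tuple[str, bool]:
        # The address is derived from the bytes, so duplicates collapse.
        digest = "sha256:" + hashlib.sha256(content).hexdigest()
        created = digest not in self._blobs
        if created:
            self._blobs[digest] = content
        return digest, created

    def delete_unreferenced(self, referenced: set[str], *, dry_run: bool = True) -> list[str]:
        # Orphans are blobs that no caller-supplied reference points at.
        orphans = sorted(digest for digest in self._blobs if digest not in referenced)
        if not dry_run:
            for digest in orphans:
                del self._blobs[digest]
        return orphans

    def __len__(self) -> int:
        return len(self._blobs)


store = TinyBlobStore()
kept, kept_created = store.put_bytes(b"kept")
_, duplicate_created = store.put_bytes(b"kept")
orphan, _ = store.put_bytes(b"orphan")

reported = store.delete_unreferenced({kept}, dry_run=True)
count_after_dry_run = len(store)
removed = store.delete_unreferenced({kept}, dry_run=False)
count_after_delete = len(store)
```

Defaulting `dry_run` to `True`, as the port in this commit does, means a cleanup call has to opt in explicitly before anything is deleted.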


@@ -0,0 +1,216 @@
from __future__ import annotations

import pytest

from kontextual_engine import (
    Actor,
    ActorType,
    AssetRegistryService,
    AuthorizationError,
    Classification,
    InMemoryAssetRegistryRepository,
    InMemoryBlobStorage,
    LocalBlobStorage,
    OperationContext,
    PolicyDecision,
    RepresentationContentService,
    RepresentationKind,
    Sensitivity,
    ValidationError,
)


def test_content_service_adds_representation_bytes_with_deduplicated_blob() -> None:
    repo = InMemoryAssetRegistryRepository()
    blobs = InMemoryBlobStorage()
    context = operation_context()
    AssetRegistryService(repo).create_asset(
        "Content Asset",
        Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-content",
    )
    service = RepresentationContentService(repo, blobs)

    first = service.add_representation_from_bytes(
        "asset-content",
        RepresentationKind.SOURCE,
        "text/plain",
        b"same bytes",
        context,
    )
    second = service.add_representation_from_bytes(
        "asset-content",
        RepresentationKind.DERIVED,
        "text/plain",
        b"same bytes",
        context,
    )

    representations = repo.list_representations(asset_id="asset-content")
    assert len(representations) == 2
    assert representations[0].storage_ref == representations[1].storage_ref
    assert first.version.sequence == 2
    assert second.version.sequence == 3
    assert len(blobs.iter_blobs()) == 1


def test_content_service_reads_bytes_with_policy_and_audit() -> None:
    repo = InMemoryAssetRegistryRepository()
    blobs = InMemoryBlobStorage()
    context = operation_context()
    AssetRegistryService(repo).create_asset(
        "Readable",
        Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-readable",
    )
    service = RepresentationContentService(repo, blobs)
    service.add_representation_from_bytes(
        "asset-readable",
        RepresentationKind.SOURCE,
        "text/plain",
        b"read me",
        context,
    )

    content = service.get_content_stream("asset-readable", context)
    streamed = service.stream_content("asset-readable", context, chunk_size=3)

    assert content.content == b"read me"
    assert b"".join(streamed.chunks) == b"read me"
    assert content.representation.media_type == "text/plain"
    assert content.blob.storage_ref == content.representation.storage_ref
    assert content.audit_event.operation == "asset.content_stream.read"
    assert repo.list_audit_events(target="asset:asset-readable")[-1].operation == "asset.content_stream.read"


def test_content_service_reads_source_normalized_and_derived_by_kind() -> None:
    repo = InMemoryAssetRegistryRepository()
    blobs = InMemoryBlobStorage()
    context = operation_context()
    AssetRegistryService(repo).create_asset(
        "Kinds",
        Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-kinds",
    )
    service = RepresentationContentService(repo, blobs)
    service.add_representation_from_bytes("asset-kinds", RepresentationKind.SOURCE, "text/plain", b"source", context)
    service.add_representation_from_bytes(
        "asset-kinds",
        RepresentationKind.NORMALIZED,
        "text/plain",
        b"normalized",
        context,
    )
    service.add_representation_from_bytes("asset-kinds", RepresentationKind.DERIVED, "text/plain", b"derived", context)

    assert service.get_content_stream("asset-kinds", context, kind=RepresentationKind.SOURCE).content == b"source"
    assert service.get_content_stream("asset-kinds", context, kind=RepresentationKind.NORMALIZED).content == b"normalized"
    assert service.get_content_stream("asset-kinds", context, kind=RepresentationKind.DERIVED).content == b"derived"


def test_content_service_denies_bytes_before_exposure() -> None:
    repo = InMemoryAssetRegistryRepository()
    blobs = InMemoryBlobStorage()
    context = operation_context()
    AssetRegistryService(repo).create_asset(
        "Denied",
        Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-denied-stream",
    )
    writer = RepresentationContentService(repo, blobs)
    writer.add_representation_from_bytes(
        "asset-denied-stream",
        RepresentationKind.SOURCE,
        "text/plain",
        b"secret",
        context,
    )
    reader = RepresentationContentService(repo, blobs, policy_gateway=DenyContentPolicy())

    with pytest.raises(AuthorizationError):
        reader.get_content_stream("asset-denied-stream", context)


def test_content_service_cleanup_uses_repository_references() -> None:
    repo = InMemoryAssetRegistryRepository()
    blobs = InMemoryBlobStorage()
    context = operation_context()
    AssetRegistryService(repo).create_asset(
        "Cleanup",
        Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-cleanup",
    )
    service = RepresentationContentService(repo, blobs)
    service.add_representation_from_bytes(
        "asset-cleanup",
        RepresentationKind.SOURCE,
        "text/plain",
        b"kept",
        context,
    )
    orphan = blobs.put_bytes(b"orphan").blob

    cleanup = service.cleanup_unreferenced_blobs(dry_run=True)

    assert cleanup.deleted_count == 1
    assert cleanup.deleted_storage_refs == (orphan.storage_ref,)
    assert cleanup.reclaimable_bytes == len(b"orphan")



def test_content_service_detects_corrupted_stored_content(tmp_path) -> None:
    repo = InMemoryAssetRegistryRepository()
    blobs = LocalBlobStorage(tmp_path / "blobs")
    context = operation_context()
    AssetRegistryService(repo).create_asset(
        "Corrupt",
        Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-corrupt",
    )
    service = RepresentationContentService(repo, blobs)
    service.add_representation_from_bytes(
        "asset-corrupt",
        RepresentationKind.SOURCE,
        "text/plain",
        b"expected",
        context,
    )
    representation = repo.list_representations(asset_id="asset-corrupt")[0]
    path = blobs.root / representation.storage_ref.removeprefix("blob://local/")
    path.write_bytes(b"corrupted")
    with pytest.raises(ValidationError):
        service.get_content_stream("asset-corrupt", context)
    with pytest.raises(ValidationError):
        b"".join(service.stream_content("asset-corrupt", context).chunks)


def operation_context() -> OperationContext:
    return OperationContext.create(
        Actor.create(ActorType.HUMAN, actor_id="content-test"),
        correlation_id="corr-content",
    )


class DenyContentPolicy:
    def authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict[str, str] | None = None,
    ) -> PolicyDecision:
        if action == "asset.content_stream.read":
            return PolicyDecision.deny(
                context.actor.id,
                action,
                resource,
                reason="content reads disabled",
            )
        return PolicyDecision.allow(context.actor.id, action, resource)


@@ -658,7 +658,9 @@ def test_service_health_readiness_version_and_openapi_contracts(client) -> None:
     assert "/cmis/{access_point_id}/browser" in paths
     assert "/cmis/{access_point_id}/browser/children" in paths
     assert "/cmis/{access_point_id}/browser/acl/{object_id}" in paths
+    assert "/cmis/{access_point_id}/browser/content-bytes/{object_id}" in paths
     assert "/cmis/{access_point_id}/browser/parents/{object_id}" in paths
+    assert "/api/v1/assets/{asset_id}/representations/{representation_id}/content" in paths
     assert "/cmis/{access_point_id}/browser/document" in paths
     assert "/cmis/{access_point_id}/browser/object/{object_id}/properties" in paths
     assert "/api/v1/assets" in paths


@@ -4,7 +4,7 @@ type: workplan
 title: "Blob Storage Deduplication And Content Streaming"
 domain: markitect
 repo: kontextual-engine
-status: active
+status: completed
 owner: codex
 topic_slug: markitect
 planning_priority: high
@@ -37,6 +37,10 @@ This workplan adds content-addressed blob infrastructure and stream interfaces.
 It does not introduce AtomPub, SOAP/Web Services, chunk-level deduplication, or
 a general document-management storage model.
+It includes an optional S3 backend as an infrastructure adapter behind the same
+blob storage port. S3 object keys are digest-derived, so object storage can be
+used without changing engine semantics or CMIS profile governance.
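
One way to read "digest-derived" keys: the object key is computed purely from a hash of the payload, so identical bytes always map to the same key regardless of which asset references them. The sketch below is illustrative only. SHA-256, the `blobs/sha256/` prefix, the two-level fan-out, and the function name `object_key_for` are all assumptions, not the S3 adapter's actual layout.

```python
import hashlib


def object_key_for(data: bytes, prefix: str = "blobs") -> str:
    """Return a hypothetical digest-derived object key for `data`."""
    digest = hashlib.sha256(data).hexdigest()
    # Assumed layout: fan out on the leading hex pairs so no single key
    # prefix accumulates every object.
    return f"{prefix}/sha256/{digest[:2]}/{digest[2:4]}/{digest}"


# Equal payloads produce equal keys; different payloads do not.
same = object_key_for(b"source") == object_key_for(b"source")
different = object_key_for(b"source") != object_key_for(b"derived")
```

Because the key depends only on the bytes, a second upload of the same content can be skipped after an existence check, which is what lets the backend change without touching engine semantics.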
 ## Architecture Constraint
 Blob bytes are infrastructure state. Engine semantics remain attached to
@@ -48,7 +52,7 @@ engine-native content services instead of bypassing governance.
 ```task
 id: KONT-WP-0013-T001
-status: todo
+status: done
 priority: high
 state_hub_task_id: "6bb5b49a-cf9f-47ce-86d3-24b47a20a2c6"
 ```
@@ -66,7 +70,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T002
-status: todo
+status: done
 priority: high
 state_hub_task_id: "661386c7-8094-4f0f-928c-c17f5b3a9132"
 ```
@@ -82,7 +86,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T003
-status: todo
+status: done
 priority: high
 state_hub_task_id: "00bc34c5-0f79-47b6-b305-f47311edd3a7"
 ```
@@ -98,7 +102,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T004
-status: todo
+status: done
 priority: medium
 state_hub_task_id: "cc4445d9-f773-4337-afd4-aeccc743dc1e"
 ```
@@ -113,7 +117,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T005
-status: todo
+status: done
 priority: high
 state_hub_task_id: "db0e8a2d-50ce-439c-8393-d65e2fc4bc9e"
 ```
@@ -129,7 +133,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T006
-status: todo
+status: done
 priority: high
 state_hub_task_id: "2f1da1fb-9634-4ba6-931a-3e29394efd37"
 ```
@@ -146,7 +150,7 @@ Acceptance:
 ```task
 id: KONT-WP-0013-T007
-status: todo
+status: done
 priority: medium
 state_hub_task_id: "987ad4f6-8658-4e93-82c2-b9fa0a3a2270"
 ```
@@ -168,3 +172,15 @@ Acceptance:
 - Existing tests continue to pass.
 - Focused dedupe/content-stream tests cover duplicate content, readback,
   policy denial, cleanup dry-run, and CMIS integration.
+## Completion Notes
+- Implemented `BlobStorage` port with `put_bytes`, `read_bytes`, `iter_bytes`,
+  `stat`, `exists`, and `delete_unreferenced`.
+- Added in-memory, local filesystem, and optional S3 content-addressed adapters.
+- Added governed representation content service for byte-backed
+  representations, chunked streams, policy checks, audit events, and cleanup.
+- Wired CMIS `setContentStream` and byte stream routes through the content
+  service; repeated content updates now expose the latest source representation.
+- Added tests for dedupe, local/S3 adapter behavior, content-kind reads, policy
+  denial, cleanup dry-run, and CMIS stream integration.
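
The port behaviour summarized in the notes above can be sketched with a toy store. This is a minimal illustration, not the repository's `BlobStorage` adapters: the class name `ToyBlobStore`, the `blob://toy/sha256/` ref format, the SHA-256 choice, and every signature below are assumptions (the real `put_bytes`, for instance, returns a result object with a `.blob` attribute, and `stat` is omitted here).

```python
import hashlib
from collections.abc import Iterator


class ToyBlobStore:
    """Illustrative content-addressed store with assumed signatures."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def put_bytes(self, data: bytes) -> str:
        # The ref is derived from the digest, so equal payloads share one entry.
        ref = "blob://toy/sha256/" + hashlib.sha256(data).hexdigest()
        self._blobs.setdefault(ref, data)
        return ref

    def exists(self, ref: str) -> bool:
        return ref in self._blobs

    def read_bytes(self, ref: str) -> bytes:
        data = self._blobs[ref]
        # Verify on read: the digest embedded in the ref must match the bytes.
        if hashlib.sha256(data).hexdigest() != ref.rsplit("/", 1)[1]:
            raise ValueError(f"digest mismatch for {ref}")
        return data

    def iter_bytes(self, ref: str, chunk_size: int = 4) -> Iterator[bytes]:
        # Yield the verified payload in fixed-size chunks.
        data = self.read_bytes(ref)
        for start in range(0, len(data), chunk_size):
            yield data[start : start + chunk_size]

    def delete_unreferenced(self, referenced: set[str], *, dry_run: bool = True) -> list[str]:
        # Anything stored but not referenced by a representation is an orphan.
        orphans = sorted(set(self._blobs) - referenced)
        if not dry_run:
            for ref in orphans:
                del self._blobs[ref]
        return orphans


store = ToyBlobStore()
kept = store.put_bytes(b"kept")
duplicate = store.put_bytes(b"kept")   # same bytes, same ref, one stored entry
orphan = store.put_bytes(b"orphan")
candidates = store.delete_unreferenced({kept}, dry_run=True)  # reports, deletes nothing
```

In this sketch a dry run returns the refs that would be removed while leaving them in place, which mirrors the way the cleanup test reports an orphan without requiring that the bytes already be gone.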