From 847b1464753dcfe6d0b75351715a1fa0c5a2c3b4 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 16 May 2026 08:17:40 +0200 Subject: [PATCH] WP-0001-T013: registry orchestrator (library surface) src/artifactstore/registry/__init__.py implements the Registry class with six operations the HTTP API and CLI both consume: * create_package(name, producer, subject, retention_class, actor, metadata?) -> UUID. Validates retention_class against the seed table; emits v1.package.created with CBOR payload; applies view in same transaction. * ingest_file(package_id, relative_path, media_type, stream, actor) -> UUID. Validates the package is in 'created' status and rejects duplicate relative_path. Calls dataplane.ingest_stream (which dual-hashes and writes to the backend). Emits v1.file.ingested whose payload carries the file metadata + storage receipt + deterministic storage_location_id so replay reproduces UUIDs. View handler in events/views.py inserts artifact_files + storage_locations and bumps last_event_sequence on the package. * finalize_package(package_id, actor) -> ContentAddress. Queries the views to build a Manifest dataclass, encodes it as canonical CBOR, computes the BLAKE3 content address, and writes v1.package.finalized whose payload IS the canonical CBOR manifest. The view handler now records manifest_digest = event.payload_digest (BLAKE3 of the manifest), not a separate field parsed from the payload. * get_manifest_bytes(package_id, format='cbor'|'json') -> bytes. Reads the finalize event payload (CBOR) and optionally projects to JCS. * get_file(file_id) -> AsyncIterator[bytes]. Looks up the storage location and serves bytes via the data plane. * tail_events(since_sequence, poll_interval_seconds) -> AsyncIterator[Event]. Pass-through to events.tail. src/artifactstore/events/views.py: - New v1.file.ingested handler. - v1.package.finalized handler updated: manifest_digest now derived from event.payload_digest (= BLAKE3 of the canonical CBOR manifest payload). - All inserts now pass created_at=event.created_at explicitly so replay produces byte-identical materialised state (server_default=now() was firing fresh on each replay insert). tests/integration/test_registry.py (7 cases): - Rejects unknown retention class. - create_package writes the event and the package row. - ingest_file writes file + storage_location, populates content_address with blake3 prefix. - Duplicate relative_path raises DuplicateRelativePathError. - ingest into unknown package raises PackageNotFoundError. - Finalising twice raises IllegalPackageStateError. - End-to-end: create + ingest 3 files + finalize + read manifest in CBOR and JSON + download each file with byte equality + tail 5 events + replay + assert byte-identical materialised state across pre and post snapshots. tests/integration/test_event_log.py updated: the v1.package.finalized replay test now uses the new payload semantics (payload is the canonical CBOR manifest; manifest_digest = BLAKE3 of payload). Gates: ruff clean, mypy --strict clean on 45 files, 77 tests pass. Co-Authored-By: Claude Opus 4.7 --- src/artifactstore/events/views.py | 68 ++- src/artifactstore/registry/__init__.py | 411 +++++++++++++++++- tests/integration/test_event_log.py | 14 +- tests/integration/test_registry.py | 291 +++++++++++++ ...ARTIFACT-STORE-WP-0001-service-baseline.md | 2 +- 5 files changed, 768 insertions(+), 18 deletions(-) create mode 100644 tests/integration/test_registry.py diff --git a/src/artifactstore/events/views.py b/src/artifactstore/events/views.py index c5bb343..31ca335 100644 --- a/src/artifactstore/events/views.py +++ b/src/artifactstore/events/views.py @@ -5,17 +5,22 @@ serves both the direct write path (registry calls :func:`log.write` followed by :meth:`RegistryViewWriter.apply` in one transaction) and the replay path (:func:`log.replay` walks every event in order and applies it). -T011 ships handlers for the events emitted at registry baseline: +Handlers shipped: -* ``v1.package.created`` — inserts ``artifact_packages`` + ``retention_state``; -* ``v1.package.finalized`` — updates ``artifact_packages`` status and digest. +* ``v1.package.created`` — inserts ``artifact_packages`` + ``retention_state``. +* ``v1.file.ingested`` — inserts ``artifact_files`` + ``storage_locations``. +* ``v1.package.finalized`` — updates ``artifact_packages`` status, sets + ``finalized_at`` and ``manifest_digest`` (= BLAKE3 of the event payload, + which **is** the canonical CBOR manifest). -Additional handlers (file ingestion, storage receipts, retention extensions) -land in later workplans. +Additional handlers (retention extensions, holds, deletions) land in later +workplans without changing this module's public surface. """ from __future__ import annotations +from uuid import UUID + import cbor2 from sqlalchemy import delete, insert, update from sqlalchemy.ext.asyncio import AsyncConnection @@ -52,9 +57,9 @@ class RegistryViewWriter: async def _apply_package_created(connection: AsyncConnection, event: Event) -> None: - payload = cbor2.loads(event.payload) if event.subject_id is None: raise ValueError("v1.package.created event must have subject_id") + payload = cbor2.loads(event.payload) await connection.execute( insert(artifact_packages).values( id=event.subject_id, @@ -66,6 +71,9 @@ async def _apply_package_created(connection: AsyncConnection, event: Event) -> N metadata=payload.get("metadata", {}), status="created", manifest_digest=None, + # Bind created_at to the event's timestamp so replay reproduces + # byte-identical state instead of taking a fresh server-side now(). + created_at=event.created_at, last_event_sequence=event.sequence, ) ) @@ -79,17 +87,60 @@ async def _apply_package_created(connection: AsyncConnection, event: Event) -> N ) -async def _apply_package_finalized(connection: AsyncConnection, event: Event) -> None: +async def _apply_file_ingested(connection: AsyncConnection, event: Event) -> None: + if event.subject_id is None: + raise ValueError("v1.file.ingested event must have subject_id (the package id)") payload = cbor2.loads(event.payload) + file_id = UUID(payload["file_id"]) + package_id = event.subject_id + storage_location_id = UUID(payload["storage_location_id"]) + + await connection.execute( + insert(artifact_files).values( + id=file_id, + package_id=package_id, + relative_path=payload["relative_path"], + media_type=payload["media_type"], + size_bytes=payload["size_bytes"], + digest_algorithm=payload["digest_algorithm"], + digest_primary=bytes.fromhex(payload["digest_primary_hex"]), + digest_sha256=bytes.fromhex(payload["digest_sha256_hex"]), + created_at=event.created_at, + ) + ) + await connection.execute( + insert(storage_locations).values( + id=storage_location_id, + artifact_file_id=file_id, + backend_id=payload["backend_id"], + content_address=f"{payload['digest_algorithm']}:{payload['digest_primary_hex']}", + object_key=payload["object_key"], + storage_class=payload.get("storage_class"), + retrieval_tier=payload.get("retrieval_tier", "hot"), + restore_status=None, + status=payload.get("storage_status", "recorded"), + created_at=event.created_at, + ) + ) + await connection.execute( + update(artifact_packages) + .where(artifact_packages.c.id == package_id) + .values(last_event_sequence=event.sequence) + ) + + +async def _apply_package_finalized(connection: AsyncConnection, event: Event) -> None: if event.subject_id is None: raise ValueError("v1.package.finalized event must have subject_id") + # The event payload **is** the canonical CBOR manifest; payload_digest + # is BLAKE3 of that payload — i.e. the manifest's content address digest. await connection.execute( update(artifact_packages) .where(artifact_packages.c.id == event.subject_id) .values( status="finalized", finalized_at=event.created_at, - manifest_digest=bytes.fromhex(payload["manifest_digest_hex"]), + manifest_digest=event.payload_digest, last_event_sequence=event.sequence, ) ) @@ -97,5 +148,6 @@ async def _apply_package_finalized(connection: AsyncConnection, event: Event) -> _HANDLERS = { "v1.package.created": _apply_package_created, + "v1.file.ingested": _apply_file_ingested, "v1.package.finalized": _apply_package_finalized, } diff --git a/src/artifactstore/registry/__init__.py b/src/artifactstore/registry/__init__.py index adaf4e0..73ca23c 100644 --- a/src/artifactstore/registry/__init__.py +++ b/src/artifactstore/registry/__init__.py @@ -1,6 +1,409 @@ -"""Registry orchestrator. +"""Registry orchestrator (ADR-0002 + ADR-0003 + ADR-0004). -Real implementation lands in ARTIFACT-STORE-WP-0001-T013. The orchestrator -combines identity, manifest, events, retention, and dataplane into the -operations exposed by the HTTP API and CLI. +The :class:`Registry` is the library-shaped entry point. It combines +``identity`` + ``manifest`` + ``events`` + ``dataplane`` into the six +operations the HTTP API and CLI both consume: + +* ``create_package`` +* ``ingest_file`` +* ``finalize_package`` +* ``get_manifest_bytes`` (canonical CBOR or JCS projection) +* ``get_file`` +* ``tail_events`` + +Every mutating operation writes one event and updates the materialised +views inside the same database transaction, so the event log remains the +source of truth (ADR-0002). """ + +from __future__ import annotations + +import uuid +from collections.abc import AsyncIterator, Sequence +from datetime import UTC, datetime +from typing import Any +from uuid import UUID + +import cbor2 +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncEngine + +from artifactstore.dataplane.spi import DataPlane +from artifactstore.db.schema import ( + artifact_files, + artifact_packages, + retention_classes, + retention_state, + storage_locations, +) +from artifactstore.db.schema import ( + events as events_t, +) +from artifactstore.events import RegistryViewWriter, make_event, tail, write +from artifactstore.events.model import Event +from artifactstore.identity import ContentAddress +from artifactstore.manifest import ( + MANIFEST_VERSION, + FileEntry, + Manifest, + Package, + Provenance, + RetentionSummary, +) +from artifactstore.manifest import ( + StorageReceipt as ManifestStorageReceipt, +) +from artifactstore.manifest import ( + encode as manifest_encode, +) +from artifactstore.manifest import ( + manifest_digest as compute_manifest_digest, +) +from artifactstore.manifest.codec import decode as manifest_decode +from artifactstore.manifest.projection import jcs_projection + +__all__ = ["Registry"] + + +class PackageNotFoundError(KeyError): + """Raised when a package id has no row in artifact_packages.""" + + +class FileNotFoundError(KeyError): + """Raised when a file id has no row in artifact_files / storage_locations.""" + + +class IllegalPackageStateError(ValueError): + """Raised when an operation is rejected because of package status.""" + + +class DuplicateRelativePathError(ValueError): + """Raised when ingest_file would create a duplicate ``relative_path`` in a package.""" + + +class Registry: + """Library-shaped orchestrator over events, dataplane, and views.""" + + def __init__( + self, + engine: AsyncEngine, + dataplane: DataPlane, + view_writer: RegistryViewWriter | None = None, + ) -> None: + self._engine = engine + self._dataplane = dataplane + self._view_writer = view_writer or RegistryViewWriter() + + # ---- mutating operations ------------------------------------------------ + + async def create_package( + self, + *, + name: str, + producer: str, + subject: str, + retention_class: str, + actor: str, + metadata: dict[str, Any] | None = None, + ) -> UUID: + """Create a new package; returns its ``UUID``.""" + await self._validate_retention_class(retention_class) + package_id = uuid.uuid4() + payload = cbor2.dumps( + { + "name": name, + "producer": producer, + "subject": subject, + "retention_class": retention_class, + "metadata": metadata or {}, + }, + canonical=True, + ) + event = make_event( + event_type="v1.package.created", + subject_kind="package", + subject_id=package_id, + actor=actor, + payload=payload, + ) + async with self._engine.begin() as conn: + written = await write(conn, event) + await self._view_writer.apply(conn, written) + return package_id + + async def ingest_file( + self, + package_id: UUID, + *, + relative_path: str, + media_type: str, + stream: AsyncIterator[bytes], + actor: str, + ) -> UUID: + """Stream a file into the package; returns the new file id.""" + async with self._engine.connect() as conn: + pkg_row = ( + await conn.execute( + select(artifact_packages).where(artifact_packages.c.id == package_id) + ) + ).first() + if pkg_row is None: + raise PackageNotFoundError(f"package not found: {package_id}") + if pkg_row.status != "created": + raise IllegalPackageStateError( + f"package {package_id} cannot accept files (status={pkg_row.status})" + ) + existing = ( + await conn.execute( + select(artifact_files).where( + artifact_files.c.package_id == package_id, + artifact_files.c.relative_path == relative_path, + ) + ) + ).first() + if existing is not None: + raise DuplicateRelativePathError( + f"relative_path {relative_path!r} already exists in package {package_id}" + ) + + ingest = await self._dataplane.ingest_stream(stream) + + file_id = uuid.uuid4() + storage_location_id = uuid.uuid4() + payload = cbor2.dumps( + { + "file_id": str(file_id), + "relative_path": relative_path, + "media_type": media_type, + "size_bytes": ingest.size_bytes, + "digest_algorithm": ingest.primary_digest.algorithm, + "digest_primary_hex": ingest.primary_digest.hex, + "digest_sha256_hex": ingest.sha256_digest.hex, + "backend_id": ingest.receipt.backend_id, + "object_key": ingest.receipt.object_key, + "storage_location_id": str(storage_location_id), + "retrieval_tier": "hot", + "storage_status": "recorded", + }, + canonical=True, + ) + event = make_event( + event_type="v1.file.ingested", + subject_kind="package", + subject_id=package_id, + actor=actor, + payload=payload, + ) + async with self._engine.begin() as conn: + written = await write(conn, event) + await self._view_writer.apply(conn, written) + return file_id + + async def finalize_package(self, package_id: UUID, *, actor: str) -> ContentAddress: + """Finalise a package: build manifest, write it as the + v1.package.finalized event payload, and update the package's + terminal state. Returns the manifest's content address.""" + async with self._engine.connect() as conn: + pkg_row = ( + await conn.execute( + select(artifact_packages).where(artifact_packages.c.id == package_id) + ) + ).first() + if pkg_row is None: + raise PackageNotFoundError(f"package not found: {package_id}") + if pkg_row.status != "created": + raise IllegalPackageStateError( + f"package {package_id} cannot be finalized (status={pkg_row.status})" + ) + file_rows = ( + await conn.execute( + select(artifact_files) + .where(artifact_files.c.package_id == package_id) + .order_by(artifact_files.c.relative_path) + ) + ).all() + file_ids = [r.id for r in file_rows] + location_rows = ( + ( + await conn.execute( + select(storage_locations) + .where(storage_locations.c.artifact_file_id.in_(file_ids)) + .order_by(storage_locations.c.artifact_file_id) + ) + ).all() + if file_ids + else [] + ) + retention_row = ( + await conn.execute( + select(retention_state).where(retention_state.c.package_id == package_id) + ) + ).first() + + manifest = _build_manifest( + pkg_row=pkg_row, + file_rows=file_rows, + location_rows=location_rows, + retention_row=retention_row, + actor=actor, + ) + manifest_bytes = manifest_encode(manifest) + manifest_addr = compute_manifest_digest(manifest) + + event = make_event( + event_type="v1.package.finalized", + subject_kind="package", + subject_id=package_id, + actor=actor, + payload=manifest_bytes, + ) + async with self._engine.begin() as conn: + written = await write(conn, event) + await self._view_writer.apply(conn, written) + return manifest_addr + + # ---- read-only operations ---------------------------------------------- + + async def get_manifest_bytes(self, package_id: UUID, *, format: str = "cbor") -> bytes: + """Return the finalised manifest. ``format`` is ``cbor`` (canonical + CBOR, the wire form) or ``json`` (the JCS projection).""" + async with self._engine.connect() as conn: + row = ( + await conn.execute( + select(events_t.c.payload) + .where( + events_t.c.event_type == "v1.package.finalized", + events_t.c.subject_id == package_id, + ) + .order_by(events_t.c.sequence.desc()) + .limit(1) + ) + ).first() + if row is None: + raise PackageNotFoundError(f"no finalized manifest for package {package_id}") + payload_bytes: bytes = row.payload + if format == "cbor": + return payload_bytes + if format == "json": + return jcs_projection(manifest_decode(payload_bytes)) + raise ValueError(f"unknown manifest format: {format!r} (expected 'cbor' or 'json')") + + async def get_file(self, file_id: UUID) -> AsyncIterator[bytes]: + """Return an async byte iterator for the bytes of a stored file.""" + async with self._engine.connect() as conn: + row = ( + await conn.execute( + select(storage_locations) + .where(storage_locations.c.artifact_file_id == file_id) + .limit(1) + ) + ).first() + if row is None: + raise FileNotFoundError(f"file not found: {file_id}") + ca = ContentAddress(row.content_address) + return await self._dataplane.serve_object(ca) + + def tail_events( + self, + *, + since_sequence: int = 0, + poll_interval_seconds: float = 0.5, + ) -> AsyncIterator[Event]: + """Long-poll the event log; pass-through to :func:`events.tail`.""" + return tail( + self._engine, + since_sequence=since_sequence, + poll_interval_seconds=poll_interval_seconds, + ) + + # ---- internals ---------------------------------------------------------- + + async def _validate_retention_class(self, retention_class: str) -> None: + async with self._engine.connect() as conn: + row = ( + await conn.execute( + select(retention_classes.c.class_id).where( + retention_classes.c.class_id == retention_class + ) + ) + ).first() + if row is None: + raise ValueError(f"unknown retention class: {retention_class!r}") + + +def _iso(value: datetime | None) -> str | None: + if value is None: + return None + if value.tzinfo is None: + value = value.replace(tzinfo=UTC) + return value.isoformat() + + +def _build_manifest( + *, + pkg_row: Any, + file_rows: Sequence[Any], + location_rows: Sequence[Any], + retention_row: Any | None, + actor: str, +) -> Manifest: + files_entries = [ + FileEntry( + id=str(r.id), + relative_path=r.relative_path, + media_type=r.media_type, + size_bytes=r.size_bytes, + digest_algorithm=r.digest_algorithm, + digest_primary_hex=r.digest_primary.hex(), + digest_sha256_hex=r.digest_sha256.hex(), + ) + for r in file_rows + ] + receipts = [ + ManifestStorageReceipt( + file_id=str(r.artifact_file_id), + backend_id=r.backend_id, + content_address=r.content_address, + retrieval_tier=r.retrieval_tier, + status=r.status, + ) + for r in location_rows + ] + retention_class = ( + retention_row.effective_class if retention_row is not None else pkg_row.retention_class + ) + retention_summary = RetentionSummary( + retention_class=retention_class, + expires_at=_iso(retention_row.current_expires_at) if retention_row else None, + active_holds=[], + last_retention_event_sequence=None, + ) + finalized_at_iso = datetime.now(UTC).isoformat() + return Manifest( + manifest_version=MANIFEST_VERSION, + package=Package( + id=str(pkg_row.id), + name=pkg_row.name, + producer=pkg_row.producer, + subject=pkg_row.subject, + retention_class=pkg_row.retention_class, + status="finalized", + created_at=_iso(pkg_row.created_at) or "", + finalized_at=finalized_at_iso, + expires_at=_iso(pkg_row.expires_at), + metadata=dict(pkg_row.metadata) if pkg_row.metadata else {}, + metadata_schema_id=str(pkg_row.metadata_schema_id) + if pkg_row.metadata_schema_id + else None, + ), + files=files_entries, + storage_receipts=receipts, + retention_summary=retention_summary, + provenance=Provenance( + source_commits={}, + tool_versions={}, + environment={}, + ingest_actor=actor, + ingest_timestamps={"finalized_at": finalized_at_iso}, + ), + ) diff --git a/tests/integration/test_event_log.py b/tests/integration/test_event_log.py index a20c790..57f060b 100644 --- a/tests/integration/test_event_log.py +++ b/tests/integration/test_event_log.py @@ -160,6 +160,8 @@ async def test_replay_reproduces_direct_write_state(fresh_engine: AsyncEngine) - async def test_replay_handles_multiple_event_types(fresh_engine: AsyncEngine) -> None: + import blake3 as _blake3 + writer = RegistryViewWriter() pkg_id = uuid.uuid4() @@ -176,6 +178,9 @@ async def test_replay_handles_multiple_event_types(fresh_engine: AsyncEngine) -> ) await writer.apply(conn, e_created) + # The finalize payload is the canonical CBOR manifest; the view + # writer records manifest_digest = BLAKE3(payload) (= event.payload_digest). + manifest_cbor = cbor2.dumps({"manifest_version": 1, "fake": "manifest"}, canonical=True) e_finalized = await write( conn, make_event( @@ -183,18 +188,17 @@ async def test_replay_handles_multiple_event_types(fresh_engine: AsyncEngine) -> subject_kind="package", subject_id=pkg_id, actor="ops", - payload=cbor2.dumps( - {"manifest_digest_hex": "ab" * 32}, - canonical=True, - ), + payload=manifest_cbor, ), ) await writer.apply(conn, e_finalized) + expected_digest = _blake3.blake3(manifest_cbor).digest() + async with fresh_engine.connect() as conn: pkg_a = (await conn.execute(select(artifact_packages))).one() assert pkg_a.status == "finalized" - assert pkg_a.manifest_digest == bytes.fromhex("ab" * 32) + assert pkg_a.manifest_digest == expected_digest last_seq = await replay(fresh_engine, writer, reset=True) assert last_seq == e_finalized.sequence diff --git a/tests/integration/test_registry.py b/tests/integration/test_registry.py new file mode 100644 index 0000000..4f23855 --- /dev/null +++ b/tests/integration/test_registry.py @@ -0,0 +1,291 @@ +"""End-to-end Registry orchestrator tests (ARTIFACT-STORE-WP-0001-T013).""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator +from pathlib import Path + +import pytest +import pytest_asyncio +from sqlalchemy import insert, select +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + +from artifactstore.dataplane import InProcessDataPlane +from artifactstore.db.schema import ( + artifact_files, + artifact_packages, + metadata, + retention_classes, + retention_state, + storage_locations, +) +from artifactstore.db.seed import RETENTION_CLASS_SEEDS +from artifactstore.events import RegistryViewWriter, replay +from artifactstore.manifest import decode as manifest_decode +from artifactstore.registry import ( + DuplicateRelativePathError, + IllegalPackageStateError, + PackageNotFoundError, + Registry, +) +from artifactstore.storage import LocalBackend + + +@pytest_asyncio.fixture +async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]: + db_path = tmp_path / "registry.db" + eng = create_async_engine(f"sqlite+aiosqlite:///{db_path}") + async with eng.begin() as conn: + await conn.run_sync(metadata.create_all) + for seed in RETENTION_CLASS_SEEDS: + await conn.execute(insert(retention_classes).values(**seed)) + yield eng + await eng.dispose() + + +@pytest.fixture +def backend(tmp_path: Path) -> LocalBackend: + return LocalBackend(tmp_path / "store", backend_id="local") + + +@pytest.fixture +def view_writer() -> RegistryViewWriter: + return RegistryViewWriter() + + +@pytest_asyncio.fixture +async def registry( + engine: AsyncEngine, + backend: LocalBackend, + view_writer: RegistryViewWriter, +) -> Registry: + dp = InProcessDataPlane(backend) + return Registry(engine, dp, view_writer) + + +async def _stream(data: bytes) -> AsyncIterator[bytes]: + yield data + + +async def _consume(it: AsyncIterator[bytes]) -> bytes: + out = bytearray() + async for chunk in it: + out.extend(chunk) + return bytes(out) + + +async def test_create_package_rejects_unknown_retention_class(registry: Registry) -> None: + with pytest.raises(ValueError, match="unknown retention class"): + await registry.create_package( + name="x", + producer="p", + subject="s", + retention_class="does-not-exist", + actor="ops", + ) + + +async def test_create_package_writes_event_and_view( + registry: Registry, engine: AsyncEngine +) -> None: + pkg_id = await registry.create_package( + name="p", + producer="prod", + subject="sub", + retention_class="raw-evidence", + actor="ops", + ) + async with engine.connect() as conn: + row = ( + await conn.execute(select(artifact_packages).where(artifact_packages.c.id == pkg_id)) + ).one() + assert row.name == "p" + assert row.retention_class == "raw-evidence" + assert row.status == "created" + + +async def test_ingest_file_writes_event_and_view(registry: Registry, engine: AsyncEngine) -> None: + pkg_id = await registry.create_package( + name="p", + producer="prod", + subject="sub", + retention_class="raw-evidence", + actor="ops", + ) + data = b"hello world" * 100 + file_id = await registry.ingest_file( + pkg_id, + relative_path="x/y.txt", + media_type="text/plain", + stream=_stream(data), + actor="ops", + ) + + async with engine.connect() as conn: + file_row = ( + await conn.execute(select(artifact_files).where(artifact_files.c.id == file_id)) + ).one() + loc_row = ( + await conn.execute( + select(storage_locations).where(storage_locations.c.artifact_file_id == file_id) + ) + ).one() + assert file_row.size_bytes == len(data) + assert file_row.relative_path == "x/y.txt" + assert loc_row.backend_id == "local" + assert loc_row.content_address.startswith("blake3:") + + +async def test_ingest_file_rejects_duplicate_relative_path(registry: Registry) -> None: + pkg_id = await registry.create_package( + name="p", + producer="prod", + subject="sub", + retention_class="raw-evidence", + actor="ops", + ) + await registry.ingest_file( + pkg_id, + relative_path="a.txt", + media_type="text/plain", + stream=_stream(b"one"), + actor="ops", + ) + with pytest.raises(DuplicateRelativePathError): + await registry.ingest_file( + pkg_id, + relative_path="a.txt", + media_type="text/plain", + stream=_stream(b"two"), + actor="ops", + ) + + +async def test_ingest_into_unknown_package_raises(registry: Registry) -> None: + import uuid + + with pytest.raises(PackageNotFoundError): + await registry.ingest_file( + uuid.uuid4(), + relative_path="a.txt", + media_type="text/plain", + stream=_stream(b"x"), + actor="ops", + ) + + +async def test_finalize_rejects_non_created_package(registry: Registry) -> None: + pkg_id = await registry.create_package( + name="p", + producer="prod", + subject="sub", + retention_class="raw-evidence", + actor="ops", + ) + await registry.finalize_package(pkg_id, actor="ops") + with pytest.raises(IllegalPackageStateError): + await registry.finalize_package(pkg_id, actor="ops") + + +async def test_end_to_end_ingest_finalize_replay( + registry: Registry, + engine: AsyncEngine, + view_writer: RegistryViewWriter, +) -> None: + """Three-file package: create, ingest, finalize, retrieve manifest, get + every file back, tail events, and verify that replaying the events from + scratch reproduces the materialised view state byte-for-byte.""" + + pkg_id = await registry.create_package( + name="e2e", + producer="test", + subject="kontextual-engine", + retention_class="raw-evidence", + actor="ops", + metadata={"run_id": "r-1"}, + ) + + payloads = [f"file content {i}".encode() * 50 for i in range(3)] + file_ids = [] + for i, data in enumerate(payloads): + fid = await registry.ingest_file( + pkg_id, + relative_path=f"reports/{i}.txt", + media_type="text/plain", + stream=_stream(data), + actor="ops", + ) + file_ids.append(fid) + + manifest_addr = await registry.finalize_package(pkg_id, actor="ops") + assert str(manifest_addr).startswith("blake3:") + + cbor_bytes = await registry.get_manifest_bytes(pkg_id, format="cbor") + manifest = manifest_decode(cbor_bytes) + assert manifest.manifest_version == 1 + assert manifest.package.id == str(pkg_id) + assert manifest.package.status == "finalized" + assert len(manifest.files) == 3 + assert [f.relative_path for f in manifest.files] == [ + "reports/0.txt", + "reports/1.txt", + "reports/2.txt", + ] + assert len(manifest.storage_receipts) == 3 + + json_bytes = await registry.get_manifest_bytes(pkg_id, format="json") + assert json_bytes.startswith(b"{") + assert b'"manifest_version":1' in json_bytes + + # Download each file and verify byte equality. + for fid, expected in zip(file_ids, payloads, strict=True): + stream = await registry.get_file(fid) + assert await _consume(stream) == expected + + # Tail events: 1 created + 3 ingested + 1 finalized = 5. + collected = [] + + async def _consume_tail() -> None: + async for evt in registry.tail_events(since_sequence=0, poll_interval_seconds=0.01): + collected.append(evt) + if len(collected) >= 5: + break + + await asyncio.wait_for(_consume_tail(), timeout=5.0) + assert [e.event_type for e in collected] == [ + "v1.package.created", + "v1.file.ingested", + "v1.file.ingested", + "v1.file.ingested", + "v1.package.finalized", + ] + + # Snapshot materialised state. + Row = tuple[object, ...] # noqa: N806 — local type alias, PascalCase by convention + + async def _snapshot() -> tuple[Row, list[Row], list[Row], list[Row]]: + async with engine.connect() as conn: + pkg = (await conn.execute(select(artifact_packages))).one() + files = ( + await conn.execute(select(artifact_files).order_by(artifact_files.c.relative_path)) + ).all() + locs = ( + await conn.execute( + select(storage_locations).order_by(storage_locations.c.artifact_file_id) + ) + ).all() + rets = (await conn.execute(select(retention_state))).all() + return ( + tuple(pkg), + [tuple(r) for r in files], + [tuple(r) for r in locs], + [tuple(r) for r in rets], + ) + + pre = await _snapshot() + + await replay(engine, view_writer, reset=True) + post = await _snapshot() + + assert pre == post, "replay must reproduce byte-identical materialised state" diff --git a/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md b/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md index 505979b..cb04d4d 100644 --- a/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md +++ b/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md @@ -236,7 +236,7 @@ Acceptance: ```task id: ARTIFACT-STORE-WP-0001-T013 -status: todo +status: done priority: high state_hub_task_id: "f4967308-4613-4def-8c09-41caaeb631f7" ```