WP-0001-T013: registry orchestrator (library surface)

src/artifactstore/registry/__init__.py implements the Registry class with
six operations the HTTP API and CLI both consume:

* create_package(name, producer, subject, retention_class, actor, metadata?)
  -> UUID. Validates retention_class against the seed table; emits
  v1.package.created with CBOR payload; applies view in same transaction.
* ingest_file(package_id, relative_path, media_type, stream, actor) -> UUID.
  Validates the package is in 'created' status and rejects duplicate
  relative_path. Calls dataplane.ingest_stream (which dual-hashes and writes
  to the backend). Emits v1.file.ingested whose payload carries the file
  metadata + storage receipt + deterministic storage_location_id so replay
  reproduces UUIDs. View handler in events/views.py inserts artifact_files
  + storage_locations and bumps last_event_sequence on the package.
* finalize_package(package_id, actor) -> ContentAddress. Queries the views
  to build a Manifest dataclass, encodes it as canonical CBOR, computes the
  BLAKE3 content address, and writes v1.package.finalized whose payload IS
  the canonical CBOR manifest. The view handler now records
  manifest_digest = event.payload_digest (BLAKE3 of the manifest), not a
  separate field parsed from the payload.
* get_manifest_bytes(package_id, format='cbor'|'json') -> bytes. Reads the
  finalize event payload (CBOR) and optionally projects to JCS.
* get_file(file_id) -> AsyncIterator[bytes]. Looks up the storage location
  and serves bytes via the data plane.
* tail_events(since_sequence, poll_interval_seconds) -> AsyncIterator[Event].
  Pass-through to events.tail.

src/artifactstore/events/views.py:
- New v1.file.ingested handler.
- v1.package.finalized handler updated: manifest_digest now derived from
  event.payload_digest (= BLAKE3 of the canonical CBOR manifest payload).
- All inserts now pass created_at=event.created_at explicitly so replay
  produces byte-identical materialised state (server_default=now() was
  firing fresh on each replay insert).

tests/integration/test_registry.py (7 cases):
- Rejects unknown retention class.
- create_package writes the event and the package row.
- ingest_file writes file + storage_location, populates content_address
  with blake3 prefix.
- Duplicate relative_path raises DuplicateRelativePathError.
- ingest into unknown package raises PackageNotFoundError.
- Finalising twice raises IllegalPackageStateError.
- End-to-end: create + ingest 3 files + finalize + read manifest in CBOR
  and JSON + download each file with byte equality + tail 5 events + replay
  + assert byte-identical materialised state across pre and post snapshots.

tests/integration/test_event_log.py updated: the v1.package.finalized
replay test now uses the new payload semantics (payload is the canonical
CBOR manifest; manifest_digest = BLAKE3 of payload).

Gates: ruff clean, mypy --strict clean on 45 files, 77 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-16 08:17:40 +02:00
parent 48fd415330
commit 847b146475
5 changed files with 768 additions and 18 deletions

View File

@@ -5,17 +5,22 @@ serves both the direct write path (registry calls :func:`log.write` followed
by :meth:`RegistryViewWriter.apply` in one transaction) and the replay path
(:func:`log.replay` walks every event in order and applies it).
T011 ships handlers for the events emitted at registry baseline:
Handlers shipped:
* ``v1.package.created`` — inserts ``artifact_packages`` + ``retention_state``;
* ``v1.package.finalized`` — updates ``artifact_packages`` status and digest.
* ``v1.package.created`` — inserts ``artifact_packages`` + ``retention_state``.
* ``v1.file.ingested`` — inserts ``artifact_files`` + ``storage_locations``.
* ``v1.package.finalized`` — updates ``artifact_packages`` status, sets
``finalized_at`` and ``manifest_digest`` (= BLAKE3 of the event payload,
which **is** the canonical CBOR manifest).
Additional handlers (file ingestion, storage receipts, retention extensions)
land in later workplans.
Additional handlers (retention extensions, holds, deletions) land in later
workplans without changing this module's public surface.
"""
from __future__ import annotations
from uuid import UUID
import cbor2
from sqlalchemy import delete, insert, update
from sqlalchemy.ext.asyncio import AsyncConnection
@@ -52,9 +57,9 @@ class RegistryViewWriter:
async def _apply_package_created(connection: AsyncConnection, event: Event) -> None:
payload = cbor2.loads(event.payload)
if event.subject_id is None:
raise ValueError("v1.package.created event must have subject_id")
payload = cbor2.loads(event.payload)
await connection.execute(
insert(artifact_packages).values(
id=event.subject_id,
@@ -66,6 +71,9 @@ async def _apply_package_created(connection: AsyncConnection, event: Event) -> N
metadata=payload.get("metadata", {}),
status="created",
manifest_digest=None,
# Bind created_at to the event's timestamp so replay reproduces
# byte-identical state instead of taking a fresh server-side now().
created_at=event.created_at,
last_event_sequence=event.sequence,
)
)
@@ -79,17 +87,60 @@ async def _apply_package_created(connection: AsyncConnection, event: Event) -> N
)
async def _apply_package_finalized(connection: AsyncConnection, event: Event) -> None:
async def _apply_file_ingested(connection: AsyncConnection, event: Event) -> None:
if event.subject_id is None:
raise ValueError("v1.file.ingested event must have subject_id (the package id)")
payload = cbor2.loads(event.payload)
file_id = UUID(payload["file_id"])
package_id = event.subject_id
storage_location_id = UUID(payload["storage_location_id"])
await connection.execute(
insert(artifact_files).values(
id=file_id,
package_id=package_id,
relative_path=payload["relative_path"],
media_type=payload["media_type"],
size_bytes=payload["size_bytes"],
digest_algorithm=payload["digest_algorithm"],
digest_primary=bytes.fromhex(payload["digest_primary_hex"]),
digest_sha256=bytes.fromhex(payload["digest_sha256_hex"]),
created_at=event.created_at,
)
)
await connection.execute(
insert(storage_locations).values(
id=storage_location_id,
artifact_file_id=file_id,
backend_id=payload["backend_id"],
content_address=f"{payload['digest_algorithm']}:{payload['digest_primary_hex']}",
object_key=payload["object_key"],
storage_class=payload.get("storage_class"),
retrieval_tier=payload.get("retrieval_tier", "hot"),
restore_status=None,
status=payload.get("storage_status", "recorded"),
created_at=event.created_at,
)
)
await connection.execute(
update(artifact_packages)
.where(artifact_packages.c.id == package_id)
.values(last_event_sequence=event.sequence)
)
async def _apply_package_finalized(connection: AsyncConnection, event: Event) -> None:
if event.subject_id is None:
raise ValueError("v1.package.finalized event must have subject_id")
# The event payload **is** the canonical CBOR manifest; payload_digest
# is BLAKE3 of that payload — i.e. the manifest's content address digest.
await connection.execute(
update(artifact_packages)
.where(artifact_packages.c.id == event.subject_id)
.values(
status="finalized",
finalized_at=event.created_at,
manifest_digest=bytes.fromhex(payload["manifest_digest_hex"]),
manifest_digest=event.payload_digest,
last_event_sequence=event.sequence,
)
)
@@ -97,5 +148,6 @@ async def _apply_package_finalized(connection: AsyncConnection, event: Event) ->
_HANDLERS = {
"v1.package.created": _apply_package_created,
"v1.file.ingested": _apply_file_ingested,
"v1.package.finalized": _apply_package_finalized,
}

View File

@@ -1,6 +1,409 @@
"""Registry orchestrator.
"""Registry orchestrator (ADR-0002 + ADR-0003 + ADR-0004).
Real implementation lands in ARTIFACT-STORE-WP-0001-T013. The orchestrator
combines identity, manifest, events, retention, and dataplane into the
operations exposed by the HTTP API and CLI.
The :class:`Registry` is the library-shaped entry point. It combines
``identity`` + ``manifest`` + ``events`` + ``dataplane`` into the six
operations the HTTP API and CLI both consume:
* ``create_package``
* ``ingest_file``
* ``finalize_package``
* ``get_manifest_bytes`` (canonical CBOR or JCS projection)
* ``get_file``
* ``tail_events``
Every mutating operation writes one event and updates the materialised
views inside the same database transaction, so the event log remains the
source of truth (ADR-0002).
"""
from __future__ import annotations
import uuid
from collections.abc import AsyncIterator, Sequence
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
import cbor2
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncEngine
from artifactstore.dataplane.spi import DataPlane
from artifactstore.db.schema import (
artifact_files,
artifact_packages,
retention_classes,
retention_state,
storage_locations,
)
from artifactstore.db.schema import (
events as events_t,
)
from artifactstore.events import RegistryViewWriter, make_event, tail, write
from artifactstore.events.model import Event
from artifactstore.identity import ContentAddress
from artifactstore.manifest import (
MANIFEST_VERSION,
FileEntry,
Manifest,
Package,
Provenance,
RetentionSummary,
)
from artifactstore.manifest import (
StorageReceipt as ManifestStorageReceipt,
)
from artifactstore.manifest import (
encode as manifest_encode,
)
from artifactstore.manifest import (
manifest_digest as compute_manifest_digest,
)
from artifactstore.manifest.codec import decode as manifest_decode
from artifactstore.manifest.projection import jcs_projection
__all__ = ["Registry"]
class PackageNotFoundError(KeyError):
"""Raised when a package id has no row in artifact_packages."""
class FileNotFoundError(KeyError):
"""Raised when a file id has no row in artifact_files / storage_locations."""
class IllegalPackageStateError(ValueError):
"""Raised when an operation is rejected because of package status."""
class DuplicateRelativePathError(ValueError):
"""Raised when ingest_file would create a duplicate ``relative_path`` in a package."""
class Registry:
"""Library-shaped orchestrator over events, dataplane, and views."""
def __init__(
self,
engine: AsyncEngine,
dataplane: DataPlane,
view_writer: RegistryViewWriter | None = None,
) -> None:
self._engine = engine
self._dataplane = dataplane
self._view_writer = view_writer or RegistryViewWriter()
# ---- mutating operations ------------------------------------------------
async def create_package(
self,
*,
name: str,
producer: str,
subject: str,
retention_class: str,
actor: str,
metadata: dict[str, Any] | None = None,
) -> UUID:
"""Create a new package; returns its ``UUID``."""
await self._validate_retention_class(retention_class)
package_id = uuid.uuid4()
payload = cbor2.dumps(
{
"name": name,
"producer": producer,
"subject": subject,
"retention_class": retention_class,
"metadata": metadata or {},
},
canonical=True,
)
event = make_event(
event_type="v1.package.created",
subject_kind="package",
subject_id=package_id,
actor=actor,
payload=payload,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
return package_id
async def ingest_file(
self,
package_id: UUID,
*,
relative_path: str,
media_type: str,
stream: AsyncIterator[bytes],
actor: str,
) -> UUID:
"""Stream a file into the package; returns the new file id."""
async with self._engine.connect() as conn:
pkg_row = (
await conn.execute(
select(artifact_packages).where(artifact_packages.c.id == package_id)
)
).first()
if pkg_row is None:
raise PackageNotFoundError(f"package not found: {package_id}")
if pkg_row.status != "created":
raise IllegalPackageStateError(
f"package {package_id} cannot accept files (status={pkg_row.status})"
)
existing = (
await conn.execute(
select(artifact_files).where(
artifact_files.c.package_id == package_id,
artifact_files.c.relative_path == relative_path,
)
)
).first()
if existing is not None:
raise DuplicateRelativePathError(
f"relative_path {relative_path!r} already exists in package {package_id}"
)
ingest = await self._dataplane.ingest_stream(stream)
file_id = uuid.uuid4()
storage_location_id = uuid.uuid4()
payload = cbor2.dumps(
{
"file_id": str(file_id),
"relative_path": relative_path,
"media_type": media_type,
"size_bytes": ingest.size_bytes,
"digest_algorithm": ingest.primary_digest.algorithm,
"digest_primary_hex": ingest.primary_digest.hex,
"digest_sha256_hex": ingest.sha256_digest.hex,
"backend_id": ingest.receipt.backend_id,
"object_key": ingest.receipt.object_key,
"storage_location_id": str(storage_location_id),
"retrieval_tier": "hot",
"storage_status": "recorded",
},
canonical=True,
)
event = make_event(
event_type="v1.file.ingested",
subject_kind="package",
subject_id=package_id,
actor=actor,
payload=payload,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
return file_id
async def finalize_package(self, package_id: UUID, *, actor: str) -> ContentAddress:
"""Finalise a package: build manifest, write it as the
v1.package.finalized event payload, and update the package's
terminal state. Returns the manifest's content address."""
async with self._engine.connect() as conn:
pkg_row = (
await conn.execute(
select(artifact_packages).where(artifact_packages.c.id == package_id)
)
).first()
if pkg_row is None:
raise PackageNotFoundError(f"package not found: {package_id}")
if pkg_row.status != "created":
raise IllegalPackageStateError(
f"package {package_id} cannot be finalized (status={pkg_row.status})"
)
file_rows = (
await conn.execute(
select(artifact_files)
.where(artifact_files.c.package_id == package_id)
.order_by(artifact_files.c.relative_path)
)
).all()
file_ids = [r.id for r in file_rows]
location_rows = (
(
await conn.execute(
select(storage_locations)
.where(storage_locations.c.artifact_file_id.in_(file_ids))
.order_by(storage_locations.c.artifact_file_id)
)
).all()
if file_ids
else []
)
retention_row = (
await conn.execute(
select(retention_state).where(retention_state.c.package_id == package_id)
)
).first()
manifest = _build_manifest(
pkg_row=pkg_row,
file_rows=file_rows,
location_rows=location_rows,
retention_row=retention_row,
actor=actor,
)
manifest_bytes = manifest_encode(manifest)
manifest_addr = compute_manifest_digest(manifest)
event = make_event(
event_type="v1.package.finalized",
subject_kind="package",
subject_id=package_id,
actor=actor,
payload=manifest_bytes,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
return manifest_addr
# ---- read-only operations ----------------------------------------------
async def get_manifest_bytes(self, package_id: UUID, *, format: str = "cbor") -> bytes:
"""Return the finalised manifest. ``format`` is ``cbor`` (canonical
CBOR, the wire form) or ``json`` (the JCS projection)."""
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(events_t.c.payload)
.where(
events_t.c.event_type == "v1.package.finalized",
events_t.c.subject_id == package_id,
)
.order_by(events_t.c.sequence.desc())
.limit(1)
)
).first()
if row is None:
raise PackageNotFoundError(f"no finalized manifest for package {package_id}")
payload_bytes: bytes = row.payload
if format == "cbor":
return payload_bytes
if format == "json":
return jcs_projection(manifest_decode(payload_bytes))
raise ValueError(f"unknown manifest format: {format!r} (expected 'cbor' or 'json')")
async def get_file(self, file_id: UUID) -> AsyncIterator[bytes]:
"""Return an async byte iterator for the bytes of a stored file."""
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(storage_locations)
.where(storage_locations.c.artifact_file_id == file_id)
.limit(1)
)
).first()
if row is None:
raise FileNotFoundError(f"file not found: {file_id}")
ca = ContentAddress(row.content_address)
return await self._dataplane.serve_object(ca)
def tail_events(
self,
*,
since_sequence: int = 0,
poll_interval_seconds: float = 0.5,
) -> AsyncIterator[Event]:
"""Long-poll the event log; pass-through to :func:`events.tail`."""
return tail(
self._engine,
since_sequence=since_sequence,
poll_interval_seconds=poll_interval_seconds,
)
# ---- internals ----------------------------------------------------------
async def _validate_retention_class(self, retention_class: str) -> None:
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(retention_classes.c.class_id).where(
retention_classes.c.class_id == retention_class
)
)
).first()
if row is None:
raise ValueError(f"unknown retention class: {retention_class!r}")
def _iso(value: datetime | None) -> str | None:
if value is None:
return None
if value.tzinfo is None:
value = value.replace(tzinfo=UTC)
return value.isoformat()
def _build_manifest(
*,
pkg_row: Any,
file_rows: Sequence[Any],
location_rows: Sequence[Any],
retention_row: Any | None,
actor: str,
) -> Manifest:
files_entries = [
FileEntry(
id=str(r.id),
relative_path=r.relative_path,
media_type=r.media_type,
size_bytes=r.size_bytes,
digest_algorithm=r.digest_algorithm,
digest_primary_hex=r.digest_primary.hex(),
digest_sha256_hex=r.digest_sha256.hex(),
)
for r in file_rows
]
receipts = [
ManifestStorageReceipt(
file_id=str(r.artifact_file_id),
backend_id=r.backend_id,
content_address=r.content_address,
retrieval_tier=r.retrieval_tier,
status=r.status,
)
for r in location_rows
]
retention_class = (
retention_row.effective_class if retention_row is not None else pkg_row.retention_class
)
retention_summary = RetentionSummary(
retention_class=retention_class,
expires_at=_iso(retention_row.current_expires_at) if retention_row else None,
active_holds=[],
last_retention_event_sequence=None,
)
finalized_at_iso = datetime.now(UTC).isoformat()
return Manifest(
manifest_version=MANIFEST_VERSION,
package=Package(
id=str(pkg_row.id),
name=pkg_row.name,
producer=pkg_row.producer,
subject=pkg_row.subject,
retention_class=pkg_row.retention_class,
status="finalized",
created_at=_iso(pkg_row.created_at) or "",
finalized_at=finalized_at_iso,
expires_at=_iso(pkg_row.expires_at),
metadata=dict(pkg_row.metadata) if pkg_row.metadata else {},
metadata_schema_id=str(pkg_row.metadata_schema_id)
if pkg_row.metadata_schema_id
else None,
),
files=files_entries,
storage_receipts=receipts,
retention_summary=retention_summary,
provenance=Provenance(
source_commits={},
tool_versions={},
environment={},
ingest_actor=actor,
ingest_timestamps={"finalized_at": finalized_at_iso},
),
)

View File

@@ -160,6 +160,8 @@ async def test_replay_reproduces_direct_write_state(fresh_engine: AsyncEngine) -
async def test_replay_handles_multiple_event_types(fresh_engine: AsyncEngine) -> None:
import blake3 as _blake3
writer = RegistryViewWriter()
pkg_id = uuid.uuid4()
@@ -176,6 +178,9 @@ async def test_replay_handles_multiple_event_types(fresh_engine: AsyncEngine) ->
)
await writer.apply(conn, e_created)
# The finalize payload is the canonical CBOR manifest; the view
# writer records manifest_digest = BLAKE3(payload) (= event.payload_digest).
manifest_cbor = cbor2.dumps({"manifest_version": 1, "fake": "manifest"}, canonical=True)
e_finalized = await write(
conn,
make_event(
@@ -183,18 +188,17 @@ async def test_replay_handles_multiple_event_types(fresh_engine: AsyncEngine) ->
subject_kind="package",
subject_id=pkg_id,
actor="ops",
payload=cbor2.dumps(
{"manifest_digest_hex": "ab" * 32},
canonical=True,
),
payload=manifest_cbor,
),
)
await writer.apply(conn, e_finalized)
expected_digest = _blake3.blake3(manifest_cbor).digest()
async with fresh_engine.connect() as conn:
pkg_a = (await conn.execute(select(artifact_packages))).one()
assert pkg_a.status == "finalized"
assert pkg_a.manifest_digest == bytes.fromhex("ab" * 32)
assert pkg_a.manifest_digest == expected_digest
last_seq = await replay(fresh_engine, writer, reset=True)
assert last_seq == e_finalized.sequence

View File

@@ -0,0 +1,291 @@
"""End-to-end Registry orchestrator tests (ARTIFACT-STORE-WP-0001-T013)."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from pathlib import Path
import pytest
import pytest_asyncio
from sqlalchemy import insert, select
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from artifactstore.dataplane import InProcessDataPlane
from artifactstore.db.schema import (
artifact_files,
artifact_packages,
metadata,
retention_classes,
retention_state,
storage_locations,
)
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
from artifactstore.events import RegistryViewWriter, replay
from artifactstore.manifest import decode as manifest_decode
from artifactstore.registry import (
DuplicateRelativePathError,
IllegalPackageStateError,
PackageNotFoundError,
Registry,
)
from artifactstore.storage import LocalBackend
@pytest_asyncio.fixture
async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
db_path = tmp_path / "registry.db"
eng = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
async with eng.begin() as conn:
await conn.run_sync(metadata.create_all)
for seed in RETENTION_CLASS_SEEDS:
await conn.execute(insert(retention_classes).values(**seed))
yield eng
await eng.dispose()
@pytest.fixture
def backend(tmp_path: Path) -> LocalBackend:
return LocalBackend(tmp_path / "store", backend_id="local")
@pytest.fixture
def view_writer() -> RegistryViewWriter:
return RegistryViewWriter()
@pytest_asyncio.fixture
async def registry(
engine: AsyncEngine,
backend: LocalBackend,
view_writer: RegistryViewWriter,
) -> Registry:
dp = InProcessDataPlane(backend)
return Registry(engine, dp, view_writer)
async def _stream(data: bytes) -> AsyncIterator[bytes]:
yield data
async def _consume(it: AsyncIterator[bytes]) -> bytes:
out = bytearray()
async for chunk in it:
out.extend(chunk)
return bytes(out)
async def test_create_package_rejects_unknown_retention_class(registry: Registry) -> None:
with pytest.raises(ValueError, match="unknown retention class"):
await registry.create_package(
name="x",
producer="p",
subject="s",
retention_class="does-not-exist",
actor="ops",
)
async def test_create_package_writes_event_and_view(
registry: Registry, engine: AsyncEngine
) -> None:
pkg_id = await registry.create_package(
name="p",
producer="prod",
subject="sub",
retention_class="raw-evidence",
actor="ops",
)
async with engine.connect() as conn:
row = (
await conn.execute(select(artifact_packages).where(artifact_packages.c.id == pkg_id))
).one()
assert row.name == "p"
assert row.retention_class == "raw-evidence"
assert row.status == "created"
async def test_ingest_file_writes_event_and_view(registry: Registry, engine: AsyncEngine) -> None:
pkg_id = await registry.create_package(
name="p",
producer="prod",
subject="sub",
retention_class="raw-evidence",
actor="ops",
)
data = b"hello world" * 100
file_id = await registry.ingest_file(
pkg_id,
relative_path="x/y.txt",
media_type="text/plain",
stream=_stream(data),
actor="ops",
)
async with engine.connect() as conn:
file_row = (
await conn.execute(select(artifact_files).where(artifact_files.c.id == file_id))
).one()
loc_row = (
await conn.execute(
select(storage_locations).where(storage_locations.c.artifact_file_id == file_id)
)
).one()
assert file_row.size_bytes == len(data)
assert file_row.relative_path == "x/y.txt"
assert loc_row.backend_id == "local"
assert loc_row.content_address.startswith("blake3:")
async def test_ingest_file_rejects_duplicate_relative_path(registry: Registry) -> None:
pkg_id = await registry.create_package(
name="p",
producer="prod",
subject="sub",
retention_class="raw-evidence",
actor="ops",
)
await registry.ingest_file(
pkg_id,
relative_path="a.txt",
media_type="text/plain",
stream=_stream(b"one"),
actor="ops",
)
with pytest.raises(DuplicateRelativePathError):
await registry.ingest_file(
pkg_id,
relative_path="a.txt",
media_type="text/plain",
stream=_stream(b"two"),
actor="ops",
)
async def test_ingest_into_unknown_package_raises(registry: Registry) -> None:
import uuid
with pytest.raises(PackageNotFoundError):
await registry.ingest_file(
uuid.uuid4(),
relative_path="a.txt",
media_type="text/plain",
stream=_stream(b"x"),
actor="ops",
)
async def test_finalize_rejects_non_created_package(registry: Registry) -> None:
pkg_id = await registry.create_package(
name="p",
producer="prod",
subject="sub",
retention_class="raw-evidence",
actor="ops",
)
await registry.finalize_package(pkg_id, actor="ops")
with pytest.raises(IllegalPackageStateError):
await registry.finalize_package(pkg_id, actor="ops")
async def test_end_to_end_ingest_finalize_replay(
registry: Registry,
engine: AsyncEngine,
view_writer: RegistryViewWriter,
) -> None:
"""Three-file package: create, ingest, finalize, retrieve manifest, get
every file back, tail events, and verify that replaying the events from
scratch reproduces the materialised view state byte-for-byte."""
pkg_id = await registry.create_package(
name="e2e",
producer="test",
subject="kontextual-engine",
retention_class="raw-evidence",
actor="ops",
metadata={"run_id": "r-1"},
)
payloads = [f"file content {i}".encode() * 50 for i in range(3)]
file_ids = []
for i, data in enumerate(payloads):
fid = await registry.ingest_file(
pkg_id,
relative_path=f"reports/{i}.txt",
media_type="text/plain",
stream=_stream(data),
actor="ops",
)
file_ids.append(fid)
manifest_addr = await registry.finalize_package(pkg_id, actor="ops")
assert str(manifest_addr).startswith("blake3:")
cbor_bytes = await registry.get_manifest_bytes(pkg_id, format="cbor")
manifest = manifest_decode(cbor_bytes)
assert manifest.manifest_version == 1
assert manifest.package.id == str(pkg_id)
assert manifest.package.status == "finalized"
assert len(manifest.files) == 3
assert [f.relative_path for f in manifest.files] == [
"reports/0.txt",
"reports/1.txt",
"reports/2.txt",
]
assert len(manifest.storage_receipts) == 3
json_bytes = await registry.get_manifest_bytes(pkg_id, format="json")
assert json_bytes.startswith(b"{")
assert b'"manifest_version":1' in json_bytes
# Download each file and verify byte equality.
for fid, expected in zip(file_ids, payloads, strict=True):
stream = await registry.get_file(fid)
assert await _consume(stream) == expected
# Tail events: 1 created + 3 ingested + 1 finalized = 5.
collected = []
async def _consume_tail() -> None:
async for evt in registry.tail_events(since_sequence=0, poll_interval_seconds=0.01):
collected.append(evt)
if len(collected) >= 5:
break
await asyncio.wait_for(_consume_tail(), timeout=5.0)
assert [e.event_type for e in collected] == [
"v1.package.created",
"v1.file.ingested",
"v1.file.ingested",
"v1.file.ingested",
"v1.package.finalized",
]
# Snapshot materialised state.
Row = tuple[object, ...] # noqa: N806 — local type alias, PascalCase by convention
async def _snapshot() -> tuple[Row, list[Row], list[Row], list[Row]]:
async with engine.connect() as conn:
pkg = (await conn.execute(select(artifact_packages))).one()
files = (
await conn.execute(select(artifact_files).order_by(artifact_files.c.relative_path))
).all()
locs = (
await conn.execute(
select(storage_locations).order_by(storage_locations.c.artifact_file_id)
)
).all()
rets = (await conn.execute(select(retention_state))).all()
return (
tuple(pkg),
[tuple(r) for r in files],
[tuple(r) for r in locs],
[tuple(r) for r in rets],
)
pre = await _snapshot()
await replay(engine, view_writer, reset=True)
post = await _snapshot()
assert pre == post, "replay must reproduce byte-identical materialised state"

View File

@@ -236,7 +236,7 @@ Acceptance:
```task
id: ARTIFACT-STORE-WP-0001-T013
status: todo
status: done
priority: high
state_hub_task_id: "f4967308-4613-4def-8c09-41caaeb631f7"
```