Files
artifact-store/tests/integration/test_registry.py
tegwick 847b146475 WP-0001-T013: registry orchestrator (library surface)
src/artifactstore/registry/__init__.py implements the Registry class with
six operations the HTTP API and CLI both consume:

* create_package(name, producer, subject, retention_class, actor, metadata?)
  -> UUID. Validates retention_class against the seed table; emits
  v1.package.created with CBOR payload; applies view in same transaction.
* ingest_file(package_id, relative_path, media_type, stream, actor) -> UUID.
  Validates the package is in 'created' status and rejects duplicate
  relative_path. Calls dataplane.ingest_stream (which dual-hashes and writes
  to the backend). Emits v1.file.ingested whose payload carries the file
  metadata + storage receipt + deterministic storage_location_id so replay
  reproduces UUIDs. View handler in events/views.py inserts artifact_files
  + storage_locations and bumps last_event_sequence on the package.
* finalize_package(package_id, actor) -> ContentAddress. Queries the views
  to build a Manifest dataclass, encodes it as canonical CBOR, computes the
  BLAKE3 content address, and writes v1.package.finalized whose payload IS
  the canonical CBOR manifest. The view handler now records
  manifest_digest = event.payload_digest (BLAKE3 of the manifest), not a
  separate field parsed from the payload.
* get_manifest_bytes(package_id, format='cbor'|'json') -> bytes. Reads the
  finalize event payload (CBOR) and optionally projects to JCS.
* get_file(file_id) -> AsyncIterator[bytes]. Looks up the storage location
  and serves bytes via the data plane.
* tail_events(since_sequence, poll_interval_seconds) -> AsyncIterator[Event].
  Pass-through to events.tail.

src/artifactstore/events/views.py:
- New v1.file.ingested handler.
- v1.package.finalized handler updated: manifest_digest now derived from
  event.payload_digest (= BLAKE3 of the canonical CBOR manifest payload).
- All inserts now pass created_at=event.created_at explicitly so replay
  produces byte-identical materialised state (server_default=now() was
  firing fresh on each replay insert).

tests/integration/test_registry.py (7 cases):
- Rejects unknown retention class.
- create_package writes the event and the package row.
- ingest_file writes file + storage_location, populates content_address
  with blake3 prefix.
- Duplicate relative_path raises DuplicateRelativePathError.
- ingest into unknown package raises PackageNotFoundError.
- Finalising twice raises IllegalPackageStateError.
- End-to-end: create + ingest 3 files + finalize + read manifest in CBOR
  and JSON + download each file with byte equality + tail 5 events + replay
  + assert byte-identical materialised state across pre and post snapshots.

tests/integration/test_event_log.py updated: the v1.package.finalized
replay test now uses the new payload semantics (payload is the canonical
CBOR manifest; manifest_digest = BLAKE3 of payload).

Gates: ruff clean, mypy --strict clean on 45 files, 77 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-16 08:17:40 +02:00

292 lines
8.8 KiB
Python

"""End-to-end Registry orchestrator tests (ARTIFACT-STORE-WP-0001-T013)."""
from __future__ import annotations

import asyncio
import uuid
from collections.abc import AsyncIterator
from pathlib import Path

import pytest
import pytest_asyncio
from sqlalchemy import insert, select
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from artifactstore.dataplane import InProcessDataPlane
from artifactstore.db.schema import (
    artifact_files,
    artifact_packages,
    metadata,
    retention_classes,
    retention_state,
    storage_locations,
)
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
from artifactstore.events import RegistryViewWriter, replay
from artifactstore.manifest import decode as manifest_decode
from artifactstore.registry import (
    DuplicateRelativePathError,
    IllegalPackageStateError,
    PackageNotFoundError,
    Registry,
)
from artifactstore.storage import LocalBackend
@pytest_asyncio.fixture
async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
    """Yield an async SQLite engine with the schema created and retention classes seeded.

    The database file lives at ``tmp_path / "registry.db"``. Schema creation
    and the seed inserts share one transaction. The engine is disposed when
    the fixture is torn down, and also when setup itself fails, so a broken
    schema or seed row does not leave the engine undisposed.
    """
    db_path = tmp_path / "registry.db"
    eng = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
    try:
        async with eng.begin() as conn:
            await conn.run_sync(metadata.create_all)
            for seed in RETENTION_CLASS_SEEDS:
                await conn.execute(insert(retention_classes).values(**seed))
        yield eng
    finally:
        # Runs on normal teardown and on a setup failure alike.
        await eng.dispose()
@pytest.fixture
def backend(tmp_path: Path) -> LocalBackend:
    """Return a ``LocalBackend`` with id ``"local"`` rooted at ``tmp_path / "store"``."""
    store_root = tmp_path / "store"
    return LocalBackend(store_root, backend_id="local")
@pytest.fixture
def view_writer() -> RegistryViewWriter:
    """Return a ``RegistryViewWriter`` built with its default arguments."""
    writer = RegistryViewWriter()
    return writer
@pytest_asyncio.fixture
async def registry(
    engine: AsyncEngine,
    backend: LocalBackend,
    view_writer: RegistryViewWriter,
) -> Registry:
    """Return a ``Registry`` built from the engine, backend and view-writer fixtures.

    The backend is wrapped in an ``InProcessDataPlane`` before being handed
    to the registry.
    """
    data_plane = InProcessDataPlane(backend)
    return Registry(engine, data_plane, view_writer)
async def _stream(data: bytes) -> AsyncIterator[bytes]:
    """Yield ``data`` as one single chunk, then finish."""
    yield data
async def _consume(it: AsyncIterator[bytes]) -> bytes:
    """Drain the async iterator ``it`` and return all of its chunks concatenated."""
    chunks = [chunk async for chunk in it]
    return b"".join(chunks)
async def test_create_package_rejects_unknown_retention_class(registry: Registry) -> None:
    """``create_package`` raises ``ValueError`` for the retention class "does-not-exist"."""
    # Presumably absent from the seeded retention classes; the test relies on that.
    bogus_class = "does-not-exist"
    with pytest.raises(ValueError, match="unknown retention class"):
        await registry.create_package(
            name="x",
            producer="p",
            subject="s",
            retention_class=bogus_class,
            actor="ops",
        )
async def test_create_package_writes_event_and_view(
    registry: Registry, engine: AsyncEngine
) -> None:
    """A created package shows up as exactly one ``artifact_packages`` row.

    The row must carry the name and retention class that were passed in, and
    the status ``"created"``.
    """
    package_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )
    lookup = select(artifact_packages).where(artifact_packages.c.id == package_id)
    async with engine.connect() as conn:
        result = await conn.execute(lookup)
        # ``one()`` fails unless exactly one row matches the new id.
        package_row = result.one()
    assert package_row.name == "p"
    assert package_row.retention_class == "raw-evidence"
    assert package_row.status == "created"
async def test_ingest_file_writes_event_and_view(registry: Registry, engine: AsyncEngine) -> None:
    """Ingesting a file yields one ``artifact_files`` row and one ``storage_locations`` row.

    The file row records the byte size and relative path. The location row
    names the ``"local"`` backend and has a ``blake3:``-prefixed content address.
    """
    package_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )
    content = b"hello world" * 100
    ingested_id = await registry.ingest_file(
        package_id,
        relative_path="x/y.txt",
        media_type="text/plain",
        stream=_stream(content),
        actor="ops",
    )
    file_query = select(artifact_files).where(artifact_files.c.id == ingested_id)
    location_query = select(storage_locations).where(
        storage_locations.c.artifact_file_id == ingested_id
    )
    async with engine.connect() as conn:
        # ``one()`` on each result enforces exactly one matching row.
        file_row = (await conn.execute(file_query)).one()
        location_row = (await conn.execute(location_query)).one()
    assert file_row.size_bytes == len(content)
    assert file_row.relative_path == "x/y.txt"
    assert location_row.backend_id == "local"
    assert location_row.content_address.startswith("blake3:")
async def test_ingest_file_rejects_duplicate_relative_path(registry: Registry) -> None:
    """A second ingest at an already-used relative path raises ``DuplicateRelativePathError``."""
    package_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )
    shared_path = "a.txt"
    # First ingest at the path is expected to succeed.
    await registry.ingest_file(
        package_id,
        relative_path=shared_path,
        media_type="text/plain",
        stream=_stream(b"one"),
        actor="ops",
    )
    # Same package, same path, different bytes: must be refused.
    with pytest.raises(DuplicateRelativePathError):
        await registry.ingest_file(
            package_id,
            relative_path=shared_path,
            media_type="text/plain",
            stream=_stream(b"two"),
            actor="ops",
        )
async def test_ingest_into_unknown_package_raises(registry: Registry) -> None:
    """Ingesting into a package id that was never created raises ``PackageNotFoundError``."""
    # A fresh random UUID; no package is created in this test, so it cannot match one.
    missing_package_id = uuid.uuid4()
    # Only the call under test sits inside the ``raises`` block.
    with pytest.raises(PackageNotFoundError):
        await registry.ingest_file(
            missing_package_id,
            relative_path="a.txt",
            media_type="text/plain",
            stream=_stream(b"x"),
            actor="ops",
        )
async def test_finalize_rejects_non_created_package(registry: Registry) -> None:
    """Finalizing a package a second time raises ``IllegalPackageStateError``."""
    package_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )
    # The first finalize is expected to succeed, even with no files ingested.
    await registry.finalize_package(package_id, actor="ops")
    # The second finalize targets the same package and must be refused.
    with pytest.raises(IllegalPackageStateError):
        await registry.finalize_package(package_id, actor="ops")
async def test_end_to_end_ingest_finalize_replay(
    registry: Registry,
    engine: AsyncEngine,
    view_writer: RegistryViewWriter,
) -> None:
    """Drive a three-file package through its whole lifecycle, then replay it.

    Steps: create the package, ingest three files, finalize, read the manifest
    as CBOR and as JSON, download every file, tail the five emitted events,
    and finally check that replaying the events with ``reset=True`` leaves the
    queried tables exactly as they were before the replay.
    """
    package_id = await registry.create_package(
        name="e2e",
        producer="test",
        subject="kontextual-engine",
        retention_class="raw-evidence",
        actor="ops",
        metadata={"run_id": "r-1"},
    )

    # Ingest three distinct payloads under reports/0.txt .. reports/2.txt.
    payloads = [f"file content {i}".encode() * 50 for i in range(3)]
    file_ids = []
    for index, body in enumerate(payloads):
        ingested_id = await registry.ingest_file(
            package_id,
            relative_path=f"reports/{index}.txt",
            media_type="text/plain",
            stream=_stream(body),
            actor="ops",
        )
        file_ids.append(ingested_id)

    manifest_addr = await registry.finalize_package(package_id, actor="ops")
    assert str(manifest_addr).startswith("blake3:")

    # CBOR form of the manifest: decode it and inspect the structure.
    cbor_bytes = await registry.get_manifest_bytes(package_id, format="cbor")
    manifest = manifest_decode(cbor_bytes)
    assert manifest.manifest_version == 1
    assert manifest.package.id == str(package_id)
    assert manifest.package.status == "finalized"
    assert len(manifest.files) == 3
    manifest_paths = [entry.relative_path for entry in manifest.files]
    assert manifest_paths == [
        "reports/0.txt",
        "reports/1.txt",
        "reports/2.txt",
    ]
    assert len(manifest.storage_receipts) == 3

    # JSON form of the manifest: only spot-checked at the byte level.
    json_bytes = await registry.get_manifest_bytes(package_id, format="json")
    assert json_bytes.startswith(b"{")
    assert b'"manifest_version":1' in json_bytes

    # Every file must come back with the bytes that were ingested.
    for ingested_id, expected in zip(file_ids, payloads, strict=True):
        download = await registry.get_file(ingested_id)
        assert await _consume(download) == expected

    # Tail events: 1 created + 3 ingested + 1 finalized = 5.
    expected_event_types = [
        "v1.package.created",
        "v1.file.ingested",
        "v1.file.ingested",
        "v1.file.ingested",
        "v1.package.finalized",
    ]
    seen_events = []

    async def _drain_tail() -> None:
        tail = registry.tail_events(since_sequence=0, poll_interval_seconds=0.01)
        async for event in tail:
            seen_events.append(event)
            # Stop once all expected events are in; otherwise the loop keeps running.
            if len(seen_events) >= 5:
                break

    # The timeout keeps a missing event from hanging the test run.
    await asyncio.wait_for(_drain_tail(), timeout=5.0)
    assert [event.event_type for event in seen_events] == expected_event_types

    # Snapshot materialised state.
    Row = tuple[object, ...]  # noqa: N806 — local type alias, PascalCase by convention

    async def _snapshot() -> tuple[Row, list[Row], list[Row], list[Row]]:
        # Files and locations are ordered so both snapshots compare in a fixed order.
        packages_query = select(artifact_packages)
        files_query = select(artifact_files).order_by(artifact_files.c.relative_path)
        locations_query = select(storage_locations).order_by(
            storage_locations.c.artifact_file_id
        )
        retention_query = select(retention_state)
        async with engine.connect() as conn:
            package_row = (await conn.execute(packages_query)).one()
            file_rows = (await conn.execute(files_query)).all()
            location_rows = (await conn.execute(locations_query)).all()
            retention_rows = (await conn.execute(retention_query)).all()
        # Plain tuples make the before/after comparison a simple value equality.
        return (
            tuple(package_row),
            [tuple(row) for row in file_rows],
            [tuple(row) for row in location_rows],
            [tuple(row) for row in retention_rows],
        )

    before_replay = await _snapshot()
    await replay(engine, view_writer, reset=True)
    after_replay = await _snapshot()
    assert before_replay == after_replay, (
        "replay must reproduce byte-identical materialised state"
    )