"""End-to-end Registry orchestrator tests (ARTIFACT-STORE-WP-0001-T013)."""

from __future__ import annotations

import asyncio
import uuid
from collections.abc import AsyncIterator
from pathlib import Path

import pytest
import pytest_asyncio
from sqlalchemy import insert, select
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from artifactstore.dataplane import InProcessDataPlane
from artifactstore.db.schema import (
    artifact_files,
    artifact_packages,
    metadata,
    retention_classes,
    retention_state,
    storage_locations,
)
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
from artifactstore.events import RegistryViewWriter, replay
from artifactstore.manifest import decode as manifest_decode
from artifactstore.registry import (
    DuplicateRelativePathError,
    IllegalPackageStateError,
    PackageNotFoundError,
    Registry,
)
from artifactstore.storage import LocalBackend


@pytest_asyncio.fixture
async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
    """Yield an engine on a per-test SQLite file with schema and retention seeds loaded.

    The engine is disposed once the requesting test has finished.
    """
    db_path = tmp_path / "registry.db"
    eng = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
    async with eng.begin() as conn:
        await conn.run_sync(metadata.create_all)
        for seed in RETENTION_CLASS_SEEDS:
            await conn.execute(insert(retention_classes).values(**seed))
    yield eng
    await eng.dispose()


@pytest.fixture
def backend(tmp_path: Path) -> LocalBackend:
    """Return a ``LocalBackend`` rooted at ``<tmp_path>/store`` with backend id ``"local"``."""
    return LocalBackend(tmp_path / "store", backend_id="local")


@pytest.fixture
def view_writer() -> RegistryViewWriter:
    """Return a fresh ``RegistryViewWriter``."""
    return RegistryViewWriter()


@pytest_asyncio.fixture
async def registry(
    engine: AsyncEngine,
    backend: LocalBackend,
    view_writer: RegistryViewWriter,
) -> Registry:
    """Return a ``Registry`` wired to the test engine, an in-process data plane and the view writer."""
    dp = InProcessDataPlane(backend)
    return Registry(engine, dp, view_writer)


async def _stream(data: bytes) -> AsyncIterator[bytes]:
    """Yield *data* as a single chunk."""
    yield data


async def _consume(it: AsyncIterator[bytes]) -> bytes:
    """Drain *it* and return the concatenation of all its chunks."""
    out = bytearray()
    async for chunk in it:
        out.extend(chunk)
    return bytes(out)


async def _tail_event_types(registry: Registry, count: int) -> list[str]:
    """Return the ``event_type`` of the first *count* events tailed from sequence 0.

    Raises ``TimeoutError`` if fewer than *count* events arrive within five seconds.
    """
    seen: list[str] = []

    async def _drain() -> None:
        async for evt in registry.tail_events(since_sequence=0, poll_interval_seconds=0.01):
            seen.append(evt.event_type)
            if len(seen) >= count:
                break

    # Bound the tail loop so a missing event fails the test instead of hanging it.
    await asyncio.wait_for(_drain(), timeout=5.0)
    return seen


async def test_create_package_rejects_unknown_retention_class(registry: Registry) -> None:
    """``create_package`` raises ``ValueError`` for a retention class that is not seeded."""
    with pytest.raises(ValueError, match="unknown retention class"):
        await registry.create_package(
            name="x",
            producer="p",
            subject="s",
            retention_class="does-not-exist",
            actor="ops",
        )


async def test_create_package_writes_event_and_view(
    registry: Registry, engine: AsyncEngine
) -> None:
    """``create_package`` materialises the package row and logs ``v1.package.created`` first."""
    pkg_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )
    async with engine.connect() as conn:
        row = (
            await conn.execute(select(artifact_packages).where(artifact_packages.c.id == pkg_id))
        ).one()
    assert row.name == "p"
    assert row.retention_class == "raw-evidence"
    assert row.status == "created"

    # Only the first event is checked here; the full ordering is pinned by the
    # end-to-end test below.
    assert await _tail_event_types(registry, 1) == ["v1.package.created"]


async def test_ingest_file_writes_event_and_view(registry: Registry, engine: AsyncEngine) -> None:
    """``ingest_file`` materialises file and location rows and logs ``v1.file.ingested``."""
    pkg_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )
    data = b"hello world" * 100
    file_id = await registry.ingest_file(
        pkg_id,
        relative_path="x/y.txt",
        media_type="text/plain",
        stream=_stream(data),
        actor="ops",
    )
    async with engine.connect() as conn:
        file_row = (
            await conn.execute(select(artifact_files).where(artifact_files.c.id == file_id))
        ).one()
        loc_row = (
            await conn.execute(
                select(storage_locations).where(storage_locations.c.artifact_file_id == file_id)
            )
        ).one()
    assert file_row.size_bytes == len(data)
    assert file_row.relative_path == "x/y.txt"
    assert loc_row.backend_id == "local"
    assert loc_row.content_address.startswith("blake3:")

    # Same prefix of the event log that the end-to-end test expects for its first file.
    assert await _tail_event_types(registry, 3) == [
        "v1.package.created",
        "v1.retention.default_applied",
        "v1.file.ingested",
    ]


async def test_ingest_file_rejects_duplicate_relative_path(registry: Registry) -> None:
    """A second ingest at the same relative path raises ``DuplicateRelativePathError``."""
    pkg_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )
    await registry.ingest_file(
        pkg_id,
        relative_path="a.txt",
        media_type="text/plain",
        stream=_stream(b"one"),
        actor="ops",
    )
    with pytest.raises(DuplicateRelativePathError):
        await registry.ingest_file(
            pkg_id,
            relative_path="a.txt",
            media_type="text/plain",
            stream=_stream(b"two"),
            actor="ops",
        )


async def test_ingest_into_unknown_package_raises(registry: Registry) -> None:
    """Ingesting into a package id that was never created raises ``PackageNotFoundError``."""
    with pytest.raises(PackageNotFoundError):
        await registry.ingest_file(
            uuid.uuid4(),
            relative_path="a.txt",
            media_type="text/plain",
            stream=_stream(b"x"),
            actor="ops",
        )


async def test_finalize_rejects_non_created_package(registry: Registry) -> None:
    """Finalizing an already finalized package raises ``IllegalPackageStateError``."""
    pkg_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )
    await registry.finalize_package(pkg_id, actor="ops")
    with pytest.raises(IllegalPackageStateError):
        await registry.finalize_package(pkg_id, actor="ops")


async def test_end_to_end_ingest_finalize_replay(
    registry: Registry,
    engine: AsyncEngine,
    view_writer: RegistryViewWriter,
) -> None:
    """Three-file package: create, ingest, finalize, retrieve manifest, get
    every file back, tail events, and verify that replaying the events
    from scratch reproduces the materialised view state byte-for-byte."""
    pkg_id = await registry.create_package(
        name="e2e",
        producer="test",
        subject="kontextual-engine",
        retention_class="raw-evidence",
        actor="ops",
        metadata={"run_id": "r-1"},
    )

    payloads = [f"file content {i}".encode() * 50 for i in range(3)]
    file_ids = []
    for i, data in enumerate(payloads):
        fid = await registry.ingest_file(
            pkg_id,
            relative_path=f"reports/{i}.txt",
            media_type="text/plain",
            stream=_stream(data),
            actor="ops",
        )
        file_ids.append(fid)

    manifest_addr = await registry.finalize_package(pkg_id, actor="ops")
    assert str(manifest_addr).startswith("blake3:")

    cbor_bytes = await registry.get_manifest_bytes(pkg_id, format="cbor")
    manifest = manifest_decode(cbor_bytes)
    assert manifest.manifest_version == 1
    assert manifest.package.id == str(pkg_id)
    assert manifest.package.status == "finalized"
    assert len(manifest.files) == 3
    assert [f.relative_path for f in manifest.files] == [
        "reports/0.txt",
        "reports/1.txt",
        "reports/2.txt",
    ]
    assert len(manifest.storage_receipts) == 3

    json_bytes = await registry.get_manifest_bytes(pkg_id, format="json")
    assert json_bytes.startswith(b"{")
    assert b'"manifest_version":1' in json_bytes

    # Download each file and verify byte equality.
    for fid, expected in zip(file_ids, payloads, strict=True):
        stream = await registry.get_file(fid)
        assert await _consume(stream) == expected

    # Tail events: 1 created + 1 default retention + 3 ingested + 1 finalized = 6.
    assert await _tail_event_types(registry, 6) == [
        "v1.package.created",
        "v1.retention.default_applied",
        "v1.file.ingested",
        "v1.file.ingested",
        "v1.file.ingested",
        "v1.package.finalized",
    ]

    # Snapshot materialised state.
    Row = tuple[object, ...]  # noqa: N806 — local type alias, PascalCase by convention

    async def _snapshot() -> tuple[Row, list[Row], list[Row], list[Row]]:
        """Read the package, file, location and retention rows as plain tuples."""
        async with engine.connect() as conn:
            pkg = (await conn.execute(select(artifact_packages))).one()
            files = (
                await conn.execute(select(artifact_files).order_by(artifact_files.c.relative_path))
            ).all()
            locs = (
                await conn.execute(
                    select(storage_locations).order_by(storage_locations.c.artifact_file_id)
                )
            ).all()
            rets = (await conn.execute(select(retention_state))).all()
        return (
            tuple(pkg),
            [tuple(r) for r in files],
            [tuple(r) for r in locs],
            [tuple(r) for r in rets],
        )

    pre = await _snapshot()
    await replay(engine, view_writer, reset=True)
    post = await _snapshot()
    assert pre == post, "replay must reproduce byte-identical materialised state"