Files
artifact-store/tests/integration/test_registry.py

293 lines
8.9 KiB
Python

"""End-to-end Registry orchestrator tests (ARTIFACT-STORE-WP-0001-T013)."""
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from contextlib import aclosing
from pathlib import Path

import pytest
import pytest_asyncio
from sqlalchemy import insert, select
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from artifactstore.dataplane import InProcessDataPlane
from artifactstore.db.schema import (
    artifact_files,
    artifact_packages,
    metadata,
    retention_classes,
    retention_state,
    storage_locations,
)
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
from artifactstore.events import RegistryViewWriter, replay
from artifactstore.manifest import decode as manifest_decode
from artifactstore.registry import (
    DuplicateRelativePathError,
    IllegalPackageStateError,
    PackageNotFoundError,
    Registry,
)
from artifactstore.storage import LocalBackend
@pytest_asyncio.fixture
async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
    """Yield an aiosqlite engine on a fresh file DB with schema and retention seeds.

    The database file lives under the test's ``tmp_path``. Every table in
    ``metadata`` is created and every entry of ``RETENTION_CLASS_SEEDS`` is
    inserted into ``retention_classes`` in a single transaction before the
    engine is handed to the test.

    The engine is disposed on teardown, and also when schema creation or
    seeding fails, so a broken setup does not leak its connection pool.
    """
    db_path = tmp_path / "registry.db"
    eng = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
    try:
        async with eng.begin() as conn:
            await conn.run_sync(metadata.create_all)
            for seed in RETENTION_CLASS_SEEDS:
                await conn.execute(insert(retention_classes).values(**seed))
        yield eng
    finally:
        await eng.dispose()
@pytest.fixture
def backend(tmp_path: Path) -> LocalBackend:
    """Local filesystem storage backend rooted at ``<tmp_path>/store``.

    Registered under the id ``"local"``, which tests assert on when checking
    storage locations.
    """
    store_root = tmp_path / "store"
    return LocalBackend(store_root, backend_id="local")
@pytest.fixture
def view_writer() -> RegistryViewWriter:
    """A fresh view writer.

    The same instance is injected into the ``registry`` fixture and can be
    passed to ``replay()``, so both paths write the views through it.
    """
    writer = RegistryViewWriter()
    return writer
@pytest_asyncio.fixture
async def registry(
    engine: AsyncEngine,
    backend: LocalBackend,
    view_writer: RegistryViewWriter,
) -> Registry:
    """Registry wired to the test engine, an in-process data plane over the
    local backend, and the shared view writer."""
    return Registry(engine, InProcessDataPlane(backend), view_writer)
async def _stream(data: bytes) -> AsyncIterator[bytes]:
yield data
async def _consume(it: AsyncIterator[bytes]) -> bytes:
out = bytearray()
async for chunk in it:
out.extend(chunk)
return bytes(out)
async def test_create_package_rejects_unknown_retention_class(registry: Registry) -> None:
    """create_package refuses a retention class that is not among the seeded ones."""
    bogus_class = "does-not-exist"
    with pytest.raises(ValueError, match="unknown retention class"):
        await registry.create_package(
            name="x",
            producer="p",
            subject="s",
            retention_class=bogus_class,
            actor="ops",
        )
async def test_create_package_writes_event_and_view(
    registry: Registry, engine: AsyncEngine
) -> None:
    """create_package must both append a ``v1.package.created`` event and
    materialise an ``artifact_packages`` row in ``created`` status."""
    pkg_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )

    # View side: the materialised package row.
    async with engine.connect() as conn:
        row = (
            await conn.execute(select(artifact_packages).where(artifact_packages.c.id == pkg_id))
        ).one()
    assert row.name == "p"
    assert row.retention_class == "raw-evidence"
    assert row.status == "created"

    # Event side: the test name promises an event, so read the log as well.
    # A missing event fails through wait_for's timeout, or through the length
    # check if the tail ends on its own.
    seen = []

    async def _await_created_event() -> None:
        # aclosing() closes the tail generator as soon as we stop reading,
        # instead of leaving it suspended until the event loop shuts down.
        async with aclosing(
            registry.tail_events(since_sequence=0, poll_interval_seconds=0.01)
        ) as tail:
            async for evt in tail:
                if evt.event_type == "v1.package.created":
                    seen.append(evt)
                    break

    await asyncio.wait_for(_await_created_event(), timeout=5.0)
    assert len(seen) == 1
async def test_ingest_file_writes_event_and_view(registry: Registry, engine: AsyncEngine) -> None:
    """ingest_file must append a ``v1.file.ingested`` event and materialise
    both the ``artifact_files`` row and its ``storage_locations`` row."""
    pkg_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )
    data = b"hello world" * 100
    file_id = await registry.ingest_file(
        pkg_id,
        relative_path="x/y.txt",
        media_type="text/plain",
        stream=_stream(data),
        actor="ops",
    )

    # View side: file row plus exactly one storage location for it.
    async with engine.connect() as conn:
        file_row = (
            await conn.execute(select(artifact_files).where(artifact_files.c.id == file_id))
        ).one()
        loc_row = (
            await conn.execute(
                select(storage_locations).where(storage_locations.c.artifact_file_id == file_id)
            )
        ).one()
    assert file_row.size_bytes == len(data)
    assert file_row.relative_path == "x/y.txt"
    assert loc_row.backend_id == "local"
    assert loc_row.content_address.startswith("blake3:")

    # Event side: the test name promises an event, so read the log as well.
    # A missing event fails through wait_for's timeout, or through the length
    # check if the tail ends on its own.
    seen = []

    async def _await_ingested_event() -> None:
        # aclosing() closes the tail generator as soon as we stop reading,
        # instead of leaving it suspended until the event loop shuts down.
        async with aclosing(
            registry.tail_events(since_sequence=0, poll_interval_seconds=0.01)
        ) as tail:
            async for evt in tail:
                if evt.event_type == "v1.file.ingested":
                    seen.append(evt)
                    break

    await asyncio.wait_for(_await_ingested_event(), timeout=5.0)
    assert len(seen) == 1
async def test_ingest_file_rejects_duplicate_relative_path(registry: Registry) -> None:
    """A second ingest under a relative path already used in the package is rejected."""
    pkg_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )

    async def _ingest_at_same_path(payload: bytes) -> object:
        # Only the payload differs between the two calls; the path is identical.
        return await registry.ingest_file(
            pkg_id,
            relative_path="a.txt",
            media_type="text/plain",
            stream=_stream(payload),
            actor="ops",
        )

    await _ingest_at_same_path(b"one")
    with pytest.raises(DuplicateRelativePathError):
        await _ingest_at_same_path(b"two")
async def test_ingest_into_unknown_package_raises(registry: Registry) -> None:
    """Ingesting into a random, never-created package id raises PackageNotFoundError."""
    import uuid

    missing_pkg_id = uuid.uuid4()
    with pytest.raises(PackageNotFoundError):
        await registry.ingest_file(
            missing_pkg_id,
            relative_path="a.txt",
            media_type="text/plain",
            stream=_stream(b"x"),
            actor="ops",
        )
async def test_finalize_rejects_non_created_package(registry: Registry) -> None:
    """Finalizing a package twice fails: after the first call it is no longer ``created``."""
    pkg_id = await registry.create_package(
        name="p",
        producer="prod",
        subject="sub",
        retention_class="raw-evidence",
        actor="ops",
    )
    await registry.finalize_package(pkg_id, actor="ops")  # first finalize succeeds

    # The second attempt targets a package that has left the "created" state.
    with pytest.raises(IllegalPackageStateError):
        await registry.finalize_package(pkg_id, actor="ops")
async def test_end_to_end_ingest_finalize_replay(
    registry: Registry,
    engine: AsyncEngine,
    view_writer: RegistryViewWriter,
) -> None:
    """Three-file package: create, ingest, finalize, retrieve manifest, get
    every file back, tail events, and verify that replaying the events from
    scratch reproduces the materialised view state byte-for-byte."""
    pkg_id = await registry.create_package(
        name="e2e",
        producer="test",
        subject="kontextual-engine",
        retention_class="raw-evidence",
        actor="ops",
        metadata={"run_id": "r-1"},
    )
    payloads = [f"file content {i}".encode() * 50 for i in range(3)]
    file_ids = []
    for i, data in enumerate(payloads):
        fid = await registry.ingest_file(
            pkg_id,
            relative_path=f"reports/{i}.txt",
            media_type="text/plain",
            stream=_stream(data),
            actor="ops",
        )
        file_ids.append(fid)

    manifest_addr = await registry.finalize_package(pkg_id, actor="ops")
    assert str(manifest_addr).startswith("blake3:")

    # CBOR manifest: decodes and describes the finalized package and its files.
    cbor_bytes = await registry.get_manifest_bytes(pkg_id, format="cbor")
    manifest = manifest_decode(cbor_bytes)
    assert manifest.manifest_version == 1
    assert manifest.package.id == str(pkg_id)
    assert manifest.package.status == "finalized"
    assert len(manifest.files) == 3
    assert [f.relative_path for f in manifest.files] == [
        "reports/0.txt",
        "reports/1.txt",
        "reports/2.txt",
    ]
    assert len(manifest.storage_receipts) == 3

    # JSON rendering of the same manifest.
    json_bytes = await registry.get_manifest_bytes(pkg_id, format="json")
    assert json_bytes.startswith(b"{")
    assert b'"manifest_version":1' in json_bytes

    # Download each file and verify byte equality.
    for fid, expected in zip(file_ids, payloads, strict=True):
        stream = await registry.get_file(fid)
        assert await _consume(stream) == expected

    # Tail events: 1 created + 1 default retention + 3 ingested + 1 finalized = 6.
    expected_event_types = [
        "v1.package.created",
        "v1.retention.default_applied",
        "v1.file.ingested",
        "v1.file.ingested",
        "v1.file.ingested",
        "v1.package.finalized",
    ]
    collected = []

    async def _consume_tail() -> None:
        # aclosing() closes the tail generator as soon as we stop reading.
        # Otherwise it stays suspended until the event loop shuts down, and
        # whatever it holds (possibly a DB connection, unconfirmed) would
        # still be open while replay() resets the views below.
        async with aclosing(
            registry.tail_events(since_sequence=0, poll_interval_seconds=0.01)
        ) as tail:
            async for evt in tail:
                collected.append(evt)
                if len(collected) >= len(expected_event_types):
                    break

    await asyncio.wait_for(_consume_tail(), timeout=5.0)
    assert [e.event_type for e in collected] == expected_event_types

    # Snapshot materialised state.
    Row = tuple[object, ...]  # noqa: N806 — local type alias, PascalCase by convention

    async def _snapshot() -> tuple[Row, list[Row], list[Row], list[Row]]:
        # Every multi-row query has an explicit ORDER BY, with the primary key
        # as the fallback or tiebreaker. SQL guarantees no row order otherwise,
        # and the before/after-replay comparison must not depend on physical
        # storage order.
        async with engine.connect() as conn:
            pkg = (await conn.execute(select(artifact_packages))).one()
            files = (
                await conn.execute(select(artifact_files).order_by(artifact_files.c.relative_path))
            ).all()
            locs = (
                await conn.execute(
                    select(storage_locations).order_by(
                        storage_locations.c.artifact_file_id,
                        *storage_locations.primary_key.columns,
                    )
                )
            ).all()
            rets = (
                await conn.execute(
                    select(retention_state).order_by(*retention_state.primary_key.columns)
                )
            ).all()
        return (
            tuple(pkg),
            [tuple(r) for r in files],
            [tuple(r) for r in locs],
            [tuple(r) for r in rets],
        )

    pre = await _snapshot()
    await replay(engine, view_writer, reset=True)
    post = await _snapshot()
    assert pre == post, "replay must reproduce byte-identical materialised state"