Files
artifact-store/tests/integration/test_event_log.py
tegwick 847b146475 WP-0001-T013: registry orchestrator (library surface)
src/artifactstore/registry/__init__.py implements the Registry class with
six operations the HTTP API and CLI both consume:

* create_package(name, producer, subject, retention_class, actor, metadata?)
  -> UUID. Validates retention_class against the seed table; emits
  v1.package.created with CBOR payload; applies view in same transaction.
* ingest_file(package_id, relative_path, media_type, stream, actor) -> UUID.
  Validates the package is in 'created' status and rejects duplicate
  relative_path. Calls dataplane.ingest_stream (which dual-hashes and writes
  to the backend). Emits v1.file.ingested whose payload carries the file
  metadata + storage receipt + deterministic storage_location_id so replay
  reproduces UUIDs. View handler in events/views.py inserts artifact_files
  + storage_locations and bumps last_event_sequence on the package.
* finalize_package(package_id, actor) -> ContentAddress. Queries the views
  to build a Manifest dataclass, encodes it as canonical CBOR, computes the
  BLAKE3 content address, and writes v1.package.finalized whose payload IS
  the canonical CBOR manifest. The view handler now records
  manifest_digest = event.payload_digest (BLAKE3 of the manifest), not a
  separate field parsed from the payload.
* get_manifest_bytes(package_id, format='cbor'|'json') -> bytes. Reads the
  finalize event payload (CBOR) and optionally projects to JCS.
* get_file(file_id) -> AsyncIterator[bytes]. Looks up the storage location
  and serves bytes via the data plane.
* tail_events(since_sequence, poll_interval_seconds) -> AsyncIterator[Event].
  Pass-through to events.tail.

src/artifactstore/events/views.py:
- New v1.file.ingested handler.
- v1.package.finalized handler updated: manifest_digest now derived from
  event.payload_digest (= BLAKE3 of the canonical CBOR manifest payload).
- All inserts now pass created_at=event.created_at explicitly so replay
  produces byte-identical materialised state (server_default=now() was
  firing fresh on each replay insert).

tests/integration/test_registry.py (7 cases):
- Rejects unknown retention class.
- create_package writes the event and the package row.
- ingest_file writes file + storage_location, populates content_address
  with blake3 prefix.
- Duplicate relative_path raises DuplicateRelativePathError.
- ingest into unknown package raises PackageNotFoundError.
- Finalising twice raises IllegalPackageStateError.
- End-to-end: create + ingest 3 files + finalize + read manifest in CBOR
  and JSON + download each file with byte equality + tail 5 events + replay
  + assert byte-identical materialised state across pre and post snapshots.

tests/integration/test_event_log.py updated: the v1.package.finalized
replay test now uses the new payload semantics (payload is the canonical
CBOR manifest; manifest_digest = BLAKE3 of payload).

Gates: ruff clean, mypy --strict clean on 45 files, 77 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-16 08:17:40 +02:00

241 lines
7.3 KiB
Python

"""Event log integration tests (ARTIFACT-STORE-WP-0001-T011)."""
from __future__ import annotations
import asyncio
import uuid
from collections.abc import AsyncIterator
from pathlib import Path
import cbor2
import pytest
import pytest_asyncio
from sqlalchemy import insert, select
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from artifactstore.db.schema import (
artifact_packages,
metadata,
retention_classes,
retention_state,
)
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
from artifactstore.events import (
RegistryViewWriter,
fetch_since,
make_event,
replay,
tail,
write,
)
@pytest_asyncio.fixture
async def fresh_engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
    """Yield an engine bound to a new SQLite file with schema and seeds applied.

    The database file lives under pytest's ``tmp_path``. Every table registered
    on ``metadata`` is created and each entry of ``RETENTION_CLASS_SEEDS`` is
    inserted into ``retention_classes`` before the engine is handed to the
    test. The engine is disposed after the test has finished with it.
    """
    database_file = tmp_path / "events.db"
    engine = create_async_engine(f"sqlite+aiosqlite:///{database_file}")
    async with engine.begin() as setup_conn:
        await setup_conn.run_sync(metadata.create_all)
        for seed_row in RETENTION_CLASS_SEEDS:
            seed_stmt = insert(retention_classes).values(**seed_row)
            await setup_conn.execute(seed_stmt)
    yield engine
    await engine.dispose()
def _package_created_payload(
    *, name: str = "p", retention_class: str = "raw-evidence"
) -> bytes:
    """Return the canonical-CBOR payload the tests attach to ``v1.package.created``.

    Only ``name`` and ``retention_class`` vary; producer, subject and metadata
    are fixed placeholder values.
    """
    fields = {
        "name": name,
        "producer": "test-producer",
        "subject": "test-subject",
        "retention_class": retention_class,
        "metadata": {"k": "v"},
    }
    return cbor2.dumps(fields, canonical=True)
async def test_write_assigns_monotonic_sequence(fresh_engine: AsyncEngine) -> None:
    """Two writes in one transaction receive sequences 1 and 2 and a created_at."""
    # Build both events up front, in order, before any write happens.
    first_note, second_note = (
        make_event(
            event_type="v1.system.note",
            subject_kind="system",
            actor="t",
            payload=body,
        )
        for body in (b"hi", b"hello")
    )
    async with fresh_engine.begin() as conn:
        stored_first = await write(conn, first_note)
        stored_second = await write(conn, second_note)
    assert (stored_first.sequence, stored_second.sequence) == (1, 2)
    for stored in (stored_first, stored_second):
        assert stored.created_at is not None
async def test_fetch_since_returns_ordered_subset(fresh_engine: AsyncEngine) -> None:
    """After five writes, fetch_since(since_sequence=2) yields sequences 3, 4, 5."""
    async with fresh_engine.begin() as write_conn:
        for index in range(5):
            note = make_event(
                event_type="v1.system.note",
                subject_kind="system",
                actor="t",
                payload=f"n{index}".encode(),
            )
            await write(write_conn, note)
    async with fresh_engine.connect() as read_conn:
        later_events = await fetch_since(read_conn, since_sequence=2)
    assert [event.sequence for event in later_events] == [3, 4, 5]
async def test_tail_yields_events_then_breaks(fresh_engine: AsyncEngine) -> None:
    """tail() started at sequence 0 delivers the event that was already written."""
    pkg_id = uuid.uuid4()
    created = make_event(
        event_type="v1.package.created",
        subject_kind="package",
        subject_id=pkg_id,
        actor="ops",
        payload=_package_created_payload(),
    )
    async with fresh_engine.begin() as conn:
        await write(conn, created)
    seen = []

    async def consume() -> None:
        # Leave the stream as soon as one event has been collected.
        stream = tail(fresh_engine, since_sequence=0, poll_interval_seconds=0.01)
        async for event in stream:
            seen.append(event)
            if seen:
                break

    # wait_for bounds the test at three seconds if no event ever arrives.
    await asyncio.wait_for(consume(), timeout=3.0)
    assert len(seen) == 1
    delivered = seen[0]
    assert delivered.event_type == "v1.package.created"
    assert delivered.subject_id == pkg_id
async def test_replay_reproduces_direct_write_state(fresh_engine: AsyncEngine) -> None:
    """Applying an event directly and rebuilding via replay give identical rows."""
    view_writer = RegistryViewWriter()
    package_id = uuid.uuid4()

    async def snapshot() -> tuple[tuple[object, ...], tuple[object, ...]]:
        # Read the single artifact_packages row and the single retention_state row.
        async with fresh_engine.connect() as read_conn:
            package_row = (await read_conn.execute(select(artifact_packages))).one()
            retention_row = (await read_conn.execute(select(retention_state))).one()
        return tuple(package_row), tuple(retention_row)

    # Direct path: the event is written and applied inside one transaction.
    async with fresh_engine.begin() as conn:
        created = await write(
            conn,
            make_event(
                event_type="v1.package.created",
                subject_kind="package",
                subject_id=package_id,
                actor="ops",
                payload=_package_created_payload(),
            ),
        )
        await view_writer.apply(conn, created)
    direct_package, direct_retention = await snapshot()

    # Replay path: views are reset and rebuilt from the event log.
    replayed_through = await replay(fresh_engine, view_writer, reset=True)
    assert replayed_through == created.sequence
    replayed_package, replayed_retention = await snapshot()

    # Both paths must leave the same materialised state behind.
    assert direct_package == replayed_package
    assert direct_retention == replayed_retention
async def test_replay_handles_multiple_event_types(fresh_engine: AsyncEngine) -> None:
    """Replay rebuilds a created-then-finalized package row identically."""
    import blake3 as _blake3

    view_writer = RegistryViewWriter()
    package_id = uuid.uuid4()
    # The finalize payload is the canonical CBOR manifest; the view writer is
    # expected to record manifest_digest = BLAKE3(payload) (= event.payload_digest).
    manifest_cbor = cbor2.dumps({"manifest_version": 1, "fake": "manifest"}, canonical=True)
    expected_digest = _blake3.blake3(manifest_cbor).digest()
    lifecycle = (
        ("v1.package.created", _package_created_payload()),
        ("v1.package.finalized", manifest_cbor),
    )
    # Write and apply each lifecycle event in order within a single transaction.
    async with fresh_engine.begin() as conn:
        for event_type, payload in lifecycle:
            last_written = await write(
                conn,
                make_event(
                    event_type=event_type,
                    subject_kind="package",
                    subject_id=package_id,
                    actor="ops",
                    payload=payload,
                ),
            )
            await view_writer.apply(conn, last_written)
    async with fresh_engine.connect() as conn:
        before_replay = (await conn.execute(select(artifact_packages))).one()
    assert before_replay.status == "finalized"
    assert before_replay.manifest_digest == expected_digest
    # After the loop, last_written is the finalized event.
    replayed_through = await replay(fresh_engine, view_writer, reset=True)
    assert replayed_through == last_written.sequence
    async with fresh_engine.connect() as conn:
        after_replay = (await conn.execute(select(artifact_packages))).one()
    assert tuple(before_replay) == tuple(after_replay)
async def test_unknown_event_type_is_tolerated(fresh_engine: AsyncEngine) -> None:
    """Applying an event of type ``v1.unknown.frob`` must complete without raising."""
    view_writer = RegistryViewWriter()
    empty_payload = cbor2.dumps({}, canonical=True)
    async with fresh_engine.begin() as conn:
        unknown_event = make_event(
            event_type="v1.unknown.frob",
            subject_kind="system",
            actor="ops",
            payload=empty_payload,
        )
        stored = await write(conn, unknown_event)
        # The test passes as long as this call returns normally.
        await view_writer.apply(conn, stored)
async def test_event_payload_digest_is_blake3_of_payload() -> None:
    """make_event sets payload_digest to the BLAKE3 digest of the payload bytes."""
    import blake3 as _blake3

    payload_bytes = cbor2.dumps({"hello": "world"}, canonical=True)
    expected_digest = _blake3.blake3(payload_bytes).digest()
    note = make_event(
        event_type="v1.system.note",
        subject_kind="system",
        actor="t",
        payload=payload_bytes,
    )
    assert note.payload_digest == expected_digest
# Reference the `pytest` module so its import is not left unused: nothing else
# in the visible part of this file uses the name (the fixture decorator comes
# from pytest_asyncio). Presumably this keeps the linter quiet — confirm before
# removing either this line or the import.
_ = pytest