generated from coulomb/repo-seed
src/artifactstore/registry/__init__.py implements the Registry class with six operations that the HTTP API and CLI both consume: * create_package(name, producer, subject, retention_class, actor, metadata?) -> UUID. Validates retention_class against the seed table; emits v1.package.created with a CBOR payload; applies the view in the same transaction. * ingest_file(package_id, relative_path, media_type, stream, actor) -> UUID. Validates that the package is in 'created' status and rejects a duplicate relative_path. Calls dataplane.ingest_stream (which dual-hashes and writes to the backend). Emits v1.file.ingested, whose payload carries the file metadata + storage receipt + a deterministic storage_location_id so that replay reproduces UUIDs. The view handler in events/views.py inserts artifact_files + storage_locations and bumps last_event_sequence on the package. * finalize_package(package_id, actor) -> ContentAddress. Queries the views to build a Manifest dataclass, encodes it as canonical CBOR, computes the BLAKE3 content address, and writes v1.package.finalized, whose payload IS the canonical CBOR manifest. The view handler now records manifest_digest = event.payload_digest (BLAKE3 of the manifest), not a separate field parsed from the payload. * get_manifest_bytes(package_id, format='cbor'|'json') -> bytes. Reads the finalize event payload (CBOR) and optionally projects it to JSON (JCS). * get_file(file_id) -> AsyncIterator[bytes]. Looks up the storage location and serves the bytes via the data plane. * tail_events(since_sequence, poll_interval_seconds) -> AsyncIterator[Event]. Pass-through to events.tail. src/artifactstore/events/views.py: - New v1.file.ingested handler. - v1.package.finalized handler updated: manifest_digest is now derived from event.payload_digest (= BLAKE3 of the canonical CBOR manifest payload). - All inserts now pass created_at=event.created_at explicitly so that replay produces byte-identical materialised state (server_default=now() was firing afresh on each replay insert).
tests/integration/test_registry.py (7 cases): - Rejects unknown retention class. - create_package writes the event and the package row. - ingest_file writes file + storage_location, populates content_address with blake3 prefix. - Duplicate relative_path raises DuplicateRelativePathError. - ingest into unknown package raises PackageNotFoundError. - Finalising twice raises IllegalPackageStateError. - End-to-end: create + ingest 3 files + finalize + read manifest in CBOR and JSON + download each file with byte equality + tail 5 events + replay + assert byte-identical materialised state across pre and post snapshots. tests/integration/test_event_log.py updated: the v1.package.finalized replay test now uses the new payload semantics (payload is the canonical CBOR manifest; manifest_digest = BLAKE3 of payload). Gates: ruff clean, mypy --strict clean on 45 files, 77 tests pass. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
241 lines
7.3 KiB
Python
241 lines
7.3 KiB
Python
"""Event log integration tests (ARTIFACT-STORE-WP-0001-T011)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import uuid
|
|
from collections.abc import AsyncIterator
|
|
from pathlib import Path
|
|
|
|
import cbor2
|
|
import pytest
|
|
import pytest_asyncio
|
|
from sqlalchemy import insert, select
|
|
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
|
|
|
|
from artifactstore.db.schema import (
|
|
artifact_packages,
|
|
metadata,
|
|
retention_classes,
|
|
retention_state,
|
|
)
|
|
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
|
|
from artifactstore.events import (
|
|
RegistryViewWriter,
|
|
fetch_since,
|
|
make_event,
|
|
replay,
|
|
tail,
|
|
write,
|
|
)
|
|
|
|
|
|
@pytest_asyncio.fixture
async def fresh_engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
    """Yield an async SQLAlchemy engine backed by a throwaway SQLite file.

    Before the engine is handed to the test, the full schema is created from
    ``metadata`` and every entry of ``RETENTION_CLASS_SEEDS`` is inserted into
    ``retention_classes``. The engine is disposed once the test finishes.
    """
    url = "sqlite+aiosqlite:///" + str(tmp_path / "events.db")
    engine = create_async_engine(url)

    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)
        for row in RETENTION_CLASS_SEEDS:
            stmt = insert(retention_classes).values(**row)
            await conn.execute(stmt)

    yield engine
    await engine.dispose()
|
|
|
|
|
|
def _package_created_payload(*, name: str = "p", retention_class: str = "raw-evidence") -> bytes:
    """Build the canonical-CBOR payload used for ``v1.package.created`` test events.

    Only ``name`` and ``retention_class`` are configurable; producer, subject
    and metadata are fixed test values.
    """
    body = {
        "name": name,
        "producer": "test-producer",
        "subject": "test-subject",
        "retention_class": retention_class,
        "metadata": {"k": "v"},
    }
    return cbor2.dumps(body, canonical=True)
|
|
|
|
|
|
async def test_write_assigns_monotonic_sequence(fresh_engine: AsyncEngine) -> None:
    """Two consecutive writes receive sequences 1 and 2 and a populated created_at."""
    notes = [
        make_event(
            event_type="v1.system.note",
            subject_kind="system",
            actor="t",
            payload=body,
        )
        for body in (b"hi", b"hello")
    ]

    async with fresh_engine.begin() as conn:
        first = await write(conn, notes[0])
        second = await write(conn, notes[1])

    assert (first.sequence, second.sequence) == (1, 2)
    assert first.created_at is not None
    assert second.created_at is not None
|
|
|
|
|
|
async def test_fetch_since_returns_ordered_subset(fresh_engine: AsyncEngine) -> None:
    """fetch_since returns only the events after the given sequence, in order."""
    async with fresh_engine.begin() as conn:
        for idx in range(5):
            note = make_event(
                event_type="v1.system.note",
                subject_kind="system",
                actor="t",
                payload=f"n{idx}".encode(),
            )
            await write(conn, note)

    async with fresh_engine.connect() as conn:
        batch = await fetch_since(conn, since_sequence=2)
        sequences = [evt.sequence for evt in batch]

    # The cursor is exclusive: sequence 2 itself is not returned.
    assert sequences == [3, 4, 5]
|
|
|
|
|
|
async def test_tail_yields_events_then_breaks(fresh_engine: AsyncEngine) -> None:
    """tail() surfaces an already-written event and the consumer can stop early."""
    pkg_id = uuid.uuid4()
    created = make_event(
        event_type="v1.package.created",
        subject_kind="package",
        subject_id=pkg_id,
        actor="ops",
        payload=_package_created_payload(),
    )
    async with fresh_engine.begin() as conn:
        await write(conn, created)

    collected = []

    async def consume() -> None:
        # Stop as soon as the first event arrives rather than waiting for more.
        async for evt in tail(fresh_engine, since_sequence=0, poll_interval_seconds=0.01):
            collected.append(evt)
            if collected:
                break

    # Fail after 3 seconds instead of hanging if tail() never yields.
    await asyncio.wait_for(consume(), timeout=3.0)

    assert len(collected) == 1
    (only,) = collected
    assert only.event_type == "v1.package.created"
    assert only.subject_id == pkg_id
|
|
|
|
|
|
async def test_replay_reproduces_direct_write_state(fresh_engine: AsyncEngine) -> None:
    """Direct path (write + apply) and replay path produce identical state."""

    async def snapshot() -> tuple[tuple[object, ...], tuple[object, ...]]:
        # Capture the single package row and the single retention row as plain tuples.
        async with fresh_engine.connect() as conn:
            pkg_row = (await conn.execute(select(artifact_packages))).one()
            ret_row = (await conn.execute(select(retention_state))).one()
        return tuple(pkg_row), tuple(ret_row)

    writer = RegistryViewWriter()
    created = make_event(
        event_type="v1.package.created",
        subject_kind="package",
        subject_id=uuid.uuid4(),
        actor="ops",
        payload=_package_created_payload(),
    )

    # Direct path: append the event and project it within one transaction.
    async with fresh_engine.begin() as conn:
        evt = await write(conn, created)
        await writer.apply(conn, evt)
    direct_state = await snapshot()

    # Replay path: reset the views and rebuild them from the event log.
    applied_seq = await replay(fresh_engine, writer, reset=True)
    assert applied_seq == evt.sequence
    replayed_state = await snapshot()

    # Materialised state must match across direct vs replay paths.
    assert direct_state == replayed_state
|
|
|
|
|
|
async def test_replay_handles_multiple_event_types(fresh_engine: AsyncEngine) -> None:
    """Replaying created + finalized events reproduces the directly-applied views.

    Also pins the finalize semantics: the event payload is the canonical CBOR
    manifest, and the package's ``manifest_digest`` equals BLAKE3(payload),
    which is exactly the event's own ``payload_digest``.
    """
    import blake3 as _blake3

    writer = RegistryViewWriter()
    pkg_id = uuid.uuid4()

    # The finalize payload is the canonical CBOR manifest.
    manifest_cbor = cbor2.dumps({"manifest_version": 1, "fake": "manifest"}, canonical=True)

    async with fresh_engine.begin() as conn:
        e_created = await write(
            conn,
            make_event(
                event_type="v1.package.created",
                subject_kind="package",
                subject_id=pkg_id,
                actor="ops",
                payload=_package_created_payload(),
            ),
        )
        await writer.apply(conn, e_created)

        e_finalized = await write(
            conn,
            make_event(
                event_type="v1.package.finalized",
                subject_kind="package",
                subject_id=pkg_id,
                actor="ops",
                payload=manifest_cbor,
            ),
        )
        await writer.apply(conn, e_finalized)

    expected_digest = _blake3.blake3(manifest_cbor).digest()
    assert e_finalized.payload_digest == expected_digest

    async with fresh_engine.connect() as conn:
        pkg_a = (await conn.execute(select(artifact_packages))).one()
        ret_a = [tuple(r) for r in (await conn.execute(select(retention_state))).all()]
    assert pkg_a.status == "finalized"
    assert pkg_a.manifest_digest == expected_digest
    # The view must record the event's payload_digest, not a separately parsed field.
    assert pkg_a.manifest_digest == e_finalized.payload_digest

    last_seq = await replay(fresh_engine, writer, reset=True)
    assert last_seq == e_finalized.sequence

    async with fresh_engine.connect() as conn:
        pkg_b = (await conn.execute(select(artifact_packages))).one()
        ret_b = [tuple(r) for r in (await conn.execute(select(retention_state))).all()]

    # Every replay-derived view must match: packages and retention state.
    assert tuple(pkg_a) == tuple(pkg_b)
    assert ret_a == ret_b
|
|
|
|
|
|
async def test_unknown_event_type_is_tolerated(fresh_engine: AsyncEngine) -> None:
    """Applying an event whose type has no known handler must not raise."""
    writer = RegistryViewWriter()
    frob = make_event(
        event_type="v1.unknown.frob",
        subject_kind="system",
        actor="ops",
        payload=cbor2.dumps({}, canonical=True),
    )

    async with fresh_engine.begin() as conn:
        stored = await write(conn, frob)
        # The test passes as long as this call completes without an exception.
        await writer.apply(conn, stored)
|
|
|
|
|
|
def test_event_payload_digest_is_blake3_of_payload() -> None:
    """make_event sets payload_digest to the BLAKE3 digest (raw bytes) of the payload.

    make_event is synchronous and no database is touched. This is therefore a
    plain test rather than an ``async def``; a coroutine with nothing to await
    adds no value and depends on the pytest-asyncio mode to be collected.
    """
    import blake3 as _blake3

    payload = cbor2.dumps({"hello": "world"}, canonical=True)
    evt = make_event(
        event_type="v1.system.note",
        subject_kind="system",
        actor="t",
        payload=payload,
    )
    assert evt.payload_digest == _blake3.blake3(payload).digest()
|
|
|
|
|
|
# Reference the ``pytest`` module import so linters do not report it as unused;
# no other code in this module uses the ``pytest`` name directly.
_ = pytest
|