generated from coulomb/repo-seed
src/artifactstore/events/:
- model.py: Event frozen dataclass (event_type, subject_kind, subject_id,
actor, payload, payload_digest; sequence + created_at populated by the
DB on write). make_event() helper computes payload_digest as raw BLAKE3
(32 bytes) of payload. ViewWriter Protocol with reset() + apply().
- log.py:
* write(connection, event) — inserts one row in the caller's transaction
and returns Event with sequence + created_at populated via RETURNING.
* fetch_since(connection, since_sequence, limit) — read events after a
cursor in order.
* tail(engine, since_sequence) — async-iterator long-poll over the log;
SQLite uses interval polling; PostgreSQL LISTEN/NOTIFY is left to a future workplan.
* replay(engine, view_writer, reset=True) — drains the event log through
a ViewWriter inside one transaction; returns the highest sequence
applied.
- views.py: RegistryViewWriter — canonical event handlers shared by direct
write and replay paths. Ships handlers for v1.package.created (inserts
artifact_packages + retention_state) and v1.package.finalized (updates
status, finalized_at, manifest_digest). Unknown event types tolerated;
additional handlers register here as later tasks land.
src/artifactstore/db/schema.py: events.sequence type is now
BigInteger().with_variant(Integer(), 'sqlite') so SQLite's autoincrement
(INTEGER PRIMARY KEY rowid alias) works while PostgreSQL keeps BIGSERIAL.
tests/integration/test_event_log.py (7 cases):
- write() assigns monotonic sequence numbers (1, 2, ...) and a created_at.
- fetch_since(since_sequence=2) returns the ordered tail.
- tail() yields events and exits cleanly on consumer break.
- Direct write path (write + apply) and replay path produce byte-identical
materialised state — the key ADR-0002 invariant.
- Replay handles multiple event types (package.created -> finalized).
- Unknown event types are tolerated (no-op apply).
- payload_digest equals BLAKE3 of payload.
Gates: ruff clean, mypy --strict clean on 36 files, 45 tests pass.
make migrate-fresh end-to-end ok.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
237 lines
7.1 KiB
Python
237 lines
7.1 KiB
Python
"""Event log integration tests (ARTIFACT-STORE-WP-0001-T011)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import uuid
|
|
from collections.abc import AsyncIterator
|
|
from pathlib import Path
|
|
|
|
import cbor2
|
|
import pytest
|
|
import pytest_asyncio
|
|
from sqlalchemy import insert, select
|
|
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
|
|
|
|
from artifactstore.db.schema import (
|
|
artifact_packages,
|
|
metadata,
|
|
retention_classes,
|
|
retention_state,
|
|
)
|
|
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
|
|
from artifactstore.events import (
|
|
RegistryViewWriter,
|
|
fetch_since,
|
|
make_event,
|
|
replay,
|
|
tail,
|
|
write,
|
|
)
|
|
|
|
|
|
@pytest_asyncio.fixture
async def fresh_engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
    """Yield an async SQLite engine backed by a fresh, seeded database file.

    The database lives at ``tmp_path / "events.db"``. Before the engine is
    handed to the test, every table in ``metadata`` is created and one row per
    entry of ``RETENTION_CLASS_SEEDS`` is inserted into ``retention_classes``,
    all inside a single transaction.

    The engine is disposed when the fixture is torn down, and also when
    schema creation or seeding raises before the test ever runs.
    """
    db_path = tmp_path / "events.db"
    engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
    try:
        async with engine.begin() as conn:
            await conn.run_sync(metadata.create_all)
            for seed in RETENTION_CLASS_SEEDS:
                await conn.execute(insert(retention_classes).values(**seed))
        yield engine
    finally:
        # Runs on normal teardown and on setup failure alike, so the engine's
        # connections are never left open.
        await engine.dispose()
|
|
|
|
|
|
def _package_created_payload(*, name: str = "p", retention_class: str = "raw-evidence") -> bytes:
    """Return a canonical CBOR encoding of a sample package-created payload.

    Only ``name`` and ``retention_class`` vary; producer, subject and metadata
    are fixed test values.
    """
    body = {
        "name": name,
        "producer": "test-producer",
        "subject": "test-subject",
        "retention_class": retention_class,
        "metadata": {"k": "v"},
    }
    return cbor2.dumps(body, canonical=True)
|
|
|
|
|
|
async def test_write_assigns_monotonic_sequence(fresh_engine: AsyncEngine) -> None:
    """Two events written in one transaction get sequences 1 and 2 and a created_at."""
    pending = [
        make_event(
            event_type="v1.system.note",
            subject_kind="system",
            actor="t",
            payload=body,
        )
        for body in (b"hi", b"hello")
    ]

    # Write in order on a single connection so the sequence values are comparable.
    async with fresh_engine.begin() as conn:
        stored = [await write(conn, evt) for evt in pending]

    assert [evt.sequence for evt in stored] == [1, 2]
    assert all(evt.created_at is not None for evt in stored)
|
|
|
|
|
|
async def test_fetch_since_returns_ordered_subset(fresh_engine: AsyncEngine) -> None:
    """After writing five events, fetch_since(since_sequence=2) yields sequences 3, 4, 5."""
    notes = [
        make_event(
            event_type="v1.system.note",
            subject_kind="system",
            actor="t",
            payload=f"n{index}".encode(),
        )
        for index in range(5)
    ]

    async with fresh_engine.begin() as conn:
        for note in notes:
            await write(conn, note)

    # Read back on a separate connection, after the writing transaction has ended.
    async with fresh_engine.connect() as conn:
        fetched = await fetch_since(conn, since_sequence=2)
        observed = [evt.sequence for evt in fetched]

    assert observed == [3, 4, 5]
|
|
|
|
|
|
async def test_tail_yields_events_then_breaks(fresh_engine: AsyncEngine) -> None:
    """tail() delivers an already-written event and the consumer can break out of it."""
    pkg_id = uuid.uuid4()
    created = make_event(
        event_type="v1.package.created",
        subject_kind="package",
        subject_id=pkg_id,
        actor="ops",
        payload=_package_created_payload(),
    )
    async with fresh_engine.begin() as conn:
        await write(conn, created)

    received = []

    async def take_first() -> None:
        # Stop iterating as soon as anything has been received.
        async for item in tail(fresh_engine, since_sequence=0, poll_interval_seconds=0.01):
            received.append(item)
            if received:
                break

    # The timeout turns a tail() that never yields into a test failure, not a hang.
    await asyncio.wait_for(take_first(), timeout=3.0)

    assert len(received) == 1
    first = received[0]
    assert first.event_type == "v1.package.created"
    assert first.subject_id == pkg_id
|
|
|
|
|
|
async def test_replay_reproduces_direct_write_state(fresh_engine: AsyncEngine) -> None:
    """The direct path (write + apply) and the replay path leave identical view rows."""
    writer = RegistryViewWriter()
    pkg_id = uuid.uuid4()

    async def snapshot() -> tuple[tuple[object, ...], tuple[object, ...]]:
        # Read the single expected row from each view table as a plain tuple.
        async with fresh_engine.connect() as conn:
            pkg_row = (await conn.execute(select(artifact_packages))).one()
            ret_row = (await conn.execute(select(retention_state))).one()
        return tuple(pkg_row), tuple(ret_row)

    # Direct path: the event is written and applied in the same transaction.
    pending = make_event(
        event_type="v1.package.created",
        subject_kind="package",
        subject_id=pkg_id,
        actor="ops",
        payload=_package_created_payload(),
    )
    async with fresh_engine.begin() as conn:
        evt = await write(conn, pending)
        await writer.apply(conn, evt)

    direct_pkg, direct_ret = await snapshot()

    # Replay path: reset the views, then feed every logged event through the writer.
    applied_seq = await replay(fresh_engine, writer, reset=True)
    assert applied_seq == evt.sequence

    replayed_pkg, replayed_ret = await snapshot()

    # Both paths must produce the same materialised rows.
    assert direct_pkg == replayed_pkg
    assert direct_ret == replayed_ret
|
|
|
|
|
|
async def test_replay_handles_multiple_event_types(fresh_engine: AsyncEngine) -> None:
    """Replaying a created -> finalized sequence rebuilds the same package row."""
    writer = RegistryViewWriter()
    pkg_id = uuid.uuid4()
    digest_hex = "ab" * 32

    created = make_event(
        event_type="v1.package.created",
        subject_kind="package",
        subject_id=pkg_id,
        actor="ops",
        payload=_package_created_payload(),
    )
    finalized = make_event(
        event_type="v1.package.finalized",
        subject_kind="package",
        subject_id=pkg_id,
        actor="ops",
        payload=cbor2.dumps({"manifest_digest_hex": digest_hex}, canonical=True),
    )

    # Direct path: write and apply each event, in order, in one transaction.
    async with fresh_engine.begin() as conn:
        e_created = await write(conn, created)
        await writer.apply(conn, e_created)
        e_finalized = await write(conn, finalized)
        await writer.apply(conn, e_finalized)

    async with fresh_engine.connect() as conn:
        before = (await conn.execute(select(artifact_packages))).one()
    assert before.status == "finalized"
    assert before.manifest_digest == bytes.fromhex(digest_hex)

    # Replay from scratch; the last applied sequence is the finalized event's.
    last_seq = await replay(fresh_engine, writer, reset=True)
    assert last_seq == e_finalized.sequence

    async with fresh_engine.connect() as conn:
        after = (await conn.execute(select(artifact_packages))).one()
    assert tuple(before) == tuple(after)
|
|
|
|
|
|
async def test_unknown_event_type_is_tolerated(fresh_engine: AsyncEngine) -> None:
    """Applying an event of an unregistered type neither raises nor writes view rows.

    The event is written to the log and passed to ``RegistryViewWriter.apply``
    in the same transaction. Besides not raising, the apply must leave
    ``artifact_packages`` and ``retention_state`` empty, which the fixture
    guarantees as the starting state because it seeds only
    ``retention_classes``.
    """
    writer = RegistryViewWriter()
    async with fresh_engine.begin() as conn:
        evt = await write(
            conn,
            make_event(
                event_type="v1.unknown.frob",
                subject_kind="system",
                actor="ops",
                payload=cbor2.dumps({}, canonical=True),
            ),
        )
        await writer.apply(conn, evt)  # must not raise

        # Check on the same connection so uncommitted writes from apply() are
        # visible; "tolerated" has to mean "ignored", not "partially handled".
        assert (await conn.execute(select(artifact_packages))).first() is None
        assert (await conn.execute(select(retention_state))).first() is None
|
|
|
|
|
|
async def test_event_payload_digest_is_blake3_of_payload() -> None:
    """make_event() sets payload_digest to the raw BLAKE3 digest of the payload bytes."""
    import blake3 as _blake3

    body = cbor2.dumps({"hello": "world"}, canonical=True)
    expected_digest = _blake3.blake3(body).digest()

    evt = make_event(
        event_type="v1.system.note",
        subject_kind="system",
        actor="t",
        payload=body,
    )

    assert evt.payload_digest == expected_digest
|
|
|
|
|
|
# Bind the imported `pytest` module to a throwaway name. `pytest` here is the
# module, not a fixture; presumably this keeps the otherwise-unreferenced
# import from being reported as unused — confirm against the lint config.
|
|
_ = pytest
|