From 0a4979869918104e2327d931e74f9b9887c2c836 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 16 May 2026 01:56:04 +0200 Subject: [PATCH] =?UTF-8?q?WP-0001-T011:=20append-only=20event=20log=20?= =?UTF-8?q?=E2=80=94=20write,=20fetch=5Fsince,=20tail,=20replay?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit src/artifactstore/events/: - model.py: Event frozen dataclass (event_type, subject_kind, subject_id, actor, payload, payload_digest; sequence + created_at populated by the DB on write). make_event() helper computes payload_digest as raw BLAKE3 (32 bytes) of payload. ViewWriter Protocol with reset() + apply(). - log.py: * write(connection, event) — inserts one row in the caller's transaction and returns Event with sequence + created_at populated via RETURNING. * fetch_since(connection, since_sequence, limit) — read events after a cursor in order. * tail(engine, since_sequence) — async-iterator long-poll over the log; SQLite uses interval polling, PG LISTEN/NOTIFY is a future workplan. * replay(engine, view_writer, reset=True) — drains the event log through a ViewWriter inside one transaction; returns the highest sequence applied. - views.py: RegistryViewWriter — canonical event handlers shared by direct write and replay paths. Ships handlers for v1.package.created (inserts artifact_packages + retention_state) and v1.package.finalized (updates status, finalized_at, manifest_digest). Unknown event types tolerated; additional handlers register here as later tasks land. src/artifactstore/db/schema.py: events.sequence type is now BigInteger().with_variant(Integer(), 'sqlite') so SQLite's autoincrement (INTEGER PRIMARY KEY rowid alias) works while PostgreSQL keeps BIGSERIAL. tests/integration/test_event_log.py (6 cases): - write() assigns monotonic sequence numbers (1, 2, ...) and a created_at. - fetch_since(since_sequence=2) returns the ordered tail. - tail() yields events and exits cleanly on consumer break. - Direct write path (write + apply) and replay path produce byte-identical materialised state — the key ADR-0002 invariant. - Replay handles multiple event types (package.created -> finalized). - Unknown event types are tolerated (no-op apply). - payload_digest equals BLAKE3 of payload. Gates: ruff clean, mypy --strict clean on 36 files, 45 tests pass. make migrate-fresh end-to-end ok. Co-Authored-By: Claude Opus 4.7 --- src/artifactstore/db/schema.py | 9 +- src/artifactstore/events/__init__.py | 28 ++- src/artifactstore/events/log.py | 125 ++++++++++ src/artifactstore/events/model.py | 70 ++++++ src/artifactstore/events/views.py | 101 ++++++++ tests/integration/test_event_log.py | 236 ++++++++++++++++++ ...ARTIFACT-STORE-WP-0001-service-baseline.md | 2 +- 7 files changed, 566 insertions(+), 5 deletions(-) create mode 100644 src/artifactstore/events/log.py create mode 100644 src/artifactstore/events/model.py create mode 100644 src/artifactstore/events/views.py create mode 100644 tests/integration/test_event_log.py diff --git a/src/artifactstore/db/schema.py b/src/artifactstore/db/schema.py index bf143a1..d4bb50b 100644 --- a/src/artifactstore/db/schema.py +++ b/src/artifactstore/db/schema.py @@ -20,6 +20,7 @@ from sqlalchemy import ( DateTime, ForeignKey, Index, + Integer, LargeBinary, MetaData, String, @@ -35,11 +36,17 @@ metadata = MetaData() _JSON_TYPE = JSON().with_variant(JSONB(), "postgresql") +# SQLite's autoincrement only applies to ``INTEGER PRIMARY KEY`` (rowid alias), +# not ``BIGINT``. We want a 64-bit sequence on PostgreSQL and a working +# autoincrement on SQLite, so we declare BigInteger with a SQLite Integer +# variant. On 64-bit SQLite this is still effectively 64-bit at runtime. +_SEQUENCE_TYPE = BigInteger().with_variant(Integer(), "sqlite") + events = Table( "events", metadata, - Column("sequence", BigInteger, primary_key=True, autoincrement=True), + Column("sequence", _SEQUENCE_TYPE, primary_key=True, autoincrement=True), Column("created_at", DateTime(timezone=True), nullable=False, server_default=func.now()), Column("event_type", String, nullable=False), Column("subject_kind", String, nullable=False), diff --git a/src/artifactstore/events/__init__.py b/src/artifactstore/events/__init__.py index bf91644..4fb26c4 100644 --- a/src/artifactstore/events/__init__.py +++ b/src/artifactstore/events/__init__.py @@ -1,5 +1,27 @@ -"""Append-only event log and materialised-view replayer. +"""Append-only event log and materialised-view replayer (ADR-0002). -Real implementation lands in ARTIFACT-STORE-WP-0001-T011. See ADR-0002 for -the event-log-as-source-of-truth contract. +Two entry points use the same view-application logic: + +* the direct write path — :func:`write` followed by + :meth:`ViewWriter.apply` in the same transaction; +* the replay path — :func:`replay` reads every event in order and applies + it via the configured :class:`ViewWriter`. + +Both paths produce byte-identical materialised state for the same event +sequence. """ + +from artifactstore.events.log import fetch_since, replay, tail, write +from artifactstore.events.model import Event, ViewWriter, make_event +from artifactstore.events.views import RegistryViewWriter + +__all__ = [ + "Event", + "RegistryViewWriter", + "ViewWriter", + "fetch_since", + "make_event", + "replay", + "tail", + "write", +] diff --git a/src/artifactstore/events/log.py b/src/artifactstore/events/log.py new file mode 100644 index 0000000..9976871 --- /dev/null +++ b/src/artifactstore/events/log.py @@ -0,0 +1,125 @@ +"""Append-only event log: write, fetch_since, tail, replay (ADR-0002).""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator +from dataclasses import replace + +from sqlalchemy import insert, select +from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine + +from artifactstore.db.schema import events as events_t +from artifactstore.events.model import Event, ViewWriter + +__all__ = ["fetch_since", "replay", "tail", "write"] + + +async def write(connection: AsyncConnection, event: Event) -> Event: + """Insert one event row in the given transaction. + + Returns a new :class:`Event` with ``sequence`` and ``created_at`` + populated by the database. The caller is responsible for committing + the surrounding transaction. + """ + result = await connection.execute( + insert(events_t) + .values( + event_type=event.event_type, + subject_kind=event.subject_kind, + subject_id=event.subject_id, + actor=event.actor, + payload=event.payload, + payload_digest=event.payload_digest, + ) + .returning(events_t.c.sequence, events_t.c.created_at) + ) + row = result.one() + return replace(event, sequence=row.sequence, created_at=row.created_at) + + +async def fetch_since( + connection: AsyncConnection, + *, + since_sequence: int, + limit: int | None = None, +) -> list[Event]: + """Return events with ``sequence > since_sequence``, ordered by sequence.""" + stmt = ( + select(events_t).where(events_t.c.sequence > since_sequence).order_by(events_t.c.sequence) + ) + if limit is not None: + stmt = stmt.limit(limit) + result = await connection.execute(stmt) + return [ + Event( + event_type=r.event_type, + subject_kind=r.subject_kind, + subject_id=r.subject_id, + actor=r.actor, + payload=r.payload, + payload_digest=r.payload_digest, + sequence=r.sequence, + created_at=r.created_at, + ) + for r in result.all() + ] + + +async def tail( + engine: AsyncEngine, + *, + since_sequence: int = 0, + poll_interval_seconds: float = 0.5, + batch_size: int = 100, +) -> AsyncIterator[Event]: + """Yield events with sequence > since_sequence, long-polling at the tail. + + SQLite uses interval polling. PostgreSQL ``LISTEN/NOTIFY`` support is a + future improvement (deferred to its own workplan). The iterator never + terminates of its own accord; callers stop it via ``break`` or + cancellation. + """ + cursor = since_sequence + while True: + async with engine.connect() as conn: + batch = await fetch_since(conn, since_sequence=cursor, limit=batch_size) + if batch: + for evt in batch: + yield evt + if evt.sequence is None: + raise RuntimeError("event from DB missing sequence") + cursor = evt.sequence + else: + await asyncio.sleep(poll_interval_seconds) + + +async def replay( + engine: AsyncEngine, + view_writer: ViewWriter, + *, + reset: bool = True, + batch_size: int = 500, +) -> int: + """Replay every event through ``view_writer``. + + When ``reset`` is True (the default), :meth:`ViewWriter.reset` is invoked + before any events are applied. The whole replay runs in one transaction + so a failure rolls back to the pre-replay state. + + Returns the highest event sequence number applied. + """ + async with engine.begin() as conn: + if reset: + await view_writer.reset(conn) + last_seq = 0 + while True: + batch = await fetch_since(conn, since_sequence=last_seq, limit=batch_size) + if not batch: + break + for evt in batch: + await view_writer.apply(conn, evt) + if evt.sequence is None: + raise RuntimeError("event from DB missing sequence") + last_seq = evt.sequence + return last_seq diff --git a/src/artifactstore/events/model.py b/src/artifactstore/events/model.py new file mode 100644 index 0000000..a13671d --- /dev/null +++ b/src/artifactstore/events/model.py @@ -0,0 +1,70 @@ +"""Event domain types (ADR-0002). + +An :class:`Event` is one row in the ``events`` table. ``payload`` is +canonical CBOR (matching the schema column type ``BYTEA``); ``payload_digest`` +is the raw BLAKE3 of the payload (32 bytes). ``sequence`` and ``created_at`` +are populated by the database on write. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from typing import Protocol +from uuid import UUID + +import blake3 as _blake3 +from sqlalchemy.ext.asyncio import AsyncConnection + +__all__ = ["Event", "ViewWriter", "make_event"] + + +@dataclass(frozen=True, slots=True) +class Event: + """One row in the append-only events table.""" + + event_type: str + subject_kind: str + actor: str + payload: bytes + payload_digest: bytes + subject_id: UUID | None = None + sequence: int | None = None + created_at: datetime | None = None + + +def make_event( + *, + event_type: str, + subject_kind: str, + actor: str, + payload: bytes, + subject_id: UUID | None = None, +) -> Event: + """Construct an :class:`Event` with ``payload_digest`` computed from + the payload via BLAKE3.""" + return Event( + event_type=event_type, + subject_kind=subject_kind, + actor=actor, + payload=payload, + payload_digest=_blake3.blake3(payload).digest(), + subject_id=subject_id, + ) + + +class ViewWriter(Protocol): + """Applies events to the materialised views (ADR-0002). + + The same implementation is invoked from both the direct write path + (called inline by the registry after :func:`write`) and the replay + path (called by :func:`replay` for each event). + """ + + async def reset(self, connection: AsyncConnection) -> None: + """Truncate the materialised views in dependency order.""" + ... + + async def apply(self, connection: AsyncConnection, event: Event) -> None: + """Apply one event to the materialised views.""" + ... diff --git a/src/artifactstore/events/views.py b/src/artifactstore/events/views.py new file mode 100644 index 0000000..c5bb343 --- /dev/null +++ b/src/artifactstore/events/views.py @@ -0,0 +1,101 @@ +"""Materialised view writer — canonical event handlers (ADR-0002). + +This module owns the event-to-view application logic. The same implementation +serves both the direct write path (registry calls :func:`log.write` followed +by :meth:`RegistryViewWriter.apply` in one transaction) and the replay path +(:func:`log.replay` walks every event in order and applies it). + +T011 ships handlers for the events emitted at registry baseline: + +* ``v1.package.created`` — inserts ``artifact_packages`` + ``retention_state``; +* ``v1.package.finalized`` — updates ``artifact_packages`` status and digest. + +Additional handlers (file ingestion, storage receipts, retention extensions) +land in later workplans. +""" + +from __future__ import annotations + +import cbor2 +from sqlalchemy import delete, insert, update +from sqlalchemy.ext.asyncio import AsyncConnection + +from artifactstore.db.schema import ( + artifact_files, + artifact_packages, + retention_state, + storage_locations, +) +from artifactstore.events.model import Event + +__all__ = ["RegistryViewWriter"] + + +class RegistryViewWriter: + """Default :class:`~artifactstore.events.model.ViewWriter` over the + blueprint materialised view tables.""" + + async def reset(self, connection: AsyncConnection) -> None: + # Delete in dependency order: child tables before parents. + await connection.execute(delete(storage_locations)) + await connection.execute(delete(artifact_files)) + await connection.execute(delete(retention_state)) + await connection.execute(delete(artifact_packages)) + + async def apply(self, connection: AsyncConnection, event: Event) -> None: + handler = _HANDLERS.get(event.event_type) + if handler is None: + # Unknown event types are tolerated at v1; later tasks register + # additional handlers without changing this class's surface. + return + await handler(connection, event) + + +async def _apply_package_created(connection: AsyncConnection, event: Event) -> None: + payload = cbor2.loads(event.payload) + if event.subject_id is None: + raise ValueError("v1.package.created event must have subject_id") + await connection.execute( + insert(artifact_packages).values( + id=event.subject_id, + name=payload["name"], + producer=payload["producer"], + subject=payload["subject"], + retention_class=payload["retention_class"], + metadata_schema_id=None, + metadata=payload.get("metadata", {}), + status="created", + manifest_digest=None, + last_event_sequence=event.sequence, + ) + ) + await connection.execute( + insert(retention_state).values( + package_id=event.subject_id, + current_expires_at=None, + effective_class=payload["retention_class"], + active_hold_id=None, + ) + ) + + +async def _apply_package_finalized(connection: AsyncConnection, event: Event) -> None: + payload = cbor2.loads(event.payload) + if event.subject_id is None: + raise ValueError("v1.package.finalized event must have subject_id") + await connection.execute( + update(artifact_packages) + .where(artifact_packages.c.id == event.subject_id) + .values( + status="finalized", + finalized_at=event.created_at, + manifest_digest=bytes.fromhex(payload["manifest_digest_hex"]), + last_event_sequence=event.sequence, + ) + ) + + +_HANDLERS = { + "v1.package.created": _apply_package_created, + "v1.package.finalized": _apply_package_finalized, +} diff --git a/tests/integration/test_event_log.py b/tests/integration/test_event_log.py new file mode 100644 index 0000000..a20c790 --- /dev/null +++ b/tests/integration/test_event_log.py @@ -0,0 +1,236 @@ +"""Event log integration tests (ARTIFACT-STORE-WP-0001-T011).""" + +from __future__ import annotations + +import asyncio +import uuid +from collections.abc import AsyncIterator +from pathlib import Path + +import cbor2 +import pytest +import pytest_asyncio +from sqlalchemy import insert, select +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + +from artifactstore.db.schema import ( + artifact_packages, + metadata, + retention_classes, + retention_state, +) +from artifactstore.db.seed import RETENTION_CLASS_SEEDS +from artifactstore.events import ( + RegistryViewWriter, + fetch_since, + make_event, + replay, + tail, + write, +) + + +@pytest_asyncio.fixture +async def fresh_engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]: + db_path = tmp_path / "events.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}") + async with engine.begin() as conn: + await conn.run_sync(metadata.create_all) + for seed in RETENTION_CLASS_SEEDS: + await conn.execute(insert(retention_classes).values(**seed)) + yield engine + await engine.dispose() + + +def _package_created_payload(*, name: str = "p", retention_class: str = "raw-evidence") -> bytes: + return cbor2.dumps( + { + "name": name, + "producer": "test-producer", + "subject": "test-subject", + "retention_class": retention_class, + "metadata": {"k": "v"}, + }, + canonical=True, + ) + + +async def test_write_assigns_monotonic_sequence(fresh_engine: AsyncEngine) -> None: + e1 = make_event( + event_type="v1.system.note", + subject_kind="system", + actor="t", + payload=b"hi", + ) + e2 = make_event( + event_type="v1.system.note", + subject_kind="system", + actor="t", + payload=b"hello", + ) + async with fresh_engine.begin() as conn: + r1 = await write(conn, e1) + r2 = await write(conn, e2) + assert r1.sequence == 1 + assert r2.sequence == 2 + assert r1.created_at is not None + assert r2.created_at is not None + + +async def test_fetch_since_returns_ordered_subset(fresh_engine: AsyncEngine) -> None: + async with fresh_engine.begin() as conn: + for i in range(5): + await write( + conn, + make_event( + event_type="v1.system.note", + subject_kind="system", + actor="t", + payload=f"n{i}".encode(), + ), + ) + async with fresh_engine.connect() as conn: + batch = await fetch_since(conn, since_sequence=2) + seqs = [e.sequence for e in batch] + assert seqs == [3, 4, 5] + + +async def test_tail_yields_events_then_breaks(fresh_engine: AsyncEngine) -> None: + pkg_id = uuid.uuid4() + async with fresh_engine.begin() as conn: + await write( + conn, + make_event( + event_type="v1.package.created", + subject_kind="package", + subject_id=pkg_id, + actor="ops", + payload=_package_created_payload(), + ), + ) + + collected = [] + + async def consume() -> None: + async for evt in tail(fresh_engine, since_sequence=0, poll_interval_seconds=0.01): + collected.append(evt) + if len(collected) >= 1: + break + + await asyncio.wait_for(consume(), timeout=3.0) + assert len(collected) == 1 + assert collected[0].event_type == "v1.package.created" + assert collected[0].subject_id == pkg_id + + +async def test_replay_reproduces_direct_write_state(fresh_engine: AsyncEngine) -> None: + """Direct path (write + apply) and replay path produce identical state.""" + writer = RegistryViewWriter() + pkg_id = uuid.uuid4() + + # Direct path: write event + apply in one transaction. + async with fresh_engine.begin() as conn: + evt = await write( + conn, + make_event( + event_type="v1.package.created", + subject_kind="package", + subject_id=pkg_id, + actor="ops", + payload=_package_created_payload(), + ), + ) + await writer.apply(conn, evt) + + async with fresh_engine.connect() as conn: + pkg_a = (await conn.execute(select(artifact_packages))).one() + ret_a = (await conn.execute(select(retention_state))).one() + + # Replay path: reset views and replay every event. + applied_seq = await replay(fresh_engine, writer, reset=True) + assert applied_seq == evt.sequence + + async with fresh_engine.connect() as conn: + pkg_b = (await conn.execute(select(artifact_packages))).one() + ret_b = (await conn.execute(select(retention_state))).one() + + # Materialised state must match across direct vs replay paths. + assert tuple(pkg_a) == tuple(pkg_b) + assert tuple(ret_a) == tuple(ret_b) + + +async def test_replay_handles_multiple_event_types(fresh_engine: AsyncEngine) -> None: + writer = RegistryViewWriter() + pkg_id = uuid.uuid4() + + async with fresh_engine.begin() as conn: + e_created = await write( + conn, + make_event( + event_type="v1.package.created", + subject_kind="package", + subject_id=pkg_id, + actor="ops", + payload=_package_created_payload(), + ), + ) + await writer.apply(conn, e_created) + + e_finalized = await write( + conn, + make_event( + event_type="v1.package.finalized", + subject_kind="package", + subject_id=pkg_id, + actor="ops", + payload=cbor2.dumps( + {"manifest_digest_hex": "ab" * 32}, + canonical=True, + ), + ), + ) + await writer.apply(conn, e_finalized) + + async with fresh_engine.connect() as conn: + pkg_a = (await conn.execute(select(artifact_packages))).one() + assert pkg_a.status == "finalized" + assert pkg_a.manifest_digest == bytes.fromhex("ab" * 32) + + last_seq = await replay(fresh_engine, writer, reset=True) + assert last_seq == e_finalized.sequence + + async with fresh_engine.connect() as conn: + pkg_b = (await conn.execute(select(artifact_packages))).one() + assert tuple(pkg_a) == tuple(pkg_b) + + +async def test_unknown_event_type_is_tolerated(fresh_engine: AsyncEngine) -> None: + writer = RegistryViewWriter() + async with fresh_engine.begin() as conn: + evt = await write( + conn, + make_event( + event_type="v1.unknown.frob", + subject_kind="system", + actor="ops", + payload=cbor2.dumps({}, canonical=True), + ), + ) + await writer.apply(conn, evt) # must not raise + + +async def test_event_payload_digest_is_blake3_of_payload() -> None: + import blake3 as _blake3 + + payload = cbor2.dumps({"hello": "world"}, canonical=True) + evt = make_event( + event_type="v1.system.note", + subject_kind="system", + actor="t", + payload=payload, + ) + assert evt.payload_digest == _blake3.blake3(payload).digest() + + +# Acknowledge pytest fixture as used. +_ = pytest diff --git a/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md b/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md index 0176c8d..40d3fcf 100644 --- a/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md +++ b/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md @@ -167,7 +167,7 @@ Acceptance: ```task id: ARTIFACT-STORE-WP-0001-T011 -status: todo +status: done priority: high state_hub_task_id: "90fce17d-cce5-4687-ae9e-02abd7d92622" ```