generated from coulomb/repo-seed
WP-0001-T011: append-only event log — write, fetch_since, tail, replay
src/artifactstore/events/:
- model.py: Event frozen dataclass (event_type, subject_kind, subject_id,
actor, payload, payload_digest; sequence + created_at populated by the
DB on write). make_event() helper computes payload_digest as raw BLAKE3
(32 bytes) of payload. ViewWriter Protocol with reset() + apply().
- log.py:
* write(connection, event) — inserts one row in the caller's transaction
and returns Event with sequence + created_at populated via RETURNING.
* fetch_since(connection, since_sequence, limit) — read events after a
cursor in order.
* tail(engine, since_sequence) — async-iterator long-poll over the log;
SQLite uses interval polling, PG LISTEN/NOTIFY is a future workplan.
* replay(engine, view_writer, reset=True) — drains the event log through
a ViewWriter inside one transaction; returns the highest sequence
applied.
- views.py: RegistryViewWriter — canonical event handlers shared by direct
write and replay paths. Ships handlers for v1.package.created (inserts
artifact_packages + retention_state) and v1.package.finalized (updates
status, finalized_at, manifest_digest). Unknown event types tolerated;
additional handlers register here as later tasks land.
src/artifactstore/db/schema.py: events.sequence type is now
BigInteger().with_variant(Integer(), 'sqlite') so SQLite's autoincrement
(INTEGER PRIMARY KEY rowid alias) works while PostgreSQL keeps BIGSERIAL.
tests/integration/test_event_log.py (6 cases):
- write() assigns monotonic sequence numbers (1, 2, ...) and a created_at.
- fetch_since(since_sequence=2) returns the ordered tail.
- tail() yields events and exits cleanly on consumer break.
- Direct write path (write + apply) and replay path produce byte-identical
materialised state — the key ADR-0002 invariant.
- Replay handles multiple event types (package.created -> finalized).
- Unknown event types are tolerated (no-op apply).
- payload_digest equals BLAKE3 of payload.
Gates: ruff clean, mypy --strict clean on 36 files, 45 tests pass.
make migrate-fresh end-to-end ok.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -20,6 +20,7 @@ from sqlalchemy import (
|
||||
DateTime,
|
||||
ForeignKey,
|
||||
Index,
|
||||
Integer,
|
||||
LargeBinary,
|
||||
MetaData,
|
||||
String,
|
||||
@@ -35,11 +36,17 @@ metadata = MetaData()
|
||||
|
||||
_JSON_TYPE = JSON().with_variant(JSONB(), "postgresql")
|
||||
|
||||
# SQLite's autoincrement only applies to ``INTEGER PRIMARY KEY`` (rowid alias),
|
||||
# not ``BIGINT``. We want a 64-bit sequence on PostgreSQL and a working
|
||||
# autoincrement on SQLite, so we declare BigInteger with a SQLite Integer
|
||||
# variant. On 64-bit SQLite this is still effectively 64-bit at runtime.
|
||||
_SEQUENCE_TYPE = BigInteger().with_variant(Integer(), "sqlite")
|
||||
|
||||
|
||||
events = Table(
|
||||
"events",
|
||||
metadata,
|
||||
Column("sequence", BigInteger, primary_key=True, autoincrement=True),
|
||||
Column("sequence", _SEQUENCE_TYPE, primary_key=True, autoincrement=True),
|
||||
Column("created_at", DateTime(timezone=True), nullable=False, server_default=func.now()),
|
||||
Column("event_type", String, nullable=False),
|
||||
Column("subject_kind", String, nullable=False),
|
||||
|
||||
@@ -1,5 +1,27 @@
|
||||
"""Append-only event log and materialised-view replayer.
|
||||
"""Append-only event log and materialised-view replayer (ADR-0002).
|
||||
|
||||
Real implementation lands in ARTIFACT-STORE-WP-0001-T011. See ADR-0002 for
|
||||
the event-log-as-source-of-truth contract.
|
||||
Two entry points use the same view-application logic:
|
||||
|
||||
* the direct write path — :func:`write` followed by
|
||||
:meth:`ViewWriter.apply` in the same transaction;
|
||||
* the replay path — :func:`replay` reads every event in order and applies
|
||||
it via the configured :class:`ViewWriter`.
|
||||
|
||||
Both paths produce byte-identical materialised state for the same event
|
||||
sequence.
|
||||
"""
|
||||
|
||||
from artifactstore.events.log import fetch_since, replay, tail, write
|
||||
from artifactstore.events.model import Event, ViewWriter, make_event
|
||||
from artifactstore.events.views import RegistryViewWriter
|
||||
|
||||
__all__ = [
|
||||
"Event",
|
||||
"RegistryViewWriter",
|
||||
"ViewWriter",
|
||||
"fetch_since",
|
||||
"make_event",
|
||||
"replay",
|
||||
"tail",
|
||||
"write",
|
||||
]
|
||||
|
||||
125
src/artifactstore/events/log.py
Normal file
125
src/artifactstore/events/log.py
Normal file
@@ -0,0 +1,125 @@
|
||||
"""Append-only event log: write, fetch_since, tail, replay (ADR-0002)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import AsyncIterator
|
||||
from dataclasses import replace
|
||||
|
||||
from sqlalchemy import insert, select
|
||||
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
|
||||
|
||||
from artifactstore.db.schema import events as events_t
|
||||
from artifactstore.events.model import Event, ViewWriter
|
||||
|
||||
__all__ = ["fetch_since", "replay", "tail", "write"]
|
||||
|
||||
|
||||
async def write(connection: AsyncConnection, event: Event) -> Event:
|
||||
"""Insert one event row in the given transaction.
|
||||
|
||||
Returns a new :class:`Event` with ``sequence`` and ``created_at``
|
||||
populated by the database. The caller is responsible for committing
|
||||
the surrounding transaction.
|
||||
"""
|
||||
result = await connection.execute(
|
||||
insert(events_t)
|
||||
.values(
|
||||
event_type=event.event_type,
|
||||
subject_kind=event.subject_kind,
|
||||
subject_id=event.subject_id,
|
||||
actor=event.actor,
|
||||
payload=event.payload,
|
||||
payload_digest=event.payload_digest,
|
||||
)
|
||||
.returning(events_t.c.sequence, events_t.c.created_at)
|
||||
)
|
||||
row = result.one()
|
||||
return replace(event, sequence=row.sequence, created_at=row.created_at)
|
||||
|
||||
|
||||
async def fetch_since(
|
||||
connection: AsyncConnection,
|
||||
*,
|
||||
since_sequence: int,
|
||||
limit: int | None = None,
|
||||
) -> list[Event]:
|
||||
"""Return events with ``sequence > since_sequence``, ordered by sequence."""
|
||||
stmt = (
|
||||
select(events_t).where(events_t.c.sequence > since_sequence).order_by(events_t.c.sequence)
|
||||
)
|
||||
if limit is not None:
|
||||
stmt = stmt.limit(limit)
|
||||
result = await connection.execute(stmt)
|
||||
return [
|
||||
Event(
|
||||
event_type=r.event_type,
|
||||
subject_kind=r.subject_kind,
|
||||
subject_id=r.subject_id,
|
||||
actor=r.actor,
|
||||
payload=r.payload,
|
||||
payload_digest=r.payload_digest,
|
||||
sequence=r.sequence,
|
||||
created_at=r.created_at,
|
||||
)
|
||||
for r in result.all()
|
||||
]
|
||||
|
||||
|
||||
async def tail(
|
||||
engine: AsyncEngine,
|
||||
*,
|
||||
since_sequence: int = 0,
|
||||
poll_interval_seconds: float = 0.5,
|
||||
batch_size: int = 100,
|
||||
) -> AsyncIterator[Event]:
|
||||
"""Yield events with sequence > since_sequence, long-polling at the tail.
|
||||
|
||||
SQLite uses interval polling. PostgreSQL ``LISTEN/NOTIFY`` support is a
|
||||
future improvement (deferred to its own workplan). The iterator never
|
||||
terminates of its own accord; callers stop it via ``break`` or
|
||||
cancellation.
|
||||
"""
|
||||
cursor = since_sequence
|
||||
while True:
|
||||
async with engine.connect() as conn:
|
||||
batch = await fetch_since(conn, since_sequence=cursor, limit=batch_size)
|
||||
if batch:
|
||||
for evt in batch:
|
||||
yield evt
|
||||
if evt.sequence is None:
|
||||
raise RuntimeError("event from DB missing sequence")
|
||||
cursor = evt.sequence
|
||||
else:
|
||||
await asyncio.sleep(poll_interval_seconds)
|
||||
|
||||
|
||||
async def replay(
|
||||
engine: AsyncEngine,
|
||||
view_writer: ViewWriter,
|
||||
*,
|
||||
reset: bool = True,
|
||||
batch_size: int = 500,
|
||||
) -> int:
|
||||
"""Replay every event through ``view_writer``.
|
||||
|
||||
When ``reset`` is True (the default), :meth:`ViewWriter.reset` is invoked
|
||||
before any events are applied. The whole replay runs in one transaction
|
||||
so a failure rolls back to the pre-replay state.
|
||||
|
||||
Returns the highest event sequence number applied.
|
||||
"""
|
||||
async with engine.begin() as conn:
|
||||
if reset:
|
||||
await view_writer.reset(conn)
|
||||
last_seq = 0
|
||||
while True:
|
||||
batch = await fetch_since(conn, since_sequence=last_seq, limit=batch_size)
|
||||
if not batch:
|
||||
break
|
||||
for evt in batch:
|
||||
await view_writer.apply(conn, evt)
|
||||
if evt.sequence is None:
|
||||
raise RuntimeError("event from DB missing sequence")
|
||||
last_seq = evt.sequence
|
||||
return last_seq
|
||||
70
src/artifactstore/events/model.py
Normal file
70
src/artifactstore/events/model.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""Event domain types (ADR-0002).
|
||||
|
||||
An :class:`Event` is one row in the ``events`` table. ``payload`` is
|
||||
canonical CBOR (matching the schema column type ``BYTEA``); ``payload_digest``
|
||||
is the raw BLAKE3 of the payload (32 bytes). ``sequence`` and ``created_at``
|
||||
are populated by the database on write.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Protocol
|
||||
from uuid import UUID
|
||||
|
||||
import blake3 as _blake3
|
||||
from sqlalchemy.ext.asyncio import AsyncConnection
|
||||
|
||||
__all__ = ["Event", "ViewWriter", "make_event"]
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class Event:
|
||||
"""One row in the append-only events table."""
|
||||
|
||||
event_type: str
|
||||
subject_kind: str
|
||||
actor: str
|
||||
payload: bytes
|
||||
payload_digest: bytes
|
||||
subject_id: UUID | None = None
|
||||
sequence: int | None = None
|
||||
created_at: datetime | None = None
|
||||
|
||||
|
||||
def make_event(
|
||||
*,
|
||||
event_type: str,
|
||||
subject_kind: str,
|
||||
actor: str,
|
||||
payload: bytes,
|
||||
subject_id: UUID | None = None,
|
||||
) -> Event:
|
||||
"""Construct an :class:`Event` with ``payload_digest`` computed from
|
||||
the payload via BLAKE3."""
|
||||
return Event(
|
||||
event_type=event_type,
|
||||
subject_kind=subject_kind,
|
||||
actor=actor,
|
||||
payload=payload,
|
||||
payload_digest=_blake3.blake3(payload).digest(),
|
||||
subject_id=subject_id,
|
||||
)
|
||||
|
||||
|
||||
class ViewWriter(Protocol):
|
||||
"""Applies events to the materialised views (ADR-0002).
|
||||
|
||||
The same implementation is invoked from both the direct write path
|
||||
(called inline by the registry after :func:`write`) and the replay
|
||||
path (called by :func:`replay` for each event).
|
||||
"""
|
||||
|
||||
async def reset(self, connection: AsyncConnection) -> None:
|
||||
"""Truncate the materialised views in dependency order."""
|
||||
...
|
||||
|
||||
async def apply(self, connection: AsyncConnection, event: Event) -> None:
|
||||
"""Apply one event to the materialised views."""
|
||||
...
|
||||
101
src/artifactstore/events/views.py
Normal file
101
src/artifactstore/events/views.py
Normal file
@@ -0,0 +1,101 @@
|
||||
"""Materialised view writer — canonical event handlers (ADR-0002).
|
||||
|
||||
This module owns the event-to-view application logic. The same implementation
|
||||
serves both the direct write path (registry calls :func:`log.write` followed
|
||||
by :meth:`RegistryViewWriter.apply` in one transaction) and the replay path
|
||||
(:func:`log.replay` walks every event in order and applies it).
|
||||
|
||||
T011 ships handlers for the events emitted at registry baseline:
|
||||
|
||||
* ``v1.package.created`` — inserts ``artifact_packages`` + ``retention_state``;
|
||||
* ``v1.package.finalized`` — updates ``artifact_packages`` status and digest.
|
||||
|
||||
Additional handlers (file ingestion, storage receipts, retention extensions)
|
||||
land in later workplans.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import cbor2
|
||||
from sqlalchemy import delete, insert, update
|
||||
from sqlalchemy.ext.asyncio import AsyncConnection
|
||||
|
||||
from artifactstore.db.schema import (
|
||||
artifact_files,
|
||||
artifact_packages,
|
||||
retention_state,
|
||||
storage_locations,
|
||||
)
|
||||
from artifactstore.events.model import Event
|
||||
|
||||
__all__ = ["RegistryViewWriter"]
|
||||
|
||||
|
||||
class RegistryViewWriter:
|
||||
"""Default :class:`~artifactstore.events.model.ViewWriter` over the
|
||||
blueprint materialised view tables."""
|
||||
|
||||
async def reset(self, connection: AsyncConnection) -> None:
|
||||
# Delete in dependency order: child tables before parents.
|
||||
await connection.execute(delete(storage_locations))
|
||||
await connection.execute(delete(artifact_files))
|
||||
await connection.execute(delete(retention_state))
|
||||
await connection.execute(delete(artifact_packages))
|
||||
|
||||
async def apply(self, connection: AsyncConnection, event: Event) -> None:
|
||||
handler = _HANDLERS.get(event.event_type)
|
||||
if handler is None:
|
||||
# Unknown event types are tolerated at v1; later tasks register
|
||||
# additional handlers without changing this class's surface.
|
||||
return
|
||||
await handler(connection, event)
|
||||
|
||||
|
||||
async def _apply_package_created(connection: AsyncConnection, event: Event) -> None:
|
||||
payload = cbor2.loads(event.payload)
|
||||
if event.subject_id is None:
|
||||
raise ValueError("v1.package.created event must have subject_id")
|
||||
await connection.execute(
|
||||
insert(artifact_packages).values(
|
||||
id=event.subject_id,
|
||||
name=payload["name"],
|
||||
producer=payload["producer"],
|
||||
subject=payload["subject"],
|
||||
retention_class=payload["retention_class"],
|
||||
metadata_schema_id=None,
|
||||
metadata=payload.get("metadata", {}),
|
||||
status="created",
|
||||
manifest_digest=None,
|
||||
last_event_sequence=event.sequence,
|
||||
)
|
||||
)
|
||||
await connection.execute(
|
||||
insert(retention_state).values(
|
||||
package_id=event.subject_id,
|
||||
current_expires_at=None,
|
||||
effective_class=payload["retention_class"],
|
||||
active_hold_id=None,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
async def _apply_package_finalized(connection: AsyncConnection, event: Event) -> None:
|
||||
payload = cbor2.loads(event.payload)
|
||||
if event.subject_id is None:
|
||||
raise ValueError("v1.package.finalized event must have subject_id")
|
||||
await connection.execute(
|
||||
update(artifact_packages)
|
||||
.where(artifact_packages.c.id == event.subject_id)
|
||||
.values(
|
||||
status="finalized",
|
||||
finalized_at=event.created_at,
|
||||
manifest_digest=bytes.fromhex(payload["manifest_digest_hex"]),
|
||||
last_event_sequence=event.sequence,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
_HANDLERS = {
|
||||
"v1.package.created": _apply_package_created,
|
||||
"v1.package.finalized": _apply_package_finalized,
|
||||
}
|
||||
236
tests/integration/test_event_log.py
Normal file
236
tests/integration/test_event_log.py
Normal file
@@ -0,0 +1,236 @@
|
||||
"""Event log integration tests (ARTIFACT-STORE-WP-0001-T011)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import uuid
|
||||
from collections.abc import AsyncIterator
|
||||
from pathlib import Path
|
||||
|
||||
import cbor2
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from sqlalchemy import insert, select
|
||||
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
|
||||
|
||||
from artifactstore.db.schema import (
|
||||
artifact_packages,
|
||||
metadata,
|
||||
retention_classes,
|
||||
retention_state,
|
||||
)
|
||||
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
|
||||
from artifactstore.events import (
|
||||
RegistryViewWriter,
|
||||
fetch_since,
|
||||
make_event,
|
||||
replay,
|
||||
tail,
|
||||
write,
|
||||
)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def fresh_engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
|
||||
db_path = tmp_path / "events.db"
|
||||
engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(metadata.create_all)
|
||||
for seed in RETENTION_CLASS_SEEDS:
|
||||
await conn.execute(insert(retention_classes).values(**seed))
|
||||
yield engine
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
def _package_created_payload(*, name: str = "p", retention_class: str = "raw-evidence") -> bytes:
|
||||
return cbor2.dumps(
|
||||
{
|
||||
"name": name,
|
||||
"producer": "test-producer",
|
||||
"subject": "test-subject",
|
||||
"retention_class": retention_class,
|
||||
"metadata": {"k": "v"},
|
||||
},
|
||||
canonical=True,
|
||||
)
|
||||
|
||||
|
||||
async def test_write_assigns_monotonic_sequence(fresh_engine: AsyncEngine) -> None:
|
||||
e1 = make_event(
|
||||
event_type="v1.system.note",
|
||||
subject_kind="system",
|
||||
actor="t",
|
||||
payload=b"hi",
|
||||
)
|
||||
e2 = make_event(
|
||||
event_type="v1.system.note",
|
||||
subject_kind="system",
|
||||
actor="t",
|
||||
payload=b"hello",
|
||||
)
|
||||
async with fresh_engine.begin() as conn:
|
||||
r1 = await write(conn, e1)
|
||||
r2 = await write(conn, e2)
|
||||
assert r1.sequence == 1
|
||||
assert r2.sequence == 2
|
||||
assert r1.created_at is not None
|
||||
assert r2.created_at is not None
|
||||
|
||||
|
||||
async def test_fetch_since_returns_ordered_subset(fresh_engine: AsyncEngine) -> None:
|
||||
async with fresh_engine.begin() as conn:
|
||||
for i in range(5):
|
||||
await write(
|
||||
conn,
|
||||
make_event(
|
||||
event_type="v1.system.note",
|
||||
subject_kind="system",
|
||||
actor="t",
|
||||
payload=f"n{i}".encode(),
|
||||
),
|
||||
)
|
||||
async with fresh_engine.connect() as conn:
|
||||
batch = await fetch_since(conn, since_sequence=2)
|
||||
seqs = [e.sequence for e in batch]
|
||||
assert seqs == [3, 4, 5]
|
||||
|
||||
|
||||
async def test_tail_yields_events_then_breaks(fresh_engine: AsyncEngine) -> None:
|
||||
pkg_id = uuid.uuid4()
|
||||
async with fresh_engine.begin() as conn:
|
||||
await write(
|
||||
conn,
|
||||
make_event(
|
||||
event_type="v1.package.created",
|
||||
subject_kind="package",
|
||||
subject_id=pkg_id,
|
||||
actor="ops",
|
||||
payload=_package_created_payload(),
|
||||
),
|
||||
)
|
||||
|
||||
collected = []
|
||||
|
||||
async def consume() -> None:
|
||||
async for evt in tail(fresh_engine, since_sequence=0, poll_interval_seconds=0.01):
|
||||
collected.append(evt)
|
||||
if len(collected) >= 1:
|
||||
break
|
||||
|
||||
await asyncio.wait_for(consume(), timeout=3.0)
|
||||
assert len(collected) == 1
|
||||
assert collected[0].event_type == "v1.package.created"
|
||||
assert collected[0].subject_id == pkg_id
|
||||
|
||||
|
||||
async def test_replay_reproduces_direct_write_state(fresh_engine: AsyncEngine) -> None:
|
||||
"""Direct path (write + apply) and replay path produce identical state."""
|
||||
writer = RegistryViewWriter()
|
||||
pkg_id = uuid.uuid4()
|
||||
|
||||
# Direct path: write event + apply in one transaction.
|
||||
async with fresh_engine.begin() as conn:
|
||||
evt = await write(
|
||||
conn,
|
||||
make_event(
|
||||
event_type="v1.package.created",
|
||||
subject_kind="package",
|
||||
subject_id=pkg_id,
|
||||
actor="ops",
|
||||
payload=_package_created_payload(),
|
||||
),
|
||||
)
|
||||
await writer.apply(conn, evt)
|
||||
|
||||
async with fresh_engine.connect() as conn:
|
||||
pkg_a = (await conn.execute(select(artifact_packages))).one()
|
||||
ret_a = (await conn.execute(select(retention_state))).one()
|
||||
|
||||
# Replay path: reset views and replay every event.
|
||||
applied_seq = await replay(fresh_engine, writer, reset=True)
|
||||
assert applied_seq == evt.sequence
|
||||
|
||||
async with fresh_engine.connect() as conn:
|
||||
pkg_b = (await conn.execute(select(artifact_packages))).one()
|
||||
ret_b = (await conn.execute(select(retention_state))).one()
|
||||
|
||||
# Materialised state must match across direct vs replay paths.
|
||||
assert tuple(pkg_a) == tuple(pkg_b)
|
||||
assert tuple(ret_a) == tuple(ret_b)
|
||||
|
||||
|
||||
async def test_replay_handles_multiple_event_types(fresh_engine: AsyncEngine) -> None:
|
||||
writer = RegistryViewWriter()
|
||||
pkg_id = uuid.uuid4()
|
||||
|
||||
async with fresh_engine.begin() as conn:
|
||||
e_created = await write(
|
||||
conn,
|
||||
make_event(
|
||||
event_type="v1.package.created",
|
||||
subject_kind="package",
|
||||
subject_id=pkg_id,
|
||||
actor="ops",
|
||||
payload=_package_created_payload(),
|
||||
),
|
||||
)
|
||||
await writer.apply(conn, e_created)
|
||||
|
||||
e_finalized = await write(
|
||||
conn,
|
||||
make_event(
|
||||
event_type="v1.package.finalized",
|
||||
subject_kind="package",
|
||||
subject_id=pkg_id,
|
||||
actor="ops",
|
||||
payload=cbor2.dumps(
|
||||
{"manifest_digest_hex": "ab" * 32},
|
||||
canonical=True,
|
||||
),
|
||||
),
|
||||
)
|
||||
await writer.apply(conn, e_finalized)
|
||||
|
||||
async with fresh_engine.connect() as conn:
|
||||
pkg_a = (await conn.execute(select(artifact_packages))).one()
|
||||
assert pkg_a.status == "finalized"
|
||||
assert pkg_a.manifest_digest == bytes.fromhex("ab" * 32)
|
||||
|
||||
last_seq = await replay(fresh_engine, writer, reset=True)
|
||||
assert last_seq == e_finalized.sequence
|
||||
|
||||
async with fresh_engine.connect() as conn:
|
||||
pkg_b = (await conn.execute(select(artifact_packages))).one()
|
||||
assert tuple(pkg_a) == tuple(pkg_b)
|
||||
|
||||
|
||||
async def test_unknown_event_type_is_tolerated(fresh_engine: AsyncEngine) -> None:
|
||||
writer = RegistryViewWriter()
|
||||
async with fresh_engine.begin() as conn:
|
||||
evt = await write(
|
||||
conn,
|
||||
make_event(
|
||||
event_type="v1.unknown.frob",
|
||||
subject_kind="system",
|
||||
actor="ops",
|
||||
payload=cbor2.dumps({}, canonical=True),
|
||||
),
|
||||
)
|
||||
await writer.apply(conn, evt) # must not raise
|
||||
|
||||
|
||||
async def test_event_payload_digest_is_blake3_of_payload() -> None:
|
||||
import blake3 as _blake3
|
||||
|
||||
payload = cbor2.dumps({"hello": "world"}, canonical=True)
|
||||
evt = make_event(
|
||||
event_type="v1.system.note",
|
||||
subject_kind="system",
|
||||
actor="t",
|
||||
payload=payload,
|
||||
)
|
||||
assert evt.payload_digest == _blake3.blake3(payload).digest()
|
||||
|
||||
|
||||
# Acknowledge pytest fixture as used.
|
||||
_ = pytest
|
||||
@@ -167,7 +167,7 @@ Acceptance:
|
||||
|
||||
```task
|
||||
id: ARTIFACT-STORE-WP-0001-T011
|
||||
status: todo
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "90fce17d-cce5-4687-ae9e-02abd7d92622"
|
||||
```
|
||||
|
||||
Reference in New Issue
Block a user