feat(coordination): git-backed DecisionLog event store (WP-0009 T1)

Factor DecisionLog storage behind an EventStore abstraction: InMemoryEventStore
stays the default/test double, GitEventStore makes the coordination log
git-addressable. Each space is a ref (refs/spaces/<sha1>); append writes an
immutable one-blob commit and advances the ref under compare-and-swap, so the
commit chain is the per-space total order and a racing appender can never fork
the log. Deterministic stable-JSON event serialization. Zero runtime deps
(git CLI via subprocess). API and fold unchanged across backends.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-16 01:41:27 +02:00
parent b31e9bc337
commit 45a858ead0
4 changed files with 340 additions and 8 deletions

View File

@@ -4,8 +4,13 @@ from shard_wiki.coordination.decision_log import (
CoordinationState,
DecisionEvent,
DecisionLog,
EventStore,
EventType,
InMemoryEventStore,
deserialize_event,
serialize_event,
)
from shard_wiki.coordination.git_event_store import GitEventStore
from shard_wiki.coordination.overlay import (
ApplyResult,
ApplyStatus,
@@ -19,6 +24,11 @@ __all__ = [
"DecisionEvent",
"EventType",
"CoordinationState",
"EventStore",
"InMemoryEventStore",
"GitEventStore",
"serialize_event",
"deserialize_event",
"Overlay",
"OverlayEngine",
"ApplyStatus",

View File

@@ -3,22 +3,36 @@
Coordination-canonical state (overlays, equivalence bindings, aliases, merges, forks) is an
**append-only decision log**, not a mutable file; the queryable *current* state is a **derived
fold** of the log (tier-3 disposable). The log is **totally ordered per space** via a single
**append authority** — here an in-process counter; a git-backed, lease-held authority is a later
binding. That total order is what gives read-your-writes across readers (§8.6).
**append authority**. That total order is what gives read-your-writes across readers (§8.6).
Storage lives behind :class:`EventStore`: :class:`InMemoryEventStore` is the default test double
(an in-process counter); :class:`~shard_wiki.coordination.git_event_store.GitEventStore` is the
git-addressable backend (SHARD-WP-0009). The :class:`DecisionLog` API and the :meth:`fold` are
identical across backends — only storage + the concurrency model differ.
`derived = f(canonical)`: :class:`CoordinationState` is always reproducible by replaying the log.
"""
from __future__ import annotations
import json
from collections.abc import Mapping
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from types import MappingProxyType
from typing import Any
from typing import Any, Protocol, runtime_checkable
__all__ = ["EventType", "DecisionEvent", "CoordinationState", "DecisionLog"]
__all__ = [
"EventType",
"DecisionEvent",
"CoordinationState",
"EventStore",
"InMemoryEventStore",
"DecisionLog",
"serialize_event",
"deserialize_event",
]
class EventType(Enum):
@@ -63,10 +77,57 @@ class CoordinationState:
return frozenset({identity})
class DecisionLog:
"""In-memory append-only log, totally ordered per space (the append authority for a process).
def serialize_event(event: DecisionEvent) -> bytes:
"""Deterministic, stable-JSON wire form of an event (same bytes for equal events, any process).
A later binding swaps the storage for git + a per-space lease without changing this API.
Sorted keys + compact separators make the serialization canonical, so a git object hashed from
it is reproducible — the basis for content-addressable, comparable logs across backends.
"""
obj = {
"seq": event.seq,
"space": event.space,
"type": event.type.value,
"payload": event.payload,
"actor": event.actor,
"timestamp": event.timestamp.isoformat(),
}
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode()
def deserialize_event(data: bytes | str) -> DecisionEvent:
"""Inverse of :func:`serialize_event` — round-trips an event byte-for-byte by field."""
obj = json.loads(data)
return DecisionEvent(
seq=obj["seq"],
space=obj["space"],
type=EventType(obj["type"]),
payload=obj["payload"],
actor=obj["actor"],
timestamp=datetime.fromisoformat(obj["timestamp"]),
)
@runtime_checkable
class EventStore(Protocol):
"""Append-only, per-space ordered storage behind :class:`DecisionLog`.
Two bindings exist: :class:`InMemoryEventStore` (default/test double) and
:class:`~shard_wiki.coordination.git_event_store.GitEventStore` (git-addressable). Both assign
a per-space monotonic ``seq`` at the log head and guarantee read-your-writes for their reach
(in-process for memory; cross-process for git).
"""
def append(
self, space: str, type: EventType, payload: Mapping[str, Any], actor: str | None = None
) -> DecisionEvent: ...
def events(self, space: str) -> tuple[DecisionEvent, ...]: ...
class InMemoryEventStore:
"""In-process append-only store, totally ordered per space (the append authority for a process).
The default test double; the git backend preserves this exact contract on durable storage.
"""
def __init__(self) -> None:
@@ -84,10 +145,33 @@ class DecisionLog:
self._events.setdefault(space, []).append(event)
return event
def events(self, space: str) -> tuple[DecisionEvent, ...]:
return tuple(self._events.get(space, ()))
class DecisionLog:
"""Append-only decision log, totally ordered per space, with a derived :meth:`fold`.
Storage is delegated to an :class:`EventStore` (default :class:`InMemoryEventStore`); swapping
in the git backend changes only durability + the concurrency model, not this API or the fold.
"""
def __init__(self, store: EventStore | None = None) -> None:
self._store: EventStore = store if store is not None else InMemoryEventStore()
def append(
self,
space: str,
type: EventType,
payload: Mapping[str, Any],
actor: str | None = None,
) -> DecisionEvent:
return self._store.append(space, type, payload, actor=actor)
def events(self, space: str) -> tuple[DecisionEvent, ...]:
"""The space's events in append (total) order. Read-your-writes: a just-appended event
is present immediately."""
return tuple(self._events.get(space, ()))
return self._store.events(space)
def fold(self, space: str) -> CoordinationState:
"""Replay the log into current coordination state (derived = f(log))."""

View File

@@ -0,0 +1,154 @@
"""GitEventStore — a git-addressable binding of :class:`EventStore` (SHARD-WP-0009 T1).
Each space is a ref (``refs/spaces/<sha1(space)>``); each ``append`` writes the event as an
immutable git object (a one-blob tree committed onto the ref) and advances the ref. The commit
chain *is* the totally ordered log: ``seq`` is the depth, ``events`` walks first-parent from the
head oldest→newest. Coordination-canonical state therefore inherits git's history / patch /
review / backup affordances (I-6) and is read-your-writes correct across processes.
The total order is enforced at storage by a **compare-and-swap** ref update
(``git update-ref <ref> <new> <old>``): two appenders racing off the same head — the loser's CAS
fails and it retries off the new head, so a non-holder can never fork the log. The lease layer
(T2) sits *above* this as the append-authority policy; CAS is the mechanism that makes it safe.
Implemented over the ``git`` CLI through :mod:`subprocess` — zero runtime dependencies.
"""
from __future__ import annotations
import hashlib
import os
import subprocess
from collections.abc import Mapping
from pathlib import Path
from typing import Any
from shard_wiki.coordination.decision_log import (
DecisionEvent,
EventType,
deserialize_event,
serialize_event,
)
__all__ = ["GitEventStore"]
# Fixed identity so commit objects are reproducible and never prompt for git config; the event's
# own timestamp/actor carry the real provenance, the commit is just the ordered container.
_GIT_IDENTITY = {
"GIT_AUTHOR_NAME": "shard-wiki",
"GIT_AUTHOR_EMAIL": "coordination@shard-wiki",
"GIT_COMMITTER_NAME": "shard-wiki",
"GIT_COMMITTER_EMAIL": "coordination@shard-wiki",
}
_EVENT_PATH = "event.json"
_MAX_CAS_RETRIES = 50
class GitEventStore:
"""Git-backed, append-only, per-space ordered event store (an :class:`EventStore`)."""
def __init__(self, repo_path: str | Path) -> None:
self.repo_path = Path(repo_path)
self.repo_path.mkdir(parents=True, exist_ok=True)
if not (self.repo_path / "HEAD").exists() and not (self.repo_path / ".git").exists():
self._git("init", "--quiet", str(self.repo_path), at_cwd=True)
# -- EventStore contract -------------------------------------------------
def append(
self,
space: str,
type: EventType,
payload: Mapping[str, Any],
actor: str | None = None,
) -> DecisionEvent:
"""Append one event, advancing the space ref under compare-and-swap (retry-on-race)."""
ref = self._ref(space)
for _ in range(_MAX_CAS_RETRIES):
head = self._head(ref)
seq = self._count(ref, head)
event = DecisionEvent(
seq=seq, space=space, type=type, payload=dict(payload), actor=actor
)
commit = self._commit_event(event, parent=head)
if self._cas_update(ref, new=commit, old=head):
return event
raise RuntimeError(f"append contention on {space!r}: exhausted {_MAX_CAS_RETRIES} retries")
def events(self, space: str) -> tuple[DecisionEvent, ...]:
"""The space's events oldest→newest (append/total order)."""
ref = self._ref(space)
head = self._head(ref)
if head is None:
return ()
shas = self._git("rev-list", "--reverse", "--first-parent", ref).decode().split()
return tuple(
deserialize_event(self._git("cat-file", "blob", f"{sha}:{_EVENT_PATH}"))
for sha in shas
)
# -- git plumbing --------------------------------------------------------
def _commit_event(self, event: DecisionEvent, parent: str | None) -> str:
blob = self._git(
"hash-object", "-w", "--stdin", stdin=serialize_event(event)
).decode().strip()
tree = self._git(
"mktree", stdin=f"100644 blob {blob}\t{_EVENT_PATH}\n".encode()
).decode().strip()
args = ["commit-tree", tree, "-m", f"event {event.seq} {event.type.value}"]
if parent is not None:
args += ["-p", parent]
# Pin the commit date to the event's timestamp for reproducible objects.
date = event.timestamp.isoformat()
env = {**_GIT_IDENTITY, "GIT_AUTHOR_DATE": date, "GIT_COMMITTER_DATE": date}
return self._git(*args, env=env).decode().strip()
def _cas_update(self, ref: str, new: str, old: str | None) -> bool:
"""``git update-ref`` with the old value as a CAS guard (empty oldvalue == must-not-exist).
Returns False if the ref moved since we read ``old`` (lost the race) — the caller retries.
"""
result = self._run("update-ref", ref, new, old if old is not None else "")
return result.returncode == 0
def _head(self, ref: str) -> str | None:
result = self._run("rev-parse", "--verify", "--quiet", ref)
out = result.stdout.decode().strip()
return out or None
def _count(self, ref: str, head: str | None) -> int:
if head is None:
return 0
return int(self._git("rev-list", "--count", "--first-parent", ref).decode().strip())
@staticmethod
def _ref(space: str) -> str:
return f"refs/spaces/{hashlib.sha1(space.encode()).hexdigest()}"
def _git(
self,
*args: str,
stdin: bytes | None = None,
env: dict | None = None,
at_cwd: bool = False,
) -> bytes:
result = self._run(*args, stdin=stdin, env=env, at_cwd=at_cwd, check=True)
return result.stdout
def _run(
self,
*args: str,
stdin: bytes | None = None,
env: dict | None = None,
at_cwd: bool = False,
check: bool = False,
) -> subprocess.CompletedProcess:
base = ["git"] if at_cwd else ["git", "-C", str(self.repo_path)]
return subprocess.run(
[*base, *args],
input=stdin,
capture_output=True,
env={**os.environ, **(env or {})},
check=check,
)

View File

@@ -0,0 +1,84 @@
"""Tests for the git-backed event store (SHARD-WP-0009 T1).
The git backend must satisfy the same EventStore contract as the in-memory one (round-trip,
ordering, determinism) while making the log git-addressable.
"""
import subprocess
import pytest
from shard_wiki.coordination import (
DecisionLog,
EventType,
GitEventStore,
InMemoryEventStore,
deserialize_event,
serialize_event,
)
@pytest.fixture
def git_store(tmp_path):
return GitEventStore(tmp_path / "coord")
def test_append_git_read_round_trips(git_store):
log = DecisionLog(git_store)
ev = log.append("s", EventType.ALIAS_SET, {"alias": "Home", "target": "shardA:Index"})
(read,) = log.events("s")
assert read.seq == ev.seq == 0
assert read.space == "s"
assert read.type is EventType.ALIAS_SET
assert read.payload == {"alias": "Home", "target": "shardA:Index"}
def test_ordering_preserved_and_per_space_monotonic(git_store):
log = DecisionLog(git_store)
log.append("a", EventType.ALIAS_SET, {"alias": "X", "target": "s:1"})
log.append("a", EventType.ALIAS_SET, {"alias": "Y", "target": "s:2"})
log.append("b", EventType.ALIAS_SET, {"alias": "Z", "target": "s:3"})
assert [e.seq for e in log.events("a")] == [0, 1]
assert [e.payload["alias"] for e in log.events("a")] == ["X", "Y"]
assert [e.seq for e in log.events("b")] == [0] # independent ref/ordering
def test_each_append_is_a_git_commit(git_store):
log = DecisionLog(git_store)
log.append("s", EventType.BINDING_MADE, {"members": ["a", "b"]})
log.append("s", EventType.PAGE_FORKED, {"source": "a", "fork": "c"})
ref = GitEventStore._ref("s")
count = subprocess.run(
["git", "-C", str(git_store.repo_path), "rev-list", "--count", ref],
capture_output=True, text=True, check=True,
).stdout.strip()
assert count == "2" # one immutable commit object per append
def test_deterministic_serialization_is_stable_and_sorted():
log = InMemoryEventStore()
ev = log.append("s", EventType.ALIAS_SET, {"target": "z", "alias": "a"})
blob = serialize_event(ev)
assert serialize_event(ev) == blob # stable across calls
assert blob.index(b'"alias"') < blob.index(b'"target"') # payload keys sorted, not insertion
assert deserialize_event(blob).payload == {"alias": "a", "target": "z"}
def test_git_fold_matches_in_memory_fold(git_store):
events = [
(EventType.ALIAS_SET, {"alias": "Home", "target": "shardA:Index"}),
(EventType.BINDING_MADE, {"members": ["a", "b"]}),
(EventType.BINDING_MADE, {"members": ["b", "c"]}),
(EventType.ALIAS_SET, {"alias": "Home", "target": "shardB:Main"}),
]
mem = DecisionLog(InMemoryEventStore())
git = DecisionLog(git_store)
for typ, payload in events:
mem.append("s", typ, payload)
git.append("s", typ, payload)
assert git.fold("s").aliases == mem.fold("s").aliases
assert git.fold("s").equivalence_groups == mem.fold("s").equivalence_groups
def test_default_decisionlog_is_in_memory():
assert isinstance(DecisionLog()._store, InMemoryEventStore)