diff --git a/src/shard_wiki/coordination/__init__.py b/src/shard_wiki/coordination/__init__.py index d941f6f..9407c41 100644 --- a/src/shard_wiki/coordination/__init__.py +++ b/src/shard_wiki/coordination/__init__.py @@ -4,8 +4,13 @@ from shard_wiki.coordination.decision_log import ( CoordinationState, DecisionEvent, DecisionLog, + EventStore, EventType, + InMemoryEventStore, + deserialize_event, + serialize_event, ) +from shard_wiki.coordination.git_event_store import GitEventStore from shard_wiki.coordination.overlay import ( ApplyResult, ApplyStatus, @@ -19,6 +24,11 @@ __all__ = [ "DecisionEvent", "EventType", "CoordinationState", + "EventStore", + "InMemoryEventStore", + "GitEventStore", + "serialize_event", + "deserialize_event", "Overlay", "OverlayEngine", "ApplyStatus", diff --git a/src/shard_wiki/coordination/decision_log.py b/src/shard_wiki/coordination/decision_log.py index b894231..417fe89 100644 --- a/src/shard_wiki/coordination/decision_log.py +++ b/src/shard_wiki/coordination/decision_log.py @@ -3,22 +3,36 @@ Coordination-canonical state (overlays, equivalence bindings, aliases, merges, forks) is an **append-only decision log**, not a mutable file; the queryable *current* state is a **derived fold** of the log (tier-3 disposable). The log is **totally ordered per space** via a single -**append authority** — here an in-process counter; a git-backed, lease-held authority is a later -binding. That total order is what gives read-your-writes across readers (§8.6). +**append authority**. That total order is what gives read-your-writes across readers (§8.6). + +Storage lives behind :class:`EventStore`: :class:`InMemoryEventStore` is the default test double +(an in-process counter); :class:`~shard_wiki.coordination.git_event_store.GitEventStore` is the +git-addressable backend (SHARD-WP-0009). The :class:`DecisionLog` API and the :meth:`fold` are +identical across backends — only storage + the concurrency model differ. 
def _mapping_to_dict(value: object) -> dict[str, Any]:
    """``json.dumps`` fallback: encode any non-``dict`` mapping as a plain JSON object.

    ``json`` only serializes ``dict`` natively; read-only or custom mappings (for example
    ``MappingProxyType``) reach this hook and are copied into a ``dict``. Anything else is
    rejected with the same ``TypeError`` shape the encoder itself would raise.
    """
    if isinstance(value, Mapping):
        return dict(value)
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def serialize_event(event: DecisionEvent) -> bytes:
    """Return the deterministic, stable-JSON wire form of *event* as UTF-8 bytes.

    Sorted keys + compact separators make the serialization canonical (same bytes for equal
    events, in any process), so a git object hashed from it is reproducible — the basis for
    content-addressable, comparable logs across backends.

    The payload is typed as a ``Mapping``, so non-``dict`` mappings at any nesting depth are
    encoded as ordinary JSON objects rather than making ``json.dumps`` fail.

    Raises:
        TypeError: if the payload holds a value that is neither JSON-native nor a mapping.
    """
    record = {
        "seq": event.seq,
        "space": event.space,
        "type": event.type.value,
        "payload": event.payload,
        "actor": event.actor,
        "timestamp": event.timestamp.isoformat(),
    }
    text = json.dumps(
        record,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
        default=_mapping_to_dict,
    )
    # Pin the codec explicitly: the byte form is hashed, so it must not depend on defaults.
    return text.encode("utf-8")


def deserialize_event(data: bytes | str) -> DecisionEvent:
    """Rebuild a :class:`DecisionEvent` from the JSON produced by :func:`serialize_event`.

    The ``type`` field is mapped back through :class:`EventType` by value and the timestamp is
    parsed from its ISO-8601 string; the payload comes back as the plain ``dict`` that
    ``json.loads`` yields.

    Raises:
        KeyError: if a required field is missing from the JSON object.
        ValueError: if the JSON is malformed, or the type/timestamp value is not recognised.
    """
    obj = json.loads(data)
    return DecisionEvent(
        seq=obj["seq"],
        space=obj["space"],
        type=EventType(obj["type"]),
        payload=obj["payload"],
        actor=obj["actor"],
        timestamp=datetime.fromisoformat(obj["timestamp"]),
    )


@runtime_checkable
class EventStore(Protocol):
    """Append-only, per-space ordered storage behind :class:`DecisionLog`.

    Two bindings exist: :class:`InMemoryEventStore` (default/test double) and
    :class:`~shard_wiki.coordination.git_event_store.GitEventStore` (git-addressable). Both assign
    a per-space monotonic ``seq`` at the log head and guarantee read-your-writes for their reach
    (in-process for memory; cross-process for git).

    The protocol is ``runtime_checkable``, so ``isinstance(obj, EventStore)`` verifies that the
    two methods exist (it does not check their signatures).
    """

    def append(
        self, space: str, type: EventType, payload: Mapping[str, Any], actor: str | None = None
    ) -> DecisionEvent:
        """Append one event to *space* and return it as stored (including its ``seq``)."""
        ...

    def events(self, space: str) -> tuple[DecisionEvent, ...]:
        """Return the events of *space* in append order."""
        ...
+ + Storage is delegated to an :class:`EventStore` (default :class:`InMemoryEventStore`); swapping + in the git backend changes only durability + the concurrency model, not this API or the fold. + """ + + def __init__(self, store: EventStore | None = None) -> None: + self._store: EventStore = store if store is not None else InMemoryEventStore() + + def append( + self, + space: str, + type: EventType, + payload: Mapping[str, Any], + actor: str | None = None, + ) -> DecisionEvent: + return self._store.append(space, type, payload, actor=actor) + def events(self, space: str) -> tuple[DecisionEvent, ...]: """The space's events in append (total) order. Read-your-writes: a just-appended event is present immediately.""" - return tuple(self._events.get(space, ())) + return self._store.events(space) def fold(self, space: str) -> CoordinationState: """Replay the log into current coordination state (derived = f(log)).""" diff --git a/src/shard_wiki/coordination/git_event_store.py b/src/shard_wiki/coordination/git_event_store.py new file mode 100644 index 0000000..8c83def --- /dev/null +++ b/src/shard_wiki/coordination/git_event_store.py @@ -0,0 +1,154 @@ +"""GitEventStore — a git-addressable binding of :class:`EventStore` (SHARD-WP-0009 T1). + +Each space is a ref (``refs/spaces/<sha1-of-space>``); each ``append`` writes the event as an +immutable git object (a one-blob tree committed onto the ref) and advances the ref. The commit +chain *is* the totally ordered log: ``seq`` is the depth, ``events`` walks first-parent from the +head oldest→newest. Coordination-canonical state therefore inherits git's history / patch / +review / backup affordances (I-6) and is read-your-writes correct across processes. + +The total order is enforced at storage by a **compare-and-swap** ref update +(``git update-ref <ref> <new> <old>``): when two appenders race off the same head, the loser's CAS +fails and it retries off the new head, so a non-holder can never fork the log. 
class GitEventStore:
    """Git-backed, append-only, per-space ordered event store (an :class:`EventStore`).

    Each space maps to one ref. Every append writes the serialized event as a blob, wraps it in
    a one-entry tree, commits that tree on top of the current head and moves the ref with a
    compare-and-swap ``git update-ref``. The first-parent chain of the ref is therefore the
    space's totally ordered log, and an event's ``seq`` is the number of commits below it.

    All git access goes through the ``git`` CLI via :mod:`subprocess`.
    """

    # Variables that, when inherited (e.g. from a git hook or a wrapping git command), make git
    # ignore ``-C repo_path`` and operate on a different repository, index or object store.
    _REPO_OVERRIDE_VARS = frozenset(
        {
            "GIT_DIR",
            "GIT_WORK_TREE",
            "GIT_COMMON_DIR",
            "GIT_INDEX_FILE",
            "GIT_OBJECT_DIRECTORY",
            "GIT_ALTERNATE_OBJECT_DIRECTORIES",
            "GIT_NAMESPACE",
        }
    )

    def __init__(self, repo_path: str | Path) -> None:
        """Open the store at *repo_path*, creating the directory and repository if needed.

        An existing repository is detected by a ``HEAD`` file (bare layout) or a ``.git`` entry
        (worktree layout); otherwise ``git init`` is run on the path.
        """
        self.repo_path = Path(repo_path)
        self.repo_path.mkdir(parents=True, exist_ok=True)
        if not (self.repo_path / "HEAD").exists() and not (self.repo_path / ".git").exists():
            self._git("init", "--quiet", str(self.repo_path), at_cwd=True)

    # -- EventStore contract -------------------------------------------------

    def append(
        self,
        space: str,
        type: EventType,
        payload: Mapping[str, Any],
        actor: str | None = None,
    ) -> DecisionEvent:
        """Append one event, advancing the space ref under compare-and-swap (retry-on-race).

        Each attempt snapshots the head, derives ``seq`` from that same snapshot, commits the
        event on top of it and tries to move the ref from the snapshot to the new commit. A
        failed swap means the attempt is repeated from the new head.

        Raises:
            RuntimeError: if the swap still fails after ``_MAX_CAS_RETRIES`` attempts.
            subprocess.CalledProcessError: if a git plumbing command fails outright.
        """
        ref = self._ref(space)
        for _ in range(_MAX_CAS_RETRIES):
            head = self._head(ref)
            seq = self._count(ref, head)
            event = DecisionEvent(
                seq=seq, space=space, type=type, payload=dict(payload), actor=actor
            )
            commit = self._commit_event(event, parent=head)
            if self._cas_update(ref, new=commit, old=head):
                return event
        raise RuntimeError(f"append contention on {space!r}: exhausted {_MAX_CAS_RETRIES} retries")

    def events(self, space: str) -> tuple[DecisionEvent, ...]:
        """Return the space's events oldest→newest (append/total order).

        The walk starts from the head commit read at the beginning of the call, so the result
        is one consistent snapshot even if another writer advances the ref meanwhile.
        """
        head = self._head(self._ref(space))
        if head is None:
            return ()
        shas = self._git("rev-list", "--reverse", "--first-parent", head).decode().split()
        return tuple(
            deserialize_event(self._git("cat-file", "blob", f"{sha}:{_EVENT_PATH}"))
            for sha in shas
        )

    # -- git plumbing --------------------------------------------------------

    def _commit_event(self, event: DecisionEvent, parent: str | None) -> str:
        """Write *event* as blob → tree → commit and return the new commit's object id.

        The commit is created but not yet referenced by any ref; *parent* is the commit it
        extends, or ``None`` for the first event of a space.
        """
        blob = self._git(
            "hash-object", "-w", "--stdin", stdin=serialize_event(event)
        ).decode().strip()
        tree = self._git(
            "mktree", stdin=f"100644 blob {blob}\t{_EVENT_PATH}\n".encode()
        ).decode().strip()
        args = ["commit-tree", tree, "-m", f"event {event.seq} {event.type.value}"]
        if parent is not None:
            args += ["-p", parent]
        # Pin the commit date to the event's timestamp for reproducible objects.
        date = event.timestamp.isoformat()
        env = {**_GIT_IDENTITY, "GIT_AUTHOR_DATE": date, "GIT_COMMITTER_DATE": date}
        return self._git(*args, env=env).decode().strip()

    def _cas_update(self, ref: str, new: str, old: str | None) -> bool:
        """``git update-ref`` with the old value as a CAS guard (empty oldvalue == must-not-exist).

        Returns False if the ref moved since we read ``old`` (lost the race) — the caller retries.
        Any other non-zero exit of ``git update-ref`` is reported as False as well.
        """
        result = self._run("update-ref", ref, new, old if old is not None else "")
        return result.returncode == 0

    def _head(self, ref: str) -> str | None:
        """Return the object id *ref* currently points at, or ``None`` if it does not resolve."""
        result = self._run("rev-parse", "--verify", "--quiet", ref)
        out = result.stdout.decode().strip()
        return out or None

    def _count(self, ref: str, head: str | None) -> int:
        """Return the number of first-parent commits reachable from *head* (0 when there is none).

        The count is taken from the *head* snapshot rather than by resolving *ref* again, so
        the ``seq`` derived from it always matches the parent the new commit is built on.
        """
        if head is None:
            return 0
        return int(self._git("rev-list", "--count", "--first-parent", head).decode().strip())

    @staticmethod
    def _ref(space: str) -> str:
        """Map a space name to its ref: ``refs/spaces/`` + the SHA-1 hex digest of the name."""
        return f"refs/spaces/{hashlib.sha1(space.encode()).hexdigest()}"

    @staticmethod
    def _child_env(
        extra: Mapping[str, str] | None = None,
        inherited: Mapping[str, str] | None = None,
    ) -> dict[str, str]:
        """Build the environment for a git child process.

        Starts from *inherited* (the current process environment when omitted), drops the
        variables that would point git at another repository, then overlays *extra*.
        """
        base = os.environ if inherited is None else inherited
        env = {
            name: value
            for name, value in base.items()
            if name not in GitEventStore._REPO_OVERRIDE_VARS
        }
        env.update(extra or {})
        return env

    def _git(
        self,
        *args: str,
        stdin: bytes | None = None,
        env: Mapping[str, str] | None = None,
        at_cwd: bool = False,
    ) -> bytes:
        """Run a git command that must succeed and return its raw stdout.

        Raises:
            subprocess.CalledProcessError: if git exits non-zero.
        """
        result = self._run(*args, stdin=stdin, env=env, at_cwd=at_cwd, check=True)
        return result.stdout

    def _run(
        self,
        *args: str,
        stdin: bytes | None = None,
        env: Mapping[str, str] | None = None,
        at_cwd: bool = False,
        check: bool = False,
    ) -> subprocess.CompletedProcess:
        """Run ``git`` with *args*, capturing stdout/stderr as bytes.

        By default the command is scoped to the store with ``git -C <repo_path>``; *at_cwd*
        omits that prefix (used for ``git init``, which receives the path as an argument).
        *env* entries are added on top of the sanitized inherited environment.
        """
        base = ["git"] if at_cwd else ["git", "-C", str(self.repo_path)]
        return subprocess.run(
            [*base, *args],
            input=stdin,
            capture_output=True,
            env=self._child_env(env),
            check=check,
        )
@pytest.fixture
def git_store(tmp_path):
    """Provide a fresh :class:`GitEventStore` rooted in this test's temporary directory."""
    repo_dir = tmp_path / "coord"
    return GitEventStore(repo_dir)


def test_append_git_read_round_trips(git_store):
    """A single appended event reads back from git with the same seq, space, type and payload."""
    log = DecisionLog(git_store)
    appended = log.append("s", EventType.ALIAS_SET, {"alias": "Home", "target": "shardA:Index"})
    stored = log.events("s")
    assert len(stored) == 1
    only = stored[0]
    assert appended.seq == 0
    assert only.seq == 0
    assert only.space == "s"
    assert only.type is EventType.ALIAS_SET
    assert only.payload == {"alias": "Home", "target": "shardA:Index"}


def test_ordering_preserved_and_per_space_monotonic(git_store):
    """Events keep append order, and each space numbers its events independently from 0."""
    log = DecisionLog(git_store)
    appends = (("a", "X", "s:1"), ("a", "Y", "s:2"), ("b", "Z", "s:3"))
    for space, alias, target in appends:
        log.append(space, EventType.ALIAS_SET, {"alias": alias, "target": target})
    in_a = log.events("a")
    assert [event.seq for event in in_a] == [0, 1]
    assert [event.payload["alias"] for event in in_a] == ["X", "Y"]
    # Space "b" lives on its own ref, so its numbering restarts.
    assert [event.seq for event in log.events("b")] == [0]


def test_each_append_is_a_git_commit(git_store):
    """Two appends to one space leave exactly two commits on that space's ref."""
    log = DecisionLog(git_store)
    log.append("s", EventType.BINDING_MADE, {"members": ["a", "b"]})
    log.append("s", EventType.PAGE_FORKED, {"source": "a", "fork": "c"})
    command = [
        "git",
        "-C",
        str(git_store.repo_path),
        "rev-list",
        "--count",
        GitEventStore._ref("s"),
    ]
    commits = subprocess.check_output(command, text=True).strip()
    assert commits == "2"  # one immutable commit object per append


def test_deterministic_serialization_is_stable_and_sorted():
    """Serialization is repeatable, sorts payload keys, and round-trips the payload."""
    store = InMemoryEventStore()
    event = store.append("s", EventType.ALIAS_SET, {"target": "z", "alias": "a"})
    first = serialize_event(event)
    second = serialize_event(event)
    assert second == first  # stable across calls
    # Keys were inserted target-then-alias; the wire form must order them alphabetically.
    assert first.index(b'"alias"') < first.index(b'"target"')
    assert deserialize_event(first).payload == {"alias": "a", "target": "z"}


def test_git_fold_matches_in_memory_fold(git_store):
    """Replaying the same events yields the same derived state on both backends."""
    script = [
        (EventType.ALIAS_SET, {"alias": "Home", "target": "shardA:Index"}),
        (EventType.BINDING_MADE, {"members": ["a", "b"]}),
        (EventType.BINDING_MADE, {"members": ["b", "c"]}),
        (EventType.ALIAS_SET, {"alias": "Home", "target": "shardB:Main"}),
    ]
    mem_log = DecisionLog(InMemoryEventStore())
    git_log = DecisionLog(git_store)
    for event_type, payload in script:
        for log in (mem_log, git_log):
            log.append("s", event_type, payload)
    git_state = git_log.fold("s")
    mem_state = mem_log.fold("s")
    assert git_state.aliases == mem_state.aliases
    assert git_state.equivalence_groups == mem_state.equivalence_groups


def test_default_decisionlog_is_in_memory():
    """A ``DecisionLog`` built without a store falls back to the in-memory backend."""
    default_log = DecisionLog()
    assert isinstance(default_log._store, InMemoryEventStore)