diff --git a/src/shard_wiki/coordination/__init__.py b/src/shard_wiki/coordination/__init__.py index 9407c41..aa80bbe 100644 --- a/src/shard_wiki/coordination/__init__.py +++ b/src/shard_wiki/coordination/__init__.py @@ -10,6 +10,12 @@ from shard_wiki.coordination.decision_log import ( deserialize_event, serialize_event, ) +from shard_wiki.coordination.append_authority import ( + AppendAuthority, + Lease, + LeaseHeld, + LeaseRegistry, +) from shard_wiki.coordination.git_event_store import GitEventStore from shard_wiki.coordination.overlay import ( ApplyResult, @@ -27,6 +33,10 @@ __all__ = [ "EventStore", "InMemoryEventStore", "GitEventStore", + "Lease", + "LeaseHeld", + "LeaseRegistry", + "AppendAuthority", "serialize_event", "deserialize_event", "Overlay", diff --git a/src/shard_wiki/coordination/append_authority.py b/src/shard_wiki/coordination/append_authority.py new file mode 100644 index 0000000..6740348 --- /dev/null +++ b/src/shard_wiki/coordination/append_authority.py @@ -0,0 +1,158 @@ +"""Per-space append authority — the single-writer lease over the log (SHARD-WP-0009 T2). + +The log is a *total order per space* (§8.6). :class:`~shard_wiki.coordination.git_event_store` +makes a fork physically impossible via compare-and-swap; this layer adds the **policy** that gives +the order a single designated writer: a **per-space lease**. At most one node holds a space's lease +at a time; only the holder writes to the store. A non-holder does not write — it **forwards** its +append intent to the current holder, so intents from anywhere still land in one serialized stream. + +The lease is **time-bounded and re-grantable** (HA): if a holder dies, its lease expires and a new +node may take it, resuming appends from the log head (``seq`` stays contiguous across the hand-off). +A node holding a *stale* lease (already re-granted elsewhere) cannot write either — it discovers it +is no longer the holder and forwards instead, so a partitioned ex-holder can never fork the log. + +Mechanism over policy (CLAUDE.md): this provides the leasing *primitive*; who acquires when, and +the TTL, are the caller's policy. Single-coordinator only — distributed multi-node leasing and log +sharding are explicit non-goals of this workplan. +""" + +from __future__ import annotations + +import uuid +from collections.abc import Callable, Mapping +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any + +from shard_wiki.coordination.decision_log import DecisionEvent, EventStore, EventType + +__all__ = ["Lease", "LeaseHeld", "LeaseRegistry", "AppendAuthority"] + + +def _utcnow() -> datetime: + return datetime.now(tz=timezone.utc) + + +@dataclass(frozen=True, slots=True) +class Lease: + """A time-bounded grant of single-writer authority over one space.""" + + space: str + holder: str + token: str + expires_at: datetime + + def valid_at(self, now: datetime) -> bool: + return now < self.expires_at + + +class LeaseHeld(Exception): + """Raised when a space's lease is validly held by a different node.""" + + def __init__(self, lease: Lease) -> None: + super().__init__( + f"space {lease.space!r} leased to {lease.holder!r} until {lease.expires_at}" + ) + self.lease = lease + + +class LeaseRegistry: + """The single coordinator's grant table: at most one *valid* lease per space. + + A lease that has expired is freely re-grantable to any node (the HA replacement path); a still + valid lease is exclusive to its holder (renewable by that holder). The registry also routes + forwarded append intents to the current holder node. + """ + + def __init__(self, clock: Callable[[], datetime] = _utcnow) -> None: + self._clock = clock + self._leases: dict[str, Lease] = {} + self._nodes: dict[str, AppendAuthority] = {} + + def register(self, node: AppendAuthority) -> None: + self._nodes[node.node_id] = node + + def grant(self, space: str, holder: str, ttl_seconds: float) -> Lease: + """Grant/renew the lease for ``space`` to ``holder``; raise :class:`LeaseHeld` if another + node still holds it validly. An expired lease is re-grantable to anyone.""" + now = self._clock() + current = self._leases.get(space) + if current is not None and current.valid_at(now) and current.holder != holder: + raise LeaseHeld(current) + lease = Lease( + space=space, + holder=holder, + token=uuid.uuid4().hex, + expires_at=now + timedelta(seconds=ttl_seconds), + ) + self._leases[space] = lease + return lease + + def current(self, space: str) -> Lease | None: + """The lease for ``space`` if one is currently valid, else None (expired/absent).""" + lease = self._leases.get(space) + return lease if lease is not None and lease.valid_at(self._clock()) else None + + def holder_node(self, space: str) -> AppendAuthority | None: + lease = self.current(space) + return self._nodes.get(lease.holder) if lease is not None else None + + +class AppendAuthority: + """A coordinator node that appends to the shared log only when it holds the space's lease. + + Nodes share one :class:`EventStore` and one :class:`LeaseRegistry`. ``append`` routes itself: + the holder writes; a non-holder forwards to whoever holds the lease (acquiring it first if the + space is currently unleased). The append API mirrors :class:`EventStore` so the authority is a + drop-in single-writer guard. + """ + + def __init__( + self, + node_id: str, + store: EventStore, + registry: LeaseRegistry, + ttl_seconds: float = 30.0, + ) -> None: + self.node_id = node_id + self._store = store + self._registry = registry + self._ttl = ttl_seconds + registry.register(self) + + def acquire(self, space: str) -> Lease: + """Take (or renew) the lease for ``space``. Raises :class:`LeaseHeld` if another node holds + it validly.""" + return self._registry.grant(space, self.node_id, self._ttl) + + def holds(self, space: str) -> bool: + lease = self._registry.current(space) + return lease is not None and lease.holder == self.node_id + + def append( + self, + space: str, + type: EventType, + payload: Mapping[str, Any], + actor: str | None = None, + ) -> DecisionEvent: + """Append via the single authority. If we hold the lease, write; otherwise forward to the + holder. If the space is unleased, acquire it first. A node with a *stale* lease forwards + (it is not the current holder) rather than writing — so it cannot fork the log.""" + holder_node = self._registry.holder_node(space) + if holder_node is None: + self.acquire(space) # unleased: take authority, then write below + holder_node = self + if holder_node is self: + return self._store.append(space, type, payload, actor=actor) + return holder_node._write(space, type, payload, actor=actor) + + def _write( + self, + space: str, + type: EventType, + payload: Mapping[str, Any], + actor: str | None, + ) -> DecisionEvent: + """Apply a forwarded intent. Called only on the lease holder by a forwarding peer.""" + return self._store.append(space, type, payload, actor=actor) diff --git a/tests/test_append_authority.py b/tests/test_append_authority.py new file mode 100644 index 0000000..14f5279 --- /dev/null +++ b/tests/test_append_authority.py @@ -0,0 +1,120 @@ +"""Tests for the per-space append authority / lease (SHARD-WP-0009 T2). + +A single append authority per space serializes appends into a total order; non-holders forward +intents to the holder; the lease is time-bounded and re-grantable (HA hand-off); a stale ex-holder +cannot fork the log. +""" + +from datetime import datetime, timedelta, timezone + +import pytest + +from shard_wiki.coordination import ( + AppendAuthority, + EventType, + GitEventStore, + InMemoryEventStore, + LeaseHeld, + LeaseRegistry, +) + + +class FakeClock: + def __init__(self): + self.now = datetime(2026, 1, 1, tzinfo=timezone.utc) + + def __call__(self): + return self.now + + def advance(self, seconds): + self.now += timedelta(seconds=seconds) + + +def test_only_one_node_holds_a_space_at_a_time(): + reg = LeaseRegistry() + a = AppendAuthority("A", InMemoryEventStore(), reg) + b = AppendAuthority("B", InMemoryEventStore(), reg) + a.acquire("s") + with pytest.raises(LeaseHeld): + b.acquire("s") # B is refused while A's lease is valid + + +def test_concurrent_appends_serialize_into_one_total_order(): + reg = LeaseRegistry() + store = InMemoryEventStore() + a = AppendAuthority("A", store, reg) + b = AppendAuthority("B", store, reg) + a.acquire("s") + # B is a non-holder: its append forwards to A, the holder. Interleave A and B writers. + a.append("s", EventType.ALIAS_SET, {"alias": "1", "target": "x:1"}) + b.append("s", EventType.ALIAS_SET, {"alias": "2", "target": "x:2"}) # forwarded + a.append("s", EventType.ALIAS_SET, {"alias": "3", "target": "x:3"}) + seqs = [e.seq for e in store.events("s")] + aliases = [e.payload["alias"] for e in store.events("s")] + assert seqs == [0, 1, 2] # contiguous total order despite two writers + assert aliases == ["1", "2", "3"] + + +def test_non_holder_forwards_rather_than_writing_directly(): + reg = LeaseRegistry() + store = InMemoryEventStore() + a = AppendAuthority("A", store, reg) + b = AppendAuthority("B", store, reg) + a.acquire("s") + assert not b.holds("s") + b.append("s", EventType.ALIAS_SET, {"alias": "fwd", "target": "x:1"}) + # The write landed on the shared store under A's authority, in one stream. + assert [e.payload["alias"] for e in store.events("s")] == ["fwd"] + + +def test_lease_handoff_resumes_from_head(): + clock = FakeClock() + reg = LeaseRegistry(clock=clock) + store = InMemoryEventStore() + a = AppendAuthority("A", store, reg, ttl_seconds=10) + b = AppendAuthority("B", store, reg, ttl_seconds=10) + a.acquire("s") + a.append("s", EventType.ALIAS_SET, {"alias": "0", "target": "x:0"}) + a.append("s", EventType.ALIAS_SET, {"alias": "1", "target": "x:1"}) + clock.advance(20) # A's lease expires (A "dies") + b.acquire("s") # re-grantable: B takes over + b.append("s", EventType.ALIAS_SET, {"alias": "2", "target": "x:2"}) + assert [e.seq for e in store.events("s")] == [0, 1, 2] # contiguous across hand-off + + +def test_stale_ex_holder_cannot_fork_the_log(): + clock = FakeClock() + reg = LeaseRegistry(clock=clock) + store = InMemoryEventStore() + a = AppendAuthority("A", store, reg, ttl_seconds=10) + b = AppendAuthority("B", store, reg, ttl_seconds=10) + a.acquire("s") + a.append("s", EventType.ALIAS_SET, {"alias": "0", "target": "x:0"}) + clock.advance(20) + b.acquire("s") # B is now the holder; A's lease is stale + b.append("s", EventType.ALIAS_SET, {"alias": "1", "target": "x:1"}) + # A still thinks it can write, but it's no longer the holder: its intent forwards to B. + assert not a.holds("s") + a.append("s", EventType.ALIAS_SET, {"alias": "2", "target": "x:2"}) + aliases = [e.payload["alias"] for e in store.events("s")] + assert aliases == ["0", "1", "2"] # one stream, no fork + + +def test_authority_over_git_store_keeps_total_order(tmp_path): + reg = LeaseRegistry() + store = GitEventStore(tmp_path / "coord") + a = AppendAuthority("A", store, reg) + b = AppendAuthority("B", store, reg) + a.acquire("s") + a.append("s", EventType.BINDING_MADE, {"members": ["a", "b"]}) + b.append("s", EventType.PAGE_FORKED, {"source": "a", "fork": "c"}) # forwarded + assert [e.seq for e in store.events("s")] == [0, 1] + + +def test_unleased_space_self_acquires_on_append(): + reg = LeaseRegistry() + store = InMemoryEventStore() + a = AppendAuthority("A", store, reg) + a.append("s", EventType.ALIAS_SET, {"alias": "x", "target": "y:1"}) # no explicit acquire + assert a.holds("s") + assert len(store.events("s")) == 1