generated from coulomb/repo-seed
feat(coordination): per-space append authority (lease) (WP-0009 T2)
A single append authority per space serializes appends into a total order: at most one node holds a space's lease; only the holder writes, non-holders forward their append intent to the holder. Leases are time-bounded and re-grantable, so a dead holder's lease expires and a new node resumes from the log head (seq stays contiguous). A stale ex-holder discovers it is no longer the holder and forwards rather than writing, so a partitioned node cannot fork the log. Works over both in-memory and git stores. Single-coordinator only (distributed leasing out of scope). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -10,6 +10,12 @@ from shard_wiki.coordination.decision_log import (
|
||||
deserialize_event,
|
||||
serialize_event,
|
||||
)
|
||||
from shard_wiki.coordination.append_authority import (
|
||||
AppendAuthority,
|
||||
Lease,
|
||||
LeaseHeld,
|
||||
LeaseRegistry,
|
||||
)
|
||||
from shard_wiki.coordination.git_event_store import GitEventStore
|
||||
from shard_wiki.coordination.overlay import (
|
||||
ApplyResult,
|
||||
@@ -27,6 +33,10 @@ __all__ = [
|
||||
"EventStore",
|
||||
"InMemoryEventStore",
|
||||
"GitEventStore",
|
||||
"Lease",
|
||||
"LeaseHeld",
|
||||
"LeaseRegistry",
|
||||
"AppendAuthority",
|
||||
"serialize_event",
|
||||
"deserialize_event",
|
||||
"Overlay",
|
||||
|
||||
158
src/shard_wiki/coordination/append_authority.py
Normal file
158
src/shard_wiki/coordination/append_authority.py
Normal file
@@ -0,0 +1,158 @@
|
||||
"""Per-space append authority — the single-writer lease over the log (SHARD-WP-0009 T2).
|
||||
|
||||
The log is a *total order per space* (§8.6). :class:`~shard_wiki.coordination.git_event_store`
|
||||
makes a fork physically impossible via compare-and-swap; this layer adds the **policy** that gives
|
||||
the order a single designated writer: a **per-space lease**. At most one node holds a space's lease
|
||||
at a time; only the holder writes to the store. A non-holder does not write — it **forwards** its
|
||||
append intent to the current holder, so intents from anywhere still land in one serialized stream.
|
||||
|
||||
The lease is **time-bounded and re-grantable** (HA): if a holder dies, its lease expires and a new
|
||||
node may take it, resuming appends from the log head (``seq`` stays contiguous across the hand-off).
|
||||
A node holding a *stale* lease (already re-granted elsewhere) cannot write either — it discovers it
|
||||
is no longer the holder and forwards instead, so a partitioned ex-holder can never fork the log.
|
||||
|
||||
Mechanism over policy (CLAUDE.md): this provides the leasing *primitive*; who acquires when, and
|
||||
the TTL, are the caller's policy. Single-coordinator only — distributed multi-node leasing and log
|
||||
sharding are explicit non-goals of this workplan.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from collections.abc import Callable, Mapping
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
from shard_wiki.coordination.decision_log import DecisionEvent, EventStore, EventType
|
||||
|
||||
__all__ = ["Lease", "LeaseHeld", "LeaseRegistry", "AppendAuthority"]
|
||||
|
||||
|
||||
def _utcnow() -> datetime:
|
||||
return datetime.now(tz=timezone.utc)
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class Lease:
|
||||
"""A time-bounded grant of single-writer authority over one space."""
|
||||
|
||||
space: str
|
||||
holder: str
|
||||
token: str
|
||||
expires_at: datetime
|
||||
|
||||
def valid_at(self, now: datetime) -> bool:
|
||||
return now < self.expires_at
|
||||
|
||||
|
||||
class LeaseHeld(Exception):
|
||||
"""Raised when a space's lease is validly held by a different node."""
|
||||
|
||||
def __init__(self, lease: Lease) -> None:
|
||||
super().__init__(
|
||||
f"space {lease.space!r} leased to {lease.holder!r} until {lease.expires_at}"
|
||||
)
|
||||
self.lease = lease
|
||||
|
||||
|
||||
class LeaseRegistry:
|
||||
"""The single coordinator's grant table: at most one *valid* lease per space.
|
||||
|
||||
A lease that has expired is freely re-grantable to any node (the HA replacement path); a still
|
||||
valid lease is exclusive to its holder (renewable by that holder). The registry also routes
|
||||
forwarded append intents to the current holder node.
|
||||
"""
|
||||
|
||||
def __init__(self, clock: Callable[[], datetime] = _utcnow) -> None:
|
||||
self._clock = clock
|
||||
self._leases: dict[str, Lease] = {}
|
||||
self._nodes: dict[str, AppendAuthority] = {}
|
||||
|
||||
def register(self, node: AppendAuthority) -> None:
|
||||
self._nodes[node.node_id] = node
|
||||
|
||||
def grant(self, space: str, holder: str, ttl_seconds: float) -> Lease:
|
||||
"""Grant/renew the lease for ``space`` to ``holder``; raise :class:`LeaseHeld` if another
|
||||
node still holds it validly. An expired lease is re-grantable to anyone."""
|
||||
now = self._clock()
|
||||
current = self._leases.get(space)
|
||||
if current is not None and current.valid_at(now) and current.holder != holder:
|
||||
raise LeaseHeld(current)
|
||||
lease = Lease(
|
||||
space=space,
|
||||
holder=holder,
|
||||
token=uuid.uuid4().hex,
|
||||
expires_at=now + timedelta(seconds=ttl_seconds),
|
||||
)
|
||||
self._leases[space] = lease
|
||||
return lease
|
||||
|
||||
def current(self, space: str) -> Lease | None:
|
||||
"""The lease for ``space`` if one is currently valid, else None (expired/absent)."""
|
||||
lease = self._leases.get(space)
|
||||
return lease if lease is not None and lease.valid_at(self._clock()) else None
|
||||
|
||||
def holder_node(self, space: str) -> AppendAuthority | None:
|
||||
lease = self.current(space)
|
||||
return self._nodes.get(lease.holder) if lease is not None else None
|
||||
|
||||
|
||||
class AppendAuthority:
|
||||
"""A coordinator node that appends to the shared log only when it holds the space's lease.
|
||||
|
||||
Nodes share one :class:`EventStore` and one :class:`LeaseRegistry`. ``append`` routes itself:
|
||||
the holder writes; a non-holder forwards to whoever holds the lease (acquiring it first if the
|
||||
space is currently unleased). The append API mirrors :class:`EventStore` so the authority is a
|
||||
drop-in single-writer guard.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
node_id: str,
|
||||
store: EventStore,
|
||||
registry: LeaseRegistry,
|
||||
ttl_seconds: float = 30.0,
|
||||
) -> None:
|
||||
self.node_id = node_id
|
||||
self._store = store
|
||||
self._registry = registry
|
||||
self._ttl = ttl_seconds
|
||||
registry.register(self)
|
||||
|
||||
def acquire(self, space: str) -> Lease:
|
||||
"""Take (or renew) the lease for ``space``. Raises :class:`LeaseHeld` if another node holds
|
||||
it validly."""
|
||||
return self._registry.grant(space, self.node_id, self._ttl)
|
||||
|
||||
def holds(self, space: str) -> bool:
|
||||
lease = self._registry.current(space)
|
||||
return lease is not None and lease.holder == self.node_id
|
||||
|
||||
def append(
|
||||
self,
|
||||
space: str,
|
||||
type: EventType,
|
||||
payload: Mapping[str, Any],
|
||||
actor: str | None = None,
|
||||
) -> DecisionEvent:
|
||||
"""Append via the single authority. If we hold the lease, write; otherwise forward to the
|
||||
holder. If the space is unleased, acquire it first. A node with a *stale* lease forwards
|
||||
(it is not the current holder) rather than writing — so it cannot fork the log."""
|
||||
holder_node = self._registry.holder_node(space)
|
||||
if holder_node is None:
|
||||
self.acquire(space) # unleased: take authority, then write below
|
||||
holder_node = self
|
||||
if holder_node is self:
|
||||
return self._store.append(space, type, payload, actor=actor)
|
||||
return holder_node._write(space, type, payload, actor=actor)
|
||||
|
||||
def _write(
|
||||
self,
|
||||
space: str,
|
||||
type: EventType,
|
||||
payload: Mapping[str, Any],
|
||||
actor: str | None,
|
||||
) -> DecisionEvent:
|
||||
"""Apply a forwarded intent. Called only on the lease holder by a forwarding peer."""
|
||||
return self._store.append(space, type, payload, actor=actor)
|
||||
120
tests/test_append_authority.py
Normal file
120
tests/test_append_authority.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""Tests for the per-space append authority / lease (SHARD-WP-0009 T2).
|
||||
|
||||
A single append authority per space serializes appends into a total order; non-holders forward
|
||||
intents to the holder; the lease is time-bounded and re-grantable (HA hand-off); a stale ex-holder
|
||||
cannot fork the log.
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from shard_wiki.coordination import (
|
||||
AppendAuthority,
|
||||
EventType,
|
||||
GitEventStore,
|
||||
InMemoryEventStore,
|
||||
LeaseHeld,
|
||||
LeaseRegistry,
|
||||
)
|
||||
|
||||
|
||||
class FakeClock:
|
||||
def __init__(self):
|
||||
self.now = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
||||
|
||||
def __call__(self):
|
||||
return self.now
|
||||
|
||||
def advance(self, seconds):
|
||||
self.now += timedelta(seconds=seconds)
|
||||
|
||||
|
||||
def test_only_one_node_holds_a_space_at_a_time():
|
||||
reg = LeaseRegistry()
|
||||
a = AppendAuthority("A", InMemoryEventStore(), reg)
|
||||
b = AppendAuthority("B", InMemoryEventStore(), reg)
|
||||
a.acquire("s")
|
||||
with pytest.raises(LeaseHeld):
|
||||
b.acquire("s") # B is refused while A's lease is valid
|
||||
|
||||
|
||||
def test_concurrent_appends_serialize_into_one_total_order():
|
||||
reg = LeaseRegistry()
|
||||
store = InMemoryEventStore()
|
||||
a = AppendAuthority("A", store, reg)
|
||||
b = AppendAuthority("B", store, reg)
|
||||
a.acquire("s")
|
||||
# B is a non-holder: its append forwards to A, the holder. Interleave A and B writers.
|
||||
a.append("s", EventType.ALIAS_SET, {"alias": "1", "target": "x:1"})
|
||||
b.append("s", EventType.ALIAS_SET, {"alias": "2", "target": "x:2"}) # forwarded
|
||||
a.append("s", EventType.ALIAS_SET, {"alias": "3", "target": "x:3"})
|
||||
seqs = [e.seq for e in store.events("s")]
|
||||
aliases = [e.payload["alias"] for e in store.events("s")]
|
||||
assert seqs == [0, 1, 2] # contiguous total order despite two writers
|
||||
assert aliases == ["1", "2", "3"]
|
||||
|
||||
|
||||
def test_non_holder_forwards_rather_than_writing_directly():
|
||||
reg = LeaseRegistry()
|
||||
store = InMemoryEventStore()
|
||||
a = AppendAuthority("A", store, reg)
|
||||
b = AppendAuthority("B", store, reg)
|
||||
a.acquire("s")
|
||||
assert not b.holds("s")
|
||||
b.append("s", EventType.ALIAS_SET, {"alias": "fwd", "target": "x:1"})
|
||||
# The write landed on the shared store under A's authority, in one stream.
|
||||
assert [e.payload["alias"] for e in store.events("s")] == ["fwd"]
|
||||
|
||||
|
||||
def test_lease_handoff_resumes_from_head():
|
||||
clock = FakeClock()
|
||||
reg = LeaseRegistry(clock=clock)
|
||||
store = InMemoryEventStore()
|
||||
a = AppendAuthority("A", store, reg, ttl_seconds=10)
|
||||
b = AppendAuthority("B", store, reg, ttl_seconds=10)
|
||||
a.acquire("s")
|
||||
a.append("s", EventType.ALIAS_SET, {"alias": "0", "target": "x:0"})
|
||||
a.append("s", EventType.ALIAS_SET, {"alias": "1", "target": "x:1"})
|
||||
clock.advance(20) # A's lease expires (A "dies")
|
||||
b.acquire("s") # re-grantable: B takes over
|
||||
b.append("s", EventType.ALIAS_SET, {"alias": "2", "target": "x:2"})
|
||||
assert [e.seq for e in store.events("s")] == [0, 1, 2] # contiguous across hand-off
|
||||
|
||||
|
||||
def test_stale_ex_holder_cannot_fork_the_log():
|
||||
clock = FakeClock()
|
||||
reg = LeaseRegistry(clock=clock)
|
||||
store = InMemoryEventStore()
|
||||
a = AppendAuthority("A", store, reg, ttl_seconds=10)
|
||||
b = AppendAuthority("B", store, reg, ttl_seconds=10)
|
||||
a.acquire("s")
|
||||
a.append("s", EventType.ALIAS_SET, {"alias": "0", "target": "x:0"})
|
||||
clock.advance(20)
|
||||
b.acquire("s") # B is now the holder; A's lease is stale
|
||||
b.append("s", EventType.ALIAS_SET, {"alias": "1", "target": "x:1"})
|
||||
# A still thinks it can write, but it's no longer the holder: its intent forwards to B.
|
||||
assert not a.holds("s")
|
||||
a.append("s", EventType.ALIAS_SET, {"alias": "2", "target": "x:2"})
|
||||
aliases = [e.payload["alias"] for e in store.events("s")]
|
||||
assert aliases == ["0", "1", "2"] # one stream, no fork
|
||||
|
||||
|
||||
def test_authority_over_git_store_keeps_total_order(tmp_path):
|
||||
reg = LeaseRegistry()
|
||||
store = GitEventStore(tmp_path / "coord")
|
||||
a = AppendAuthority("A", store, reg)
|
||||
b = AppendAuthority("B", store, reg)
|
||||
a.acquire("s")
|
||||
a.append("s", EventType.BINDING_MADE, {"members": ["a", "b"]})
|
||||
b.append("s", EventType.PAGE_FORKED, {"source": "a", "fork": "c"}) # forwarded
|
||||
assert [e.seq for e in store.events("s")] == [0, 1]
|
||||
|
||||
|
||||
def test_unleased_space_self_acquires_on_append():
|
||||
reg = LeaseRegistry()
|
||||
store = InMemoryEventStore()
|
||||
a = AppendAuthority("A", store, reg)
|
||||
a.append("s", EventType.ALIAS_SET, {"alias": "x", "target": "y:1"}) # no explicit acquire
|
||||
assert a.holds("s")
|
||||
assert len(store.events("s")) == 1
|
||||
Reference in New Issue
Block a user