feat(views): RecentChanges merged change feed (WP-0010 T3)

One newest-first feed merging the coordination journal (overlay/alias/fork/merge/
binding decisions, with actor + payload) and shard change signals (page
source_rev / mtime). Each entry carries provenance: the originating shard for an
edit, or 'coordination' (and the actor) for a decision. Non-temporal revision
tokens are skipped gracefully. Derived/recomputable; notify-streaming later.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-16 01:59:11 +02:00
parent da540d4eea
commit 34b0c539f3
3 changed files with 178 additions and 0 deletions

View File

@@ -13,6 +13,7 @@ from shard_wiki.views.links import (
extract_links,
resolve_links,
)
from shard_wiki.views.recentchanges import ChangeEntry, recent_changes
__all__ = [
"WikiLink",
@@ -22,4 +23,6 @@ __all__ = [
"BackLink",
"BackLinksIndex",
"build_backlinks",
"ChangeEntry",
"recent_changes",
]

View File

@@ -0,0 +1,108 @@
"""RecentChanges — a merged change feed over the union (SHARD-WP-0010 T3; UC-17).
Two streams, one ordered feed (newest-first):
* the **coordination journal** — overlay/alias/fork/merge/binding decisions from the decision log,
each carrying its actor and the decision payload; and
* **shard change signals** — a page's current revision (folder mtime / ``source_rev``), i.e. the
backend's own "this changed" evidence.
Every entry carries provenance: which shard the edit came from, or that it was a coordination
decision (and by whom). Derived/recomputable — `notify`-driven streaming is a later binding.
"""
from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass, field
from datetime import datetime
from shard_wiki.coordination import DecisionLog, EventType
from shard_wiki.union import UnionGraph
__all__ = ["ChangeEntry", "recent_changes"]
_COORDINATION = "coordination"
# How each journal event names the thing it touched + a human kind label.
_EVENT_KIND = {
EventType.ALIAS_SET: "alias",
EventType.OVERLAY_CREATED: "overlay",
EventType.MERGE_DECIDED: "merge",
EventType.PAGE_FORKED: "fork",
EventType.BINDING_MADE: "binding",
}
@dataclass(frozen=True, slots=True)
class ChangeEntry:
"""One change in the feed. ``source`` is the shard id (a shard edit) or ``"coordination"``."""
when: datetime
kind: str
ref: str
source: str
actor: str | None = None
detail: Mapping[str, object] = field(default_factory=dict)
def _event_ref(event_type: EventType, payload: Mapping[str, object]) -> str:
if event_type is EventType.ALIAS_SET:
return str(payload.get("alias", ""))
if event_type is EventType.OVERLAY_CREATED:
return f"{payload.get('target_shard')}:{payload.get('target_key')}"
if event_type is EventType.PAGE_FORKED:
return f"{payload.get('source')}{payload.get('fork')}"
if event_type is EventType.BINDING_MADE:
return ", ".join(str(m) for m in payload.get("members", ()))
return str(payload.get("overlay_id", "")) # MERGE_DECIDED
def recent_changes(
union: UnionGraph,
log: DecisionLog,
space: str,
*,
limit: int | None = None,
) -> tuple[ChangeEntry, ...]:
"""Merge the coordination journal and shard change signals into one newest-first feed."""
entries: list[ChangeEntry] = []
for event in log.events(space):
entries.append(
ChangeEntry(
when=event.timestamp,
kind=_EVENT_KIND.get(event.type, event.type.value),
ref=_event_ref(event.type, event.payload),
source=_COORDINATION,
actor=event.actor,
detail=dict(event.payload),
)
)
for page in union.iter_pages():
rev = page.envelope.source_rev
when = _parse_rev(rev)
if when is None:
continue # shard offers no change signal for this page — skip gracefully
entries.append(
ChangeEntry(
when=when,
kind="edit",
ref=str(page.identity),
source=page.identity.shard,
detail={"source_rev": rev},
)
)
entries.sort(key=lambda e: e.when, reverse=True)
return tuple(entries if limit is None else entries[:limit])
def _parse_rev(rev: str | None) -> datetime | None:
if rev is None:
return None
try:
return datetime.fromisoformat(rev)
except ValueError:
return None # non-temporal revision token (e.g. a content hash) — no feed timestamp

View File

@@ -0,0 +1,67 @@
"""Tests for the RecentChanges merged feed (SHARD-WP-0010 T3)."""
import os
from datetime import datetime, timezone
from shard_wiki.adapters import FolderAdapter
from shard_wiki.coordination import DecisionLog, EventType
from shard_wiki.union import UnionGraph
from shard_wiki.views import recent_changes
def _shard(tmp_path, name, files, mtime=None):
root = tmp_path / name
for rel, text in files.items():
p = root / rel
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(text, encoding="utf-8")
if mtime is not None:
os.utime(p, (mtime, mtime))
return FolderAdapter(name, root)
def test_edit_and_alias_both_appear_newest_first(tmp_path):
# Page edit signal pinned to an old mtime; the alias decision happens "now" → alias is newest.
old = datetime(2020, 1, 1, tzinfo=timezone.utc).timestamp()
u = UnionGraph("space")
u.attach(_shard(tmp_path, "shardA", {"Home.md": "home"}, mtime=old))
log = DecisionLog()
log.append("space", EventType.ALIAS_SET, {"alias": "Start", "target": "shardA:Home"})
feed = recent_changes(u, log, "space")
kinds = [e.kind for e in feed]
assert "edit" in kinds and "alias" in kinds
assert feed[0].kind == "alias" # newest first
assert feed[-1].kind == "edit"
# Monotonic non-increasing by time.
assert all(feed[i].when >= feed[i + 1].when for i in range(len(feed) - 1))
def test_per_shard_attribution_present(tmp_path):
u = UnionGraph("space")
u.attach(_shard(tmp_path, "shardA", {"A.md": "a"}))
u.attach(_shard(tmp_path, "shardB", {"B.md": "b"}))
feed = recent_changes(u, DecisionLog(), "space")
edits = {e.ref: e.source for e in feed if e.kind == "edit"}
assert edits["shardA:A"] == "shardA"
assert edits["shardB:B"] == "shardB" # each edit attributed to its shard
def test_coordination_entries_carry_actor_and_ref(tmp_path):
u = UnionGraph("space")
u.attach(_shard(tmp_path, "shardA", {"Doc.md": "x"}))
log = DecisionLog()
log.append(
"space", EventType.PAGE_FORKED, {"source": "shardA:Doc", "fork": "shardB:Doc"}, actor="ana"
)
fork = next(e for e in recent_changes(u, log, "space") if e.kind == "fork")
assert fork.source == "coordination"
assert fork.actor == "ana"
assert fork.ref == "shardA:Doc→shardB:Doc"
def test_limit_truncates_to_newest(tmp_path):
u = UnionGraph("space")
u.attach(_shard(tmp_path, "shardA", {"A.md": "a", "B.md": "b", "C.md": "c"}))
feed = recent_changes(u, DecisionLog(), "space", limit=2)
assert len(feed) == 2