diff --git a/src/shard_wiki/incremental/__init__.py b/src/shard_wiki/incremental/__init__.py
index 419c7a9..ca9e646 100644
--- a/src/shard_wiki/incremental/__init__.py
+++ b/src/shard_wiki/incremental/__init__.py
@@ -22,6 +22,12 @@ from shard_wiki.incremental.minhash import (
     jaccard,
     shingles,
 )
+from shard_wiki.incremental.verification import (
+    ConsistencyChecker,
+    ConsistencyReport,
+    derived_digest,
+    region_digest,
+)
 
 __all__ = [
     "shingles",
@@ -31,4 +37,8 @@ __all__ = [
     "EquivalenceEdge",
     "EquivalenceIndex",
     "normalized_title",
+    "derived_digest",
+    "region_digest",
+    "ConsistencyReport",
+    "ConsistencyChecker",
 ]
diff --git a/src/shard_wiki/incremental/equivalence.py b/src/shard_wiki/incremental/equivalence.py
index d59d961..1357593 100644
--- a/src/shard_wiki/incremental/equivalence.py
+++ b/src/shard_wiki/incremental/equivalence.py
@@ -136,6 +136,15 @@ class EquivalenceIndex:
 
     # -- queries -------------------------------------------------------------
 
+    def identities(self) -> frozenset[Identity]:
+        """All identities currently present in the index."""
+        return frozenset(self._entries)
+
+    def fingerprint(self, identity: Identity) -> str | None:
+        """The content fingerprint indexed for ``identity`` (None if absent) — a digest leaf."""
+        entry = self._entries.get(identity)
+        return entry.fingerprint if entry is not None else None
+
     def edges(self) -> frozenset[frozenset[Identity]]:
         """All equivalence edges (content + curator) among currently present identities."""
         present = self._entries.keys()
diff --git a/src/shard_wiki/incremental/verification.py b/src/shard_wiki/incremental/verification.py
new file mode 100644
index 0000000..4c31f34
--- /dev/null
+++ b/src/shard_wiki/incremental/verification.py
@@ -0,0 +1,141 @@
+"""I-2 verification — digest + background consistency-checker (SHARD-WP-0011 T3).
+
+``derived = f(canonical)`` is made *verified*, not asserted. A **Merkle-style digest** summarizes
+the derived tier (each identity's content fingerprint + its incident equivalence edges as a leaf,
+order-independently combined into a root) so two derived states are equal iff their digests match.
+A **consistency-checker** recomputes the authoritative fold from the current source, compares it to
+the maintained index over a (sampled) region, and on mismatch performs a **scoped recompute** of
+just the affected identities — self-healing drift from a missed delta or corrupted state.
+
+The digest is a pure function of index state, so it is "maintained alongside deltas" for free and
+is stable under equivalent event orders (leaves are sorted before combination).
+"""
+
+from __future__ import annotations
+
+import hashlib
+from collections.abc import Callable, Iterable
+from dataclasses import dataclass
+
+from shard_wiki.incremental.equivalence import EquivalenceIndex
+from shard_wiki.model import Identity, Page
+
+__all__ = ["region_digest", "derived_digest", "ConsistencyReport", "ConsistencyChecker"]
+
+CuratorEdges = Iterable[tuple[Identity, Identity]]
+
+
+def _peers_by_identity(index: EquivalenceIndex) -> dict[Identity, list[Identity]]:
+    """Map each identity to its edge peers, scanning the edge set once."""
+    peers: dict[Identity, list[Identity]] = {}
+    for edge in index.edges():
+        for member in edge:
+            peers.setdefault(member, []).extend(other for other in edge if other != member)
+    return peers
+
+
+def _leaf(
+    index: EquivalenceIndex,
+    identity: Identity,
+    peers: Iterable[Identity] | None = None,
+) -> str:
+    """A digest leaf for one identity: its fingerprint + its incident edges (as sorted peers).
+
+    ``peers`` lets a caller hashing many identities pass precomputed edge endpoints; when omitted
+    they are derived from ``index.edges()``.
+    """
+    fingerprint = index.fingerprint(identity) or "∅"
+    if peers is None:
+        peers = _peers_by_identity(index).get(identity, ())
+    names = sorted(str(other) for other in peers)
+    payload = f"{identity}|{fingerprint}|{','.join(names)}"
+    return hashlib.blake2b(payload.encode("utf-8"), digest_size=16).hexdigest()
+
+
+def region_digest(index: EquivalenceIndex, identities: Iterable[Identity]) -> str:
+    """A Merkle-style root over the given identities' leaves (order-independent)."""
+    # Scan the edge set once for the whole region rather than once per identity.
+    adjacency = _peers_by_identity(index)
+    leaves = sorted(
+        _leaf(index, identity, adjacency.get(identity, ())) for identity in identities
+    )
+    root = hashlib.blake2b(digest_size=16)
+    for leaf in leaves:
+        root.update(leaf.encode("utf-8"))
+    return root.hexdigest()
+
+
+def derived_digest(index: EquivalenceIndex) -> str:
+    """The digest of the whole maintained derived tier."""
+    return region_digest(index, index.identities())
+
+
+@dataclass(frozen=True, slots=True)
+class ConsistencyReport:
+    """Outcome of a consistency check: what was examined, whether it drifted, and if it healed."""
+
+    checked: int  # number of identities in the examined region
+    drifted: bool  # region digest differed from the authoritative rebuild
+    repaired: bool  # a scoped recompute was run (set whenever drift was found)
+    healthy: bool  # region digest matches the authoritative rebuild on return
+
+
+class ConsistencyChecker:
+    """Compares the maintained index against an authoritative rebuild and repairs drift in place."""
+
+    def __init__(
+        self,
+        index: EquivalenceIndex,
+        pages: Callable[[], Iterable[Page]],
+        curator_edges: Callable[[], CuratorEdges] = lambda: (),
+    ) -> None:
+        self._index = index
+        self._pages = pages
+        self._curator = curator_edges
+
+    def _authoritative(self, pages: Iterable[Page] | None = None) -> EquivalenceIndex:
+        """Fold a fresh index from ``pages`` (default: re-read the source) plus curator edges."""
+        expected = EquivalenceIndex(
+            num_bands=self._index.num_bands, threshold=self._index.threshold
+        )
+        snapshot = self._pages() if pages is None else pages
+        expected.build(list(snapshot), list(self._curator()))
+        return expected
+
+    def check_and_repair(self, sample: Iterable[Identity] | None = None) -> ConsistencyReport:
+        """Verify the (sampled) region against a from-scratch fold; scoped-recompute on mismatch."""
+        # Read the source once so the repair input and the expected fold share one snapshot.
+        pages = list(self._pages())
+        source = {p.identity: p for p in pages}
+        expected = self._authoritative(pages)
+        region = (
+            set(sample)
+            if sample is not None
+            else set(source) | set(self._index.identities())
+        )
+
+        expected_digest = region_digest(expected, region)
+        drifted = region_digest(self._index, region) != expected_digest
+        if not drifted:
+            return ConsistencyReport(len(region), drifted=False, repaired=False, healthy=True)
+
+        self._repair(region, source)
+        healthy = region_digest(self._index, region) == expected_digest
+        return ConsistencyReport(len(region), drifted=True, repaired=True, healthy=healthy)
+
+    def _repair(self, region: set[Identity], source: dict[Identity, Page]) -> None:
+        """Scoped recompute: reconcile each affected identity to the current source.
+
+        NOTE(review): only page state is replayed (add/update/remove); curator edges are not
+        re-applied here — confirm such drift is meant to surface as ``healthy=False``.
+        """
+        present = self._index.identities()
+        for identity in region:
+            page = source.get(identity)
+            if page is None:
+                if identity in present:
+                    self._index.remove(identity)
+            elif identity in present:
+                self._index.update(page)
+            else:
+                self._index.add(page)
diff --git a/tests/test_incremental_verification.py b/tests/test_incremental_verification.py
new file mode 100644
index 0000000..dfbb590
--- /dev/null
+++ b/tests/test_incremental_verification.py
@@ -0,0 +1,104 @@
+"""Tests for I-2 verification — digest + consistency-checker (SHARD-WP-0011 T3)."""
+
+from shard_wiki.incremental import (
+    ConsistencyChecker,
+    EquivalenceIndex,
+    derived_digest,
+)
+from shard_wiki.model import Identity, Page
+from shard_wiki.provenance import ProvenanceEnvelope
+
+
+def _page(shard, key, body):
+    return Page(
+        identity=Identity(shard, key),
+        body=body,
+        envelope=ProvenanceEnvelope(source_shard=shard),
+    )
+
+
+def test_digest_is_stable_under_equivalent_event_orders():
+    pages = [
+        _page("A", "Foo", "shared body text here"),
+        _page("B", "Bar", "shared body text here"),
+        _page("C", "Baz", "an entirely separate unrelated document"),
+    ]
+    forward = EquivalenceIndex()
+    for p in pages:
+        forward.add(p)
+    reverse = EquivalenceIndex()
+    for p in reversed(pages):
+        reverse.add(p)
+    assert derived_digest(forward) == derived_digest(reverse)
+
+
+def test_clean_index_reports_healthy():
+    pages = [_page("A", "Foo", "same body"), _page("B", "Bar", "same body")]
+    idx = EquivalenceIndex()
+    idx.build(pages)
+    checker = ConsistencyChecker(idx, pages_fn := (lambda: pages))
+    report = checker.check_and_repair()
+    assert report.drifted is False and report.healthy is True
+    assert pages_fn()  # source unchanged
+
+
+def test_missed_delta_drift_is_detected_and_repaired():
+    a = _page("A", "Foo", "converging target body")
+    b = _page("B", "Bar", "initially unrelated separate text")
+    source = {"pages": [a, b]}
+    idx = EquivalenceIndex()
+    idx.build(source["pages"])
+    assert idx.groups() == ()  # not equivalent yet
+
+    # Source changes B to match A, but the index is never told (a missed delta → drift).
+    b2 = _page("B", "Bar", "converging target body")
+    source["pages"] = [a, b2]
+
+    checker = ConsistencyChecker(idx, lambda: source["pages"])
+    report = checker.check_and_repair()
+    assert report.drifted is True and report.repaired is True and report.healthy is True
+    # Self-healed: the index now reflects the equivalence.
+    assert idx.equivalent_to(Identity("A", "Foo")) == frozenset(
+        {Identity("A", "Foo"), Identity("B", "Bar")}
+    )
+
+
+def test_corrupted_internal_state_is_healed():
+    a = _page("A", "Foo", "identical content")
+    b = _page("B", "Bar", "identical content")
+    idx = EquivalenceIndex()
+    idx.build([a, b])
+    # Corrupt the derived tier directly: delete a true edge (simulated index corruption).
+    idx._content_edges.clear()
+    assert idx.groups() == ()  # corrupted away
+
+    checker = ConsistencyChecker(idx, lambda: [a, b])
+    report = checker.check_and_repair()
+    assert report.drifted is True and report.healthy is True
+    assert idx.groups()  # edge restored by scoped recompute
+
+
+def test_removed_source_page_is_reconciled():
+    a = _page("A", "Foo", "same body")
+    b = _page("B", "Bar", "same body")
+    idx = EquivalenceIndex()
+    idx.build([a, b])
+    checker = ConsistencyChecker(idx, lambda: [a])  # B vanished from source
+    report = checker.check_and_repair()
+    assert report.healthy is True
+    assert Identity("B", "Bar") not in idx.identities()
+
+
+def test_source_is_read_once_per_check():
+    pages = [_page("A", "Foo", "same body"), _page("B", "Bar", "same body")]
+    idx = EquivalenceIndex()
+    idx.build(pages)
+    reads = []
+
+    def source():
+        reads.append(1)
+        return list(pages)
+
+    report = ConsistencyChecker(idx, source).check_and_repair()
+    assert report.healthy is True
+    assert len(reads) == 1