diff --git a/src/shard_wiki/incremental/__init__.py b/src/shard_wiki/incremental/__init__.py new file mode 100644 index 0000000..419c7a9 --- /dev/null +++ b/src/shard_wiki/incremental/__init__.py @@ -0,0 +1,34 @@ +"""incremental/ — the incremental-first derived tier (CoreArchitectureBlueprint §8.7). + +Equivalence is **indexed** (blocking/LSH + verify), not pairwise O(N²); maintenance is +**change-driven** (delta with retraction + propagation, review B-4), keeping the derived tier equal +to a from-scratch rebuild — which becomes a bounded fallback, not the operational path. A +Merkle-style **digest** plus a background **consistency-checker** make ``derived = f(canonical)`` +verified rather than asserted (I-2), self-healing on detected drift. + +In-memory only for this slice (no persisted index store); per-partition structure is honoured but +multi-tenant deployment is later. Per the dependency rule this imports down (model/provenance) and +is wired by the orchestrator. +""" + +from shard_wiki.incremental.equivalence import ( + EquivalenceEdge, + EquivalenceIndex, + normalized_title, +) +from shard_wiki.incremental.minhash import ( + MinHasher, + band_keys, + jaccard, + shingles, +) + +__all__ = [ + "shingles", + "MinHasher", + "band_keys", + "jaccard", + "EquivalenceEdge", + "EquivalenceIndex", + "normalized_title", +] diff --git a/src/shard_wiki/incremental/equivalence.py b/src/shard_wiki/incremental/equivalence.py new file mode 100644 index 0000000..d59d961 --- /dev/null +++ b/src/shard_wiki/incremental/equivalence.py @@ -0,0 +1,212 @@ +"""Indexed equivalence — blocking + verify, incrementally maintained (SHARD-WP-0011 T1/T2). + +Equivalence (two *distinct* identities holding the same page) is detected without pairwise O(N²): + +1. **Blocking** generates candidate pairs — pages sharing a normalized-title bucket or an LSH band + (MinHash over content shingles). +2. **Verify** confirms a candidate — exact-body fingerprint match, or shingle Jaccard ≥ threshold — + plus **curator bindings** (explicit decision-log edges) which are always equivalence edges. + +The index is **incrementally maintained** (T2): ``add`` / ``update`` / ``remove`` re-bucket the +changed page, **retract** the edges it leaves and **add** the edges it enters; equivalence groups +are the connected components of the current edge set, so a retraction that disconnects a component +**splits** a chorus automatically. A full :meth:`build` is just repeated ``add`` — the bounded +rebuild fallback. The invariant (and the test oracle): incremental state == a from-scratch rebuild. +""" + +from __future__ import annotations + +import hashlib +import re +from collections.abc import Iterable +from dataclasses import dataclass + +from shard_wiki.incremental.minhash import MinHasher, band_keys, jaccard, shingles +from shard_wiki.model import Identity, Page + +__all__ = ["EquivalenceEdge", "EquivalenceIndex", "normalized_title"] + +_NONALNUM_RE = re.compile(r"[^a-z0-9]+") + + +def normalized_title(key: str) -> str: + """A blocking bucket key: the last path segment, lowercased, stripped of non-alphanumerics.""" + leaf = key.rsplit("/", 1)[-1] + return _NONALNUM_RE.sub("", leaf.lower()) + + +@dataclass(frozen=True, slots=True) +class EquivalenceEdge: + """A verified equivalence between two identities, tagged with why it was accepted.""" + + a: Identity + b: Identity + reason: str # "fingerprint" | "content" | "curator" + + +@dataclass(frozen=True, slots=True) +class _Entry: + shingle_set: frozenset[str] + bands: tuple[tuple[int, tuple[int, ...]], ...] + title: str + fingerprint: str + + +def _fingerprint(body: str) -> str: + return hashlib.blake2b(body.strip().encode("utf-8"), digest_size=16).hexdigest() + + +def _pair(a: Identity, b: Identity) -> frozenset[Identity]: + return frozenset((a, b)) + + +class EquivalenceIndex: + """An incrementally maintained, blocked-and-verified equivalence relation over union pages.""" + + def __init__( + self, + *, + num_perm: int = 64, + num_bands: int = 32, + threshold: float = 0.7, + hasher: MinHasher | None = None, + ) -> None: + self.threshold = threshold + self.num_bands = num_bands + self._hasher = hasher or MinHasher(num_perm=num_perm) + self._entries: dict[Identity, _Entry] = {} + self._band_buckets: dict[tuple[int, tuple[int, ...]], set[Identity]] = {} + self._title_buckets: dict[str, set[Identity]] = {} + self._content_edges: dict[frozenset[Identity], str] = {} + self._curator_edges: set[frozenset[Identity]] = set() + + # -- build / maintain ---------------------------------------------------- + + def build( + self, + pages: Iterable[Page], + curator_edges: Iterable[tuple[Identity, Identity]] = (), + ) -> None: + """Rebuild from scratch (the bounded fallback): add every page, then curator edges.""" + self.__init__( + num_bands=self.num_bands, threshold=self.threshold, hasher=self._hasher + ) + for page in pages: + self.add(page) + for a, b in curator_edges: + self.bind(a, b) + + def add(self, page: Page) -> None: + """Index a new (or, via :meth:`update`, refreshed) page and add its equivalence edges.""" + identity = page.identity + entry = self._make_entry(page) + self._entries[identity] = entry + for key in entry.bands: + self._band_buckets.setdefault(key, set()).add(identity) + self._title_buckets.setdefault(entry.title, set()).add(identity) + + for candidate in self._candidates(identity, entry): + reason = self._verify(identity, candidate) + if reason is not None: + self._content_edges[_pair(identity, candidate)] = reason + + def remove(self, identity: Identity) -> None: + """Drop a page: de-bucket it and retract every content edge incident to it.""" + entry = self._entries.pop(identity, None) + if entry is None: + return + for key in entry.bands: + self._discard_bucket(self._band_buckets, key, identity) + self._discard_bucket(self._title_buckets, entry.title, identity) + for edge in [e for e in self._content_edges if identity in e]: + del self._content_edges[edge] + + def update(self, page: Page) -> None: + """Apply a change as retract-then-add: stale (bucket-exit) edges go, new edges arrive.""" + self.remove(page.identity) + self.add(page) + + def bind(self, a: Identity, b: Identity) -> None: + """Record a curator equivalence (an explicit decision-log binding); always an edge.""" + if a != b: + self._curator_edges.add(_pair(a, b)) + + def unbind(self, a: Identity, b: Identity) -> None: + self._curator_edges.discard(_pair(a, b)) + + # -- queries ------------------------------------------------------------- + + def edges(self) -> frozenset[frozenset[Identity]]: + """All equivalence edges (content + curator) among currently present identities.""" + present = self._entries.keys() + curator = {e for e in self._curator_edges if e <= present} + return frozenset(set(self._content_edges) | curator) + + def groups(self) -> tuple[frozenset[Identity], ...]: + """Equivalence groups: connected components of size ≥ 2 (union-find over the edges).""" + parent: dict[Identity, Identity] = {} + + def find(x: Identity) -> Identity: + parent.setdefault(x, x) + root = x + while parent[root] != root: + root = parent[root] + while parent[x] != root: + parent[x], x = root, parent[x] + return root + + for edge in self.edges(): + a, b = tuple(edge) + ra, rb = find(a), find(b) + if ra != rb: + parent[ra] = rb + + comps: dict[Identity, set[Identity]] = {} + for node in parent: + comps.setdefault(find(node), set()).add(node) + return tuple( + frozenset(members) for members in comps.values() if len(members) > 1 + ) + + def equivalent_to(self, identity: Identity) -> frozenset[Identity]: + """The equivalence group containing ``identity`` (including itself), else just itself.""" + for group in self.groups(): + if identity in group: + return group + return frozenset({identity}) + + # -- internals ----------------------------------------------------------- + + def _make_entry(self, page: Page) -> _Entry: + shingle_set = shingles(page.body) + signature = self._hasher.signature(shingle_set) + return _Entry( + shingle_set=shingle_set, + bands=band_keys(signature, self.num_bands), + title=normalized_title(page.identity.key), + fingerprint=_fingerprint(page.body), + ) + + def _candidates(self, identity: Identity, entry: _Entry) -> set[Identity]: + candidates: set[Identity] = set() + for key in entry.bands: + candidates |= self._band_buckets.get(key, set()) + candidates |= self._title_buckets.get(entry.title, set()) + candidates.discard(identity) + return candidates + + def _verify(self, a: Identity, b: Identity) -> str | None: + ea, eb = self._entries[a], self._entries[b] + if ea.fingerprint == eb.fingerprint: + return "fingerprint" + if jaccard(ea.shingle_set, eb.shingle_set) >= self.threshold: + return "content" + return None + + @staticmethod + def _discard_bucket(buckets: dict, key, identity: Identity) -> None: + bucket = buckets.get(key) + if bucket is not None: + bucket.discard(identity) + if not bucket: + del buckets[key] diff --git a/src/shard_wiki/incremental/minhash.py b/src/shard_wiki/incremental/minhash.py new file mode 100644 index 0000000..24d7240 --- /dev/null +++ b/src/shard_wiki/incremental/minhash.py @@ -0,0 +1,71 @@ +"""MinHash + LSH banding primitives for content-similarity blocking (SHARD-WP-0011 T1). + +Pure, deterministic functions (fixed hashing, no per-run randomness) so the derived tier and its +digest are reproducible. Shingle a body into k-grams, MinHash the shingle set into a signature, +split the signature into LSH bands; two pages sharing a band are *candidates* for equivalence — +the cheap pre-filter that replaces pairwise O(N²) comparison. +""" + +from __future__ import annotations + +import hashlib +import random +import re +from collections.abc import Iterable + +__all__ = ["shingles", "MinHasher", "band_keys", "jaccard"] + +_WORD_RE = re.compile(r"\w+") +# Largest Mersenne prime below 2**61 — the modulus for the universal-hash permutations. +_PRIME = (1 << 61) - 1 + + +def shingles(text: str, k: int = 3) -> frozenset[str]: + """The set of word k-grams in ``text`` (lowercased). Short texts fall back to their word set.""" + words = _WORD_RE.findall(text.lower()) + if len(words) < k: + return frozenset(words) + return frozenset(" ".join(words[i : i + k]) for i in range(len(words) - k + 1)) + + +def _stable_hash(token: str) -> int: + return int.from_bytes(hashlib.blake2b(token.encode("utf-8"), digest_size=8).digest(), "big") + + +class MinHasher: + """A bank of ``num_perm`` universal hash permutations producing a fixed-length signature.""" + + def __init__(self, num_perm: int = 64, seed: int = 1) -> None: + self.num_perm = num_perm + rng = random.Random(seed) + self._coeffs = [ + (rng.randrange(1, _PRIME), rng.randrange(0, _PRIME)) for _ in range(num_perm) + ] + + def signature(self, shingle_set: Iterable[str]) -> tuple[int, ...]: + """The MinHash signature of ``shingle_set`` (empty set → all-``_PRIME`` sentinel).""" + hashed = [_stable_hash(s) for s in shingle_set] + if not hashed: + return tuple(_PRIME for _ in self._coeffs) + return tuple(min((a * h + b) % _PRIME for h in hashed) for a, b in self._coeffs) + + +def band_keys( + signature: tuple[int, ...], num_bands: int +) -> tuple[tuple[int, tuple[int, ...]], ...]: + """Split a signature into ``num_bands`` band keys; two pages sharing one are LSH candidates.""" + if num_bands <= 0 or len(signature) % num_bands != 0: + raise ValueError(f"signature length {len(signature)} not divisible into {num_bands} bands") + rows = len(signature) // num_bands + return tuple( + (b, signature[b * rows : (b + 1) * rows]) for b in range(num_bands) + ) + + +def jaccard(a: frozenset[str], b: frozenset[str]) -> float: + """Jaccard similarity of two shingle sets; two empty sets are defined as identical (1.0).""" + if not a and not b: + return 1.0 + if not a or not b: + return 0.0 + return len(a & b) / len(a | b) diff --git a/tests/test_incremental_equivalence.py b/tests/test_incremental_equivalence.py new file mode 100644 index 0000000..0d366b7 --- /dev/null +++ b/tests/test_incremental_equivalence.py @@ -0,0 +1,89 @@ +"""Tests for the indexed equivalence relation — blocking + verify (SHARD-WP-0011 T1).""" + +from itertools import combinations + +from shard_wiki.incremental import EquivalenceIndex, MinHasher, band_keys, jaccard, shingles +from shard_wiki.incremental.equivalence import _fingerprint +from shard_wiki.model import Identity, Page +from shard_wiki.provenance import ProvenanceEnvelope + + +def _page(shard, key, body): + return Page( + identity=Identity(shard, key), + body=body, + envelope=ProvenanceEnvelope(source_shard=shard), + ) + + +def _brute_force_groups(pages, threshold): + """Oracle: O(N²) verify of every pair, then connected components.""" + parent = {p.identity: p.identity for p in pages} + + def find(x): + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + for p, q in combinations(pages, 2): + same_fp = _fingerprint(p.body) == _fingerprint(q.body) + sim = jaccard(shingles(p.body), shingles(q.body)) + if same_fp or sim >= threshold: + parent[find(p.identity)] = find(q.identity) + comps = {} + for p in pages: + comps.setdefault(find(p.identity), set()).add(p.identity) + return {frozenset(v) for v in comps.values() if len(v) > 1} + + +def test_minhash_lsh_buckets_near_duplicates_together(): + hasher = MinHasher(num_perm=64) + base = "the quick brown fox jumps over the lazy dog near the river bank today" + near = base + " and then some" + far = "completely unrelated content about astrophysics and distant galaxies far" + b_base = set(band_keys(hasher.signature(shingles(base)), 32)) + b_near = set(band_keys(hasher.signature(shingles(near)), 32)) + b_far = set(band_keys(hasher.signature(shingles(far)), 32)) + assert b_base & b_near # near-duplicates share at least one band + assert not (b_base & b_far) # unrelated pages do not + + +def test_exact_duplicate_across_shards_is_equivalent(): + idx = EquivalenceIndex() + idx.add(_page("A", "Foo", "identical body text here")) + idx.add(_page("B", "Bar", "identical body text here")) + assert idx.equivalent_to(Identity("A", "Foo")) == frozenset( + {Identity("A", "Foo"), Identity("B", "Bar")} + ) + + +def test_unrelated_pages_are_not_equivalent(): + idx = EquivalenceIndex() + idx.add(_page("A", "Foo", "alpha beta gamma delta epsilon")) + idx.add(_page("B", "Bar", "nothing in common whatsoever entirely")) + assert idx.groups() == () + + +def test_curator_binding_forces_equivalence_regardless_of_content(): + idx = EquivalenceIndex() + idx.add(_page("A", "Foo", "one thing")) + idx.add(_page("B", "Bar", "totally different")) + idx.bind(Identity("A", "Foo"), Identity("B", "Bar")) + assert idx.equivalent_to(Identity("A", "Foo")) == frozenset( + {Identity("A", "Foo"), Identity("B", "Bar")} + ) + + +def test_index_matches_brute_force_oracle(): + threshold = 0.7 + shared = "shared sentence one shared sentence two shared sentence three end" + pages = [ + _page("A", "Doc1", shared), + _page("B", "Doc1copy", shared + " minor tail"), # near-dup of A + _page("C", "Other", "a totally distinct page with no overlapping shingles at all here"), + _page("D", "Lonely", "yet another isolated document about unrelated subject matter alone"), + ] + idx = EquivalenceIndex(threshold=threshold) + idx.build(pages) + assert set(idx.groups()) == _brute_force_groups(pages, threshold)