feat(incremental): indexed equivalence — blocking + verify (WP-0011 T1)

Detect equivalence (distinct identities holding the same page) without pairwise
O(N²): MinHash/LSH bands over content shingles + normalized-title buckets
generate candidates (blocking), then exact-fingerprint or Jaccard>=threshold
confirm them (verify), with curator decision-log bindings always forming edges.
Groups are the connected components of the edge set. Includes the incremental
add/update/remove internals used by T2. Matches a brute-force oracle. New
incremental/ package (minhash primitives + EquivalenceIndex).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-16 02:13:06 +02:00
parent d85d019543
commit 0b3ab2086f
4 changed files with 406 additions and 0 deletions

View File

@@ -0,0 +1,34 @@
"""incremental/ — the incremental-first derived tier (CoreArchitectureBlueprint §8.7).
Equivalence is **indexed** (blocking/LSH + verify), not pairwise O(N²); maintenance is
**change-driven** (delta with retraction + propagation, review B-4), keeping the derived tier equal
to a from-scratch rebuild — which becomes a bounded fallback, not the operational path. A
Merkle-style **digest** plus a background **consistency-checker** make ``derived = f(canonical)``
verified rather than asserted (I-2), self-healing on detected drift.
In-memory only for this slice (no persisted index store); per-partition structure is honoured but
multi-tenant deployment is later. Per the dependency rule this imports down (model/provenance) and
is wired by the orchestrator.
"""
from shard_wiki.incremental.equivalence import (
EquivalenceEdge,
EquivalenceIndex,
normalized_title,
)
from shard_wiki.incremental.minhash import (
MinHasher,
band_keys,
jaccard,
shingles,
)
__all__ = [
"shingles",
"MinHasher",
"band_keys",
"jaccard",
"EquivalenceEdge",
"EquivalenceIndex",
"normalized_title",
]

View File

@@ -0,0 +1,212 @@
"""Indexed equivalence — blocking + verify, incrementally maintained (SHARD-WP-0011 T1/T2).
Equivalence (two *distinct* identities holding the same page) is detected without pairwise O(N²):
1. **Blocking** generates candidate pairs — pages sharing a normalized-title bucket or an LSH band
(MinHash over content shingles).
2. **Verify** confirms a candidate — exact-body fingerprint match, or shingle Jaccard ≥ threshold —
plus **curator bindings** (explicit decision-log edges) which are always equivalence edges.
The index is **incrementally maintained** (T2): ``add`` / ``update`` / ``remove`` re-bucket the
changed page, **retract** the edges it leaves and **add** the edges it enters; equivalence groups
are the connected components of the current edge set, so a retraction that disconnects a component
**splits** a chorus automatically. A full :meth:`build` is just repeated ``add`` — the bounded
rebuild fallback. The invariant (and the test oracle): incremental state == a from-scratch rebuild.
"""
from __future__ import annotations
import hashlib
import re
from collections.abc import Iterable
from dataclasses import dataclass
from shard_wiki.incremental.minhash import MinHasher, band_keys, jaccard, shingles
from shard_wiki.model import Identity, Page
__all__ = ["EquivalenceEdge", "EquivalenceIndex", "normalized_title"]
_NONALNUM_RE = re.compile(r"[^a-z0-9]+")
def normalized_title(key: str) -> str:
"""A blocking bucket key: the last path segment, lowercased, stripped of non-alphanumerics."""
leaf = key.rsplit("/", 1)[-1]
return _NONALNUM_RE.sub("", leaf.lower())
@dataclass(frozen=True, slots=True)
class EquivalenceEdge:
"""A verified equivalence between two identities, tagged with why it was accepted."""
a: Identity
b: Identity
reason: str # "fingerprint" | "content" | "curator"
@dataclass(frozen=True, slots=True)
class _Entry:
shingle_set: frozenset[str]
bands: tuple[tuple[int, tuple[int, ...]], ...]
title: str
fingerprint: str
def _fingerprint(body: str) -> str:
return hashlib.blake2b(body.strip().encode("utf-8"), digest_size=16).hexdigest()
def _pair(a: Identity, b: Identity) -> frozenset[Identity]:
return frozenset((a, b))
class EquivalenceIndex:
"""An incrementally maintained, blocked-and-verified equivalence relation over union pages."""
def __init__(
self,
*,
num_perm: int = 64,
num_bands: int = 32,
threshold: float = 0.7,
hasher: MinHasher | None = None,
) -> None:
self.threshold = threshold
self.num_bands = num_bands
self._hasher = hasher or MinHasher(num_perm=num_perm)
self._entries: dict[Identity, _Entry] = {}
self._band_buckets: dict[tuple[int, tuple[int, ...]], set[Identity]] = {}
self._title_buckets: dict[str, set[Identity]] = {}
self._content_edges: dict[frozenset[Identity], str] = {}
self._curator_edges: set[frozenset[Identity]] = set()
# -- build / maintain ----------------------------------------------------
def build(
self,
pages: Iterable[Page],
curator_edges: Iterable[tuple[Identity, Identity]] = (),
) -> None:
"""Rebuild from scratch (the bounded fallback): add every page, then curator edges."""
self.__init__(
num_bands=self.num_bands, threshold=self.threshold, hasher=self._hasher
)
for page in pages:
self.add(page)
for a, b in curator_edges:
self.bind(a, b)
def add(self, page: Page) -> None:
"""Index a new (or, via :meth:`update`, refreshed) page and add its equivalence edges."""
identity = page.identity
entry = self._make_entry(page)
self._entries[identity] = entry
for key in entry.bands:
self._band_buckets.setdefault(key, set()).add(identity)
self._title_buckets.setdefault(entry.title, set()).add(identity)
for candidate in self._candidates(identity, entry):
reason = self._verify(identity, candidate)
if reason is not None:
self._content_edges[_pair(identity, candidate)] = reason
def remove(self, identity: Identity) -> None:
"""Drop a page: de-bucket it and retract every content edge incident to it."""
entry = self._entries.pop(identity, None)
if entry is None:
return
for key in entry.bands:
self._discard_bucket(self._band_buckets, key, identity)
self._discard_bucket(self._title_buckets, entry.title, identity)
for edge in [e for e in self._content_edges if identity in e]:
del self._content_edges[edge]
def update(self, page: Page) -> None:
"""Apply a change as retract-then-add: stale (bucket-exit) edges go, new edges arrive."""
self.remove(page.identity)
self.add(page)
def bind(self, a: Identity, b: Identity) -> None:
"""Record a curator equivalence (an explicit decision-log binding); always an edge."""
if a != b:
self._curator_edges.add(_pair(a, b))
def unbind(self, a: Identity, b: Identity) -> None:
self._curator_edges.discard(_pair(a, b))
# -- queries -------------------------------------------------------------
def edges(self) -> frozenset[frozenset[Identity]]:
"""All equivalence edges (content + curator) among currently present identities."""
present = self._entries.keys()
curator = {e for e in self._curator_edges if e <= present}
return frozenset(set(self._content_edges) | curator)
def groups(self) -> tuple[frozenset[Identity], ...]:
"""Equivalence groups: connected components of size ≥ 2 (union-find over the edges)."""
parent: dict[Identity, Identity] = {}
def find(x: Identity) -> Identity:
parent.setdefault(x, x)
root = x
while parent[root] != root:
root = parent[root]
while parent[x] != root:
parent[x], x = root, parent[x]
return root
for edge in self.edges():
a, b = tuple(edge)
ra, rb = find(a), find(b)
if ra != rb:
parent[ra] = rb
comps: dict[Identity, set[Identity]] = {}
for node in parent:
comps.setdefault(find(node), set()).add(node)
return tuple(
frozenset(members) for members in comps.values() if len(members) > 1
)
def equivalent_to(self, identity: Identity) -> frozenset[Identity]:
"""The equivalence group containing ``identity`` (including itself), else just itself."""
for group in self.groups():
if identity in group:
return group
return frozenset({identity})
# -- internals -----------------------------------------------------------
def _make_entry(self, page: Page) -> _Entry:
shingle_set = shingles(page.body)
signature = self._hasher.signature(shingle_set)
return _Entry(
shingle_set=shingle_set,
bands=band_keys(signature, self.num_bands),
title=normalized_title(page.identity.key),
fingerprint=_fingerprint(page.body),
)
def _candidates(self, identity: Identity, entry: _Entry) -> set[Identity]:
candidates: set[Identity] = set()
for key in entry.bands:
candidates |= self._band_buckets.get(key, set())
candidates |= self._title_buckets.get(entry.title, set())
candidates.discard(identity)
return candidates
def _verify(self, a: Identity, b: Identity) -> str | None:
ea, eb = self._entries[a], self._entries[b]
if ea.fingerprint == eb.fingerprint:
return "fingerprint"
if jaccard(ea.shingle_set, eb.shingle_set) >= self.threshold:
return "content"
return None
@staticmethod
def _discard_bucket(buckets: dict, key, identity: Identity) -> None:
bucket = buckets.get(key)
if bucket is not None:
bucket.discard(identity)
if not bucket:
del buckets[key]

View File

@@ -0,0 +1,71 @@
"""MinHash + LSH banding primitives for content-similarity blocking (SHARD-WP-0011 T1).
Pure, deterministic functions (fixed hashing, no per-run randomness) so the derived tier and its
digest are reproducible. Shingle a body into k-grams, MinHash the shingle set into a signature,
split the signature into LSH bands; two pages sharing a band are *candidates* for equivalence —
the cheap pre-filter that replaces pairwise O(N²) comparison.
"""
from __future__ import annotations
import hashlib
import random
import re
from collections.abc import Iterable
__all__ = ["shingles", "MinHasher", "band_keys", "jaccard"]
_WORD_RE = re.compile(r"\w+")
# Largest Mersenne prime below 2**61 — the modulus for the universal-hash permutations.
_PRIME = (1 << 61) - 1
def shingles(text: str, k: int = 3) -> frozenset[str]:
"""The set of word k-grams in ``text`` (lowercased). Short texts fall back to their word set."""
words = _WORD_RE.findall(text.lower())
if len(words) < k:
return frozenset(words)
return frozenset(" ".join(words[i : i + k]) for i in range(len(words) - k + 1))
def _stable_hash(token: str) -> int:
return int.from_bytes(hashlib.blake2b(token.encode("utf-8"), digest_size=8).digest(), "big")
class MinHasher:
"""A bank of ``num_perm`` universal hash permutations producing a fixed-length signature."""
def __init__(self, num_perm: int = 64, seed: int = 1) -> None:
self.num_perm = num_perm
rng = random.Random(seed)
self._coeffs = [
(rng.randrange(1, _PRIME), rng.randrange(0, _PRIME)) for _ in range(num_perm)
]
def signature(self, shingle_set: Iterable[str]) -> tuple[int, ...]:
"""The MinHash signature of ``shingle_set`` (empty set → all-``_PRIME`` sentinel)."""
hashed = [_stable_hash(s) for s in shingle_set]
if not hashed:
return tuple(_PRIME for _ in self._coeffs)
return tuple(min((a * h + b) % _PRIME for h in hashed) for a, b in self._coeffs)
def band_keys(
signature: tuple[int, ...], num_bands: int
) -> tuple[tuple[int, tuple[int, ...]], ...]:
"""Split a signature into ``num_bands`` band keys; two pages sharing one are LSH candidates."""
if num_bands <= 0 or len(signature) % num_bands != 0:
raise ValueError(f"signature length {len(signature)} not divisible into {num_bands} bands")
rows = len(signature) // num_bands
return tuple(
(b, signature[b * rows : (b + 1) * rows]) for b in range(num_bands)
)
def jaccard(a: frozenset[str], b: frozenset[str]) -> float:
"""Jaccard similarity of two shingle sets; two empty sets are defined as identical (1.0)."""
if not a and not b:
return 1.0
if not a or not b:
return 0.0
return len(a & b) / len(a | b)

View File

@@ -0,0 +1,89 @@
"""Tests for the indexed equivalence relation — blocking + verify (SHARD-WP-0011 T1)."""
from itertools import combinations
from shard_wiki.incremental import EquivalenceIndex, MinHasher, band_keys, jaccard, shingles
from shard_wiki.incremental.equivalence import _fingerprint
from shard_wiki.model import Identity, Page
from shard_wiki.provenance import ProvenanceEnvelope
def _page(shard, key, body):
return Page(
identity=Identity(shard, key),
body=body,
envelope=ProvenanceEnvelope(source_shard=shard),
)
def _brute_force_groups(pages, threshold):
"""Oracle: O(N²) verify of every pair, then connected components."""
parent = {p.identity: p.identity for p in pages}
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
for p, q in combinations(pages, 2):
same_fp = _fingerprint(p.body) == _fingerprint(q.body)
sim = jaccard(shingles(p.body), shingles(q.body))
if same_fp or sim >= threshold:
parent[find(p.identity)] = find(q.identity)
comps = {}
for p in pages:
comps.setdefault(find(p.identity), set()).add(p.identity)
return {frozenset(v) for v in comps.values() if len(v) > 1}
def test_minhash_lsh_buckets_near_duplicates_together():
hasher = MinHasher(num_perm=64)
base = "the quick brown fox jumps over the lazy dog near the river bank today"
near = base + " and then some"
far = "completely unrelated content about astrophysics and distant galaxies far"
b_base = set(band_keys(hasher.signature(shingles(base)), 32))
b_near = set(band_keys(hasher.signature(shingles(near)), 32))
b_far = set(band_keys(hasher.signature(shingles(far)), 32))
assert b_base & b_near # near-duplicates share at least one band
assert not (b_base & b_far) # unrelated pages do not
def test_exact_duplicate_across_shards_is_equivalent():
idx = EquivalenceIndex()
idx.add(_page("A", "Foo", "identical body text here"))
idx.add(_page("B", "Bar", "identical body text here"))
assert idx.equivalent_to(Identity("A", "Foo")) == frozenset(
{Identity("A", "Foo"), Identity("B", "Bar")}
)
def test_unrelated_pages_are_not_equivalent():
idx = EquivalenceIndex()
idx.add(_page("A", "Foo", "alpha beta gamma delta epsilon"))
idx.add(_page("B", "Bar", "nothing in common whatsoever entirely"))
assert idx.groups() == ()
def test_curator_binding_forces_equivalence_regardless_of_content():
idx = EquivalenceIndex()
idx.add(_page("A", "Foo", "one thing"))
idx.add(_page("B", "Bar", "totally different"))
idx.bind(Identity("A", "Foo"), Identity("B", "Bar"))
assert idx.equivalent_to(Identity("A", "Foo")) == frozenset(
{Identity("A", "Foo"), Identity("B", "Bar")}
)
def test_index_matches_brute_force_oracle():
threshold = 0.7
shared = "shared sentence one shared sentence two shared sentence three end"
pages = [
_page("A", "Doc1", shared),
_page("B", "Doc1copy", shared + " minor tail"), # near-dup of A
_page("C", "Other", "a totally distinct page with no overlapping shingles at all here"),
_page("D", "Lonely", "yet another isolated document about unrelated subject matter alone"),
]
idx = EquivalenceIndex(threshold=threshold)
idx.build(pages)
assert set(idx.groups()) == _brute_force_groups(pages, threshold)