"""Indexed equivalence — blocking + verify, incrementally maintained (SHARD-WP-0011 T1/T2). Equivalence (two *distinct* identities holding the same page) is detected without pairwise O(N²): 1. **Blocking** generates candidate pairs — pages sharing a normalized-title bucket or an LSH band (MinHash over content shingles). 2. **Verify** confirms a candidate — exact-body fingerprint match, or shingle Jaccard ≥ threshold — plus **curator bindings** (explicit decision-log edges) which are always equivalence edges. The index is **incrementally maintained** (T2): ``add`` / ``update`` / ``remove`` re-bucket the changed page, **retract** the edges it leaves and **add** the edges it enters; equivalence groups are the connected components of the current edge set, so a retraction that disconnects a component **splits** a chorus automatically. A full :meth:`build` is just repeated ``add`` — the bounded rebuild fallback. The invariant (and the test oracle): incremental state == a from-scratch rebuild. """ from __future__ import annotations import hashlib import re from collections.abc import Iterable from dataclasses import dataclass from shard_wiki.incremental.minhash import MinHasher, band_keys, jaccard, shingles from shard_wiki.model import Identity, Page __all__ = ["EquivalenceEdge", "EquivalenceIndex", "normalized_title"] _NONALNUM_RE = re.compile(r"[^a-z0-9]+") def normalized_title(key: str) -> str: """A blocking bucket key: the last path segment, lowercased, stripped of non-alphanumerics.""" leaf = key.rsplit("/", 1)[-1] return _NONALNUM_RE.sub("", leaf.lower()) @dataclass(frozen=True, slots=True) class EquivalenceEdge: """A verified equivalence between two identities, tagged with why it was accepted.""" a: Identity b: Identity reason: str # "fingerprint" | "content" | "curator" @dataclass(frozen=True, slots=True) class _Entry: shingle_set: frozenset[str] bands: tuple[tuple[int, tuple[int, ...]], ...] title: str fingerprint: str def _fingerprint(body: str) -> str: return hashlib.blake2b(body.strip().encode("utf-8"), digest_size=16).hexdigest() def _pair(a: Identity, b: Identity) -> frozenset[Identity]: return frozenset((a, b)) class EquivalenceIndex: """An incrementally maintained, blocked-and-verified equivalence relation over union pages.""" def __init__( self, *, num_perm: int = 64, num_bands: int = 32, threshold: float = 0.7, hasher: MinHasher | None = None, ) -> None: self.threshold = threshold self.num_bands = num_bands self._hasher = hasher or MinHasher(num_perm=num_perm) self._entries: dict[Identity, _Entry] = {} self._band_buckets: dict[tuple[int, tuple[int, ...]], set[Identity]] = {} self._title_buckets: dict[str, set[Identity]] = {} self._content_edges: dict[frozenset[Identity], str] = {} self._curator_edges: set[frozenset[Identity]] = set() # -- build / maintain ---------------------------------------------------- def build( self, pages: Iterable[Page], curator_edges: Iterable[tuple[Identity, Identity]] = (), ) -> None: """Rebuild from scratch (the bounded fallback): add every page, then curator edges.""" self.__init__( num_bands=self.num_bands, threshold=self.threshold, hasher=self._hasher ) for page in pages: self.add(page) for a, b in curator_edges: self.bind(a, b) def add(self, page: Page) -> None: """Index a new (or, via :meth:`update`, refreshed) page and add its equivalence edges.""" identity = page.identity entry = self._make_entry(page) self._entries[identity] = entry for key in entry.bands: self._band_buckets.setdefault(key, set()).add(identity) self._title_buckets.setdefault(entry.title, set()).add(identity) for candidate in self._candidates(identity, entry): reason = self._verify(identity, candidate) if reason is not None: self._content_edges[_pair(identity, candidate)] = reason def remove(self, identity: Identity) -> None: """Drop a page: de-bucket it and retract every content edge incident to it.""" entry = self._entries.pop(identity, None) if entry is None: return for key in entry.bands: self._discard_bucket(self._band_buckets, key, identity) self._discard_bucket(self._title_buckets, entry.title, identity) for edge in [e for e in self._content_edges if identity in e]: del self._content_edges[edge] def update(self, page: Page) -> None: """Apply a change as retract-then-add: stale (bucket-exit) edges go, new edges arrive.""" self.remove(page.identity) self.add(page) def bind(self, a: Identity, b: Identity) -> None: """Record a curator equivalence (an explicit decision-log binding); always an edge.""" if a != b: self._curator_edges.add(_pair(a, b)) def unbind(self, a: Identity, b: Identity) -> None: self._curator_edges.discard(_pair(a, b)) # -- queries ------------------------------------------------------------- def identities(self) -> frozenset[Identity]: """All identities currently present in the index.""" return frozenset(self._entries) def fingerprint(self, identity: Identity) -> str | None: """The content fingerprint indexed for ``identity`` (None if absent) — a digest leaf.""" entry = self._entries.get(identity) return entry.fingerprint if entry is not None else None def edges(self) -> frozenset[frozenset[Identity]]: """All equivalence edges (content + curator) among currently present identities.""" present = self._entries.keys() curator = {e for e in self._curator_edges if e <= present} return frozenset(set(self._content_edges) | curator) def groups(self) -> tuple[frozenset[Identity], ...]: """Equivalence groups: connected components of size ≥ 2 (union-find over the edges).""" parent: dict[Identity, Identity] = {} def find(x: Identity) -> Identity: parent.setdefault(x, x) root = x while parent[root] != root: root = parent[root] while parent[x] != root: parent[x], x = root, parent[x] return root for edge in self.edges(): a, b = tuple(edge) ra, rb = find(a), find(b) if ra != rb: parent[ra] = rb comps: dict[Identity, set[Identity]] = {} for node in parent: comps.setdefault(find(node), set()).add(node) return tuple( frozenset(members) for members in comps.values() if len(members) > 1 ) def equivalent_to(self, identity: Identity) -> frozenset[Identity]: """The equivalence group containing ``identity`` (including itself), else just itself.""" for group in self.groups(): if identity in group: return group return frozenset({identity}) # -- internals ----------------------------------------------------------- def _make_entry(self, page: Page) -> _Entry: shingle_set = shingles(page.body) signature = self._hasher.signature(shingle_set) return _Entry( shingle_set=shingle_set, bands=band_keys(signature, self.num_bands), title=normalized_title(page.identity.key), fingerprint=_fingerprint(page.body), ) def _candidates(self, identity: Identity, entry: _Entry) -> set[Identity]: candidates: set[Identity] = set() for key in entry.bands: candidates |= self._band_buckets.get(key, set()) candidates |= self._title_buckets.get(entry.title, set()) candidates.discard(identity) return candidates def _verify(self, a: Identity, b: Identity) -> str | None: ea, eb = self._entries[a], self._entries[b] if ea.fingerprint == eb.fingerprint: return "fingerprint" if jaccard(ea.shingle_set, eb.shingle_set) >= self.threshold: return "content" return None @staticmethod def _discard_bucket(buckets: dict, key, identity: Identity) -> None: bucket = buckets.get(key) if bucket is not None: bucket.discard(identity) if not bucket: del buckets[key]