from __future__ import annotations import hashlib import json import re from typing import Any _IDENTITY_PART_RE = re.compile(r"[^a-z0-9._@+-]+") _DASH_RE = re.compile(r"-+") def normalize_identity_part(value: object, *, fallback: str = "unknown") -> str: """Normalize one stable-key segment without making it opaque.""" text = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", str(value or "").strip()).lower() text = _IDENTITY_PART_RE.sub("-", text) text = _DASH_RE.sub("-", text).strip("._-+@") return text or fallback def short_fingerprint(value: object, *, length: int = 12) -> str: """Return a deterministic short SHA-256 fingerprint for identity suffixes.""" if length < 8: raise ValueError("fingerprints shorter than 8 characters are too collision-prone") if isinstance(value, str): payload = value else: payload = json.dumps(value, sort_keys=True, separators=(",", ":"), default=str) return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:length] def discovery_stable_key( repo_slug: str, entity_kind: str, name: str, *, source_anchor: object | None = None, ) -> str: """Build a canonical discovery key for a repo-scoped candidate entity.""" key = "discovery:{repo}:{kind}:{name}".format( repo=normalize_identity_part(repo_slug), kind=normalize_identity_part(entity_kind), name=normalize_identity_part(name), ) if source_anchor is not None: key = f"{key}:{short_fingerprint(source_anchor)}" return _limit_stable_key(key) def relationship_stable_key( source_key: str, edge_type: str, target_key: str, *, evidence_scope: object | None = None, ) -> str: """Build a stable relationship key from normalized endpoints and edge type.""" payload = { "source": source_key, "edge_type": normalize_identity_part(edge_type), "target": target_key, "evidence_scope": evidence_scope or "", } return f"edge:{short_fingerprint(payload, length=20)}" def attribute_stable_key(entity_key: str, attribute_name: str, *, source_anchor: object | None = None) -> str: """Build a stable key for a discovered attribute on an entity.""" key = f"attribute:{str(entity_key).strip()}:{normalize_identity_part(attribute_name)}" if source_anchor is not None: key = f"{key}:{short_fingerprint(source_anchor)}" return _limit_stable_key(key) def replacement_scope_id( repo_slug: str, extractor_id: str, source_kind: str, *, source_path: str | None = None, ) -> str: """Build the scope id that controls safe replacement on rescans.""" key = "scope:{repo}:{extractor}:{source_kind}".format( repo=normalize_identity_part(repo_slug), extractor=normalize_identity_part(extractor_id), source_kind=normalize_identity_part(source_kind), ) if source_path: key = f"{key}:{short_fingerprint(source_path)}" return _limit_stable_key(key) def source_fingerprint(anchor: dict[str, Any]) -> str: """Fingerprint the stable parts of a source anchor.""" stable_anchor = { key: anchor.get(key) for key in ("source_kind", "path", "url", "ref", "line_start", "line_end", "json_pointer") if anchor.get(key) not in (None, "") } return short_fingerprint(stable_anchor, length=16) def _limit_stable_key(key: str, *, max_length: int = 240) -> str: if len(key) <= max_length: return key return f"{key[: max_length - 21].rstrip(':._-')}:{short_fingerprint(key, length=20)}"