# Generated from coulomb/repo-seed (114 lines, 3.5 KiB, Python).
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import re
|
|
from typing import Any
|
|
|
|
|
|
# Any run of characters outside the allowed identity alphabet becomes one dash.
_IDENTITY_PART_RE = re.compile(r"[^a-z0-9._@+-]+")
# Consecutive dashes (original or produced by the substitution above) collapse to one.
_DASH_RE = re.compile(r"-+")
# A lowercase letter or digit directly followed by an uppercase letter: a camelCase boundary.
_CAMEL_BOUNDARY_RE = re.compile(r"([a-z0-9])([A-Z])")


def normalize_identity_part(value: object, *, fallback: str = "unknown") -> str:
    """Normalize one stable-key segment without making it opaque.

    The value is stringified, stripped, split at camelCase boundaries with a
    dash, lower-cased, and every run of characters outside ``a-z0-9._@+-`` is
    replaced by a single dash. Leading and trailing ``._-+@`` are removed.

    Args:
        value: The raw segment. ``None`` is treated as missing; other falsy
            values such as ``0`` or ``False`` are kept and stringified.
        fallback: Returned when nothing usable remains after normalization.

    Returns:
        The normalized segment, or ``fallback`` if it would be empty.
    """
    # Only None means "no value": `value or ""` would also discard 0 and False,
    # turning legitimate identifiers into the fallback.
    raw = "" if value is None else str(value)
    text = _CAMEL_BOUNDARY_RE.sub(r"\1-\2", raw.strip()).lower()
    text = _IDENTITY_PART_RE.sub("-", text)
    text = _DASH_RE.sub("-", text).strip("._-+@")
    return text or fallback
|
|
|
|
|
|
def short_fingerprint(value: object, *, length: int = 12) -> str:
    """Return a deterministic short SHA-256 fingerprint for identity suffixes.

    Strings are hashed as-is. Any other value is first serialized to compact
    JSON with sorted keys (types JSON cannot encode fall back to ``str``), so
    equal mappings fingerprint identically regardless of key order.

    Args:
        value: The string or JSON-serializable object to fingerprint.
        length: Number of leading hex digits to keep. A SHA-256 hex digest has
            64 characters, so larger values yield the full digest.

    Returns:
        The first ``length`` characters of the lowercase hex digest.

    Raises:
        ValueError: If ``length`` is smaller than 8.
    """
    if length < 8:
        raise ValueError("fingerprints shorter than 8 characters are too collision-prone")
    if isinstance(value, str):
        payload = value
    else:
        payload = json.dumps(value, sort_keys=True, separators=(",", ":"), default=str)
    # "surrogatepass" keeps strings containing lone surrogates (for example
    # file names decoded with "surrogateescape") hashable instead of raising
    # UnicodeEncodeError; well-formed text encodes exactly as before.
    digest = hashlib.sha256(payload.encode("utf-8", "surrogatepass")).hexdigest()
    return digest[:length]
|
|
|
|
|
|
def discovery_stable_key(
    repo_slug: str,
    entity_kind: str,
    name: str,
    *,
    source_anchor: object | None = None,
) -> str:
    """Compose the canonical ``discovery:`` key for a repo-scoped candidate entity.

    The key has the form ``discovery:<repo>:<kind>:<name>``, with each segment
    passed through ``normalize_identity_part``. When ``source_anchor`` is not
    ``None`` a short fingerprint of it is appended as a fourth segment. The
    result is passed through ``_limit_stable_key`` before being returned.
    """
    segments = ["discovery"]
    segments.extend(
        normalize_identity_part(part) for part in (repo_slug, entity_kind, name)
    )
    if source_anchor is not None:
        segments.append(short_fingerprint(source_anchor))
    return _limit_stable_key(":".join(segments))
|
|
|
|
|
|
def relationship_stable_key(
    source_key: str,
    edge_type: str,
    target_key: str,
    *,
    evidence_scope: object | None = None,
) -> str:
    """Derive the ``edge:`` key identifying a relationship between two keys.

    The key is ``edge:`` followed by a 20-character fingerprint of the source
    key, the normalized edge type, the target key, and the evidence scope.
    The endpoint keys are used as given; only ``edge_type`` is normalized
    here. A falsy ``evidence_scope`` is fingerprinted as the empty string.
    """
    scope = evidence_scope if evidence_scope else ""
    digest = short_fingerprint(
        dict(
            source=source_key,
            edge_type=normalize_identity_part(edge_type),
            target=target_key,
            evidence_scope=scope,
        ),
        length=20,
    )
    return "edge:" + digest
|
|
|
|
|
|
def attribute_stable_key(entity_key: str, attribute_name: str, *, source_anchor: object | None = None) -> str:
    """Compose the ``attribute:`` key for an attribute discovered on an entity.

    The key has the form ``attribute:<entity_key>:<attribute>``: the entity key
    is stringified and stripped of surrounding whitespace, and the attribute
    name goes through ``normalize_identity_part``. When ``source_anchor`` is
    not ``None`` a short fingerprint of it is appended. The result is passed
    through ``_limit_stable_key`` before being returned.
    """
    parts = [
        "attribute",
        str(entity_key).strip(),
        normalize_identity_part(attribute_name),
    ]
    if source_anchor is not None:
        parts.append(short_fingerprint(source_anchor))
    return _limit_stable_key(":".join(parts))
|
|
|
|
|
|
def replacement_scope_id(
    repo_slug: str,
    extractor_id: str,
    source_kind: str,
    *,
    source_path: str | None = None,
) -> str:
    """Compose the ``scope:`` id that controls safe replacement on rescans.

    The id has the form ``scope:<repo>:<extractor>:<source_kind>``, each
    segment normalized with ``normalize_identity_part``. A non-empty
    ``source_path`` contributes a short fingerprint as a trailing segment;
    ``None`` and the empty string add nothing. The result is passed through
    ``_limit_stable_key`` before being returned.
    """
    normalized = (
        normalize_identity_part(part)
        for part in (repo_slug, extractor_id, source_kind)
    )
    scope = "scope:" + ":".join(normalized)
    if not source_path:
        return _limit_stable_key(scope)
    return _limit_stable_key(f"{scope}:{short_fingerprint(source_path)}")
|
|
|
|
|
|
def source_fingerprint(anchor: dict[str, Any]) -> str:
    """Return a 16-character fingerprint of a source anchor's stable fields.

    Only the fields listed below are considered, and a field is skipped when
    its value is ``None`` or the empty string, so other keys in ``anchor`` do
    not affect the result.
    """
    stable_fields = (
        "source_kind",
        "path",
        "url",
        "ref",
        "line_start",
        "line_end",
        "json_pointer",
    )
    stable_anchor = {}
    for field in stable_fields:
        value = anchor.get(field)
        if value not in (None, ""):
            stable_anchor[field] = value
    return short_fingerprint(stable_anchor, length=16)
|
|
|
|
|
|
def _limit_stable_key(key: str, *, max_length: int = 240) -> str:
    """Cap ``key`` at ``max_length`` characters while keeping it distinguishable.

    Keys that already fit are returned unchanged. Longer keys are cut to a
    prefix (with trailing ``:._-`` removed) followed by ``:`` and a
    20-character fingerprint of the full original key, so two long keys that
    share a prefix still produce different results.

    Args:
        key: The full stable key.
        max_length: Maximum length of the returned key.

    Returns:
        ``key`` itself, or a shortened form no longer than ``max_length``.

    Raises:
        ValueError: If ``key`` needs shortening but ``max_length`` leaves no
            room for the separator and the 20-character fingerprint.
    """
    if len(key) <= max_length:
        return key
    suffix_length = 20
    # One extra character is reserved for the ":" before the fingerprint.
    prefix_length = max_length - suffix_length - 1
    if prefix_length < 0:
        # A negative slice bound would keep almost the whole key and return a
        # result longer than max_length, so fail loudly instead.
        raise ValueError(
            f"max_length must be at least {suffix_length + 1} to shorten a stable key"
        )
    prefix = key[:prefix_length].rstrip(":._-")
    return f"{prefix}:{short_fingerprint(key, length=suffix_length)}"
|