Files
railiance-fabric/railiance_fabric/discovery.py

114 lines
3.5 KiB
Python

from __future__ import annotations
import hashlib
import json
import re
from typing import Any
# Matches any run of characters outside the allowed key alphabet:
# lowercase ASCII letters, digits, and the punctuation ``. _ @ + -``.
_IDENTITY_PART_RE = re.compile(r"[^a-z0-9._@+-]+")
# Matches one or more consecutive dashes so they can be collapsed into one.
_DASH_RE = re.compile(r"-+")
def normalize_identity_part(value: object, *, fallback: str = "unknown") -> str:
    """Turn an arbitrary value into a readable, key-safe identity segment.

    Falsy values are treated as empty text. Otherwise the value is
    stringified and stripped of surrounding whitespace, camelCase boundaries
    are split with a dash, the text is lowercased, every run of characters
    outside ``a-z0-9._@+-`` becomes a single dash, and leading/trailing
    ``._-+@`` characters are trimmed.

    Returns:
        The normalized segment, or ``fallback`` when nothing is left.
    """
    raw = str(value) if value else ""
    # Insert a dash at each lowercase-or-digit -> uppercase boundary while the
    # original casing is still available.
    split_camel = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", raw.strip())
    cleaned = _IDENTITY_PART_RE.sub("-", split_camel.lower())
    collapsed = _DASH_RE.sub("-", cleaned)
    trimmed = collapsed.strip("._-+@")
    if not trimmed:
        return fallback
    return trimmed
def short_fingerprint(value: object, *, length: int = 12) -> str:
    """Derive a deterministic, truncated SHA-256 hex digest of ``value``.

    Strings are hashed as-is. Any other value is first serialized to compact
    JSON with sorted keys; objects JSON cannot encode are passed through
    ``str``.

    Args:
        value: The data to fingerprint.
        length: Number of leading hex characters to keep (at least 8).

    Returns:
        The first ``length`` characters of the hex digest.

    Raises:
        ValueError: If ``length`` is below 8.
    """
    if length < 8:
        raise ValueError("fingerprints shorter than 8 characters are too collision-prone")
    text = (
        value
        if isinstance(value, str)
        else json.dumps(value, sort_keys=True, separators=(",", ":"), default=str)
    )
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return digest[:length]
def discovery_stable_key(
    repo_slug: str,
    entity_kind: str,
    name: str,
    *,
    source_anchor: object | None = None,
) -> str:
    """Compose the canonical ``discovery:<repo>:<kind>:<name>`` key.

    Each of the three segments is normalized with
    ``normalize_identity_part``. When ``source_anchor`` is not ``None`` its
    short fingerprint is appended as a fourth segment. The result is passed
    through ``_limit_stable_key`` before being returned.
    """
    segments = [
        "discovery",
        normalize_identity_part(repo_slug),
        normalize_identity_part(entity_kind),
        normalize_identity_part(name),
    ]
    if source_anchor is not None:
        segments.append(short_fingerprint(source_anchor))
    return _limit_stable_key(":".join(segments))
def relationship_stable_key(
    source_key: str,
    edge_type: str,
    target_key: str,
    *,
    evidence_scope: object | None = None,
) -> str:
    """Compose an ``edge:<fingerprint>`` key identifying a relationship.

    The 20-character fingerprint covers the source key, the normalized edge
    type, the target key and the evidence scope. A falsy ``evidence_scope``
    is fingerprinted as the empty string. The endpoint keys are used exactly
    as given; this function does not normalize them.
    """
    normalized_edge = normalize_identity_part(edge_type)
    scope = evidence_scope if evidence_scope else ""
    fingerprint = short_fingerprint(
        {
            "source": source_key,
            "edge_type": normalized_edge,
            "target": target_key,
            "evidence_scope": scope,
        },
        length=20,
    )
    return "edge:" + fingerprint
def attribute_stable_key(entity_key: str, attribute_name: str, *, source_anchor: object | None = None) -> str:
    """Compose the ``attribute:<entity>:<name>`` key for an entity attribute.

    ``entity_key`` is stringified and whitespace-stripped but otherwise kept
    verbatim; ``attribute_name`` is normalized with
    ``normalize_identity_part``. When ``source_anchor`` is not ``None`` its
    short fingerprint is appended. The result is passed through
    ``_limit_stable_key`` before being returned.
    """
    pieces = [
        "attribute",
        str(entity_key).strip(),
        normalize_identity_part(attribute_name),
    ]
    if source_anchor is not None:
        pieces.append(short_fingerprint(source_anchor))
    return _limit_stable_key(":".join(pieces))
def replacement_scope_id(
    repo_slug: str,
    extractor_id: str,
    source_kind: str,
    *,
    source_path: str | None = None,
) -> str:
    """Compose the ``scope:<repo>:<extractor>:<source_kind>`` replacement id.

    All three segments are normalized with ``normalize_identity_part``. A
    non-empty ``source_path`` contributes its short fingerprint as a trailing
    segment; ``None`` and the empty string add nothing. The result is passed
    through ``_limit_stable_key`` before being returned.
    """
    normalized = [
        normalize_identity_part(part)
        for part in (repo_slug, extractor_id, source_kind)
    ]
    scope_id = ":".join(["scope", *normalized])
    if not source_path:
        return _limit_stable_key(scope_id)
    return _limit_stable_key(scope_id + ":" + short_fingerprint(source_path))
def source_fingerprint(anchor: dict[str, Any]) -> str:
    """Return a 16-character fingerprint of an anchor's identifying fields.

    Only the keys ``source_kind``, ``path``, ``url``, ``ref``,
    ``line_start``, ``line_end`` and ``json_pointer`` are considered, and any
    of them whose value is ``None`` or the empty string is left out of the
    fingerprinted payload.
    """
    stable_fields = ("source_kind", "path", "url", "ref", "line_start", "line_end", "json_pointer")
    stable_anchor = {}
    for field in stable_fields:
        field_value = anchor.get(field)
        if field_value in (None, ""):
            # Missing or blank fields must not influence the fingerprint.
            continue
        stable_anchor[field] = field_value
    return short_fingerprint(stable_anchor, length=16)
def _limit_stable_key(key: str, *, max_length: int = 240) -> str:
    """Cap ``key`` at ``max_length`` characters.

    Keys that already fit are returned untouched. Longer keys are cut to
    ``max_length - 21`` characters, trailing ``:._-`` separators are trimmed
    from the cut prefix, and ``:`` plus a 20-character fingerprint of the
    full original key is appended.
    """
    if len(key) > max_length:
        # Reserve 21 characters for the ":" separator and the 20-character
        # fingerprint suffix.
        head = key[: max_length - 21].rstrip(":._-")
        return f"{head}:{short_fingerprint(key, length=20)}"
    return key