generated from coulomb/repo-seed
441 lines
16 KiB
Python
441 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Iterable
|
|
|
|
from .discovery import normalize_identity_part, short_fingerprint
|
|
|
|
|
|
# Rank of each candidate ``origin``; when two candidates share a stable key
# the one with the LOWER rank wins (see _higher_precedence, which ranks any
# origin not listed here as 99).
ORIGIN_PRECEDENCE = {
    origin: rank
    for rank, origin in enumerate(
        (
            "repo_declaration",
            "deterministic",
            "catalog",
            "registry",
            "llm",
            "manual",
        )
    )
}

# Node kinds compared against the normalized ``kind`` in
# _path_scoped_nodes_are_distinct: same-kind, same-label nodes of these kinds
# are not reported as duplicates when their source anchors are disjoint.
PATH_SCOPED_NODE_KINDS = {
    f"kubernetes-{resource}"
    for resource in (
        "config-map",
        "cron-job",
        "daemon-set",
        "deployment",
        "horizontal-pod-autoscaler",
        "ingress",
        "job",
        "namespace",
        "secret",
        "service",
        "stateful-set",
    )
} | {
    "container-build",
    "deployment-service",
    "domain-name",
    "lockfile",
    "network-port",
    "score-workload",
    "service-config",
}

# Edge types skipped by _edge_conflicts: several edges with the same
# (source, type, target) are not reported as possible duplicates.
EVIDENCE_AGGREGATE_EDGE_TYPES = set(
    "exposes_port listens_on names_endpoint opens_port "
    "resolves_to routes_to_port routes_to_service".split()
)
|
|
|
|
|
|
def reconcile_discovery_snapshots(
    previous: dict[str, Any] | None,
    current: dict[str, Any],
    *,
    retired_at: str | None = None,
) -> dict[str, Any]:
    """Merge, deduplicate, diff, and tombstone discovery candidates.

    The result is built from a JSON copy of ``current``. For each collection
    (``nodes``, ``edges``, ``attributes``), candidates with a truthy
    ``stable_key`` are merged per key and checked for likely duplicates. The
    result is then diffed against ``previous``. A previous candidate missing
    from ``current`` is tombstoned only when its ``replacement_scope`` is
    declared with mode ``"replacement"`` in the current snapshot's
    ``replacement_scopes``.

    Args:
        previous: The previously reconciled snapshot, or ``None``.
        current: The freshly discovered snapshot.
        retired_at: Timestamp stamped on new tombstones; defaults to the
            current UTC time.

    Returns:
        A new snapshot with deduplicated, key-sorted ``candidates``. Its
        ``reconciliation`` section holds the accumulated ``conflicts`` and a
        ``diff`` mapping ``added``/``changed``/``retired``/``conflicted`` to
        sorted stable keys. Its ``tombstones`` are the deduplicated union of
        both snapshots' tombstones.
    """
    result = _copy_json(current)
    retired_at = retired_at or _utc_now()
    previous = previous or {}
    conflicts: list[dict[str, object]] = []
    diff: dict[str, set[str]] = {"added": set(), "changed": set(), "retired": set(), "conflicted": set()}
    replacement_scopes = _scope_modes(result)

    # Tolerate a malformed ``candidates`` section, as _candidate_map already
    # does for ``previous``, instead of crashing on e.g. ``None``.
    result_candidates = result.get("candidates")
    if not isinstance(result_candidates, dict):
        result_candidates = result["candidates"] = {"nodes": [], "edges": [], "attributes": []}

    for collection in ("nodes", "edges", "attributes"):
        previous_map = _candidate_map(previous, collection)
        current_items = [
            item
            for item in _as_list(result_candidates.get(collection))
            if isinstance(item, dict) and item.get("stable_key")
        ]
        deduped, collection_conflicts = _dedupe_collection(collection, current_items)
        conflicts.extend(collection_conflicts)
        for conflict in collection_conflicts:
            diff["conflicted"].update(str(key) for key in conflict.get("candidates", []))

        result_candidates[collection] = [deduped[key] for key in sorted(deduped)]

        current_keys = set(deduped)
        previous_keys = set(previous_map)
        diff["added"].update(current_keys - previous_keys)
        diff["changed"].update(
            key
            for key in current_keys & previous_keys
            if _candidate_fingerprint(deduped[key]) != _candidate_fingerprint(previous_map[key])
        )

        # Sorted so tombstones are appended in a deterministic order.
        for key in sorted(previous_keys - current_keys):
            previous_candidate = previous_map[key]
            scope_id = str(previous_candidate.get("replacement_scope") or "")
            # Only candidates from scopes the current snapshot declares in
            # "replacement" mode are retired; others are left untouched.
            if replacement_scopes.get(scope_id) != "replacement":
                continue
            diff["retired"].add(key)
            _append_tombstone(
                result,
                stable_key=key,
                entity_kind=_entity_kind(collection),
                replacement_scope=scope_id,
                retired_at=retired_at,
                # Copy so the returned snapshot does not alias ``previous``.
                previous_candidate=_copy_json(previous_candidate),
            )

    reconciliation = result.get("reconciliation")
    if not isinstance(reconciliation, dict):
        reconciliation = result["reconciliation"] = {}
    reconciliation["conflicts"] = [*_as_list(reconciliation.get("conflicts")), *conflicts]
    reconciliation["diff"] = {name: sorted(keys) for name, keys in diff.items()}

    # Previous tombstones come first so that, on identical identities, the
    # current run's tombstones take precedence in _dedupe_tombstones.
    result["tombstones"] = _dedupe_tombstones([
        *_copy_json(_as_list(previous.get("tombstones"))),
        *_as_list(result.get("tombstones")),
    ])
    return result
|
|
|
|
|
|
def _dedupe_collection(
    collection: str,
    items: list[dict[str, Any]],
) -> tuple[dict[str, dict[str, Any]], list[dict[str, object]]]:
    """Merge same-key candidates of one collection and flag likely duplicates.

    Items sharing a ``stable_key`` are folded together in input order with
    _merge_candidate. Then the collection's duplicate detector
    (_node_conflicts, _edge_conflicts or _attribute_conflicts) looks for
    distinct keys that appear to describe the same entity. Every candidate
    named in any conflict gets ``status="conflicted"`` and
    ``review_state="needs_review"``.

    Args:
        collection: ``"nodes"``, ``"edges"`` or ``"attributes"``; any other
            value only merges, with no duplicate detection.
        items: Candidate dicts, each with a ``stable_key``.

    Returns:
        ``(by_key, conflicts)``: merged candidates keyed by ``str(stable_key)``
        and the conflict records, with exact duplicates removed.
    """
    by_key: dict[str, dict[str, Any]] = {}
    conflicts: list[dict[str, object]] = []
    for item in items:
        key = str(item["stable_key"])
        by_key[key], merge_conflicts = _merge_candidate(collection, by_key.get(key), item)
        conflicts.extend(merge_conflicts)

    detectors = {
        "nodes": _node_conflicts,
        "edges": _edge_conflicts,
        "attributes": _attribute_conflicts,
    }
    detector = detectors.get(collection)
    if detector is not None:
        conflicts.extend(detector(by_key))

    # Folding three or more copies of one key can emit the same merge
    # conflict repeatedly; report each distinct record once.
    conflicts = _unique_conflicts(conflicts)

    for conflict in conflicts:
        for key in conflict.get("candidates", []):
            candidate = by_key.get(str(key))
            if candidate is not None:
                candidate["status"] = "conflicted"
                candidate["review_state"] = "needs_review"
    return by_key, conflicts
|
|
|
|
|
|
def _merge_candidate(
    collection: str,
    existing: dict[str, Any] | None,
    incoming: dict[str, Any],
) -> tuple[dict[str, Any], list[dict[str, object]]]:
    """Fold ``incoming`` into ``existing``; both carry the same stable key.

    The candidate with the higher-precedence ``origin`` (_higher_precedence;
    on a tie ``existing`` wins) is copied as the base record. Then:

    * ``aliases``, ``provenance`` and ``source_anchors`` become the
      de-duplicated union of both sides (null/non-list fields count as empty);
    * ``confidence`` becomes the larger value when both sides are numeric;
    * for the ``attributes`` collection, a differing ``value`` is resolved to
      the winner's and reported as ``attribute_value_conflict``;
    * otherwise the ``attributes`` dicts are unioned, and a key whose values
      differ keeps the winner's value and is reported as
      ``attribute_conflict``.

    Returns:
        ``(merged, conflicts)``. ``merged`` is a JSON copy of ``incoming``
        with no conflicts when ``existing`` is ``None``.
    """
    if existing is None:
        return _copy_json(incoming), []

    winner, _ = _higher_precedence(existing, incoming)
    merged = _copy_json(winner)
    conflicts: list[dict[str, object]] = []

    for field in ("aliases", "provenance", "source_anchors"):
        # _as_list treats null/non-list values as empty (as
        # _node_identity_labels does) instead of crashing on list(None)
        # or splitting a string into characters.
        values = [*_as_list(existing.get(field)), *_as_list(incoming.get(field))]
        if values:
            merged[field] = _unique_strings(values) if field == "aliases" else _unique_json(values)

    existing_confidence = existing.get("confidence")
    incoming_confidence = incoming.get("confidence")
    if isinstance(existing_confidence, (int, float)) and isinstance(incoming_confidence, (int, float)):
        merged["confidence"] = max(float(existing_confidence), float(incoming_confidence))

    if collection == "attributes":
        if existing.get("value") != incoming.get("value"):
            merged["value"] = winner.get("value")
            conflicts.append(_conflict("attribute_value_conflict", [existing, incoming], "candidate attribute values differ"))
        return merged, conflicts

    existing_attrs = existing.get("attributes") if isinstance(existing.get("attributes"), dict) else {}
    incoming_attrs = incoming.get("attributes") if isinstance(incoming.get("attributes"), dict) else {}
    # ``winner`` is one of the two input objects, so its attributes dict is
    # whichever of the two above belongs to it.
    winner_attrs = existing_attrs if winner is existing else incoming_attrs

    # The union already contains every key of both sides, so no separate
    # back-fill from the losing candidate is needed.
    merged_attrs = dict(existing_attrs)
    for key, value in incoming_attrs.items():
        if key not in merged_attrs:
            merged_attrs[key] = value
        elif merged_attrs[key] != value:
            merged_attrs[key] = winner_attrs[key]
            conflicts.append(_conflict("attribute_conflict", [existing, incoming], f"attribute {key!r} differs"))
    if merged_attrs:
        merged["attributes"] = merged_attrs
    return merged, conflicts
|
|
|
|
|
|
def _node_conflicts(by_key: dict[str, dict[str, Any]]) -> list[dict[str, object]]:
    """Report pairs of distinct node keys that look like the same node.

    Nodes are visited in sorted key order. Two nodes match when they share a
    normalized ``kind`` and one normalized identity label (label, graph_id or
    alias, via _node_identity_labels). Matches that
    _path_scoped_nodes_are_distinct says are separate are skipped. Each
    record names the earlier and later key and the matched kind and label.
    """
    found: list[dict[str, object]] = []
    index: dict[tuple[str, str], list[str]] = {}
    for node_key in sorted(by_key):
        node = by_key[node_key]
        kind = str(node.get("kind") or "")
        for label in _node_identity_labels(node):
            bucket = index.setdefault(
                (normalize_identity_part(kind), normalize_identity_part(label)),
                [],
            )
            for earlier_key in bucket:
                if earlier_key == node_key:
                    continue
                if _path_scoped_nodes_are_distinct(by_key.get(earlier_key), node):
                    continue
                found.append(
                    {
                        "type": "possible_duplicate_node",
                        "reason": "same kind with matching normalized label or alias",
                        "candidates": sorted([earlier_key, node_key]),
                        "match": {"kind": kind, "label": label},
                    }
                )
            bucket.append(node_key)
    return _unique_conflicts(found)
|
|
|
|
|
|
def _edge_conflicts(by_key: dict[str, dict[str, Any]]) -> list[dict[str, object]]:
    """Report distinct edge keys with the same (source, edge type, target).

    Edges whose type is in EVIDENCE_AGGREGATE_EDGE_TYPES are ignored. For each
    triple, the first key in sorted order is paired with every later key.
    """
    first_seen: dict[tuple[str, str, str], str] = {}
    found: list[dict[str, object]] = []
    for edge_key in sorted(by_key):
        edge = by_key[edge_key]
        edge_type = str(edge.get("edge_type") or "")
        if edge_type in EVIDENCE_AGGREGATE_EDGE_TYPES:
            continue
        source_key = str(edge.get("source_key") or "")
        target_key = str(edge.get("target_key") or "")
        earlier_key = first_seen.setdefault((source_key, edge_type, target_key), edge_key)
        if not earlier_key or earlier_key == edge_key:
            continue
        found.append(
            {
                "type": "possible_duplicate_edge",
                "reason": "same source, target, and edge type",
                "candidates": sorted([earlier_key, edge_key]),
                "match": {
                    "source_key": source_key,
                    "edge_type": edge_type,
                    "target_key": target_key,
                },
            }
        )
    return _unique_conflicts(found)
|
|
|
|
|
|
def _attribute_conflicts(by_key: dict[str, dict[str, Any]]) -> list[dict[str, object]]:
    """Report distinct attribute keys targeting the same entity and name.

    For each ``(entity_key, name)`` pair, the first key in sorted order is
    paired with every later key.
    """
    first_seen: dict[tuple[str, str], str] = {}
    found: list[dict[str, object]] = []
    for attribute_key in sorted(by_key):
        attribute = by_key[attribute_key]
        entity_key = str(attribute.get("entity_key") or "")
        name = str(attribute.get("name") or "")
        earlier_key = first_seen.setdefault((entity_key, name), attribute_key)
        if not earlier_key or earlier_key == attribute_key:
            continue
        found.append(
            {
                "type": "possible_duplicate_attribute",
                "reason": "same entity and attribute name",
                "candidates": sorted([earlier_key, attribute_key]),
                "match": {"entity_key": entity_key, "name": name},
            }
        )
    return _unique_conflicts(found)
|
|
|
|
|
|
def _append_tombstone(
|
|
snapshot: dict[str, Any],
|
|
*,
|
|
stable_key: str,
|
|
entity_kind: str,
|
|
replacement_scope: str,
|
|
retired_at: str,
|
|
previous_candidate: dict[str, Any],
|
|
) -> None:
|
|
tombstones = snapshot.setdefault("tombstones", [])
|
|
tombstones.append(
|
|
{
|
|
"stable_key": stable_key,
|
|
"entity_kind": entity_kind,
|
|
"replacement_scope": replacement_scope,
|
|
"retired_at": retired_at,
|
|
"reason": "source_missing",
|
|
"previous_candidate": previous_candidate,
|
|
}
|
|
)
|
|
|
|
|
|
def _candidate_map(snapshot: dict[str, Any], collection: str) -> dict[str, dict[str, Any]]:
|
|
candidates = snapshot.get("candidates") if isinstance(snapshot.get("candidates"), dict) else {}
|
|
items = candidates.get(collection) if isinstance(candidates.get(collection), list) else []
|
|
return {
|
|
str(item["stable_key"]): item
|
|
for item in items
|
|
if isinstance(item, dict) and item.get("stable_key")
|
|
}
|
|
|
|
|
|
def _scope_modes(snapshot: dict[str, Any]) -> dict[str, str]:
|
|
scopes = snapshot.get("replacement_scopes") if isinstance(snapshot.get("replacement_scopes"), list) else []
|
|
return {
|
|
str(scope.get("id")): str(scope.get("mode") or "")
|
|
for scope in scopes
|
|
if isinstance(scope, dict) and scope.get("id")
|
|
}
|
|
|
|
|
|
def _higher_precedence(left: dict[str, Any], right: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Return ``(winner, loser)`` ordered by ORIGIN_PRECEDENCE rank.

    A lower rank wins. Missing or unrecognised origins rank 99, and ``left``
    wins ties.
    """
    def rank(candidate: dict[str, Any]) -> int:
        return ORIGIN_PRECEDENCE.get(str(candidate.get("origin") or ""), 99)

    if rank(left) > rank(right):
        return right, left
    return left, right
|
|
|
|
|
|
def _node_identity_labels(node: dict[str, Any]) -> list[str]:
    """Distinct, stripped, non-blank identity strings for ``node``.

    Order: ``label``, then ``graph_id``, then each entry of ``aliases`` (only
    when ``aliases`` is a list).
    """
    raw: list[object] = [node.get("label") or "", node.get("graph_id") or ""]
    raw.extend(_as_list(node.get("aliases")))
    return _unique_strings(raw)
|
|
|
|
|
|
def _path_scoped_nodes_are_distinct(left: dict[str, Any] | None, right: dict[str, Any]) -> bool:
    """Whether two same-labelled nodes are provably different path-scoped nodes.

    True only when both nodes have the same normalized kind, that kind is in
    PATH_SCOPED_NODE_KINDS, and their source anchors do not overlap. Anchor
    fingerprints are compared when both sides have any; otherwise anchor paths
    are compared, and missing paths on either side mean "not distinct".
    """
    if not left:
        return False
    kind = normalize_identity_part(str(left.get("kind") or ""))
    other_kind = normalize_identity_part(str(right.get("kind") or ""))
    if kind != other_kind:
        return False
    if kind not in PATH_SCOPED_NODE_KINDS:
        return False

    left_fingerprints = _source_anchor_identities(left)
    right_fingerprints = _source_anchor_identities(right)
    if left_fingerprints and right_fingerprints:
        return left_fingerprints.isdisjoint(right_fingerprints)

    left_paths = _source_anchor_paths(left)
    right_paths = _source_anchor_paths(right)
    if not left_paths or not right_paths:
        return False
    return left_paths.isdisjoint(right_paths)
|
|
|
|
|
|
def _source_anchor_identities(candidate: dict[str, Any]) -> set[str]:
|
|
anchors = candidate.get("source_anchors") if isinstance(candidate.get("source_anchors"), list) else []
|
|
return {
|
|
str(anchor.get("fingerprint") or "")
|
|
for anchor in anchors
|
|
if isinstance(anchor, dict) and anchor.get("fingerprint")
|
|
}
|
|
|
|
|
|
def _source_anchor_paths(candidate: dict[str, Any]) -> set[str]:
|
|
anchors = candidate.get("source_anchors") if isinstance(candidate.get("source_anchors"), list) else []
|
|
return {
|
|
str(anchor.get("path") or "")
|
|
for anchor in anchors
|
|
if isinstance(anchor, dict) and anchor.get("path")
|
|
}
|
|
|
|
|
|
def _candidate_fingerprint(candidate: dict[str, Any]) -> str:
    """20-character ``short_fingerprint`` of the candidate minus ``provenance``.

    reconcile_discovery_snapshots compares these to decide whether a
    candidate changed, so provenance-only differences do not count.
    """
    comparable = dict(candidate)
    comparable.pop("provenance", None)
    return short_fingerprint(comparable, length=20)
|
|
|
|
|
|
def _conflict(conflict_type: str, candidates: list[dict[str, Any]], reason: str) -> dict[str, object]:
|
|
return {
|
|
"type": conflict_type,
|
|
"reason": reason,
|
|
"candidates": sorted(str(candidate.get("stable_key") or "") for candidate in candidates),
|
|
}
|
|
|
|
|
|
def _unique_conflicts(conflicts: Iterable[dict[str, object]]) -> list[dict[str, object]]:
|
|
seen: set[str] = set()
|
|
result: list[dict[str, object]] = []
|
|
for conflict in conflicts:
|
|
key = json.dumps(conflict, sort_keys=True, default=str)
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
result.append(conflict)
|
|
return result
|
|
|
|
|
|
def _dedupe_tombstones(tombstones: list[object]) -> list[dict[str, Any]]:
|
|
by_key: dict[str, dict[str, Any]] = {}
|
|
for tombstone in tombstones:
|
|
if not isinstance(tombstone, dict):
|
|
continue
|
|
key = ":".join(
|
|
[
|
|
str(tombstone.get("stable_key") or ""),
|
|
str(tombstone.get("replacement_scope") or ""),
|
|
str(tombstone.get("reason") or ""),
|
|
]
|
|
)
|
|
by_key[key] = tombstone
|
|
return [by_key[key] for key in sorted(by_key)]
|
|
|
|
|
|
def _entity_kind(collection: str) -> str:
|
|
return {
|
|
"nodes": "node",
|
|
"edges": "edge",
|
|
"attributes": "attribute",
|
|
}[collection]
|
|
|
|
|
|
def _as_list(value: object) -> list[object]:
|
|
return value if isinstance(value, list) else []
|
|
|
|
|
|
def _unique_strings(values: Iterable[object]) -> list[str]:
|
|
seen: set[str] = set()
|
|
result: list[str] = []
|
|
for value in values:
|
|
text = str(value or "").strip()
|
|
if not text or text in seen:
|
|
continue
|
|
seen.add(text)
|
|
result.append(text)
|
|
return result
|
|
|
|
|
|
def _unique_json(values: Iterable[object]) -> list[object]:
|
|
seen: set[str] = set()
|
|
result: list[object] = []
|
|
for value in values:
|
|
key = json.dumps(value, sort_keys=True, default=str)
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
result.append(value)
|
|
return result
|
|
|
|
|
|
def _copy_json(value: Any) -> Any:
|
|
return json.loads(json.dumps(value, default=str))
|
|
|
|
|
|
def _utc_now() -> str:
|
|
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
|