Files
railiance-fabric/railiance_fabric/reconciliation.py

441 lines
16 KiB
Python

from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any, Iterable
from .discovery import normalize_identity_part, short_fingerprint
ORIGIN_PRECEDENCE = {
"repo_declaration": 0,
"deterministic": 1,
"catalog": 2,
"registry": 3,
"llm": 4,
"manual": 5,
}
PATH_SCOPED_NODE_KINDS = {
"container-build",
"deployment-service",
"domain-name",
"kubernetes-config-map",
"kubernetes-cron-job",
"kubernetes-daemon-set",
"kubernetes-deployment",
"kubernetes-horizontal-pod-autoscaler",
"kubernetes-ingress",
"kubernetes-job",
"kubernetes-namespace",
"kubernetes-secret",
"kubernetes-service",
"kubernetes-stateful-set",
"lockfile",
"network-port",
"score-workload",
"service-config",
}
EVIDENCE_AGGREGATE_EDGE_TYPES = {
"exposes_port",
"listens_on",
"names_endpoint",
"opens_port",
"resolves_to",
"routes_to_port",
"routes_to_service",
}
def reconcile_discovery_snapshots(
previous: dict[str, Any] | None,
current: dict[str, Any],
*,
retired_at: str | None = None,
) -> dict[str, Any]:
"""Merge, deduplicate, diff, and tombstone discovery candidates."""
result = _copy_json(current)
retired_at = retired_at or _utc_now()
previous = previous or {}
conflicts: list[dict[str, object]] = []
diff = {"added": set(), "changed": set(), "retired": set(), "conflicted": set()}
replacement_scopes = _scope_modes(result)
result_candidates = result.setdefault("candidates", {"nodes": [], "edges": [], "attributes": []})
for collection in ("nodes", "edges", "attributes"):
previous_map = _candidate_map(previous, collection)
current_items = [
item for item in result_candidates.get(collection, [])
if isinstance(item, dict) and item.get("stable_key")
]
deduped, collection_conflicts = _dedupe_collection(collection, current_items)
conflicts.extend(collection_conflicts)
for conflict in collection_conflicts:
for key in conflict.get("candidates", []):
diff["conflicted"].add(str(key))
result_candidates[collection] = [deduped[key] for key in sorted(deduped)]
current_keys = set(deduped)
previous_keys = set(previous_map)
diff["added"].update(current_keys - previous_keys)
for key in current_keys & previous_keys:
if _candidate_fingerprint(deduped[key]) != _candidate_fingerprint(previous_map[key]):
diff["changed"].add(key)
for key in previous_keys - current_keys:
previous_candidate = previous_map[key]
scope_id = str(previous_candidate.get("replacement_scope") or "")
if replacement_scopes.get(scope_id) != "replacement":
continue
diff["retired"].add(key)
_append_tombstone(
result,
stable_key=key,
entity_kind=_entity_kind(collection),
replacement_scope=scope_id,
retired_at=retired_at,
previous_candidate=previous_candidate,
)
existing_conflicts = result.setdefault("reconciliation", {}).get("conflicts", [])
if not isinstance(existing_conflicts, list):
existing_conflicts = []
result["reconciliation"]["conflicts"] = [*existing_conflicts, *conflicts]
result["reconciliation"]["diff"] = {
key: sorted(values)
for key, values in diff.items()
}
result["tombstones"] = _dedupe_tombstones([
*list(previous.get("tombstones", []) if isinstance(previous.get("tombstones"), list) else []),
*list(result.get("tombstones", []) if isinstance(result.get("tombstones"), list) else []),
])
return result
def _dedupe_collection(
collection: str,
items: list[dict[str, Any]],
) -> tuple[dict[str, dict[str, Any]], list[dict[str, object]]]:
by_key: dict[str, dict[str, Any]] = {}
conflicts: list[dict[str, object]] = []
for item in items:
key = str(item["stable_key"])
by_key[key], merge_conflicts = _merge_candidate(collection, by_key.get(key), item)
conflicts.extend(merge_conflicts)
if collection == "nodes":
conflicts.extend(_node_conflicts(by_key))
elif collection == "edges":
conflicts.extend(_edge_conflicts(by_key))
elif collection == "attributes":
conflicts.extend(_attribute_conflicts(by_key))
conflicted_keys = {
str(key)
for conflict in conflicts
for key in conflict.get("candidates", [])
}
for key in conflicted_keys:
candidate = by_key.get(key)
if candidate is not None:
candidate["status"] = "conflicted"
candidate["review_state"] = "needs_review"
return by_key, conflicts
def _merge_candidate(
collection: str,
existing: dict[str, Any] | None,
incoming: dict[str, Any],
) -> tuple[dict[str, Any], list[dict[str, object]]]:
if existing is None:
return _copy_json(incoming), []
winner, loser = _higher_precedence(existing, incoming)
merged = _copy_json(winner)
conflicts: list[dict[str, object]] = []
for field in ("aliases", "provenance", "source_anchors"):
values = [*list(existing.get(field, [])), *list(incoming.get(field, []))]
if values:
merged[field] = _unique_json(values) if field != "aliases" else _unique_strings(values)
if isinstance(existing.get("confidence"), (int, float)) and isinstance(incoming.get("confidence"), (int, float)):
merged["confidence"] = max(float(existing["confidence"]), float(incoming["confidence"]))
if collection == "attributes":
if existing.get("value") != incoming.get("value"):
merged["value"] = winner.get("value")
conflicts.append(_conflict("attribute_value_conflict", [existing, incoming], "candidate attribute values differ"))
return merged, conflicts
existing_attrs = existing.get("attributes") if isinstance(existing.get("attributes"), dict) else {}
incoming_attrs = incoming.get("attributes") if isinstance(incoming.get("attributes"), dict) else {}
merged_attrs = dict(existing_attrs)
for key, value in incoming_attrs.items():
if key not in merged_attrs:
merged_attrs[key] = value
continue
if merged_attrs[key] == value:
continue
selected = winner.get("attributes", {}).get(key) if isinstance(winner.get("attributes"), dict) else value
merged_attrs[key] = selected
conflicts.append(_conflict("attribute_conflict", [existing, incoming], f"attribute {key!r} differs"))
if merged_attrs:
loser_attrs = loser.get("attributes") if isinstance(loser.get("attributes"), dict) else {}
for key, value in loser_attrs.items():
merged_attrs.setdefault(key, value)
merged["attributes"] = merged_attrs
return merged, conflicts
def _node_conflicts(by_key: dict[str, dict[str, Any]]) -> list[dict[str, object]]:
conflicts: list[dict[str, object]] = []
seen: dict[tuple[str, str], list[str]] = {}
for key, node in sorted(by_key.items()):
kind = str(node.get("kind") or "")
labels = _node_identity_labels(node)
for label in labels:
match_key = (normalize_identity_part(kind), normalize_identity_part(label))
for other in seen.get(match_key, []):
if other == key or _path_scoped_nodes_are_distinct(by_key.get(other), node):
continue
conflicts.append(
{
"type": "possible_duplicate_node",
"reason": "same kind with matching normalized label or alias",
"candidates": sorted([other, key]),
"match": {"kind": kind, "label": label},
}
)
seen.setdefault(match_key, []).append(key)
return _unique_conflicts(conflicts)
def _edge_conflicts(by_key: dict[str, dict[str, Any]]) -> list[dict[str, object]]:
conflicts: list[dict[str, object]] = []
seen: dict[tuple[str, str, str], str] = {}
for key, edge in sorted(by_key.items()):
edge_type = str(edge.get("edge_type") or "")
if edge_type in EVIDENCE_AGGREGATE_EDGE_TYPES:
continue
match_key = (
str(edge.get("source_key") or ""),
edge_type,
str(edge.get("target_key") or ""),
)
other = seen.get(match_key)
if other and other != key:
conflicts.append(
{
"type": "possible_duplicate_edge",
"reason": "same source, target, and edge type",
"candidates": sorted([other, key]),
"match": {
"source_key": match_key[0],
"edge_type": match_key[1],
"target_key": match_key[2],
},
}
)
seen.setdefault(match_key, key)
return _unique_conflicts(conflicts)
def _attribute_conflicts(by_key: dict[str, dict[str, Any]]) -> list[dict[str, object]]:
conflicts: list[dict[str, object]] = []
seen: dict[tuple[str, str], str] = {}
for key, attribute in sorted(by_key.items()):
match_key = (str(attribute.get("entity_key") or ""), str(attribute.get("name") or ""))
other = seen.get(match_key)
if other and other != key:
conflicts.append(
{
"type": "possible_duplicate_attribute",
"reason": "same entity and attribute name",
"candidates": sorted([other, key]),
"match": {"entity_key": match_key[0], "name": match_key[1]},
}
)
seen.setdefault(match_key, key)
return _unique_conflicts(conflicts)
def _append_tombstone(
snapshot: dict[str, Any],
*,
stable_key: str,
entity_kind: str,
replacement_scope: str,
retired_at: str,
previous_candidate: dict[str, Any],
) -> None:
tombstones = snapshot.setdefault("tombstones", [])
tombstones.append(
{
"stable_key": stable_key,
"entity_kind": entity_kind,
"replacement_scope": replacement_scope,
"retired_at": retired_at,
"reason": "source_missing",
"previous_candidate": previous_candidate,
}
)
def _candidate_map(snapshot: dict[str, Any], collection: str) -> dict[str, dict[str, Any]]:
candidates = snapshot.get("candidates") if isinstance(snapshot.get("candidates"), dict) else {}
items = candidates.get(collection) if isinstance(candidates.get(collection), list) else []
return {
str(item["stable_key"]): item
for item in items
if isinstance(item, dict) and item.get("stable_key")
}
def _scope_modes(snapshot: dict[str, Any]) -> dict[str, str]:
scopes = snapshot.get("replacement_scopes") if isinstance(snapshot.get("replacement_scopes"), list) else []
return {
str(scope.get("id")): str(scope.get("mode") or "")
for scope in scopes
if isinstance(scope, dict) and scope.get("id")
}
def _higher_precedence(left: dict[str, Any], right: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
left_rank = ORIGIN_PRECEDENCE.get(str(left.get("origin") or ""), 99)
right_rank = ORIGIN_PRECEDENCE.get(str(right.get("origin") or ""), 99)
if right_rank < left_rank:
return right, left
return left, right
def _node_identity_labels(node: dict[str, Any]) -> list[str]:
return _unique_strings([
node.get("label") or "",
node.get("graph_id") or "",
*_as_list(node.get("aliases")),
])
def _path_scoped_nodes_are_distinct(left: dict[str, Any] | None, right: dict[str, Any]) -> bool:
if not left:
return False
left_kind = normalize_identity_part(str(left.get("kind") or ""))
right_kind = normalize_identity_part(str(right.get("kind") or ""))
if left_kind != right_kind or left_kind not in PATH_SCOPED_NODE_KINDS:
return False
left_identities = _source_anchor_identities(left)
right_identities = _source_anchor_identities(right)
if left_identities and right_identities:
return left_identities.isdisjoint(right_identities)
left_paths = _source_anchor_paths(left)
right_paths = _source_anchor_paths(right)
return bool(left_paths and right_paths and left_paths.isdisjoint(right_paths))
def _source_anchor_identities(candidate: dict[str, Any]) -> set[str]:
anchors = candidate.get("source_anchors") if isinstance(candidate.get("source_anchors"), list) else []
return {
str(anchor.get("fingerprint") or "")
for anchor in anchors
if isinstance(anchor, dict) and anchor.get("fingerprint")
}
def _source_anchor_paths(candidate: dict[str, Any]) -> set[str]:
anchors = candidate.get("source_anchors") if isinstance(candidate.get("source_anchors"), list) else []
return {
str(anchor.get("path") or "")
for anchor in anchors
if isinstance(anchor, dict) and anchor.get("path")
}
def _candidate_fingerprint(candidate: dict[str, Any]) -> str:
ignored = {"provenance"}
stable = {
key: value
for key, value in candidate.items()
if key not in ignored
}
return short_fingerprint(stable, length=20)
def _conflict(conflict_type: str, candidates: list[dict[str, Any]], reason: str) -> dict[str, object]:
return {
"type": conflict_type,
"reason": reason,
"candidates": sorted(str(candidate.get("stable_key") or "") for candidate in candidates),
}
def _unique_conflicts(conflicts: Iterable[dict[str, object]]) -> list[dict[str, object]]:
seen: set[str] = set()
result: list[dict[str, object]] = []
for conflict in conflicts:
key = json.dumps(conflict, sort_keys=True, default=str)
if key in seen:
continue
seen.add(key)
result.append(conflict)
return result
def _dedupe_tombstones(tombstones: list[object]) -> list[dict[str, Any]]:
by_key: dict[str, dict[str, Any]] = {}
for tombstone in tombstones:
if not isinstance(tombstone, dict):
continue
key = ":".join(
[
str(tombstone.get("stable_key") or ""),
str(tombstone.get("replacement_scope") or ""),
str(tombstone.get("reason") or ""),
]
)
by_key[key] = tombstone
return [by_key[key] for key in sorted(by_key)]
def _entity_kind(collection: str) -> str:
return {
"nodes": "node",
"edges": "edge",
"attributes": "attribute",
}[collection]
def _as_list(value: object) -> list[object]:
return value if isinstance(value, list) else []
def _unique_strings(values: Iterable[object]) -> list[str]:
seen: set[str] = set()
result: list[str] = []
for value in values:
text = str(value or "").strip()
if not text or text in seen:
continue
seen.add(text)
result.append(text)
return result
def _unique_json(values: Iterable[object]) -> list[object]:
seen: set[str] = set()
result: list[object] = []
for value in values:
key = json.dumps(value, sort_keys=True, default=str)
if key in seen:
continue
seen.add(key)
result.append(value)
return result
def _copy_json(value: Any) -> Any:
return json.loads(json.dumps(value, default=str))
def _utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")