generated from coulomb/repo-seed
Fix path-scoped duplicate detection
This commit is contained in:
@@ -16,6 +16,8 @@ ORIGIN_PRECEDENCE = {
|
||||
"manual": 5,
|
||||
}
|
||||
|
||||
PATH_SCOPED_NODE_KINDS = {"lockfile"}
|
||||
|
||||
|
||||
def reconcile_discovery_snapshots(
|
||||
previous: dict[str, Any] | None,
|
||||
@@ -159,14 +161,15 @@ def _merge_candidate(
|
||||
|
||||
def _node_conflicts(by_key: dict[str, dict[str, Any]]) -> list[dict[str, object]]:
|
||||
conflicts: list[dict[str, object]] = []
|
||||
seen: dict[tuple[str, str], str] = {}
|
||||
seen: dict[tuple[str, str], list[str]] = {}
|
||||
for key, node in sorted(by_key.items()):
|
||||
kind = str(node.get("kind") or "")
|
||||
labels = _node_identity_labels(node)
|
||||
for label in labels:
|
||||
match_key = (normalize_identity_part(kind), normalize_identity_part(label))
|
||||
other = seen.get(match_key)
|
||||
if other and other != key:
|
||||
for other in seen.get(match_key, []):
|
||||
if other == key or _path_scoped_nodes_are_distinct(by_key.get(other), node):
|
||||
continue
|
||||
conflicts.append(
|
||||
{
|
||||
"type": "possible_duplicate_node",
|
||||
@@ -175,7 +178,7 @@ def _node_conflicts(by_key: dict[str, dict[str, Any]]) -> list[dict[str, object]
|
||||
"match": {"kind": kind, "label": label},
|
||||
}
|
||||
)
|
||||
seen.setdefault(match_key, key)
|
||||
seen.setdefault(match_key, []).append(key)
|
||||
return _unique_conflicts(conflicts)
|
||||
|
||||
|
||||
@@ -282,6 +285,27 @@ def _node_identity_labels(node: dict[str, Any]) -> list[str]:
|
||||
])
|
||||
|
||||
|
||||
def _path_scoped_nodes_are_distinct(left: dict[str, Any] | None, right: dict[str, Any]) -> bool:
|
||||
if not left:
|
||||
return False
|
||||
left_kind = normalize_identity_part(str(left.get("kind") or ""))
|
||||
right_kind = normalize_identity_part(str(right.get("kind") or ""))
|
||||
if left_kind != right_kind or left_kind not in PATH_SCOPED_NODE_KINDS:
|
||||
return False
|
||||
left_paths = _source_anchor_paths(left)
|
||||
right_paths = _source_anchor_paths(right)
|
||||
return bool(left_paths and right_paths and left_paths.isdisjoint(right_paths))
|
||||
|
||||
|
||||
def _source_anchor_paths(candidate: dict[str, Any]) -> set[str]:
|
||||
anchors = candidate.get("source_anchors") if isinstance(candidate.get("source_anchors"), list) else []
|
||||
return {
|
||||
str(anchor.get("path") or "")
|
||||
for anchor in anchors
|
||||
if isinstance(anchor, dict) and anchor.get("path")
|
||||
}
|
||||
|
||||
|
||||
def _candidate_fingerprint(candidate: dict[str, Any]) -> str:
|
||||
ignored = {"provenance"}
|
||||
stable = {
|
||||
|
||||
Reference in New Issue
Block a user