Add discovery reconciliation engine

This commit is contained in:
2026-05-19 04:49:08 +02:00
parent 73f7cdbdb5
commit 17356a41d6
6 changed files with 675 additions and 1 deletions

View File

@@ -91,6 +91,46 @@ candidates. Unresolved edge endpoints or attribute targets also become review
artifacts. Accepted graph data still requires deterministic evidence,
repo-owned declarations, or a later human review/acceptance path.
## Reconciliation And Dry-Run Diffs
Scans can be reconciled against a previous discovery snapshot:
```bash
railiance-fabric scan . \
--repo-slug railiance-fabric \
--previous-snapshot previous-discovery.json \
--dry-run \
--output current-discovery.json
```
The reconciler writes `reconciliation.diff` with explicit stable-key sets:
- `added`
- `changed`
- `retired`
- `conflicted`
It deduplicates candidates by stable key, merges source anchors and provenance,
and applies source-aware precedence when duplicate candidates disagree. The
current precedence is:
1. `repo_declaration`
2. `deterministic`
3. `catalog`
4. `registry`
5. `llm`
6. `manual`
Possible duplicates found through matching aliases, normalized labels,
relationship endpoints, or attribute targets are not silently merged. They are
marked `status: conflicted`, moved to `review_state: needs_review`, and listed
under `reconciliation.conflicts`.
Missing previous candidates become tombstones only when their replacement scope
is present in the current scan and has `mode: replacement`. Missing candidates
from additive scopes, such as broad LLM evidence bundles, are left alone.
Existing tombstones are preserved so repeated scans can explain graph drift.
## Identity
Identity is the main safety boundary. The scanner must not append guesses on

View File

@@ -14,6 +14,7 @@ from .loader import declaration_files, load_yaml
from .graph import FabricGraph, build_graph
from .graph_explorer import fabric_graph_explorer_payload
from .llm_extraction import LLMExtractionConfig
from .reconciliation import reconcile_discovery_snapshots
from .scanner import ScanOptions, scan_repo
from .validation import validate_roots
@@ -73,6 +74,7 @@ def build_parser() -> argparse.ArgumentParser:
scan.add_argument("--profile", default="deterministic")
scan.add_argument("--dry-run", action="store_true", help="Do not write anywhere except an explicit --output file.")
scan.add_argument("--output", type=Path, default=None, help="Write the discovery snapshot JSON to a file.")
scan.add_argument("--previous-snapshot", type=Path, default=None, help="Reconcile against a previous discovery snapshot JSON.")
scan.add_argument("--json", action="store_true", help="Print the discovery snapshot JSON to stdout.")
scan.add_argument("--llm", action="store_true", help="Enable llm-connect assisted extraction.")
scan.add_argument("--llm-provider", default="mock", help="llm-connect provider name.")
@@ -410,6 +412,16 @@ def _scan_repo(args: argparse.Namespace) -> int:
),
)
)
if args.previous_snapshot:
try:
previous = json.loads(args.previous_snapshot.read_text(encoding="utf-8"))
except Exception as exc:
print(f"ERROR {args.previous_snapshot}: cannot read previous snapshot: {exc}", file=sys.stderr)
return 1
if not isinstance(previous, dict):
print(f"ERROR {args.previous_snapshot}: previous snapshot must be a JSON object", file=sys.stderr)
return 1
snapshot = reconcile_discovery_snapshots(previous, snapshot)
payload = json.dumps(snapshot, indent=2, sort_keys=True)
if args.output:
args.output.parent.mkdir(parents=True, exist_ok=True)
@@ -421,6 +433,15 @@ def _scan_repo(args: argparse.Namespace) -> int:
candidates = snapshot["candidates"]
review_count = len(snapshot.get("review_artifacts", []))
review_summary = f", {review_count} review artifact(s)" if review_count else ""
diff = snapshot.get("reconciliation", {}).get("diff", {})
diff_summary = ""
if isinstance(diff, dict):
diff_summary = (
f", diff +{len(diff.get('added', []))}"
f"/~{len(diff.get('changed', []))}"
f"/-{len(diff.get('retired', []))}"
f"/!{len(diff.get('conflicted', []))}"
)
mode = "dry-run " if args.dry_run else ""
print(
f"{mode}scan {snapshot['source']['repo_slug']} "
@@ -430,6 +451,7 @@ def _scan_repo(args: argparse.Namespace) -> int:
f"{len(candidates['attributes'])} attribute(s), "
f"{len(snapshot['replacement_scopes'])} replacement scope(s)"
f"{review_summary}"
f"{diff_summary}"
)
if args.output:
print(f"wrote {args.output}")

View File

@@ -0,0 +1,372 @@
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any, Iterable
from .discovery import normalize_identity_part, short_fingerprint
ORIGIN_PRECEDENCE = {
"repo_declaration": 0,
"deterministic": 1,
"catalog": 2,
"registry": 3,
"llm": 4,
"manual": 5,
}
def reconcile_discovery_snapshots(
previous: dict[str, Any] | None,
current: dict[str, Any],
*,
retired_at: str | None = None,
) -> dict[str, Any]:
"""Merge, deduplicate, diff, and tombstone discovery candidates."""
result = _copy_json(current)
retired_at = retired_at or _utc_now()
previous = previous or {}
conflicts: list[dict[str, object]] = []
diff = {"added": set(), "changed": set(), "retired": set(), "conflicted": set()}
replacement_scopes = _scope_modes(result)
result_candidates = result.setdefault("candidates", {"nodes": [], "edges": [], "attributes": []})
for collection in ("nodes", "edges", "attributes"):
previous_map = _candidate_map(previous, collection)
current_items = [
item for item in result_candidates.get(collection, [])
if isinstance(item, dict) and item.get("stable_key")
]
deduped, collection_conflicts = _dedupe_collection(collection, current_items)
conflicts.extend(collection_conflicts)
for conflict in collection_conflicts:
for key in conflict.get("candidates", []):
diff["conflicted"].add(str(key))
result_candidates[collection] = [deduped[key] for key in sorted(deduped)]
current_keys = set(deduped)
previous_keys = set(previous_map)
diff["added"].update(current_keys - previous_keys)
for key in current_keys & previous_keys:
if _candidate_fingerprint(deduped[key]) != _candidate_fingerprint(previous_map[key]):
diff["changed"].add(key)
for key in previous_keys - current_keys:
previous_candidate = previous_map[key]
scope_id = str(previous_candidate.get("replacement_scope") or "")
if replacement_scopes.get(scope_id) != "replacement":
continue
diff["retired"].add(key)
_append_tombstone(
result,
stable_key=key,
entity_kind=_entity_kind(collection),
replacement_scope=scope_id,
retired_at=retired_at,
previous_candidate=previous_candidate,
)
existing_conflicts = result.setdefault("reconciliation", {}).get("conflicts", [])
if not isinstance(existing_conflicts, list):
existing_conflicts = []
result["reconciliation"]["conflicts"] = [*existing_conflicts, *conflicts]
result["reconciliation"]["diff"] = {
key: sorted(values)
for key, values in diff.items()
}
result["tombstones"] = _dedupe_tombstones([
*list(previous.get("tombstones", []) if isinstance(previous.get("tombstones"), list) else []),
*list(result.get("tombstones", []) if isinstance(result.get("tombstones"), list) else []),
])
return result
def _dedupe_collection(
collection: str,
items: list[dict[str, Any]],
) -> tuple[dict[str, dict[str, Any]], list[dict[str, object]]]:
by_key: dict[str, dict[str, Any]] = {}
conflicts: list[dict[str, object]] = []
for item in items:
key = str(item["stable_key"])
by_key[key], merge_conflicts = _merge_candidate(collection, by_key.get(key), item)
conflicts.extend(merge_conflicts)
if collection == "nodes":
conflicts.extend(_node_conflicts(by_key))
elif collection == "edges":
conflicts.extend(_edge_conflicts(by_key))
elif collection == "attributes":
conflicts.extend(_attribute_conflicts(by_key))
conflicted_keys = {
str(key)
for conflict in conflicts
for key in conflict.get("candidates", [])
}
for key in conflicted_keys:
candidate = by_key.get(key)
if candidate is not None:
candidate["status"] = "conflicted"
candidate["review_state"] = "needs_review"
return by_key, conflicts
def _merge_candidate(
collection: str,
existing: dict[str, Any] | None,
incoming: dict[str, Any],
) -> tuple[dict[str, Any], list[dict[str, object]]]:
if existing is None:
return _copy_json(incoming), []
winner, loser = _higher_precedence(existing, incoming)
merged = _copy_json(winner)
conflicts: list[dict[str, object]] = []
for field in ("aliases", "provenance", "source_anchors"):
values = [*list(existing.get(field, [])), *list(incoming.get(field, []))]
if values:
merged[field] = _unique_json(values) if field != "aliases" else _unique_strings(values)
if isinstance(existing.get("confidence"), (int, float)) and isinstance(incoming.get("confidence"), (int, float)):
merged["confidence"] = max(float(existing["confidence"]), float(incoming["confidence"]))
if collection == "attributes":
if existing.get("value") != incoming.get("value"):
merged["value"] = winner.get("value")
conflicts.append(_conflict("attribute_value_conflict", [existing, incoming], "candidate attribute values differ"))
return merged, conflicts
existing_attrs = existing.get("attributes") if isinstance(existing.get("attributes"), dict) else {}
incoming_attrs = incoming.get("attributes") if isinstance(incoming.get("attributes"), dict) else {}
merged_attrs = dict(existing_attrs)
for key, value in incoming_attrs.items():
if key not in merged_attrs:
merged_attrs[key] = value
continue
if merged_attrs[key] == value:
continue
selected = winner.get("attributes", {}).get(key) if isinstance(winner.get("attributes"), dict) else value
merged_attrs[key] = selected
conflicts.append(_conflict("attribute_conflict", [existing, incoming], f"attribute {key!r} differs"))
if merged_attrs:
loser_attrs = loser.get("attributes") if isinstance(loser.get("attributes"), dict) else {}
for key, value in loser_attrs.items():
merged_attrs.setdefault(key, value)
merged["attributes"] = merged_attrs
return merged, conflicts
def _node_conflicts(by_key: dict[str, dict[str, Any]]) -> list[dict[str, object]]:
conflicts: list[dict[str, object]] = []
seen: dict[tuple[str, str], str] = {}
for key, node in sorted(by_key.items()):
kind = str(node.get("kind") or "")
labels = _node_identity_labels(node)
for label in labels:
match_key = (normalize_identity_part(kind), normalize_identity_part(label))
other = seen.get(match_key)
if other and other != key:
conflicts.append(
{
"type": "possible_duplicate_node",
"reason": "same kind with matching normalized label or alias",
"candidates": sorted([other, key]),
"match": {"kind": kind, "label": label},
}
)
seen.setdefault(match_key, key)
return _unique_conflicts(conflicts)
def _edge_conflicts(by_key: dict[str, dict[str, Any]]) -> list[dict[str, object]]:
conflicts: list[dict[str, object]] = []
seen: dict[tuple[str, str, str], str] = {}
for key, edge in sorted(by_key.items()):
match_key = (
str(edge.get("source_key") or ""),
str(edge.get("edge_type") or ""),
str(edge.get("target_key") or ""),
)
other = seen.get(match_key)
if other and other != key:
conflicts.append(
{
"type": "possible_duplicate_edge",
"reason": "same source, target, and edge type",
"candidates": sorted([other, key]),
"match": {
"source_key": match_key[0],
"edge_type": match_key[1],
"target_key": match_key[2],
},
}
)
seen.setdefault(match_key, key)
return _unique_conflicts(conflicts)
def _attribute_conflicts(by_key: dict[str, dict[str, Any]]) -> list[dict[str, object]]:
conflicts: list[dict[str, object]] = []
seen: dict[tuple[str, str], str] = {}
for key, attribute in sorted(by_key.items()):
match_key = (str(attribute.get("entity_key") or ""), str(attribute.get("name") or ""))
other = seen.get(match_key)
if other and other != key:
conflicts.append(
{
"type": "possible_duplicate_attribute",
"reason": "same entity and attribute name",
"candidates": sorted([other, key]),
"match": {"entity_key": match_key[0], "name": match_key[1]},
}
)
seen.setdefault(match_key, key)
return _unique_conflicts(conflicts)
def _append_tombstone(
snapshot: dict[str, Any],
*,
stable_key: str,
entity_kind: str,
replacement_scope: str,
retired_at: str,
previous_candidate: dict[str, Any],
) -> None:
tombstones = snapshot.setdefault("tombstones", [])
tombstones.append(
{
"stable_key": stable_key,
"entity_kind": entity_kind,
"replacement_scope": replacement_scope,
"retired_at": retired_at,
"reason": "source_missing",
"previous_candidate": previous_candidate,
}
)
def _candidate_map(snapshot: dict[str, Any], collection: str) -> dict[str, dict[str, Any]]:
candidates = snapshot.get("candidates") if isinstance(snapshot.get("candidates"), dict) else {}
items = candidates.get(collection) if isinstance(candidates.get(collection), list) else []
return {
str(item["stable_key"]): item
for item in items
if isinstance(item, dict) and item.get("stable_key")
}
def _scope_modes(snapshot: dict[str, Any]) -> dict[str, str]:
scopes = snapshot.get("replacement_scopes") if isinstance(snapshot.get("replacement_scopes"), list) else []
return {
str(scope.get("id")): str(scope.get("mode") or "")
for scope in scopes
if isinstance(scope, dict) and scope.get("id")
}
def _higher_precedence(left: dict[str, Any], right: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
left_rank = ORIGIN_PRECEDENCE.get(str(left.get("origin") or ""), 99)
right_rank = ORIGIN_PRECEDENCE.get(str(right.get("origin") or ""), 99)
if right_rank < left_rank:
return right, left
return left, right
def _node_identity_labels(node: dict[str, Any]) -> list[str]:
return _unique_strings([
node.get("label") or "",
node.get("graph_id") or "",
*_as_list(node.get("aliases")),
])
def _candidate_fingerprint(candidate: dict[str, Any]) -> str:
ignored = {"provenance"}
stable = {
key: value
for key, value in candidate.items()
if key not in ignored
}
return short_fingerprint(stable, length=20)
def _conflict(conflict_type: str, candidates: list[dict[str, Any]], reason: str) -> dict[str, object]:
return {
"type": conflict_type,
"reason": reason,
"candidates": sorted(str(candidate.get("stable_key") or "") for candidate in candidates),
}
def _unique_conflicts(conflicts: Iterable[dict[str, object]]) -> list[dict[str, object]]:
seen: set[str] = set()
result: list[dict[str, object]] = []
for conflict in conflicts:
key = json.dumps(conflict, sort_keys=True, default=str)
if key in seen:
continue
seen.add(key)
result.append(conflict)
return result
def _dedupe_tombstones(tombstones: list[object]) -> list[dict[str, Any]]:
by_key: dict[str, dict[str, Any]] = {}
for tombstone in tombstones:
if not isinstance(tombstone, dict):
continue
key = ":".join(
[
str(tombstone.get("stable_key") or ""),
str(tombstone.get("replacement_scope") or ""),
str(tombstone.get("reason") or ""),
]
)
by_key[key] = tombstone
return [by_key[key] for key in sorted(by_key)]
def _entity_kind(collection: str) -> str:
return {
"nodes": "node",
"edges": "edge",
"attributes": "attribute",
}[collection]
def _as_list(value: object) -> list[object]:
return value if isinstance(value, list) else []
def _unique_strings(values: Iterable[object]) -> list[str]:
seen: set[str] = set()
result: list[str] = []
for value in values:
text = str(value or "").strip()
if not text or text in seen:
continue
seen.add(text)
result.append(text)
return result
def _unique_json(values: Iterable[object]) -> list[object]:
seen: set[str] = set()
result: list[object] = []
for value in values:
key = json.dumps(value, sort_keys=True, default=str)
if key in seen:
continue
seen.add(key)
result.append(value)
return result
def _copy_json(value: Any) -> Any:
return json.loads(json.dumps(value, default=str))
def _utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")

View File

@@ -126,6 +126,35 @@ properties:
items:
type: object
additionalProperties: true
diff:
type: object
additionalProperties: false
required:
- added
- changed
- retired
- conflicted
properties:
added:
type: array
uniqueItems: true
items:
$ref: "#/$defs/stableKey"
changed:
type: array
uniqueItems: true
items:
$ref: "#/$defs/stableKey"
retired:
type: array
uniqueItems: true
items:
$ref: "#/$defs/stableKey"
conflicted:
type: array
uniqueItems: true
items:
$ref: "#/$defs/stableKey"
$defs:
stableKey:

View File

@@ -0,0 +1,211 @@
from __future__ import annotations
import json
from pathlib import Path
from railiance_fabric.cli import main as cli_main
from railiance_fabric.discovery import discovery_stable_key, replacement_scope_id, source_fingerprint
from railiance_fabric.reconciliation import reconcile_discovery_snapshots
from railiance_fabric.scanner import ScanOptions, scan_repo
from railiance_fabric.schema_validation import draft202012_validator
def test_reconciliation_dedupes_diffs_and_tombstones_by_scope() -> None:
scope_replace = _scope("deterministic", "file", "README.md", "replacement")
scope_additive = _scope("llm-connect-repo-evidence", "llm", "bundle", "additive")
service_key = discovery_stable_key("fixture-repo", "ServiceDeclaration", "fixture.api")
old_key = discovery_stable_key("fixture-repo", "ServiceDeclaration", "old.api")
additive_old_key = discovery_stable_key("fixture-repo", "CapabilityDeclaration", "old-llm")
new_key = discovery_stable_key("fixture-repo", "CapabilityDeclaration", "fixture.ops")
duplicate_key = discovery_stable_key(
"fixture-repo",
"ServiceDeclaration",
"Fixture API",
source_anchor={"path": "README.md", "line_start": 5},
)
previous = _snapshot(
replacement_scopes=[scope_replace, scope_additive],
nodes=[
_node(service_key, "ServiceDeclaration", "Fixture API", scope_replace["id"], attributes={"owner": "old"}),
_node(old_key, "ServiceDeclaration", "Old API", scope_replace["id"]),
_node(additive_old_key, "CapabilityDeclaration", "Old LLM", scope_additive["id"], origin="llm"),
],
tombstones=[
{
"stable_key": discovery_stable_key("fixture-repo", "ServiceDeclaration", "Ancient API"),
"entity_kind": "node",
"replacement_scope": scope_replace["id"],
"retired_at": "2026-05-18T00:00:00Z",
"reason": "source_missing",
}
],
)
current = _snapshot(
replacement_scopes=[scope_replace, scope_additive],
nodes=[
_node(service_key, "ServiceDeclaration", "Fixture API", scope_replace["id"], origin="llm", attributes={"owner": "llm"}),
_node(
service_key,
"ServiceDeclaration",
"Fixture API",
scope_replace["id"],
origin="repo_declaration",
review_state="accepted",
attributes={"owner": "declared"},
),
_node(new_key, "CapabilityDeclaration", "Fixture Ops", scope_replace["id"]),
_node(duplicate_key, "ServiceDeclaration", "Fixture API", scope_replace["id"], aliases=["fixture.api.copy"]),
],
)
reconciled = reconcile_discovery_snapshots(previous, current, retired_at="2026-05-19T00:00:00Z")
_validate_schema("discovery-snapshot.schema.yaml", reconciled)
nodes = {node["stable_key"]: node for node in reconciled["candidates"]["nodes"]}
assert nodes[service_key]["origin"] == "repo_declaration"
assert nodes[service_key]["attributes"]["owner"] == "declared"
assert nodes[service_key]["review_state"] == "needs_review"
assert nodes[duplicate_key]["status"] == "conflicted"
diff = reconciled["reconciliation"]["diff"]
assert new_key in diff["added"]
assert duplicate_key in diff["added"]
assert service_key in diff["changed"]
assert old_key in diff["retired"]
assert additive_old_key not in diff["retired"]
assert {service_key, duplicate_key} <= set(diff["conflicted"])
assert any(conflict["type"] == "possible_duplicate_node" for conflict in reconciled["reconciliation"]["conflicts"])
assert any(tombstone["stable_key"] == old_key for tombstone in reconciled["tombstones"])
assert any(tombstone["stable_key"].endswith("ancient-api") for tombstone in reconciled["tombstones"])
def test_scan_cli_reconciles_against_previous_snapshot(tmp_path: Path, capsys) -> None:
repo = tmp_path / "fixture-repo"
repo.mkdir()
(repo / "README.md").write_text("# Fixture Repo\n", encoding="utf-8")
previous = scan_repo(
ScanOptions(
repo_path=repo,
repo_slug="fixture-repo",
repo_name="Fixture Repo",
commit="old",
)
)
scope_id = next(scope["id"] for scope in previous["replacement_scopes"] if scope["extractor_id"] == "repo-metadata")
vanished_key = discovery_stable_key("fixture-repo", "ServiceDeclaration", "Vanished API")
previous["candidates"]["nodes"].append(
_node(vanished_key, "ServiceDeclaration", "Vanished API", scope_id)
)
previous_path = tmp_path / "previous.json"
output_path = tmp_path / "current.json"
previous_path.write_text(json.dumps(previous), encoding="utf-8")
assert cli_main(
[
"scan",
str(repo),
"--repo-slug",
"fixture-repo",
"--repo-name",
"Fixture Repo",
"--commit",
"new",
"--previous-snapshot",
str(previous_path),
"--output",
str(output_path),
]
) == 0
summary = capsys.readouterr().out
assert "diff +" in summary
payload = json.loads(output_path.read_text(encoding="utf-8"))
_validate_schema("discovery-snapshot.schema.yaml", payload)
assert vanished_key in payload["reconciliation"]["diff"]["retired"]
assert any(tombstone["stable_key"] == vanished_key for tombstone in payload["tombstones"])
def _snapshot(
*,
replacement_scopes: list[dict[str, object]],
nodes: list[dict[str, object]],
tombstones: list[dict[str, object]] | None = None,
) -> dict[str, object]:
return {
"apiVersion": "railiance.fabric/v1alpha1",
"kind": "FabricDiscoverySnapshot",
"generated_at": "2026-05-19T00:00:00Z",
"source": {"repo_slug": "fixture-repo", "repo_name": "Fixture Repo", "commit": "abc123"},
"scan": {
"run_id": "scan:fixture-repo:deterministic:abc123",
"profile": "deterministic",
"deterministic_only": True,
"llm_enabled": False,
},
"replacement_scopes": replacement_scopes,
"candidates": {"nodes": nodes, "edges": [], "attributes": []},
"tombstones": tombstones or [],
"reconciliation": {
"precedence": ["repo_declaration", "deterministic", "catalog", "registry", "llm", "manual"],
"duplicate_policy": "stable-key matches merge automatically",
"retirement_policy": "missing candidates retire only inside their replacement scope",
},
}
def _scope(extractor_id: str, source_kind: str, source_path: str, mode: str) -> dict[str, object]:
return {
"id": replacement_scope_id("fixture-repo", extractor_id, source_kind, source_path=source_path),
"extractor_id": extractor_id,
"source_kind": source_kind,
"source_path": source_path,
"mode": mode,
}
def _node(
stable_key: str,
kind: str,
label: str,
replacement_scope: str,
*,
origin: str = "deterministic",
review_state: str = "candidate",
aliases: list[str] | None = None,
attributes: dict[str, object] | None = None,
) -> dict[str, object]:
anchor = _anchor("file", "README.md")
return {
"stable_key": stable_key,
"kind": kind,
"label": label,
"repo": "fixture-repo",
"aliases": aliases or [label],
"attributes": attributes or {},
"origin": origin,
"review_state": review_state,
"status": "active",
"confidence": 0.9,
"replacement_scope": replacement_scope,
"provenance": [
{
"extractor_id": "fixture",
"extractor_version": "0.1.0",
"method": "declaration" if origin == "repo_declaration" else "deterministic" if origin == "deterministic" else "llm",
"origin": origin,
}
],
"source_anchors": [anchor],
}
def _anchor(source_kind: str, path: str) -> dict[str, object]:
anchor = {"source_kind": source_kind, "path": path}
anchor["fingerprint"] = source_fingerprint(anchor)
return anchor
def _validate_schema(schema_name: str, payload: dict[str, object]) -> None:
validator = draft202012_validator(Path("schemas") / schema_name)
validator.validate(payload)

View File

@@ -199,7 +199,7 @@ Acceptance notes:
```task
id: RAIL-FAB-WP-0010-T04
status: todo
status: done
priority: high
state_hub_task_id: "f0844595-23e0-4e7a-bfd9-e0526b8f85b9"
```