from __future__ import annotations import json from pathlib import Path from railiance_fabric.cli import main as cli_main from railiance_fabric.discovery import discovery_stable_key, replacement_scope_id, source_fingerprint from railiance_fabric.reconciliation import reconcile_discovery_snapshots from railiance_fabric.scanner import ScanOptions, scan_repo from railiance_fabric.schema_validation import draft202012_validator def test_reconciliation_dedupes_diffs_and_tombstones_by_scope() -> None: scope_replace = _scope("deterministic", "file", "README.md", "replacement") scope_additive = _scope("llm-connect-repo-evidence", "llm", "bundle", "additive") service_key = discovery_stable_key("fixture-repo", "ServiceDeclaration", "fixture.api") old_key = discovery_stable_key("fixture-repo", "ServiceDeclaration", "old.api") additive_old_key = discovery_stable_key("fixture-repo", "CapabilityDeclaration", "old-llm") new_key = discovery_stable_key("fixture-repo", "CapabilityDeclaration", "fixture.ops") duplicate_key = discovery_stable_key( "fixture-repo", "ServiceDeclaration", "Fixture API", source_anchor={"path": "README.md", "line_start": 5}, ) previous = _snapshot( replacement_scopes=[scope_replace, scope_additive], nodes=[ _node(service_key, "ServiceDeclaration", "Fixture API", scope_replace["id"], attributes={"owner": "old"}), _node(old_key, "ServiceDeclaration", "Old API", scope_replace["id"]), _node(additive_old_key, "CapabilityDeclaration", "Old LLM", scope_additive["id"], origin="llm"), ], tombstones=[ { "stable_key": discovery_stable_key("fixture-repo", "ServiceDeclaration", "Ancient API"), "entity_kind": "node", "replacement_scope": scope_replace["id"], "retired_at": "2026-05-18T00:00:00Z", "reason": "source_missing", } ], ) current = _snapshot( replacement_scopes=[scope_replace, scope_additive], nodes=[ _node(service_key, "ServiceDeclaration", "Fixture API", scope_replace["id"], origin="llm", attributes={"owner": "llm"}), _node( service_key, "ServiceDeclaration", "Fixture API", scope_replace["id"], origin="repo_declaration", review_state="accepted", attributes={"owner": "declared"}, ), _node(new_key, "CapabilityDeclaration", "Fixture Ops", scope_replace["id"]), _node(duplicate_key, "ServiceDeclaration", "Fixture API", scope_replace["id"], aliases=["fixture.api.copy"]), ], ) reconciled = reconcile_discovery_snapshots(previous, current, retired_at="2026-05-19T00:00:00Z") _validate_schema("discovery-snapshot.schema.yaml", reconciled) nodes = {node["stable_key"]: node for node in reconciled["candidates"]["nodes"]} assert nodes[service_key]["origin"] == "repo_declaration" assert nodes[service_key]["attributes"]["owner"] == "declared" assert nodes[service_key]["review_state"] == "needs_review" assert nodes[duplicate_key]["status"] == "conflicted" diff = reconciled["reconciliation"]["diff"] assert new_key in diff["added"] assert duplicate_key in diff["added"] assert service_key in diff["changed"] assert old_key in diff["retired"] assert additive_old_key not in diff["retired"] assert {service_key, duplicate_key} <= set(diff["conflicted"]) assert any(conflict["type"] == "possible_duplicate_node" for conflict in reconciled["reconciliation"]["conflicts"]) assert any(tombstone["stable_key"] == old_key for tombstone in reconciled["tombstones"]) assert any(tombstone["stable_key"].endswith("ancient-api") for tombstone in reconciled["tombstones"]) def test_reconciliation_keeps_distinct_path_scoped_lockfiles_separate() -> None: scope = _scope("lockfiles", "file", "var/checkouts", "replacement") uv_a = "discovery:fixture-repo:lockfile:var-checkouts-a-uv.lock" uv_b = "discovery:fixture-repo:lockfile:var-checkouts-b-uv.lock" current = _snapshot( replacement_scopes=[scope], nodes=[ _node(uv_a, "Lockfile", "uv.lock", scope["id"], source_path="var/checkouts/a/uv.lock"), _node(uv_b, "Lockfile", "uv.lock", scope["id"], source_path="var/checkouts/b/uv.lock"), ], ) reconciled = reconcile_discovery_snapshots(None, current, retired_at="2026-05-19T00:00:00Z") assert reconciled["reconciliation"]["conflicts"] == [] assert reconciled["reconciliation"]["diff"]["conflicted"] == [] nodes = {node["stable_key"]: node for node in reconciled["candidates"]["nodes"]} assert nodes[uv_a]["review_state"] == "candidate" assert nodes[uv_b]["review_state"] == "candidate" def test_scan_cli_reconciles_against_previous_snapshot(tmp_path: Path, capsys) -> None: repo = tmp_path / "fixture-repo" repo.mkdir() (repo / "README.md").write_text("# Fixture Repo\n", encoding="utf-8") previous = scan_repo( ScanOptions( repo_path=repo, repo_slug="fixture-repo", repo_name="Fixture Repo", commit="old", ) ) scope_id = next(scope["id"] for scope in previous["replacement_scopes"] if scope["extractor_id"] == "repo-metadata") vanished_key = discovery_stable_key("fixture-repo", "ServiceDeclaration", "Vanished API") previous["candidates"]["nodes"].append( _node(vanished_key, "ServiceDeclaration", "Vanished API", scope_id) ) previous_path = tmp_path / "previous.json" output_path = tmp_path / "current.json" previous_path.write_text(json.dumps(previous), encoding="utf-8") assert cli_main( [ "scan", str(repo), "--repo-slug", "fixture-repo", "--repo-name", "Fixture Repo", "--commit", "new", "--previous-snapshot", str(previous_path), "--output", str(output_path), ] ) == 0 summary = capsys.readouterr().out assert "diff +" in summary payload = json.loads(output_path.read_text(encoding="utf-8")) _validate_schema("discovery-snapshot.schema.yaml", payload) assert vanished_key in payload["reconciliation"]["diff"]["retired"] assert any(tombstone["stable_key"] == vanished_key for tombstone in payload["tombstones"]) def test_three_rescans_keep_stable_identity_and_retire_vanished_evidence(tmp_path: Path) -> None: repo = tmp_path / "fixture-repo" repo.mkdir() (repo / "README.md").write_text("# Fixture Repo\n", encoding="utf-8") _write_pyproject(repo, ["PyYAML>=6.0"]) first = scan_repo( ScanOptions( repo_path=repo, repo_slug="fixture-repo", repo_name="Fixture Repo", commit="commit-1", ) ) _assert_unique_candidate_keys(first) _write_pyproject(repo, ["PyYAML>=6.0", "requests>=2.31"]) second = reconcile_discovery_snapshots( first, scan_repo( ScanOptions( repo_path=repo, repo_slug="fixture-repo", repo_name="Fixture Repo", commit="commit-2", ) ), ) _validate_schema("discovery-snapshot.schema.yaml", second) _assert_unique_candidate_keys(second) requests_key = discovery_stable_key("fixture-repo", "ExternalLibrary", "requests") pyyaml_key = discovery_stable_key("fixture-repo", "ExternalLibrary", "PyYAML") assert requests_key in second["reconciliation"]["diff"]["added"] assert requests_key in {node["stable_key"] for node in second["candidates"]["nodes"]} _write_pyproject(repo, ["PyYAML>=6.0"]) third = reconcile_discovery_snapshots( second, scan_repo( ScanOptions( repo_path=repo, repo_slug="fixture-repo", repo_name="Fixture Repo", commit="commit-3", ) ), ) _validate_schema("discovery-snapshot.schema.yaml", third) _assert_unique_candidate_keys(third) assert requests_key in third["reconciliation"]["diff"]["retired"] assert pyyaml_key not in third["reconciliation"]["diff"]["retired"] assert requests_key not in {node["stable_key"] for node in third["candidates"]["nodes"]} assert any(tombstone["stable_key"] == requests_key for tombstone in third["tombstones"]) def _snapshot( *, replacement_scopes: list[dict[str, object]], nodes: list[dict[str, object]], tombstones: list[dict[str, object]] | None = None, ) -> dict[str, object]: return { "apiVersion": "railiance.fabric/v1alpha1", "kind": "FabricDiscoverySnapshot", "generated_at": "2026-05-19T00:00:00Z", "source": {"repo_slug": "fixture-repo", "repo_name": "Fixture Repo", "commit": "abc123"}, "scan": { "run_id": "scan:fixture-repo:deterministic:abc123", "profile": "deterministic", "deterministic_only": True, "llm_enabled": False, }, "replacement_scopes": replacement_scopes, "candidates": {"nodes": nodes, "edges": [], "attributes": []}, "tombstones": tombstones or [], "reconciliation": { "precedence": ["repo_declaration", "deterministic", "catalog", "registry", "llm", "manual"], "duplicate_policy": "stable-key matches merge automatically", "retirement_policy": "missing candidates retire only inside their replacement scope", }, } def _scope(extractor_id: str, source_kind: str, source_path: str, mode: str) -> dict[str, object]: return { "id": replacement_scope_id("fixture-repo", extractor_id, source_kind, source_path=source_path), "extractor_id": extractor_id, "source_kind": source_kind, "source_path": source_path, "mode": mode, } def _node( stable_key: str, kind: str, label: str, replacement_scope: str, *, origin: str = "deterministic", review_state: str = "candidate", aliases: list[str] | None = None, attributes: dict[str, object] | None = None, source_path: str = "README.md", ) -> dict[str, object]: anchor = _anchor("file", source_path) return { "stable_key": stable_key, "kind": kind, "label": label, "repo": "fixture-repo", "aliases": aliases or [label], "attributes": attributes or {}, "origin": origin, "review_state": review_state, "status": "active", "confidence": 0.9, "replacement_scope": replacement_scope, "provenance": [ { "extractor_id": "fixture", "extractor_version": "0.1.0", "method": "declaration" if origin == "repo_declaration" else "deterministic" if origin == "deterministic" else "llm", "origin": origin, } ], "source_anchors": [anchor], } def _anchor(source_kind: str, path: str) -> dict[str, object]: anchor = {"source_kind": source_kind, "path": path} anchor["fingerprint"] = source_fingerprint(anchor) return anchor def _write_pyproject(repo: Path, dependencies: list[str]) -> None: dependency_lines = "\n".join(f' "{dependency}",' for dependency in dependencies) (repo / "pyproject.toml").write_text( f""" [project] name = "fixture-service" version = "0.1.0" dependencies = [ {dependency_lines} ] """.lstrip(), encoding="utf-8", ) def _assert_unique_candidate_keys(snapshot: dict[str, object]) -> None: candidates = snapshot["candidates"] assert isinstance(candidates, dict) for collection_name in ("nodes", "edges", "attributes"): collection = candidates[collection_name] assert isinstance(collection, list) stable_keys = [item["stable_key"] for item in collection] assert len(stable_keys) == len(set(stable_keys)) def _validate_schema(schema_name: str, payload: dict[str, object]) -> None: validator = draft202012_validator(Path("schemas") / schema_name) validator.validate(payload)