Files
railiance-fabric/tests/test_reconciliation.py

317 lines
12 KiB
Python

from __future__ import annotations
import json
from pathlib import Path
from railiance_fabric.cli import main as cli_main
from railiance_fabric.discovery import discovery_stable_key, replacement_scope_id, source_fingerprint
from railiance_fabric.reconciliation import reconcile_discovery_snapshots
from railiance_fabric.scanner import ScanOptions, scan_repo
from railiance_fabric.schema_validation import draft202012_validator
def test_reconciliation_dedupes_diffs_and_tombstones_by_scope() -> None:
scope_replace = _scope("deterministic", "file", "README.md", "replacement")
scope_additive = _scope("llm-connect-repo-evidence", "llm", "bundle", "additive")
service_key = discovery_stable_key("fixture-repo", "ServiceDeclaration", "fixture.api")
old_key = discovery_stable_key("fixture-repo", "ServiceDeclaration", "old.api")
additive_old_key = discovery_stable_key("fixture-repo", "CapabilityDeclaration", "old-llm")
new_key = discovery_stable_key("fixture-repo", "CapabilityDeclaration", "fixture.ops")
duplicate_key = discovery_stable_key(
"fixture-repo",
"ServiceDeclaration",
"Fixture API",
source_anchor={"path": "README.md", "line_start": 5},
)
previous = _snapshot(
replacement_scopes=[scope_replace, scope_additive],
nodes=[
_node(service_key, "ServiceDeclaration", "Fixture API", scope_replace["id"], attributes={"owner": "old"}),
_node(old_key, "ServiceDeclaration", "Old API", scope_replace["id"]),
_node(additive_old_key, "CapabilityDeclaration", "Old LLM", scope_additive["id"], origin="llm"),
],
tombstones=[
{
"stable_key": discovery_stable_key("fixture-repo", "ServiceDeclaration", "Ancient API"),
"entity_kind": "node",
"replacement_scope": scope_replace["id"],
"retired_at": "2026-05-18T00:00:00Z",
"reason": "source_missing",
}
],
)
current = _snapshot(
replacement_scopes=[scope_replace, scope_additive],
nodes=[
_node(service_key, "ServiceDeclaration", "Fixture API", scope_replace["id"], origin="llm", attributes={"owner": "llm"}),
_node(
service_key,
"ServiceDeclaration",
"Fixture API",
scope_replace["id"],
origin="repo_declaration",
review_state="accepted",
attributes={"owner": "declared"},
),
_node(new_key, "CapabilityDeclaration", "Fixture Ops", scope_replace["id"]),
_node(duplicate_key, "ServiceDeclaration", "Fixture API", scope_replace["id"], aliases=["fixture.api.copy"]),
],
)
reconciled = reconcile_discovery_snapshots(previous, current, retired_at="2026-05-19T00:00:00Z")
_validate_schema("discovery-snapshot.schema.yaml", reconciled)
nodes = {node["stable_key"]: node for node in reconciled["candidates"]["nodes"]}
assert nodes[service_key]["origin"] == "repo_declaration"
assert nodes[service_key]["attributes"]["owner"] == "declared"
assert nodes[service_key]["review_state"] == "needs_review"
assert nodes[duplicate_key]["status"] == "conflicted"
diff = reconciled["reconciliation"]["diff"]
assert new_key in diff["added"]
assert duplicate_key in diff["added"]
assert service_key in diff["changed"]
assert old_key in diff["retired"]
assert additive_old_key not in diff["retired"]
assert {service_key, duplicate_key} <= set(diff["conflicted"])
assert any(conflict["type"] == "possible_duplicate_node" for conflict in reconciled["reconciliation"]["conflicts"])
assert any(tombstone["stable_key"] == old_key for tombstone in reconciled["tombstones"])
assert any(tombstone["stable_key"].endswith("ancient-api") for tombstone in reconciled["tombstones"])
def test_reconciliation_keeps_distinct_path_scoped_lockfiles_separate() -> None:
scope = _scope("lockfiles", "file", "var/checkouts", "replacement")
uv_a = "discovery:fixture-repo:lockfile:var-checkouts-a-uv.lock"
uv_b = "discovery:fixture-repo:lockfile:var-checkouts-b-uv.lock"
current = _snapshot(
replacement_scopes=[scope],
nodes=[
_node(uv_a, "Lockfile", "uv.lock", scope["id"], source_path="var/checkouts/a/uv.lock"),
_node(uv_b, "Lockfile", "uv.lock", scope["id"], source_path="var/checkouts/b/uv.lock"),
],
)
reconciled = reconcile_discovery_snapshots(None, current, retired_at="2026-05-19T00:00:00Z")
assert reconciled["reconciliation"]["conflicts"] == []
assert reconciled["reconciliation"]["diff"]["conflicted"] == []
nodes = {node["stable_key"]: node for node in reconciled["candidates"]["nodes"]}
assert nodes[uv_a]["review_state"] == "candidate"
assert nodes[uv_b]["review_state"] == "candidate"
def test_scan_cli_reconciles_against_previous_snapshot(tmp_path: Path, capsys) -> None:
repo = tmp_path / "fixture-repo"
repo.mkdir()
(repo / "README.md").write_text("# Fixture Repo\n", encoding="utf-8")
previous = scan_repo(
ScanOptions(
repo_path=repo,
repo_slug="fixture-repo",
repo_name="Fixture Repo",
commit="old",
)
)
scope_id = next(scope["id"] for scope in previous["replacement_scopes"] if scope["extractor_id"] == "repo-metadata")
vanished_key = discovery_stable_key("fixture-repo", "ServiceDeclaration", "Vanished API")
previous["candidates"]["nodes"].append(
_node(vanished_key, "ServiceDeclaration", "Vanished API", scope_id)
)
previous_path = tmp_path / "previous.json"
output_path = tmp_path / "current.json"
previous_path.write_text(json.dumps(previous), encoding="utf-8")
assert cli_main(
[
"scan",
str(repo),
"--repo-slug",
"fixture-repo",
"--repo-name",
"Fixture Repo",
"--commit",
"new",
"--previous-snapshot",
str(previous_path),
"--output",
str(output_path),
]
) == 0
summary = capsys.readouterr().out
assert "diff +" in summary
payload = json.loads(output_path.read_text(encoding="utf-8"))
_validate_schema("discovery-snapshot.schema.yaml", payload)
assert vanished_key in payload["reconciliation"]["diff"]["retired"]
assert any(tombstone["stable_key"] == vanished_key for tombstone in payload["tombstones"])
def test_three_rescans_keep_stable_identity_and_retire_vanished_evidence(tmp_path: Path) -> None:
repo = tmp_path / "fixture-repo"
repo.mkdir()
(repo / "README.md").write_text("# Fixture Repo\n", encoding="utf-8")
_write_pyproject(repo, ["PyYAML>=6.0"])
first = scan_repo(
ScanOptions(
repo_path=repo,
repo_slug="fixture-repo",
repo_name="Fixture Repo",
commit="commit-1",
)
)
_assert_unique_candidate_keys(first)
_write_pyproject(repo, ["PyYAML>=6.0", "requests>=2.31"])
second = reconcile_discovery_snapshots(
first,
scan_repo(
ScanOptions(
repo_path=repo,
repo_slug="fixture-repo",
repo_name="Fixture Repo",
commit="commit-2",
)
),
)
_validate_schema("discovery-snapshot.schema.yaml", second)
_assert_unique_candidate_keys(second)
requests_key = discovery_stable_key("fixture-repo", "ExternalLibrary", "requests")
pyyaml_key = discovery_stable_key("fixture-repo", "ExternalLibrary", "PyYAML")
assert requests_key in second["reconciliation"]["diff"]["added"]
assert requests_key in {node["stable_key"] for node in second["candidates"]["nodes"]}
_write_pyproject(repo, ["PyYAML>=6.0"])
third = reconcile_discovery_snapshots(
second,
scan_repo(
ScanOptions(
repo_path=repo,
repo_slug="fixture-repo",
repo_name="Fixture Repo",
commit="commit-3",
)
),
)
_validate_schema("discovery-snapshot.schema.yaml", third)
_assert_unique_candidate_keys(third)
assert requests_key in third["reconciliation"]["diff"]["retired"]
assert pyyaml_key not in third["reconciliation"]["diff"]["retired"]
assert requests_key not in {node["stable_key"] for node in third["candidates"]["nodes"]}
assert any(tombstone["stable_key"] == requests_key for tombstone in third["tombstones"])
def _snapshot(
*,
replacement_scopes: list[dict[str, object]],
nodes: list[dict[str, object]],
tombstones: list[dict[str, object]] | None = None,
) -> dict[str, object]:
return {
"apiVersion": "railiance.fabric/v1alpha1",
"kind": "FabricDiscoverySnapshot",
"generated_at": "2026-05-19T00:00:00Z",
"source": {"repo_slug": "fixture-repo", "repo_name": "Fixture Repo", "commit": "abc123"},
"scan": {
"run_id": "scan:fixture-repo:deterministic:abc123",
"profile": "deterministic",
"deterministic_only": True,
"llm_enabled": False,
},
"replacement_scopes": replacement_scopes,
"candidates": {"nodes": nodes, "edges": [], "attributes": []},
"tombstones": tombstones or [],
"reconciliation": {
"precedence": ["repo_declaration", "deterministic", "catalog", "registry", "llm", "manual"],
"duplicate_policy": "stable-key matches merge automatically",
"retirement_policy": "missing candidates retire only inside their replacement scope",
},
}
def _scope(extractor_id: str, source_kind: str, source_path: str, mode: str) -> dict[str, object]:
return {
"id": replacement_scope_id("fixture-repo", extractor_id, source_kind, source_path=source_path),
"extractor_id": extractor_id,
"source_kind": source_kind,
"source_path": source_path,
"mode": mode,
}
def _node(
stable_key: str,
kind: str,
label: str,
replacement_scope: str,
*,
origin: str = "deterministic",
review_state: str = "candidate",
aliases: list[str] | None = None,
attributes: dict[str, object] | None = None,
source_path: str = "README.md",
) -> dict[str, object]:
anchor = _anchor("file", source_path)
return {
"stable_key": stable_key,
"kind": kind,
"label": label,
"repo": "fixture-repo",
"aliases": aliases or [label],
"attributes": attributes or {},
"origin": origin,
"review_state": review_state,
"status": "active",
"confidence": 0.9,
"replacement_scope": replacement_scope,
"provenance": [
{
"extractor_id": "fixture",
"extractor_version": "0.1.0",
"method": "declaration" if origin == "repo_declaration" else "deterministic" if origin == "deterministic" else "llm",
"origin": origin,
}
],
"source_anchors": [anchor],
}
def _anchor(source_kind: str, path: str) -> dict[str, object]:
anchor = {"source_kind": source_kind, "path": path}
anchor["fingerprint"] = source_fingerprint(anchor)
return anchor
def _write_pyproject(repo: Path, dependencies: list[str]) -> None:
dependency_lines = "\n".join(f' "{dependency}",' for dependency in dependencies)
(repo / "pyproject.toml").write_text(
f"""
[project]
name = "fixture-service"
version = "0.1.0"
dependencies = [
{dependency_lines}
]
""".lstrip(),
encoding="utf-8",
)
def _assert_unique_candidate_keys(snapshot: dict[str, object]) -> None:
candidates = snapshot["candidates"]
assert isinstance(candidates, dict)
for collection_name in ("nodes", "edges", "attributes"):
collection = candidates[collection_name]
assert isinstance(collection, list)
stable_keys = [item["stable_key"] for item in collection]
assert len(stable_keys) == len(set(stable_keys))
def _validate_schema(schema_name: str, payload: dict[str, object]) -> None:
validator = draft202012_validator(Path("schemas") / schema_name)
validator.validate(payload)