# Files
# railiance-fabric/railiance_fabric/cli.py
#
# 1675 lines
# 68 KiB
# Python

from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import quote
from .accountability_roots import DEFAULT_ROOT_MANIFEST_PATH, collect_accountability_root_evidence
from .connectors import ConnectorConfig
from .financial_baseline import financial_export_from_legacy
from .loader import declaration_files, load_yaml
from .graph import FabricGraph, build_graph
from .graph_explorer import fabric_graph_explorer_payload
from .llm_extraction import LLMExtractionConfig
from .reconciliation import reconcile_discovery_snapshots
from .registry import RESET_CONFIRMATION_TOKEN
from .scanner import EXTRACTOR_VERSION, ScanOptions, scan_repo
from .validation import validate_roots
# Directory name of the operational discovery cache; the ``--cache-dir`` help
# text documents it as living next to the onboarding manifest.
DEFAULT_DISCOVERY_DIR: str = ".fabric-discovery"
# Sub-directory names for cached discovery snapshots and rescan reports.
# NOTE(review): presumably nested under DEFAULT_DISCOVERY_DIR by helpers that
# are not visible in this section — confirm.
DEFAULT_SNAPSHOT_DIR: str = "snapshots"
DEFAULT_REPORT_DIR: str = "reports"
def build_parser() -> argparse.ArgumentParser:
    """Construct the ``railiance-fabric`` command-line parser.

    Top-level commands (stored in ``command``) are ``validate``, the graph
    queries ``providers``, ``consumers``, ``dependency-path``, ``unresolved``
    and ``blast-radius``, plus ``export``, ``scan``, ``discover-roots`` and
    ``registry``. The ``registry`` group has nested sub-commands stored in
    ``registry_command``.
    """
    local_registry = "http://127.0.0.1:8765"

    def flag(command: argparse.ArgumentParser, name: str, text: str) -> None:
        # Boolean switch; every switch in this CLI carries help text.
        command.add_argument(name, action="store_true", help=text)

    def repo_paths(command: argparse.ArgumentParser) -> None:
        # Optional repo paths defaulting to the current directory (a fresh
        # default list per sub-command).
        command.add_argument("paths", nargs="*", type=Path, default=[Path(".")])

    def with_registry_url(command: argparse.ArgumentParser) -> None:
        command.add_argument("--registry-url", default=local_registry)

    def llm_tuning(command: argparse.ArgumentParser) -> None:
        # Provider/model/sampling knobs shared by ``scan`` and ``registry scan-manifest``.
        command.add_argument("--llm-provider", default="mock", help="llm-connect provider name.")
        command.add_argument("--llm-model", default="mock", help="Model name passed to llm-connect.")
        command.add_argument("--llm-temperature", type=float, default=0.0)
        command.add_argument("--llm-max-tokens", type=int, default=1500)
        command.add_argument("--llm-min-confidence", type=float, default=0.6)

    def connector_option(command: argparse.ArgumentParser, text: str) -> None:
        command.add_argument(
            "--connector",
            action="append",
            choices=["local-fabric-registry"],
            default=[],
            help=text,
        )

    parser = argparse.ArgumentParser(
        prog="railiance-fabric",
        description="Load and validate Railiance Fabric declarations.",
    )
    commands = parser.add_subparsers(dest="command", required=True)

    # -- validate ------------------------------------------------------------
    validate_cmd = commands.add_parser(
        "validate",
        help="Validate one or more repo roots or declaration files.",
    )
    validate_cmd.add_argument(
        "paths",
        nargs="+",
        type=Path,
        help="Repo root, fabric directory, or declaration YAML file.",
    )
    flag(validate_cmd, "--warnings-as-errors", "Exit non-zero when warnings are present.")

    # -- graph queries: (name, help, optional (positional, positional help)) ---
    graph_queries = (
        (
            "providers",
            "List providers for a capability type or id.",
            ("capability", "Capability type or capability id."),
        ),
        (
            "consumers",
            "List consumers of a capability or interface.",
            ("target", "Capability/interface type or declaration id."),
        ),
        (
            "dependency-path",
            "Show dependency path for a service.",
            ("service_id", "Service declaration id."),
        ),
        (
            "unresolved",
            "Show missing or unresolved dependencies.",
            None,
        ),
        (
            "blast-radius",
            "Show consumers affected by an interface change.",
            ("interface", "Interface type or interface declaration id."),
        ),
    )
    for query_name, query_help, positional in graph_queries:
        query_cmd = commands.add_parser(query_name, help=query_help)
        if positional is not None:
            positional_name, positional_help = positional
            query_cmd.add_argument(positional_name, help=positional_help)
        repo_paths(query_cmd)

    # -- export --------------------------------------------------------------
    export_cmd = commands.add_parser(
        "export",
        help="Export graph as JSON, Mermaid, graph-explorer, or financial payload.",
    )
    repo_paths(export_cmd)
    export_cmd.add_argument(
        "--format",
        choices=["json", "mermaid", "graph-explorer", "financial"],
        default="json",
    )

    # -- scan ----------------------------------------------------------------
    scan_cmd = commands.add_parser("scan", help="Scan a repo for deterministic discovery candidates.")
    scan_cmd.add_argument("path", nargs="?", type=Path, default=Path("."))
    for identity_option in ("--repo-slug", "--repo-name", "--domain", "--commit"):
        scan_cmd.add_argument(identity_option, default=None)
    scan_cmd.add_argument("--profile", default="deterministic")
    flag(scan_cmd, "--dry-run", "Do not write anywhere except an explicit --output file.")
    scan_cmd.add_argument(
        "--output",
        type=Path,
        default=None,
        help="Write the discovery snapshot JSON to a file.",
    )
    scan_cmd.add_argument(
        "--previous-snapshot",
        type=Path,
        default=None,
        help="Reconcile against a previous discovery snapshot JSON.",
    )
    flag(scan_cmd, "--json", "Print the discovery snapshot JSON to stdout.")
    flag(scan_cmd, "--llm", "Enable llm-connect assisted extraction.")
    llm_tuning(scan_cmd)
    connector_option(scan_cmd, "Enable a discovery connector. May be passed more than once.")
    scan_cmd.add_argument(
        "--connector-manifest",
        type=Path,
        default=Path("registry/local-repos.yaml"),
        help="Manifest path for the local-fabric-registry connector.",
    )

    # -- discover-roots ------------------------------------------------------
    roots_cmd = commands.add_parser(
        "discover-roots",
        help="Collect raw evidence from accountability root manifest entries.",
    )
    roots_cmd.add_argument("--manifest", type=Path, default=DEFAULT_ROOT_MANIFEST_PATH)
    flag(roots_cmd, "--include-remote", "Allow HTTP reads from configured remote roots.")
    roots_cmd.add_argument("--max-items-per-root", type=int, default=200)

    # -- registry group ------------------------------------------------------
    registry_cmd = commands.add_parser("registry", help="Feed a running Railiance Fabric registry service.")
    registry_commands = registry_cmd.add_subparsers(dest="registry_command", required=True)

    sync_cmd = registry_commands.add_parser(
        "sync",
        help="Register a repo and ingest its current graph snapshot.",
    )
    repo_paths(sync_cmd)
    with_registry_url(sync_cmd)
    sync_cmd.add_argument("--repo-slug", default=None)
    sync_cmd.add_argument("--name", default=None)
    sync_cmd.add_argument("--remote-url", default=None)
    sync_cmd.add_argument("--default-branch", default="main")
    sync_cmd.add_argument("--state-hub-repo-id", default=None)
    sync_cmd.add_argument("--commit", default=None)
    flag(sync_cmd, "--json", "Print the raw snapshot response.")

    sync_manifest_cmd = registry_commands.add_parser(
        "sync-manifest",
        help="Register and sync repos from an onboarding manifest.",
    )
    sync_manifest_cmd.add_argument("manifest", type=Path)
    sync_manifest_cmd.add_argument("--registry-url", default=None, help="Override the manifest registry_url.")
    flag(sync_manifest_cmd, "--strict", "Exit non-zero when any repo cannot be synced.")
    flag(sync_manifest_cmd, "--json", "Print the raw manifest sync summary.")

    scan_manifest_cmd = registry_commands.add_parser(
        "scan-manifest",
        help="Run discovery scans from an onboarding manifest.",
    )
    scan_manifest_cmd.add_argument("manifest", type=Path)
    scan_manifest_cmd.add_argument("--registry-url", default=None, help="Override the manifest registry_url.")
    scan_manifest_cmd.add_argument(
        "--repo-slug",
        action="append",
        default=[],
        help="Only scan this repo slug. May be repeated.",
    )
    scan_manifest_cmd.add_argument("--profile", default="deterministic", help="Discovery scan profile name.")
    scan_manifest_cmd.add_argument(
        "--output-dir",
        type=Path,
        default=None,
        help="Write one discovery snapshot JSON per scanned repo.",
    )
    scan_manifest_cmd.add_argument(
        "--previous-dir",
        type=Path,
        default=None,
        help="Read previous per-repo snapshots for reconciliation.",
    )
    scan_manifest_cmd.add_argument(
        "--cache-dir",
        type=Path,
        default=None,
        help="Operational cache directory. Defaults to <manifest-dir>/.fabric-discovery.",
    )
    flag(
        scan_manifest_cmd,
        "--no-cache",
        "Disable default cache snapshots and report output unless explicit paths are provided.",
    )
    scan_manifest_cmd.add_argument(
        "--previous-source",
        choices=["dir", "registry", "none"],
        default="dir",
        help="Where to read previous discovery snapshots from before reconciling.",
    )
    flag(scan_manifest_cmd, "--previous-from-registry", "Shortcut for --previous-source registry.")
    scan_manifest_cmd.add_argument(
        "--report-output",
        type=Path,
        default=None,
        help="Write an operational rescan report JSON.",
    )
    flag(scan_manifest_cmd, "--dry-run", "Do not write to the registry.")
    flag(scan_manifest_cmd, "--ingest", "Store each discovery snapshot in the registry.")
    flag(scan_manifest_cmd, "--ingest-unchanged", "Also ingest unchanged snapshots when --ingest is enabled.")
    flag(scan_manifest_cmd, "--accept", "Accept ingested snapshots after scanning.")
    scan_manifest_cmd.add_argument(
        "--accepted-key",
        action="append",
        default=[],
        help="Candidate stable key to include during acceptance. May be repeated.",
    )
    scan_manifest_cmd.add_argument(
        "--accept-review-state",
        action="append",
        default=None,
        help="Review state accepted during projection. Defaults to accepted.",
    )
    scan_manifest_cmd.add_argument(
        "--accept-policy",
        choices=["safe", "explicit"],
        default="safe",
        help="Safe blocks conflicts/tombstones and accepts only accepted review state; explicit allows provided overrides.",
    )
    flag(scan_manifest_cmd, "--strict", "Exit non-zero when any repo cannot be scanned or stored.")
    scan_manifest_cmd.add_argument(
        "--exit-code-mode",
        choices=["default", "operational"],
        default="default",
        help="Use operational exit codes: 2 changes/baseline, 3 review required, 4 partial failure.",
    )
    scan_manifest_cmd.add_argument(
        "--lock-file",
        type=Path,
        default=None,
        help="Lock file path for automation-safe runs.",
    )
    flag(scan_manifest_cmd, "--no-lock", "Disable the default cache-directory rescan lock.")
    flag(scan_manifest_cmd, "--json", "Print the raw manifest scan summary.")
    flag(scan_manifest_cmd, "--llm", "Enable llm-connect assisted extraction.")
    flag(scan_manifest_cmd, "--deterministic-only", "Force deterministic-only scans even when --llm is set.")
    llm_tuning(scan_manifest_cmd)
    scan_manifest_cmd.add_argument(
        "--llm-max-runs",
        type=int,
        default=None,
        help="Maximum repos allowed to run LLM extraction.",
    )
    connector_option(
        scan_manifest_cmd,
        "Enable a discovery connector for each repo. May be passed more than once.",
    )
    scan_manifest_cmd.add_argument(
        "--connector-manifest",
        type=Path,
        default=None,
        help="Manifest path for the local-fabric-registry connector. Defaults to the scanned manifest.",
    )

    rescan_status_cmd = registry_commands.add_parser(
        "rescan-status",
        help="Show discovery rescan freshness and review status.",
    )
    with_registry_url(rescan_status_cmd)
    flag(rescan_status_cmd, "--review-only", "Only show repos whose latest discovery needs review.")
    rescan_status_cmd.add_argument(
        "--stale-after-hours",
        type=float,
        default=None,
        help="Only show discovery snapshots older than this age.",
    )
    flag(rescan_status_cmd, "--json", "Print the raw rescan status summary.")

    cyclonedx_cmd = registry_commands.add_parser(
        "ingest-cyclonedx",
        help="Ingest a CycloneDX SBOM as library inventory.",
    )
    cyclonedx_cmd.add_argument("sbom", type=Path)
    with_registry_url(cyclonedx_cmd)
    cyclonedx_cmd.add_argument("--repo-slug", required=True)
    flag(cyclonedx_cmd, "--json", "Print the raw ingest response.")

    discovery_cmd = registry_commands.add_parser(
        "ingest-discovery",
        help="Store a discovery snapshot for review.",
    )
    discovery_cmd.add_argument("snapshot", type=Path)
    with_registry_url(discovery_cmd)
    discovery_cmd.add_argument("--repo-slug", default=None)
    flag(discovery_cmd, "--json", "Print the raw ingest response.")

    accept_cmd = registry_commands.add_parser(
        "accept-discovery",
        help="Project accepted discovery candidates into a graph snapshot.",
    )
    accept_cmd.add_argument("repo_slug")
    accept_cmd.add_argument("discovery_snapshot_id", type=int)
    with_registry_url(accept_cmd)
    accept_cmd.add_argument("--accepted-key", action="append", default=[])
    accept_cmd.add_argument("--accept-review-state", action="append", default=None)
    accept_cmd.add_argument("--commit", default=None)
    flag(accept_cmd, "--json", "Print the raw accept response.")

    archive_cmd = registry_commands.add_parser(
        "export-reset-archive",
        help="Export registry graph/discovery data before a guarded reset.",
    )
    archive_cmd.add_argument("output", type=Path)
    with_registry_url(archive_cmd)
    flag(archive_cmd, "--overwrite", "Overwrite an existing archive file.")
    flag(archive_cmd, "--json", "Print archive metadata.")

    reset_cmd = registry_commands.add_parser(
        "reset-graph-data",
        help="Export an archive, then reset registry graph/discovery data with an explicit confirmation token.",
    )
    with_registry_url(reset_cmd)
    reset_cmd.add_argument(
        "--archive",
        type=Path,
        required=True,
        help="Archive JSON file to write before reset.",
    )
    flag(reset_cmd, "--overwrite-archive", "Overwrite an existing archive file.")
    reset_cmd.add_argument("--confirm", required=True, help=f"Must equal {RESET_CONFIRMATION_TOKEN}.")
    reset_cmd.add_argument("--reason", required=True)
    flag(reset_cmd, "--json", "Print the raw reset response.")

    return parser
def main(argv: list[str] | None = None) -> int:
    """Parse *argv* and dispatch to the selected command.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The process exit code. ``validate`` returns 1 on errors, or on warnings
        when ``--warnings-as-errors`` is set. The graph commands and
        ``discover-roots`` return 0, and the remaining commands return whatever
        their handler returns. Anything unmatched goes to ``parser.error``.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    command = args.command

    if command == "validate":
        report = validate_roots(args.paths)
        for diagnostic in report.diagnostics:
            print(diagnostic.format())
        print(report.summary())
        failed = bool(report.errors) or (args.warnings_as_errors and bool(report.warnings))
        return 1 if failed else 0

    def emit_export(graph: Any) -> None:
        export_format = args.format
        if export_format == "mermaid":
            print(graph.to_mermaid())
        elif export_format == "graph-explorer":
            print(json.dumps(fabric_graph_explorer_payload(graph.to_export()), indent=2, sort_keys=True))
        elif export_format == "financial":
            print(json.dumps(financial_export_from_legacy(graph.to_export()), indent=2, sort_keys=True))
        else:
            print(graph.to_json())

    # Commands that load the declaration graph first and then print a view of it.
    # Lambdas keep every helper lookup lazy, exactly like the original if-chain.
    graph_actions = {
        "providers": lambda graph: _print_providers(graph, args.capability),
        "consumers": lambda graph: _print_consumers(graph, args.target),
        "dependency-path": lambda graph: print("\n".join(graph.dependency_path_lines(args.service_id))),
        "unresolved": lambda graph: _print_unresolved(graph),
        "blast-radius": lambda graph: _print_consumers(
            graph, args.interface, matches=graph.blast_radius(args.interface)
        ),
        "export": emit_export,
    }
    graph_action = graph_actions.get(command)
    if graph_action is not None:
        graph_action(_load_graph_or_exit(args.paths))
        return 0

    if command == "scan":
        return _scan_repo(args)

    if command == "discover-roots":
        evidence = collect_accountability_root_evidence(
            args.manifest,
            include_remote=args.include_remote,
            max_items_per_root=args.max_items_per_root,
        )
        print(json.dumps(evidence, indent=2, sort_keys=True))
        return 0

    if command == "registry":
        registry_handlers = {
            "sync": _registry_sync,
            "sync-manifest": _registry_sync_manifest,
            "scan-manifest": _registry_scan_manifest,
            "rescan-status": _registry_rescan_status,
            "ingest-cyclonedx": _registry_ingest_cyclonedx,
            "ingest-discovery": _registry_ingest_discovery,
            "accept-discovery": _registry_accept_discovery,
            "export-reset-archive": _registry_export_reset_archive,
            "reset-graph-data": _registry_reset_graph_data,
        }
        handler = registry_handlers.get(args.registry_command)
        if handler is not None:
            return handler(args)

    parser.error(f"unknown command {args.command!r}")
    return 2
def _registry_sync(args: argparse.Namespace) -> int:
    """Validate local declarations, register the repo, then post its graph snapshot.

    Diagnostics are written to stderr. If validation reports errors, the
    validation summary is also written to stderr and 1 is returned without
    contacting the registry. Otherwise two POSTs are made (``/repositories``,
    then ``/repositories/<slug>/snapshots``) and 0 is returned.
    """
    validation = validate_roots(args.paths)
    for diagnostic in validation.diagnostics:
        print(diagnostic.format(), file=sys.stderr)
    if validation.errors:
        print(validation.summary(), file=sys.stderr)
        return 1

    graph = _load_graph_or_exit(args.paths)
    primary = _primary_repo_path(args.paths)
    slug = args.repo_slug or _slugify(primary.name)

    # Explicit CLI values win; git metadata is only consulted as a fallback.
    registration_payload = {
        "slug": slug,
        "name": args.name or primary.name,
        "remote_url": args.remote_url or _git_value(primary, "config", "--get", "remote.origin.url"),
        "default_branch": args.default_branch,
        "state_hub_repo_id": args.state_hub_repo_id,
    }
    repository = _registry_post(args.registry_url, "/repositories", registration_payload)

    snapshot_payload = {
        "commit": args.commit or _git_value(primary, "rev-parse", "HEAD") or "working-tree",
        "generated_at": _utc_now(),
        "graph": graph.to_export(),
    }
    snapshot = _registry_post(args.registry_url, f"/repositories/{slug}/snapshots", snapshot_payload)

    if not args.json:
        print(f"registered {repository['slug']}")
        print(f"snapshot {snapshot['id']} accepted for {snapshot['commit']}")
        return 0
    print(json.dumps({"repository": repository, "snapshot": snapshot}, indent=2, sort_keys=True))
    return 0
def _registry_sync_manifest(args: argparse.Namespace) -> int:
    """Register and sync every repository listed in an onboarding manifest.

    The manifest must be a YAML mapping with a ``repositories`` list. The
    registry URL comes from ``--registry-url``, then the manifest's
    ``registry_url``, then the local default. Each entry is handed to
    ``_sync_manifest_repo``, whose result ``status`` is tallied.

    Returns:
        1 for a malformed manifest, or under ``--strict`` when any repository
        errored or was only registered; otherwise 0.
    """
    manifest_path = args.manifest.resolve()
    manifest = load_yaml(manifest_path)
    if not isinstance(manifest, dict):
        print(f"ERROR {manifest_path}: manifest must be a YAML mapping", file=sys.stderr)
        return 1
    entries = manifest.get("repositories")
    if not isinstance(entries, list):
        print(f"ERROR {manifest_path}: manifest requires a repositories list", file=sys.stderr)
        return 1

    registry_url = args.registry_url or str(manifest.get("registry_url") or "http://127.0.0.1:8765")
    base_dir = manifest_path.parent
    outcomes = [_sync_manifest_repo(registry_url, base_dir, entry) for entry in entries]

    def with_status(wanted: str) -> int:
        return sum(1 for outcome in outcomes if outcome["status"] == wanted)

    tallies = {
        "total": len(outcomes),
        "synced": with_status("synced"),
        "registered": with_status("registered"),
        "errors": with_status("error"),
    }
    summary = {
        "manifest": str(manifest_path),
        "registry_url": registry_url,
        "repositories": outcomes,
        "counts": tallies,
    }

    if args.json:
        print(json.dumps(summary, indent=2, sort_keys=True))
    else:
        for outcome in outcomes:
            state = outcome["status"]
            if state == "synced":
                print(f"synced {outcome['slug']} snapshot {outcome['snapshot_id']} ({outcome['commit']})")
            elif state == "registered":
                print(f"registered {outcome['slug']} ({'; '.join(outcome['warnings'])})")
            else:
                print(f"error {outcome['slug']}: {outcome['error']}")
        print(
            f"summary: {tallies['total']} repo(s), {tallies['synced']} synced, "
            f"{tallies['registered']} registered only, {tallies['errors']} error(s)"
        )

    # Under --strict, "registered only" counts as incomplete just like an error.
    incomplete = tallies["errors"] or tallies["registered"]
    return 1 if args.strict and incomplete else 0
def _registry_scan_manifest(args: argparse.Namespace) -> int:
    """Run discovery scans for the repositories listed in an onboarding manifest.

    Steps:
        1. Validate option combinations (LLM budget, accept/ingest, accept policy).
        2. Load the manifest and take the optional rescan lock.
        3. Scan each selected repository, honouring ``--llm-max-runs``.
        4. Write the optional report, print the summary, and return the exit
           code computed by ``_scan_manifest_exit_code``.

    The lock is released in a ``finally`` clause. An unexpected exception or
    interrupt therefore cannot leave a stale lock file that would make every
    later run fail with "rescan lock already exists".

    Returns:
        1 for invalid options, a malformed manifest, or a report write failure.
        When the lock is already held: 5 in ``--exit-code-mode operational``,
        1 otherwise. In all other cases, the value of
        ``_scan_manifest_exit_code``.
    """
    if args.llm_max_runs is not None and args.llm_max_runs < 0:
        print("ERROR --llm-max-runs must be zero or greater", file=sys.stderr)
        return 1
    if args.accept and not args.ingest and not args.dry_run:
        print("ERROR --accept requires --ingest unless --dry-run is set", file=sys.stderr)
        return 1
    if args.accept_policy == "safe":
        # The safe policy only allows the default "accepted" review state and no
        # hand-picked candidate keys; anything else needs --accept-policy explicit.
        if args.accept_review_state and set(args.accept_review_state) != {"accepted"}:
            print("ERROR --accept-review-state requires --accept-policy explicit unless it is only 'accepted'", file=sys.stderr)
            return 1
        if args.accepted_key:
            print("ERROR --accepted-key requires --accept-policy explicit", file=sys.stderr)
            return 1
    if args.previous_from_registry:
        args.previous_source = "registry"

    manifest_path = args.manifest.resolve()
    manifest = load_yaml(manifest_path)
    if not isinstance(manifest, dict):
        print(f"ERROR {manifest_path}: manifest must be a YAML mapping", file=sys.stderr)
        return 1
    repositories = manifest.get("repositories")
    if not isinstance(repositories, list):
        print(f"ERROR {manifest_path}: manifest requires a repositories list", file=sys.stderr)
        return 1

    lock_path = _scan_manifest_lock_path(args, manifest_path)
    lock_fd: int | None = None
    if lock_path is not None:
        try:
            lock_fd = _scan_manifest_acquire_lock(lock_path)
        except FileExistsError:
            print(f"ERROR rescan lock already exists: {lock_path}", file=sys.stderr)
            return 5 if args.exit_code_mode == "operational" else 1

    try:
        registry_url = args.registry_url or str(manifest.get("registry_url") or "http://127.0.0.1:8765")
        allowlist = {slug for slug in args.repo_slug if slug}
        selected = [
            item for item in repositories
            if not allowlist or _manifest_repo_slug(item) in allowlist
        ]

        llm_runs_used = 0
        results: list[dict[str, Any]] = []
        for item in selected:
            llm_enabled = False
            llm_skip_reason = "deterministic_only"
            if args.llm and not args.deterministic_only:
                if args.llm_max_runs is None or llm_runs_used < args.llm_max_runs:
                    llm_enabled = True
                    llm_skip_reason = ""
                else:
                    llm_skip_reason = "budget_exhausted"
            result = _scan_manifest_repo(
                args,
                registry_url,
                manifest_path.parent,
                manifest_path,
                item,
                llm_enabled=llm_enabled,
                llm_skip_reason=llm_skip_reason,
            )
            # Only runs that actually attempted LLM extraction consume the budget.
            if result.get("llm", {}).get("attempted"):
                llm_runs_used += 1
            results.append(result)

        summary = {
            "apiVersion": "railiance.fabric/v1alpha1",
            "kind": "FabricDiscoveryRescanReport",
            "manifest": str(manifest_path),
            "registry_url": registry_url,
            "generated_at": _utc_now(),
            "profile": args.profile,
            "dry_run": args.dry_run,
            "ingest": args.ingest and not args.dry_run,
            "accept": args.accept and args.ingest and not args.dry_run,
            "allowlist": sorted(allowlist),
            "cache": _scan_manifest_cache_config(args, manifest_path),
            "scanner": {"extractor_version": EXTRACTOR_VERSION},
            "previous_source": args.previous_source,
            "accept_policy": args.accept_policy,
            "llm": {
                "requested": bool(args.llm),
                "deterministic_only": bool(args.deterministic_only or not args.llm),
                "max_runs": args.llm_max_runs,
                "used_runs": llm_runs_used,
            },
            "repositories": results,
            "counts": _scan_manifest_counts(results),
        }

        report_path = _scan_manifest_report_path(args, manifest_path, summary["generated_at"])
        if report_path is not None:
            summary["report_path"] = str(report_path)
            try:
                report_path.parent.mkdir(parents=True, exist_ok=True)
                report_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8")
            except Exception as exc:
                print(f"ERROR cannot write rescan report {report_path}: {exc}", file=sys.stderr)
                return 1

        if args.json:
            print(json.dumps(summary, indent=2, sort_keys=True))
        else:
            _print_scan_manifest_summary(summary)
            if summary.get("report_path"):
                print(f"report: {summary['report_path']}")
        return _scan_manifest_exit_code(summary, args)
    finally:
        # Release on every exit path (normal return, early return, exception).
        # The original only released on its two explicit return paths.
        _scan_manifest_release_lock(lock_fd, lock_path)
def _registry_rescan_status(args: argparse.Namespace) -> int:
    """Report freshness and review state of each repo's latest discovery snapshot.

    Reads ``latest_discovery_snapshots`` from the registry ``/status`` endpoint.
    Each shown entry gets an ``age_hours`` field. ``--review-only`` keeps only
    entries flagged ``review_required``. ``--stale-after-hours`` keeps only
    entries whose age is known and strictly greater than the threshold.
    ``counts.total`` and ``counts.review_required`` are computed over all
    snapshots, not only the shown ones. Always returns 0.
    """
    status = _registry_get_checked(args.registry_url, "/status")
    latest = status.get("latest_discovery_snapshots", [])
    snapshots = latest if isinstance(latest, list) else []
    now = datetime.now(timezone.utc)
    threshold = args.stale_after_hours

    shown: list[dict[str, Any]] = []
    for entry in snapshots:
        if not isinstance(entry, dict):
            continue
        if args.review_only and not entry.get("review_required"):
            continue
        age_hours = _age_hours(entry.get("generated_at"), now)
        # With a threshold set, entries of unknown age are hidden along with fresh ones.
        if threshold is not None and (age_hours is None or age_hours <= threshold):
            continue
        shown.append({**entry, "age_hours": age_hours})

    needing_review = sum(
        1 for entry in snapshots
        if isinstance(entry, dict) and entry.get("review_required")
    )
    summary = {
        "apiVersion": "railiance.fabric/v1alpha1",
        "kind": "FabricDiscoveryRescanStatus",
        "generated_at": _utc_now(),
        "registry_url": args.registry_url,
        "review_only": args.review_only,
        "stale_after_hours": threshold,
        "counts": {
            "total": len(snapshots),
            "shown": len(shown),
            "review_required": needing_review,
        },
        "repositories": shown,
    }

    if args.json:
        print(json.dumps(summary, indent=2, sort_keys=True))
        return 0

    for entry in shown:
        age = entry.get("age_hours")
        age_text = f", age {age:.1f}h" if isinstance(age, float) else ""
        raw_diff = entry.get("diff_counts")
        diff = raw_diff if isinstance(raw_diff, dict) else {}
        print(
            f"{entry.get('health', 'unknown')} {entry.get('repo_slug')} "
            f"({entry.get('profile')}, {entry.get('commit')}): "
            f"diff +{diff.get('added', 0)}/~{diff.get('changed', 0)}"
            f"/-{diff.get('retired', 0)}/!{diff.get('conflicted', 0)}"
            f", review artifacts {entry.get('review_artifact_count', 0)}"
            f", tombstones {entry.get('tombstone_count', 0)}"
            f"{age_text}"
        )
    totals = summary["counts"]
    print(
        f"summary: {totals['shown']} shown, {totals['total']} latest discovery snapshot(s), "
        f"{totals['review_required']} review required"
    )
    return 0
def _scan_manifest_repo(
    args: argparse.Namespace,
    registry_url: str,
    manifest_dir: Path,
    manifest_path: Path,
    item: object,
    *,
    llm_enabled: bool,
    llm_skip_reason: str,
) -> dict[str, Any]:
    """Scan one manifest repository entry and optionally ingest/accept it.

    Args:
        args: Parsed ``registry scan-manifest`` options.
        registry_url: Registry base URL used for ingest/accept calls.
        manifest_dir: Directory that relative repo paths are resolved against.
        manifest_path: Resolved onboarding manifest path.
        item: Raw repository entry from the manifest ``repositories`` list.
        llm_enabled: Whether this repository may run LLM-assisted extraction.
        llm_skip_reason: Reason recorded when ``llm_enabled`` is false.

    Returns:
        A per-repository result dict. ``status`` is one of ``error``,
        ``scanned``, ``skipped_unchanged``, ``ingested``, ``review_required``
        or ``accepted``, and ``scanned``/``ingested``/``accepted`` flags record
        how far the pipeline got. Failures are recorded in the result instead
        of raised, so one bad repository does not abort the whole manifest run.
    """
    if not isinstance(item, dict):
        return {
            "slug": "<invalid>",
            "status": "error",
            "scanned": False,
            "ingested": False,
            "accepted": False,
            "error": "repository entry must be a mapping",
        }
    slug = str(item.get("slug") or "").strip()
    if not slug:
        return {
            "slug": "<missing>",
            "status": "error",
            "scanned": False,
            "ingested": False,
            "accepted": False,
            "error": "repository entry requires slug",
        }
    result: dict[str, Any] = {
        "slug": slug,
        "status": "pending",
        "scanned": False,
        "ingested": False,
        "accepted": False,
        "change_state": "unknown",
        "llm": {
            "enabled": llm_enabled,
            "attempted": False,
            "status": "pending" if llm_enabled else "skipped",
            "reason": "" if llm_enabled else llm_skip_reason,
        },
    }
    repo_path = _manifest_optional_path(item.get("path"), manifest_dir)
    if repo_path is None:
        result.update({"status": "error", "error": "no repo path configured"})
        return result
    result["path"] = str(repo_path)
    if not repo_path.is_dir():
        result.update({"status": "error", "error": f"repo path not found: {repo_path}"})
        return result

    commit = str(item.get("commit") or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree")
    connector_manifest = _scan_manifest_connector_manifest(args.connector_manifest, manifest_dir, manifest_path)
    connectors = [
        ConnectorConfig(
            connector_id=connector_id,
            connector_type="fabric_registry",
            source_path=str(connector_manifest),
        )
        for connector_id in args.connector
    ]

    # --- scan + reconcile ---------------------------------------------------
    try:
        # Marked as attempted before scanning, so a failed LLM scan still counts
        # against the caller's --llm-max-runs budget.
        result["llm"]["attempted"] = llm_enabled
        snapshot = scan_repo(
            ScanOptions(
                repo_path=repo_path,
                repo_slug=slug,
                repo_name=str(item.get("name") or repo_path.name),
                domain=str(item.get("domain") or "") or None,
                commit=commit,
                profile=args.profile,
                deterministic_only=not llm_enabled,
                llm_enabled=llm_enabled,
                llm_config=LLMExtractionConfig(
                    provider=args.llm_provider,
                    model=args.llm_model,
                    temperature=args.llm_temperature,
                    max_tokens=args.llm_max_tokens,
                    min_confidence=args.llm_min_confidence,
                ),
                connectors=connectors,
            )
        )
        previous, previous_metadata = _scan_manifest_previous_snapshot(
            args,
            registry_url,
            manifest_path,
            slug,
        )
        result["previous"] = previous_metadata
        if previous is not None:
            snapshot = reconcile_discovery_snapshots(previous, snapshot)
    except Exception as exc:
        result["status"] = "error"
        result["error"] = f"scan failed: {exc}"
        if llm_enabled:
            result["llm"]["status"] = "failed"
            result["llm"]["reason"] = "scan_failed"
        return result

    result.update(
        {
            "status": "scanned",
            "scanned": True,
            "commit": snapshot["source"]["commit"],
            "candidate_counts": _candidate_counts(snapshot),
            "diff_counts": _discovery_diff_counts(snapshot),
            "review_artifact_counts": _review_artifact_counts(snapshot),
            "connector_runs": _connector_run_summaries(snapshot),
            "tombstone_count": len(snapshot.get("tombstones", [])),
            "change_state": _scan_manifest_change_state(snapshot, result.get("previous")),
        }
    )
    _set_llm_result(result, snapshot, llm_enabled, llm_skip_reason)

    # --- optional per-repo snapshot file ------------------------------------
    output_path = _scan_manifest_output_path(args, manifest_path, slug)
    if output_path is not None:
        try:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_text(json.dumps(snapshot, indent=2, sort_keys=True) + "\n", encoding="utf-8")
            result["output_path"] = str(output_path)
        except Exception as exc:
            result.update({"status": "error", "error": f"cannot write snapshot: {exc}"})
            return result

    # --- registry ingest / accept -------------------------------------------
    if args.ingest and args.dry_run:
        result["registry_action"] = "skipped_dry_run"
        return result
    if args.ingest:
        if result["change_state"] == "unchanged" and not args.ingest_unchanged:
            result.update({"status": "skipped_unchanged", "registry_action": "skipped_unchanged"})
            return result
        try:
            repository = _registry_post_checked(
                registry_url,
                "/repositories",
                {
                    "slug": slug,
                    "name": item.get("name") or repo_path.name,
                    "remote_url": item.get("remote_url") or _git_value(repo_path, "config", "--get", "remote.origin.url"),
                    "default_branch": item.get("default_branch") or "main",
                    "state_hub_repo_id": item.get("state_hub_repo_id"),
                },
            )
            stored = _registry_post_checked(
                registry_url,
                f"/repositories/{slug}/discovery-snapshots",
                snapshot,
            )
            result.update(
                {
                    "status": "ingested",
                    "repository": {"slug": repository.get("slug")},
                    "ingested": True,
                    "discovery_snapshot_id": stored["id"],
                }
            )
            if args.accept:
                blockers = _scan_manifest_acceptance_blockers(snapshot, args)
                if blockers:
                    result.update(
                        {
                            "status": "review_required",
                            "registry_action": "accept_blocked",
                            "acceptance_blockers": blockers,
                        }
                    )
                    return result
                accept_payload: dict[str, object] = {
                    "commit": f"discovery:{snapshot['source']['commit']}",
                }
                if args.accepted_key:
                    accept_payload["accepted_keys"] = args.accepted_key
                if args.accept_review_state is not None:
                    accept_payload["accept_review_states"] = args.accept_review_state
                accepted = _registry_post_checked(
                    registry_url,
                    f"/repositories/{slug}/discovery-snapshots/{stored['id']}/accept",
                    accept_payload,
                )
                graph_snapshot = accepted["graph_snapshot"]
                result.update(
                    {
                        "status": "accepted",
                        "accepted": True,
                        "accepted_graph_snapshot_id": graph_snapshot["id"],
                        "accepted_graph_commit": graph_snapshot["commit"],
                    }
                )
        except RegistryRequestError as exc:
            result.update({"status": "error", "error": str(exc)})
        except (KeyError, TypeError) as exc:
            # A reply lacking "id" / "graph_snapshot" / "commit" (or one that is
            # not a mapping) used to propagate and abort the whole manifest run;
            # record it against this repository instead.
            result.update(
                {
                    "status": "error",
                    "error": f"unexpected registry response ({type(exc).__name__}: {exc})",
                }
            )
    return result
def _scan_manifest_counts(results: list[dict[str, Any]]) -> dict[str, int]:
    """Aggregate per-repository scan results into the report ``counts`` block.

    Most fields count how many repositories satisfy a predicate. ``retired``
    and ``conflicted`` instead add up each repository's ``diff_counts`` value
    for that field.
    """

    def how_many(predicate: Any) -> int:
        return sum(1 for entry in results if predicate(entry))

    def llm_status(entry: dict[str, Any]) -> Any:
        # Invalid-entry results carry no "llm" block.
        return entry.get("llm", {}).get("status")

    def diff_total(field: str) -> int:
        return sum(_int_from_nested(entry, "diff_counts", field) for entry in results)

    return {
        "total": len(results),
        "scanned": how_many(lambda entry: entry.get("scanned")),
        "baseline": how_many(lambda entry: entry.get("change_state") == "baseline"),
        "unchanged": how_many(lambda entry: entry.get("change_state") == "unchanged"),
        "changed": how_many(_scan_manifest_has_diff),
        "retired": diff_total("retired"),
        "conflicted": diff_total("conflicted"),
        "review_required": how_many(_scan_manifest_needs_review),
        "llm_skipped": how_many(lambda entry: llm_status(entry) == "skipped"),
        "llm_failed": how_many(lambda entry: llm_status(entry) == "failed"),
        "ingested": how_many(lambda entry: entry.get("ingested")),
        "accepted": how_many(lambda entry: entry.get("accepted")),
        "errors": how_many(lambda entry: entry.get("status") == "error"),
    }
def _print_scan_manifest_summary(summary: dict[str, Any]) -> None:
    """Print one line per repository rescan result, then an aggregate line.

    Error entries print their error message. Other entries print the action
    taken, candidate counts, diff counts (+added/~changed/-retired/!conflicted)
    and, when known, where the previous snapshot came from.
    """
    # registry_action values that replace the accepted/ingested/scanned label.
    action_overrides = {
        "skipped_dry_run": "dry-run scanned",
        "skipped_unchanged": "unchanged",
        "accept_blocked": "review required",
    }
    for repo in summary["repositories"]:
        slug = repo["slug"]
        if repo["status"] == "error":
            after_scan = " after scan" if repo.get("scanned") else ""
            print(f"error{after_scan} {slug}: {repo['error']}")
            continue
        candidates = repo.get("candidate_counts", {})
        diff = repo.get("diff_counts", {})
        if repo.get("accepted"):
            action = "accepted"
        elif repo.get("ingested"):
            action = "ingested"
        else:
            action = "scanned"
        registry_action = repo.get("registry_action")
        if isinstance(registry_action, str) and registry_action in action_overrides:
            action = action_overrides[registry_action]
        previous = repo.get("previous")
        if not isinstance(previous, dict):
            previous = {}
        previous_note = f", previous {previous.get('source')}" if previous.get("source") else ""
        print(
            f"{action} {slug} ({repo.get('commit', 'working-tree')}): "
            f"{candidates.get('nodes', 0)} node(s), {candidates.get('edges', 0)} edge(s), "
            f"{candidates.get('attributes', 0)} attribute(s), "
            f"diff +{diff.get('added', 0)}/~{diff.get('changed', 0)}"
            f"/-{diff.get('retired', 0)}/!{diff.get('conflicted', 0)}"
            f"{previous_note}"
        )
    totals = summary["counts"]
    print(
        f"summary: {totals['total']} repo(s), {totals['scanned']} scanned, "
        f"{totals['baseline']} baseline, {totals['unchanged']} unchanged, "
        f"{totals['changed']} changed, {totals['retired']} retired, "
        f"{totals['conflicted']} conflicted, {totals['review_required']} review required, "
        f"{totals['llm_skipped']} LLM skipped, "
        f"{totals['llm_failed']} LLM failed, {totals['accepted']} accepted, "
        f"{totals['errors']} error(s)"
    )
def _manifest_repo_slug(item: object) -> str:
    """Return the whitespace-stripped slug of a manifest repository entry.

    Non-mapping entries yield ``"<invalid>"``; a missing or empty slug yields ``""``.
    """
    return str(item.get("slug") or "").strip() if isinstance(item, dict) else "<invalid>"
def _scan_manifest_connector_manifest(
    connector_manifest: Path | None,
    manifest_dir: Path,
    manifest_path: Path,
) -> Path:
    """Pick the manifest that connectors should read.

    Without an explicit ``connector_manifest`` the scan manifest itself is used.
    An explicit path has ``~`` expanded, is anchored at ``manifest_dir`` when
    relative, and is resolved.
    """
    if connector_manifest is not None:
        candidate = connector_manifest.expanduser()
        if not candidate.is_absolute():
            candidate = manifest_dir / candidate
        return candidate.resolve()
    return manifest_path
def _scan_manifest_cache_config(args: argparse.Namespace, manifest_path: Path) -> dict[str, object]:
    """Describe the rescan cache layout as a plain dict of strings and flags.

    Returns ``{"enabled": False}`` when caching is disabled; otherwise the cache
    directory together with its snapshot and report subdirectories.
    """
    cache_dir = _scan_manifest_cache_dir(args, manifest_path)
    if cache_dir is not None:
        return {
            "enabled": True,
            "cache_dir": str(cache_dir),
            "snapshot_dir": str(cache_dir / DEFAULT_SNAPSHOT_DIR),
            "report_dir": str(cache_dir / DEFAULT_REPORT_DIR),
        }
    return {"enabled": False}
def _scan_manifest_cache_dir(args: argparse.Namespace, manifest_path: Path) -> Path | None:
    """Return the rescan cache directory, or None when caching is disabled.

    The directory is ``args.cache_dir`` or ``DEFAULT_DISCOVERY_DIR``. A leading
    ``~`` is expanded, which matches how the lock-file and report-output paths
    are treated. Relative directories are anchored at the manifest's directory
    and resolved; absolute ones are returned as-is.
    """
    if args.no_cache:
        return None
    # Without expanduser(), "~/cache" would be treated as relative and land in
    # "<manifest dir>/~/cache".
    cache_dir = (args.cache_dir or Path(DEFAULT_DISCOVERY_DIR)).expanduser()
    if cache_dir.is_absolute():
        return cache_dir
    return (manifest_path.parent / cache_dir).resolve()
def _scan_manifest_lock_path(args: argparse.Namespace, manifest_path: Path) -> Path | None:
    """Return the path of the rescan lock file, or None when locking is off.

    An explicit ``args.lock_file`` wins: ``~`` is expanded, a relative path is
    anchored at the manifest's directory, and the result is resolved. Otherwise
    the lock lives in the cache directory as ``rescan.lock``. When caching is
    disabled there is no default location.
    """
    if args.no_lock:
        return None
    lock_file = args.lock_file
    if not lock_file:
        cache_dir = _scan_manifest_cache_dir(args, manifest_path)
        return None if cache_dir is None else cache_dir / "rescan.lock"
    lock_file = lock_file.expanduser()
    if lock_file.is_absolute():
        return lock_file.resolve()
    return (manifest_path.parent / lock_file).resolve()
def _scan_manifest_acquire_lock(lock_path: Path) -> int:
    """Create ``lock_path`` exclusively and record the owning process in it.

    Parent directories are created as needed. ``O_CREAT | O_EXCL`` makes
    creation fail with FileExistsError if the lock file already exists
    (presumably held by another rescan). The returned file descriptor stays
    open and is meant to be passed to ``_scan_manifest_release_lock``.

    If writing the pid/timestamp record fails, the descriptor is closed and the
    half-created lock file is removed before re-raising. This way a failed
    acquisition neither leaks the fd nor leaves a stale lock behind.
    """
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
    try:
        os.write(fd, f"pid={os.getpid()}\ncreated_at={_utc_now()}\n".encode("utf-8"))
    except BaseException:
        # Clean up on any failure (including KeyboardInterrupt), then re-raise.
        os.close(fd)
        try:
            lock_path.unlink()
        except FileNotFoundError:
            pass
        raise
    return fd
def _scan_manifest_release_lock(lock_fd: int | None, lock_path: Path | None) -> None:
    """Close the lock descriptor (if any) and delete the lock file (if any).

    The lock file is removed even when closing the descriptor raises, so an
    error on close cannot leave a stale lock behind. The close error itself
    still propagates. A lock file that is already gone is ignored.
    """
    try:
        if lock_fd is not None:
            os.close(lock_fd)
    finally:
        if lock_path is not None:
            try:
                lock_path.unlink()
            except FileNotFoundError:
                pass
def _scan_manifest_output_path(args: argparse.Namespace, manifest_path: Path, slug: str) -> Path | None:
    """Return where this repository's discovery snapshot should be written.

    Snapshots go under ``args.output_dir`` when given, otherwise under the
    cache's snapshot directory. None is returned when neither is available.
    """
    base_dir = args.output_dir
    if not base_dir:
        cache_dir = _scan_manifest_cache_dir(args, manifest_path)
        if cache_dir is None:
            return None
        base_dir = cache_dir / DEFAULT_SNAPSHOT_DIR
    return _manifest_discovery_snapshot_path(base_dir, slug, args.profile)
def _scan_manifest_report_path(args: argparse.Namespace, manifest_path: Path, generated_at: str) -> Path | None:
    """Return where the rescan report JSON should be written.

    An explicit ``args.report_output`` wins: ``~`` is expanded, a relative path
    is anchored at the manifest's directory, and the result is resolved.
    Otherwise the report goes into the cache's report directory, with a file
    name built from ``generated_at`` and the profile. When caching is disabled,
    None is returned.
    """
    explicit = args.report_output
    if explicit:
        explicit = explicit.expanduser()
        anchored = explicit if explicit.is_absolute() else manifest_path.parent / explicit
        return anchored.resolve()
    cache_dir = _scan_manifest_cache_dir(args, manifest_path)
    if cache_dir is None:
        return None
    stamp = _slugify(generated_at.replace(":", "").replace("+", ""))
    return cache_dir / DEFAULT_REPORT_DIR / f"{stamp}-{_slugify(args.profile)}.rescan-report.json"
def _scan_manifest_previous_snapshot(
    args: argparse.Namespace,
    registry_url: str,
    manifest_path: Path,
    slug: str,
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """Locate the previous discovery snapshot to diff this rescan against.

    ``args.previous_source`` selects the source:
    - ``"none"``: never compare.
    - ``"registry"``: fetch the latest stored snapshot for ``slug`` and
      ``args.profile``. A 404 means none exists yet.
    - anything else: read ``<slug>-<profile>.discovery.json`` from
      ``args.previous_dir``, falling back to the cache's snapshot directory.

    Returns ``(snapshot_or_None, metadata)``; the metadata records the source
    and whether a snapshot was found. Raises ValueError when the registry
    record or file is not a JSON object, and re-raises non-404
    RegistryRequestError.
    """
    source = args.previous_source
    if source == "none":
        return None, {"source": "none", "found": False}

    if source == "registry":
        endpoint = f"/repositories/{quote(slug)}/discovery-snapshots/latest?profile={quote(args.profile)}"
        try:
            latest = _registry_get_checked(registry_url, endpoint)
        except RegistryRequestError as exc:
            if exc.status_code != 404:
                raise
            return None, {"source": "registry", "found": False}
        stored = latest.get("snapshot")
        if not isinstance(stored, dict):
            raise ValueError(f"registry latest discovery snapshot for {slug} has no snapshot object")
        return stored, {
            "source": "registry",
            "found": True,
            "discovery_snapshot_id": latest.get("id"),
            "commit": latest.get("commit"),
            "profile": latest.get("profile"),
        }

    snapshot_dir = args.previous_dir
    if snapshot_dir is None and not args.no_cache:
        cache_dir = _scan_manifest_cache_dir(args, manifest_path)
        if cache_dir is not None:
            snapshot_dir = cache_dir / DEFAULT_SNAPSHOT_DIR
    if snapshot_dir is None:
        return None, {"source": "dir", "found": False}

    snapshot_path = _manifest_discovery_snapshot_path(snapshot_dir, slug, args.profile)
    if not snapshot_path.is_file():
        return None, {"source": "dir", "found": False, "path": str(snapshot_path)}
    loaded = json.loads(snapshot_path.read_text(encoding="utf-8"))
    if not isinstance(loaded, dict):
        raise ValueError(f"previous snapshot must be a JSON object: {snapshot_path}")
    return loaded, {"source": "dir", "found": True, "path": str(snapshot_path)}
def _scan_manifest_change_state(snapshot: dict[str, Any], previous_metadata: object) -> str:
    """Classify a rescan as ``baseline``, ``changed`` or ``unchanged``.

    It is a baseline when no previous snapshot was found. Otherwise it is
    changed if any reconciliation diff bucket is non-empty.
    """
    found_previous = isinstance(previous_metadata, dict) and previous_metadata.get("found")
    if not found_previous:
        return "baseline"
    has_changes = any(_discovery_diff_counts(snapshot).values())
    return "changed" if has_changes else "unchanged"
def _scan_manifest_acceptance_blockers(snapshot: dict[str, Any], args: argparse.Namespace) -> list[str]:
    """List the reasons an automatic accept of ``snapshot`` must be held back.

    The ``explicit`` accept policy never blocks. Otherwise the accept is
    blocked by any of:
    - conflicted candidates;
    - tombstones;
    - low-confidence LLM review artifacts;
    - a review-state request other than exactly ``{"accepted"}``.
    """
    if args.accept_policy == "explicit":
        return []
    diff_totals = _discovery_diff_counts(snapshot)
    tombstones = len(snapshot.get("tombstones", []))
    low_confidence = _review_artifact_counts(snapshot).get("llm_low_confidence", 0)
    requested_states = args.accept_review_state
    checks = [
        (diff_totals["conflicted"], f"{diff_totals['conflicted']} conflicted candidate(s)"),
        (tombstones, f"{tombstones} tombstone(s)"),
        (low_confidence, f"{low_confidence} low-confidence LLM artifact(s)"),
        (
            requested_states is not None and set(requested_states) != {"accepted"},
            "non-accepted review states requested",
        ),
    ]
    return [message for triggered, message in checks if triggered]
def _scan_manifest_needs_review(item: dict[str, Any]) -> bool:
    """Return True when a per-repository rescan result needs human review.

    A result needs review when any of these holds:
    - its status is explicitly ``review_required``;
    - it has conflicted diff entries;
    - it has tombstones;
    - it has any non-zero review-artifact count.
    """
    flagged = (
        item.get("status") == "review_required"
        or _int_from_nested(item, "diff_counts", "conflicted")
        or int(item.get("tombstone_count") or 0)
    )
    if flagged:
        return True
    artifact_counts = item.get("review_artifact_counts")
    if not isinstance(artifact_counts, dict):
        return False
    return any(int(count or 0) for count in artifact_counts.values())
def _scan_manifest_exit_code(summary: dict[str, Any], args: argparse.Namespace) -> int:
    """Map a rescan summary to the process exit code.

    ``args.strict`` with any errors gives 1. Outside the ``operational``
    exit-code mode everything else gives 0. In operational mode the code is:
    - 4 when there are errors;
    - 3 when any repository needs review;
    - 2 when any repository changed or is a new baseline;
    - 0 otherwise.
    """
    counts = summary.get("counts")
    if not isinstance(counts, dict):
        counts = {}

    def count(key: str) -> int:
        return int(counts.get(key, 0) or 0)

    errors = count("errors")
    if args.strict and errors:
        return 1
    if args.exit_code_mode != "operational":
        return 0
    if errors:
        return 4
    if count("review_required"):
        return 3
    return 2 if (count("changed") or count("baseline")) else 0
def _manifest_discovery_snapshot_path(base_dir: Path, slug: str, profile: str) -> Path:
    """Build the discovery snapshot file path for ``slug``/``profile`` under ``base_dir``.

    Some slugs are not already in slug form after stripping and lowercasing.
    Those get an 8-hex-character SHA-256 suffix of the raw slug, so different
    raw slugs that slugify identically map to different files.
    """
    safe_slug = _slugify(slug)
    if safe_slug != slug.strip().lower():
        digest = hashlib.sha256(slug.encode("utf-8")).hexdigest()
        safe_slug = f"{safe_slug}-{digest[:8]}"
    return base_dir.resolve() / f"{safe_slug}-{_slugify(profile)}.discovery.json"
def _candidate_counts(snapshot: dict[str, Any]) -> dict[str, int]:
    """Count node, edge and attribute candidates in a discovery snapshot.

    A missing or non-mapping ``candidates`` section counts as empty.
    """
    raw = snapshot.get("candidates")
    candidates = raw if isinstance(raw, dict) else {}
    return {kind: len(candidates.get(kind, [])) for kind in ("nodes", "edges", "attributes")}
def _discovery_diff_counts(snapshot: dict[str, Any]) -> dict[str, int]:
    """Count entries in each bucket of ``snapshot['reconciliation']['diff']``.

    Missing or non-mapping sections yield zero for every bucket.
    """
    reconciliation = snapshot.get("reconciliation")
    diff = reconciliation.get("diff") if isinstance(reconciliation, dict) else None
    if not isinstance(diff, dict):
        diff = {}
    return {bucket: len(diff.get(bucket, [])) for bucket in ("added", "changed", "retired", "conflicted")}
def _review_artifact_counts(snapshot: dict[str, Any]) -> dict[str, int]:
    """Tally review artifacts by ``artifact_type``.

    Non-mapping entries are skipped; artifacts without a type count as ``unknown``.
    """
    tally: dict[str, int] = {}
    for entry in snapshot.get("review_artifacts", []):
        if isinstance(entry, dict):
            key = str(entry.get("artifact_type") or "unknown")
            tally[key] = tally.get(key, 0) + 1
    return tally
def _connector_run_summaries(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
    """Condense each connector run to its id, status, candidate counts and message.

    Non-mapping runs are skipped; ``message`` is only included when non-empty.
    """

    def summarize(run: dict[str, Any]) -> dict[str, Any]:
        entry: dict[str, Any] = {
            "connector_id": run.get("connector_id"),
            "status": run.get("status"),
            "candidate_counts": run.get("candidate_counts", {}),
        }
        message = run.get("message")
        if message:
            entry["message"] = message
        return entry

    return [summarize(run) for run in snapshot.get("connector_runs", []) if isinstance(run, dict)]
def _set_llm_result(
    result: dict[str, Any],
    snapshot: dict[str, Any],
    llm_enabled: bool,
    llm_skip_reason: str,
) -> None:
    """Store an LLM-extraction status record under ``result['llm']``.

    When extraction was disabled, the record is ``skipped`` with the given
    reason. Otherwise it is ``failed`` if the snapshot carries any
    ``llm_execution_error`` or ``llm_output_invalid`` review artifacts, and
    ``used`` if not.
    """
    if llm_enabled:
        failure_types = {"llm_execution_error", "llm_output_invalid"}
        failures = sum(
            count
            for artifact_type, count in _review_artifact_counts(snapshot).items()
            if artifact_type in failure_types
        )
        result["llm"] = {
            "enabled": True,
            "attempted": True,
            "status": "used" if not failures else "failed",
            "failure_artifacts": failures,
        }
        return
    result["llm"] = {
        "enabled": False,
        "attempted": False,
        "status": "skipped",
        "reason": llm_skip_reason,
    }
def _scan_manifest_has_diff(item: dict[str, Any]) -> bool:
    """Return True when any diff bucket of a rescan result is non-zero.

    The ``added``, ``changed``, ``retired`` and ``conflicted`` counts are
    checked. Missing, ``None`` or non-numeric counts are treated as zero, the
    same tolerance ``_int_from_nested`` gives the other summary counters.
    Previously, a ``None`` count raised TypeError and aborted the whole summary.
    """
    diff = item.get("diff_counts") if isinstance(item.get("diff_counts"), dict) else {}
    for key in ("added", "changed", "retired", "conflicted"):
        try:
            if int(diff.get(key) or 0):
                return True
        except (TypeError, ValueError):
            continue
    return False
def _int_from_nested(item: dict[str, Any], object_key: str, value_key: str) -> int:
    """Read ``item[object_key][value_key]`` as an int, defaulting to 0.

    A non-mapping container, a missing key, or a value ``int()`` rejects all yield 0.
    """
    container = item.get(object_key)
    if not isinstance(container, dict):
        return 0
    try:
        return int(container.get(value_key, 0))
    except (TypeError, ValueError):
        return 0
def _sync_manifest_repo(registry_url: str, manifest_dir: Path, item: object) -> dict[str, object]:
    """Register one manifest repository entry and sync its Fabric graph to the registry.

    The steps run in this order:
    1. Check the entry shape.
    2. POST the repository record.
    3. If the entry's ``path`` points at an existing directory, validate and
       build the graph from ``declaration_paths`` (defaulting to the repo path).
    4. POST the graph as a snapshot.
    5. Ingest any SBOMs configured on the entry.

    Returns a result dict with ``slug``, ``status`` (``"registered"``,
    ``"synced"`` or ``"error"``) and ``warnings``. Depending on how far the sync
    got, it may also contain ``repository``, ``snapshot_id``, ``commit``,
    ``libraries`` or ``error``. RegistryRequestError from the registry calls
    here is caught and reported in the returned dict rather than raised.
    """
    if not isinstance(item, dict):
        return {"slug": "<invalid>", "status": "error", "error": "repository entry must be a mapping"}
    slug = str(item.get("slug") or "").strip()
    if not slug:
        return {"slug": "<missing>", "status": "error", "error": "repository entry requires slug"}
    repo_path = _manifest_optional_path(item.get("path"), manifest_dir)
    result: dict[str, object] = {"slug": slug, "status": "registered", "warnings": []}
    try:
        # Register the repository before touching the local checkout. The name
        # falls back to the checkout directory name (or the slug); the remote
        # URL falls back to the checkout's git origin.
        repository = _registry_post_checked(
            registry_url,
            "/repositories",
            {
                "slug": slug,
                "name": item.get("name") or (repo_path.name if repo_path else slug),
                "remote_url": item.get("remote_url") or _git_value(repo_path, "config", "--get", "remote.origin.url"),
                "default_branch": item.get("default_branch") or "main",
                "state_hub_repo_id": item.get("state_hub_repo_id"),
            },
        )
        result["repository"] = repository
    except RegistryRequestError as exc:
        return {"slug": slug, "status": "error", "error": str(exc), "warnings": []}
    # Without a usable local checkout, only the registration above is performed.
    if repo_path is None:
        result["warnings"].append("no repo path configured")
        return result
    if not repo_path.is_dir():
        result["warnings"].append(f"repo path not found: {repo_path}")
        return result
    # Declarations may live elsewhere; by default the repo path itself is scanned.
    graph_paths = _manifest_paths(item.get("declaration_paths"), manifest_dir) or [repo_path]
    if not any(declaration_files(path) for path in graph_paths):
        # No graph to build, but SBOMs can still be ingested.
        result["warnings"].append("no Fabric declarations found")
        try:
            _ingest_manifest_sboms(registry_url, manifest_dir, slug, item, result)
        except RegistryRequestError as exc:
            return {"slug": slug, "status": "error", "error": str(exc), "warnings": result["warnings"]}
        return result
    # Never push a graph that fails validation; all diagnostics become warnings.
    report = validate_roots(graph_paths)
    if report.errors:
        return {
            "slug": slug,
            "status": "error",
            "error": report.summary(),
            "warnings": [diagnostic.format() for diagnostic in report.diagnostics],
        }
    graph = build_graph(graph_paths)
    if graph.load_errors:
        return {
            "slug": slug,
            "status": "error",
            "error": "; ".join(f"{path}: {message}" for path, message in graph.load_errors),
            "warnings": [],
        }
    try:
        snapshot = _registry_post_checked(
            registry_url,
            f"/repositories/{slug}/snapshots",
            {
                # Commit: manifest override, else the checkout's HEAD, else a placeholder.
                "commit": item.get("commit") or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree",
                "generated_at": _utc_now(),
                "graph": graph.to_export(),
            },
        )
        result.update(
            {
                "status": "synced",
                "snapshot_id": snapshot["id"],
                "commit": snapshot["commit"],
            }
        )
        # An SBOM upload failure here turns the whole result into an error,
        # even though the graph snapshot was already stored.
        _ingest_manifest_sboms(registry_url, manifest_dir, slug, item, result)
    except RegistryRequestError as exc:
        return {"slug": slug, "status": "error", "error": str(exc), "warnings": result["warnings"]}
    return result
def _ingest_manifest_sboms(
    registry_url: str,
    manifest_dir: Path,
    slug: str,
    item: dict[str, object],
    result: dict[str, object],
) -> None:
    """Upload every SBOM configured on a manifest entry to the CycloneDX endpoint.

    The single ``sbom`` path (if any) is processed before the ``sboms`` list.
    Missing files and non-object documents are appended to
    ``result['warnings']``. The registry responses for successful uploads are
    stored under ``result['libraries']``. RegistryRequestError from an upload
    is not caught here.
    """

    def warn(message: str) -> None:
        result.setdefault("warnings", []).append(message)

    paths = _manifest_paths(item.get("sboms"), manifest_dir)
    primary = _manifest_optional_path(item.get("sbom"), manifest_dir)
    if primary:
        paths = [primary, *paths]
    uploaded: list[dict[str, object]] = []
    for path in paths:
        if not path.is_file():
            warn(f"SBOM not found: {path}")
            continue
        document = load_yaml(path)
        if isinstance(document, dict):
            uploaded.append(
                _registry_post_checked(registry_url, f"/repositories/{slug}/libraries/cyclonedx", document)
            )
        else:
            warn(f"SBOM must be an object: {path}")
    if uploaded:
        result["libraries"] = uploaded
def _registry_ingest_cyclonedx(args: argparse.Namespace) -> int:
    """Upload a CycloneDX SBOM file as the library inventory of ``args.repo_slug``.

    Returns 1 if the file does not parse to a mapping. Otherwise it prints the
    registry response (raw JSON with ``args.json``, or a one-line summary) and
    returns 0.
    """
    sbom = load_yaml(args.sbom)
    if isinstance(sbom, dict):
        response = _registry_post(
            args.registry_url,
            f"/repositories/{args.repo_slug}/libraries/cyclonedx",
            sbom,
        )
        if args.json:
            print(json.dumps(response, indent=2, sort_keys=True))
        else:
            print(f"ingested {response['component_count']} library component(s) for {response['repo_slug']}")
        return 0
    print(f"ERROR {args.sbom}: CycloneDX SBOM must be a mapping/object", file=sys.stderr)
    return 1
def _registry_ingest_discovery(args: argparse.Namespace) -> int:
    """Upload a discovery snapshot JSON file to the registry and report the result.

    The target repository is ``args.repo_slug`` or, if unset, the snapshot's
    ``source.repo_slug``.

    Returns 1 (with an ``ERROR`` line on stderr) when the file cannot be read,
    is not valid JSON, is not a JSON object, or no repository slug is
    available. Otherwise it prints the registry response (raw JSON with
    ``args.json``, or a one-line candidate summary) and returns 0. Registry
    failures are handled by ``_registry_post``.
    """
    try:
        payload = json.loads(args.snapshot.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers malformed JSON and undecodable bytes. Report them
        # like the other input problems instead of crashing with a traceback.
        print(f"ERROR {args.snapshot}: cannot read discovery snapshot: {exc}", file=sys.stderr)
        return 1
    if not isinstance(payload, dict):
        print(f"ERROR {args.snapshot}: discovery snapshot must be a JSON object", file=sys.stderr)
        return 1
    source = payload.get("source") if isinstance(payload.get("source"), dict) else {}
    repo_slug = args.repo_slug or str(source.get("repo_slug") or "").strip()
    if not repo_slug:
        print("ERROR discovery snapshot source.repo_slug is required unless --repo-slug is provided", file=sys.stderr)
        return 1
    result = _registry_post(
        args.registry_url,
        f"/repositories/{repo_slug}/discovery-snapshots",
        payload,
    )
    if args.json:
        print(json.dumps(result, indent=2, sort_keys=True))
        return 0
    stored = result.get("snapshot")
    # Reuse the shared counter, which tolerates a missing or malformed
    # candidates section instead of raising AttributeError.
    counts = _candidate_counts(stored if isinstance(stored, dict) else {})
    print(
        f"ingested discovery snapshot {result['id']} for {result['repo_slug']} "
        f"({result['profile']}, {result['commit']}): "
        f"{counts['nodes']} node candidate(s), {counts['edges']} edge candidate(s), "
        f"{counts['attributes']} attribute candidate(s)"
    )
    return 0
def _registry_accept_discovery(args: argparse.Namespace) -> int:
    """Accept a stored discovery snapshot into a graph snapshot in the registry.

    Sends the selected candidate keys and commit. Review states are only sent
    when ``args.accept_review_state`` is given. The response is printed as
    JSON or summarized on one line, and 0 is returned.
    """
    request_body: dict[str, object] = {
        "accepted_keys": args.accepted_key,
        "commit": args.commit,
    }
    if args.accept_review_state is not None:
        request_body["accept_review_states"] = args.accept_review_state
    accepted = _registry_post(
        args.registry_url,
        f"/repositories/{args.repo_slug}/discovery-snapshots/{args.discovery_snapshot_id}/accept",
        request_body,
    )
    if not args.json:
        stored_graph = accepted["graph_snapshot"]
        print(
            f"accepted discovery snapshot {args.discovery_snapshot_id} for {args.repo_slug}; "
            f"graph snapshot {stored_graph['id']} stored for {stored_graph['commit']}"
        )
    else:
        print(json.dumps(accepted, indent=2, sort_keys=True))
    return 0
def _registry_export_reset_archive(args: argparse.Namespace) -> int:
    """Download the registry's reset archive and write it to ``args.output``.

    Prints the archive path and SHA-256 digest (or a JSON metadata object with
    ``args.json``). Returns 1 on registry or filesystem errors, 0 otherwise.
    """
    try:
        exported = _registry_get_checked(args.registry_url, "/exports/reset-archive")
        digest = _write_json_archive(args.output, exported, overwrite=args.overwrite)
    except (RegistryRequestError, OSError) as exc:
        print(f"ERROR {exc}", file=sys.stderr)
        return 1
    if not args.json:
        print(f"wrote reset archive {args.output}")
        print(f"archive sha256 {digest}")
        return 0
    metadata = {
        "archive": str(args.output),
        "archive_sha256": digest,
        "counts": exported.get("counts", {}),
    }
    print(json.dumps(metadata, indent=2, sort_keys=True))
    return 0
def _registry_reset_graph_data(args: argparse.Namespace) -> int:
    """Archive the registry's graph data locally, then ask the registry to reset it.

    The reset is refused unless ``args.confirm`` equals RESET_CONFIRMATION_TOKEN.
    The archive is fetched and written before the reset request, and its path
    and SHA-256 digest are sent along with that request. Returns 1 on
    confirmation mismatch or on registry/filesystem errors, 0 otherwise.
    """
    expected_token = RESET_CONFIRMATION_TOKEN
    if args.confirm != expected_token:
        print(f"ERROR --confirm must equal {expected_token}", file=sys.stderr)
        return 1
    try:
        snapshot_archive = _registry_get_checked(args.registry_url, "/exports/reset-archive")
        digest = _write_json_archive(args.archive, snapshot_archive, overwrite=args.overwrite_archive)
        reset_request = {
            "confirm": args.confirm,
            "reason": args.reason,
            "archive_path": str(args.archive),
            "archive_sha256": digest,
        }
        reset_event = _registry_post_checked(args.registry_url, "/admin/reset-graph-data", reset_request)
    except (RegistryRequestError, OSError) as exc:
        print(f"ERROR {exc}", file=sys.stderr)
        return 1
    if args.json:
        print(json.dumps(reset_event, indent=2, sort_keys=True))
        return 0
    print(f"wrote reset archive {args.archive}")
    print(f"archive sha256 {digest}")
    print(f"reset event {reset_event['id']} recorded")
    dropped = json.dumps(reset_event.get("dropped_counts", {}), sort_keys=True)
    print(f"dropped {dropped}")
    print(f"preserved {reset_event.get('repositories_preserved', 0)} repository registration(s)")
    return 0
def _scan_repo(args: argparse.Namespace) -> int:
    """Scan one repository for discovery candidates and report the snapshot.

    Builds ``ScanOptions`` from the parsed arguments and runs ``scan_repo``:
    - LLM extraction is enabled only when ``args.llm`` is set.
    - Each ``args.connector`` id becomes a ``fabric_registry`` connector that
      reads ``args.connector_manifest``.

    When ``args.previous_snapshot`` is set, the new snapshot is reconciled
    against it. That file is loaded *before* scanning, so a bad path fails fast
    instead of after a possibly expensive scan. The snapshot is written to
    ``args.output`` when set, then printed as JSON (``args.json``) or
    summarized on one line.

    Returns 0 on success. Returns 1 when the previous snapshot cannot be read,
    is not valid JSON, or is not a JSON object.
    """
    previous: dict[str, Any] | None = None
    if args.previous_snapshot:
        try:
            loaded = json.loads(args.previous_snapshot.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # OSError: missing/unreadable file. ValueError: malformed JSON or
            # undecodable bytes. Other exceptions are programming errors and
            # should propagate rather than be reported as a bad input file.
            print(f"ERROR {args.previous_snapshot}: cannot read previous snapshot: {exc}", file=sys.stderr)
            return 1
        if not isinstance(loaded, dict):
            print(f"ERROR {args.previous_snapshot}: previous snapshot must be a JSON object", file=sys.stderr)
            return 1
        previous = loaded
    snapshot = scan_repo(
        ScanOptions(
            repo_path=args.path,
            repo_slug=args.repo_slug,
            repo_name=args.repo_name,
            domain=args.domain,
            commit=args.commit,
            profile=args.profile,
            deterministic_only=not args.llm,
            llm_enabled=args.llm,
            llm_config=LLMExtractionConfig(
                provider=args.llm_provider,
                model=args.llm_model,
                temperature=args.llm_temperature,
                max_tokens=args.llm_max_tokens,
                min_confidence=args.llm_min_confidence,
            ),
            connectors=[
                ConnectorConfig(
                    connector_id=connector_id,
                    connector_type="fabric_registry",
                    source_path=str(args.connector_manifest),
                )
                for connector_id in args.connector
            ],
        )
    )
    if previous is not None:
        snapshot = reconcile_discovery_snapshots(previous, snapshot)
    payload = json.dumps(snapshot, indent=2, sort_keys=True)
    if args.output:
        args.output.parent.mkdir(parents=True, exist_ok=True)
        args.output.write_text(payload + "\n", encoding="utf-8")
    if args.json:
        print(payload)
        return 0
    candidates = snapshot["candidates"]
    review_count = len(snapshot.get("review_artifacts", []))
    review_summary = f", {review_count} review artifact(s)" if review_count else ""
    connector_count = len(snapshot.get("connector_runs", []))
    connector_summary = f", {connector_count} connector run(s)" if connector_count else ""
    reconciliation = snapshot.get("reconciliation", {})
    diff = reconciliation.get("diff") if isinstance(reconciliation, dict) else None
    diff_summary = ""
    if isinstance(diff, dict):
        diff_summary = (
            f", diff +{len(diff.get('added', []))}"
            f"/~{len(diff.get('changed', []))}"
            f"/-{len(diff.get('retired', []))}"
            f"/!{len(diff.get('conflicted', []))}"
        )
    mode = "dry-run " if args.dry_run else ""
    print(
        f"{mode}scan {snapshot['source']['repo_slug']} "
        f"({snapshot['source']['commit']}): "
        f"{len(candidates['nodes'])} node(s), "
        f"{len(candidates['edges'])} edge(s), "
        f"{len(candidates['attributes'])} attribute(s), "
        f"{len(snapshot.get('replacement_scopes', []))} replacement scope(s)"
        f"{review_summary}"
        f"{connector_summary}"
        f"{diff_summary}"
    )
    if args.output:
        print(f"wrote {args.output}")
    return 0
class RegistryRequestError(Exception):
    """Raised when a registry HTTP request fails or returns an unusable body.

    ``status_code`` holds the HTTP status for error responses. It is ``None``
    when the registry could not be reached or the response body was rejected.
    """

    def __init__(self, message: str, status_code: int | None = None) -> None:
        super().__init__(message)
        self.status_code: int | None = status_code
def _write_json_archive(path: Path, archive: dict[str, object], *, overwrite: bool) -> str:
    """Write ``archive`` to ``path`` as indented, key-sorted UTF-8 JSON.

    Parent directories are created as needed. Returns the SHA-256 hex digest
    of exactly the bytes written.

    Raises:
        OSError: if ``path`` already exists and ``overwrite`` is false
            (including a file created concurrently after the existence check),
            or on any other filesystem failure.
    """
    already_exists = f"archive already exists: {path} (use --overwrite or --overwrite-archive)"
    if path.exists() and not overwrite:
        raise OSError(already_exists)
    path.parent.mkdir(parents=True, exist_ok=True)
    data = json.dumps(archive, indent=2, sort_keys=True).encode("utf-8")
    # Exclusive-create mode ("xb") closes the window between the exists()
    # check and the write, so a file that appears meanwhile is never clobbered.
    try:
        with path.open("wb" if overwrite else "xb") as handle:
            handle.write(data)
    except FileExistsError as exc:
        raise OSError(already_exists) from exc
    return hashlib.sha256(data).hexdigest()
def _registry_post(registry_url: str, path: str, payload: dict[str, object]) -> dict[str, object]:
    """POST to the registry for CLI commands, terminating the process on failure.

    On RegistryRequestError, prints ``ERROR <message>`` to stderr and raises
    ``SystemExit(1)``, chained to the original error.
    """
    try:
        response = _registry_post_checked(registry_url, path, payload)
    except RegistryRequestError as exc:
        print(f"ERROR {exc}", file=sys.stderr)
        raise SystemExit(1) from exc
    return response
def _registry_post_checked(registry_url: str, path: str, payload: dict[str, object]) -> dict[str, object]:
    """POST ``payload`` as JSON to ``registry_url + path`` and return the decoded object.

    Keys whose value is ``None`` are dropped from the payload before sending.

    Raises RegistryRequestError for:
    - HTTP error responses (``status_code`` set to the HTTP status);
    - network failures, including timeouts or resets while reading the body;
    - bodies that are not valid JSON;
    - JSON bodies that are not objects.

    Callers only catch RegistryRequestError, so no other exception type may
    leak out for these conditions.
    """
    data = json.dumps({key: value for key, value in payload.items() if value is not None}).encode("utf-8")
    request = urllib.request.Request(
        registry_url.rstrip("/") + path,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=15) as response:
            raw_body = response.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RegistryRequestError(f"registry request failed ({exc.code}): {detail}", status_code=exc.code) from exc
    except OSError as exc:
        # URLError (a subclass of OSError) covers connection failures. A timeout
        # or reset while reading the body can surface as a plain OSError such
        # as TimeoutError instead.
        raise RegistryRequestError(f"cannot reach registry at {registry_url}: {exc}") from exc
    try:
        body = json.loads(raw_body)
    except ValueError as exc:
        raise RegistryRequestError(f"registry returned invalid JSON: {exc}") from exc
    if not isinstance(body, dict):
        raise RegistryRequestError("registry returned a non-object response")
    return body
def _registry_get_checked(registry_url: str, path: str) -> dict[str, object]:
    """GET ``registry_url + path`` and return the decoded JSON object.

    Raises RegistryRequestError for:
    - HTTP error responses (``status_code`` set to the HTTP status, e.g. 404);
    - network failures, including timeouts or resets while reading the body;
    - bodies that are not valid JSON;
    - JSON bodies that are not objects.
    """
    request = urllib.request.Request(registry_url.rstrip("/") + path, method="GET")
    try:
        with urllib.request.urlopen(request, timeout=15) as response:
            raw_body = response.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RegistryRequestError(f"registry request failed ({exc.code}): {detail}", status_code=exc.code) from exc
    except OSError as exc:
        # URLError (a subclass of OSError) covers connection failures. A timeout
        # or reset while reading the body can surface as a plain OSError such
        # as TimeoutError instead.
        raise RegistryRequestError(f"cannot reach registry at {registry_url}: {exc}") from exc
    try:
        body = json.loads(raw_body)
    except ValueError as exc:
        raise RegistryRequestError(f"registry returned invalid JSON: {exc}") from exc
    if not isinstance(body, dict):
        raise RegistryRequestError("registry returned a non-object response")
    return body
def _manifest_optional_path(value: object, manifest_dir: Path) -> Path | None:
    """Turn a manifest path string into a Path, or None for empty/non-string values.

    ``~`` is expanded. Absolute paths are returned as-is; relative ones are
    anchored at ``manifest_dir`` and resolved.
    """
    if isinstance(value, str) and value.strip():
        candidate = Path(value).expanduser()
        if candidate.is_absolute():
            return candidate
        return (manifest_dir / candidate).resolve()
    return None
def _manifest_paths(value: object, manifest_dir: Path) -> list[Path]:
    """Normalize a manifest path field (a string or a list of strings) into Paths.

    Any other type yields an empty list. Entries that are empty or not strings
    are dropped.
    """
    if isinstance(value, str):
        raw_items: list[object] = [value]
    elif isinstance(value, list):
        raw_items = value
    else:
        return []
    resolved = (_manifest_optional_path(entry, manifest_dir) for entry in raw_items)
    return [path for path in resolved if path is not None]
def _primary_repo_path(paths: list[Path]) -> Path:
    """Return the directory of the first given path, or the resolved cwd when none.

    A file path yields its parent directory; any other path is returned resolved.
    """
    if paths:
        first = paths[0].resolve()
        return first if not first.is_file() else first.parent
    return Path(".").resolve()
def _slugify(value: str) -> str:
    """Lowercase ``value`` and collapse each run of non ``[a-z0-9]`` chars into one dash.

    Leading and trailing dashes are stripped; an empty result becomes ``"repo"``.
    """
    collapsed = re.sub(r"[^a-z0-9]+", "-", value.lower())
    return collapsed.strip("-") or "repo"
def _git_value(repo_path: Path | None, *args: str) -> str | None:
    """Run ``git <args>`` in ``repo_path`` and return its stripped stdout.

    Returns None in any of these cases:
    - there is no path;
    - git cannot be started (including a missing working directory);
    - git exceeds 5 seconds;
    - git exits non-zero;
    - git prints nothing.
    """
    if repo_path is None:
        return None
    command = ["git", *args]
    try:
        completed = subprocess.run(
            command,
            cwd=repo_path,
            check=False,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.TimeoutExpired):
        return None
    output = completed.stdout.strip() if completed.returncode == 0 else ""
    return output or None
def _utc_now() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (whole seconds)."""
    return datetime.now(timezone.utc).replace(microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ")
def _age_hours(value: object, now: datetime) -> float | None:
    """Return how many hours before ``now`` the ISO-8601 timestamp ``value`` lies.

    A trailing ``Z`` is accepted and naive timestamps are taken as UTC.
    Timestamps in the future yield 0.0. Non-strings, empty strings and
    unparseable values yield None.
    """
    if not (isinstance(value, str) and value):
        return None
    try:
        timestamp = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if timestamp.tzinfo is None:
        timestamp = timestamp.replace(tzinfo=timezone.utc)
    elapsed_hours = (now - timestamp).total_seconds() / 3600
    return elapsed_hours if elapsed_hours > 0.0 else 0.0
def _load_graph_or_exit(paths: list[Path]) -> FabricGraph:
    """Build the Fabric graph from ``paths``, exiting with status 1 on load errors.

    Each load error is printed to stderr as ``ERROR <path>: <message>`` before exiting.
    """
    graph = build_graph(paths)
    if not graph.load_errors:
        return graph
    for failing_path, message in graph.load_errors:
        print(f"ERROR {failing_path}: {message}", file=sys.stderr)
    raise SystemExit(1)
def _print_providers(graph: FabricGraph, capability: str) -> None:
    """Print a tab-separated table of the providers ``graph`` reports for ``capability``.

    Environments and interface ids are comma-joined. A notice is printed when
    there are no providers.
    """
    matches = graph.providers(capability)
    if not matches:
        print(f"no providers found for {capability}")
        return
    print("provider_id\tservice_id\tlifecycle\tenvironments\tinterfaces")
    for provider in matches:
        spec = provider.spec
        columns = (
            provider.id,
            str(spec.get("service_id", "")),
            str(spec.get("lifecycle", "")),
            ",".join(spec.get("environments", [])),
            ",".join(spec.get("interface_ids", [])),
        )
        print("\t".join(columns))
def _print_consumers(
    graph: FabricGraph,
    target: str,
    matches: object | None = None,
) -> None:
    """Print a tab-separated table of consumers of ``target``.

    Pre-computed ``matches`` (any iterable) are used when given; otherwise
    ``graph.consumers(target)`` is queried. A notice is printed when there are
    no consumers.
    """
    rows = list(matches) if matches is not None else graph.consumers(target)
    if not rows:
        print(f"no consumers found for {target}")
        return
    print("consumer_service_id\tdependency_id\trequires\tprovider_capability_id\tprovider_interface_id\tstatus")
    fields = (
        "consumer_service_id",
        "dependency_id",
        "required_capability_type",
        "provider_capability_id",
        "provider_interface_id",
        "status",
    )
    for match in rows:
        print("\t".join([getattr(match, field) for field in fields]))
def _print_unresolved(graph: FabricGraph) -> None:
    """Print a tab-separated table of dependencies ``graph`` could not resolve.

    The ``requires`` column shows the required capability id, falling back to
    the capability type. A notice is printed when everything resolved.
    """
    pending = graph.unresolved_dependencies()
    if not pending:
        print("no unresolved dependencies")
        return
    print("dependency_id\tconsumer_service_id\trequires")
    for dependency in pending:
        spec = dependency.spec
        requirement = spec.get("requires", {})
        required = requirement.get("capability_id") or requirement.get("capability_type", "")
        print("\t".join((dependency.id, str(spec.get("consumer_service_id", "")), str(required))))
if __name__ == "__main__":
    # Run the CLI and use main()'s return value as the process exit status.
    raise SystemExit(main())