from __future__ import annotations import argparse import hashlib import json import os import re import subprocess import sys import urllib.error import urllib.request from datetime import datetime, timezone from pathlib import Path from typing import Any from urllib.parse import quote from .connectors import ConnectorConfig from .loader import declaration_files, load_yaml from .graph import FabricGraph, build_graph from .graph_explorer import fabric_graph_explorer_payload from .llm_extraction import LLMExtractionConfig from .reconciliation import reconcile_discovery_snapshots from .scanner import EXTRACTOR_VERSION, ScanOptions, scan_repo from .validation import validate_roots DEFAULT_DISCOVERY_DIR = ".fabric-discovery" DEFAULT_SNAPSHOT_DIR = "snapshots" DEFAULT_REPORT_DIR = "reports" def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="railiance-fabric", description="Load and validate Railiance Fabric declarations.", ) sub = parser.add_subparsers(dest="command", required=True) validate = sub.add_parser( "validate", help="Validate one or more repo roots or declaration files.", ) validate.add_argument( "paths", nargs="+", type=Path, help="Repo root, fabric directory, or declaration YAML file.", ) validate.add_argument( "--warnings-as-errors", action="store_true", help="Exit non-zero when warnings are present.", ) providers = sub.add_parser("providers", help="List providers for a capability type or id.") providers.add_argument("capability", help="Capability type or capability id.") providers.add_argument("paths", nargs="*", type=Path, default=[Path(".")]) consumers = sub.add_parser("consumers", help="List consumers of a capability or interface.") consumers.add_argument("target", help="Capability/interface type or declaration id.") consumers.add_argument("paths", nargs="*", type=Path, default=[Path(".")]) dependency_path = sub.add_parser("dependency-path", help="Show dependency path for a service.") dependency_path.add_argument("service_id", help="Service declaration id.") dependency_path.add_argument("paths", nargs="*", type=Path, default=[Path(".")]) unresolved = sub.add_parser("unresolved", help="Show missing or unresolved dependencies.") unresolved.add_argument("paths", nargs="*", type=Path, default=[Path(".")]) blast = sub.add_parser("blast-radius", help="Show consumers affected by an interface change.") blast.add_argument("interface", help="Interface type or interface declaration id.") blast.add_argument("paths", nargs="*", type=Path, default=[Path(".")]) export = sub.add_parser("export", help="Export graph as JSON, Mermaid, or graph-explorer payload.") export.add_argument("paths", nargs="*", type=Path, default=[Path(".")]) export.add_argument("--format", choices=["json", "mermaid", "graph-explorer"], default="json") scan = sub.add_parser("scan", help="Scan a repo for deterministic discovery candidates.") scan.add_argument("path", nargs="?", type=Path, default=Path(".")) scan.add_argument("--repo-slug", default=None) scan.add_argument("--repo-name", default=None) scan.add_argument("--domain", default=None) scan.add_argument("--commit", default=None) scan.add_argument("--profile", default="deterministic") scan.add_argument("--dry-run", action="store_true", help="Do not write anywhere except an explicit --output file.") scan.add_argument("--output", type=Path, default=None, help="Write the discovery snapshot JSON to a file.") scan.add_argument("--previous-snapshot", type=Path, default=None, help="Reconcile against a previous discovery snapshot JSON.") scan.add_argument("--json", action="store_true", help="Print the discovery snapshot JSON to stdout.") scan.add_argument("--llm", action="store_true", help="Enable llm-connect assisted extraction.") scan.add_argument("--llm-provider", default="mock", help="llm-connect provider name.") scan.add_argument("--llm-model", default="mock", help="Model name passed to llm-connect.") scan.add_argument("--llm-temperature", type=float, default=0.0) scan.add_argument("--llm-max-tokens", type=int, default=1500) scan.add_argument("--llm-min-confidence", type=float, default=0.6) scan.add_argument( "--connector", action="append", choices=["local-fabric-registry"], default=[], help="Enable a discovery connector. May be passed more than once.", ) scan.add_argument( "--connector-manifest", type=Path, default=Path("registry/local-repos.yaml"), help="Manifest path for the local-fabric-registry connector.", ) registry = sub.add_parser("registry", help="Feed a running Railiance Fabric registry service.") registry_sub = registry.add_subparsers(dest="registry_command", required=True) sync = registry_sub.add_parser("sync", help="Register a repo and ingest its current graph snapshot.") sync.add_argument("paths", nargs="*", type=Path, default=[Path(".")]) sync.add_argument("--registry-url", default="http://127.0.0.1:8765") sync.add_argument("--repo-slug", default=None) sync.add_argument("--name", default=None) sync.add_argument("--remote-url", default=None) sync.add_argument("--default-branch", default="main") sync.add_argument("--state-hub-repo-id", default=None) sync.add_argument("--commit", default=None) sync.add_argument("--json", action="store_true", help="Print the raw snapshot response.") sync_manifest = registry_sub.add_parser("sync-manifest", help="Register and sync repos from an onboarding manifest.") sync_manifest.add_argument("manifest", type=Path) sync_manifest.add_argument("--registry-url", default=None, help="Override the manifest registry_url.") sync_manifest.add_argument("--strict", action="store_true", help="Exit non-zero when any repo cannot be synced.") sync_manifest.add_argument("--json", action="store_true", help="Print the raw manifest sync summary.") scan_manifest = registry_sub.add_parser("scan-manifest", help="Run discovery scans from an onboarding manifest.") scan_manifest.add_argument("manifest", type=Path) scan_manifest.add_argument("--registry-url", default=None, help="Override the manifest registry_url.") scan_manifest.add_argument("--repo-slug", action="append", default=[], help="Only scan this repo slug. May be repeated.") scan_manifest.add_argument("--profile", default="deterministic", help="Discovery scan profile name.") scan_manifest.add_argument("--output-dir", type=Path, default=None, help="Write one discovery snapshot JSON per scanned repo.") scan_manifest.add_argument("--previous-dir", type=Path, default=None, help="Read previous per-repo snapshots for reconciliation.") scan_manifest.add_argument( "--cache-dir", type=Path, default=None, help="Operational cache directory. Defaults to /.fabric-discovery.", ) scan_manifest.add_argument( "--no-cache", action="store_true", help="Disable default cache snapshots and report output unless explicit paths are provided.", ) scan_manifest.add_argument( "--previous-source", choices=["dir", "registry", "none"], default="dir", help="Where to read previous discovery snapshots from before reconciling.", ) scan_manifest.add_argument( "--previous-from-registry", action="store_true", help="Shortcut for --previous-source registry.", ) scan_manifest.add_argument("--report-output", type=Path, default=None, help="Write an operational rescan report JSON.") scan_manifest.add_argument("--dry-run", action="store_true", help="Do not write to the registry.") scan_manifest.add_argument("--ingest", action="store_true", help="Store each discovery snapshot in the registry.") scan_manifest.add_argument( "--ingest-unchanged", action="store_true", help="Also ingest unchanged snapshots when --ingest is enabled.", ) scan_manifest.add_argument("--accept", action="store_true", help="Accept ingested snapshots after scanning.") scan_manifest.add_argument( "--accepted-key", action="append", default=[], help="Candidate stable key to include during acceptance. May be repeated.", ) scan_manifest.add_argument( "--accept-review-state", action="append", default=None, help="Review state accepted during projection. Defaults to accepted.", ) scan_manifest.add_argument( "--accept-policy", choices=["safe", "explicit"], default="safe", help="Safe blocks conflicts/tombstones and accepts only accepted review state; explicit allows provided overrides.", ) scan_manifest.add_argument("--strict", action="store_true", help="Exit non-zero when any repo cannot be scanned or stored.") scan_manifest.add_argument( "--exit-code-mode", choices=["default", "operational"], default="default", help="Use operational exit codes: 2 changes/baseline, 3 review required, 4 partial failure.", ) scan_manifest.add_argument("--lock-file", type=Path, default=None, help="Lock file path for automation-safe runs.") scan_manifest.add_argument("--no-lock", action="store_true", help="Disable the default cache-directory rescan lock.") scan_manifest.add_argument("--json", action="store_true", help="Print the raw manifest scan summary.") scan_manifest.add_argument("--llm", action="store_true", help="Enable llm-connect assisted extraction.") scan_manifest.add_argument("--deterministic-only", action="store_true", help="Force deterministic-only scans even when --llm is set.") scan_manifest.add_argument("--llm-provider", default="mock", help="llm-connect provider name.") scan_manifest.add_argument("--llm-model", default="mock", help="Model name passed to llm-connect.") scan_manifest.add_argument("--llm-temperature", type=float, default=0.0) scan_manifest.add_argument("--llm-max-tokens", type=int, default=1500) scan_manifest.add_argument("--llm-min-confidence", type=float, default=0.6) scan_manifest.add_argument("--llm-max-runs", type=int, default=None, help="Maximum repos allowed to run LLM extraction.") scan_manifest.add_argument( "--connector", action="append", choices=["local-fabric-registry"], default=[], help="Enable a discovery connector for each repo. May be passed more than once.", ) scan_manifest.add_argument( "--connector-manifest", type=Path, default=None, help="Manifest path for the local-fabric-registry connector. Defaults to the scanned manifest.", ) rescan_status = registry_sub.add_parser("rescan-status", help="Show discovery rescan freshness and review status.") rescan_status.add_argument("--registry-url", default="http://127.0.0.1:8765") rescan_status.add_argument("--review-only", action="store_true", help="Only show repos whose latest discovery needs review.") rescan_status.add_argument("--stale-after-hours", type=float, default=None, help="Only show discovery snapshots older than this age.") rescan_status.add_argument("--json", action="store_true", help="Print the raw rescan status summary.") cyclonedx = registry_sub.add_parser("ingest-cyclonedx", help="Ingest a CycloneDX SBOM as library inventory.") cyclonedx.add_argument("sbom", type=Path) cyclonedx.add_argument("--registry-url", default="http://127.0.0.1:8765") cyclonedx.add_argument("--repo-slug", required=True) cyclonedx.add_argument("--json", action="store_true", help="Print the raw ingest response.") discovery = registry_sub.add_parser("ingest-discovery", help="Store a discovery snapshot for review.") discovery.add_argument("snapshot", type=Path) discovery.add_argument("--registry-url", default="http://127.0.0.1:8765") discovery.add_argument("--repo-slug", default=None) discovery.add_argument("--json", action="store_true", help="Print the raw ingest response.") accept_discovery = registry_sub.add_parser("accept-discovery", help="Project accepted discovery candidates into a graph snapshot.") accept_discovery.add_argument("repo_slug") accept_discovery.add_argument("discovery_snapshot_id", type=int) accept_discovery.add_argument("--registry-url", default="http://127.0.0.1:8765") accept_discovery.add_argument("--accepted-key", action="append", default=[]) accept_discovery.add_argument("--accept-review-state", action="append", default=None) accept_discovery.add_argument("--commit", default=None) accept_discovery.add_argument("--json", action="store_true", help="Print the raw accept response.") return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) if args.command == "validate": report = validate_roots(args.paths) for diagnostic in report.diagnostics: print(diagnostic.format()) print(report.summary()) if report.errors: return 1 if args.warnings_as_errors and report.warnings: return 1 return 0 if args.command == "providers": graph = _load_graph_or_exit(args.paths) _print_providers(graph, args.capability) return 0 if args.command == "consumers": graph = _load_graph_or_exit(args.paths) _print_consumers(graph, args.target) return 0 if args.command == "dependency-path": graph = _load_graph_or_exit(args.paths) print("\n".join(graph.dependency_path_lines(args.service_id))) return 0 if args.command == "unresolved": graph = _load_graph_or_exit(args.paths) _print_unresolved(graph) return 0 if args.command == "blast-radius": graph = _load_graph_or_exit(args.paths) _print_consumers(graph, args.interface, matches=graph.blast_radius(args.interface)) return 0 if args.command == "export": graph = _load_graph_or_exit(args.paths) if args.format == "mermaid": print(graph.to_mermaid()) elif args.format == "graph-explorer": print(json.dumps(fabric_graph_explorer_payload(graph.to_export()), indent=2, sort_keys=True)) else: print(graph.to_json()) return 0 if args.command == "scan": return _scan_repo(args) if args.command == "registry": if args.registry_command == "sync": return _registry_sync(args) if args.registry_command == "sync-manifest": return _registry_sync_manifest(args) if args.registry_command == "scan-manifest": return _registry_scan_manifest(args) if args.registry_command == "rescan-status": return _registry_rescan_status(args) if args.registry_command == "ingest-cyclonedx": return _registry_ingest_cyclonedx(args) if args.registry_command == "ingest-discovery": return _registry_ingest_discovery(args) if args.registry_command == "accept-discovery": return _registry_accept_discovery(args) parser.error(f"unknown command {args.command!r}") return 2 def _registry_sync(args: argparse.Namespace) -> int: report = validate_roots(args.paths) for diagnostic in report.diagnostics: print(diagnostic.format(), file=sys.stderr) if report.errors: print(report.summary(), file=sys.stderr) return 1 graph = _load_graph_or_exit(args.paths) repo_path = _primary_repo_path(args.paths) repo_slug = args.repo_slug or _slugify(repo_path.name) repository = _registry_post( args.registry_url, "/repositories", { "slug": repo_slug, "name": args.name or repo_path.name, "remote_url": args.remote_url or _git_value(repo_path, "config", "--get", "remote.origin.url"), "default_branch": args.default_branch, "state_hub_repo_id": args.state_hub_repo_id, }, ) snapshot = _registry_post( args.registry_url, f"/repositories/{repo_slug}/snapshots", { "commit": args.commit or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree", "generated_at": _utc_now(), "graph": graph.to_export(), }, ) if args.json: print(json.dumps({"repository": repository, "snapshot": snapshot}, indent=2, sort_keys=True)) else: print(f"registered {repository['slug']}") print(f"snapshot {snapshot['id']} accepted for {snapshot['commit']}") return 0 def _registry_sync_manifest(args: argparse.Namespace) -> int: manifest_path = args.manifest.resolve() manifest = load_yaml(manifest_path) if not isinstance(manifest, dict): print(f"ERROR {manifest_path}: manifest must be a YAML mapping", file=sys.stderr) return 1 repositories = manifest.get("repositories") if not isinstance(repositories, list): print(f"ERROR {manifest_path}: manifest requires a repositories list", file=sys.stderr) return 1 registry_url = args.registry_url or str(manifest.get("registry_url") or "http://127.0.0.1:8765") results = [ _sync_manifest_repo(registry_url, manifest_path.parent, item) for item in repositories ] summary = { "manifest": str(manifest_path), "registry_url": registry_url, "repositories": results, "counts": { "total": len(results), "synced": sum(1 for item in results if item["status"] == "synced"), "registered": sum(1 for item in results if item["status"] == "registered"), "errors": sum(1 for item in results if item["status"] == "error"), }, } if args.json: print(json.dumps(summary, indent=2, sort_keys=True)) else: for item in results: if item["status"] == "synced": print(f"synced {item['slug']} snapshot {item['snapshot_id']} ({item['commit']})") elif item["status"] == "registered": print(f"registered {item['slug']} ({'; '.join(item['warnings'])})") else: print(f"error {item['slug']}: {item['error']}") counts = summary["counts"] print( f"summary: {counts['total']} repo(s), {counts['synced']} synced, " f"{counts['registered']} registered only, {counts['errors']} error(s)" ) if args.strict and (summary["counts"]["errors"] or summary["counts"]["registered"]): return 1 return 0 def _registry_scan_manifest(args: argparse.Namespace) -> int: if args.llm_max_runs is not None and args.llm_max_runs < 0: print("ERROR --llm-max-runs must be zero or greater", file=sys.stderr) return 1 if args.accept and not args.ingest and not args.dry_run: print("ERROR --accept requires --ingest unless --dry-run is set", file=sys.stderr) return 1 if args.accept_policy == "safe": if args.accept_review_state and set(args.accept_review_state) != {"accepted"}: print("ERROR --accept-review-state requires --accept-policy explicit unless it is only 'accepted'", file=sys.stderr) return 1 if args.accepted_key: print("ERROR --accepted-key requires --accept-policy explicit", file=sys.stderr) return 1 if args.previous_from_registry: args.previous_source = "registry" manifest_path = args.manifest.resolve() manifest = load_yaml(manifest_path) if not isinstance(manifest, dict): print(f"ERROR {manifest_path}: manifest must be a YAML mapping", file=sys.stderr) return 1 repositories = manifest.get("repositories") if not isinstance(repositories, list): print(f"ERROR {manifest_path}: manifest requires a repositories list", file=sys.stderr) return 1 lock_path = _scan_manifest_lock_path(args, manifest_path) lock_fd: int | None = None if lock_path is not None: try: lock_fd = _scan_manifest_acquire_lock(lock_path) except FileExistsError: print(f"ERROR rescan lock already exists: {lock_path}", file=sys.stderr) return 5 if args.exit_code_mode == "operational" else 1 registry_url = args.registry_url or str(manifest.get("registry_url") or "http://127.0.0.1:8765") allowlist = {slug for slug in args.repo_slug if slug} selected = [ item for item in repositories if not allowlist or _manifest_repo_slug(item) in allowlist ] llm_runs_used = 0 results: list[dict[str, Any]] = [] for item in selected: llm_enabled = False llm_skip_reason = "deterministic_only" if args.llm and not args.deterministic_only: if args.llm_max_runs is None or llm_runs_used < args.llm_max_runs: llm_enabled = True llm_skip_reason = "" else: llm_skip_reason = "budget_exhausted" result = _scan_manifest_repo( args, registry_url, manifest_path.parent, manifest_path, item, llm_enabled=llm_enabled, llm_skip_reason=llm_skip_reason, ) if result.get("llm", {}).get("attempted"): llm_runs_used += 1 results.append(result) summary = { "apiVersion": "railiance.fabric/v1alpha1", "kind": "FabricDiscoveryRescanReport", "manifest": str(manifest_path), "registry_url": registry_url, "generated_at": _utc_now(), "profile": args.profile, "dry_run": args.dry_run, "ingest": args.ingest and not args.dry_run, "accept": args.accept and args.ingest and not args.dry_run, "allowlist": sorted(allowlist), "cache": _scan_manifest_cache_config(args, manifest_path), "scanner": {"extractor_version": EXTRACTOR_VERSION}, "previous_source": args.previous_source, "accept_policy": args.accept_policy, "llm": { "requested": bool(args.llm), "deterministic_only": bool(args.deterministic_only or not args.llm), "max_runs": args.llm_max_runs, "used_runs": llm_runs_used, }, "repositories": results, "counts": _scan_manifest_counts(results), } report_path = _scan_manifest_report_path(args, manifest_path, summary["generated_at"]) if report_path is not None: summary["report_path"] = str(report_path) try: report_path.parent.mkdir(parents=True, exist_ok=True) report_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8") except Exception as exc: _scan_manifest_release_lock(lock_fd, lock_path) print(f"ERROR cannot write rescan report {report_path}: {exc}", file=sys.stderr) return 1 if args.json: print(json.dumps(summary, indent=2, sort_keys=True)) else: _print_scan_manifest_summary(summary) if summary.get("report_path"): print(f"report: {summary['report_path']}") exit_code = _scan_manifest_exit_code(summary, args) _scan_manifest_release_lock(lock_fd, lock_path) return exit_code def _registry_rescan_status(args: argparse.Namespace) -> int: status = _registry_get_checked(args.registry_url, "/status") snapshots = status.get("latest_discovery_snapshots", []) if not isinstance(snapshots, list): snapshots = [] now = datetime.now(timezone.utc) filtered: list[dict[str, Any]] = [] for item in snapshots: if not isinstance(item, dict): continue if args.review_only and not item.get("review_required"): continue age_hours = _age_hours(item.get("generated_at"), now) item = {**item, "age_hours": age_hours} if args.stale_after_hours is not None and (age_hours is None or age_hours <= args.stale_after_hours): continue filtered.append(item) summary = { "apiVersion": "railiance.fabric/v1alpha1", "kind": "FabricDiscoveryRescanStatus", "generated_at": _utc_now(), "registry_url": args.registry_url, "review_only": args.review_only, "stale_after_hours": args.stale_after_hours, "counts": { "total": len(snapshots), "shown": len(filtered), "review_required": sum( 1 for item in snapshots if isinstance(item, dict) and item.get("review_required") ), }, "repositories": filtered, } if args.json: print(json.dumps(summary, indent=2, sort_keys=True)) else: for item in filtered: age = item.get("age_hours") age_text = f", age {age:.1f}h" if isinstance(age, float) else "" diff = item.get("diff_counts", {}) if isinstance(item.get("diff_counts"), dict) else {} print( f"{item.get('health', 'unknown')} {item.get('repo_slug')} " f"({item.get('profile')}, {item.get('commit')}): " f"diff +{diff.get('added', 0)}/~{diff.get('changed', 0)}" f"/-{diff.get('retired', 0)}/!{diff.get('conflicted', 0)}" f", review artifacts {item.get('review_artifact_count', 0)}" f", tombstones {item.get('tombstone_count', 0)}" f"{age_text}" ) counts = summary["counts"] print( f"summary: {counts['shown']} shown, {counts['total']} latest discovery snapshot(s), " f"{counts['review_required']} review required" ) return 0 def _scan_manifest_repo( args: argparse.Namespace, registry_url: str, manifest_dir: Path, manifest_path: Path, item: object, *, llm_enabled: bool, llm_skip_reason: str, ) -> dict[str, Any]: if not isinstance(item, dict): return { "slug": "", "status": "error", "scanned": False, "ingested": False, "accepted": False, "error": "repository entry must be a mapping", } slug = str(item.get("slug") or "").strip() if not slug: return { "slug": "", "status": "error", "scanned": False, "ingested": False, "accepted": False, "error": "repository entry requires slug", } result: dict[str, Any] = { "slug": slug, "status": "pending", "scanned": False, "ingested": False, "accepted": False, "change_state": "unknown", "llm": { "enabled": llm_enabled, "attempted": False, "status": "pending" if llm_enabled else "skipped", "reason": "" if llm_enabled else llm_skip_reason, }, } repo_path = _manifest_optional_path(item.get("path"), manifest_dir) if repo_path is None: result.update({"status": "error", "error": "no repo path configured"}) return result result["path"] = str(repo_path) if not repo_path.is_dir(): result.update({"status": "error", "error": f"repo path not found: {repo_path}"}) return result commit = str(item.get("commit") or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree") connector_manifest = _scan_manifest_connector_manifest(args.connector_manifest, manifest_dir, manifest_path) connectors = [ ConnectorConfig( connector_id=connector_id, connector_type="fabric_registry", source_path=str(connector_manifest), ) for connector_id in args.connector ] try: result["llm"]["attempted"] = llm_enabled snapshot = scan_repo( ScanOptions( repo_path=repo_path, repo_slug=slug, repo_name=str(item.get("name") or repo_path.name), domain=str(item.get("domain") or "") or None, commit=commit, profile=args.profile, deterministic_only=not llm_enabled, llm_enabled=llm_enabled, llm_config=LLMExtractionConfig( provider=args.llm_provider, model=args.llm_model, temperature=args.llm_temperature, max_tokens=args.llm_max_tokens, min_confidence=args.llm_min_confidence, ), connectors=connectors, ) ) previous, previous_metadata = _scan_manifest_previous_snapshot( args, registry_url, manifest_path, slug, ) result["previous"] = previous_metadata if previous is not None: snapshot = reconcile_discovery_snapshots(previous, snapshot) except Exception as exc: result["status"] = "error" result["error"] = f"scan failed: {exc}" if llm_enabled: result["llm"]["status"] = "failed" result["llm"]["reason"] = "scan_failed" return result result.update( { "status": "scanned", "scanned": True, "commit": snapshot["source"]["commit"], "candidate_counts": _candidate_counts(snapshot), "diff_counts": _discovery_diff_counts(snapshot), "review_artifact_counts": _review_artifact_counts(snapshot), "connector_runs": _connector_run_summaries(snapshot), "tombstone_count": len(snapshot.get("tombstones", [])), "change_state": _scan_manifest_change_state(snapshot, result.get("previous")), } ) _set_llm_result(result, snapshot, llm_enabled, llm_skip_reason) output_path = _scan_manifest_output_path(args, manifest_path, slug) if output_path is not None: try: output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(snapshot, indent=2, sort_keys=True) + "\n", encoding="utf-8") result["output_path"] = str(output_path) except Exception as exc: result.update({"status": "error", "error": f"cannot write snapshot: {exc}"}) return result if args.ingest and args.dry_run: result["registry_action"] = "skipped_dry_run" return result if args.ingest: if result["change_state"] == "unchanged" and not args.ingest_unchanged: result.update({"status": "skipped_unchanged", "registry_action": "skipped_unchanged"}) return result try: repository = _registry_post_checked( registry_url, "/repositories", { "slug": slug, "name": item.get("name") or repo_path.name, "remote_url": item.get("remote_url") or _git_value(repo_path, "config", "--get", "remote.origin.url"), "default_branch": item.get("default_branch") or "main", "state_hub_repo_id": item.get("state_hub_repo_id"), }, ) stored = _registry_post_checked( registry_url, f"/repositories/{slug}/discovery-snapshots", snapshot, ) result.update( { "status": "ingested", "repository": {"slug": repository.get("slug")}, "ingested": True, "discovery_snapshot_id": stored["id"], } ) if args.accept: blockers = _scan_manifest_acceptance_blockers(snapshot, args) if blockers: result.update( { "status": "review_required", "registry_action": "accept_blocked", "acceptance_blockers": blockers, } ) return result accept_payload: dict[str, object] = { "commit": f"discovery:{snapshot['source']['commit']}", } if args.accepted_key: accept_payload["accepted_keys"] = args.accepted_key if args.accept_review_state is not None: accept_payload["accept_review_states"] = args.accept_review_state accepted = _registry_post_checked( registry_url, f"/repositories/{slug}/discovery-snapshots/{stored['id']}/accept", accept_payload, ) graph_snapshot = accepted["graph_snapshot"] result.update( { "status": "accepted", "accepted": True, "accepted_graph_snapshot_id": graph_snapshot["id"], "accepted_graph_commit": graph_snapshot["commit"], } ) except RegistryRequestError as exc: result.update({"status": "error", "error": str(exc)}) return result def _scan_manifest_counts(results: list[dict[str, Any]]) -> dict[str, int]: return { "total": len(results), "scanned": sum(1 for item in results if item.get("scanned")), "baseline": sum(1 for item in results if item.get("change_state") == "baseline"), "unchanged": sum(1 for item in results if item.get("change_state") == "unchanged"), "changed": sum(1 for item in results if _scan_manifest_has_diff(item)), "retired": sum(_int_from_nested(item, "diff_counts", "retired") for item in results), "conflicted": sum(_int_from_nested(item, "diff_counts", "conflicted") for item in results), "review_required": sum(1 for item in results if _scan_manifest_needs_review(item)), "llm_skipped": sum(1 for item in results if item.get("llm", {}).get("status") == "skipped"), "llm_failed": sum(1 for item in results if item.get("llm", {}).get("status") == "failed"), "ingested": sum(1 for item in results if item.get("ingested")), "accepted": sum(1 for item in results if item.get("accepted")), "errors": sum(1 for item in results if item.get("status") == "error"), } def _print_scan_manifest_summary(summary: dict[str, Any]) -> None: for item in summary["repositories"]: slug = item["slug"] if item["status"] == "error": scanned = " after scan" if item.get("scanned") else "" print(f"error{scanned} {slug}: {item['error']}") continue counts = item.get("candidate_counts", {}) diff = item.get("diff_counts", {}) action = "accepted" if item.get("accepted") else "ingested" if item.get("ingested") else "scanned" if item.get("registry_action") == "skipped_dry_run": action = "dry-run scanned" elif item.get("registry_action") == "skipped_unchanged": action = "unchanged" elif item.get("registry_action") == "accept_blocked": action = "review required" previous = item.get("previous") if isinstance(item.get("previous"), dict) else {} previous_summary = f", previous {previous.get('source')}" if previous.get("source") else "" print( f"{action} {slug} ({item.get('commit', 'working-tree')}): " f"{counts.get('nodes', 0)} node(s), {counts.get('edges', 0)} edge(s), " f"{counts.get('attributes', 0)} attribute(s), " f"diff +{diff.get('added', 0)}/~{diff.get('changed', 0)}" f"/-{diff.get('retired', 0)}/!{diff.get('conflicted', 0)}" f"{previous_summary}" ) counts = summary["counts"] print( f"summary: {counts['total']} repo(s), {counts['scanned']} scanned, " f"{counts['baseline']} baseline, {counts['unchanged']} unchanged, " f"{counts['changed']} changed, {counts['retired']} retired, " f"{counts['conflicted']} conflicted, {counts['review_required']} review required, " f"{counts['llm_skipped']} LLM skipped, " f"{counts['llm_failed']} LLM failed, {counts['accepted']} accepted, " f"{counts['errors']} error(s)" ) def _manifest_repo_slug(item: object) -> str: if not isinstance(item, dict): return "" return str(item.get("slug") or "").strip() def _scan_manifest_connector_manifest( connector_manifest: Path | None, manifest_dir: Path, manifest_path: Path, ) -> Path: if connector_manifest is None: return manifest_path path = connector_manifest.expanduser() return path.resolve() if path.is_absolute() else (manifest_dir / path).resolve() def _scan_manifest_cache_config(args: argparse.Namespace, manifest_path: Path) -> dict[str, object]: cache_dir = _scan_manifest_cache_dir(args, manifest_path) if cache_dir is None: return {"enabled": False} return { "enabled": True, "cache_dir": str(cache_dir), "snapshot_dir": str(cache_dir / DEFAULT_SNAPSHOT_DIR), "report_dir": str(cache_dir / DEFAULT_REPORT_DIR), } def _scan_manifest_cache_dir(args: argparse.Namespace, manifest_path: Path) -> Path | None: if args.no_cache: return None cache_dir = args.cache_dir or Path(DEFAULT_DISCOVERY_DIR) if cache_dir.is_absolute(): return cache_dir return (manifest_path.parent / cache_dir).resolve() def _scan_manifest_lock_path(args: argparse.Namespace, manifest_path: Path) -> Path | None: if args.no_lock: return None if args.lock_file: path = args.lock_file.expanduser() return path.resolve() if path.is_absolute() else (manifest_path.parent / path).resolve() cache_dir = _scan_manifest_cache_dir(args, manifest_path) if cache_dir is None: return None return cache_dir / "rescan.lock" def _scan_manifest_acquire_lock(lock_path: Path) -> int: lock_path.parent.mkdir(parents=True, exist_ok=True) fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644) os.write(fd, f"pid={os.getpid()}\ncreated_at={_utc_now()}\n".encode("utf-8")) return fd def _scan_manifest_release_lock(lock_fd: int | None, lock_path: Path | None) -> None: if lock_fd is not None: os.close(lock_fd) if lock_path is not None: try: lock_path.unlink() except FileNotFoundError: pass def _scan_manifest_output_path(args: argparse.Namespace, manifest_path: Path, slug: str) -> Path | None: if args.output_dir: return _manifest_discovery_snapshot_path(args.output_dir, slug, args.profile) cache_dir = _scan_manifest_cache_dir(args, manifest_path) if cache_dir is None: return None return _manifest_discovery_snapshot_path(cache_dir / DEFAULT_SNAPSHOT_DIR, slug, args.profile) def _scan_manifest_report_path(args: argparse.Namespace, manifest_path: Path, generated_at: str) -> Path | None: if args.report_output: path = args.report_output.expanduser() return path.resolve() if path.is_absolute() else (manifest_path.parent / path).resolve() cache_dir = _scan_manifest_cache_dir(args, manifest_path) if cache_dir is None: return None timestamp = _slugify(generated_at.replace(":", "").replace("+", "")) return cache_dir / DEFAULT_REPORT_DIR / f"{timestamp}-{_slugify(args.profile)}.rescan-report.json" def _scan_manifest_previous_snapshot( args: argparse.Namespace, registry_url: str, manifest_path: Path, slug: str, ) -> tuple[dict[str, Any] | None, dict[str, Any]]: if args.previous_source == "none": return None, {"source": "none", "found": False} if args.previous_source == "registry": try: previous = _registry_get_checked( registry_url, f"/repositories/{quote(slug)}/discovery-snapshots/latest?profile={quote(args.profile)}", ) except RegistryRequestError as exc: if exc.status_code == 404: return None, {"source": "registry", "found": False} raise snapshot = previous.get("snapshot") if isinstance(previous.get("snapshot"), dict) else None if snapshot is None: raise ValueError(f"registry latest discovery snapshot for {slug} has no snapshot object") return snapshot, { "source": "registry", "found": True, "discovery_snapshot_id": previous.get("id"), "commit": previous.get("commit"), "profile": previous.get("profile"), } previous_dir = args.previous_dir if previous_dir is None and not args.no_cache: cache_dir = _scan_manifest_cache_dir(args, manifest_path) previous_dir = cache_dir / DEFAULT_SNAPSHOT_DIR if cache_dir is not None else None if previous_dir is None: return None, {"source": "dir", "found": False} previous_path = _manifest_discovery_snapshot_path(previous_dir, slug, args.profile) if not previous_path.is_file(): return None, {"source": "dir", "found": False, "path": str(previous_path)} previous = json.loads(previous_path.read_text(encoding="utf-8")) if not isinstance(previous, dict): raise ValueError(f"previous snapshot must be a JSON object: {previous_path}") return previous, {"source": "dir", "found": True, "path": str(previous_path)} def _scan_manifest_change_state(snapshot: dict[str, Any], previous_metadata: object) -> str: previous = previous_metadata if isinstance(previous_metadata, dict) else {} if not previous.get("found"): return "baseline" return "changed" if any(_discovery_diff_counts(snapshot).values()) else "unchanged" def _scan_manifest_acceptance_blockers(snapshot: dict[str, Any], args: argparse.Namespace) -> list[str]: if args.accept_policy == "explicit": return [] blockers: list[str] = [] diff = _discovery_diff_counts(snapshot) if diff["conflicted"]: blockers.append(f"{diff['conflicted']} conflicted candidate(s)") tombstone_count = len(snapshot.get("tombstones", [])) if tombstone_count: blockers.append(f"{tombstone_count} tombstone(s)") review_artifacts = _review_artifact_counts(snapshot) low_confidence = review_artifacts.get("llm_low_confidence", 0) if low_confidence: blockers.append(f"{low_confidence} low-confidence LLM artifact(s)") if args.accept_review_state is not None and set(args.accept_review_state) != {"accepted"}: blockers.append("non-accepted review states requested") return blockers def _scan_manifest_needs_review(item: dict[str, Any]) -> bool: if item.get("status") == "review_required": return True if _int_from_nested(item, "diff_counts", "conflicted"): return True if int(item.get("tombstone_count") or 0): return True review_artifacts = item.get("review_artifact_counts") return isinstance(review_artifacts, dict) and any(int(value or 0) for value in review_artifacts.values()) def _scan_manifest_exit_code(summary: dict[str, Any], args: argparse.Namespace) -> int: counts = summary.get("counts") if isinstance(summary.get("counts"), dict) else {} errors = int(counts.get("errors", 0) or 0) if args.strict and errors: return 1 if args.exit_code_mode != "operational": return 0 if errors: return 4 if int(counts.get("review_required", 0) or 0): return 3 if int(counts.get("changed", 0) or 0) or int(counts.get("baseline", 0) or 0): return 2 return 0 def _manifest_discovery_snapshot_path(base_dir: Path, slug: str, profile: str) -> Path: return base_dir.resolve() / f"{_slugify(slug)}-{_slugify(profile)}.discovery.json" def _candidate_counts(snapshot: dict[str, Any]) -> dict[str, int]: candidates = snapshot.get("candidates") if isinstance(snapshot.get("candidates"), dict) else {} return { "nodes": len(candidates.get("nodes", [])), "edges": len(candidates.get("edges", [])), "attributes": len(candidates.get("attributes", [])), } def _discovery_diff_counts(snapshot: dict[str, Any]) -> dict[str, int]: reconciliation = snapshot.get("reconciliation") if isinstance(snapshot.get("reconciliation"), dict) else {} diff = reconciliation.get("diff") if isinstance(reconciliation.get("diff"), dict) else {} return { "added": len(diff.get("added", [])), "changed": len(diff.get("changed", [])), "retired": len(diff.get("retired", [])), "conflicted": len(diff.get("conflicted", [])), } def _review_artifact_counts(snapshot: dict[str, Any]) -> dict[str, int]: counts: dict[str, int] = {} for artifact in snapshot.get("review_artifacts", []): if not isinstance(artifact, dict): continue artifact_type = str(artifact.get("artifact_type") or "unknown") counts[artifact_type] = counts.get(artifact_type, 0) + 1 return counts def _connector_run_summaries(snapshot: dict[str, Any]) -> list[dict[str, Any]]: summaries: list[dict[str, Any]] = [] for run in snapshot.get("connector_runs", []): if not isinstance(run, dict): continue summary = { "connector_id": run.get("connector_id"), "status": run.get("status"), "candidate_counts": run.get("candidate_counts", {}), } if run.get("message"): summary["message"] = run.get("message") summaries.append(summary) return summaries def _set_llm_result( result: dict[str, Any], snapshot: dict[str, Any], llm_enabled: bool, llm_skip_reason: str, ) -> None: if not llm_enabled: result["llm"] = { "enabled": False, "attempted": False, "status": "skipped", "reason": llm_skip_reason, } return artifact_counts = _review_artifact_counts(snapshot) failure_count = sum( count for artifact_type, count in artifact_counts.items() if artifact_type in {"llm_execution_error", "llm_output_invalid"} ) result["llm"] = { "enabled": True, "attempted": True, "status": "failed" if failure_count else "used", "failure_artifacts": failure_count, } def _scan_manifest_has_diff(item: dict[str, Any]) -> bool: diff = item.get("diff_counts") if isinstance(item.get("diff_counts"), dict) else {} return any(int(diff.get(key, 0)) for key in ("added", "changed", "retired", "conflicted")) def _int_from_nested(item: dict[str, Any], object_key: str, value_key: str) -> int: nested = item.get(object_key) if isinstance(item.get(object_key), dict) else {} try: return int(nested.get(value_key, 0)) except (TypeError, ValueError): return 0 def _sync_manifest_repo(registry_url: str, manifest_dir: Path, item: object) -> dict[str, object]: if not isinstance(item, dict): return {"slug": "", "status": "error", "error": "repository entry must be a mapping"} slug = str(item.get("slug") or "").strip() if not slug: return {"slug": "", "status": "error", "error": "repository entry requires slug"} repo_path = _manifest_optional_path(item.get("path"), manifest_dir) result: dict[str, object] = {"slug": slug, "status": "registered", "warnings": []} try: repository = _registry_post_checked( registry_url, "/repositories", { "slug": slug, "name": item.get("name") or (repo_path.name if repo_path else slug), "remote_url": item.get("remote_url") or _git_value(repo_path, "config", "--get", "remote.origin.url"), "default_branch": item.get("default_branch") or "main", "state_hub_repo_id": item.get("state_hub_repo_id"), }, ) result["repository"] = repository except RegistryRequestError as exc: return {"slug": slug, "status": "error", "error": str(exc), "warnings": []} if repo_path is None: result["warnings"].append("no repo path configured") return result if not repo_path.is_dir(): result["warnings"].append(f"repo path not found: {repo_path}") return result graph_paths = _manifest_paths(item.get("declaration_paths"), manifest_dir) or [repo_path] if not any(declaration_files(path) for path in graph_paths): result["warnings"].append("no Fabric declarations found") try: _ingest_manifest_sboms(registry_url, manifest_dir, slug, item, result) except RegistryRequestError as exc: return {"slug": slug, "status": "error", "error": str(exc), "warnings": result["warnings"]} return result report = validate_roots(graph_paths) if report.errors: return { "slug": slug, "status": "error", "error": report.summary(), "warnings": [diagnostic.format() for diagnostic in report.diagnostics], } graph = build_graph(graph_paths) if graph.load_errors: return { "slug": slug, "status": "error", "error": "; ".join(f"{path}: {message}" for path, message in graph.load_errors), "warnings": [], } try: snapshot = _registry_post_checked( registry_url, f"/repositories/{slug}/snapshots", { "commit": item.get("commit") or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree", "generated_at": _utc_now(), "graph": graph.to_export(), }, ) result.update( { "status": "synced", "snapshot_id": snapshot["id"], "commit": snapshot["commit"], } ) _ingest_manifest_sboms(registry_url, manifest_dir, slug, item, result) except RegistryRequestError as exc: return {"slug": slug, "status": "error", "error": str(exc), "warnings": result["warnings"]} return result def _ingest_manifest_sboms( registry_url: str, manifest_dir: Path, slug: str, item: dict[str, object], result: dict[str, object], ) -> None: sbom_paths = _manifest_paths(item.get("sboms"), manifest_dir) single_sbom = _manifest_optional_path(item.get("sbom"), manifest_dir) if single_sbom: sbom_paths.insert(0, single_sbom) ingested: list[dict[str, object]] = [] for sbom_path in sbom_paths: if not sbom_path.is_file(): result.setdefault("warnings", []).append(f"SBOM not found: {sbom_path}") continue payload = load_yaml(sbom_path) if not isinstance(payload, dict): result.setdefault("warnings", []).append(f"SBOM must be an object: {sbom_path}") continue ingested.append( _registry_post_checked( registry_url, f"/repositories/{slug}/libraries/cyclonedx", payload, ) ) if ingested: result["libraries"] = ingested def _registry_ingest_cyclonedx(args: argparse.Namespace) -> int: payload = load_yaml(args.sbom) if not isinstance(payload, dict): print(f"ERROR {args.sbom}: CycloneDX SBOM must be a mapping/object", file=sys.stderr) return 1 result = _registry_post( args.registry_url, f"/repositories/{args.repo_slug}/libraries/cyclonedx", payload, ) if args.json: print(json.dumps(result, indent=2, sort_keys=True)) else: print(f"ingested {result['component_count']} library component(s) for {result['repo_slug']}") return 0 def _registry_ingest_discovery(args: argparse.Namespace) -> int: payload = json.loads(args.snapshot.read_text(encoding="utf-8")) if not isinstance(payload, dict): print(f"ERROR {args.snapshot}: discovery snapshot must be a JSON object", file=sys.stderr) return 1 source = payload.get("source") if isinstance(payload.get("source"), dict) else {} repo_slug = args.repo_slug or str(source.get("repo_slug") or "").strip() if not repo_slug: print("ERROR discovery snapshot source.repo_slug is required unless --repo-slug is provided", file=sys.stderr) return 1 result = _registry_post( args.registry_url, f"/repositories/{repo_slug}/discovery-snapshots", payload, ) if args.json: print(json.dumps(result, indent=2, sort_keys=True)) else: candidates = result.get("snapshot", {}).get("candidates", {}) counts = { "nodes": len(candidates.get("nodes", [])), "edges": len(candidates.get("edges", [])), "attributes": len(candidates.get("attributes", [])), } print( f"ingested discovery snapshot {result['id']} for {result['repo_slug']} " f"({result['profile']}, {result['commit']}): " f"{counts['nodes']} node candidate(s), {counts['edges']} edge candidate(s), " f"{counts['attributes']} attribute candidate(s)" ) return 0 def _registry_accept_discovery(args: argparse.Namespace) -> int: payload = { "accepted_keys": args.accepted_key, "commit": args.commit, } if args.accept_review_state is not None: payload["accept_review_states"] = args.accept_review_state result = _registry_post( args.registry_url, f"/repositories/{args.repo_slug}/discovery-snapshots/{args.discovery_snapshot_id}/accept", payload, ) if args.json: print(json.dumps(result, indent=2, sort_keys=True)) else: graph_snapshot = result["graph_snapshot"] print( f"accepted discovery snapshot {args.discovery_snapshot_id} for {args.repo_slug}; " f"graph snapshot {graph_snapshot['id']} stored for {graph_snapshot['commit']}" ) return 0 def _scan_repo(args: argparse.Namespace) -> int: snapshot = scan_repo( ScanOptions( repo_path=args.path, repo_slug=args.repo_slug, repo_name=args.repo_name, domain=args.domain, commit=args.commit, profile=args.profile, deterministic_only=not args.llm, llm_enabled=args.llm, llm_config=LLMExtractionConfig( provider=args.llm_provider, model=args.llm_model, temperature=args.llm_temperature, max_tokens=args.llm_max_tokens, min_confidence=args.llm_min_confidence, ), connectors=[ ConnectorConfig( connector_id=connector_id, connector_type="fabric_registry", source_path=str(args.connector_manifest), ) for connector_id in args.connector ], ) ) if args.previous_snapshot: try: previous = json.loads(args.previous_snapshot.read_text(encoding="utf-8")) except Exception as exc: print(f"ERROR {args.previous_snapshot}: cannot read previous snapshot: {exc}", file=sys.stderr) return 1 if not isinstance(previous, dict): print(f"ERROR {args.previous_snapshot}: previous snapshot must be a JSON object", file=sys.stderr) return 1 snapshot = reconcile_discovery_snapshots(previous, snapshot) payload = json.dumps(snapshot, indent=2, sort_keys=True) if args.output: args.output.parent.mkdir(parents=True, exist_ok=True) args.output.write_text(payload + "\n", encoding="utf-8") if args.json: print(payload) return 0 candidates = snapshot["candidates"] review_count = len(snapshot.get("review_artifacts", [])) review_summary = f", {review_count} review artifact(s)" if review_count else "" connector_count = len(snapshot.get("connector_runs", [])) connector_summary = f", {connector_count} connector run(s)" if connector_count else "" reconciliation = snapshot.get("reconciliation", {}) diff = reconciliation.get("diff") if isinstance(reconciliation, dict) else None diff_summary = "" if isinstance(diff, dict): diff_summary = ( f", diff +{len(diff.get('added', []))}" f"/~{len(diff.get('changed', []))}" f"/-{len(diff.get('retired', []))}" f"/!{len(diff.get('conflicted', []))}" ) mode = "dry-run " if args.dry_run else "" print( f"{mode}scan {snapshot['source']['repo_slug']} " f"({snapshot['source']['commit']}): " f"{len(candidates['nodes'])} node(s), " f"{len(candidates['edges'])} edge(s), " f"{len(candidates['attributes'])} attribute(s), " f"{len(snapshot['replacement_scopes'])} replacement scope(s)" f"{review_summary}" f"{connector_summary}" f"{diff_summary}" ) if args.output: print(f"wrote {args.output}") return 0 class RegistryRequestError(Exception): def __init__(self, message: str, status_code: int | None = None) -> None: super().__init__(message) self.status_code = status_code def _registry_post(registry_url: str, path: str, payload: dict[str, object]) -> dict[str, object]: try: return _registry_post_checked(registry_url, path, payload) except RegistryRequestError as exc: print(f"ERROR {exc}", file=sys.stderr) raise SystemExit(1) from exc def _registry_post_checked(registry_url: str, path: str, payload: dict[str, object]) -> dict[str, object]: data = json.dumps({key: value for key, value in payload.items() if value is not None}).encode("utf-8") request = urllib.request.Request( registry_url.rstrip("/") + path, data=data, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(request, timeout=15) as response: body = json.loads(response.read()) except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace") raise RegistryRequestError(f"registry request failed ({exc.code}): {detail}", status_code=exc.code) from exc except urllib.error.URLError as exc: raise RegistryRequestError(f"cannot reach registry at {registry_url}: {exc}") from exc if not isinstance(body, dict): raise RegistryRequestError("registry returned a non-object response") return body def _registry_get_checked(registry_url: str, path: str) -> dict[str, object]: request = urllib.request.Request(registry_url.rstrip("/") + path, method="GET") try: with urllib.request.urlopen(request, timeout=15) as response: body = json.loads(response.read()) except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace") raise RegistryRequestError(f"registry request failed ({exc.code}): {detail}", status_code=exc.code) from exc except urllib.error.URLError as exc: raise RegistryRequestError(f"cannot reach registry at {registry_url}: {exc}") from exc if not isinstance(body, dict): raise RegistryRequestError("registry returned a non-object response") return body def _manifest_optional_path(value: object, manifest_dir: Path) -> Path | None: if not isinstance(value, str) or not value.strip(): return None path = Path(value).expanduser() return path if path.is_absolute() else (manifest_dir / path).resolve() def _manifest_paths(value: object, manifest_dir: Path) -> list[Path]: if value is None: return [] if isinstance(value, str): values = [value] elif isinstance(value, list): values = value else: return [] paths: list[Path] = [] for item in values: path = _manifest_optional_path(item, manifest_dir) if path is not None: paths.append(path) return paths def _primary_repo_path(paths: list[Path]) -> Path: if not paths: return Path(".").resolve() path = paths[0].resolve() return path.parent if path.is_file() else path def _slugify(value: str) -> str: return re.sub(r"-+", "-", re.sub(r"[^a-z0-9]", "-", value.lower())).strip("-") or "repo" def _git_value(repo_path: Path | None, *args: str) -> str | None: if repo_path is None: return None try: result = subprocess.run( ["git", *args], cwd=repo_path, check=False, capture_output=True, text=True, timeout=5, ) except (OSError, subprocess.TimeoutExpired): return None if result.returncode != 0: return None value = result.stdout.strip() return value or None def _utc_now() -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") def _age_hours(value: object, now: datetime) -> float | None: if not isinstance(value, str) or not value: return None try: parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) except ValueError: return None if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=timezone.utc) return max(0.0, (now - parsed).total_seconds() / 3600) def _load_graph_or_exit(paths: list[Path]) -> FabricGraph: graph = build_graph(paths) if graph.load_errors: for path, message in graph.load_errors: print(f"ERROR {path}: {message}", file=sys.stderr) raise SystemExit(1) return graph def _print_providers(graph: FabricGraph, capability: str) -> None: providers = graph.providers(capability) if not providers: print(f"no providers found for {capability}") return print("provider_id\tservice_id\tlifecycle\tenvironments\tinterfaces") for provider in providers: spec = provider.spec print( "\t".join( [ provider.id, str(spec.get("service_id", "")), str(spec.get("lifecycle", "")), ",".join(spec.get("environments", [])), ",".join(spec.get("interface_ids", [])), ] ) ) def _print_consumers( graph: FabricGraph, target: str, matches: object | None = None, ) -> None: consumer_matches = graph.consumers(target) if matches is None else list(matches) if not consumer_matches: print(f"no consumers found for {target}") return print("consumer_service_id\tdependency_id\trequires\tprovider_capability_id\tprovider_interface_id\tstatus") for match in consumer_matches: print( "\t".join( [ match.consumer_service_id, match.dependency_id, match.required_capability_type, match.provider_capability_id, match.provider_interface_id, match.status, ] ) ) def _print_unresolved(graph: FabricGraph) -> None: unresolved = graph.unresolved_dependencies() if not unresolved: print("no unresolved dependencies") return print("dependency_id\tconsumer_service_id\trequires") for dependency in unresolved: spec = dependency.spec requires = spec.get("requires", {}) print( "\t".join( [ dependency.id, str(spec.get("consumer_service_id", "")), str(requires.get("capability_id") or requires.get("capability_type", "")), ] ) ) if __name__ == "__main__": sys.exit(main())