generated from coulomb/repo-seed
1748 lines
72 KiB
Python
1748 lines
72 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import quote
|
|
|
|
from .accountability_roots import (
|
|
DEFAULT_ROOT_MANIFEST_PATH,
|
|
AccountabilityEvidenceStore,
|
|
build_identity_projection,
|
|
build_ownership_review,
|
|
build_update_delta,
|
|
collect_accountability_root_evidence,
|
|
load_accountability_root_manifest,
|
|
)
|
|
from .connectors import ConnectorConfig
|
|
from .financial_baseline import financial_export_from_legacy
|
|
from .loader import declaration_files, load_yaml
|
|
from .graph import FabricGraph, build_graph
|
|
from .graph_explorer import fabric_graph_explorer_payload
|
|
from .llm_extraction import LLMExtractionConfig
|
|
from .reconciliation import reconcile_discovery_snapshots
|
|
from .registry import RESET_CONFIRMATION_TOKEN
|
|
from .scanner import EXTRACTOR_VERSION, ScanOptions, scan_repo
|
|
from .validation import validate_roots
|
|
|
|
|
|
# Directory name of the operational discovery cache; the scan-manifest
# --cache-dir help text describes the default as <manifest-dir>/.fabric-discovery.
DEFAULT_DISCOVERY_DIR = ".fabric-discovery"
|
|
# NOTE(review): presumably the sub-directory for cached discovery snapshots —
# confirm against the cache helpers that read this constant.
DEFAULT_SNAPSHOT_DIR = "snapshots"
|
|
# NOTE(review): presumably the sub-directory for rescan report files —
# confirm against the report-path helper that reads this constant.
DEFAULT_REPORT_DIR = "reports"
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the ``railiance-fabric`` command-line parser.

    The parser has one required sub-command (stored in ``args.command``).
    The ``registry`` sub-command carries a second required level of
    sub-commands (stored in ``args.registry_command``).

    Returns:
        The fully configured top-level ``argparse.ArgumentParser``.
    """
    # Default address used by every registry sub-command that talks to a
    # single registry directly (manifest-driven commands default to None).
    local_registry_url = "http://127.0.0.1:8765"

    def with_default_paths(command: argparse.ArgumentParser) -> None:
        # Optional positional roots; a fresh default list per parser.
        command.add_argument("paths", nargs="*", type=Path, default=[Path(".")])

    def with_llm_tuning(command: argparse.ArgumentParser) -> None:
        # LLM knobs shared verbatim by "scan" and "registry scan-manifest".
        command.add_argument("--llm-provider", default="mock", help="llm-connect provider name.")
        command.add_argument("--llm-model", default="mock", help="Model name passed to llm-connect.")
        command.add_argument("--llm-temperature", type=float, default=0.0)
        command.add_argument("--llm-max-tokens", type=int, default=1500)
        command.add_argument("--llm-min-confidence", type=float, default=0.6)

    root = argparse.ArgumentParser(
        prog="railiance-fabric",
        description="Load and validate Railiance Fabric declarations.",
    )
    commands = root.add_subparsers(dest="command", required=True)

    # --- validate -----------------------------------------------------
    validate_cmd = commands.add_parser(
        "validate",
        help="Validate one or more repo roots or declaration files.",
    )
    validate_cmd.add_argument(
        "paths",
        nargs="+",
        type=Path,
        help="Repo root, fabric directory, or declaration YAML file.",
    )
    validate_cmd.add_argument(
        "--warnings-as-errors",
        action="store_true",
        help="Exit non-zero when warnings are present.",
    )

    # --- graph queries --------------------------------------------------
    providers_cmd = commands.add_parser("providers", help="List providers for a capability type or id.")
    providers_cmd.add_argument("capability", help="Capability type or capability id.")
    with_default_paths(providers_cmd)

    consumers_cmd = commands.add_parser("consumers", help="List consumers of a capability or interface.")
    consumers_cmd.add_argument("target", help="Capability/interface type or declaration id.")
    with_default_paths(consumers_cmd)

    dependency_cmd = commands.add_parser("dependency-path", help="Show dependency path for a service.")
    dependency_cmd.add_argument("service_id", help="Service declaration id.")
    with_default_paths(dependency_cmd)

    unresolved_cmd = commands.add_parser("unresolved", help="Show missing or unresolved dependencies.")
    with_default_paths(unresolved_cmd)

    blast_cmd = commands.add_parser("blast-radius", help="Show consumers affected by an interface change.")
    blast_cmd.add_argument("interface", help="Interface type or interface declaration id.")
    with_default_paths(blast_cmd)

    export_cmd = commands.add_parser(
        "export",
        help="Export graph as JSON, Mermaid, graph-explorer, or financial payload.",
    )
    with_default_paths(export_cmd)
    export_cmd.add_argument(
        "--format",
        choices=["json", "mermaid", "graph-explorer", "financial"],
        default="json",
    )

    # --- scan -------------------------------------------------------------
    scan_cmd = commands.add_parser("scan", help="Scan a repo for deterministic discovery candidates.")
    add = scan_cmd.add_argument
    add("path", nargs="?", type=Path, default=Path("."))
    add("--repo-slug", default=None)
    add("--repo-name", default=None)
    add("--domain", default=None)
    add("--commit", default=None)
    add("--profile", default="deterministic")
    add("--dry-run", action="store_true", help="Do not write anywhere except an explicit --output file.")
    add("--output", type=Path, default=None, help="Write the discovery snapshot JSON to a file.")
    add(
        "--previous-snapshot",
        type=Path,
        default=None,
        help="Reconcile against a previous discovery snapshot JSON.",
    )
    add("--json", action="store_true", help="Print the discovery snapshot JSON to stdout.")
    add("--llm", action="store_true", help="Enable llm-connect assisted extraction.")
    with_llm_tuning(scan_cmd)
    add(
        "--connector",
        action="append",
        choices=["local-fabric-registry"],
        default=[],
        help="Enable a discovery connector. May be passed more than once.",
    )
    add(
        "--connector-manifest",
        type=Path,
        default=Path("registry/local-repos.yaml"),
        help="Manifest path for the local-fabric-registry connector.",
    )

    # --- discover-roots -----------------------------------------------------
    roots_cmd = commands.add_parser(
        "discover-roots",
        help="Collect raw evidence from accountability root manifest entries.",
    )
    add = roots_cmd.add_argument
    add("--manifest", type=Path, default=DEFAULT_ROOT_MANIFEST_PATH)
    add("--include-remote", action="store_true", help="Allow HTTP reads from configured remote roots.")
    add("--max-items-per-root", type=int, default=200)
    add(
        "--identity-projection",
        action="store_true",
        help="Print normalized identity candidates instead of raw evidence.",
    )
    add("--ownership-review", action="store_true", help="Print ownership resolution and review blockers.")
    add(
        "--delta",
        action="store_true",
        help="Print a delta against previous identity/ownership review files.",
    )
    add("--previous-identity-projection", type=Path, default=None)
    add("--previous-ownership-review", type=Path, default=None)
    add(
        "--store-db",
        type=Path,
        default=None,
        help="Persist evidence and identity candidates in a SQLite store.",
    )

    # --- review-identity ------------------------------------------------------
    review_cmd = commands.add_parser(
        "review-identity",
        help="Persist a review decision for a stable accountability identity candidate.",
    )
    add = review_cmd.add_argument
    add("stable_key")
    add("--store-db", type=Path, required=True)
    add("--decision", choices=["accept", "needs_review", "reject"], required=True)
    add("--owner-actor-id", default="")
    add("--fabric-id", default="")
    add("--subfabric-id", default="")
    add("--reviewer", default="operator")
    add("--note", default="")

    # --- registry (second level of sub-commands) --------------------------------
    registry_cmd = commands.add_parser("registry", help="Feed a running Railiance Fabric registry service.")
    registry_commands = registry_cmd.add_subparsers(dest="registry_command", required=True)

    sync_cmd = registry_commands.add_parser("sync", help="Register a repo and ingest its current graph snapshot.")
    with_default_paths(sync_cmd)
    add = sync_cmd.add_argument
    add("--registry-url", default=local_registry_url)
    add("--repo-slug", default=None)
    add("--name", default=None)
    add("--remote-url", default=None)
    add("--default-branch", default="main")
    add("--state-hub-repo-id", default=None)
    add("--commit", default=None)
    add("--json", action="store_true", help="Print the raw snapshot response.")

    sync_manifest_cmd = registry_commands.add_parser(
        "sync-manifest",
        help="Register and sync repos from an onboarding manifest.",
    )
    add = sync_manifest_cmd.add_argument
    add("manifest", type=Path)
    add("--registry-url", default=None, help="Override the manifest registry_url.")
    add("--strict", action="store_true", help="Exit non-zero when any repo cannot be synced.")
    add("--json", action="store_true", help="Print the raw manifest sync summary.")

    scan_manifest_cmd = registry_commands.add_parser(
        "scan-manifest",
        help="Run discovery scans from an onboarding manifest.",
    )
    add = scan_manifest_cmd.add_argument
    add("manifest", type=Path)
    add("--registry-url", default=None, help="Override the manifest registry_url.")
    add("--repo-slug", action="append", default=[], help="Only scan this repo slug. May be repeated.")
    add("--profile", default="deterministic", help="Discovery scan profile name.")
    add(
        "--output-dir",
        type=Path,
        default=None,
        help="Write one discovery snapshot JSON per scanned repo.",
    )
    add(
        "--previous-dir",
        type=Path,
        default=None,
        help="Read previous per-repo snapshots for reconciliation.",
    )
    add(
        "--cache-dir",
        type=Path,
        default=None,
        help="Operational cache directory. Defaults to <manifest-dir>/.fabric-discovery.",
    )
    add(
        "--no-cache",
        action="store_true",
        help="Disable default cache snapshots and report output unless explicit paths are provided.",
    )
    add(
        "--previous-source",
        choices=["dir", "registry", "none"],
        default="dir",
        help="Where to read previous discovery snapshots from before reconciling.",
    )
    add(
        "--previous-from-registry",
        action="store_true",
        help="Shortcut for --previous-source registry.",
    )
    add("--report-output", type=Path, default=None, help="Write an operational rescan report JSON.")
    add("--dry-run", action="store_true", help="Do not write to the registry.")
    add("--ingest", action="store_true", help="Store each discovery snapshot in the registry.")
    add(
        "--ingest-unchanged",
        action="store_true",
        help="Also ingest unchanged snapshots when --ingest is enabled.",
    )
    add("--accept", action="store_true", help="Accept ingested snapshots after scanning.")
    add(
        "--accepted-key",
        action="append",
        default=[],
        help="Candidate stable key to include during acceptance. May be repeated.",
    )
    add(
        "--accept-review-state",
        action="append",
        default=None,
        help="Review state accepted during projection. Defaults to accepted.",
    )
    add(
        "--accept-policy",
        choices=["safe", "explicit"],
        default="safe",
        help="Safe blocks conflicts/tombstones and accepts only accepted review state; explicit allows provided overrides.",
    )
    add("--strict", action="store_true", help="Exit non-zero when any repo cannot be scanned or stored.")
    add(
        "--exit-code-mode",
        choices=["default", "operational"],
        default="default",
        help="Use operational exit codes: 2 changes/baseline, 3 review required, 4 partial failure.",
    )
    add("--lock-file", type=Path, default=None, help="Lock file path for automation-safe runs.")
    add("--no-lock", action="store_true", help="Disable the default cache-directory rescan lock.")
    add("--json", action="store_true", help="Print the raw manifest scan summary.")
    add("--llm", action="store_true", help="Enable llm-connect assisted extraction.")
    add(
        "--deterministic-only",
        action="store_true",
        help="Force deterministic-only scans even when --llm is set.",
    )
    with_llm_tuning(scan_manifest_cmd)
    add("--llm-max-runs", type=int, default=None, help="Maximum repos allowed to run LLM extraction.")
    add(
        "--connector",
        action="append",
        choices=["local-fabric-registry"],
        default=[],
        help="Enable a discovery connector for each repo. May be passed more than once.",
    )
    add(
        "--connector-manifest",
        type=Path,
        default=None,
        help="Manifest path for the local-fabric-registry connector. Defaults to the scanned manifest.",
    )

    rescan_status_cmd = registry_commands.add_parser(
        "rescan-status",
        help="Show discovery rescan freshness and review status.",
    )
    add = rescan_status_cmd.add_argument
    add("--registry-url", default=local_registry_url)
    add("--review-only", action="store_true", help="Only show repos whose latest discovery needs review.")
    add(
        "--stale-after-hours",
        type=float,
        default=None,
        help="Only show discovery snapshots older than this age.",
    )
    add("--json", action="store_true", help="Print the raw rescan status summary.")

    cyclonedx_cmd = registry_commands.add_parser(
        "ingest-cyclonedx",
        help="Ingest a CycloneDX SBOM as library inventory.",
    )
    add = cyclonedx_cmd.add_argument
    add("sbom", type=Path)
    add("--registry-url", default=local_registry_url)
    add("--repo-slug", required=True)
    add("--json", action="store_true", help="Print the raw ingest response.")

    ingest_discovery_cmd = registry_commands.add_parser(
        "ingest-discovery",
        help="Store a discovery snapshot for review.",
    )
    add = ingest_discovery_cmd.add_argument
    add("snapshot", type=Path)
    add("--registry-url", default=local_registry_url)
    add("--repo-slug", default=None)
    add("--json", action="store_true", help="Print the raw ingest response.")

    accept_discovery_cmd = registry_commands.add_parser(
        "accept-discovery",
        help="Project accepted discovery candidates into a graph snapshot.",
    )
    add = accept_discovery_cmd.add_argument
    add("repo_slug")
    add("discovery_snapshot_id", type=int)
    add("--registry-url", default=local_registry_url)
    add("--accepted-key", action="append", default=[])
    add("--accept-review-state", action="append", default=None)
    add("--commit", default=None)
    add("--json", action="store_true", help="Print the raw accept response.")

    export_archive_cmd = registry_commands.add_parser(
        "export-reset-archive",
        help="Export registry graph/discovery data before a guarded reset.",
    )
    add = export_archive_cmd.add_argument
    add("output", type=Path)
    add("--registry-url", default=local_registry_url)
    add("--overwrite", action="store_true", help="Overwrite an existing archive file.")
    add("--json", action="store_true", help="Print archive metadata.")

    reset_graph_cmd = registry_commands.add_parser(
        "reset-graph-data",
        help="Export an archive, then reset registry graph/discovery data with an explicit confirmation token.",
    )
    add = reset_graph_cmd.add_argument
    add("--registry-url", default=local_registry_url)
    add("--archive", type=Path, required=True, help="Archive JSON file to write before reset.")
    add("--overwrite-archive", action="store_true", help="Overwrite an existing archive file.")
    add("--confirm", required=True, help=f"Must equal {RESET_CONFIRMATION_TOKEN}.")
    add("--reason", required=True)
    add("--json", action="store_true", help="Print the raw reset response.")

    return root
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Parse the command line and run the selected sub-command.

    Args:
        argv: Argument vector to parse; ``None`` is passed straight to
            ``ArgumentParser.parse_args``.

    Returns:
        The process exit code produced by the selected sub-command.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    command = args.command

    if command == "validate":
        report = validate_roots(args.paths)
        for diagnostic in report.diagnostics:
            print(diagnostic.format())
        print(report.summary())
        failed = bool(report.errors) or (args.warnings_as_errors and bool(report.warnings))
        return 1 if failed else 0

    # Every graph query loads the graph first, then renders one view of it.
    graph_commands = ("providers", "consumers", "dependency-path", "unresolved", "blast-radius", "export")
    if command in graph_commands:
        graph = _load_graph_or_exit(args.paths)
        if command == "providers":
            _print_providers(graph, args.capability)
        elif command == "consumers":
            _print_consumers(graph, args.target)
        elif command == "dependency-path":
            print("\n".join(graph.dependency_path_lines(args.service_id)))
        elif command == "unresolved":
            _print_unresolved(graph)
        elif command == "blast-radius":
            _print_consumers(graph, args.interface, matches=graph.blast_radius(args.interface))
        else:
            export_format = args.format
            if export_format == "mermaid":
                rendered = graph.to_mermaid()
            elif export_format == "graph-explorer":
                rendered = json.dumps(fabric_graph_explorer_payload(graph.to_export()), indent=2, sort_keys=True)
            elif export_format == "financial":
                rendered = json.dumps(financial_export_from_legacy(graph.to_export()), indent=2, sort_keys=True)
            else:
                rendered = graph.to_json()
            print(rendered)
        return 0

    if command == "scan":
        return _scan_repo(args)

    if command == "discover-roots":
        manifest = load_accountability_root_manifest(args.manifest)
        payload = collect_accountability_root_evidence(
            args.manifest,
            include_remote=args.include_remote,
            max_items_per_root=args.max_items_per_root,
        )
        projection = build_identity_projection(payload, manifest)

        store = AccountabilityEvidenceStore(args.store_db) if args.store_db else None
        if store:
            decisions = store.latest_review_decisions()
        else:
            decisions = {}
        ownership_review = build_ownership_review(projection, manifest, review_decisions=decisions)

        previous_projection = None
        if args.previous_identity_projection:
            previous_projection = _load_json_file(args.previous_identity_projection)
        previous_review = None
        if args.previous_ownership_review:
            previous_review = _load_json_file(args.previous_ownership_review)
        update_delta = build_update_delta(
            projection,
            ownership_review,
            previous_identity_projection=previous_projection,
            previous_ownership_review=previous_review,
        )

        # Review decisions are read before this run's evidence is persisted.
        if args.store_db:
            store.add_evidence_run(payload, projection)

        # Most specific view wins: delta > ownership review > projection > raw.
        if args.delta:
            output = update_delta
        elif args.ownership_review:
            output = ownership_review
        elif args.identity_projection:
            output = projection
        else:
            output = payload
        print(json.dumps(output, indent=2, sort_keys=True))
        return 0

    if command == "review-identity":
        evidence_store = AccountabilityEvidenceStore(args.store_db)
        decision = evidence_store.add_review_decision(
            stable_key=args.stable_key,
            decision=args.decision,
            reviewer=args.reviewer,
            owner_actor_id=args.owner_actor_id,
            fabric_id=args.fabric_id,
            subfabric_id=args.subfabric_id,
            note=args.note,
        )
        print(json.dumps(decision, indent=2, sort_keys=True))
        return 0

    if command == "registry":
        registry_handlers = {
            "sync": _registry_sync,
            "sync-manifest": _registry_sync_manifest,
            "scan-manifest": _registry_scan_manifest,
            "rescan-status": _registry_rescan_status,
            "ingest-cyclonedx": _registry_ingest_cyclonedx,
            "ingest-discovery": _registry_ingest_discovery,
            "accept-discovery": _registry_accept_discovery,
            "export-reset-archive": _registry_export_reset_archive,
            "reset-graph-data": _registry_reset_graph_data,
        }
        handler = registry_handlers.get(args.registry_command)
        if handler is not None:
            return handler(args)

    # parser.error() prints usage and exits; the return is a defensive fallback.
    parser.error(f"unknown command {args.command!r}")
    return 2
|
|
|
|
|
|
def _registry_sync(args: argparse.Namespace) -> int:
    """Validate a repo, register it, and post its current graph snapshot.

    Diagnostics are written to stderr. If validation reports errors the
    summary is printed and nothing is sent to the registry.

    Returns:
        ``1`` when validation reports errors, otherwise ``0``.
    """
    validation = validate_roots(args.paths)
    for diagnostic in validation.diagnostics:
        print(diagnostic.format(), file=sys.stderr)
    if validation.errors:
        print(validation.summary(), file=sys.stderr)
        return 1

    graph = _load_graph_or_exit(args.paths)
    repo_path = _primary_repo_path(args.paths)
    repo_slug = args.repo_slug or _slugify(repo_path.name)

    registration = {
        "slug": repo_slug,
        "name": args.name or repo_path.name,
        "remote_url": args.remote_url or _git_value(repo_path, "config", "--get", "remote.origin.url"),
        "default_branch": args.default_branch,
        "state_hub_repo_id": args.state_hub_repo_id,
    }
    repository = _registry_post(args.registry_url, "/repositories", registration)

    # Built only after registration so the commit and timestamp are read
    # at the same point in the sequence as before.
    snapshot_payload = {
        "commit": args.commit or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree",
        "generated_at": _utc_now(),
        "graph": graph.to_export(),
    }
    snapshot = _registry_post(
        args.registry_url,
        f"/repositories/{repo_slug}/snapshots",
        snapshot_payload,
    )

    if not args.json:
        print(f"registered {repository['slug']}")
        print(f"snapshot {snapshot['id']} accepted for {snapshot['commit']}")
        return 0

    print(json.dumps({"repository": repository, "snapshot": snapshot}, indent=2, sort_keys=True))
    return 0
|
|
|
|
|
|
def _registry_sync_manifest(args: argparse.Namespace) -> int:
    """Register and sync every repository listed in an onboarding manifest.

    The manifest must be a YAML mapping with a ``repositories`` list. The
    registry URL comes from ``--registry-url``, then the manifest's
    ``registry_url``, then the local default.

    Returns:
        ``1`` for an invalid manifest, or when ``--strict`` is set and any
        repo ended as ``error`` or ``registered``; otherwise ``0``.
    """
    manifest_path = args.manifest.resolve()
    manifest = load_yaml(manifest_path)
    if not isinstance(manifest, dict):
        print(f"ERROR {manifest_path}: manifest must be a YAML mapping", file=sys.stderr)
        return 1
    repositories = manifest.get("repositories")
    if not isinstance(repositories, list):
        print(f"ERROR {manifest_path}: manifest requires a repositories list", file=sys.stderr)
        return 1

    registry_url = args.registry_url or str(manifest.get("registry_url") or "http://127.0.0.1:8765")

    results = []
    for entry in repositories:
        results.append(_sync_manifest_repo(registry_url, manifest_path.parent, entry))

    def count_status(status: str) -> int:
        # Number of per-repo results that ended in the given status.
        return len([outcome for outcome in results if outcome["status"] == status])

    counts = {
        "total": len(results),
        "synced": count_status("synced"),
        "registered": count_status("registered"),
        "errors": count_status("error"),
    }
    summary = {
        "manifest": str(manifest_path),
        "registry_url": registry_url,
        "repositories": results,
        "counts": counts,
    }

    if args.json:
        print(json.dumps(summary, indent=2, sort_keys=True))
    else:
        for outcome in results:
            status = outcome["status"]
            if status == "synced":
                print(f"synced {outcome['slug']} snapshot {outcome['snapshot_id']} ({outcome['commit']})")
            elif status == "registered":
                print(f"registered {outcome['slug']} ({'; '.join(outcome['warnings'])})")
            else:
                print(f"error {outcome['slug']}: {outcome['error']}")
        print(
            f"summary: {counts['total']} repo(s), {counts['synced']} synced, "
            f"{counts['registered']} registered only, {counts['errors']} error(s)"
        )

    # In strict mode a repo that was only registered counts as a failure too.
    incomplete = counts["errors"] or counts["registered"]
    if args.strict and incomplete:
        return 1
    return 0
|
|
|
|
|
|
def _registry_scan_manifest(args: argparse.Namespace) -> int:
    """Run discovery scans for the repositories of an onboarding manifest.

    Steps, in order: validate flag combinations, load the manifest, take
    the rescan lock (when one is configured), scan each selected repo
    while tracking how many LLM runs were attempted, build the rescan
    report, optionally write it to disk, print it, and compute the exit
    code.

    The lock is released in a ``finally`` block, so an exception raised
    while scanning, reporting or printing cannot leave a stale lock
    behind that would block later runs.

    Side effects:
        Sets ``args.previous_source`` to ``"registry"`` when
        ``--previous-from-registry`` was given.

    Returns:
        ``1`` for invalid flags, an invalid manifest, or a report that
        cannot be written; ``5`` (operational exit-code mode) or ``1``
        when the lock already exists; otherwise the value computed by
        ``_scan_manifest_exit_code``.
    """
    if args.llm_max_runs is not None and args.llm_max_runs < 0:
        print("ERROR --llm-max-runs must be zero or greater", file=sys.stderr)
        return 1
    if args.accept and not args.ingest and not args.dry_run:
        print("ERROR --accept requires --ingest unless --dry-run is set", file=sys.stderr)
        return 1
    if args.accept_policy == "safe":
        if args.accept_review_state and set(args.accept_review_state) != {"accepted"}:
            print("ERROR --accept-review-state requires --accept-policy explicit unless it is only 'accepted'", file=sys.stderr)
            return 1
        if args.accepted_key:
            print("ERROR --accepted-key requires --accept-policy explicit", file=sys.stderr)
            return 1
    if args.previous_from_registry:
        args.previous_source = "registry"

    manifest_path = args.manifest.resolve()
    manifest = load_yaml(manifest_path)
    if not isinstance(manifest, dict):
        print(f"ERROR {manifest_path}: manifest must be a YAML mapping", file=sys.stderr)
        return 1
    repositories = manifest.get("repositories")
    if not isinstance(repositories, list):
        print(f"ERROR {manifest_path}: manifest requires a repositories list", file=sys.stderr)
        return 1

    lock_path = _scan_manifest_lock_path(args, manifest_path)
    lock_fd: int | None = None
    if lock_path is not None:
        try:
            lock_fd = _scan_manifest_acquire_lock(lock_path)
        except FileExistsError:
            # The lock is not ours, so it must not be released here.
            print(f"ERROR rescan lock already exists: {lock_path}", file=sys.stderr)
            return 5 if args.exit_code_mode == "operational" else 1

    # Everything after acquisition runs under try/finally so the lock is
    # released on every exit path, including unexpected exceptions.
    try:
        registry_url = args.registry_url or str(manifest.get("registry_url") or "http://127.0.0.1:8765")
        allowlist = {slug for slug in args.repo_slug if slug}
        selected = [
            item for item in repositories
            if not allowlist or _manifest_repo_slug(item) in allowlist
        ]
        llm_runs_used = 0
        results: list[dict[str, Any]] = []
        for item in selected:
            llm_enabled = False
            llm_skip_reason = "deterministic_only"
            if args.llm and not args.deterministic_only:
                # Only attempted runs consume the --llm-max-runs budget.
                if args.llm_max_runs is None or llm_runs_used < args.llm_max_runs:
                    llm_enabled = True
                    llm_skip_reason = ""
                else:
                    llm_skip_reason = "budget_exhausted"
            result = _scan_manifest_repo(
                args,
                registry_url,
                manifest_path.parent,
                manifest_path,
                item,
                llm_enabled=llm_enabled,
                llm_skip_reason=llm_skip_reason,
            )
            if result.get("llm", {}).get("attempted"):
                llm_runs_used += 1
            results.append(result)

        summary = {
            "apiVersion": "railiance.fabric/v1alpha1",
            "kind": "FabricDiscoveryRescanReport",
            "manifest": str(manifest_path),
            "registry_url": registry_url,
            "generated_at": _utc_now(),
            "profile": args.profile,
            "dry_run": args.dry_run,
            "ingest": args.ingest and not args.dry_run,
            "accept": args.accept and args.ingest and not args.dry_run,
            "allowlist": sorted(allowlist),
            "cache": _scan_manifest_cache_config(args, manifest_path),
            "scanner": {"extractor_version": EXTRACTOR_VERSION},
            "previous_source": args.previous_source,
            "accept_policy": args.accept_policy,
            "llm": {
                "requested": bool(args.llm),
                "deterministic_only": bool(args.deterministic_only or not args.llm),
                "max_runs": args.llm_max_runs,
                "used_runs": llm_runs_used,
            },
            "repositories": results,
            "counts": _scan_manifest_counts(results),
        }
        report_path = _scan_manifest_report_path(args, manifest_path, summary["generated_at"])
        if report_path is not None:
            summary["report_path"] = str(report_path)
            try:
                report_path.parent.mkdir(parents=True, exist_ok=True)
                report_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8")
            except Exception as exc:
                # Deliberately broad: any failure to persist the report is
                # reported as a plain error exit rather than a traceback.
                print(f"ERROR cannot write rescan report {report_path}: {exc}", file=sys.stderr)
                return 1

        if args.json:
            print(json.dumps(summary, indent=2, sort_keys=True))
        else:
            _print_scan_manifest_summary(summary)
            if summary.get("report_path"):
                print(f"report: {summary['report_path']}")
        return _scan_manifest_exit_code(summary, args)
    finally:
        _scan_manifest_release_lock(lock_fd, lock_path)
|
|
|
|
|
|
def _registry_rescan_status(args: argparse.Namespace) -> int:
    """Print the latest discovery snapshot status for each repo in a registry.

    Reads ``latest_discovery_snapshots`` from the registry ``/status``
    response, annotates each entry with ``age_hours``, and applies the
    ``--review-only`` and ``--stale-after-hours`` filters. Output is
    either the JSON summary or one text line per repo plus a totals line.

    Returns:
        Always ``0``.
    """
    status = _registry_get_checked(args.registry_url, "/status")
    snapshots = status.get("latest_discovery_snapshots", [])
    if not isinstance(snapshots, list):
        snapshots = []

    now = datetime.now(timezone.utc)
    stale_after = args.stale_after_hours
    shown: list[dict[str, Any]] = []
    for entry in snapshots:
        if not isinstance(entry, dict):
            continue
        if args.review_only and not entry.get("review_required"):
            continue
        age_hours = _age_hours(entry.get("generated_at"), now)
        if stale_after is not None:
            # Entries with an unknown age are dropped by the staleness filter.
            if age_hours is None or age_hours <= stale_after:
                continue
        shown.append({**entry, "age_hours": age_hours})

    needing_review = [
        entry for entry in snapshots
        if isinstance(entry, dict) and entry.get("review_required")
    ]
    counts = {
        "total": len(snapshots),
        "shown": len(shown),
        "review_required": len(needing_review),
    }
    summary = {
        "apiVersion": "railiance.fabric/v1alpha1",
        "kind": "FabricDiscoveryRescanStatus",
        "generated_at": _utc_now(),
        "registry_url": args.registry_url,
        "review_only": args.review_only,
        "stale_after_hours": stale_after,
        "counts": counts,
        "repositories": shown,
    }

    if args.json:
        print(json.dumps(summary, indent=2, sort_keys=True))
        return 0

    for entry in shown:
        age = entry.get("age_hours")
        if isinstance(age, float):
            age_text = f", age {age:.1f}h"
        else:
            age_text = ""
        raw_diff = entry.get("diff_counts")
        diff = raw_diff if isinstance(raw_diff, dict) else {}
        heading = (
            f"{entry.get('health', 'unknown')} {entry.get('repo_slug')} "
            f"({entry.get('profile')}, {entry.get('commit')}): "
        )
        diff_text = (
            f"diff +{diff.get('added', 0)}/~{diff.get('changed', 0)}"
            f"/-{diff.get('retired', 0)}/!{diff.get('conflicted', 0)}"
        )
        review_text = (
            f", review artifacts {entry.get('review_artifact_count', 0)}"
            f", tombstones {entry.get('tombstone_count', 0)}"
        )
        print(heading + diff_text + review_text + age_text)
    print(
        f"summary: {counts['shown']} shown, {counts['total']} latest discovery snapshot(s), "
        f"{counts['review_required']} review required"
    )
    return 0
|
|
|
|
|
|
def _scan_manifest_repo(
    args: argparse.Namespace,
    registry_url: str,
    manifest_dir: Path,
    manifest_path: Path,
    item: object,
    *,
    llm_enabled: bool,
    llm_skip_reason: str,
) -> dict[str, Any]:
    """Scan one manifest repository entry and optionally ingest/accept it.

    The entry is scanned with ``scan_repo``, reconciled against the previous
    snapshot when one is found, written to the snapshot output path (if one
    is configured) and then, depending on ``args.ingest`` / ``args.accept``,
    posted to the registry.  Problems are reported through the returned
    mapping (``status == "error"`` plus ``error``) instead of being raised.

    Args:
        args: Parsed CLI options.  Read here: ``connector_manifest``,
            ``connector``, ``profile``, the ``llm_*`` settings, ``ingest``,
            ``dry_run``, ``ingest_unchanged``, ``accept``, ``accepted_key``
            and ``accept_review_state``.
        registry_url: Base URL of the registry.
        manifest_dir: Directory used to resolve a relative repo ``path``.
        manifest_path: Path of the manifest file.
        item: Raw repository entry from the manifest; must be a mapping.
        llm_enabled: Whether LLM extraction is requested for this scan.
        llm_skip_reason: Reason recorded when LLM extraction is disabled.

    Returns:
        Result mapping that always carries ``slug``, ``status``, ``scanned``,
        ``ingested`` and ``accepted``.
    """
    if not isinstance(item, dict):
        return {
            "slug": "<invalid>",
            "status": "error",
            "scanned": False,
            "ingested": False,
            "accepted": False,
            "error": "repository entry must be a mapping",
        }
    slug = str(item.get("slug") or "").strip()
    if not slug:
        return {
            "slug": "<missing>",
            "status": "error",
            "scanned": False,
            "ingested": False,
            "accepted": False,
            "error": "repository entry requires slug",
        }

    result: dict[str, Any] = {
        "slug": slug,
        "status": "pending",
        "scanned": False,
        "ingested": False,
        "accepted": False,
        "change_state": "unknown",
        "llm": {
            "enabled": llm_enabled,
            "attempted": False,
            "status": "pending" if llm_enabled else "skipped",
            "reason": "" if llm_enabled else llm_skip_reason,
        },
    }
    repo_path = _manifest_optional_path(item.get("path"), manifest_dir)
    if repo_path is None:
        result.update({"status": "error", "error": "no repo path configured"})
        return result
    result["path"] = str(repo_path)
    if not repo_path.is_dir():
        result.update({"status": "error", "error": f"repo path not found: {repo_path}"})
        return result

    # Commit precedence: manifest value, then git HEAD, then a placeholder.
    commit = str(item.get("commit") or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree")
    connector_manifest = _scan_manifest_connector_manifest(args.connector_manifest, manifest_dir, manifest_path)
    connectors = [
        ConnectorConfig(
            connector_id=connector_id,
            connector_type="fabric_registry",
            source_path=str(connector_manifest),
        )
        for connector_id in args.connector
    ]

    try:
        result["llm"]["attempted"] = llm_enabled
        snapshot = scan_repo(
            ScanOptions(
                repo_path=repo_path,
                repo_slug=slug,
                repo_name=str(item.get("name") or repo_path.name),
                domain=str(item.get("domain") or "") or None,
                commit=commit,
                profile=args.profile,
                deterministic_only=not llm_enabled,
                llm_enabled=llm_enabled,
                llm_config=LLMExtractionConfig(
                    provider=args.llm_provider,
                    model=args.llm_model,
                    temperature=args.llm_temperature,
                    max_tokens=args.llm_max_tokens,
                    min_confidence=args.llm_min_confidence,
                ),
                connectors=connectors,
            )
        )
        previous, previous_metadata = _scan_manifest_previous_snapshot(
            args,
            registry_url,
            manifest_path,
            slug,
        )
        result["previous"] = previous_metadata
        if previous is not None:
            snapshot = reconcile_discovery_snapshots(previous, snapshot)
    except Exception as exc:
        # Deliberately broad: one failing repository must not abort the batch.
        result["status"] = "error"
        result["error"] = f"scan failed: {exc}"
        if llm_enabled:
            result["llm"]["status"] = "failed"
            result["llm"]["reason"] = "scan_failed"
        return result

    result.update(
        {
            "status": "scanned",
            "scanned": True,
            "commit": snapshot["source"]["commit"],
            "candidate_counts": _candidate_counts(snapshot),
            "diff_counts": _discovery_diff_counts(snapshot),
            "review_artifact_counts": _review_artifact_counts(snapshot),
            "connector_runs": _connector_run_summaries(snapshot),
            "tombstone_count": len(snapshot.get("tombstones", [])),
            "change_state": _scan_manifest_change_state(snapshot, result.get("previous")),
        }
    )
    _set_llm_result(result, snapshot, llm_enabled, llm_skip_reason)

    output_path = _scan_manifest_output_path(args, manifest_path, slug)
    if output_path is not None:
        try:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_text(json.dumps(snapshot, indent=2, sort_keys=True) + "\n", encoding="utf-8")
            result["output_path"] = str(output_path)
        except Exception as exc:
            result.update({"status": "error", "error": f"cannot write snapshot: {exc}"})
            return result

    if args.ingest and args.dry_run:
        result["registry_action"] = "skipped_dry_run"
        return result
    if args.ingest:
        if result["change_state"] == "unchanged" and not args.ingest_unchanged:
            result.update({"status": "skipped_unchanged", "registry_action": "skipped_unchanged"})
            return result
        # Percent-encode the slug for use as a URL path segment, matching the
        # lookup of the previous snapshot; plain slugs are left unchanged.
        slug_path = quote(slug)
        try:
            repository = _registry_post_checked(
                registry_url,
                "/repositories",
                {
                    "slug": slug,
                    "name": item.get("name") or repo_path.name,
                    "remote_url": item.get("remote_url") or _git_value(repo_path, "config", "--get", "remote.origin.url"),
                    "default_branch": item.get("default_branch") or "main",
                    "state_hub_repo_id": item.get("state_hub_repo_id"),
                },
            )
            stored = _registry_post_checked(
                registry_url,
                f"/repositories/{slug_path}/discovery-snapshots",
                snapshot,
            )
            result.update(
                {
                    "status": "ingested",
                    "repository": {"slug": repository.get("slug")},
                    "ingested": True,
                    "discovery_snapshot_id": stored["id"],
                }
            )
            if args.accept:
                blockers = _scan_manifest_acceptance_blockers(snapshot, args)
                if blockers:
                    result.update(
                        {
                            "status": "review_required",
                            "registry_action": "accept_blocked",
                            "acceptance_blockers": blockers,
                        }
                    )
                    return result
                accept_payload: dict[str, object] = {
                    "commit": f"discovery:{snapshot['source']['commit']}",
                }
                if args.accepted_key:
                    accept_payload["accepted_keys"] = args.accepted_key
                if args.accept_review_state is not None:
                    accept_payload["accept_review_states"] = args.accept_review_state
                accepted = _registry_post_checked(
                    registry_url,
                    f"/repositories/{slug_path}/discovery-snapshots/{quote(str(stored['id']))}/accept",
                    accept_payload,
                )
                graph_snapshot = accepted["graph_snapshot"]
                result.update(
                    {
                        "status": "accepted",
                        "accepted": True,
                        "accepted_graph_snapshot_id": graph_snapshot["id"],
                        "accepted_graph_commit": graph_snapshot["commit"],
                    }
                )
        except RegistryRequestError as exc:
            result.update({"status": "error", "error": str(exc)})
        except (KeyError, TypeError) as exc:
            # A reply lacking the expected fields is a per-repository error,
            # not a reason to crash the whole manifest run.
            result.update({"status": "error", "error": f"malformed registry payload: {exc!r}"})
    return result
|
|
|
|
|
|
def _scan_manifest_counts(results: list[dict[str, Any]]) -> dict[str, int]:
    """Aggregate per-repository scan results into the summary counters.

    Args:
        results: Result mappings, one per manifest repository entry.

    Returns:
        Mapping of counter name to value.  ``retired`` and ``conflicted`` add
        up the per-repository diff counts; every other counter is the number
        of results matching a condition.
    """

    def tally(predicate) -> int:
        # Number of results for which the predicate is truthy.
        return sum(1 for entry in results if predicate(entry))

    def diff_total(key: str) -> int:
        return sum(_int_from_nested(entry, "diff_counts", key) for entry in results)

    def llm_status(entry: dict[str, Any]) -> object:
        return entry.get("llm", {}).get("status")

    return {
        "total": len(results),
        "scanned": tally(lambda entry: entry.get("scanned")),
        "baseline": tally(lambda entry: entry.get("change_state") == "baseline"),
        "unchanged": tally(lambda entry: entry.get("change_state") == "unchanged"),
        "changed": tally(lambda entry: _scan_manifest_has_diff(entry)),
        "retired": diff_total("retired"),
        "conflicted": diff_total("conflicted"),
        "review_required": tally(lambda entry: _scan_manifest_needs_review(entry)),
        "llm_skipped": tally(lambda entry: llm_status(entry) == "skipped"),
        "llm_failed": tally(lambda entry: llm_status(entry) == "failed"),
        "ingested": tally(lambda entry: entry.get("ingested")),
        "accepted": tally(lambda entry: entry.get("accepted")),
        "errors": tally(lambda entry: entry.get("status") == "error"),
    }
|
|
|
|
|
|
def _print_scan_manifest_summary(summary: dict[str, Any]) -> None:
    """Print one line per repository result followed by a totals line.

    Args:
        summary: Mapping with a ``repositories`` list of result mappings and a
            ``counts`` mapping holding the aggregated counters.
    """
    # Registry actions that replace the default action label.
    action_overrides = (
        ("skipped_dry_run", "dry-run scanned"),
        ("skipped_unchanged", "unchanged"),
        ("accept_blocked", "review required"),
    )
    for entry in summary["repositories"]:
        slug = entry["slug"]
        if entry["status"] == "error":
            suffix = " after scan" if entry.get("scanned") else ""
            print(f"error{suffix} {slug}: {entry['error']}")
            continue

        candidates = entry.get("candidate_counts", {})
        diff = entry.get("diff_counts", {})

        if entry.get("accepted"):
            action = "accepted"
        elif entry.get("ingested"):
            action = "ingested"
        else:
            action = "scanned"
        registry_action = entry.get("registry_action")
        for marker, label in action_overrides:
            if registry_action == marker:
                action = label
                break

        raw_previous = entry.get("previous")
        previous = raw_previous if isinstance(raw_previous, dict) else {}
        previous_part = f", previous {previous.get('source')}" if previous.get("source") else ""

        commit = entry.get("commit", "working-tree")
        candidate_part = (
            f"{candidates.get('nodes', 0)} node(s), {candidates.get('edges', 0)} edge(s), "
            f"{candidates.get('attributes', 0)} attribute(s), "
        )
        diff_part = (
            f"diff +{diff.get('added', 0)}/~{diff.get('changed', 0)}"
            f"/-{diff.get('retired', 0)}/!{diff.get('conflicted', 0)}"
        )
        print(f"{action} {slug} ({commit}): " + candidate_part + diff_part + previous_part)

    totals = summary["counts"]
    segments = [
        f"{totals['total']} repo(s)",
        f"{totals['scanned']} scanned",
        f"{totals['baseline']} baseline",
        f"{totals['unchanged']} unchanged",
        f"{totals['changed']} changed",
        f"{totals['retired']} retired",
        f"{totals['conflicted']} conflicted",
        f"{totals['review_required']} review required",
        f"{totals['llm_skipped']} LLM skipped",
        f"{totals['llm_failed']} LLM failed",
        f"{totals['accepted']} accepted",
        f"{totals['errors']} error(s)",
    ]
    print("summary: " + ", ".join(segments))
|
|
|
|
|
|
def _manifest_repo_slug(item: object) -> str:
    """Return the trimmed slug of a manifest entry.

    Non-mapping entries yield the placeholder ``"<invalid>"``; a missing or
    falsy slug yields the empty string.
    """
    if isinstance(item, dict):
        raw_slug = item.get("slug")
        return str(raw_slug or "").strip()
    return "<invalid>"
|
|
|
|
|
|
def _scan_manifest_connector_manifest(
    connector_manifest: Path | None,
    manifest_dir: Path,
    manifest_path: Path,
) -> Path:
    """Pick the manifest file that connectors should read.

    Without an explicit ``connector_manifest`` the scan manifest itself is
    returned unchanged.  Otherwise ``~`` is expanded, a relative path is
    anchored at ``manifest_dir``, and the result is resolved.
    """
    if connector_manifest is None:
        return manifest_path
    expanded = connector_manifest.expanduser()
    anchored = expanded if expanded.is_absolute() else manifest_dir / expanded
    return anchored.resolve()
|
|
|
|
|
|
def _scan_manifest_cache_config(args: argparse.Namespace, manifest_path: Path) -> dict[str, object]:
    """Describe the discovery cache layout for this run.

    Returns ``{"enabled": False}`` when no cache directory applies, otherwise
    the cache directory together with its snapshot and report sub-directories
    (all as strings).
    """
    root = _scan_manifest_cache_dir(args, manifest_path)
    if root is not None:
        layout: dict[str, object] = {"enabled": True, "cache_dir": str(root)}
        layout["snapshot_dir"] = str(root / DEFAULT_SNAPSHOT_DIR)
        layout["report_dir"] = str(root / DEFAULT_REPORT_DIR)
        return layout
    return {"enabled": False}
|
|
|
|
|
|
def _scan_manifest_cache_dir(args: argparse.Namespace, manifest_path: Path) -> Path | None:
    """Return the discovery cache directory, or ``None`` when caching is off.

    ``args.cache_dir`` wins over the default directory name.  An absolute
    directory is returned as given; a relative one is anchored at the
    manifest's parent directory and resolved.
    """
    if args.no_cache:
        return None
    configured = args.cache_dir
    chosen = configured if configured else Path(DEFAULT_DISCOVERY_DIR)
    if not chosen.is_absolute():
        return (manifest_path.parent / chosen).resolve()
    return chosen
|
|
|
|
|
|
def _scan_manifest_lock_path(args: argparse.Namespace, manifest_path: Path) -> Path | None:
    """Return where the rescan lock file lives, or ``None`` when locking is off.

    An explicit ``args.lock_file`` is expanded, anchored at the manifest's
    parent directory when relative, and resolved.  Otherwise the lock is
    ``rescan.lock`` inside the cache directory, if there is one.
    """
    if args.no_lock:
        return None
    explicit = args.lock_file
    if explicit:
        expanded = explicit.expanduser()
        anchored = expanded if expanded.is_absolute() else manifest_path.parent / expanded
        return anchored.resolve()
    cache_root = _scan_manifest_cache_dir(args, manifest_path)
    return None if cache_root is None else cache_root / "rescan.lock"
|
|
|
|
|
|
def _scan_manifest_acquire_lock(lock_path: Path) -> int:
    """Create the rescan lock file exclusively and return its descriptor.

    The file is stamped with the current pid and creation time.  The caller
    owns the returned descriptor and must close it.

    Args:
        lock_path: Location of the lock file; parent directories are created.

    Returns:
        The open, write-only file descriptor of the new lock file.

    Raises:
        FileExistsError: If the lock file already exists (``O_EXCL``).
        OSError: If the directory or file cannot be created or written.
    """
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
    try:
        os.write(fd, f"pid={os.getpid()}\ncreated_at={_utc_now()}\n".encode("utf-8"))
    except BaseException:
        # The file was created by this call, so clean up on failure: do not
        # leak the descriptor or leave a stale lock that blocks later runs.
        os.close(fd)
        try:
            lock_path.unlink()
        except FileNotFoundError:
            pass
        raise
    return fd
|
|
|
|
|
|
def _scan_manifest_release_lock(lock_fd: int | None, lock_path: Path | None) -> None:
    """Close the lock descriptor and delete the lock file.

    Either argument may be ``None``, in which case that step is skipped.  A
    lock file that is already gone is not an error.
    """
    if lock_fd is not None:
        os.close(lock_fd)
    if lock_path is None:
        return
    lock_path.unlink(missing_ok=True)
|
|
|
|
|
|
def _scan_manifest_output_path(args: argparse.Namespace, manifest_path: Path, slug: str) -> Path | None:
    """Return the file a repository's discovery snapshot is written to.

    ``args.output_dir`` takes precedence; otherwise the snapshot directory of
    the cache is used.  ``None`` means the snapshot is not written anywhere.
    """
    target_dir = args.output_dir
    if not target_dir:
        cache_root = _scan_manifest_cache_dir(args, manifest_path)
        if cache_root is None:
            return None
        target_dir = cache_root / DEFAULT_SNAPSHOT_DIR
    return _manifest_discovery_snapshot_path(target_dir, slug, args.profile)
|
|
|
|
|
|
def _scan_manifest_report_path(args: argparse.Namespace, manifest_path: Path, generated_at: str) -> Path | None:
    """Return the path of the rescan report, or ``None`` when none is written.

    An explicit ``args.report_output`` is expanded, anchored at the manifest's
    parent directory when relative, and resolved.  Otherwise the report goes
    into the cache's report directory under a name built from the generation
    timestamp and the profile.
    """
    explicit = args.report_output
    if explicit:
        expanded = explicit.expanduser()
        anchored = expanded if expanded.is_absolute() else manifest_path.parent / expanded
        return anchored.resolve()

    cache_root = _scan_manifest_cache_dir(args, manifest_path)
    if cache_root is None:
        return None
    # Drop ':' and '+' from the timestamp before slugifying it.
    stamp = _slugify(generated_at.replace(":", "").replace("+", ""))
    file_name = f"{stamp}-{_slugify(args.profile)}.rescan-report.json"
    return cache_root / DEFAULT_REPORT_DIR / file_name
|
|
|
|
|
|
def _scan_manifest_previous_snapshot(
    args: argparse.Namespace,
    registry_url: str,
    manifest_path: Path,
    slug: str,
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """Load the previous discovery snapshot used for reconciliation.

    ``args.previous_source`` selects the origin: ``"none"`` disables the
    lookup, ``"registry"`` asks the registry for the latest snapshot of the
    profile, and any other value reads a snapshot file from
    ``args.previous_dir`` or, failing that, from the cache snapshot directory.

    Returns:
        ``(snapshot, metadata)``; ``snapshot`` is ``None`` when nothing was
        found, and ``metadata`` records the source and a ``found`` flag.

    Raises:
        RegistryRequestError: For registry failures other than a 404.
        ValueError: If the registry record has no snapshot object or the
            snapshot file does not contain a JSON object.
    """
    source = args.previous_source
    if source == "none":
        return None, {"source": "none", "found": False}

    if source == "registry":
        try:
            record = _registry_get_checked(
                registry_url,
                f"/repositories/{quote(slug)}/discovery-snapshots/latest?profile={quote(args.profile)}",
            )
        except RegistryRequestError as exc:
            # Only "no snapshot yet" is an expected outcome.
            if exc.status_code != 404:
                raise
            return None, {"source": "registry", "found": False}
        embedded = record.get("snapshot")
        if not isinstance(embedded, dict):
            raise ValueError(f"registry latest discovery snapshot for {slug} has no snapshot object")
        metadata = {
            "source": "registry",
            "found": True,
            "discovery_snapshot_id": record.get("id"),
            "commit": record.get("commit"),
            "profile": record.get("profile"),
        }
        return embedded, metadata

    snapshot_dir = args.previous_dir
    if snapshot_dir is None and not args.no_cache:
        cache_root = _scan_manifest_cache_dir(args, manifest_path)
        if cache_root is not None:
            snapshot_dir = cache_root / DEFAULT_SNAPSHOT_DIR
    if snapshot_dir is None:
        return None, {"source": "dir", "found": False}

    snapshot_file = _manifest_discovery_snapshot_path(snapshot_dir, slug, args.profile)
    if not snapshot_file.is_file():
        return None, {"source": "dir", "found": False, "path": str(snapshot_file)}
    loaded = json.loads(snapshot_file.read_text(encoding="utf-8"))
    if not isinstance(loaded, dict):
        raise ValueError(f"previous snapshot must be a JSON object: {snapshot_file}")
    return loaded, {"source": "dir", "found": True, "path": str(snapshot_file)}
|
|
|
|
|
|
def _scan_manifest_change_state(snapshot: dict[str, Any], previous_metadata: object) -> str:
    """Classify a scan as ``"baseline"``, ``"changed"`` or ``"unchanged"``.

    A scan is a baseline when the previous-snapshot metadata does not report
    ``found``; otherwise it is changed when any diff count is non-zero.
    """
    found = isinstance(previous_metadata, dict) and previous_metadata.get("found")
    if not found:
        return "baseline"
    if any(_discovery_diff_counts(snapshot).values()):
        return "changed"
    return "unchanged"
|
|
|
|
|
|
def _scan_manifest_acceptance_blockers(snapshot: dict[str, Any], args: argparse.Namespace) -> list[str]:
    """List the reasons a snapshot must not be accepted automatically.

    With ``args.accept_policy == "explicit"`` nothing blocks.  Otherwise
    conflicted candidates, tombstones, low-confidence LLM artifacts and a
    request for review states other than exactly ``accepted`` each add one
    message, in that order.
    """
    if args.accept_policy == "explicit":
        return []

    conflicted = _discovery_diff_counts(snapshot)["conflicted"]
    tombstones = len(snapshot.get("tombstones", []))
    low_confidence = _review_artifact_counts(snapshot).get("llm_low_confidence", 0)
    requested_states = args.accept_review_state

    reasons: list[str] = []
    if conflicted:
        reasons.append(f"{conflicted} conflicted candidate(s)")
    if tombstones:
        reasons.append(f"{tombstones} tombstone(s)")
    if low_confidence:
        reasons.append(f"{low_confidence} low-confidence LLM artifact(s)")
    if requested_states is not None and set(requested_states) != {"accepted"}:
        reasons.append("non-accepted review states requested")
    return reasons
|
|
|
|
|
|
def _scan_manifest_needs_review(item: dict[str, Any]) -> bool:
    """Tell whether a repository result needs human review.

    True when the status is ``review_required``, when there are conflicted
    diff entries or tombstones, or when any review-artifact count is non-zero.
    """
    if (
        item.get("status") == "review_required"
        or _int_from_nested(item, "diff_counts", "conflicted")
        or int(item.get("tombstone_count") or 0)
    ):
        return True
    artifact_counts = item.get("review_artifact_counts")
    if not isinstance(artifact_counts, dict):
        return False
    return any(int(count or 0) for count in artifact_counts.values())
|
|
|
|
|
|
def _scan_manifest_exit_code(summary: dict[str, Any], args: argparse.Namespace) -> int:
    """Translate the run summary into a process exit code.

    Returns:
        ``1`` for errors under ``args.strict``.  In any exit-code mode other
        than ``"operational"`` everything else is ``0``.  In operational mode:
        ``4`` for errors, ``3`` when review is required, ``2`` when something
        changed or a baseline was created, and ``0`` otherwise.
    """
    raw_counts = summary.get("counts")
    counts = raw_counts if isinstance(raw_counts, dict) else {}

    def count(key: str) -> int:
        # Missing or falsy counters are treated as zero.
        return int(counts.get(key, 0) or 0)

    errors = count("errors")
    if errors and args.strict:
        return 1
    if args.exit_code_mode != "operational":
        return 0
    if errors:
        return 4
    if count("review_required"):
        return 3
    if count("changed") or count("baseline"):
        return 2
    return 0
|
|
|
|
|
|
def _manifest_discovery_snapshot_path(base_dir: Path, slug: str, profile: str) -> Path:
    """Build the snapshot file path for a repository slug and profile.

    When slugifying alters the slug (compared with its stripped, lower-cased
    form), the first 8 hex digits of the slug's SHA-256 are appended so the
    file name still identifies the original slug.
    """
    safe_slug = _slugify(slug)
    if safe_slug != slug.strip().lower():
        digest = hashlib.sha256(slug.encode("utf-8")).hexdigest()
        safe_slug = f"{safe_slug}-{digest[:8]}"
    directory = base_dir.resolve()
    return directory / f"{safe_slug}-{_slugify(profile)}.discovery.json"
|
|
|
|
|
|
def _candidate_counts(snapshot: dict[str, Any]) -> dict[str, int]:
    """Count the node, edge and attribute candidates of a snapshot.

    A missing or non-mapping ``candidates`` entry counts as empty.
    """
    raw = snapshot.get("candidates")
    candidates = raw if isinstance(raw, dict) else {}
    return {kind: len(candidates.get(kind, [])) for kind in ("nodes", "edges", "attributes")}
|
|
|
|
|
|
def _discovery_diff_counts(snapshot: dict[str, Any]) -> dict[str, int]:
    """Count the added/changed/retired/conflicted entries of the reconciliation diff.

    A missing or non-mapping ``reconciliation`` or ``diff`` yields zeros.
    """
    reconciliation = snapshot.get("reconciliation")
    if not isinstance(reconciliation, dict):
        reconciliation = {}
    diff = reconciliation.get("diff")
    if not isinstance(diff, dict):
        diff = {}
    return {bucket: len(diff.get(bucket, [])) for bucket in ("added", "changed", "retired", "conflicted")}
|
|
|
|
|
|
def _review_artifact_counts(snapshot: dict[str, Any]) -> dict[str, int]:
    """Count review artifacts per ``artifact_type``.

    Non-mapping artifacts are ignored; artifacts without a type are counted
    under ``"unknown"``.  Keys appear in first-seen order.
    """
    kinds = [
        str(artifact.get("artifact_type") or "unknown")
        for artifact in snapshot.get("review_artifacts", [])
        if isinstance(artifact, dict)
    ]
    tally: dict[str, int] = {}
    for kind in kinds:
        tally[kind] = tally.setdefault(kind, 0) + 1
    return tally
|
|
|
|
|
|
def _connector_run_summaries(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
    """Condense the snapshot's connector runs to id, status and candidate counts.

    Non-mapping runs are skipped; a ``message`` is carried over only when it
    is truthy.
    """

    def condense(run: dict[str, Any]) -> dict[str, Any]:
        entry = {
            "connector_id": run.get("connector_id"),
            "status": run.get("status"),
            "candidate_counts": run.get("candidate_counts", {}),
        }
        message = run.get("message")
        if message:
            entry["message"] = message
        return entry

    return [condense(run) for run in snapshot.get("connector_runs", []) if isinstance(run, dict)]
|
|
|
|
|
|
def _set_llm_result(
    result: dict[str, Any],
    snapshot: dict[str, Any],
    llm_enabled: bool,
    llm_skip_reason: str,
) -> None:
    """Replace ``result["llm"]`` with the outcome of LLM extraction.

    When LLM extraction is disabled the entry records the skip reason.
    Otherwise the status is ``"failed"`` if the snapshot holds any
    ``llm_execution_error`` / ``llm_output_invalid`` review artifacts and
    ``"used"`` if it holds none.
    """
    if llm_enabled:
        failure_types = {"llm_execution_error", "llm_output_invalid"}
        failures = sum(
            count
            for artifact_type, count in _review_artifact_counts(snapshot).items()
            if artifact_type in failure_types
        )
        outcome: dict[str, Any] = {
            "enabled": True,
            "attempted": True,
            "status": "failed" if failures else "used",
            "failure_artifacts": failures,
        }
    else:
        outcome = {
            "enabled": False,
            "attempted": False,
            "status": "skipped",
            "reason": llm_skip_reason,
        }
    result["llm"] = outcome
|
|
|
|
|
|
def _scan_manifest_has_diff(item: dict[str, Any]) -> bool:
    """Tell whether a repository result reports any reconciliation difference.

    Args:
        item: Result mapping; its ``diff_counts`` entry is inspected.

    Returns:
        ``True`` when any of the added/changed/retired/conflicted counts is
        non-zero.  A missing or non-mapping ``diff_counts`` and counts that
        are ``None`` or not numeric are treated as zero, the same tolerance
        ``_int_from_nested`` applies.
    """
    raw = item.get("diff_counts")
    diff = raw if isinstance(raw, dict) else {}

    def as_int(value: object) -> int:
        try:
            return int(value)  # type: ignore[call-overload]
        except (TypeError, ValueError):
            return 0

    return any(as_int(diff.get(key, 0)) for key in ("added", "changed", "retired", "conflicted"))
|
|
|
|
|
|
def _int_from_nested(item: dict[str, Any], object_key: str, value_key: str) -> int:
    """Read ``item[object_key][value_key]`` as an integer, defaulting to 0.

    Zero is returned when the nested object is missing or not a mapping, when
    the value is missing, or when it cannot be converted with ``int``.
    """
    container = item.get(object_key)
    if not isinstance(container, dict):
        container = {}
    raw_value = container.get(value_key, 0)
    try:
        number = int(raw_value)
    except (TypeError, ValueError):
        number = 0
    return number
|
|
|
|
|
|
def _sync_manifest_repo(registry_url: str, manifest_dir: Path, item: object) -> dict[str, object]:
    """Register one manifest repository and sync its declared graph.

    The repository is registered first.  If a usable repo path with Fabric
    declarations exists, the declarations are validated, built into a graph
    and posted as a snapshot; SBOMs listed in the entry are ingested as well.
    Problems are reported through the returned mapping, not raised.

    Args:
        registry_url: Base URL of the registry.
        manifest_dir: Directory used to resolve relative paths of the entry.
        item: Raw repository entry from the manifest; must be a mapping.

    Returns:
        Result mapping with ``slug``, ``status`` (``registered``, ``synced``
        or ``error``) and ``warnings``; ``error`` is set on failure.
    """
    if not isinstance(item, dict):
        return {"slug": "<invalid>", "status": "error", "error": "repository entry must be a mapping"}
    slug = str(item.get("slug") or "").strip()
    if not slug:
        return {"slug": "<missing>", "status": "error", "error": "repository entry requires slug"}

    repo_path = _manifest_optional_path(item.get("path"), manifest_dir)
    # ``warnings`` is the very list stored in ``result`` so appends show up there.
    warnings: list[str] = []
    result: dict[str, object] = {"slug": slug, "status": "registered", "warnings": warnings}
    try:
        repository = _registry_post_checked(
            registry_url,
            "/repositories",
            {
                "slug": slug,
                "name": item.get("name") or (repo_path.name if repo_path else slug),
                "remote_url": item.get("remote_url") or _git_value(repo_path, "config", "--get", "remote.origin.url"),
                "default_branch": item.get("default_branch") or "main",
                "state_hub_repo_id": item.get("state_hub_repo_id"),
            },
        )
        result["repository"] = repository
    except RegistryRequestError as exc:
        return {"slug": slug, "status": "error", "error": str(exc), "warnings": []}

    if repo_path is None:
        warnings.append("no repo path configured")
        return result
    if not repo_path.is_dir():
        warnings.append(f"repo path not found: {repo_path}")
        return result

    graph_paths = _manifest_paths(item.get("declaration_paths"), manifest_dir) or [repo_path]
    if not any(declaration_files(path) for path in graph_paths):
        # Nothing to sync as a graph, but SBOMs can still be ingested.
        warnings.append("no Fabric declarations found")
        try:
            _ingest_manifest_sboms(registry_url, manifest_dir, slug, item, result)
        except RegistryRequestError as exc:
            return {"slug": slug, "status": "error", "error": str(exc), "warnings": result["warnings"]}
        return result

    report = validate_roots(graph_paths)
    if report.errors:
        return {
            "slug": slug,
            "status": "error",
            "error": report.summary(),
            "warnings": [diagnostic.format() for diagnostic in report.diagnostics],
        }

    graph = build_graph(graph_paths)
    if graph.load_errors:
        return {
            "slug": slug,
            "status": "error",
            "error": "; ".join(f"{path}: {message}" for path, message in graph.load_errors),
            "warnings": [],
        }
    try:
        snapshot = _registry_post_checked(
            registry_url,
            # Percent-encode the slug for use as a URL path segment, as the
            # discovery snapshot lookup does; plain slugs are unchanged.
            f"/repositories/{quote(slug)}/snapshots",
            {
                "commit": item.get("commit") or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree",
                "generated_at": _utc_now(),
                "graph": graph.to_export(),
            },
        )
        result.update(
            {
                "status": "synced",
                "snapshot_id": snapshot["id"],
                "commit": snapshot["commit"],
            }
        )
        _ingest_manifest_sboms(registry_url, manifest_dir, slug, item, result)
    except RegistryRequestError as exc:
        return {"slug": slug, "status": "error", "error": str(exc), "warnings": result["warnings"]}
    return result
|
|
|
|
|
|
def _ingest_manifest_sboms(
    registry_url: str,
    manifest_dir: Path,
    slug: str,
    item: dict[str, object],
    result: dict[str, object],
) -> None:
    """Post the SBOM files of a manifest entry to the registry.

    Paths come from the entry's ``sboms`` list, with a single ``sbom`` path
    placed first when present.  Missing files and non-mapping payloads add a
    warning to ``result["warnings"]`` and are skipped.  The registry replies
    are stored under ``result["libraries"]`` when at least one SBOM was
    ingested.

    Raises:
        RegistryRequestError: If a registry request fails.
    """
    sbom_paths = _manifest_paths(item.get("sboms"), manifest_dir)
    single_sbom = _manifest_optional_path(item.get("sbom"), manifest_dir)
    if single_sbom:
        sbom_paths.insert(0, single_sbom)

    # Percent-encode the slug for use as a URL path segment, as the discovery
    # snapshot lookup does; plain slugs are unchanged.
    endpoint = f"/repositories/{quote(slug)}/libraries/cyclonedx"
    ingested: list[dict[str, object]] = []
    for sbom_path in sbom_paths:
        if not sbom_path.is_file():
            result.setdefault("warnings", []).append(f"SBOM not found: {sbom_path}")
            continue
        payload = load_yaml(sbom_path)
        if not isinstance(payload, dict):
            result.setdefault("warnings", []).append(f"SBOM must be an object: {sbom_path}")
            continue
        ingested.append(_registry_post_checked(registry_url, endpoint, payload))
    if ingested:
        result["libraries"] = ingested
|
|
|
|
|
|
def _registry_ingest_cyclonedx(args: argparse.Namespace) -> int:
    """Post a CycloneDX SBOM file to the registry for one repository.

    Reads ``args.sbom``, ``args.repo_slug``, ``args.registry_url`` and
    ``args.json``.

    Returns:
        ``0`` on success, ``1`` when the SBOM is not a mapping.  A failed
        registry request exits the process through ``_registry_post``.
    """
    payload = load_yaml(args.sbom)
    if not isinstance(payload, dict):
        print(f"ERROR {args.sbom}: CycloneDX SBOM must be a mapping/object", file=sys.stderr)
        return 1
    result = _registry_post(
        args.registry_url,
        # Percent-encode the slug for use as a URL path segment; plain slugs
        # are unchanged.
        f"/repositories/{quote(str(args.repo_slug))}/libraries/cyclonedx",
        payload,
    )
    if args.json:
        print(json.dumps(result, indent=2, sort_keys=True))
    else:
        print(f"ingested {result['component_count']} library component(s) for {result['repo_slug']}")
    return 0
|
|
|
|
|
|
def _registry_ingest_discovery(args: argparse.Namespace) -> int:
    """Post a discovery snapshot file to the registry.

    Reads ``args.snapshot``, ``args.repo_slug``, ``args.registry_url`` and
    ``args.json``.  The repository slug comes from ``--repo-slug`` or, failing
    that, from ``source.repo_slug`` inside the snapshot.

    Returns:
        ``0`` on success; ``1`` when the snapshot file cannot be read or
        parsed, is not a JSON object, or no repository slug is available.  A
        failed registry request exits the process through ``_registry_post``.
    """
    try:
        payload = json.loads(args.snapshot.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # Missing/unreadable file, bad encoding or invalid JSON: report it in
        # the CLI's usual form instead of surfacing a traceback.
        print(f"ERROR {args.snapshot}: cannot read discovery snapshot: {exc}", file=sys.stderr)
        return 1
    if not isinstance(payload, dict):
        print(f"ERROR {args.snapshot}: discovery snapshot must be a JSON object", file=sys.stderr)
        return 1
    source = payload.get("source") if isinstance(payload.get("source"), dict) else {}
    repo_slug = args.repo_slug or str(source.get("repo_slug") or "").strip()
    if not repo_slug:
        print("ERROR discovery snapshot source.repo_slug is required unless --repo-slug is provided", file=sys.stderr)
        return 1
    result = _registry_post(
        args.registry_url,
        # Percent-encode the slug for use as a URL path segment; plain slugs
        # are unchanged.
        f"/repositories/{quote(str(repo_slug))}/discovery-snapshots",
        payload,
    )
    if args.json:
        print(json.dumps(result, indent=2, sort_keys=True))
    else:
        # Tolerate a reply whose snapshot or candidates are missing or null.
        stored = result.get("snapshot")
        candidates = stored.get("candidates") if isinstance(stored, dict) else None
        if not isinstance(candidates, dict):
            candidates = {}
        counts = {
            "nodes": len(candidates.get("nodes", [])),
            "edges": len(candidates.get("edges", [])),
            "attributes": len(candidates.get("attributes", [])),
        }
        print(
            f"ingested discovery snapshot {result['id']} for {result['repo_slug']} "
            f"({result['profile']}, {result['commit']}): "
            f"{counts['nodes']} node candidate(s), {counts['edges']} edge candidate(s), "
            f"{counts['attributes']} attribute candidate(s)"
        )
    return 0
|
|
|
|
|
|
def _registry_accept_discovery(args: argparse.Namespace) -> int:
    """Ask the registry to accept a stored discovery snapshot.

    Reads ``args.accepted_key``, ``args.commit``, ``args.accept_review_state``,
    ``args.repo_slug``, ``args.discovery_snapshot_id``, ``args.registry_url``
    and ``args.json``.

    Returns:
        ``0`` on success.  A failed registry request exits the process
        through ``_registry_post``.
    """
    payload = {
        "accepted_keys": args.accepted_key,
        "commit": args.commit,
    }
    if args.accept_review_state is not None:
        payload["accept_review_states"] = args.accept_review_state
    # Percent-encode both path segments; plain values are unchanged.
    slug_part = quote(str(args.repo_slug))
    snapshot_part = quote(str(args.discovery_snapshot_id))
    result = _registry_post(
        args.registry_url,
        f"/repositories/{slug_part}/discovery-snapshots/{snapshot_part}/accept",
        payload,
    )
    if args.json:
        print(json.dumps(result, indent=2, sort_keys=True))
    else:
        graph_snapshot = result["graph_snapshot"]
        print(
            f"accepted discovery snapshot {args.discovery_snapshot_id} for {args.repo_slug}; "
            f"graph snapshot {graph_snapshot['id']} stored for {graph_snapshot['commit']}"
        )
    return 0
|
|
|
|
|
|
def _registry_export_reset_archive(args: argparse.Namespace) -> int:
    """Download the registry's reset archive and write it to ``args.output``.

    Returns:
        ``0`` on success, ``1`` when the registry request or the file write
        fails (the error is printed to stderr).
    """
    try:
        archive = _registry_get_checked(args.registry_url, "/exports/reset-archive")
        digest = _write_json_archive(args.output, archive, overwrite=args.overwrite)
    except (RegistryRequestError, OSError) as exc:
        print(f"ERROR {exc}", file=sys.stderr)
        return 1

    if args.json:
        metadata = {
            "archive": str(args.output),
            "archive_sha256": digest,
            "counts": archive.get("counts", {}),
        }
        print(json.dumps(metadata, indent=2, sort_keys=True))
        return 0

    print(f"wrote reset archive {args.output}")
    print(f"archive sha256 {digest}")
    return 0
|
|
|
|
|
|
def _registry_reset_graph_data(args: argparse.Namespace) -> int:
    """Archive the registry's graph data, then request its reset.

    The reset is refused unless ``args.confirm`` equals the confirmation
    token.  The archive is written to ``args.archive`` before the reset
    request, and its path and SHA-256 are sent along with the request.

    Returns:
        ``0`` on success, ``1`` on a wrong confirmation token or when the
        export, the archive write or the reset request fails.
    """
    if args.confirm != RESET_CONFIRMATION_TOKEN:
        print(f"ERROR --confirm must equal {RESET_CONFIRMATION_TOKEN}", file=sys.stderr)
        return 1

    try:
        archive = _registry_get_checked(args.registry_url, "/exports/reset-archive")
        digest = _write_json_archive(args.archive, archive, overwrite=args.overwrite_archive)
        reset_request = {
            "confirm": args.confirm,
            "reason": args.reason,
            "archive_path": str(args.archive),
            "archive_sha256": digest,
        }
        result = _registry_post_checked(args.registry_url, "/admin/reset-graph-data", reset_request)
    except (RegistryRequestError, OSError) as exc:
        print(f"ERROR {exc}", file=sys.stderr)
        return 1

    if args.json:
        print(json.dumps(result, indent=2, sort_keys=True))
        return 0

    dropped = json.dumps(result.get("dropped_counts", {}), sort_keys=True)
    print(f"wrote reset archive {args.archive}")
    print(f"archive sha256 {digest}")
    print(f"reset event {result['id']} recorded")
    print(f"dropped {dropped}")
    print(f"preserved {result.get('repositories_preserved', 0)} repository registration(s)")
    return 0
|
|
|
|
|
|
def _scan_repo(args: argparse.Namespace) -> int:
    """Scan one repository and emit its discovery snapshot.

    The snapshot is reconciled against ``args.previous_snapshot`` when given,
    written to ``args.output`` when given, and then either printed as JSON
    (``args.json``) or summarised in one line.

    Returns:
        ``0`` on success; ``1`` when the previous snapshot cannot be read, is
        not a JSON object, or the output file cannot be written.
    """
    snapshot = scan_repo(
        ScanOptions(
            repo_path=args.path,
            repo_slug=args.repo_slug,
            repo_name=args.repo_name,
            domain=args.domain,
            commit=args.commit,
            profile=args.profile,
            deterministic_only=not args.llm,
            llm_enabled=args.llm,
            llm_config=LLMExtractionConfig(
                provider=args.llm_provider,
                model=args.llm_model,
                temperature=args.llm_temperature,
                max_tokens=args.llm_max_tokens,
                min_confidence=args.llm_min_confidence,
            ),
            connectors=[
                ConnectorConfig(
                    connector_id=connector_id,
                    connector_type="fabric_registry",
                    source_path=str(args.connector_manifest),
                )
                for connector_id in args.connector
            ],
        )
    )
    if args.previous_snapshot:
        try:
            previous = json.loads(args.previous_snapshot.read_text(encoding="utf-8"))
        except Exception as exc:
            print(f"ERROR {args.previous_snapshot}: cannot read previous snapshot: {exc}", file=sys.stderr)
            return 1
        if not isinstance(previous, dict):
            print(f"ERROR {args.previous_snapshot}: previous snapshot must be a JSON object", file=sys.stderr)
            return 1
        snapshot = reconcile_discovery_snapshots(previous, snapshot)
    payload = json.dumps(snapshot, indent=2, sort_keys=True)
    if args.output:
        try:
            args.output.parent.mkdir(parents=True, exist_ok=True)
            args.output.write_text(payload + "\n", encoding="utf-8")
        except OSError as exc:
            # An unwritable destination is a user-facing error, reported the
            # same way as an unreadable previous snapshot.
            print(f"ERROR {args.output}: cannot write snapshot: {exc}", file=sys.stderr)
            return 1
    if args.json:
        print(payload)
        return 0

    candidates = snapshot["candidates"]
    review_count = len(snapshot.get("review_artifacts", []))
    review_summary = f", {review_count} review artifact(s)" if review_count else ""
    connector_count = len(snapshot.get("connector_runs", []))
    connector_summary = f", {connector_count} connector run(s)" if connector_count else ""
    reconciliation = snapshot.get("reconciliation", {})
    diff = reconciliation.get("diff") if isinstance(reconciliation, dict) else None
    diff_summary = ""
    if isinstance(diff, dict):
        diff_summary = (
            f", diff +{len(diff.get('added', []))}"
            f"/~{len(diff.get('changed', []))}"
            f"/-{len(diff.get('retired', []))}"
            f"/!{len(diff.get('conflicted', []))}"
        )
    mode = "dry-run " if args.dry_run else ""
    print(
        f"{mode}scan {snapshot['source']['repo_slug']} "
        f"({snapshot['source']['commit']}): "
        f"{len(candidates['nodes'])} node(s), "
        f"{len(candidates['edges'])} edge(s), "
        f"{len(candidates['attributes'])} attribute(s), "
        f"{len(snapshot['replacement_scopes'])} replacement scope(s)"
        f"{review_summary}"
        f"{connector_summary}"
        f"{diff_summary}"
    )
    if args.output:
        print(f"wrote {args.output}")
    return 0
|
|
|
|
|
|
class RegistryRequestError(Exception):
    """Failure of a registry HTTP request.

    ``status_code`` holds the HTTP status when one was supplied, otherwise
    ``None``.
    """

    def __init__(self, message: str, status_code: int | None = None) -> None:
        Exception.__init__(self, message)
        self.status_code = status_code
|
|
|
|
|
|
def _write_json_archive(path: Path, archive: dict[str, object], *, overwrite: bool) -> str:
    """Write ``archive`` as indented, key-sorted JSON and return its SHA-256.

    Args:
        path: Destination file; parent directories are created.
        archive: JSON-serialisable mapping to store.
        overwrite: Allow replacing an existing file.

    Returns:
        Hex digest of the exact bytes written.

    Raises:
        OSError: If ``path`` exists and ``overwrite`` is false, or the write
            fails.
    """
    already_there = path.exists()
    if already_there and not overwrite:
        raise OSError(f"archive already exists: {path} (use --overwrite or --overwrite-archive)")
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(archive, indent=2, sort_keys=True).encode("utf-8")
    path.write_bytes(serialized)
    hasher = hashlib.sha256()
    hasher.update(serialized)
    return hasher.hexdigest()
|
|
|
|
|
|
def _registry_post(registry_url: str, path: str, payload: dict[str, object]) -> dict[str, object]:
    """POST to the registry, exiting the process on failure.

    Wraps ``_registry_post_checked``: a ``RegistryRequestError`` is printed to
    stderr and turned into ``SystemExit(1)``.
    """
    try:
        response = _registry_post_checked(registry_url, path, payload)
    except RegistryRequestError as exc:
        print(f"ERROR {exc}", file=sys.stderr)
        raise SystemExit(1) from exc
    return response
|
|
|
|
|
|
def _registry_post_checked(registry_url: str, path: str, payload: dict[str, object]) -> dict[str, object]:
    """POST *payload* as JSON to the registry and return the decoded object.

    Entries of *payload* whose value is ``None`` are left out of the request
    body. The request goes to ``registry_url`` (trailing slashes removed)
    followed by *path*, with a 15 second timeout.

    Args:
        registry_url: Base URL of the registry.
        path: Path appended to the base URL.
        payload: Mapping serialized as the JSON request body.

    Returns:
        The JSON object decoded from the response body.

    Raises:
        RegistryRequestError: On an HTTP error status (``status_code`` is
            set), when the registry cannot be reached or the transfer fails,
            when the response body is not valid JSON, or when it decodes to
            something other than an object.
    """
    data = json.dumps({key: value for key, value in payload.items() if value is not None}).encode("utf-8")
    request = urllib.request.Request(
        registry_url.rstrip("/") + path,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=15) as response:
            raw = response.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RegistryRequestError(f"registry request failed ({exc.code}): {detail}", status_code=exc.code) from exc
    except OSError as exc:
        # URLError is an OSError subclass. Catching OSError also covers read
        # timeouts and connection resets, which are not wrapped in URLError.
        raise RegistryRequestError(f"cannot reach registry at {registry_url}: {exc}") from exc
    try:
        body = json.loads(raw)
    except ValueError as exc:
        # JSONDecodeError and UnicodeDecodeError both derive from ValueError;
        # surface them through the error type callers already handle.
        raise RegistryRequestError(f"registry returned invalid JSON: {exc}") from exc
    if not isinstance(body, dict):
        raise RegistryRequestError("registry returned a non-object response")
    return body
|
|
|
|
|
|
def _registry_get_checked(registry_url: str, path: str) -> dict[str, object]:
    """GET a JSON object from the registry.

    The request goes to ``registry_url`` (trailing slashes removed) followed
    by *path*, with a 15 second timeout.

    Args:
        registry_url: Base URL of the registry.
        path: Path appended to the base URL.

    Returns:
        The JSON object decoded from the response body.

    Raises:
        RegistryRequestError: On an HTTP error status (``status_code`` is
            set), when the registry cannot be reached or the transfer fails,
            when the response body is not valid JSON, or when it decodes to
            something other than an object.
    """
    request = urllib.request.Request(registry_url.rstrip("/") + path, method="GET")
    try:
        with urllib.request.urlopen(request, timeout=15) as response:
            raw = response.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RegistryRequestError(f"registry request failed ({exc.code}): {detail}", status_code=exc.code) from exc
    except OSError as exc:
        # URLError is an OSError subclass. Catching OSError also covers read
        # timeouts and connection resets, which are not wrapped in URLError.
        raise RegistryRequestError(f"cannot reach registry at {registry_url}: {exc}") from exc
    try:
        body = json.loads(raw)
    except ValueError as exc:
        # JSONDecodeError and UnicodeDecodeError both derive from ValueError;
        # surface them through the error type callers already handle.
        raise RegistryRequestError(f"registry returned invalid JSON: {exc}") from exc
    if not isinstance(body, dict):
        raise RegistryRequestError("registry returned a non-object response")
    return body
|
|
|
|
|
|
def _manifest_optional_path(value: object, manifest_dir: Path) -> Path | None:
|
|
if not isinstance(value, str) or not value.strip():
|
|
return None
|
|
path = Path(value).expanduser()
|
|
return path if path.is_absolute() else (manifest_dir / path).resolve()
|
|
|
|
|
|
def _manifest_paths(value: object, manifest_dir: Path) -> list[Path]:
    """Resolve a manifest value holding one path or a list of paths.

    A single string is treated as a one-element list. Any value that is
    neither a string nor a list (including ``None``) yields an empty list.
    Each entry goes through ``_manifest_optional_path``; entries that come
    back as ``None`` are dropped.
    """
    if isinstance(value, str):
        entries = [value]
    elif isinstance(value, list):
        entries = value
    else:
        return []

    resolved = (_manifest_optional_path(entry, manifest_dir) for entry in entries)
    return [path for path in resolved if path is not None]
|
|
|
|
|
|
def _primary_repo_path(paths: list[Path]) -> Path:
|
|
if not paths:
|
|
return Path(".").resolve()
|
|
path = paths[0].resolve()
|
|
return path.parent if path.is_file() else path
|
|
|
|
|
|
def _slugify(value: str) -> str:
|
|
return re.sub(r"-+", "-", re.sub(r"[^a-z0-9]", "-", value.lower())).strip("-") or "repo"
|
|
|
|
|
|
def _git_value(repo_path: Path | None, *args: str) -> str | None:
|
|
if repo_path is None:
|
|
return None
|
|
try:
|
|
result = subprocess.run(
|
|
["git", *args],
|
|
cwd=repo_path,
|
|
check=False,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return None
|
|
if result.returncode != 0:
|
|
return None
|
|
value = result.stdout.strip()
|
|
return value or None
|
|
|
|
|
|
def _utc_now() -> str:
|
|
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def _age_hours(value: object, now: datetime) -> float | None:
|
|
if not isinstance(value, str) or not value:
|
|
return None
|
|
try:
|
|
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
|
|
except ValueError:
|
|
return None
|
|
if parsed.tzinfo is None:
|
|
parsed = parsed.replace(tzinfo=timezone.utc)
|
|
return max(0.0, (now - parsed).total_seconds() / 3600)
|
|
|
|
|
|
def _load_graph_or_exit(paths: list[Path]) -> FabricGraph:
    """Build the graph for *paths*, exiting if any load error was recorded.

    Each ``(path, message)`` pair in ``graph.load_errors`` is printed to
    stderr prefixed with ``ERROR`` before ``SystemExit(1)`` is raised.

    Returns:
        The graph returned by ``build_graph`` when it has no load errors.
    """
    graph = build_graph(paths)
    if not graph.load_errors:
        return graph

    for source, problem in graph.load_errors:
        print(f"ERROR {source}: {problem}", file=sys.stderr)
    raise SystemExit(1)
|
|
|
|
|
|
def _load_json_file(path: Path) -> dict[str, Any]:
|
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
|
if not isinstance(payload, dict):
|
|
raise ValueError(f"JSON file must contain an object: {path}")
|
|
return payload
|
|
|
|
|
|
def _print_providers(graph: FabricGraph, capability: str) -> None:
|
|
providers = graph.providers(capability)
|
|
if not providers:
|
|
print(f"no providers found for {capability}")
|
|
return
|
|
print("provider_id\tservice_id\tlifecycle\tenvironments\tinterfaces")
|
|
for provider in providers:
|
|
spec = provider.spec
|
|
print(
|
|
"\t".join(
|
|
[
|
|
provider.id,
|
|
str(spec.get("service_id", "")),
|
|
str(spec.get("lifecycle", "")),
|
|
",".join(spec.get("environments", [])),
|
|
",".join(spec.get("interface_ids", [])),
|
|
]
|
|
)
|
|
)
|
|
|
|
|
|
def _print_consumers(
|
|
graph: FabricGraph,
|
|
target: str,
|
|
matches: object | None = None,
|
|
) -> None:
|
|
consumer_matches = graph.consumers(target) if matches is None else list(matches)
|
|
if not consumer_matches:
|
|
print(f"no consumers found for {target}")
|
|
return
|
|
print("consumer_service_id\tdependency_id\trequires\tprovider_capability_id\tprovider_interface_id\tstatus")
|
|
for match in consumer_matches:
|
|
print(
|
|
"\t".join(
|
|
[
|
|
match.consumer_service_id,
|
|
match.dependency_id,
|
|
match.required_capability_type,
|
|
match.provider_capability_id,
|
|
match.provider_interface_id,
|
|
match.status,
|
|
]
|
|
)
|
|
)
|
|
|
|
|
|
def _print_unresolved(graph: FabricGraph) -> None:
|
|
unresolved = graph.unresolved_dependencies()
|
|
if not unresolved:
|
|
print("no unresolved dependencies")
|
|
return
|
|
print("dependency_id\tconsumer_service_id\trequires")
|
|
for dependency in unresolved:
|
|
spec = dependency.spec
|
|
requires = spec.get("requires", {})
|
|
print(
|
|
"\t".join(
|
|
[
|
|
dependency.id,
|
|
str(spec.get("consumer_service_id", "")),
|
|
str(requires.get("capability_id") or requires.get("capability_type", "")),
|
|
]
|
|
)
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|