Files
railiance-fabric/railiance_fabric/cli.py

730 lines
28 KiB
Python

from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from .connectors import ConnectorConfig
from .loader import declaration_files, load_yaml
from .graph import FabricGraph, build_graph
from .graph_explorer import fabric_graph_explorer_payload
from .llm_extraction import LLMExtractionConfig
from .reconciliation import reconcile_discovery_snapshots
from .scanner import ScanOptions, scan_repo
from .validation import validate_roots
def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the ``railiance-fabric`` CLI.

    Top-level commands are ``validate``, ``providers``, ``consumers``,
    ``dependency-path``, ``unresolved``, ``blast-radius``, ``export``,
    ``scan`` and ``registry``. ``registry`` requires one of its own
    sub-commands: ``sync``, ``sync-manifest``, ``ingest-cyclonedx``,
    ``ingest-discovery`` or ``accept-discovery``.

    Returns:
        The configured parser. The selected command is stored in
        ``args.command`` and the registry sub-command in
        ``args.registry_command``.
    """
    local_registry = "http://127.0.0.1:8765"

    def add_roots(command: argparse.ArgumentParser) -> None:
        # Optional positional roots that fall back to the current directory.
        command.add_argument("paths", nargs="*", type=Path, default=[Path(".")])

    def add_json_flag(command: argparse.ArgumentParser, text: str) -> None:
        command.add_argument("--json", action="store_true", help=text)

    def add_optional(command: argparse.ArgumentParser, *flags: str) -> None:
        # Plain string options without help text that default to None.
        for flag in flags:
            command.add_argument(flag, default=None)

    root = argparse.ArgumentParser(
        prog="railiance-fabric",
        description="Load and validate Railiance Fabric declarations.",
    )
    commands = root.add_subparsers(dest="command", required=True)

    # --- validate -------------------------------------------------------
    validate_cmd = commands.add_parser(
        "validate",
        help="Validate one or more repo roots or declaration files.",
    )
    validate_cmd.add_argument(
        "paths",
        nargs="+",
        type=Path,
        help="Repo root, fabric directory, or declaration YAML file.",
    )
    validate_cmd.add_argument(
        "--warnings-as-errors",
        action="store_true",
        help="Exit non-zero when warnings are present.",
    )

    # --- graph queries: (command, help, operand, operand help) ----------
    queries = (
        (
            "providers",
            "List providers for a capability type or id.",
            "capability",
            "Capability type or capability id.",
        ),
        (
            "consumers",
            "List consumers of a capability or interface.",
            "target",
            "Capability/interface type or declaration id.",
        ),
        (
            "dependency-path",
            "Show dependency path for a service.",
            "service_id",
            "Service declaration id.",
        ),
        ("unresolved", "Show missing or unresolved dependencies.", None, None),
        (
            "blast-radius",
            "Show consumers affected by an interface change.",
            "interface",
            "Interface type or interface declaration id.",
        ),
    )
    for command_name, summary, operand, operand_help in queries:
        query_cmd = commands.add_parser(command_name, help=summary)
        if operand is not None:
            query_cmd.add_argument(operand, help=operand_help)
        add_roots(query_cmd)

    # --- export ---------------------------------------------------------
    export_cmd = commands.add_parser(
        "export",
        help="Export graph as JSON, Mermaid, or graph-explorer payload.",
    )
    add_roots(export_cmd)
    export_cmd.add_argument(
        "--format",
        choices=["json", "mermaid", "graph-explorer"],
        default="json",
    )

    # --- scan -----------------------------------------------------------
    scan_cmd = commands.add_parser(
        "scan",
        help="Scan a repo for deterministic discovery candidates.",
    )
    scan_cmd.add_argument("path", nargs="?", type=Path, default=Path("."))
    add_optional(scan_cmd, "--repo-slug", "--repo-name", "--domain", "--commit")
    scan_cmd.add_argument("--profile", default="deterministic")
    scan_cmd.add_argument(
        "--dry-run",
        action="store_true",
        help="Do not write anywhere except an explicit --output file.",
    )
    scan_cmd.add_argument(
        "--output",
        type=Path,
        default=None,
        help="Write the discovery snapshot JSON to a file.",
    )
    scan_cmd.add_argument(
        "--previous-snapshot",
        type=Path,
        default=None,
        help="Reconcile against a previous discovery snapshot JSON.",
    )
    add_json_flag(scan_cmd, "Print the discovery snapshot JSON to stdout.")
    scan_cmd.add_argument(
        "--llm",
        action="store_true",
        help="Enable llm-connect assisted extraction.",
    )
    scan_cmd.add_argument("--llm-provider", default="mock", help="llm-connect provider name.")
    scan_cmd.add_argument("--llm-model", default="mock", help="Model name passed to llm-connect.")
    llm_tuning = (
        ("--llm-temperature", float, 0.0),
        ("--llm-max-tokens", int, 1500),
        ("--llm-min-confidence", float, 0.6),
    )
    for flag, value_type, fallback in llm_tuning:
        scan_cmd.add_argument(flag, type=value_type, default=fallback)
    scan_cmd.add_argument(
        "--connector",
        action="append",
        choices=["local-fabric-registry"],
        default=[],
        help="Enable a discovery connector. May be passed more than once.",
    )
    scan_cmd.add_argument(
        "--connector-manifest",
        type=Path,
        default=Path("registry/local-repos.yaml"),
        help="Manifest path for the local-fabric-registry connector.",
    )

    # --- registry -------------------------------------------------------
    registry_cmd = commands.add_parser(
        "registry",
        help="Feed a running Railiance Fabric registry service.",
    )
    registry_commands = registry_cmd.add_subparsers(dest="registry_command", required=True)

    sync_cmd = registry_commands.add_parser(
        "sync",
        help="Register a repo and ingest its current graph snapshot.",
    )
    add_roots(sync_cmd)
    sync_cmd.add_argument("--registry-url", default=local_registry)
    add_optional(sync_cmd, "--repo-slug", "--name", "--remote-url")
    sync_cmd.add_argument("--default-branch", default="main")
    add_optional(sync_cmd, "--state-hub-repo-id", "--commit")
    add_json_flag(sync_cmd, "Print the raw snapshot response.")

    manifest_cmd = registry_commands.add_parser(
        "sync-manifest",
        help="Register and sync repos from an onboarding manifest.",
    )
    manifest_cmd.add_argument("manifest", type=Path)
    manifest_cmd.add_argument(
        "--registry-url",
        default=None,
        help="Override the manifest registry_url.",
    )
    manifest_cmd.add_argument(
        "--strict",
        action="store_true",
        help="Exit non-zero when any repo cannot be synced.",
    )
    add_json_flag(manifest_cmd, "Print the raw manifest sync summary.")

    cyclonedx_cmd = registry_commands.add_parser(
        "ingest-cyclonedx",
        help="Ingest a CycloneDX SBOM as library inventory.",
    )
    cyclonedx_cmd.add_argument("sbom", type=Path)
    cyclonedx_cmd.add_argument("--registry-url", default=local_registry)
    cyclonedx_cmd.add_argument("--repo-slug", required=True)
    add_json_flag(cyclonedx_cmd, "Print the raw ingest response.")

    discovery_cmd = registry_commands.add_parser(
        "ingest-discovery",
        help="Store a discovery snapshot for review.",
    )
    discovery_cmd.add_argument("snapshot", type=Path)
    discovery_cmd.add_argument("--registry-url", default=local_registry)
    add_optional(discovery_cmd, "--repo-slug")
    add_json_flag(discovery_cmd, "Print the raw ingest response.")

    accept_cmd = registry_commands.add_parser(
        "accept-discovery",
        help="Project accepted discovery candidates into a graph snapshot.",
    )
    accept_cmd.add_argument("repo_slug")
    accept_cmd.add_argument("discovery_snapshot_id", type=int)
    accept_cmd.add_argument("--registry-url", default=local_registry)
    accept_cmd.add_argument("--accepted-key", action="append", default=[])
    accept_cmd.add_argument("--accept-review-state", action="append", default=None)
    add_optional(accept_cmd, "--commit")
    add_json_flag(accept_cmd, "Print the raw accept response.")

    return root
def main(argv: list[str] | None = None) -> int:
    """Parse *argv* and run the selected command.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        The process exit code. ``validate`` returns 1 when the report has
        errors (or warnings together with ``--warnings-as-errors``) and 0
        otherwise; the graph query and export commands return 0; ``scan``
        and the ``registry`` sub-commands return their handler's code.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    command = args.command

    if command == "validate":
        report = validate_roots(args.paths)
        for diagnostic in report.diagnostics:
            print(diagnostic.format())
        print(report.summary())
        failed = bool(report.errors) or bool(args.warnings_as_errors and report.warnings)
        return 1 if failed else 0

    if command == "scan":
        return _scan_repo(args)

    if command == "registry":
        registry_handlers = {
            "sync": _registry_sync,
            "sync-manifest": _registry_sync_manifest,
            "ingest-cyclonedx": _registry_ingest_cyclonedx,
            "ingest-discovery": _registry_ingest_discovery,
            "accept-discovery": _registry_accept_discovery,
        }
        handler = registry_handlers.get(args.registry_command)
        if handler is not None:
            return handler(args)

    graph_commands = (
        "providers",
        "consumers",
        "dependency-path",
        "unresolved",
        "blast-radius",
        "export",
    )
    if command in graph_commands:
        # Every graph command starts from the same loaded graph.
        graph = _load_graph_or_exit(args.paths)
        if command == "providers":
            _print_providers(graph, args.capability)
        elif command == "consumers":
            _print_consumers(graph, args.target)
        elif command == "dependency-path":
            print("\n".join(graph.dependency_path_lines(args.service_id)))
        elif command == "unresolved":
            _print_unresolved(graph)
        elif command == "blast-radius":
            _print_consumers(graph, args.interface, matches=graph.blast_radius(args.interface))
        elif args.format == "mermaid":
            print(graph.to_mermaid())
        elif args.format == "graph-explorer":
            explorer_payload = fabric_graph_explorer_payload(graph.to_export())
            print(json.dumps(explorer_payload, indent=2, sort_keys=True))
        else:
            print(graph.to_json())
        return 0

    # argparse's error() exits the process; the return is a fallback only.
    parser.error(f"unknown command {args.command!r}")
    return 2
def _registry_sync(args: argparse.Namespace) -> int:
    """Validate the given roots, register the repo and post its graph snapshot.

    Diagnostics are written to stderr. When validation reports errors the
    summary is printed to stderr and 1 is returned without contacting the
    registry. Missing ``--repo-slug``, ``--name``, ``--remote-url`` and
    ``--commit`` values are derived from the primary repo path and from git.

    Returns:
        0 on success, 1 on validation errors. Graph load failures and
        registry failures exit the process via the helpers that are called.
    """
    report = validate_roots(args.paths)
    for diagnostic in report.diagnostics:
        print(diagnostic.format(), file=sys.stderr)
    if report.errors:
        print(report.summary(), file=sys.stderr)
        return 1

    graph = _load_graph_or_exit(args.paths)
    repo_path = _primary_repo_path(args.paths)
    repo_slug = args.repo_slug or _slugify(repo_path.name)

    registration = {
        "slug": repo_slug,
        "name": args.name or repo_path.name,
        "remote_url": args.remote_url or _git_value(repo_path, "config", "--get", "remote.origin.url"),
        "default_branch": args.default_branch,
        "state_hub_repo_id": args.state_hub_repo_id,
    }
    repository = _registry_post(args.registry_url, "/repositories", registration)

    snapshot_request = {
        # Fall back to a placeholder when git cannot report HEAD.
        "commit": args.commit or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree",
        "generated_at": _utc_now(),
        "graph": graph.to_export(),
    }
    snapshot = _registry_post(
        args.registry_url,
        f"/repositories/{repo_slug}/snapshots",
        snapshot_request,
    )

    if args.json:
        combined = {"repository": repository, "snapshot": snapshot}
        print(json.dumps(combined, indent=2, sort_keys=True))
        return 0
    print(f"registered {repository['slug']}")
    print(f"snapshot {snapshot['id']} accepted for {snapshot['commit']}")
    return 0
def _registry_sync_manifest(args: argparse.Namespace) -> int:
    """Sync every repository listed in an onboarding manifest.

    The manifest must be a YAML mapping with a ``repositories`` list. The
    registry URL comes from ``--registry-url``, then the manifest's
    ``registry_url``, then the local default. Each entry is handled by
    ``_sync_manifest_repo`` and the outcomes are reported either as JSON
    (``--json``) or as one line per repo followed by a summary line.

    Returns:
        1 when the manifest is malformed, or when ``--strict`` is set and
        any repo ended in ``error`` or ``registered`` (not synced) status;
        0 otherwise.
    """
    manifest_path = args.manifest.resolve()
    manifest = load_yaml(manifest_path)
    if not isinstance(manifest, dict):
        print(f"ERROR {manifest_path}: manifest must be a YAML mapping", file=sys.stderr)
        return 1
    repositories = manifest.get("repositories")
    if not isinstance(repositories, list):
        print(f"ERROR {manifest_path}: manifest requires a repositories list", file=sys.stderr)
        return 1

    registry_url = args.registry_url or str(manifest.get("registry_url") or "http://127.0.0.1:8765")
    manifest_dir = manifest_path.parent
    results = []
    for entry in repositories:
        results.append(_sync_manifest_repo(registry_url, manifest_dir, entry))

    # One pass over the outcomes instead of one pass per status.
    tally = {"synced": 0, "registered": 0, "error": 0}
    for outcome in results:
        status = outcome["status"]
        if status in tally:
            tally[status] += 1
    counts = {
        "total": len(results),
        "synced": tally["synced"],
        "registered": tally["registered"],
        "errors": tally["error"],
    }
    summary = {
        "manifest": str(manifest_path),
        "registry_url": registry_url,
        "repositories": results,
        "counts": counts,
    }

    if args.json:
        print(json.dumps(summary, indent=2, sort_keys=True))
    else:
        for outcome in results:
            status = outcome["status"]
            if status == "synced":
                print(f"synced {outcome['slug']} snapshot {outcome['snapshot_id']} ({outcome['commit']})")
            elif status == "registered":
                print(f"registered {outcome['slug']} ({'; '.join(outcome['warnings'])})")
            else:
                print(f"error {outcome['slug']}: {outcome['error']}")
        print(
            f"summary: {counts['total']} repo(s), {counts['synced']} synced, "
            f"{counts['registered']} registered only, {counts['errors']} error(s)"
        )

    incomplete = counts["errors"] or counts["registered"]
    return 1 if args.strict and incomplete else 0
def _sync_manifest_repo(registry_url: str, manifest_dir: Path, item: object) -> dict[str, object]:
    """Register one manifest entry and, when possible, post its graph snapshot.

    The repository is always registered first. A snapshot is posted only
    when the entry has an existing repo directory that contains Fabric
    declarations which validate and load cleanly. SBOMs named by the entry
    are ingested once the repo directory exists.

    Args:
        registry_url: Base URL of the registry service.
        manifest_dir: Directory used to resolve relative manifest paths.
        item: One element of the manifest's ``repositories`` list.

    Returns:
        A result mapping with ``slug`` and ``status``. ``status`` is
        ``"synced"`` (snapshot stored), ``"registered"`` (registered only,
        reasons in ``warnings``) or ``"error"`` (message in ``error``).
    """
    if not isinstance(item, dict):
        return {"slug": "<invalid>", "status": "error", "error": "repository entry must be a mapping"}
    slug = str(item.get("slug") or "").strip()
    if not slug:
        return {"slug": "<missing>", "status": "error", "error": "repository entry requires slug"}

    def failed(message: str, notes: object) -> dict[str, object]:
        # Error results replace the in-progress result wholesale.
        return {"slug": slug, "status": "error", "error": message, "warnings": notes}

    repo_path = _manifest_optional_path(item.get("path"), manifest_dir)
    result: dict[str, object] = {"slug": slug, "status": "registered", "warnings": []}

    registration = {
        "slug": slug,
        "name": item.get("name") or (repo_path.name if repo_path else slug),
        "remote_url": item.get("remote_url") or _git_value(repo_path, "config", "--get", "remote.origin.url"),
        "default_branch": item.get("default_branch") or "main",
        "state_hub_repo_id": item.get("state_hub_repo_id"),
    }
    try:
        result["repository"] = _registry_post_checked(registry_url, "/repositories", registration)
    except RegistryRequestError as exc:
        return failed(str(exc), [])

    if repo_path is None:
        result["warnings"].append("no repo path configured")
        return result
    if not repo_path.is_dir():
        result["warnings"].append(f"repo path not found: {repo_path}")
        return result

    graph_paths = _manifest_paths(item.get("declaration_paths"), manifest_dir) or [repo_path]
    has_declarations = False
    for candidate in graph_paths:
        if declaration_files(candidate):
            has_declarations = True
            break
    if not has_declarations:
        # Nothing to snapshot, but library inventory can still be ingested.
        result["warnings"].append("no Fabric declarations found")
        try:
            _ingest_manifest_sboms(registry_url, manifest_dir, slug, item, result)
        except RegistryRequestError as exc:
            return failed(str(exc), result["warnings"])
        return result

    report = validate_roots(graph_paths)
    if report.errors:
        return failed(
            report.summary(),
            [diagnostic.format() for diagnostic in report.diagnostics],
        )

    graph = build_graph(graph_paths)
    if graph.load_errors:
        return failed(
            "; ".join(f"{path}: {message}" for path, message in graph.load_errors),
            [],
        )

    snapshot_request = {
        "commit": item.get("commit") or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree",
        "generated_at": _utc_now(),
        "graph": graph.to_export(),
    }
    try:
        snapshot = _registry_post_checked(
            registry_url,
            f"/repositories/{slug}/snapshots",
            snapshot_request,
        )
        snapshot_id, commit = snapshot["id"], snapshot["commit"]
        result["status"] = "synced"
        result["snapshot_id"] = snapshot_id
        result["commit"] = commit
        _ingest_manifest_sboms(registry_url, manifest_dir, slug, item, result)
    except RegistryRequestError as exc:
        return failed(str(exc), result["warnings"])
    return result
def _ingest_manifest_sboms(
    registry_url: str,
    manifest_dir: Path,
    slug: str,
    item: dict[str, object],
    result: dict[str, object],
) -> None:
    """Post every SBOM named by a manifest entry to the registry.

    Paths come from the entry's ``sbom`` (single, handled first) and
    ``sboms`` keys. Missing files and non-mapping payloads are recorded in
    ``result["warnings"]`` and skipped. Registry responses are stored under
    ``result["libraries"]`` when at least one SBOM was ingested.

    Raises:
        RegistryRequestError: When a registry request fails.
    """
    candidates = _manifest_paths(item.get("sboms"), manifest_dir)
    primary = _manifest_optional_path(item.get("sbom"), manifest_dir)
    if primary:
        candidates = [primary, *candidates]

    endpoint = f"/repositories/{slug}/libraries/cyclonedx"
    responses: list[dict[str, object]] = []
    for sbom_path in candidates:
        problem = None
        payload = None
        if not sbom_path.is_file():
            problem = f"SBOM not found: {sbom_path}"
        else:
            payload = load_yaml(sbom_path)
            if not isinstance(payload, dict):
                problem = f"SBOM must be an object: {sbom_path}"
        if problem is not None:
            result.setdefault("warnings", []).append(problem)
            continue
        responses.append(_registry_post_checked(registry_url, endpoint, payload))

    if responses:
        result["libraries"] = responses
def _registry_ingest_cyclonedx(args: argparse.Namespace) -> int:
    """Load a CycloneDX SBOM file and post it as library inventory.

    Returns:
        1 when the loaded SBOM is not a mapping, 0 after a successful
        ingest. Registry failures exit the process via ``_registry_post``.
    """
    sbom = load_yaml(args.sbom)
    if not isinstance(sbom, dict):
        print(f"ERROR {args.sbom}: CycloneDX SBOM must be a mapping/object", file=sys.stderr)
        return 1

    endpoint = f"/repositories/{args.repo_slug}/libraries/cyclonedx"
    result = _registry_post(args.registry_url, endpoint, sbom)
    if args.json:
        print(json.dumps(result, indent=2, sort_keys=True))
        return 0
    print(f"ingested {result['component_count']} library component(s) for {result['repo_slug']}")
    return 0
def _registry_ingest_discovery(args: argparse.Namespace) -> int:
    """Post a discovery snapshot JSON file to the registry for review.

    The repo slug comes from ``--repo-slug`` or, failing that, from the
    snapshot's ``source.repo_slug``.

    Returns:
        1 when the snapshot file cannot be read or parsed, is not a JSON
        object, or no repo slug can be determined; 0 after a successful
        ingest. Registry failures exit the process via ``_registry_post``.
    """
    # A missing or malformed file is a user error: report it in the same
    # "ERROR <path>: ..." style as the other commands instead of a traceback.
    # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        payload = json.loads(args.snapshot.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        print(f"ERROR {args.snapshot}: cannot read discovery snapshot: {exc}", file=sys.stderr)
        return 1
    if not isinstance(payload, dict):
        print(f"ERROR {args.snapshot}: discovery snapshot must be a JSON object", file=sys.stderr)
        return 1

    source = payload.get("source") if isinstance(payload.get("source"), dict) else {}
    repo_slug = args.repo_slug or str(source.get("repo_slug") or "").strip()
    if not repo_slug:
        print("ERROR discovery snapshot source.repo_slug is required unless --repo-slug is provided", file=sys.stderr)
        return 1

    result = _registry_post(
        args.registry_url,
        f"/repositories/{repo_slug}/discovery-snapshots",
        payload,
    )
    if args.json:
        print(json.dumps(result, indent=2, sort_keys=True))
        return 0

    # The counts are informational only, so tolerate a response whose
    # "snapshot"/"candidates" entries are absent, null or not mappings.
    snapshot = result.get("snapshot")
    candidates = snapshot.get("candidates") if isinstance(snapshot, dict) else None
    if not isinstance(candidates, dict):
        candidates = {}
    counts = {
        kind: len(candidates.get(kind) or [])
        for kind in ("nodes", "edges", "attributes")
    }
    print(
        f"ingested discovery snapshot {result['id']} for {result['repo_slug']} "
        f"({result['profile']}, {result['commit']}): "
        f"{counts['nodes']} node candidate(s), {counts['edges']} edge candidate(s), "
        f"{counts['attributes']} attribute candidate(s)"
    )
    return 0
def _registry_accept_discovery(args: argparse.Namespace) -> int:
    """Ask the registry to accept a stored discovery snapshot.

    Sends the accepted keys and optional commit; ``accept_review_states``
    is included only when ``--accept-review-state`` was passed.

    Returns:
        0 after a successful request. Registry failures exit the process
        via ``_registry_post``.
    """
    request_body: dict[str, object] = {
        "accepted_keys": args.accepted_key,
        "commit": args.commit,
    }
    review_states = args.accept_review_state
    if review_states is not None:
        request_body["accept_review_states"] = review_states

    endpoint = f"/repositories/{args.repo_slug}/discovery-snapshots/{args.discovery_snapshot_id}/accept"
    result = _registry_post(args.registry_url, endpoint, request_body)

    if args.json:
        print(json.dumps(result, indent=2, sort_keys=True))
        return 0
    stored = result["graph_snapshot"]
    print(
        f"accepted discovery snapshot {args.discovery_snapshot_id} for {args.repo_slug}; "
        f"graph snapshot {stored['id']} stored for {stored['commit']}"
    )
    return 0
def _scan_repo(args: argparse.Namespace) -> int:
    """Scan a repo, optionally reconcile with a previous snapshot, and report.

    The snapshot is written to ``--output`` when given. With ``--json`` the
    snapshot JSON is printed; otherwise a one-line summary is printed,
    followed by a ``wrote <path>`` line when an output file was written.

    Returns:
        1 when the previous snapshot cannot be read, is not a JSON object,
        or the output file cannot be written; 0 otherwise.
    """
    connectors = [
        ConnectorConfig(
            connector_id=connector_id,
            connector_type="fabric_registry",
            source_path=str(args.connector_manifest),
        )
        for connector_id in args.connector
    ]
    snapshot = scan_repo(
        ScanOptions(
            repo_path=args.path,
            repo_slug=args.repo_slug,
            repo_name=args.repo_name,
            domain=args.domain,
            commit=args.commit,
            profile=args.profile,
            deterministic_only=not args.llm,
            llm_enabled=args.llm,
            llm_config=LLMExtractionConfig(
                provider=args.llm_provider,
                model=args.llm_model,
                temperature=args.llm_temperature,
                max_tokens=args.llm_max_tokens,
                min_confidence=args.llm_min_confidence,
            ),
            connectors=connectors,
        )
    )

    if args.previous_snapshot:
        # Only I/O and decoding problems are user errors here (JSONDecodeError
        # and UnicodeDecodeError are ValueError subclasses); anything else is
        # a programming error and should surface instead of being swallowed.
        try:
            previous = json.loads(args.previous_snapshot.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            print(f"ERROR {args.previous_snapshot}: cannot read previous snapshot: {exc}", file=sys.stderr)
            return 1
        if not isinstance(previous, dict):
            print(f"ERROR {args.previous_snapshot}: previous snapshot must be a JSON object", file=sys.stderr)
            return 1
        snapshot = reconcile_discovery_snapshots(previous, snapshot)

    payload = json.dumps(snapshot, indent=2, sort_keys=True)
    if args.output:
        try:
            args.output.parent.mkdir(parents=True, exist_ok=True)
            args.output.write_text(payload + "\n", encoding="utf-8")
        except OSError as exc:
            # Report an unwritable destination like other CLI errors.
            print(f"ERROR {args.output}: cannot write discovery snapshot: {exc}", file=sys.stderr)
            return 1

    if args.json:
        print(payload)
        return 0

    print(_scan_summary(snapshot, dry_run=args.dry_run))
    if args.output:
        print(f"wrote {args.output}")
    return 0


def _scan_summary(snapshot: dict[str, object], *, dry_run: bool) -> str:
    """Format the one-line human-readable summary of a discovery snapshot."""
    candidates = snapshot["candidates"]

    review_count = len(snapshot.get("review_artifacts", []))
    review_summary = f", {review_count} review artifact(s)" if review_count else ""

    connector_count = len(snapshot.get("connector_runs", []))
    connector_summary = f", {connector_count} connector run(s)" if connector_count else ""

    # The diff section appears only when the snapshot carries a
    # reconciliation mapping with a "diff" mapping inside it.
    reconciliation = snapshot.get("reconciliation", {})
    diff = reconciliation.get("diff") if isinstance(reconciliation, dict) else None
    diff_summary = ""
    if isinstance(diff, dict):
        diff_summary = (
            f", diff +{len(diff.get('added', []))}"
            f"/~{len(diff.get('changed', []))}"
            f"/-{len(diff.get('retired', []))}"
            f"/!{len(diff.get('conflicted', []))}"
        )

    mode = "dry-run " if dry_run else ""
    return (
        f"{mode}scan {snapshot['source']['repo_slug']} "
        f"({snapshot['source']['commit']}): "
        f"{len(candidates['nodes'])} node(s), "
        f"{len(candidates['edges'])} edge(s), "
        f"{len(candidates['attributes'])} attribute(s), "
        f"{len(snapshot['replacement_scopes'])} replacement scope(s)"
        f"{review_summary}"
        f"{connector_summary}"
        f"{diff_summary}"
    )
class RegistryRequestError(Exception):
    """Raised when a registry request fails or returns an unusable response."""


def _registry_post(registry_url: str, path: str, payload: dict[str, object]) -> dict[str, object]:
    """POST *payload* to the registry, exiting the process on failure.

    Wraps ``_registry_post_checked`` for single-shot commands: a failure is
    printed to stderr as ``ERROR <message>`` and turned into
    ``SystemExit(1)``.

    Returns:
        The decoded JSON object returned by the registry.
    """
    try:
        return _registry_post_checked(registry_url, path, payload)
    except RegistryRequestError as exc:
        print(f"ERROR {exc}", file=sys.stderr)
        raise SystemExit(1) from exc


def _registry_post_checked(registry_url: str, path: str, payload: dict[str, object]) -> dict[str, object]:
    """POST *payload* as JSON to ``registry_url + path`` and return the reply.

    Top-level keys whose value is ``None`` are omitted from the request body.

    Args:
        registry_url: Base URL of the registry; a trailing ``/`` is ignored.
        path: Request path, starting with ``/``.
        payload: JSON-serialisable request body.

    Returns:
        The response body decoded as a JSON object.

    Raises:
        RegistryRequestError: For an invalid URL, an HTTP error status, a
            connection or timeout failure, or a response that is not a
            JSON object.
    """
    # Function-scope import: only the exception type is needed here.
    import http.client

    data = json.dumps({key: value for key, value in payload.items() if value is not None}).encode("utf-8")
    try:
        # Request() itself raises ValueError for a URL without a usable scheme.
        request = urllib.request.Request(
            registry_url.rstrip("/") + path,
            data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
    except ValueError as exc:
        raise RegistryRequestError(f"invalid registry URL {registry_url!r}: {exc}") from exc

    try:
        with urllib.request.urlopen(request, timeout=15) as response:
            raw = response.read()
    except urllib.error.HTTPError as exc:
        # Must precede URLError/OSError: HTTPError is a subclass of both.
        detail = exc.read().decode("utf-8", errors="replace")
        raise RegistryRequestError(f"registry request failed ({exc.code}): {detail}") from exc
    except (urllib.error.URLError, OSError, http.client.HTTPException) as exc:
        # Timeouts and dropped connections while waiting for or reading the
        # response are raised as plain OSError/HTTPException, not URLError.
        raise RegistryRequestError(f"cannot reach registry at {registry_url}: {exc}") from exc

    try:
        body = json.loads(raw)
    except ValueError as exc:
        # Covers both invalid JSON and undecodable bytes.
        raise RegistryRequestError("registry returned a non-JSON response") from exc
    if not isinstance(body, dict):
        raise RegistryRequestError("registry returned a non-object response")
    return body
def _manifest_optional_path(value: object, manifest_dir: Path) -> Path | None:
    """Turn a manifest path entry into a ``Path``, or ``None`` if unusable.

    Non-string and blank values yield ``None``. ``~`` is expanded. An
    absolute path is returned as given; a relative path is resolved
    against *manifest_dir*.
    """
    if isinstance(value, str) and value.strip():
        candidate = Path(value).expanduser()
        if candidate.is_absolute():
            return candidate
        return (manifest_dir / candidate).resolve()
    return None
def _manifest_paths(value: object, manifest_dir: Path) -> list[Path]:
    """Resolve a manifest value holding one path or a list of paths.

    A single string is treated as a one-element list. ``None`` and values
    of any other type yield an empty list. Entries that
    ``_manifest_optional_path`` rejects are dropped.
    """
    if isinstance(value, str):
        entries = [value]
    elif isinstance(value, list):
        entries = value
    else:
        # Covers None as well as unsupported types.
        return []
    resolved = (_manifest_optional_path(entry, manifest_dir) for entry in entries)
    return [path for path in resolved if path is not None]
def _primary_repo_path(paths: list[Path]) -> Path:
    """Return the directory that stands for the repo among *paths*.

    Uses the first path, resolved; if it is a file, its parent directory
    is returned. An empty list falls back to the resolved current
    directory.
    """
    first = paths[0].resolve() if paths else None
    if first is None:
        return Path(".").resolve()
    if first.is_file():
        return first.parent
    return first
def _slugify(value: str) -> str:
    """Derive a lowercase, dash-separated slug from *value*.

    Every run of characters outside ``a-z0-9`` becomes a single ``-``,
    leading and trailing dashes are removed, and ``"repo"`` is returned
    when nothing is left.
    """
    dashed = re.sub(r"[^a-z0-9]+", "-", value.lower())
    slug = dashed.strip("-")
    return slug if slug else "repo"
def _git_value(repo_path: Path | None, *args: str) -> str | None:
    """Run ``git <args>`` in *repo_path* and return its trimmed stdout.

    Best-effort lookup: returns ``None`` when no path is given, git cannot
    be started, the call exceeds five seconds, git exits non-zero, or the
    output is empty.
    """
    if repo_path is None:
        return None
    command = ["git", *args]
    try:
        completed = subprocess.run(
            command,
            cwd=repo_path,
            check=False,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.TimeoutExpired):
        return None
    if completed.returncode == 0:
        output = completed.stdout.strip()
        if output:
            return output
    return None
def _utc_now() -> str:
    """Return the current UTC time as ISO 8601, to the second, with a ``Z`` suffix."""
    now = datetime.now(timezone.utc).replace(microsecond=0)
    stamp = now.isoformat()
    return stamp.replace("+00:00", "Z")
def _load_graph_or_exit(paths: list[Path]) -> FabricGraph:
    """Build the Fabric graph for *paths*, exiting when anything fails to load.

    Each load error is printed to stderr as ``ERROR <path>: <message>``
    before the process exits with status 1.
    """
    graph = build_graph(paths)
    if not graph.load_errors:
        return graph
    for failed_path, reason in graph.load_errors:
        print(f"ERROR {failed_path}: {reason}", file=sys.stderr)
    raise SystemExit(1)
def _print_providers(graph: FabricGraph, capability: str) -> None:
    """Print a tab-separated table of the providers of *capability*.

    Prints ``no providers found for <capability>`` when the graph returns
    none. Environments and interface ids are comma-joined within their cell.
    """
    found = graph.providers(capability)
    if not found:
        print(f"no providers found for {capability}")
        return

    print("provider_id\tservice_id\tlifecycle\tenvironments\tinterfaces")
    for entry in found:
        details = entry.spec
        cells = (
            entry.id,
            str(details.get("service_id", "")),
            str(details.get("lifecycle", "")),
            ",".join(details.get("environments", [])),
            ",".join(details.get("interface_ids", [])),
        )
        print("\t".join(cells))
def _print_consumers(
    graph: FabricGraph,
    target: str,
    matches: object | None = None,
) -> None:
    """Print a tab-separated table of consumer matches for *target*.

    Args:
        graph: Graph queried via ``graph.consumers(target)`` when *matches*
            is not supplied.
        target: Name used for the lookup and in the "no consumers" message.
        matches: Optional precomputed iterable of match objects (used by
            ``blast-radius``); it is copied into a list before printing.
    """
    if matches is None:
        rows = graph.consumers(target)
    else:
        rows = list(matches)
    if not rows:
        print(f"no consumers found for {target}")
        return

    print("consumer_service_id\tdependency_id\trequires\tprovider_capability_id\tprovider_interface_id\tstatus")
    columns = (
        "consumer_service_id",
        "dependency_id",
        "required_capability_type",
        "provider_capability_id",
        "provider_interface_id",
        "status",
    )
    for row in rows:
        # NOTE(review): the join needs every field to be a str; a None
        # provider id would raise TypeError here. Confirm the match type
        # never carries None.
        print("\t".join([getattr(row, column) for column in columns]))
def _print_unresolved(graph: FabricGraph) -> None:
    """Print a tab-separated table of the graph's unresolved dependencies.

    Prints ``no unresolved dependencies`` when there are none. The
    ``requires`` column shows the required capability id when set,
    otherwise the capability type (empty when neither is present).
    """
    pending = graph.unresolved_dependencies()
    if not pending:
        print("no unresolved dependencies")
        return

    print("dependency_id\tconsumer_service_id\trequires")
    for entry in pending:
        details = entry.spec
        wanted = details.get("requires", {})
        required = wanted.get("capability_id") or wanted.get("capability_type", "")
        consumer = details.get("consumer_service_id", "")
        print("\t".join([entry.id, str(consumer), str(required)]))
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())