Files
railiance-fabric/railiance_fabric/cli.py

539 lines
20 KiB
Python

from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from .loader import declaration_files, load_yaml
from .graph import FabricGraph, build_graph
from .graph_explorer import fabric_graph_explorer_payload
from .validation import validate_roots
def build_parser() -> argparse.ArgumentParser:
    """Construct the ``railiance-fabric`` command-line parser.

    Sub-commands: ``validate``, the graph queries (``providers``,
    ``consumers``, ``dependency-path``, ``unresolved``, ``blast-radius``),
    ``export``, and the ``registry`` group (``sync``, ``sync-manifest``,
    ``ingest-cyclonedx``). The selected sub-command is stored in
    ``args.command``; registry sub-commands also set ``args.registry_command``.
    """
    local_registry = "http://127.0.0.1:8765"

    def add_optional_paths(command: argparse.ArgumentParser) -> None:
        # Zero or more roots; the current directory is the default.
        command.add_argument("paths", nargs="*", type=Path, default=[Path(".")])

    root = argparse.ArgumentParser(
        prog="railiance-fabric",
        description="Load and validate Railiance Fabric declarations.",
    )
    commands = root.add_subparsers(dest="command", required=True)

    # validate: the only command that requires at least one path.
    validate_cmd = commands.add_parser(
        "validate",
        help="Validate one or more repo roots or declaration files.",
    )
    validate_cmd.add_argument(
        "paths",
        nargs="+",
        type=Path,
        help="Repo root, fabric directory, or declaration YAML file.",
    )
    validate_cmd.add_argument(
        "--warnings-as-errors",
        action="store_true",
        help="Exit non-zero when warnings are present.",
    )

    # Graph queries: (command, help, optional leading positional (name, help)).
    # Kept in registration order so the generated help text is unchanged.
    graph_queries = (
        (
            "providers",
            "List providers for a capability type or id.",
            ("capability", "Capability type or capability id."),
        ),
        (
            "consumers",
            "List consumers of a capability or interface.",
            ("target", "Capability/interface type or declaration id."),
        ),
        (
            "dependency-path",
            "Show dependency path for a service.",
            ("service_id", "Service declaration id."),
        ),
        ("unresolved", "Show missing or unresolved dependencies.", None),
        (
            "blast-radius",
            "Show consumers affected by an interface change.",
            ("interface", "Interface type or interface declaration id."),
        ),
    )
    for command_name, command_help, positional in graph_queries:
        query_cmd = commands.add_parser(command_name, help=command_help)
        if positional is not None:
            positional_name, positional_help = positional
            query_cmd.add_argument(positional_name, help=positional_help)
        add_optional_paths(query_cmd)

    # export
    export_cmd = commands.add_parser(
        "export",
        help="Export graph as JSON, Mermaid, or graph-explorer payload.",
    )
    add_optional_paths(export_cmd)
    export_cmd.add_argument(
        "--format",
        choices=["json", "mermaid", "graph-explorer"],
        default="json",
    )

    # registry group
    registry_cmd = commands.add_parser(
        "registry",
        help="Feed a running Railiance Fabric registry service.",
    )
    registry_commands = registry_cmd.add_subparsers(dest="registry_command", required=True)

    sync_cmd = registry_commands.add_parser(
        "sync",
        help="Register a repo and ingest its current graph snapshot.",
    )
    add_optional_paths(sync_cmd)
    sync_cmd.add_argument("--registry-url", default=local_registry)
    sync_cmd.add_argument("--repo-slug", default=None)
    sync_cmd.add_argument("--name", default=None)
    sync_cmd.add_argument("--remote-url", default=None)
    sync_cmd.add_argument("--default-branch", default="main")
    sync_cmd.add_argument("--state-hub-repo-id", default=None)
    sync_cmd.add_argument("--commit", default=None)
    sync_cmd.add_argument("--json", action="store_true", help="Print the raw snapshot response.")

    manifest_cmd = registry_commands.add_parser(
        "sync-manifest",
        help="Register and sync repos from an onboarding manifest.",
    )
    manifest_cmd.add_argument("manifest", type=Path)
    manifest_cmd.add_argument("--registry-url", default=None, help="Override the manifest registry_url.")
    manifest_cmd.add_argument("--strict", action="store_true", help="Exit non-zero when any repo cannot be synced.")
    manifest_cmd.add_argument("--json", action="store_true", help="Print the raw manifest sync summary.")

    sbom_cmd = registry_commands.add_parser(
        "ingest-cyclonedx",
        help="Ingest a CycloneDX SBOM as library inventory.",
    )
    sbom_cmd.add_argument("sbom", type=Path)
    sbom_cmd.add_argument("--registry-url", default=local_registry)
    sbom_cmd.add_argument("--repo-slug", required=True)
    sbom_cmd.add_argument("--json", action="store_true", help="Print the raw ingest response.")

    return root
def main(argv: list[str] | None = None) -> int:
    """Parse *argv* and run the selected sub-command.

    Returns the process exit status. ``validate`` returns 1 when the report
    has errors, or warnings with ``--warnings-as-errors``. Graph query and
    export commands return 0 after printing. ``registry`` commands return
    whatever their handler returns.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    command = args.command

    if command == "validate":
        report = validate_roots(args.paths)
        for diagnostic in report.diagnostics:
            print(diagnostic.format())
        print(report.summary())
        failed = bool(report.errors) or (args.warnings_as_errors and bool(report.warnings))
        return 1 if failed else 0

    if command == "registry":
        registry_handlers = {
            "sync": _registry_sync,
            "sync-manifest": _registry_sync_manifest,
            "ingest-cyclonedx": _registry_ingest_cyclonedx,
        }
        handler = registry_handlers.get(args.registry_command)
        if handler is not None:
            return handler(args)
    elif command in {
        "providers",
        "consumers",
        "dependency-path",
        "unresolved",
        "blast-radius",
        "export",
    }:
        # Every remaining command works on a fully loaded graph.
        graph = _load_graph_or_exit(args.paths)
        if command == "providers":
            _print_providers(graph, args.capability)
        elif command == "consumers":
            _print_consumers(graph, args.target)
        elif command == "dependency-path":
            print("\n".join(graph.dependency_path_lines(args.service_id)))
        elif command == "unresolved":
            _print_unresolved(graph)
        elif command == "blast-radius":
            _print_consumers(graph, args.interface, matches=graph.blast_radius(args.interface))
        elif args.format == "mermaid":
            print(graph.to_mermaid())
        elif args.format == "graph-explorer":
            explorer_payload = fabric_graph_explorer_payload(graph.to_export())
            print(json.dumps(explorer_payload, indent=2, sort_keys=True))
        else:
            print(graph.to_json())
        return 0

    # parser.error() exits the process itself; the return is only a fallback.
    parser.error(f"unknown command {args.command!r}")
    return 2
def _registry_sync(args: argparse.Namespace) -> int:
    """Validate the given roots, register the repo, and post a graph snapshot.

    Diagnostics go to stderr. Returns 1 without contacting the registry when
    validation reports errors, otherwise 0 once both requests have succeeded.
    Graph load errors and registry failures end the process through the
    helpers called here.
    """
    report = validate_roots(args.paths)
    for diagnostic in report.diagnostics:
        print(diagnostic.format(), file=sys.stderr)
    if report.errors:
        print(report.summary(), file=sys.stderr)
        return 1

    graph = _load_graph_or_exit(args.paths)
    repo_root = _primary_repo_path(args.paths)
    slug = args.repo_slug or _slugify(repo_root.name)

    # Explicit CLI values win; git is only consulted as a fallback.
    registration = {
        "slug": slug,
        "name": args.name or repo_root.name,
        "remote_url": args.remote_url or _git_value(repo_root, "config", "--get", "remote.origin.url"),
        "default_branch": args.default_branch,
        "state_hub_repo_id": args.state_hub_repo_id,
    }
    repository = _registry_post(args.registry_url, "/repositories", registration)

    snapshot_request = {
        "commit": args.commit or _git_value(repo_root, "rev-parse", "HEAD") or "working-tree",
        "generated_at": _utc_now(),
        "graph": graph.to_export(),
    }
    snapshot = _registry_post(
        args.registry_url,
        f"/repositories/{slug}/snapshots",
        snapshot_request,
    )

    if not args.json:
        print(f"registered {repository['slug']}")
        print(f"snapshot {snapshot['id']} accepted for {snapshot['commit']}")
        return 0
    print(json.dumps({"repository": repository, "snapshot": snapshot}, indent=2, sort_keys=True))
    return 0
def _registry_sync_manifest(args: argparse.Namespace) -> int:
    """Sync every repository listed in an onboarding manifest.

    The manifest must be a YAML mapping with a ``repositories`` list. The
    registry URL comes from ``--registry-url``, then the manifest's
    ``registry_url``, then the local default. Prints either a JSON summary or
    one line per repository plus a totals line.

    Returns 1 for a malformed manifest, or under ``--strict`` when any
    repository ended as ``error`` or ``registered`` only; otherwise 0.
    """
    manifest_path = args.manifest.resolve()
    manifest = load_yaml(manifest_path)
    if not isinstance(manifest, dict):
        print(f"ERROR {manifest_path}: manifest must be a YAML mapping", file=sys.stderr)
        return 1
    entries = manifest.get("repositories")
    if not isinstance(entries, list):
        print(f"ERROR {manifest_path}: manifest requires a repositories list", file=sys.stderr)
        return 1

    registry_url = args.registry_url or str(manifest.get("registry_url") or "http://127.0.0.1:8765")
    base_dir = manifest_path.parent
    results = []
    for entry in entries:
        results.append(_sync_manifest_repo(registry_url, base_dir, entry))

    # One pass over the results instead of one scan per status.
    tally = {"synced": 0, "registered": 0, "error": 0}
    for outcome in results:
        status = outcome["status"]
        if status in tally:
            tally[status] += 1
    counts = {
        "total": len(results),
        "synced": tally["synced"],
        "registered": tally["registered"],
        "errors": tally["error"],
    }
    summary = {
        "manifest": str(manifest_path),
        "registry_url": registry_url,
        "repositories": results,
        "counts": counts,
    }

    if args.json:
        print(json.dumps(summary, indent=2, sort_keys=True))
    else:
        for outcome in results:
            status = outcome["status"]
            if status == "synced":
                print(f"synced {outcome['slug']} snapshot {outcome['snapshot_id']} ({outcome['commit']})")
            elif status == "registered":
                print(f"registered {outcome['slug']} ({'; '.join(outcome['warnings'])})")
            else:
                print(f"error {outcome['slug']}: {outcome['error']}")
        print(
            f"summary: {counts['total']} repo(s), {counts['synced']} synced, "
            f"{counts['registered']} registered only, {counts['errors']} error(s)"
        )

    incomplete = counts["errors"] or counts["registered"]
    return 1 if args.strict and incomplete else 0
def _sync_manifest_repo(registry_url: str, manifest_dir: Path, item: object) -> dict[str, object]:
    """Register one manifest entry and, when possible, push its graph snapshot.

    The returned mapping always carries ``slug`` and ``status``:

    * ``"error"``: the entry is malformed, a registry request failed,
      validation reported errors, or the graph had load errors. ``error``
      holds the message.
    * ``"registered"``: the repository was registered but no snapshot was
      sent (no path, missing directory, or no declarations). ``warnings``
      says why.
    * ``"synced"``: a snapshot was accepted. ``snapshot_id`` and ``commit``
      are copied from the registry response.
    """
    if not isinstance(item, dict):
        return {"slug": "<invalid>", "status": "error", "error": "repository entry must be a mapping"}
    slug = str(item.get("slug") or "").strip()
    if not slug:
        return {"slug": "<missing>", "status": "error", "error": "repository entry requires slug"}

    def failed(message: str, warnings: object) -> dict[str, object]:
        # Uniform shape for every error result after the slug is known.
        return {"slug": slug, "status": "error", "error": message, "warnings": warnings}

    repo_path = _manifest_optional_path(item.get("path"), manifest_dir)
    # `notes` is the same list object stored under result["warnings"].
    notes: list[str] = []
    result: dict[str, object] = {"slug": slug, "status": "registered", "warnings": notes}

    registration = {
        "slug": slug,
        "name": item.get("name") or (repo_path.name if repo_path else slug),
        "remote_url": item.get("remote_url") or _git_value(repo_path, "config", "--get", "remote.origin.url"),
        "default_branch": item.get("default_branch") or "main",
        "state_hub_repo_id": item.get("state_hub_repo_id"),
    }
    try:
        result["repository"] = _registry_post_checked(registry_url, "/repositories", registration)
    except RegistryRequestError as exc:
        return failed(str(exc), [])

    # Registration succeeded; the cases below stop at "registered".
    if repo_path is None:
        notes.append("no repo path configured")
        return result
    if not repo_path.is_dir():
        notes.append(f"repo path not found: {repo_path}")
        return result

    graph_paths = _manifest_paths(item.get("declaration_paths"), manifest_dir) or [repo_path]
    has_declarations = any(declaration_files(path) for path in graph_paths)
    if not has_declarations:
        notes.append("no Fabric declarations found")
        # SBOMs can still be ingested without any graph declarations.
        try:
            _ingest_manifest_sboms(registry_url, manifest_dir, slug, item, result)
        except RegistryRequestError as exc:
            return failed(str(exc), result["warnings"])
        return result

    report = validate_roots(graph_paths)
    if report.errors:
        return failed(report.summary(), [diagnostic.format() for diagnostic in report.diagnostics])

    graph = build_graph(graph_paths)
    if graph.load_errors:
        joined = "; ".join(f"{path}: {message}" for path, message in graph.load_errors)
        return failed(joined, [])

    try:
        snapshot = _registry_post_checked(
            registry_url,
            f"/repositories/{slug}/snapshots",
            {
                "commit": item.get("commit") or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree",
                "generated_at": _utc_now(),
                "graph": graph.to_export(),
            },
        )
        result["status"] = "synced"
        result["snapshot_id"] = snapshot["id"]
        result["commit"] = snapshot["commit"]
        _ingest_manifest_sboms(registry_url, manifest_dir, slug, item, result)
    except RegistryRequestError as exc:
        return failed(str(exc), result["warnings"])
    return result
def _ingest_manifest_sboms(
    registry_url: str,
    manifest_dir: Path,
    slug: str,
    item: dict[str, object],
    result: dict[str, object],
) -> None:
    """Post each SBOM named by a manifest entry to the registry.

    Paths come from the entry's ``sbom`` (handled first) and ``sboms`` keys.
    Missing files and non-mapping payloads are recorded in
    ``result["warnings"]`` and skipped. Registry responses are stored under
    ``result["libraries"]`` when at least one SBOM was ingested.

    Raises:
        RegistryRequestError: propagated from the first failing request.
    """

    def warn(message: str) -> None:
        # Create the warnings list lazily so `result` is untouched otherwise.
        result.setdefault("warnings", []).append(message)

    candidates = _manifest_paths(item.get("sboms"), manifest_dir)
    primary = _manifest_optional_path(item.get("sbom"), manifest_dir)
    if primary:
        candidates.insert(0, primary)

    endpoint = f"/repositories/{slug}/libraries/cyclonedx"
    responses: list[dict[str, object]] = []
    for candidate in candidates:
        if not candidate.is_file():
            warn(f"SBOM not found: {candidate}")
            continue
        document = load_yaml(candidate)
        if not isinstance(document, dict):
            warn(f"SBOM must be an object: {candidate}")
            continue
        responses.append(_registry_post_checked(registry_url, endpoint, document))

    if responses:
        result["libraries"] = responses
def _registry_ingest_cyclonedx(args: argparse.Namespace) -> int:
    """Upload one CycloneDX SBOM as the library inventory of ``args.repo_slug``.

    Returns 1 when the loaded SBOM is not a mapping, otherwise 0 after
    printing the registry response (raw JSON with ``--json``, else a
    one-line summary). Registry failures end the process via
    ``_registry_post``.
    """
    document = load_yaml(args.sbom)
    if not isinstance(document, dict):
        print(f"ERROR {args.sbom}: CycloneDX SBOM must be a mapping/object", file=sys.stderr)
        return 1

    endpoint = f"/repositories/{args.repo_slug}/libraries/cyclonedx"
    result = _registry_post(args.registry_url, endpoint, document)

    if not args.json:
        print(f"ingested {result['component_count']} library component(s) for {result['repo_slug']}")
        return 0
    print(json.dumps(result, indent=2, sort_keys=True))
    return 0
class RegistryRequestError(Exception):
    """Raised when a registry request fails or its response is unusable."""
def _registry_post(registry_url: str, path: str, payload: dict[str, object]) -> dict[str, object]:
    """POST *payload* to the registry, ending the process on failure.

    Wraps ``_registry_post_checked``: a ``RegistryRequestError`` is printed
    to stderr as ``ERROR <message>`` and converted to ``SystemExit(1)``.
    """
    try:
        response = _registry_post_checked(registry_url, path, payload)
    except RegistryRequestError as failure:
        print(f"ERROR {failure}", file=sys.stderr)
        raise SystemExit(1) from failure
    return response
def _registry_post_checked(registry_url: str, path: str, payload: dict[str, object]) -> dict[str, object]:
    """POST *payload* as JSON to ``registry_url + path`` and return the JSON object reply.

    Top-level keys whose value is ``None`` are omitted from the request body.
    The request uses a 15 second timeout.

    Args:
        registry_url: Base URL of the registry; a trailing ``/`` is ignored.
        path: Request path appended to the base URL (expected to start with ``/``).
        payload: Mapping serialised as the JSON request body.

    Returns:
        The decoded response body, which must be a JSON object.

    Raises:
        RegistryRequestError: on an HTTP error status, an unreachable or
            malformed registry URL, a timeout or connection failure while
            reading the response, a non-JSON body, or a body that is not a
            JSON object.
    """
    data = json.dumps({key: value for key, value in payload.items() if value is not None}).encode("utf-8")
    url = registry_url.rstrip("/") + path
    try:
        # Request() itself raises ValueError for a URL without a usable scheme.
        request = urllib.request.Request(
            url,
            data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=15) as response:
            raw = response.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RegistryRequestError(f"registry request failed ({exc.code}): {detail}") from exc
    except urllib.error.URLError as exc:
        raise RegistryRequestError(f"cannot reach registry at {registry_url}: {exc}") from exc
    except OSError as exc:
        # Timeouts and resets during response.read() are plain OSError
        # subclasses; urllib only wraps failures while opening the connection.
        raise RegistryRequestError(f"cannot reach registry at {registry_url}: {exc}") from exc
    except ValueError as exc:
        raise RegistryRequestError(f"invalid registry URL {url!r}: {exc}") from exc

    try:
        body = json.loads(raw)
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        raise RegistryRequestError("registry returned a non-JSON response") from exc
    if not isinstance(body, dict):
        raise RegistryRequestError("registry returned a non-object response")
    return body
def _manifest_optional_path(value: object, manifest_dir: Path) -> Path | None:
    """Turn a manifest path value into a ``Path``, or ``None`` when unset.

    Only non-blank strings are accepted. ``~`` is expanded. Absolute paths
    are returned as-is, while relative paths are resolved against
    *manifest_dir*.
    """
    if isinstance(value, str) and value.strip():
        candidate = Path(value).expanduser()
        if candidate.is_absolute():
            return candidate
        return (manifest_dir / candidate).resolve()
    return None
def _manifest_paths(value: object, manifest_dir: Path) -> list[Path]:
    """Resolve a manifest value holding one path or a list of paths.

    A single string is treated as a one-element list. ``None`` and any other
    type give an empty list. Entries that ``_manifest_optional_path`` rejects
    are dropped.
    """
    if isinstance(value, str):
        raw_entries = [value]
    elif isinstance(value, list):
        raw_entries = value
    else:
        # Covers None as well as unsupported types.
        return []
    resolved = (_manifest_optional_path(entry, manifest_dir) for entry in raw_entries)
    return [path for path in resolved if path is not None]
def _primary_repo_path(paths: list[Path]) -> Path:
    """Return the directory treated as the repo root for *paths*.

    Uses the first path, resolved; if that is a file, its parent directory is
    returned. With no paths, the resolved current directory is used.
    """
    if paths:
        first = paths[0].resolve()
        if first.is_file():
            return first.parent
        return first
    return Path(".").resolve()
def _slugify(value: str) -> str:
    """Lower-case *value* and reduce it to ``a-z0-9`` words joined by ``-``.

    Falls back to ``"repo"`` when nothing usable remains.
    """
    # Each run of other characters (hyphens included) becomes one "-".
    hyphenated = re.sub(r"[^a-z0-9]+", "-", value.lower())
    slug = hyphenated.strip("-")
    return slug if slug else "repo"
def _git_value(repo_path: Path | None, *args: str) -> str | None:
    """Run ``git <args>`` in *repo_path* and return its stripped stdout.

    Best effort: returns ``None`` when no path is given, git cannot be
    started, the command takes longer than 5 seconds, exits non-zero, or
    prints nothing.
    """
    if repo_path is None:
        return None
    command = ["git", *args]
    try:
        completed = subprocess.run(
            command,
            cwd=repo_path,
            check=False,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.TimeoutExpired):
        return None
    if completed.returncode == 0:
        output = completed.stdout.strip()
        if output:
            return output
    return None
def _utc_now() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (whole seconds)."""
    current = datetime.now(timezone.utc)
    return current.strftime("%Y-%m-%dT%H:%M:%SZ")
def _load_graph_or_exit(paths: list[Path]) -> FabricGraph:
    """Build the graph for *paths*, ending the process on load errors.

    Each load error is printed to stderr as ``ERROR <path>: <message>``
    before ``SystemExit(1)`` is raised.
    """
    graph = build_graph(paths)
    if not graph.load_errors:
        return graph
    for failed_path, reason in graph.load_errors:
        print(f"ERROR {failed_path}: {reason}", file=sys.stderr)
    raise SystemExit(1)
def _print_providers(graph: FabricGraph, capability: str) -> None:
    """Print a tab-separated table of the providers of *capability*.

    Columns: provider id, service id, lifecycle, environments, interface
    ids. The last two are comma-joined. Prints a "no providers found"
    line instead when ``graph.providers(capability)`` is empty.
    """

    def joined(values: object) -> str:
        # The spec may not have been validated: accept a missing or null
        # value, a bare scalar, or a list with non-string items, rather than
        # letting str.join raise or split a string into characters.
        if values is None:
            return ""
        if isinstance(values, str):
            return values
        if isinstance(values, (list, tuple, set, frozenset)):
            return ",".join(str(value) for value in values)
        return str(values)

    providers = graph.providers(capability)
    if not providers:
        print(f"no providers found for {capability}")
        return
    print("provider_id\tservice_id\tlifecycle\tenvironments\tinterfaces")
    for provider in providers:
        spec = provider.spec
        row = [
            str(provider.id),
            str(spec.get("service_id", "")),
            str(spec.get("lifecycle", "")),
            joined(spec.get("environments")),
            joined(spec.get("interface_ids")),
        ]
        print("\t".join(row))
def _print_consumers(
    graph: FabricGraph,
    target: str,
    matches: object | None = None,
) -> None:
    """Print a tab-separated table of consumer matches for *target*.

    Args:
        graph: Graph queried through ``graph.consumers(target)`` when
            *matches* is not supplied.
        target: Capability or interface identifier, also used in the
            "no consumers found" message.
        matches: Optional precomputed iterable of matches (for example a
            blast-radius result); it is materialised with ``list()``.
    """

    def cell(value: object) -> str:
        # str.join rejects None and other non-strings; an absent value
        # (e.g. no resolved provider) is printed as an empty cell.
        return "" if value is None else str(value)

    consumer_matches = graph.consumers(target) if matches is None else list(matches)
    if not consumer_matches:
        print(f"no consumers found for {target}")
        return
    print("consumer_service_id\tdependency_id\trequires\tprovider_capability_id\tprovider_interface_id\tstatus")
    for match in consumer_matches:
        row = (
            match.consumer_service_id,
            match.dependency_id,
            match.required_capability_type,
            match.provider_capability_id,
            match.provider_interface_id,
            match.status,
        )
        print("\t".join(cell(value) for value in row))
def _print_unresolved(graph: FabricGraph) -> None:
    """Print a tab-separated table of the graph's unresolved dependencies.

    Columns: dependency id, consumer service id, and the required capability
    (``requires.capability_id`` if set, otherwise
    ``requires.capability_type``). Prints ``no unresolved dependencies`` when
    ``graph.unresolved_dependencies()`` is empty.
    """
    unresolved = graph.unresolved_dependencies()
    if not unresolved:
        print("no unresolved dependencies")
        return
    print("dependency_id\tconsumer_service_id\trequires")
    for dependency in unresolved:
        spec = dependency.spec
        requires = spec.get("requires")
        if not isinstance(requires, dict):
            # A null or malformed `requires` must not abort the whole
            # listing; the row is shown with an empty requirement.
            requires = {}
        required = requires.get("capability_id") or requires.get("capability_type") or ""
        row = [
            str(dependency.id),
            str(spec.get("consumer_service_id", "")),
            str(required),
        ]
        print("\t".join(row))
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())