diff --git a/docs/repo-reality-scanner.md b/docs/repo-reality-scanner.md index 9ac0a00..28eef60 100644 --- a/docs/repo-reality-scanner.md +++ b/docs/repo-reality-scanner.md @@ -210,6 +210,54 @@ Failures do not corrupt the scan. Missing catalogs become `connector_failed` artifacts, and future remote connectors should use `connector_rate_limited` when backoff is required. +## Multi-Repo Orchestration + +Known local repos can be scanned from the same onboarding manifest used by +`registry sync-manifest`: + +```bash +railiance-fabric registry scan-manifest registry/local-repos.yaml \ + --dry-run \ + --output-dir .fabric-discovery +``` + +The command isolates each repo. A missing path, invalid previous snapshot, or +registry write failure is reported for that repo without aborting the rest of +the run. The summary includes repo counts for scanned, changed, retired, +conflicted, LLM skipped, LLM failed, ingested, accepted, and errors so it can be +copied into State Hub progress notes or future automation output. + +Useful controls: + +- `--repo-slug ` can be repeated to scan an allowlist. +- `--profile ` tags the scan profile and output filename. +- `--previous-dir ` reconciles each repo against + `-.discovery.json` from an earlier run. +- `--llm` enables LLM-assisted extraction; `--deterministic-only` forces the + offline rule path. +- `--llm-max-runs ` caps how many repos may attempt LLM extraction in one + orchestration run, while `--llm-max-tokens` remains the per-repo request cap. +- `--connector local-fabric-registry` attaches manifest-derived registry facts + to every repo scan. +- `--ingest` stores discovery snapshots in the registry; `--accept` then + projects accepted candidates into graph snapshots. `--dry-run` suppresses + registry writes even when those flags are present. + +Example review cycle: + +```bash +railiance-fabric registry scan-manifest registry/local-repos.yaml \ + --repo-slug railiance-fabric \ + --previous-dir .fabric-discovery \ + --output-dir .fabric-discovery \ + --connector local-fabric-registry \ + --dry-run +``` + +After review, rerun with `--ingest` to store the snapshots. Add `--accept` only +when candidates marked `review_state: accepted` should be projected into the +registry graph. + ## Identity Identity is the main safety boundary. The scanner must not append guesses on diff --git a/railiance_fabric/cli.py b/railiance_fabric/cli.py index d605949..7c015b5 100644 --- a/railiance_fabric/cli.py +++ b/railiance_fabric/cli.py @@ -9,6 +9,7 @@ import urllib.error import urllib.request from datetime import datetime, timezone from pathlib import Path +from typing import Any from .connectors import ConnectorConfig from .loader import declaration_files, load_yaml @@ -117,6 +118,40 @@ def build_parser() -> argparse.ArgumentParser: sync_manifest.add_argument("--strict", action="store_true", help="Exit non-zero when any repo cannot be synced.") sync_manifest.add_argument("--json", action="store_true", help="Print the raw manifest sync summary.") + scan_manifest = registry_sub.add_parser("scan-manifest", help="Run discovery scans from an onboarding manifest.") + scan_manifest.add_argument("manifest", type=Path) + scan_manifest.add_argument("--registry-url", default=None, help="Override the manifest registry_url.") + scan_manifest.add_argument("--repo-slug", action="append", default=[], help="Only scan this repo slug. May be repeated.") + scan_manifest.add_argument("--profile", default="deterministic", help="Discovery scan profile name.") + scan_manifest.add_argument("--output-dir", type=Path, default=None, help="Write one discovery snapshot JSON per scanned repo.") + scan_manifest.add_argument("--previous-dir", type=Path, default=None, help="Read previous per-repo snapshots for reconciliation.") + scan_manifest.add_argument("--dry-run", action="store_true", help="Do not write to the registry.") + scan_manifest.add_argument("--ingest", action="store_true", help="Store each discovery snapshot in the registry.") + scan_manifest.add_argument("--accept", action="store_true", help="Accept ingested snapshots after scanning.") + scan_manifest.add_argument("--strict", action="store_true", help="Exit non-zero when any repo cannot be scanned or stored.") + scan_manifest.add_argument("--json", action="store_true", help="Print the raw manifest scan summary.") + scan_manifest.add_argument("--llm", action="store_true", help="Enable llm-connect assisted extraction.") + scan_manifest.add_argument("--deterministic-only", action="store_true", help="Force deterministic-only scans even when --llm is set.") + scan_manifest.add_argument("--llm-provider", default="mock", help="llm-connect provider name.") + scan_manifest.add_argument("--llm-model", default="mock", help="Model name passed to llm-connect.") + scan_manifest.add_argument("--llm-temperature", type=float, default=0.0) + scan_manifest.add_argument("--llm-max-tokens", type=int, default=1500) + scan_manifest.add_argument("--llm-min-confidence", type=float, default=0.6) + scan_manifest.add_argument("--llm-max-runs", type=int, default=None, help="Maximum repos allowed to run LLM extraction.") + scan_manifest.add_argument( + "--connector", + action="append", + choices=["local-fabric-registry"], + default=[], + help="Enable a discovery connector for each repo. May be passed more than once.", + ) + scan_manifest.add_argument( + "--connector-manifest", + type=Path, + default=None, + help="Manifest path for the local-fabric-registry connector. Defaults to the scanned manifest.", + ) + cyclonedx = registry_sub.add_parser("ingest-cyclonedx", help="Ingest a CycloneDX SBOM as library inventory.") cyclonedx.add_argument("sbom", type=Path) cyclonedx.add_argument("--registry-url", default="http://127.0.0.1:8765") @@ -198,6 +233,8 @@ def main(argv: list[str] | None = None) -> int: return _registry_sync(args) if args.registry_command == "sync-manifest": return _registry_sync_manifest(args) + if args.registry_command == "scan-manifest": + return _registry_scan_manifest(args) if args.registry_command == "ingest-cyclonedx": return _registry_ingest_cyclonedx(args) if args.registry_command == "ingest-discovery": @@ -296,6 +333,407 @@ def _registry_sync_manifest(args: argparse.Namespace) -> int: return 0 +def _registry_scan_manifest(args: argparse.Namespace) -> int: + if args.llm_max_runs is not None and args.llm_max_runs < 0: + print("ERROR --llm-max-runs must be zero or greater", file=sys.stderr) + return 1 + if args.accept and not args.ingest and not args.dry_run: + print("ERROR --accept requires --ingest unless --dry-run is set", file=sys.stderr) + return 1 + + manifest_path = args.manifest.resolve() + manifest = load_yaml(manifest_path) + if not isinstance(manifest, dict): + print(f"ERROR {manifest_path}: manifest must be a YAML mapping", file=sys.stderr) + return 1 + repositories = manifest.get("repositories") + if not isinstance(repositories, list): + print(f"ERROR {manifest_path}: manifest requires a repositories list", file=sys.stderr) + return 1 + + registry_url = args.registry_url or str(manifest.get("registry_url") or "http://127.0.0.1:8765") + allowlist = {slug for slug in args.repo_slug if slug} + selected = [ + item for item in repositories + if not allowlist or _manifest_repo_slug(item) in allowlist + ] + llm_runs_used = 0 + results: list[dict[str, Any]] = [] + for item in selected: + llm_enabled = False + llm_skip_reason = "deterministic_only" + if args.llm and not args.deterministic_only: + if args.llm_max_runs is None or llm_runs_used < args.llm_max_runs: + llm_enabled = True + llm_skip_reason = "" + else: + llm_skip_reason = "budget_exhausted" + result = _scan_manifest_repo( + args, + registry_url, + manifest_path.parent, + manifest_path, + item, + llm_enabled=llm_enabled, + llm_skip_reason=llm_skip_reason, + ) + if result.get("llm", {}).get("attempted"): + llm_runs_used += 1 + results.append(result) + + summary = { + "manifest": str(manifest_path), + "registry_url": registry_url, + "generated_at": _utc_now(), + "profile": args.profile, + "dry_run": args.dry_run, + "ingest": args.ingest and not args.dry_run, + "accept": args.accept and args.ingest and not args.dry_run, + "allowlist": sorted(allowlist), + "llm": { + "requested": bool(args.llm), + "deterministic_only": bool(args.deterministic_only or not args.llm), + "max_runs": args.llm_max_runs, + "used_runs": llm_runs_used, + }, + "repositories": results, + "counts": _scan_manifest_counts(results), + } + + if args.json: + print(json.dumps(summary, indent=2, sort_keys=True)) + else: + _print_scan_manifest_summary(summary) + if args.strict and summary["counts"]["errors"]: + return 1 + return 0 + + +def _scan_manifest_repo( + args: argparse.Namespace, + registry_url: str, + manifest_dir: Path, + manifest_path: Path, + item: object, + *, + llm_enabled: bool, + llm_skip_reason: str, +) -> dict[str, Any]: + if not isinstance(item, dict): + return { + "slug": "", + "status": "error", + "scanned": False, + "ingested": False, + "accepted": False, + "error": "repository entry must be a mapping", + } + slug = str(item.get("slug") or "").strip() + if not slug: + return { + "slug": "", + "status": "error", + "scanned": False, + "ingested": False, + "accepted": False, + "error": "repository entry requires slug", + } + + result: dict[str, Any] = { + "slug": slug, + "status": "pending", + "scanned": False, + "ingested": False, + "accepted": False, + "llm": { + "enabled": llm_enabled, + "attempted": False, + "status": "pending" if llm_enabled else "skipped", + "reason": "" if llm_enabled else llm_skip_reason, + }, + } + repo_path = _manifest_optional_path(item.get("path"), manifest_dir) + if repo_path is None: + result.update({"status": "error", "error": "no repo path configured"}) + return result + result["path"] = str(repo_path) + if not repo_path.is_dir(): + result.update({"status": "error", "error": f"repo path not found: {repo_path}"}) + return result + + commit = str(item.get("commit") or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree") + connector_manifest = _scan_manifest_connector_manifest(args.connector_manifest, manifest_dir, manifest_path) + connectors = [ + ConnectorConfig( + connector_id=connector_id, + connector_type="fabric_registry", + source_path=str(connector_manifest), + ) + for connector_id in args.connector + ] + + try: + result["llm"]["attempted"] = llm_enabled + snapshot = scan_repo( + ScanOptions( + repo_path=repo_path, + repo_slug=slug, + repo_name=str(item.get("name") or repo_path.name), + domain=str(item.get("domain") or "") or None, + commit=commit, + profile=args.profile, + deterministic_only=not llm_enabled, + llm_enabled=llm_enabled, + llm_config=LLMExtractionConfig( + provider=args.llm_provider, + model=args.llm_model, + temperature=args.llm_temperature, + max_tokens=args.llm_max_tokens, + min_confidence=args.llm_min_confidence, + ), + connectors=connectors, + ) + ) + previous_path = _manifest_discovery_snapshot_path(args.previous_dir, slug, args.profile) if args.previous_dir else None + if previous_path and previous_path.is_file(): + previous = json.loads(previous_path.read_text(encoding="utf-8")) + if not isinstance(previous, dict): + raise ValueError(f"previous snapshot must be a JSON object: {previous_path}") + snapshot = reconcile_discovery_snapshots(previous, snapshot) + result["previous_snapshot_path"] = str(previous_path) + elif previous_path: + result["previous_snapshot_path"] = None + except Exception as exc: + result["status"] = "error" + result["error"] = f"scan failed: {exc}" + if llm_enabled: + result["llm"]["status"] = "failed" + result["llm"]["reason"] = "scan_failed" + return result + + result.update( + { + "status": "scanned", + "scanned": True, + "commit": snapshot["source"]["commit"], + "candidate_counts": _candidate_counts(snapshot), + "diff_counts": _discovery_diff_counts(snapshot), + "review_artifact_counts": _review_artifact_counts(snapshot), + "connector_runs": _connector_run_summaries(snapshot), + } + ) + _set_llm_result(result, snapshot, llm_enabled, llm_skip_reason) + + if args.output_dir: + output_path = _manifest_discovery_snapshot_path(args.output_dir, slug, args.profile) + try: + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(json.dumps(snapshot, indent=2, sort_keys=True) + "\n", encoding="utf-8") + result["output_path"] = str(output_path) + except Exception as exc: + result.update({"status": "error", "error": f"cannot write snapshot: {exc}"}) + return result + + if args.ingest and args.dry_run: + result["registry_action"] = "skipped_dry_run" + return result + if args.ingest: + try: + repository = _registry_post_checked( + registry_url, + "/repositories", + { + "slug": slug, + "name": item.get("name") or repo_path.name, + "remote_url": item.get("remote_url") or _git_value(repo_path, "config", "--get", "remote.origin.url"), + "default_branch": item.get("default_branch") or "main", + "state_hub_repo_id": item.get("state_hub_repo_id"), + }, + ) + stored = _registry_post_checked( + registry_url, + f"/repositories/{slug}/discovery-snapshots", + snapshot, + ) + result.update( + { + "status": "ingested", + "repository": {"slug": repository.get("slug")}, + "ingested": True, + "discovery_snapshot_id": stored["id"], + } + ) + if args.accept: + accepted = _registry_post_checked( + registry_url, + f"/repositories/{slug}/discovery-snapshots/{stored['id']}/accept", + {"commit": f"discovery:{snapshot['source']['commit']}"}, + ) + graph_snapshot = accepted["graph_snapshot"] + result.update( + { + "status": "accepted", + "accepted": True, + "accepted_graph_snapshot_id": graph_snapshot["id"], + "accepted_graph_commit": graph_snapshot["commit"], + } + ) + except RegistryRequestError as exc: + result.update({"status": "error", "error": str(exc)}) + return result + + +def _scan_manifest_counts(results: list[dict[str, Any]]) -> dict[str, int]: + return { + "total": len(results), + "scanned": sum(1 for item in results if item.get("scanned")), + "changed": sum(1 for item in results if _scan_manifest_has_diff(item)), + "retired": sum(_int_from_nested(item, "diff_counts", "retired") for item in results), + "conflicted": sum(_int_from_nested(item, "diff_counts", "conflicted") for item in results), + "llm_skipped": sum(1 for item in results if item.get("llm", {}).get("status") == "skipped"), + "llm_failed": sum(1 for item in results if item.get("llm", {}).get("status") == "failed"), + "ingested": sum(1 for item in results if item.get("ingested")), + "accepted": sum(1 for item in results if item.get("accepted")), + "errors": sum(1 for item in results if item.get("status") == "error"), + } + + +def _print_scan_manifest_summary(summary: dict[str, Any]) -> None: + for item in summary["repositories"]: + slug = item["slug"] + if item["status"] == "error": + scanned = " after scan" if item.get("scanned") else "" + print(f"error{scanned} {slug}: {item['error']}") + continue + counts = item.get("candidate_counts", {}) + diff = item.get("diff_counts", {}) + action = "accepted" if item.get("accepted") else "ingested" if item.get("ingested") else "scanned" + if item.get("registry_action") == "skipped_dry_run": + action = "dry-run scanned" + print( + f"{action} {slug} ({item.get('commit', 'working-tree')}): " + f"{counts.get('nodes', 0)} node(s), {counts.get('edges', 0)} edge(s), " + f"{counts.get('attributes', 0)} attribute(s), " + f"diff +{diff.get('added', 0)}/~{diff.get('changed', 0)}" + f"/-{diff.get('retired', 0)}/!{diff.get('conflicted', 0)}" + ) + counts = summary["counts"] + print( + f"summary: {counts['total']} repo(s), {counts['scanned']} scanned, " + f"{counts['changed']} changed, {counts['retired']} retired, " + f"{counts['conflicted']} conflicted, {counts['llm_skipped']} LLM skipped, " + f"{counts['llm_failed']} LLM failed, {counts['accepted']} accepted, " + f"{counts['errors']} error(s)" + ) + + +def _manifest_repo_slug(item: object) -> str: + if not isinstance(item, dict): + return "" + return str(item.get("slug") or "").strip() + + +def _scan_manifest_connector_manifest( + connector_manifest: Path | None, + manifest_dir: Path, + manifest_path: Path, +) -> Path: + if connector_manifest is None: + return manifest_path + path = connector_manifest.expanduser() + return path.resolve() if path.is_absolute() else (manifest_dir / path).resolve() + + +def _manifest_discovery_snapshot_path(base_dir: Path, slug: str, profile: str) -> Path: + return base_dir.resolve() / f"{_slugify(slug)}-{_slugify(profile)}.discovery.json" + + +def _candidate_counts(snapshot: dict[str, Any]) -> dict[str, int]: + candidates = snapshot.get("candidates") if isinstance(snapshot.get("candidates"), dict) else {} + return { + "nodes": len(candidates.get("nodes", [])), + "edges": len(candidates.get("edges", [])), + "attributes": len(candidates.get("attributes", [])), + } + + +def _discovery_diff_counts(snapshot: dict[str, Any]) -> dict[str, int]: + reconciliation = snapshot.get("reconciliation") if isinstance(snapshot.get("reconciliation"), dict) else {} + diff = reconciliation.get("diff") if isinstance(reconciliation.get("diff"), dict) else {} + return { + "added": len(diff.get("added", [])), + "changed": len(diff.get("changed", [])), + "retired": len(diff.get("retired", [])), + "conflicted": len(diff.get("conflicted", [])), + } + + +def _review_artifact_counts(snapshot: dict[str, Any]) -> dict[str, int]: + counts: dict[str, int] = {} + for artifact in snapshot.get("review_artifacts", []): + if not isinstance(artifact, dict): + continue + artifact_type = str(artifact.get("artifact_type") or "unknown") + counts[artifact_type] = counts.get(artifact_type, 0) + 1 + return counts + + +def _connector_run_summaries(snapshot: dict[str, Any]) -> list[dict[str, Any]]: + summaries: list[dict[str, Any]] = [] + for run in snapshot.get("connector_runs", []): + if not isinstance(run, dict): + continue + summary = { + "connector_id": run.get("connector_id"), + "status": run.get("status"), + "candidate_counts": run.get("candidate_counts", {}), + } + if run.get("message"): + summary["message"] = run.get("message") + summaries.append(summary) + return summaries + + +def _set_llm_result( + result: dict[str, Any], + snapshot: dict[str, Any], + llm_enabled: bool, + llm_skip_reason: str, +) -> None: + if not llm_enabled: + result["llm"] = { + "enabled": False, + "attempted": False, + "status": "skipped", + "reason": llm_skip_reason, + } + return + artifact_counts = _review_artifact_counts(snapshot) + failure_count = sum( + count for artifact_type, count in artifact_counts.items() + if artifact_type in {"llm_execution_error", "llm_output_invalid"} + ) + result["llm"] = { + "enabled": True, + "attempted": True, + "status": "failed" if failure_count else "used", + "failure_artifacts": failure_count, + } + + +def _scan_manifest_has_diff(item: dict[str, Any]) -> bool: + diff = item.get("diff_counts") if isinstance(item.get("diff_counts"), dict) else {} + return any(int(diff.get(key, 0)) for key in ("added", "changed", "retired", "conflicted")) + + +def _int_from_nested(item: dict[str, Any], object_key: str, value_key: str) -> int: + nested = item.get(object_key) if isinstance(item.get(object_key), dict) else {} + try: + return int(nested.get(value_key, 0)) + except (TypeError, ValueError): + return 0 + + def _sync_manifest_repo(registry_url: str, manifest_dir: Path, item: object) -> dict[str, object]: if not isinstance(item, dict): return {"slug": "", "status": "error", "error": "repository entry must be a mapping"} diff --git a/tests/test_scan_manifest.py b/tests/test_scan_manifest.py new file mode 100644 index 0000000..6c26ef8 --- /dev/null +++ b/tests/test_scan_manifest.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import json +import threading +from http.server import ThreadingHTTPServer +from pathlib import Path + +from railiance_fabric.cli import main as cli_main +from railiance_fabric.registry import RegistryStore +from railiance_fabric.server import RegistryHandler + + +def test_registry_scan_manifest_dry_run_keeps_repo_failures_isolated(tmp_path: Path, capsys) -> None: + repo = _minimal_repo(tmp_path, "fixture-repo") + manifest = _manifest( + tmp_path, + [ + {"slug": "fixture-repo", "name": "Fixture Repo", "path": str(repo)}, + {"slug": "missing-repo", "name": "Missing Repo", "path": str(tmp_path / "missing-repo")}, + ], + ) + output_dir = tmp_path / "snapshots" + + assert cli_main( + [ + "registry", + "scan-manifest", + str(manifest), + "--dry-run", + "--output-dir", + str(output_dir), + "--json", + ] + ) == 0 + + summary = json.loads(capsys.readouterr().out) + assert summary["counts"]["total"] == 2 + assert summary["counts"]["scanned"] == 1 + assert summary["counts"]["errors"] == 1 + assert summary["counts"]["llm_skipped"] == 2 + assert summary["repositories"][0]["status"] == "scanned" + assert summary["repositories"][1]["status"] == "error" + assert (output_dir / "fixture-repo-deterministic.discovery.json").is_file() + + +def test_registry_scan_manifest_ingests_and_accepts_snapshots(tmp_path: Path, capsys) -> None: + repo = _minimal_repo(tmp_path, "fixture-repo") + manifest = _manifest( + tmp_path, + [{"slug": "fixture-repo", "name": "Fixture Repo", "domain": "testing", "path": str(repo)}], + ) + store = RegistryStore(tmp_path / "registry.sqlite3") + store.init_schema() + + class Handler(RegistryHandler): + pass + + Handler.store = store + server = ThreadingHTTPServer(("127.0.0.1", 0), Handler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + try: + base_url = f"http://127.0.0.1:{server.server_port}" + assert cli_main( + [ + "registry", + "scan-manifest", + str(manifest), + "--registry-url", + base_url, + "--repo-slug", + "fixture-repo", + "--ingest", + "--accept", + "--json", + ] + ) == 0 + + summary = json.loads(capsys.readouterr().out) + assert summary["counts"]["total"] == 1 + assert summary["counts"]["scanned"] == 1 + assert summary["counts"]["ingested"] == 1 + assert summary["counts"]["accepted"] == 1 + assert summary["counts"]["errors"] == 0 + assert summary["repositories"][0]["discovery_snapshot_id"] == 1 + assert store.list_discovery_snapshots("fixture-repo")[0]["id"] == 1 + assert store.latest_snapshots()[0]["commit"].startswith("discovery:") + finally: + server.shutdown() + server.server_close() + thread.join(timeout=5) + + +def _minimal_repo(tmp_path: Path, slug: str) -> Path: + repo = tmp_path / slug + repo.mkdir() + (repo / "README.md").write_text(f"# {slug}\n", encoding="utf-8") + return repo + + +def _manifest(tmp_path: Path, repositories: list[dict[str, object]]) -> Path: + manifest = tmp_path / "local-repos.yaml" + manifest.write_text( + json.dumps( + { + "apiVersion": "railiance.fabric/v1alpha1", + "kind": "RegistryOnboardingManifest", + "registry_url": "http://127.0.0.1:8765", + "repositories": repositories, + } + ), + encoding="utf-8", + ) + return manifest diff --git a/workplans/RAIL-FAB-WP-0010-repo-reality-scanner.md b/workplans/RAIL-FAB-WP-0010-repo-reality-scanner.md index 9efe480..1f48c9e 100644 --- a/workplans/RAIL-FAB-WP-0010-repo-reality-scanner.md +++ b/workplans/RAIL-FAB-WP-0010-repo-reality-scanner.md @@ -273,7 +273,7 @@ Acceptance notes: ```task id: RAIL-FAB-WP-0010-T07 -status: todo +status: done priority: medium state_hub_task_id: "28014246-0a64-4d69-8065-98de881bffb4" ```