Add multi-repo discovery scan orchestration

This commit is contained in:
2026-05-19 18:29:04 +02:00
parent 68cf01aa39
commit be7252019f
4 changed files with 601 additions and 1 deletions

View File

@@ -210,6 +210,54 @@ Failures do not corrupt the scan. Missing catalogs become
`connector_failed` artifacts, and future remote connectors should use
`connector_rate_limited` when backoff is required.
## Multi-Repo Orchestration
Known local repos can be scanned from the same onboarding manifest used by
`registry sync-manifest`:
```bash
railiance-fabric registry scan-manifest registry/local-repos.yaml \
--dry-run \
--output-dir .fabric-discovery
```
The command isolates each repo. A missing path, invalid previous snapshot, or
registry write failure is reported for that repo without aborting the rest of
the run. The summary counts the repos that were scanned, changed, LLM skipped,
LLM failed, ingested, accepted, or errored, and totals the retired and
conflicted candidates across all repos, so it can be copied into State Hub
progress notes or future automation output.
Useful controls:
- `--repo-slug <slug>` can be repeated to scan an allowlist.
- `--profile <name>` tags the scan profile and output filename.
- `--previous-dir <dir>` reconciles each repo against
`<slug>-<profile>.discovery.json` from an earlier run.
- `--llm` enables LLM-assisted extraction; `--deterministic-only` forces the
offline rule path.
- `--llm-max-runs <n>` caps how many repos may attempt LLM extraction in one
orchestration run, while `--llm-max-tokens` remains the per-repo request cap.
- `--connector local-fabric-registry` attaches manifest-derived registry facts
to every repo scan.
- `--ingest` stores discovery snapshots in the registry; `--accept` then
projects accepted candidates into graph snapshots. `--dry-run` suppresses
registry writes even when those flags are present.
Example review cycle:
```bash
railiance-fabric registry scan-manifest registry/local-repos.yaml \
--repo-slug railiance-fabric \
--previous-dir .fabric-discovery \
--output-dir .fabric-discovery \
--connector local-fabric-registry \
--dry-run
```
After review, rerun with `--ingest` to store the snapshots. Add `--accept` only
when candidates marked `review_state: accepted` should be projected into the
registry graph.
## Identity
Identity is the main safety boundary. The scanner must not append guesses on

View File

@@ -9,6 +9,7 @@ import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from .connectors import ConnectorConfig
from .loader import declaration_files, load_yaml
@@ -117,6 +118,40 @@ def build_parser() -> argparse.ArgumentParser:
sync_manifest.add_argument("--strict", action="store_true", help="Exit non-zero when any repo cannot be synced.")
sync_manifest.add_argument("--json", action="store_true", help="Print the raw manifest sync summary.")
scan_manifest = registry_sub.add_parser("scan-manifest", help="Run discovery scans from an onboarding manifest.")
scan_manifest.add_argument("manifest", type=Path)
scan_manifest.add_argument("--registry-url", default=None, help="Override the manifest registry_url.")
scan_manifest.add_argument("--repo-slug", action="append", default=[], help="Only scan this repo slug. May be repeated.")
scan_manifest.add_argument("--profile", default="deterministic", help="Discovery scan profile name.")
scan_manifest.add_argument("--output-dir", type=Path, default=None, help="Write one discovery snapshot JSON per scanned repo.")
scan_manifest.add_argument("--previous-dir", type=Path, default=None, help="Read previous per-repo snapshots for reconciliation.")
scan_manifest.add_argument("--dry-run", action="store_true", help="Do not write to the registry.")
scan_manifest.add_argument("--ingest", action="store_true", help="Store each discovery snapshot in the registry.")
scan_manifest.add_argument("--accept", action="store_true", help="Accept ingested snapshots after scanning.")
scan_manifest.add_argument("--strict", action="store_true", help="Exit non-zero when any repo cannot be scanned or stored.")
scan_manifest.add_argument("--json", action="store_true", help="Print the raw manifest scan summary.")
scan_manifest.add_argument("--llm", action="store_true", help="Enable llm-connect assisted extraction.")
scan_manifest.add_argument("--deterministic-only", action="store_true", help="Force deterministic-only scans even when --llm is set.")
scan_manifest.add_argument("--llm-provider", default="mock", help="llm-connect provider name.")
scan_manifest.add_argument("--llm-model", default="mock", help="Model name passed to llm-connect.")
scan_manifest.add_argument("--llm-temperature", type=float, default=0.0)
scan_manifest.add_argument("--llm-max-tokens", type=int, default=1500)
scan_manifest.add_argument("--llm-min-confidence", type=float, default=0.6)
scan_manifest.add_argument("--llm-max-runs", type=int, default=None, help="Maximum repos allowed to run LLM extraction.")
scan_manifest.add_argument(
"--connector",
action="append",
choices=["local-fabric-registry"],
default=[],
help="Enable a discovery connector for each repo. May be passed more than once.",
)
scan_manifest.add_argument(
"--connector-manifest",
type=Path,
default=None,
help="Manifest path for the local-fabric-registry connector. Defaults to the scanned manifest.",
)
cyclonedx = registry_sub.add_parser("ingest-cyclonedx", help="Ingest a CycloneDX SBOM as library inventory.")
cyclonedx.add_argument("sbom", type=Path)
cyclonedx.add_argument("--registry-url", default="http://127.0.0.1:8765")
@@ -198,6 +233,8 @@ def main(argv: list[str] | None = None) -> int:
return _registry_sync(args)
if args.registry_command == "sync-manifest":
return _registry_sync_manifest(args)
if args.registry_command == "scan-manifest":
return _registry_scan_manifest(args)
if args.registry_command == "ingest-cyclonedx":
return _registry_ingest_cyclonedx(args)
if args.registry_command == "ingest-discovery":
@@ -296,6 +333,407 @@ def _registry_sync_manifest(args: argparse.Namespace) -> int:
return 0
def _registry_scan_manifest(args: argparse.Namespace) -> int:
    """Run discovery scans for the repositories listed in an onboarding manifest.

    Validates the flag combination and the manifest shape, scans every
    selected repository entry in manifest order, and prints either the raw
    JSON summary (``--json``) or the human-readable report.

    Returns 1 for invalid flags, an invalid manifest, or (with ``--strict``)
    when at least one repository result has status ``error``; otherwise 0.
    """
    max_llm_runs = args.llm_max_runs
    if max_llm_runs is not None and max_llm_runs < 0:
        print("ERROR --llm-max-runs must be zero or greater", file=sys.stderr)
        return 1
    if args.accept and not (args.ingest or args.dry_run):
        print("ERROR --accept requires --ingest unless --dry-run is set", file=sys.stderr)
        return 1

    manifest_path = args.manifest.resolve()
    manifest = load_yaml(manifest_path)
    if not isinstance(manifest, dict):
        print(f"ERROR {manifest_path}: manifest must be a YAML mapping", file=sys.stderr)
        return 1
    repositories = manifest.get("repositories")
    if not isinstance(repositories, list):
        print(f"ERROR {manifest_path}: manifest requires a repositories list", file=sys.stderr)
        return 1

    # The command-line override wins; otherwise use the manifest value or the local default.
    registry_url = args.registry_url
    if not registry_url:
        registry_url = str(manifest.get("registry_url") or "http://127.0.0.1:8765")

    # An empty allowlist means "scan every manifest entry".
    allowlist = set(filter(None, args.repo_slug))
    if allowlist:
        selected = [entry for entry in repositories if _manifest_repo_slug(entry) in allowlist]
    else:
        selected = list(repositories)

    llm_requested = bool(args.llm) and not args.deterministic_only
    llm_runs_used = 0
    results: list[dict[str, Any]] = []
    for entry in selected:
        if not llm_requested:
            use_llm, skip_reason = False, "deterministic_only"
        elif max_llm_runs is None or llm_runs_used < max_llm_runs:
            use_llm, skip_reason = True, ""
        else:
            use_llm, skip_reason = False, "budget_exhausted"

        outcome = _scan_manifest_repo(
            args,
            registry_url,
            manifest_path.parent,
            manifest_path,
            entry,
            llm_enabled=use_llm,
            llm_skip_reason=skip_reason,
        )
        # Only repos whose result reports an attempted LLM run consume the budget.
        if outcome.get("llm", {}).get("attempted"):
            llm_runs_used += 1
        results.append(outcome)

    summary = {
        "manifest": str(manifest_path),
        "registry_url": registry_url,
        "generated_at": _utc_now(),
        "profile": args.profile,
        "dry_run": args.dry_run,
        "ingest": args.ingest and not args.dry_run,
        "accept": args.accept and args.ingest and not args.dry_run,
        "allowlist": sorted(allowlist),
        "llm": {
            "requested": bool(args.llm),
            "deterministic_only": bool(args.deterministic_only or not args.llm),
            "max_runs": max_llm_runs,
            "used_runs": llm_runs_used,
        },
        "repositories": results,
        "counts": _scan_manifest_counts(results),
    }

    if args.json:
        print(json.dumps(summary, indent=2, sort_keys=True))
    else:
        _print_scan_manifest_summary(summary)

    return 1 if args.strict and summary["counts"]["errors"] else 0
def _scan_manifest_repo(
    args: argparse.Namespace,
    registry_url: str,
    manifest_dir: Path,
    manifest_path: Path,
    item: object,
    *,
    llm_enabled: bool,
    llm_skip_reason: str,
) -> dict[str, Any]:
    """Scan a single manifest entry and optionally ingest/accept its snapshot.

    Every failure is reported through the returned result dict
    (``status == "error"`` plus an ``error`` message) instead of being raised,
    so one broken repository does not abort the surrounding multi-repo run.

    Args:
        args: Parsed ``scan-manifest`` CLI arguments.
        registry_url: Base URL used for registry POST requests.
        manifest_dir: Directory used to resolve relative repo/connector paths.
        manifest_path: Manifest file; default source for connectors.
        item: Raw repository entry from the manifest (expected to be a mapping).
        llm_enabled: Whether this repo may run LLM-assisted extraction.
        llm_skip_reason: Reason recorded when ``llm_enabled`` is false.

    Returns:
        A result dict that always contains ``slug``, ``status``, ``scanned``,
        ``ingested`` and ``accepted``; further keys are added as the scan,
        snapshot write, ingest and accept steps succeed.
    """
    if not isinstance(item, dict):
        return {
            "slug": "<invalid>",
            "status": "error",
            "scanned": False,
            "ingested": False,
            "accepted": False,
            "error": "repository entry must be a mapping",
        }
    slug = str(item.get("slug") or "").strip()
    if not slug:
        return {
            "slug": "<missing>",
            "status": "error",
            "scanned": False,
            "ingested": False,
            "accepted": False,
            "error": "repository entry requires slug",
        }
    result: dict[str, Any] = {
        "slug": slug,
        "status": "pending",
        "scanned": False,
        "ingested": False,
        "accepted": False,
        "llm": {
            "enabled": llm_enabled,
            "attempted": False,
            "status": "pending" if llm_enabled else "skipped",
            "reason": "" if llm_enabled else llm_skip_reason,
        },
    }
    repo_path = _manifest_optional_path(item.get("path"), manifest_dir)
    if repo_path is None:
        result.update({"status": "error", "error": "no repo path configured"})
        return result
    result["path"] = str(repo_path)
    if not repo_path.is_dir():
        result.update({"status": "error", "error": f"repo path not found: {repo_path}"})
        return result

    # Prefer the manifest's pinned commit, then git HEAD, then a placeholder.
    commit = str(item.get("commit") or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree")
    connector_manifest = _scan_manifest_connector_manifest(args.connector_manifest, manifest_dir, manifest_path)
    connectors = [
        ConnectorConfig(
            connector_id=connector_id,
            connector_type="fabric_registry",
            source_path=str(connector_manifest),
        )
        for connector_id in args.connector
    ]
    try:
        # Mark the attempt before scanning so a crashing scan still counts
        # against the caller's LLM run budget.
        result["llm"]["attempted"] = llm_enabled
        snapshot = scan_repo(
            ScanOptions(
                repo_path=repo_path,
                repo_slug=slug,
                repo_name=str(item.get("name") or repo_path.name),
                domain=str(item.get("domain") or "") or None,
                commit=commit,
                profile=args.profile,
                deterministic_only=not llm_enabled,
                llm_enabled=llm_enabled,
                llm_config=LLMExtractionConfig(
                    provider=args.llm_provider,
                    model=args.llm_model,
                    temperature=args.llm_temperature,
                    max_tokens=args.llm_max_tokens,
                    min_confidence=args.llm_min_confidence,
                ),
                connectors=connectors,
            )
        )
        previous_path = _manifest_discovery_snapshot_path(args.previous_dir, slug, args.profile) if args.previous_dir else None
        if previous_path and previous_path.is_file():
            previous = json.loads(previous_path.read_text(encoding="utf-8"))
            if not isinstance(previous, dict):
                raise ValueError(f"previous snapshot must be a JSON object: {previous_path}")
            snapshot = reconcile_discovery_snapshots(previous, snapshot)
            result["previous_snapshot_path"] = str(previous_path)
        elif previous_path:
            # A previous dir was given but holds no snapshot for this repo/profile.
            result["previous_snapshot_path"] = None
    except Exception as exc:
        # Deliberately broad: any scan/reconcile failure is isolated to this repo.
        result["status"] = "error"
        result["error"] = f"scan failed: {exc}"
        if llm_enabled:
            result["llm"]["status"] = "failed"
            result["llm"]["reason"] = "scan_failed"
        return result

    result.update(
        {
            "status": "scanned",
            "scanned": True,
            "commit": snapshot["source"]["commit"],
            "candidate_counts": _candidate_counts(snapshot),
            "diff_counts": _discovery_diff_counts(snapshot),
            "review_artifact_counts": _review_artifact_counts(snapshot),
            "connector_runs": _connector_run_summaries(snapshot),
        }
    )
    _set_llm_result(result, snapshot, llm_enabled, llm_skip_reason)

    if args.output_dir:
        output_path = _manifest_discovery_snapshot_path(args.output_dir, slug, args.profile)
        try:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_text(json.dumps(snapshot, indent=2, sort_keys=True) + "\n", encoding="utf-8")
            result["output_path"] = str(output_path)
        except Exception as exc:
            # "scanned" stays True so reports can say the error happened after the scan.
            result.update({"status": "error", "error": f"cannot write snapshot: {exc}"})
            return result

    if args.ingest and args.dry_run:
        result["registry_action"] = "skipped_dry_run"
        return result

    if args.ingest:
        try:
            repository = _registry_post_checked(
                registry_url,
                "/repositories",
                {
                    "slug": slug,
                    "name": item.get("name") or repo_path.name,
                    "remote_url": item.get("remote_url") or _git_value(repo_path, "config", "--get", "remote.origin.url"),
                    "default_branch": item.get("default_branch") or "main",
                    "state_hub_repo_id": item.get("state_hub_repo_id"),
                },
            )
            stored = _registry_post_checked(
                registry_url,
                f"/repositories/{slug}/discovery-snapshots",
                snapshot,
            )
            result.update(
                {
                    "status": "ingested",
                    "repository": {"slug": repository.get("slug")},
                    "ingested": True,
                    "discovery_snapshot_id": stored["id"],
                }
            )
            if args.accept:
                accepted = _registry_post_checked(
                    registry_url,
                    f"/repositories/{slug}/discovery-snapshots/{stored['id']}/accept",
                    {"commit": f"discovery:{snapshot['source']['commit']}"},
                )
                graph_snapshot = accepted["graph_snapshot"]
                result.update(
                    {
                        "status": "accepted",
                        "accepted": True,
                        "accepted_graph_snapshot_id": graph_snapshot["id"],
                        "accepted_graph_commit": graph_snapshot["commit"],
                    }
                )
        except RegistryRequestError as exc:
            result.update({"status": "error", "error": str(exc)})
        except (KeyError, TypeError, AttributeError) as exc:
            # A response without the expected "id"/"graph_snapshot" shape must
            # fail only this repo rather than abort the whole orchestration run.
            result.update({"status": "error", "error": f"unexpected registry response: {exc!r}"})
    return result
def _scan_manifest_counts(results: list[dict[str, Any]]) -> dict[str, int]:
    """Aggregate per-repository scan results into the summary counters.

    ``retired`` and ``conflicted`` add up the per-repo diff counts; every
    other counter is the number of repositories matching that condition.
    """
    tally = dict.fromkeys(
        (
            "total",
            "scanned",
            "changed",
            "retired",
            "conflicted",
            "llm_skipped",
            "llm_failed",
            "ingested",
            "accepted",
            "errors",
        ),
        0,
    )
    tally["total"] = len(results)
    for entry in results:
        # Entries rejected before scanning carry no "llm" section at all.
        llm_status = entry.get("llm", {}).get("status")
        if entry.get("scanned"):
            tally["scanned"] += 1
        if _scan_manifest_has_diff(entry):
            tally["changed"] += 1
        tally["retired"] += _int_from_nested(entry, "diff_counts", "retired")
        tally["conflicted"] += _int_from_nested(entry, "diff_counts", "conflicted")
        if llm_status == "skipped":
            tally["llm_skipped"] += 1
        if llm_status == "failed":
            tally["llm_failed"] += 1
        if entry.get("ingested"):
            tally["ingested"] += 1
        if entry.get("accepted"):
            tally["accepted"] += 1
        if entry.get("status") == "error":
            tally["errors"] += 1
    return tally
def _print_scan_manifest_summary(summary: dict[str, Any]) -> None:
    """Print one line per repository followed by an aggregate summary line.

    Args:
        summary: Scan summary with a ``repositories`` list of per-repo result
            dicts and a ``counts`` dict holding the aggregate counters.

    Error results print as ``error[ after scan] <slug>: <message>``. Other
    results print the action taken plus candidate and diff counts. The final
    line reports every aggregate counter, including the ingested count.
    """
    for item in summary["repositories"]:
        slug = item["slug"]
        if item["status"] == "error":
            # Distinguish failures that happened after a successful scan
            # (snapshot write / registry errors) from failures to scan at all.
            scanned = " after scan" if item.get("scanned") else ""
            print(f"error{scanned} {slug}: {item['error']}")
            continue
        counts = item.get("candidate_counts", {})
        diff = item.get("diff_counts", {})
        action = "accepted" if item.get("accepted") else "ingested" if item.get("ingested") else "scanned"
        if item.get("registry_action") == "skipped_dry_run":
            action = "dry-run scanned"
        print(
            f"{action} {slug} ({item.get('commit', 'working-tree')}): "
            f"{counts.get('nodes', 0)} node(s), {counts.get('edges', 0)} edge(s), "
            f"{counts.get('attributes', 0)} attribute(s), "
            f"diff +{diff.get('added', 0)}/~{diff.get('changed', 0)}"
            f"/-{diff.get('retired', 0)}/!{diff.get('conflicted', 0)}"
        )
    counts = summary["counts"]
    print(
        f"summary: {counts['total']} repo(s), {counts['scanned']} scanned, "
        f"{counts['changed']} changed, {counts['retired']} retired, "
        f"{counts['conflicted']} conflicted, {counts['llm_skipped']} LLM skipped, "
        f"{counts['llm_failed']} LLM failed, {counts['ingested']} ingested, "
        f"{counts['accepted']} accepted, {counts['errors']} error(s)"
    )
def _manifest_repo_slug(item: object) -> str:
    """Return the trimmed slug of a manifest entry, or ``"<invalid>"`` for non-mappings.

    A mapping with a missing or falsy ``slug`` yields an empty string.
    """
    if isinstance(item, dict):
        raw_slug = item.get("slug")
        return str(raw_slug or "").strip()
    return "<invalid>"
def _scan_manifest_connector_manifest(
connector_manifest: Path | None,
manifest_dir: Path,
manifest_path: Path,
) -> Path:
if connector_manifest is None:
return manifest_path
path = connector_manifest.expanduser()
return path.resolve() if path.is_absolute() else (manifest_dir / path).resolve()
def _manifest_discovery_snapshot_path(base_dir: Path, slug: str, profile: str) -> Path:
    """Build the per-repo snapshot path ``<slug>-<profile>.discovery.json`` under ``base_dir``.

    Both name parts are passed through ``_slugify`` and the directory is resolved.
    """
    root = base_dir.resolve()
    filename = "-".join((_slugify(slug), _slugify(profile))) + ".discovery.json"
    return root / filename
def _candidate_counts(snapshot: dict[str, Any]) -> dict[str, int]:
    """Count the node, edge and attribute candidates in a discovery snapshot.

    A missing or non-mapping ``candidates`` section counts as empty.
    """
    candidates = snapshot.get("candidates")
    if not isinstance(candidates, dict):
        candidates = {}
    return {
        kind: len(candidates.get(kind, []))
        for kind in ("nodes", "edges", "attributes")
    }
def _discovery_diff_counts(snapshot: dict[str, Any]) -> dict[str, int]:
    """Count added, changed, retired and conflicted entries in ``reconciliation.diff``.

    A missing or non-mapping ``reconciliation`` or ``diff`` section counts as empty.
    """
    reconciliation = snapshot.get("reconciliation")
    diff = reconciliation.get("diff") if isinstance(reconciliation, dict) else None
    if not isinstance(diff, dict):
        diff = {}
    return {
        bucket: len(diff.get(bucket, []))
        for bucket in ("added", "changed", "retired", "conflicted")
    }
def _review_artifact_counts(snapshot: dict[str, Any]) -> dict[str, int]:
    """Tally the snapshot's review artifacts by ``artifact_type``.

    Non-mapping artifacts are ignored; a missing or falsy type is counted
    under ``"unknown"``.
    """
    tally: dict[str, int] = {}
    for entry in snapshot.get("review_artifacts", []):
        if isinstance(entry, dict):
            kind = str(entry.get("artifact_type") or "unknown")
            tally.setdefault(kind, 0)
            tally[kind] += 1
    return tally
def _connector_run_summaries(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
    """Reduce each connector run in the snapshot to id, status and candidate counts.

    Non-mapping runs are skipped; ``message`` is copied only when it is truthy.
    """
    collected: list[dict[str, Any]] = []
    for entry in snapshot.get("connector_runs", []):
        if isinstance(entry, dict):
            brief = {
                "connector_id": entry.get("connector_id"),
                "status": entry.get("status"),
                "candidate_counts": entry.get("candidate_counts", {}),
            }
            message = entry.get("message")
            if message:
                brief["message"] = message
            collected.append(brief)
    return collected
def _set_llm_result(
    result: dict[str, Any],
    snapshot: dict[str, Any],
    llm_enabled: bool,
    llm_skip_reason: str,
) -> None:
    """Replace ``result["llm"]`` with the final LLM outcome for a scanned repo.

    With LLM disabled the entry records a skip and its reason. Otherwise the
    run is marked ``failed`` when the snapshot holds any ``llm_execution_error``
    or ``llm_output_invalid`` review artifacts, and ``used`` when it holds none.
    """
    if llm_enabled:
        failure_types = ("llm_execution_error", "llm_output_invalid")
        per_type = _review_artifact_counts(snapshot)
        failures = sum(per_type.get(kind, 0) for kind in failure_types)
        outcome: dict[str, Any] = {
            "enabled": True,
            "attempted": True,
            "status": "failed" if failures else "used",
            "failure_artifacts": failures,
        }
    else:
        outcome = {
            "enabled": False,
            "attempted": False,
            "status": "skipped",
            "reason": llm_skip_reason,
        }
    result["llm"] = outcome
def _scan_manifest_has_diff(item: dict[str, Any]) -> bool:
    """Return True when a repo result reports any added/changed/retired/conflicted entry.

    Args:
        item: Per-repo result dict; its optional ``diff_counts`` mapping holds
            the per-bucket counts.

    Returns:
        True if at least one bucket has a non-zero count. A missing or
        non-mapping ``diff_counts`` yields False.
    """
    diff = item.get("diff_counts")
    if not isinstance(diff, dict):
        return False
    for key in ("added", "changed", "retired", "conflicted"):
        try:
            if int(diff.get(key, 0)):
                return True
        except (TypeError, ValueError):
            # Treat a malformed count as zero, matching _int_from_nested, so
            # one bad value cannot crash the whole summary.
            continue
    return False
def _int_from_nested(item: dict[str, Any], object_key: str, value_key: str) -> int:
    """Read ``item[object_key][value_key]`` as an int, falling back to 0.

    The fallback applies when the nested object is missing or not a mapping,
    when the value is absent, and when the value cannot be converted to int.
    """
    container = item.get(object_key)
    if not isinstance(container, dict):
        return 0
    try:
        return int(container.get(value_key, 0))
    except (TypeError, ValueError):
        return 0
def _sync_manifest_repo(registry_url: str, manifest_dir: Path, item: object) -> dict[str, object]:
if not isinstance(item, dict):
return {"slug": "<invalid>", "status": "error", "error": "repository entry must be a mapping"}

114
tests/test_scan_manifest.py Normal file
View File

@@ -0,0 +1,114 @@
from __future__ import annotations
import json
import threading
from http.server import ThreadingHTTPServer
from pathlib import Path
from railiance_fabric.cli import main as cli_main
from railiance_fabric.registry import RegistryStore
from railiance_fabric.server import RegistryHandler
def test_registry_scan_manifest_dry_run_keeps_repo_failures_isolated(tmp_path: Path, capsys) -> None:
    """A missing repo path is reported as an error while the other repo is still scanned."""
    present_repo = _minimal_repo(tmp_path, "fixture-repo")
    absent_repo = tmp_path / "missing-repo"
    entries = [
        {"slug": "fixture-repo", "name": "Fixture Repo", "path": str(present_repo)},
        {"slug": "missing-repo", "name": "Missing Repo", "path": str(absent_repo)},
    ]
    manifest_file = _manifest(tmp_path, entries)
    snapshot_dir = tmp_path / "snapshots"

    argv = [
        "registry",
        "scan-manifest",
        str(manifest_file),
        "--dry-run",
        "--output-dir",
        str(snapshot_dir),
        "--json",
    ]
    # Without --strict the command succeeds even though one repo failed.
    assert cli_main(argv) == 0

    report = json.loads(capsys.readouterr().out)
    counts = report["counts"]
    assert counts["total"] == 2
    assert counts["scanned"] == 1
    assert counts["errors"] == 1
    assert counts["llm_skipped"] == 2

    repos = report["repositories"]
    assert repos[0]["status"] == "scanned"
    assert repos[1]["status"] == "error"
    assert (snapshot_dir / "fixture-repo-deterministic.discovery.json").is_file()
def test_registry_scan_manifest_ingests_and_accepts_snapshots(tmp_path: Path, capsys) -> None:
    """``--ingest --accept`` stores the discovery snapshot and projects it into a graph snapshot."""
    repo_dir = _minimal_repo(tmp_path, "fixture-repo")
    entries = [{"slug": "fixture-repo", "name": "Fixture Repo", "domain": "testing", "path": str(repo_dir)}]
    manifest_file = _manifest(tmp_path, entries)

    registry_store = RegistryStore(tmp_path / "registry.sqlite3")
    registry_store.init_schema()

    class Handler(RegistryHandler):
        pass

    Handler.store = registry_store
    # Port 0 lets the OS pick a free port for the throwaway registry server.
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), Handler)
    worker = threading.Thread(target=httpd.serve_forever, daemon=True)
    worker.start()
    try:
        base_url = f"http://127.0.0.1:{httpd.server_port}"
        argv = [
            "registry",
            "scan-manifest",
            str(manifest_file),
            "--registry-url",
            base_url,
            "--repo-slug",
            "fixture-repo",
            "--ingest",
            "--accept",
            "--json",
        ]
        assert cli_main(argv) == 0

        report = json.loads(capsys.readouterr().out)
        counts = report["counts"]
        assert counts["total"] == 1
        assert counts["scanned"] == 1
        assert counts["ingested"] == 1
        assert counts["accepted"] == 1
        assert counts["errors"] == 0
        assert report["repositories"][0]["discovery_snapshot_id"] == 1
        assert registry_store.list_discovery_snapshots("fixture-repo")[0]["id"] == 1
        assert registry_store.latest_snapshots()[0]["commit"].startswith("discovery:")
    finally:
        httpd.shutdown()
        httpd.server_close()
        worker.join(timeout=5)
def _minimal_repo(tmp_path: Path, slug: str) -> Path:
    """Create ``tmp_path/<slug>`` containing only a one-line README and return the directory."""
    repo_dir = tmp_path / slug
    repo_dir.mkdir()
    readme = repo_dir / "README.md"
    readme.write_text(f"# {slug}\n", encoding="utf-8")
    return repo_dir
def _manifest(tmp_path: Path, repositories: list[dict[str, object]]) -> Path:
    """Write an onboarding manifest listing ``repositories`` and return its path.

    The document is serialized with ``json.dumps`` into ``local-repos.yaml``.
    """
    document = {
        "apiVersion": "railiance.fabric/v1alpha1",
        "kind": "RegistryOnboardingManifest",
        "registry_url": "http://127.0.0.1:8765",
        "repositories": repositories,
    }
    target = tmp_path / "local-repos.yaml"
    target.write_text(json.dumps(document), encoding="utf-8")
    return target

View File

@@ -273,7 +273,7 @@ Acceptance notes:
```task
id: RAIL-FAB-WP-0010-T07
status: todo
status: done
priority: medium
state_hub_task_id: "28014246-0a64-4d69-8065-98de881bffb4"
```