from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any import yaml from jsonschema import Draft202012Validator from reuse_surface.catalog import write_catalog from reuse_surface.federation import write_federated_index from reuse_surface import hub_client from reuse_surface.graph import check_relations, render_mermaid, write_graph from reuse_surface.hub_sync import ( DEFAULT_SOURCES_PATH, build_manifest, load_sources_manifest, write_sources_manifest, ) from reuse_surface.overlaps import find_overlaps from reuse_surface.reports import ( cohort_filters_from_args, collect_gap_report, default_roster_path, format_cohort_json, format_cohort_markdown, format_gap_json, format_gap_markdown, select_cohort, ) from reuse_surface.establish import ( discover_capabilities, format_publish_check_markdown, publish_check, scaffold_next_steps, scaffold_registry, ) from reuse_surface.registry_update import ( apply_deterministic_suggestions, collect_deterministic_suggestions, format_suggestions_json, format_suggestions_markdown, suggest_llm_updates, ) from reuse_surface.stats import ( collect_roster_stats, collect_stats, format_roster_stats_json, format_roster_stats_markdown, format_stats_json, format_stats_markdown, ) from reuse_surface.registry import ( ROOT, capability_paths, level_at_least, load_index, load_index_at, load_schema, parse_front_matter, parse_vector, registry_paths, ) def _registry_root(args: argparse.Namespace) -> Path: if getattr(args, "root", None): return Path(args.root).resolve() return ROOT def _check_index_drift( entry_paths: list[Path], index: dict[str, Any], repo_root: Path, ) -> list[str]: warnings: list[str] = [] indexed_paths = {item["path"] for item in index.get("capabilities", [])} file_paths = {str(path.relative_to(repo_root)) for path in entry_paths} for path in sorted(file_paths - indexed_paths): warnings.append(f"index drift: entry file not indexed: {path}") for path in sorted(indexed_paths - file_paths): warnings.append(f"index drift: index references missing file: {path}") return warnings def _capability_paths_for(repo_root: Path, target: Path | None) -> list[Path]: if target is not None: return [target] cap_dir = registry_paths(repo_root)["capabilities"] return sorted(path for path in cap_dir.glob("*.md") if path.name != ".gitkeep") def _run_validate( repo_root: Path, *, target: Path | None, relations: bool, ) -> tuple[list[str], list[str], list[Path]]: schema = load_schema() validator = Draft202012Validator(schema) paths = _capability_paths_for(repo_root, target) errors: list[str] = [] warnings: list[str] = [] for path in paths: try: data = parse_front_matter(path) except ValueError as exc: errors.append(str(exc)) continue for error in sorted(validator.iter_errors(data), key=lambda e: e.path): location = ".".join(str(part) for part in error.path) or "" errors.append(f"{path}: {location}: {error.message}") if not target: index_path = registry_paths(repo_root)["index"] if index_path.exists(): index = load_index_at(index_path) warnings.extend(_check_index_drift(paths, index, repo_root)) if relations and repo_root == ROOT: warnings.extend(check_relations()) return errors, warnings, paths def cmd_validate(args: argparse.Namespace) -> int: repo_root = _registry_root(args) target = Path(args.path) if args.path else None if target and not target.is_absolute(): target = repo_root / target errors, warnings, paths = _run_validate( repo_root, target=target, relations=args.relations ) for warning in warnings: print(f"warning: {warning}", file=sys.stderr) for error in errors: print(f"error: {error}", file=sys.stderr) if errors or (args.fail_on_warnings and warnings): return 1 print(f"ok: validated {len(paths)} capability entr{'y' if len(paths) == 1 else 'ies'}") return 0 def _matches_query(item: dict[str, Any], args: argparse.Namespace) -> bool: vector = parse_vector(item["vector"]) if args.discovery_min and not level_at_least( "discovery", vector["discovery"], args.discovery_min ): return False if args.availability_min and not level_at_least( "availability", vector["availability"], args.availability_min ): return False if args.domain and item.get("domain") != args.domain: return False if args.tag and args.tag not in item.get("tags", []): return False if args.consumption_mode: modes = [mode.lower() for mode in item.get("consumption_modes", [])] if args.consumption_mode.lower() not in modes: return False if args.keyword: haystack = " ".join( [ item.get("id", ""), item.get("name", ""), item.get("summary", ""), " ".join(item.get("tags", [])), ] ).lower() if args.keyword.lower() not in haystack: return False return True def cmd_query(args: argparse.Namespace) -> int: index = load_index() matches = [ item for item in index.get("capabilities", []) if _matches_query(item, args) ] if not matches: print("no matches") return 0 for item in matches: print( f"{item['id']} {item['vector']} {item['path']}\n" f" {item['summary']}" ) print(f"\n{len(matches)} match{'es' if len(matches) != 1 else ''}") return 0 def _load_indexed_entries() -> list[tuple[dict[str, Any], dict[str, Any]]]: index = load_index() indexed_entries: list[tuple[dict[str, Any], dict[str, Any]]] = [] for item in index.get("capabilities", []): path = ROOT / item["path"] indexed_entries.append((item, parse_front_matter(path))) return indexed_entries def cmd_overlaps(args: argparse.Namespace) -> int: indexed_entries = _load_indexed_entries() candidates = find_overlaps(indexed_entries, threshold=args.threshold) if not candidates: print("no overlap candidates") return 0 for candidate in candidates: reasons = "; ".join(candidate.reasons) print( f"{candidate.left_id} <> {candidate.right_id} " f"score={candidate.score:.2f} {reasons}" ) print(f"\n{len(candidates)} candidate{'s' if len(candidates) != 1 else ''}") return 0 def cmd_federation_compose(args: argparse.Namespace) -> int: try: target, warnings = write_federated_index(refresh=args.refresh) except (FileNotFoundError, ValueError) as exc: print(f"error: {exc}", file=sys.stderr) return 1 for warning in warnings: print(f"warning: {warning}", file=sys.stderr) import yaml data = yaml.safe_load(target.read_text(encoding="utf-8")) count = len(data.get("capabilities", [])) print(f"ok: wrote {target.relative_to(ROOT)} ({count} capabilities)") return 0 def cmd_graph(args: argparse.Namespace) -> int: warnings = check_relations() if args.check else [] content = render_mermaid() if args.stdout: print(content, end="") else: path = write_graph() from reuse_surface.catalog import GRAPH_HTML, render_graph_explorer GRAPH_HTML.parent.mkdir(parents=True, exist_ok=True) GRAPH_HTML.write_text(render_graph_explorer(content), encoding="utf-8") print(f"ok: wrote {path.relative_to(ROOT)}") print(f"ok: wrote {GRAPH_HTML.relative_to(ROOT)}") for warning in warnings: print(f"warning: {warning}", file=sys.stderr) if args.fail_on_warnings and warnings: return 1 return 0 def cmd_catalog(args: argparse.Namespace) -> int: index = load_index() indexed_entries = _load_indexed_entries() paths = write_catalog( index, indexed_entries, mermaid_source=render_mermaid() ) for path in paths: print(f"ok: wrote {path.relative_to(ROOT)}") return 0 def _service_url(args: argparse.Namespace) -> str | None: return getattr(args, "base_url", None) def cmd_serve(args: argparse.Namespace) -> int: from reuse_surface.hub.app import main as serve_main serve_main() return 0 def cmd_hub_status(args: argparse.Namespace) -> int: try: status, payload = hub_client.hub_status(_service_url(args)) except ValueError as exc: print(f"error: {exc}", file=sys.stderr) return 1 if status != 200: print(f"error: hub returned {status}: {payload}", file=sys.stderr) return 1 print(f"ok: {payload.get('service')} {payload.get('version')} ({payload.get('status')})") return 0 def cmd_hub_list(args: argparse.Namespace) -> int: try: status, payload = hub_client.hub_list(_service_url(args)) except ValueError as exc: print(f"error: {exc}", file=sys.stderr) return 1 if status != 200: print(f"error: hub returned {status}: {payload}", file=sys.stderr) return 1 for repo in payload.get("repos", []): enabled = "enabled" if repo.get("enabled") else "disabled" print(f"{repo['repo']}\t{enabled}\t{repo.get('url', '')}") print(f"\n{payload.get('count', 0)} registration(s)") return 0 def cmd_hub_show(args: argparse.Namespace) -> int: try: status, payload = hub_client.hub_show(args.repo, _service_url(args)) except ValueError as exc: print(f"error: {exc}", file=sys.stderr) return 1 if status != 200: print(f"error: hub returned {status}: {payload}", file=sys.stderr) return 1 print(yaml.safe_dump(payload, sort_keys=False)) return 0 def cmd_hub_register(args: argparse.Namespace) -> int: body: dict[str, Any] = { "repo": args.repo, "url": args.url, "domain": args.domain, "enabled": args.enabled, "required": args.required, } if args.description: body["description"] = args.description try: status, payload = hub_client.hub_register(body, _service_url(args)) except ValueError as exc: print(f"error: {exc}", file=sys.stderr) return 1 if status != 201: print(f"error: hub returned {status}: {payload}", file=sys.stderr) return 1 print(f"ok: registered {args.repo}") return 0 def cmd_hub_update(args: argparse.Namespace) -> int: body: dict[str, Any] = {} if args.url is not None: body["url"] = args.url if args.enabled is not None: body["enabled"] = args.enabled if args.required is not None: body["required"] = args.required if args.domain is not None: body["domain"] = args.domain if args.description is not None: body["description"] = args.description if not body: print("error: no fields to update", file=sys.stderr) return 1 try: status, payload = hub_client.hub_update(args.repo, body, _service_url(args)) except ValueError as exc: print(f"error: {exc}", file=sys.stderr) return 1 if status != 200: print(f"error: hub returned {status}: {payload}", file=sys.stderr) return 1 print(f"ok: updated {args.repo}") return 0 def cmd_hub_sync(args: argparse.Namespace) -> int: try: status, payload = hub_client.hub_list(_service_url(args)) except ValueError as exc: print(f"error: {exc}", file=sys.stderr) return 1 if status != 200: print(f"error: hub returned {status}: {payload}", file=sys.stderr) return 1 output = Path(args.output) if args.output else DEFAULT_SOURCES_PATH existing = load_sources_manifest(output) if args.merge else None manifest = build_manifest(payload, existing, merge=args.merge) if args.dry_run: print(yaml.safe_dump(manifest, sort_keys=False)) return 0 written = write_sources_manifest(manifest, output) print( f"ok: wrote {written.relative_to(ROOT)} " f"({len(manifest['sources'])} source(s))" ) return 0 def cmd_stats(args: argparse.Namespace) -> int: if args.roster: roster_path = Path(args.roster).resolve() stats = collect_roster_stats( roster_path, federation_ready=args.federation_ready, ) if args.format == "json": print(format_roster_stats_json(stats)) else: print(format_roster_stats_markdown(stats), end="") return 0 repo_root = Path(args.path or ".").resolve() stats = collect_stats( repo_root, federation_ready=args.federation_ready, raw_url=args.raw_url, hub_url=getattr(args, "hub_url", None), ) if args.format == "json": print(format_stats_json(stats)) else: print(format_stats_markdown(stats), end="") return 0 def cmd_establish(args: argparse.Namespace) -> int: repo_root = Path(args.path or ".").resolve() try: if args.scaffold: created = scaffold_registry( repo_root, domain=args.domain, force=args.force ) for path in created: print(f"ok: wrote {path.relative_to(repo_root)}") print(scaffold_next_steps(repo_root)) return 0 if args.publish_check: result = publish_check(repo_root, raw_url=args.raw_url) print(format_publish_check_markdown(result), end="") return 0 if result["ok"] else 1 if args.discover: result = discover_capabilities( repo_root, domain=args.domain, dry_run=not args.apply, apply=args.apply, llm_url=args.llm_url, context_max_files=args.context_max_files, ) if result.get("dry_run"): print(yaml.safe_dump(result["draft"], sort_keys=False)) return 0 for path in result.get("written", []): print(f"ok: wrote {path}") validate_args = argparse.Namespace( path=None, root=str(repo_root), relations=False, fail_on_warnings=True, ) return cmd_validate(validate_args) except ValueError as exc: print(f"error: {exc}", file=sys.stderr) return 1 print("error: specify --scaffold, --publish-check, or --discover", file=sys.stderr) return 1 def cmd_update(args: argparse.Namespace) -> int: repo_root = Path(args.path or ".").resolve() try: capability_id = None if args.all else args.capability if not args.all and not args.capability: print("error: specify --capability or --all", file=sys.stderr) return 1 if args.suggest_maturity: cap_ids = [args.capability] if args.capability else [] if args.all: index = load_index_at(registry_paths(repo_root)["index"]) cap_ids = [row["id"] for row in index.get("capabilities", [])] payload = { "suggestions": [ suggest_llm_updates( repo_root, cap_id, git_since=args.from_git_since, llm_url=args.llm_url, ) for cap_id in cap_ids ] } print(json.dumps(payload, indent=2, sort_keys=True)) return 0 suggestions = collect_deterministic_suggestions( repo_root, capability_id=capability_id, git_since=args.from_git_since, ) if args.apply: changed = apply_deterministic_suggestions(repo_root, suggestions) for line in changed: print(f"ok: {line}") validate_args = argparse.Namespace( path=None, root=str(repo_root), relations=False, fail_on_warnings=True, ) return cmd_validate(validate_args) if args.format == "json": print(format_suggestions_json(suggestions)) else: print(format_suggestions_markdown(suggestions), end="") return 0 except ValueError as exc: print(f"error: {exc}", file=sys.stderr) return 1 def cmd_report_cohorts(args: argparse.Namespace) -> int: filters = cohort_filters_from_args(args) matches = select_cohort(filters) if args.format == "json": print(format_cohort_json(matches, filters)) else: print(format_cohort_markdown(matches, filters), end="") return 0 def cmd_report_gaps(args: argparse.Namespace) -> int: roster_path = Path(args.roster).resolve() if args.roster else default_roster_path() if not roster_path.exists(): print(f"error: roster not found: {roster_path}", file=sys.stderr) return 1 report = collect_gap_report(roster_path) if args.format == "json": print(format_gap_json(report)) else: print(format_gap_markdown(report), end="") return 0 def cmd_export(args: argparse.Namespace) -> int: index = load_index() bundle: dict[str, Any] = { "version": index.get("version", 1), "domain": index.get("domain"), "updated": index.get("updated"), "capabilities": [], } errors: list[str] = [] for item in index.get("capabilities", []): path = ROOT / item["path"] try: front_matter = parse_front_matter(path) except ValueError as exc: errors.append(str(exc)) continue bundle["capabilities"].append( { "index": item, "entry": front_matter, } ) if errors: for error in errors: print(f"error: {error}", file=sys.stderr) return 1 if args.format == "json": print(json.dumps(bundle, indent=2, sort_keys=True)) else: print(yaml.safe_dump(bundle, sort_keys=False)) print( f"# exported {len(bundle['capabilities'])} capabilities", file=sys.stderr, ) return 0 def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(prog="reuse-surface") subparsers = parser.add_subparsers(dest="command", required=True) validate = subparsers.add_parser("validate", help="validate capability entries") validate.add_argument( "path", nargs="?", help="optional capability markdown file; defaults to all entries", ) validate.add_argument( "--relations", action="store_true", help="check relation cycles and broken references", ) validate.add_argument( "--fail-on-warnings", action="store_true", help="exit non-zero when warnings are present", ) validate.add_argument( "--root", help="registry repo root (default: reuse-surface install root)", ) validate.set_defaults(func=cmd_validate) federation = subparsers.add_parser( "federation", help="federation index operations" ) federation_sub = federation.add_subparsers(dest="federation_command", required=True) compose = federation_sub.add_parser("compose", help="compose federated index") compose.add_argument( "--refresh", action="store_true", help="bypass remote index cache and refetch URL sources", ) compose.set_defaults(func=cmd_federation_compose) query = subparsers.add_parser("query", help="query capability index") query.add_argument("--discovery-min") query.add_argument("--availability-min") query.add_argument("--domain") query.add_argument("--tag") query.add_argument("--consumption-mode") query.add_argument("--keyword") query.set_defaults(func=cmd_query) export = subparsers.add_parser("export", help="export registry bundle") export.add_argument( "--format", choices=["yaml", "json"], default="yaml", ) export.set_defaults(func=cmd_export) overlaps = subparsers.add_parser( "overlaps", help="detect potential duplicate capabilities" ) overlaps.add_argument( "--threshold", type=float, default=0.28, help="token similarity threshold (0-1)", ) overlaps.set_defaults(func=cmd_overlaps) catalog = subparsers.add_parser( "catalog", help="generate human-readable capability catalog" ) catalog.set_defaults(func=cmd_catalog) graph = subparsers.add_parser("graph", help="generate relation graph") graph.add_argument( "--stdout", action="store_true", help="print Mermaid to stdout instead of writing docs/graph/", ) graph.add_argument( "--check", action="store_true", help="report depends_on cycles and broken relation references", ) graph.add_argument( "--fail-on-warnings", action="store_true", help="exit non-zero when relation warnings are present", ) graph.set_defaults(func=cmd_graph) serve = subparsers.add_parser("serve", help="run federation service API") serve.set_defaults(func=cmd_serve) hub = subparsers.add_parser("hub", help="federation service client") hub.add_argument( "--base-url", help="service base URL (or set REUSE_SURFACE_URL)", ) hub_sub = hub.add_subparsers(dest="hub_command", required=True) hub_status = hub_sub.add_parser("status", help="check hub health") hub_status.set_defaults(func=cmd_hub_status) hub_list = hub_sub.add_parser("list", help="list registered repos") hub_list.set_defaults(func=cmd_hub_list) hub_show = hub_sub.add_parser("show", help="show one registration") hub_show.add_argument("--repo", required=True) hub_show.set_defaults(func=cmd_hub_show) hub_register = hub_sub.add_parser("register", help="register a repo index URL") hub_register.add_argument("--repo", required=True) hub_register.add_argument("--url", required=True) hub_register.add_argument("--domain", default="helix_forge") hub_register.add_argument("--description") hub_register.add_argument("--enabled", action=argparse.BooleanOptionalAction, default=True) hub_register.add_argument("--required", action="store_true") hub_register.set_defaults(func=cmd_hub_register) hub_update = hub_sub.add_parser("update", help="update a repo registration") hub_update.add_argument("--repo", required=True) hub_update.add_argument("--url") hub_update.add_argument("--domain") hub_update.add_argument("--description") hub_update.add_argument("--enabled", action=argparse.BooleanOptionalAction, default=None) hub_update.add_argument("--required", action=argparse.BooleanOptionalAction, default=None) hub_update.set_defaults(func=cmd_hub_update) hub_sync = hub_sub.add_parser( "sync", help="write federation sources.yaml from hub registrations" ) hub_sync.add_argument( "--output", help=f"manifest path (default: {DEFAULT_SOURCES_PATH.relative_to(ROOT)})", ) hub_sync.add_argument( "--merge", action="store_true", help="keep local index sources not overridden by hub repo slugs", ) hub_sync.add_argument( "--dry-run", action="store_true", help="print manifest without writing", ) hub_sync.set_defaults(func=cmd_hub_sync) report = subparsers.add_parser("report", help="planning and analytics reports") report_sub = report.add_subparsers(dest="report_command", required=True) cohorts = report_sub.add_parser( "cohorts", help="export capability cohorts by maturity filters" ) cohorts.add_argument("--planning-min", help="discovery minimum (implies availability-max A1)") cohorts.add_argument("--implementation-min", help="availability minimum") cohorts.add_argument("--discovery-min") cohorts.add_argument("--availability-min") cohorts.add_argument("--availability-max") cohorts.add_argument("--domain") cohorts.add_argument( "--format", choices=["markdown", "json"], default="markdown", ) cohorts.set_defaults(func=cmd_report_cohorts) gaps = report_sub.add_parser( "gaps", help="roster publish blockers, empty scaffolds, and dedup stubs", ) gaps.add_argument( "--roster", help="workstation roster YAML (default: registry/federation/local-repo-roster.yaml)", ) gaps.add_argument( "--format", choices=["markdown", "json"], default="markdown", ) gaps.set_defaults(func=cmd_report_gaps) stats = subparsers.add_parser("stats", help="registry maturity and federation stats") stats.add_argument("--path", help="repo root (default: cwd)") stats.add_argument( "--roster", help="workstation roster YAML (e.g. registry/federation/local-repo-roster.yaml)", ) stats.add_argument("--federation-ready", action="store_true") stats.add_argument("--raw-url", help="probe federation raw index URL") stats.add_argument("--hub-url", help="hub base URL (or REUSE_SURFACE_URL)") stats.add_argument("--format", choices=["markdown", "json"], default="markdown") stats.set_defaults(func=cmd_stats) establish = subparsers.add_parser( "establish", help="bootstrap or discover capability registry" ) establish.add_argument("--path", help="target repo root (default: cwd)") establish.add_argument("--domain", default="helix_forge") establish.add_argument("--force", action="store_true") establish.add_argument("--scaffold", action="store_true") establish.add_argument("--publish-check", action="store_true") establish.add_argument("--discover", action="store_true") establish.add_argument("--dry-run", action="store_true", help="discover preview (default)") establish.add_argument("--apply", action="store_true", help="discover write + validate") establish.add_argument("--raw-url", help="raw Gitea index URL for publish-check") establish.add_argument("--llm-url", help="llm-connect base URL (or LLM_CONNECT_URL)") establish.add_argument("--context-max-files", type=int, default=12) establish.set_defaults(func=cmd_establish) update = subparsers.add_parser("update", help="refresh registry metadata from repo signals") update.add_argument("--path", help="repo root (default: cwd)") update.add_argument("--capability", help="single capability id") update.add_argument("--all", action="store_true") update.add_argument("--from-git-since", help="git ref for change detection") update.add_argument("--apply", action="store_true") update.add_argument("--suggest-maturity", action="store_true") update.add_argument("--llm-url", help="llm-connect base URL (or LLM_CONNECT_URL)") update.add_argument("--format", choices=["markdown", "json"], default="markdown") update.set_defaults(func=cmd_update) args = parser.parse_args(argv) return args.func(args) if __name__ == "__main__": raise SystemExit(main())