from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any import yaml from jsonschema import Draft202012Validator from reuse_surface.catalog import write_catalog from reuse_surface.federation import write_federated_index from reuse_surface.graph import check_relations, render_mermaid, write_graph from reuse_surface.overlaps import find_overlaps from reuse_surface.registry import ( ROOT, capability_paths, level_at_least, load_index, load_schema, parse_front_matter, parse_vector, ) def _check_index_drift(entry_paths: list[Path], index: dict[str, Any]) -> list[str]: warnings: list[str] = [] indexed_paths = {item["path"] for item in index.get("capabilities", [])} file_paths = {str(path.relative_to(ROOT)) for path in entry_paths} for path in sorted(file_paths - indexed_paths): warnings.append(f"index drift: entry file not indexed: {path}") for path in sorted(indexed_paths - file_paths): warnings.append(f"index drift: index references missing file: {path}") return warnings def cmd_validate(args: argparse.Namespace) -> int: schema = load_schema() validator = Draft202012Validator(schema) target = Path(args.path) if args.path else None paths = capability_paths(target) errors: list[str] = [] warnings: list[str] = [] for path in paths: try: data = parse_front_matter(path) except ValueError as exc: errors.append(str(exc)) continue for error in sorted(validator.iter_errors(data), key=lambda e: e.path): location = ".".join(str(part) for part in error.path) or "" errors.append(f"{path}: {location}: {error.message}") if not target: index = load_index() warnings.extend(_check_index_drift(paths, index)) if args.relations: warnings.extend(check_relations()) for warning in warnings: print(f"warning: {warning}", file=sys.stderr) for error in errors: print(f"error: {error}", file=sys.stderr) if errors or (args.fail_on_warnings and warnings): return 1 print(f"ok: validated {len(paths)} capability entr{'y' if len(paths) == 1 else 'ies'}") return 0 def _matches_query(item: dict[str, Any], args: argparse.Namespace) -> bool: vector = parse_vector(item["vector"]) if args.discovery_min and not level_at_least( "discovery", vector["discovery"], args.discovery_min ): return False if args.availability_min and not level_at_least( "availability", vector["availability"], args.availability_min ): return False if args.domain and item.get("domain") != args.domain: return False if args.tag and args.tag not in item.get("tags", []): return False if args.consumption_mode: modes = [mode.lower() for mode in item.get("consumption_modes", [])] if args.consumption_mode.lower() not in modes: return False if args.keyword: haystack = " ".join( [ item.get("id", ""), item.get("name", ""), item.get("summary", ""), " ".join(item.get("tags", [])), ] ).lower() if args.keyword.lower() not in haystack: return False return True def cmd_query(args: argparse.Namespace) -> int: index = load_index() matches = [ item for item in index.get("capabilities", []) if _matches_query(item, args) ] if not matches: print("no matches") return 0 for item in matches: print( f"{item['id']} {item['vector']} {item['path']}\n" f" {item['summary']}" ) print(f"\n{len(matches)} match{'es' if len(matches) != 1 else ''}") return 0 def _load_indexed_entries() -> list[tuple[dict[str, Any], dict[str, Any]]]: index = load_index() indexed_entries: list[tuple[dict[str, Any], dict[str, Any]]] = [] for item in index.get("capabilities", []): path = ROOT / item["path"] indexed_entries.append((item, parse_front_matter(path))) return indexed_entries def cmd_overlaps(args: argparse.Namespace) -> int: indexed_entries = _load_indexed_entries() candidates = find_overlaps(indexed_entries, threshold=args.threshold) if not candidates: print("no overlap candidates") return 0 for candidate in candidates: reasons = "; ".join(candidate.reasons) print( f"{candidate.left_id} <> {candidate.right_id} " f"score={candidate.score:.2f} {reasons}" ) print(f"\n{len(candidates)} candidate{'s' if len(candidates) != 1 else ''}") return 0 def cmd_federation_compose(args: argparse.Namespace) -> int: try: target, warnings = write_federated_index() except (FileNotFoundError, ValueError) as exc: print(f"error: {exc}", file=sys.stderr) return 1 for warning in warnings: print(f"warning: {warning}", file=sys.stderr) import yaml data = yaml.safe_load(target.read_text(encoding="utf-8")) count = len(data.get("capabilities", [])) print(f"ok: wrote {target.relative_to(ROOT)} ({count} capabilities)") return 0 def cmd_graph(args: argparse.Namespace) -> int: warnings = check_relations() if args.check else [] content = render_mermaid() if args.stdout: print(content, end="") else: path = write_graph() from reuse_surface.catalog import GRAPH_HTML, render_graph_explorer GRAPH_HTML.parent.mkdir(parents=True, exist_ok=True) GRAPH_HTML.write_text(render_graph_explorer(content), encoding="utf-8") print(f"ok: wrote {path.relative_to(ROOT)}") print(f"ok: wrote {GRAPH_HTML.relative_to(ROOT)}") for warning in warnings: print(f"warning: {warning}", file=sys.stderr) if args.fail_on_warnings and warnings: return 1 return 0 def cmd_catalog(args: argparse.Namespace) -> int: index = load_index() indexed_entries = _load_indexed_entries() paths = write_catalog( index, indexed_entries, mermaid_source=render_mermaid() ) for path in paths: print(f"ok: wrote {path.relative_to(ROOT)}") return 0 def cmd_export(args: argparse.Namespace) -> int: index = load_index() bundle: dict[str, Any] = { "version": index.get("version", 1), "domain": index.get("domain"), "updated": index.get("updated"), "capabilities": [], } errors: list[str] = [] for item in index.get("capabilities", []): path = ROOT / item["path"] try: front_matter = parse_front_matter(path) except ValueError as exc: errors.append(str(exc)) continue bundle["capabilities"].append( { "index": item, "entry": front_matter, } ) if errors: for error in errors: print(f"error: {error}", file=sys.stderr) return 1 if args.format == "json": print(json.dumps(bundle, indent=2, sort_keys=True)) else: print(yaml.safe_dump(bundle, sort_keys=False)) print( f"# exported {len(bundle['capabilities'])} capabilities", file=sys.stderr, ) return 0 def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(prog="reuse-surface") subparsers = parser.add_subparsers(dest="command", required=True) validate = subparsers.add_parser("validate", help="validate capability entries") validate.add_argument( "path", nargs="?", help="optional capability markdown file; defaults to all entries", ) validate.add_argument( "--relations", action="store_true", help="check relation cycles and broken references", ) validate.add_argument( "--fail-on-warnings", action="store_true", help="exit non-zero when warnings are present", ) validate.set_defaults(func=cmd_validate) federation = subparsers.add_parser( "federation", help="federation index operations" ) federation_sub = federation.add_subparsers(dest="federation_command", required=True) compose = federation_sub.add_parser("compose", help="compose federated index") compose.set_defaults(func=cmd_federation_compose) query = subparsers.add_parser("query", help="query capability index") query.add_argument("--discovery-min") query.add_argument("--availability-min") query.add_argument("--domain") query.add_argument("--tag") query.add_argument("--consumption-mode") query.add_argument("--keyword") query.set_defaults(func=cmd_query) export = subparsers.add_parser("export", help="export registry bundle") export.add_argument( "--format", choices=["yaml", "json"], default="yaml", ) export.set_defaults(func=cmd_export) overlaps = subparsers.add_parser( "overlaps", help="detect potential duplicate capabilities" ) overlaps.add_argument( "--threshold", type=float, default=0.28, help="token similarity threshold (0-1)", ) overlaps.set_defaults(func=cmd_overlaps) catalog = subparsers.add_parser( "catalog", help="generate human-readable capability catalog" ) catalog.set_defaults(func=cmd_catalog) graph = subparsers.add_parser("graph", help="generate relation graph") graph.add_argument( "--stdout", action="store_true", help="print Mermaid to stdout instead of writing docs/graph/", ) graph.add_argument( "--check", action="store_true", help="report depends_on cycles and broken relation references", ) graph.add_argument( "--fail-on-warnings", action="store_true", help="exit non-zero when relation warnings are present", ) graph.set_defaults(func=cmd_graph) args = parser.parse_args(argv) return args.func(args) if __name__ == "__main__": raise SystemExit(main())