from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any import yaml from jsonschema import Draft202012Validator from reuse_surface.catalog import write_catalog from reuse_surface.overlaps import find_overlaps from reuse_surface.registry import ( ROOT, capability_paths, level_at_least, load_index, load_schema, parse_front_matter, parse_vector, ) def _check_index_drift(entry_paths: list[Path], index: dict[str, Any]) -> list[str]: warnings: list[str] = [] indexed_paths = {item["path"] for item in index.get("capabilities", [])} file_paths = {str(path.relative_to(ROOT)) for path in entry_paths} for path in sorted(file_paths - indexed_paths): warnings.append(f"index drift: entry file not indexed: {path}") for path in sorted(indexed_paths - file_paths): warnings.append(f"index drift: index references missing file: {path}") return warnings def cmd_validate(args: argparse.Namespace) -> int: schema = load_schema() validator = Draft202012Validator(schema) target = Path(args.path) if args.path else None paths = capability_paths(target) errors: list[str] = [] warnings: list[str] = [] for path in paths: try: data = parse_front_matter(path) except ValueError as exc: errors.append(str(exc)) continue for error in sorted(validator.iter_errors(data), key=lambda e: e.path): location = ".".join(str(part) for part in error.path) or "" errors.append(f"{path}: {location}: {error.message}") if not target: index = load_index() warnings.extend(_check_index_drift(paths, index)) for warning in warnings: print(f"warning: {warning}", file=sys.stderr) for error in errors: print(f"error: {error}", file=sys.stderr) if errors: return 1 print(f"ok: validated {len(paths)} capability entr{'y' if len(paths) == 1 else 'ies'}") return 0 def _matches_query(item: dict[str, Any], args: argparse.Namespace) -> bool: vector = parse_vector(item["vector"]) if args.discovery_min and not level_at_least( "discovery", vector["discovery"], args.discovery_min ): return False if args.availability_min and not level_at_least( "availability", vector["availability"], args.availability_min ): return False if args.domain and item.get("domain") != args.domain: return False if args.tag and args.tag not in item.get("tags", []): return False if args.consumption_mode: modes = [mode.lower() for mode in item.get("consumption_modes", [])] if args.consumption_mode.lower() not in modes: return False if args.keyword: haystack = " ".join( [ item.get("id", ""), item.get("name", ""), item.get("summary", ""), " ".join(item.get("tags", [])), ] ).lower() if args.keyword.lower() not in haystack: return False return True def cmd_query(args: argparse.Namespace) -> int: index = load_index() matches = [ item for item in index.get("capabilities", []) if _matches_query(item, args) ] if not matches: print("no matches") return 0 for item in matches: print( f"{item['id']} {item['vector']} {item['path']}\n" f" {item['summary']}" ) print(f"\n{len(matches)} match{'es' if len(matches) != 1 else ''}") return 0 def _load_indexed_entries() -> list[tuple[dict[str, Any], dict[str, Any]]]: index = load_index() indexed_entries: list[tuple[dict[str, Any], dict[str, Any]]] = [] for item in index.get("capabilities", []): path = ROOT / item["path"] indexed_entries.append((item, parse_front_matter(path))) return indexed_entries def cmd_overlaps(args: argparse.Namespace) -> int: indexed_entries = _load_indexed_entries() candidates = find_overlaps(indexed_entries, threshold=args.threshold) if not candidates: print("no overlap candidates") return 0 for candidate in candidates: reasons = "; ".join(candidate.reasons) print( f"{candidate.left_id} <> {candidate.right_id} " f"score={candidate.score:.2f} {reasons}" ) print(f"\n{len(candidates)} candidate{'s' if len(candidates) != 1 else ''}") return 0 def cmd_catalog(args: argparse.Namespace) -> int: index = load_index() indexed_entries = _load_indexed_entries() md_path, html_path = write_catalog(index, indexed_entries) print(f"ok: wrote {md_path.relative_to(ROOT)}") print(f"ok: wrote {html_path.relative_to(ROOT)}") return 0 def cmd_export(args: argparse.Namespace) -> int: index = load_index() bundle: dict[str, Any] = { "version": index.get("version", 1), "domain": index.get("domain"), "updated": index.get("updated"), "capabilities": [], } errors: list[str] = [] for item in index.get("capabilities", []): path = ROOT / item["path"] try: front_matter = parse_front_matter(path) except ValueError as exc: errors.append(str(exc)) continue bundle["capabilities"].append( { "index": item, "entry": front_matter, } ) if errors: for error in errors: print(f"error: {error}", file=sys.stderr) return 1 if args.format == "json": print(json.dumps(bundle, indent=2, sort_keys=True)) else: print(yaml.safe_dump(bundle, sort_keys=False)) print( f"# exported {len(bundle['capabilities'])} capabilities", file=sys.stderr, ) return 0 def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(prog="reuse-surface") subparsers = parser.add_subparsers(dest="command", required=True) validate = subparsers.add_parser("validate", help="validate capability entries") validate.add_argument( "path", nargs="?", help="optional capability markdown file; defaults to all entries", ) validate.set_defaults(func=cmd_validate) query = subparsers.add_parser("query", help="query capability index") query.add_argument("--discovery-min") query.add_argument("--availability-min") query.add_argument("--domain") query.add_argument("--tag") query.add_argument("--consumption-mode") query.add_argument("--keyword") query.set_defaults(func=cmd_query) export = subparsers.add_parser("export", help="export registry bundle") export.add_argument( "--format", choices=["yaml", "json"], default="yaml", ) export.set_defaults(func=cmd_export) overlaps = subparsers.add_parser( "overlaps", help="detect potential duplicate capabilities" ) overlaps.add_argument( "--threshold", type=float, default=0.28, help="token similarity threshold (0-1)", ) overlaps.set_defaults(func=cmd_overlaps) catalog = subparsers.add_parser( "catalog", help="generate human-readable capability catalog" ) catalog.set_defaults(func=cmd_catalog) args = parser.parse_args(argv) return args.func(args) if __name__ == "__main__": raise SystemExit(main())