Files
reuse-surface/reuse_surface/cli.py
tegwick 0dbef6d1a3 Complete REUSE-WP-0003: registry CLI, docs alignment, and coverage
Align INTENT.md with delivered layout, add CapabilityRegistryConcept guide,
extend schema with promotion_history, ship reuse-surface validate/query/export
CLI, register three more helix_forge capabilities, and refresh SCOPE and gap
analysis to reflect A3 tooling and D5/A3/C4/R2 self-assessment.
2026-06-15 01:12:09 +02:00

192 lines
5.7 KiB
Python

from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
import yaml
from jsonschema import Draft202012Validator
from reuse_surface.registry import (
CAPABILITIES_DIR,
INDEX_PATH,
ROOT,
capability_paths,
level_at_least,
load_index,
load_schema,
parse_front_matter,
parse_vector,
)
def _check_index_drift(entry_paths: list[Path], index: dict[str, Any]) -> list[str]:
warnings: list[str] = []
indexed_paths = {item["path"] for item in index.get("capabilities", [])}
file_paths = {str(path.relative_to(ROOT)) for path in entry_paths}
for path in sorted(file_paths - indexed_paths):
warnings.append(f"index drift: entry file not indexed: {path}")
for path in sorted(indexed_paths - file_paths):
warnings.append(f"index drift: index references missing file: {path}")
return warnings
def cmd_validate(args: argparse.Namespace) -> int:
schema = load_schema()
validator = Draft202012Validator(schema)
target = Path(args.path) if args.path else None
paths = capability_paths(target)
errors: list[str] = []
warnings: list[str] = []
for path in paths:
try:
data = parse_front_matter(path)
except ValueError as exc:
errors.append(str(exc))
continue
for error in sorted(validator.iter_errors(data), key=lambda e: e.path):
location = ".".join(str(part) for part in error.path) or "<root>"
errors.append(f"{path}: {location}: {error.message}")
if not target:
index = load_index()
warnings.extend(_check_index_drift(paths, index))
for warning in warnings:
print(f"warning: {warning}", file=sys.stderr)
for error in errors:
print(f"error: {error}", file=sys.stderr)
if errors:
return 1
print(f"ok: validated {len(paths)} capability entr{'y' if len(paths) == 1 else 'ies'}")
return 0
def _matches_query(item: dict[str, Any], args: argparse.Namespace) -> bool:
vector = parse_vector(item["vector"])
if args.discovery_min and not level_at_least(
"discovery", vector["discovery"], args.discovery_min
):
return False
if args.availability_min and not level_at_least(
"availability", vector["availability"], args.availability_min
):
return False
if args.domain and item.get("domain") != args.domain:
return False
if args.tag and args.tag not in item.get("tags", []):
return False
if args.consumption_mode:
modes = [mode.lower() for mode in item.get("consumption_modes", [])]
if args.consumption_mode.lower() not in modes:
return False
if args.keyword:
haystack = " ".join(
[
item.get("id", ""),
item.get("name", ""),
item.get("summary", ""),
" ".join(item.get("tags", [])),
]
).lower()
if args.keyword.lower() not in haystack:
return False
return True
def cmd_query(args: argparse.Namespace) -> int:
index = load_index()
matches = [
item for item in index.get("capabilities", []) if _matches_query(item, args)
]
if not matches:
print("no matches")
return 0
for item in matches:
print(
f"{item['id']} {item['vector']} {item['path']}\n"
f" {item['summary']}"
)
print(f"\n{len(matches)} match{'es' if len(matches) != 1 else ''}")
return 0
def cmd_export(args: argparse.Namespace) -> int:
index = load_index()
bundle: dict[str, Any] = {
"version": index.get("version", 1),
"domain": index.get("domain"),
"updated": index.get("updated"),
"capabilities": [],
}
errors: list[str] = []
for item in index.get("capabilities", []):
path = ROOT / item["path"]
try:
front_matter = parse_front_matter(path)
except ValueError as exc:
errors.append(str(exc))
continue
bundle["capabilities"].append(
{
"index": item,
"entry": front_matter,
}
)
if errors:
for error in errors:
print(f"error: {error}", file=sys.stderr)
return 1
if args.format == "json":
print(json.dumps(bundle, indent=2, sort_keys=True))
else:
print(yaml.safe_dump(bundle, sort_keys=False))
print(
f"# exported {len(bundle['capabilities'])} capabilities",
file=sys.stderr,
)
return 0
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(prog="reuse-surface")
subparsers = parser.add_subparsers(dest="command", required=True)
validate = subparsers.add_parser("validate", help="validate capability entries")
validate.add_argument(
"path",
nargs="?",
help="optional capability markdown file; defaults to all entries",
)
validate.set_defaults(func=cmd_validate)
query = subparsers.add_parser("query", help="query capability index")
query.add_argument("--discovery-min")
query.add_argument("--availability-min")
query.add_argument("--domain")
query.add_argument("--tag")
query.add_argument("--consumption-mode")
query.add_argument("--keyword")
query.set_defaults(func=cmd_query)
export = subparsers.add_parser("export", help="export registry bundle")
export.add_argument(
"--format",
choices=["yaml", "json"],
default="yaml",
)
export.set_defaults(func=cmd_export)
args = parser.parse_args(argv)
return args.func(args)
if __name__ == "__main__":
raise SystemExit(main())