Files
reuse-surface/reuse_surface/cli.py
tegwick 81c55e598f
Some checks failed
ci / validate-registry (push) Has been cancelled
REUSE-WP-0015: dedup owner entries, add report gaps (T02/T03/T05)
Remove 17 owner-migrated capabilities from reuse-surface index (keep
activity-core stub). Add report gaps CLI, roster stats + gaps CI steps.
T01 remains operator-blocked on Gitea publish.
2026-06-16 02:22:17 +02:00

803 lines
27 KiB
Python

from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
import yaml
from jsonschema import Draft202012Validator
from reuse_surface.catalog import write_catalog
from reuse_surface.federation import write_federated_index
from reuse_surface import hub_client
from reuse_surface.graph import check_relations, render_mermaid, write_graph
from reuse_surface.hub_sync import (
DEFAULT_SOURCES_PATH,
build_manifest,
load_sources_manifest,
write_sources_manifest,
)
from reuse_surface.overlaps import find_overlaps
from reuse_surface.reports import (
cohort_filters_from_args,
collect_gap_report,
default_roster_path,
format_cohort_json,
format_cohort_markdown,
format_gap_json,
format_gap_markdown,
select_cohort,
)
from reuse_surface.establish import (
discover_capabilities,
format_publish_check_markdown,
publish_check,
scaffold_next_steps,
scaffold_registry,
)
from reuse_surface.registry_update import (
apply_deterministic_suggestions,
collect_deterministic_suggestions,
format_suggestions_json,
format_suggestions_markdown,
suggest_llm_updates,
)
from reuse_surface.stats import (
collect_roster_stats,
collect_stats,
format_roster_stats_json,
format_roster_stats_markdown,
format_stats_json,
format_stats_markdown,
)
from reuse_surface.registry import (
ROOT,
capability_paths,
level_at_least,
load_index,
load_index_at,
load_schema,
parse_front_matter,
parse_vector,
registry_paths,
)
def _registry_root(args: argparse.Namespace) -> Path:
    """Return the registry root selected by the parsed arguments.

    A truthy ``args.root`` is resolved to an absolute path. When the attribute
    is absent or empty, the module-level ``ROOT`` is returned instead.
    """
    override = getattr(args, "root", None)
    return Path(override).resolve() if override else ROOT
def _check_index_drift(
    entry_paths: list[Path],
    index: dict[str, Any],
    repo_root: Path,
) -> list[str]:
    """Report mismatches between capability files and the index listing.

    Args:
        entry_paths: Capability entry files; each must live under ``repo_root``.
        index: Parsed index mapping whose ``capabilities`` items carry a
            ``path`` key.
        repo_root: Directory the entry paths are made relative to.

    Returns:
        Warning strings: first the files that are not indexed, then the index
        rows whose file is not among ``entry_paths``; each group is sorted.

    Raises:
        KeyError: If an index item has no ``path`` key.
        ValueError: If an entry path is not located under ``repo_root``.
    """
    # ``or []`` tolerates an index whose ``capabilities`` key is present but null.
    indexed_paths = {item["path"] for item in index.get("capabilities") or []}
    # as_posix() keeps "/" separators on every platform; str() would produce
    # backslashes on Windows and flag every file as drift.
    # NOTE(review): assumes the index stores "/"-separated relative paths — confirm.
    file_paths = {path.relative_to(repo_root).as_posix() for path in entry_paths}

    warnings = [
        f"index drift: entry file not indexed: {path}"
        for path in sorted(file_paths - indexed_paths)
    ]
    warnings.extend(
        f"index drift: index references missing file: {path}"
        for path in sorted(indexed_paths - file_paths)
    )
    return warnings
def _capability_paths_for(repo_root: Path, target: Path | None) -> list[Path]:
    """Return the capability files to process.

    An explicit ``target`` is returned as a single-element list. Otherwise the
    sorted ``*.md`` files in the registry's ``capabilities`` directory are
    returned.
    """
    if target is None:
        capabilities_dir = registry_paths(repo_root)["capabilities"]
        found = [
            candidate
            for candidate in capabilities_dir.glob("*.md")
            if candidate.name != ".gitkeep"
        ]
        found.sort()
        return found
    return [target]
def _run_validate(
    repo_root: Path,
    *,
    target: Path | None,
    relations: bool,
) -> tuple[list[str], list[str], list[Path]]:
    """Validate capability front matter against the JSON schema.

    Args:
        repo_root: Registry root used to locate entries and the index.
        target: Single entry to validate, or ``None`` for every entry.
        relations: When true (and ``repo_root`` is ``ROOT``), append the
            warnings returned by ``check_relations``.

    Returns:
        ``(errors, warnings, paths)`` where ``paths`` are the files examined.
    """
    validator = Draft202012Validator(load_schema())
    paths = _capability_paths_for(repo_root, target)
    errors: list[str] = []
    warnings: list[str] = []

    for entry_path in paths:
        try:
            front_matter = parse_front_matter(entry_path)
        except ValueError as exc:
            # Unparseable front matter: record it and move on to the next file.
            errors.append(str(exc))
            continue
        violations = sorted(
            validator.iter_errors(front_matter), key=lambda e: e.path
        )
        for violation in violations:
            location = ".".join(map(str, violation.path)) or "<root>"
            errors.append(f"{entry_path}: {location}: {violation.message}")

    # Index drift is only checked when validating the whole registry.
    if target is None:
        index_path = registry_paths(repo_root)["index"]
        if index_path.exists():
            index = load_index_at(index_path)
            warnings.extend(_check_index_drift(paths, index, repo_root))

    if relations and repo_root == ROOT:
        warnings.extend(check_relations())

    return errors, warnings, paths
def cmd_validate(args: argparse.Namespace) -> int:
    """Handle ``validate``: check entries and print the findings.

    Warnings and errors go to stderr. Returns 1 when there are errors, or
    warnings together with ``--fail-on-warnings``; otherwise prints a summary
    and returns 0.
    """
    repo_root = _registry_root(args)

    target: Path | None = None
    if args.path:
        target = Path(args.path)
        if not target.is_absolute():
            # Relative targets are interpreted against the registry root.
            target = repo_root / target

    errors, warnings, paths = _run_validate(
        repo_root, target=target, relations=args.relations
    )

    for warning in warnings:
        print(f"warning: {warning}", file=sys.stderr)
    for error in errors:
        print(f"error: {error}", file=sys.stderr)

    if errors:
        return 1
    if args.fail_on_warnings and warnings:
        return 1

    count = len(paths)
    print(f"ok: validated {count} capability entr{'y' if count == 1 else 'ies'}")
    return 0
def _matches_query(item: dict[str, Any], args: argparse.Namespace) -> bool:
    """Return whether an index row satisfies every filter set on ``args``.

    Filters that are unset (falsy) are skipped. The keyword filter is a
    case-insensitive substring match over id, name, summary and tags.
    """
    vector = parse_vector(item["vector"])
    minimums = (
        ("discovery", args.discovery_min),
        ("availability", args.availability_min),
    )
    for axis, minimum in minimums:
        if minimum and not level_at_least(axis, vector[axis], minimum):
            return False

    if args.domain and item.get("domain") != args.domain:
        return False

    if args.tag and args.tag not in item.get("tags", []):
        return False

    if args.consumption_mode:
        available = [mode.lower() for mode in item.get("consumption_modes", [])]
        if args.consumption_mode.lower() not in available:
            return False

    if args.keyword:
        searchable = (
            item.get("id", ""),
            item.get("name", ""),
            item.get("summary", ""),
            " ".join(item.get("tags", [])),
        )
        if args.keyword.lower() not in " ".join(searchable).lower():
            return False

    return True
def cmd_query(args: argparse.Namespace) -> int:
    """Handle ``query``: print index rows matching the filters; always returns 0."""
    capabilities = load_index().get("capabilities", [])
    matches = [row for row in capabilities if _matches_query(row, args)]

    if matches:
        for item in matches:
            print(
                f"{item['id']} {item['vector']} {item['path']}\n"
                f" {item['summary']}"
            )
        print(f"\n{len(matches)} match{'es' if len(matches) != 1 else ''}")
    else:
        print("no matches")
    return 0
def _load_indexed_entries() -> list[tuple[dict[str, Any], dict[str, Any]]]:
    """Pair every index row with the parsed front matter of its entry file.

    Entry files are located by joining ``ROOT`` with the row's ``path``.
    """
    return [
        (row, parse_front_matter(ROOT / row["path"]))
        for row in load_index().get("capabilities", [])
    ]
def cmd_overlaps(args: argparse.Namespace) -> int:
    """Handle ``overlaps``: list overlap candidates; always returns 0."""
    candidates = find_overlaps(_load_indexed_entries(), threshold=args.threshold)

    if candidates:
        for candidate in candidates:
            reasons = "; ".join(candidate.reasons)
            print(
                f"{candidate.left_id} <> {candidate.right_id} "
                f"score={candidate.score:.2f} {reasons}"
            )
        print(f"\n{len(candidates)} candidate{'s' if len(candidates) != 1 else ''}")
    else:
        print("no overlap candidates")
    return 0
def cmd_federation_compose(args: argparse.Namespace) -> int:
    """Handle ``federation compose``: write the federated index and report it.

    Returns:
        0 on success; 1 when ``write_federated_index`` raises
        ``FileNotFoundError`` or ``ValueError`` (the message goes to stderr).
    """
    try:
        target, warnings = write_federated_index(refresh=args.refresh)
    except (FileNotFoundError, ValueError) as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    for warning in warnings:
        print(f"warning: {warning}", file=sys.stderr)

    # ``yaml`` is already imported at module level; no local import is needed.
    # safe_load() returns None for an empty document, so fall back to empty
    # containers instead of failing on ``None.get`` / ``len(None)``.
    data = yaml.safe_load(target.read_text(encoding="utf-8")) or {}
    count = len(data.get("capabilities") or [])
    print(f"ok: wrote {target.relative_to(ROOT)} ({count} capabilities)")
    return 0
def cmd_graph(args: argparse.Namespace) -> int:
    """Handle ``graph``: emit the Mermaid relation graph.

    With ``--stdout`` the Mermaid text is printed; otherwise the graph file
    and the HTML explorer are written. Relation warnings (only collected with
    ``--check``) go to stderr and yield exit code 1 under
    ``--fail-on-warnings``.
    """
    warnings = check_relations() if args.check else []
    mermaid = render_mermaid()

    if not args.stdout:
        graph_path = write_graph()
        # Function-scope import kept from the original implementation.
        from reuse_surface.catalog import GRAPH_HTML, render_graph_explorer

        GRAPH_HTML.parent.mkdir(parents=True, exist_ok=True)
        GRAPH_HTML.write_text(render_graph_explorer(mermaid), encoding="utf-8")
        print(f"ok: wrote {graph_path.relative_to(ROOT)}")
        print(f"ok: wrote {GRAPH_HTML.relative_to(ROOT)}")
    else:
        print(mermaid, end="")

    for warning in warnings:
        print(f"warning: {warning}", file=sys.stderr)

    return 1 if (args.fail_on_warnings and warnings) else 0
def cmd_catalog(args: argparse.Namespace) -> int:
    """Handle ``catalog``: write the catalog files and list them; returns 0."""
    index = load_index()
    entries = _load_indexed_entries()
    mermaid = render_mermaid()
    for written in write_catalog(index, entries, mermaid_source=mermaid):
        print(f"ok: wrote {written.relative_to(ROOT)}")
    return 0
def _service_url(args: argparse.Namespace) -> str | None:
    """Return ``args.base_url``, or ``None`` when the attribute is not set."""
    try:
        return args.base_url
    except AttributeError:
        return None
def cmd_serve(args: argparse.Namespace) -> int:
    """Handle ``serve``: run the hub application's ``main`` and return 0.

    ``args`` is accepted for handler-signature uniformity and is not read.
    """
    # Imported at call time, so the hub app is only loaded for this command.
    from reuse_surface.hub.app import main as run_hub_service

    run_hub_service()
    return 0
def cmd_hub_status(args: argparse.Namespace) -> int:
    """Handle ``hub status``: print service name, version and status.

    Returns 1 when the client raises ``ValueError`` or the hub answers with a
    status other than 200; otherwise 0.
    """
    try:
        status, payload = hub_client.hub_status(_service_url(args))
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    if status == 200:
        print(f"ok: {payload.get('service')} {payload.get('version')} ({payload.get('status')})")
        return 0

    print(f"error: hub returned {status}: {payload}", file=sys.stderr)
    return 1
def cmd_hub_list(args: argparse.Namespace) -> int:
    """Handle ``hub list``: print one tab-separated line per registration.

    Returns 1 when the client raises ``ValueError`` or the hub answers with a
    status other than 200; otherwise 0.
    """
    try:
        status, payload = hub_client.hub_list(_service_url(args))
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    if status != 200:
        print(f"error: hub returned {status}: {payload}", file=sys.stderr)
        return 1

    for repo in payload.get("repos", []):
        if repo.get("enabled"):
            enabled = "enabled"
        else:
            enabled = "disabled"
        print(f"{repo['repo']}\t{enabled}\t{repo.get('url', '')}")

    print(f"\n{payload.get('count', 0)} registration(s)")
    return 0
def cmd_hub_show(args: argparse.Namespace) -> int:
    """Handle ``hub show``: dump one registration as YAML.

    Returns 1 when the client raises ``ValueError`` or the hub answers with a
    status other than 200; otherwise 0.
    """
    try:
        status, payload = hub_client.hub_show(args.repo, _service_url(args))
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    if status == 200:
        print(yaml.safe_dump(payload, sort_keys=False))
        return 0

    print(f"error: hub returned {status}: {payload}", file=sys.stderr)
    return 1
def cmd_hub_register(args: argparse.Namespace) -> int:
    """Handle ``hub register``: create a registration on the hub.

    ``description`` is only sent when it is non-empty. Returns 0 when the hub
    answers 201; returns 1 on any other status or when the client raises
    ``ValueError``.
    """
    body: dict[str, Any] = dict(
        repo=args.repo,
        url=args.url,
        domain=args.domain,
        enabled=args.enabled,
        required=args.required,
    )
    if args.description:
        body["description"] = args.description

    try:
        status, payload = hub_client.hub_register(body, _service_url(args))
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    if status == 201:
        print(f"ok: registered {args.repo}")
        return 0

    print(f"error: hub returned {status}: {payload}", file=sys.stderr)
    return 1
def cmd_hub_update(args: argparse.Namespace) -> int:
    """Handle ``hub update``: send only the fields given on the command line.

    Returns 1 when no field was supplied, when the client raises
    ``ValueError``, or when the hub answers with a status other than 200;
    otherwise 0.
    """
    body: dict[str, Any] = {}
    # Same key order as the request body has always used.
    for field in ("url", "enabled", "required", "domain", "description"):
        value = getattr(args, field)
        if value is not None:
            body[field] = value

    if not body:
        print("error: no fields to update", file=sys.stderr)
        return 1

    try:
        status, payload = hub_client.hub_update(args.repo, body, _service_url(args))
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    if status == 200:
        print(f"ok: updated {args.repo}")
        return 0

    print(f"error: hub returned {status}: {payload}", file=sys.stderr)
    return 1
def cmd_hub_sync(args: argparse.Namespace) -> int:
    """Handle ``hub sync``: build the sources manifest from hub registrations.

    With ``--dry-run`` the manifest is printed as YAML and nothing is written.
    With ``--merge`` the existing manifest at the output path is loaded and
    passed to ``build_manifest``.

    Returns:
        0 on success; 1 when the client raises ``ValueError`` or the hub
        answers with a status other than 200.
    """
    try:
        status, payload = hub_client.hub_list(_service_url(args))
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    if status != 200:
        print(f"error: hub returned {status}: {payload}", file=sys.stderr)
        return 1

    output = Path(args.output) if args.output else DEFAULT_SOURCES_PATH
    existing = load_sources_manifest(output) if args.merge else None
    manifest = build_manifest(payload, existing, merge=args.merge)

    if args.dry_run:
        print(yaml.safe_dump(manifest, sort_keys=False))
        return 0

    written = write_sources_manifest(manifest, output)
    try:
        shown = written.relative_to(ROOT)
    except ValueError:
        # --output may be outside ROOT or relative to the cwd; relative_to()
        # then raises after the file is already written. Show the path as-is.
        shown = written
    print(
        f"ok: wrote {shown} "
        f"({len(manifest['sources'])} source(s))"
    )
    return 0
def cmd_stats(args: argparse.Namespace) -> int:
    """Handle ``stats``: print roster-wide or single-repo statistics.

    ``--roster`` selects the roster report; otherwise stats are collected for
    ``--path`` (default: current directory). Always returns 0.
    """
    as_json = args.format == "json"

    if args.roster:
        roster_stats = collect_roster_stats(
            Path(args.roster).resolve(),
            federation_ready=args.federation_ready,
        )
        if as_json:
            print(format_roster_stats_json(roster_stats))
        else:
            print(format_roster_stats_markdown(roster_stats), end="")
        return 0

    repo_stats = collect_stats(
        Path(args.path or ".").resolve(),
        federation_ready=args.federation_ready,
        raw_url=args.raw_url,
        hub_url=getattr(args, "hub_url", None),
    )
    if as_json:
        print(format_stats_json(repo_stats))
    else:
        print(format_stats_markdown(repo_stats), end="")
    return 0
def cmd_establish(args: argparse.Namespace) -> int:
    """Handle ``establish``: scaffold, publish-check, or discover a registry.

    The first of ``--scaffold``, ``--publish-check`` and ``--discover`` that
    is set wins. Discover previews the draft unless ``--apply`` is given, in
    which case the written entries are validated with warnings treated as
    failures. A ``ValueError`` from any step is reported and yields 1, as does
    calling the command without a mode flag.
    """
    repo_root = Path(args.path or ".").resolve()

    try:
        if args.scaffold:
            for created in scaffold_registry(
                repo_root, domain=args.domain, force=args.force
            ):
                print(f"ok: wrote {created.relative_to(repo_root)}")
            print(scaffold_next_steps(repo_root))
            return 0

        if args.publish_check:
            check = publish_check(repo_root, raw_url=args.raw_url)
            print(format_publish_check_markdown(check), end="")
            return 0 if check["ok"] else 1

        if args.discover:
            discovery = discover_capabilities(
                repo_root,
                domain=args.domain,
                dry_run=not args.apply,
                apply=args.apply,
                llm_url=args.llm_url,
                context_max_files=args.context_max_files,
            )
            if discovery.get("dry_run"):
                print(yaml.safe_dump(discovery["draft"], sort_keys=False))
                return 0
            for written in discovery.get("written", []):
                print(f"ok: wrote {written}")
            # Re-validate the whole target registry after writing.
            return cmd_validate(
                argparse.Namespace(
                    path=None,
                    root=str(repo_root),
                    relations=False,
                    fail_on_warnings=True,
                )
            )
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    print("error: specify --scaffold, --publish-check, or --discover", file=sys.stderr)
    return 1
def cmd_update(args: argparse.Namespace) -> int:
    """Handle ``update``: suggest or apply registry metadata refreshes.

    Requires ``--capability`` or ``--all``. ``--suggest-maturity`` prints the
    LLM suggestions as JSON and stops. Otherwise deterministic suggestions are
    collected and either applied (followed by validation) or printed in the
    requested format. A ``ValueError`` from any step yields exit code 1.
    """
    repo_root = Path(args.path or ".").resolve()

    try:
        if not (args.all or args.capability):
            print("error: specify --capability or --all", file=sys.stderr)
            return 1

        if args.suggest_maturity:
            if args.all:
                index = load_index_at(registry_paths(repo_root)["index"])
                cap_ids = [row["id"] for row in index.get("capabilities", [])]
            else:
                cap_ids = [args.capability]
            llm_suggestions = [
                suggest_llm_updates(
                    repo_root,
                    cap_id,
                    git_since=args.from_git_since,
                    llm_url=args.llm_url,
                )
                for cap_id in cap_ids
            ]
            print(
                json.dumps(
                    {"suggestions": llm_suggestions}, indent=2, sort_keys=True
                )
            )
            return 0

        # ``--all`` overrides any single capability id.
        capability_id = None if args.all else args.capability
        suggestions = collect_deterministic_suggestions(
            repo_root,
            capability_id=capability_id,
            git_since=args.from_git_since,
        )

        if args.apply:
            for line in apply_deterministic_suggestions(repo_root, suggestions):
                print(f"ok: {line}")
            return cmd_validate(
                argparse.Namespace(
                    path=None,
                    root=str(repo_root),
                    relations=False,
                    fail_on_warnings=True,
                )
            )

        if args.format == "json":
            print(format_suggestions_json(suggestions))
        else:
            print(format_suggestions_markdown(suggestions), end="")
        return 0
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
def cmd_report_cohorts(args: argparse.Namespace) -> int:
    """Handle ``report cohorts``: print the selected cohort; always returns 0."""
    filters = cohort_filters_from_args(args)
    cohort = select_cohort(filters)
    if args.format != "json":
        print(format_cohort_markdown(cohort, filters), end="")
    else:
        print(format_cohort_json(cohort, filters))
    return 0
def cmd_report_gaps(args: argparse.Namespace) -> int:
    """Handle ``report gaps``: print the gap report for a roster.

    Uses ``--roster`` when given, else ``default_roster_path()``. Returns 1
    when the roster file does not exist; otherwise 0.
    """
    if args.roster:
        roster_path = Path(args.roster).resolve()
    else:
        roster_path = default_roster_path()

    if not roster_path.exists():
        print(f"error: roster not found: {roster_path}", file=sys.stderr)
        return 1

    gap_report = collect_gap_report(roster_path)
    if args.format != "json":
        print(format_gap_markdown(gap_report), end="")
    else:
        print(format_gap_json(gap_report))
    return 0
def cmd_export(args: argparse.Namespace) -> int:
    """Handle ``export``: print the index plus every entry's front matter.

    If any entry fails to parse (``ValueError``), all failures are printed to
    stderr and 1 is returned without emitting a bundle. Otherwise the bundle
    is printed as JSON or YAML, a summary line goes to stderr, and 0 is
    returned.
    """
    index = load_index()
    exported: list[dict[str, Any]] = []
    failures: list[str] = []

    for row in index.get("capabilities", []):
        entry_path = ROOT / row["path"]
        try:
            front_matter = parse_front_matter(entry_path)
        except ValueError as exc:
            failures.append(str(exc))
        else:
            exported.append({"index": row, "entry": front_matter})

    if failures:
        for error in failures:
            print(f"error: {error}", file=sys.stderr)
        return 1

    bundle: dict[str, Any] = {
        "version": index.get("version", 1),
        "domain": index.get("domain"),
        "updated": index.get("updated"),
        "capabilities": exported,
    }

    if args.format == "json":
        print(json.dumps(bundle, indent=2, sort_keys=True))
    else:
        print(yaml.safe_dump(bundle, sort_keys=False))

    # Summary goes to stderr so stdout stays a clean document.
    print(
        f"# exported {len(bundle['capabilities'])} capabilities",
        file=sys.stderr,
    )
    return 0
def main(argv: list[str] | None = None) -> int:
    """Build the ``reuse-surface`` argument parser and dispatch to a handler.

    Args:
        argv: Arguments to parse; ``None`` is passed through to
            ``ArgumentParser.parse_args``.

    Returns:
        The integer returned by the handler bound to the chosen sub-command.
    """
    parser = argparse.ArgumentParser(prog="reuse-surface")
    commands = parser.add_subparsers(dest="command", required=True)

    # --- validate ---------------------------------------------------------
    validate_cmd = commands.add_parser("validate", help="validate capability entries")
    validate_cmd.add_argument(
        "path",
        nargs="?",
        help="optional capability markdown file; defaults to all entries",
    )
    validate_cmd.add_argument(
        "--relations",
        action="store_true",
        help="check relation cycles and broken references",
    )
    validate_cmd.add_argument(
        "--fail-on-warnings",
        action="store_true",
        help="exit non-zero when warnings are present",
    )
    validate_cmd.add_argument(
        "--root",
        help="registry repo root (default: reuse-surface install root)",
    )
    validate_cmd.set_defaults(func=cmd_validate)

    # --- federation -------------------------------------------------------
    federation_cmd = commands.add_parser(
        "federation", help="federation index operations"
    )
    federation_commands = federation_cmd.add_subparsers(
        dest="federation_command", required=True
    )
    compose_cmd = federation_commands.add_parser(
        "compose", help="compose federated index"
    )
    compose_cmd.add_argument(
        "--refresh",
        action="store_true",
        help="bypass remote index cache and refetch URL sources",
    )
    compose_cmd.set_defaults(func=cmd_federation_compose)

    # --- query ------------------------------------------------------------
    query_cmd = commands.add_parser("query", help="query capability index")
    query_cmd.add_argument("--discovery-min")
    query_cmd.add_argument("--availability-min")
    query_cmd.add_argument("--domain")
    query_cmd.add_argument("--tag")
    query_cmd.add_argument("--consumption-mode")
    query_cmd.add_argument("--keyword")
    query_cmd.set_defaults(func=cmd_query)

    # --- export -----------------------------------------------------------
    export_cmd = commands.add_parser("export", help="export registry bundle")
    export_cmd.add_argument(
        "--format",
        choices=["yaml", "json"],
        default="yaml",
    )
    export_cmd.set_defaults(func=cmd_export)

    # --- overlaps ---------------------------------------------------------
    overlaps_cmd = commands.add_parser(
        "overlaps", help="detect potential duplicate capabilities"
    )
    overlaps_cmd.add_argument(
        "--threshold",
        type=float,
        default=0.28,
        help="token similarity threshold (0-1)",
    )
    overlaps_cmd.set_defaults(func=cmd_overlaps)

    # --- catalog ----------------------------------------------------------
    catalog_cmd = commands.add_parser(
        "catalog", help="generate human-readable capability catalog"
    )
    catalog_cmd.set_defaults(func=cmd_catalog)

    # --- graph ------------------------------------------------------------
    graph_cmd = commands.add_parser("graph", help="generate relation graph")
    graph_cmd.add_argument(
        "--stdout",
        action="store_true",
        help="print Mermaid to stdout instead of writing docs/graph/",
    )
    graph_cmd.add_argument(
        "--check",
        action="store_true",
        help="report depends_on cycles and broken relation references",
    )
    graph_cmd.add_argument(
        "--fail-on-warnings",
        action="store_true",
        help="exit non-zero when relation warnings are present",
    )
    graph_cmd.set_defaults(func=cmd_graph)

    # --- serve ------------------------------------------------------------
    serve_cmd = commands.add_parser("serve", help="run federation service API")
    serve_cmd.set_defaults(func=cmd_serve)

    # --- hub --------------------------------------------------------------
    hub_cmd = commands.add_parser("hub", help="federation service client")
    hub_cmd.add_argument(
        "--base-url",
        help="service base URL (or set REUSE_SURFACE_URL)",
    )
    hub_commands = hub_cmd.add_subparsers(dest="hub_command", required=True)

    hub_status_cmd = hub_commands.add_parser("status", help="check hub health")
    hub_status_cmd.set_defaults(func=cmd_hub_status)

    hub_list_cmd = hub_commands.add_parser("list", help="list registered repos")
    hub_list_cmd.set_defaults(func=cmd_hub_list)

    hub_show_cmd = hub_commands.add_parser("show", help="show one registration")
    hub_show_cmd.add_argument("--repo", required=True)
    hub_show_cmd.set_defaults(func=cmd_hub_show)

    hub_register_cmd = hub_commands.add_parser(
        "register", help="register a repo index URL"
    )
    hub_register_cmd.add_argument("--repo", required=True)
    hub_register_cmd.add_argument("--url", required=True)
    hub_register_cmd.add_argument("--domain", default="helix_forge")
    hub_register_cmd.add_argument("--description")
    hub_register_cmd.add_argument(
        "--enabled", action=argparse.BooleanOptionalAction, default=True
    )
    hub_register_cmd.add_argument("--required", action="store_true")
    hub_register_cmd.set_defaults(func=cmd_hub_register)

    hub_update_cmd = hub_commands.add_parser(
        "update", help="update a repo registration"
    )
    hub_update_cmd.add_argument("--repo", required=True)
    hub_update_cmd.add_argument("--url")
    hub_update_cmd.add_argument("--domain")
    hub_update_cmd.add_argument("--description")
    # default=None lets the handler tell "not given" apart from an explicit
    # --enabled/--no-enabled or --required/--no-required.
    hub_update_cmd.add_argument(
        "--enabled", action=argparse.BooleanOptionalAction, default=None
    )
    hub_update_cmd.add_argument(
        "--required", action=argparse.BooleanOptionalAction, default=None
    )
    hub_update_cmd.set_defaults(func=cmd_hub_update)

    hub_sync_cmd = hub_commands.add_parser(
        "sync", help="write federation sources.yaml from hub registrations"
    )
    hub_sync_cmd.add_argument(
        "--output",
        help=f"manifest path (default: {DEFAULT_SOURCES_PATH.relative_to(ROOT)})",
    )
    hub_sync_cmd.add_argument(
        "--merge",
        action="store_true",
        help="keep local index sources not overridden by hub repo slugs",
    )
    hub_sync_cmd.add_argument(
        "--dry-run",
        action="store_true",
        help="print manifest without writing",
    )
    hub_sync_cmd.set_defaults(func=cmd_hub_sync)

    # --- report -----------------------------------------------------------
    report_cmd = commands.add_parser("report", help="planning and analytics reports")
    report_commands = report_cmd.add_subparsers(dest="report_command", required=True)

    cohorts_cmd = report_commands.add_parser(
        "cohorts", help="export capability cohorts by maturity filters"
    )
    cohorts_cmd.add_argument(
        "--planning-min", help="discovery minimum (implies availability-max A1)"
    )
    cohorts_cmd.add_argument("--implementation-min", help="availability minimum")
    cohorts_cmd.add_argument("--discovery-min")
    cohorts_cmd.add_argument("--availability-min")
    cohorts_cmd.add_argument("--availability-max")
    cohorts_cmd.add_argument("--domain")
    cohorts_cmd.add_argument(
        "--format",
        choices=["markdown", "json"],
        default="markdown",
    )
    cohorts_cmd.set_defaults(func=cmd_report_cohorts)

    gaps_cmd = report_commands.add_parser(
        "gaps",
        help="roster publish blockers, empty scaffolds, and dedup stubs",
    )
    gaps_cmd.add_argument(
        "--roster",
        help="workstation roster YAML (default: registry/federation/local-repo-roster.yaml)",
    )
    gaps_cmd.add_argument(
        "--format",
        choices=["markdown", "json"],
        default="markdown",
    )
    gaps_cmd.set_defaults(func=cmd_report_gaps)

    # --- stats ------------------------------------------------------------
    stats_cmd = commands.add_parser(
        "stats", help="registry maturity and federation stats"
    )
    stats_cmd.add_argument("--path", help="repo root (default: cwd)")
    stats_cmd.add_argument(
        "--roster",
        help="workstation roster YAML (e.g. registry/federation/local-repo-roster.yaml)",
    )
    stats_cmd.add_argument("--federation-ready", action="store_true")
    stats_cmd.add_argument("--raw-url", help="probe federation raw index URL")
    stats_cmd.add_argument("--hub-url", help="hub base URL (or REUSE_SURFACE_URL)")
    stats_cmd.add_argument(
        "--format", choices=["markdown", "json"], default="markdown"
    )
    stats_cmd.set_defaults(func=cmd_stats)

    # --- establish --------------------------------------------------------
    establish_cmd = commands.add_parser(
        "establish", help="bootstrap or discover capability registry"
    )
    establish_cmd.add_argument("--path", help="target repo root (default: cwd)")
    establish_cmd.add_argument("--domain", default="helix_forge")
    establish_cmd.add_argument("--force", action="store_true")
    establish_cmd.add_argument("--scaffold", action="store_true")
    establish_cmd.add_argument("--publish-check", action="store_true")
    establish_cmd.add_argument("--discover", action="store_true")
    establish_cmd.add_argument(
        "--dry-run", action="store_true", help="discover preview (default)"
    )
    establish_cmd.add_argument(
        "--apply", action="store_true", help="discover write + validate"
    )
    establish_cmd.add_argument(
        "--raw-url", help="raw Gitea index URL for publish-check"
    )
    establish_cmd.add_argument(
        "--llm-url", help="llm-connect base URL (or LLM_CONNECT_URL)"
    )
    establish_cmd.add_argument("--context-max-files", type=int, default=12)
    establish_cmd.set_defaults(func=cmd_establish)

    # --- update -----------------------------------------------------------
    update_cmd = commands.add_parser(
        "update", help="refresh registry metadata from repo signals"
    )
    update_cmd.add_argument("--path", help="repo root (default: cwd)")
    update_cmd.add_argument("--capability", help="single capability id")
    update_cmd.add_argument("--all", action="store_true")
    update_cmd.add_argument("--from-git-since", help="git ref for change detection")
    update_cmd.add_argument("--apply", action="store_true")
    update_cmd.add_argument("--suggest-maturity", action="store_true")
    update_cmd.add_argument(
        "--llm-url", help="llm-connect base URL (or LLM_CONNECT_URL)"
    )
    update_cmd.add_argument(
        "--format", choices=["markdown", "json"], default="markdown"
    )
    update_cmd.set_defaults(func=cmd_update)

    # Every leaf parser binds ``func``; dispatch to it with the parsed args.
    parsed = parser.parse_args(argv)
    return parsed.func(parsed)
if __name__ == "__main__":
    # Use the handler's return value as the process exit status.
    sys.exit(main())