Files
reuse-surface/reuse_surface/cli.py
tegwick cbcd097214
Some checks failed
ci / validate-registry (push) Has been cancelled
Align naming with coulomb.social reuse-surface conventions
Use reuse.coulomb.social, REUSE_SURFACE_URL/TOKEN env vars, reuse-surface
image and reuse-surface-env secret. Replace reuse-surface-hub entrypoint with
reuse-surface serve; CLI uses --base-url.
2026-06-15 09:02:02 +02:00

465 lines
15 KiB
Python

from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
import yaml
from jsonschema import Draft202012Validator
from reuse_surface.catalog import write_catalog
from reuse_surface.federation import write_federated_index
from reuse_surface import hub_client
from reuse_surface.graph import check_relations, render_mermaid, write_graph
from reuse_surface.overlaps import find_overlaps
from reuse_surface.registry import (
ROOT,
capability_paths,
level_at_least,
load_index,
load_schema,
parse_front_matter,
parse_vector,
)
def _check_index_drift(entry_paths: list[Path], index: dict[str, Any]) -> list[str]:
warnings: list[str] = []
indexed_paths = {item["path"] for item in index.get("capabilities", [])}
file_paths = {str(path.relative_to(ROOT)) for path in entry_paths}
for path in sorted(file_paths - indexed_paths):
warnings.append(f"index drift: entry file not indexed: {path}")
for path in sorted(indexed_paths - file_paths):
warnings.append(f"index drift: index references missing file: {path}")
return warnings
def cmd_validate(args: argparse.Namespace) -> int:
schema = load_schema()
validator = Draft202012Validator(schema)
target = Path(args.path) if args.path else None
paths = capability_paths(target)
errors: list[str] = []
warnings: list[str] = []
for path in paths:
try:
data = parse_front_matter(path)
except ValueError as exc:
errors.append(str(exc))
continue
for error in sorted(validator.iter_errors(data), key=lambda e: e.path):
location = ".".join(str(part) for part in error.path) or "<root>"
errors.append(f"{path}: {location}: {error.message}")
if not target:
index = load_index()
warnings.extend(_check_index_drift(paths, index))
if args.relations:
warnings.extend(check_relations())
for warning in warnings:
print(f"warning: {warning}", file=sys.stderr)
for error in errors:
print(f"error: {error}", file=sys.stderr)
if errors or (args.fail_on_warnings and warnings):
return 1
print(f"ok: validated {len(paths)} capability entr{'y' if len(paths) == 1 else 'ies'}")
return 0
def _matches_query(item: dict[str, Any], args: argparse.Namespace) -> bool:
vector = parse_vector(item["vector"])
if args.discovery_min and not level_at_least(
"discovery", vector["discovery"], args.discovery_min
):
return False
if args.availability_min and not level_at_least(
"availability", vector["availability"], args.availability_min
):
return False
if args.domain and item.get("domain") != args.domain:
return False
if args.tag and args.tag not in item.get("tags", []):
return False
if args.consumption_mode:
modes = [mode.lower() for mode in item.get("consumption_modes", [])]
if args.consumption_mode.lower() not in modes:
return False
if args.keyword:
haystack = " ".join(
[
item.get("id", ""),
item.get("name", ""),
item.get("summary", ""),
" ".join(item.get("tags", [])),
]
).lower()
if args.keyword.lower() not in haystack:
return False
return True
def cmd_query(args: argparse.Namespace) -> int:
index = load_index()
matches = [
item for item in index.get("capabilities", []) if _matches_query(item, args)
]
if not matches:
print("no matches")
return 0
for item in matches:
print(
f"{item['id']} {item['vector']} {item['path']}\n"
f" {item['summary']}"
)
print(f"\n{len(matches)} match{'es' if len(matches) != 1 else ''}")
return 0
def _load_indexed_entries() -> list[tuple[dict[str, Any], dict[str, Any]]]:
index = load_index()
indexed_entries: list[tuple[dict[str, Any], dict[str, Any]]] = []
for item in index.get("capabilities", []):
path = ROOT / item["path"]
indexed_entries.append((item, parse_front_matter(path)))
return indexed_entries
def cmd_overlaps(args: argparse.Namespace) -> int:
indexed_entries = _load_indexed_entries()
candidates = find_overlaps(indexed_entries, threshold=args.threshold)
if not candidates:
print("no overlap candidates")
return 0
for candidate in candidates:
reasons = "; ".join(candidate.reasons)
print(
f"{candidate.left_id} <> {candidate.right_id} "
f"score={candidate.score:.2f} {reasons}"
)
print(f"\n{len(candidates)} candidate{'s' if len(candidates) != 1 else ''}")
return 0
def cmd_federation_compose(args: argparse.Namespace) -> int:
try:
target, warnings = write_federated_index(refresh=args.refresh)
except (FileNotFoundError, ValueError) as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
for warning in warnings:
print(f"warning: {warning}", file=sys.stderr)
import yaml
data = yaml.safe_load(target.read_text(encoding="utf-8"))
count = len(data.get("capabilities", []))
print(f"ok: wrote {target.relative_to(ROOT)} ({count} capabilities)")
return 0
def cmd_graph(args: argparse.Namespace) -> int:
warnings = check_relations() if args.check else []
content = render_mermaid()
if args.stdout:
print(content, end="")
else:
path = write_graph()
from reuse_surface.catalog import GRAPH_HTML, render_graph_explorer
GRAPH_HTML.parent.mkdir(parents=True, exist_ok=True)
GRAPH_HTML.write_text(render_graph_explorer(content), encoding="utf-8")
print(f"ok: wrote {path.relative_to(ROOT)}")
print(f"ok: wrote {GRAPH_HTML.relative_to(ROOT)}")
for warning in warnings:
print(f"warning: {warning}", file=sys.stderr)
if args.fail_on_warnings and warnings:
return 1
return 0
def cmd_catalog(args: argparse.Namespace) -> int:
index = load_index()
indexed_entries = _load_indexed_entries()
paths = write_catalog(
index, indexed_entries, mermaid_source=render_mermaid()
)
for path in paths:
print(f"ok: wrote {path.relative_to(ROOT)}")
return 0
def _service_url(args: argparse.Namespace) -> str | None:
return getattr(args, "base_url", None)
def cmd_serve(args: argparse.Namespace) -> int:
from reuse_surface.hub.app import main as serve_main
serve_main()
return 0
def cmd_hub_status(args: argparse.Namespace) -> int:
try:
status, payload = hub_client.hub_status(_service_url(args))
except ValueError as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
if status != 200:
print(f"error: hub returned {status}: {payload}", file=sys.stderr)
return 1
print(f"ok: {payload.get('service')} {payload.get('version')} ({payload.get('status')})")
return 0
def cmd_hub_list(args: argparse.Namespace) -> int:
try:
status, payload = hub_client.hub_list(_service_url(args))
except ValueError as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
if status != 200:
print(f"error: hub returned {status}: {payload}", file=sys.stderr)
return 1
for repo in payload.get("repos", []):
enabled = "enabled" if repo.get("enabled") else "disabled"
print(f"{repo['repo']}\t{enabled}\t{repo.get('url', '')}")
print(f"\n{payload.get('count', 0)} registration(s)")
return 0
def cmd_hub_show(args: argparse.Namespace) -> int:
try:
status, payload = hub_client.hub_show(args.repo, _service_url(args))
except ValueError as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
if status != 200:
print(f"error: hub returned {status}: {payload}", file=sys.stderr)
return 1
print(yaml.safe_dump(payload, sort_keys=False))
return 0
def cmd_hub_register(args: argparse.Namespace) -> int:
body: dict[str, Any] = {
"repo": args.repo,
"url": args.url,
"domain": args.domain,
"enabled": args.enabled,
"required": args.required,
}
if args.description:
body["description"] = args.description
try:
status, payload = hub_client.hub_register(body, _service_url(args))
except ValueError as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
if status != 201:
print(f"error: hub returned {status}: {payload}", file=sys.stderr)
return 1
print(f"ok: registered {args.repo}")
return 0
def cmd_hub_update(args: argparse.Namespace) -> int:
body: dict[str, Any] = {}
if args.url is not None:
body["url"] = args.url
if args.enabled is not None:
body["enabled"] = args.enabled
if args.required is not None:
body["required"] = args.required
if args.domain is not None:
body["domain"] = args.domain
if args.description is not None:
body["description"] = args.description
if not body:
print("error: no fields to update", file=sys.stderr)
return 1
try:
status, payload = hub_client.hub_update(args.repo, body, _service_url(args))
except ValueError as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
if status != 200:
print(f"error: hub returned {status}: {payload}", file=sys.stderr)
return 1
print(f"ok: updated {args.repo}")
return 0
def cmd_export(args: argparse.Namespace) -> int:
index = load_index()
bundle: dict[str, Any] = {
"version": index.get("version", 1),
"domain": index.get("domain"),
"updated": index.get("updated"),
"capabilities": [],
}
errors: list[str] = []
for item in index.get("capabilities", []):
path = ROOT / item["path"]
try:
front_matter = parse_front_matter(path)
except ValueError as exc:
errors.append(str(exc))
continue
bundle["capabilities"].append(
{
"index": item,
"entry": front_matter,
}
)
if errors:
for error in errors:
print(f"error: {error}", file=sys.stderr)
return 1
if args.format == "json":
print(json.dumps(bundle, indent=2, sort_keys=True))
else:
print(yaml.safe_dump(bundle, sort_keys=False))
print(
f"# exported {len(bundle['capabilities'])} capabilities",
file=sys.stderr,
)
return 0
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(prog="reuse-surface")
subparsers = parser.add_subparsers(dest="command", required=True)
validate = subparsers.add_parser("validate", help="validate capability entries")
validate.add_argument(
"path",
nargs="?",
help="optional capability markdown file; defaults to all entries",
)
validate.add_argument(
"--relations",
action="store_true",
help="check relation cycles and broken references",
)
validate.add_argument(
"--fail-on-warnings",
action="store_true",
help="exit non-zero when warnings are present",
)
validate.set_defaults(func=cmd_validate)
federation = subparsers.add_parser(
"federation", help="federation index operations"
)
federation_sub = federation.add_subparsers(dest="federation_command", required=True)
compose = federation_sub.add_parser("compose", help="compose federated index")
compose.add_argument(
"--refresh",
action="store_true",
help="bypass remote index cache and refetch URL sources",
)
compose.set_defaults(func=cmd_federation_compose)
query = subparsers.add_parser("query", help="query capability index")
query.add_argument("--discovery-min")
query.add_argument("--availability-min")
query.add_argument("--domain")
query.add_argument("--tag")
query.add_argument("--consumption-mode")
query.add_argument("--keyword")
query.set_defaults(func=cmd_query)
export = subparsers.add_parser("export", help="export registry bundle")
export.add_argument(
"--format",
choices=["yaml", "json"],
default="yaml",
)
export.set_defaults(func=cmd_export)
overlaps = subparsers.add_parser(
"overlaps", help="detect potential duplicate capabilities"
)
overlaps.add_argument(
"--threshold",
type=float,
default=0.28,
help="token similarity threshold (0-1)",
)
overlaps.set_defaults(func=cmd_overlaps)
catalog = subparsers.add_parser(
"catalog", help="generate human-readable capability catalog"
)
catalog.set_defaults(func=cmd_catalog)
graph = subparsers.add_parser("graph", help="generate relation graph")
graph.add_argument(
"--stdout",
action="store_true",
help="print Mermaid to stdout instead of writing docs/graph/",
)
graph.add_argument(
"--check",
action="store_true",
help="report depends_on cycles and broken relation references",
)
graph.add_argument(
"--fail-on-warnings",
action="store_true",
help="exit non-zero when relation warnings are present",
)
graph.set_defaults(func=cmd_graph)
serve = subparsers.add_parser("serve", help="run federation service API")
serve.set_defaults(func=cmd_serve)
hub = subparsers.add_parser("hub", help="federation service client")
hub.add_argument(
"--base-url",
help="service base URL (or set REUSE_SURFACE_URL)",
)
hub_sub = hub.add_subparsers(dest="hub_command", required=True)
hub_status = hub_sub.add_parser("status", help="check hub health")
hub_status.set_defaults(func=cmd_hub_status)
hub_list = hub_sub.add_parser("list", help="list registered repos")
hub_list.set_defaults(func=cmd_hub_list)
hub_show = hub_sub.add_parser("show", help="show one registration")
hub_show.add_argument("--repo", required=True)
hub_show.set_defaults(func=cmd_hub_show)
hub_register = hub_sub.add_parser("register", help="register a repo index URL")
hub_register.add_argument("--repo", required=True)
hub_register.add_argument("--url", required=True)
hub_register.add_argument("--domain", default="helix_forge")
hub_register.add_argument("--description")
hub_register.add_argument("--enabled", action=argparse.BooleanOptionalAction, default=True)
hub_register.add_argument("--required", action="store_true")
hub_register.set_defaults(func=cmd_hub_register)
hub_update = hub_sub.add_parser("update", help="update a repo registration")
hub_update.add_argument("--repo", required=True)
hub_update.add_argument("--url")
hub_update.add_argument("--domain")
hub_update.add_argument("--description")
hub_update.add_argument("--enabled", action=argparse.BooleanOptionalAction, default=None)
hub_update.add_argument("--required", action=argparse.BooleanOptionalAction, default=None)
hub_update.set_defaults(func=cmd_hub_update)
args = parser.parse_args(argv)
return args.func(args)
if __name__ == "__main__":
raise SystemExit(main())