Files
reuse-surface/reuse_surface/establish.py
tegwick 70a5003f6e
Some checks failed
ci / validate-registry (push) Has been cancelled
Implement REUSE-WP-0013 registry establish, update, and stats
Add stats, establish (scaffold, publish-check, discover), and update CLI
commands with optional llm-connect bridge, validate --root for sibling repos,
pytest coverage, and documentation for sibling registry onboarding.
2026-06-16 01:21:01 +02:00

448 lines
15 KiB
Python

from __future__ import annotations
import json
import textwrap
import urllib.error
import urllib.request
from datetime import date
from pathlib import Path
from typing import Any
import yaml
from reuse_surface.llm_bridge import request_registry_draft
from reuse_surface.registry import load_index_at, registry_paths
# Starter README that scaffold_registry writes to <registry>/README.md.
SCAFFOLD_README = "\n".join(
    (
        "# Capability Registry",
        "Markdown-first capability index for federation and reuse planning.",
        "## Authoring",
        "1. Copy a capability entry template (see reuse-surface `templates/capability-entry.template.md`).",
        "2. Add the row to `indexes/capabilities.yaml`.",
        "3. Run `reuse-surface validate` from a checkout with the CLI installed.",
        "4. Merge to `main` and verify publish with `reuse-surface establish --publish-check`.",
        "Federation contract: reuse-surface `docs/RegistryFederation.md`.",
        "",  # keeps the trailing newline of the document
    )
)
# Repository-root files that collect_context reads (in this order) when
# assembling LLM context for capability discovery.
CONTEXT_FILES = (
    "INTENT.md",
    "SCOPE.md",
    "AGENTS.md",
    "README.md",
    "pyproject.toml",
    "Cargo.toml",
    "go.mod",
)
def scaffold_registry(
    repo_root: Path,
    *,
    domain: str = "helix_forge",
    force: bool = False,
) -> list[Path]:
    """Create the registry directories and starter files under *repo_root*.

    Locations come from ``registry_paths(repo_root)`` (keys ``registry``,
    ``capabilities`` and ``index``). A README, a ``.gitkeep`` in the
    capabilities directory and an empty capabilities index are written.

    Args:
        repo_root: Repository whose registry is being scaffolded.
        domain: Value stored under ``domain`` in the new index.
        force: Overwrite existing files and skip the "already exists" guard.

    Returns:
        The files that were written, in README / .gitkeep / index order.

    Raises:
        ValueError: The registry directory already exists and *force* is off.
    """
    layout = registry_paths(repo_root)
    registry_dir = layout["registry"]
    capabilities_dir = layout["capabilities"]
    index_file = layout["index"]

    if not force and registry_dir.exists():
        raise ValueError(
            f"registry already exists at {layout['registry']}; use --force to overwrite"
        )

    for directory in (registry_dir, capabilities_dir, index_file.parent):
        directory.mkdir(parents=True, exist_ok=True)

    index_text = yaml.safe_dump(
        {
            "version": 1,
            "updated": date.today().isoformat(),
            "domain": domain,
            "capabilities": [],
        },
        sort_keys=False,
        allow_unicode=True,
    )
    planned = (
        (registry_dir / "README.md", SCAFFOLD_README),
        (capabilities_dir / ".gitkeep", ""),
        (index_file, index_text),
    )

    written: list[Path] = []
    for target, text in planned:
        # Existing files are only replaced when the caller forces it.
        if force or not target.exists():
            target.write_text(text, encoding="utf-8")
            written.append(target)
    return written
def scaffold_next_steps(repo_root: Path) -> str:
    """Return the follow-up checklist to show after scaffolding a registry.

    Args:
        repo_root: Repository root; used to spell out the capabilities
            directory and the index file in steps 1 and 2.

    Returns:
        A newline-joined checklist with no leading or trailing whitespace.
    """
    capabilities_dir = repo_root / "registry/capabilities"
    index_file = repo_root / "registry/indexes/capabilities.yaml"
    steps = (
        "Next steps:",
        f"1. Add capability entries under {capabilities_dir}",
        f"2. Update {index_file}",
        "3. reuse-surface validate",
        "4. git push origin main",
        "5. reuse-surface establish --publish-check --raw-url <gitea-raw-url>",
        "6. reuse-surface hub register --repo <slug> --url <raw-url>",
    )
    return "\n".join(steps)
def publish_check(
    repo_root: Path,
    *,
    raw_url: str | None = None,
) -> dict[str, Any]:
    """Check that the local index loads and, optionally, that it is published.

    The local check requires the index file to exist and to load as a mapping
    whose ``capabilities`` value is a list. When *raw_url* is given, the URL
    is probed with ``_probe_raw_url`` and, if that succeeds, its body is
    checked with ``_fetch_yaml_snippet``.

    Args:
        repo_root: Repository whose registry index is inspected.
        raw_url: Optional published URL of the index to probe.

    Returns:
        A dict with ``repo_root``, ``checks`` (one dict per check), ``ok``
        (False as soon as any check fails) and, when the URL probe fails,
        ``remediation``.
    """
    index_path = registry_paths(repo_root)["index"]
    checks: list[dict[str, Any]] = []
    report: dict[str, Any] = {
        "repo_root": str(repo_root),
        "checks": checks,
        "ok": True,
    }

    def record(entry: dict[str, Any], passed: Any) -> None:
        # Append a check and latch the overall verdict to False on failure.
        checks.append(entry)
        if not passed:
            report["ok"] = False

    if not index_path.exists():
        record(
            {
                "name": "local_index_yaml",
                "ok": False,
                "detail": "registry/indexes/capabilities.yaml missing",
            },
            False,
        )
    else:
        try:
            loaded = load_index_at(index_path)
        except (OSError, yaml.YAMLError) as exc:
            record({"name": "local_index_yaml", "ok": False, "detail": str(exc)}, False)
        else:
            well_formed = isinstance(loaded, dict) and isinstance(
                loaded.get("capabilities"), list
            )
            if well_formed:
                detail = f"{len(loaded.get('capabilities', []))} capabilities"
            else:
                detail = "invalid structure"
            record(
                {"name": "local_index_yaml", "ok": well_formed, "detail": detail},
                well_formed,
            )

    if not raw_url:
        return report

    probe = _probe_raw_url(raw_url)
    record(
        {
            "name": "raw_url_probe",
            "ok": probe["ok"],
            "detail": f"HTTP {probe.get('status')} {probe.get('content_type', '')}".strip(),
            "url": raw_url,
        },
        probe["ok"],
    )
    if not probe["ok"]:
        report["remediation"] = (
            "Merge registry/indexes/capabilities.yaml to main and confirm "
            "Gitea raw URL returns 200 YAML. See docs/RegistryFederation.md."
        )
        return report

    # Only fetch the body once the HEAD probe has succeeded.
    body_check = _fetch_yaml_snippet(raw_url)
    record(body_check, body_check.get("ok"))
    return report
def _probe_raw_url(url: str) -> dict[str, Any]:
    """Send a HEAD request to *url* and summarise the response.

    Args:
        url: Address to probe.

    Returns:
        A dict with ``ok`` (True only for HTTP 200), ``status`` and
        ``content_type``. If no HTTP response was obtained at all (DNS
        failure, refused connection, timeout, missing local file), ``status``
        is ``None``, ``content_type`` is empty and an extra ``error`` key
        carries the reason, so the caller gets a failed check rather than an
        exception.
    """
    request = urllib.request.Request(
        url,
        method="HEAD",
        headers={"User-Agent": "reuse-surface/0.1"},
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            return {
                "ok": response.status == 200,
                "status": response.status,
                "content_type": response.headers.get("Content-Type", ""),
            }
    except urllib.error.HTTPError as exc:
        headers = exc.headers
        return {
            "ok": False,
            "status": exc.code,
            "content_type": headers.get("Content-Type", "") if headers is not None else "",
        }
    except OSError as exc:
        # URLError subclasses OSError; bare socket timeouts and connection
        # resets are OSError too. HTTPError is handled above, so this branch
        # only sees transport-level failures.
        reason = getattr(exc, "reason", exc)
        return {
            "ok": False,
            "status": None,
            "content_type": "",
            "error": str(reason),
        }
def _fetch_yaml_snippet(url: str) -> dict[str, Any]:
    """GET *url* and check that the body looks like a capabilities index.

    The body must be UTF-8 YAML that parses to a mapping whose
    ``capabilities`` value is a list, which is the same shape
    ``publish_check`` requires of the local index.

    Args:
        url: Address of the published index.

    Returns:
        A ``raw_url_body`` check dict with ``name``, ``ok`` and ``detail``.
        Network, decoding and YAML errors are reported as a failed check
        instead of being raised.
    """

    def failed(detail: str) -> dict[str, Any]:
        return {"name": "raw_url_body", "ok": False, "detail": detail}

    request = urllib.request.Request(url, headers={"User-Agent": "reuse-surface/0.1"})
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            raw = response.read()
    except urllib.error.HTTPError as exc:
        return failed(f"HTTP {exc.code}")
    except urllib.error.URLError as exc:
        return failed(str(exc.reason))
    except OSError as exc:
        # Timeouts and resets while reading are not wrapped in URLError.
        return failed(str(exc))

    try:
        body = raw.decode("utf-8")
    except UnicodeDecodeError as exc:
        return failed(f"body is not valid UTF-8: {exc}")

    try:
        data = yaml.safe_load(body)
    except yaml.YAMLError as exc:
        return failed(str(exc))

    ok = isinstance(data, dict) and isinstance(data.get("capabilities"), list)
    return {
        "name": "raw_url_body",
        "ok": ok,
        "detail": "valid capabilities.yaml shape" if ok else "body is not valid index YAML",
    }
def _read_text_head(path: Path, limit: int) -> str:
    """Return at most *limit* characters from the start of *path*.

    Undecodable bytes are replaced rather than raised, so one non-UTF-8 file
    cannot abort context collection.
    """
    with path.open(encoding="utf-8", errors="replace") as handle:
        return handle.read(limit)


def collect_context(repo_root: Path, *, max_files: int = 12) -> str:
    """Gather repository text to ground capability discovery.

    Files named in ``CONTEXT_FILES`` are read first (up to 8000 characters
    each), followed by ``__init__.py`` of top-level package directories (up
    to 2000 characters each). Hidden directories and ``registry``, ``tests``,
    ``docs``, ``workplans`` and ``node_modules`` are skipped.

    Args:
        repo_root: Repository root to scan.
        max_files: Upper bound on the number of files included. Only files
            that are actually included count against it, so directories
            without an ``__init__.py`` do not use up the budget.

    Returns:
        ``### <name>`` sections joined by blank lines; empty when nothing
        was found.
    """
    chunks: list[str] = []
    used = 0

    for name in CONTEXT_FILES:
        if used >= max_files:
            break
        path = repo_root / name
        if path.is_file():
            chunks.append(f"### {name}\n{_read_text_head(path, 8000)}")
            used += 1

    excluded = {"registry", "tests", "docs", "workplans", "node_modules"}
    pkg_dirs = sorted(
        item
        for item in repo_root.iterdir()
        if item.is_dir()
        and not item.name.startswith(".")
        and item.name not in excluded
    )
    for pkg in pkg_dirs:
        if used >= max_files:
            break
        init = pkg / "__init__.py"
        if init.is_file():
            chunks.append(f"### {pkg.name}/__init__.py\n{_read_text_head(init, 2000)}")
            used += 1

    return "\n\n".join(chunks)
def build_discover_prompt(context: str, domain: str) -> str:
    """Build the LLM prompt that asks for a draft capability index.

    The prompt is assembled from separate lines rather than by dedenting one
    f-string: dedenting after interpolation lets the multi-line JSON schema
    and the repository context decide the common margin, which can leave the
    instructions indented or strip indentation from the context.

    Args:
        context: Repository text, inserted verbatim after the rules.
        domain: Registry domain, shown in the schema example and the rules.

    Returns:
        The prompt text without leading or trailing whitespace.
    """
    schema_hint = json.dumps(
        {
            "domain": domain,
            "capabilities": [
                {
                    "id": "capability.domain.name",
                    "name": "Human Name",
                    "summary": "One sentence.",
                    "owner": "team",
                    "vector": "D2 / A0 / C0 / R0",
                    "tags": ["tag"],
                    "consumption_modes": ["informational"],
                    "discovery_intent": "What this enables.",
                    "discovery_includes": ["included behavior"],
                    "discovery_excludes": ["excluded behavior"],
                }
            ],
        },
        indent=2,
    )
    sections = (
        "You are drafting a capability registry index for helix_forge reuse-surface.",
        "Return ONLY a JSON object matching this shape (no markdown fences):",
        schema_hint,
        "Rules:",
        "- Propose 1-5 distinct capabilities grounded in the repository context.",
        r"- Use IDs matching ^capability\.[a-z0-9]+(\.[a-z0-9-]+)+$",
        "- Default vector D2 / A0 / C0 / R0 unless strong delivery evidence exists.",
        f"- domain: {domain}",
        "Repository context:",
        context,
    )
    return "\n".join(sections).strip()
def _draft_item_problem(item: Any) -> str | None:
    """Return why a drafted capability cannot be written, or None if usable."""
    if not isinstance(item, dict):
        return "item is not a mapping"
    cap_id = item.get("id")
    if not isinstance(cap_id, str) or not cap_id.strip():
        return "missing id"
    # The id becomes a filename; reject anything that could leave the
    # capabilities directory.
    if "/" in cap_id or "\\" in cap_id:
        return "id contains a path separator"
    for key in ("name", "summary"):
        if not item.get(key):
            return f"missing {key}"
    return None


def discover_capabilities(
    repo_root: Path,
    *,
    domain: str = "helix_forge",
    dry_run: bool = True,
    apply: bool = False,
    llm_url: str | None = None,
    context_max_files: int = 12,
) -> dict[str, Any]:
    """Ask the LLM bridge for draft capabilities and optionally write them.

    Args:
        repo_root: Repository to inspect and, with *apply*, to write into.
        domain: Registry domain used in the prompt and in written entries.
        dry_run: Return the draft without writing. Forced on when neither
            *dry_run* nor *apply* is set.
        apply: Write entry files and update the capabilities index.
        llm_url: Optional base URL passed to ``request_registry_draft``.
        context_max_files: File budget passed to ``collect_context``.

    Returns:
        A dict with ``draft``, ``written`` (paths relative to the repo) and
        ``dry_run``. When draft items had to be ignored because they were
        malformed, a ``skipped`` list of ``{"id", "reason"}`` dicts is added.

    Raises:
        ValueError: Both *apply* and *dry_run* are set, no context was found,
            or the existing index is not a mapping with a list of
            capabilities.
    """
    if apply and dry_run:
        raise ValueError("use either --dry-run or --apply, not both")
    if not apply and not dry_run:
        dry_run = True

    context = collect_context(repo_root, max_files=context_max_files)
    if not context.strip():
        raise ValueError("no context files found for discovery")

    prompt = build_discover_prompt(context, domain)
    draft = request_registry_draft(
        prompt,
        base_url=llm_url,
        config={"temperature": 0.2, "max_tokens": 4000},
    )
    result: dict[str, Any] = {"draft": draft, "written": [], "dry_run": dry_run}
    if dry_run:
        return result

    paths = registry_paths(repo_root)
    index_path = paths["index"]
    # scaffold_registry refuses to run (without force) when the registry
    # directory already exists, so only scaffold a repo that has no registry
    # at all; a registry missing just its index gets a fresh in-memory index.
    if not index_path.exists() and not paths["registry"].exists():
        scaffold_registry(repo_root, domain=domain, force=False)
    if index_path.exists():
        index = load_index_at(index_path)
    else:
        index = {"version": 1, "domain": domain, "capabilities": []}
    if not isinstance(index, dict):
        raise ValueError(f"{index_path} does not contain a YAML mapping")
    rows = index.get("capabilities")
    if rows is None:
        rows = []  # tolerate a bare `capabilities:` key
    elif not isinstance(rows, list):
        raise ValueError(f"'capabilities' in {index_path} must be a list")
    index["capabilities"] = rows

    default_vector = "D2 / A0 / C0 / R0"
    known_ids = {row.get("id") for row in rows if isinstance(row, dict)}
    skipped: list[dict[str, Any]] = []

    for item in draft.get("capabilities") or []:
        problem = _draft_item_problem(item)
        if problem is not None:
            skipped.append(
                {
                    "id": item.get("id") if isinstance(item, dict) else None,
                    "reason": problem,
                }
            )
            continue

        cap_id = item["id"]
        if cap_id in known_ids:
            continue

        vector = item.get("vector")
        if not isinstance(vector, str) or len(vector.split("/")) != 4:
            vector = default_vector
        owner = item.get("owner") or repo_root.name
        # Render the entry from the normalized values so the front matter and
        # the index row agree on vector and owner.
        normalized = {**item, "vector": vector, "owner": owner}

        filename = cap_id.replace(".", "-") + ".md"
        rel_path = f"registry/capabilities/{filename}"
        entry_path = repo_root / rel_path
        entry_body = _render_entry_from_draft(normalized, domain)
        entry_path.parent.mkdir(parents=True, exist_ok=True)
        entry_path.write_text(entry_body, encoding="utf-8")

        rows.append(
            {
                "id": cap_id,
                "name": item["name"],
                "summary": item["summary"],
                "vector": vector,
                "domain": domain,
                "status": "draft",
                "owner": owner,
                "path": rel_path,
                "tags": item.get("tags", []),
                "consumption_modes": item.get("consumption_modes", ["informational"]),
            }
        )
        # Remember ids added in this run so a draft repeating an id is
        # written only once.
        known_ids.add(cap_id)
        result["written"].append(rel_path)

    if skipped:
        result["skipped"] = skipped

    index["updated"] = date.today().isoformat()
    index["domain"] = draft.get("domain") or domain
    index_path.parent.mkdir(parents=True, exist_ok=True)
    index_path.write_text(
        yaml.safe_dump(index, sort_keys=False, allow_unicode=True),
        encoding="utf-8",
    )
    result["written"].append(str(index_path.relative_to(repo_root)))
    return result
def _render_entry_from_draft(item: dict[str, Any], domain: str) -> str:
    """Render one drafted capability as a Markdown entry with YAML front matter.

    Args:
        item: Draft capability. ``id``, ``name`` and ``summary`` are required;
            ``owner``, ``vector``, ``tags``, ``consumption_modes`` and the
            ``discovery_*`` keys are optional.
        domain: Registry domain; also the owner fallback.

    Returns:
        The entry text: ``---``-delimited YAML front matter followed by a
        short Markdown body.

    Raises:
        KeyError: *item* lacks ``id``, ``name`` or ``summary``.
    """
    default_vector = "D2 / A0 / C0 / R0"
    raw_vector = item.get("vector")
    levels = (
        [part.strip() for part in raw_vector.split("/")]
        if isinstance(raw_vector, str)
        else []
    )
    # The vector comes from LLM output; anything that is not four non-empty
    # "/"-separated levels falls back to the default instead of raising.
    if len(levels) != 4 or not all(levels):
        levels = [part.strip() for part in default_vector.split("/")]
    d, a, c, r = levels

    front_matter = {
        "id": item["id"],
        "name": item["name"],
        "summary": item["summary"],
        "owner": item.get("owner", domain),
        "status": "draft",
        "domain": domain,
        "tags": item.get("tags") or ["draft"],
        "maturity": {
            "discovery": {
                "current": d,
                "target": "D5",
                "confidence": "low",
                "rationale": "Auto-drafted by reuse-surface establish --discover; review required.",
            },
            "availability": {
                "current": a,
                "target": "A3",
                "confidence": "low",
                "rationale": "Auto-drafted; confirm consumption modes and artifacts.",
            },
        },
        "external_evidence": {
            "completeness": {
                "level": c,
                "confidence": "low",
                "basis": "scope_vs_intent_and_consumer_expectations",
                "satisfied_expectations": [],
                "broken_expectations": [],
                "out_of_scope_expectations": [],
            },
            "reliability": {
                "level": r,
                "confidence": "low",
                "basis": "consumer_quality_signals",
                "known_reliability_risks": ["auto-drafted entry without consumer evidence"],
            },
        },
        "discovery": {
            # `or` also covers an explicit null from the draft.
            "intent": item.get("discovery_intent") or item["summary"],
            "includes": item.get("discovery_includes") or [],
            "excludes": item.get("discovery_excludes") or [],
            "assumptions": [],
            "use_cases": [],
            "research_memos": [],
        },
        "availability": {
            "current_level": a,
            "target_level": "A3",
            "current_artifacts": [],
            "target_artifacts": [],
            "consumption_modes": item.get("consumption_modes") or ["informational"],
        },
        "relations": {"depends_on": [], "supports": [], "related_to": []},
        "evidence": {
            "documentation": [],
            "tests": [],
            "consumer_feedback": [],
            "bug_reports": [],
            "incidents": [],
        },
        "consumer_guidance": {
            "recommended_for": ["planning reuse after human review"],
            "not_recommended_for": ["implementation reuse before validation"],
            "known_limitations": ["discover draft — verify maturity claims"],
        },
        "promotion_history": [],
    }
    markdown = (
        f"# {item['name']}\n\n"
        "Auto-drafted capability entry. Review maturity, evidence, and relations "
        "before promoting.\n"
    )
    return (
        "---\n"
        + yaml.safe_dump(front_matter, sort_keys=False, allow_unicode=True)
        + "---\n\n"
        + markdown
    )
def format_publish_check_markdown(result: dict[str, Any]) -> str:
    """Render a ``publish_check`` result as a Markdown report.

    Args:
        result: Dict with ``repo_root``, ``ok``, ``checks`` (each with
            ``name``, ``ok`` and optional ``detail`` / ``url``) and optional
            ``remediation``.

    Returns:
        Markdown text ending in a newline: a heading, the repo and the
        overall PASS/FAIL verdict, one bullet per check, and the remediation
        hint when present.
    """
    lines = [
        "# Federation publish check",
        "",
        f"**Repo:** `{result['repo_root']}`",
        f"**Result:** {'PASS' if result['ok'] else 'FAIL'}",
        "",
    ]
    for check in result["checks"]:
        status = "ok" if check["ok"] else "FAIL"
        detail = check.get("detail") or ""
        # Separate status from detail; without a separator the bullet read
        # "okHTTP 200". Checks with no detail get no trailing separator.
        suffix = f" — {detail}" if detail else ""
        lines.append(f"- **{check['name']}**: {status}{suffix}")
        if check.get("url"):
            lines.append(f" `{check['url']}`")
    if result.get("remediation"):
        lines.append("")
        lines.append(f"**Remediation:** {result['remediation']}")
    return "\n".join(lines) + "\n"