generated from coulomb/repo-seed
Some checks failed
ci / validate-registry (push) Has been cancelled
Add stats, establish (scaffold, publish-check, discover), and update CLI commands with optional llm-connect bridge, validate --root for sibling repos, pytest coverage, and documentation for sibling registry onboarding.
448 lines
15 KiB
Python
448 lines
15 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import textwrap
|
|
import urllib.error
|
|
import urllib.request
|
|
from datetime import date
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
from reuse_surface.llm_bridge import request_registry_draft
|
|
from reuse_surface.registry import load_index_at, registry_paths
|
|
|
|
# README text that scaffold_registry writes into a new registry directory.
SCAFFOLD_README = "\n".join(
    (
        "# Capability Registry",
        "",
        "Markdown-first capability index for federation and reuse planning.",
        "",
        "## Authoring",
        "",
        "1. Copy a capability entry template (see reuse-surface `templates/capability-entry.template.md`).",
        "2. Add the row to `indexes/capabilities.yaml`.",
        "3. Run `reuse-surface validate` from a checkout with the CLI installed.",
        "4. Merge to `main` and verify publish with `reuse-surface establish --publish-check`.",
        "",
        "Federation contract: reuse-surface `docs/RegistryFederation.md`.",
        "",  # trailing element yields the final newline
    )
)
|
|
|
|
# Top-level files that collect_context reads, in this order, as discovery context.
CONTEXT_FILES = (
    "INTENT.md", "SCOPE.md", "AGENTS.md", "README.md",
    "pyproject.toml", "Cargo.toml", "go.mod",
)
|
|
|
|
|
|
def scaffold_registry(
    repo_root: Path,
    *,
    domain: str = "helix_forge",
    force: bool = False,
) -> list[Path]:
    """Create the registry skeleton for *repo_root* and return the files written.

    The skeleton is the registry README, a ``.gitkeep`` in the capabilities
    directory and an index with an empty capability list for *domain*.

    Args:
        repo_root: Repository passed to ``registry_paths`` to locate the layout.
        domain: Value stored under ``domain`` in the new index.
        force: Overwrite an existing registry instead of refusing.

    Returns:
        The paths written, in README, ``.gitkeep``, index order.

    Raises:
        ValueError: If the registry directory already exists and *force* is
            false.
    """
    layout = registry_paths(repo_root)
    registry_dir = layout["registry"]
    capabilities_dir = layout["capabilities"]
    index_file = layout["index"]

    if not force and registry_dir.exists():
        raise ValueError(
            f"registry already exists at {registry_dir}; use --force to overwrite"
        )

    for directory in (registry_dir, capabilities_dir, index_file.parent):
        directory.mkdir(parents=True, exist_ok=True)

    empty_index = {
        "version": 1,
        "updated": date.today().isoformat(),
        "domain": domain,
        "capabilities": [],
    }
    planned = (
        (registry_dir / "README.md", SCAFFOLD_README),
        (capabilities_dir / ".gitkeep", ""),
        (index_file, yaml.safe_dump(empty_index, sort_keys=False, allow_unicode=True)),
    )

    written: list[Path] = []
    for target, text in planned:
        # Existing files are only replaced when the caller forces it.
        if not force and target.exists():
            continue
        target.write_text(text, encoding="utf-8")
        written.append(target)
    return written
|
|
|
|
|
|
def scaffold_next_steps(repo_root: Path) -> str:
    """Return the "Next steps" checklist for a registry under *repo_root*.

    The first two steps embed the capabilities directory and index file
    paths joined onto *repo_root*; the rest are fixed command hints.
    """
    capabilities_dir = repo_root / "registry/capabilities"
    index_file = repo_root / "registry/indexes/capabilities.yaml"
    steps = (
        "Next steps:",
        f"1. Add capability entries under {capabilities_dir}",
        f"2. Update {index_file}",
        "3. reuse-surface validate",
        "4. git push origin main",
        "5. reuse-surface establish --publish-check --raw-url <gitea-raw-url>",
        "6. reuse-surface hub register --repo <slug> --url <raw-url>",
    )
    return "\n".join(steps)
|
|
|
|
|
|
def publish_check(
    repo_root: Path,
    *,
    raw_url: str | None = None,
) -> dict[str, Any]:
    """Check that the registry index is valid locally and, optionally, published.

    Args:
        repo_root: Repository whose registry index is inspected.
        raw_url: Optional raw URL of the published index. When given, it is
            probed with a HEAD request and, if that succeeds, fetched and
            checked for the index shape.

    Returns:
        A dict with ``repo_root`` (str), ``checks`` (per-check dicts carrying
        ``name``, ``ok`` and ``detail``), an overall ``ok`` flag and, when any
        raw-URL check fails, a ``remediation`` hint.
    """
    paths = registry_paths(repo_root)
    checks: list[dict[str, Any]] = []
    result: dict[str, Any] = {
        "repo_root": str(repo_root),
        "checks": checks,
        "ok": True,
    }

    # --- local index -----------------------------------------------------
    index_path = paths["index"]
    if not index_path.exists():
        local_ok = False
        local_detail = "registry/indexes/capabilities.yaml missing"
    else:
        try:
            data = load_index_at(index_path)
        except (OSError, yaml.YAMLError) as exc:
            local_ok = False
            local_detail = str(exc)
        else:
            local_ok = isinstance(data, dict) and isinstance(
                data.get("capabilities"), list
            )
            local_detail = (
                f"{len(data['capabilities'])} capabilities"
                if local_ok
                else "invalid structure"
            )
    checks.append({"name": "local_index_yaml", "ok": local_ok, "detail": local_detail})
    if not local_ok:
        result["ok"] = False

    # --- published index ---------------------------------------------------
    if raw_url:
        try:
            probe = _probe_raw_url(raw_url)
        except urllib.error.URLError as exc:
            # Unreachable host, refused connection, DNS failure, ...: report
            # it as a failed check instead of aborting the whole run.
            probe = {
                "ok": False,
                "status": None,
                "content_type": "",
                "error": str(exc.reason),
            }

        status = probe.get("status")
        if status is None:
            # No HTTP response was received, so there is no status to show.
            probe_detail = str(probe.get("error") or "no HTTP response")
        else:
            probe_detail = f"HTTP {status} {probe.get('content_type', '')}".strip()
        checks.append(
            {
                "name": "raw_url_probe",
                "ok": probe["ok"],
                "detail": probe_detail,
                "url": raw_url,
            }
        )

        published = bool(probe["ok"])
        if published:
            body_probe = _fetch_yaml_snippet(raw_url)
            checks.append(body_probe)
            published = bool(body_probe.get("ok"))

        if not published:
            result["ok"] = False
            # The hint covers both failure modes: URL not served with 200,
            # and URL served but the body is not index YAML.
            result["remediation"] = (
                "Merge registry/indexes/capabilities.yaml to main and confirm "
                "Gitea raw URL returns 200 YAML. See docs/RegistryFederation.md."
            )

    return result
|
|
|
|
|
|
def _probe_raw_url(url: str) -> dict[str, Any]:
|
|
request = urllib.request.Request(
|
|
url,
|
|
method="HEAD",
|
|
headers={"User-Agent": "reuse-surface/0.1"},
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(request, timeout=30) as response:
|
|
return {
|
|
"ok": response.status == 200,
|
|
"status": response.status,
|
|
"content_type": response.headers.get("Content-Type", ""),
|
|
}
|
|
except urllib.error.HTTPError as exc:
|
|
return {
|
|
"ok": False,
|
|
"status": exc.code,
|
|
"content_type": exc.headers.get("Content-Type", ""),
|
|
}
|
|
|
|
|
|
def _fetch_yaml_snippet(url: str) -> dict[str, Any]:
    """GET *url* and check that the body looks like a capability index.

    Returns:
        A check dict named ``raw_url_body`` with ``ok`` and ``detail``. Every
        failure (HTTP error, transport error, undecodable body, invalid YAML,
        wrong shape) is reported through ``ok=False`` rather than raised.
    """
    name = "raw_url_body"
    request = urllib.request.Request(url, headers={"User-Agent": "reuse-surface/0.1"})
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            raw = response.read()
    except urllib.error.HTTPError as exc:
        return {"name": name, "ok": False, "detail": f"HTTP {exc.code}"}
    except urllib.error.URLError as exc:
        return {"name": name, "ok": False, "detail": str(exc.reason)}

    try:
        body = raw.decode("utf-8")
    except UnicodeDecodeError as exc:
        # A binary or mis-encoded body is a failed check, not a crash.
        return {"name": name, "ok": False, "detail": f"body is not UTF-8: {exc.reason}"}

    try:
        data = yaml.safe_load(body)
    except yaml.YAMLError as exc:
        return {"name": name, "ok": False, "detail": str(exc)}

    # Same shape rule as the local index check: a mapping whose
    # "capabilities" value is a list.
    ok = isinstance(data, dict) and isinstance(data.get("capabilities"), list)
    return {
        "name": name,
        "ok": ok,
        "detail": "valid capabilities.yaml shape" if ok else "body is not valid index YAML",
    }
|
|
|
|
|
|
def collect_context(repo_root: Path, *, max_files: int = 12) -> str:
    """Gather text snippets from *repo_root* to use as discovery context.

    Files named in ``CONTEXT_FILES`` are taken first (up to 8000 characters
    each), then the ``__init__.py`` of top-level package directories in sorted
    order (up to 2000 characters each). Hidden directories and
    registry/tests/docs/workplans/node_modules are skipped.

    Args:
        repo_root: Directory to scan.
        max_files: Upper bound on the number of files included in total.

    Returns:
        The snippets joined by blank lines, each preceded by a ``### <name>``
        heading; an empty string when nothing was found.
    """
    skipped_dirs = {"registry", "tests", "docs", "workplans", "node_modules"}
    chunks: list[str] = []

    for name in CONTEXT_FILES:
        if len(chunks) >= max_files:
            break
        path = repo_root / name
        if path.is_file():
            # errors="replace": one badly encoded file must not abort discovery.
            text = path.read_text(encoding="utf-8", errors="replace")
            chunks.append(f"### {name}\n{text[:8000]}")

    for pkg in sorted(repo_root.iterdir()):
        # The budget counts files actually included, so directories without
        # an __init__.py do not use up slots meant for real packages.
        if len(chunks) >= max_files:
            break
        if not pkg.is_dir() or pkg.name.startswith(".") or pkg.name in skipped_dirs:
            continue
        init = pkg / "__init__.py"
        if init.is_file():
            text = init.read_text(encoding="utf-8", errors="replace")
            chunks.append(f"### {pkg.name}/__init__.py\n{text[:2000]}")

    return "\n\n".join(chunks)
|
|
|
|
|
|
def build_discover_prompt(context: str, domain: str) -> str:
    """Build the LLM prompt asking for a draft capability index.

    Args:
        context: Repository context text appended at the end of the prompt.
        domain: Domain name embedded in the example JSON and in the rules.

    Returns:
        The prompt with surrounding whitespace stripped. Every template line
        starts at column 0.
    """
    schema_hint = json.dumps(
        {
            "domain": domain,
            "capabilities": [
                {
                    "id": "capability.domain.name",
                    "name": "Human Name",
                    "summary": "One sentence.",
                    "owner": "team",
                    "vector": "D2 / A0 / C0 / R0",
                    "tags": ["tag"],
                    "consumption_modes": ["informational"],
                    "discovery_intent": "What this enables.",
                    "discovery_includes": ["included behavior"],
                    "discovery_excludes": ["excluded behavior"],
                }
            ],
        },
        indent=2,
    )
    # Assembled line by line rather than with textwrap.dedent on an f-string:
    # schema_hint and context span several lines that start at column 0, which
    # would cancel the common margin and leave the template lines indented.
    sections = [
        "You are drafting a capability registry index for helix_forge reuse-surface.",
        "",
        "Return ONLY a JSON object matching this shape (no markdown fences):",
        schema_hint,
        "",
        "Rules:",
        "- Propose 1-5 distinct capabilities grounded in the repository context.",
        "- Use IDs matching ^capability\\.[a-z0-9]+(\\.[a-z0-9-]+)+$",
        "- Default vector D2 / A0 / C0 / R0 unless strong delivery evidence exists.",
        f"- domain: {domain}",
        "",
        "Repository context:",
        context,
    ]
    return "\n".join(sections).strip()
|
|
|
|
|
|
def discover_capabilities(
    repo_root: Path,
    *,
    domain: str = "helix_forge",
    dry_run: bool = True,
    apply: bool = False,
    llm_url: str | None = None,
    context_max_files: int = 12,
) -> dict[str, Any]:
    """Ask the LLM bridge for draft capabilities and optionally write them.

    Args:
        repo_root: Repository to inspect and, with *apply*, to write into.
        domain: Domain recorded on new entries and index rows.
        dry_run: Return the draft without touching the filesystem.
        apply: Write entry files and update the index.
        llm_url: Base URL forwarded to ``request_registry_draft``.
        context_max_files: File budget forwarded to ``collect_context``.

    Returns:
        A dict with the raw ``draft``, the list of ``written`` relative paths
        and the effective ``dry_run`` flag.

    Raises:
        ValueError: If both *apply* and *dry_run* are set, if no context could
            be collected, or if the existing index is not a mapping.
    """
    if apply and dry_run:
        raise ValueError("use either --dry-run or --apply, not both")
    if not apply:
        # Neither flag given: fall back to the non-destructive mode.
        dry_run = True

    context = collect_context(repo_root, max_files=context_max_files)
    if not context.strip():
        raise ValueError("no context files found for discovery")

    prompt = build_discover_prompt(context, domain)
    draft = request_registry_draft(
        prompt,
        base_url=llm_url,
        config={"temperature": 0.2, "max_tokens": 4000},
    )

    result: dict[str, Any] = {"draft": draft, "written": [], "dry_run": dry_run}
    if dry_run:
        return result

    paths = registry_paths(repo_root)
    index_path = paths["index"]
    # Scaffold only when there is no registry at all. scaffold_registry
    # refuses an existing directory, so a registry that merely lacks its
    # index is handled by the in-memory default below.
    if not index_path.exists() and not paths["registry"].exists():
        scaffold_registry(repo_root, domain=domain, force=False)

    if index_path.exists():
        index = load_index_at(index_path)
    else:
        index = {"version": 1, "domain": domain, "capabilities": []}
    if not isinstance(index, dict):
        raise ValueError(f"{index_path} does not contain a YAML mapping")

    rows = index.get("capabilities")
    if rows is None:
        # Covers both a missing key and an explicit null.
        rows = index["capabilities"] = []
    known_ids = {row.get("id") for row in rows if isinstance(row, dict)}

    # Pass 1: render everything in memory. A malformed draft item raises
    # here, before any entry file exists, so no orphan files are left.
    pending: list[tuple[str, str, dict[str, Any]]] = []
    for raw_item in draft.get("capabilities") or []:
        cap_id = raw_item["id"]
        if cap_id in known_ids:
            continue
        known_ids.add(cap_id)  # also guards against duplicates inside the draft

        # Resolve defaults once so the entry file and the index row agree.
        item = dict(raw_item)
        item["owner"] = raw_item.get("owner", repo_root.name)
        item["vector"] = raw_item.get("vector") or "D2 / A0 / C0 / R0"
        item["consumption_modes"] = raw_item.get("consumption_modes") or ["informational"]

        filename = cap_id.replace(".", "-") + ".md"
        rel_path = f"registry/capabilities/{filename}"
        entry_body = _render_entry_from_draft(item, domain)
        row = {
            "id": cap_id,
            "name": item["name"],
            "summary": item["summary"],
            "vector": item["vector"],
            "domain": domain,
            "status": "draft",
            "owner": item["owner"],
            "path": rel_path,
            "tags": item.get("tags", []),
            "consumption_modes": item["consumption_modes"],
        }
        pending.append((rel_path, entry_body, row))

    # Pass 2: write entry files and extend the index.
    for rel_path, entry_body, row in pending:
        entry_path = repo_root / rel_path
        entry_path.parent.mkdir(parents=True, exist_ok=True)
        entry_path.write_text(entry_body, encoding="utf-8")
        rows.append(row)
        result["written"].append(rel_path)

    index["updated"] = date.today().isoformat()
    index["domain"] = draft.get("domain", domain)
    index_path.parent.mkdir(parents=True, exist_ok=True)
    index_path.write_text(
        yaml.safe_dump(index, sort_keys=False, allow_unicode=True),
        encoding="utf-8",
    )
    result["written"].append(str(index_path.relative_to(repo_root)))
    return result
|
|
|
|
|
|
def _render_entry_from_draft(item: dict[str, Any], domain: str) -> str:
    """Render one drafted capability as a Markdown entry with YAML front matter.

    Args:
        item: Draft capability; ``id``, ``name`` and ``summary`` are required.
            ``vector`` is a ``"D / A / C / R"`` string and defaults to
            ``"D2 / A0 / C0 / R0"`` when missing or empty.
        domain: Domain recorded in the entry; also the fallback owner.

    Returns:
        ``---``-delimited YAML front matter followed by a short Markdown body.

    Raises:
        KeyError: If a required key is missing from *item*.
        ValueError: If ``vector`` does not consist of four non-empty levels.
    """
    vector = item.get("vector") or "D2 / A0 / C0 / R0"
    levels = [part.strip() for part in str(vector).split("/")]
    if len(levels) != 4 or not all(levels):
        # Name the offending capability instead of failing with an opaque
        # tuple-unpacking error.
        raise ValueError(
            f"capability {item.get('id')!r} has malformed vector {vector!r}; "
            "expected four levels like 'D2 / A0 / C0 / R0'"
        )
    discovery_level, availability_level, completeness_level, reliability_level = levels

    front_matter = {
        "id": item["id"],
        "name": item["name"],
        "summary": item["summary"],
        "owner": item.get("owner", domain),
        "status": "draft",
        "domain": domain,
        "tags": item.get("tags") or ["draft"],
        "maturity": {
            "discovery": {
                "current": discovery_level,
                "target": "D5",
                "confidence": "low",
                "rationale": "Auto-drafted by reuse-surface establish --discover; review required.",
            },
            "availability": {
                "current": availability_level,
                "target": "A3",
                "confidence": "low",
                "rationale": "Auto-drafted; confirm consumption modes and artifacts.",
            },
        },
        "external_evidence": {
            "completeness": {
                "level": completeness_level,
                "confidence": "low",
                "basis": "scope_vs_intent_and_consumer_expectations",
                "satisfied_expectations": [],
                "broken_expectations": [],
                "out_of_scope_expectations": [],
            },
            "reliability": {
                "level": reliability_level,
                "confidence": "low",
                "basis": "consumer_quality_signals",
                "known_reliability_risks": ["auto-drafted entry without consumer evidence"],
            },
        },
        "discovery": {
            "intent": item.get("discovery_intent", item["summary"]),
            "includes": item.get("discovery_includes") or [],
            "excludes": item.get("discovery_excludes") or [],
            "assumptions": [],
            "use_cases": [],
            "research_memos": [],
        },
        "availability": {
            "current_level": availability_level,
            "target_level": "A3",
            "current_artifacts": [],
            "target_artifacts": [],
            "consumption_modes": item.get("consumption_modes") or ["informational"],
        },
        "relations": {"depends_on": [], "supports": [], "related_to": []},
        "evidence": {
            "documentation": [],
            "tests": [],
            "consumer_feedback": [],
            "bug_reports": [],
            "incidents": [],
        },
        "consumer_guidance": {
            "recommended_for": ["planning reuse after human review"],
            "not_recommended_for": ["implementation reuse before validation"],
            "known_limitations": ["discover draft — verify maturity claims"],
        },
        "promotion_history": [],
    }
    markdown = (
        f"# {item['name']}\n\n"
        "Auto-drafted capability entry. Review maturity, evidence, and relations "
        "before promoting.\n"
    )
    return (
        "---\n"
        + yaml.safe_dump(front_matter, sort_keys=False, allow_unicode=True)
        + "---\n\n"
        + markdown
    )
|
|
|
|
|
|
def format_publish_check_markdown(result: dict[str, Any]) -> str:
    """Render a publish-check result dict as a Markdown report.

    Args:
        result: Mapping with ``repo_root``, ``ok`` and ``checks``; each check
            needs ``name`` and ``ok`` and may carry ``detail`` and ``url``. An
            optional ``remediation`` string is appended at the end.

    Returns:
        The report text, terminated by a newline.
    """
    report = [
        "# Federation publish check",
        "",
        f"**Repo:** `{result['repo_root']}`",
        f"**Result:** {'PASS' if result['ok'] else 'FAIL'}",
        "",
    ]
    for entry in result["checks"]:
        marker = "ok" if entry["ok"] else "FAIL"
        report.append(f"- **{entry['name']}**: {marker} — {entry.get('detail', '')}")
        url = entry.get("url")
        if url:
            report.append(f" `{url}`")

    remediation = result.get("remediation")
    if remediation:
        report.extend(["", f"**Remediation:** {remediation}"])
    return "\n".join(report) + "\n"