Files
reuse-surface/reuse_surface/registry_update.py
tegwick 70a5003f6e
Some checks failed
ci / validate-registry (push) Has been cancelled
Implement REUSE-WP-0013 registry establish, update, and stats
Add stats, establish (scaffold, publish-check, discover), and update CLI
commands with optional llm-connect bridge, validate --root for sibling repos,
pytest coverage, and documentation for sibling registry onboarding.
2026-06-16 01:21:01 +02:00

273 lines
9.0 KiB
Python

from __future__ import annotations
import json
import subprocess
import textwrap
from pathlib import Path
from typing import Any
import yaml
from reuse_surface.llm_bridge import request_json_object
from reuse_surface.registry import (
entry_vector,
load_index_at,
parse_front_matter,
registry_paths,
vectors_match,
)
# Repository-relative path prefixes (test files and Gitea workflow files).
# NOTE(review): presumably the locations whose files may be auto-cited as
# evidence; the constant is not referenced in the visible part of this module,
# so confirm its consumers before changing it.
SAFE_EVIDENCE_PREFIXES = ("tests/", ".gitea/workflows/")
def git_changed_files(repo_root: Path, since_ref: str) -> list[str]:
    """Return the paths that differ between *since_ref* and ``HEAD``.

    Runs ``git diff --name-only <since_ref> HEAD`` inside *repo_root* and
    returns one stripped, non-empty path per output line.

    Raises:
        ValueError: if git exits with a non-zero status. The message is git's
            stderr when it is non-empty, otherwise a generic fallback naming
            *since_ref*.
    """
    command = ["git", "-C", str(repo_root), "diff", "--name-only", since_ref, "HEAD"]
    completed = subprocess.run(command, capture_output=True, text=True, check=False)
    if completed.returncode != 0:
        message = completed.stderr.strip()
        if not message:
            message = f"git diff failed for {since_ref}"
        raise ValueError(message)

    names: list[str] = []
    for raw_line in completed.stdout.splitlines():
        cleaned = raw_line.strip()
        if cleaned:
            names.append(cleaned)
    return names
def collect_deterministic_suggestions(
    repo_root: Path,
    *,
    capability_id: str | None = None,
    git_since: str | None = None,
) -> list[dict[str, Any]]:
    """Collect rule-based update suggestions for the registry under *repo_root*.

    For every capability row in the registry index (or only the row whose id
    equals *capability_id*), the following suggestion kinds may be produced:

    * ``missing_entry`` - the row's ``path`` does not exist on disk.
    * ``vector_drift`` - the index vector does not match the entry front
      matter; carries an ``apply_patch`` for ``index.vector``.
    * ``evidence_test`` - a changed file under ``tests/`` is not listed in the
      entry's ``evidence.tests``.
    * ``availability_artifact`` - a changed ``.py`` file located under a
      top-level directory of *repo_root* that contains ``__init__.py`` is not
      listed in the entry's ``availability.current_artifacts``.

    The last two kinds are only produced when *git_since* is given; the
    changed files are then taken from ``git_changed_files(repo_root, git_since)``.

    Args:
        repo_root: Root directory of the repository holding the registry.
        capability_id: Restrict the scan to this capability id when truthy.
        git_since: Git ref to diff against ``HEAD``; no diff is run when falsy.

    Returns:
        A list of suggestion dicts, each with ``capability_id``, ``kind`` and
        ``detail`` keys and, where a safe patch exists, an ``apply_patch`` dict.

    Raises:
        ValueError: if the index file is missing, if *capability_id* is not in
            the index, or if the git diff fails.
    """
    paths = registry_paths(repo_root)
    if not paths["index"].exists():
        raise ValueError("registry index missing; run establish --scaffold first")
    index = load_index_at(paths["index"])
    rows = index.get("capabilities", [])
    if capability_id:
        rows = [row for row in rows if row["id"] == capability_id]
        if not rows:
            raise ValueError(f"capability not in index: {capability_id}")

    changed_files = git_changed_files(repo_root, git_since) if git_since else []

    # Both filters depend only on the diff and the repository layout, never on
    # the row being inspected, so they are evaluated once instead of once per
    # (row, changed file) pair.
    changed_tests = [name for name in changed_files if name.startswith("tests/")]
    changed_python = [name for name in changed_files if name.endswith(".py")]
    changed_modules: list[str] = []
    if changed_python:
        # Top-level packages: direct child directories containing __init__.py.
        package_prefixes = tuple(
            child.name + "/"
            for child in repo_root.iterdir()
            if child.is_dir() and (child / "__init__.py").exists()
        )
        # str.startswith(()) is False, so no packages means no module matches.
        changed_modules = [
            name for name in changed_python if name.startswith(package_prefixes)
        ]

    suggestions: list[dict[str, Any]] = []
    for row in rows:
        entry_path = repo_root / row["path"]
        if not entry_path.exists():
            suggestions.append(
                {
                    "capability_id": row["id"],
                    "kind": "missing_entry",
                    "detail": f"missing file {row['path']}",
                }
            )
            continue

        front_matter = parse_front_matter(entry_path)
        if not vectors_match(row["vector"], front_matter):
            suggestions.append(
                {
                    "capability_id": row["id"],
                    "kind": "vector_drift",
                    "detail": "index vector differs from entry front matter",
                    "index_vector": row["vector"],
                    "entry_vector": entry_vector(front_matter),
                    "apply_patch": {
                        "field": "index.vector",
                        "value": entry_vector(front_matter),
                    },
                }
            )

        # ``or`` guards against keys that are present but null in the YAML
        # (e.g. a bare ``evidence:`` line), which would otherwise crash here.
        evidence_tests = (front_matter.get("evidence") or {}).get("tests") or []
        for changed in changed_tests:
            if changed not in evidence_tests:
                suggestions.append(
                    {
                        "capability_id": row["id"],
                        "kind": "evidence_test",
                        "detail": f"new test file not cited: {changed}",
                        "apply_patch": {
                            "field": "evidence.tests",
                            "append": changed,
                        },
                    }
                )

        artifacts = (
            (front_matter.get("availability") or {}).get("current_artifacts") or []
        )
        for changed in changed_modules:
            if changed not in artifacts:
                suggestions.append(
                    {
                        "capability_id": row["id"],
                        "kind": "availability_artifact",
                        "detail": f"changed module not cited: {changed}",
                        "apply_patch": {
                            "field": "availability.current_artifacts",
                            "append": changed,
                        },
                    }
                )
    return suggestions
def apply_deterministic_suggestions(
    repo_root: Path,
    suggestions: list[dict[str, Any]],
) -> list[str]:
    """Apply the ``apply_patch`` payload of each suggestion to the registry.

    Supported patch fields:

    * ``index.vector`` - replaces the ``vector`` of the capability's index row
      with ``patch["value"]``.
    * ``evidence.tests`` / ``availability.current_artifacts`` - appends
      ``patch["append"]`` to that list in the entry front matter unless it is
      already present.

    Suggestions without ``apply_patch``, with an unsupported field, or whose
    capability id is not in the index are skipped. Only files whose content
    was actually modified are written back: the index when a vector was
    replaced, and each entry that received an appended value.

    Args:
        repo_root: Root directory of the repository holding the registry.
        suggestions: Suggestion dicts carrying ``capability_id`` and,
            optionally, ``apply_patch``.

    Returns:
        One human-readable line per change applied, in application order.
    """
    paths = registry_paths(repo_root)
    index = load_index_at(paths["index"])
    index_by_id = {row["id"]: row for row in index.get("capabilities", [])}

    # Patch field -> (front-matter section, list key inside that section).
    append_targets = {
        "evidence.tests": ("evidence", "tests"),
        "availability.current_artifacts": ("availability", "current_artifacts"),
    }

    changed: list[str] = []
    index_dirty = False
    entry_cache: dict[str, dict[str, Any]] = {}
    entry_paths: dict[str, Path] = {}
    dirty_entries: set[str] = set()

    for suggestion in suggestions:
        patch = suggestion.get("apply_patch")
        if not patch:
            continue
        cap_id = suggestion["capability_id"]
        row = index_by_id.get(cap_id)
        if not row:
            continue

        field = patch["field"]
        if field == "index.vector":
            row["vector"] = patch["value"]
            index_dirty = True
            changed.append(f"index vector for {cap_id}")
            # The entry file is untouched by an index patch, so it is neither
            # parsed nor queued for rewriting.
            continue

        target = append_targets.get(field)
        if target is None:
            continue
        section_key, list_key = target

        if cap_id not in entry_cache:
            entry_path = repo_root / row["path"]
            entry_cache[cap_id] = parse_front_matter(entry_path)
            entry_paths[cap_id] = entry_path
        front_matter = entry_cache[cap_id]

        # A key that exists with a null value (bare ``evidence:`` in YAML) is
        # replaced by an empty container; setdefault() would hand back None.
        section = front_matter.get(section_key)
        if section is None:
            section = front_matter[section_key] = {}
        values = section.get(list_key)
        if values is None:
            values = section[list_key] = []

        if patch["append"] not in values:
            values.append(patch["append"])
            changed.append(f"{cap_id} {field} += {patch['append']}")
            dirty_entries.add(cap_id)

    if index_dirty:
        paths["index"].write_text(
            yaml.safe_dump(index, sort_keys=False, allow_unicode=True),
            encoding="utf-8",
        )
    # Iterate the cache (insertion-ordered) so entries are written in the
    # order they were first touched.
    for cap_id, front_matter in entry_cache.items():
        if cap_id in dirty_entries:
            _write_front_matter(entry_paths[cap_id], front_matter)
    return changed
def _write_front_matter(path: Path, front_matter: dict[str, Any]) -> None:
    """Replace the YAML front matter of *path* and keep the text after it.

    The file is rewritten as ``---``, the YAML dump of *front_matter*, ``---``
    and then the original body, i.e. everything following the first
    ``\\n---`` closing marker.

    Args:
        path: Entry file to rewrite in place (read and written as UTF-8).
        front_matter: Mapping serialised as the new front-matter block.
    """
    text = path.read_text(encoding="utf-8")
    if text.startswith("---"):
        # Start searching right after the three opening dashes (offset 3), so
        # an empty block such as "---\n---\nbody" still finds its closing
        # marker at offset 3.
        marker_end = text.find("\n---", 3)
        # An opening marker without a closing one means the whole file is
        # (unterminated) front matter, which is being replaced anyway.
        body = text[marker_end + 4 :] if marker_end != -1 else "\n"
    else:
        # No front matter present: the existing text is all body and must be
        # preserved rather than dropped or cut at a later "---" rule.
        body = "\n" + text
    path.write_text(
        "---\n"
        + yaml.safe_dump(front_matter, sort_keys=False, allow_unicode=True)
        + "---"
        + body,
        encoding="utf-8",
    )
def build_update_prompt(
    repo_root: Path,
    capability_id: str,
    *,
    git_since: str | None = None,
) -> str:
    """Build the LLM prompt asking for update suggestions for one capability.

    The prompt contains the expected JSON response shape, the capability's
    current entry front matter dumped as YAML and, when *git_since* is given,
    up to 12000 characters of ``git diff <git_since> HEAD`` limited to the
    ``registry/``, ``reuse_surface/`` and ``tests/`` paths.

    Args:
        repo_root: Root directory of the repository holding the registry.
        capability_id: Id of the capability to describe.
        git_since: Git ref to diff against ``HEAD``; no diff is run when falsy.

    Returns:
        The prompt text with surrounding whitespace stripped.

    Raises:
        ValueError: if *capability_id* is not present in the registry index.
    """
    paths = registry_paths(repo_root)
    index = load_index_at(paths["index"])
    # .get() keeps an index without a "capabilities" key on the ValueError
    # path below instead of surfacing a KeyError.
    row = next(
        (item for item in index.get("capabilities", []) if item["id"] == capability_id),
        None,
    )
    if not row:
        raise ValueError(f"capability not in index: {capability_id}")
    entry = parse_front_matter(repo_root / row["path"])

    diff = ""
    if git_since:
        proc = subprocess.run(
            [
                "git",
                "-C",
                str(repo_root),
                "diff",
                git_since,
                "HEAD",
                "--",
                "registry/",
                "reuse_surface/",
                "tests/",
            ],
            capture_output=True,
            text=True,
            check=False,
        )
        # The exit status is intentionally not inspected here: whatever git
        # printed to stdout is used, and empty output falls back to "(none)".
        diff = proc.stdout[:12000]

    # Dedent the static template BEFORE substituting values. The YAML dump and
    # the diff contain lines starting at column 0; interpolating them first
    # would leave no common indentation for textwrap.dedent to remove.
    template = textwrap.dedent(
        """
        Suggest registry entry updates for capability `{capability_id}`.
        Return ONLY JSON:
        {{
          "promotion_history": [
            {{"date": "YYYY-MM-DD", "dimension": "availability", "from": "A3", "to": "A4", "rationale": "..."}}
          ],
          "consumer_feedback": ["optional string notes"],
          "notes": ["human review items"]
        }}
        Current entry YAML:
        {entry_yaml}
        Git diff since {since}:
        {diff}
        """
    )
    # str.format does not re-scan substituted values, so braces inside the
    # YAML or the diff are inserted literally.
    return template.format(
        capability_id=capability_id,
        entry_yaml=yaml.safe_dump(entry, sort_keys=False),
        since=git_since or "N/A",
        diff=diff or "(none)",
    ).strip()
def suggest_llm_updates(
    repo_root: Path,
    capability_id: str,
    *,
    git_since: str | None = None,
    llm_url: str | None = None,
) -> dict[str, Any]:
    """Request update suggestions for one capability through the LLM bridge.

    Builds the prompt with ``build_update_prompt`` and passes it to
    ``request_json_object`` using *llm_url* as ``base_url`` together with a
    fixed generation config (temperature 0.2, at most 2000 tokens).

    Returns:
        Whatever ``request_json_object`` returns - presumably the parsed JSON
        object from the model's reply (confirm against ``llm_bridge``).
    """
    prompt_text = build_update_prompt(repo_root, capability_id, git_since=git_since)
    generation_config = {"temperature": 0.2, "max_tokens": 2000}
    response = request_json_object(
        prompt_text,
        base_url=llm_url,
        config=generation_config,
    )
    return response
def format_suggestions_markdown(suggestions: list[dict[str, Any]]) -> str:
    """Render *suggestions* as a Markdown report.

    The report is a heading, one bullet per suggestion showing its
    ``capability_id``, ``kind`` and ``detail``, and a closing line with the
    suggestion count. An empty input yields a short "no suggestions" document.
    The returned text always ends with a newline.
    """
    if not suggestions:
        return "# Registry update suggestions\n\n_No suggestions._\n"

    bullets = [
        f"- `{entry['capability_id']}` **{entry['kind']}**: {entry['detail']}"
        for entry in suggestions
    ]
    footer = f"**{len(suggestions)}** suggestion(s). Use `--apply` to apply safe patches."
    document = ["# Registry update suggestions", "", *bullets, "", footer]
    return "\n".join(document) + "\n"
def format_suggestions_json(suggestions: list[dict[str, Any]]) -> str:
    """Serialise *suggestions* as a 2-space-indented JSON document.

    The top-level object has a ``count`` key (number of suggestions) followed
    by a ``suggestions`` key holding the list itself.
    """
    payload = {"count": len(suggestions), "suggestions": suggestions}
    return json.dumps(payload, indent=2)