generated from coulomb/repo-seed
WP-0016 finished: interactive registry maintain with llm-connect automation
Some checks failed
ci / validate-registry (push) Has been cancelled
Some checks failed
ci / validate-registry (push) Has been cancelled
Closes the registry maintenance loop from inside each domain repo: interactive prompting for judgment calls, full automation for safe and high-confidence changes, both backed by the llm-connect HTTP bridge. - New modules: maintain.py, maintain_llm.py, patches.py, interactive.py - Schema: schemas/registry-patch.schema.json - CLI: reuse-surface maintain; establish --scaffold --hook - Sibling templates: Makefile fragment, pre-commit hook - Deterministic signal collectors extended; validate cwd auto-detect - Docs, gap priority 28, SCOPE update - Tests: test_maintain.py, test_interactive.py (59 pytest total) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
import json
|
||||
import subprocess
|
||||
import textwrap
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
@@ -17,6 +18,7 @@ from reuse_surface.registry import (
|
||||
vectors_match,
|
||||
)
|
||||
|
||||
# Safe to apply without interactive review (see patches.SAFE_DETERMINISTIC_KINDS).
|
||||
SAFE_EVIDENCE_PREFIXES = ("tests/", ".gitea/workflows/")
|
||||
|
||||
|
||||
@@ -52,6 +54,8 @@ def collect_deterministic_suggestions(
|
||||
changed_files = git_changed_files(repo_root, git_since) if git_since else []
|
||||
suggestions: list[dict[str, Any]] = []
|
||||
|
||||
suggestions.extend(_collect_index_orphans(repo_root, index, changed_files))
|
||||
|
||||
for row in rows:
|
||||
entry_path = repo_root / row["path"]
|
||||
if not entry_path.exists():
|
||||
@@ -80,43 +84,167 @@ def collect_deterministic_suggestions(
|
||||
}
|
||||
)
|
||||
|
||||
evidence_tests = front_matter.get("evidence", {}).get("tests", [])
|
||||
for changed in changed_files:
|
||||
if changed.startswith("tests/") and changed not in evidence_tests:
|
||||
suggestions.extend(
|
||||
_collect_changed_file_suggestions(row["id"], front_matter, changed_files, repo_root)
|
||||
)
|
||||
|
||||
return suggestions
|
||||
|
||||
|
||||
def _collect_index_orphans(
|
||||
repo_root: Path,
|
||||
index: dict[str, Any],
|
||||
changed_files: list[str],
|
||||
) -> list[dict[str, Any]]:
|
||||
suggestions: list[dict[str, Any]] = []
|
||||
indexed_paths = {row["path"] for row in index.get("capabilities", [])}
|
||||
cap_dir = registry_paths(repo_root)["capabilities"]
|
||||
if not cap_dir.exists():
|
||||
return suggestions
|
||||
|
||||
for entry_file in sorted(cap_dir.glob("*.md")):
|
||||
if entry_file.name == ".gitkeep":
|
||||
continue
|
||||
rel = str(entry_file.relative_to(repo_root))
|
||||
if rel in indexed_paths:
|
||||
continue
|
||||
try:
|
||||
front_matter = parse_front_matter(entry_file)
|
||||
except ValueError:
|
||||
continue
|
||||
cap_id = front_matter.get("id", entry_file.stem.replace("-", "."))
|
||||
suggestions.append(
|
||||
{
|
||||
"capability_id": cap_id,
|
||||
"kind": "index_row_add",
|
||||
"detail": f"capability file not in index: {rel}",
|
||||
"apply_patch": {
|
||||
"field": "index.capabilities",
|
||||
"index_row": {
|
||||
"id": cap_id,
|
||||
"name": front_matter.get("name", cap_id),
|
||||
"summary": front_matter.get("summary", ""),
|
||||
"vector": entry_vector(front_matter),
|
||||
"domain": front_matter.get("domain", index.get("domain", "helix_forge")),
|
||||
"status": front_matter.get("status", "draft"),
|
||||
"owner": front_matter.get("owner", repo_root.name),
|
||||
"path": rel,
|
||||
"tags": front_matter.get("tags", []),
|
||||
"consumption_modes": front_matter.get("availability", {}).get(
|
||||
"consumption_modes", ["informational"]
|
||||
),
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
index_updated = index.get("updated")
|
||||
registry_touched = any(path.startswith("registry/") for path in changed_files)
|
||||
if registry_touched and index_updated != date.today().isoformat():
|
||||
first_id = index.get("capabilities", [{}])[0].get("id", "registry")
|
||||
suggestions.append(
|
||||
{
|
||||
"capability_id": first_id,
|
||||
"kind": "index_updated_stale",
|
||||
"detail": "registry/ changed; bump index updated date",
|
||||
"apply_patch": {"field": "index.updated", "value": date.today().isoformat()},
|
||||
}
|
||||
)
|
||||
return suggestions
|
||||
|
||||
|
||||
def _pyproject_script_artifacts(repo_root: Path) -> list[str]:
|
||||
pyproject = repo_root / "pyproject.toml"
|
||||
if not pyproject.exists():
|
||||
return []
|
||||
try:
|
||||
import tomllib
|
||||
|
||||
data = tomllib.loads(pyproject.read_text(encoding="utf-8"))
|
||||
except (OSError, ValueError):
|
||||
return []
|
||||
scripts = data.get("project", {}).get("scripts", {})
|
||||
return [f"pyproject.toml:[project.scripts].{name}" for name in sorted(scripts)]
|
||||
|
||||
|
||||
def _collect_changed_file_suggestions(
|
||||
cap_id: str,
|
||||
front_matter: dict[str, Any],
|
||||
changed_files: list[str],
|
||||
repo_root: Path,
|
||||
) -> list[dict[str, Any]]:
|
||||
suggestions: list[dict[str, Any]] = []
|
||||
evidence = front_matter.setdefault("evidence", {})
|
||||
evidence_tests = evidence.get("tests", [])
|
||||
evidence_docs = evidence.get("documentation", [])
|
||||
|
||||
pkg_prefixes = tuple(
|
||||
p.name + "/"
|
||||
for p in repo_root.iterdir()
|
||||
if p.is_dir() and (p / "__init__.py").exists()
|
||||
)
|
||||
|
||||
for changed in changed_files:
|
||||
if changed.startswith("tests/") and changed not in evidence_tests:
|
||||
suggestions.append(
|
||||
{
|
||||
"capability_id": cap_id,
|
||||
"kind": "evidence_test",
|
||||
"detail": f"new test file not cited: {changed}",
|
||||
"apply_patch": {"field": "evidence.tests", "append": changed},
|
||||
}
|
||||
)
|
||||
if changed.startswith(".gitea/workflows/") and changed.endswith((".yml", ".yaml")):
|
||||
field = "evidence.tests" if "test" in changed.lower() else "evidence.documentation"
|
||||
existing = evidence_tests if field == "evidence.tests" else evidence_docs
|
||||
if changed not in existing:
|
||||
suggestions.append(
|
||||
{
|
||||
"capability_id": row["id"],
|
||||
"kind": "evidence_test",
|
||||
"detail": f"new test file not cited: {changed}",
|
||||
"capability_id": cap_id,
|
||||
"kind": "evidence_workflow",
|
||||
"detail": f"workflow changed not cited: {changed}",
|
||||
"apply_patch": {"field": field, "append": changed},
|
||||
}
|
||||
)
|
||||
if changed.startswith("docs/") and changed not in evidence_docs:
|
||||
suggestions.append(
|
||||
{
|
||||
"capability_id": cap_id,
|
||||
"kind": "evidence_documentation",
|
||||
"detail": f"doc changed not cited: {changed}",
|
||||
"apply_patch": {"field": "evidence.documentation", "append": changed},
|
||||
}
|
||||
)
|
||||
|
||||
artifacts = front_matter.get("availability", {}).get("current_artifacts", [])
|
||||
for changed in changed_files:
|
||||
if changed.endswith(".py") and changed.startswith(pkg_prefixes):
|
||||
if changed not in artifacts:
|
||||
suggestions.append(
|
||||
{
|
||||
"capability_id": cap_id,
|
||||
"kind": "availability_artifact",
|
||||
"detail": f"changed module not cited: {changed}",
|
||||
"apply_patch": {
|
||||
"field": "evidence.tests",
|
||||
"field": "availability.current_artifacts",
|
||||
"append": changed,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
artifacts = front_matter.get("availability", {}).get("current_artifacts", [])
|
||||
for changed in changed_files:
|
||||
if changed.endswith(".py") and changed.startswith(
|
||||
tuple(
|
||||
p.name + "/"
|
||||
for p in repo_root.iterdir()
|
||||
if p.is_dir() and (p / "__init__.py").exists()
|
||||
)
|
||||
):
|
||||
if changed not in artifacts:
|
||||
if changed == "pyproject.toml":
|
||||
for script_ref in _pyproject_script_artifacts(repo_root):
|
||||
if script_ref not in artifacts:
|
||||
suggestions.append(
|
||||
{
|
||||
"capability_id": row["id"],
|
||||
"capability_id": cap_id,
|
||||
"kind": "availability_artifact",
|
||||
"detail": f"changed module not cited: {changed}",
|
||||
"detail": f"CLI script not cited: {script_ref}",
|
||||
"apply_patch": {
|
||||
"field": "availability.current_artifacts",
|
||||
"append": changed,
|
||||
"append": script_ref,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
return suggestions
|
||||
|
||||
|
||||
@@ -150,11 +278,12 @@ def apply_deterministic_suggestions(
|
||||
entry_paths[cap_id] = entry_path
|
||||
|
||||
front_matter = entry_cache[cap_id]
|
||||
if patch["field"] == "evidence.tests":
|
||||
tests = front_matter.setdefault("evidence", {}).setdefault("tests", [])
|
||||
if patch["append"] not in tests:
|
||||
tests.append(patch["append"])
|
||||
changed.append(f"{cap_id} evidence.tests += {patch['append']}")
|
||||
if patch["field"] in {"evidence.tests", "evidence.documentation"}:
|
||||
bucket = patch["field"].split(".")[1]
|
||||
items = front_matter.setdefault("evidence", {}).setdefault(bucket, [])
|
||||
if patch["append"] not in items:
|
||||
items.append(patch["append"])
|
||||
changed.append(f"{cap_id} {patch['field']} += {patch['append']}")
|
||||
if patch["field"] == "availability.current_artifacts":
|
||||
artifacts = front_matter.setdefault("availability", {}).setdefault(
|
||||
"current_artifacts", []
|
||||
@@ -165,7 +294,22 @@ def apply_deterministic_suggestions(
|
||||
f"{cap_id} availability.current_artifacts += {patch['append']}"
|
||||
)
|
||||
|
||||
for suggestion in suggestions:
|
||||
patch = suggestion.get("apply_patch")
|
||||
if not patch:
|
||||
continue
|
||||
if suggestion.get("kind") == "index_row_add":
|
||||
cap_id = suggestion["capability_id"]
|
||||
row = patch.get("index_row")
|
||||
if row and cap_id not in index_by_id:
|
||||
index.setdefault("capabilities", []).append(row)
|
||||
changed.append(f"index row added for {cap_id}")
|
||||
if suggestion.get("kind") == "index_updated_stale":
|
||||
index["updated"] = patch.get("value", date.today().isoformat())
|
||||
changed.append("index.updated bumped")
|
||||
|
||||
if changed:
|
||||
index["updated"] = date.today().isoformat()
|
||||
paths["index"].write_text(
|
||||
yaml.safe_dump(index, sort_keys=False, allow_unicode=True),
|
||||
encoding="utf-8",
|
||||
|
||||
Reference in New Issue
Block a user