Files
reuse-surface/reuse_surface/patches.py
tegwick b24ec507aa
Some checks failed
ci / validate-registry (push) Has been cancelled
WP-0016 finished: interactive registry maintain with llm-connect automation
Closes the registry maintenance loop from inside each domain repo:
interactive prompting for judgment calls, full automation for safe and
high-confidence changes, both backed by the llm-connect HTTP bridge.

- New modules: maintain.py, maintain_llm.py, patches.py, interactive.py
- Schema: schemas/registry-patch.schema.json
- CLI: reuse-surface maintain; establish --scaffold --hook
- Sibling templates: Makefile fragment, pre-commit hook
- Deterministic signal collectors extended; validate cwd auto-detect
- Docs, gap priority 28, SCOPE update
- Tests: test_maintain.py, test_interactive.py (59 pytest total)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-18 04:00:39 +02:00

391 lines
13 KiB
Python

from __future__ import annotations
import shutil
from datetime import date
from pathlib import Path
from typing import Any, Callable
import yaml
from reuse_surface.registry import (
LEVEL_ORDERS,
entry_vector,
load_index_at,
parse_front_matter,
registry_paths,
vectors_match,
)
# Patch kinds that ``is_safe_patch`` accepts without any further gating.
SAFE_DETERMINISTIC_KINDS = frozenset(
    (
        # index / vector bookkeeping
        "vector_sync",
        "vector_drift",
        "index_updated_bump",
        "index_row_add",
        # evidence lists
        "evidence_append",
        "evidence_test",
        "evidence_workflow",
        "evidence_documentation",
        # availability artifacts
        "artifact_append",
        "availability_artifact",
    )
)

# Rank of each confidence label; a larger number means more confident.
CONFIDENCE_ORDER = dict(low=0, medium=1, high=2)

# Single-letter prefix associated with each maturity dimension name.
DIMENSION_LEVEL_PREFIX = dict(
    discovery="D",
    availability="A",
    completeness="C",
    reliability="R",
)
def suggestion_to_patch(suggestion: dict[str, Any]) -> dict[str, Any] | None:
    """Convert one deterministic suggestion into a registry patch.

    Args:
        suggestion: Mapping with at least ``kind`` and ``capability_id``;
            ``apply_patch`` carries the kind-specific payload and ``detail``
            (optional) becomes the patch rationale.

    Returns:
        A patch dict with ``confidence`` fixed to ``"high"``, or ``None`` when
        the suggestion is a ``missing_entry``, has no ``apply_patch`` payload,
        or has a kind this function does not translate.

    Raises:
        KeyError: If ``capability_id`` or a payload field required by the
            suggestion kind is absent.
    """
    kind = suggestion.get("kind")
    if kind == "missing_entry":
        return None
    patch_body = suggestion.get("apply_patch")
    if not patch_body:
        return None

    cap_id = suggestion["capability_id"]
    rationale = suggestion.get("detail", "deterministic signal")

    def build(patch_kind: str, **fields: Any) -> dict[str, Any]:
        # Every patch shares the same four leading keys.
        return {
            "capability_id": cap_id,
            "kind": patch_kind,
            "confidence": "high",
            "rationale": rationale,
            **fields,
        }

    if kind == "vector_drift":
        return build("vector_sync", value=patch_body["value"])
    if kind in {"evidence_test", "evidence_workflow", "evidence_documentation"}:
        return build(
            "evidence_append",
            field_path=patch_body["field"],
            append=patch_body["append"],
        )
    if kind == "availability_artifact":
        return build("artifact_append", append=patch_body["append"])
    if kind == "index_row_add":
        # ``or`` rather than a ``.get`` default: an explicit None must not
        # leak into the patch.
        return build("index_row_add", index_row=patch_body.get("index_row") or {})
    if kind == "index_updated_stale":
        # The payload may carry ``"value": None`` (``_patch_to_suggestion``
        # emits exactly that), which a ``.get`` default would pass through.
        return build(
            "index_updated_bump",
            value=patch_body.get("value") or date.today().isoformat(),
        )
    return None
def patches_from_suggestions(suggestions: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Translate suggestions into patches, dropping those that yield nothing.

    Each item goes through ``suggestion_to_patch``; falsy results are skipped
    and the remaining patches keep the input order.
    """
    return [
        converted
        for item in suggestions
        if (converted := suggestion_to_patch(item))
    ]
def is_safe_patch(patch: dict[str, Any]) -> bool:
    """Tell whether the patch's ``kind`` is listed in ``SAFE_DETERMINISTIC_KINDS``."""
    patch_kind = patch.get("kind")
    return patch_kind in SAFE_DETERMINISTIC_KINDS
def level_delta(dimension: str, from_level: str, to_level: str) -> int:
    """Return how many positions ``to_level`` lies after ``from_level``.

    Positions are looked up in ``LEVEL_ORDERS[dimension]``; the result is
    negative when ``to_level`` comes earlier than ``from_level``.
    """
    ladder = LEVEL_ORDERS[dimension]
    target_rank = ladder.index(to_level)
    return target_rank - ladder.index(from_level)
def evidence_gate(repo_root: Path, patch: dict[str, Any]) -> bool:
    """Check that a maturity promotion cites evidence that exists in the repo.

    Patches of any kind other than ``maturity_promote`` pass unconditionally.

    Args:
        repo_root: Directory that citations are resolved against.
        patch: Patch dict; ``evidence_citations`` is a list of repo-relative
            paths (a single string is accepted as a one-item list).

    Returns:
        ``True`` only when at least one citation is given and every citation
        is a non-blank path that exists strictly inside ``repo_root``.
    """
    if patch.get("kind") != "maturity_promote":
        return True

    citations = patch.get("evidence_citations") or []
    if isinstance(citations, (str, Path)):
        # A bare string would otherwise be iterated character by character.
        citations = [citations]
    if not citations:
        return False

    root = repo_root.resolve()
    for citation in citations:
        if not isinstance(citation, (str, Path)) or not str(citation).strip():
            return False
        candidate = (root / citation).resolve()
        # "" and "." resolve to the repo root itself, which always exists and
        # therefore proves nothing.
        if candidate == root:
            return False
        # Absolute paths and ".." segments can point outside the repository;
        # such files are not evidence for this registry.
        try:
            candidate.relative_to(root)
        except ValueError:
            return False
        if not candidate.exists():
            return False
    return True
def promotion_delta_gate(patch: dict[str, Any], max_delta: int) -> bool:
    """Check that a maturity promotion moves up by at most ``max_delta`` levels.

    Patches of any kind other than ``maturity_promote`` pass unconditionally.

    Args:
        patch: Patch dict providing ``dimension``, ``from_level`` and
            ``to_level``.
        max_delta: Largest upward step that is still accepted.

    Returns:
        ``True`` when the step computed by ``level_delta`` is positive and no
        larger than ``max_delta``. ``False`` when a field is missing or when
        the dimension or a level cannot be looked up.
    """
    if patch.get("kind") != "maturity_promote":
        return True

    dimension = patch.get("dimension")
    from_level = patch.get("from_level")
    to_level = patch.get("to_level")
    if not (dimension and from_level and to_level):
        return False

    try:
        delta = level_delta(dimension, from_level, to_level)
    except (KeyError, ValueError):
        # A patch naming an unknown dimension or level is rejected like one
        # with missing fields, instead of aborting the whole filter run.
        # NOTE(review): assumes LEVEL_ORDERS is a dict of sequences, so the
        # lookups raise KeyError / ValueError — confirm in registry.py.
        return False
    return 0 < delta <= max_delta
def confidence_gate(patch: dict[str, Any], minimum: str) -> bool:
    """Check that the patch's confidence label ranks at least ``minimum``.

    Args:
        patch: Patch dict; a missing or ``None`` ``confidence`` counts as
            ``"low"``.
        minimum: Required label; must be a key of ``CONFIDENCE_ORDER``.

    Returns:
        ``True`` when the patch's label ranks at or above ``minimum``.
        A label that is not a key of ``CONFIDENCE_ORDER`` never passes.

    Raises:
        KeyError: If ``minimum`` is not a key of ``CONFIDENCE_ORDER``.
    """
    required = CONFIDENCE_ORDER[minimum]
    label = patch.get("confidence") or "low"
    if not isinstance(label, str):
        return False
    rank = CONFIDENCE_ORDER.get(label)
    if rank is None:
        # Unrecognised labels (e.g. "HIGH", "certain") fail the gate rather
        # than raising KeyError and aborting the caller's loop.
        return False
    return rank >= required
def filter_auto_patches(
    patches: list[dict[str, Any]],
    repo_root: Path,
    *,
    auto_confidence: str = "high",
    auto_max_delta: int = 1,
) -> list[dict[str, Any]]:
    """Pick the patches that may be applied without asking.

    A patch is kept when ``is_safe_patch`` accepts it, or when it passes the
    confidence, evidence and promotion-delta gates, checked in that order.
    Input order is preserved.

    Args:
        patches: Candidate patches.
        repo_root: Repository root handed to ``evidence_gate``.
        auto_confidence: Minimum label handed to ``confidence_gate``.
        auto_max_delta: Largest step handed to ``promotion_delta_gate``.
    """

    def eligible(candidate: dict[str, Any]) -> bool:
        if is_safe_patch(candidate):
            return True
        return (
            confidence_gate(candidate, auto_confidence)
            and evidence_gate(repo_root, candidate)
            and promotion_delta_gate(candidate, auto_max_delta)
        )

    return [candidate for candidate in patches if eligible(candidate)]
def _write_front_matter(path: Path, front_matter: dict[str, Any]) -> None:
    """Rewrite the YAML front matter of ``path`` while keeping its body.

    The body is everything that follows the first ``"\\n---"`` found after the
    opening marker. When no such marker exists the body becomes a single
    newline.

    Args:
        path: File to rewrite in place (read and written as UTF-8).
        front_matter: Mapping dumped with ``yaml.safe_dump`` in its existing
            key order.
    """
    text = path.read_text(encoding="utf-8")
    # Search from offset 3, the newline that ends the opening "---", so an
    # empty front-matter block ("---\n---\n...") is still recognised. Starting
    # at 4 skipped that newline and either dropped the body or cut it at a
    # later "---" line.
    marker_end = text.find("\n---", 3)
    body = text[marker_end + 4 :] if marker_end != -1 else "\n"
    dumped = yaml.safe_dump(front_matter, sort_keys=False, allow_unicode=True)
    path.write_text("---\n" + dumped + "---" + body, encoding="utf-8")
def _apply_maturity_promote(
front_matter: dict[str, Any],
patch: dict[str, Any],
) -> list[str]:
dimension = patch["dimension"]
to_level = patch["to_level"]
changed: list[str] = []
if dimension in {"discovery", "availability"}:
front_matter.setdefault("maturity", {}).setdefault(dimension, {})["current"] = to_level
if dimension == "availability":
front_matter.setdefault("availability", {})["current_level"] = to_level
changed.append(f"maturity.{dimension}.current -> {to_level}")
else:
key = "completeness" if dimension == "completeness" else "reliability"
front_matter.setdefault("external_evidence", {}).setdefault(key, {})["level"] = to_level
changed.append(f"external_evidence.{key}.level -> {to_level}")
entry = patch.get("promotion_history_entry")
if entry:
history = front_matter.setdefault("promotion_history", [])
history.append(entry)
changed.append("promotion_history +1")
return changed
def _apply_patch_to_state(
repo_root: Path,
patch: dict[str, Any],
index: dict[str, Any],
entry_cache: dict[str, dict[str, Any]],
entry_paths: dict[str, Path],
) -> list[str]:
cap_id = patch["capability_id"]
kind = patch["kind"]
changed: list[str] = []
index_by_id = {row["id"]: row for row in index.get("capabilities", [])}
if kind == "index_updated_bump":
index["updated"] = patch.get("value", date.today().isoformat())
return ["index.updated bumped"]
if kind == "index_row_add":
row = patch.get("index_row", {})
if cap_id not in index_by_id and row:
index.setdefault("capabilities", []).append(row)
changed.append(f"index row added for {cap_id}")
return changed
row = index_by_id.get(cap_id)
if not row:
return changed
if kind == "vector_sync":
row["vector"] = patch["value"]
changed.append(f"index vector for {cap_id}")
return changed
entry_path = repo_root / row["path"]
if cap_id not in entry_cache:
entry_cache[cap_id] = parse_front_matter(entry_path)
entry_paths[cap_id] = entry_path
front_matter = entry_cache[cap_id]
if kind == "evidence_append":
field = patch.get("field_path", "evidence.tests")
parts = field.split(".")
target = front_matter
for part in parts[:-1]:
target = target.setdefault(part, {})
items = target.setdefault(parts[-1], [])
append = patch["append"]
if append not in items:
items.append(append)
changed.append(f"{cap_id} {field} += {append}")
elif kind == "artifact_append":
artifacts = front_matter.setdefault("availability", {}).setdefault(
"current_artifacts", []
)
append = patch["append"]
if append not in artifacts:
artifacts.append(append)
changed.append(f"{cap_id} availability.current_artifacts += {append}")
elif kind == "consumer_feedback":
feedback = front_matter.setdefault("evidence", {}).setdefault(
"consumer_feedback", []
)
append = patch.get("append") or patch.get("value")
if append and append not in feedback:
feedback.append(str(append))
changed.append(f"{cap_id} consumer_feedback +1")
elif kind == "relation_add":
rel = patch.get("value") or {}
rel_type = rel.get("type", "related_to")
target_id = rel.get("target")
if target_id:
relations = front_matter.setdefault("relations", {}).setdefault(rel_type, [])
if target_id not in relations:
relations.append(target_id)
changed.append(f"{cap_id} relations.{rel_type} += {target_id}")
elif kind == "maturity_promote":
changed.extend(_apply_maturity_promote(front_matter, patch))
row["vector"] = entry_vector(front_matter)
return changed
def apply_patches(repo_root: Path, patches: list[dict[str, Any]]) -> list[str]:
    """Apply ``patches`` to the registry under ``repo_root`` and persist them.

    The index is loaded once, each patch is applied in memory through
    ``_apply_patch_to_state``, and, if anything changed, the index gets
    today's date in ``updated`` and is written back together with every entry
    whose front matter was loaded.

    Returns:
        The change descriptions collected from all patches, in order.
    """
    index_path = registry_paths(repo_root)["index"]
    index = load_index_at(index_path)
    entry_cache: dict[str, dict[str, Any]] = {}
    entry_paths: dict[str, Path] = {}

    changed: list[str] = []
    for patch in patches:
        changed += _apply_patch_to_state(
            repo_root, patch, index, entry_cache, entry_paths
        )

    # NOTE(review): indentation was lost in the reviewed copy; entry files are
    # assumed to be rewritten only when at least one change was recorded.
    # Confirm against the original layout.
    if not changed:
        return changed

    index["updated"] = date.today().isoformat()
    serialized = yaml.safe_dump(index, sort_keys=False, allow_unicode=True)
    index_path.write_text(serialized, encoding="utf-8")
    for cap_id, front_matter in entry_cache.items():
        _write_front_matter(entry_paths[cap_id], front_matter)
    return changed
def _patch_to_suggestion(patch: dict[str, Any]) -> dict[str, Any] | None:
kind = patch["kind"]
cap_id = patch["capability_id"]
if kind == "vector_sync":
return {
"capability_id": cap_id,
"kind": "vector_drift",
"apply_patch": {"field": "index.vector", "value": patch["value"]},
}
if kind == "evidence_append":
field = patch.get("field_path", "evidence.tests")
return {
"capability_id": cap_id,
"kind": "evidence_test",
"apply_patch": {"field": field, "append": patch["append"]},
}
if kind == "artifact_append":
return {
"capability_id": cap_id,
"kind": "availability_artifact",
"apply_patch": {"field": "availability.current_artifacts", "append": patch["append"]},
}
if kind == "index_updated_bump":
return {
"capability_id": cap_id,
"kind": "index_updated_stale",
"apply_patch": {"field": "index.updated", "value": patch.get("value")},
}
return None
def apply_patches_atomic(
    repo_root: Path,
    patches: list[dict[str, Any]],
    *,
    validate: Callable[[], tuple[int, list[str], list[str]]],
) -> tuple[list[str], int]:
    """Apply ``patches`` and restore the registry files if validation fails.

    The index and every existing entry file it lists are copied to
    ``.reuse-surface-session/backup`` under ``repo_root`` before patching.
    After ``apply_patches`` runs, ``validate`` is called; a non-zero code (or
    any exception) restores the copies. The session directory is removed in
    every outcome.

    Args:
        repo_root: Repository root holding the registry.
        patches: Patches to apply; an empty list is a no-op.
        validate: Callback returning ``(code, errors, warnings)``.

    Returns:
        ``(changed, code)``: the change descriptions from ``apply_patches``
        and the validation code (``0`` on success). ``changed`` is returned
        even when the files were rolled back.

    Raises:
        Exception: Whatever ``apply_patches`` or ``validate`` raised, after
            the backups have been restored.
    """
    if not patches:
        return [], 0

    session_dir = repo_root / ".reuse-surface-session"
    backup_dir = session_dir / "backup"
    if session_dir.exists():
        # Discard leftovers from an earlier run.
        shutil.rmtree(session_dir)
    backup_dir.mkdir(parents=True, exist_ok=True)

    touched: set[Path] = set()

    def snapshot(source: Path) -> None:
        # Mirror the file under backup_dir at its repo-relative location.
        target = backup_dir / source.relative_to(repo_root)
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, target)
        touched.add(source)

    def rollback() -> None:
        for original in touched:
            saved = backup_dir / original.relative_to(repo_root)
            if saved.exists():
                shutil.copy2(saved, original)
        shutil.rmtree(session_dir, ignore_errors=True)

    index_path = registry_paths(repo_root)["index"]
    index: dict[str, Any] = {}
    if index_path.exists():
        snapshot(index_path)
        index = load_index_at(index_path)

    for row in index.get("capabilities", []):
        entry_path = repo_root / row["path"]
        if entry_path.exists():
            snapshot(entry_path)

    try:
        changed = apply_patches(repo_root, patches)
        code, _errors, _warnings = validate()
        if code == 0:
            shutil.rmtree(session_dir, ignore_errors=True)
            return changed, 0
        rollback()
        return changed, code
    except Exception:
        rollback()
        raise