from __future__ import annotations import shutil from datetime import date from pathlib import Path from typing import Any, Callable import yaml from reuse_surface.registry import ( LEVEL_ORDERS, entry_vector, load_index_at, parse_front_matter, registry_paths, vectors_match, ) SAFE_DETERMINISTIC_KINDS = frozenset( { "vector_sync", "vector_drift", "evidence_append", "evidence_test", "artifact_append", "availability_artifact", "index_updated_bump", "index_row_add", "evidence_workflow", "evidence_documentation", } ) CONFIDENCE_ORDER = {"low": 0, "medium": 1, "high": 2} DIMENSION_LEVEL_PREFIX = { "discovery": "D", "availability": "A", "completeness": "C", "reliability": "R", } def suggestion_to_patch(suggestion: dict[str, Any]) -> dict[str, Any] | None: kind = suggestion.get("kind") if kind == "missing_entry": return None patch_body = suggestion.get("apply_patch") if not patch_body: return None cap_id = suggestion["capability_id"] rationale = suggestion.get("detail", "deterministic signal") if kind == "vector_drift": return { "capability_id": cap_id, "kind": "vector_sync", "confidence": "high", "rationale": rationale, "value": patch_body["value"], } if kind in {"evidence_test", "evidence_workflow", "evidence_documentation"}: return { "capability_id": cap_id, "kind": "evidence_append", "confidence": "high", "rationale": rationale, "field_path": patch_body["field"], "append": patch_body["append"], } if kind == "availability_artifact": return { "capability_id": cap_id, "kind": "artifact_append", "confidence": "high", "rationale": rationale, "append": patch_body["append"], } if kind == "index_row_add": return { "capability_id": cap_id, "kind": "index_row_add", "confidence": "high", "rationale": rationale, "index_row": patch_body.get("index_row", {}), } if kind == "index_updated_stale": return { "capability_id": cap_id, "kind": "index_updated_bump", "confidence": "high", "rationale": rationale, "value": patch_body.get("value", date.today().isoformat()), } return None def patches_from_suggestions(suggestions: list[dict[str, Any]]) -> list[dict[str, Any]]: patches: list[dict[str, Any]] = [] for item in suggestions: patch = suggestion_to_patch(item) if patch: patches.append(patch) return patches def is_safe_patch(patch: dict[str, Any]) -> bool: return patch.get("kind") in SAFE_DETERMINISTIC_KINDS def level_delta(dimension: str, from_level: str, to_level: str) -> int: order = LEVEL_ORDERS[dimension] return order.index(to_level) - order.index(from_level) def evidence_gate(repo_root: Path, patch: dict[str, Any]) -> bool: if patch.get("kind") != "maturity_promote": return True citations = patch.get("evidence_citations") or [] if not citations: return False return all((repo_root / path).exists() for path in citations) def promotion_delta_gate(patch: dict[str, Any], max_delta: int) -> bool: if patch.get("kind") != "maturity_promote": return True dimension = patch.get("dimension") from_level = patch.get("from_level") to_level = patch.get("to_level") if not dimension or not from_level or not to_level: return False delta = level_delta(dimension, from_level, to_level) return 0 < delta <= max_delta def confidence_gate(patch: dict[str, Any], minimum: str) -> bool: return CONFIDENCE_ORDER[patch.get("confidence", "low")] >= CONFIDENCE_ORDER[minimum] def filter_auto_patches( patches: list[dict[str, Any]], repo_root: Path, *, auto_confidence: str = "high", auto_max_delta: int = 1, ) -> list[dict[str, Any]]: selected: list[dict[str, Any]] = [] for patch in patches: if is_safe_patch(patch): selected.append(patch) continue if not confidence_gate(patch, auto_confidence): continue if not evidence_gate(repo_root, patch): continue if not promotion_delta_gate(patch, auto_max_delta): continue selected.append(patch) return selected def _write_front_matter(path: Path, front_matter: dict[str, Any]) -> None: text = path.read_text(encoding="utf-8") marker_end = text.find("\n---", 4) body = text[marker_end + 4 :] if marker_end != -1 else "\n" path.write_text( "---\n" + yaml.safe_dump(front_matter, sort_keys=False, allow_unicode=True) + "---" + body, encoding="utf-8", ) def _apply_maturity_promote( front_matter: dict[str, Any], patch: dict[str, Any], ) -> list[str]: dimension = patch["dimension"] to_level = patch["to_level"] changed: list[str] = [] if dimension in {"discovery", "availability"}: front_matter.setdefault("maturity", {}).setdefault(dimension, {})["current"] = to_level if dimension == "availability": front_matter.setdefault("availability", {})["current_level"] = to_level changed.append(f"maturity.{dimension}.current -> {to_level}") else: key = "completeness" if dimension == "completeness" else "reliability" front_matter.setdefault("external_evidence", {}).setdefault(key, {})["level"] = to_level changed.append(f"external_evidence.{key}.level -> {to_level}") entry = patch.get("promotion_history_entry") if entry: history = front_matter.setdefault("promotion_history", []) history.append(entry) changed.append("promotion_history +1") return changed def _apply_patch_to_state( repo_root: Path, patch: dict[str, Any], index: dict[str, Any], entry_cache: dict[str, dict[str, Any]], entry_paths: dict[str, Path], ) -> list[str]: cap_id = patch["capability_id"] kind = patch["kind"] changed: list[str] = [] index_by_id = {row["id"]: row for row in index.get("capabilities", [])} if kind == "index_updated_bump": index["updated"] = patch.get("value", date.today().isoformat()) return ["index.updated bumped"] if kind == "index_row_add": row = patch.get("index_row", {}) if cap_id not in index_by_id and row: index.setdefault("capabilities", []).append(row) changed.append(f"index row added for {cap_id}") return changed row = index_by_id.get(cap_id) if not row: return changed if kind == "vector_sync": row["vector"] = patch["value"] changed.append(f"index vector for {cap_id}") return changed entry_path = repo_root / row["path"] if cap_id not in entry_cache: entry_cache[cap_id] = parse_front_matter(entry_path) entry_paths[cap_id] = entry_path front_matter = entry_cache[cap_id] if kind == "evidence_append": field = patch.get("field_path", "evidence.tests") parts = field.split(".") target = front_matter for part in parts[:-1]: target = target.setdefault(part, {}) items = target.setdefault(parts[-1], []) append = patch["append"] if append not in items: items.append(append) changed.append(f"{cap_id} {field} += {append}") elif kind == "artifact_append": artifacts = front_matter.setdefault("availability", {}).setdefault( "current_artifacts", [] ) append = patch["append"] if append not in artifacts: artifacts.append(append) changed.append(f"{cap_id} availability.current_artifacts += {append}") elif kind == "consumer_feedback": feedback = front_matter.setdefault("evidence", {}).setdefault( "consumer_feedback", [] ) append = patch.get("append") or patch.get("value") if append and append not in feedback: feedback.append(str(append)) changed.append(f"{cap_id} consumer_feedback +1") elif kind == "relation_add": rel = patch.get("value") or {} rel_type = rel.get("type", "related_to") target_id = rel.get("target") if target_id: relations = front_matter.setdefault("relations", {}).setdefault(rel_type, []) if target_id not in relations: relations.append(target_id) changed.append(f"{cap_id} relations.{rel_type} += {target_id}") elif kind == "maturity_promote": changed.extend(_apply_maturity_promote(front_matter, patch)) row["vector"] = entry_vector(front_matter) return changed def apply_patches(repo_root: Path, patches: list[dict[str, Any]]) -> list[str]: paths = registry_paths(repo_root) index = load_index_at(paths["index"]) entry_cache: dict[str, dict[str, Any]] = {} entry_paths: dict[str, Path] = {} changed: list[str] = [] for patch in patches: changed.extend( _apply_patch_to_state(repo_root, patch, index, entry_cache, entry_paths) ) if changed: index["updated"] = date.today().isoformat() paths["index"].write_text( yaml.safe_dump(index, sort_keys=False, allow_unicode=True), encoding="utf-8", ) for cap_id, front_matter in entry_cache.items(): _write_front_matter(entry_paths[cap_id], front_matter) return changed def _patch_to_suggestion(patch: dict[str, Any]) -> dict[str, Any] | None: kind = patch["kind"] cap_id = patch["capability_id"] if kind == "vector_sync": return { "capability_id": cap_id, "kind": "vector_drift", "apply_patch": {"field": "index.vector", "value": patch["value"]}, } if kind == "evidence_append": field = patch.get("field_path", "evidence.tests") return { "capability_id": cap_id, "kind": "evidence_test", "apply_patch": {"field": field, "append": patch["append"]}, } if kind == "artifact_append": return { "capability_id": cap_id, "kind": "availability_artifact", "apply_patch": {"field": "availability.current_artifacts", "append": patch["append"]}, } if kind == "index_updated_bump": return { "capability_id": cap_id, "kind": "index_updated_stale", "apply_patch": {"field": "index.updated", "value": patch.get("value")}, } return None def apply_patches_atomic( repo_root: Path, patches: list[dict[str, Any]], *, validate: Callable[[], tuple[int, list[str], list[str]]], ) -> tuple[list[str], int]: if not patches: return [], 0 session_dir = repo_root / ".reuse-surface-session" backup_dir = session_dir / "backup" if session_dir.exists(): shutil.rmtree(session_dir) backup_dir.mkdir(parents=True, exist_ok=True) paths = registry_paths(repo_root) touched: set[Path] = set() if paths["index"].exists(): rel = paths["index"].relative_to(repo_root) dest = backup_dir / rel dest.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(paths["index"], dest) touched.add(paths["index"]) index = load_index_at(paths["index"]) if paths["index"].exists() else {} for row in index.get("capabilities", []): entry_path = repo_root / row["path"] if entry_path.exists(): rel = entry_path.relative_to(repo_root) dest = backup_dir / rel dest.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(entry_path, dest) touched.add(entry_path) try: changed = apply_patches(repo_root, patches) code, errors, warnings = validate() if code != 0: for path in touched: rel = path.relative_to(repo_root) backup = backup_dir / rel if backup.exists(): shutil.copy2(backup, path) shutil.rmtree(session_dir, ignore_errors=True) return changed, code shutil.rmtree(session_dir, ignore_errors=True) return changed, 0 except Exception: for path in touched: rel = path.relative_to(repo_root) backup = backup_dir / rel if backup.exists(): shutil.copy2(backup, path) shutil.rmtree(session_dir, ignore_errors=True) raise