#!/usr/bin/env python3 """Normalize workplan frontmatter and task status literals in attached repos.""" from __future__ import annotations import argparse import json import re import subprocess import sys import urllib.request from collections import Counter from pathlib import Path API_BASE = "http://127.0.0.1:8000" HOME_ROOT = Path("/home/worsch") WP_FILE_RE = re.compile(r"^([A-Z][A-Z0-9-]*-WP)-\d+") TASK_BLOCK_RE = re.compile(r"```task\n(.*?)```", re.DOTALL) TASK_STATUS_MAP = { "blocked": "wait", "in_progress": "progress", "cancelled": "cancel", "canceled": "cancel", } def fetch(path: str): with urllib.request.urlopen(f"{API_BASE}{path}") as response: return json.load(response) def dirty_repo_slugs(home: Path = HOME_ROOT) -> list[str]: slugs: list[str] = [] for path in sorted(home.iterdir()): if not (path / ".git").is_dir(): continue result = subprocess.run( ["git", "-C", str(path), "status", "--porcelain"], capture_output=True, text=True, check=False, ) if result.stdout.strip(): slugs.append(path.name) return slugs def choose_repos(repos: list[dict], only_slugs: set[str] | None) -> list[dict]: by_slug = {repo["slug"]: repo for repo in repos if repo.get("slug")} if only_slugs is not None: return [by_slug[slug] for slug in sorted(only_slugs) if slug in by_slug] return sorted(by_slug.values(), key=lambda repo: repo["slug"]) def repair_frontmatter_delimiter(text: str) -> str: """Fix a glued closing --- delimiter introduced by an earlier buggy join.""" if not text.startswith("---\n"): return text repaired = re.sub(r'\"---', '"\n---', text, count=1) repaired = re.sub(r"(\d{4}-\d{2}-\d{2})---", r"\1\n---", repaired, count=1) repaired = re.sub(r"([0-9a-f-]{36})---", r"\1\n---", repaired, count=1) if repaired != text and not repaired.split("---", 2)[1].endswith("\n"): repaired = repaired.replace("\n---\n", "\n---\n", 1) return repaired def split_frontmatter(text: str) -> tuple[str | None, str]: text = repair_frontmatter_delimiter(text) if not text.startswith("---\n"): return None, text end = text.find("\n---", 4) if end == -1: return None, text body = text[end + 4 :] if body and not body.startswith("\n"): body = "\n" + body return text[4:end], body def join_frontmatter(frontmatter: str, body: str) -> str: fm = frontmatter.rstrip("\n") + "\n" if body and not body.startswith("\n"): body = "\n" + body return f"---\n{fm}---{body}" def normalize_frontmatter(frontmatter: str, domain_slug: str, topic_slug: str | None) -> tuple[str, bool]: changed = False fm = frontmatter if domain_slug: new_fm, count = re.subn( r"^domain:\s*.+$", f"domain: {domain_slug}", fm, count=1, flags=re.MULTILINE, ) if count: fm = new_fm changed = True elif "domain:" not in fm: fm = fm.rstrip() + f"\ndomain: {domain_slug}\n" changed = True if topic_slug: if re.search(r"^topic_slug:\s", fm, re.MULTILINE): new_fm, count = re.subn( r"^topic_slug:\s*.+$", f"topic_slug: {topic_slug}", fm, count=1, flags=re.MULTILINE, ) if count: fm = new_fm changed = True else: if re.search(r"^domain:\s", fm, re.MULTILINE): fm = re.sub( r"^(domain:\s*.+)$", rf"\1\ntopic_slug: {topic_slug}", fm, count=1, flags=re.MULTILINE, ) else: fm = fm.rstrip() + f"\ntopic_slug: {topic_slug}\n" changed = True return fm, changed def normalize_task_blocks(body: str) -> tuple[str, bool]: changed = False def repl(match: re.Match[str]) -> str: nonlocal changed block = match.group(1) updated = block for legacy, canon in TASK_STATUS_MAP.items(): new_block, count = re.subn( rf"^status:\s*{re.escape(legacy)}\s*$", f"status: {canon}", updated, count=1, flags=re.MULTILINE, ) if count: updated = new_block changed = True return f"```task\n{updated}```" return TASK_BLOCK_RE.sub(repl, body), changed def normalize_workplan_file( path: Path, domain_slug: str, topic_slug: str | None, *, dry_run: bool, ) -> bool: original = path.read_text(encoding="utf-8") repaired = repair_frontmatter_delimiter(original) frontmatter, body = split_frontmatter(repaired) if frontmatter is None: return False fm, fm_changed = normalize_frontmatter(frontmatter, domain_slug, topic_slug) body, body_changed = normalize_task_blocks(body) delimiter_changed = repaired != original if not (fm_changed or body_changed or delimiter_changed): return False updated = join_frontmatter(fm, body) if not dry_run: path.write_text(updated, encoding="utf-8") return True def repo_topic_slug(repo: dict, topics_by_id: dict[str, dict]) -> str | None: topic_id = repo.get("topic_id") if not topic_id: return None topic = topics_by_id.get(topic_id) return topic.get("slug") if topic else None def normalize_repo(repo: dict, topics_by_id: dict[str, dict], *, dry_run: bool) -> list[str]: path = Path(repo["local_path"]) workplans_dir = path / "workplans" if not workplans_dir.is_dir(): return [] domain_slug = repo.get("domain_slug") or "" topic_slug = repo_topic_slug(repo, topics_by_id) updated_files: list[str] = [] for workplan in sorted(workplans_dir.glob("*.md")): if workplan.name.startswith("ADHOC"): continue if normalize_workplan_file(workplan, domain_slug, topic_slug, dry_run=dry_run): updated_files.append(str(workplan.relative_to(path))) return updated_files def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--repo", action="append", dest="repos", help="Repo slug to normalize") parser.add_argument("--dirty", action="store_true", help="Normalize repos with local git changes") parser.add_argument("--dry-run", action="store_true", help="Report changes without writing") args = parser.parse_args() only_slugs: set[str] | None if args.repos: only_slugs = set(args.repos) elif args.dirty: only_slugs = set(dirty_repo_slugs()) else: parser.error("Specify --repo SLUG and/or --dirty") repos = fetch("/repos/") topics = fetch("/topics/?status=active") topics_by_id = {topic["id"]: topic for topic in topics} selected = choose_repos(repos, only_slugs) total_files = 0 for repo in selected: updated = normalize_repo(repo, topics_by_id, dry_run=args.dry_run) if updated: total_files += len(updated) mode = "would update" if args.dry_run else "updated" print(f"{repo['slug']}: {mode} {len(updated)} workplan(s)") for name in updated: print(f" - {name}") print(f"Done. {total_files} workplan file(s) {'would change' if args.dry_run else 'changed'}.") return 0 if __name__ == "__main__": sys.exit(main())