Files
state-hub/scripts/normalize_attached_repo_workplans.py
tegwick ae2302df64 Fix workplan frontmatter join and prefix inference (STATE-WP-0067)
Repair glued --- delimiters, infer prefixes from frontmatter ids, and
support bare WP-* workplan schemes.
2026-06-22 23:16:15 +02:00

243 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""Normalize workplan frontmatter and task status literals in attached repos."""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
import urllib.request
from collections import Counter
from pathlib import Path
# Base URL that fetch() prefixes to every request path.
API_BASE = "http://127.0.0.1:8000"
# Default directory whose children dirty_repo_slugs() inspects for git checkouts.
HOME_ROOT = Path("/home/worsch")
# Captures the "<PREFIX>-WP" part of a name shaped like "<PREFIX>-WP-<digits>".
# NOTE(review): not referenced anywhere in the visible part of this file.
WP_FILE_RE = re.compile(r"^([A-Z][A-Z0-9-]*-WP)-\d+")
# Matches a fenced ```task block; group 1 is everything between the two fences
# (DOTALL lets the non-greedy group span several lines).
TASK_BLOCK_RE = re.compile(r"```task\n(.*?)```", re.DOTALL)
# Legacy task status literal -> replacement written by normalize_task_blocks().
TASK_STATUS_MAP = {
    "blocked": "wait",
    "in_progress": "progress",
    "cancelled": "cancel",
    "canceled": "cancel",
}
def fetch(path: str):
    """Request ``API_BASE`` + *path* and return the JSON-decoded response body."""
    url = f"{API_BASE}{path}"
    with urllib.request.urlopen(url) as reply:
        payload = json.load(reply)
    return payload
def dirty_repo_slugs(home: Path = HOME_ROOT) -> list[str]:
    """Return the names of git checkouts directly under *home* that have changes.

    A child counts as a checkout when it contains a ``.git`` directory, and as
    dirty when ``git status --porcelain`` prints anything for it. Names are
    returned in the sorted order of the directory entries.
    """

    def has_local_changes(checkout: Path) -> bool:
        # check=False: a failing git call is not raised; only stdout is examined.
        probe = subprocess.run(
            ["git", "-C", str(checkout), "status", "--porcelain"],
            capture_output=True,
            text=True,
            check=False,
        )
        return bool(probe.stdout.strip())

    return [
        entry.name
        for entry in sorted(home.iterdir())
        if (entry / ".git").is_dir() and has_local_changes(entry)
    ]
def choose_repos(repos: list[dict], only_slugs: set[str] | None) -> list[dict]:
    """Select repo records by slug and return them ordered by slug.

    Records without a truthy ``slug`` are ignored; when a slug occurs more than
    once the last record wins. With ``only_slugs`` set, only those slugs are
    returned and unknown ones are silently skipped; with ``None`` every record
    is returned.
    """
    index: dict[str, dict] = {}
    for entry in repos:
        if entry.get("slug"):
            index[entry["slug"]] = entry
    wanted = index if only_slugs is None else only_slugs
    return [index[name] for name in sorted(wanted) if name in index]
# Line endings that an earlier buggy join could leave glued to the closing
# delimiter: a closing double quote, an ISO date, or a 36-character UUID-like
# token, each immediately followed by "---".
_GLUED_DELIMITER_RE = re.compile(r'(?:"|\d{4}-\d{2}-\d{2}|[0-9a-f-]{36})---')


def repair_frontmatter_delimiter(text: str) -> str:
    """Fix a glued closing --- delimiter introduced by an earlier buggy join.

    Only documents that start with ``---\\n`` are considered. The repair inserts
    a single newline in front of the first glued ``---`` and is restricted to
    the region before the first ``---`` that already starts a line. Text after
    a well-formed closing delimiter (for example a long dashed table separator
    or a quoted phrase followed by ``---`` in the body) is therefore never
    modified, and running the repair twice gives the same result as once.

    Args:
        text: Full contents of a workplan file.

    Returns:
        The text with the glued delimiter moved onto its own line, or the
        unchanged text when nothing needs repairing.
    """
    if not text.startswith("---\n"):
        return text
    # First "---" that begins a line after the opening delimiter; anything from
    # there on is either the real closing delimiter or body content.
    closing = text.find("\n---", 3)
    limit = len(text) if closing == -1 else closing
    glued = _GLUED_DELIMITER_RE.search(text, 4, limit)
    if glued is None:
        return text
    cut = glued.end() - 3  # index where the glued "---" starts
    return f"{text[:cut]}\n{text[cut:]}"
def split_frontmatter(text: str) -> tuple[str | None, str]:
    """Split a document into ``(frontmatter, body)``.

    The text is first passed through ``repair_frontmatter_delimiter``. If it
    does not open with ``---\\n`` or no later ``\\n---`` is found, the result is
    ``(None, repaired_text)``. Otherwise the frontmatter is the text between
    the two delimiters and the body is whatever follows the closing ``---``,
    prefixed with a newline when it is non-empty and lacks one.
    """
    opener = "---\n"
    closer = "\n---"
    document = repair_frontmatter_delimiter(text)
    if not document.startswith(opener):
        return None, document
    close_at = document.find(closer, len(opener))
    if close_at < 0:
        return None, document
    header = document[len(opener):close_at]
    remainder = document[close_at + len(closer):]
    if remainder and remainder[0] != "\n":
        remainder = "\n" + remainder
    return header, remainder
def join_frontmatter(frontmatter: str, body: str) -> str:
    """Assemble a document from frontmatter text and body text.

    Trailing newlines of *frontmatter* are collapsed so the closing ``---``
    always sits on its own line, and a non-empty *body* is guaranteed to start
    on a new line after that delimiter.
    """
    header = frontmatter.rstrip("\n")
    needs_break = bool(body) and not body.startswith("\n")
    separator = "\n" if needs_break else ""
    return "".join(("---\n", header, "\n---", separator, body))
# Whole top-level ``domain:`` / ``topic_slug:`` lines. ``.`` never crosses a line
# break, so a key with an empty value cannot swallow the line that follows it.
_DOMAIN_LINE_RE = re.compile(r"^domain:.*$", re.MULTILINE)
_TOPIC_SLUG_LINE_RE = re.compile(r"^topic_slug:.*$", re.MULTILINE)


def normalize_frontmatter(frontmatter: str, domain_slug: str, topic_slug: str | None) -> tuple[str, bool]:
    """Set the ``domain`` and ``topic_slug`` keys of a frontmatter block.

    Args:
        frontmatter: Frontmatter text without the ``---`` delimiters.
        domain_slug: Value for ``domain``; a falsy value leaves the key alone.
        topic_slug: Value for ``topic_slug``; a falsy value leaves the key alone.

    Returns:
        ``(new_frontmatter, changed)``. ``changed`` is true only when the
        returned text differs from the input, so a frontmatter that already
        carries the requested values is reported as unchanged.
    """
    fm = frontmatter

    if domain_slug:
        domain_line = f"domain: {domain_slug}"
        # A callable replacement keeps the slug literal (no backslash templates).
        replaced, hits = _DOMAIN_LINE_RE.subn(lambda _m: domain_line, fm, count=1)
        if hits:
            fm = replaced
        elif "domain:" not in fm:
            # Only append when "domain:" does not occur anywhere at all.
            fm = fm.rstrip() + f"\n{domain_line}\n"

    if topic_slug:
        topic_line = f"topic_slug: {topic_slug}"
        replaced, hits = _TOPIC_SLUG_LINE_RE.subn(lambda _m: topic_line, fm, count=1)
        if hits:
            fm = replaced
        elif _DOMAIN_LINE_RE.search(fm):
            # No topic_slug yet: place it directly below the domain line.
            fm = _DOMAIN_LINE_RE.sub(
                lambda m: f"{m.group(0)}\n{topic_line}",
                fm,
                count=1,
            )
        else:
            fm = fm.rstrip() + f"\n{topic_line}\n"

    return fm, fm != frontmatter
def normalize_task_blocks(body: str) -> tuple[str, bool]:
    """Rewrite legacy ``status:`` literals inside fenced ``task`` blocks.

    Every block matched by ``TASK_BLOCK_RE`` is scanned once per entry of
    ``TASK_STATUS_MAP``; the first ``status: <legacy>`` line found for an entry
    is replaced by ``status: <canonical>``. Text outside task blocks is left
    untouched.

    Returns:
        ``(new_body, changed)`` where ``changed`` is true if any status line
        was rewritten.
    """
    changed = False

    def repl(match: re.Match[str]) -> str:
        nonlocal changed
        updated = match.group(1)
        for legacy, canon in TASK_STATUS_MAP.items():
            # Use [ \t]* rather than \s*: \s also matches newlines, which made
            # the pattern consume the line break after the status line and glue
            # the next line (or the closing fence) onto "status: <canon>".
            new_block, count = re.subn(
                rf"^status:[ \t]*{re.escape(legacy)}[ \t]*$",
                f"status: {canon}",
                updated,
                count=1,
                flags=re.MULTILINE,
            )
            if count:
                updated = new_block
                changed = True
        return f"```task\n{updated}```"

    return TASK_BLOCK_RE.sub(repl, body), changed
def normalize_workplan_file(
    path: Path,
    domain_slug: str,
    topic_slug: str | None,
    *,
    dry_run: bool,
) -> bool:
    """Normalize one workplan file and report whether it needed changes.

    The file is read as UTF-8, its frontmatter delimiter is repaired, the
    frontmatter keys and task-block statuses are normalized, and the result is
    written back unless *dry_run* is set. Returns ``False`` when the file has
    no frontmatter or none of the three steps reported a change.
    """
    source_text = path.read_text(encoding="utf-8")
    fixed_text = repair_frontmatter_delimiter(source_text)
    header, remainder = split_frontmatter(fixed_text)
    if header is None:
        return False

    new_header, header_touched = normalize_frontmatter(header, domain_slug, topic_slug)
    new_body, body_touched = normalize_task_blocks(remainder)
    delimiter_touched = fixed_text != source_text
    if not any((header_touched, body_touched, delimiter_touched)):
        return False

    rendered = join_frontmatter(new_header, new_body)
    if dry_run:
        return True
    path.write_text(rendered, encoding="utf-8")
    return True
def repo_topic_slug(repo: dict, topics_by_id: dict[str, dict]) -> str | None:
    """Return the ``slug`` of the topic referenced by ``repo["topic_id"]``.

    Yields ``None`` when the repo has no truthy ``topic_id``, when that id is
    not a key of *topics_by_id* (or maps to an empty record), or when the
    topic record has no ``slug`` entry.
    """
    ref = repo.get("topic_id")
    linked = topics_by_id.get(ref) if ref else None
    if not linked:
        return None
    return linked.get("slug")
def normalize_repo(repo: dict, topics_by_id: dict[str, dict], *, dry_run: bool) -> list[str]:
    """Normalize every workplan of one repo and list the files that changed.

    Looks at ``*.md`` files directly inside ``<local_path>/workplans`` in sorted
    order, skipping names that start with ``ADHOC``. Returns the changed files
    as paths relative to the repo root, or an empty list when the repo has no
    ``workplans`` directory.
    """
    root = Path(repo["local_path"])
    plans_dir = root / "workplans"
    if not plans_dir.is_dir():
        return []

    topic = repo_topic_slug(repo, topics_by_id)
    domain = repo.get("domain_slug") or ""

    touched: list[str] = []
    for candidate in sorted(plans_dir.glob("*.md")):
        is_adhoc = candidate.name.startswith("ADHOC")
        if not is_adhoc and normalize_workplan_file(candidate, domain, topic, dry_run=dry_run):
            touched.append(str(candidate.relative_to(root)))
    return touched
def main() -> int:
    """Command-line entry point: normalize workplans of the selected repos.

    Repos are selected with ``--repo SLUG`` (repeatable) and/or ``--dirty``;
    when both are given the two selections are combined. Repo and topic
    records are fetched from the API, each selected repo is normalized, and a
    per-repo summary plus a final total is printed. With ``--dry-run`` nothing
    is written. Exits through ``parser.error`` when no selection option is
    given; otherwise returns 0.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--repo", action="append", dest="repos", help="Repo slug to normalize")
    parser.add_argument("--dirty", action="store_true", help="Normalize repos with local git changes")
    parser.add_argument("--dry-run", action="store_true", help="Report changes without writing")
    args = parser.parse_args()

    requested = set(args.repos or [])
    if not requested and not args.dirty:
        parser.error("Specify --repo SLUG and/or --dirty")
    # "and/or": --dirty adds to the explicitly named repos instead of being
    # silently ignored whenever --repo is present.
    only_slugs: set[str] = set(requested)
    if args.dirty:
        only_slugs |= set(dirty_repo_slugs())

    repos = fetch("/repos/")
    topics = fetch("/topics/?status=active")
    topics_by_id = {topic["id"]: topic for topic in topics}
    selected = choose_repos(repos, only_slugs)

    # Explicitly requested slugs the API does not know would otherwise vanish
    # without a trace; dirty-derived slugs are not reported to avoid noise.
    known = {repo["slug"] for repo in selected}
    for slug in sorted(requested - known):
        print(f"warning: repo {slug!r} not found; skipped", file=sys.stderr)

    mode = "would update" if args.dry_run else "updated"
    total_files = 0
    for repo in selected:
        updated = normalize_repo(repo, topics_by_id, dry_run=args.dry_run)
        if not updated:
            continue
        total_files += len(updated)
        print(f"{repo['slug']}: {mode} {len(updated)} workplan(s)")
        for name in updated:
            print(f" - {name}")
    print(f"Done. {total_files} workplan file(s) {'would change' if args.dry_run else 'changed'}.")
    return 0
if __name__ == "__main__":
sys.exit(main())