"""Regenerate agent instruction files in local repositories from templates.

Repository and topic records are fetched from the HTTP API at ``API_BASE``.
For every selected repository the script writes ``AGENTS.md``, ``CLAUDE.md``,
``SCOPE.md`` (only when missing) and one file per rule template under
``.claude/rules/``.
"""

from __future__ import annotations

import argparse
import json
import re
import subprocess
import sys
import urllib.error
import urllib.request
from collections import Counter
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent
TEMPLATE_DIR = ROOT / "scripts" / "project_rules"
API_BASE = "http://127.0.0.1:8000"
HOME_ROOT = Path("/home/worsch")

# Workplan prefix detection: from file names ("FOO-WP-12..." or bare "WP-12...")
# and from an "id:" line inside the file.
WP_FILE_RE = re.compile(r"^([A-Za-z][A-Za-z0-9-]*-WP)-\d+", re.IGNORECASE)
WP_BARE_RE = re.compile(r"^(WP)-\d+")
ID_PREFIX_RE = re.compile(r"^id:\s*([A-Z][A-Z0-9-]*-WP)-\d+", re.MULTILINE)
ID_BARE_RE = re.compile(r"^id:\s*(WP)-\d+", re.MULTILINE)

# NOTE(review): the marker is an empty string in this view. ``str.split``
# rejects an empty separator with ValueError, so ``read_agents_extensions``
# cannot succeed on an existing AGENTS.md with this value. Presumably the real
# marker was lost before this file reached review -- confirm and restore it.
EXTENSION_MARKER = ""

# Rule files written to ``.claude/rules/<name>.md`` in every repository.
RULE_NAMES = (
    "repo-identity",
    "session-protocol",
    "first-session",
    "workplan-convention",
    "stack-and-commands",
    "architecture",
    "repo-boundary",
    "credential-routing",
    "agents",
)


def fetch(path: str, timeout: float = 30.0):
    """Return the decoded JSON body of a GET request to ``API_BASE + path``.

    Args:
        path: URL path (and optional query string) appended to ``API_BASE``.
        timeout: Seconds to wait before the request is abandoned, so an
            unresponsive server cannot block the script forever.
    """
    with urllib.request.urlopen(f"{API_BASE}{path}", timeout=timeout) as response:
        return json.load(response)


def render(template: str, values: dict[str, str]) -> str:
    """Replace every ``{KEY}`` placeholder in *template* with ``values[KEY]``.

    Substitution happens in a single pass: text inserted for one key is never
    scanned again, so a value that happens to contain ``{OTHER_KEY}`` is kept
    literally and the result does not depend on the order of *values*.
    Placeholders without a matching key are left untouched.
    """
    if not values:
        return template
    pattern = re.compile("|".join(re.escape("{" + key + "}") for key in values))
    # Strip the surrounding braces from the matched token to recover the key.
    return pattern.sub(lambda found: values[found.group(0)[1:-1]], template)


def read_agents_extensions(agents_path: Path) -> str:
    """Return the text that follows ``EXTENSION_MARKER`` in *agents_path*.

    Returns an empty string when the file does not exist or does not contain
    the marker.
    """
    if not agents_path.exists():
        return ""
    text = agents_path.read_text(encoding="utf-8")
    if EXTENSION_MARKER not in text:
        return ""
    return text.split(EXTENSION_MARKER, 1)[1]


def build_agents_md(template: str, values: dict[str, str], extensions: str) -> str:
    """Render *template* and re-attach previously saved *extensions*.

    When *extensions* contains non-whitespace text, anything after the marker
    in the rendered body is replaced by it; if the rendered body has no
    marker, the marker and the extensions are appended after a blank line.
    """
    body = render(template, values)
    if extensions.strip():
        if EXTENSION_MARKER in body:
            body = body.split(EXTENSION_MARKER, 1)[0] + EXTENSION_MARKER + extensions
        else:
            body = body.rstrip() + "\n\n" + EXTENSION_MARKER + extensions
    return body


def repo_topic_id(repo: dict, topics: list[dict]) -> str:
    """Return the topic id for *repo*, or ``"(none)"`` when none applies.

    An explicit ``topic_id`` on the repo wins. Otherwise the first topic whose
    ``domain_slug`` equals the repo's is used. A repo without a domain never
    matches, so it cannot be paired with a topic that merely lacks a domain.
    """
    if repo.get("topic_id"):
        return repo["topic_id"]
    domain = repo.get("domain_slug")
    if not domain:
        return "(none)"
    match = next((t for t in topics if t.get("domain_slug") == domain), None)
    return match["id"] if match else "(none)"


def default_wp_prefix(repo_slug: str) -> str:
    """Derive a prefix from the first ``-``-separated token of the slug."""
    first = repo_slug.split("-", 1)[0].upper()
    return f"{first}-WP"


def infer_wp_prefix(repo_path: Path, repo_slug: str) -> str:
    """Prefer established on-disk workplan prefixes over first-token derivation.

    Every ``workplans/*.md`` file whose name does not start with ``ADHOC``
    contributes one vote: the prefix from its ``id:`` line if present,
    otherwise the prefix from its file name. The most common prefix wins; a
    warning is printed to stderr when more than one prefix was seen. Without
    any votes the result is ``default_wp_prefix(repo_slug)``.
    """
    counts: Counter[str] = Counter()
    workplans_dir = repo_path / "workplans"
    if workplans_dir.is_dir():
        # Sorted so that ties in most_common() resolve the same way on every
        # run instead of depending on directory listing order.
        for workplan in sorted(workplans_dir.glob("*.md")):
            if workplan.name.startswith("ADHOC"):
                continue
            text = workplan.read_text(encoding="utf-8", errors="replace")
            id_match = ID_PREFIX_RE.search(text) or ID_BARE_RE.search(text)
            if id_match:
                counts[id_match.group(1)] += 1
                continue
            file_match = WP_FILE_RE.match(workplan.name) or WP_BARE_RE.match(workplan.name)
            if file_match:
                counts[file_match.group(1).upper()] += 1
    if not counts:
        return default_wp_prefix(repo_slug)
    top_prefix, top_count = counts.most_common(1)[0]
    if len(counts) > 1:
        print(
            f"warning: {repo_slug} has multiple workplan prefixes {dict(counts)}; "
            f"using {top_prefix} ({top_count} files)",
            file=sys.stderr,
        )
    return top_prefix


def brief_domain(path: Path) -> str | None:
    """Return the value of the ``**Domain:**`` line in ``.custodian-brief.md``.

    Returns ``None`` when the file is missing or has no such line.
    """
    brief = path / ".custodian-brief.md"
    if not brief.exists():
        return None
    match = re.search(
        r"^\*\*Domain:\*\*\s+(\S+)\s*$",
        brief.read_text(encoding="utf-8"),
        re.MULTILINE,
    )
    return match.group(1) if match else None


def dirty_repo_slugs(home: Path = HOME_ROOT) -> list[str]:
    """Return names of directories under *home* with uncommitted git changes.

    A directory counts when it contains a ``.git`` directory and
    ``git status --porcelain`` prints anything. A missing *home* yields an
    empty list.
    """
    if not home.is_dir():
        return []
    slugs: list[str] = []
    for path in sorted(home.iterdir()):
        if not (path / ".git").is_dir():
            continue
        result = subprocess.run(
            ["git", "-C", str(path), "status", "--porcelain"],
            capture_output=True,
            text=True,
            check=False,
        )
        if result.stdout.strip():
            slugs.append(path.name)
    return slugs


def choose_repos(repos: list[dict], only_slugs: set[str] | None = None) -> list[dict]:
    """Pick one repo record per existing local checkout under ``HOME_ROOT``.

    Records sharing a ``local_path`` are narrowed to those matching the
    domain in the checkout's brief (when any match), then the first record
    with status ``active`` is taken, falling back to the first candidate.
    When *only_slugs* is given, the result is filtered to those slugs.
    """
    home_prefix = f"{HOME_ROOT}/"
    by_path: dict[str, list[dict]] = {}
    for repo in repos:
        local_path = repo.get("local_path") or ""
        path = Path(local_path)
        if not local_path.startswith(home_prefix) or not path.exists():
            continue
        by_path.setdefault(str(path), []).append(repo)

    chosen: list[dict] = []
    for local_path, candidates in sorted(by_path.items()):
        domain = brief_domain(Path(local_path))
        if domain:
            domain_matches = [r for r in candidates if r.get("domain_slug") == domain]
            if domain_matches:
                candidates = domain_matches
        active = [r for r in candidates if r.get("status") == "active"]
        chosen.append(active[0] if active else candidates[0])

    if only_slugs is not None:
        chosen = [repo for repo in chosen if repo.get("slug") in only_slugs]
    return chosen


def update_repo(
    repo: dict,
    topics: list[dict],
    *,
    agents_template: str,
    claude_template: str,
    scope_template: str,
    credential_routing_template: str,
    rule_templates: dict[str, str],
) -> str:
    """Write the rendered instruction files into one repository checkout.

    Writes ``AGENTS.md`` (keeping any extensions read from the existing
    file), ``CLAUDE.md``, ``SCOPE.md`` (only if it does not exist yet) and
    ``.claude/rules/<name>.md`` for every entry of *rule_templates*.

    Returns:
        A tab-separated summary line: repo slug, local path, workplan prefix.
    """
    path = Path(repo["local_path"])
    repo_slug = repo["slug"]
    project_name = repo.get("name") or path.name
    description = repo.get("description") or f"{project_name} - (fill in purpose)"
    prefix = infer_wp_prefix(path, repo_slug)

    values = {
        "PROJECT_NAME": project_name,
        "PROJECT_DESCRIPTION": description,
        "DOMAIN": repo.get("domain_slug") or "",
        "TOPIC_ID": repo_topic_id(repo, topics),
        "REPO_SLUG": repo_slug,
        "WP_PREFIX": prefix,
    }
    # The credential-routing section is itself a template; render it with the
    # base values first so it can be dropped into the other templates as text.
    values["CREDENTIAL_ROUTING"] = render(credential_routing_template, values)

    agents_path = path / "AGENTS.md"
    extensions = read_agents_extensions(agents_path)
    agents_path.write_text(
        build_agents_md(agents_template, values, extensions), encoding="utf-8"
    )
    (path / "CLAUDE.md").write_text(render(claude_template, values), encoding="utf-8")

    scope_path = path / "SCOPE.md"
    if not scope_path.exists():
        scope_path.write_text(render(scope_template, values), encoding="utf-8")

    rules_dir = path / ".claude" / "rules"
    rules_dir.mkdir(parents=True, exist_ok=True)
    for name, template in rule_templates.items():
        (rules_dir / f"{name}.md").write_text(render(template, values), encoding="utf-8")

    return f"{repo_slug}\t{path}\t{prefix}"


def _load_rule_templates(credential_routing_template: str) -> dict[str, str]:
    """Return the template text for every name in ``RULE_NAMES``.

    The ``credential-routing`` rule reuses *credential_routing_template* with
    its ``##`` heading replaced by a top-level ``#`` heading; every other rule
    is read from ``TEMPLATE_DIR / "<name>.template"``.
    """
    rule_templates: dict[str, str] = {}
    for name in RULE_NAMES:
        if name == "credential-routing":
            rule_templates[name] = (
                "# Credential and access routing\n\n"
                + credential_routing_template.lstrip().removeprefix(
                    "## Credential and access routing\n\n"
                )
            )
        else:
            rule_templates[name] = (TEMPLATE_DIR / f"{name}.template").read_text(
                encoding="utf-8"
            )
    return rule_templates


def main() -> int:
    """Parse arguments, fetch API data and update every selected repository.

    ``--repo`` (repeatable) takes precedence over ``--dirty``. Returns 0 on
    success and 1 when the API cannot be reached.
    """
    parser = argparse.ArgumentParser(
        description="Regenerate agent instruction files from templates."
    )
    parser.add_argument("--repo", action="append", dest="repos", help="Limit to repo slug(s)")
    parser.add_argument(
        "--dirty", action="store_true", help="Limit to repos with local git changes"
    )
    args = parser.parse_args()

    only_slugs: set[str] | None = None
    if args.repos:
        only_slugs = set(args.repos)
    elif args.dirty:
        only_slugs = set(dirty_repo_slugs())

    try:
        repos = fetch("/repos/")
        topics = fetch("/topics/?status=active")
    except urllib.error.URLError as exc:
        print(f"error: cannot fetch data from {API_BASE}: {exc}", file=sys.stderr)
        return 1

    agents_template = (TEMPLATE_DIR / "agents-codex.template").read_text(encoding="utf-8")
    claude_template = (TEMPLATE_DIR / "claude-md.template").read_text(encoding="utf-8")
    scope_template = (TEMPLATE_DIR / "scope.template").read_text(encoding="utf-8")
    credential_routing_template = (TEMPLATE_DIR / "credential-routing.template").read_text(
        encoding="utf-8"
    )
    rule_templates = _load_rule_templates(credential_routing_template)

    updated = [
        update_repo(
            repo,
            topics,
            agents_template=agents_template,
            claude_template=claude_template,
            scope_template=scope_template,
            credential_routing_template=credential_routing_template,
            rule_templates=rule_templates,
        )
        for repo in choose_repos(repos, only_slugs)
    ]

    print(f"Updated {len(updated)} local repo(s):")
    for line in updated:
        print(line)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())