Files
state-hub/scripts/update_agent_instruction_files.py
tegwick fcb41e8c25 Add STATE-WP-0067 attached-repo agent and workplan normalization
Infer workplan prefixes from on-disk filenames instead of first-token
derivation, add a frontmatter normalization script, and wire Make targets
for dirty-repo sweeps.
2026-06-22 23:15:15 +02:00

253 lines
8.3 KiB
Python

from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
import urllib.request
from collections import Counter
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
TEMPLATE_DIR = ROOT / "scripts" / "project_rules"
API_BASE = "http://127.0.0.1:8000"
HOME_ROOT = Path("/home/worsch")
WP_FILE_RE = re.compile(r"^([A-Z][A-Z0-9-]*-WP)-\d+")
def fetch(path: str):
with urllib.request.urlopen(f"{API_BASE}{path}") as response:
return json.load(response)
EXTENSION_MARKER = "<!-- REPO-AGENTS-EXTENSIONS -->"
def render(template: str, values: dict[str, str]) -> str:
for key, value in values.items():
template = template.replace("{" + key + "}", value)
return template
def read_agents_extensions(agents_path: Path) -> str:
if not agents_path.exists():
return ""
text = agents_path.read_text(encoding="utf-8")
if EXTENSION_MARKER not in text:
return ""
return text.split(EXTENSION_MARKER, 1)[1]
def build_agents_md(template: str, values: dict[str, str], extensions: str) -> str:
body = render(template, values)
if extensions.strip():
if EXTENSION_MARKER in body:
body = body.split(EXTENSION_MARKER, 1)[0] + EXTENSION_MARKER + extensions
else:
body = body.rstrip() + "\n\n" + EXTENSION_MARKER + extensions
return body
def repo_topic_id(repo: dict, topics: list[dict]) -> str:
if repo.get("topic_id"):
return repo["topic_id"]
match = next((t for t in topics if t.get("domain_slug") == repo.get("domain_slug")), None)
return match["id"] if match else "(none)"
def default_wp_prefix(repo_slug: str) -> str:
first = repo_slug.split("-", 1)[0].upper()
return f"{first}-WP"
def infer_wp_prefix(repo_path: Path, repo_slug: str) -> str:
"""Prefer established on-disk workplan prefixes over first-token derivation."""
counts: Counter[str] = Counter()
workplans_dir = repo_path / "workplans"
if workplans_dir.is_dir():
for workplan in workplans_dir.glob("*.md"):
if workplan.name.startswith("ADHOC"):
continue
match = WP_FILE_RE.match(workplan.name)
if match:
counts[match.group(1)] += 1
if not counts:
return default_wp_prefix(repo_slug)
top_prefix, top_count = counts.most_common(1)[0]
if len(counts) > 1:
print(
f"warning: {repo_slug} has multiple workplan prefixes {dict(counts)}; "
f"using {top_prefix} ({top_count} files)",
file=sys.stderr,
)
return top_prefix
def brief_domain(path: Path) -> str | None:
brief = path / ".custodian-brief.md"
if not brief.exists():
return None
match = re.search(r"^\*\*Domain:\*\*\s+(\S+)\s*$", brief.read_text(encoding="utf-8"), re.MULTILINE)
return match.group(1) if match else None
def dirty_repo_slugs(home: Path = HOME_ROOT) -> list[str]:
slugs: list[str] = []
for path in sorted(home.iterdir()):
if not (path / ".git").is_dir():
continue
result = subprocess.run(
["git", "-C", str(path), "status", "--porcelain"],
capture_output=True,
text=True,
check=False,
)
if result.stdout.strip():
slugs.append(path.name)
return slugs
def choose_repos(repos: list[dict], only_slugs: set[str] | None = None) -> list[dict]:
by_path: dict[str, list[dict]] = {}
for repo in repos:
local_path = repo.get("local_path") or ""
path = Path(local_path)
if not local_path.startswith("/home/worsch/") or not path.exists():
continue
by_path.setdefault(str(path), []).append(repo)
chosen: list[dict] = []
for local_path, candidates in sorted(by_path.items()):
path = Path(local_path)
domain = brief_domain(path)
if domain:
domain_matches = [r for r in candidates if r.get("domain_slug") == domain]
if domain_matches:
candidates = domain_matches
active = [r for r in candidates if r.get("status") == "active"]
chosen.append(active[0] if active else candidates[0])
if only_slugs is not None:
chosen = [repo for repo in chosen if repo.get("slug") in only_slugs]
return chosen
def update_repo(
repo: dict,
topics: list[dict],
*,
agents_template: str,
claude_template: str,
scope_template: str,
credential_routing_template: str,
rule_templates: dict[str, str],
) -> str:
path = Path(repo["local_path"])
repo_slug = repo["slug"]
project_name = repo.get("name") or path.name
description = repo.get("description") or f"{project_name} - (fill in purpose)"
prefix = infer_wp_prefix(path, repo_slug)
values = {
"PROJECT_NAME": project_name,
"PROJECT_DESCRIPTION": description,
"DOMAIN": repo.get("domain_slug") or "",
"TOPIC_ID": repo_topic_id(repo, topics),
"REPO_SLUG": repo_slug,
"WP_PREFIX": prefix,
"CREDENTIAL_ROUTING": render(
credential_routing_template,
{
"PROJECT_NAME": project_name,
"PROJECT_DESCRIPTION": description,
"DOMAIN": repo.get("domain_slug") or "",
"TOPIC_ID": repo_topic_id(repo, topics),
"REPO_SLUG": repo_slug,
"WP_PREFIX": prefix,
},
),
}
agents_path = path / "AGENTS.md"
extensions = read_agents_extensions(agents_path)
agents_path.write_text(build_agents_md(agents_template, values, extensions), encoding="utf-8")
(path / "CLAUDE.md").write_text(render(claude_template, values), encoding="utf-8")
scope_path = path / "SCOPE.md"
if not scope_path.exists():
scope_path.write_text(render(scope_template, values), encoding="utf-8")
rules_dir = path / ".claude" / "rules"
rules_dir.mkdir(parents=True, exist_ok=True)
for name, template in rule_templates.items():
(rules_dir / f"{name}.md").write_text(render(template, values), encoding="utf-8")
return f"{repo_slug}\t{path}\t{prefix}"
def main() -> int:
parser = argparse.ArgumentParser(description="Regenerate agent instruction files from templates.")
parser.add_argument("--repo", action="append", dest="repos", help="Limit to repo slug(s)")
parser.add_argument("--dirty", action="store_true", help="Limit to repos with local git changes")
args = parser.parse_args()
only_slugs: set[str] | None = None
if args.repos:
only_slugs = set(args.repos)
elif args.dirty:
only_slugs = set(dirty_repo_slugs())
repos = fetch("/repos/")
topics = fetch("/topics/?status=active")
agents_template = (TEMPLATE_DIR / "agents-codex.template").read_text(encoding="utf-8")
claude_template = (TEMPLATE_DIR / "claude-md.template").read_text(encoding="utf-8")
scope_template = (TEMPLATE_DIR / "scope.template").read_text(encoding="utf-8")
credential_routing_template = (TEMPLATE_DIR / "credential-routing.template").read_text(
encoding="utf-8"
)
rule_names = [
"repo-identity",
"session-protocol",
"first-session",
"workplan-convention",
"stack-and-commands",
"architecture",
"repo-boundary",
"credential-routing",
"agents",
]
rule_templates: dict[str, str] = {}
for name in rule_names:
if name == "credential-routing":
rule_templates[name] = (
"# Credential and access routing\n\n"
+ credential_routing_template.lstrip().removeprefix(
"## Credential and access routing\n\n"
)
)
else:
rule_templates[name] = (TEMPLATE_DIR / f"{name}.template").read_text(encoding="utf-8")
updated: list[str] = []
for repo in choose_repos(repos, only_slugs):
updated.append(
update_repo(
repo,
topics,
agents_template=agents_template,
claude_template=claude_template,
scope_template=scope_template,
credential_routing_template=credential_routing_template,
rule_templates=rule_templates,
)
)
print(f"Updated {len(updated)} local repo(s):")
for line in updated:
print(line)
return 0
if __name__ == "__main__":
raise SystemExit(main())