#!/usr/bin/env python3 """ custodian — CLI for the Custodian State Hub. Usage: custodian register-project [--domain DOMAIN] [--path PATH] Run from inside the project directory you want to connect. --domain defaults to auto-detection from the project charter. --path defaults to current working directory. """ from __future__ import annotations import argparse import json import os import re import subprocess import sys import urllib.error import urllib.request from pathlib import Path from statehub_register import run_register as run_statehub_register STATE_HUB_DIR = Path(__file__).resolve().parent API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000") TEMPLATE = STATE_HUB_DIR / "scripts" / "project_claude_md.template" PATCH_CWD = STATE_HUB_DIR / "scripts" / "patch_mcp_cwd.py" _SUGGESTION_PREAMBLE = """\ """ _ONBOARDING_TASKS = [ ( "Integrate CLAUDE.custodian.md → CLAUDE.md", "high", "A CLAUDE.custodian.md suggestion file was written by the custodian registration workflow. " "Read both files, merge the hub integration block into the existing CLAUDE.md " "(preserve all project-specific conventions), then delete CLAUDE.custodian.md and commit.", ), ( "Write first workplan and initialise workplans/", "high", "Create a workplans/ directory and write the first workplan file following ADR-001 " "(~/the-custodian/canon/architecture/adr-001-workplans-as-repo-artefacts.md). " "Cover the repo's primary near-term work strand. Register the workstream in the state hub via MCP.", ), ( "Ingest SBOM", "medium", # path substituted at call time "", ), ( "Register known EPs and TDs", "low", "Catalogue any known extension points (future enhancement hooks) and technical debt items " "using the register_extension_point() and register_technical_debt() MCP tools.", ), ] # ── Helpers ──────────────────────────────────────────────────────────────────── def _api_get(path: str) -> object: url = API_BASE.rstrip("/") + path try: with urllib.request.urlopen(url, timeout=10) as r: return json.loads(r.read()) except urllib.error.URLError as e: print(f"ERROR: Cannot reach API at {API_BASE}: {e}") print(f" Start it: cd {STATE_HUB_DIR} && make api") sys.exit(1) def _api_post(path: str, body: dict) -> object: url = API_BASE.rstrip("/") + path data = json.dumps({k: v for k, v in body.items() if v is not None}).encode() req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}) with urllib.request.urlopen(req, timeout=10) as r: return json.loads(r.read()) def _api_patch(path: str, body: dict) -> object: url = API_BASE.rstrip("/") + path data = json.dumps({k: v for k, v in body.items() if v is not None}).encode() req = urllib.request.Request( url, data=data, headers={"Content-Type": "application/json"}, method="PATCH", ) with urllib.request.urlopen(req, timeout=10) as r: return json.loads(r.read()) def _find_repo_by_slug(repo_slug: str) -> dict | None: repos = _api_get("/repos/") return next((r for r in repos if r.get("slug") == repo_slug), None) def _detect_domain(project_path: Path) -> str | None: """Try to read domain from project charter frontmatter.""" for charter in project_path.rglob("project_charter_v*.md"): text = charter.read_text() m = re.search(r"^domain:\s*(\S+)", text, re.MULTILINE) if m: return m.group(1).strip('"\'') return None def _check_mcp() -> bool: claude_json = Path.home() / ".claude.json" if not claude_json.exists(): return False config = json.loads(claude_json.read_text()) return "state-hub" in config.get("mcpServers", {}) # ── Subcommands ──────────────────────────────────────────────────────────────── def cmd_register(args: argparse.Namespace) -> None: """Register a project/repo with the State Hub and generate onboarding tasks.""" project_path = Path(args.path).resolve() if not project_path.is_dir(): print(f"ERROR: {project_path} is not a directory.") sys.exit(1) project_name = project_path.name repo_slug = re.sub(r"-+", "-", re.sub(r"[^a-z0-9]", "-", project_name.lower())).strip("-") # ── Step 1: API health ───────────────────────────────────────────────────── print(f"==> Checking API at {API_BASE} ...") _api_get("/state/health") print(" API OK") # ── Step 2: Domain ───────────────────────────────────────────────────────── domain = args.domain valid_domains = [d["slug"] for d in _api_get("/domains/?status=active")] if not domain: print("==> Auto-detecting domain from project charter ...") domain = _detect_domain(project_path) if domain: print(f" Detected: {domain}") else: print("ERROR: Could not auto-detect domain. Pass --domain explicitly.") print(f" Valid: {', '.join(valid_domains)}") sys.exit(1) if domain not in valid_domains: print(f"ERROR: Unknown domain '{domain}'. Valid: {', '.join(valid_domains)}") sys.exit(1) # ── Step 3: Topic ID lookup (auto-create if new domain) ─────────────────── print(f"==> Looking up topic for domain '{domain}' ...") topics = _api_get("/topics/?status=active") match = next((t for t in topics if t.get("domain_slug") == domain), None) if not match: print(f" No topic found — creating one for domain '{domain}' ...") t_slug = re.sub(r"[^a-z0-9]+", "-", domain.lower()).strip("-") try: match = _api_post("/topics/", { "slug": t_slug, "title": project_name, "domain": domain, "status": "active", }) print(f" Topic created: {match['title']} ({match['id']})") except Exception as e: print(f"ERROR: Could not create topic for domain '{domain}': {e}") sys.exit(1) topic_id = match["id"] print(f" topic_id: {topic_id}") # ── Step 4: MCP check ────────────────────────────────────────────────────── print("==> Checking MCP server registration ...") if _check_mcp(): print(" MCP OK") else: print("WARNING: 'state-hub' not in ~/.claude.json.") print(" See ~/.claude/CLAUDE.md → MCP Server Registration section.") # ── Step 5: Write CLAUDE.custodian.md ───────────────────────────────────── suggestion_file = project_path / "CLAUDE.custodian.md" print(f"==> Writing custodian suggestion to {suggestion_file} ...") content = ( _SUGGESTION_PREAMBLE + TEMPLATE.read_text() .replace("{PROJECT_NAME}", project_name) .replace("{DOMAIN}", domain) .replace("{TOPIC_ID}", topic_id) .replace("{REPO_SLUG}", repo_slug) ) suggestion_file.write_text(content) print(" Written. The repo agent integrates it into CLAUDE.md then deletes it.") # ── Step 6: Register repo ───────────────────────────────────────────────── print(f"==> Registering repo '{repo_slug}' under domain '{domain}' ...") repo = None try: repo = _api_post("/repos/", { "domain_slug": domain, "slug": repo_slug, "name": project_name, "local_path": str(project_path), }) print(" Registered.") except urllib.error.HTTPError as e: if e.code != 409: print(f" NOTE: {e} — repo registration failed, continuing.") else: print(" Repo already registered, reusing existing record.") repo = _find_repo_by_slug(repo_slug) except Exception as e: print(f" NOTE: {e} — repo may already be registered, continuing.") repo = _find_repo_by_slug(repo_slug) repo_id = repo.get("id") if isinstance(repo, dict) else None if repo_id: print(f" repo_id: {repo_id}") else: print(" WARNING: Could not resolve repo_id; onboarding workstream will remain domain-level.") # ── Step 7: Onboarding workstream + tasks ───────────────────────────────── ws_slug = f"repo-integration-{repo_slug}" print(f"==> Creating onboarding workstream '{ws_slug}' ...") # Check if it already exists existing_ws = next( (w for w in _api_get("/workstreams/") if w.get("slug") == ws_slug and w.get("status") == "active"), None, ) if existing_ws: print(" Onboarding workstream already exists — skipping task creation.") if repo_id and not existing_ws.get("repo_id"): existing_owner = existing_ws.get("owner") _api_patch(f"/workstreams/{existing_ws['id']}/", { "repo_id": repo_id, "owner": repo_slug if existing_owner in (None, domain) else existing_owner, }) print(" Attached existing onboarding workstream to repo.") elif repo_id and existing_ws.get("repo_id") != repo_id: print( " WARNING: Existing onboarding workstream is attached to a different repo_id; " "leaving it unchanged." ) else: try: ws = _api_post("/workstreams/", { "topic_id": topic_id, "title": f"Repo Integration: {repo_slug}", "slug": ws_slug, "description": ( f"Bootstrapping workstream created by the custodian during registration of " f"'{repo_slug}'. Contains onboarding tasks for the repo agent to execute. " f"ADR-001 exception: this workstream is DB-first because the repo has no " f"workplans/ directory yet. Task T2 produces the first workplan file." ), "owner": repo_slug, "status": "active", "repo_id": repo_id, }) ws_id = ws["id"] sbom_desc = ( f"Capture the repo's dependency snapshot. From state-hub dir: " f"make ingest-sbom REPO={repo_slug} SCAN=1 REPO_PATH={project_path}" ) tasks = [ (_ONBOARDING_TASKS[0][0], _ONBOARDING_TASKS[0][1], _ONBOARDING_TASKS[0][2]), (_ONBOARDING_TASKS[1][0], _ONBOARDING_TASKS[1][1], _ONBOARDING_TASKS[1][2]), (_ONBOARDING_TASKS[2][0], _ONBOARDING_TASKS[2][1], sbom_desc), (_ONBOARDING_TASKS[3][0], _ONBOARDING_TASKS[3][1], _ONBOARDING_TASKS[3][2]), ] for title, priority, description in tasks: _api_post("/tasks/", { "workstream_id": ws_id, "title": title, "priority": priority, "description": description, }) print(f" Created with {len(tasks)} onboarding tasks.") print(f" The {domain} repo agent will see these at next session start.") except Exception as e: print(f" WARNING: Could not create onboarding tasks: {e}") ws_id = None # ── Step 8: Progress event ───────────────────────────────────────────────── print("==> Recording registration event ...") try: _api_post("/progress/", { "topic_id": topic_id, "event_type": "milestone", "summary": f"Repo registered: {project_name} ({domain}) — onboarding tasks created", "author": "custodian", "detail": { "project_path": str(project_path), "suggestion_file": str(suggestion_file), "repo_slug": repo_slug, "domain": domain, "onboarding_workstream_slug": ws_slug, }, }) print(" Event recorded.") except Exception as e: print(f" WARNING: Could not record progress event: {e}") print() print("Registration complete!") print(f" Project: {project_name}") print(f" Domain: {domain}") print(f" Repo slug: {repo_slug}") print(f" Topic ID: {topic_id}") print(f" Suggestion: {suggestion_file}") print() print("Next: open the repo in Claude Code.") print(" The repo agent will pick up 4 onboarding tasks and integrate autonomously.") def cmd_ingest_sbom(args: argparse.Namespace) -> None: """Ingest SBOM for the current (or specified) repo. Auto-detects slug from registration.""" project_path = Path(args.path).resolve() _api_get("/state/health") # Resolve repo slug: explicit override, or look up by local_path repo_slug = args.slug if not repo_slug: repos = _api_get("/repos/") repo = next((r for r in repos if r.get("local_path") == str(project_path)), None) if not repo: print(f"ERROR: No registered repo found for path '{project_path}'.") print(" Register first: custodian register-project --domain ") print(" Or pass --slug explicitly.") sys.exit(1) repo_slug = repo["slug"] print(f"==> Ingesting SBOM for '{repo_slug}' from {project_path} ...") python = STATE_HUB_DIR / ".venv" / "bin" / "python" ingest_script = STATE_HUB_DIR / "scripts" / "ingest_sbom.py" if not python.exists(): print(f"ERROR: .venv not found at {STATE_HUB_DIR}. Run 'make install' in the state-hub directory.") sys.exit(1) cmd = [str(python), str(ingest_script), "--repo", repo_slug, "--scan", "--repo-path", str(project_path)] if args.dry_run: cmd.append("--dry-run") result = subprocess.run(cmd) sys.exit(result.returncode) def cmd_create_workstream(args: argparse.Namespace) -> None: """Create a workstream under a domain's topic.""" _api_get("/state/health") # Resolve topic_id from domain topics = _api_get("/topics/?status=active") match = next((t for t in topics if t.get("domain_slug") == args.domain), None) if not match: print(f"ERROR: No active topic for domain '{args.domain}'.") sys.exit(1) topic_id = match["id"] slug = args.slug or re.sub(r"[^a-z0-9]+", "-", args.title.lower()).strip("-") ws = _api_post("/workstreams/", { "topic_id": topic_id, "title": args.title, "slug": slug, "description": args.description, "owner": args.owner, "status": "active", }) _api_post("/progress/", { "topic_id": topic_id, "workstream_id": ws["id"], "event_type": "workstream_created", "summary": f"Workstream created: {args.title}", "author": "custodian", "detail": {"owner": args.owner, "slug": slug}, }) print(f"Created workstream: {ws['title']}") print(f" id: {ws['id']}") print(f" slug: {ws['slug']}") print(f" domain: {args.domain}") print(f" owner: {ws.get('owner') or '—'}") def cmd_create_task(args: argparse.Namespace) -> None: """Create a task under a workstream (by ID or slug).""" _api_get("/state/health") # Resolve workstream: accept UUID or slug workstream_id = args.workstream if not _is_uuid(workstream_id): wss = _api_get("/workstreams/") match = next((w for w in wss if w.get("slug") == workstream_id), None) if not match: print(f"ERROR: No workstream found with slug '{workstream_id}'.") print(" Use 'custodian status' or check the dashboard for valid slugs.") sys.exit(1) workstream_id = match["id"] task = _api_post("/tasks/", { "workstream_id": workstream_id, "title": args.title, "priority": args.priority, "description": args.description, "assignee": args.assignee, }) _api_post("/progress/", { "workstream_id": workstream_id, "task_id": task["id"], "event_type": "task_created", "summary": f"Task created: {args.title}", "author": "custodian", "detail": {"priority": args.priority}, }) print(f"Created task: {task['title']}") print(f" id: {task['id']}") print(f" priority: {task['priority']}") print(f" status: {task['status']}") def _is_uuid(s: str) -> bool: import uuid as _uuid try: _uuid.UUID(s) return True except ValueError: return False def cmd_status(_args: argparse.Namespace) -> None: """Quick status: API health + summary totals.""" health = _api_get("/state/health") print(f"API: {health.get('status', '?')} DB: {health.get('db', '?')}") summary = _api_get("/state/summary") t = summary["totals"] print(f"Topics: {t['topics']['active']} active") print(f"Workstreams: {t['workstreams']['active']} active, {t['workstreams']['blocked']} blocked") print(f"Tasks: {t['tasks']['in_progress']} in-progress, {t['tasks']['todo']} todo, {t['tasks']['blocked']} blocked") print(f"Decisions: {t['decisions']['open']} open, {t['decisions']['escalated']} escalated") blocking = summary.get("blocking_decisions", []) if blocking: print(f"\nBlocking decisions ({len(blocking)}):") for d in blocking: deadline = d.get("deadline") or "no deadline" print(f" [{deadline}] {d['title']}") # ── Entry point ──────────────────────────────────────────────────────────────── def main() -> None: parser = argparse.ArgumentParser( prog=Path(sys.argv[0]).name, description="Custodian State Hub CLI", ) sub = parser.add_subparsers(dest="command", required=True) # register statehub_reg = sub.add_parser( "register", help="Register the current repo with State Hub and prime it for Codex", ) statehub_reg.add_argument("--path", default=os.getcwd(), help="Repo directory (defaults to cwd)") statehub_reg.add_argument("--domain", default=None, help="State Hub domain slug") statehub_reg.add_argument("--repo-slug", default=None, help="Repo slug (auto-detected if omitted)") statehub_reg.add_argument("--wp-prefix", default=None, help="Workplan prefix, e.g. STATE-WP") statehub_reg.add_argument("--description", default=None, help="One-sentence repo description") statehub_reg.add_argument( "--intent", default=None, help="Repo intent text to use when INTENT.md is absent and inference is insufficient", ) statehub_reg.add_argument("--api-base", default=API_BASE, help="State Hub API base URL") statehub_reg.add_argument( "--llm-provider", default=os.environ.get("STATEHUB_REGISTER_LLM_PROVIDER", "claude-code"), help="llm-connect provider: claude-code, openrouter, openai, gemini, or mock", ) statehub_reg.add_argument( "--llm-model", default=os.environ.get("STATEHUB_REGISTER_LLM_MODEL"), help="Model name passed to llm-connect", ) statehub_reg.add_argument( "--llm-api-key", default=os.environ.get("STATEHUB_REGISTER_LLM_API_KEY"), help="API key for API-backed llm-connect providers", ) statehub_reg.add_argument( "--llm-timeout", type=int, default=int(os.environ.get("STATEHUB_REGISTER_LLM_TIMEOUT", "120")), help="LLM timeout in seconds", ) statehub_reg.add_argument("--no-llm", action="store_true", help="Skip LLM inference and use files/prompts") statehub_reg.add_argument("--force", action="store_true", help="Overwrite generated repo files") # register-project reg = sub.add_parser("register-project", help="Register a project with the State Hub") reg.add_argument( "--domain", default=None, help="Project domain slug (auto-detected from charter if omitted)", ) reg.add_argument( "--path", default=os.getcwd(), help="Project directory (defaults to current directory)", ) # ingest-sbom ing = sub.add_parser("ingest-sbom", help="Ingest SBOM for the repo at the current directory") ing.add_argument("--path", default=os.getcwd(), help="Repo directory (defaults to cwd)") ing.add_argument("--slug", default=None, help="Repo slug (auto-detected from path if omitted)") ing.add_argument("--dry-run", action="store_true", help="Parse lockfiles but do not submit to API") # create-workstream cws = sub.add_parser("create-workstream", help="Create a workstream under a domain topic") cws.add_argument("--domain", required=True, help="Domain slug to create the workstream under") cws.add_argument("--title", required=True, help="Workstream title") cws.add_argument("--slug", default=None, help="URL slug (auto-generated from title if omitted)") cws.add_argument("--owner", default=None, help="Owner name") cws.add_argument("--description", default=None, help="Optional description") # create-task ctask = sub.add_parser("create-task", help="Create a task under a workstream") ctask.add_argument("--workstream", required=True, metavar="ID_OR_SLUG", help="Workstream UUID or slug") ctask.add_argument("--title", required=True, help="Task title") ctask.add_argument("--priority", choices=["low", "medium", "high", "critical"], default="medium") ctask.add_argument("--assignee", default=None) ctask.add_argument("--description", default=None) # status sub.add_parser("status", help="Show State Hub health and summary totals") args = parser.parse_args() if args.command == "register": run_statehub_register(args) elif args.command == "register-project": cmd_register(args) elif args.command == "ingest-sbom": cmd_ingest_sbom(args) elif args.command == "create-workstream": cmd_create_workstream(args) elif args.command == "create-task": cmd_create_task(args) elif args.command == "status": cmd_status(args) if __name__ == "__main__": main()