#!/usr/bin/env python3
"""
custodian — CLI for the Custodian State Hub.

Usage:
    custodian register-project [--domain DOMAIN] [--path PATH]
    custodian create-workstream --domain DOMAIN --title TITLE
                                [--slug SLUG] [--owner OWNER] [--description TEXT]
    custodian create-task --workstream ID_OR_SLUG --title TITLE
                          [--priority LEVEL] [--assignee NAME] [--description TEXT]
    custodian status

For register-project, run from inside the project directory you want to connect.
--domain defaults to auto-detection from the project charter.
--path defaults to current working directory.
"""

from __future__ import annotations

import argparse
import json
import os
import re
import subprocess
import sys
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any

STATE_HUB_DIR = Path(__file__).resolve().parent
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000")
TEMPLATE = STATE_HUB_DIR / "scripts" / "project_claude_md.template"
PATCH_CWD = STATE_HUB_DIR / "scripts" / "patch_mcp_cwd.py"

# Matches a ``domain: value`` line. ``[ \t]*`` (not ``\s*``) keeps the match on
# one line, so an empty ``domain:`` entry cannot capture the next line's key.
_DOMAIN_RE = re.compile(r"^domain:[ \t]*(\S+)", re.MULTILINE)


# ── Helpers ────────────────────────────────────────────────────────────────────


def _api_get(path: str) -> Any:
    """GET ``API_BASE + path`` and return the decoded JSON body.

    Any transport or HTTP failure prints an error and exits with status 1,
    so callers can treat a normal return as "the API answered".
    """
    url = API_BASE.rstrip("/") + path
    try:
        with urllib.request.urlopen(url, timeout=10) as r:
            return json.loads(r.read())
    except urllib.error.HTTPError as e:
        # The server did answer, so "start the API" would be the wrong advice.
        print(f"ERROR: API returned HTTP {e.code} for {url}: {e.reason}")
        sys.exit(1)
    except OSError as e:
        # URLError is an OSError subclass; catching OSError also covers socket
        # timeouts and connection resets raised while reading the response.
        print(f"ERROR: Cannot reach API at {API_BASE}: {e}")
        print(f" Start it: cd {STATE_HUB_DIR} && make api")
        sys.exit(1)


def _api_post(path: str, body: dict) -> Any:
    """POST *body* as JSON to ``API_BASE + path`` and return the decoded reply.

    Keys whose value is ``None`` are omitted from the payload. Unlike
    ``_api_get`` this does not exit on failure: exceptions from ``urllib``
    and ``json`` propagate so each caller can decide how to react.
    """
    url = API_BASE.rstrip("/") + path
    payload = {k: v for k, v in body.items() if v is not None}
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=10) as r:
        return json.loads(r.read())


def _slugify(text: str) -> str:
    """Lower-case *text* and collapse every non ``[a-z0-9]`` run into one ``-``."""
    return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")


def _find_active_topic(domain: str) -> dict | None:
    """Return the first active topic whose ``domain_slug`` equals *domain*."""
    topics = _api_get("/topics/?status=active")
    return next((t for t in topics if t.get("domain_slug") == domain), None)


def _detect_domain(project_path: Path) -> str | None:
    """Try to read the domain from a project charter under *project_path*.

    Searches recursively for ``project_charter_v*.md`` files, in sorted path
    order so the result does not depend on filesystem enumeration order, and
    returns the value of the first ``domain:`` line found (surrounding quotes
    stripped). Unreadable or non-UTF-8 files are skipped. Returns ``None``
    when no charter declares a domain.
    """
    for charter in sorted(project_path.rglob("project_charter_v*.md")):
        try:
            text = charter.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            continue
        m = _DOMAIN_RE.search(text)
        if not m:
            continue
        domain = m.group(1).strip("\"'")
        if domain:
            return domain
    return None


def _check_mcp(claude_json: Path | None = None) -> bool:
    """Return True when the Claude config registers a ``state-hub`` MCP server.

    *claude_json* defaults to ``~/.claude.json``. A missing, unreadable or
    malformed config counts as "not registered" rather than an error, because
    the result only drives a warning.
    """
    if claude_json is None:
        claude_json = Path.home() / ".claude.json"
    try:
        config = json.loads(claude_json.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return False
    if not isinstance(config, dict):
        return False
    servers = config.get("mcpServers")
    return isinstance(servers, dict) and "state-hub" in servers


def _is_uuid(s: str) -> bool:
    """Return True when *s* is accepted by ``uuid.UUID`` as a UUID string."""
    import uuid as _uuid

    try:
        _uuid.UUID(s)
    except ValueError:
        return False
    return True


# ── Subcommands ────────────────────────────────────────────────────────────────


def cmd_register(args: argparse.Namespace) -> None:
    """Register the project at ``args.path`` with the State Hub.

    Checks the API, resolves the domain (``args.domain`` or charter
    auto-detection), finds or creates the domain's topic, checks the MCP
    registration, writes ``CLAUDE.md`` from the template if the project has
    none, and records a milestone progress event. Exits with status 1 on any
    blocking error; a failed progress event is only a warning.
    """
    project_path = Path(args.path).resolve()
    if not project_path.is_dir():
        print(f"ERROR: {project_path} is not a directory.")
        sys.exit(1)

    project_name = project_path.name

    # ── Step 1: API health ─────────────────────────────────────────────────────
    print(f"==> Checking API at {API_BASE} ...")
    _api_get("/state/health")
    print(" API OK")

    # ── Step 2: Domain ─────────────────────────────────────────────────────────
    domain = args.domain
    valid_domains = [d["slug"] for d in _api_get("/domains/?status=active")]

    if not domain:
        print("==> Auto-detecting domain from project charter ...")
        domain = _detect_domain(project_path)
        if domain:
            print(f" Detected: {domain}")
        else:
            print("ERROR: Could not auto-detect domain. Pass --domain explicitly.")
            print(f" Valid: {', '.join(valid_domains)}")
            sys.exit(1)

    if domain not in valid_domains:
        print(f"ERROR: Unknown domain '{domain}'. Valid: {', '.join(valid_domains)}")
        sys.exit(1)

    # ── Step 3: Topic ID lookup (auto-create if new domain) ───────────────────
    print(f"==> Looking up topic for domain '{domain}' ...")
    match = _find_active_topic(domain)
    if not match:
        print(f" No topic found — creating one for domain '{domain}' ...")
        try:
            match = _api_post("/topics/", {
                "slug": _slugify(domain),
                "title": project_name,
                "domain": domain,
                "status": "active",
            })
            print(f" Topic created: {match['title']} ({match['id']})")
        except Exception as e:  # CLI boundary: report any failure and stop.
            print(f"ERROR: Could not create topic for domain '{domain}': {e}")
            sys.exit(1)

    topic_id = match["id"]
    print(f" topic_id: {topic_id}")

    # ── Step 4: MCP check ──────────────────────────────────────────────────────
    print("==> Checking MCP server registration ...")
    if _check_mcp():
        print(" MCP OK")
    else:
        print("WARNING: 'state-hub' not in ~/.claude.json.")
        print(" See ~/.claude/CLAUDE.md → MCP Server Registration section.")

    # ── Step 5: CLAUDE.md ──────────────────────────────────────────────────────
    claude_md = project_path / "CLAUDE.md"
    if claude_md.exists():
        print(f"==> CLAUDE.md already exists at {claude_md} — skipping.")
    else:
        print(f"==> Writing CLAUDE.md to {claude_md} ...")
        try:
            content = TEMPLATE.read_text(encoding="utf-8")
        except OSError as e:
            print(f"ERROR: Cannot read template {TEMPLATE}: {e}")
            sys.exit(1)
        substitutions = {
            "{PROJECT_NAME}": project_name,
            "{DOMAIN}": domain,
            # str() guards against an API that returns a non-string id.
            "{TOPIC_ID}": str(topic_id),
        }
        for placeholder, value in substitutions.items():
            content = content.replace(placeholder, value)
        claude_md.write_text(content, encoding="utf-8")
        print(" Written.")

    # ── Step 6: Progress event ─────────────────────────────────────────────────
    print("==> Recording registration event ...")
    try:
        _api_post("/progress/", {
            "topic_id": topic_id,
            "event_type": "milestone",
            "summary": f"Project registered with State Hub: {project_name} ({domain})",
            "author": "custodian",
            "detail": {
                "project_path": str(project_path),
                "claude_md": str(claude_md),
                "domain": domain,
            },
        })
        print(" Event recorded.")
    except Exception as e:  # Best-effort: registration itself already succeeded.
        print(f" WARNING: Could not record progress event: {e}")

    print()
    print("Registration complete!")
    print(f" Project: {project_name}")
    print(f" Domain: {domain}")
    print(f" Topic ID: {topic_id}")
    print(f" CLAUDE.md: {claude_md}")
    print()
    print("Next: restart Claude Code for the MCP server to be active in this project.")


def cmd_create_workstream(args: argparse.Namespace) -> None:
    """Create a workstream under a domain's topic.

    The topic is the first active topic whose ``domain_slug`` matches
    ``args.domain``; the slug falls back to a slugified ``args.title``.
    Exits with status 1 when no topic exists or the workstream cannot be
    created. Recording the progress event is best-effort.
    """
    _api_get("/state/health")

    # Resolve topic_id from domain
    match = _find_active_topic(args.domain)
    if not match:
        print(f"ERROR: No active topic for domain '{args.domain}'.")
        sys.exit(1)

    topic_id = match["id"]
    slug = args.slug or _slugify(args.title)

    try:
        ws = _api_post("/workstreams/", {
            "topic_id": topic_id,
            "title": args.title,
            "slug": slug,
            "description": args.description,
            "owner": args.owner,
            "status": "active",
        })
    except Exception as e:  # CLI boundary: report instead of a traceback.
        print(f"ERROR: Could not create workstream '{args.title}': {e}")
        sys.exit(1)

    try:
        _api_post("/progress/", {
            "topic_id": topic_id,
            "workstream_id": ws["id"],
            "event_type": "workstream_created",
            "summary": f"Workstream created: {args.title}",
            "author": "custodian",
            "detail": {"owner": args.owner, "slug": slug},
        })
    except Exception as e:  # Best-effort: the workstream already exists.
        print(f" WARNING: Could not record progress event: {e}")

    print(f"Created workstream: {ws['title']}")
    print(f" id: {ws['id']}")
    print(f" slug: {ws['slug']}")
    print(f" domain: {args.domain}")
    print(f" owner: {ws.get('owner') or '—'}")


def cmd_create_task(args: argparse.Namespace) -> None:
    """Create a task under a workstream (by ID or slug).

    ``args.workstream`` is used as-is when it parses as a UUID; otherwise it
    is looked up as a slug among all workstreams. Exits with status 1 when
    the slug is unknown or the task cannot be created. Recording the progress
    event is best-effort.
    """
    _api_get("/state/health")

    # Resolve workstream: accept UUID or slug
    workstream_id = args.workstream
    if not _is_uuid(workstream_id):
        wss = _api_get("/workstreams/")
        match = next((w for w in wss if w.get("slug") == workstream_id), None)
        if not match:
            print(f"ERROR: No workstream found with slug '{workstream_id}'.")
            print(" Use 'custodian status' or check the dashboard for valid slugs.")
            sys.exit(1)
        workstream_id = match["id"]

    try:
        task = _api_post("/tasks/", {
            "workstream_id": workstream_id,
            "title": args.title,
            "priority": args.priority,
            "description": args.description,
            "assignee": args.assignee,
        })
    except Exception as e:  # CLI boundary: report instead of a traceback.
        print(f"ERROR: Could not create task '{args.title}': {e}")
        sys.exit(1)

    try:
        _api_post("/progress/", {
            "workstream_id": workstream_id,
            "task_id": task["id"],
            "event_type": "task_created",
            "summary": f"Task created: {args.title}",
            "author": "custodian",
            "detail": {"priority": args.priority},
        })
    except Exception as e:  # Best-effort: the task already exists.
        print(f" WARNING: Could not record progress event: {e}")

    print(f"Created task: {task['title']}")
    print(f" id: {task['id']}")
    print(f" priority: {task['priority']}")
    print(f" status: {task['status']}")


def cmd_status(_args: argparse.Namespace) -> None:
    """Quick status: API health + summary totals, then any blocking decisions."""
    health = _api_get("/state/health")
    print(f"API: {health.get('status', '?')} DB: {health.get('db', '?')}")

    summary = _api_get("/state/summary")
    t = summary["totals"]
    print(f"Topics: {t['topics']['active']} active")
    print(f"Workstreams: {t['workstreams']['active']} active, {t['workstreams']['blocked']} blocked")
    print(f"Tasks: {t['tasks']['in_progress']} in-progress, {t['tasks']['todo']} todo, {t['tasks']['blocked']} blocked")
    print(f"Decisions: {t['decisions']['open']} open, {t['decisions']['escalated']} escalated")

    blocking = summary.get("blocking_decisions", [])
    if blocking:
        print(f"\nBlocking decisions ({len(blocking)}):")
        for d in blocking:
            deadline = d.get("deadline") or "no deadline"
            print(f" [{deadline}] {d['title']}")


# ── Entry point ────────────────────────────────────────────────────────────────

# Maps each subcommand name to the function that implements it.
_COMMANDS = {
    "register-project": cmd_register,
    "create-workstream": cmd_create_workstream,
    "create-task": cmd_create_task,
    "status": cmd_status,
}


def _build_parser() -> argparse.ArgumentParser:
    """Build the ``custodian`` argument parser with all four subcommands."""
    parser = argparse.ArgumentParser(
        prog="custodian",
        description="Custodian State Hub CLI",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    # register-project
    reg = sub.add_parser("register-project", help="Register a project with the State Hub")
    reg.add_argument(
        "--domain",
        default=None,
        help="Project domain slug (auto-detected from charter if omitted)",
    )
    reg.add_argument(
        "--path",
        default=os.getcwd(),
        help="Project directory (defaults to current directory)",
    )

    # create-workstream
    cws = sub.add_parser("create-workstream", help="Create a workstream under a domain topic")
    cws.add_argument("--domain", required=True, help="Domain slug to create the workstream under")
    cws.add_argument("--title", required=True, help="Workstream title")
    cws.add_argument("--slug", default=None, help="URL slug (auto-generated from title if omitted)")
    cws.add_argument("--owner", default=None, help="Owner name")
    cws.add_argument("--description", default=None, help="Optional description")

    # create-task
    ctask = sub.add_parser("create-task", help="Create a task under a workstream")
    ctask.add_argument("--workstream", required=True, metavar="ID_OR_SLUG",
                       help="Workstream UUID or slug")
    ctask.add_argument("--title", required=True, help="Task title")
    ctask.add_argument("--priority", choices=["low", "medium", "high", "critical"],
                       default="medium")
    ctask.add_argument("--assignee", default=None)
    ctask.add_argument("--description", default=None)

    # status
    sub.add_parser("status", help="Show State Hub health and summary totals")

    return parser


def main() -> None:
    """Parse the command line and run the selected subcommand."""
    args = _build_parser().parse_args()
    _COMMANDS[args.command](args)


if __name__ == "__main__":
    main()