Files
state-hub/custodian_cli.py
tegwick ebe7369249 Add create-workstream: MCP tool, CLI commands, dashboard hint
MCP server: add create_workstream(topic_id, title, slug?, owner?,
  description?, due_date?) — auto-generates slug from title if omitted;
  emits workstream_created progress event. Now 12 tools total.

CLI: add two new subcommands —
  custodian create-workstream --domain DOMAIN --title TITLE [--slug] [--owner] [--description]
  custodian create-task --workstream ID_OR_SLUG --title TITLE [--priority] [--assignee]
  create-task accepts workstream UUID or slug (resolves via API).

Dashboard: hint box below "Open Workstreams by Domain" chart listing
  registered domains that have zero workstreams, with the exact
  custodian create-workstream command to run.

TOOLS.md: updated tool count (11 → 12) and added create_workstream row.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 23:35:54 +01:00

320 lines
12 KiB
Python

#!/usr/bin/env python3
"""
custodian — CLI for the Custodian State Hub.

Usage:
    custodian register-project [--domain DOMAIN] [--path PATH]
    custodian create-workstream --domain DOMAIN --title TITLE
                                [--slug SLUG] [--owner OWNER] [--description TEXT]
    custodian create-task --workstream ID_OR_SLUG --title TITLE
                          [--priority {low,medium,high,critical}]
                          [--assignee NAME] [--description TEXT]
    custodian status

register-project:
    Run from inside the project directory you want to connect.
    --domain defaults to auto-detection from the project charter.
    --path defaults to current working directory.

The API base URL is read from the API_BASE environment variable and
defaults to http://127.0.0.1:8000.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import sys
import urllib.error
import urllib.request
from pathlib import Path
# Directory that contains this script; bundled helper files are resolved from it.
STATE_HUB_DIR = Path(__file__).resolve().parent

# Base URL of the State Hub API; the API_BASE environment variable overrides it.
API_BASE = os.getenv("API_BASE", "http://127.0.0.1:8000")

# Template rendered into a project's CLAUDE.md by register-project.
TEMPLATE = STATE_HUB_DIR.joinpath("scripts", "project_claude_md.template")

# Path of the patch_mcp_cwd.py helper script shipped next to the template.
PATCH_CWD = STATE_HUB_DIR.joinpath("scripts", "patch_mcp_cwd.py")

# Domains accepted by the --domain options and by charter auto-detection.
VALID_DOMAINS = [
    "custodian",
    "railiance",
    "markitect",
    "coulomb_social",
    "personhood",
    "foerster_capabilities",
]
# ── Helpers ────────────────────────────────────────────────────────────────────
def _api_get(path: str) -> object:
    """GET ``path`` from the State Hub API and return the decoded JSON body.

    Args:
        path: Request path appended to ``API_BASE`` (for example
            ``"/state/health"``).

    Returns:
        Whatever ``json.loads`` produces for the response body.

    On an HTTP error status, an unreachable server, or a body that is not
    valid JSON, an ``ERROR:`` message is printed and the process exits with
    status 1.
    """
    url = API_BASE.rstrip("/") + path
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # HTTPError subclasses URLError, so it has to be caught first: the
        # server was reached and answered, just with an error status. Telling
        # the user to start the API would be misleading here.
        print(f"ERROR: API at {API_BASE} returned HTTP {e.code} for {path}: {e.reason}")
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"ERROR: API at {API_BASE} returned invalid JSON for {path}: {e}")
    except OSError as e:
        # Covers URLError as well as timeouts / connection resets that occur
        # while the body is being read.
        print(f"ERROR: Cannot reach API at {API_BASE}: {e}")
        print(f" Start it: cd {STATE_HUB_DIR} && make api")
    sys.exit(1)
def _api_post(path: str, body: dict) -> object:
    """POST ``body`` as JSON to ``path`` on the State Hub API.

    Entries of ``body`` whose value is ``None`` are left out of the payload.
    The decoded JSON response is returned. No errors are handled here:
    whatever ``urlopen`` or ``json.loads`` raises propagates to the caller.
    """
    payload = {}
    for key, value in body.items():
        if value is not None:
            payload[key] = value

    request = urllib.request.Request(
        API_BASE.rstrip("/") + path,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        raw = response.read()
    return json.loads(raw)
def _detect_domain(project_path: Path) -> str | None:
"""Try to read domain from project charter frontmatter."""
for charter in project_path.rglob("project_charter_v*.md"):
text = charter.read_text()
m = re.search(r"^domain:\s*(\S+)", text, re.MULTILINE)
if m:
return m.group(1).strip('"\'')
return None
def _check_mcp() -> bool:
    """Report whether ``~/.claude.json`` registers an MCP server ``state-hub``.

    Returns:
        ``True`` only when the file exists, parses as a JSON object, and its
        ``mcpServers`` object has a ``"state-hub"`` key. A missing,
        unreadable, or malformed file yields ``False`` instead of raising,
        because callers only use the result to decide whether to warn.
    """
    claude_json = Path.home() / ".claude.json"
    try:
        config = json.loads(claude_json.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: file missing or unreadable. ValueError: invalid JSON or
        # bytes that are not valid UTF-8.
        return False
    if not isinstance(config, dict):
        return False
    servers = config.get("mcpServers")
    # Guard against "mcpServers": null (or any non-object value).
    return isinstance(servers, dict) and "state-hub" in servers
# ── Subcommands ────────────────────────────────────────────────────────────────
def cmd_register(args: argparse.Namespace) -> None:
    """Register the project directory ``args.path`` with the State Hub.

    In order: confirm the API responds, settle the domain (``args.domain`` or
    charter auto-detection), look up the active topic for that domain, check
    the MCP server registration, render ``CLAUDE.md`` from the template
    unless the file already exists, and record a milestone progress event.

    A validation failure prints an ``ERROR:`` line and exits with status 1.
    A failure to record the progress event only prints a warning.
    """
    root = Path(args.path).resolve()
    if not root.is_dir():
        print(f"ERROR: {root} is not a directory.")
        sys.exit(1)
    name = root.name

    # ── 1. The API must be up (the helper exits on failure) ──────────────────
    print(f"==> Checking API at {API_BASE} ...")
    _api_get("/state/health")
    print(" API OK")

    # ── 2. Domain: explicit flag wins, otherwise consult the charter ─────────
    domain = args.domain
    if not domain:
        print("==> Auto-detecting domain from project charter ...")
        domain = _detect_domain(root)
        if not domain:
            print(f"ERROR: Could not auto-detect domain. Pass --domain explicitly.")
            print(f" Valid: {', '.join(VALID_DOMAINS)}")
            sys.exit(1)
        print(f" Detected: {domain}")
    if domain not in VALID_DOMAINS:
        print(f"ERROR: Unknown domain '{domain}'. Valid: {', '.join(VALID_DOMAINS)}")
        sys.exit(1)

    # ── 3. Find the active topic that belongs to the domain ──────────────────
    print(f"==> Looking up topic for domain '{domain}' ...")
    topic = None
    for candidate in _api_get("/topics/?status=active"):
        if candidate.get("domain") == domain:
            topic = candidate
            break
    if not topic:
        print(f"ERROR: No active topic found for domain '{domain}'.")
        sys.exit(1)
    topic_id = topic["id"]
    print(f" topic_id: {topic_id}")

    # ── 4. MCP registration is advisory: warn, never abort ───────────────────
    print("==> Checking MCP server registration ...")
    if not _check_mcp():
        print("WARNING: 'state-hub' not in ~/.claude.json.")
        print(f" See ~/.claude/CLAUDE.md → MCP Server Registration section.")
    else:
        print(" MCP OK")

    # ── 5. Render CLAUDE.md, leaving an existing file untouched ──────────────
    claude_md = root / "CLAUDE.md"
    if not claude_md.exists():
        print(f"==> Writing CLAUDE.md to {claude_md} ...")
        rendered = TEMPLATE.read_text()
        substitutions = (
            ("{PROJECT_NAME}", name),
            ("{DOMAIN}", domain),
            ("{TOPIC_ID}", topic_id),
        )
        for placeholder, value in substitutions:
            rendered = rendered.replace(placeholder, value)
        claude_md.write_text(rendered)
        print(" Written.")
    else:
        print(f"==> CLAUDE.md already exists at {claude_md} — skipping.")

    # ── 6. Progress event (best effort) ──────────────────────────────────────
    print("==> Recording registration event ...")
    event = {
        "topic_id": topic_id,
        "event_type": "milestone",
        "summary": f"Project registered with State Hub: {name} ({domain})",
        "author": "custodian",
        "detail": {
            "project_path": str(root),
            "claude_md": str(claude_md),
            "domain": domain,
        },
    }
    try:
        _api_post("/progress/", event)
        print(" Event recorded.")
    except Exception as e:
        print(f" WARNING: Could not record progress event: {e}")

    print()
    print("Registration complete!")
    print(f" Project: {name}")
    print(f" Domain: {domain}")
    print(f" Topic ID: {topic_id}")
    print(f" CLAUDE.md: {claude_md}")
    print()
    print("Next: restart Claude Code for the MCP server to be active in this project.")
def cmd_create_workstream(args: argparse.Namespace) -> None:
    """Create a workstream under a domain's topic.

    Reads ``args.domain``, ``args.title``, ``args.slug``, ``args.owner`` and
    ``args.description``. When no slug is given, one is derived from the
    title: lowercased, with every run of characters outside ``a-z0-9``
    replaced by ``-`` and leading/trailing ``-`` removed.

    The workstream is posted to ``/workstreams/`` with status ``active``,
    followed by a ``workstream_created`` progress event.

    Exits with status 1 when no slug can be derived, no active topic exists
    for the domain, or the API rejects / cannot accept the workstream. A
    failure to record the progress event only prints a warning, since the
    workstream has already been created at that point.
    """
    slug = args.slug or re.sub(r"[^a-z0-9]+", "-", args.title.lower()).strip("-")
    if not slug:
        # A title without any ASCII letter or digit would otherwise be sent
        # to the API with an empty slug.
        print(f"ERROR: Cannot derive a slug from title '{args.title}'. Pass --slug explicitly.")
        sys.exit(1)

    _api_get("/state/health")
    # Resolve topic_id from domain
    topics = _api_get("/topics/?status=active")
    match = next((t for t in topics if t.get("domain") == args.domain), None)
    if not match:
        print(f"ERROR: No active topic for domain '{args.domain}'.")
        sys.exit(1)
    topic_id = match["id"]

    try:
        ws = _api_post("/workstreams/", {
            "topic_id": topic_id,
            "title": args.title,
            "slug": slug,
            "description": args.description,
            "owner": args.owner,
            "status": "active",
        })
    except urllib.error.HTTPError as e:
        # Show the response body: it normally carries the reason for the
        # rejection, which a bare traceback would hide.
        reason = e.read().decode("utf-8", errors="replace").strip() or e.reason
        print(f"ERROR: API rejected the workstream (HTTP {e.code}): {reason}")
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"ERROR: Cannot reach API at {API_BASE}: {e}")
        sys.exit(1)

    # Best effort, matching cmd_register: the workstream exists already, so a
    # failed event must not turn a successful creation into a crash.
    try:
        _api_post("/progress/", {
            "topic_id": topic_id,
            "workstream_id": ws["id"],
            "event_type": "workstream_created",
            "summary": f"Workstream created: {args.title}",
            "author": "custodian",
            "detail": {"owner": args.owner, "slug": slug},
        })
    except Exception as e:
        print(f" WARNING: Could not record progress event: {e}")

    print(f"Created workstream: {ws['title']}")
    print(f" id: {ws['id']}")
    print(f" slug: {ws['slug']}")
    print(f" domain: {args.domain}")
    print(f" owner: {ws.get('owner') or ''}")
def cmd_create_task(args: argparse.Namespace) -> None:
    """Create a task under a workstream (by ID or slug).

    ``args.workstream`` is used as the workstream ID when it parses as a
    UUID; otherwise it is treated as a slug and resolved against the list
    returned by ``/workstreams/``. The task is posted to ``/tasks/`` with
    ``args.title``, ``args.priority``, ``args.description`` and
    ``args.assignee``, followed by a ``task_created`` progress event.

    Exits with status 1 when the slug is unknown or the API rejects / cannot
    accept the task. A failure to record the progress event only prints a
    warning, since the task has already been created at that point.
    """
    _api_get("/state/health")

    # Resolve workstream: accept UUID or slug
    workstream_id = args.workstream
    if not _is_uuid(workstream_id):
        wss = _api_get("/workstreams/")
        match = next((w for w in wss if w.get("slug") == workstream_id), None)
        if not match:
            print(f"ERROR: No workstream found with slug '{workstream_id}'.")
            # List the slugs the API returned: 'custodian status' prints
            # totals only, so it cannot be used to look slugs up.
            known = sorted({str(w["slug"]) for w in wss if w.get("slug")})
            if known:
                print(f" Available slugs: {', '.join(known)}")
            else:
                print(" Check the dashboard for valid slugs.")
            sys.exit(1)
        workstream_id = match["id"]

    try:
        task = _api_post("/tasks/", {
            "workstream_id": workstream_id,
            "title": args.title,
            "priority": args.priority,
            "description": args.description,
            "assignee": args.assignee,
        })
    except urllib.error.HTTPError as e:
        # Show the response body: it normally carries the reason for the
        # rejection, which a bare traceback would hide.
        reason = e.read().decode("utf-8", errors="replace").strip() or e.reason
        print(f"ERROR: API rejected the task (HTTP {e.code}): {reason}")
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"ERROR: Cannot reach API at {API_BASE}: {e}")
        sys.exit(1)

    # Best effort, matching cmd_register: the task exists already, so a
    # failed event must not turn a successful creation into a crash.
    try:
        _api_post("/progress/", {
            "workstream_id": workstream_id,
            "task_id": task["id"],
            "event_type": "task_created",
            "summary": f"Task created: {args.title}",
            "author": "custodian",
            "detail": {"priority": args.priority},
        })
    except Exception as e:
        print(f" WARNING: Could not record progress event: {e}")

    print(f"Created task: {task['title']}")
    print(f" id: {task['id']}")
    print(f" priority: {task['priority']}")
    print(f" status: {task['status']}")
def _is_uuid(s: str) -> bool:
    """Report whether ``uuid.UUID`` accepts ``s`` as a UUID string."""
    from uuid import UUID

    try:
        UUID(s)
    except ValueError:
        return False
    else:
        return True
def cmd_status(_args: argparse.Namespace) -> None:
    """Print API health followed by the totals from ``/state/summary``.

    Also lists blocking decisions, each with its deadline (or "no deadline"),
    when the summary contains any. The parsed arguments are not used.
    """
    health = _api_get("/state/health")
    api_state = health.get("status", "?")
    db_state = health.get("db", "?")
    print(f"API: {api_state} DB: {db_state}")

    summary = _api_get("/state/summary")
    totals = summary["totals"]

    # Each section is read right before it is printed, one line per entity.
    topics = totals["topics"]
    print(f"Topics: {topics['active']} active")
    workstreams = totals["workstreams"]
    print(f"Workstreams: {workstreams['active']} active, {workstreams['blocked']} blocked")
    tasks = totals["tasks"]
    print(f"Tasks: {tasks['in_progress']} in-progress, {tasks['todo']} todo, {tasks['blocked']} blocked")
    decisions = totals["decisions"]
    print(f"Decisions: {decisions['open']} open, {decisions['escalated']} escalated")

    blocking = summary.get("blocking_decisions", [])
    if not blocking:
        return
    print(f"\nBlocking decisions ({len(blocking)}):")
    for decision in blocking:
        deadline = decision.get("deadline") or "no deadline"
        print(f" [{deadline}] {decision['title']}")
# ── Entry point ────────────────────────────────────────────────────────────────
def main() -> None:
    """Parse the command line and run the selected subcommand.

    Defines the ``register-project``, ``create-workstream``, ``create-task``
    and ``status`` subcommands; one of them is required.
    """
    parser = argparse.ArgumentParser(prog="custodian", description="Custodian State Hub CLI")
    subparsers = parser.add_subparsers(dest="command", required=True)

    # register-project
    register = subparsers.add_parser(
        "register-project",
        help="Register a project with the State Hub",
    )
    register.add_argument(
        "--domain",
        choices=VALID_DOMAINS,
        default=None,
        help="Project domain (auto-detected from charter if omitted)",
    )
    register.add_argument(
        "--path",
        default=os.getcwd(),
        help="Project directory (defaults to current directory)",
    )

    # create-workstream
    workstream = subparsers.add_parser(
        "create-workstream",
        help="Create a workstream under a domain topic",
    )
    workstream.add_argument(
        "--domain",
        choices=VALID_DOMAINS,
        required=True,
        help="Domain to create the workstream under",
    )
    workstream.add_argument("--title", required=True, help="Workstream title")
    workstream.add_argument(
        "--slug",
        default=None,
        help="URL slug (auto-generated from title if omitted)",
    )
    workstream.add_argument("--owner", default=None, help="Owner name")
    workstream.add_argument("--description", default=None, help="Optional description")

    # create-task
    task = subparsers.add_parser("create-task", help="Create a task under a workstream")
    task.add_argument(
        "--workstream",
        required=True,
        metavar="ID_OR_SLUG",
        help="Workstream UUID or slug",
    )
    task.add_argument("--title", required=True, help="Task title")
    task.add_argument(
        "--priority",
        choices=["low", "medium", "high", "critical"],
        default="medium",
    )
    task.add_argument("--assignee", default=None)
    task.add_argument("--description", default=None)

    # status
    subparsers.add_parser("status", help="Show State Hub health and summary totals")

    args = parser.parse_args()

    # Map each subcommand name to the function that implements it.
    handlers = {
        "register-project": cmd_register,
        "create-workstream": cmd_create_workstream,
        "create-task": cmd_create_task,
        "status": cmd_status,
    }
    handler = handlers.get(args.command)
    if handler is not None:
        handler(args)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()