Implements the first live layer of the Custodian cognitive infrastructure: PostgreSQL schema, FastAPI REST API, FastMCP stdio server, and Observable Framework telemetry dashboard. - state-hub/: full stack (docker-compose, FastAPI, Alembic, MCP server, dashboard) - 5 DB tables: topics, workstreams, tasks, decisions, progress_events - 11 MCP tools + 5 resources registered in .mcp.json - Observable dashboard: Overview, Workstreams, Decisions, Progress pages - CLAUDE.md: session protocol (get_state_summary / add_progress_event ritual) - ~/.claude/CLAUDE.md: global cross-project reference to the hub - scripts/pull_image.py: WSL2 TLS-resilient Docker image downloader Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
361 lines
11 KiB
Python
361 lines
11 KiB
Python
"""Custodian State Hub MCP Server (stdio).
|
|
|
|
Thin HTTP client over the FastAPI service — no direct DB access.
|
|
All business logic stays in the API; this layer is stateless.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
import httpx
|
|
from fastmcp import FastMCP
|
|
|
|
# Base URL of the State Hub REST API. Override it with the API_BASE environment
# variable; trailing slashes are stripped so request paths can begin with "/".
API_BASE = os.getenv("API_BASE", "http://127.0.0.1:8000").rstrip("/")
|
|
|
|
# Shared FastMCP server instance: every resource and tool in this module
# registers itself on it through the @mcp.resource / @mcp.tool decorators.
mcp = FastMCP(
    name="state-hub",
    instructions=(
        "Custodian State Hub: tracks topics, workstreams, tasks, decisions, and progress events. "
        "Start every session with get_state_summary() for orientation. "
        "All writes emit a progress_event automatically."
    ),
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _client() -> httpx.Client:
    """Build a fresh ``httpx.Client`` bound to ``API_BASE`` with a 30.0 timeout.

    A new client is created on every call; callers are expected to use it as
    a context manager so it is closed after the request.
    """
    client = httpx.Client(base_url=API_BASE, timeout=30.0)
    return client
|
|
|
|
|
|
def _get(path: str, params: dict | None = None) -> Any:
    """GET *path* and return the decoded JSON body.

    Query parameters whose value is ``None`` are omitted from the request.
    Any exception raised by ``raise_for_status()`` propagates to the caller.
    """
    with _client() as http:
        query = {}
        for key, value in (params or {}).items():
            if value is not None:
                query[key] = value
        response = http.get(path, params=query)
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
def _post(path: str, body: dict) -> Any:
    """POST *body* as JSON to *path* and return the decoded JSON response.

    A trailing slash is appended to *path* when it is missing, and top-level
    keys whose value is ``None`` are left out of the payload. Any exception
    raised by ``raise_for_status()`` propagates to the caller.
    """
    # NOTE(review): the forced trailing slash presumably avoids a redirect on
    # the API's collection routes -- confirm against the FastAPI routers.
    target = path if path.endswith("/") else path + "/"
    with _client() as http:
        payload = {key: value for key, value in body.items() if value is not None}
        response = http.post(target, json=payload)
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
def _patch(path: str, body: dict) -> Any:
    """PATCH *path* with *body* as JSON and return the decoded JSON response.

    Top-level keys whose value is ``None`` are left out of the payload. Any
    exception raised by ``raise_for_status()`` propagates to the caller.
    """
    with _client() as http:
        payload = {}
        for key, value in body.items():
            if value is not None:
                payload[key] = value
        response = http.patch(path, json=payload)
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Resources
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.resource("state://summary")
def resource_summary() -> str:
    """Full StateSummary JSON — primary orientation resource."""
    # Fetch the snapshot from the API and pretty-print it for the client.
    summary = _get("/state/summary")
    return json.dumps(summary, indent=2)
|
|
|
|
|
|
@mcp.resource("state://topics")
def resource_topics() -> str:
    """Active topics list."""
    # Only topics with status=active are requested from the API.
    active_topics = _get("/topics", {"status": "active"})
    return json.dumps(active_topics, indent=2)
|
|
|
|
|
|
@mcp.resource("state://workstreams/{topic_slug}")
def resource_workstreams(topic_slug: str) -> str:
    """Workstreams for a topic (by slug)."""
    # Resolve the slug to a topic among the *active* topics only; the first
    # topic with a matching slug wins.
    found = None
    for candidate in _get("/topics", {"status": "active"}):
        if candidate["slug"] == topic_slug:
            found = candidate
            break

    # An unknown slug is reported as an error payload rather than an exception.
    if not found:
        return json.dumps({"error": f"Topic '{topic_slug}' not found"})

    workstreams = _get("/workstreams", {"topic_id": found["id"]})
    return json.dumps(workstreams, indent=2)
|
|
|
|
|
|
@mcp.resource("state://decisions/blocking")
def resource_blocking_decisions() -> str:
    """All pending/escalated decisions."""
    # NOTE(review): the query filters on decision_type=pending AND status=open;
    # whether escalated decisions carry status "open" is decided by the API --
    # confirm that this filter matches the docstring.
    filters = {"decision_type": "pending", "status": "open"}
    blocking = _get("/decisions", filters)
    return json.dumps(blocking, indent=2)
|
|
|
|
|
|
@mcp.resource("state://tasks/blocked")
def resource_blocked_tasks() -> str:
    """All tasks with status=blocked."""
    blocked = _get("/tasks", {"status": "blocked"})
    return json.dumps(blocked, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Query tools
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def get_state_summary() -> str:
    """Primary orientation tool. Call at the start of every session.

    Returns a full snapshot: topic/workstream/task/decision totals, blocking
    decisions, blocked tasks, open workstreams, and the 20 most recent events.
    """
    # The snapshot is assembled by the API; this tool only relays it as
    # pretty-printed JSON.
    snapshot = _get("/state/summary")
    return json.dumps(snapshot, indent=2)
|
|
|
|
|
|
@mcp.tool()
def get_topic(slug: str) -> str:
    """Return a topic (with workstreams) by slug, plus its recent progress events."""
    # Resolve the slug by scanning the unfiltered topic list; the first topic
    # with a matching slug wins.
    found = None
    for candidate in _get("/topics"):
        if candidate["slug"] == slug:
            found = candidate
            break

    # An unknown slug is reported as an error payload rather than an exception.
    if not found:
        return json.dumps({"error": f"Topic '{slug}' not found"})

    topic_id = found["id"]
    detail = _get(f"/topics/{topic_id}")
    # Ask for up to 10 progress events scoped to this topic.
    progress = _get("/progress", {"topic_id": topic_id, "limit": 10})
    return json.dumps({"topic": detail, "recent_progress": progress}, indent=2)
|
|
|
|
|
|
@mcp.tool()
def list_blocked_tasks(workstream_id: str | None = None) -> str:
    """List all tasks with status=blocked, optionally filtered by workstream_id."""
    # A workstream_id of None is dropped from the query by _get.
    filters = {"status": "blocked", "workstream_id": workstream_id}
    blocked = _get("/tasks", filters)
    return json.dumps(blocked, indent=2)
|
|
|
|
|
|
@mcp.tool()
def list_pending_decisions(topic_id: str | None = None) -> str:
    """List pending decisions sorted by deadline (nulls last).

    Optionally filter by topic_id. Escalated decisions are included and
    highlighted by their escalation_note.
    """
    # NOTE(review): no sorting happens in this layer; the ordering described
    # above is presumably applied by the API -- confirm.
    filters = {"decision_type": "pending", "topic_id": topic_id}
    pending = _get("/decisions", filters)
    return json.dumps(pending, indent=2)
|
|
|
|
|
|
@mcp.tool()
def get_recent_progress(limit: int = 20, since: str | None = None) -> str:
    """Retrieve recent progress events to reconstruct session history.

    Args:
        limit: max events to return (default 20)
        since: ISO datetime string — only events after this timestamp
    """
    # `since` is forwarded verbatim; when it is None, _get drops it.
    query = {"limit": limit, "since": since}
    events = _get("/progress", query)
    return json.dumps(events, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mutate tools
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def create_task(
    workstream_id: str,
    title: str,
    priority: str = "medium",
    description: str | None = None,
    assignee: str | None = None,
    due_date: str | None = None,
) -> str:
    """Create a new task and emit a progress_event.

    Args:
        workstream_id: UUID of the parent workstream
        title: task title
        priority: low | medium | high | critical
        description: optional longer description
        assignee: optional assignee name
        due_date: optional ISO date string (YYYY-MM-DD)
    """
    # Step 1: create the task. Optional fields left as None are dropped by _post.
    task_payload = {
        "workstream_id": workstream_id,
        "title": title,
        "priority": priority,
        "description": description,
        "assignee": assignee,
        "due_date": due_date,
    }
    created = _post("/tasks", task_payload)

    # Step 2: log the creation as a progress event tied to the new task's id.
    progress_payload = {
        "workstream_id": workstream_id,
        "task_id": created["id"],
        "event_type": "task_created",
        "summary": f"Task created: {title}",
        "author": "custodian",
        "detail": {"priority": priority, "assignee": assignee},
    }
    _post("/progress", progress_payload)

    return json.dumps(created, indent=2)
|
|
|
|
|
|
@mcp.tool()
def update_task_status(
    task_id: str,
    status: str,
    blocking_reason: str | None = None,
) -> str:
    """Update a task's status. blocking_reason is required when status='blocked'.

    Args:
        task_id: UUID of the task
        status: todo | in_progress | blocked | done | cancelled
        blocking_reason: required when status=blocked
    """
    # NOTE(review): the "required when blocked" rule is not enforced here; it
    # is presumably validated by the API -- confirm.
    # blocking_reason is only sent when truthy (None and "" are both omitted).
    changes: dict[str, Any] = (
        {"status": status, "blocking_reason": blocking_reason}
        if blocking_reason
        else {"status": status}
    )
    updated = _patch(f"/tasks/{task_id}", changes)

    # Log the transition; the workstream id and title come from the API response.
    progress_payload = {
        "task_id": task_id,
        "workstream_id": updated.get("workstream_id"),
        "event_type": "task_status_changed",
        "summary": f"Task status → {status}: {updated['title']}",
        "author": "custodian",
        "detail": {"blocking_reason": blocking_reason},
    }
    _post("/progress", progress_payload)

    return json.dumps(updated, indent=2)
|
|
|
|
|
|
@mcp.tool()
def record_decision(
    title: str,
    decision_type: str = "pending",
    topic_id: str | None = None,
    workstream_id: str | None = None,
    description: str | None = None,
    rationale: str | None = None,
    decided_by: str | None = None,
    deadline: str | None = None,
) -> str:
    """Record a decision (made or pending).

    Pending decisions touching financial/legal topics are auto-escalated per
    constitution §4.

    Args:
        title: decision title
        decision_type: made | pending
        topic_id: optional topic UUID
        workstream_id: optional workstream UUID (at least one required)
        description: optional context
        rationale: reasoning behind the decision
        decided_by: person/agent who decided
        deadline: ISO datetime string for when decision is needed
    """
    # Step 1: create the decision. Fields left as None are dropped by _post.
    decision_payload = {
        "title": title,
        "decision_type": decision_type,
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "description": description,
        "rationale": rationale,
        "decided_by": decided_by,
        "deadline": deadline,
    }
    recorded = _post("/decisions", decision_payload)

    # Step 2: log it, echoing the status / escalation note the API returned.
    event_detail = {
        "status": recorded.get("status"),
        "escalation_note": recorded.get("escalation_note"),
    }
    progress_payload = {
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "decision_id": recorded["id"],
        "event_type": "decision_recorded",
        "summary": f"Decision recorded ({decision_type}): {title}",
        "author": "custodian",
        "detail": event_detail,
    }
    _post("/progress", progress_payload)

    return json.dumps(recorded, indent=2)
|
|
|
|
|
|
@mcp.tool()
def resolve_decision(
    decision_id: str,
    rationale: str,
    decided_by: str,
) -> str:
    """Mark a decision as resolved.

    Args:
        decision_id: UUID of the decision
        rationale: final reasoning/outcome
        decided_by: who resolved it

    Returns:
        The updated decision, as pretty-printed JSON.
    """
    # Imported locally because the module header only imports `datetime`.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp instead, then drop tzinfo so the wire format stays exactly
    # "<naive ISO-8601>Z" as before (rather than gaining a "+00:00" suffix).
    now_utc = datetime.now(timezone.utc)
    decided_at = now_utc.replace(tzinfo=None).isoformat() + "Z"

    # Step 1: flip the decision to resolved/made and stamp who decided and when.
    decision = _patch(f"/decisions/{decision_id}", {
        "status": "resolved",
        "decision_type": "made",
        "rationale": rationale,
        "decided_by": decided_by,
        "decided_at": decided_at,
    })

    # Step 2: log the resolution; topic/workstream ids and the title come from
    # the API response.
    _post("/progress", {
        "topic_id": decision.get("topic_id"),
        "workstream_id": decision.get("workstream_id"),
        "decision_id": decision_id,
        "event_type": "decision_resolved",
        "summary": f"Decision resolved by {decided_by}: {decision['title']}",
        "author": "custodian",
        "detail": {"rationale": rationale},
    })
    return json.dumps(decision, indent=2)
|
|
|
|
|
|
@mcp.tool()
def add_progress_event(
    summary: str,
    event_type: str = "note",
    topic_id: str | None = None,
    workstream_id: str | None = None,
    task_id: str | None = None,
    detail: dict | None = None,
) -> str:
    """Append a progress event to the log.

    Args:
        summary: human-readable summary of what happened
        event_type: free-form label, e.g. note | milestone | blocker | insight
        topic_id: optional topic UUID
        workstream_id: optional workstream UUID
        task_id: optional task UUID
        detail: optional structured data (JSONB)
    """
    # Scope ids and detail left as None are dropped by _post; the author is
    # always recorded as "custodian".
    event_payload = {
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "task_id": task_id,
        "event_type": event_type,
        "summary": summary,
        "author": "custodian",
        "detail": detail,
    }
    created_event = _post("/progress", event_payload)
    return json.dumps(created_event, indent=2)
|
|
|
|
|
|
@mcp.tool()
def update_workstream_status(workstream_id: str, status: str) -> str:
    """Update a workstream's status.

    Args:
        workstream_id: UUID of the workstream
        status: active | blocked | completed | archived
    """
    # Step 1: apply the status change.
    workstream = _patch(f"/workstreams/{workstream_id}", {"status": status})

    # Step 2: log the transition; topic id and title come from the API response.
    progress_payload = {
        "workstream_id": workstream_id,
        "topic_id": workstream.get("topic_id"),
        "event_type": "workstream_status_changed",
        "summary": f"Workstream status → {status}: {workstream['title']}",
        "author": "custodian",
    }
    _post("/progress", progress_payload)

    return json.dumps(workstream, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    # Run the server over the stdio transport when executed as a script.
    mcp.run(transport="stdio")
|