Files
state-hub/mcp_server/server.py
tegwick 090a206f3d feat(state-hub): add Extension Points and Technical Debt tracking
New entity types (DB tables, API routers, Pydantic schemas, Alembic
migration a3f1c2d4e5b6):
- extension_points: ep_id, domain, title, ep_type, status, priority,
  location, description, topic_id, workstream_id
- technical_debt: td_id, domain, title, debt_type, severity, status,
  location, description, topic_id, workstream_id

MCP server: 6 new tools — register_extension_point, list_extension_points,
update_ep_status, register_technical_debt, list_technical_debt,
update_td_status (each write emits a progress_event)

Dashboard: two new pages (extensions.md, techdept.md) with KPI sidebar,
charts, urgent-items section, and filterable card lists. Both added to
nav in observablehq.config.js.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-27 07:29:51 +01:00

638 lines
21 KiB
Python

"""Custodian State Hub MCP Server (stdio).
Thin HTTP client over the FastAPI service — no direct DB access.
All business logic stays in the API; this layer is stateless.
"""
from __future__ import annotations
import json
import os
import re
import sys
from datetime import datetime
from typing import Any
from uuid import UUID
import httpx
from fastmcp import FastMCP
# Base URL of the State Hub HTTP API. Read from the API_BASE environment
# variable (default: local service on port 8000); any trailing "/" is removed
# so request paths can always start with "/".
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")

# MCP server instance that the @mcp.resource / @mcp.tool decorators in this
# module register against. The instructions string is handed to FastMCP as-is.
mcp = FastMCP(
    name="state-hub",
    instructions=(
        "Custodian State Hub: tracks topics, workstreams, tasks, decisions, and progress events. "
        "Start every session with get_state_summary() for orientation. "
        "All writes emit a progress_event automatically."
    ),
)
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def _client() -> httpx.Client:
    """Build a fresh HTTP client bound to the State Hub API.

    The client targets ``API_BASE``, uses a 30 second timeout and follows
    redirects. The helpers in this module use it as a context manager so the
    connection is closed after each request.
    """
    options: dict[str, Any] = {
        "base_url": API_BASE,
        "timeout": 30.0,
        "follow_redirects": True,
    }
    return httpx.Client(**options)
def _get(path: str, params: dict | None = None) -> Any:
    """GET ``path`` from the API and return the decoded JSON body.

    Args:
        path: request path; a trailing "/" is appended when missing.
        params: optional query parameters; entries whose value is ``None``
            are dropped before the request is sent.

    Returns:
        Whatever ``response.json()`` yields for the response.

    Raises:
        Whatever ``raise_for_status()`` raises for an error response.
    """
    url = path if path.endswith("/") else path + "/"
    query = {}
    for key, value in (params or {}).items():
        if value is not None:
            query[key] = value
    with _client() as http:
        response = http.get(url, params=query)
        response.raise_for_status()
        return response.json()
def _post(path: str, body: dict) -> Any:
    """POST ``body`` as JSON to ``path`` and return the decoded JSON response.

    Args:
        path: request path; a trailing "/" is appended when missing.
        body: JSON payload; top-level entries whose value is ``None`` are
            dropped (nested values are sent unchanged).

    Returns:
        Whatever ``response.json()`` yields for the response.

    Raises:
        Whatever ``raise_for_status()`` raises for an error response.
    """
    url = path if path.endswith("/") else path + "/"
    payload = {}
    for key, value in body.items():
        if value is not None:
            payload[key] = value
    with _client() as http:
        response = http.post(url, json=payload)
        response.raise_for_status()
        return response.json()
def _patch(path: str, body: dict) -> Any:
    """PATCH ``path`` with ``body`` as JSON and return the decoded response.

    Args:
        path: request path; a trailing "/" is appended when missing.
        body: fields to change; top-level entries whose value is ``None`` are
            dropped, so this helper cannot be used to send an explicit null.

    Returns:
        Whatever ``response.json()`` yields for the response.

    Raises:
        Whatever ``raise_for_status()`` raises for an error response.
    """
    url = path if path.endswith("/") else path + "/"
    changes = {}
    for key, value in body.items():
        if value is not None:
            changes[key] = value
    with _client() as http:
        response = http.patch(url, json=changes)
        response.raise_for_status()
        return response.json()
def _delete(path: str) -> None:
    """Send a DELETE request to ``path`` and discard the response body.

    Unlike the other helpers, ``path`` is used verbatim (no trailing "/" is
    appended). An error response raises via ``raise_for_status()``.
    """
    with _client() as http:
        response = http.delete(path)
        response.raise_for_status()
    return None
# ---------------------------------------------------------------------------
# Resources
# ---------------------------------------------------------------------------
@mcp.resource("state://summary")
def resource_summary() -> str:
    """Full StateSummary JSON — primary orientation resource."""
    # Fetch the summary from the API and pretty-print it for the client.
    summary = _get("/state/summary")
    return json.dumps(summary, indent=2)
@mcp.resource("state://topics")
def resource_topics() -> str:
    """Active topics list."""
    # Only topics with status "active" are requested from the API.
    active_filter = {"status": "active"}
    topics = _get("/topics", active_filter)
    return json.dumps(topics, indent=2)
@mcp.resource("state://workstreams/{topic_slug}")
def resource_workstreams(topic_slug: str) -> str:
    """Workstreams for a topic (by slug)."""
    # The slug is resolved against *active* topics only; the first topic whose
    # slug matches supplies the id used to filter workstreams.
    for topic in _get("/topics", {"status": "active"}):
        if topic["slug"] == topic_slug:
            break
    else:
        # No active topic carries this slug: report it as a JSON error object
        # rather than raising.
        return json.dumps({"error": f"Topic '{topic_slug}' not found"})
    workstreams = _get("/workstreams", {"topic_id": topic["id"]})
    return json.dumps(workstreams, indent=2)
@mcp.resource("state://decisions/blocking")
def resource_blocking_decisions() -> str:
    """All pending/escalated decisions."""
    # The query sent to the API is decision_type=pending AND status=open.
    query = {"decision_type": "pending", "status": "open"}
    decisions = _get("/decisions", query)
    return json.dumps(decisions, indent=2)
@mcp.resource("state://tasks/blocked")
def resource_blocked_tasks() -> str:
    """All tasks with status=blocked."""
    blocked_filter = {"status": "blocked"}
    tasks = _get("/tasks", blocked_filter)
    return json.dumps(tasks, indent=2)
# ---------------------------------------------------------------------------
# Query tools
# ---------------------------------------------------------------------------
@mcp.tool()
def get_state_summary() -> str:
    """Primary orientation tool. Call at the start of every session.

    Returns a full snapshot: topic/workstream/task/decision totals, blocking
    decisions, blocked tasks, open workstreams, and the 20 most recent events.
    """
    # The snapshot is produced entirely by the API; this tool only
    # pretty-prints what /state/summary returns.
    snapshot = _get("/state/summary")
    return json.dumps(snapshot, indent=2)
@mcp.tool()
def get_topic(slug: str) -> str:
    """Return a topic (with workstreams) by slug, plus its recent progress events."""
    # Resolve the slug by scanning the unfiltered topic list; the first hit wins.
    for candidate in _get("/topics"):
        if candidate["slug"] == slug:
            break
    else:
        # Unknown slug: answer with a JSON error object instead of raising.
        return json.dumps({"error": f"Topic '{slug}' not found"})

    topic_id = candidate["id"]
    detail = _get(f"/topics/{topic_id}")
    # The ten most recent progress events linked to this topic.
    progress = _get("/progress", {"topic_id": topic_id, "limit": 10})
    return json.dumps({"topic": detail, "recent_progress": progress}, indent=2)
@mcp.tool()
def list_blocked_tasks(workstream_id: str | None = None) -> str:
    """List all tasks with status=blocked, optionally filtered by workstream_id."""
    # A workstream_id of None is dropped by _get, which leaves the query
    # unfiltered by workstream.
    filters = {"status": "blocked", "workstream_id": workstream_id}
    blocked = _get("/tasks", filters)
    return json.dumps(blocked, indent=2)
@mcp.tool()
def list_pending_decisions(topic_id: str | None = None) -> str:
    """List pending decisions sorted by deadline (nulls last).

    Optionally filter by topic_id. Escalated decisions are included and
    highlighted by their escalation_note.
    """
    # NOTE(review): no sorting happens in this function; the order is whatever
    # the /decisions endpoint returns — presumably deadline order, confirm
    # against the API.
    query = {"decision_type": "pending", "topic_id": topic_id}
    pending = _get("/decisions", query)
    return json.dumps(pending, indent=2)
@mcp.tool()
def get_recent_progress(limit: int = 20, since: str | None = None) -> str:
    """Retrieve recent progress events to reconstruct session history.

    Args:
        limit: max events to return (default 20)
        since: ISO datetime string — only events after this timestamp
    """
    # Both values are forwarded as query parameters; a since of None is
    # dropped by _get.
    query = {"limit": limit, "since": since}
    events = _get("/progress", query)
    return json.dumps(events, indent=2)
# ---------------------------------------------------------------------------
# Mutate tools
# ---------------------------------------------------------------------------
@mcp.tool()
def create_workstream(
    topic_id: str,
    title: str,
    slug: str | None = None,
    description: str | None = None,
    owner: str | None = None,
    due_date: str | None = None,
) -> str:
    """Create a new workstream under a topic and emit a progress_event.

    Args:
        topic_id: UUID of the parent topic
        title: workstream title
        slug: URL-friendly identifier (auto-generated from title if omitted)
        description: optional longer description
        owner: optional owner name
        due_date: optional ISO date string (YYYY-MM-DD)
    """
    # Derive the slug from the title when none (or an empty one) was given:
    # lower-case, collapse every run of characters outside [a-z0-9] into a
    # single "-", then trim dashes from both ends.
    effective_slug = slug or re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-")

    payload = {
        "topic_id": topic_id,
        "title": title,
        "slug": effective_slug,
        "description": description,
        "owner": owner,
        "due_date": due_date,
        "status": "active",
    }
    created = _post("/workstreams", payload)

    # Record the creation in the progress log, linked to topic and workstream.
    event = {
        "topic_id": topic_id,
        "workstream_id": created["id"],
        "event_type": "workstream_created",
        "summary": f"Workstream created: {title}",
        "author": "custodian",
        "detail": {"owner": owner, "slug": effective_slug},
    }
    _post("/progress", event)
    return json.dumps(created, indent=2)
@mcp.tool()
def create_task(
    workstream_id: str,
    title: str,
    priority: str = "medium",
    description: str | None = None,
    assignee: str | None = None,
    due_date: str | None = None,
) -> str:
    """Create a new task and emit a progress_event.

    Args:
        workstream_id: UUID of the parent workstream
        title: task title
        priority: low | medium | high | critical
        description: optional longer description
        assignee: optional assignee name
        due_date: optional ISO date string (YYYY-MM-DD)
    """
    payload = {
        "workstream_id": workstream_id,
        "title": title,
        "priority": priority,
        "description": description,
        "assignee": assignee,
        "due_date": due_date,
    }
    created = _post("/tasks", payload)

    # Log the creation against the workstream and the newly created task.
    event = {
        "workstream_id": workstream_id,
        "task_id": created["id"],
        "event_type": "task_created",
        "summary": f"Task created: {title}",
        "author": "custodian",
        "detail": {"priority": priority, "assignee": assignee},
    }
    _post("/progress", event)
    return json.dumps(created, indent=2)
@mcp.tool()
def update_task_status(
    task_id: str,
    status: str,
    blocking_reason: str | None = None,
) -> str:
    """Update a task's status. blocking_reason is required when status='blocked'.

    Args:
        task_id: UUID of the task
        status: todo | in_progress | blocked | done | cancelled
        blocking_reason: required when status=blocked
    """
    # NOTE(review): the "required when blocked" rule is not checked here;
    # presumably the API enforces it — confirm.
    changes: dict[str, Any] = {"status": status}
    if blocking_reason:
        # Only a non-empty reason is sent along with the status change.
        changes.update(blocking_reason=blocking_reason)
    updated = _patch(f"/tasks/{task_id}", changes)

    task_title = updated["title"]
    event = {
        "task_id": task_id,
        "workstream_id": updated.get("workstream_id"),
        "event_type": "task_status_changed",
        "summary": f"Task status → {status}: {task_title}",
        "author": "custodian",
        "detail": {"blocking_reason": blocking_reason},
    }
    _post("/progress", event)
    return json.dumps(updated, indent=2)
@mcp.tool()
def record_decision(
    title: str,
    decision_type: str = "pending",
    topic_id: str | None = None,
    workstream_id: str | None = None,
    description: str | None = None,
    rationale: str | None = None,
    decided_by: str | None = None,
    deadline: str | None = None,
) -> str:
    """Record a decision (made or pending).

    Pending decisions touching financial/legal topics are auto-escalated per
    constitution §4.

    Args:
        title: decision title
        decision_type: made | pending
        topic_id: optional topic UUID
        workstream_id: optional workstream UUID (at least one required)
        description: optional context
        rationale: reasoning behind the decision
        decided_by: person/agent who decided
        deadline: ISO datetime string for when decision is needed
    """
    # No escalation logic lives in this function: the payload is forwarded
    # as-is and whatever status / escalation_note the API answers with is
    # copied into the progress event below.
    payload = {
        "title": title,
        "decision_type": decision_type,
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "description": description,
        "rationale": rationale,
        "decided_by": decided_by,
        "deadline": deadline,
    }
    recorded = _post("/decisions", payload)

    outcome = {
        "status": recorded.get("status"),
        "escalation_note": recorded.get("escalation_note"),
    }
    event = {
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "decision_id": recorded["id"],
        "event_type": "decision_recorded",
        "summary": f"Decision recorded ({decision_type}): {title}",
        "author": "custodian",
        "detail": outcome,
    }
    _post("/progress", event)
    return json.dumps(recorded, indent=2)
@mcp.tool()
def resolve_decision(
    decision_id: str,
    rationale: str,
    decided_by: str,
) -> str:
    """Mark a decision as resolved.

    Args:
        decision_id: UUID of the decision
        rationale: final reasoning/outcome
        decided_by: who resolved it
    """
    # Function-scope import so this block stays self-contained; the module
    # itself only imports ``datetime`` from the datetime package.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp instead, then drop the tzinfo so isoformat() emits no
    # "+00:00" suffix and the wire format stays "<naive ISO timestamp>Z".
    now_utc = datetime.now(timezone.utc)
    decided_at = now_utc.replace(tzinfo=None).isoformat() + "Z"

    decision = _patch(f"/decisions/{decision_id}", {
        "status": "resolved",
        "decision_type": "made",
        "rationale": rationale,
        "decided_by": decided_by,
        "decided_at": decided_at,
    })

    # Log the resolution, linked to whatever topic/workstream the API reports
    # for this decision.
    _post("/progress", {
        "topic_id": decision.get("topic_id"),
        "workstream_id": decision.get("workstream_id"),
        "decision_id": decision_id,
        "event_type": "decision_resolved",
        "summary": f"Decision resolved by {decided_by}: {decision['title']}",
        "author": "custodian",
        "detail": {"rationale": rationale},
    })
    return json.dumps(decision, indent=2)
@mcp.tool()
def add_progress_event(
    summary: str,
    event_type: str = "note",
    topic_id: str | None = None,
    workstream_id: str | None = None,
    task_id: str | None = None,
    detail: dict | None = None,
) -> str:
    """Append a progress event to the log.

    Args:
        summary: human-readable summary of what happened
        event_type: free-form label, e.g. note | milestone | blocker | insight
        topic_id: optional topic UUID
        workstream_id: optional workstream UUID
        task_id: optional task UUID
        detail: optional structured data (JSONB)
    """
    # Fields left as None are dropped by _post before the request is sent.
    payload = {
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "task_id": task_id,
        "event_type": event_type,
        "summary": summary,
        "author": "custodian",
        "detail": detail,
    }
    logged = _post("/progress", payload)
    return json.dumps(logged, indent=2)
@mcp.tool()
def update_workstream_status(workstream_id: str, status: str) -> str:
    """Update a workstream's status.

    Args:
        workstream_id: UUID of the workstream
        status: active | blocked | completed | archived
    """
    updated = _patch(f"/workstreams/{workstream_id}", {"status": status})

    # Log the change; the topic link comes from the API response.
    ws_title = updated["title"]
    event = {
        "workstream_id": workstream_id,
        "topic_id": updated.get("topic_id"),
        "event_type": "workstream_status_changed",
        "summary": f"Workstream status → {status}: {ws_title}",
        "author": "custodian",
    }
    _post("/progress", event)
    return json.dumps(updated, indent=2)
# ---------------------------------------------------------------------------
# Next-steps suggestion tool (S2.3) — sanctioned write use case #2
# ---------------------------------------------------------------------------
@mcp.tool()
def get_next_steps() -> str:
    """Surface contextual next-action suggestions derived from hub state.

    Returns suggestions based on:
    - Recently resolved decisions → first open task in the same workstream
    - Workstreams whose every dependency is now completed → first todo task

    Each suggestion includes domain, workstream, task, and a plain-language
    message. The hub surfaces *what* and *where* — the domain owns *how*.

    This is one of the two sanctioned write-side use cases of the State Hub
    (the other is resolve_decision). Suggestions are derived, not persisted.
    """
    # This function only issues a GET; the suggestions are computed by the
    # /state/next_steps endpoint and pretty-printed here.
    suggestions = _get("/state/next_steps")
    return json.dumps(suggestions, indent=2)
# ---------------------------------------------------------------------------
# Dependency graph tools (S1.4)
# ---------------------------------------------------------------------------
@mcp.tool()
def create_dependency(
    from_workstream_id: str,
    to_workstream_id: str,
    description: str | None = None,
) -> str:
    """Record that one workstream depends on another.

    Semantics: from_workstream cannot fully proceed until to_workstream reaches
    a satisfactory state.

    Args:
        from_workstream_id: UUID of the workstream that has the dependency
        to_workstream_id: UUID of the workstream it depends on
        description: optional human-readable explanation of the dependency
    """
    # The dependent workstream goes into the URL, the one it depends on into
    # the body. No progress event is posted by this tool.
    endpoint = f"/workstreams/{from_workstream_id}/dependencies"
    payload = {
        "to_workstream_id": to_workstream_id,
        "description": description,
    }
    edge = _post(endpoint, payload)
    return json.dumps(edge, indent=2)
@mcp.tool()
def list_dependencies(workstream_id: str) -> str:
    """Return all dependency edges touching a workstream (both directions).

    The response distinguishes edges where this workstream is the dependent
    (depends_on) from edges where it is the blocker (blocks).

    Args:
        workstream_id: UUID of the workstream to inspect
    """
    edges = _get(f"/workstreams/{workstream_id}/dependencies")

    # Edge ids are matched by string equality. A caller may pass a valid but
    # non-canonical UUID spelling (upper case, braces, no hyphens) that would
    # never equal the ids in the response, so both result lists would come
    # back empty. Accept the canonical spelling as well as the raw input.
    own_ids = {workstream_id}
    try:
        own_ids.add(str(UUID(workstream_id)))
    except (ValueError, AttributeError, TypeError):
        # Not parseable as a UUID: deliberately fall back to comparing the
        # raw value only, exactly as before.
        pass

    depends_on = [e for e in edges if e["from_workstream_id"] in own_ids]
    blocks = [e for e in edges if e["to_workstream_id"] in own_ids]
    return json.dumps({"depends_on": depends_on, "blocks": blocks}, indent=2)
# ---------------------------------------------------------------------------
# Extension points & technical debt
# ---------------------------------------------------------------------------
@mcp.tool()
def register_extension_point(
    domain: str,
    title: str,
    ep_type: str,
    description: str | None = None,
    location: str | None = None,
    priority: str = "medium",
    ep_id: str | None = None,
    topic_id: str | None = None,
    workstream_id: str | None = None,
) -> str:
    """Register a discovered extension point — optional future functionality not yet committed.

    Extension points capture design forks: things the system *could* do that
    have been noticed and parked for deliberate later consideration.

    Args:
        domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
        title: short description of the extension
        ep_type: api | schema | mcp | dashboard | architecture | integration | other
        description: longer explanation of what the extension would add
        location: file:line or module where the extension point was noticed
        priority: low | medium | high | critical
        ep_id: optional human-readable ID, e.g. EP-CUST-001 (auto-assigned if omitted)
        topic_id: UUID of related topic
        workstream_id: UUID of related workstream
    """
    ep = _post("/extension-points", {
        "domain": domain, "title": title, "ep_type": ep_type,
        "description": description, "location": location,
        "priority": priority, "ep_id": ep_id,
        "topic_id": topic_id, "workstream_id": workstream_id,
    })

    # Prefer the human-readable id; otherwise show the first 8 characters of
    # the record's id.
    label = ep.get("ep_id") or ep["id"][:8]

    # Every other write tool in this module stamps its progress event with
    # author "custodian" and links it to the topic/workstream. Do the same
    # here so the event shows up in topic- and workstream-filtered progress
    # queries. Links that are None are dropped by _post.
    _post("/progress", {
        "topic_id": ep.get("topic_id") or topic_id,
        "workstream_id": ep.get("workstream_id") or workstream_id,
        "summary": f"Extension point registered: [{label}] {title} ({ep_type}, {domain})",
        "event_type": "extension_point",
        "author": "custodian",
        "detail": {"id": ep["id"], "ep_id": ep.get("ep_id"), "ep_type": ep_type, "domain": domain},
    })
    return json.dumps(ep, indent=2)
@mcp.tool()
def list_extension_points(
    domain: str | None = None,
    status: str | None = None,
    ep_type: str | None = None,
) -> str:
    """List extension points, optionally filtered.

    Args:
        domain: filter by domain
        status: open | in_progress | addressed | deferred | wont_fix
        ep_type: api | schema | mcp | dashboard | architecture | integration | other
    """
    # Filters left as None are dropped by _get, so they do not constrain the
    # query.
    filters = {"domain": domain, "status": status, "ep_type": ep_type}
    extension_points = _get("/extension-points", filters)
    return json.dumps(extension_points, indent=2)
@mcp.tool()
def update_ep_status(ep_uuid: str, status: str) -> str:
    """Update the status of an extension point.

    Args:
        ep_uuid: UUID of the extension point
        status: open | in_progress | addressed | deferred | wont_fix
    """
    ep = _patch(f"/extension-points/{ep_uuid}", {"status": status})

    # Match the other status-change tools in this module: stamp the event with
    # author "custodian" and link it to the topic/workstream reported by the
    # API, when present. Links that are None are dropped by _post.
    _post("/progress", {
        "topic_id": ep.get("topic_id"),
        "workstream_id": ep.get("workstream_id"),
        "summary": f"Extension point status → {status}: {ep['title']}",
        "event_type": "extension_point",
        "author": "custodian",
        "detail": {"id": ep_uuid, "status": status},
    })
    return json.dumps(ep, indent=2)
@mcp.tool()
def register_technical_debt(
    domain: str,
    title: str,
    debt_type: str,
    description: str | None = None,
    location: str | None = None,
    severity: str = "medium",
    td_id: str | None = None,
    topic_id: str | None = None,
    workstream_id: str | None = None,
) -> str:
    """Register a technical debt item — a known quality compromise to address later.

    Technical debt captures intentional or discovered shortcuts, design
    weaknesses, missing tests, and similar issues that reduce codebase health.

    Args:
        domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
        title: short description of the debt
        debt_type: design | implementation | test | docs | dependencies | performance | security | other
        description: what the issue is and what the correct fix would be
        location: file:line or module where the debt lives
        severity: low | medium | high | critical
        td_id: optional human-readable ID, e.g. TD-CUST-001
        topic_id: UUID of related topic
        workstream_id: UUID of related workstream
    """
    td = _post("/technical-debt", {
        "domain": domain, "title": title, "debt_type": debt_type,
        "description": description, "location": location,
        "severity": severity, "td_id": td_id,
        "topic_id": topic_id, "workstream_id": workstream_id,
    })

    # Prefer the human-readable id; otherwise show the first 8 characters of
    # the record's id.
    label = td.get("td_id") or td["id"][:8]

    # Every other write tool in this module stamps its progress event with
    # author "custodian" and links it to the topic/workstream. Do the same
    # here so the event shows up in topic- and workstream-filtered progress
    # queries. Links that are None are dropped by _post.
    _post("/progress", {
        "topic_id": td.get("topic_id") or topic_id,
        "workstream_id": td.get("workstream_id") or workstream_id,
        "summary": f"Technical debt registered: [{label}] {title} ({debt_type}, {severity}, {domain})",
        "event_type": "technical_debt",
        "author": "custodian",
        "detail": {"id": td["id"], "td_id": td.get("td_id"), "debt_type": debt_type, "severity": severity, "domain": domain},
    })
    return json.dumps(td, indent=2)
@mcp.tool()
def list_technical_debt(
    domain: str | None = None,
    status: str | None = None,
    debt_type: str | None = None,
    severity: str | None = None,
) -> str:
    """List technical debt items, optionally filtered.

    Args:
        domain: filter by domain
        status: open | in_progress | resolved | deferred | wont_fix
        debt_type: design | implementation | test | docs | dependencies | performance | security | other
        severity: low | medium | high | critical
    """
    # Filters left as None are dropped by _get, so they do not constrain the
    # query.
    filters = {
        "domain": domain,
        "status": status,
        "debt_type": debt_type,
        "severity": severity,
    }
    debt_items = _get("/technical-debt", filters)
    return json.dumps(debt_items, indent=2)
@mcp.tool()
def update_td_status(td_uuid: str, status: str) -> str:
    """Update the status of a technical debt item.

    Args:
        td_uuid: UUID of the technical debt item
        status: open | in_progress | resolved | deferred | wont_fix
    """
    td = _patch(f"/technical-debt/{td_uuid}", {"status": status})

    # Match the other status-change tools in this module: stamp the event with
    # author "custodian" and link it to the topic/workstream reported by the
    # API, when present. Links that are None are dropped by _post.
    _post("/progress", {
        "topic_id": td.get("topic_id"),
        "workstream_id": td.get("workstream_id"),
        "summary": f"Technical debt status → {status}: {td['title']}",
        "event_type": "technical_debt",
        "author": "custodian",
        "detail": {"id": td_uuid, "status": status},
    })
    return json.dumps(td, indent=2)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Executed as a script: run the MCP server using the stdio transport.
    mcp.run(transport="stdio")