Files
state-hub/mcp_server/server.py
tegwick f3fca3088f feat(ops-bridge): add HTTP/SSE MCP transport for remote Claude Code sessions
server.py: MCP_TRANSPORT and MCP_PORT env vars select transport at startup
  (default: stdio — no behaviour change for local use)
Makefile: `make mcp-http` starts SSE server on 127.0.0.1:8001

Remote registration (one-liner on COULOMBCORE after tunnel is up):
  claude mcp add-json -s user state-hub \
    '{"type":"sse","url":"http://127.0.0.1:18001/sse"}'

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-16 00:13:14 +01:00

1430 lines
49 KiB
Python

"""Custodian State Hub MCP Server (stdio).
Thin HTTP client over the FastAPI service — no direct DB access.
All business logic stays in the API; this layer is stateless.
"""
from __future__ import annotations
import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
from uuid import UUID
import httpx
from fastmcp import FastMCP
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")
mcp = FastMCP(
name="state-hub",
instructions=(
"Custodian State Hub: tracks topics, workstreams, tasks, decisions, and progress events. "
"Start every session with get_state_summary() for orientation. "
"When working inside a single registered domain repo, prefer get_domain_summary(domain_slug) "
"— it returns the same actionable data scoped to that domain at ~10% of the token cost. "
"All writes emit a progress_event automatically."
),
)
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def _client() -> httpx.Client:
return httpx.Client(base_url=API_BASE, timeout=30.0, follow_redirects=True)
def _get(path: str, params: dict | None = None) -> Any:
if not path.endswith("/"):
path = path + "/"
with _client() as c:
r = c.get(path, params={k: v for k, v in (params or {}).items() if v is not None})
r.raise_for_status()
return r.json()
def _post(path: str, body: dict) -> Any:
if not path.endswith("/"):
path = path + "/"
with _client() as c:
r = c.post(path, json={k: v for k, v in body.items() if v is not None})
r.raise_for_status()
return r.json()
def _patch(path: str, body: dict) -> Any:
if not path.endswith("/"):
path = path + "/"
with _client() as c:
r = c.patch(path, json={k: v for k, v in body.items() if v is not None})
r.raise_for_status()
return r.json()
def _delete(path: str) -> None:
with _client() as c:
r = c.delete(path)
r.raise_for_status()
# ---------------------------------------------------------------------------
# Resources
# ---------------------------------------------------------------------------
@mcp.resource("state://summary")
def resource_summary() -> str:
"""Full StateSummary JSON — primary orientation resource."""
return json.dumps(_get("/state/summary"), indent=2)
@mcp.resource("state://topics")
def resource_topics() -> str:
"""Active topics list."""
return json.dumps(_get("/topics", {"status": "active"}), indent=2)
@mcp.resource("state://workstreams/{topic_slug}")
def resource_workstreams(topic_slug: str) -> str:
"""Workstreams for a topic (by slug)."""
topics = _get("/topics", {"status": "active"})
match = next((t for t in topics if t["slug"] == topic_slug), None)
if not match:
return json.dumps({"error": f"Topic '{topic_slug}' not found"})
return json.dumps(_get("/workstreams", {"topic_id": match["id"]}), indent=2)
@mcp.resource("state://decisions/blocking")
def resource_blocking_decisions() -> str:
"""All pending/escalated decisions."""
return json.dumps(
_get("/decisions", {"decision_type": "pending", "status": "open"}),
indent=2,
)
@mcp.resource("state://tasks/blocked")
def resource_blocked_tasks() -> str:
"""All tasks with status=blocked."""
return json.dumps(_get("/tasks", {"status": "blocked"}), indent=2)
# ---------------------------------------------------------------------------
# Query tools
# ---------------------------------------------------------------------------
@mcp.tool()
def get_state_summary() -> str:
"""Primary orientation tool. Call at the start of every session.
Returns a full snapshot: topic/workstream/task/decision totals, blocking
decisions, blocked tasks, open workstreams, and the 20 most recent events.
NOTE: This response is large (~10k tokens). When working inside a single
registered domain repo, use get_domain_summary(domain_slug) instead —
same actionable data scoped to one domain at ~10% of the token cost.
"""
return json.dumps(_get("/state/summary"), indent=2)
@mcp.tool()
def get_domain_summary(domain_slug: str) -> str:
"""Lightweight session orientation for a single domain.
Use this instead of get_state_summary() when working in a registered
domain repo — returns only what is relevant to the specified domain,
typically 80-90% fewer tokens than the full summary.
Args:
domain_slug: the domain slug, e.g. "railiance", "markitect"
Returns: topic, active workstreams, open blocking decisions for this
topic, 5 most recent progress events, repo SBOM status, and goal guidance
(needs_workplan signals + alignment warnings).
"""
topics = _get("/topics")
topic = next((t for t in topics if t.get("domain_slug") == domain_slug), None)
if not topic:
return json.dumps({"error": f"No topic found for domain '{domain_slug}'"})
topic_id = topic["id"]
workstreams = _get("/workstreams", {"topic_id": topic_id, "status": "active"})
blocking = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id})
recent = _get("/progress", {"topic_id": topic_id, "limit": 5})
repos = _get("/repos", {"domain": domain_slug})
# ── Goal guidance ──────────────────────────────────────────────────────────
# Fetch active repo goals per repo, then cross-reference with workstreams.
repo_by_id = {r["id"]: r for r in repos}
ws_by_repo_goal: dict[str, list] = {}
for ws in workstreams:
if ws.get("repo_goal_id"):
ws_by_repo_goal.setdefault(ws["repo_goal_id"], []).append(ws)
# repo_id → list of active workstreams (for alignment check)
ws_by_repo: dict[str, list] = {}
for ws in workstreams:
if ws.get("repo_id"):
ws_by_repo.setdefault(ws["repo_id"], []).append(ws)
needs_workplan: list[dict] = [] # active goal with no linked workstream
alignment_warnings: list[dict] = [] # workstreams not linked to active goal
for repo in repos:
repo_slug = repo["slug"]
repo_id = repo["id"]
active_goals = _get("/repo-goals", {"repo_slug": repo_slug, "status": "active"})
if not active_goals:
continue
active_goal_ids = {g["id"] for g in active_goals}
for goal in active_goals:
linked = ws_by_repo_goal.get(goal["id"], [])
if not linked:
needs_workplan.append({
"repo_slug": repo_slug,
"goal_id": goal["id"],
"goal_title": goal["title"],
"goal_description": goal["description"],
"priority": goal["priority"],
"action": (
f"No workstream is linked to repo goal '{goal['title']}'. "
"Create a workplan file in workplans/ and register a workstream "
f"with repo_goal_id='{goal['id']}' to start delivering this goal."
),
})
# Check if repo has active workstreams not tied to any active goal
repo_ws = ws_by_repo.get(repo_id, [])
unlinked_ws = [
ws for ws in repo_ws
if ws.get("repo_goal_id") not in active_goal_ids
]
if unlinked_ws:
# Most recently updated workstream = the one to suggest continuing
recent_ws = max(unlinked_ws, key=lambda w: w.get("updated_at", ""))
alignment_warnings.append({
"repo_slug": repo_slug,
"recent_workstream_id": recent_ws["id"],
"recent_workstream_title": recent_ws["title"],
"active_goal_titles": [g["title"] for g in active_goals],
"message": (
f"Workstream '{recent_ws['title']}' is not linked to the current "
f"repo goal(s) for {repo_slug}. "
"Continue this workstream if the work is still relevant, but verify "
"alignment with the active goal before committing to new tasks."
),
})
goal_guidance: dict = {}
if needs_workplan or alignment_warnings:
goal_guidance = {
"needs_workplan": needs_workplan,
"alignment_warnings": alignment_warnings,
}
result: dict = {
"domain": domain_slug,
"topic_id": topic_id,
"topic_title": topic["title"],
"workstreams": workstreams,
"blocking_decisions": blocking,
"recent_progress": recent,
"repos": [{"slug": r["slug"], "last_sbom_at": r.get("last_sbom_at")} for r in repos],
}
if goal_guidance:
result["goal_guidance"] = goal_guidance
return json.dumps(result, indent=2)
@mcp.tool()
def get_topic(slug: str) -> str:
"""Return a topic (with workstreams) by slug, plus its recent progress events."""
topics = _get("/topics")
match = next((t for t in topics if t["slug"] == slug), None)
if not match:
return json.dumps({"error": f"Topic '{slug}' not found"})
topic_detail = _get(f"/topics/{match['id']}")
recent = _get("/progress", {"topic_id": match["id"], "limit": 10})
return json.dumps({"topic": topic_detail, "recent_progress": recent}, indent=2)
@mcp.tool()
def list_blocked_tasks(workstream_id: str | None = None) -> str:
"""List all tasks with status=blocked, optionally filtered by workstream_id."""
return json.dumps(_get("/tasks", {"status": "blocked", "workstream_id": workstream_id}), indent=2)
@mcp.tool()
def list_pending_decisions(topic_id: str | None = None) -> str:
"""List pending decisions sorted by deadline (nulls last).
Optionally filter by topic_id. Escalated decisions are included and
highlighted by their escalation_note.
"""
results = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id})
return json.dumps(results, indent=2)
@mcp.tool()
def get_recent_progress(limit: int = 20, since: str | None = None) -> str:
"""Retrieve recent progress events to reconstruct session history.
Args:
limit: max events to return (default 20)
since: ISO datetime string — only events after this timestamp
"""
return json.dumps(_get("/progress", {"limit": limit, "since": since}), indent=2)
# ---------------------------------------------------------------------------
# Mutate tools
# ---------------------------------------------------------------------------
@mcp.tool()
def create_workstream(
topic_id: str,
title: str,
slug: str | None = None,
description: str | None = None,
owner: str | None = None,
due_date: str | None = None,
repo_id: str | None = None,
) -> str:
"""Create a new workstream under a topic and emit a progress_event.
Args:
topic_id: UUID of the parent topic
title: workstream title
slug: URL-friendly identifier (auto-generated from title if omitted)
description: optional longer description
owner: optional owner name
due_date: optional ISO date string (YYYY-MM-DD)
repo_id: UUID of the owning repository (GEMS primary; strongly recommended per ADR-001)
"""
if not slug:
slug = re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-")
ws = _post("/workstreams", {
"topic_id": topic_id,
"title": title,
"slug": slug,
"description": description,
"owner": owner,
"due_date": due_date,
"status": "active",
"repo_id": repo_id,
})
_post("/progress", {
"topic_id": topic_id,
"workstream_id": ws["id"],
"event_type": "workstream_created",
"summary": f"Workstream created: {title}",
"author": "custodian",
"detail": {"owner": owner, "slug": slug},
})
return json.dumps(ws, indent=2)
@mcp.tool()
def create_task(
workstream_id: str,
title: str,
priority: str = "medium",
description: str | None = None,
assignee: str | None = None,
due_date: str | None = None,
) -> str:
"""Create a new task and emit a progress_event.
Args:
workstream_id: UUID of the parent workstream
title: task title
priority: low | medium | high | critical
description: optional longer description
assignee: optional assignee name
due_date: optional ISO date string (YYYY-MM-DD)
"""
task = _post("/tasks", {
"workstream_id": workstream_id,
"title": title,
"priority": priority,
"description": description,
"assignee": assignee,
"due_date": due_date,
})
_post("/progress", {
"workstream_id": workstream_id,
"task_id": task["id"],
"event_type": "task_created",
"summary": f"Task created: {title}",
"author": "custodian",
"detail": {"priority": priority, "assignee": assignee},
})
return json.dumps(task, indent=2)
@mcp.tool()
def update_task_status(
task_id: str,
status: str,
blocking_reason: str | None = None,
) -> str:
"""Update a task's status. blocking_reason is required when status='blocked'.
Args:
task_id: UUID of the task
status: todo | in_progress | blocked | done | cancelled
blocking_reason: required when status=blocked
"""
body: dict[str, Any] = {"status": status}
if blocking_reason:
body["blocking_reason"] = blocking_reason
task = _patch(f"/tasks/{task_id}", body)
_post("/progress", {
"task_id": task_id,
"workstream_id": task.get("workstream_id"),
"event_type": "task_status_changed",
"summary": f"Task status → {status}: {task['title']}",
"author": "custodian",
"detail": {"blocking_reason": blocking_reason},
})
return json.dumps(task, indent=2)
@mcp.tool()
def flag_for_human(task_id: str, note: str) -> str:
"""Flag a task as requiring human intervention.
Sets needs_human=True and records the required action as intervention_note.
Emits a progress event so the flag is visible in session history.
Args:
task_id: UUID of the task to flag
note: description of the action required from the human (required)
"""
task = _patch(f"/tasks/{task_id}", {
"needs_human": True,
"intervention_note": note,
})
_post("/progress", {
"task_id": task_id,
"workstream_id": task.get("workstream_id"),
"event_type": "task_flagged_human",
"summary": f"Task flagged for human intervention: {task['title']}",
"author": "custodian",
"detail": {"intervention_note": note},
})
return json.dumps(task, indent=2)
@mcp.tool()
def clear_human_flag(task_id: str) -> str:
"""Clear the human-intervention flag from a task.
Sets needs_human=False. The intervention_note is preserved as a
historical record. Call this after the human has completed the action.
Args:
task_id: UUID of the task to clear
"""
task = _patch(f"/tasks/{task_id}", {
"needs_human": False,
})
_post("/progress", {
"task_id": task_id,
"workstream_id": task.get("workstream_id"),
"event_type": "task_flag_cleared",
"summary": f"Human-intervention flag cleared: {task['title']}",
"author": "custodian",
})
return json.dumps(task, indent=2)
@mcp.tool()
def list_human_interventions(workstream_id: str | None = None) -> str:
"""List all tasks flagged for human intervention.
Returns tasks where needs_human=True, optionally filtered to one workstream.
Use this at session start to surface Bernd's action items.
Args:
workstream_id: optional UUID to scope results to one workstream
"""
return json.dumps(
_get("/tasks", {"needs_human": "true", "workstream_id": workstream_id}),
indent=2,
)
@mcp.tool()
def record_decision(
title: str,
decision_type: str = "pending",
topic_id: str | None = None,
workstream_id: str | None = None,
description: str | None = None,
rationale: str | None = None,
decided_by: str | None = None,
deadline: str | None = None,
) -> str:
"""Record a decision (made or pending).
Pending decisions touching financial/legal topics are auto-escalated per
constitution §4.
Args:
title: decision title
decision_type: made | pending
topic_id: optional topic UUID
workstream_id: optional workstream UUID (at least one required)
description: optional context
rationale: reasoning behind the decision
decided_by: person/agent who decided
deadline: ISO datetime string for when decision is needed
"""
decision = _post("/decisions", {
"title": title,
"decision_type": decision_type,
"topic_id": topic_id,
"workstream_id": workstream_id,
"description": description,
"rationale": rationale,
"decided_by": decided_by,
"deadline": deadline,
})
_post("/progress", {
"topic_id": topic_id,
"workstream_id": workstream_id,
"decision_id": decision["id"],
"event_type": "decision_recorded",
"summary": f"Decision recorded ({decision_type}): {title}",
"author": "custodian",
"detail": {"status": decision.get("status"), "escalation_note": decision.get("escalation_note")},
})
return json.dumps(decision, indent=2)
@mcp.tool()
def resolve_decision(
decision_id: str,
rationale: str,
decided_by: str,
) -> str:
"""Mark a decision as resolved.
Args:
decision_id: UUID of the decision
rationale: final reasoning/outcome
decided_by: who resolved it
"""
decision = _patch(f"/decisions/{decision_id}", {
"status": "resolved",
"decision_type": "made",
"rationale": rationale,
"decided_by": decided_by,
"decided_at": datetime.utcnow().isoformat() + "Z",
})
_post("/progress", {
"topic_id": decision.get("topic_id"),
"workstream_id": decision.get("workstream_id"),
"decision_id": decision_id,
"event_type": "decision_resolved",
"summary": f"Decision resolved by {decided_by}: {decision['title']}",
"author": "custodian",
"detail": {"rationale": rationale},
})
return json.dumps(decision, indent=2)
@mcp.tool()
def add_progress_event(
summary: str,
event_type: str = "note",
topic_id: str | None = None,
workstream_id: str | None = None,
task_id: str | None = None,
detail: dict | None = None,
) -> str:
"""Append a progress event to the log.
Args:
summary: human-readable summary of what happened
event_type: free-form label, e.g. note | milestone | blocker | insight
topic_id: optional topic UUID
workstream_id: optional workstream UUID
task_id: optional task UUID
detail: optional structured data (JSONB)
"""
event = _post("/progress", {
"topic_id": topic_id,
"workstream_id": workstream_id,
"task_id": task_id,
"event_type": event_type,
"summary": summary,
"author": "custodian",
"detail": detail,
})
return json.dumps(event, indent=2)
@mcp.tool()
def update_workstream_status(workstream_id: str, status: str) -> str:
"""Update a workstream's status.
Args:
workstream_id: UUID of the workstream
status: active | blocked | completed | archived
"""
ws = _patch(f"/workstreams/{workstream_id}", {"status": status})
_post("/progress", {
"workstream_id": workstream_id,
"topic_id": ws.get("topic_id"),
"event_type": "workstream_status_changed",
"summary": f"Workstream status → {status}: {ws['title']}",
"author": "custodian",
})
return json.dumps(ws, indent=2)
@mcp.tool()
def update_workstream(
workstream_id: str,
title: str | None = None,
description: str | None = None,
owner: str | None = None,
due_date: str | None = None,
repo_goal_id: str | None = None,
status: str | None = None,
) -> str:
"""Update fields on an existing workstream.
Args:
workstream_id: UUID of the workstream
title: new title (optional)
description: new description (optional)
owner: new owner (optional)
due_date: ISO date string YYYY-MM-DD (optional)
repo_goal_id: UUID of the repo goal to link (optional; pass empty string to clear)
status: active | blocked | completed | archived (optional)
"""
payload: dict = {}
if title is not None:
payload["title"] = title
if description is not None:
payload["description"] = description
if owner is not None:
payload["owner"] = owner
if due_date is not None:
payload["due_date"] = due_date
if status is not None:
payload["status"] = status
if repo_goal_id is not None:
payload["repo_goal_id"] = repo_goal_id if repo_goal_id else None
ws = _patch(f"/workstreams/{workstream_id}", payload)
return json.dumps(ws, indent=2)
# ---------------------------------------------------------------------------
# Next-steps suggestion tool (S2.3) — sanctioned write use case #2
# ---------------------------------------------------------------------------
@mcp.tool()
def get_next_steps() -> str:
"""Surface contextual next-action suggestions derived from hub state.
Returns suggestions based on:
- Recently resolved decisions → first open task in the same workstream
- Workstreams whose every dependency is now completed → first todo task
Each suggestion includes domain, workstream, task, and a plain-language
message. The hub surfaces *what* and *where* — the domain owns *how*.
This is one of the two sanctioned write-side use cases of the State Hub
(the other is resolve_decision). Suggestions are derived, not persisted.
"""
return json.dumps(_get("/state/next_steps"), indent=2)
# ---------------------------------------------------------------------------
# Dependency graph tools (S1.4)
# ---------------------------------------------------------------------------
@mcp.tool()
def create_dependency(
from_workstream_id: str,
to_workstream_id: str,
description: str | None = None,
) -> str:
"""Record that one workstream depends on another.
Semantics: from_workstream cannot fully proceed until to_workstream reaches
a satisfactory state.
Args:
from_workstream_id: UUID of the workstream that has the dependency
to_workstream_id: UUID of the workstream it depends on
description: optional human-readable explanation of the dependency
"""
dep = _post(f"/workstreams/{from_workstream_id}/dependencies", {
"to_workstream_id": to_workstream_id,
"description": description,
})
return json.dumps(dep, indent=2)
@mcp.tool()
def list_dependencies(workstream_id: str) -> str:
"""Return all dependency edges touching a workstream (both directions).
The response distinguishes edges where this workstream is the dependent
(depends_on) from edges where it is the blocker (blocks).
Args:
workstream_id: UUID of the workstream to inspect
"""
edges = _get(f"/workstreams/{workstream_id}/dependencies")
depends_on = [e for e in edges if e["from_workstream_id"] == workstream_id]
blocks = [e for e in edges if e["to_workstream_id"] == workstream_id]
return json.dumps({"depends_on": depends_on, "blocks": blocks}, indent=2)
# ---------------------------------------------------------------------------
# Extension points & technical debt
# ---------------------------------------------------------------------------
@mcp.tool()
def register_extension_point(
domain: str,
title: str,
ep_type: str,
description: str | None = None,
location: str | None = None,
priority: str = "medium",
ep_id: str | None = None,
topic_id: str | None = None,
workstream_id: str | None = None,
) -> str:
"""Register a discovered extension point — optional future functionality not yet committed.
Extension points capture design forks: things the system *could* do that
have been noticed and parked for deliberate later consideration.
Args:
domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
title: short description of the extension
ep_type: api | schema | mcp | dashboard | architecture | integration | other
description: longer explanation of what the extension would add
location: file:line or module where the extension point was noticed
priority: low | medium | high | critical
ep_id: optional human-readable ID, e.g. EP-CUST-001 (auto-assigned if omitted)
topic_id: UUID of related topic
workstream_id: UUID of related workstream
"""
ep = _post("/extension-points", {
"domain": domain, "title": title, "ep_type": ep_type,
"description": description, "location": location,
"priority": priority, "ep_id": ep_id,
"topic_id": topic_id, "workstream_id": workstream_id,
})
_post("/progress", {
"summary": f"Extension point registered: [{ep.get('ep_id') or ep['id'][:8]}] {title} ({ep_type}, {domain})",
"event_type": "extension_point",
"detail": {"id": ep["id"], "ep_id": ep.get("ep_id"), "ep_type": ep_type, "domain": domain},
})
return json.dumps(ep, indent=2)
@mcp.tool()
def list_extension_points(
domain: str | None = None,
status: str | None = None,
ep_type: str | None = None,
) -> str:
"""List extension points, optionally filtered.
Args:
domain: filter by domain
status: open | in_progress | addressed | deferred | wont_fix
ep_type: api | schema | mcp | dashboard | architecture | integration | other
"""
return json.dumps(_get("/extension-points", {
"domain": domain, "status": status, "ep_type": ep_type,
}), indent=2)
@mcp.tool()
def update_ep_status(ep_uuid: str, status: str) -> str:
"""Update the status of an extension point.
Args:
ep_uuid: UUID of the extension point
status: open | in_progress | addressed | deferred | wont_fix
"""
ep = _patch(f"/extension-points/{ep_uuid}", {"status": status})
_post("/progress", {
"summary": f"Extension point status → {status}: {ep['title']}",
"event_type": "extension_point",
"detail": {"id": ep_uuid, "status": status},
})
return json.dumps(ep, indent=2)
@mcp.tool()
def register_technical_debt(
domain: str,
title: str,
debt_type: str,
description: str | None = None,
location: str | None = None,
severity: str = "medium",
td_id: str | None = None,
topic_id: str | None = None,
workstream_id: str | None = None,
) -> str:
"""Register a technical debt item — a known quality compromise to address later.
Technical debt captures intentional or discovered shortcuts, design
weaknesses, missing tests, and similar issues that reduce codebase health.
Args:
domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
title: short description of the debt
debt_type: design | implementation | test | docs | dependencies | performance | security | other
description: what the issue is and what the correct fix would be
location: file:line or module where the debt lives
severity: low | medium | high | critical
td_id: optional human-readable ID, e.g. TD-CUST-001
topic_id: UUID of related topic
workstream_id: UUID of related workstream
"""
td = _post("/technical-debt", {
"domain": domain, "title": title, "debt_type": debt_type,
"description": description, "location": location,
"severity": severity, "td_id": td_id,
"topic_id": topic_id, "workstream_id": workstream_id,
})
_post("/progress", {
"summary": f"Technical debt registered: [{td.get('td_id') or td['id'][:8]}] {title} ({debt_type}, {severity}, {domain})",
"event_type": "technical_debt",
"detail": {"id": td["id"], "td_id": td.get("td_id"), "debt_type": debt_type, "severity": severity, "domain": domain},
})
return json.dumps(td, indent=2)
@mcp.tool()
def list_technical_debt(
domain: str | None = None,
status: str | None = None,
debt_type: str | None = None,
severity: str | None = None,
) -> str:
"""List technical debt items, optionally filtered.
Args:
domain: filter by domain
status: open | in_progress | resolved | deferred | wont_fix
debt_type: design | implementation | test | docs | dependencies | performance | security | other
severity: low | medium | high | critical
"""
return json.dumps(_get("/technical-debt", {
"domain": domain, "status": status,
"debt_type": debt_type, "severity": severity,
}), indent=2)
@mcp.tool()
def update_td_status(td_uuid: str, status: str) -> str:
"""Update the status of a technical debt item.
Args:
td_uuid: UUID of the technical debt item
status: open | in_progress | resolved | deferred | wont_fix
"""
td = _patch(f"/technical-debt/{td_uuid}", {"status": status})
_post("/progress", {
"summary": f"Technical debt status → {status}: {td['title']}",
"event_type": "technical_debt",
"detail": {"id": td_uuid, "status": status},
})
return json.dumps(td, indent=2)
# ---------------------------------------------------------------------------
# Domain lifecycle + repo registration tools (v0.5)
# ---------------------------------------------------------------------------
@mcp.tool()
def list_domains(status: str = "active") -> str:
"""List all registered domains.
Args:
status: active | archived | all (default: active)
"""
return json.dumps(_get("/domains", {"status": status}), indent=2)
@mcp.tool()
def create_domain(slug: str, name: str, description: str | None = None) -> str:
"""Create a new domain.
Args:
slug: URL-friendly identifier (lowercase, underscored), e.g. 'my_project'
name: Human-readable display name
description: optional longer description
"""
domain = _post("/domains", {"slug": slug, "name": name, "description": description})
_post("/progress", {
"event_type": "milestone",
"summary": f"Domain created: {slug} ({name})",
"author": "custodian",
"detail": {"slug": slug, "name": name},
})
return json.dumps(domain, indent=2)
@mcp.tool()
def rename_domain(slug: str, new_slug: str, new_name: str) -> str:
"""Rename a domain — cascades to EP/TD string columns.
Args:
slug: Current domain slug
new_slug: New URL-friendly identifier
new_name: New human-readable display name
"""
domain = _patch(f"/domains/{slug}/rename", {"new_slug": new_slug, "new_name": new_name})
_post("/progress", {
"event_type": "milestone",
"summary": f"Domain renamed: {slug}{new_slug} ({new_name})",
"author": "custodian",
"detail": {"old_slug": slug, "new_slug": new_slug, "new_name": new_name},
})
return json.dumps(domain, indent=2)
@mcp.tool()
def archive_domain(slug: str) -> str:
"""Archive a domain (soft-delete). Fails if active topics exist.
Args:
slug: Domain slug to archive
"""
domain = _patch(f"/domains/{slug}/archive", {})
_post("/progress", {
"event_type": "note",
"summary": f"Domain archived: {slug}",
"author": "custodian",
"detail": {"slug": slug},
})
return json.dumps(domain, indent=2)
@mcp.tool()
def list_domain_repos(domain_slug: str) -> str:
"""List all repositories registered under a domain.
Args:
domain_slug: Domain slug to filter by
"""
return json.dumps(_get("/repos", {"domain": domain_slug}), indent=2)
@mcp.tool()
def register_repo(
domain_slug: str,
name: str,
slug: str | None = None,
local_path: str | None = None,
remote_url: str | None = None,
description: str | None = None,
) -> str:
"""Register a git repository under a domain.
Args:
domain_slug: Domain slug (must already exist)
name: Human-readable repository name
slug: URL-friendly identifier (auto-generated from name if omitted)
local_path: Absolute local filesystem path to the repo
remote_url: Remote git URL (Gitea, GitHub, etc.)
description: optional description
"""
import re as _re
if not slug:
slug = _re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")
repo = _post("/repos", {
"domain_slug": domain_slug,
"slug": slug,
"name": name,
"local_path": local_path,
"remote_url": remote_url,
"description": description,
})
_post("/progress", {
"event_type": "milestone",
"summary": f"Repo registered: {name} under domain '{domain_slug}'",
"author": "custodian",
"detail": {"slug": slug, "domain_slug": domain_slug, "local_path": local_path, "remote_url": remote_url},
})
return json.dumps(repo, indent=2)
# ---------------------------------------------------------------------------
# ADR-001 compliance validation
# ---------------------------------------------------------------------------
@mcp.tool()
def validate_repo_adr(repo_path: str, domain_slug: str | None = None) -> str:
"""Check whether a repository is consistent with ADR-001.
Validates that workplan files exist in workplans/ with correct frontmatter,
that state_hub_workstream_id references resolve to real DB records, and that
no active state-hub workstreams for the domain lack a backing file (orphan
detection — DB-only records are an ADR-001 violation).
Args:
repo_path: Absolute path to the repository root.
domain_slug: Domain slug for orphan detection (e.g. 'custodian').
If omitted, inferred from workplan frontmatter.
"""
import subprocess
script = Path(__file__).parent.parent / "scripts" / "validate_repo_adr.py"
cmd = [sys.executable, str(script), repo_path, "--json",
"--api-base", API_BASE]
if domain_slug:
cmd += ["--domain", domain_slug]
result = subprocess.run(cmd, capture_output=True, text=True)
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
return f"Validator script error:\n{result.stderr or result.stdout or '(no output)'}"
findings = data.get("findings", [])
summary = data.get("summary", {})
overall = data.get("result", "unknown")
failures = [f for f in findings if f["level"] == "FAIL"]
warnings = [f for f in findings if f["level"] == "WARN"]
lines = [f"ADR-001 Compliance: {repo_path}", ""]
if failures:
lines.append(f"FAILURES ({len(failures)}):")
for f in failures:
loc = f" [{f['file']}]" if f.get("file") else ""
lines.append(f" FAIL {f['check']}{loc}")
lines.append(f" {f['detail']}")
lines.append("")
if warnings:
lines.append(f"WARNINGS ({len(warnings)}):")
for f in warnings:
loc = f" [{f['file']}]" if f.get("file") else ""
lines.append(f" WARN {f['check']}{loc}")
lines.append(f" {f['detail']}")
lines.append("")
lines.append(
f"Summary: {summary.get('pass', 0)} pass | "
f"{summary.get('warn', 0)} warn | "
f"{summary.get('fail', 0)} fail"
)
lines.append(f"Result: {'FAIL' if overall == 'fail' else 'PASS (with warnings)' if overall == 'warn' else 'PASS'}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# ADR-001 consistency checking engine
# ---------------------------------------------------------------------------
@mcp.tool()
def check_repo_consistency(repo_slug: str, fix: bool = False) -> str:
"""Run ADR-001 consistency check for a registered repo.
Performs bidirectional checks between workplan files in the repo and the
state-hub DB. The file is always authoritative: drift is reported with the
file value as the expected value.
Checks: missing workplans/, parse errors, stale DB references, status/title
drift, unlinked workplans, orphan DB workstreams, repo mismatches, task
status drift, unlinked tasks, and orphan DB tasks.
Args:
repo_slug: Registered repo slug (e.g. 'the-custodian', 'activity-core').
fix: If True, apply auto-fixable issues: status drift (C-04), title drift
(C-05), create missing DB workstreams (C-06), repo mismatch (C-09),
task status drift (C-10), create unlinked tasks (C-11).
"""
import subprocess
script = Path(__file__).parent.parent / "scripts" / "consistency_check.py"
cmd = [sys.executable, str(script), "--repo", repo_slug, "--json",
"--api-base", API_BASE]
if fix:
cmd.append("--fix")
result = subprocess.run(cmd, capture_output=True, text=True)
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
return f"Consistency check script error:\n{result.stderr or result.stdout or '(no output)'}"
issues = data.get("issues", [])
summary = data.get("summary", {})
overall = data.get("result", "unknown")
fixes = data.get("fixes_applied", [])
failures = [i for i in issues if i["severity"] == "FAIL"]
warnings = [i for i in issues if i["severity"] == "WARN"]
infos = [i for i in issues if i["severity"] == "INFO"]
lines = [
f"Consistency Check: {repo_slug}",
f"Path: {data.get('repo_path', '?')}",
"",
]
for sev, group in (("FAIL", failures), ("WARN", warnings), ("INFO", infos)):
if not group:
continue
lines.append(f"{sev}S ({len(group)}):")
for i in group:
loc = f" [{i['file_path']}]" if i.get("file_path") else ""
fix_tag = " [fixable]" if i.get("fixable") else ""
lines.append(f" {i['check_id']}{loc}{fix_tag}")
lines.append(f" {i['message']}")
lines.append("")
if fixes:
lines.append(f"Fixes applied ({len(fixes)}):")
for f in fixes:
lines.append(f" {f}")
lines.append("")
lines.append(
f"Summary: {summary.get('fail', 0)} fail | "
f"{summary.get('warn', 0)} warn | "
f"{summary.get('info', 0)} info"
)
lines.append(
f"Result: {'FAIL' if overall == 'fail' else 'PASS (with warnings)' if overall in ('warn',) else 'PASS'}"
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Contribution tracking (v0.3)
# ---------------------------------------------------------------------------
@mcp.resource("state://contributions")
def resource_contributions() -> str:
"""All contribution artifacts (BR/FR/EP/UPR)."""
return json.dumps(_get("/contributions"), indent=2)
@mcp.resource("state://sbom/aggregated")
def resource_sbom_aggregated() -> str:
"""Aggregated SBOM entries across all repos."""
return json.dumps(_get("/sbom"), indent=2)
@mcp.resource("state://sbom/{repo_slug}")
def resource_sbom_repo(repo_slug: str) -> str:
"""SBOM view for a specific repo (by slug)."""
return json.dumps(_get(f"/sbom/{repo_slug}"), indent=2)
@mcp.tool()
def register_contribution(
type: str,
title: str,
target_org: str | None = None,
target_repo: str | None = None,
body_path: str | None = None,
related_workstream_id: str | None = None,
notes: str | None = None,
) -> str:
"""Register a new upstream contribution artifact (BR/FR/EP/UPR).
Args:
type: br | fr | ep | upr
title: Short human-readable title
target_org: GitHub org or owner of the upstream project
target_repo: Repository name of the upstream project
body_path: Relative path to the Markdown artifact file in the repo
related_workstream_id: UUID of the related workstream (optional)
notes: Any additional notes (optional)
"""
contrib = _post("/contributions", {
"type": type,
"title": title,
"target_org": target_org,
"target_repo": target_repo,
"body_path": body_path,
"related_workstream_id": related_workstream_id,
"notes": notes,
})
_post("/progress", {
"workstream_id": related_workstream_id,
"event_type": "contribution_registered",
"summary": f"Contribution registered [{type.upper()}]: {title}",
"author": "custodian",
"detail": {
"contribution_id": contrib["id"],
"type": type,
"target": f"{target_org}/{target_repo}" if target_org else target_repo,
"body_path": body_path,
},
})
return json.dumps(contrib, indent=2)
@mcp.tool()
def update_contribution_status(
contribution_id: str,
status: str,
notes: str | None = None,
) -> str:
"""Update the status of a contribution artifact.
Valid transitions: draft→submitted→acknowledged→accepted→merged
↘ ↘
rejected withdrawn
Args:
contribution_id: UUID of the contribution
status: submitted | acknowledged | accepted | rejected | merged | withdrawn
notes: Optional context for the status change
"""
contrib = _patch(f"/contributions/{contribution_id}/status", {
"status": status,
"notes": notes,
})
_post("/progress", {
"event_type": "contribution_status_changed",
"summary": f"Contribution status → {status}: {contrib['title']}",
"author": "custodian",
"detail": {"contribution_id": contribution_id, "status": status, "notes": notes},
})
return json.dumps(contrib, indent=2)
@mcp.tool()
def get_contributions(
type: str | None = None,
status: str | None = None,
target_repo: str | None = None,
) -> str:
"""List contribution artifacts, optionally filtered.
Args:
type: br | fr | ep | upr (optional)
status: draft | submitted | acknowledged | accepted | rejected | merged | withdrawn (optional)
target_repo: filter by upstream repo name (optional)
"""
return json.dumps(_get("/contributions", {
"type": type, "status": status, "target_repo": target_repo,
}), indent=2)
@mcp.tool()
def ingest_sbom_tool(repo_slug: str, lockfile_path: str) -> str:
"""Ingest a lockfile into the State Hub SBOM store for a repo.
Parses the lockfile and POSTs entries to /sbom/ingest/. Each call creates
a new SBOMSnapshot; previous snapshots are retained as history.
Args:
repo_slug: Managed-repo slug (must be registered via register_repo)
lockfile_path: Absolute path to the lockfile (uv.lock, package-lock.json, Cargo.lock, etc.)
"""
import subprocess
script = Path(__file__).parent.parent / "scripts" / "ingest_sbom.py"
result = subprocess.run(
[sys.executable, str(script), "--repo", repo_slug,
"--lockfile", lockfile_path, "--api-base", API_BASE],
capture_output=True, text=True,
)
output = (result.stdout + result.stderr).strip()
if result.returncode != 0:
return f"ingest_sbom failed (exit {result.returncode}):\n{output}"
return output
@mcp.tool()
def get_licence_report() -> str:
"""Get a licence report across all ingested SBOM entries.
Returns packages grouped by SPDX licence identifier, with copyleft
flag (GPL/AGPL/LGPL/EUPL/CDDL/MPL) and repos using each licence.
"""
return json.dumps(_get("/sbom/report/licences"), indent=2)
# ---------------------------------------------------------------------------
# Domain goals & repo goals (v0.7)
# ---------------------------------------------------------------------------
@mcp.tool()
def create_domain_goal(domain_slug: str, title: str, description: str) -> str:
"""Create a new domain goal and make it active (superseding any existing active goal).
A domain goal captures the high-level strategic intent for a domain. Only one
domain goal can be active at a time; creating a new active one supersedes the
previous active goal.
Args:
domain_slug: Slug of the domain (e.g. 'railiance', 'markitect')
title: Short goal title
description: Full description of the goal and its boundary conditions
"""
domains = _get("/domains", {"status": "active"})
domain = next((d for d in domains if d["slug"] == domain_slug), None)
if not domain:
return json.dumps({"error": f"Domain '{domain_slug}' not found"})
goal = _post("/domain-goals", {
"domain_id": domain["id"],
"title": title,
"description": description,
"status": "active",
})
_post("/progress", {
"event_type": "goal_created",
"summary": f"Domain goal created [{domain_slug}]: {title}",
"detail": {"goal_id": goal["id"], "domain_slug": domain_slug},
})
return json.dumps(goal, indent=2)
@mcp.tool()
def get_domain_goals(domain_slug: str, status: str | None = None) -> str:
"""List domain goals for a domain, optionally filtered by status.
Args:
domain_slug: Slug of the domain (e.g. 'railiance')
status: active | archived | superseded (omit for all)
"""
return json.dumps(_get("/domain-goals", {"domain_slug": domain_slug, "status": status}), indent=2)
@mcp.tool()
def activate_domain_goal(goal_id: str) -> str:
"""Set a domain goal as the active goal, superseding any currently active one.
Args:
goal_id: UUID of the domain goal to activate
"""
goal = _post(f"/domain-goals/{goal_id}/activate", {})
_post("/progress", {
"event_type": "goal_activated",
"summary": f"Domain goal activated: {goal['title']}",
"detail": {"goal_id": goal_id, "domain_slug": goal.get("domain_slug")},
})
return json.dumps(goal, indent=2)
@mcp.tool()
def create_repo_goal(
repo_slug: str,
title: str,
description: str,
domain_goal_id: str | None = None,
priority: int = 100,
) -> str:
"""Create a new repository goal.
Repository goals capture what needs to be achieved in a specific repository.
Multiple active repo goals can coexist; priority (lower number = higher priority)
determines ordering. Optionally link to the parent domain goal.
Args:
repo_slug: Slug of the repository (e.g. 'railiance-bootstrap')
title: Short goal title
description: Full description including boundary conditions and scope
domain_goal_id: UUID of the parent domain goal (optional)
priority: Integer priority — lower numbers = higher priority (default 100)
"""
repos = _get("/repos")
repo = next((r for r in repos if r["slug"] == repo_slug), None)
if not repo:
return json.dumps({"error": f"Repo '{repo_slug}' not found"})
goal = _post("/repo-goals", {
"repo_id": repo["id"],
"title": title,
"description": description,
"domain_goal_id": domain_goal_id,
"priority": priority,
"status": "active",
})
_post("/progress", {
"event_type": "goal_created",
"summary": f"Repo goal created [{repo_slug}]: {title}",
"detail": {"goal_id": goal["id"], "repo_slug": repo_slug, "priority": priority},
})
return json.dumps(goal, indent=2)
@mcp.tool()
def get_repo_goals(repo_slug: str, status: str | None = None) -> str:
"""List repository goals for a repo, ordered by priority.
Args:
repo_slug: Slug of the repository (e.g. 'railiance-bootstrap')
status: active | paused | completed | archived (omit for all)
"""
return json.dumps(_get("/repo-goals", {"repo_slug": repo_slug, "status": status}), indent=2)
@mcp.tool()
def update_repo_goal(
goal_id: str,
title: str | None = None,
description: str | None = None,
priority: int | None = None,
status: str | None = None,
domain_goal_id: str | None = None,
) -> str:
"""Update a repository goal (title, description, priority, status, or domain link).
Args:
goal_id: UUID of the repo goal
title: New title (optional)
description: New description (optional)
priority: New priority integer — lower = higher priority (optional)
status: active | paused | completed | archived (optional)
domain_goal_id: Link or re-link to a domain goal UUID (optional)
"""
updates: dict = {}
if title is not None:
updates["title"] = title
if description is not None:
updates["description"] = description
if priority is not None:
updates["priority"] = priority
if status is not None:
updates["status"] = status
if domain_goal_id is not None:
updates["domain_goal_id"] = domain_goal_id
goal = _patch(f"/repo-goals/{goal_id}", updates)
_post("/progress", {
"event_type": "goal_updated",
"summary": f"Repo goal updated: {goal['title']}",
"detail": {"goal_id": goal_id, "changes": list(updates.keys())},
})
return json.dumps(goal, indent=2)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
transport = os.environ.get("MCP_TRANSPORT", "stdio")
if transport == "stdio":
mcp.run(transport="stdio")
else:
port = int(os.environ.get("MCP_PORT", "8001"))
mcp.run(transport=transport, host="127.0.0.1", port=port)