Files
state-hub/mcp_server/server.py
tegwick c792ab0bc0 feat(tasks): add needs_human intervention flag (CUST-WP-0009)
- Migration b4c5d6e7f8a9: adds needs_human (bool) + intervention_note (text) to tasks
- API: needs_human filter on GET /tasks/; 422 if flagged without note
- 3 MCP tools: flag_for_human, clear_human_flag, list_human_interventions
- Dashboard: interventions.md with amber cards and "Mark done" button
- Policy router + workstream DoD policy (workstream-dod.md)
- Workstream lifecycle docs page + workplan CUST-WP-0010
- CLAUDE.md: add step 4 (run fix-consistency after workplan writes)
- consistency_check.py: promote C-11 unlinked tasks from INFO to WARN

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-04 19:44:14 +01:00

1160 lines
39 KiB
Python

"""Custodian State Hub MCP Server (stdio).
Thin HTTP client over the FastAPI service — no direct DB access.
All business logic stays in the API; this layer is stateless.
"""
from __future__ import annotations
import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
from uuid import UUID
import httpx
from fastmcp import FastMCP
# Base URL of the State Hub FastAPI service. Overridable through the API_BASE
# environment variable; trailing slashes are stripped from the value.
API_BASE = os.getenv("API_BASE", "http://127.0.0.1:8000").rstrip("/")

# Orientation text passed to FastMCP as the server's `instructions`.
_SERVER_INSTRUCTIONS = (
    "Custodian State Hub: tracks topics, workstreams, tasks, decisions, and progress events. "
    "Start every session with get_state_summary() for orientation. "
    "When working inside a single registered domain repo, prefer get_domain_summary(domain_slug) "
    "— it returns the same actionable data scoped to that domain at ~10% of the token cost. "
    "All writes emit a progress_event automatically."
)

mcp = FastMCP(name="state-hub", instructions=_SERVER_INSTRUCTIONS)
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def _client() -> httpx.Client:
    """Build a new httpx client rooted at API_BASE.

    The client uses a 30 second timeout and follows redirects. Callers are
    expected to use it as a context manager so it is closed after the request.
    """
    settings = {
        "base_url": API_BASE,
        "timeout": 30.0,
        "follow_redirects": True,
    }
    return httpx.Client(**settings)
def _get(path: str, params: dict | None = None) -> Any:
    """GET ``path`` and return the decoded JSON response.

    A trailing slash is appended to ``path`` when missing, and query
    parameters whose value is ``None`` are dropped before the request is sent.
    Raises whatever ``raise_for_status()`` raises on an HTTP error status.
    """
    url = path if path.endswith("/") else f"{path}/"
    with _client() as http:
        query = {}
        for key, value in (params or {}).items():
            if value is not None:
                query[key] = value
        response = http.get(url, params=query)
        response.raise_for_status()
        return response.json()
def _post(path: str, body: dict) -> Any:
    """POST ``body`` as JSON to ``path`` and return the decoded JSON response.

    A trailing slash is appended to ``path`` when missing. Top-level keys of
    ``body`` whose value is ``None`` are omitted from the payload (nested
    values are sent untouched). Raises on an HTTP error status.
    """
    url = path if path.endswith("/") else f"{path}/"
    with _client() as http:
        payload = {}
        for key, value in body.items():
            if value is not None:
                payload[key] = value
        response = http.post(url, json=payload)
        response.raise_for_status()
        return response.json()
def _patch(path: str, body: dict) -> Any:
    """PATCH ``path`` with ``body`` as JSON and return the decoded JSON response.

    A trailing slash is appended to ``path`` when missing. Top-level keys of
    ``body`` whose value is ``None`` are omitted from the payload, so a field
    cannot be reset to null through this helper. Raises on an HTTP error status.
    """
    url = path if path.endswith("/") else f"{path}/"
    with _client() as http:
        payload = {}
        for key, value in body.items():
            if value is not None:
                payload[key] = value
        response = http.patch(url, json=payload)
        response.raise_for_status()
        return response.json()
def _delete(path: str) -> None:
    """Send DELETE to ``path`` (used as given) and raise on an HTTP error status."""
    with _client() as http:
        response = http.delete(path)
        response.raise_for_status()
    return None
# ---------------------------------------------------------------------------
# Resources
# ---------------------------------------------------------------------------
@mcp.resource("state://summary")
def resource_summary() -> str:
    """Full StateSummary JSON — primary orientation resource."""
    summary = _get("/state/summary")
    return json.dumps(summary, indent=2)
@mcp.resource("state://topics")
def resource_topics() -> str:
    """Active topics list."""
    active_topics = _get("/topics", {"status": "active"})
    return json.dumps(active_topics, indent=2)
@mcp.resource("state://workstreams/{topic_slug}")
def resource_workstreams(topic_slug: str) -> str:
    """Workstreams for a topic (by slug)."""
    # Only active topics are searched; the first slug match wins.
    for topic in _get("/topics", {"status": "active"}):
        if topic["slug"] == topic_slug:
            workstreams = _get("/workstreams", {"topic_id": topic["id"]})
            return json.dumps(workstreams, indent=2)
    return json.dumps({"error": f"Topic '{topic_slug}' not found"})
@mcp.resource("state://decisions/blocking")
def resource_blocking_decisions() -> str:
    """All pending/escalated decisions."""
    filters = {"decision_type": "pending", "status": "open"}
    decisions = _get("/decisions", filters)
    return json.dumps(decisions, indent=2)
@mcp.resource("state://tasks/blocked")
def resource_blocked_tasks() -> str:
    """All tasks with status=blocked."""
    blocked = _get("/tasks", {"status": "blocked"})
    return json.dumps(blocked, indent=2)
# ---------------------------------------------------------------------------
# Query tools
# ---------------------------------------------------------------------------
@mcp.tool()
def get_state_summary() -> str:
    """Primary orientation tool. Call at the start of every session.

    Returns a full snapshot: topic/workstream/task/decision totals, blocking
    decisions, blocked tasks, open workstreams, and the 20 most recent events.

    NOTE: This response is large (~10k tokens). When working inside a single
    registered domain repo, use get_domain_summary(domain_slug) instead —
    same actionable data scoped to one domain at ~10% of the token cost.
    """
    snapshot = _get("/state/summary")
    return json.dumps(snapshot, indent=2)
@mcp.tool()
def get_domain_summary(domain_slug: str) -> str:
    """Lightweight session orientation for a single domain.

    Use this instead of get_state_summary() when working in a registered
    domain repo — returns only what is relevant to the specified domain,
    typically 80-90% fewer tokens than the full summary.

    Args:
        domain_slug: the domain slug, e.g. "railiance", "markitect"

    Returns: topic, active workstreams, open blocking decisions for this
    topic, 5 most recent progress events, and repo SBOM status for this domain.
    """
    # The first topic whose domain_slug matches is taken as the domain's topic.
    topic = next(
        (candidate for candidate in _get("/topics") if candidate.get("domain_slug") == domain_slug),
        None,
    )
    if not topic:
        return json.dumps({"error": f"No topic found for domain '{domain_slug}'"})

    topic_id = topic["id"]
    active_workstreams = _get("/workstreams", {"topic_id": topic_id, "status": "active"})
    pending_decisions = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id})
    latest_events = _get("/progress", {"topic_id": topic_id, "limit": 5})

    # Repos are reduced to slug + last SBOM timestamp to keep the payload small.
    repo_status = []
    for repo in _get("/repos", {"domain": domain_slug}):
        repo_status.append({"slug": repo["slug"], "last_sbom_at": repo.get("last_sbom_at")})

    summary = {
        "domain": domain_slug,
        "topic_id": topic_id,
        "topic_title": topic["title"],
        "workstreams": active_workstreams,
        "blocking_decisions": pending_decisions,
        "recent_progress": latest_events,
        "repos": repo_status,
    }
    return json.dumps(summary, indent=2)
@mcp.tool()
def get_topic(slug: str) -> str:
    """Return a topic (with workstreams) by slug, plus its recent progress events."""
    for candidate in _get("/topics"):
        if candidate["slug"] != slug:
            continue
        # Resolve the slug to an id, then fetch the detail view and last 10 events.
        topic_id = candidate["id"]
        detail = _get(f"/topics/{topic_id}")
        events = _get("/progress", {"topic_id": topic_id, "limit": 10})
        return json.dumps({"topic": detail, "recent_progress": events}, indent=2)
    return json.dumps({"error": f"Topic '{slug}' not found"})
@mcp.tool()
def list_blocked_tasks(workstream_id: str | None = None) -> str:
    """List all tasks with status=blocked, optionally filtered by workstream_id."""
    filters = {"status": "blocked", "workstream_id": workstream_id}
    blocked = _get("/tasks", filters)
    return json.dumps(blocked, indent=2)
@mcp.tool()
def list_pending_decisions(topic_id: str | None = None) -> str:
    """List pending decisions sorted by deadline (nulls last).

    Optionally filter by topic_id. Escalated decisions are included and
    highlighted by their escalation_note.
    """
    # The list is returned exactly as the API delivers it; no client-side sorting.
    filters = {"decision_type": "pending", "topic_id": topic_id}
    return json.dumps(_get("/decisions", filters), indent=2)
@mcp.tool()
def get_recent_progress(limit: int = 20, since: str | None = None) -> str:
    """Retrieve recent progress events to reconstruct session history.

    Args:
        limit: max events to return (default 20)
        since: ISO datetime string — only events after this timestamp
    """
    query = {"limit": limit, "since": since}
    events = _get("/progress", query)
    return json.dumps(events, indent=2)
# ---------------------------------------------------------------------------
# Mutate tools
# ---------------------------------------------------------------------------
@mcp.tool()
def create_workstream(
    topic_id: str,
    title: str,
    slug: str | None = None,
    description: str | None = None,
    owner: str | None = None,
    due_date: str | None = None,
    repo_id: str | None = None,
) -> str:
    """Create a new workstream under a topic and emit a progress_event.

    Args:
        topic_id: UUID of the parent topic
        title: workstream title
        slug: URL-friendly identifier (auto-generated from title if omitted)
        description: optional longer description
        owner: optional owner name
        due_date: optional ISO date string (YYYY-MM-DD)
        repo_id: UUID of the owning repository (GEMS primary; strongly recommended per ADR-001)
    """
    # Derive a slug from the title: lowercase, non-alphanumeric runs become "-".
    slug = slug or re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-")

    payload = {
        "topic_id": topic_id,
        "title": title,
        "slug": slug,
        "description": description,
        "owner": owner,
        "due_date": due_date,
        "status": "active",
        "repo_id": repo_id,
    }
    created = _post("/workstreams", payload)

    event = {
        "topic_id": topic_id,
        "workstream_id": created["id"],
        "event_type": "workstream_created",
        "summary": f"Workstream created: {title}",
        "author": "custodian",
        "detail": {"owner": owner, "slug": slug},
    }
    _post("/progress", event)
    return json.dumps(created, indent=2)
@mcp.tool()
def create_task(
    workstream_id: str,
    title: str,
    priority: str = "medium",
    description: str | None = None,
    assignee: str | None = None,
    due_date: str | None = None,
) -> str:
    """Create a new task and emit a progress_event.

    Args:
        workstream_id: UUID of the parent workstream
        title: task title
        priority: low | medium | high | critical
        description: optional longer description
        assignee: optional assignee name
        due_date: optional ISO date string (YYYY-MM-DD)
    """
    payload = {
        "workstream_id": workstream_id,
        "title": title,
        "priority": priority,
        "description": description,
        "assignee": assignee,
        "due_date": due_date,
    }
    created = _post("/tasks", payload)

    event = {
        "workstream_id": workstream_id,
        "task_id": created["id"],
        "event_type": "task_created",
        "summary": f"Task created: {title}",
        "author": "custodian",
        "detail": {"priority": priority, "assignee": assignee},
    }
    _post("/progress", event)
    return json.dumps(created, indent=2)
@mcp.tool()
def update_task_status(
    task_id: str,
    status: str,
    blocking_reason: str | None = None,
) -> str:
    """Update a task's status. blocking_reason is required when status='blocked'.

    Args:
        task_id: UUID of the task
        status: todo | in_progress | blocked | done | cancelled
        blocking_reason: required when status=blocked
    """
    # Only a non-empty blocking_reason is forwarded; no validation happens here.
    changes: dict[str, Any] = (
        {"status": status, "blocking_reason": blocking_reason}
        if blocking_reason
        else {"status": status}
    )
    updated = _patch(f"/tasks/{task_id}", changes)

    event = {
        "task_id": task_id,
        "workstream_id": updated.get("workstream_id"),
        "event_type": "task_status_changed",
        "summary": f"Task status → {status}: {updated['title']}",
        "author": "custodian",
        "detail": {"blocking_reason": blocking_reason},
    }
    _post("/progress", event)
    return json.dumps(updated, indent=2)
@mcp.tool()
def flag_for_human(task_id: str, note: str) -> str:
    """Flag a task as requiring human intervention.

    Sets needs_human=True and records the required action as intervention_note.
    Emits a progress event so the flag is visible in session history.

    Args:
        task_id: UUID of the task to flag
        note: description of the action required from the human (required)
    """
    changes = {"needs_human": True, "intervention_note": note}
    flagged = _patch(f"/tasks/{task_id}", changes)

    event = {
        "task_id": task_id,
        "workstream_id": flagged.get("workstream_id"),
        "event_type": "task_flagged_human",
        "summary": f"Task flagged for human intervention: {flagged['title']}",
        "author": "custodian",
        "detail": {"intervention_note": note},
    }
    _post("/progress", event)
    return json.dumps(flagged, indent=2)
@mcp.tool()
def clear_human_flag(task_id: str) -> str:
    """Clear the human-intervention flag from a task.

    Sets needs_human=False. The intervention_note is preserved as a
    historical record. Call this after the human has completed the action.

    Args:
        task_id: UUID of the task to clear
    """
    # Only needs_human is sent; intervention_note is not part of the patch.
    cleared = _patch(f"/tasks/{task_id}", {"needs_human": False})

    event = {
        "task_id": task_id,
        "workstream_id": cleared.get("workstream_id"),
        "event_type": "task_flag_cleared",
        "summary": f"Human-intervention flag cleared: {cleared['title']}",
        "author": "custodian",
    }
    _post("/progress", event)
    return json.dumps(cleared, indent=2)
@mcp.tool()
def list_human_interventions(workstream_id: str | None = None) -> str:
    """List all tasks flagged for human intervention.

    Returns tasks where needs_human=True, optionally filtered to one workstream.
    Use this at session start to surface Bernd's action items.

    Args:
        workstream_id: optional UUID to scope results to one workstream
    """
    # The flag is sent as the query-string literal "true".
    filters = {"needs_human": "true", "workstream_id": workstream_id}
    flagged = _get("/tasks", filters)
    return json.dumps(flagged, indent=2)
@mcp.tool()
def record_decision(
    title: str,
    decision_type: str = "pending",
    topic_id: str | None = None,
    workstream_id: str | None = None,
    description: str | None = None,
    rationale: str | None = None,
    decided_by: str | None = None,
    deadline: str | None = None,
) -> str:
    """Record a decision (made or pending).

    Pending decisions touching financial/legal topics are auto-escalated per
    constitution §4.

    Args:
        title: decision title
        decision_type: made | pending
        topic_id: optional topic UUID
        workstream_id: optional workstream UUID (at least one required)
        description: optional context
        rationale: reasoning behind the decision
        decided_by: person/agent who decided
        deadline: ISO datetime string for when decision is needed
    """
    payload = {
        "title": title,
        "decision_type": decision_type,
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "description": description,
        "rationale": rationale,
        "decided_by": decided_by,
        "deadline": deadline,
    }
    recorded = _post("/decisions", payload)

    # Status and escalation note come from the API response, not from the caller.
    event_detail = {
        "status": recorded.get("status"),
        "escalation_note": recorded.get("escalation_note"),
    }
    event = {
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "decision_id": recorded["id"],
        "event_type": "decision_recorded",
        "summary": f"Decision recorded ({decision_type}): {title}",
        "author": "custodian",
        "detail": event_detail,
    }
    _post("/progress", event)
    return json.dumps(recorded, indent=2)
@mcp.tool()
def resolve_decision(
    decision_id: str,
    rationale: str,
    decided_by: str,
) -> str:
    """Mark a decision as resolved.

    Patches the decision to status=resolved / decision_type=made with the
    current UTC time as decided_at, then emits a decision_resolved progress
    event linked to the decision's topic and workstream.

    Args:
        decision_id: UUID of the decision
        rationale: final reasoning/outcome
        decided_by: who resolved it
    """
    # Local import: the file's top-level import only brings in `datetime`.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the serialized value keeps the "...Z" form.
    decided_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    decision = _patch(f"/decisions/{decision_id}", {
        "status": "resolved",
        "decision_type": "made",
        "rationale": rationale,
        "decided_by": decided_by,
        "decided_at": decided_at,
    })
    _post("/progress", {
        "topic_id": decision.get("topic_id"),
        "workstream_id": decision.get("workstream_id"),
        "decision_id": decision_id,
        "event_type": "decision_resolved",
        "summary": f"Decision resolved by {decided_by}: {decision['title']}",
        "author": "custodian",
        "detail": {"rationale": rationale},
    })
    return json.dumps(decision, indent=2)
@mcp.tool()
def add_progress_event(
    summary: str,
    event_type: str = "note",
    topic_id: str | None = None,
    workstream_id: str | None = None,
    task_id: str | None = None,
    detail: dict | None = None,
) -> str:
    """Append a progress event to the log.

    Args:
        summary: human-readable summary of what happened
        event_type: free-form label, e.g. note | milestone | blocker | insight
        topic_id: optional topic UUID
        workstream_id: optional workstream UUID
        task_id: optional task UUID
        detail: optional structured data (JSONB)
    """
    payload = {
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "task_id": task_id,
        "event_type": event_type,
        "summary": summary,
        "author": "custodian",
        "detail": detail,
    }
    logged = _post("/progress", payload)
    return json.dumps(logged, indent=2)
@mcp.tool()
def update_workstream_status(workstream_id: str, status: str) -> str:
    """Update a workstream's status.

    Args:
        workstream_id: UUID of the workstream
        status: active | blocked | completed | archived
    """
    updated = _patch(f"/workstreams/{workstream_id}", {"status": status})

    event = {
        "workstream_id": workstream_id,
        "topic_id": updated.get("topic_id"),
        "event_type": "workstream_status_changed",
        "summary": f"Workstream status → {status}: {updated['title']}",
        "author": "custodian",
    }
    _post("/progress", event)
    return json.dumps(updated, indent=2)
# ---------------------------------------------------------------------------
# Next-steps suggestion tool (S2.3) — sanctioned write use case #2
# ---------------------------------------------------------------------------
@mcp.tool()
def get_next_steps() -> str:
    """Surface contextual next-action suggestions derived from hub state.

    Returns suggestions based on:
    - Recently resolved decisions → first open task in the same workstream
    - Workstreams whose every dependency is now completed → first todo task

    Each suggestion includes domain, workstream, task, and a plain-language
    message. The hub surfaces *what* and *where* — the domain owns *how*.

    This is one of the two sanctioned write-side use cases of the State Hub
    (the other is resolve_decision). Suggestions are derived, not persisted.
    """
    suggestions = _get("/state/next_steps")
    return json.dumps(suggestions, indent=2)
# ---------------------------------------------------------------------------
# Dependency graph tools (S1.4)
# ---------------------------------------------------------------------------
@mcp.tool()
def create_dependency(
    from_workstream_id: str,
    to_workstream_id: str,
    description: str | None = None,
) -> str:
    """Record that one workstream depends on another.

    Semantics: from_workstream cannot fully proceed until to_workstream reaches
    a satisfactory state.

    Args:
        from_workstream_id: UUID of the workstream that has the dependency
        to_workstream_id: UUID of the workstream it depends on
        description: optional human-readable explanation of the dependency
    """
    endpoint = f"/workstreams/{from_workstream_id}/dependencies"
    payload = {"to_workstream_id": to_workstream_id, "description": description}
    edge = _post(endpoint, payload)
    return json.dumps(edge, indent=2)
@mcp.tool()
def list_dependencies(workstream_id: str) -> str:
    """Return all dependency edges touching a workstream (both directions).

    The response distinguishes edges where this workstream is the dependent
    (depends_on) from edges where it is the blocker (blocks).

    Args:
        workstream_id: UUID of the workstream to inspect
    """
    outgoing: list = []  # edges where this workstream is the `from` side
    incoming: list = []  # edges where this workstream is the `to` side
    for edge in _get(f"/workstreams/{workstream_id}/dependencies"):
        if edge["from_workstream_id"] == workstream_id:
            outgoing.append(edge)
        if edge["to_workstream_id"] == workstream_id:
            incoming.append(edge)
    return json.dumps({"depends_on": outgoing, "blocks": incoming}, indent=2)
# ---------------------------------------------------------------------------
# Extension points & technical debt
# ---------------------------------------------------------------------------
@mcp.tool()
def register_extension_point(
    domain: str,
    title: str,
    ep_type: str,
    description: str | None = None,
    location: str | None = None,
    priority: str = "medium",
    ep_id: str | None = None,
    topic_id: str | None = None,
    workstream_id: str | None = None,
) -> str:
    """Register a discovered extension point — optional future functionality not yet committed.

    Extension points capture design forks: things the system *could* do that
    have been noticed and parked for deliberate later consideration.

    Args:
        domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
        title: short description of the extension
        ep_type: api | schema | mcp | dashboard | architecture | integration | other
        description: longer explanation of what the extension would add
        location: file:line or module where the extension point was noticed
        priority: low | medium | high | critical
        ep_id: optional human-readable ID, e.g. EP-CUST-001 (auto-assigned if omitted)
        topic_id: UUID of related topic
        workstream_id: UUID of related workstream
    """
    ep = _post("/extension-points", {
        "domain": domain, "title": title, "ep_type": ep_type,
        "description": description, "location": location,
        "priority": priority, "ep_id": ep_id,
        "topic_id": topic_id, "workstream_id": workstream_id,
    })
    # Prefer the human-readable ID in the summary; fall back to a UUID prefix.
    label = ep.get("ep_id") or ep["id"][:8]
    _post("/progress", {
        # Link the event to the same topic/workstream as the EP and stamp the
        # author, matching the other mutate tools. None values are dropped by _post.
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "summary": f"Extension point registered: [{label}] {title} ({ep_type}, {domain})",
        "event_type": "extension_point",
        "author": "custodian",
        "detail": {"id": ep["id"], "ep_id": ep.get("ep_id"), "ep_type": ep_type, "domain": domain},
    })
    return json.dumps(ep, indent=2)
@mcp.tool()
def list_extension_points(
    domain: str | None = None,
    status: str | None = None,
    ep_type: str | None = None,
) -> str:
    """List extension points, optionally filtered.

    Args:
        domain: filter by domain
        status: open | in_progress | addressed | deferred | wont_fix
        ep_type: api | schema | mcp | dashboard | architecture | integration | other
    """
    filters = {"domain": domain, "status": status, "ep_type": ep_type}
    extension_points = _get("/extension-points", filters)
    return json.dumps(extension_points, indent=2)
@mcp.tool()
def update_ep_status(ep_uuid: str, status: str) -> str:
    """Update the status of an extension point.

    Patches the extension point and emits an extension_point progress event
    describing the new status.

    Args:
        ep_uuid: UUID of the extension point
        status: open | in_progress | addressed | deferred | wont_fix
    """
    ep = _patch(f"/extension-points/{ep_uuid}", {"status": status})
    _post("/progress", {
        "summary": f"Extension point status → {status}: {ep['title']}",
        "event_type": "extension_point",
        # Stamp the author like every other progress event emitted by this server.
        "author": "custodian",
        "detail": {"id": ep_uuid, "status": status},
    })
    return json.dumps(ep, indent=2)
@mcp.tool()
def register_technical_debt(
    domain: str,
    title: str,
    debt_type: str,
    description: str | None = None,
    location: str | None = None,
    severity: str = "medium",
    td_id: str | None = None,
    topic_id: str | None = None,
    workstream_id: str | None = None,
) -> str:
    """Register a technical debt item — a known quality compromise to address later.

    Technical debt captures intentional or discovered shortcuts, design
    weaknesses, missing tests, and similar issues that reduce codebase health.

    Args:
        domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
        title: short description of the debt
        debt_type: design | implementation | test | docs | dependencies | performance | security | other
        description: what the issue is and what the correct fix would be
        location: file:line or module where the debt lives
        severity: low | medium | high | critical
        td_id: optional human-readable ID, e.g. TD-CUST-001
        topic_id: UUID of related topic
        workstream_id: UUID of related workstream
    """
    td = _post("/technical-debt", {
        "domain": domain, "title": title, "debt_type": debt_type,
        "description": description, "location": location,
        "severity": severity, "td_id": td_id,
        "topic_id": topic_id, "workstream_id": workstream_id,
    })
    # Prefer the human-readable ID in the summary; fall back to a UUID prefix.
    label = td.get("td_id") or td["id"][:8]
    _post("/progress", {
        # Link the event to the same topic/workstream as the debt item and stamp
        # the author, matching the other mutate tools. None values are dropped by _post.
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "summary": f"Technical debt registered: [{label}] {title} ({debt_type}, {severity}, {domain})",
        "event_type": "technical_debt",
        "author": "custodian",
        "detail": {"id": td["id"], "td_id": td.get("td_id"), "debt_type": debt_type, "severity": severity, "domain": domain},
    })
    return json.dumps(td, indent=2)
@mcp.tool()
def list_technical_debt(
    domain: str | None = None,
    status: str | None = None,
    debt_type: str | None = None,
    severity: str | None = None,
) -> str:
    """List technical debt items, optionally filtered.

    Args:
        domain: filter by domain
        status: open | in_progress | resolved | deferred | wont_fix
        debt_type: design | implementation | test | docs | dependencies | performance | security | other
        severity: low | medium | high | critical
    """
    filters = {
        "domain": domain,
        "status": status,
        "debt_type": debt_type,
        "severity": severity,
    }
    debt_items = _get("/technical-debt", filters)
    return json.dumps(debt_items, indent=2)
@mcp.tool()
def update_td_status(td_uuid: str, status: str) -> str:
    """Update the status of a technical debt item.

    Patches the debt item and emits a technical_debt progress event
    describing the new status.

    Args:
        td_uuid: UUID of the technical debt item
        status: open | in_progress | resolved | deferred | wont_fix
    """
    td = _patch(f"/technical-debt/{td_uuid}", {"status": status})
    _post("/progress", {
        "summary": f"Technical debt status → {status}: {td['title']}",
        "event_type": "technical_debt",
        # Stamp the author like every other progress event emitted by this server.
        "author": "custodian",
        "detail": {"id": td_uuid, "status": status},
    })
    return json.dumps(td, indent=2)
# ---------------------------------------------------------------------------
# Domain lifecycle + repo registration tools (v0.5)
# ---------------------------------------------------------------------------
@mcp.tool()
def list_domains(status: str = "active") -> str:
    """List all registered domains.

    Args:
        status: active | archived | all (default: active)
    """
    domains = _get("/domains", {"status": status})
    return json.dumps(domains, indent=2)
@mcp.tool()
def create_domain(slug: str, name: str, description: str | None = None) -> str:
    """Create a new domain.

    Args:
        slug: URL-friendly identifier (lowercase, underscored), e.g. 'my_project'
        name: Human-readable display name
        description: optional longer description
    """
    payload = {"slug": slug, "name": name, "description": description}
    created = _post("/domains", payload)

    event = {
        "event_type": "milestone",
        "summary": f"Domain created: {slug} ({name})",
        "author": "custodian",
        "detail": {"slug": slug, "name": name},
    }
    _post("/progress", event)
    return json.dumps(created, indent=2)
@mcp.tool()
def rename_domain(slug: str, new_slug: str, new_name: str) -> str:
    """Rename a domain — cascades to EP/TD string columns.

    Patches the domain via its rename endpoint and emits a milestone progress
    event recording the old and new slug.

    Args:
        slug: Current domain slug
        new_slug: New URL-friendly identifier
        new_name: New human-readable display name
    """
    domain = _patch(f"/domains/{slug}/rename", {"new_slug": new_slug, "new_name": new_name})
    _post("/progress", {
        "event_type": "milestone",
        # Separate old and new slug; previously they were fused together
        # ("foobar") which made the event summary unreadable.
        "summary": f"Domain renamed: {slug} → {new_slug} ({new_name})",
        "author": "custodian",
        "detail": {"old_slug": slug, "new_slug": new_slug, "new_name": new_name},
    })
    return json.dumps(domain, indent=2)
@mcp.tool()
def archive_domain(slug: str) -> str:
    """Archive a domain (soft-delete). Fails if active topics exist.

    Args:
        slug: Domain slug to archive
    """
    # The archive endpoint takes no fields; an empty JSON object is sent.
    archived = _patch(f"/domains/{slug}/archive", {})

    event = {
        "event_type": "note",
        "summary": f"Domain archived: {slug}",
        "author": "custodian",
        "detail": {"slug": slug},
    }
    _post("/progress", event)
    return json.dumps(archived, indent=2)
@mcp.tool()
def list_domain_repos(domain_slug: str) -> str:
    """List all repositories registered under a domain.

    Args:
        domain_slug: Domain slug to filter by
    """
    repos = _get("/repos", {"domain": domain_slug})
    return json.dumps(repos, indent=2)
@mcp.tool()
def register_repo(
    domain_slug: str,
    name: str,
    slug: str | None = None,
    local_path: str | None = None,
    remote_url: str | None = None,
    description: str | None = None,
) -> str:
    """Register a git repository under a domain.

    Creates the repo record and emits a milestone progress event.

    Args:
        domain_slug: Domain slug (must already exist)
        name: Human-readable repository name
        slug: URL-friendly identifier (auto-generated from name if omitted)
        local_path: Absolute local filesystem path to the repo
        remote_url: Remote git URL (Gitea, GitHub, etc.)
        description: optional description
    """
    if not slug:
        # Same slug rule as create_workstream: lowercase, non-alphanumeric runs
        # collapse to "-". Uses the module-level `re` import instead of a
        # redundant function-local alias.
        slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")
    repo = _post("/repos", {
        "domain_slug": domain_slug,
        "slug": slug,
        "name": name,
        "local_path": local_path,
        "remote_url": remote_url,
        "description": description,
    })
    _post("/progress", {
        "event_type": "milestone",
        "summary": f"Repo registered: {name} under domain '{domain_slug}'",
        "author": "custodian",
        "detail": {"slug": slug, "domain_slug": domain_slug, "local_path": local_path, "remote_url": remote_url},
    })
    return json.dumps(repo, indent=2)
# ---------------------------------------------------------------------------
# ADR-001 compliance validation
# ---------------------------------------------------------------------------
@mcp.tool()
def validate_repo_adr(repo_path: str, domain_slug: str | None = None) -> str:
    """Check whether a repository is consistent with ADR-001.

    Validates that workplan files exist in workplans/ with correct frontmatter,
    that state_hub_workstream_id references resolve to real DB records, and that
    no active state-hub workstreams for the domain lack a backing file (orphan
    detection — DB-only records are an ADR-001 violation).

    Args:
        repo_path: Absolute path to the repository root.
        domain_slug: Domain slug for orphan detection (e.g. 'custodian').
            If omitted, inferred from workplan frontmatter.
    """
    import subprocess

    # The validator lives in ../scripts relative to this module and is run with
    # the current interpreter; its JSON report is read from stdout.
    validator = Path(__file__).parent.parent / "scripts" / "validate_repo_adr.py"
    argv = [sys.executable, str(validator), repo_path, "--json", "--api-base", API_BASE]
    if domain_slug:
        argv.extend(["--domain", domain_slug])
    proc = subprocess.run(argv, capture_output=True, text=True)

    try:
        report = json.loads(proc.stdout)
    except json.JSONDecodeError:
        # Non-JSON stdout: surface whatever the script printed.
        return f"Validator script error:\n{proc.stderr or proc.stdout or '(no output)'}"

    findings = report.get("findings", [])
    counts = report.get("summary", {})
    outcome = report.get("result", "unknown")
    failures = [item for item in findings if item["level"] == "FAIL"]
    warnings = [item for item in findings if item["level"] == "WARN"]

    out = [f"ADR-001 Compliance: {repo_path}", ""]
    for heading, tag, group in (("FAILURES", "FAIL", failures), ("WARNINGS", "WARN", warnings)):
        if not group:
            continue
        out.append(f"{heading} ({len(group)}):")
        for item in group:
            loc = f" [{item['file']}]" if item.get("file") else ""
            out.append(f" {tag} {item['check']}{loc}")
            out.append(f" {item['detail']}")
        out.append("")

    out.append(
        f"Summary: {counts.get('pass', 0)} pass | "
        f"{counts.get('warn', 0)} warn | "
        f"{counts.get('fail', 0)} fail"
    )
    # Anything other than "fail"/"warn" (including a missing result) renders as PASS.
    if outcome == "fail":
        verdict = "FAIL"
    elif outcome == "warn":
        verdict = "PASS (with warnings)"
    else:
        verdict = "PASS"
    out.append(f"Result: {verdict}")
    return "\n".join(out)
# ---------------------------------------------------------------------------
# ADR-001 consistency checking engine
# ---------------------------------------------------------------------------
@mcp.tool()
def check_repo_consistency(repo_slug: str, fix: bool = False) -> str:
    """Run ADR-001 consistency check for a registered repo.

    Performs bidirectional checks between workplan files in the repo and the
    state-hub DB. The file is always authoritative: drift is reported with the
    file value as the expected value.

    Checks: missing workplans/, parse errors, stale DB references, status/title
    drift, unlinked workplans, orphan DB workstreams, repo mismatches, task
    status drift, unlinked tasks, and orphan DB tasks.

    Args:
        repo_slug: Registered repo slug (e.g. 'the-custodian', 'activity-core').
        fix: If True, apply auto-fixable issues: status drift (C-04), title drift
            (C-05), create missing DB workstreams (C-06), repo mismatch (C-09),
            task status drift (C-10), create unlinked tasks (C-11).
    """
    import subprocess

    # The checker lives in ../scripts relative to this module and is run with
    # the current interpreter; its JSON report is read from stdout.
    checker = Path(__file__).parent.parent / "scripts" / "consistency_check.py"
    argv = [sys.executable, str(checker), "--repo", repo_slug, "--json", "--api-base", API_BASE]
    if fix:
        argv.append("--fix")
    proc = subprocess.run(argv, capture_output=True, text=True)

    try:
        report = json.loads(proc.stdout)
    except json.JSONDecodeError:
        # Non-JSON stdout: surface whatever the script printed.
        return f"Consistency check script error:\n{proc.stderr or proc.stdout or '(no output)'}"

    issues = report.get("issues", [])
    counts = report.get("summary", {})
    outcome = report.get("result", "unknown")
    applied = report.get("fixes_applied", [])

    out = [
        f"Consistency Check: {repo_slug}",
        f"Path: {report.get('repo_path', '?')}",
        "",
    ]

    # Bucket the issues by severity up front, then render in fixed order.
    buckets = [
        (severity, [issue for issue in issues if issue["severity"] == severity])
        for severity in ("FAIL", "WARN", "INFO")
    ]
    for severity, bucket in buckets:
        if bucket:
            out.append(f"{severity}S ({len(bucket)}):")
            for issue in bucket:
                loc = f" [{issue['file_path']}]" if issue.get("file_path") else ""
                fix_tag = " [fixable]" if issue.get("fixable") else ""
                out.append(f" {issue['check_id']}{loc}{fix_tag}")
                out.append(f" {issue['message']}")
            out.append("")

    if applied:
        out.append(f"Fixes applied ({len(applied)}):")
        out.extend(f" {entry}" for entry in applied)
        out.append("")

    out.append(
        f"Summary: {counts.get('fail', 0)} fail | "
        f"{counts.get('warn', 0)} warn | "
        f"{counts.get('info', 0)} info"
    )
    # Anything other than "fail"/"warn" (including a missing result) renders as PASS.
    if outcome == "fail":
        verdict = "FAIL"
    elif outcome == "warn":
        verdict = "PASS (with warnings)"
    else:
        verdict = "PASS"
    out.append(f"Result: {verdict}")
    return "\n".join(out)
# ---------------------------------------------------------------------------
# Contribution tracking (v0.3)
# ---------------------------------------------------------------------------
@mcp.resource("state://contributions")
def resource_contributions() -> str:
    """All contribution artifacts (BR/FR/EP/UPR)."""
    contributions = _get("/contributions")
    return json.dumps(contributions, indent=2)
@mcp.resource("state://sbom/aggregated")
def resource_sbom_aggregated() -> str:
    """Aggregated SBOM entries across all repos."""
    entries = _get("/sbom")
    return json.dumps(entries, indent=2)
@mcp.resource("state://sbom/{repo_slug}")
def resource_sbom_repo(repo_slug: str) -> str:
    """SBOM view for a specific repo (by slug)."""
    endpoint = f"/sbom/{repo_slug}"
    return json.dumps(_get(endpoint), indent=2)
@mcp.tool()
def register_contribution(
    type: str,
    title: str,
    target_org: str | None = None,
    target_repo: str | None = None,
    body_path: str | None = None,
    related_workstream_id: str | None = None,
    notes: str | None = None,
) -> str:
    """Register a new upstream contribution artifact (BR/FR/EP/UPR).

    Creates the contribution record and emits a contribution_registered
    progress event linked to the related workstream (when given).

    Args:
        type: br | fr | ep | upr
        title: Short human-readable title
        target_org: GitHub org or owner of the upstream project
        target_repo: Repository name of the upstream project
        body_path: Relative path to the Markdown artifact file in the repo
        related_workstream_id: UUID of the related workstream (optional)
        notes: Any additional notes (optional)
    """
    contrib = _post("/contributions", {
        "type": type,
        "title": title,
        "target_org": target_org,
        "target_repo": target_repo,
        "body_path": body_path,
        "related_workstream_id": related_workstream_id,
        "notes": notes,
    })
    # Build "org/repo" from whichever parts were supplied. Joining only the
    # present parts avoids the literal "org/None" when target_repo is omitted.
    target = "/".join(part for part in (target_org, target_repo) if part) or None
    _post("/progress", {
        "workstream_id": related_workstream_id,
        "event_type": "contribution_registered",
        "summary": f"Contribution registered [{type.upper()}]: {title}",
        "author": "custodian",
        "detail": {
            "contribution_id": contrib["id"],
            "type": type,
            "target": target,
            "body_path": body_path,
        },
    })
    return json.dumps(contrib, indent=2)
@mcp.tool()
def update_contribution_status(
    contribution_id: str,
    status: str,
    notes: str | None = None,
) -> str:
    """Update the status of a contribution artifact.

    Valid transitions: draft→submitted→acknowledged→accepted→merged,
    with side branches (↘) leading to rejected and withdrawn.

    Args:
        contribution_id: UUID of the contribution
        status: submitted | acknowledged | accepted | rejected | merged | withdrawn
        notes: Optional context for the status change
    """
    changes = {"status": status, "notes": notes}
    updated = _patch(f"/contributions/{contribution_id}/status", changes)

    event = {
        "event_type": "contribution_status_changed",
        "summary": f"Contribution status → {status}: {updated['title']}",
        "author": "custodian",
        "detail": {"contribution_id": contribution_id, "status": status, "notes": notes},
    }
    _post("/progress", event)
    return json.dumps(updated, indent=2)
@mcp.tool()
def get_contributions(
    type: str | None = None,
    status: str | None = None,
    target_repo: str | None = None,
) -> str:
    """List contribution artifacts, optionally filtered.

    Args:
        type: br | fr | ep | upr (optional)
        status: draft | submitted | acknowledged | accepted | rejected | merged | withdrawn (optional)
        target_repo: filter by upstream repo name (optional)
    """
    filters = {"type": type, "status": status, "target_repo": target_repo}
    contributions = _get("/contributions", filters)
    return json.dumps(contributions, indent=2)
@mcp.tool()
def ingest_sbom_tool(repo_slug: str, lockfile_path: str) -> str:
    """Ingest a lockfile into the State Hub SBOM store for a repo.

    Parses the lockfile and POSTs entries to /sbom/ingest/. Each call creates
    a new SBOMSnapshot; previous snapshots are retained as history.

    Args:
        repo_slug: Managed-repo slug (must be registered via register_repo)
        lockfile_path: Absolute path to the lockfile (uv.lock, package-lock.json, Cargo.lock, etc.)
    """
    import subprocess

    # The ingest script lives in ../scripts relative to this module and is run
    # with the current interpreter.
    ingester = Path(__file__).parent.parent / "scripts" / "ingest_sbom.py"
    argv = [
        sys.executable, str(ingester),
        "--repo", repo_slug,
        "--lockfile", lockfile_path,
        "--api-base", API_BASE,
    ]
    proc = subprocess.run(argv, capture_output=True, text=True)

    # stdout and stderr are concatenated (stdout first) and trimmed.
    combined = (proc.stdout + proc.stderr).strip()
    if proc.returncode == 0:
        return combined
    return f"ingest_sbom failed (exit {proc.returncode}):\n{combined}"
@mcp.tool()
def get_licence_report() -> str:
    """Get a licence report across all ingested SBOM entries.

    Returns packages grouped by SPDX licence identifier, with copyleft
    flag (GPL/AGPL/LGPL/EUPL/CDDL/MPL) and repos using each licence.
    """
    licence_report = _get("/sbom/report/licences")
    return json.dumps(licence_report, indent=2)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def _main() -> None:
    """Serve the MCP tools and resources over the stdio transport."""
    mcp.run(transport="stdio")


if __name__ == "__main__":
    _main()