generated from coulomb/repo-seed
POST /topics/ was already implemented in the REST API but had no MCP wrapper, so agents couldn't create topics (e.g. inter_hub) via MCP. Tool follows the same pattern as create_domain. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2199 lines
78 KiB
Python
2199 lines
78 KiB
Python
"""Custodian State Hub MCP Server (stdio).
|
||
|
||
Thin HTTP client over the FastAPI service — no direct DB access.
|
||
All business logic stays in the API; this layer is stateless.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
import sys
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any, Optional
|
||
from uuid import UUID
|
||
|
||
import httpx
|
||
from fastmcp import FastMCP
|
||
|
||
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")
|
||
|
||
mcp = FastMCP(
|
||
name="state-hub",
|
||
instructions=(
|
||
"Custodian State Hub: tracks topics, workstreams, tasks, decisions, and progress events. "
|
||
"Start every session with get_state_summary() for orientation. "
|
||
"When working inside a single registered domain repo, prefer get_domain_summary(domain_slug) "
|
||
"— it returns the same actionable data scoped to that domain at ~10% of the token cost. "
|
||
"All writes emit a progress_event automatically."
|
||
),
|
||
)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# HTTP helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _client() -> httpx.Client:
|
||
return httpx.Client(base_url=API_BASE, timeout=30.0, follow_redirects=True)
|
||
|
||
|
||
def _get(path: str, params: dict | None = None) -> Any:
|
||
if not path.endswith("/"):
|
||
path = path + "/"
|
||
try:
|
||
with _client() as c:
|
||
r = c.get(path, params={k: v for k, v in (params or {}).items() if v is not None})
|
||
r.raise_for_status()
|
||
return r.json()
|
||
except httpx.HTTPStatusError as e:
|
||
return {"error": f"API {e.response.status_code}: {e.response.text[:300]}"}
|
||
except Exception as e:
|
||
return {"error": f"Request failed: {e}"}
|
||
|
||
|
||
def _post(path: str, body: dict) -> Any:
|
||
if not path.endswith("/"):
|
||
path = path + "/"
|
||
try:
|
||
with _client() as c:
|
||
r = c.post(path, json={k: v for k, v in body.items() if v is not None})
|
||
r.raise_for_status()
|
||
return r.json()
|
||
except httpx.HTTPStatusError as e:
|
||
return {"error": f"API {e.response.status_code}: {e.response.text[:300]}"}
|
||
except Exception as e:
|
||
return {"error": f"Request failed: {e}"}
|
||
|
||
|
||
def _patch(path: str, body: dict) -> Any:
|
||
if not path.endswith("/"):
|
||
path = path + "/"
|
||
try:
|
||
with _client() as c:
|
||
r = c.patch(path, json={k: v for k, v in body.items() if v is not None})
|
||
r.raise_for_status()
|
||
return r.json()
|
||
except httpx.HTTPStatusError as e:
|
||
return {"error": f"API {e.response.status_code}: {e.response.text[:300]}"}
|
||
except Exception as e:
|
||
return {"error": f"Request failed: {e}"}
|
||
|
||
|
||
def _delete(path: str) -> None:
|
||
try:
|
||
with _client() as c:
|
||
r = c.delete(path)
|
||
r.raise_for_status()
|
||
except httpx.HTTPStatusError as e:
|
||
return {"error": f"API {e.response.status_code}: {e.response.text[:300]}"}
|
||
except Exception as e:
|
||
return {"error": f"Request failed: {e}"}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Resources
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.resource("state://summary")
|
||
def resource_summary() -> str:
|
||
"""Full StateSummary JSON — primary orientation resource."""
|
||
return json.dumps(_get("/state/summary"), indent=2)
|
||
|
||
|
||
@mcp.resource("state://topics")
|
||
def resource_topics() -> str:
|
||
"""Active topics list."""
|
||
return json.dumps(_get("/topics", {"status": "active"}), indent=2)
|
||
|
||
|
||
@mcp.resource("state://workstreams/{topic_slug}")
|
||
def resource_workstreams(topic_slug: str) -> str:
|
||
"""Workstreams for a topic (by slug)."""
|
||
topics = _get("/topics", {"status": "active"})
|
||
match = next((t for t in topics if t["slug"] == topic_slug), None)
|
||
if not match:
|
||
return json.dumps({"error": f"Topic '{topic_slug}' not found"})
|
||
return json.dumps(_get("/workstreams", {"topic_id": match["id"]}), indent=2)
|
||
|
||
|
||
@mcp.resource("state://decisions/blocking")
|
||
def resource_blocking_decisions() -> str:
|
||
"""All pending/escalated decisions."""
|
||
return json.dumps(
|
||
_get("/decisions", {"decision_type": "pending", "status": "open"}),
|
||
indent=2,
|
||
)
|
||
|
||
|
||
@mcp.resource("state://tasks/blocked")
|
||
def resource_blocked_tasks() -> str:
|
||
"""All tasks with status=blocked."""
|
||
return json.dumps(_get("/tasks", {"status": "blocked"}), indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Query tools
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def get_state_summary() -> str:
|
||
"""Primary orientation tool. Call at the start of every session.
|
||
|
||
Returns a full snapshot: topic/workstream/task/decision totals, blocking
|
||
decisions, blocked tasks, open workstreams, and the 20 most recent events.
|
||
|
||
NOTE: This response is large (~10k tokens). When working inside a single
|
||
registered domain repo, use get_domain_summary(domain_slug) instead —
|
||
same actionable data scoped to one domain at ~10% of the token cost.
|
||
"""
|
||
return json.dumps(_get("/state/summary"), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def get_domain_summary(domain_slug: str) -> str:
|
||
"""Lightweight session orientation for a single domain.
|
||
|
||
Use this instead of get_state_summary() when working in a registered
|
||
domain repo — returns only what is relevant to the specified domain,
|
||
typically 80-90% fewer tokens than the full summary.
|
||
|
||
Args:
|
||
domain_slug: the domain slug, e.g. "railiance", "markitect"
|
||
|
||
Returns: topic, active workstreams, open blocking decisions for this
|
||
topic, 5 most recent progress events, repo SBOM status, and goal guidance
|
||
(needs_workplan signals + alignment warnings).
|
||
"""
|
||
topics = _get("/topics")
|
||
topic = next((t for t in topics if t.get("domain_slug") == domain_slug), None)
|
||
if not topic:
|
||
return json.dumps({"error": f"No topic found for domain '{domain_slug}'"})
|
||
|
||
topic_id = topic["id"]
|
||
|
||
workstreams = _get("/workstreams", {"topic_id": topic_id, "status": "active"})
|
||
blocking = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id})
|
||
recent = _get("/progress", {"topic_id": topic_id, "limit": 5})
|
||
repos = _get("/repos", {"domain": domain_slug})
|
||
|
||
# ── Goal guidance ──────────────────────────────────────────────────────────
|
||
# Fetch active repo goals per repo, then cross-reference with workstreams.
|
||
repo_by_id = {r["id"]: r for r in repos}
|
||
ws_by_repo_goal: dict[str, list] = {}
|
||
for ws in workstreams:
|
||
if ws.get("repo_goal_id"):
|
||
ws_by_repo_goal.setdefault(ws["repo_goal_id"], []).append(ws)
|
||
|
||
# repo_id → list of active workstreams (for alignment check)
|
||
ws_by_repo: dict[str, list] = {}
|
||
for ws in workstreams:
|
||
if ws.get("repo_id"):
|
||
ws_by_repo.setdefault(ws["repo_id"], []).append(ws)
|
||
|
||
needs_workplan: list[dict] = [] # active goal with no linked workstream
|
||
alignment_warnings: list[dict] = [] # workstreams not linked to active goal
|
||
|
||
for repo in repos:
|
||
repo_slug = repo["slug"]
|
||
repo_id = repo["id"]
|
||
active_goals = _get("/repo-goals", {"repo_slug": repo_slug, "status": "active"})
|
||
if not active_goals:
|
||
continue
|
||
active_goal_ids = {g["id"] for g in active_goals}
|
||
|
||
for goal in active_goals:
|
||
linked = ws_by_repo_goal.get(goal["id"], [])
|
||
if not linked:
|
||
needs_workplan.append({
|
||
"repo_slug": repo_slug,
|
||
"goal_id": goal["id"],
|
||
"goal_title": goal["title"],
|
||
"goal_description": goal["description"],
|
||
"priority": goal["priority"],
|
||
"action": (
|
||
f"No workstream is linked to repo goal '{goal['title']}'. "
|
||
"Create a workplan file in workplans/ and register a workstream "
|
||
f"with repo_goal_id='{goal['id']}' to start delivering this goal."
|
||
),
|
||
})
|
||
|
||
# Check if repo has active workstreams not tied to any active goal
|
||
repo_ws = ws_by_repo.get(repo_id, [])
|
||
unlinked_ws = [
|
||
ws for ws in repo_ws
|
||
if ws.get("repo_goal_id") not in active_goal_ids
|
||
]
|
||
if unlinked_ws:
|
||
# Most recently updated workstream = the one to suggest continuing
|
||
recent_ws = max(unlinked_ws, key=lambda w: w.get("updated_at", ""))
|
||
alignment_warnings.append({
|
||
"repo_slug": repo_slug,
|
||
"recent_workstream_id": recent_ws["id"],
|
||
"recent_workstream_title": recent_ws["title"],
|
||
"active_goal_titles": [g["title"] for g in active_goals],
|
||
"message": (
|
||
f"Workstream '{recent_ws['title']}' is not linked to the current "
|
||
f"repo goal(s) for {repo_slug}. "
|
||
"Continue this workstream if the work is still relevant, but verify "
|
||
"alignment with the active goal before committing to new tasks."
|
||
),
|
||
})
|
||
|
||
goal_guidance: dict = {}
|
||
if needs_workplan or alignment_warnings:
|
||
goal_guidance = {
|
||
"needs_workplan": needs_workplan,
|
||
"alignment_warnings": alignment_warnings,
|
||
}
|
||
|
||
result: dict = {
|
||
"domain": domain_slug,
|
||
"topic_id": topic_id,
|
||
"topic_title": topic["title"],
|
||
"workstreams": workstreams,
|
||
"blocking_decisions": blocking,
|
||
"recent_progress": recent,
|
||
"repos": [{"slug": r["slug"], "last_sbom_at": r.get("last_sbom_at")} for r in repos],
|
||
}
|
||
if goal_guidance:
|
||
result["goal_guidance"] = goal_guidance
|
||
return json.dumps(result, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def get_topic(slug: str) -> str:
|
||
"""Return a topic (with workstreams) by slug, plus its recent progress events."""
|
||
topics = _get("/topics")
|
||
match = next((t for t in topics if t["slug"] == slug), None)
|
||
if not match:
|
||
return json.dumps({"error": f"Topic '{slug}' not found"})
|
||
topic_detail = _get(f"/topics/{match['id']}")
|
||
recent = _get("/progress", {"topic_id": match["id"], "limit": 10})
|
||
return json.dumps({"topic": topic_detail, "recent_progress": recent}, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def create_topic(slug: str, title: str, domain: str, description: str | None = None) -> str:
|
||
"""Create a new topic under an existing domain.
|
||
|
||
Args:
|
||
slug: URL-safe identifier, e.g. "inter_hub" (must be unique).
|
||
title: Human-readable name, e.g. "Inter-Hub Federation".
|
||
domain: Domain slug the topic belongs to, e.g. "custodian".
|
||
description: Optional one-sentence description.
|
||
|
||
Returns the created TopicRead on success, or an error dict if the slug
|
||
already exists or the domain is not found.
|
||
"""
|
||
payload: dict = {"slug": slug, "title": title, "domain": domain}
|
||
if description:
|
||
payload["description"] = description
|
||
return json.dumps(_post("/topics", payload), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def list_tasks(workstream_id: str, status: str | None = None) -> str:
|
||
"""List all tasks in a workstream, optionally filtered by status.
|
||
|
||
Args:
|
||
workstream_id: UUID of the workstream (required).
|
||
status: Optional filter — todo | in_progress | blocked | done | cancelled.
|
||
|
||
Returns [{id, title, status, priority, assignee, due_date, needs_human}] for every
|
||
matching task. Use this to look up task UUIDs before calling update_task_status,
|
||
or to check which tasks from a workplan file are already synced to the DB.
|
||
"""
|
||
return json.dumps(_get("/tasks", {"workstream_id": workstream_id, "status": status}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def list_blocked_tasks(workstream_id: str | None = None) -> str:
|
||
"""List all tasks with status=blocked, optionally filtered by workstream_id."""
|
||
return json.dumps(_get("/tasks", {"status": "blocked", "workstream_id": workstream_id}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def list_pending_decisions(topic_id: str | None = None) -> str:
|
||
"""List pending decisions sorted by deadline (nulls last).
|
||
|
||
Optionally filter by topic_id. Escalated decisions are included and
|
||
highlighted by their escalation_note.
|
||
"""
|
||
results = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id})
|
||
return json.dumps(results, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def get_recent_progress(limit: int = 20, since: str | None = None) -> str:
|
||
"""Retrieve recent progress events to reconstruct session history.
|
||
|
||
Args:
|
||
limit: max events to return (default 20)
|
||
since: ISO datetime string — only events after this timestamp
|
||
"""
|
||
return json.dumps(_get("/progress", {"limit": limit, "since": since}), indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Mutate tools
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def create_workstream(
|
||
topic_id: str,
|
||
title: str,
|
||
slug: str | None = None,
|
||
description: str | None = None,
|
||
owner: str | None = None,
|
||
due_date: str | None = None,
|
||
repo_id: str | None = None,
|
||
) -> str:
|
||
"""Create a new workstream under a topic and emit a progress_event.
|
||
|
||
Args:
|
||
topic_id: UUID of the parent topic
|
||
title: workstream title
|
||
slug: URL-friendly identifier (auto-generated from title if omitted)
|
||
description: optional longer description
|
||
owner: optional owner name
|
||
due_date: optional ISO date string (YYYY-MM-DD)
|
||
repo_id: UUID of the owning repository (GEMS primary; strongly recommended per ADR-001)
|
||
"""
|
||
if not slug:
|
||
slug = re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-")
|
||
ws = _post("/workstreams", {
|
||
"topic_id": topic_id,
|
||
"title": title,
|
||
"slug": slug,
|
||
"description": description,
|
||
"owner": owner,
|
||
"due_date": due_date,
|
||
"status": "active",
|
||
"repo_id": repo_id,
|
||
})
|
||
_post("/progress", {
|
||
"topic_id": topic_id,
|
||
"workstream_id": ws["id"],
|
||
"event_type": "workstream_created",
|
||
"summary": f"Workstream created: {title}",
|
||
"author": "custodian",
|
||
"detail": {"owner": owner, "slug": slug},
|
||
})
|
||
return json.dumps(ws, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def create_task(
|
||
workstream_id: str,
|
||
title: str,
|
||
priority: str = "medium",
|
||
description: str | None = None,
|
||
assignee: str | None = None,
|
||
due_date: str | None = None,
|
||
) -> str:
|
||
"""Create a new task and emit a progress_event.
|
||
|
||
Args:
|
||
workstream_id: UUID of the parent workstream
|
||
title: task title
|
||
priority: low | medium | high | critical
|
||
description: optional longer description
|
||
assignee: optional assignee name
|
||
due_date: optional ISO date string (YYYY-MM-DD)
|
||
"""
|
||
task = _post("/tasks", {
|
||
"workstream_id": workstream_id,
|
||
"title": title,
|
||
"priority": priority,
|
||
"description": description,
|
||
"assignee": assignee,
|
||
"due_date": due_date,
|
||
})
|
||
_post("/progress", {
|
||
"workstream_id": workstream_id,
|
||
"task_id": task["id"],
|
||
"event_type": "task_created",
|
||
"summary": f"Task created: {title}",
|
||
"author": "custodian",
|
||
"detail": {"priority": priority, "assignee": assignee},
|
||
})
|
||
return json.dumps(task, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def update_task_status(
|
||
task_id: str,
|
||
status: str,
|
||
blocking_reason: str | None = None,
|
||
) -> str:
|
||
"""Update a task's status. blocking_reason is required when status='blocked'.
|
||
|
||
Args:
|
||
task_id: UUID of the task
|
||
status: todo | in_progress | blocked | done | cancelled
|
||
blocking_reason: required when status=blocked
|
||
"""
|
||
body: dict[str, Any] = {"status": status}
|
||
if blocking_reason:
|
||
body["blocking_reason"] = blocking_reason
|
||
task = _patch(f"/tasks/{task_id}", body)
|
||
_post("/progress", {
|
||
"task_id": task_id,
|
||
"workstream_id": task.get("workstream_id"),
|
||
"event_type": "task_status_changed",
|
||
"summary": f"Task status → {status}: {task['title']}",
|
||
"author": "custodian",
|
||
"detail": {"blocking_reason": blocking_reason},
|
||
})
|
||
return json.dumps(task, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def flag_for_human(task_id: str, note: str) -> str:
|
||
"""Flag a task as requiring human intervention.
|
||
|
||
Sets needs_human=True and records the required action as intervention_note.
|
||
Emits a progress event so the flag is visible in session history.
|
||
|
||
Args:
|
||
task_id: UUID of the task to flag
|
||
note: description of the action required from the human (required)
|
||
"""
|
||
task = _patch(f"/tasks/{task_id}", {
|
||
"needs_human": True,
|
||
"intervention_note": note,
|
||
})
|
||
_post("/progress", {
|
||
"task_id": task_id,
|
||
"workstream_id": task.get("workstream_id"),
|
||
"event_type": "task_flagged_human",
|
||
"summary": f"Task flagged for human intervention: {task['title']}",
|
||
"author": "custodian",
|
||
"detail": {"intervention_note": note},
|
||
})
|
||
return json.dumps(task, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def clear_human_flag(task_id: str) -> str:
|
||
"""Clear the human-intervention flag from a task.
|
||
|
||
Sets needs_human=False. The intervention_note is preserved as a
|
||
historical record. Call this after the human has completed the action.
|
||
|
||
Args:
|
||
task_id: UUID of the task to clear
|
||
"""
|
||
task = _patch(f"/tasks/{task_id}", {
|
||
"needs_human": False,
|
||
})
|
||
_post("/progress", {
|
||
"task_id": task_id,
|
||
"workstream_id": task.get("workstream_id"),
|
||
"event_type": "task_flag_cleared",
|
||
"summary": f"Human-intervention flag cleared: {task['title']}",
|
||
"author": "custodian",
|
||
})
|
||
return json.dumps(task, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def list_human_interventions(workstream_id: str | None = None) -> str:
|
||
"""List all tasks flagged for human intervention.
|
||
|
||
Returns tasks where needs_human=True, optionally filtered to one workstream.
|
||
Use this at session start to surface Bernd's action items.
|
||
|
||
Args:
|
||
workstream_id: optional UUID to scope results to one workstream
|
||
"""
|
||
return json.dumps(
|
||
_get("/tasks", {"needs_human": "true", "workstream_id": workstream_id}),
|
||
indent=2,
|
||
)
|
||
|
||
|
||
@mcp.tool()
|
||
def record_decision(
|
||
title: str,
|
||
decision_type: str = "pending",
|
||
topic_id: str | None = None,
|
||
workstream_id: str | None = None,
|
||
description: str | None = None,
|
||
rationale: str | None = None,
|
||
decided_by: str | None = None,
|
||
deadline: str | None = None,
|
||
) -> str:
|
||
"""Record a decision (made or pending).
|
||
|
||
Pending decisions touching financial/legal topics are auto-escalated per
|
||
constitution §4.
|
||
|
||
Args:
|
||
title: decision title
|
||
decision_type: made | pending
|
||
topic_id: optional topic UUID
|
||
workstream_id: optional workstream UUID (at least one required)
|
||
description: optional context
|
||
rationale: reasoning behind the decision
|
||
decided_by: person/agent who decided
|
||
deadline: ISO datetime string for when decision is needed
|
||
"""
|
||
decision = _post("/decisions", {
|
||
"title": title,
|
||
"decision_type": decision_type,
|
||
"topic_id": topic_id,
|
||
"workstream_id": workstream_id,
|
||
"description": description,
|
||
"rationale": rationale,
|
||
"decided_by": decided_by,
|
||
"deadline": deadline,
|
||
})
|
||
_post("/progress", {
|
||
"topic_id": topic_id,
|
||
"workstream_id": workstream_id,
|
||
"decision_id": decision["id"],
|
||
"event_type": "decision_recorded",
|
||
"summary": f"Decision recorded ({decision_type}): {title}",
|
||
"author": "custodian",
|
||
"detail": {"status": decision.get("status"), "escalation_note": decision.get("escalation_note")},
|
||
})
|
||
return json.dumps(decision, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def resolve_decision(
|
||
decision_id: str,
|
||
rationale: str,
|
||
decided_by: str,
|
||
) -> str:
|
||
"""Mark a decision as resolved.
|
||
|
||
Args:
|
||
decision_id: UUID of the decision
|
||
rationale: final reasoning/outcome
|
||
decided_by: who resolved it
|
||
"""
|
||
decision = _patch(f"/decisions/{decision_id}", {
|
||
"status": "resolved",
|
||
"decision_type": "made",
|
||
"rationale": rationale,
|
||
"decided_by": decided_by,
|
||
"decided_at": datetime.now(tz=timezone.utc).isoformat(),
|
||
})
|
||
_post("/progress", {
|
||
"topic_id": decision.get("topic_id"),
|
||
"workstream_id": decision.get("workstream_id"),
|
||
"decision_id": decision_id,
|
||
"event_type": "decision_resolved",
|
||
"summary": f"Decision resolved by {decided_by}: {decision['title']}",
|
||
"author": "custodian",
|
||
"detail": {"rationale": rationale},
|
||
})
|
||
return json.dumps(decision, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def add_progress_event(
|
||
summary: str,
|
||
event_type: str = "note",
|
||
topic_id: str | None = None,
|
||
workstream_id: str | None = None,
|
||
task_id: str | None = None,
|
||
detail: dict | str | None = None,
|
||
) -> str:
|
||
"""Append a progress event to the log.
|
||
|
||
Args:
|
||
summary: human-readable summary of what happened
|
||
event_type: free-form label, e.g. note | milestone | blocker | insight
|
||
topic_id: optional topic UUID
|
||
workstream_id: optional workstream UUID
|
||
task_id: optional task UUID
|
||
detail: optional structured data (JSONB); accepts a dict or a JSON string
|
||
"""
|
||
if isinstance(detail, str):
|
||
try:
|
||
detail = json.loads(detail)
|
||
except (json.JSONDecodeError, ValueError):
|
||
detail = {"raw": detail}
|
||
event = _post("/progress", {
|
||
"topic_id": topic_id,
|
||
"workstream_id": workstream_id,
|
||
"task_id": task_id,
|
||
"event_type": event_type,
|
||
"summary": summary,
|
||
"author": "custodian",
|
||
"detail": detail,
|
||
})
|
||
return json.dumps(event, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def update_workstream_status(workstream_id: str, status: str) -> str:
|
||
"""Update a workstream's status.
|
||
|
||
Args:
|
||
workstream_id: UUID of the workstream
|
||
status: active | blocked | completed | archived
|
||
"""
|
||
ws = _patch(f"/workstreams/{workstream_id}", {"status": status})
|
||
_post("/progress", {
|
||
"workstream_id": workstream_id,
|
||
"topic_id": ws.get("topic_id"),
|
||
"event_type": "workstream_status_changed",
|
||
"summary": f"Workstream status → {status}: {ws['title']}",
|
||
"author": "custodian",
|
||
})
|
||
return json.dumps(ws, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def update_workstream(
|
||
workstream_id: str,
|
||
title: str | None = None,
|
||
description: str | None = None,
|
||
owner: str | None = None,
|
||
due_date: str | None = None,
|
||
repo_goal_id: str | None = None,
|
||
status: str | None = None,
|
||
) -> str:
|
||
"""Update fields on an existing workstream.
|
||
|
||
Args:
|
||
workstream_id: UUID of the workstream
|
||
title: new title (optional)
|
||
description: new description (optional)
|
||
owner: new owner (optional)
|
||
due_date: ISO date string YYYY-MM-DD (optional)
|
||
repo_goal_id: UUID of the repo goal to link (optional; pass empty string to clear)
|
||
status: active | blocked | completed | archived (optional)
|
||
"""
|
||
payload: dict = {}
|
||
if title is not None:
|
||
payload["title"] = title
|
||
if description is not None:
|
||
payload["description"] = description
|
||
if owner is not None:
|
||
payload["owner"] = owner
|
||
if due_date is not None:
|
||
payload["due_date"] = due_date
|
||
if status is not None:
|
||
payload["status"] = status
|
||
if repo_goal_id is not None:
|
||
payload["repo_goal_id"] = repo_goal_id if repo_goal_id else None
|
||
ws = _patch(f"/workstreams/{workstream_id}", payload)
|
||
return json.dumps(ws, indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Next-steps suggestion tool (S2.3) — sanctioned write use case #2
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def get_next_steps() -> str:
|
||
"""Surface contextual next-action suggestions derived from hub state.
|
||
|
||
Returns suggestions based on:
|
||
- Recently resolved decisions → first open task in the same workstream
|
||
- Workstreams whose every dependency is now completed → first todo task
|
||
|
||
Each suggestion includes domain, workstream, task, and a plain-language
|
||
message. The hub surfaces *what* and *where* — the domain owns *how*.
|
||
|
||
This is one of the two sanctioned write-side use cases of the State Hub
|
||
(the other is resolve_decision). Suggestions are derived, not persisted.
|
||
"""
|
||
return json.dumps(_get("/state/next_steps"), indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Dependency graph tools (S1.4)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def create_dependency(
|
||
from_workstream_id: str,
|
||
to_workstream_id: str,
|
||
description: str | None = None,
|
||
) -> str:
|
||
"""Record that one workstream depends on another.
|
||
|
||
Semantics: from_workstream cannot fully proceed until to_workstream reaches
|
||
a satisfactory state.
|
||
|
||
Args:
|
||
from_workstream_id: UUID of the workstream that has the dependency
|
||
to_workstream_id: UUID of the workstream it depends on
|
||
description: optional human-readable explanation of the dependency
|
||
"""
|
||
dep = _post(f"/workstreams/{from_workstream_id}/dependencies", {
|
||
"to_workstream_id": to_workstream_id,
|
||
"description": description,
|
||
})
|
||
return json.dumps(dep, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def list_dependencies(workstream_id: str) -> str:
|
||
"""Return all dependency edges touching a workstream (both directions).
|
||
|
||
The response distinguishes edges where this workstream is the dependent
|
||
(depends_on) from edges where it is the blocker (blocks).
|
||
|
||
Args:
|
||
workstream_id: UUID of the workstream to inspect
|
||
"""
|
||
edges = _get(f"/workstreams/{workstream_id}/dependencies")
|
||
depends_on = [e for e in edges if e["from_workstream_id"] == workstream_id]
|
||
blocks = [e for e in edges if e["to_workstream_id"] == workstream_id]
|
||
return json.dumps({"depends_on": depends_on, "blocks": blocks}, indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Extension points & technical debt
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def register_extension_point(
|
||
domain: str,
|
||
title: str,
|
||
ep_type: str,
|
||
description: str | None = None,
|
||
location: str | None = None,
|
||
priority: str = "medium",
|
||
ep_id: str | None = None,
|
||
topic_id: str | None = None,
|
||
workstream_id: str | None = None,
|
||
) -> str:
|
||
"""Register a discovered extension point — optional future functionality not yet committed.
|
||
|
||
Extension points capture design forks: things the system *could* do that
|
||
have been noticed and parked for deliberate later consideration.
|
||
|
||
Args:
|
||
domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
|
||
title: short description of the extension
|
||
ep_type: api | schema | mcp | dashboard | architecture | integration | other
|
||
description: longer explanation of what the extension would add
|
||
location: file:line or module where the extension point was noticed
|
||
priority: low | medium | high | critical
|
||
ep_id: optional human-readable ID, e.g. EP-CUST-001 (auto-assigned if omitted)
|
||
topic_id: UUID of related topic
|
||
workstream_id: UUID of related workstream
|
||
"""
|
||
ep = _post("/extension-points", {
|
||
"domain": domain, "title": title, "ep_type": ep_type,
|
||
"description": description, "location": location,
|
||
"priority": priority, "ep_id": ep_id,
|
||
"topic_id": topic_id, "workstream_id": workstream_id,
|
||
})
|
||
_post("/progress", {
|
||
"summary": f"Extension point registered: [{ep.get('ep_id') or ep['id'][:8]}] {title} ({ep_type}, {domain})",
|
||
"event_type": "extension_point",
|
||
"detail": {"id": ep["id"], "ep_id": ep.get("ep_id"), "ep_type": ep_type, "domain": domain},
|
||
})
|
||
return json.dumps(ep, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def list_extension_points(
|
||
domain: str | None = None,
|
||
status: str | None = None,
|
||
ep_type: str | None = None,
|
||
) -> str:
|
||
"""List extension points, optionally filtered.
|
||
|
||
Args:
|
||
domain: filter by domain
|
||
status: open | in_progress | addressed | deferred | wont_fix
|
||
ep_type: api | schema | mcp | dashboard | architecture | integration | other
|
||
"""
|
||
return json.dumps(_get("/extension-points", {
|
||
"domain": domain, "status": status, "ep_type": ep_type,
|
||
}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def update_ep_status(ep_uuid: str, status: str) -> str:
|
||
"""Update the status of an extension point.
|
||
|
||
Args:
|
||
ep_uuid: UUID of the extension point
|
||
status: open | in_progress | addressed | deferred | wont_fix
|
||
"""
|
||
ep = _patch(f"/extension-points/{ep_uuid}", {"status": status})
|
||
_post("/progress", {
|
||
"summary": f"Extension point status → {status}: {ep['title']}",
|
||
"event_type": "extension_point",
|
||
"detail": {"id": ep_uuid, "status": status},
|
||
})
|
||
return json.dumps(ep, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def register_technical_debt(
|
||
domain: str,
|
||
title: str,
|
||
debt_type: str,
|
||
description: str | None = None,
|
||
location: str | None = None,
|
||
severity: str = "medium",
|
||
td_id: str | None = None,
|
||
topic_id: str | None = None,
|
||
workstream_id: str | None = None,
|
||
) -> str:
|
||
"""Register a technical debt item — a known quality compromise to address later.
|
||
|
||
Technical debt captures intentional or discovered shortcuts, design
|
||
weaknesses, missing tests, and similar issues that reduce codebase health.
|
||
|
||
Args:
|
||
domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
|
||
title: short description of the debt
|
||
debt_type: design | implementation | test | docs | dependencies | performance | security | other
|
||
description: what the issue is and what the correct fix would be
|
||
location: file:line or module where the debt lives
|
||
severity: low | medium | high | critical
|
||
td_id: optional human-readable ID, e.g. TD-CUST-001
|
||
topic_id: UUID of related topic
|
||
workstream_id: UUID of related workstream
|
||
"""
|
||
td = _post("/technical-debt", {
|
||
"domain": domain, "title": title, "debt_type": debt_type,
|
||
"description": description, "location": location,
|
||
"severity": severity, "td_id": td_id,
|
||
"topic_id": topic_id, "workstream_id": workstream_id,
|
||
})
|
||
_post("/progress", {
|
||
"summary": f"Technical debt registered: [{td.get('td_id') or td['id'][:8]}] {title} ({debt_type}, {severity}, {domain})",
|
||
"event_type": "technical_debt",
|
||
"detail": {"id": td["id"], "td_id": td.get("td_id"), "debt_type": debt_type, "severity": severity, "domain": domain},
|
||
})
|
||
return json.dumps(td, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def list_technical_debt(
|
||
domain: str | None = None,
|
||
status: str | None = None,
|
||
debt_type: str | None = None,
|
||
severity: str | None = None,
|
||
) -> str:
|
||
"""List technical debt items, optionally filtered.
|
||
|
||
Args:
|
||
domain: filter by domain
|
||
status: open | in_progress | resolved | deferred | wont_fix
|
||
debt_type: design | implementation | test | docs | dependencies | performance | security | other
|
||
severity: low | medium | high | critical
|
||
"""
|
||
return json.dumps(_get("/technical-debt", {
|
||
"domain": domain, "status": status,
|
||
"debt_type": debt_type, "severity": severity,
|
||
}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def update_td_status(td_uuid: str, status: str) -> str:
|
||
"""Update the status of a technical debt item.
|
||
|
||
Args:
|
||
td_uuid: UUID of the technical debt item
|
||
status: open | in_progress | resolved | deferred | wont_fix
|
||
"""
|
||
td = _patch(f"/technical-debt/{td_uuid}", {"status": status})
|
||
_post("/progress", {
|
||
"summary": f"Technical debt status → {status}: {td['title']}",
|
||
"event_type": "technical_debt",
|
||
"detail": {"id": td_uuid, "status": status},
|
||
})
|
||
return json.dumps(td, indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Domain lifecycle + repo registration tools (v0.5)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def list_domains(status: str = "active") -> str:
|
||
"""List all registered domains.
|
||
|
||
Args:
|
||
status: active | archived | all (default: active)
|
||
"""
|
||
return json.dumps(_get("/domains", {"status": status}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def create_domain(slug: str, name: str, description: str | None = None) -> str:
|
||
"""Create a new domain.
|
||
|
||
Args:
|
||
slug: URL-friendly identifier (lowercase, underscored), e.g. 'my_project'
|
||
name: Human-readable display name
|
||
description: optional longer description
|
||
"""
|
||
domain = _post("/domains", {"slug": slug, "name": name, "description": description})
|
||
_post("/progress", {
|
||
"event_type": "milestone",
|
||
"summary": f"Domain created: {slug} ({name})",
|
||
"author": "custodian",
|
||
"detail": {"slug": slug, "name": name},
|
||
})
|
||
return json.dumps(domain, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def rename_domain(slug: str, new_slug: str, new_name: str) -> str:
|
||
"""Rename a domain — cascades to EP/TD string columns.
|
||
|
||
Args:
|
||
slug: Current domain slug
|
||
new_slug: New URL-friendly identifier
|
||
new_name: New human-readable display name
|
||
"""
|
||
domain = _patch(f"/domains/{slug}/rename", {"new_slug": new_slug, "new_name": new_name})
|
||
_post("/progress", {
|
||
"event_type": "milestone",
|
||
"summary": f"Domain renamed: {slug} → {new_slug} ({new_name})",
|
||
"author": "custodian",
|
||
"detail": {"old_slug": slug, "new_slug": new_slug, "new_name": new_name},
|
||
})
|
||
return json.dumps(domain, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def archive_domain(slug: str) -> str:
|
||
"""Archive a domain (soft-delete). Fails if active topics exist.
|
||
|
||
Args:
|
||
slug: Domain slug to archive
|
||
"""
|
||
domain = _patch(f"/domains/{slug}/archive", {})
|
||
_post("/progress", {
|
||
"event_type": "note",
|
||
"summary": f"Domain archived: {slug}",
|
||
"author": "custodian",
|
||
"detail": {"slug": slug},
|
||
})
|
||
return json.dumps(domain, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def list_domain_repos(domain_slug: str) -> str:
|
||
"""List all repositories registered under a domain.
|
||
|
||
Args:
|
||
domain_slug: Domain slug to filter by
|
||
"""
|
||
return json.dumps(_get("/repos", {"domain": domain_slug}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def register_repo(
|
||
domain_slug: str,
|
||
name: str,
|
||
slug: str | None = None,
|
||
local_path: str | None = None,
|
||
remote_url: str | None = None,
|
||
description: str | None = None,
|
||
) -> str:
|
||
"""Register a git repository under a domain.
|
||
|
||
Args:
|
||
domain_slug: Domain slug (must already exist)
|
||
name: Human-readable repository name
|
||
slug: URL-friendly identifier (auto-generated from name if omitted)
|
||
local_path: Absolute local filesystem path to the repo
|
||
remote_url: Remote git URL (Gitea, GitHub, etc.)
|
||
description: optional description
|
||
"""
|
||
import re as _re
|
||
if not slug:
|
||
slug = _re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")
|
||
repo = _post("/repos", {
|
||
"domain_slug": domain_slug,
|
||
"slug": slug,
|
||
"name": name,
|
||
"local_path": local_path,
|
||
"remote_url": remote_url,
|
||
"description": description,
|
||
})
|
||
_post("/progress", {
|
||
"event_type": "milestone",
|
||
"summary": f"Repo registered: {name} under domain '{domain_slug}'",
|
||
"author": "custodian",
|
||
"detail": {"slug": slug, "domain_slug": domain_slug, "local_path": local_path, "remote_url": remote_url},
|
||
})
|
||
return json.dumps(repo, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def update_repo_path(repo_slug: str, path: str, host: str | None = None) -> str:
|
||
"""Register or update the local filesystem path for a repo on a specific host.
|
||
|
||
Use this when a repo lives at a different absolute path on different machines
|
||
(e.g. /home/worsch/marki-docx on the workstation vs /home/tegwick/marki-docx
|
||
on custodiancore). The consistency checker will prefer the host-specific path
|
||
over the legacy local_path field.
|
||
|
||
Args:
|
||
repo_slug: Managed-repo slug (e.g. 'marki-docx')
|
||
path: Absolute local path on the target machine (e.g. '/home/tegwick/marki-docx')
|
||
host: Hostname to register the path for. Defaults to the current machine's hostname.
|
||
"""
|
||
import socket as _socket
|
||
if not host:
|
||
host = _socket.gethostname()
|
||
repo = _post(f"/repos/{repo_slug}/paths", {"host": host, "path": path})
|
||
return json.dumps(repo, indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Shared path resolution helper
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _resolve_repo_path(repo: dict) -> str:
|
||
"""Return the best local filesystem path for *repo* on this host.
|
||
|
||
Resolution order — each candidate is expanded (supports ``~``) and
|
||
verified to exist before being accepted:
|
||
|
||
1. ``host_paths[hostname]`` — host-specific override
|
||
2. ``local_path`` — default fallback
|
||
|
||
Returns the resolved path string, or ``""`` if no valid path is found.
|
||
"""
|
||
import socket as _socket
|
||
hostname = _socket.gethostname()
|
||
host_paths = repo.get("host_paths") or {}
|
||
|
||
candidates = []
|
||
if host_paths.get(hostname):
|
||
candidates.append(host_paths[hostname])
|
||
if repo.get("local_path"):
|
||
candidates.append(repo["local_path"])
|
||
|
||
for raw in candidates:
|
||
resolved = str(Path(raw).expanduser())
|
||
if Path(resolved).is_dir():
|
||
return resolved
|
||
|
||
return ""
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Kaizen Agents
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _kaizen_agents_dir() -> Path:
|
||
"""Resolve the kaizen-agentic agents/ directory."""
|
||
repo = _get("/repos/kaizen-agentic")
|
||
base = _resolve_repo_path(repo)
|
||
if not base:
|
||
import socket as _socket
|
||
hostname = _socket.gethostname()
|
||
raise FileNotFoundError(
|
||
f"kaizen-agentic path not found on host '{hostname}'. "
|
||
"Register it with update_repo_path('kaizen-agentic', '/path/to/repo')."
|
||
)
|
||
agents_dir = Path(base) / "agents"
|
||
if not agents_dir.is_dir():
|
||
raise FileNotFoundError(f"agents/ directory not found at {agents_dir}")
|
||
return agents_dir
|
||
|
||
|
||
@mcp.tool()
|
||
def list_kaizen_agents(category: str | None = None) -> str:
|
||
"""List all available kaizen agent personas.
|
||
|
||
Reads agent metadata from kaizen-agentic/agents/agent-*.md frontmatter.
|
||
Each agent is a specialized instruction set Claude can load and follow.
|
||
|
||
Args:
|
||
category: Optional filter (e.g. 'testing', 'quality', 'process', 'infrastructure').
|
||
Returns all agents when omitted.
|
||
|
||
Returns:
|
||
JSON list of {name, description, category, file} objects.
|
||
"""
|
||
import re as _re
|
||
agents_dir = _kaizen_agents_dir()
|
||
result = []
|
||
for f in sorted(agents_dir.glob("agent-*.md")):
|
||
name = f.stem.removeprefix("agent-")
|
||
text = f.read_text(encoding="utf-8")
|
||
# Extract optional YAML frontmatter fields
|
||
fm_match = _re.match(r"^---\n(.*?\n)---\n", text, _re.DOTALL)
|
||
meta: dict = {}
|
||
if fm_match:
|
||
for line in fm_match.group(1).splitlines():
|
||
if ":" in line:
|
||
k, _, v = line.partition(":")
|
||
meta[k.strip()] = v.strip()
|
||
agent_category = meta.get("category", "")
|
||
if category and agent_category.lower() != category.lower():
|
||
continue
|
||
# Fall back to first non-empty line after frontmatter as description
|
||
desc = meta.get("description", "")
|
||
if not desc:
|
||
for line in text.split("\n"):
|
||
line = line.strip()
|
||
if line and not line.startswith("#") and not line.startswith("---"):
|
||
desc = line[:120]
|
||
break
|
||
result.append({"name": name, "description": desc, "category": agent_category, "file": f.name})
|
||
return json.dumps(result, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def get_kaizen_agent(name: str) -> str:
|
||
"""Load the full instructions for a kaizen agent persona.
|
||
|
||
Read the returned markdown and follow the instructions it contains.
|
||
Use list_kaizen_agents() to discover available agent names.
|
||
|
||
Args:
|
||
name: Agent name without 'agent-' prefix (e.g. 'tdd-workflow', 'code-refactoring').
|
||
|
||
Returns:
|
||
Full markdown content of the agent definition file.
|
||
"""
|
||
agents_dir = _kaizen_agents_dir()
|
||
agent_file = agents_dir / f"agent-{name}.md"
|
||
if not agent_file.exists():
|
||
available = [f.stem.removeprefix("agent-") for f in sorted(agents_dir.glob("agent-*.md"))]
|
||
return json.dumps({"error": f"Agent '{name}' not found.", "available": available})
|
||
return agent_file.read_text(encoding="utf-8")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ADR-001 compliance validation
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def validate_repo_adr(repo_slug: str, domain_slug: str | None = None) -> str:
|
||
"""Check whether a repository is consistent with ADR-001.
|
||
|
||
Validates that workplan files exist in workplans/ with correct frontmatter,
|
||
that state_hub_workstream_id references resolve to real DB records, and that
|
||
no active state-hub workstreams for the domain lack a backing file (orphan
|
||
detection — DB-only records are an ADR-001 violation).
|
||
|
||
The repo path is resolved from the DB: host_paths[hostname] is tried first
|
||
(with existence check), then local_path — both support ~ expansion. This tool always runs against
|
||
the server's copy of the repo. Remote agents on a different branch should
|
||
sync first, or run validate_repo_adr.py locally with
|
||
--api-base http://127.0.0.1:18000.
|
||
|
||
Args:
|
||
repo_slug: Registered repo slug (e.g. 'the-custodian', 'ops-bridge').
|
||
domain_slug: Domain slug for orphan detection (e.g. 'custodian').
|
||
If omitted, inferred from workplan frontmatter.
|
||
"""
|
||
import socket as _socket
|
||
import subprocess
|
||
|
||
repo = _get(f"/repos/{repo_slug}")
|
||
if isinstance(repo, dict) and repo.get("error"):
|
||
return f"Repo '{repo_slug}' not found: {repo['error']}"
|
||
|
||
repo_path = _resolve_repo_path(repo)
|
||
if not repo_path:
|
||
hostname = _socket.gethostname()
|
||
return (
|
||
f"⚠ No accessible path found for repo '{repo_slug}' on host '{hostname}'.\n"
|
||
f"Register with: update_repo_path('{repo_slug}', '/path/to/repo')\n"
|
||
f"Remote agents: run validate_repo_adr.py locally with "
|
||
f"--api-base {API_BASE}"
|
||
)
|
||
|
||
script = Path(__file__).parent.parent / "scripts" / "validate_repo_adr.py"
|
||
cmd = [sys.executable, str(script), repo_path, "--json",
|
||
"--api-base", API_BASE]
|
||
if domain_slug:
|
||
cmd += ["--domain", domain_slug]
|
||
|
||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||
try:
|
||
data = json.loads(result.stdout)
|
||
except json.JSONDecodeError:
|
||
return f"Validator script error:\n{result.stderr or result.stdout or '(no output)'}"
|
||
|
||
findings = data.get("findings", [])
|
||
summary = data.get("summary", {})
|
||
overall = data.get("result", "unknown")
|
||
|
||
failures = [f for f in findings if f["level"] == "FAIL"]
|
||
warnings = [f for f in findings if f["level"] == "WARN"]
|
||
|
||
lines = [f"ADR-001 Compliance: {repo_slug} ({repo_path})", ""]
|
||
|
||
if failures:
|
||
lines.append(f"FAILURES ({len(failures)}):")
|
||
for f in failures:
|
||
loc = f" [{f['file']}]" if f.get("file") else ""
|
||
lines.append(f" FAIL {f['check']}{loc}")
|
||
lines.append(f" {f['detail']}")
|
||
lines.append("")
|
||
|
||
if warnings:
|
||
lines.append(f"WARNINGS ({len(warnings)}):")
|
||
for f in warnings:
|
||
loc = f" [{f['file']}]" if f.get("file") else ""
|
||
lines.append(f" WARN {f['check']}{loc}")
|
||
lines.append(f" {f['detail']}")
|
||
lines.append("")
|
||
|
||
lines.append(
|
||
f"Summary: {summary.get('pass', 0)} pass | "
|
||
f"{summary.get('warn', 0)} warn | "
|
||
f"{summary.get('fail', 0)} fail"
|
||
)
|
||
lines.append(f"Result: {'FAIL' if overall == 'fail' else 'PASS (with warnings)' if overall == 'warn' else 'PASS'}")
|
||
return "\n".join(lines)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ADR-001 consistency checking engine
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def check_repo_consistency(repo_slug: str, fix: bool = False) -> str:
|
||
"""Run ADR-001 consistency check for a registered repo.
|
||
|
||
Performs bidirectional checks between workplan files in the repo and the
|
||
state-hub DB. The file is always authoritative: drift is reported with the
|
||
file value as the expected value.
|
||
|
||
Checks: missing workplans/, parse errors, stale DB references, status/title
|
||
drift, unlinked workplans, orphan DB workstreams, repo mismatches, task
|
||
status drift, unlinked tasks, and orphan DB tasks.
|
||
|
||
Args:
|
||
repo_slug: Registered repo slug (e.g. 'the-custodian', 'activity-core').
|
||
fix: If True, apply auto-fixable issues: status drift (C-04), title drift
|
||
(C-05), create missing DB workstreams (C-06), repo mismatch (C-09),
|
||
task status drift (C-10), create unlinked tasks (C-11).
|
||
"""
|
||
import socket as _socket
|
||
import subprocess
|
||
|
||
# Pre-flight: verify this host has the repo path registered and accessible.
|
||
repo = _get(f"/repos/{repo_slug}")
|
||
if isinstance(repo, dict) and repo.get("error"):
|
||
return f"Repo '{repo_slug}' not found: {repo['error']}"
|
||
repo_path = _resolve_repo_path(repo)
|
||
if not repo_path:
|
||
hostname = _socket.gethostname()
|
||
return (
|
||
f"⚠ No accessible path found for repo '{repo_slug}' on host '{hostname}'.\n"
|
||
f"Register with: update_repo_path('{repo_slug}', '/path/to/repo')\n"
|
||
f"Remote agents: run consistency_check.py locally with "
|
||
f"--api-base {API_BASE}"
|
||
)
|
||
|
||
script = Path(__file__).parent.parent / "scripts" / "consistency_check.py"
|
||
cmd = [sys.executable, str(script), "--repo", repo_slug, "--json",
|
||
"--api-base", API_BASE]
|
||
if fix:
|
||
cmd.append("--fix")
|
||
|
||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||
try:
|
||
data = json.loads(result.stdout)
|
||
except json.JSONDecodeError:
|
||
return f"Consistency check script error:\n{result.stderr or result.stdout or '(no output)'}"
|
||
|
||
issues = data.get("issues", [])
|
||
summary = data.get("summary", {})
|
||
overall = data.get("result", "unknown")
|
||
fixes = data.get("fixes_applied", [])
|
||
|
||
failures = [i for i in issues if i["severity"] == "FAIL"]
|
||
warnings = [i for i in issues if i["severity"] == "WARN"]
|
||
infos = [i for i in issues if i["severity"] == "INFO"]
|
||
|
||
lines = [
|
||
f"Consistency Check: {repo_slug}",
|
||
f"Path: {data.get('repo_path', '?')}",
|
||
"",
|
||
]
|
||
|
||
for sev, group in (("FAIL", failures), ("WARN", warnings), ("INFO", infos)):
|
||
if not group:
|
||
continue
|
||
lines.append(f"{sev}S ({len(group)}):")
|
||
for i in group:
|
||
loc = f" [{i['file_path']}]" if i.get("file_path") else ""
|
||
fix_tag = " [fixable]" if i.get("fixable") else ""
|
||
lines.append(f" {i['check_id']}{loc}{fix_tag}")
|
||
lines.append(f" {i['message']}")
|
||
lines.append("")
|
||
|
||
if fixes:
|
||
lines.append(f"Fixes applied ({len(fixes)}):")
|
||
for f in fixes:
|
||
lines.append(f" {f}")
|
||
lines.append("")
|
||
|
||
lines.append(
|
||
f"Summary: {summary.get('fail', 0)} fail | "
|
||
f"{summary.get('warn', 0)} warn | "
|
||
f"{summary.get('info', 0)} info"
|
||
)
|
||
lines.append(
|
||
f"Result: {'FAIL' if overall == 'fail' else 'PASS (with warnings)' if overall in ('warn',) else 'PASS'}"
|
||
)
|
||
return "\n".join(lines)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Contribution tracking (v0.3)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.resource("state://contributions")
|
||
def resource_contributions() -> str:
|
||
"""All contribution artifacts (BR/FR/EP/UPR)."""
|
||
return json.dumps(_get("/contributions"), indent=2)
|
||
|
||
|
||
@mcp.resource("state://sbom/aggregated")
|
||
def resource_sbom_aggregated() -> str:
|
||
"""Aggregated SBOM entries across all repos."""
|
||
return json.dumps(_get("/sbom"), indent=2)
|
||
|
||
|
||
@mcp.resource("state://sbom/{repo_slug}")
|
||
def resource_sbom_repo(repo_slug: str) -> str:
|
||
"""SBOM view for a specific repo (by slug)."""
|
||
return json.dumps(_get(f"/sbom/{repo_slug}"), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def register_contribution(
|
||
type: str,
|
||
title: str,
|
||
target_org: str | None = None,
|
||
target_repo: str | None = None,
|
||
body_path: str | None = None,
|
||
related_workstream_id: str | None = None,
|
||
notes: str | None = None,
|
||
) -> str:
|
||
"""Register a new upstream contribution artifact (BR/FR/EP/UPR).
|
||
|
||
Args:
|
||
type: br | fr | ep | upr
|
||
title: Short human-readable title
|
||
target_org: GitHub org or owner of the upstream project
|
||
target_repo: Repository name of the upstream project
|
||
body_path: Relative path to the Markdown artifact file in the repo
|
||
related_workstream_id: UUID of the related workstream (optional)
|
||
notes: Any additional notes (optional)
|
||
"""
|
||
contrib = _post("/contributions", {
|
||
"type": type,
|
||
"title": title,
|
||
"target_org": target_org,
|
||
"target_repo": target_repo,
|
||
"body_path": body_path,
|
||
"related_workstream_id": related_workstream_id,
|
||
"notes": notes,
|
||
})
|
||
_post("/progress", {
|
||
"workstream_id": related_workstream_id,
|
||
"event_type": "contribution_registered",
|
||
"summary": f"Contribution registered [{type.upper()}]: {title}",
|
||
"author": "custodian",
|
||
"detail": {
|
||
"contribution_id": contrib["id"],
|
||
"type": type,
|
||
"target": f"{target_org}/{target_repo}" if target_org else target_repo,
|
||
"body_path": body_path,
|
||
},
|
||
})
|
||
return json.dumps(contrib, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def update_contribution_status(
|
||
contribution_id: str,
|
||
status: str,
|
||
notes: str | None = None,
|
||
) -> str:
|
||
"""Update the status of a contribution artifact.
|
||
|
||
Valid transitions: draft→submitted→acknowledged→accepted→merged
|
||
↘ ↘
|
||
rejected withdrawn
|
||
|
||
Args:
|
||
contribution_id: UUID of the contribution
|
||
status: submitted | acknowledged | accepted | rejected | merged | withdrawn
|
||
notes: Optional context for the status change
|
||
"""
|
||
contrib = _patch(f"/contributions/{contribution_id}/status", {
|
||
"status": status,
|
||
"notes": notes,
|
||
})
|
||
_post("/progress", {
|
||
"event_type": "contribution_status_changed",
|
||
"summary": f"Contribution status → {status}: {contrib['title']}",
|
||
"author": "custodian",
|
||
"detail": {"contribution_id": contribution_id, "status": status, "notes": notes},
|
||
})
|
||
return json.dumps(contrib, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def get_contributions(
|
||
type: str | None = None,
|
||
status: str | None = None,
|
||
target_repo: str | None = None,
|
||
) -> str:
|
||
"""List contribution artifacts, optionally filtered.
|
||
|
||
Args:
|
||
type: br | fr | ep | upr (optional)
|
||
status: draft | submitted | acknowledged | accepted | rejected | merged | withdrawn (optional)
|
||
target_repo: filter by upstream repo name (optional)
|
||
"""
|
||
return json.dumps(_get("/contributions", {
|
||
"type": type, "status": status, "target_repo": target_repo,
|
||
}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def ingest_sbom_tool(repo_slug: str, lockfile_path: str | None = None) -> str:
|
||
"""Ingest a lockfile into the State Hub SBOM store for a repo.
|
||
|
||
Parses the lockfile and POSTs entries to /sbom/ingest/. Each call creates
|
||
a new SBOMSnapshot; previous snapshots are retained as history.
|
||
|
||
The repo root is resolved from the DB using the current machine's hostname
|
||
(host_paths[hostname] → local_path fallback). lockfile_path, when given,
|
||
is treated as relative to the repo root. Omit it to auto-detect the lockfile.
|
||
|
||
Args:
|
||
repo_slug: Managed-repo slug (must be registered via register_repo)
|
||
lockfile_path: Path to the lockfile, relative to repo root
|
||
(e.g. "uv.lock", "frontend/package-lock.json").
|
||
Omit to auto-detect from the repo root.
|
||
"""
|
||
import socket as _socket
|
||
import subprocess
|
||
|
||
repo = _get(f"/repos/{repo_slug}")
|
||
if isinstance(repo, dict) and repo.get("error"):
|
||
return f"Repo '{repo_slug}' not found: {repo['error']}"
|
||
|
||
repo_root = _resolve_repo_path(repo)
|
||
if not repo_root:
|
||
hostname = _socket.gethostname()
|
||
return (
|
||
f"⚠ No accessible path found for repo '{repo_slug}' on host '{hostname}'.\n"
|
||
f"Register with: update_repo_path('{repo_slug}', '/path/to/repo')"
|
||
)
|
||
|
||
script = Path(__file__).parent.parent / "scripts" / "ingest_sbom.py"
|
||
cmd = [sys.executable, str(script), "--repo", repo_slug,
|
||
"--repo-path", repo_root, "--api-base", API_BASE]
|
||
|
||
if lockfile_path:
|
||
resolved = Path(repo_root) / lockfile_path
|
||
if not resolved.exists():
|
||
return f"⚠ Lockfile not found: {resolved}"
|
||
cmd += ["--lockfile", str(resolved)]
|
||
|
||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||
output = (result.stdout + result.stderr).strip()
|
||
if result.returncode != 0:
|
||
return f"ingest_sbom failed (exit {result.returncode}):\n{output}"
|
||
return output
|
||
|
||
|
||
@mcp.tool()
|
||
def get_licence_report() -> str:
|
||
"""Get a licence report across all ingested SBOM entries.
|
||
|
||
Returns packages grouped by SPDX licence identifier, with copyleft
|
||
flag (GPL/AGPL/LGPL/EUPL/CDDL/MPL) and repos using each licence.
|
||
"""
|
||
return json.dumps(_get("/sbom/report/licences"), indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Domain goals & repo goals (v0.7)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def create_domain_goal(domain_slug: str, title: str, description: str) -> str:
|
||
"""Create a new domain goal and make it active (superseding any existing active goal).
|
||
|
||
A domain goal captures the high-level strategic intent for a domain. Only one
|
||
domain goal can be active at a time; creating a new active one supersedes the
|
||
previous active goal.
|
||
|
||
Args:
|
||
domain_slug: Slug of the domain (e.g. 'railiance', 'markitect')
|
||
title: Short goal title
|
||
description: Full description of the goal and its boundary conditions
|
||
"""
|
||
domains = _get("/domains", {"status": "active"})
|
||
domain = next((d for d in domains if d["slug"] == domain_slug), None)
|
||
if not domain:
|
||
return json.dumps({"error": f"Domain '{domain_slug}' not found"})
|
||
goal = _post("/domain-goals", {
|
||
"domain_id": domain["id"],
|
||
"title": title,
|
||
"description": description,
|
||
"status": "active",
|
||
})
|
||
_post("/progress", {
|
||
"event_type": "goal_created",
|
||
"summary": f"Domain goal created [{domain_slug}]: {title}",
|
||
"detail": {"goal_id": goal["id"], "domain_slug": domain_slug},
|
||
})
|
||
return json.dumps(goal, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def get_domain_goals(domain_slug: str, status: str | None = None) -> str:
|
||
"""List domain goals for a domain, optionally filtered by status.
|
||
|
||
Args:
|
||
domain_slug: Slug of the domain (e.g. 'railiance')
|
||
status: active | archived | superseded (omit for all)
|
||
"""
|
||
return json.dumps(_get("/domain-goals", {"domain_slug": domain_slug, "status": status}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def activate_domain_goal(goal_id: str) -> str:
|
||
"""Set a domain goal as the active goal, superseding any currently active one.
|
||
|
||
Args:
|
||
goal_id: UUID of the domain goal to activate
|
||
"""
|
||
goal = _post(f"/domain-goals/{goal_id}/activate", {})
|
||
_post("/progress", {
|
||
"event_type": "goal_activated",
|
||
"summary": f"Domain goal activated: {goal['title']}",
|
||
"detail": {"goal_id": goal_id, "domain_slug": goal.get("domain_slug")},
|
||
})
|
||
return json.dumps(goal, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def create_repo_goal(
|
||
repo_slug: str,
|
||
title: str,
|
||
description: str,
|
||
domain_goal_id: str | None = None,
|
||
priority: int = 100,
|
||
) -> str:
|
||
"""Create a new repository goal.
|
||
|
||
Repository goals capture what needs to be achieved in a specific repository.
|
||
Multiple active repo goals can coexist; priority (lower number = higher priority)
|
||
determines ordering. Optionally link to the parent domain goal.
|
||
|
||
Args:
|
||
repo_slug: Slug of the repository (e.g. 'railiance-bootstrap')
|
||
title: Short goal title
|
||
description: Full description including boundary conditions and scope
|
||
domain_goal_id: UUID of the parent domain goal (optional)
|
||
priority: Integer priority — lower numbers = higher priority (default 100)
|
||
"""
|
||
repos = _get("/repos")
|
||
repo = next((r for r in repos if r["slug"] == repo_slug), None)
|
||
if not repo:
|
||
return json.dumps({"error": f"Repo '{repo_slug}' not found"})
|
||
goal = _post("/repo-goals", {
|
||
"repo_id": repo["id"],
|
||
"title": title,
|
||
"description": description,
|
||
"domain_goal_id": domain_goal_id,
|
||
"priority": priority,
|
||
"status": "active",
|
||
})
|
||
_post("/progress", {
|
||
"event_type": "goal_created",
|
||
"summary": f"Repo goal created [{repo_slug}]: {title}",
|
||
"detail": {"goal_id": goal["id"], "repo_slug": repo_slug, "priority": priority},
|
||
})
|
||
return json.dumps(goal, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def get_repo_goals(repo_slug: str, status: str | None = None) -> str:
|
||
"""List repository goals for a repo, ordered by priority.
|
||
|
||
Args:
|
||
repo_slug: Slug of the repository (e.g. 'railiance-bootstrap')
|
||
status: active | paused | completed | archived (omit for all)
|
||
"""
|
||
return json.dumps(_get("/repo-goals", {"repo_slug": repo_slug, "status": status}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def update_repo_goal(
|
||
goal_id: str,
|
||
title: str | None = None,
|
||
description: str | None = None,
|
||
priority: int | None = None,
|
||
status: str | None = None,
|
||
domain_goal_id: str | None = None,
|
||
) -> str:
|
||
"""Update a repository goal (title, description, priority, status, or domain link).
|
||
|
||
Args:
|
||
goal_id: UUID of the repo goal
|
||
title: New title (optional)
|
||
description: New description (optional)
|
||
priority: New priority integer — lower = higher priority (optional)
|
||
status: active | paused | completed | archived (optional)
|
||
domain_goal_id: Link or re-link to a domain goal UUID (optional)
|
||
"""
|
||
updates: dict = {}
|
||
if title is not None:
|
||
updates["title"] = title
|
||
if description is not None:
|
||
updates["description"] = description
|
||
if priority is not None:
|
||
updates["priority"] = priority
|
||
if status is not None:
|
||
updates["status"] = status
|
||
if domain_goal_id is not None:
|
||
updates["domain_goal_id"] = domain_goal_id
|
||
goal = _patch(f"/repo-goals/{goal_id}", updates)
|
||
_post("/progress", {
|
||
"event_type": "goal_updated",
|
||
"summary": f"Repo goal updated: {goal['title']}",
|
||
"detail": {"goal_id": goal_id, "changes": list(updates.keys())},
|
||
})
|
||
return json.dumps(goal, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def get_repo_dispatch(repo_slug: str) -> str:
|
||
"""Return active workstreams, pending tasks, and goal for a repo.
|
||
|
||
Use this at the start of a repo agent session to discover what work is
|
||
pending without needing to read the full state summary or scan workplan
|
||
files. The response includes:
|
||
- active_goal: the highest-priority active repo goal
|
||
- active_workstreams: list of active workstreams with pending tasks
|
||
- human_interventions: tasks that need human input (needs_human=true)
|
||
- last_state_synced_at: when the repo was last synced to the hub
|
||
|
||
Args:
|
||
repo_slug: Slug of the repository (e.g. 'marki-docx')
|
||
"""
|
||
return json.dumps(_get(f"/repos/{repo_slug}/dispatch"), indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Agent Inbox (inter-agent message passing)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def send_message(from_agent: str, to_agent: str, subject: str, body: str, thread_id: str | None = None) -> str:
|
||
"""Send a message from one agent to another (or 'broadcast' for all).
|
||
|
||
Use this to coordinate with other Claude instances — e.g. a worker agent
|
||
reporting status back to the orchestrator, or the hub agent dispatching
|
||
instructions to a domain agent.
|
||
|
||
Args:
|
||
from_agent: Sender identifier (e.g. 'hub', 'marki-docx', 'railiance')
|
||
to_agent: Recipient identifier or 'broadcast' for all agents
|
||
subject: Short subject line (max 500 chars)
|
||
body: Full message body (markdown supported)
|
||
thread_id: UUID of the root message to create a thread (optional)
|
||
"""
|
||
payload: dict = {"from_agent": from_agent, "to_agent": to_agent, "subject": subject, "body": body}
|
||
if thread_id:
|
||
payload["thread_id"] = thread_id
|
||
msg = _post("/messages/", payload)
|
||
return json.dumps(msg, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def get_messages(to_agent: str | None = None, from_agent: str | None = None, unread_only: bool = False, limit: int = 20) -> str:
|
||
"""List messages in the agent inbox.
|
||
|
||
Call this at session start to check for pending coordination messages.
|
||
|
||
Args:
|
||
to_agent: Filter by recipient (your agent name, or omit for all)
|
||
from_agent: Filter by sender (optional)
|
||
unread_only: Return only unread messages (default: False)
|
||
limit: Maximum number of messages to return (default: 20)
|
||
"""
|
||
params: dict = {"limit": limit, "unread_only": unread_only}
|
||
if to_agent:
|
||
params["to_agent"] = to_agent
|
||
if from_agent:
|
||
params["from_agent"] = from_agent
|
||
return json.dumps(_get("/messages/", params), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def mark_message_read(message_id: str) -> str:
|
||
"""Mark an inbox message as read.
|
||
|
||
Args:
|
||
message_id: UUID of the message to mark as read
|
||
"""
|
||
return json.dumps(_patch(f"/messages/{message_id}/read", {}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def reply_to_message(message_id: str, from_agent: str, body: str) -> str:
|
||
"""Reply to a message. Marks the original as read and creates a reply in the same thread.
|
||
|
||
Args:
|
||
message_id: UUID of the message to reply to
|
||
from_agent: Your agent identifier
|
||
body: Reply body (markdown supported)
|
||
"""
|
||
return json.dumps(_post(f"/messages/{message_id}/reply", {"from_agent": from_agent, "body": body}), indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Capability Catalog & Requests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def register_capability(
|
||
domain: str,
|
||
capability_type: str,
|
||
title: str,
|
||
description: str | None = None,
|
||
keywords: list[str] | None = None,
|
||
) -> str:
|
||
"""Register a capability that a domain can provide. Used for routing requests.
|
||
|
||
Args:
|
||
domain: Domain slug (e.g. 'railiance', 'markitect')
|
||
capability_type: Category (e.g. 'infrastructure', 'api', 'data', 'security', 'documentation')
|
||
title: Short title for this capability
|
||
description: Longer description (optional)
|
||
keywords: List of keywords for routing (e.g. ['cluster', 'k8s', 'privacy'])
|
||
"""
|
||
entry = _post("/capability-catalog", {
|
||
"domain": domain,
|
||
"capability_type": capability_type,
|
||
"title": title,
|
||
"description": description,
|
||
"keywords": keywords or [],
|
||
})
|
||
return json.dumps(entry, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def list_capabilities(
|
||
domain: str | None = None,
|
||
capability_type: str | None = None,
|
||
) -> str:
|
||
"""Browse the capability catalog — what domains can provide.
|
||
|
||
Args:
|
||
domain: Filter by domain slug (optional)
|
||
capability_type: Filter by type (optional)
|
||
"""
|
||
return json.dumps(_get("/capability-catalog", {
|
||
"domain": domain,
|
||
"capability_type": capability_type,
|
||
}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def request_capability(
|
||
title: str,
|
||
description: str,
|
||
capability_type: str,
|
||
requesting_agent: str,
|
||
requesting_domain: str,
|
||
requesting_workstream_id: str | None = None,
|
||
priority: str = "medium",
|
||
blocking_task_id: str | None = None,
|
||
) -> str:
|
||
"""Request a capability from another domain. Auto-routes to the responsible
|
||
domain via the capability catalog. If no unique match, broadcasts to all.
|
||
|
||
Args:
|
||
title: Short title (e.g. 'Privacy idea instance on cluster')
|
||
description: Detailed description of what you need
|
||
capability_type: Category (e.g. 'infrastructure', 'api', 'data', 'security')
|
||
requesting_agent: Your agent identifier (e.g. 'net-kingdom-worker')
|
||
requesting_domain: Your domain slug (e.g. 'custodian')
|
||
requesting_workstream_id: UUID of your workstream (optional)
|
||
priority: low | medium | high | critical (default: medium)
|
||
blocking_task_id: UUID of the task blocked until this is fulfilled (optional)
|
||
"""
|
||
req = _post("/capability-requests", {
|
||
"title": title,
|
||
"description": description,
|
||
"capability_type": capability_type,
|
||
"requesting_agent": requesting_agent,
|
||
"requesting_domain": requesting_domain,
|
||
"requesting_workstream_id": requesting_workstream_id,
|
||
"priority": priority,
|
||
"blocking_task_id": blocking_task_id,
|
||
})
|
||
_post("/progress", {
|
||
"event_type": "capability_requested",
|
||
"summary": f"Capability requested: {title} ({capability_type})",
|
||
"author": requesting_agent,
|
||
"detail": {
|
||
"capability_request_id": req.get("id"),
|
||
"capability_type": capability_type,
|
||
"routed_to": req.get("fulfilling_domain_slug"),
|
||
},
|
||
})
|
||
return json.dumps(req, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def accept_capability_request(
|
||
request_id: str,
|
||
fulfilling_agent: str,
|
||
fulfilling_workstream_id: str | None = None,
|
||
) -> str:
|
||
"""Accept a capability request. Assigns yourself as the fulfilling agent.
|
||
|
||
Args:
|
||
request_id: UUID of the capability request
|
||
fulfilling_agent: Your agent identifier (e.g. 'railiance-worker')
|
||
fulfilling_workstream_id: UUID of your workstream for this work (optional)
|
||
"""
|
||
result = _post(f"/capability-requests/{request_id}/accept", {
|
||
"fulfilling_agent": fulfilling_agent,
|
||
"fulfilling_workstream_id": fulfilling_workstream_id,
|
||
})
|
||
return json.dumps(result, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def update_capability_request_status(
|
||
request_id: str,
|
||
status: str,
|
||
note: str | None = None,
|
||
) -> str:
|
||
"""Advance a capability request through its lifecycle.
|
||
|
||
On 'completed': auto-unblocks the blocking task if one was set.
|
||
|
||
Args:
|
||
request_id: UUID of the capability request
|
||
status: in_progress | ready_for_review | completed | rejected | withdrawn
|
||
note: Optional note (required for rejection, recommended for completion)
|
||
"""
|
||
result = _patch(f"/capability-requests/{request_id}/status", {
|
||
"status": status,
|
||
"note": note,
|
||
})
|
||
return json.dumps(result, indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def patch_capability_request(
|
||
request_id: str,
|
||
catalog_entry_id: Optional[str] = None,
|
||
priority: Optional[str] = None,
|
||
blocking_task_id: Optional[str] = None,
|
||
fulfilling_workstream_id: Optional[str] = None,
|
||
) -> dict:
|
||
"""Correct mutable metadata on a capability request.
|
||
|
||
Correcting catalog_entry_id automatically re-derives the fulfilling domain.
|
||
Use this when the hub mis-routed a request (wrong catalog entry or domain).
|
||
Only provided (non-None) fields are updated.
|
||
|
||
Args:
|
||
request_id: UUID of the capability request to patch.
|
||
catalog_entry_id: Correct catalog entry UUID. Re-derives fulfilling domain.
|
||
priority: New priority (low/medium/high/critical).
|
||
blocking_task_id: UUID of the task this request unblocks on completion.
|
||
fulfilling_workstream_id: UUID of the workstream delivering this capability.
|
||
|
||
Returns:
|
||
Updated capability request dict, or {"error": "..."}.
|
||
"""
|
||
body: dict = {}
|
||
if catalog_entry_id is not None:
|
||
body["catalog_entry_id"] = catalog_entry_id
|
||
if priority is not None:
|
||
body["priority"] = priority
|
||
if blocking_task_id is not None:
|
||
body["blocking_task_id"] = blocking_task_id
|
||
if fulfilling_workstream_id is not None:
|
||
body["fulfilling_workstream_id"] = fulfilling_workstream_id
|
||
|
||
if not body:
|
||
return {"error": "no fields provided to patch"}
|
||
|
||
return json.dumps(_patch(f"/capability-requests/{request_id}", body), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def list_capability_requests(
|
||
domain: str | None = None,
|
||
status: str | None = None,
|
||
capability_type: str | None = None,
|
||
) -> str:
|
||
"""List capability requests with optional filters.
|
||
|
||
Args:
|
||
domain: Filter by requesting OR fulfilling domain slug
|
||
status: Filter by status (requested/accepted/in_progress/ready_for_review/completed/rejected/withdrawn)
|
||
capability_type: Filter by capability type
|
||
"""
|
||
return json.dumps(_get("/capability-requests", {
|
||
"domain": domain,
|
||
"status": status,
|
||
"capability_type": capability_type,
|
||
}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def get_capability_request(request_id: str) -> str:
|
||
"""Get a single capability request by ID.
|
||
|
||
Args:
|
||
request_id: UUID of the capability request
|
||
"""
|
||
return json.dumps(_get(f"/capability-requests/{request_id}"), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def dispute_capability_routing(
|
||
request_id: str,
|
||
reason: str,
|
||
disputed_by: str,
|
||
suggested_domain: Optional[str] = None,
|
||
) -> str:
|
||
"""Flag a capability request routing as incorrect. Transitions to routing_disputed.
|
||
|
||
Args:
|
||
request_id: UUID of the capability request
|
||
reason: Why the routing is wrong
|
||
disputed_by: Agent raising the dispute (e.g. 'netkingdom-worker')
|
||
suggested_domain: The domain slug this should be routed to (optional)
|
||
"""
|
||
return json.dumps(_post(f"/capability-requests/{request_id}/dispute", {
|
||
"reason": reason,
|
||
"disputed_by": disputed_by,
|
||
"suggested_domain": suggested_domain,
|
||
}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def reroute_capability_request(
|
||
request_id: str,
|
||
note: str,
|
||
rerouted_by: str,
|
||
domain: Optional[str] = None,
|
||
catalog_entry_id: Optional[str] = None,
|
||
) -> str:
|
||
"""Re-route a disputed capability request to a new domain. Resets to requested.
|
||
|
||
Args:
|
||
request_id: UUID of the capability request (must be routing_disputed)
|
||
note: Reason for the re-routing decision
|
||
rerouted_by: Agent performing the re-route (e.g. 'custodian')
|
||
domain: Target domain slug (used if catalog_entry_id not provided)
|
||
catalog_entry_id: Preferred — UUID of catalog entry; re-derives domain automatically
|
||
"""
|
||
return json.dumps(_post(f"/capability-requests/{request_id}/reroute", {
|
||
"note": note,
|
||
"rerouted_by": rerouted_by,
|
||
"domain": domain,
|
||
"catalog_entry_id": catalog_entry_id,
|
||
}), indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Third-Party Services Catalog (TPSC)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def register_service(
|
||
slug: str,
|
||
name: str,
|
||
provider: str | None = None,
|
||
category: str | None = None,
|
||
pricing_model: str = "unknown",
|
||
gdpr_maturity: str = "unknown",
|
||
gdpr_notes: str | None = None,
|
||
dpa_available: bool = False,
|
||
tos_url: str | None = None,
|
||
privacy_policy_url: str | None = None,
|
||
data_processing_regions: list[str] | None = None,
|
||
data_retention_notes: str | None = None,
|
||
website_url: str | None = None,
|
||
) -> str:
|
||
"""Register or update a service in the Third-Party Services Catalog (TPSC).
|
||
|
||
GDPR maturity scale (CNIL/IAPP CMMI-aligned):
|
||
unknown | non_compliant | initial | developing | defined | managed | certified
|
||
|
||
Pricing model: free | paid | freemium | usage_based | unknown
|
||
|
||
Args:
|
||
slug: Unique identifier (e.g. 'openai-api', 'stripe')
|
||
name: Human-readable service name
|
||
provider: Company/organisation name
|
||
category: Category (e.g. 'llm_inference', 'storage', 'payments', 'search')
|
||
pricing_model: free | paid | freemium | usage_based | unknown
|
||
gdpr_maturity: GDPR compliance maturity level (see scale above)
|
||
gdpr_notes: Free-text GDPR notes (DPA details, transfer mechanisms, etc.)
|
||
dpa_available: Whether a Data Processing Agreement is available
|
||
tos_url: Terms of Service URL
|
||
privacy_policy_url: Privacy Policy URL
|
||
data_processing_regions: List of regions where data is processed (e.g. ['us', 'eu'])
|
||
data_retention_notes: Data retention policy summary
|
||
website_url: Service website URL
|
||
"""
|
||
return json.dumps(_post("/tpsc/catalog", {
|
||
"slug": slug,
|
||
"name": name,
|
||
"provider": provider,
|
||
"category": category,
|
||
"website_url": website_url,
|
||
"pricing_model": pricing_model,
|
||
"gdpr_maturity": gdpr_maturity,
|
||
"gdpr_notes": gdpr_notes,
|
||
"dpa_available": dpa_available,
|
||
"tos_url": tos_url,
|
||
"privacy_policy_url": privacy_policy_url,
|
||
"data_processing_regions": data_processing_regions or [],
|
||
"data_retention_notes": data_retention_notes,
|
||
}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def list_services(
|
||
gdpr_maturity: str | None = None,
|
||
category: str | None = None,
|
||
pricing_model: str | None = None,
|
||
) -> str:
|
||
"""Browse the Third-Party Services Catalog (TPSC).
|
||
|
||
Returns services with their GDPR maturity level and gdpr_warning flag
|
||
(True when maturity is unknown, non_compliant, or initial — may limit
|
||
use in corporate/GDPR-regulated environments).
|
||
|
||
Args:
|
||
gdpr_maturity: Filter by maturity level (unknown/non_compliant/initial/developing/defined/managed/certified)
|
||
category: Filter by category (e.g. 'llm_inference', 'storage')
|
||
pricing_model: Filter by pricing model (free/paid/freemium/usage_based/unknown)
|
||
"""
|
||
return json.dumps(_get("/tpsc/catalog", {
|
||
"gdpr_maturity": gdpr_maturity,
|
||
"category": category,
|
||
"pricing_model": pricing_model,
|
||
}), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def ingest_tpsc_tool(repo_slug: str) -> str:
|
||
"""Ingest tpsc.yaml service dependency declarations for a repo.
|
||
|
||
Reads <repo_root>/tpsc.yaml, resolves service slugs against the catalog,
|
||
and creates a new TPSC snapshot. The repo path is resolved the same way
|
||
as the SBOM ingest tool (host_paths → local_path with existence check).
|
||
|
||
Args:
|
||
repo_slug: Registered repo slug (e.g. 'llm-connect', 'markitect-project')
|
||
"""
|
||
import socket as _socket
|
||
import subprocess
|
||
|
||
repo = _get(f"/repos/{repo_slug}")
|
||
if isinstance(repo, dict) and repo.get("error"):
|
||
return f"Repo '{repo_slug}' not found: {repo['error']}"
|
||
|
||
repo_root = _resolve_repo_path(repo)
|
||
if not repo_root:
|
||
hostname = _socket.gethostname()
|
||
return (
|
||
f"⚠ No accessible path found for repo '{repo_slug}' on host '{hostname}'.\n"
|
||
f"Register with: update_repo_path('{repo_slug}', '/path/to/repo')"
|
||
)
|
||
|
||
script = Path(__file__).parent.parent / "scripts" / "ingest_tpsc.py"
|
||
result = subprocess.run(
|
||
["uv", "run", "python", str(script), "--repo", repo_slug],
|
||
capture_output=True, text=True,
|
||
cwd=str(Path(__file__).parent.parent),
|
||
)
|
||
output = result.stdout + result.stderr
|
||
if result.returncode != 0:
|
||
return f"ingest_tpsc failed (exit {result.returncode}):\n{output}"
|
||
return output.strip()
|
||
|
||
|
||
@mcp.tool()
|
||
def get_gdpr_report() -> str:
|
||
"""Get an aggregated GDPR compliance report across all repos' latest TPSC snapshots.
|
||
|
||
Returns a warning summary for services with gdpr_maturity in:
|
||
unknown | non_compliant | initial
|
||
|
||
These may limit usability in GDPR-regulated / corporate environments.
|
||
Services at 'developing' or above have at least a DPA available.
|
||
"""
|
||
return json.dumps(_get("/tpsc/report/gdpr"), indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Repository Definition of Integrated (DoI)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@mcp.tool()
|
||
def check_repo_doi(repo_slug: str) -> str:
|
||
"""Evaluate the 14 DoI criteria for a repo and return a full report.
|
||
|
||
Criteria are grouped into three tiers:
|
||
Core (C1–C4): registered, domain, path, remote URL
|
||
Standard (C5–C9): SCOPE.md, CLAUDE.md, workplan, SBOM, TPSC
|
||
Full (C10–C14): repo goal, capabilities, agents, clean consistency, host paths
|
||
|
||
Status values: pass | fail | warn | skip
|
||
|
||
The 'tier' field shows the highest tier where ALL criteria pass or warn:
|
||
none | core | standard | full
|
||
|
||
Args:
|
||
repo_slug: Registered repo slug (e.g. 'llm-connect', 'the-custodian')
|
||
"""
|
||
return json.dumps(_get(f"/repos/{repo_slug}/doi"), indent=2)
|
||
|
||
|
||
@mcp.tool()
|
||
def get_doi_summary() -> str:
|
||
"""Return DoI tier for all active repos, sorted worst-first.
|
||
|
||
Useful at session start to spot repos that need integration work.
|
||
Tiers: none (red) → core → standard → full (green).
|
||
"""
|
||
return json.dumps(_get("/repos/doi/summary"), indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Entry point
|
||
# ---------------------------------------------------------------------------
|
||
|
||
if __name__ == "__main__":
|
||
transport = os.environ.get("MCP_TRANSPORT", "stdio")
|
||
if transport == "stdio":
|
||
mcp.run(transport="stdio")
|
||
else:
|
||
port = int(os.environ.get("MCP_PORT", "8001"))
|
||
mcp.run(transport=transport, host="127.0.0.1", port=port)
|