Files
state-hub/mcp_server/server.py
tegwick af3fdfde80 feat(token-tracking): introduce token note taxonomy (measured/userbased/workplan/heuristic)
Tier 1 (exact counts) now defaults to note="measured" instead of null,
signalling the counts were read from the Claude Code status bar.
Callers can pass note="userbased" when a human provided the numbers.

  measured  — agent read exact counts from the Claude Code status bar
  userbased — counts provided by a human
  workplan  — prorated from workplan total across task count
  heuristic — server fallback, 1000/500, no agent input

Added token_note field to TaskUpdate schema and exposed note param on
update_task_status and record_interactive_task MCP tools.
TOOLS.md documents the full taxonomy. 185 tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-29 18:47:40 +02:00

2475 lines
88 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Custodian State Hub MCP Server (stdio).
Thin HTTP client over the FastAPI service — no direct DB access.
All business logic stays in the API; this layer is stateless.
"""
from __future__ import annotations
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from uuid import UUID
import httpx
from fastmcp import FastMCP
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")
mcp = FastMCP(
name="state-hub",
instructions=(
"Custodian State Hub: tracks topics, workstreams, tasks, decisions, and progress events. "
"Start every session with get_state_summary() for orientation. "
"When working inside a single registered domain repo, prefer get_domain_summary(domain_slug) "
"— it returns the same actionable data scoped to that domain at ~10% of the token cost. "
"All writes emit a progress_event automatically."
),
)
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def _client() -> httpx.Client:
return httpx.Client(base_url=API_BASE, timeout=30.0, follow_redirects=True)
def _get(path: str, params: dict | None = None) -> Any:
if not path.endswith("/"):
path = path + "/"
try:
with _client() as c:
r = c.get(path, params={k: v for k, v in (params or {}).items() if v is not None})
r.raise_for_status()
return r.json()
except httpx.HTTPStatusError as e:
return {"error": f"API {e.response.status_code}: {e.response.text[:300]}"}
except Exception as e:
return {"error": f"Request failed: {e}"}
def _post(path: str, body: dict) -> Any:
if not path.endswith("/"):
path = path + "/"
try:
with _client() as c:
r = c.post(path, json={k: v for k, v in body.items() if v is not None})
r.raise_for_status()
return r.json()
except httpx.HTTPStatusError as e:
return {"error": f"API {e.response.status_code}: {e.response.text[:300]}"}
except Exception as e:
return {"error": f"Request failed: {e}"}
def _patch(path: str, body: dict) -> Any:
if not path.endswith("/"):
path = path + "/"
try:
with _client() as c:
r = c.patch(path, json={k: v for k, v in body.items() if v is not None})
r.raise_for_status()
return r.json()
except httpx.HTTPStatusError as e:
return {"error": f"API {e.response.status_code}: {e.response.text[:300]}"}
except Exception as e:
return {"error": f"Request failed: {e}"}
def _delete(path: str) -> None:
try:
with _client() as c:
r = c.delete(path)
r.raise_for_status()
except httpx.HTTPStatusError as e:
return {"error": f"API {e.response.status_code}: {e.response.text[:300]}"}
except Exception as e:
return {"error": f"Request failed: {e}"}
# ---------------------------------------------------------------------------
# Resources
# ---------------------------------------------------------------------------
@mcp.resource("state://summary")
def resource_summary() -> str:
"""Full StateSummary JSON — primary orientation resource."""
return json.dumps(_get("/state/summary"), indent=2)
@mcp.resource("state://topics")
def resource_topics() -> str:
"""Active topics list."""
return json.dumps(_get("/topics", {"status": "active"}), indent=2)
@mcp.resource("state://workstreams/{topic_slug}")
def resource_workstreams(topic_slug: str) -> str:
"""Workstreams for a topic (by slug)."""
topics = _get("/topics", {"status": "active"})
match = next((t for t in topics if t["slug"] == topic_slug), None)
if not match:
return json.dumps({"error": f"Topic '{topic_slug}' not found"})
return json.dumps(_get("/workstreams", {"topic_id": match["id"]}), indent=2)
@mcp.resource("state://decisions/blocking")
def resource_blocking_decisions() -> str:
"""All pending/escalated decisions."""
return json.dumps(
_get("/decisions", {"decision_type": "pending", "status": "open"}),
indent=2,
)
@mcp.resource("state://tasks/blocked")
def resource_blocked_tasks() -> str:
"""All tasks with status=blocked."""
return json.dumps(_get("/tasks", {"status": "blocked"}), indent=2)
# ---------------------------------------------------------------------------
# Query tools
# ---------------------------------------------------------------------------
@mcp.tool()
def get_state_summary() -> str:
"""Primary orientation tool. Call at the start of every session.
Returns a full snapshot: topic/workstream/task/decision totals, blocking
decisions, blocked tasks, open workstreams, and the 20 most recent events.
NOTE: This response is large (~10k tokens). When working inside a single
registered domain repo, use get_domain_summary(domain_slug) instead —
same actionable data scoped to one domain at ~10% of the token cost.
"""
return json.dumps(_get("/state/summary"), indent=2)
@mcp.tool()
def get_domain_summary(domain_slug: str) -> str:
"""Lightweight session orientation for a single domain.
Use this instead of get_state_summary() when working in a registered
domain repo — returns only what is relevant to the specified domain,
typically 80-90% fewer tokens than the full summary.
Args:
domain_slug: the domain slug, e.g. "railiance", "markitect"
Returns: topic, active workstreams, open blocking decisions for this
topic, 5 most recent progress events, repo SBOM status, and goal guidance
(needs_workplan signals + alignment warnings).
"""
topics = _get("/topics")
topic = next((t for t in topics if t.get("domain_slug") == domain_slug), None)
if not topic:
return json.dumps({"error": f"No topic found for domain '{domain_slug}'"})
topic_id = topic["id"]
workstreams = _get("/workstreams", {"topic_id": topic_id, "status": "active"})
blocking = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id})
recent = _get("/progress", {"topic_id": topic_id, "limit": 5})
repos = _get("/repos", {"domain": domain_slug})
# ── Goal guidance ──────────────────────────────────────────────────────────
# Fetch active repo goals per repo, then cross-reference with workstreams.
repo_by_id = {r["id"]: r for r in repos}
ws_by_repo_goal: dict[str, list] = {}
for ws in workstreams:
if ws.get("repo_goal_id"):
ws_by_repo_goal.setdefault(ws["repo_goal_id"], []).append(ws)
# repo_id → list of active workstreams (for alignment check)
ws_by_repo: dict[str, list] = {}
for ws in workstreams:
if ws.get("repo_id"):
ws_by_repo.setdefault(ws["repo_id"], []).append(ws)
needs_workplan: list[dict] = [] # active goal with no linked workstream
alignment_warnings: list[dict] = [] # workstreams not linked to active goal
for repo in repos:
repo_slug = repo["slug"]
repo_id = repo["id"]
active_goals = _get("/repo-goals", {"repo_slug": repo_slug, "status": "active"})
if not active_goals:
continue
active_goal_ids = {g["id"] for g in active_goals}
for goal in active_goals:
linked = ws_by_repo_goal.get(goal["id"], [])
if not linked:
needs_workplan.append({
"repo_slug": repo_slug,
"goal_id": goal["id"],
"goal_title": goal["title"],
"goal_description": goal["description"],
"priority": goal["priority"],
"action": (
f"No workstream is linked to repo goal '{goal['title']}'. "
"Create a workplan file in workplans/ and register a workstream "
f"with repo_goal_id='{goal['id']}' to start delivering this goal."
),
})
# Check if repo has active workstreams not tied to any active goal
repo_ws = ws_by_repo.get(repo_id, [])
unlinked_ws = [
ws for ws in repo_ws
if ws.get("repo_goal_id") not in active_goal_ids
]
if unlinked_ws:
# Most recently updated workstream = the one to suggest continuing
recent_ws = max(unlinked_ws, key=lambda w: w.get("updated_at", ""))
alignment_warnings.append({
"repo_slug": repo_slug,
"recent_workstream_id": recent_ws["id"],
"recent_workstream_title": recent_ws["title"],
"active_goal_titles": [g["title"] for g in active_goals],
"message": (
f"Workstream '{recent_ws['title']}' is not linked to the current "
f"repo goal(s) for {repo_slug}. "
"Continue this workstream if the work is still relevant, but verify "
"alignment with the active goal before committing to new tasks."
),
})
goal_guidance: dict = {}
if needs_workplan or alignment_warnings:
goal_guidance = {
"needs_workplan": needs_workplan,
"alignment_warnings": alignment_warnings,
}
result: dict = {
"domain": domain_slug,
"topic_id": topic_id,
"topic_title": topic["title"],
"workstreams": workstreams,
"blocking_decisions": blocking,
"recent_progress": recent,
"repos": [{"slug": r["slug"], "last_sbom_at": r.get("last_sbom_at")} for r in repos],
}
if goal_guidance:
result["goal_guidance"] = goal_guidance
return json.dumps(result, indent=2)
@mcp.tool()
def get_topic(slug: str) -> str:
"""Return a topic (with workstreams) by slug, plus its recent progress events."""
topics = _get("/topics")
match = next((t for t in topics if t["slug"] == slug), None)
if not match:
return json.dumps({"error": f"Topic '{slug}' not found"})
topic_detail = _get(f"/topics/{match['id']}")
recent = _get("/progress", {"topic_id": match["id"], "limit": 10})
return json.dumps({"topic": topic_detail, "recent_progress": recent}, indent=2)
@mcp.tool()
def create_topic(slug: str, title: str, domain: str, description: str | None = None) -> str:
"""Create a new topic under an existing domain.
Args:
slug: URL-safe identifier, e.g. "inter_hub" (must be unique).
title: Human-readable name, e.g. "Inter-Hub Federation".
domain: Domain slug the topic belongs to, e.g. "custodian".
description: Optional one-sentence description.
Returns the created TopicRead on success, or an error dict if the slug
already exists or the domain is not found.
"""
payload: dict = {"slug": slug, "title": title, "domain": domain}
if description:
payload["description"] = description
return json.dumps(_post("/topics", payload), indent=2)
@mcp.tool()
def list_tasks(workstream_id: str, status: str | None = None) -> str:
"""List all tasks in a workstream, optionally filtered by status.
Args:
workstream_id: UUID of the workstream (required).
status: Optional filter — todo | in_progress | blocked | done | cancelled.
Returns [{id, title, status, priority, assignee, due_date, needs_human}] for every
matching task. Use this to look up task UUIDs before calling update_task_status,
or to check which tasks from a workplan file are already synced to the DB.
"""
return json.dumps(_get("/tasks", {"workstream_id": workstream_id, "status": status}), indent=2)
@mcp.tool()
def list_blocked_tasks(workstream_id: str | None = None) -> str:
"""List all tasks with status=blocked, optionally filtered by workstream_id."""
return json.dumps(_get("/tasks", {"status": "blocked", "workstream_id": workstream_id}), indent=2)
@mcp.tool()
def list_pending_decisions(topic_id: str | None = None) -> str:
"""List pending decisions sorted by deadline (nulls last).
Optionally filter by topic_id. Escalated decisions are included and
highlighted by their escalation_note.
"""
results = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id})
return json.dumps(results, indent=2)
@mcp.tool()
def get_recent_progress(limit: int = 20, since: str | None = None) -> str:
"""Retrieve recent progress events to reconstruct session history.
Args:
limit: max events to return (default 20)
since: ISO datetime string — only events after this timestamp
"""
return json.dumps(_get("/progress", {"limit": limit, "since": since}), indent=2)
# ---------------------------------------------------------------------------
# Mutate tools
# ---------------------------------------------------------------------------
@mcp.tool()
def create_workstream(
topic_id: str,
title: str,
slug: str | None = None,
description: str | None = None,
owner: str | None = None,
due_date: str | None = None,
repo_id: str | None = None,
) -> str:
"""Create a new workstream under a topic and emit a progress_event.
Args:
topic_id: UUID of the parent topic
title: workstream title
slug: URL-friendly identifier (auto-generated from title if omitted)
description: optional longer description
owner: optional owner name
due_date: optional ISO date string (YYYY-MM-DD)
repo_id: UUID of the owning repository (GEMS primary; strongly recommended per ADR-001)
"""
if not slug:
slug = re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-")
ws = _post("/workstreams", {
"topic_id": topic_id,
"title": title,
"slug": slug,
"description": description,
"owner": owner,
"due_date": due_date,
"status": "active",
"repo_id": repo_id,
})
_post("/progress", {
"topic_id": topic_id,
"workstream_id": ws["id"],
"event_type": "workstream_created",
"summary": f"Workstream created: {title}",
"author": "custodian",
"detail": {"owner": owner, "slug": slug},
})
return json.dumps(ws, indent=2)
@mcp.tool()
def create_task(
workstream_id: str,
title: str,
priority: str = "medium",
description: str | None = None,
assignee: str | None = None,
due_date: str | None = None,
) -> str:
"""Create a new task and emit a progress_event.
Args:
workstream_id: UUID of the parent workstream
title: task title
priority: low | medium | high | critical
description: optional longer description
assignee: optional assignee name
due_date: optional ISO date string (YYYY-MM-DD)
"""
task = _post("/tasks", {
"workstream_id": workstream_id,
"title": title,
"priority": priority,
"description": description,
"assignee": assignee,
"due_date": due_date,
})
_post("/progress", {
"workstream_id": workstream_id,
"task_id": task["id"],
"event_type": "task_created",
"summary": f"Task created: {title}",
"author": "custodian",
"detail": {"priority": priority, "assignee": assignee},
})
return json.dumps(task, indent=2)
@mcp.tool()
def update_task_status(
task_id: str,
status: str,
blocking_reason: Optional[str] = None,
tokens_in: Optional[int] = None,
tokens_out: Optional[int] = None,
workplan_tokens_in: Optional[int] = None,
workplan_tokens_out: Optional[int] = None,
note: Optional[str] = None,
model: Optional[str] = None,
agent: Optional[str] = None,
session_id: Optional[str] = None,
) -> str:
"""Update a task's status. blocking_reason is required when status='blocked'.
When status='done', always records a token event using the best available data:
Tier 1 (best): pass tokens_in + tokens_out — exact counts from the session
note defaults to "measured"; pass note="userbased" if the
numbers were provided by a human rather than read from the bar
Tier 2: pass workplan_tokens_in + workplan_tokens_out — total workplan
effort prorated across task count (note="workplan")
Tier 3 (fallback): no token args — heuristic 1000 in / 500 out (note="heuristic")
Best practice: read tokens from the Claude Code status bar and pass exact counts.
Args:
task_id: UUID of the task
status: todo | in_progress | blocked | done | cancelled
blocking_reason: required when status=blocked
tokens_in: exact input token count for this task (Tier 1)
tokens_out: exact output token count for this task (Tier 1)
workplan_tokens_in: total input tokens for the whole workplan (Tier 2)
workplan_tokens_out: total output tokens for the whole workplan (Tier 2)
note: override the auto note — use "userbased" when counts came from a human;
omit to get the default ("measured" for Tier 1, "workplan"/"heuristic" otherwise)
model: model identifier, e.g. 'claude-sonnet-4-6'
agent: agent name, e.g. 'custodian', 'ralph'
session_id: agent session identifier
"""
body: dict[str, Any] = {
"status": status,
"model": model,
"agent": agent,
"session_id": session_id,
}
if blocking_reason:
body["blocking_reason"] = blocking_reason
if tokens_in is not None:
body["tokens_in"] = tokens_in
if tokens_out is not None:
body["tokens_out"] = tokens_out
if workplan_tokens_in is not None:
body["workplan_tokens_in"] = workplan_tokens_in
if workplan_tokens_out is not None:
body["workplan_tokens_out"] = workplan_tokens_out
if note is not None:
body["token_note"] = note
task = _patch(f"/tasks/{task_id}", body)
_post("/progress", {
"task_id": task_id,
"workstream_id": task.get("workstream_id"),
"event_type": "task_status_changed",
"summary": f"Task status → {status}: {task['title']}",
"author": "custodian",
"detail": {"blocking_reason": blocking_reason},
})
return json.dumps(task, indent=2)
@mcp.tool()
def flag_for_human(task_id: str, note: str) -> str:
"""Flag a task as requiring human intervention.
Sets needs_human=True and records the required action as intervention_note.
Emits a progress event so the flag is visible in session history.
Args:
task_id: UUID of the task to flag
note: description of the action required from the human (required)
"""
task = _patch(f"/tasks/{task_id}", {
"needs_human": True,
"intervention_note": note,
})
_post("/progress", {
"task_id": task_id,
"workstream_id": task.get("workstream_id"),
"event_type": "task_flagged_human",
"summary": f"Task flagged for human intervention: {task['title']}",
"author": "custodian",
"detail": {"intervention_note": note},
})
return json.dumps(task, indent=2)
@mcp.tool()
def clear_human_flag(task_id: str) -> str:
"""Clear the human-intervention flag from a task.
Sets needs_human=False. The intervention_note is preserved as a
historical record. Call this after the human has completed the action.
Args:
task_id: UUID of the task to clear
"""
task = _patch(f"/tasks/{task_id}", {
"needs_human": False,
})
_post("/progress", {
"task_id": task_id,
"workstream_id": task.get("workstream_id"),
"event_type": "task_flag_cleared",
"summary": f"Human-intervention flag cleared: {task['title']}",
"author": "custodian",
})
return json.dumps(task, indent=2)
@mcp.tool()
def list_human_interventions(workstream_id: str | None = None) -> str:
"""List all tasks flagged for human intervention.
Returns tasks where needs_human=True, optionally filtered to one workstream.
Use this at session start to surface Bernd's action items.
Args:
workstream_id: optional UUID to scope results to one workstream
"""
return json.dumps(
_get("/tasks", {"needs_human": "true", "workstream_id": workstream_id}),
indent=2,
)
@mcp.tool()
def record_decision(
title: str,
decision_type: str = "pending",
topic_id: str | None = None,
workstream_id: str | None = None,
description: str | None = None,
rationale: str | None = None,
decided_by: str | None = None,
deadline: str | None = None,
) -> str:
"""Record a decision (made or pending).
Pending decisions touching financial/legal topics are auto-escalated per
constitution §4.
Args:
title: decision title
decision_type: made | pending
topic_id: optional topic UUID
workstream_id: optional workstream UUID (at least one required)
description: optional context
rationale: reasoning behind the decision
decided_by: person/agent who decided
deadline: ISO datetime string for when decision is needed
"""
decision = _post("/decisions", {
"title": title,
"decision_type": decision_type,
"topic_id": topic_id,
"workstream_id": workstream_id,
"description": description,
"rationale": rationale,
"decided_by": decided_by,
"deadline": deadline,
})
_post("/progress", {
"topic_id": topic_id,
"workstream_id": workstream_id,
"decision_id": decision["id"],
"event_type": "decision_recorded",
"summary": f"Decision recorded ({decision_type}): {title}",
"author": "custodian",
"detail": {"status": decision.get("status"), "escalation_note": decision.get("escalation_note")},
})
return json.dumps(decision, indent=2)
@mcp.tool()
def resolve_decision(
decision_id: str,
rationale: str,
decided_by: str,
) -> str:
"""Mark a decision as resolved.
Args:
decision_id: UUID of the decision
rationale: final reasoning/outcome
decided_by: who resolved it
"""
decision = _patch(f"/decisions/{decision_id}", {
"status": "resolved",
"decision_type": "made",
"rationale": rationale,
"decided_by": decided_by,
"decided_at": datetime.now(tz=timezone.utc).isoformat(),
})
_post("/progress", {
"topic_id": decision.get("topic_id"),
"workstream_id": decision.get("workstream_id"),
"decision_id": decision_id,
"event_type": "decision_resolved",
"summary": f"Decision resolved by {decided_by}: {decision['title']}",
"author": "custodian",
"detail": {"rationale": rationale},
})
return json.dumps(decision, indent=2)
@mcp.tool()
def add_progress_event(
summary: str,
event_type: str = "note",
topic_id: str | None = None,
workstream_id: str | None = None,
task_id: str | None = None,
detail: dict | str | None = None,
) -> str:
"""Append a progress event to the log.
Args:
summary: human-readable summary of what happened
event_type: free-form label, e.g. note | milestone | blocker | insight
topic_id: optional topic UUID
workstream_id: optional workstream UUID
task_id: optional task UUID
detail: optional structured data (JSONB); accepts a dict or a JSON string
"""
if isinstance(detail, str):
try:
detail = json.loads(detail)
except (json.JSONDecodeError, ValueError):
detail = {"raw": detail}
event = _post("/progress", {
"topic_id": topic_id,
"workstream_id": workstream_id,
"task_id": task_id,
"event_type": event_type,
"summary": summary,
"author": "custodian",
"detail": detail,
})
return json.dumps(event, indent=2)
@mcp.tool()
def update_workstream_status(workstream_id: str, status: str) -> str:
"""Update a workstream's status.
Args:
workstream_id: UUID of the workstream
status: active | blocked | completed | archived
"""
ws = _patch(f"/workstreams/{workstream_id}", {"status": status})
_post("/progress", {
"workstream_id": workstream_id,
"topic_id": ws.get("topic_id"),
"event_type": "workstream_status_changed",
"summary": f"Workstream status → {status}: {ws['title']}",
"author": "custodian",
})
return json.dumps(ws, indent=2)
@mcp.tool()
def update_workstream(
workstream_id: str,
title: str | None = None,
description: str | None = None,
owner: str | None = None,
due_date: str | None = None,
repo_goal_id: str | None = None,
status: str | None = None,
) -> str:
"""Update fields on an existing workstream.
Args:
workstream_id: UUID of the workstream
title: new title (optional)
description: new description (optional)
owner: new owner (optional)
due_date: ISO date string YYYY-MM-DD (optional)
repo_goal_id: UUID of the repo goal to link (optional; pass empty string to clear)
status: active | blocked | completed | archived (optional)
"""
payload: dict = {}
if title is not None:
payload["title"] = title
if description is not None:
payload["description"] = description
if owner is not None:
payload["owner"] = owner
if due_date is not None:
payload["due_date"] = due_date
if status is not None:
payload["status"] = status
if repo_goal_id is not None:
payload["repo_goal_id"] = repo_goal_id if repo_goal_id else None
ws = _patch(f"/workstreams/{workstream_id}", payload)
return json.dumps(ws, indent=2)
# ---------------------------------------------------------------------------
# Next-steps suggestion tool (S2.3) — sanctioned write use case #2
# ---------------------------------------------------------------------------
@mcp.tool()
def get_next_steps() -> str:
"""Surface contextual next-action suggestions derived from hub state.
Returns suggestions based on:
- Recently resolved decisions → first open task in the same workstream
- Workstreams whose every dependency is now completed → first todo task
Each suggestion includes domain, workstream, task, and a plain-language
message. The hub surfaces *what* and *where* — the domain owns *how*.
This is one of the two sanctioned write-side use cases of the State Hub
(the other is resolve_decision). Suggestions are derived, not persisted.
"""
return json.dumps(_get("/state/next_steps"), indent=2)
# ---------------------------------------------------------------------------
# Dependency graph tools (S1.4)
# ---------------------------------------------------------------------------
@mcp.tool()
def create_dependency(
from_workstream_id: str,
to_workstream_id: str,
description: str | None = None,
) -> str:
"""Record that one workstream depends on another.
Semantics: from_workstream cannot fully proceed until to_workstream reaches
a satisfactory state.
Args:
from_workstream_id: UUID of the workstream that has the dependency
to_workstream_id: UUID of the workstream it depends on
description: optional human-readable explanation of the dependency
"""
dep = _post(f"/workstreams/{from_workstream_id}/dependencies", {
"to_workstream_id": to_workstream_id,
"description": description,
})
return json.dumps(dep, indent=2)
@mcp.tool()
def list_dependencies(workstream_id: str) -> str:
"""Return all dependency edges touching a workstream (both directions).
The response distinguishes edges where this workstream is the dependent
(depends_on) from edges where it is the blocker (blocks).
Args:
workstream_id: UUID of the workstream to inspect
"""
edges = _get(f"/workstreams/{workstream_id}/dependencies")
depends_on = [e for e in edges if e["from_workstream_id"] == workstream_id]
blocks = [e for e in edges if e["to_workstream_id"] == workstream_id]
return json.dumps({"depends_on": depends_on, "blocks": blocks}, indent=2)
# ---------------------------------------------------------------------------
# Extension points & technical debt
# ---------------------------------------------------------------------------
@mcp.tool()
def register_extension_point(
domain: str,
title: str,
ep_type: str,
description: str | None = None,
location: str | None = None,
priority: str = "medium",
ep_id: str | None = None,
topic_id: str | None = None,
workstream_id: str | None = None,
) -> str:
"""Register a discovered extension point — optional future functionality not yet committed.
Extension points capture design forks: things the system *could* do that
have been noticed and parked for deliberate later consideration.
Args:
domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
title: short description of the extension
ep_type: api | schema | mcp | dashboard | architecture | integration | other
description: longer explanation of what the extension would add
location: file:line or module where the extension point was noticed
priority: low | medium | high | critical
ep_id: optional human-readable ID, e.g. EP-CUST-001 (auto-assigned if omitted)
topic_id: UUID of related topic
workstream_id: UUID of related workstream
"""
ep = _post("/extension-points", {
"domain": domain, "title": title, "ep_type": ep_type,
"description": description, "location": location,
"priority": priority, "ep_id": ep_id,
"topic_id": topic_id, "workstream_id": workstream_id,
})
_post("/progress", {
"summary": f"Extension point registered: [{ep.get('ep_id') or ep['id'][:8]}] {title} ({ep_type}, {domain})",
"event_type": "extension_point",
"detail": {"id": ep["id"], "ep_id": ep.get("ep_id"), "ep_type": ep_type, "domain": domain},
})
return json.dumps(ep, indent=2)
@mcp.tool()
def list_extension_points(
domain: str | None = None,
status: str | None = None,
ep_type: str | None = None,
) -> str:
"""List extension points, optionally filtered.
Args:
domain: filter by domain
status: open | in_progress | addressed | deferred | wont_fix
ep_type: api | schema | mcp | dashboard | architecture | integration | other
"""
return json.dumps(_get("/extension-points", {
"domain": domain, "status": status, "ep_type": ep_type,
}), indent=2)
@mcp.tool()
def update_ep_status(ep_uuid: str, status: str) -> str:
"""Update the status of an extension point.
Args:
ep_uuid: UUID of the extension point
status: open | in_progress | addressed | deferred | wont_fix
"""
ep = _patch(f"/extension-points/{ep_uuid}", {"status": status})
_post("/progress", {
"summary": f"Extension point status → {status}: {ep['title']}",
"event_type": "extension_point",
"detail": {"id": ep_uuid, "status": status},
})
return json.dumps(ep, indent=2)
@mcp.tool()
def register_technical_debt(
domain: str,
title: str,
debt_type: str,
description: str | None = None,
location: str | None = None,
severity: str = "medium",
td_id: str | None = None,
topic_id: str | None = None,
workstream_id: str | None = None,
) -> str:
"""Register a technical debt item — a known quality compromise to address later.
Technical debt captures intentional or discovered shortcuts, design
weaknesses, missing tests, and similar issues that reduce codebase health.
Args:
domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
title: short description of the debt
debt_type: design | implementation | test | docs | dependencies | performance | security | other
description: what the issue is and what the correct fix would be
location: file:line or module where the debt lives
severity: low | medium | high | critical
td_id: optional human-readable ID, e.g. TD-CUST-001
topic_id: UUID of related topic
workstream_id: UUID of related workstream
"""
td = _post("/technical-debt", {
"domain": domain, "title": title, "debt_type": debt_type,
"description": description, "location": location,
"severity": severity, "td_id": td_id,
"topic_id": topic_id, "workstream_id": workstream_id,
})
_post("/progress", {
"summary": f"Technical debt registered: [{td.get('td_id') or td['id'][:8]}] {title} ({debt_type}, {severity}, {domain})",
"event_type": "technical_debt",
"detail": {"id": td["id"], "td_id": td.get("td_id"), "debt_type": debt_type, "severity": severity, "domain": domain},
})
return json.dumps(td, indent=2)
@mcp.tool()
def list_technical_debt(
domain: str | None = None,
status: str | None = None,
debt_type: str | None = None,
severity: str | None = None,
) -> str:
"""List technical debt items, optionally filtered.
Args:
domain: filter by domain
status: open | in_progress | resolved | deferred | wont_fix
debt_type: design | implementation | test | docs | dependencies | performance | security | other
severity: low | medium | high | critical
"""
return json.dumps(_get("/technical-debt", {
"domain": domain, "status": status,
"debt_type": debt_type, "severity": severity,
}), indent=2)
@mcp.tool()
def update_td_status(td_uuid: str, status: str) -> str:
"""Update the status of a technical debt item.
Args:
td_uuid: UUID of the technical debt item
status: open | in_progress | resolved | deferred | wont_fix
"""
td = _patch(f"/technical-debt/{td_uuid}", {"status": status})
_post("/progress", {
"summary": f"Technical debt status → {status}: {td['title']}",
"event_type": "technical_debt",
"detail": {"id": td_uuid, "status": status},
})
return json.dumps(td, indent=2)
# ---------------------------------------------------------------------------
# Domain lifecycle + repo registration tools (v0.5)
# ---------------------------------------------------------------------------
@mcp.tool()
def list_domains(status: str = "active") -> str:
"""List all registered domains.
Args:
status: active | archived | all (default: active)
"""
return json.dumps(_get("/domains", {"status": status}), indent=2)
@mcp.tool()
def create_domain(slug: str, name: str, description: str | None = None) -> str:
"""Create a new domain.
Args:
slug: URL-friendly identifier (lowercase, underscored), e.g. 'my_project'
name: Human-readable display name
description: optional longer description
"""
domain = _post("/domains", {"slug": slug, "name": name, "description": description})
_post("/progress", {
"event_type": "milestone",
"summary": f"Domain created: {slug} ({name})",
"author": "custodian",
"detail": {"slug": slug, "name": name},
})
return json.dumps(domain, indent=2)
@mcp.tool()
def rename_domain(slug: str, new_slug: str, new_name: str) -> str:
"""Rename a domain — cascades to EP/TD string columns.
Args:
slug: Current domain slug
new_slug: New URL-friendly identifier
new_name: New human-readable display name
"""
domain = _patch(f"/domains/{slug}/rename", {"new_slug": new_slug, "new_name": new_name})
_post("/progress", {
"event_type": "milestone",
"summary": f"Domain renamed: {slug}{new_slug} ({new_name})",
"author": "custodian",
"detail": {"old_slug": slug, "new_slug": new_slug, "new_name": new_name},
})
return json.dumps(domain, indent=2)
@mcp.tool()
def archive_domain(slug: str) -> str:
"""Archive a domain (soft-delete). Fails if active topics exist.
Args:
slug: Domain slug to archive
"""
domain = _patch(f"/domains/{slug}/archive", {})
_post("/progress", {
"event_type": "note",
"summary": f"Domain archived: {slug}",
"author": "custodian",
"detail": {"slug": slug},
})
return json.dumps(domain, indent=2)
@mcp.tool()
def list_domain_repos(domain_slug: str) -> str:
"""List all repositories registered under a domain.
Args:
domain_slug: Domain slug to filter by
"""
return json.dumps(_get("/repos", {"domain": domain_slug}), indent=2)
@mcp.tool()
def register_repo(
domain_slug: str,
name: str,
slug: str | None = None,
local_path: str | None = None,
remote_url: str | None = None,
description: str | None = None,
) -> str:
"""Register a git repository under a domain.
Args:
domain_slug: Domain slug (must already exist)
name: Human-readable repository name
slug: URL-friendly identifier (auto-generated from name if omitted)
local_path: Absolute local filesystem path to the repo
remote_url: Remote git URL (Gitea, GitHub, etc.)
description: optional description
"""
import re as _re
if not slug:
slug = _re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")
repo = _post("/repos", {
"domain_slug": domain_slug,
"slug": slug,
"name": name,
"local_path": local_path,
"remote_url": remote_url,
"description": description,
})
_post("/progress", {
"event_type": "milestone",
"summary": f"Repo registered: {name} under domain '{domain_slug}'",
"author": "custodian",
"detail": {"slug": slug, "domain_slug": domain_slug, "local_path": local_path, "remote_url": remote_url},
})
return json.dumps(repo, indent=2)
@mcp.tool()
def update_repo_path(repo_slug: str, path: str, host: str | None = None) -> str:
"""Register or update the local filesystem path for a repo on a specific host.
Use this when a repo lives at a different absolute path on different machines
(e.g. /home/worsch/marki-docx on the workstation vs /home/tegwick/marki-docx
on custodiancore). The consistency checker will prefer the host-specific path
over the legacy local_path field.
Args:
repo_slug: Managed-repo slug (e.g. 'marki-docx')
path: Absolute local path on the target machine (e.g. '/home/tegwick/marki-docx')
host: Hostname to register the path for. Defaults to the current machine's hostname.
"""
import socket as _socket
if not host:
host = _socket.gethostname()
repo = _post(f"/repos/{repo_slug}/paths", {"host": host, "path": path})
return json.dumps(repo, indent=2)
# ---------------------------------------------------------------------------
# Shared path resolution helper
# ---------------------------------------------------------------------------
def _resolve_repo_path(repo: dict) -> str:
"""Return the best local filesystem path for *repo* on this host.
Resolution order — each candidate is expanded (supports ``~``) and
verified to exist before being accepted:
1. ``host_paths[hostname]`` — host-specific override
2. ``local_path`` — default fallback
Returns the resolved path string, or ``""`` if no valid path is found.
"""
import socket as _socket
hostname = _socket.gethostname()
host_paths = repo.get("host_paths") or {}
candidates = []
if host_paths.get(hostname):
candidates.append(host_paths[hostname])
if repo.get("local_path"):
candidates.append(repo["local_path"])
for raw in candidates:
resolved = str(Path(raw).expanduser())
if Path(resolved).is_dir():
return resolved
return ""
# ---------------------------------------------------------------------------
# Kaizen Agents
# ---------------------------------------------------------------------------
def _kaizen_agents_dir() -> Path:
"""Resolve the kaizen-agentic agents/ directory."""
repo = _get("/repos/kaizen-agentic")
base = _resolve_repo_path(repo)
if not base:
import socket as _socket
hostname = _socket.gethostname()
raise FileNotFoundError(
f"kaizen-agentic path not found on host '{hostname}'. "
"Register it with update_repo_path('kaizen-agentic', '/path/to/repo')."
)
agents_dir = Path(base) / "agents"
if not agents_dir.is_dir():
raise FileNotFoundError(f"agents/ directory not found at {agents_dir}")
return agents_dir
@mcp.tool()
def list_kaizen_agents(category: str | None = None) -> str:
"""List all available kaizen agent personas.
Reads agent metadata from kaizen-agentic/agents/agent-*.md frontmatter.
Each agent is a specialized instruction set Claude can load and follow.
Args:
category: Optional filter (e.g. 'testing', 'quality', 'process', 'infrastructure').
Returns all agents when omitted.
Returns:
JSON list of {name, description, category, file} objects.
"""
import re as _re
agents_dir = _kaizen_agents_dir()
result = []
for f in sorted(agents_dir.glob("agent-*.md")):
name = f.stem.removeprefix("agent-")
text = f.read_text(encoding="utf-8")
# Extract optional YAML frontmatter fields
fm_match = _re.match(r"^---\n(.*?\n)---\n", text, _re.DOTALL)
meta: dict = {}
if fm_match:
for line in fm_match.group(1).splitlines():
if ":" in line:
k, _, v = line.partition(":")
meta[k.strip()] = v.strip()
agent_category = meta.get("category", "")
if category and agent_category.lower() != category.lower():
continue
# Fall back to first non-empty line after frontmatter as description
desc = meta.get("description", "")
if not desc:
for line in text.split("\n"):
line = line.strip()
if line and not line.startswith("#") and not line.startswith("---"):
desc = line[:120]
break
result.append({"name": name, "description": desc, "category": agent_category, "file": f.name})
return json.dumps(result, indent=2)
@mcp.tool()
def get_kaizen_agent(name: str) -> str:
"""Load the full instructions for a kaizen agent persona.
Read the returned markdown and follow the instructions it contains.
Use list_kaizen_agents() to discover available agent names.
Args:
name: Agent name without 'agent-' prefix (e.g. 'tdd-workflow', 'code-refactoring').
Returns:
Full markdown content of the agent definition file.
"""
agents_dir = _kaizen_agents_dir()
agent_file = agents_dir / f"agent-{name}.md"
if not agent_file.exists():
available = [f.stem.removeprefix("agent-") for f in sorted(agents_dir.glob("agent-*.md"))]
return json.dumps({"error": f"Agent '{name}' not found.", "available": available})
return agent_file.read_text(encoding="utf-8")
# ---------------------------------------------------------------------------
# ADR-001 compliance validation
# ---------------------------------------------------------------------------
@mcp.tool()
def validate_repo_adr(repo_slug: str, domain_slug: str | None = None) -> str:
"""Check whether a repository is consistent with ADR-001.
Validates that workplan files exist in workplans/ with correct frontmatter,
that state_hub_workstream_id references resolve to real DB records, and that
no active state-hub workstreams for the domain lack a backing file (orphan
detection — DB-only records are an ADR-001 violation).
The repo path is resolved from the DB: host_paths[hostname] is tried first
(with existence check), then local_path — both support ~ expansion. This tool always runs against
the server's copy of the repo. Remote agents on a different branch should
sync first, or run validate_repo_adr.py locally with
--api-base http://127.0.0.1:18000.
Args:
repo_slug: Registered repo slug (e.g. 'the-custodian', 'ops-bridge').
domain_slug: Domain slug for orphan detection (e.g. 'custodian').
If omitted, inferred from workplan frontmatter.
"""
import socket as _socket
import subprocess
repo = _get(f"/repos/{repo_slug}")
if isinstance(repo, dict) and repo.get("error"):
return f"Repo '{repo_slug}' not found: {repo['error']}"
repo_path = _resolve_repo_path(repo)
if not repo_path:
hostname = _socket.gethostname()
return (
f"⚠ No accessible path found for repo '{repo_slug}' on host '{hostname}'.\n"
f"Register with: update_repo_path('{repo_slug}', '/path/to/repo')\n"
f"Remote agents: run validate_repo_adr.py locally with "
f"--api-base {API_BASE}"
)
script = Path(__file__).parent.parent / "scripts" / "validate_repo_adr.py"
cmd = [sys.executable, str(script), repo_path, "--json",
"--api-base", API_BASE]
if domain_slug:
cmd += ["--domain", domain_slug]
result = subprocess.run(cmd, capture_output=True, text=True)
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
return f"Validator script error:\n{result.stderr or result.stdout or '(no output)'}"
findings = data.get("findings", [])
summary = data.get("summary", {})
overall = data.get("result", "unknown")
failures = [f for f in findings if f["level"] == "FAIL"]
warnings = [f for f in findings if f["level"] == "WARN"]
lines = [f"ADR-001 Compliance: {repo_slug} ({repo_path})", ""]
if failures:
lines.append(f"FAILURES ({len(failures)}):")
for f in failures:
loc = f" [{f['file']}]" if f.get("file") else ""
lines.append(f" FAIL {f['check']}{loc}")
lines.append(f" {f['detail']}")
lines.append("")
if warnings:
lines.append(f"WARNINGS ({len(warnings)}):")
for f in warnings:
loc = f" [{f['file']}]" if f.get("file") else ""
lines.append(f" WARN {f['check']}{loc}")
lines.append(f" {f['detail']}")
lines.append("")
lines.append(
f"Summary: {summary.get('pass', 0)} pass | "
f"{summary.get('warn', 0)} warn | "
f"{summary.get('fail', 0)} fail"
)
lines.append(f"Result: {'FAIL' if overall == 'fail' else 'PASS (with warnings)' if overall == 'warn' else 'PASS'}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# ADR-001 consistency checking engine
# ---------------------------------------------------------------------------
@mcp.tool()
def check_repo_consistency(repo_slug: str, fix: bool = False) -> str:
"""Run ADR-001 consistency check for a registered repo.
Performs bidirectional checks between workplan files in the repo and the
state-hub DB. The file is always authoritative: drift is reported with the
file value as the expected value.
Checks: missing workplans/, parse errors, stale DB references, status/title
drift, unlinked workplans, orphan DB workstreams, repo mismatches, task
status drift, unlinked tasks, and orphan DB tasks.
Args:
repo_slug: Registered repo slug (e.g. 'the-custodian', 'activity-core').
fix: If True, apply auto-fixable issues: status drift (C-04), title drift
(C-05), create missing DB workstreams (C-06), repo mismatch (C-09),
task status drift (C-10), create unlinked tasks (C-11).
"""
import socket as _socket
import subprocess
# Pre-flight: verify this host has the repo path registered and accessible.
repo = _get(f"/repos/{repo_slug}")
if isinstance(repo, dict) and repo.get("error"):
return f"Repo '{repo_slug}' not found: {repo['error']}"
repo_path = _resolve_repo_path(repo)
if not repo_path:
hostname = _socket.gethostname()
return (
f"⚠ No accessible path found for repo '{repo_slug}' on host '{hostname}'.\n"
f"Register with: update_repo_path('{repo_slug}', '/path/to/repo')\n"
f"Remote agents: run consistency_check.py locally with "
f"--api-base {API_BASE}"
)
script = Path(__file__).parent.parent / "scripts" / "consistency_check.py"
cmd = [sys.executable, str(script), "--repo", repo_slug, "--json",
"--api-base", API_BASE]
if fix:
cmd.append("--fix")
result = subprocess.run(cmd, capture_output=True, text=True)
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
return f"Consistency check script error:\n{result.stderr or result.stdout or '(no output)'}"
issues = data.get("issues", [])
summary = data.get("summary", {})
overall = data.get("result", "unknown")
fixes = data.get("fixes_applied", [])
failures = [i for i in issues if i["severity"] == "FAIL"]
warnings = [i for i in issues if i["severity"] == "WARN"]
infos = [i for i in issues if i["severity"] == "INFO"]
lines = [
f"Consistency Check: {repo_slug}",
f"Path: {data.get('repo_path', '?')}",
"",
]
for sev, group in (("FAIL", failures), ("WARN", warnings), ("INFO", infos)):
if not group:
continue
lines.append(f"{sev}S ({len(group)}):")
for i in group:
loc = f" [{i['file_path']}]" if i.get("file_path") else ""
fix_tag = " [fixable]" if i.get("fixable") else ""
lines.append(f" {i['check_id']}{loc}{fix_tag}")
lines.append(f" {i['message']}")
lines.append("")
if fixes:
lines.append(f"Fixes applied ({len(fixes)}):")
for f in fixes:
lines.append(f" {f}")
lines.append("")
lines.append(
f"Summary: {summary.get('fail', 0)} fail | "
f"{summary.get('warn', 0)} warn | "
f"{summary.get('info', 0)} info"
)
lines.append(
f"Result: {'FAIL' if overall == 'fail' else 'PASS (with warnings)' if overall in ('warn',) else 'PASS'}"
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Contribution tracking (v0.3)
# ---------------------------------------------------------------------------
@mcp.resource("state://contributions")
def resource_contributions() -> str:
"""All contribution artifacts (BR/FR/EP/UPR)."""
return json.dumps(_get("/contributions"), indent=2)
@mcp.resource("state://sbom/aggregated")
def resource_sbom_aggregated() -> str:
"""Aggregated SBOM entries across all repos."""
return json.dumps(_get("/sbom"), indent=2)
@mcp.resource("state://sbom/{repo_slug}")
def resource_sbom_repo(repo_slug: str) -> str:
"""SBOM view for a specific repo (by slug)."""
return json.dumps(_get(f"/sbom/{repo_slug}"), indent=2)
@mcp.tool()
def register_contribution(
type: str,
title: str,
target_org: str | None = None,
target_repo: str | None = None,
body_path: str | None = None,
related_workstream_id: str | None = None,
notes: str | None = None,
) -> str:
"""Register a new upstream contribution artifact (BR/FR/EP/UPR).
Args:
type: br | fr | ep | upr
title: Short human-readable title
target_org: GitHub org or owner of the upstream project
target_repo: Repository name of the upstream project
body_path: Relative path to the Markdown artifact file in the repo
related_workstream_id: UUID of the related workstream (optional)
notes: Any additional notes (optional)
"""
contrib = _post("/contributions", {
"type": type,
"title": title,
"target_org": target_org,
"target_repo": target_repo,
"body_path": body_path,
"related_workstream_id": related_workstream_id,
"notes": notes,
})
_post("/progress", {
"workstream_id": related_workstream_id,
"event_type": "contribution_registered",
"summary": f"Contribution registered [{type.upper()}]: {title}",
"author": "custodian",
"detail": {
"contribution_id": contrib["id"],
"type": type,
"target": f"{target_org}/{target_repo}" if target_org else target_repo,
"body_path": body_path,
},
})
return json.dumps(contrib, indent=2)
@mcp.tool()
def update_contribution_status(
contribution_id: str,
status: str,
notes: str | None = None,
) -> str:
"""Update the status of a contribution artifact.
Valid transitions: draft→submitted→acknowledged→accepted→merged
↘ ↘
rejected withdrawn
Args:
contribution_id: UUID of the contribution
status: submitted | acknowledged | accepted | rejected | merged | withdrawn
notes: Optional context for the status change
"""
contrib = _patch(f"/contributions/{contribution_id}/status", {
"status": status,
"notes": notes,
})
_post("/progress", {
"event_type": "contribution_status_changed",
"summary": f"Contribution status → {status}: {contrib['title']}",
"author": "custodian",
"detail": {"contribution_id": contribution_id, "status": status, "notes": notes},
})
return json.dumps(contrib, indent=2)
@mcp.tool()
def get_contributions(
type: str | None = None,
status: str | None = None,
target_repo: str | None = None,
) -> str:
"""List contribution artifacts, optionally filtered.
Args:
type: br | fr | ep | upr (optional)
status: draft | submitted | acknowledged | accepted | rejected | merged | withdrawn (optional)
target_repo: filter by upstream repo name (optional)
"""
return json.dumps(_get("/contributions", {
"type": type, "status": status, "target_repo": target_repo,
}), indent=2)
@mcp.tool()
def ingest_sbom_tool(repo_slug: str, lockfile_path: str | None = None) -> str:
"""Ingest a lockfile into the State Hub SBOM store for a repo.
Parses the lockfile and POSTs entries to /sbom/ingest/. Each call creates
a new SBOMSnapshot; previous snapshots are retained as history.
The repo root is resolved from the DB using the current machine's hostname
(host_paths[hostname] → local_path fallback). lockfile_path, when given,
is treated as relative to the repo root. Omit it to auto-detect the lockfile.
Args:
repo_slug: Managed-repo slug (must be registered via register_repo)
lockfile_path: Path to the lockfile, relative to repo root
(e.g. "uv.lock", "frontend/package-lock.json").
Omit to auto-detect from the repo root.
"""
import socket as _socket
import subprocess
repo = _get(f"/repos/{repo_slug}")
if isinstance(repo, dict) and repo.get("error"):
return f"Repo '{repo_slug}' not found: {repo['error']}"
repo_root = _resolve_repo_path(repo)
if not repo_root:
hostname = _socket.gethostname()
return (
f"⚠ No accessible path found for repo '{repo_slug}' on host '{hostname}'.\n"
f"Register with: update_repo_path('{repo_slug}', '/path/to/repo')"
)
script = Path(__file__).parent.parent / "scripts" / "ingest_sbom.py"
cmd = [sys.executable, str(script), "--repo", repo_slug,
"--repo-path", repo_root, "--api-base", API_BASE]
if lockfile_path:
resolved = Path(repo_root) / lockfile_path
if not resolved.exists():
return f"⚠ Lockfile not found: {resolved}"
cmd += ["--lockfile", str(resolved)]
result = subprocess.run(cmd, capture_output=True, text=True)
output = (result.stdout + result.stderr).strip()
if result.returncode != 0:
return f"ingest_sbom failed (exit {result.returncode}):\n{output}"
return output
@mcp.tool()
def get_licence_report() -> str:
"""Get a licence report across all ingested SBOM entries.
Returns packages grouped by SPDX licence identifier, with copyleft
flag (GPL/AGPL/LGPL/EUPL/CDDL/MPL) and repos using each licence.
"""
return json.dumps(_get("/sbom/report/licences"), indent=2)
# ---------------------------------------------------------------------------
# Domain goals & repo goals (v0.7)
# ---------------------------------------------------------------------------
@mcp.tool()
def create_domain_goal(domain_slug: str, title: str, description: str) -> str:
"""Create a new domain goal and make it active (superseding any existing active goal).
A domain goal captures the high-level strategic intent for a domain. Only one
domain goal can be active at a time; creating a new active one supersedes the
previous active goal.
Args:
domain_slug: Slug of the domain (e.g. 'railiance', 'markitect')
title: Short goal title
description: Full description of the goal and its boundary conditions
"""
domains = _get("/domains", {"status": "active"})
domain = next((d for d in domains if d["slug"] == domain_slug), None)
if not domain:
return json.dumps({"error": f"Domain '{domain_slug}' not found"})
goal = _post("/domain-goals", {
"domain_id": domain["id"],
"title": title,
"description": description,
"status": "active",
})
_post("/progress", {
"event_type": "goal_created",
"summary": f"Domain goal created [{domain_slug}]: {title}",
"detail": {"goal_id": goal["id"], "domain_slug": domain_slug},
})
return json.dumps(goal, indent=2)
@mcp.tool()
def get_domain_goals(domain_slug: str, status: str | None = None) -> str:
"""List domain goals for a domain, optionally filtered by status.
Args:
domain_slug: Slug of the domain (e.g. 'railiance')
status: active | archived | superseded (omit for all)
"""
return json.dumps(_get("/domain-goals", {"domain_slug": domain_slug, "status": status}), indent=2)
@mcp.tool()
def activate_domain_goal(goal_id: str) -> str:
"""Set a domain goal as the active goal, superseding any currently active one.
Args:
goal_id: UUID of the domain goal to activate
"""
goal = _post(f"/domain-goals/{goal_id}/activate", {})
_post("/progress", {
"event_type": "goal_activated",
"summary": f"Domain goal activated: {goal['title']}",
"detail": {"goal_id": goal_id, "domain_slug": goal.get("domain_slug")},
})
return json.dumps(goal, indent=2)
@mcp.tool()
def create_repo_goal(
repo_slug: str,
title: str,
description: str,
domain_goal_id: str | None = None,
priority: int = 100,
) -> str:
"""Create a new repository goal.
Repository goals capture what needs to be achieved in a specific repository.
Multiple active repo goals can coexist; priority (lower number = higher priority)
determines ordering. Optionally link to the parent domain goal.
Args:
repo_slug: Slug of the repository (e.g. 'railiance-bootstrap')
title: Short goal title
description: Full description including boundary conditions and scope
domain_goal_id: UUID of the parent domain goal (optional)
priority: Integer priority — lower numbers = higher priority (default 100)
"""
repos = _get("/repos")
repo = next((r for r in repos if r["slug"] == repo_slug), None)
if not repo:
return json.dumps({"error": f"Repo '{repo_slug}' not found"})
goal = _post("/repo-goals", {
"repo_id": repo["id"],
"title": title,
"description": description,
"domain_goal_id": domain_goal_id,
"priority": priority,
"status": "active",
})
_post("/progress", {
"event_type": "goal_created",
"summary": f"Repo goal created [{repo_slug}]: {title}",
"detail": {"goal_id": goal["id"], "repo_slug": repo_slug, "priority": priority},
})
return json.dumps(goal, indent=2)
@mcp.tool()
def get_repo_goals(repo_slug: str, status: str | None = None) -> str:
"""List repository goals for a repo, ordered by priority.
Args:
repo_slug: Slug of the repository (e.g. 'railiance-bootstrap')
status: active | paused | completed | archived (omit for all)
"""
return json.dumps(_get("/repo-goals", {"repo_slug": repo_slug, "status": status}), indent=2)
@mcp.tool()
def update_repo_goal(
goal_id: str,
title: str | None = None,
description: str | None = None,
priority: int | None = None,
status: str | None = None,
domain_goal_id: str | None = None,
) -> str:
"""Update a repository goal (title, description, priority, status, or domain link).
Args:
goal_id: UUID of the repo goal
title: New title (optional)
description: New description (optional)
priority: New priority integer — lower = higher priority (optional)
status: active | paused | completed | archived (optional)
domain_goal_id: Link or re-link to a domain goal UUID (optional)
"""
updates: dict = {}
if title is not None:
updates["title"] = title
if description is not None:
updates["description"] = description
if priority is not None:
updates["priority"] = priority
if status is not None:
updates["status"] = status
if domain_goal_id is not None:
updates["domain_goal_id"] = domain_goal_id
goal = _patch(f"/repo-goals/{goal_id}", updates)
_post("/progress", {
"event_type": "goal_updated",
"summary": f"Repo goal updated: {goal['title']}",
"detail": {"goal_id": goal_id, "changes": list(updates.keys())},
})
return json.dumps(goal, indent=2)
@mcp.tool()
def get_repo_dispatch(repo_slug: str) -> str:
"""Return active workstreams, pending tasks, and goal for a repo.
Use this at the start of a repo agent session to discover what work is
pending without needing to read the full state summary or scan workplan
files. The response includes:
- active_goal: the highest-priority active repo goal
- active_workstreams: list of active workstreams with pending tasks
- human_interventions: tasks that need human input (needs_human=true)
- last_state_synced_at: when the repo was last synced to the hub
Args:
repo_slug: Slug of the repository (e.g. 'marki-docx')
"""
return json.dumps(_get(f"/repos/{repo_slug}/dispatch"), indent=2)
# ---------------------------------------------------------------------------
# Agent Inbox (inter-agent message passing)
# ---------------------------------------------------------------------------
@mcp.tool()
def send_message(from_agent: str, to_agent: str, subject: str, body: str, thread_id: str | None = None) -> str:
"""Send a message from one agent to another (or 'broadcast' for all).
Use this to coordinate with other Claude instances — e.g. a worker agent
reporting status back to the orchestrator, or the hub agent dispatching
instructions to a domain agent.
Args:
from_agent: Sender identifier (e.g. 'hub', 'marki-docx', 'railiance')
to_agent: Recipient identifier or 'broadcast' for all agents
subject: Short subject line (max 500 chars)
body: Full message body (markdown supported)
thread_id: UUID of the root message to create a thread (optional)
"""
payload: dict = {"from_agent": from_agent, "to_agent": to_agent, "subject": subject, "body": body}
if thread_id:
payload["thread_id"] = thread_id
msg = _post("/messages/", payload)
return json.dumps(msg, indent=2)
@mcp.tool()
def get_messages(to_agent: str | None = None, from_agent: str | None = None, unread_only: bool = False, limit: int = 20) -> str:
"""List messages in the agent inbox.
Call this at session start to check for pending coordination messages.
Args:
to_agent: Filter by recipient (your agent name, or omit for all)
from_agent: Filter by sender (optional)
unread_only: Return only unread messages (default: False)
limit: Maximum number of messages to return (default: 20)
"""
params: dict = {"limit": limit, "unread_only": unread_only}
if to_agent:
params["to_agent"] = to_agent
if from_agent:
params["from_agent"] = from_agent
return json.dumps(_get("/messages/", params), indent=2)
@mcp.tool()
def mark_message_read(message_id: str) -> str:
"""Mark an inbox message as read.
Args:
message_id: UUID of the message to mark as read
"""
return json.dumps(_patch(f"/messages/{message_id}/read", {}), indent=2)
@mcp.tool()
def reply_to_message(message_id: str, from_agent: str, body: str) -> str:
"""Reply to a message. Marks the original as read and creates a reply in the same thread.
Args:
message_id: UUID of the message to reply to
from_agent: Your agent identifier
body: Reply body (markdown supported)
"""
return json.dumps(_post(f"/messages/{message_id}/reply", {"from_agent": from_agent, "body": body}), indent=2)
# ---------------------------------------------------------------------------
# Capability Catalog & Requests
# ---------------------------------------------------------------------------
@mcp.tool()
def register_capability(
domain: str,
capability_type: str,
title: str,
description: str | None = None,
keywords: list[str] | None = None,
) -> str:
"""Register a capability that a domain can provide. Used for routing requests.
Args:
domain: Domain slug (e.g. 'railiance', 'markitect')
capability_type: Category (e.g. 'infrastructure', 'api', 'data', 'security', 'documentation')
title: Short title for this capability
description: Longer description (optional)
keywords: List of keywords for routing (e.g. ['cluster', 'k8s', 'privacy'])
"""
entry = _post("/capability-catalog", {
"domain": domain,
"capability_type": capability_type,
"title": title,
"description": description,
"keywords": keywords or [],
})
return json.dumps(entry, indent=2)
@mcp.tool()
def list_capabilities(
domain: str | None = None,
capability_type: str | None = None,
) -> str:
"""Browse the capability catalog — what domains can provide.
Args:
domain: Filter by domain slug (optional)
capability_type: Filter by type (optional)
"""
return json.dumps(_get("/capability-catalog", {
"domain": domain,
"capability_type": capability_type,
}), indent=2)
@mcp.tool()
def request_capability(
title: str,
description: str,
capability_type: str,
requesting_agent: str,
requesting_domain: str,
requesting_workstream_id: str | None = None,
priority: str = "medium",
blocking_task_id: str | None = None,
) -> str:
"""Request a capability from another domain. Auto-routes to the responsible
domain via the capability catalog. If no unique match, broadcasts to all.
Args:
title: Short title (e.g. 'Privacy idea instance on cluster')
description: Detailed description of what you need
capability_type: Category (e.g. 'infrastructure', 'api', 'data', 'security')
requesting_agent: Your agent identifier (e.g. 'net-kingdom-worker')
requesting_domain: Your domain slug (e.g. 'custodian')
requesting_workstream_id: UUID of your workstream (optional)
priority: low | medium | high | critical (default: medium)
blocking_task_id: UUID of the task blocked until this is fulfilled (optional)
"""
req = _post("/capability-requests", {
"title": title,
"description": description,
"capability_type": capability_type,
"requesting_agent": requesting_agent,
"requesting_domain": requesting_domain,
"requesting_workstream_id": requesting_workstream_id,
"priority": priority,
"blocking_task_id": blocking_task_id,
})
_post("/progress", {
"event_type": "capability_requested",
"summary": f"Capability requested: {title} ({capability_type})",
"author": requesting_agent,
"detail": {
"capability_request_id": req.get("id"),
"capability_type": capability_type,
"routed_to": req.get("fulfilling_domain_slug"),
},
})
return json.dumps(req, indent=2)
@mcp.tool()
def accept_capability_request(
request_id: str,
fulfilling_agent: str,
fulfilling_workstream_id: str | None = None,
) -> str:
"""Accept a capability request. Assigns yourself as the fulfilling agent.
Args:
request_id: UUID of the capability request
fulfilling_agent: Your agent identifier (e.g. 'railiance-worker')
fulfilling_workstream_id: UUID of your workstream for this work (optional)
"""
result = _post(f"/capability-requests/{request_id}/accept", {
"fulfilling_agent": fulfilling_agent,
"fulfilling_workstream_id": fulfilling_workstream_id,
})
return json.dumps(result, indent=2)
@mcp.tool()
def update_capability_request_status(
request_id: str,
status: str,
note: str | None = None,
) -> str:
"""Advance a capability request through its lifecycle.
On 'completed': auto-unblocks the blocking task if one was set.
Args:
request_id: UUID of the capability request
status: in_progress | ready_for_review | completed | rejected | withdrawn
note: Optional note (required for rejection, recommended for completion)
"""
result = _patch(f"/capability-requests/{request_id}/status", {
"status": status,
"note": note,
})
return json.dumps(result, indent=2)
@mcp.tool()
def patch_capability_request(
request_id: str,
catalog_entry_id: Optional[str] = None,
priority: Optional[str] = None,
blocking_task_id: Optional[str] = None,
fulfilling_workstream_id: Optional[str] = None,
) -> dict:
"""Correct mutable metadata on a capability request.
Correcting catalog_entry_id automatically re-derives the fulfilling domain.
Use this when the hub mis-routed a request (wrong catalog entry or domain).
Only provided (non-None) fields are updated.
Args:
request_id: UUID of the capability request to patch.
catalog_entry_id: Correct catalog entry UUID. Re-derives fulfilling domain.
priority: New priority (low/medium/high/critical).
blocking_task_id: UUID of the task this request unblocks on completion.
fulfilling_workstream_id: UUID of the workstream delivering this capability.
Returns:
Updated capability request dict, or {"error": "..."}.
"""
body: dict = {}
if catalog_entry_id is not None:
body["catalog_entry_id"] = catalog_entry_id
if priority is not None:
body["priority"] = priority
if blocking_task_id is not None:
body["blocking_task_id"] = blocking_task_id
if fulfilling_workstream_id is not None:
body["fulfilling_workstream_id"] = fulfilling_workstream_id
if not body:
return {"error": "no fields provided to patch"}
return json.dumps(_patch(f"/capability-requests/{request_id}", body), indent=2)
@mcp.tool()
def list_capability_requests(
domain: str | None = None,
status: str | None = None,
capability_type: str | None = None,
) -> str:
"""List capability requests with optional filters.
Args:
domain: Filter by requesting OR fulfilling domain slug
status: Filter by status (requested/accepted/in_progress/ready_for_review/completed/rejected/withdrawn)
capability_type: Filter by capability type
"""
return json.dumps(_get("/capability-requests", {
"domain": domain,
"status": status,
"capability_type": capability_type,
}), indent=2)
@mcp.tool()
def get_capability_request(request_id: str) -> str:
"""Get a single capability request by ID.
Args:
request_id: UUID of the capability request
"""
return json.dumps(_get(f"/capability-requests/{request_id}"), indent=2)
@mcp.tool()
def dispute_capability_routing(
request_id: str,
reason: str,
disputed_by: str,
suggested_domain: Optional[str] = None,
) -> str:
"""Flag a capability request routing as incorrect. Transitions to routing_disputed.
Args:
request_id: UUID of the capability request
reason: Why the routing is wrong
disputed_by: Agent raising the dispute (e.g. 'netkingdom-worker')
suggested_domain: The domain slug this should be routed to (optional)
"""
return json.dumps(_post(f"/capability-requests/{request_id}/dispute", {
"reason": reason,
"disputed_by": disputed_by,
"suggested_domain": suggested_domain,
}), indent=2)
@mcp.tool()
def reroute_capability_request(
request_id: str,
note: str,
rerouted_by: str,
domain: Optional[str] = None,
catalog_entry_id: Optional[str] = None,
) -> str:
"""Re-route a disputed capability request to a new domain. Resets to requested.
Args:
request_id: UUID of the capability request (must be routing_disputed)
note: Reason for the re-routing decision
rerouted_by: Agent performing the re-route (e.g. 'custodian')
domain: Target domain slug (used if catalog_entry_id not provided)
catalog_entry_id: Preferred — UUID of catalog entry; re-derives domain automatically
"""
return json.dumps(_post(f"/capability-requests/{request_id}/reroute", {
"note": note,
"rerouted_by": rerouted_by,
"domain": domain,
"catalog_entry_id": catalog_entry_id,
}), indent=2)
# ---------------------------------------------------------------------------
# Third-Party Services Catalog (TPSC)
# ---------------------------------------------------------------------------
@mcp.tool()
def register_service(
slug: str,
name: str,
provider: str | None = None,
category: str | None = None,
pricing_model: str = "unknown",
gdpr_maturity: str = "unknown",
gdpr_notes: str | None = None,
dpa_available: bool = False,
tos_url: str | None = None,
privacy_policy_url: str | None = None,
data_processing_regions: list[str] | None = None,
data_retention_notes: str | None = None,
website_url: str | None = None,
) -> str:
"""Register or update a service in the Third-Party Services Catalog (TPSC).
GDPR maturity scale (CNIL/IAPP CMMI-aligned):
unknown | non_compliant | initial | developing | defined | managed | certified
Pricing model: free | paid | freemium | usage_based | unknown
Args:
slug: Unique identifier (e.g. 'openai-api', 'stripe')
name: Human-readable service name
provider: Company/organisation name
category: Category (e.g. 'llm_inference', 'storage', 'payments', 'search')
pricing_model: free | paid | freemium | usage_based | unknown
gdpr_maturity: GDPR compliance maturity level (see scale above)
gdpr_notes: Free-text GDPR notes (DPA details, transfer mechanisms, etc.)
dpa_available: Whether a Data Processing Agreement is available
tos_url: Terms of Service URL
privacy_policy_url: Privacy Policy URL
data_processing_regions: List of regions where data is processed (e.g. ['us', 'eu'])
data_retention_notes: Data retention policy summary
website_url: Service website URL
"""
return json.dumps(_post("/tpsc/catalog", {
"slug": slug,
"name": name,
"provider": provider,
"category": category,
"website_url": website_url,
"pricing_model": pricing_model,
"gdpr_maturity": gdpr_maturity,
"gdpr_notes": gdpr_notes,
"dpa_available": dpa_available,
"tos_url": tos_url,
"privacy_policy_url": privacy_policy_url,
"data_processing_regions": data_processing_regions or [],
"data_retention_notes": data_retention_notes,
}), indent=2)
@mcp.tool()
def list_services(
gdpr_maturity: str | None = None,
category: str | None = None,
pricing_model: str | None = None,
) -> str:
"""Browse the Third-Party Services Catalog (TPSC).
Returns services with their GDPR maturity level and gdpr_warning flag
(True when maturity is unknown, non_compliant, or initial — may limit
use in corporate/GDPR-regulated environments).
Args:
gdpr_maturity: Filter by maturity level (unknown/non_compliant/initial/developing/defined/managed/certified)
category: Filter by category (e.g. 'llm_inference', 'storage')
pricing_model: Filter by pricing model (free/paid/freemium/usage_based/unknown)
"""
return json.dumps(_get("/tpsc/catalog", {
"gdpr_maturity": gdpr_maturity,
"category": category,
"pricing_model": pricing_model,
}), indent=2)
@mcp.tool()
def ingest_tpsc_tool(repo_slug: str) -> str:
"""Ingest tpsc.yaml service dependency declarations for a repo.
Reads <repo_root>/tpsc.yaml, resolves service slugs against the catalog,
and creates a new TPSC snapshot. The repo path is resolved the same way
as the SBOM ingest tool (host_paths → local_path with existence check).
Args:
repo_slug: Registered repo slug (e.g. 'llm-connect', 'markitect-project')
"""
import socket as _socket
import subprocess
repo = _get(f"/repos/{repo_slug}")
if isinstance(repo, dict) and repo.get("error"):
return f"Repo '{repo_slug}' not found: {repo['error']}"
repo_root = _resolve_repo_path(repo)
if not repo_root:
hostname = _socket.gethostname()
return (
f"⚠ No accessible path found for repo '{repo_slug}' on host '{hostname}'.\n"
f"Register with: update_repo_path('{repo_slug}', '/path/to/repo')"
)
script = Path(__file__).parent.parent / "scripts" / "ingest_tpsc.py"
result = subprocess.run(
["uv", "run", "python", str(script), "--repo", repo_slug],
capture_output=True, text=True,
cwd=str(Path(__file__).parent.parent),
)
output = result.stdout + result.stderr
if result.returncode != 0:
return f"ingest_tpsc failed (exit {result.returncode}):\n{output}"
return output.strip()
@mcp.tool()
def get_gdpr_report() -> str:
"""Get an aggregated GDPR compliance report across all repos' latest TPSC snapshots.
Returns a warning summary for services with gdpr_maturity in:
unknown | non_compliant | initial
These may limit usability in GDPR-regulated / corporate environments.
Services at 'developing' or above have at least a DPA available.
"""
return json.dumps(_get("/tpsc/report/gdpr"), indent=2)
# ---------------------------------------------------------------------------
# Repository Definition of Integrated (DoI)
# ---------------------------------------------------------------------------
@mcp.tool()
def check_repo_doi(repo_slug: str) -> str:
"""Evaluate the 14 DoI criteria for a repo and return a full report.
Criteria are grouped into three tiers:
Core (C1C4): registered, domain, path, remote URL
Standard (C5C9): SCOPE.md, CLAUDE.md, workplan, SBOM, TPSC
Full (C10C14): repo goal, capabilities, agents, clean consistency, host paths
Status values: pass | fail | warn | skip
The 'tier' field shows the highest tier where ALL criteria pass or warn:
none | core | standard | full
Args:
repo_slug: Registered repo slug (e.g. 'llm-connect', 'the-custodian')
"""
return json.dumps(_get(f"/repos/{repo_slug}/doi"), indent=2)
@mcp.tool()
def get_doi_summary() -> str:
"""Return DoI tier for all active repos, sorted worst-first.
Useful at session start to spot repos that need integration work.
Tiers: none (red) → core → standard → full (green).
"""
return json.dumps(_get("/repos/doi/summary"), indent=2)
# ---------------------------------------------------------------------------
# Interactive / ad-hoc task recording
# ---------------------------------------------------------------------------
@mcp.tool()
def record_interactive_task(
title: str,
repo_slug: str,
tokens_in: Optional[int] = None,
tokens_out: Optional[int] = None,
note: Optional[str] = None,
model: Optional[str] = None,
agent: Optional[str] = None,
description: Optional[str] = None,
session_id: Optional[str] = None,
) -> str:
"""Record ad-hoc interactive work as a task with token consumption.
Finds or creates a persistent 'interactive-<repo>' workstream for the repo,
creates the task, marks it done immediately, and records a token event.
Token note convention:
"measured" — exact counts read from the Claude Code status bar (default when
tokens_in/tokens_out provided and note omitted)
"userbased" — counts provided by a human (pass note="userbased" explicitly)
"heuristic" — server fallback when no counts given (automatic)
Use this for work done outside a formal workplan: quick fixes, config changes,
code reviews, one-off investigations, or any session work worth tracking.
Args:
title: Short description of the work done
repo_slug: Registered repo slug, e.g. 'the-custodian', 'inter-hub'
tokens_in: Input token count (Tier 1 — read from Claude Code status bar)
tokens_out: Output token count (Tier 1)
note: Override token note — use "userbased" when counts came from a human
model: Model identifier, e.g. 'claude-sonnet-4-6'
agent: Agent name, e.g. 'custodian', 'ralph'
description: Optional longer description of what was done
session_id: Agent session identifier
"""
# Resolve repo
repos = _get("/repos/")
if isinstance(repos, dict) and "error" in repos:
return json.dumps(repos)
repo = next((r for r in (repos or []) if r.get("slug") == repo_slug), None)
if not repo:
return json.dumps({"error": f"Repo not found: {repo_slug!r}. Register it first with register_repo()."})
repo_id = repo["id"]
domain_slug = repo.get("domain_slug") or repo.get("domain")
ws_slug = f"interactive-{repo_slug}"
# Find or create the interactive workstream
existing = _get("/workstreams/", {"slug": ws_slug})
ws = existing[0] if isinstance(existing, list) and existing else None
if not ws:
# Find a topic for this domain to satisfy the FK
topics = _get("/topics/")
topic = next(
(t for t in (topics if isinstance(topics, list) else [])
if t.get("domain_slug") == domain_slug or t.get("domain") == domain_slug),
None,
)
if not topic:
return json.dumps({"error": f"No topic found for domain {domain_slug!r} — cannot create workstream."})
ws = _post("/workstreams", {
"topic_id": topic["id"],
"slug": ws_slug,
"title": f"Interactive — {repo_slug}",
"description": "Ad-hoc tasks created outside a formal workplan.",
"owner": "custodian",
"repo_id": repo_id,
})
if "error" in ws:
return json.dumps(ws)
# Create task
task = _post("/tasks", {
"workstream_id": ws["id"],
"title": title,
"description": description,
"priority": "medium",
})
if "error" in task:
return json.dumps(task)
# Mark done — triggers three-tier token recording in the router
body: dict[str, Any] = {
"status": "done",
"model": model,
"agent": agent,
"session_id": session_id,
}
if tokens_in is not None:
body["tokens_in"] = tokens_in
if tokens_out is not None:
body["tokens_out"] = tokens_out
if note is not None:
body["token_note"] = note
_patch(f"/tasks/{task['id']}", body)
effective_note = note or ("measured" if tokens_in is not None else "heuristic")
return json.dumps({
"task_id": task["id"],
"workstream_id": ws["id"],
"workstream_slug": ws_slug,
"title": title,
"token_note": effective_note,
}, indent=2)
# ---------------------------------------------------------------------------
# Token events
# ---------------------------------------------------------------------------
@mcp.tool()
def record_token_event(
tokens_in: int,
tokens_out: int,
task_id: Optional[str] = None,
workstream_id: Optional[str] = None,
repo_id: Optional[str] = None,
model: Optional[str] = None,
agent: Optional[str] = None,
ref_type: Optional[str] = None,
ref_id: Optional[str] = None,
note: Optional[str] = None,
session_id: Optional[str] = None,
) -> str:
"""Record AI token consumption for a task, workstream, or session.
workstream_id is auto-populated from the task if task_id is provided and
workstream_id is omitted. Returns the created event id and running total
for the task/workstream (if applicable).
Args:
tokens_in: Input token count
tokens_out: Output token count
task_id: UUID of the task (nullable)
workstream_id: UUID of the workstream (nullable; auto-filled from task)
repo_id: UUID of the managed repo (nullable)
model: Model identifier, e.g. 'claude-sonnet-4-6'
agent: Agent name, e.g. 'custodian', 'ralph'
ref_type: 'task'|'workstream'|'commit'|'release'|'session'
ref_id: Commit SHA, release tag, or other reference string
note: Free-text note
session_id: Agent session identifier
"""
body = {
"tokens_in": tokens_in,
"tokens_out": tokens_out,
"task_id": task_id,
"workstream_id": workstream_id,
"repo_id": repo_id,
"model": model,
"agent": agent,
"ref_type": ref_type,
"ref_id": ref_id,
"note": note,
"session_id": session_id,
}
result = _post("/token-events", body)
if "error" in result:
return json.dumps(result)
out = {
"event_id": result.get("id"),
"tokens_total": result.get("tokens_total"),
"tokens_in": result.get("tokens_in"),
"tokens_out": result.get("tokens_out"),
}
# Append running total for the task if available
scope_id = task_id or workstream_id
scope = "task" if task_id else ("workstream" if workstream_id else None)
if scope and scope_id:
summary = _get("/token-events/summary", {"scope": scope, "id": scope_id})
if "error" not in summary:
out["running_total"] = {
"scope": scope,
"scope_id": scope_id,
"tokens_total": summary.get("tokens_total"),
"event_count": summary.get("event_count"),
}
return json.dumps(out, indent=2)
@mcp.tool()
def get_token_summary(scope: str, id: str) -> str:
"""Return token consumption summary for a given scope.
Returns a formatted table of token usage aggregated by scope.
Args:
scope: One of: task | workstream | repo | commit | release | session
id: UUID for task/workstream/repo scopes; ref_id string for commit/release/session
"""
result = _get("/token-events/summary", {"scope": scope, "id": id})
if "error" in result:
return json.dumps(result)
lines = [
f"Token Summary — {scope}: {id}",
f"{'' * 50}",
f" tokens_in : {result.get('tokens_in', 0):>10,}",
f" tokens_out : {result.get('tokens_out', 0):>10,}",
f" tokens_total: {result.get('tokens_total', 0):>10,}",
f" event_count : {result.get('event_count', 0):>10,}",
]
by_model = result.get("by_model", {})
if by_model:
lines.append("\nBy model:")
for m, t in sorted(by_model.items(), key=lambda x: -x[1]):
lines.append(f" {m:<35} {t:>10,}")
by_agent = result.get("by_agent", {})
if by_agent:
lines.append("\nBy agent:")
for a, t in sorted(by_agent.items(), key=lambda x: -x[1]):
lines.append(f" {a:<35} {t:>10,}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
transport = os.environ.get("MCP_TRANSPORT", "stdio")
if transport == "stdio":
mcp.run(transport="stdio")
else:
port = int(os.environ.get("MCP_PORT", "8001"))
mcp.run(transport=transport, host="127.0.0.1", port=port)