generated from coulomb/repo-seed
Adds a JSONB column `host_paths` to managed_repos mapping
hostname → absolute local path. Fixes the consistency-checker
failure when the same repo lives at different paths on different
machines (e.g. /home/worsch/marki-docx on the workstation vs
/home/tegwick/marki-docx on custodiancore).
Changes:
- Migration g4b5c6d7e8f9: adds host_paths JSONB (default {})
- Model: host_paths Mapped[dict] column
- Schemas: host_paths in RepoRead; new RepoPathRegister schema
- Router: POST /repos/{slug}/paths/ — merges one host entry
- consistency_check.py: resolve_repo_path() prefers host_paths
[hostname] over local_path; --repo-path CLI override added
- MCP: update_repo_path(slug, path, host?) tool
- Makefile: register-path target; REPO_PATH passthrough on
check-consistency and fix-consistency targets
- TOOLS.md: documents update_repo_path
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1537 lines
54 KiB
Python
1537 lines
54 KiB
Python
"""Custodian State Hub MCP Server (stdio).
|
|
|
|
Thin HTTP client over the FastAPI service — no direct DB access.
|
|
All business logic stays in the API; this layer is stateless.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
import httpx
|
|
from fastmcp import FastMCP
|
|
|
|
# Base URL of the FastAPI service. Taken from $API_BASE when set, otherwise
# the local default; any trailing slash is trimmed.
API_BASE = os.getenv("API_BASE", "http://127.0.0.1:8000").rstrip("/")

# The MCP server object that the @mcp.tool / @mcp.resource decorators in this
# module register on. The instructions text is sent to the client as-is.
mcp = FastMCP(
    name="state-hub",
    instructions=(
        "Custodian State Hub: tracks topics, workstreams, tasks, decisions, and progress events. "
        "Start every session with get_state_summary() for orientation. "
        "When working inside a single registered domain repo, prefer get_domain_summary(domain_slug) "
        "— it returns the same actionable data scoped to that domain at ~10% of the token cost. "
        "All writes emit a progress_event automatically."
    ),
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _client() -> httpx.Client:
    """Build a new httpx client bound to API_BASE.

    The client uses a 30-second timeout and follows redirects. Each call
    returns a fresh instance; the caller is responsible for closing it
    (e.g. by using it as a context manager).
    """
    options = {"base_url": API_BASE, "timeout": 30.0, "follow_redirects": True}
    return httpx.Client(**options)
|
|
|
|
|
|
def _get(path: str, params: dict | None = None) -> Any:
    """GET ``path`` from the API and return the decoded JSON body.

    A trailing slash is appended to ``path`` when it is missing, and query
    parameters whose value is None are dropped before the request is sent.
    A 4xx/5xx response raises via ``raise_for_status()``.
    """
    url = path if path.endswith("/") else path + "/"
    query = {}
    for key, value in (params or {}).items():
        if value is not None:
            query[key] = value
    with _client() as http:
        response = http.get(url, params=query)
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
def _post(path: str, body: dict) -> Any:
    """POST ``body`` as JSON to ``path`` and return the decoded JSON response.

    A trailing slash is appended to ``path`` when it is missing. Top-level
    keys of ``body`` whose value is None are omitted from the request.
    A 4xx/5xx response raises via ``raise_for_status()``.
    """
    url = path if path.endswith("/") else path + "/"
    payload = {}
    for key, value in body.items():
        if value is not None:
            payload[key] = value
    with _client() as http:
        response = http.post(url, json=payload)
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
def _patch(path: str, body: dict) -> Any:
    """PATCH ``path`` with ``body`` as JSON and return the decoded JSON response.

    A trailing slash is appended to ``path`` when it is missing. Top-level
    keys of ``body`` whose value is None are omitted, so this helper cannot
    send an explicit JSON null for a field.
    A 4xx/5xx response raises via ``raise_for_status()``.
    """
    url = path if path.endswith("/") else path + "/"
    payload = {}
    for key, value in body.items():
        if value is not None:
            payload[key] = value
    with _client() as http:
        response = http.patch(url, json=payload)
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
def _delete(path: str) -> None:
    """DELETE ``path`` on the API; the response body is ignored.

    A trailing slash is appended when missing, the same normalisation the
    GET/POST/PATCH helpers in this module apply, so all four verbs address
    URLs in the same form instead of DELETE alone relying on a redirect.
    A 4xx/5xx response raises via ``raise_for_status()``.
    """
    # NOTE(review): assumes DELETE routes are served at the slash-terminated
    # URL like the other verbs — confirm against the API routers.
    if not path.endswith("/"):
        path = path + "/"
    with _client() as c:
        r = c.delete(path)
        r.raise_for_status()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Resources
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.resource("state://summary")
def resource_summary() -> str:
    """Full StateSummary JSON — primary orientation resource."""
    # Fetch the hub-wide summary, then pretty-print it for the client.
    summary = _get("/state/summary")
    return json.dumps(summary, indent=2)
|
|
|
|
|
|
@mcp.resource("state://topics")
def resource_topics() -> str:
    """Active topics list."""
    # Only topics with status=active are requested from the API.
    active_filter = {"status": "active"}
    topics = _get("/topics", active_filter)
    return json.dumps(topics, indent=2)
|
|
|
|
|
|
@mcp.resource("state://workstreams/{topic_slug}")
def resource_workstreams(topic_slug: str) -> str:
    """Workstreams for a topic (by slug)."""
    # The slug is resolved client-side against the list of active topics;
    # the first topic with a matching slug wins.
    for topic in _get("/topics", {"status": "active"}):
        if topic["slug"] == topic_slug:
            workstreams = _get("/workstreams", {"topic_id": topic["id"]})
            return json.dumps(workstreams, indent=2)
    # No active topic carries this slug: report it as a JSON error object.
    return json.dumps({"error": f"Topic '{topic_slug}' not found"})
|
|
|
|
|
|
@mcp.resource("state://decisions/blocking")
def resource_blocking_decisions() -> str:
    """All pending/escalated decisions."""
    # Query: decisions of type "pending" whose status is "open".
    filters = {"decision_type": "pending", "status": "open"}
    decisions = _get("/decisions", filters)
    return json.dumps(decisions, indent=2)
|
|
|
|
|
|
@mcp.resource("state://tasks/blocked")
def resource_blocked_tasks() -> str:
    """All tasks with status=blocked."""
    # Unscoped query: blocked tasks across every workstream.
    blocked = _get("/tasks", {"status": "blocked"})
    return json.dumps(blocked, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Query tools
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def get_state_summary() -> str:
    """Primary orientation tool. Call at the start of every session.

    Returns a full snapshot: topic/workstream/task/decision totals, blocking
    decisions, blocked tasks, open workstreams, and the 20 most recent events.

    NOTE: This response is large (~10k tokens). When working inside a single
    registered domain repo, use get_domain_summary(domain_slug) instead —
    same actionable data scoped to one domain at ~10% of the token cost.
    """
    # Same endpoint as the state://summary resource, exposed as a tool.
    snapshot = _get("/state/summary")
    return json.dumps(snapshot, indent=2)
|
|
|
|
|
|
def _domain_goal_guidance(repos: list, workstreams: list) -> dict:
    """Cross-reference each repo's active goals with the given workstreams.

    Issues one GET /repo-goals request per repo. Returns {} when there is
    nothing to report; otherwise a dict with two lists:

    - "needs_workplan": active goals that no workstream links to
    - "alignment_warnings": repos having workstreams that are not linked to
      any of the repo's active goals
    """
    # Index the workstreams once, by linked goal and by owning repo.
    ws_by_repo_goal: dict[str, list] = {}
    ws_by_repo: dict[str, list] = {}
    for ws in workstreams:
        if ws.get("repo_goal_id"):
            ws_by_repo_goal.setdefault(ws["repo_goal_id"], []).append(ws)
        if ws.get("repo_id"):
            ws_by_repo.setdefault(ws["repo_id"], []).append(ws)

    needs_workplan: list[dict] = []       # active goal with no linked workstream
    alignment_warnings: list[dict] = []   # workstreams not linked to active goal

    for repo in repos:
        repo_slug = repo["slug"]
        active_goals = _get("/repo-goals", {"repo_slug": repo_slug, "status": "active"})
        if not active_goals:
            continue
        active_goal_ids = {g["id"] for g in active_goals}

        for goal in active_goals:
            if ws_by_repo_goal.get(goal["id"]):
                continue
            needs_workplan.append({
                "repo_slug": repo_slug,
                "goal_id": goal["id"],
                "goal_title": goal["title"],
                "goal_description": goal["description"],
                "priority": goal["priority"],
                "action": (
                    f"No workstream is linked to repo goal '{goal['title']}'. "
                    "Create a workplan file in workplans/ and register a workstream "
                    f"with repo_goal_id='{goal['id']}' to start delivering this goal."
                ),
            })

        # Workstreams of this repo that are not tied to any active goal.
        unlinked_ws = [
            ws for ws in ws_by_repo.get(repo["id"], [])
            if ws.get("repo_goal_id") not in active_goal_ids
        ]
        if unlinked_ws:
            # Most recently updated workstream = the one to suggest continuing.
            # `or ""` keeps a null updated_at comparable with string timestamps.
            recent_ws = max(unlinked_ws, key=lambda w: w.get("updated_at") or "")
            alignment_warnings.append({
                "repo_slug": repo_slug,
                "recent_workstream_id": recent_ws["id"],
                "recent_workstream_title": recent_ws["title"],
                "active_goal_titles": [g["title"] for g in active_goals],
                "message": (
                    f"Workstream '{recent_ws['title']}' is not linked to the current "
                    f"repo goal(s) for {repo_slug}. "
                    "Continue this workstream if the work is still relevant, but verify "
                    "alignment with the active goal before committing to new tasks."
                ),
            })

    if not (needs_workplan or alignment_warnings):
        return {}
    return {
        "needs_workplan": needs_workplan,
        "alignment_warnings": alignment_warnings,
    }


@mcp.tool()
def get_domain_summary(domain_slug: str) -> str:
    """Lightweight session orientation for a single domain.

    Use this instead of get_state_summary() when working in a registered
    domain repo — returns only what is relevant to the specified domain,
    typically 80-90% fewer tokens than the full summary.

    Args:
        domain_slug: the domain slug, e.g. "railiance", "markitect"

    Returns: topic, active workstreams, open blocking decisions for this
    topic, 5 most recent progress events, repo SBOM status, and goal guidance
    (needs_workplan signals + alignment warnings).
    """
    # Resolve the domain to its topic client-side; the first topic whose
    # domain_slug matches is used.
    topics = _get("/topics")
    topic = next((t for t in topics if t.get("domain_slug") == domain_slug), None)
    if not topic:
        return json.dumps({"error": f"No topic found for domain '{domain_slug}'"})

    topic_id = topic["id"]

    workstreams = _get("/workstreams", {"topic_id": topic_id, "status": "active"})
    blocking = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id})
    recent = _get("/progress", {"topic_id": topic_id, "limit": 5})
    repos = _get("/repos", {"domain": domain_slug})

    goal_guidance = _domain_goal_guidance(repos, workstreams)

    result: dict = {
        "domain": domain_slug,
        "topic_id": topic_id,
        "topic_title": topic["title"],
        "workstreams": workstreams,
        "blocking_decisions": blocking,
        "recent_progress": recent,
        "repos": [{"slug": r["slug"], "last_sbom_at": r.get("last_sbom_at")} for r in repos],
    }
    # The goal_guidance key is only present when there is something to report.
    if goal_guidance:
        result["goal_guidance"] = goal_guidance
    return json.dumps(result, indent=2)
|
|
|
|
|
|
@mcp.tool()
def get_topic(slug: str) -> str:
    """Return a topic (with workstreams) by slug, plus its recent progress events."""
    # Resolve slug -> topic by scanning the full topic list.
    found = None
    for candidate in _get("/topics"):
        if candidate["slug"] == slug:
            found = candidate
            break
    if found is None:
        return json.dumps({"error": f"Topic '{slug}' not found"})

    # Fetch the detail record and the 10 latest progress events for the topic.
    topic_id = found["id"]
    payload = {
        "topic": _get(f"/topics/{topic_id}"),
        "recent_progress": _get("/progress", {"topic_id": topic_id, "limit": 10}),
    }
    return json.dumps(payload, indent=2)
|
|
|
|
|
|
@mcp.tool()
def list_blocked_tasks(workstream_id: str | None = None) -> str:
    """List all tasks with status=blocked, optionally filtered by workstream_id."""
    # A None workstream_id is dropped by _get, giving an unscoped query.
    filters = {"status": "blocked", "workstream_id": workstream_id}
    tasks = _get("/tasks", filters)
    return json.dumps(tasks, indent=2)
|
|
|
|
|
|
@mcp.tool()
def list_pending_decisions(topic_id: str | None = None) -> str:
    """List pending decisions sorted by deadline (nulls last).

    Optionally filter by topic_id. Escalated decisions are included and
    highlighted by their escalation_note.
    """
    # No sorting happens here; the list is returned in the order the API
    # delivers it.
    filters = {"decision_type": "pending", "topic_id": topic_id}
    return json.dumps(_get("/decisions", filters), indent=2)
|
|
|
|
|
|
@mcp.tool()
def get_recent_progress(limit: int = 20, since: str | None = None) -> str:
    """Retrieve recent progress events to reconstruct session history.

    Args:
        limit: max events to return (default 20)
        since: ISO datetime string — only events after this timestamp
    """
    # `since` is forwarded untouched; a None value is dropped by _get.
    query = {"limit": limit, "since": since}
    events = _get("/progress", query)
    return json.dumps(events, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mutate tools
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def create_workstream(
    topic_id: str,
    title: str,
    slug: str | None = None,
    description: str | None = None,
    owner: str | None = None,
    due_date: str | None = None,
    repo_id: str | None = None,
) -> str:
    """Create a new workstream under a topic and emit a progress_event.

    Args:
        topic_id: UUID of the parent topic
        title: workstream title
        slug: URL-friendly identifier (auto-generated from title if omitted)
        description: optional longer description
        owner: optional owner name
        due_date: optional ISO date string (YYYY-MM-DD)
        repo_id: UUID of the owning repository (GEMS primary; strongly recommended per ADR-001)
    """
    # Fall back to a slug derived from the title: lowercase, every run of
    # characters outside [a-z0-9] collapsed to "-", leading/trailing "-" cut.
    slug = slug or re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-")

    # New workstreams always start in status "active".
    new_workstream = {
        "topic_id": topic_id,
        "title": title,
        "slug": slug,
        "description": description,
        "owner": owner,
        "due_date": due_date,
        "status": "active",
        "repo_id": repo_id,
    }
    ws = _post("/workstreams", new_workstream)

    # Log the creation, linked to the topic and the freshly created workstream.
    creation_event = {
        "topic_id": topic_id,
        "workstream_id": ws["id"],
        "event_type": "workstream_created",
        "summary": f"Workstream created: {title}",
        "author": "custodian",
        "detail": {"owner": owner, "slug": slug},
    }
    _post("/progress", creation_event)
    return json.dumps(ws, indent=2)
|
|
|
|
|
|
@mcp.tool()
def create_task(
    workstream_id: str,
    title: str,
    priority: str = "medium",
    description: str | None = None,
    assignee: str | None = None,
    due_date: str | None = None,
) -> str:
    """Create a new task and emit a progress_event.

    Args:
        workstream_id: UUID of the parent workstream
        title: task title
        priority: low | medium | high | critical
        description: optional longer description
        assignee: optional assignee name
        due_date: optional ISO date string (YYYY-MM-DD)
    """
    # Step 1: create the task itself.
    new_task = {
        "workstream_id": workstream_id,
        "title": title,
        "priority": priority,
        "description": description,
        "assignee": assignee,
        "due_date": due_date,
    }
    task = _post("/tasks", new_task)

    # Step 2: log the creation against the workstream and the new task id.
    creation_event = {
        "workstream_id": workstream_id,
        "task_id": task["id"],
        "event_type": "task_created",
        "summary": f"Task created: {title}",
        "author": "custodian",
        "detail": {"priority": priority, "assignee": assignee},
    }
    _post("/progress", creation_event)
    return json.dumps(task, indent=2)
|
|
|
|
|
|
@mcp.tool()
def update_task_status(
    task_id: str,
    status: str,
    blocking_reason: str | None = None,
) -> str:
    """Update a task's status. blocking_reason is required when status='blocked'.

    Args:
        task_id: UUID of the task
        status: todo | in_progress | blocked | done | cancelled
        blocking_reason: required when status=blocked
    """
    # blocking_reason is only sent when it is non-empty; the "required when
    # blocked" rule is not checked here.
    reason_fields = {"blocking_reason": blocking_reason} if blocking_reason else {}
    changes: dict[str, Any] = {"status": status, **reason_fields}
    task = _patch(f"/tasks/{task_id}", changes)

    # Log the transition, linked to the task and its parent workstream.
    status_event = {
        "task_id": task_id,
        "workstream_id": task.get("workstream_id"),
        "event_type": "task_status_changed",
        "summary": f"Task status → {status}: {task['title']}",
        "author": "custodian",
        "detail": {"blocking_reason": blocking_reason},
    }
    _post("/progress", status_event)
    return json.dumps(task, indent=2)
|
|
|
|
|
|
@mcp.tool()
def flag_for_human(task_id: str, note: str) -> str:
    """Flag a task as requiring human intervention.

    Sets needs_human=True and records the required action as intervention_note.
    Emits a progress event so the flag is visible in session history.

    Args:
        task_id: UUID of the task to flag
        note: description of the action required from the human (required)
    """
    # Raise the flag on the task and store what the human is asked to do.
    flag_fields = {"needs_human": True, "intervention_note": note}
    task = _patch(f"/tasks/{task_id}", flag_fields)

    # Mirror the flag into the progress log.
    flag_event = {
        "task_id": task_id,
        "workstream_id": task.get("workstream_id"),
        "event_type": "task_flagged_human",
        "summary": f"Task flagged for human intervention: {task['title']}",
        "author": "custodian",
        "detail": {"intervention_note": note},
    }
    _post("/progress", flag_event)
    return json.dumps(task, indent=2)
|
|
|
|
|
|
@mcp.tool()
def clear_human_flag(task_id: str) -> str:
    """Clear the human-intervention flag from a task.

    Sets needs_human=False. The intervention_note is preserved as a
    historical record. Call this after the human has completed the action.

    Args:
        task_id: UUID of the task to clear
    """
    # Only needs_human is sent; intervention_note is not part of the request.
    task = _patch(f"/tasks/{task_id}", {"needs_human": False})

    # Log that the flag was cleared.
    cleared_event = {
        "task_id": task_id,
        "workstream_id": task.get("workstream_id"),
        "event_type": "task_flag_cleared",
        "summary": f"Human-intervention flag cleared: {task['title']}",
        "author": "custodian",
    }
    _post("/progress", cleared_event)
    return json.dumps(task, indent=2)
|
|
|
|
|
|
@mcp.tool()
def list_human_interventions(workstream_id: str | None = None) -> str:
    """List all tasks flagged for human intervention.

    Returns tasks where needs_human=True, optionally filtered to one workstream.
    Use this at session start to surface Bernd's action items.

    Args:
        workstream_id: optional UUID to scope results to one workstream
    """
    # needs_human travels as the query-string literal "true".
    filters = {"needs_human": "true", "workstream_id": workstream_id}
    flagged = _get("/tasks", filters)
    return json.dumps(flagged, indent=2)
|
|
|
|
|
|
@mcp.tool()
def record_decision(
    title: str,
    decision_type: str = "pending",
    topic_id: str | None = None,
    workstream_id: str | None = None,
    description: str | None = None,
    rationale: str | None = None,
    decided_by: str | None = None,
    deadline: str | None = None,
) -> str:
    """Record a decision (made or pending).

    Pending decisions touching financial/legal topics are auto-escalated per
    constitution §4.

    Args:
        title: decision title
        decision_type: made | pending
        topic_id: optional topic UUID
        workstream_id: optional workstream UUID (at least one required)
        description: optional context
        rationale: reasoning behind the decision
        decided_by: person/agent who decided
        deadline: ISO datetime string for when decision is needed
    """
    # Create the decision; no escalation logic runs in this function.
    new_decision = {
        "title": title,
        "decision_type": decision_type,
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "description": description,
        "rationale": rationale,
        "decided_by": decided_by,
        "deadline": deadline,
    }
    decision = _post("/decisions", new_decision)

    # Log it, copying status and escalation_note from the API's response.
    event_detail = {
        "status": decision.get("status"),
        "escalation_note": decision.get("escalation_note"),
    }
    recorded_event = {
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "decision_id": decision["id"],
        "event_type": "decision_recorded",
        "summary": f"Decision recorded ({decision_type}): {title}",
        "author": "custodian",
        "detail": event_detail,
    }
    _post("/progress", recorded_event)
    return json.dumps(decision, indent=2)
|
|
|
|
|
|
@mcp.tool()
def resolve_decision(
    decision_id: str,
    rationale: str,
    decided_by: str,
) -> str:
    """Mark a decision as resolved.

    Args:
        decision_id: UUID of the decision
        rationale: final reasoning/outcome
        decided_by: who resolved it
    """
    # Local import: the module header only imports `datetime` from datetime.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. An aware "now" in UTC
    # renders with a "+00:00" suffix, which is swapped for "Z" so the wire
    # format stays exactly what utcnow().isoformat() + "Z" produced.
    decided_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    # Resolving flips the status and turns the decision into a "made" one.
    decision = _patch(f"/decisions/{decision_id}", {
        "status": "resolved",
        "decision_type": "made",
        "rationale": rationale,
        "decided_by": decided_by,
        "decided_at": decided_at,
    })

    # Log the resolution against the decision's own topic/workstream.
    _post("/progress", {
        "topic_id": decision.get("topic_id"),
        "workstream_id": decision.get("workstream_id"),
        "decision_id": decision_id,
        "event_type": "decision_resolved",
        "summary": f"Decision resolved by {decided_by}: {decision['title']}",
        "author": "custodian",
        "detail": {"rationale": rationale},
    })
    return json.dumps(decision, indent=2)
|
|
|
|
|
|
@mcp.tool()
def add_progress_event(
    summary: str,
    event_type: str = "note",
    topic_id: str | None = None,
    workstream_id: str | None = None,
    task_id: str | None = None,
    detail: dict | None = None,
) -> str:
    """Append a progress event to the log.

    Args:
        summary: human-readable summary of what happened
        event_type: free-form label, e.g. note | milestone | blocker | insight
        topic_id: optional topic UUID
        workstream_id: optional workstream UUID
        task_id: optional task UUID
        detail: optional structured data (JSONB)
    """
    # The author is fixed to "custodian"; unset optional links are dropped
    # by _post before the request is sent.
    new_event = {
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "task_id": task_id,
        "event_type": event_type,
        "summary": summary,
        "author": "custodian",
        "detail": detail,
    }
    event = _post("/progress", new_event)
    return json.dumps(event, indent=2)
|
|
|
|
|
|
@mcp.tool()
def update_workstream_status(workstream_id: str, status: str) -> str:
    """Update a workstream's status.

    Args:
        workstream_id: UUID of the workstream
        status: active | blocked | completed | archived
    """
    # Apply the new status first; the response supplies the title and
    # topic_id used in the log entry.
    ws = _patch(f"/workstreams/{workstream_id}", {"status": status})

    status_event = {
        "workstream_id": workstream_id,
        "topic_id": ws.get("topic_id"),
        "event_type": "workstream_status_changed",
        "summary": f"Workstream status → {status}: {ws['title']}",
        "author": "custodian",
    }
    _post("/progress", status_event)
    return json.dumps(ws, indent=2)
|
|
|
|
|
|
@mcp.tool()
def update_workstream(
    workstream_id: str,
    title: str | None = None,
    description: str | None = None,
    owner: str | None = None,
    due_date: str | None = None,
    repo_goal_id: str | None = None,
    status: str | None = None,
) -> str:
    """Update fields on an existing workstream.

    Args:
        workstream_id: UUID of the workstream
        title: new title (optional)
        description: new description (optional)
        owner: new owner (optional)
        due_date: ISO date string YYYY-MM-DD (optional)
        repo_goal_id: UUID of the repo goal to link (optional; pass empty string to clear)
        status: active | blocked | completed | archived (optional)
    """
    # Only fields the caller actually supplied go into the payload.
    supplied = {
        "title": title,
        "description": description,
        "owner": owner,
        "due_date": due_date,
        "status": status,
    }
    payload: dict = {k: v for k, v in supplied.items() if v is not None}

    path = f"/workstreams/{workstream_id}"
    if repo_goal_id != "":
        # Normal case: link a goal (or leave the link untouched when None).
        if repo_goal_id is not None:
            payload["repo_goal_id"] = repo_goal_id
        ws = _patch(path, payload)
    else:
        # Clearing the link needs an explicit JSON null. _patch strips
        # None-valued keys, so going through it would silently drop the
        # field and leave the link in place; send this request directly.
        # NOTE(review): assumes the API treats null as "unlink" — confirm
        # against the workstream PATCH schema.
        payload["repo_goal_id"] = None
        with _client() as c:
            r = c.patch(path + "/", json=payload)
            r.raise_for_status()
            ws = r.json()
    return json.dumps(ws, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Next-steps suggestion tool (S2.3) — sanctioned write use case #2
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def get_next_steps() -> str:
    """Surface contextual next-action suggestions derived from hub state.

    Returns suggestions based on:
    - Recently resolved decisions → first open task in the same workstream
    - Workstreams whose every dependency is now completed → first todo task

    Each suggestion includes domain, workstream, task, and a plain-language
    message. The hub surfaces *what* and *where* — the domain owns *how*.

    This is one of the two sanctioned write-side use cases of the State Hub
    (the other is resolve_decision). Suggestions are derived, not persisted.
    """
    # This function only issues a GET; the suggestions come from the API.
    suggestions = _get("/state/next_steps")
    return json.dumps(suggestions, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dependency graph tools (S1.4)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def create_dependency(
    from_workstream_id: str,
    to_workstream_id: str,
    description: str | None = None,
) -> str:
    """Record that one workstream depends on another.

    Semantics: from_workstream cannot fully proceed until to_workstream reaches
    a satisfactory state.

    Args:
        from_workstream_id: UUID of the workstream that has the dependency
        to_workstream_id: UUID of the workstream it depends on
        description: optional human-readable explanation of the dependency
    """
    # The edge is created under the dependent ("from") workstream's URL.
    # No progress event is posted by this function.
    edge = {"to_workstream_id": to_workstream_id, "description": description}
    dep = _post(f"/workstreams/{from_workstream_id}/dependencies", edge)
    return json.dumps(dep, indent=2)
|
|
|
|
|
|
@mcp.tool()
def list_dependencies(workstream_id: str) -> str:
    """Return all dependency edges touching a workstream (both directions).

    The response distinguishes edges where this workstream is the dependent
    (depends_on) from edges where it is the blocker (blocks).

    Args:
        workstream_id: UUID of the workstream to inspect
    """
    # Sort each edge by which end this workstream sits on. The two tests are
    # independent, so an edge matching both ends lands in both lists.
    depends_on: list = []
    blocks: list = []
    for edge in _get(f"/workstreams/{workstream_id}/dependencies"):
        if edge["from_workstream_id"] == workstream_id:
            depends_on.append(edge)
        if edge["to_workstream_id"] == workstream_id:
            blocks.append(edge)
    return json.dumps({"depends_on": depends_on, "blocks": blocks}, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Extension points & technical debt
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def register_extension_point(
    domain: str,
    title: str,
    ep_type: str,
    description: str | None = None,
    location: str | None = None,
    priority: str = "medium",
    ep_id: str | None = None,
    topic_id: str | None = None,
    workstream_id: str | None = None,
) -> str:
    """Register a discovered extension point — optional future functionality not yet committed.

    Extension points capture design forks: things the system *could* do that
    have been noticed and parked for deliberate later consideration.

    Args:
        domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
        title: short description of the extension
        ep_type: api | schema | mcp | dashboard | architecture | integration | other
        description: longer explanation of what the extension would add
        location: file:line or module where the extension point was noticed
        priority: low | medium | high | critical
        ep_id: optional human-readable ID, e.g. EP-CUST-001 (auto-assigned if omitted)
        topic_id: UUID of related topic
        workstream_id: UUID of related workstream
    """
    ep = _post("/extension-points", {
        "domain": domain, "title": title, "ep_type": ep_type,
        "description": description, "location": location,
        "priority": priority, "ep_id": ep_id,
        "topic_id": topic_id, "workstream_id": workstream_id,
    })

    # Prefer the human-readable ep_id in the summary; fall back to the first
    # 8 characters of the record id.
    label = ep.get("ep_id") or ep["id"][:8]
    _post("/progress", {
        # Link the event to the same topic/workstream as the extension point
        # and stamp the author, as the other write tools in this module do.
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "summary": f"Extension point registered: [{label}] {title} ({ep_type}, {domain})",
        "event_type": "extension_point",
        "author": "custodian",
        "detail": {"id": ep["id"], "ep_id": ep.get("ep_id"), "ep_type": ep_type, "domain": domain},
    })
    return json.dumps(ep, indent=2)
|
|
|
|
|
|
@mcp.tool()
def list_extension_points(
    domain: str | None = None,
    status: str | None = None,
    ep_type: str | None = None,
) -> str:
    """List extension points, optionally filtered.

    Args:
        domain: filter by domain
        status: open | in_progress | addressed | deferred | wont_fix
        ep_type: api | schema | mcp | dashboard | architecture | integration | other
    """
    # Filters left as None are dropped by _get, so omitted filters do not
    # constrain the query.
    filters = {"domain": domain, "status": status, "ep_type": ep_type}
    extension_points = _get("/extension-points", filters)
    return json.dumps(extension_points, indent=2)
|
|
|
|
|
|
@mcp.tool()
def update_ep_status(ep_uuid: str, status: str) -> str:
    """Update the status of an extension point.

    Args:
        ep_uuid: UUID of the extension point
        status: open | in_progress | addressed | deferred | wont_fix
    """
    # Apply the change first; the response supplies the title for the log.
    ep = _patch(f"/extension-points/{ep_uuid}", {"status": status})
    _post("/progress", {
        "summary": f"Extension point status → {status}: {ep['title']}",
        "event_type": "extension_point",
        # Stamp the author, as the other write tools in this module do.
        "author": "custodian",
        "detail": {"id": ep_uuid, "status": status},
    })
    return json.dumps(ep, indent=2)
|
|
|
|
|
|
@mcp.tool()
def register_technical_debt(
    domain: str,
    title: str,
    debt_type: str,
    description: str | None = None,
    location: str | None = None,
    severity: str = "medium",
    td_id: str | None = None,
    topic_id: str | None = None,
    workstream_id: str | None = None,
) -> str:
    """Register a technical debt item — a known quality compromise to address later.

    Technical debt captures intentional or discovered shortcuts, design
    weaknesses, missing tests, and similar issues that reduce codebase health.

    Args:
        domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
        title: short description of the debt
        debt_type: design | implementation | test | docs | dependencies | performance | security | other
        description: what the issue is and what the correct fix would be
        location: file:line or module where the debt lives
        severity: low | medium | high | critical
        td_id: optional human-readable ID, e.g. TD-CUST-001
        topic_id: UUID of related topic
        workstream_id: UUID of related workstream
    """
    td = _post("/technical-debt", {
        "domain": domain, "title": title, "debt_type": debt_type,
        "description": description, "location": location,
        "severity": severity, "td_id": td_id,
        "topic_id": topic_id, "workstream_id": workstream_id,
    })

    # Prefer the human-readable td_id in the summary; fall back to the first
    # 8 characters of the record id.
    label = td.get("td_id") or td["id"][:8]
    _post("/progress", {
        # Link the event to the same topic/workstream as the debt item and
        # stamp the author, as the other write tools in this module do.
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "summary": f"Technical debt registered: [{label}] {title} ({debt_type}, {severity}, {domain})",
        "event_type": "technical_debt",
        "author": "custodian",
        "detail": {"id": td["id"], "td_id": td.get("td_id"), "debt_type": debt_type, "severity": severity, "domain": domain},
    })
    return json.dumps(td, indent=2)
|
|
|
|
|
|
@mcp.tool()
def list_technical_debt(
    domain: str | None = None,
    status: str | None = None,
    debt_type: str | None = None,
    severity: str | None = None,
) -> str:
    """List technical debt items, optionally filtered.

    Args:
        domain: filter by domain
        status: open | in_progress | resolved | deferred | wont_fix
        debt_type: design | implementation | test | docs | dependencies | performance | security | other
        severity: low | medium | high | critical
    """
    # Filters left as None are dropped by _get, so omitted filters do not
    # constrain the query.
    filters = {
        "domain": domain,
        "status": status,
        "debt_type": debt_type,
        "severity": severity,
    }
    debt_items = _get("/technical-debt", filters)
    return json.dumps(debt_items, indent=2)
|
|
|
|
|
|
@mcp.tool()
def update_td_status(td_uuid: str, status: str) -> str:
    """Update the status of a technical debt item.

    Args:
        td_uuid: UUID of the technical debt item
        status: open | in_progress | resolved | deferred | wont_fix
    """
    # Apply the change first; the response supplies the title for the log.
    td = _patch(f"/technical-debt/{td_uuid}", {"status": status})
    _post("/progress", {
        "summary": f"Technical debt status → {status}: {td['title']}",
        "event_type": "technical_debt",
        # Stamp the author, as the other write tools in this module do.
        "author": "custodian",
        "detail": {"id": td_uuid, "status": status},
    })
    return json.dumps(td, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Domain lifecycle + repo registration tools (v0.5)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def list_domains(status: str = "active") -> str:
    """List all registered domains.

    Args:
        status: active | archived | all (default: active)
    """
    # The status value is forwarded untouched as a query parameter.
    domains = _get("/domains", {"status": status})
    return json.dumps(domains, indent=2)
|
|
|
|
|
|
@mcp.tool()
def create_domain(slug: str, name: str, description: str | None = None) -> str:
    """Create a new domain.

    Args:
        slug: URL-friendly identifier (lowercase, underscored), e.g. 'my_project'
        name: Human-readable display name
        description: optional longer description
    """
    # The slug is sent as given; its format is not checked here.
    new_domain = {"slug": slug, "name": name, "description": description}
    domain = _post("/domains", new_domain)

    # Domain creation is logged as a milestone.
    creation_event = {
        "event_type": "milestone",
        "summary": f"Domain created: {slug} ({name})",
        "author": "custodian",
        "detail": {"slug": slug, "name": name},
    }
    _post("/progress", creation_event)
    return json.dumps(domain, indent=2)
|
|
|
|
|
|
@mcp.tool()
def rename_domain(slug: str, new_slug: str, new_name: str) -> str:
    """Rename a domain — cascades to EP/TD string columns.

    Args:
        slug: Current domain slug
        new_slug: New URL-friendly identifier
        new_name: New human-readable display name
    """
    # The rename goes to a dedicated sub-route of the current slug; this
    # function performs no cascade itself.
    renamed_to = {"new_slug": new_slug, "new_name": new_name}
    domain = _patch(f"/domains/{slug}/rename", renamed_to)

    # Log the rename with both the old and the new identifiers.
    rename_event = {
        "event_type": "milestone",
        "summary": f"Domain renamed: {slug} → {new_slug} ({new_name})",
        "author": "custodian",
        "detail": {"old_slug": slug, "new_slug": new_slug, "new_name": new_name},
    }
    _post("/progress", rename_event)
    return json.dumps(domain, indent=2)
|
|
|
|
|
|
@mcp.tool()
def archive_domain(slug: str) -> str:
    """Soft-delete a domain by archiving it. Fails if active topics exist.

    A 'note' progress event is posted after the archive request returns.

    Args:
        slug: Slug of the domain to archive

    Returns:
        The API response for the archived domain, as indented JSON.
    """
    archived = _patch(f"/domains/{slug}/archive", {})

    note = {
        "event_type": "note",
        "summary": f"Domain archived: {slug}",
        "author": "custodian",
        "detail": {"slug": slug},
    }
    _post("/progress", note)
    return json.dumps(archived, indent=2)
|
|
|
|
|
|
@mcp.tool()
def list_domain_repos(domain_slug: str) -> str:
    """Return every repository registered under one domain, as indented JSON.

    Args:
        domain_slug: Slug of the domain whose repositories should be listed
    """
    repos = _get("/repos", {"domain": domain_slug})
    return json.dumps(repos, indent=2)
|
|
|
|
|
|
@mcp.tool()
def register_repo(
    domain_slug: str,
    name: str,
    slug: str | None = None,
    local_path: str | None = None,
    remote_url: str | None = None,
    description: str | None = None,
) -> str:
    """Attach a git repository to a domain and log a milestone progress event.

    Args:
        domain_slug: Domain slug (must already exist)
        name: Human-readable repository name
        slug: URL-friendly identifier. When omitted it is derived from
            ``name``: lowercased, every run of characters outside a-z/0-9
            replaced by one hyphen, leading/trailing hyphens removed.
        local_path: Absolute local filesystem path to the repo
        remote_url: Remote git URL (Gitea, GitHub, etc.)
        description: Optional description

    Returns:
        The API response for the registered repo, as indented JSON.
    """
    if slug:
        repo_slug = slug
    else:
        repo_slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")

    registration = {
        "domain_slug": domain_slug,
        "slug": repo_slug,
        "name": name,
        "local_path": local_path,
        "remote_url": remote_url,
        "description": description,
    }
    registered = _post("/repos", registration)

    event = {
        "event_type": "milestone",
        "summary": f"Repo registered: {name} under domain '{domain_slug}'",
        "author": "custodian",
        "detail": {
            "slug": repo_slug,
            "domain_slug": domain_slug,
            "local_path": local_path,
            "remote_url": remote_url,
        },
    }
    _post("/progress", event)
    return json.dumps(registered, indent=2)
|
|
|
|
|
|
@mcp.tool()
def update_repo_path(repo_slug: str, path: str, host: str | None = None) -> str:
    """Record where a repo lives on the filesystem of one particular host.

    Intended for repos whose absolute path differs between machines
    (e.g. /home/worsch/marki-docx on the workstation vs /home/tegwick/marki-docx
    on custodiancore). The consistency checker will prefer the host-specific
    path over the legacy local_path field.

    Args:
        repo_slug: Managed-repo slug (e.g. 'marki-docx')
        path: Absolute local path on the target machine (e.g. '/home/tegwick/marki-docx')
        host: Hostname the path applies to. When empty or omitted, the
            hostname of the machine running this server is used.

    Returns:
        The API response for the repo, as indented JSON.
    """
    import socket as _socket

    # Fall back to this machine's hostname when the caller gave none.
    target_host = host or _socket.gethostname()
    entry = {"host": target_host, "path": path}
    updated = _post(f"/repos/{repo_slug}/paths", entry)
    return json.dumps(updated, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# ADR-001 compliance validation
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def validate_repo_adr(repo_path: str, domain_slug: str | None = None) -> str:
    """Check whether a repository is consistent with ADR-001.

    Validates that workplan files exist in workplans/ with correct frontmatter,
    that state_hub_workstream_id references resolve to real DB records, and that
    no active state-hub workstreams for the domain lack a backing file (orphan
    detection — DB-only records are an ADR-001 violation).

    The check itself is done by scripts/validate_repo_adr.py, run as a
    subprocess with ``--json``; this tool only renders its output as text.

    Args:
        repo_path: Absolute path to the repository root.
        domain_slug: Domain slug for orphan detection (e.g. 'custodian').
            If omitted, inferred from workplan frontmatter.

    Returns:
        A plain-text report listing failures, warnings, a summary line and a
        final ``Result:`` line. If the script's stdout is not a JSON object, a
        "Validator script error" message carrying its stderr/stdout is
        returned instead. If the script reports no result, the final line
        reads UNKNOWN rather than PASS.
    """
    import subprocess

    script = Path(__file__).parent.parent / "scripts" / "validate_repo_adr.py"
    cmd = [sys.executable, str(script), repo_path, "--json", "--api-base", API_BASE]
    if domain_slug:
        cmd += ["--domain", domain_slug]

    result = subprocess.run(cmd, capture_output=True, text=True)
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        data = None
    # Valid JSON that is not an object (e.g. `null`) cannot be rendered either;
    # treat it like unparsable output instead of crashing on `.get`.
    if not isinstance(data, dict):
        return f"Validator script error:\n{result.stderr or result.stdout or '(no output)'}"

    findings = data.get("findings", [])
    summary = data.get("summary", {})
    overall = data.get("result", "unknown")

    lines = [f"ADR-001 Compliance: {repo_path}", ""]

    # One section per level, failures first; empty sections are omitted.
    for level, heading in (("FAIL", "FAILURES"), ("WARN", "WARNINGS")):
        matching = [finding for finding in findings if finding["level"] == level]
        if not matching:
            continue
        lines.append(f"{heading} ({len(matching)}):")
        for finding in matching:
            loc = f" [{finding['file']}]" if finding.get("file") else ""
            lines.append(f" {level} {finding['check']}{loc}")
            lines.append(f" {finding['detail']}")
        lines.append("")

    lines.append(
        f"Summary: {summary.get('pass', 0)} pass | "
        f"{summary.get('warn', 0)} warn | "
        f"{summary.get('fail', 0)} fail"
    )

    # A missing result must not be presented as a pass.
    if overall == "unknown":
        verdict = "UNKNOWN (validator did not report a result)"
    elif overall == "fail":
        verdict = "FAIL"
    elif overall == "warn":
        verdict = "PASS (with warnings)"
    else:
        verdict = "PASS"
    lines.append(f"Result: {verdict}")
    return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# ADR-001 consistency checking engine
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def check_repo_consistency(repo_slug: str, fix: bool = False) -> str:
    """Run ADR-001 consistency check for a registered repo.

    Performs bidirectional checks between workplan files in the repo and the
    state-hub DB. The file is always authoritative: drift is reported with the
    file value as the expected value.

    Checks: missing workplans/, parse errors, stale DB references, status/title
    drift, unlinked workplans, orphan DB workstreams, repo mismatches, task
    status drift, unlinked tasks, and orphan DB tasks.

    The check itself is done by scripts/consistency_check.py, run as a
    subprocess with ``--json``; this tool only renders its output as text.

    Args:
        repo_slug: Registered repo slug (e.g. 'the-custodian', 'activity-core').
        fix: If True, apply auto-fixable issues: status drift (C-04), title drift
            (C-05), create missing DB workstreams (C-06), repo mismatch (C-09),
            task status drift (C-10), create unlinked tasks (C-11).

    Returns:
        A plain-text report grouped by severity (FAIL, WARN, INFO), followed by
        any applied fixes, a summary line and a final ``Result:`` line. If the
        script's stdout is not a JSON object, a "Consistency check script
        error" message carrying its stderr/stdout is returned instead. If the
        script reports no result, the final line reads UNKNOWN rather than
        PASS.
    """
    import subprocess

    script = Path(__file__).parent.parent / "scripts" / "consistency_check.py"
    cmd = [sys.executable, str(script), "--repo", repo_slug, "--json", "--api-base", API_BASE]
    if fix:
        cmd.append("--fix")

    result = subprocess.run(cmd, capture_output=True, text=True)
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        data = None
    # Valid JSON that is not an object (e.g. `null`) cannot be rendered either;
    # treat it like unparsable output instead of crashing on `.get`.
    if not isinstance(data, dict):
        return f"Consistency check script error:\n{result.stderr or result.stdout or '(no output)'}"

    issues = data.get("issues", [])
    summary = data.get("summary", {})
    overall = data.get("result", "unknown")
    fixes = data.get("fixes_applied", [])

    lines = [
        f"Consistency Check: {repo_slug}",
        f"Path: {data.get('repo_path', '?')}",
        "",
    ]

    # One section per severity, most severe first; empty sections are omitted.
    for sev in ("FAIL", "WARN", "INFO"):
        group = [issue for issue in issues if issue["severity"] == sev]
        if not group:
            continue
        lines.append(f"{sev}S ({len(group)}):")
        for issue in group:
            loc = f" [{issue['file_path']}]" if issue.get("file_path") else ""
            fix_tag = " [fixable]" if issue.get("fixable") else ""
            lines.append(f" {issue['check_id']}{loc}{fix_tag}")
            lines.append(f" {issue['message']}")
        lines.append("")

    if fixes:
        lines.append(f"Fixes applied ({len(fixes)}):")
        lines.extend(f" {applied}" for applied in fixes)
        lines.append("")

    lines.append(
        f"Summary: {summary.get('fail', 0)} fail | "
        f"{summary.get('warn', 0)} warn | "
        f"{summary.get('info', 0)} info"
    )

    # A missing result must not be presented as a pass.
    if overall == "unknown":
        verdict = "UNKNOWN (checker did not report a result)"
    elif overall == "fail":
        verdict = "FAIL"
    elif overall == "warn":
        verdict = "PASS (with warnings)"
    else:
        verdict = "PASS"
    lines.append(f"Result: {verdict}")
    return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Contribution tracking (v0.3)
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.resource("state://contributions")
def resource_contributions() -> str:
    """Expose all contribution artifacts (BR/FR/EP/UPR) as indented JSON."""
    contributions = _get("/contributions")
    return json.dumps(contributions, indent=2)
|
|
|
|
|
|
@mcp.resource("state://sbom/aggregated")
def resource_sbom_aggregated() -> str:
    """Expose the SBOM entries aggregated across all repos as indented JSON."""
    entries = _get("/sbom")
    return json.dumps(entries, indent=2)
|
|
|
|
|
|
@mcp.resource("state://sbom/{repo_slug}")
def resource_sbom_repo(repo_slug: str) -> str:
    """Expose the SBOM view of a single repo, selected by slug, as indented JSON."""
    repo_sbom = _get(f"/sbom/{repo_slug}")
    return json.dumps(repo_sbom, indent=2)
|
|
|
|
|
|
@mcp.tool()
def register_contribution(
    type: str,
    title: str,
    target_org: str | None = None,
    target_repo: str | None = None,
    body_path: str | None = None,
    related_workstream_id: str | None = None,
    notes: str | None = None,
) -> str:
    """Register a new upstream contribution artifact (BR/FR/EP/UPR).

    After the contribution is created, a 'contribution_registered' progress
    event is posted, attached to ``related_workstream_id`` when one is given.

    Args:
        type: br | fr | ep | upr
        title: Short human-readable title
        target_org: GitHub org or owner of the upstream project
        target_repo: Repository name of the upstream project
        body_path: Relative path to the Markdown artifact file in the repo
        related_workstream_id: UUID of the related workstream (optional)
        notes: Any additional notes (optional)

    Returns:
        The API response for the created contribution, as indented JSON.
    """
    contrib = _post("/contributions", {
        "type": type,
        "title": title,
        "target_org": target_org,
        "target_repo": target_repo,
        "body_path": body_path,
        "related_workstream_id": related_workstream_id,
        "notes": notes,
    })

    # Join only the parts that were supplied, so an org without a repo yields
    # "org" rather than the literal string "org/None"; None when neither is set.
    target = "/".join(part for part in (target_org, target_repo) if part) or None

    _post("/progress", {
        "workstream_id": related_workstream_id,
        "event_type": "contribution_registered",
        "summary": f"Contribution registered [{type.upper()}]: {title}",
        "author": "custodian",
        "detail": {
            "contribution_id": contrib["id"],
            "type": type,
            "target": target,
            "body_path": body_path,
        },
    })
    return json.dumps(contrib, indent=2)
|
|
|
|
|
|
@mcp.tool()
def update_contribution_status(
    contribution_id: str,
    status: str,
    notes: str | None = None,
) -> str:
    """Move a contribution artifact to a new status.

    Main path: draft→submitted→acknowledged→accepted→merged. A contribution
    can alternatively end up rejected or withdrawn.

    A 'contribution_status_changed' progress event is posted after the
    status update returns.

    Args:
        contribution_id: UUID of the contribution
        status: submitted | acknowledged | accepted | rejected | merged | withdrawn
        notes: Optional context for the status change

    Returns:
        The API response for the updated contribution, as indented JSON.
    """
    change = {"status": status, "notes": notes}
    updated = _patch(f"/contributions/{contribution_id}/status", change)

    event = {
        "event_type": "contribution_status_changed",
        "summary": f"Contribution status → {status}: {updated['title']}",
        "author": "custodian",
        # Same fields as the PATCH body, prefixed with the contribution id.
        "detail": {"contribution_id": contribution_id, **change},
    }
    _post("/progress", event)
    return json.dumps(updated, indent=2)
|
|
|
|
|
|
@mcp.tool()
def get_contributions(
    type: str | None = None,
    status: str | None = None,
    target_repo: str | None = None,
) -> str:
    """Return contribution artifacts as indented JSON, with optional filters.

    Args:
        type: br | fr | ep | upr (optional)
        status: draft | submitted | acknowledged | accepted | rejected | merged | withdrawn (optional)
        target_repo: Filter by upstream repo name (optional)
    """
    filters = {"type": type, "status": status, "target_repo": target_repo}
    contributions = _get("/contributions", filters)
    return json.dumps(contributions, indent=2)
|
|
|
|
|
|
@mcp.tool()
def ingest_sbom_tool(repo_slug: str, lockfile_path: str) -> str:
    """Ingest a lockfile into the State Hub SBOM store for a repo.

    Runs scripts/ingest_sbom.py as a subprocess, which parses the lockfile and
    POSTs entries to /sbom/ingest/. Each call creates a new SBOMSnapshot;
    previous snapshots are retained as history.

    Args:
        repo_slug: Managed-repo slug (must be registered via register_repo)
        lockfile_path: Absolute path to the lockfile (uv.lock, package-lock.json, Cargo.lock, etc.)

    Returns:
        The script's combined stdout and stderr, stripped. On a non-zero exit
        code the output is prefixed with an "ingest_sbom failed" line.
    """
    import subprocess

    script = Path(__file__).parent.parent / "scripts" / "ingest_sbom.py"
    argv = [
        sys.executable, str(script),
        "--repo", repo_slug,
        "--lockfile", lockfile_path,
        "--api-base", API_BASE,
    ]
    proc = subprocess.run(argv, capture_output=True, text=True)

    combined = (proc.stdout + proc.stderr).strip()
    if proc.returncode == 0:
        return combined
    return f"ingest_sbom failed (exit {proc.returncode}):\n{combined}"
|
|
|
|
|
|
@mcp.tool()
def get_licence_report() -> str:
    """Return the licence report over all ingested SBOM entries as indented JSON.

    The report groups packages by SPDX licence identifier, with a copyleft
    flag (GPL/AGPL/LGPL/EUPL/CDDL/MPL) and the repos using each licence.
    """
    report = _get("/sbom/report/licences")
    return json.dumps(report, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Domain goals & repo goals (v0.7)
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def create_domain_goal(domain_slug: str, title: str, description: str) -> str:
    """Create a domain goal in status 'active', superseding any existing active goal.

    A domain goal captures the high-level strategic intent for a domain. Only
    one domain goal can be active at a time; creating a new active one
    supersedes the previous active goal.

    Args:
        domain_slug: Slug of the domain (e.g. 'railiance', 'markitect')
        title: Short goal title
        description: Full description of the goal and its boundary conditions

    Returns:
        The API response for the created goal as indented JSON, or a JSON
        object with an "error" key when no active domain has the given slug.
    """
    # Resolve the slug to a domain id; only active domains are considered.
    active_domains = _get("/domains", {"status": "active"})
    for candidate in active_domains:
        if candidate["slug"] == domain_slug:
            domain = candidate
            break
    else:
        return json.dumps({"error": f"Domain '{domain_slug}' not found"})

    new_goal = {
        "domain_id": domain["id"],
        "title": title,
        "description": description,
        "status": "active",
    }
    goal = _post("/domain-goals", new_goal)

    event = {
        "event_type": "goal_created",
        "summary": f"Domain goal created [{domain_slug}]: {title}",
        "detail": {"goal_id": goal["id"], "domain_slug": domain_slug},
    }
    _post("/progress", event)
    return json.dumps(goal, indent=2)
|
|
|
|
|
|
@mcp.tool()
def get_domain_goals(domain_slug: str, status: str | None = None) -> str:
    """Return the goals of one domain as indented JSON, optionally filtered by status.

    Args:
        domain_slug: Slug of the domain (e.g. 'railiance')
        status: active | archived | superseded (omit for all)
    """
    query = {"domain_slug": domain_slug, "status": status}
    goals = _get("/domain-goals", query)
    return json.dumps(goals, indent=2)
|
|
|
|
|
|
@mcp.tool()
def activate_domain_goal(goal_id: str) -> str:
    """Make a domain goal the active one, superseding any currently active goal.

    A 'goal_activated' progress event is posted after the activation returns.

    Args:
        goal_id: UUID of the domain goal to activate

    Returns:
        The API response for the activated goal, as indented JSON.
    """
    activated = _post(f"/domain-goals/{goal_id}/activate", {})

    event = {
        "event_type": "goal_activated",
        "summary": f"Domain goal activated: {activated['title']}",
        "detail": {"goal_id": goal_id, "domain_slug": activated.get("domain_slug")},
    }
    _post("/progress", event)
    return json.dumps(activated, indent=2)
|
|
|
|
|
|
@mcp.tool()
def create_repo_goal(
    repo_slug: str,
    title: str,
    description: str,
    domain_goal_id: str | None = None,
    priority: int = 100,
) -> str:
    """Create a repository goal in status 'active'.

    Repository goals capture what needs to be achieved in a specific
    repository. Multiple active repo goals can coexist; priority (lower
    number = higher priority) determines ordering. A goal can optionally be
    linked to its parent domain goal.

    Args:
        repo_slug: Slug of the repository (e.g. 'railiance-bootstrap')
        title: Short goal title
        description: Full description including boundary conditions and scope
        domain_goal_id: UUID of the parent domain goal (optional)
        priority: Integer priority — lower numbers = higher priority (default 100)

    Returns:
        The API response for the created goal as indented JSON, or a JSON
        object with an "error" key when no registered repo has the given slug.
    """
    # Resolve the slug to a repo id by scanning the full repo list.
    for candidate in _get("/repos"):
        if candidate["slug"] == repo_slug:
            repo = candidate
            break
    else:
        return json.dumps({"error": f"Repo '{repo_slug}' not found"})

    new_goal = {
        "repo_id": repo["id"],
        "title": title,
        "description": description,
        "domain_goal_id": domain_goal_id,
        "priority": priority,
        "status": "active",
    }
    goal = _post("/repo-goals", new_goal)

    event = {
        "event_type": "goal_created",
        "summary": f"Repo goal created [{repo_slug}]: {title}",
        "detail": {"goal_id": goal["id"], "repo_slug": repo_slug, "priority": priority},
    }
    _post("/progress", event)
    return json.dumps(goal, indent=2)
|
|
|
|
|
|
@mcp.tool()
def get_repo_goals(repo_slug: str, status: str | None = None) -> str:
    """Return the goals of one repository, ordered by priority, as indented JSON.

    Args:
        repo_slug: Slug of the repository (e.g. 'railiance-bootstrap')
        status: active | paused | completed | archived (omit for all)
    """
    query = {"repo_slug": repo_slug, "status": status}
    goals = _get("/repo-goals", query)
    return json.dumps(goals, indent=2)
|
|
|
|
|
|
@mcp.tool()
def update_repo_goal(
    goal_id: str,
    title: str | None = None,
    description: str | None = None,
    priority: int | None = None,
    status: str | None = None,
    domain_goal_id: str | None = None,
) -> str:
    """Change fields of a repository goal (title, description, priority, status, or domain link).

    Only arguments that are not None are sent to the API. A 'goal_updated'
    progress event listing the changed field names is posted afterwards.

    Args:
        goal_id: UUID of the repo goal
        title: New title (optional)
        description: New description (optional)
        priority: New priority integer — lower = higher priority (optional)
        status: active | paused | completed | archived (optional)
        domain_goal_id: Link or re-link to a domain goal UUID (optional)

    Returns:
        The API response for the updated goal, as indented JSON.
    """
    candidates = {
        "title": title,
        "description": description,
        "priority": priority,
        "status": status,
        "domain_goal_id": domain_goal_id,
    }
    # Keep only the fields the caller actually supplied.
    updates: dict = {
        field: value for field, value in candidates.items() if value is not None
    }

    goal = _patch(f"/repo-goals/{goal_id}", updates)

    event = {
        "event_type": "goal_updated",
        "summary": f"Repo goal updated: {goal['title']}",
        "detail": {"goal_id": goal_id, "changes": list(updates.keys())},
    }
    _post("/progress", event)
    return json.dumps(goal, indent=2)
|
|
|
|
|
|
@mcp.tool()
def get_repo_dispatch(repo_slug: str) -> str:
    """Return the active goal, active workstreams and pending tasks for a repo.

    Call this at the start of a repo agent session to find out what work is
    pending, without reading the full state summary or scanning workplan
    files. The response includes:
    - active_goal: the highest-priority active repo goal
    - active_workstreams: list of active workstreams with pending tasks
    - human_interventions: tasks that need human input (needs_human=true)
    - last_state_synced_at: when the repo was last synced to the hub

    Args:
        repo_slug: Slug of the repository (e.g. 'marki-docx')
    """
    dispatch = _get(f"/repos/{repo_slug}/dispatch")
    return json.dumps(dispatch, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Agent Inbox (inter-agent message passing)
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def send_message(from_agent: str, to_agent: str, subject: str, body: str, thread_id: str | None = None) -> str:
    """Deliver a message from one agent to another (or to 'broadcast' for all).

    Meant for coordination between Claude instances — e.g. a worker agent
    reporting status back to the orchestrator, or the hub agent dispatching
    instructions to a domain agent.

    Args:
        from_agent: Sender identifier (e.g. 'hub', 'marki-docx', 'railiance')
        to_agent: Recipient identifier or 'broadcast' for all agents
        subject: Short subject line (max 500 chars)
        body: Full message body (markdown supported)
        thread_id: UUID of the root message to create a thread (optional)

    Returns:
        The API response for the stored message, as indented JSON.
    """
    message: dict = {
        "from_agent": from_agent,
        "to_agent": to_agent,
        "subject": subject,
        "body": body,
    }
    # thread_id is only included in the request when the caller supplied one.
    if thread_id:
        message["thread_id"] = thread_id

    stored = _post("/messages/", message)
    return json.dumps(stored, indent=2)
|
|
|
|
|
|
@mcp.tool()
def get_messages(to_agent: str | None = None, from_agent: str | None = None, unread_only: bool = False, limit: int = 20) -> str:
    """Return messages from the agent inbox as indented JSON.

    Call this at session start to check for pending coordination messages.

    Args:
        to_agent: Filter by recipient (your agent name, or omit for all)
        from_agent: Filter by sender (optional)
        unread_only: Return only unread messages (default: False)
        limit: Maximum number of messages to return (default: 20)
    """
    query: dict = {"limit": limit, "unread_only": unread_only}
    # Agent filters are added to the query only when a non-empty value was given.
    for key, agent in (("to_agent", to_agent), ("from_agent", from_agent)):
        if agent:
            query[key] = agent

    messages = _get("/messages/", query)
    return json.dumps(messages, indent=2)
|
|
|
|
|
|
@mcp.tool()
def mark_message_read(message_id: str) -> str:
    """Flag an inbox message as read and return the API response as indented JSON.

    Args:
        message_id: UUID of the message to mark as read
    """
    updated = _patch(f"/messages/{message_id}/read", {})
    return json.dumps(updated, indent=2)
|
|
|
|
|
|
@mcp.tool()
def reply_to_message(message_id: str, from_agent: str, body: str) -> str:
    """Answer a message. Marks the original as read and creates a reply in the same thread.

    Args:
        message_id: UUID of the message to reply to
        from_agent: Your agent identifier
        body: Reply body (markdown supported)

    Returns:
        The API response for the reply, as indented JSON.
    """
    reply = {"from_agent": from_agent, "body": body}
    created = _post(f"/messages/{message_id}/reply", reply)
    return json.dumps(created, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    # MCP_TRANSPORT selects the transport; stdio is used when it is unset.
    transport = os.environ.get("MCP_TRANSPORT", "stdio")
    if transport != "stdio":
        # Any other transport is served on 127.0.0.1 at MCP_PORT (default 8001).
        port = int(os.environ.get("MCP_PORT", "8001"))
        mcp.run(transport=transport, host="127.0.0.1", port=port)
    else:
        mcp.run(transport="stdio")
|