"""Custodian State Hub MCP Server (stdio).

Thin HTTP client over the FastAPI service — no direct DB access.
All business logic stays in the API; this layer is stateless.
"""

from __future__ import annotations

import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
from uuid import UUID

import httpx
from fastmcp import FastMCP

# Base URL of the backing HTTP API; overridable through the API_BASE
# environment variable. Any trailing "/" is stripped.
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")

# Orientation text handed to FastMCP as the server's `instructions`.
_INSTRUCTIONS = (
    "Custodian State Hub: tracks topics, workstreams, tasks, decisions, and progress events. "
    "Start every session with get_state_summary() for orientation. "
    "When working inside a single registered domain repo, prefer get_domain_summary(domain_slug) "
    "— it returns the same actionable data scoped to that domain at ~10% of the token cost. "
    "All writes emit a progress_event automatically."
)

mcp = FastMCP(name="state-hub", instructions=_INSTRUCTIONS)


# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------

def _client() -> httpx.Client:
    """Build a fresh ``httpx.Client`` bound to ``API_BASE``.

    The client uses a 30 second timeout and follows redirects.
    """
    return httpx.Client(base_url=API_BASE, timeout=30.0, follow_redirects=True)


def _slashed(path: str) -> str:
    """Return *path* with a trailing ``/`` appended when it lacks one."""
    return path if path.endswith("/") else path + "/"


def _without_none(mapping: dict) -> dict:
    """Return a copy of *mapping* without the entries whose value is ``None``.

    Only top-level entries are filtered; nested containers are left untouched.
    """
    return {key: value for key, value in mapping.items() if value is not None}


def _get(path: str, params: dict | None = None) -> Any:
    """GET *path* (trailing slash enforced) and return the decoded JSON body.

    ``None``-valued query parameters are dropped before the request is sent.
    Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
    """
    url = _slashed(path)
    with _client() as http:
        response = http.get(url, params=_without_none(params or {}))
        response.raise_for_status()
        return response.json()


def _post(path: str, body: dict) -> Any:
    """POST *body* as JSON to *path* (trailing slash enforced); return the JSON reply.

    ``None``-valued top-level fields are dropped from the payload.
    Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
    """
    url = _slashed(path)
    with _client() as http:
        response = http.post(url, json=_without_none(body))
        response.raise_for_status()
        return response.json()


def _patch(path: str, body: dict) -> Any:
    """PATCH *path* (trailing slash enforced) with *body* as JSON; return the JSON reply.

    ``None``-valued top-level fields are dropped from the payload.
    Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
    """
    url = _slashed(path)
    with _client() as http:
        response = http.patch(url, json=_without_none(body))
        response.raise_for_status()
        return response.json()


def _delete(path: str) -> None:
    """DELETE *path* and raise ``httpx.HTTPStatusError`` on a 4xx/5xx response.

    Unlike the other helpers, the path is sent exactly as given (no trailing
    slash is appended) and no response body is returned.
    """
    with _client() as http:
        response = http.delete(path)
        response.raise_for_status()
# ---------------------------------------------------------------------------
# Private helpers shared by the tools below
# ---------------------------------------------------------------------------

def _slugify(text: str) -> str:
    """Derive a URL-friendly slug from *text*.

    The text is lower-cased, every run of characters outside ``[a-z0-9]`` is
    replaced by a single ``-``, and leading/trailing dashes are stripped.
    """
    return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")


def _utc_now_iso_z() -> str:
    """Return the current UTC time as a naive ISO-8601 string suffixed with ``Z``.

    Uses an aware ``datetime.now(timezone.utc)`` instead of the deprecated
    ``datetime.utcnow()``; the tzinfo is dropped before formatting so the
    emitted text keeps the ``YYYY-MM-DDTHH:MM:SS[.ffffff]Z`` shape.
    """
    from datetime import timezone  # local import: the module only imports `datetime`

    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


def _contribution_target(target_org: str | None, target_repo: str | None) -> str | None:
    """Join the upstream org and repo into ``org/repo``, skipping missing parts.

    When neither part is truthy, ``target_repo`` is returned unchanged.
    """
    parts = [part for part in (target_org, target_repo) if part]
    return "/".join(parts) if parts else target_repo


def _finding_lines(heading: str, tag: str, findings: list[dict]) -> list[str]:
    """Render one section (heading, one entry per finding, blank line) of the ADR report.

    Returns an empty list when *findings* is empty so the section is omitted.
    """
    if not findings:
        return []
    section = [f"{heading} ({len(findings)}):"]
    for finding in findings:
        loc = f" [{finding['file']}]" if finding.get("file") else ""
        section.append(f" {tag} {finding['check']}{loc}")
        section.append(f" {finding['detail']}")
    section.append("")
    return section


# ---------------------------------------------------------------------------
# Resources
# ---------------------------------------------------------------------------

@mcp.resource("state://summary")
def resource_summary() -> str:
    """Full StateSummary JSON — primary orientation resource."""
    return json.dumps(_get("/state/summary"), indent=2)


@mcp.resource("state://topics")
def resource_topics() -> str:
    """Active topics list."""
    return json.dumps(_get("/topics", {"status": "active"}), indent=2)


@mcp.resource("state://workstreams/{topic_slug}")
def resource_workstreams(topic_slug: str) -> str:
    """Workstreams for a topic (by slug).

    Only topics returned by the ``status=active`` query are searched; an
    ``{"error": ...}`` JSON object is returned when no slug matches.
    """
    topics = _get("/topics", {"status": "active"})
    match = next((t for t in topics if t["slug"] == topic_slug), None)
    if not match:
        return json.dumps({"error": f"Topic '{topic_slug}' not found"})
    return json.dumps(_get("/workstreams", {"topic_id": match["id"]}), indent=2)


@mcp.resource("state://decisions/blocking")
def resource_blocking_decisions() -> str:
    """All pending/escalated decisions.

    Queries ``/decisions`` with ``decision_type=pending`` and ``status=open``.
    """
    return json.dumps(
        _get("/decisions", {"decision_type": "pending", "status": "open"}),
        indent=2,
    )


@mcp.resource("state://tasks/blocked")
def resource_blocked_tasks() -> str:
    """All tasks with status=blocked."""
    return json.dumps(_get("/tasks", {"status": "blocked"}), indent=2)


# ---------------------------------------------------------------------------
# Query tools
# ---------------------------------------------------------------------------

@mcp.tool()
def get_state_summary() -> str:
    """Primary orientation tool. Call at the start of every session.

    Returns a full snapshot: topic/workstream/task/decision totals,
    blocking decisions, blocked tasks, open workstreams, and the 20 most
    recent events.

    NOTE: This response is large (~10k tokens). When working inside a single
    registered domain repo, use get_domain_summary(domain_slug) instead —
    same actionable data scoped to one domain at ~10% of the token cost.
    """
    return json.dumps(_get("/state/summary"), indent=2)


@mcp.tool()
def get_domain_summary(domain_slug: str) -> str:
    """Lightweight session orientation for a single domain.

    Use this instead of get_state_summary() when working in a registered
    domain repo — returns only what is relevant to the specified domain,
    typically 80-90% fewer tokens than the full summary.

    Args:
        domain_slug: the domain slug, e.g. "railiance", "markitect"

    Returns:
        topic, active workstreams, open blocking decisions for this topic,
        5 most recent progress events, and repo SBOM status for this domain.
        An ``{"error": ...}`` JSON object when no topic carries this domain slug.
    """
    topics = _get("/topics")
    # The first topic whose domain_slug matches is used.
    topic = next((t for t in topics if t.get("domain_slug") == domain_slug), None)
    if not topic:
        return json.dumps({"error": f"No topic found for domain '{domain_slug}'"})

    topic_id = topic["id"]
    workstreams = _get("/workstreams", {"topic_id": topic_id, "status": "active"})
    blocking = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id})
    recent = _get("/progress", {"topic_id": topic_id, "limit": 5})
    repos = _get("/repos", {"domain": domain_slug})

    return json.dumps({
        "domain": domain_slug,
        "topic_id": topic_id,
        "topic_title": topic["title"],
        "workstreams": workstreams,
        "blocking_decisions": blocking,
        "recent_progress": recent,
        "repos": [{"slug": r["slug"], "last_sbom_at": r.get("last_sbom_at")} for r in repos],
    }, indent=2)


@mcp.tool()
def get_topic(slug: str) -> str:
    """Return a topic (with workstreams) by slug, plus its recent progress events.

    Returns an ``{"error": ...}`` JSON object when no topic has this slug.
    """
    topics = _get("/topics")
    match = next((t for t in topics if t["slug"] == slug), None)
    if not match:
        return json.dumps({"error": f"Topic '{slug}' not found"})
    topic_detail = _get(f"/topics/{match['id']}")
    recent = _get("/progress", {"topic_id": match["id"], "limit": 10})
    return json.dumps({"topic": topic_detail, "recent_progress": recent}, indent=2)


@mcp.tool()
def list_blocked_tasks(workstream_id: str | None = None) -> str:
    """List all tasks with status=blocked, optionally filtered by workstream_id."""
    return json.dumps(_get("/tasks", {"status": "blocked", "workstream_id": workstream_id}), indent=2)


@mcp.tool()
def list_pending_decisions(topic_id: str | None = None) -> str:
    """List pending decisions sorted by deadline (nulls last).

    Optionally filter by topic_id. Escalated decisions are included and
    highlighted by their escalation_note.
    """
    results = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id})
    return json.dumps(results, indent=2)


@mcp.tool()
def get_recent_progress(limit: int = 20, since: str | None = None) -> str:
    """Retrieve recent progress events to reconstruct session history.

    Args:
        limit: max events to return (default 20)
        since: ISO datetime string — only events after this timestamp
    """
    return json.dumps(_get("/progress", {"limit": limit, "since": since}), indent=2)


# ---------------------------------------------------------------------------
# Mutate tools
# ---------------------------------------------------------------------------

@mcp.tool()
def create_workstream(
    topic_id: str,
    title: str,
    slug: str | None = None,
    description: str | None = None,
    owner: str | None = None,
    due_date: str | None = None,
) -> str:
    """Create a new workstream under a topic and emit a progress_event.

    Args:
        topic_id: UUID of the parent topic
        title: workstream title
        slug: URL-friendly identifier (auto-generated from title if omitted)
        description: optional longer description
        owner: optional owner name
        due_date: optional ISO date string (YYYY-MM-DD)
    """
    if not slug:
        slug = _slugify(title)
    ws = _post("/workstreams", {
        "topic_id": topic_id,
        "title": title,
        "slug": slug,
        "description": description,
        "owner": owner,
        "due_date": due_date,
        "status": "active",
    })
    _post("/progress", {
        "topic_id": topic_id,
        "workstream_id": ws["id"],
        "event_type": "workstream_created",
        "summary": f"Workstream created: {title}",
        "author": "custodian",
        "detail": {"owner": owner, "slug": slug},
    })
    return json.dumps(ws, indent=2)


@mcp.tool()
def create_task(
    workstream_id: str,
    title: str,
    priority: str = "medium",
    description: str | None = None,
    assignee: str | None = None,
    due_date: str | None = None,
) -> str:
    """Create a new task and emit a progress_event.

    Args:
        workstream_id: UUID of the parent workstream
        title: task title
        priority: low | medium | high | critical
        description: optional longer description
        assignee: optional assignee name
        due_date: optional ISO date string (YYYY-MM-DD)
    """
    task = _post("/tasks", {
        "workstream_id": workstream_id,
        "title": title,
        "priority": priority,
        "description": description,
        "assignee": assignee,
        "due_date": due_date,
    })
    _post("/progress", {
        "workstream_id": workstream_id,
        "task_id": task["id"],
        "event_type": "task_created",
        "summary": f"Task created: {title}",
        "author": "custodian",
        "detail": {"priority": priority, "assignee": assignee},
    })
    return json.dumps(task, indent=2)


@mcp.tool()
def update_task_status(
    task_id: str,
    status: str,
    blocking_reason: str | None = None,
) -> str:
    """Update a task's status. blocking_reason is required when status='blocked'.

    The requirement is not checked here; the request is forwarded as given.

    Args:
        task_id: UUID of the task
        status: todo | in_progress | blocked | done | cancelled
        blocking_reason: required when status=blocked
    """
    body: dict[str, Any] = {"status": status}
    if blocking_reason:
        body["blocking_reason"] = blocking_reason
    task = _patch(f"/tasks/{task_id}", body)
    _post("/progress", {
        "task_id": task_id,
        "workstream_id": task.get("workstream_id"),
        "event_type": "task_status_changed",
        "summary": f"Task status → {status}: {task['title']}",
        "author": "custodian",
        "detail": {"blocking_reason": blocking_reason},
    })
    return json.dumps(task, indent=2)


@mcp.tool()
def record_decision(
    title: str,
    decision_type: str = "pending",
    topic_id: str | None = None,
    workstream_id: str | None = None,
    description: str | None = None,
    rationale: str | None = None,
    decided_by: str | None = None,
    deadline: str | None = None,
) -> str:
    """Record a decision (made or pending).

    Pending decisions touching financial/legal topics are auto-escalated
    per constitution §4.

    Args:
        title: decision title
        decision_type: made | pending
        topic_id: optional topic UUID
        workstream_id: optional workstream UUID (at least one required)
        description: optional context
        rationale: reasoning behind the decision
        decided_by: person/agent who decided
        deadline: ISO datetime string for when decision is needed
    """
    decision = _post("/decisions", {
        "title": title,
        "decision_type": decision_type,
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "description": description,
        "rationale": rationale,
        "decided_by": decided_by,
        "deadline": deadline,
    })
    _post("/progress", {
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "decision_id": decision["id"],
        "event_type": "decision_recorded",
        "summary": f"Decision recorded ({decision_type}): {title}",
        "author": "custodian",
        "detail": {"status": decision.get("status"), "escalation_note": decision.get("escalation_note")},
    })
    return json.dumps(decision, indent=2)


@mcp.tool()
def resolve_decision(
    decision_id: str,
    rationale: str,
    decided_by: str,
) -> str:
    """Mark a decision as resolved.

    Sends ``status=resolved`` / ``decision_type=made`` together with a UTC
    ``decided_at`` timestamp, then emits a ``decision_resolved`` progress event.

    Args:
        decision_id: UUID of the decision
        rationale: final reasoning/outcome
        decided_by: who resolved it
    """
    decision = _patch(f"/decisions/{decision_id}", {
        "status": "resolved",
        "decision_type": "made",
        "rationale": rationale,
        "decided_by": decided_by,
        "decided_at": _utc_now_iso_z(),
    })
    _post("/progress", {
        "topic_id": decision.get("topic_id"),
        "workstream_id": decision.get("workstream_id"),
        "decision_id": decision_id,
        "event_type": "decision_resolved",
        "summary": f"Decision resolved by {decided_by}: {decision['title']}",
        "author": "custodian",
        "detail": {"rationale": rationale},
    })
    return json.dumps(decision, indent=2)


@mcp.tool()
def add_progress_event(
    summary: str,
    event_type: str = "note",
    topic_id: str | None = None,
    workstream_id: str | None = None,
    task_id: str | None = None,
    detail: dict | None = None,
) -> str:
    """Append a progress event to the log.

    Args:
        summary: human-readable summary of what happened
        event_type: free-form label, e.g. note | milestone | blocker | insight
        topic_id: optional topic UUID
        workstream_id: optional workstream UUID
        task_id: optional task UUID
        detail: optional structured data (JSONB)
    """
    event = _post("/progress", {
        "topic_id": topic_id,
        "workstream_id": workstream_id,
        "task_id": task_id,
        "event_type": event_type,
        "summary": summary,
        "author": "custodian",
        "detail": detail,
    })
    return json.dumps(event, indent=2)


@mcp.tool()
def update_workstream_status(workstream_id: str, status: str) -> str:
    """Update a workstream's status.

    Args:
        workstream_id: UUID of the workstream
        status: active | blocked | completed | archived
    """
    ws = _patch(f"/workstreams/{workstream_id}", {"status": status})
    _post("/progress", {
        "workstream_id": workstream_id,
        "topic_id": ws.get("topic_id"),
        "event_type": "workstream_status_changed",
        "summary": f"Workstream status → {status}: {ws['title']}",
        "author": "custodian",
    })
    return json.dumps(ws, indent=2)


# ---------------------------------------------------------------------------
# Next-steps suggestion tool (S2.3) — sanctioned write use case #2
# ---------------------------------------------------------------------------

@mcp.tool()
def get_next_steps() -> str:
    """Surface contextual next-action suggestions derived from hub state.

    Returns suggestions based on:
    - Recently resolved decisions → first open task in the same workstream
    - Workstreams whose every dependency is now completed → first todo task

    Each suggestion includes domain, workstream, task, and a plain-language
    message. The hub surfaces *what* and *where* — the domain owns *how*.

    This is one of the two sanctioned write-side use cases of the State Hub
    (the other is resolve_decision). Suggestions are derived, not persisted.
    """
    return json.dumps(_get("/state/next_steps"), indent=2)


# ---------------------------------------------------------------------------
# Dependency graph tools (S1.4)
# ---------------------------------------------------------------------------

@mcp.tool()
def create_dependency(
    from_workstream_id: str,
    to_workstream_id: str,
    description: str | None = None,
) -> str:
    """Record that one workstream depends on another.

    Semantics: from_workstream cannot fully proceed until to_workstream
    reaches a satisfactory state.

    Args:
        from_workstream_id: UUID of the workstream that has the dependency
        to_workstream_id: UUID of the workstream it depends on
        description: optional human-readable explanation of the dependency
    """
    dep = _post(f"/workstreams/{from_workstream_id}/dependencies", {
        "to_workstream_id": to_workstream_id,
        "description": description,
    })
    return json.dumps(dep, indent=2)


@mcp.tool()
def list_dependencies(workstream_id: str) -> str:
    """Return all dependency edges touching a workstream (both directions).

    The response distinguishes edges where this workstream is the dependent
    (depends_on) from edges where it is the blocker (blocks).

    Args:
        workstream_id: UUID of the workstream to inspect
    """
    edges = _get(f"/workstreams/{workstream_id}/dependencies")
    depends_on = [e for e in edges if e["from_workstream_id"] == workstream_id]
    blocks = [e for e in edges if e["to_workstream_id"] == workstream_id]
    return json.dumps({"depends_on": depends_on, "blocks": blocks}, indent=2)


# ---------------------------------------------------------------------------
# Extension points & technical debt
# ---------------------------------------------------------------------------

@mcp.tool()
def register_extension_point(
    domain: str,
    title: str,
    ep_type: str,
    description: str | None = None,
    location: str | None = None,
    priority: str = "medium",
    ep_id: str | None = None,
    topic_id: str | None = None,
    workstream_id: str | None = None,
) -> str:
    """Register a discovered extension point — optional future functionality not yet committed.

    Extension points capture design forks: things the system *could* do that
    have been noticed and parked for deliberate later consideration.

    Args:
        domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
        title: short description of the extension
        ep_type: api | schema | mcp | dashboard | architecture | integration | other
        description: longer explanation of what the extension would add
        location: file:line or module where the extension point was noticed
        priority: low | medium | high | critical
        ep_id: optional human-readable ID, e.g. EP-CUST-001 (auto-assigned if omitted)
        topic_id: UUID of related topic
        workstream_id: UUID of related workstream
    """
    ep = _post("/extension-points", {
        "domain": domain,
        "title": title,
        "ep_type": ep_type,
        "description": description,
        "location": location,
        "priority": priority,
        "ep_id": ep_id,
        "topic_id": topic_id,
        "workstream_id": workstream_id,
    })
    # NOTE(review): unlike most other progress events in this file, no "author"
    # (nor topic/workstream id) is sent here — confirm this is intended.
    _post("/progress", {
        "summary": f"Extension point registered: [{ep.get('ep_id') or ep['id'][:8]}] {title} ({ep_type}, {domain})",
        "event_type": "extension_point",
        "detail": {"id": ep["id"], "ep_id": ep.get("ep_id"), "ep_type": ep_type, "domain": domain},
    })
    return json.dumps(ep, indent=2)


@mcp.tool()
def list_extension_points(
    domain: str | None = None,
    status: str | None = None,
    ep_type: str | None = None,
) -> str:
    """List extension points, optionally filtered.

    Args:
        domain: filter by domain
        status: open | in_progress | addressed | deferred | wont_fix
        ep_type: api | schema | mcp | dashboard | architecture | integration | other
    """
    return json.dumps(_get("/extension-points", {
        "domain": domain,
        "status": status,
        "ep_type": ep_type,
    }), indent=2)


@mcp.tool()
def update_ep_status(ep_uuid: str, status: str) -> str:
    """Update the status of an extension point.

    Args:
        ep_uuid: UUID of the extension point
        status: open | in_progress | addressed | deferred | wont_fix
    """
    ep = _patch(f"/extension-points/{ep_uuid}", {"status": status})
    # NOTE(review): no "author" field is sent with this event — confirm intended.
    _post("/progress", {
        "summary": f"Extension point status → {status}: {ep['title']}",
        "event_type": "extension_point",
        "detail": {"id": ep_uuid, "status": status},
    })
    return json.dumps(ep, indent=2)


@mcp.tool()
def register_technical_debt(
    domain: str,
    title: str,
    debt_type: str,
    description: str | None = None,
    location: str | None = None,
    severity: str = "medium",
    td_id: str | None = None,
    topic_id: str | None = None,
    workstream_id: str | None = None,
) -> str:
    """Register a technical debt item — a known quality compromise to address later.

    Technical debt captures intentional or discovered shortcuts, design
    weaknesses, missing tests, and similar issues that reduce codebase health.

    Args:
        domain: one of custodian | railiance | markitect | coulomb_social | personhood | foerster_capabilities
        title: short description of the debt
        debt_type: design | implementation | test | docs | dependencies | performance | security | other
        description: what the issue is and what the correct fix would be
        location: file:line or module where the debt lives
        severity: low | medium | high | critical
        td_id: optional human-readable ID, e.g. TD-CUST-001
        topic_id: UUID of related topic
        workstream_id: UUID of related workstream
    """
    td = _post("/technical-debt", {
        "domain": domain,
        "title": title,
        "debt_type": debt_type,
        "description": description,
        "location": location,
        "severity": severity,
        "td_id": td_id,
        "topic_id": topic_id,
        "workstream_id": workstream_id,
    })
    # NOTE(review): unlike most other progress events in this file, no "author"
    # (nor topic/workstream id) is sent here — confirm this is intended.
    _post("/progress", {
        "summary": f"Technical debt registered: [{td.get('td_id') or td['id'][:8]}] {title} ({debt_type}, {severity}, {domain})",
        "event_type": "technical_debt",
        "detail": {"id": td["id"], "td_id": td.get("td_id"), "debt_type": debt_type, "severity": severity, "domain": domain},
    })
    return json.dumps(td, indent=2)


@mcp.tool()
def list_technical_debt(
    domain: str | None = None,
    status: str | None = None,
    debt_type: str | None = None,
    severity: str | None = None,
) -> str:
    """List technical debt items, optionally filtered.

    Args:
        domain: filter by domain
        status: open | in_progress | resolved | deferred | wont_fix
        debt_type: design | implementation | test | docs | dependencies | performance | security | other
        severity: low | medium | high | critical
    """
    return json.dumps(_get("/technical-debt", {
        "domain": domain,
        "status": status,
        "debt_type": debt_type,
        "severity": severity,
    }), indent=2)


@mcp.tool()
def update_td_status(td_uuid: str, status: str) -> str:
    """Update the status of a technical debt item.

    Args:
        td_uuid: UUID of the technical debt item
        status: open | in_progress | resolved | deferred | wont_fix
    """
    td = _patch(f"/technical-debt/{td_uuid}", {"status": status})
    # NOTE(review): no "author" field is sent with this event — confirm intended.
    _post("/progress", {
        "summary": f"Technical debt status → {status}: {td['title']}",
        "event_type": "technical_debt",
        "detail": {"id": td_uuid, "status": status},
    })
    return json.dumps(td, indent=2)


# ---------------------------------------------------------------------------
# Domain lifecycle + repo registration tools (v0.5)
# ---------------------------------------------------------------------------

@mcp.tool()
def list_domains(status: str = "active") -> str:
    """List all registered domains.

    Args:
        status: active | archived | all (default: active)
    """
    return json.dumps(_get("/domains", {"status": status}), indent=2)


@mcp.tool()
def create_domain(slug: str, name: str, description: str | None = None) -> str:
    """Create a new domain.

    Args:
        slug: URL-friendly identifier (lowercase, underscored), e.g. 'my_project'
        name: Human-readable display name
        description: optional longer description
    """
    domain = _post("/domains", {"slug": slug, "name": name, "description": description})
    _post("/progress", {
        "event_type": "milestone",
        "summary": f"Domain created: {slug} ({name})",
        "author": "custodian",
        "detail": {"slug": slug, "name": name},
    })
    return json.dumps(domain, indent=2)


@mcp.tool()
def rename_domain(slug: str, new_slug: str, new_name: str) -> str:
    """Rename a domain — cascades to EP/TD string columns.

    Args:
        slug: Current domain slug
        new_slug: New URL-friendly identifier
        new_name: New human-readable display name
    """
    domain = _patch(f"/domains/{slug}/rename", {"new_slug": new_slug, "new_name": new_name})
    _post("/progress", {
        "event_type": "milestone",
        "summary": f"Domain renamed: {slug} → {new_slug} ({new_name})",
        "author": "custodian",
        "detail": {"old_slug": slug, "new_slug": new_slug, "new_name": new_name},
    })
    return json.dumps(domain, indent=2)


@mcp.tool()
def archive_domain(slug: str) -> str:
    """Archive a domain (soft-delete). Fails if active topics exist.

    Args:
        slug: Domain slug to archive
    """
    domain = _patch(f"/domains/{slug}/archive", {})
    _post("/progress", {
        "event_type": "note",
        "summary": f"Domain archived: {slug}",
        "author": "custodian",
        "detail": {"slug": slug},
    })
    return json.dumps(domain, indent=2)


@mcp.tool()
def list_domain_repos(domain_slug: str) -> str:
    """List all repositories registered under a domain.

    Args:
        domain_slug: Domain slug to filter by
    """
    return json.dumps(_get("/repos", {"domain": domain_slug}), indent=2)


@mcp.tool()
def register_repo(
    domain_slug: str,
    name: str,
    slug: str | None = None,
    local_path: str | None = None,
    remote_url: str | None = None,
    description: str | None = None,
) -> str:
    """Register a git repository under a domain.

    Args:
        domain_slug: Domain slug (must already exist)
        name: Human-readable repository name
        slug: URL-friendly identifier (auto-generated from name if omitted)
        local_path: Absolute local filesystem path to the repo
        remote_url: Remote git URL (Gitea, GitHub, etc.)
        description: optional description
    """
    if not slug:
        slug = _slugify(name)
    repo = _post("/repos", {
        "domain_slug": domain_slug,
        "slug": slug,
        "name": name,
        "local_path": local_path,
        "remote_url": remote_url,
        "description": description,
    })
    _post("/progress", {
        "event_type": "milestone",
        "summary": f"Repo registered: {name} under domain '{domain_slug}'",
        "author": "custodian",
        "detail": {"slug": slug, "domain_slug": domain_slug, "local_path": local_path, "remote_url": remote_url},
    })
    return json.dumps(repo, indent=2)


# ---------------------------------------------------------------------------
# ADR-001 compliance validation
# ---------------------------------------------------------------------------

@mcp.tool()
def validate_repo_adr(repo_path: str, domain_slug: str | None = None) -> str:
    """Check whether a repository is consistent with ADR-001.

    Validates that workplan files exist in workplans/ with correct frontmatter,
    that state_hub_workstream_id references resolve to real DB records, and
    that no active state-hub workstreams for the domain lack a backing file
    (orphan detection — DB-only records are an ADR-001 violation).

    The check itself is delegated to ``scripts/validate_repo_adr.py``, run as a
    subprocess with ``--json``; this function only renders its findings as a
    plain-text report.

    Args:
        repo_path: Absolute path to the repository root.
        domain_slug: Domain slug for orphan detection (e.g. 'custodian').
            If omitted, inferred from workplan frontmatter.

    Returns:
        A multi-line text report, or a "Validator script error" message when
        the script's stdout is not a JSON object.
    """
    import subprocess

    script = Path(__file__).parent.parent / "scripts" / "validate_repo_adr.py"
    cmd = [sys.executable, str(script), repo_path, "--json", "--api-base", API_BASE]
    if domain_slug:
        cmd += ["--domain", domain_slug]

    result = subprocess.run(cmd, capture_output=True, text=True)
    script_error = f"Validator script error:\n{result.stderr or result.stdout or '(no output)'}"
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        return script_error
    # Valid JSON that is not an object (e.g. `null` or a list) cannot be a report.
    if not isinstance(data, dict):
        return script_error

    findings = data.get("findings", [])
    summary = data.get("summary", {})
    overall = data.get("result", "unknown")

    failures = [f for f in findings if f["level"] == "FAIL"]
    warnings = [f for f in findings if f["level"] == "WARN"]

    lines = [f"ADR-001 Compliance: {repo_path}", ""]
    lines += _finding_lines("FAILURES", "FAIL", failures)
    lines += _finding_lines("WARNINGS", "WARN", warnings)
    lines.append(
        f"Summary: {summary.get('pass', 0)} pass | "
        f"{summary.get('warn', 0)} warn | "
        f"{summary.get('fail', 0)} fail"
    )

    # Anything other than an explicit fail/warn result is reported as PASS.
    if overall == "fail":
        verdict = "FAIL"
    elif overall == "warn":
        verdict = "PASS (with warnings)"
    else:
        verdict = "PASS"
    lines.append(f"Result: {verdict}")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Contribution tracking (v0.3)
# ---------------------------------------------------------------------------

@mcp.resource("state://contributions")
def resource_contributions() -> str:
    """All contribution artifacts (BR/FR/EP/UPR)."""
    return json.dumps(_get("/contributions"), indent=2)


@mcp.resource("state://sbom/aggregated")
def resource_sbom_aggregated() -> str:
    """Aggregated SBOM entries across all repos."""
    return json.dumps(_get("/sbom"), indent=2)


@mcp.resource("state://sbom/{repo_slug}")
def resource_sbom_repo(repo_slug: str) -> str:
    """SBOM view for a specific repo (by slug)."""
    return json.dumps(_get(f"/sbom/{repo_slug}"), indent=2)


@mcp.tool()
def register_contribution(
    type: str,
    title: str,
    target_org: str | None = None,
    target_repo: str | None = None,
    body_path: str | None = None,
    related_workstream_id: str | None = None,
    notes: str | None = None,
) -> str:
    """Register a new upstream contribution artifact (BR/FR/EP/UPR).

    Args:
        type: br | fr | ep | upr
        title: Short human-readable title
        target_org: GitHub org or owner of the upstream project
        target_repo: Repository name of the upstream project
        body_path: Relative path to the Markdown artifact file in the repo
        related_workstream_id: UUID of the related workstream (optional)
        notes: Any additional notes (optional)
    """
    contrib = _post("/contributions", {
        "type": type,
        "title": title,
        "target_org": target_org,
        "target_repo": target_repo,
        "body_path": body_path,
        "related_workstream_id": related_workstream_id,
        "notes": notes,
    })
    _post("/progress", {
        "workstream_id": related_workstream_id,
        "event_type": "contribution_registered",
        "summary": f"Contribution registered [{type.upper()}]: {title}",
        "author": "custodian",
        "detail": {
            "contribution_id": contrib["id"],
            "type": type,
            # Only the parts actually supplied are joined, so an org without a
            # repo no longer yields the literal text "org/None".
            "target": _contribution_target(target_org, target_repo),
            "body_path": body_path,
        },
    })
    return json.dumps(contrib, indent=2)


@mcp.tool()
def update_contribution_status(
    contribution_id: str,
    status: str,
    notes: str | None = None,
) -> str:
    """Update the status of a contribution artifact.

    Valid transitions: draft→submitted→acknowledged→accepted→merged,
    with side branches (↘) to rejected and withdrawn.

    Args:
        contribution_id: UUID of the contribution
        status: submitted | acknowledged | accepted | rejected | merged | withdrawn
        notes: Optional context for the status change
    """
    contrib = _patch(f"/contributions/{contribution_id}/status", {
        "status": status,
        "notes": notes,
    })
    _post("/progress", {
        "event_type": "contribution_status_changed",
        "summary": f"Contribution status → {status}: {contrib['title']}",
        "author": "custodian",
        "detail": {"contribution_id": contribution_id, "status": status, "notes": notes},
    })
    return json.dumps(contrib, indent=2)


@mcp.tool()
def get_contributions(
    type: str | None = None,
    status: str | None = None,
    target_repo: str | None = None,
) -> str:
    """List contribution artifacts, optionally filtered.

    Args:
        type: br | fr | ep | upr (optional)
        status: draft | submitted | acknowledged | accepted | rejected | merged | withdrawn (optional)
        target_repo: filter by upstream repo name (optional)
    """
    return json.dumps(_get("/contributions", {
        "type": type,
        "status": status,
        "target_repo": target_repo,
    }), indent=2)


@mcp.tool()
def ingest_sbom_tool(repo_slug: str, lockfile_path: str) -> str:
    """Ingest a lockfile into the State Hub SBOM store for a repo.

    Parses the lockfile and POSTs entries to /sbom/ingest/. Old entries for
    the repo are replaced (snapshot strategy). The work is delegated to
    ``scripts/ingest_sbom.py``, run as a subprocess.

    Args:
        repo_slug: Managed-repo slug (must be registered via register_repo)
        lockfile_path: Absolute path to the lockfile (uv.lock, package-lock.json, Cargo.lock, etc.)

    Returns:
        The script's combined stdout+stderr, prefixed with a failure line when
        it exits non-zero.
    """
    import subprocess

    script = Path(__file__).parent.parent / "scripts" / "ingest_sbom.py"
    result = subprocess.run(
        [sys.executable, str(script), "--repo", repo_slug, "--lockfile", lockfile_path, "--api-base", API_BASE],
        capture_output=True,
        text=True,
    )
    output = (result.stdout + result.stderr).strip()
    if result.returncode != 0:
        return f"ingest_sbom failed (exit {result.returncode}):\n{output}"
    return output


@mcp.tool()
def get_licence_report() -> str:
    """Get a licence report across all ingested SBOM entries.

    Returns packages grouped by SPDX licence identifier, with copyleft flag
    (GPL/AGPL/LGPL/EUPL/CDDL/MPL) and repos using each licence.
    """
    return json.dumps(_get("/sbom/report/licences"), indent=2)


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    mcp.run(transport="stdio")