from __future__ import annotations import json from typing import Any import httpx from fastmcp import FastMCP from hub_core.utils.routing import normalize_trailing_slash class HubCoreMCPServer: """FastMCP base server for generic FOS hub tools. The MCP layer is intentionally a thin HTTP client. Hubs keep their business rules in FastAPI routers and inject only the API base URL here. """ def __init__( self, *, name: str, api_base: str, instructions: str | None = None, register_tools: bool = True, ) -> None: self.api_base = api_base.rstrip("/") self.mcp = FastMCP( name=name, instructions=instructions or "Generic FOS hub MCP server.", ) if register_tools: self.register_core_tools() def register_core_tools(self) -> None: @self.mcp.tool() def get_state_summary() -> str: return self._json(self._get("/state/summary/")) @self.mcp.tool() def list_domains(status: str | None = None) -> str: return self._json(self._get("/domains/", {"status": status})) @self.mcp.tool() def get_domain_summary(domain_slug: str) -> str: return self._json(self._get(f"/domains/{domain_slug}/")) @self.mcp.tool() def get_domain(domain_slug: str) -> str: return self._json(self._get(f"/domains/{domain_slug}/")) @self.mcp.tool() def send_message( from_agent: str, to_agent: str, subject: str, body: str, thread_id: str | None = None, ) -> str: return self._json( self._post( "/messages/", { "from_agent": from_agent, "to_agent": to_agent, "subject": subject, "body": body, "thread_id": thread_id, }, ) ) @self.mcp.tool() def get_messages( to_agent: str | None = None, from_agent: str | None = None, unread_only: bool = False, limit: int = 50, ) -> str: return self._json( self._get( "/messages/", { "to_agent": to_agent, "from_agent": from_agent, "unread_only": unread_only, "limit": limit, }, ) ) @self.mcp.tool() def mark_message_read(message_id: str) -> str: return self._json(self._patch(f"/messages/{message_id}/read/", {})) @self.mcp.tool() def reply_to_message(message_id: str, from_agent: str, body: str) -> str: return self._json( self._post( f"/messages/{message_id}/reply/", {"from_agent": from_agent, "body": body}, ) ) @self.mcp.tool() def register_capability( domain: str, capability_type: str, title: str, description: str | None = None, keywords: list[str] | None = None, repo_slug: str | None = None, ) -> str: return self._json( self._post( "/capability-catalog/", { "domain": domain, "capability_type": capability_type, "title": title, "description": description, "keywords": keywords or [], "repo_slug": repo_slug, }, ) ) @self.mcp.tool() def list_capabilities( domain: str | None = None, capability_type: str | None = None, status: str | None = None, ) -> str: return self._json( self._get( "/capability-catalog/", {"domain": domain, "capability_type": capability_type, "status": status}, ) ) @self.mcp.tool() def request_capability( title: str, capability_type: str, requesting_domain: str, requesting_agent: str, description: str | None = None, priority: str = "medium", request_context: dict[str, Any] | None = None, catalog_entry_id: str | None = None, ) -> str: return self._json( self._post( "/capability-requests/", { "title": title, "description": description, "capability_type": capability_type, "priority": priority, "requesting_domain": requesting_domain, "requesting_agent": requesting_agent, "request_context": request_context, "catalog_entry_id": catalog_entry_id, }, ) ) @self.mcp.tool() def accept_capability_request( request_id: str, fulfilling_agent: str, fulfillment_context: dict[str, Any] | None = None, ) -> str: return self._json( self._post( f"/capability-requests/{request_id}/accept/", { "fulfilling_agent": fulfilling_agent, "fulfillment_context": fulfillment_context, }, ) ) @self.mcp.tool() def update_capability_request_status( request_id: str, status: str, note: str | None = None, ) -> str: return self._json( self._patch( f"/capability-requests/{request_id}/status/", {"status": status, "note": note}, ) ) @self.mcp.tool() def list_capability_requests( domain: str | None = None, status: str | None = None, capability_type: str | None = None, ) -> str: return self._json( self._get( "/capability-requests/", {"domain": domain, "status": status, "capability_type": capability_type}, ) ) @self.mcp.tool() def get_capability_request(request_id: str) -> str: return self._json(self._get(f"/capability-requests/{request_id}/")) @self.mcp.tool() def register_repo( domain_slug: str, slug: str, name: str, local_path: str | None = None, remote_url: str | None = None, git_fingerprint: str | None = None, description: str | None = None, ) -> str: return self._json( self._post( "/repos/", { "domain_slug": domain_slug, "slug": slug, "name": name, "local_path": local_path, "remote_url": remote_url, "git_fingerprint": git_fingerprint, "description": description, }, ) ) @self.mcp.tool() def update_repo_path(repo_slug: str, host: str, path: str) -> str: return self._json( self._post(f"/repos/{repo_slug}/paths/", {"host": host, "path": path}) ) @self.mcp.tool() def list_domain_repos(domain_slug: str) -> str: return self._json(self._get("/repos/", {"domain": domain_slug})) @self.mcp.tool() def check_repo_doi(repo_slug: str, force_refresh: bool = False) -> str: return self._json( self._get( f"/repos/{repo_slug}/doi/", {"force_refresh": force_refresh}, ) ) @self.mcp.tool() def get_doi_summary() -> str: return self._json(self._get("/repos/doi/summary/")) @self.mcp.tool() def register_service( slug: str, name: str, provider: str | None = None, category: str | None = None, website_url: str | None = None, pricing_model: str = "unknown", gdpr_maturity: str = "unknown", ) -> str: return self._json( self._post( "/tpsc/catalog/", { "slug": slug, "name": name, "provider": provider, "category": category, "website_url": website_url, "pricing_model": pricing_model, "gdpr_maturity": gdpr_maturity, }, ) ) @self.mcp.tool() def list_services( gdpr_maturity: str | None = None, category: str | None = None, pricing_model: str | None = None, ) -> str: return self._json( self._get( "/tpsc/catalog/", { "gdpr_maturity": gdpr_maturity, "category": category, "pricing_model": pricing_model, }, ) ) @self.mcp.tool() def ingest_tpsc_tool(repo_slug: str, source_file: str, entries: list[dict[str, Any]]) -> str: return self._json( self._post( "/tpsc/ingest/", {"repo_slug": repo_slug, "source_file": source_file, "entries": entries}, ) ) @self.mcp.tool() def get_gdpr_report() -> str: return self._json(self._get("/tpsc/report/gdpr/")) @self.mcp.tool() def get_risks( since: str | None = None, limit: int = 100, offset: int = 0, ) -> str: return self._json( self._get( "/progress/risks/", {"since": since, "limit": limit, "offset": offset}, ) ) @self.mcp.tool() def get_alerts( since: str | None = None, limit: int = 100, offset: int = 0, ) -> str: return self._json( self._get( "/progress/alerts/", {"since": since, "limit": limit, "offset": offset}, ) ) @self.mcp.tool() def append_progress( event_type: str, summary: str, detail: dict[str, Any] | None = None, subject_refs: dict[str, Any] | None = None, author: str | None = None, session_id: str | None = None, ) -> str: return self._json( self._post( "/progress/", { "event_type": event_type, "summary": summary, "detail": detail, "subject_refs": subject_refs, "author": author, "session_id": session_id, }, ) ) def _get(self, path: str, params: dict[str, Any] | None = None) -> Any: try: with self._client() as client: response = client.get( normalize_trailing_slash(path), params=self._clean(params or {}), ) response.raise_for_status() return response.json() except httpx.HTTPStatusError as exc: return {"error": f"API {exc.response.status_code}: {exc.response.text[:300]}"} except Exception as exc: return {"error": f"Request failed: {exc}"} def _post(self, path: str, body: dict[str, Any]) -> Any: try: with self._client() as client: response = client.post(normalize_trailing_slash(path), json=self._clean(body)) response.raise_for_status() return response.json() except httpx.HTTPStatusError as exc: return {"error": f"API {exc.response.status_code}: {exc.response.text[:300]}"} except Exception as exc: return {"error": f"Request failed: {exc}"} def _patch(self, path: str, body: dict[str, Any]) -> Any: try: with self._client() as client: response = client.patch(normalize_trailing_slash(path), json=self._clean(body)) response.raise_for_status() return response.json() except httpx.HTTPStatusError as exc: return {"error": f"API {exc.response.status_code}: {exc.response.text[:300]}"} except Exception as exc: return {"error": f"Request failed: {exc}"} def _client(self) -> httpx.Client: return httpx.Client(base_url=self.api_base, timeout=30.0, follow_redirects=True) @staticmethod def _clean(data: dict[str, Any]) -> dict[str, Any]: return {key: value for key, value in data.items() if value is not None} @staticmethod def _json(data: Any) -> str: return json.dumps(data, indent=2, default=str)