"""OpsBridge MCP server — exposes bridge and catalog operations as FastMCP tools. Entry point (stdio): uv run python src/bridge/mcp_server/server.py The server imports the Python library directly — no subprocess required. All tool functions return JSON-serialisable dicts/lists. """ from __future__ import annotations import dataclasses import json import os from pathlib import Path from typing import Optional from fastmcp import FastMCP from bridge.diagnostics import check_all_tunnels, check_tunnel from bridge.state import StateManager mcp = FastMCP( name="ops-bridge", instructions=( "OpsBridge MCP server. Use bridge_status to check tunnel health, " "bridge_up/down/restart to manage lifecycle, bridge_logs for audit history. " "catalog_* tools require catalog_path to be configured in tunnels.yaml." ), ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _state_dir() -> Path: return Path(os.environ.get("BRIDGE_STATE_DIR", str(Path.home() / ".local" / "state" / "bridge"))) def _load_cfg(): from bridge.config import load_config return load_config() def _load_cfg_or_error() -> tuple: """Return (cfg, None) or (None, error_dict).""" try: return _load_cfg(), None except Exception as e: return None, {"error": str(e)} def _load_catalog(cfg): """Return (catalog, None) or (None, error_dict).""" if cfg.catalog_path is None: return None, {"error": "catalog_path not configured"} try: from bridge.catalog.loader import load_catalog return load_catalog(cfg.catalog_path), None except Exception as e: return None, {"error": f"Failed to load catalog: {e}"} # --------------------------------------------------------------------------- # Bridge lifecycle tools # --------------------------------------------------------------------------- @mcp.tool() def bridge_up(tunnel: Optional[str] = None) -> dict: """Start one or all configured tunnels. Args: tunnel: Tunnel name to start. If omitted, starts all inline tunnels. Returns: {"started": [...], "already_running": [...]} or {"error": "..."} """ cfg, err = _load_cfg_or_error() if err: return err from bridge.manager import TunnelManager sd = _state_dir() started = [] already_running = [] if tunnel: from bridge.catalog.loader import load_catalog from bridge.catalog.resolver import BridgeNotFound, resolve catalog = None if cfg.catalog_path is not None: try: catalog = load_catalog(cfg.catalog_path) except Exception: pass try: tcfg = resolve(tunnel, catalog=catalog, inline_tunnels=cfg.tunnels) except BridgeNotFound: return {"error": f"Tunnel '{tunnel}' not found in config or catalog"} mgr = TunnelManager(tcfg, state_dir=sd) if mgr.is_running(): already_running.append(tunnel) else: mgr.start() started.append(tunnel) else: for name, tcfg in cfg.tunnels.items(): mgr = TunnelManager(tcfg, state_dir=sd) if mgr.is_running(): already_running.append(name) else: mgr.start() started.append(name) return {"started": started, "already_running": already_running} @mcp.tool() def bridge_down(tunnel: Optional[str] = None) -> dict: """Stop one or all configured tunnels. Args: tunnel: Tunnel name to stop. If omitted, stops all inline tunnels. Returns: {"stopped": [...], "not_running": [...]} or {"error": "..."} """ cfg, err = _load_cfg_or_error() if err: return err from bridge.manager import TunnelManager sd = _state_dir() stopped = [] not_running = [] if tunnel: from bridge.catalog.loader import load_catalog from bridge.catalog.resolver import BridgeNotFound, resolve catalog = None if cfg.catalog_path is not None: try: catalog = load_catalog(cfg.catalog_path) except Exception: pass try: tcfg = resolve(tunnel, catalog=catalog, inline_tunnels=cfg.tunnels) except BridgeNotFound: return {"error": f"Tunnel '{tunnel}' not found in config or catalog"} mgr = TunnelManager(tcfg, state_dir=sd) if not mgr.is_running(): not_running.append(tunnel) else: mgr.stop() stopped.append(tunnel) else: for name, tcfg in cfg.tunnels.items(): mgr = TunnelManager(tcfg, state_dir=sd) if not mgr.is_running(): not_running.append(name) else: mgr.stop() stopped.append(name) return {"stopped": stopped, "not_running": not_running} @mcp.tool() def bridge_restart(tunnel: Optional[str] = None) -> dict: """Restart one or all configured tunnels. Args: tunnel: Tunnel name to restart. If omitted, restarts all inline tunnels. Returns: {"restarted": [...]} or {"error": "..."} """ cfg, err = _load_cfg_or_error() if err: return err from bridge.manager import TunnelManager sd = _state_dir() restarted = [] if tunnel: from bridge.catalog.loader import load_catalog from bridge.catalog.resolver import BridgeNotFound, resolve catalog = None if cfg.catalog_path is not None: try: catalog = load_catalog(cfg.catalog_path) except Exception: pass try: tcfg = resolve(tunnel, catalog=catalog, inline_tunnels=cfg.tunnels) except BridgeNotFound: return {"error": f"Tunnel '{tunnel}' not found in config or catalog"} mgr = TunnelManager(tcfg, state_dir=sd) mgr.stop() mgr.start() restarted.append(tunnel) else: for name, tcfg in cfg.tunnels.items(): mgr = TunnelManager(tcfg, state_dir=sd) mgr.stop() mgr.start() restarted.append(name) return {"restarted": restarted} @mcp.tool() def bridge_status() -> list[dict]: """Return status of all configured tunnels. Returns: List of tunnel status dicts, each with keys: tunnel, state, actor, host, pid, uptime, health """ cfg, err = _load_cfg_or_error() if err: return [err] sd = _state_dir() state_mgr = StateManager(state_dir=sd) rows = [] for name, tcfg in cfg.tunnels.items(): state = state_mgr.read_state(name) pid = state_mgr.read_pid(name) rows.append({ "tunnel": name, "state": state.value, "actor": tcfg.actor, "host": tcfg.host, "pid": pid, "uptime": None, "health": None, }) return rows @mcp.tool() def bridge_logs(tunnel: str, lines: int = 50) -> list[dict]: """Return recent audit log entries for a tunnel. Args: tunnel: Tunnel name. lines: Maximum number of log entries to return (default 50). Returns: List of audit event dicts (timestamp, event, actor, detail). """ cfg, err = _load_cfg_or_error() if err: return [err] from bridge.catalog.loader import load_catalog from bridge.catalog.resolver import BridgeNotFound, resolve catalog = None if cfg.catalog_path is not None: try: catalog = load_catalog(cfg.catalog_path) except Exception: pass try: resolve(tunnel, catalog=catalog, inline_tunnels=cfg.tunnels) except BridgeNotFound: return [{"error": f"Tunnel '{tunnel}' not found in config or catalog"}] from bridge.audit import AuditLogger sd = _state_dir() logger = AuditLogger(state_dir=sd) events = logger.read_events(tunnel) return events[-lines:] if events else [] # --------------------------------------------------------------------------- # Catalog tools # --------------------------------------------------------------------------- @mcp.tool() def catalog_list_targets(domain: Optional[str] = None) -> list[dict]: """List all infrastructure targets from the OpsCatalog. Args: domain: Optional domain filter. Returns: List of target dicts (id, domain, kind, description, reachable_via). Returns [{"error": "..."}] when catalog is not configured or fails to load. """ cfg, err = _load_cfg_or_error() if err: return [err] catalog, err = _load_catalog(cfg) if err: return [err] targets = [] for t in catalog.targets.values(): if domain and t.domain != domain: continue targets.append({ "id": t.id, "domain": t.domain, "kind": t.kind, "description": t.description or "", "reachable_via": list(t.reachable_via), }) return targets @mcp.tool() def catalog_show_target(target_id: str) -> dict: """Show full metadata for a catalog target. Args: target_id: The target identifier. Returns: Target metadata dict, or {"error": "..."}. """ cfg, err = _load_cfg_or_error() if err: return err catalog, err = _load_catalog(cfg) if err: return err if target_id not in catalog.targets: return {"error": f"Target '{target_id}' not found"} t = catalog.targets[target_id] return { "id": t.id, "domain": t.domain, "kind": t.kind, "description": t.description or "", "reachable_via": list(t.reachable_via), } @mcp.tool() def catalog_list_domains() -> list[dict]: """List all domains in the OpsCatalog with target and bridge counts. Returns: List of domain dicts (id, name, environment, target_count, bridge_count). Returns [{"error": "..."}] when catalog is not configured or fails to load. """ cfg, err = _load_cfg_or_error() if err: return [err] catalog, err = _load_catalog(cfg) if err: return [err] domains = [] for d in catalog.domains.values(): target_count = sum(1 for t in catalog.targets.values() if t.domain == d.id) bridge_count = sum(1 for b in catalog.bridges.values() if b.domain == d.id) domains.append({ "id": d.id, "name": d.name, "environment": d.environment, "description": d.description or "", "target_count": target_count, "bridge_count": bridge_count, }) return domains @mcp.tool() def catalog_validate() -> dict: """Validate the OpsCatalog for consistency errors. Returns: {"valid": True} or {"valid": False, "errors": ["..."]} """ cfg, err = _load_cfg_or_error() if err: return {"valid": False, "errors": [err["error"]]} catalog, err = _load_catalog(cfg) if err: return {"valid": False, "errors": [err["error"]]} from bridge.catalog.validator import validate_catalog errors = validate_catalog(catalog) if errors: return {"valid": False, "errors": errors} return {"valid": True, "errors": []} @mcp.tool() def catalog_show_bridge(bridge_id: str) -> dict: """Show full metadata for a catalog bridge definition. Args: bridge_id: The bridge identifier. Returns: Bridge metadata dict, or {"error": "..."}. """ cfg, err = _load_cfg_or_error() if err: return err catalog, err = _load_catalog(cfg) if err: return err if bridge_id not in catalog.bridges: return {"error": f"Bridge '{bridge_id}' not found"} b = catalog.bridges[bridge_id] result = { "id": b.id, "domain": b.domain, "target": b.target, "host": b.host, "remote_port": b.remote_port, "local_port": b.local_port, "ssh_user": b.ssh_user, "actor": b.actor, "access_method": b.access_method, "description": b.description or "", } if b.health_check: result["health_check"] = { "url": b.health_check.url, "interval_seconds": b.health_check.interval_seconds, "timeout_seconds": b.health_check.timeout_seconds, } return result # --------------------------------------------------------------------------- # Diagnostics tool # --------------------------------------------------------------------------- @mcp.tool() def bridge_check(tunnel: Optional[str] = None) -> list[dict]: """End-to-end diagnostics: SSH process alive + remote port listening. Args: tunnel: Specific tunnel name, or None for all inline tunnels. Returns: List of dicts with keys: tunnel, ssh_process, pid, remote_port, local_api, latency_ms, stale_state, ok. Returns [{"error": "..."}] on config load failure. """ cfg, err = _load_cfg_or_error() if err: return [err] sd = _state_dir() state_mgr = StateManager(state_dir=sd) if tunnel: from bridge.catalog.loader import load_catalog from bridge.catalog.resolver import BridgeNotFound, resolve catalog = None if cfg.catalog_path is not None: try: catalog = load_catalog(cfg.catalog_path) except Exception: pass try: tcfg = resolve(tunnel, catalog=catalog, inline_tunnels=cfg.tunnels) except BridgeNotFound: return [{"error": f"Tunnel '{tunnel}' not found in config or catalog"}] results = [check_tunnel(tcfg, state_mgr)] else: results = check_all_tunnels(cfg, state_mgr) return [{**dataclasses.asdict(r), "ok": r.ok} for r in results] # --------------------------------------------------------------------------- # MCP resources # --------------------------------------------------------------------------- @mcp.resource("bridge://status") def resource_bridge_status() -> str: """Live snapshot of all tunnel states as JSON.""" rows = bridge_status() return json.dumps(rows, indent=2) @mcp.resource("bridge://check") def resource_bridge_check() -> str: """Live end-to-end diagnostic snapshot for all tunnels.""" return json.dumps(bridge_check(), indent=2) @mcp.resource("catalog://domains") def resource_catalog_domains() -> str: """List of all catalog domains as JSON.""" domains = catalog_list_domains() return json.dumps(domains, indent=2) @mcp.resource("catalog://targets") def resource_catalog_targets() -> str: """List of all catalog targets as JSON.""" targets = catalog_list_targets() return json.dumps(targets, indent=2) # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="OpsBridge MCP server") parser.add_argument("--http", action="store_true", help="Run in SSE/HTTP mode instead of stdio") args = parser.parse_args() if args.http: port = int(os.environ.get("BRIDGE_MCP_PORT", "8002")) mcp.run(transport="sse", host="127.0.0.1", port=port) else: mcp.run(transport="stdio")