"""MCP Telemetry Bridge.

A small FastAPI service that exposes read-only access to Prometheus, Loki and
the Kubernetes API as MCP-style resources and tools.
"""

import asyncio
import os
import re
import time
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, Body
import httpx

PROM = os.getenv("PROM_URL", "http://monitoring-kube-prometheus-prometheus.monitoring:9090")
LOKI = os.getenv("LOKI_URL", "http://loki.logging:3100")
K8S = os.getenv("K8S_API", "https://kubernetes.default.svc")

SERVICE_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"
CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

# Timeout (seconds) applied to every upstream HTTP request.
HTTP_TIMEOUT = 30.0

# Lowercase alphanumerics, '-' and '.', starting and ending with an
# alphanumeric. Used to keep caller-supplied values from altering the URL path.
_K8S_NAME_RE = re.compile(r"[a-z0-9]([-a-z0-9.]*[a-z0-9])?")
_K8S_NAME_MAX_LEN = 253

# kind alias -> (API base path, resource plural, is the resource namespaced?)
_K8S_KINDS: Dict[str, Any] = {
    "pods": ("/api/v1", "pods", True),
    "pod": ("/api/v1", "pods", True),
    "namespaces": ("/api/v1", "namespaces", False),
    "nodes": ("/api/v1", "nodes", False),
    "services": ("/api/v1", "services", True),
    "events": ("/api/v1", "events", True),
    "deployments": ("/apis/apps/v1", "deployments", True),
    "daemonsets": ("/apis/apps/v1", "daemonsets", True),
    "statefulsets": ("/apis/apps/v1", "statefulsets", True),
    "replicasets": ("/apis/apps/v1", "replicasets", True),
}


def _sa_headers() -> Dict[str, str]:
    """Return an Authorization header built from the service-account token.

    Returns an empty dict when the token file is missing, unreadable or empty,
    so requests are then sent without credentials.
    """
    try:
        with open(SERVICE_TOKEN_PATH, "r", encoding="utf-8") as f:
            token = f.read().strip()
    except OSError:
        # Best effort: any failure to read the token means "no credentials".
        return {}
    return {"Authorization": f"Bearer {token}"} if token else {}


def _ssl_params() -> Dict[str, Any]:
    """Return httpx TLS kwargs that pin the CA bundle at CA_PATH when it exists."""
    return {"verify": CA_PATH} if os.path.exists(CA_PATH) else {}


def _is_k8s_name(value: Any) -> bool:
    """Return True if *value* is a string that is safe to use as a URL path segment."""
    return (
        isinstance(value, str)
        and len(value) <= _K8S_NAME_MAX_LEN
        and _K8S_NAME_RE.fullmatch(value) is not None
    )


def _k8s_client() -> httpx.AsyncClient:
    """Build an AsyncClient carrying the CA bundle and service-account token."""
    return httpx.AsyncClient(**_ssl_params(), headers=_sa_headers())


async def _get_json(
    client: httpx.AsyncClient,
    url: str,
    params: Optional[Dict[str, Any]] = None,
) -> Any:
    """GET *url* and return the decoded JSON body.

    Transport failures and non-JSON bodies are returned as an ``{"error": ...}``
    dict instead of being raised, so handlers answer with a structured payload
    rather than an unhandled exception.
    """
    try:
        resp = await client.get(url, params=params, timeout=HTTP_TIMEOUT)
    except httpx.HTTPError as exc:
        return {"error": "upstream request failed", "url": url, "detail": str(exc)}
    try:
        return resp.json()
    except ValueError:
        return {
            "error": "upstream returned a non-JSON response",
            "url": url,
            "status": resp.status_code,
            "body": resp.text[:500],
        }


app = FastAPI(title="MCP Telemetry Bridge", version="0.1.0")

RESOURCES: List[Dict[str, str]] = [
    {
        "uri": "res://dashboards/top-pods-by-cpu.promql",
        "mimeType": "text/plain",
        "content": 'topk(10, sum by (pod, namespace) (rate(container_cpu_usage_seconds_total{container!="",image!=""}[1m])))',
    },
    {
        "uri": "res://dashboards/pod-restarts.promql",
        "mimeType": "text/plain",
        "content": "sum by (pod, namespace) (increase(kube_pod_container_status_restarts_total[10m])) > 0",
    },
    {
        "uri": "res://dashboards/warn-events.logql",
        "mimeType": "text/plain",
        "content": '{app="kube-apiserver"} |= "Warning"',
    },
]

TOOLS: List[Dict[str, Any]] = [
    {
        "name": "promql.query",
        "inputSchema": {
            "type": "object",
            "properties": {"expr": {"type": "string"}, "range": {"type": "string"}},
        },
    },
    {
        "name": "loki.query",
        "inputSchema": {
            "type": "object",
            "properties": {
                "logql": {"type": "string"},
                "limit": {"type": "integer"},
                "since": {"type": "string"},
            },
        },
    },
    {
        "name": "k8s.get",
        "inputSchema": {
            "type": "object",
            "properties": {
                "kind": {"type": "string"},
                "namespace": {"type": "string"},
                "name": {"type": "string"},
            },
        },
    },
    {
        "name": "k8s.events",
        "inputSchema": {
            "type": "object",
            "properties": {"namespace": {"type": "string"}, "since": {"type": "string"}},
        },
    },
    {"name": "inventory.snapshot", "inputSchema": {"type": "object", "properties": {}}},
]

PROMPTS: List[Dict[str, str]] = [
    {
        "name": "Triage-Now",
        "description": "Summarize current alerts, top offenders and recent warnings.",
    }
]


@app.get("/healthz")
def healthz():
    """Liveness probe: report status and the current Unix timestamp."""
    return {"status": "ok", "ts": int(time.time())}


@app.get("/mcp/schema")
def mcp_schema():
    """Return the advertised resources, tools and prompts."""
    return {"resources": RESOURCES, "tools": TOOLS, "prompts": PROMPTS}


@app.get("/mcp/resource")
def mcp_resource(uri: str):
    """Return the resource whose ``uri`` matches, or an error dict if none does."""
    found = next((r for r in RESOURCES if r["uri"] == uri), None)
    if found is None:
        return {"error": "not found", "uri": uri}
    return {"uri": uri, "mimeType": found["mimeType"], "content": found["content"]}


@app.post("/tools/promql.query")
async def promql_query(payload: Dict[str, Any] = Body(...)):
    """Run an instant PromQL query against Prometheus.

    Payload keys:
        expr: PromQL expression (required).
        range: optional duration; when given, the expression is wrapped as
            ``sum_over_time((expr)[range:])``.
    """
    expr = payload.get("expr")
    if not isinstance(expr, str) or not expr.strip():
        return {"error": "missing required field", "field": "expr"}

    rng = payload.get("range")
    if rng:
        # A range applied to an arbitrary expression must use subquery syntax
        # "[range:resolution]"; add the ':' unless the caller already did.
        window = str(rng)
        if ":" not in window:
            window += ":"
        query = f"sum_over_time(({expr})[{window}])"
    else:
        query = expr

    async with httpx.AsyncClient() as c:
        return await _get_json(c, f"{PROM}/api/v1/query", params={"query": query})


@app.post("/tools/loki.query")
async def loki_query(payload: Dict[str, Any] = Body(...)):
    """Run a LogQL query against Loki.

    Payload keys:
        logql: LogQL expression (required).
        limit: maximum number of entries to request (default 100).
        since: optional look-back duration; when given, the range-query
            endpoint is used and ``since`` is forwarded to it. Without it the
            instant-query endpoint is used.
    """
    logql = payload.get("logql")
    if not isinstance(logql, str) or not logql.strip():
        return {"error": "missing required field", "field": "logql"}

    limit = payload.get("limit")
    if limit is None:
        limit = 100
    params = {"query": logql, "limit": str(limit)}

    endpoint = "query"
    since = payload.get("since")
    if since:
        endpoint = "query_range"
        params["since"] = str(since)

    async with httpx.AsyncClient() as c:
        return await _get_json(c, f"{LOKI}/loki/api/v1/{endpoint}", params=params)


@app.post("/tools/k8s.get")
async def k8s_get(payload: Dict[str, Any] = Body(...)):
    """Fetch a Kubernetes object or list of objects.

    Payload keys:
        kind: one of the keys of ``_K8S_KINDS`` (case-insensitive, required).
        namespace: optional namespace; ignored for cluster-scoped kinds.
        name: optional object name; when omitted a list is requested.

    Returns the API server's JSON response, or an ``{"error": ...}`` dict for
    an unsupported kind or an invalid namespace/name.
    """
    kind = str(payload.get("kind") or "").lower()
    ns = payload.get("namespace")
    name = payload.get("name")

    if kind not in _K8S_KINDS:
        return {"error": "unsupported kind", "kind": kind}
    # namespace/name end up in the request path, so refuse anything that could
    # change which API-server endpoint is addressed.
    if ns and not _is_k8s_name(ns):
        return {"error": "invalid namespace", "namespace": ns}
    if name and not _is_k8s_name(name):
        return {"error": "invalid name", "name": name}

    base, res, namespaced = _K8S_KINDS[kind]
    url = f"{K8S}{base}"
    if ns and namespaced:
        url += f"/namespaces/{ns}/{res}"
    else:
        url += f"/{res}"
    if name:
        url += f"/{name}"

    async with _k8s_client() as c:
        return await _get_json(c, url)


@app.post("/tools/k8s.events")
async def k8s_events(payload: Dict[str, Any] = Body(...)):
    """List Kubernetes events, cluster-wide or for a single namespace.

    Payload keys:
        namespace: optional namespace to restrict the listing to.
        since: accepted by the tool schema but not applied here; the full
            event list is returned.
    """
    ns = payload.get("namespace")
    if ns and not _is_k8s_name(ns):
        return {"error": "invalid namespace", "namespace": ns}

    url = f"{K8S}/api/v1"
    url += f"/namespaces/{ns}/events" if ns else "/events"

    async with _k8s_client() as c:
        return await _get_json(c, url)


@app.post("/tools/inventory.snapshot")
async def inventory_snapshot():
    """Return a minimal cluster inventory.

    Lists nodes, namespaces, deployments, daemonsets and statefulsets. The
    five listings are independent, so they are requested concurrently over a
    single client.
    """
    endpoints = {
        "nodes": "/api/v1/nodes",
        "namespaces": "/api/v1/namespaces",
        "deployments": "/apis/apps/v1/deployments",
        "daemonsets": "/apis/apps/v1/daemonsets",
        "statefulsets": "/apis/apps/v1/statefulsets",
    }
    async with _k8s_client() as c:
        results = await asyncio.gather(
            *(_get_json(c, f"{K8S}{path}") for path in endpoints.values())
        )
    # gather() preserves argument order, so results line up with the keys.
    return dict(zip(endpoints, results))