feat(diagnostics): end-to-end tunnel check, stale state detection, MCP extensions

- diagnostics.py: TunnelCheckResult with SSH process liveness, port
  probe, and optional API health check; check_tunnel / check_all_tunnels
- cli.py: bridge status shows LIVE column and [STALE] marker when state
  says connected but PID is dead; bridge check wired to diagnostics
- state.py: read_raw_pid helper; _pid_alive exported for reuse
- capabilities.py: capabilities registry stubs; registers the bridge_check
  capability (cli + mcp access modes)
- mcp_server/server.py: expose check_tunnel (as the bridge_check tool and
  the bridge://check resource) and tunnel capabilities over MCP
- SCOPE.md: rapid orientation document
- workplans/OPS-WP-0001-diagnostics.md: workplan backing this feature
- tests: 207 passing (test_cli, test_mcp, test_diagnostics)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-21 15:07:47 +01:00
parent bebd542a2e
commit a55c685f89
10 changed files with 773 additions and 8 deletions

View File

@@ -68,6 +68,11 @@ CAPABILITIES: list[Capability] = [
description="Show bridge metadata",
required_access_modes=frozenset({"cli", "mcp"}),
),
Capability(
name="bridge_check",
description="End-to-end tunnel diagnostics via SSH: SSH PID alive + remote port listening",
required_access_modes=frozenset({"cli", "mcp"}),
),
]
CAPABILITIES_BY_NAME: dict[str, Capability] = {c.name: c for c in CAPABILITIES}

View File

@@ -1,6 +1,7 @@
"""CLI for OpsBridge — bridge command."""
from __future__ import annotations
import dataclasses
import json
import os
from pathlib import Path
@@ -10,8 +11,9 @@ import typer
from bridge.audit import AuditLogger
from bridge.config import ConfigError, load_config
from bridge.diagnostics import check_all_tunnels, check_tunnel
from bridge.manager import TunnelManager
from bridge.state import StateManager
from bridge.state import StateManager, _pid_alive
app = typer.Typer(
name="bridge",
@@ -175,13 +177,20 @@ def status(
rows = []
for name, tcfg in cfg.tunnels.items():
state = state_mgr.read_state(name)
pid = state_mgr.read_pid(name)
raw_pid = state_mgr.read_raw_pid(name)
pid_alive_val = _pid_alive(raw_pid) if raw_pid is not None else None
stale = (
state.value in ("connected", "degraded")
and pid_alive_val is not True
)
rows.append({
"tunnel": name,
"state": state.value,
"actor": tcfg.actor,
"host": tcfg.host,
"pid": pid,
"pid": raw_pid,
"pid_alive": pid_alive_val,
"stale": stale,
"uptime": None,
"health": None,
})
@@ -196,10 +205,29 @@ def _print_status_table(rows):
if not rows:
typer.echo("No tunnels configured.")
return
headers = ["TUNNEL", "STATE", "ACTOR", "HOST", "PID"]
def _state_display(row):
s = row["state"]
if row.get("stale"):
s += " [STALE]"
return s
def _live_display(row):
alive = row.get("pid_alive")
if alive is True:
return "yes"
elif alive is False:
return "no"
return "\u2014"
headers = ["TUNNEL", "STATE", "ACTOR", "HOST", "PID", "LIVE"]
col_widths = [
max(len(h), max((len(str(r.get(h.lower(), "") or "")) for r in rows), default=0))
for h in headers
max(len("TUNNEL"), max((len(row["tunnel"]) for row in rows), default=0)),
max(len("STATE"), max((len(_state_display(row)) for row in rows), default=0)),
max(len("ACTOR"), max((len(str(row.get("actor", "") or "")) for row in rows), default=0)),
max(len("HOST"), max((len(str(row.get("host", "") or "")) for row in rows), default=0)),
max(len("PID"), max((len(str(row["pid"] or "")) for row in rows), default=0)),
max(len("LIVE"), max((len(_live_display(row)) for row in rows), default=0)),
]
def _fmt_row(vals):
@@ -210,10 +238,11 @@ def _print_status_table(rows):
for row in rows:
typer.echo(_fmt_row([
row["tunnel"],
row["state"],
_state_display(row),
row["actor"],
row["host"],
str(row["pid"] or ""),
_live_display(row),
]))
@@ -272,6 +301,62 @@ def logs(
pass
@app.command()
def check(
    tunnel: Optional[str] = typer.Argument(None, help="Tunnel name (omit for all inline)"),
    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
    """End-to-end diagnostics: verify SSH PID alive and remote port listening."""
    cfg = _load_or_exit()
    state_mgr = StateManager(state_dir=_state_dir())

    # One named tunnel, or every tunnel in the config when no name is given.
    if not tunnel:
        results = check_all_tunnels(cfg, state_mgr)
    else:
        target = _resolve_tunnel(cfg, tunnel)
        results = [check_tunnel(target, state_mgr)]

    if not as_json:
        _print_check_table(results)
    else:
        # `ok` is a property, so asdict() does not include it; add it by hand.
        payload = []
        for res in results:
            entry = dataclasses.asdict(res)
            entry["ok"] = res.ok
            payload.append(entry)
        typer.echo(json.dumps(payload, indent=2))

    # Exit with status 1 as soon as at least one tunnel is not healthy.
    if not all(res.ok for res in results):
        raise typer.Exit(1)
def _print_check_table(results):
    """Echo the check results as a left-aligned text table.

    Prints "No tunnels configured." and returns when `results` is empty.
    Otherwise prints a header line, a dashed separator, and one line per
    result with the columns TUNNEL, SSH, PID, PORT, API and OK.
    """
    if not results:
        typer.echo("No tunnels configured.")
        return

    headers = ["TUNNEL", "SSH", "PID", "PORT", "API", "OK"]
    table = [
        [
            res.tunnel,
            res.ssh_process,
            str(res.pid or ""),
            res.remote_port,
            res.local_api or "\u2014",  # em dash when there is no API result
            "yes" if res.ok else "no",
        ]
        for res in results
    ]

    # Each column is as wide as its header or its longest cell, whichever wins.
    col_widths = []
    for idx, header in enumerate(headers):
        widest_cell = max((len(line[idx]) for line in table), default=0)
        col_widths.append(max(len(header), widest_cell))

    def _fmt(vals):
        padded = [str(v).ljust(w) for v, w in zip(vals, col_widths)]
        return " ".join(padded)

    separator = ["-" * w for w in col_widths]
    for line in (headers, separator, *table):
        typer.echo(_fmt(line))
# ─── targets commands ─────────────────────────────────────────────────────────
@targets_app.callback(invoke_without_command=True)

110
src/bridge/diagnostics.py Normal file
View File

@@ -0,0 +1,110 @@
"""End-to-end tunnel diagnostics for OpsBridge."""
from __future__ import annotations
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import httpx
from bridge.models import BridgeState, TunnelConfig
from bridge.state import StateManager, _pid_alive
@dataclass
class TunnelCheckResult:
    """Findings of one diagnostic pass over a single tunnel."""

    # Name of the tunnel these findings belong to.
    tunnel: str
    # One of "ok" | "dead" | "no_pid".
    ssh_process: str
    pid: Optional[int]
    # One of "listening" | "closed" | "error:<msg>".
    remote_port: str
    # One of "ok" | "error:<msg>" | None.
    local_api: Optional[str]
    latency_ms: Optional[float]
    # True when the state file says connected but the process is dead.
    stale_state: bool

    @property
    def ok(self) -> bool:
        """True only when the SSH process is "ok" and the remote port is "listening".

        `local_api` and `stale_state` do not take part in the verdict.
        """
        if self.ssh_process != "ok":
            return False
        return self.remote_port == "listening"
def _probe_remote_port(cfg: TunnelConfig) -> str:
    """Ask the remote host over SSH whether `cfg.remote_port` has a listener.

    Returns "listening", "closed", or "error:<msg>" (always a single line).
    """
    key_path = str(Path(cfg.ssh_key).expanduser())
    cmd = [
        "ssh",
        "-i", key_path,
        # BatchMode: never prompt for a password/passphrase; fail instead.
        "-o", "BatchMode=yes",
        "-o", "ConnectTimeout=5",
        # NOTE(review): accept-new silently trusts a host key on first contact.
        "-o", "StrictHostKeyChecking=accept-new",
        f"{cfg.ssh_user}@{cfg.host}",
        f"ss -tnlp 2>/dev/null | grep -q ':{cfg.remote_port} ' && echo ok || echo closed",
    ]
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return "error:timeout"
    except Exception as e:
        return f"error:{e}"

    # The remote command prints its verdict last. Judge only the final stdout
    # line so a login banner or rc-file output ahead of it cannot mask it.
    stdout_lines = proc.stdout.strip().splitlines()
    verdict = stdout_lines[-1].strip() if stdout_lines else ""
    if verdict == "ok":
        return "listening"
    if verdict == "closed":
        return "closed"
    # Collapse whitespace so a multi-line ssh error stays a one-line
    # "error:<msg>" value and does not break tabular output.
    detail = " ".join(proc.stderr.split()) or "unknown"
    return f"error:{detail}"


def _probe_local_api(health_check) -> tuple[Optional[str], Optional[float]]:
    """Run the optional HTTP health check; return (status, latency_ms).

    Returns (None, None) when no health check is configured. Otherwise the
    status is "ok" or "error:<msg>"; latency is set once a response arrived.
    """
    if health_check is None:
        return None, None
    latency_ms: Optional[float] = None
    try:
        t0 = time.monotonic()
        resp = httpx.get(health_check.url, timeout=health_check.timeout_seconds)
        latency_ms = (time.monotonic() - t0) * 1000
        local_api = "ok" if resp.is_success else f"error:http_{resp.status_code}"
    except Exception as e:
        # Diagnostics must report, not crash: any failure becomes a status.
        local_api = f"error:{e}"
    return local_api, latency_ms


def check_tunnel(cfg: TunnelConfig, state_mgr: StateManager) -> TunnelCheckResult:
    """Run end-to-end diagnostics for a single tunnel.

    Checks SSH PID liveness, remote port listening via SSH probe, and optional
    local API health check. Returns a TunnelCheckResult with all findings.
    """
    name = cfg.name

    # 1. PID liveness
    pid = state_mgr.read_raw_pid(name)
    if pid is None:
        ssh_process = "no_pid"
    elif _pid_alive(pid):
        ssh_process = "ok"
    else:
        ssh_process = "dead"

    # 2. Stale state: state file says connected/degraded but process is not ok
    state = state_mgr.read_state(name)
    stale_state = (
        state in (BridgeState.CONNECTED, BridgeState.DEGRADED)
        and ssh_process != "ok"
    )

    # 3. SSH probe for remote port
    remote_port = _probe_remote_port(cfg)

    # 4. Local API health check (optional)
    local_api, latency_ms = _probe_local_api(cfg.health_check)

    return TunnelCheckResult(
        tunnel=name,
        ssh_process=ssh_process,
        pid=pid,
        remote_port=remote_port,
        local_api=local_api,
        latency_ms=latency_ms,
        stale_state=stale_state,
    )
def check_all_tunnels(cfg, state_mgr: StateManager) -> list[TunnelCheckResult]:
    """Run diagnostics for all configured inline tunnels.

    Calls check_tunnel once per entry of `cfg.tunnels`, in iteration order,
    and returns the results as a list (empty when no tunnels are configured).
    """
    findings = []
    for tunnel_cfg in cfg.tunnels.values():
        findings.append(check_tunnel(tunnel_cfg, state_mgr))
    return findings

View File

@@ -8,6 +8,7 @@ All tool functions return JSON-serialisable dicts/lists.
"""
from __future__ import annotations
import dataclasses
import json
import os
from pathlib import Path
@@ -15,6 +16,9 @@ from typing import Optional
from fastmcp import FastMCP
from bridge.diagnostics import check_all_tunnels, check_tunnel
from bridge.state import StateManager
mcp = FastMCP(
name="ops-bridge",
instructions=(
@@ -218,7 +222,6 @@ def bridge_status() -> list[dict]:
if err:
return [err]
from bridge.state import StateManager
sd = _state_dir()
state_mgr = StateManager(state_dir=sd)
@@ -432,6 +435,48 @@ def catalog_show_bridge(bridge_id: str) -> dict:
return result
# ---------------------------------------------------------------------------
# Diagnostics tool
# ---------------------------------------------------------------------------
@mcp.tool()
def bridge_check(tunnel: Optional[str] = None) -> list[dict]:
    """End-to-end diagnostics: SSH process alive + remote port listening.

    Args:
        tunnel: Specific tunnel name, or None for all inline tunnels.

    Returns:
        List of dicts with keys: tunnel, ssh_process, pid, remote_port,
        local_api, latency_ms, stale_state, ok.
        Returns [{"error": "..."}] on config load failure or when the tunnel
        cannot be resolved. In the latter case the dict also carries a
        "catalog_error" key if the catalog itself failed to load.
    """
    cfg, err = _load_cfg_or_error()
    if err:
        return [err]

    state_mgr = StateManager(state_dir=_state_dir())

    if not tunnel:
        results = check_all_tunnels(cfg, state_mgr)
    else:
        from bridge.catalog.loader import load_catalog
        from bridge.catalog.resolver import BridgeNotFound, resolve

        catalog = None
        catalog_error: Optional[str] = None
        if cfg.catalog_path is not None:
            try:
                catalog = load_catalog(cfg.catalog_path)
            except Exception as exc:
                # Best effort: a broken catalog must not stop inline tunnels
                # from being checked. Keep the reason so a later "not found"
                # can explain itself instead of hiding the failure.
                catalog_error = str(exc) or type(exc).__name__

        try:
            tcfg = resolve(tunnel, catalog=catalog, inline_tunnels=cfg.tunnels)
        except BridgeNotFound:
            failure = {"error": f"Tunnel '{tunnel}' not found in config or catalog"}
            if catalog_error is not None:
                failure["catalog_error"] = catalog_error
            return [failure]
        results = [check_tunnel(tcfg, state_mgr)]

    # `ok` is a property, so asdict() does not include it; add it explicitly.
    return [{**dataclasses.asdict(r), "ok": r.ok} for r in results]
# ---------------------------------------------------------------------------
# MCP resources
# ---------------------------------------------------------------------------
@@ -443,6 +488,12 @@ def resource_bridge_status() -> str:
return json.dumps(rows, indent=2)
@mcp.resource("bridge://check")
def resource_bridge_check() -> str:
    """Live end-to-end diagnostic snapshot for all tunnels."""
    # Called without a tunnel name, so bridge_check() covers every tunnel.
    # NOTE(review): this relies on bridge_check staying directly callable
    # after @mcp.tool() - confirm for the pinned fastmcp version.
    snapshot = bridge_check()
    return json.dumps(snapshot, indent=2)
@mcp.resource("catalog://domains")
def resource_catalog_domains() -> str:
"""List of all catalog domains as JSON."""

View File

@@ -51,6 +51,16 @@ class StateManager:
return pid
return None
def read_raw_pid(self, name: str) -> Optional[int]:
    """Read PID from file without liveness check.

    Returns None if the file is absent, unreadable, does not contain an
    integer, or contains a non-positive value.
    """
    path = self._pid_path(name)
    try:
        # A missing file raises FileNotFoundError (an OSError), so no separate
        # exists() pre-check is needed; this also avoids a check-then-read race.
        pid = int(path.read_text().strip())
    except (ValueError, OSError):
        return None
    # A PID <= 0 never names a single process. With kill(2)-style signalling
    # such values address process groups, so a liveness probe built on it
    # (implementation not visible here) could report a false "alive".
    return pid if pid > 0 else None
def write_pid(self, name: str, pid: int) -> None:
self._ensure_dir()
self._pid_path(name).write_text(str(pid))