"""End-to-end tunnel diagnostics for OpsBridge.""" from __future__ import annotations import socket import subprocess import time from dataclasses import dataclass from pathlib import Path from typing import Optional import httpx from bridge.models import BridgeState, TunnelConfig from bridge.state import StateManager, _pid_alive def _remote_port_probe_command(remote_port: int) -> str: """Build a portable remote shell probe for a listening TCP port.""" return ( f"port={remote_port}; " "if command -v ss >/dev/null 2>&1; then " "ss -tnlp 2>/dev/null | grep -q \":$port \" && echo ok || echo closed; " "elif command -v netstat >/dev/null 2>&1; then " "netstat -tnlp 2>/dev/null | " "grep -q \"[.:]$port[[:space:]]\" && echo ok || echo closed; " "else " "hex=$(printf '%04X' \"$port\"); " "awk -v p=\":$hex\" " "'NR > 1 && $4 == \"0A\" && index($2, p) { found = 1 } " "END { print found ? \"ok\" : \"closed\" }' " "/proc/net/tcp /proc/net/tcp6 2>/dev/null; " "fi" ) def _probe_local_port(local_port: int) -> str: """Check whether the local side of an SSH -L tunnel is accepting TCP.""" try: with socket.create_connection(("127.0.0.1", local_port), timeout=5): return "listening" except ConnectionRefusedError: return "closed" except socket.timeout: return "error:timeout" except OSError as e: return f"error:{e}" @dataclass class TunnelCheckResult: tunnel: str ssh_process: str # "ok" | "dead" | "no_pid" pid: Optional[int] remote_port: str # "listening" | "closed" | "error:" local_api: Optional[str] # "ok" | "error:" | None latency_ms: Optional[float] stale_state: bool # state file says connected but process is dead @property def ok(self) -> bool: return self.ssh_process == "ok" and self.remote_port == "listening" def check_tunnel(cfg: TunnelConfig, state_mgr: StateManager) -> TunnelCheckResult: """Run end-to-end diagnostics for a single tunnel. Checks SSH PID liveness, remote port listening via SSH probe, and optional local API health check. Returns a TunnelCheckResult with all findings. """ name = cfg.name # 1. PID liveness pid = state_mgr.read_raw_pid(name) if pid is None: ssh_process = "no_pid" elif _pid_alive(pid): ssh_process = "ok" else: ssh_process = "dead" # 2. Stale state: state file says connected/degraded but process is dead state = state_mgr.read_state(name) stale_state = ( state in (BridgeState.CONNECTED, BridgeState.DEGRADED) and ssh_process != "ok" ) # 3. Port probe: reverse tunnels listen remotely; local tunnels listen here. if cfg.direction == "local": remote_port = _probe_local_port(cfg.local_port) else: key_path = str(Path(cfg.ssh_key).expanduser()) cmd = [ "ssh", "-i", key_path, "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=accept-new", f"{cfg.ssh_user}@{cfg.host}", _remote_port_probe_command(cfg.remote_port), ] try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=10, ) output = proc.stdout.strip() if output == "ok": remote_port = "listening" elif output == "closed": remote_port = "closed" else: remote_port = f"error:{proc.stderr.strip() or 'unknown'}" except subprocess.TimeoutExpired: remote_port = "error:timeout" except Exception as e: remote_port = f"error:{e}" # 4. Local API health check (optional) local_api: Optional[str] = None latency_ms: Optional[float] = None if cfg.health_check is not None: try: t0 = time.monotonic() resp = httpx.get(cfg.health_check.url, timeout=cfg.health_check.timeout_seconds) latency_ms = (time.monotonic() - t0) * 1000 local_api = "ok" if resp.is_success else f"error:http_{resp.status_code}" except Exception as e: local_api = f"error:{e}" return TunnelCheckResult( tunnel=name, ssh_process=ssh_process, pid=pid, remote_port=remote_port, local_api=local_api, latency_ms=latency_ms, stale_state=stale_state, ) def check_all_tunnels(cfg, state_mgr: StateManager) -> list[TunnelCheckResult]: """Run diagnostics for all configured inline tunnels.""" return [check_tunnel(tcfg, state_mgr) for tcfg in cfg.tunnels.values()]