"""Nightly maintenance: detect and clear stale SSH remote port forwards.""" from __future__ import annotations import subprocess import time from dataclasses import dataclass from typing import Optional from urllib.parse import urlparse, urlunparse import httpx from bridge.diagnostics import _remote_port_probe_command, check_tunnel from bridge.manager import TunnelManager from bridge.models import TunnelConfig from bridge.state import StateManager @dataclass class CleanupAction: tunnel: str action: str # skipped | healthy | cleaned | cleaned_and_restarted | error detail: str = "" @dataclass class CleanupReport: actions: list[CleanupAction] @property def cleaned_count(self) -> int: return sum(1 for a in self.actions if a.action.startswith("cleaned")) def remote_forward_health_url(cfg: TunnelConfig) -> Optional[str]: """Map the local health_check URL to the remote forwarded port.""" if cfg.health_check is None or cfg.direction == "local": return None parsed = urlparse(cfg.health_check.url) if not parsed.hostname: return None netloc = f"{parsed.hostname}:{cfg.remote_port}" return urlunparse(parsed._replace(netloc=netloc)) def _ssh_base_cmd(cfg: TunnelConfig) -> list[str]: from pathlib import Path return [ "ssh", "-i", str(Path(cfg.ssh_key).expanduser()), "-o", "BatchMode=yes", "-o", "ConnectTimeout=10", "-o", "StrictHostKeyChecking=accept-new", f"{cfg.ssh_user}@{cfg.host}", ] def _run_ssh(cfg: TunnelConfig, remote_command: str, *, timeout: float = 30) -> subprocess.CompletedProcess[str]: return subprocess.run( [*_ssh_base_cmd(cfg), remote_command], capture_output=True, text=True, timeout=timeout, ) def remote_port_listening(cfg: TunnelConfig) -> bool: proc = _run_ssh(cfg, _remote_port_probe_command(cfg.remote_port), timeout=15) return proc.stdout.strip() == "ok" def probe_remote_forward(cfg: TunnelConfig) -> tuple[bool, str]: """Return (healthy, detail) for the remote forwarded service.""" url = remote_forward_health_url(cfg) if url is None: return True, "no remote 
health url configured" timeout = cfg.health_check.timeout_seconds if cfg.health_check else 5 remote_cmd = ( f"curl -sf --max-time {timeout} {url!r} >/dev/null " "&& echo ok || echo fail" ) try: proc = _run_ssh(cfg, remote_cmd, timeout=timeout + 15) except subprocess.TimeoutExpired: return False, "remote health probe timed out" output = proc.stdout.strip() if output == "ok": return True, "remote forward healthy" if proc.returncode != 0 and proc.stderr.strip(): return False, proc.stderr.strip() return False, "remote forward unhealthy" def local_service_healthy(cfg: TunnelConfig) -> Optional[bool]: if cfg.health_check is None: return None try: resp = httpx.get( cfg.health_check.url, timeout=cfg.health_check.timeout_seconds, ) return resp.is_success except Exception: return False def _remote_cleanup_script(port: int) -> str: return f"""set -eu port={port} pids="" if command -v lsof >/dev/null 2>&1; then pids=$(sudo -n lsof -t -iTCP:$port -sTCP:LISTEN 2>/dev/null || true) if [ -z "$pids" ]; then pids=$(lsof -t -iTCP:$port -sTCP:LISTEN 2>/dev/null || true) fi fi if [ -z "$pids" ] && command -v fuser >/dev/null 2>&1; then pids=$(fuser -n tcp $port 2>/dev/null | tr -s ' ' '\\n' | grep -E '^[0-9]+$' || true) fi if [ -z "$pids" ]; then echo "no_listeners" exit 0 fi echo "killing:$pids" for pid in $pids; do kill "$pid" 2>/dev/null || sudo -n kill "$pid" 2>/dev/null || true done sleep 1 if ss -tln 2>/dev/null | grep -q ":$port "; then echo "still_listening" else echo "cleared" fi """ def clear_stale_remote_binding(cfg: TunnelConfig) -> tuple[bool, str]: try: proc = _run_ssh(cfg, _remote_cleanup_script(cfg.remote_port), timeout=30) except subprocess.TimeoutExpired: return False, "remote cleanup timed out" output = proc.stdout.strip() if "cleared" in output: return True, output if "no_listeners" in output: return True, "no listeners found" if "still_listening" in output: return False, output detail = output or proc.stderr.strip() or f"exit {proc.returncode}" return False, detail 
def should_cleanup_tunnel(
    cfg: TunnelConfig,
    state_mgr: StateManager,
) -> tuple[bool, str]:
    """Decide whether a reverse tunnel's remote binding looks stale.

    Returns ``(needed, reason)``. Cleanup is never needed for ``"local"``
    tunnels, when the remote port is not listening, or when the remote
    probe succeeds.
    """
    if cfg.direction == "local":
        return False, "local tunnel"
    if not remote_port_listening(cfg):
        return False, "remote port closed"

    remote_ok, remote_detail = probe_remote_forward(cfg)
    if remote_ok:
        return False, remote_detail

    # From here on the remote probe has failed; classify why.
    check = check_tunnel(cfg, state_mgr)
    local_ok = local_service_healthy(cfg)

    if local_ok is True:
        return True, f"stale forward: {remote_detail}"
    if check.ssh_process != "ok" and check.remote_port == "listening":
        return True, f"orphan forward while ssh {check.ssh_process}: {remote_detail}"
    if check.ssh_process == "ok":
        return True, f"broken forward with live client: {remote_detail}"
    return False, remote_detail


def cleanup_tunnel(
    cfg: TunnelConfig,
    state_mgr: StateManager,
    *,
    restart: bool,
) -> CleanupAction:
    """Clear a stale remote binding for one tunnel and optionally restart it.

    Never raises: any exception is converted into an ``"error"`` action
    whose detail is the exception text.
    """
    name = cfg.name
    try:
        needed, reason = should_cleanup_tunnel(cfg, state_mgr)
        if not needed:
            return CleanupAction(name, "healthy", reason)

        ok, detail = clear_stale_remote_binding(cfg)
        if not ok:
            return CleanupAction(name, "error", f"cleanup failed: {detail}")
        if not restart:
            return CleanupAction(name, "cleaned", f"{reason}; {detail}")

        mgr = TunnelManager(cfg, state_dir=state_mgr._dir)
        was_running = mgr.is_running()
        if was_running:
            mgr.stop()
        mgr.start()
        action = "cleaned_and_restarted"
        verb = "restarted" if was_running else "started"
        return CleanupAction(name, action, f"{reason}; {verb} tunnel; {detail}")
    except Exception as exc:
        return CleanupAction(name, "error", str(exc))


def cleanup_all_tunnels(
    cfg,
    state_mgr: StateManager,
    *,
    restart: bool,
    tunnel_name: Optional[str] = None,
) -> CleanupReport:
    """Run ``cleanup_tunnel`` over every non-local tunnel in ``cfg.tunnels``.

    When ``tunnel_name`` is given only that tunnel is considered.

    Raises:
        KeyError: ``tunnel_name`` is not a key of ``cfg.tunnels``.
    """
    tunnels = cfg.tunnels.values()
    if tunnel_name is not None:
        if tunnel_name not in cfg.tunnels:
            raise KeyError(tunnel_name)
        tunnels = [cfg.tunnels[tunnel_name]]
    actions = [
        cleanup_tunnel(tcfg, state_mgr, restart=restart)
        for tcfg in tunnels
        if tcfg.direction != "local"
    ]
    return CleanupReport(actions=actions)


CRON_MARKER = "# ops-bridge: maintenance cleanup"
CRON_SCHEDULE = "0 3 * * *"
CRON_LOG = "~/.local/state/bridge/cleanup.log"


def build_cron_line() -> str:
    """Return the crontab entry for the nightly cleanup job.

    The line ends with ``CRON_MARKER``, which is how the entry is later
    recognised by the install and uninstall functions.
    """
    bridge_bin = "~/.local/bin/bridge"
    return (
        f"{CRON_SCHEDULE} BRIDGE_CONFIG=~/.config/bridge/tunnels.yaml "
        f"{bridge_bin} maintenance cleanup --restart "
        f">> {CRON_LOG} 2>&1 {CRON_MARKER}"
    )


def _read_crontab() -> Optional[str]:
    """Return ``crontab -l`` output, or None when it exits non-zero."""
    proc = subprocess.run(["crontab", "-l"], capture_output=True, text=True)
    return proc.stdout if proc.returncode == 0 else None


def _find_cron_entry(crontab_text: str) -> Optional[str]:
    """Return the first stripped line containing ``CRON_MARKER``, if any."""
    for line in crontab_text.splitlines():
        if CRON_MARKER in line:
            return line.strip()
    return None


def _write_crontab(body: str) -> Optional[str]:
    """Replace the crontab with ``body``; return an error message or None."""
    write = subprocess.run(
        ["crontab", "-"],
        input=body,
        capture_output=True,
        text=True,
    )
    if write.returncode != 0:
        return write.stderr.strip() or "crontab write failed"
    return None


def read_installed_cron() -> Optional[str]:
    """Return the installed cleanup cron line, or None if absent.

    None is also returned when ``crontab -l`` exits non-zero.
    """
    current = _read_crontab()
    if current is None:
        return None
    return _find_cron_entry(current)


def install_cleanup_cron() -> tuple[bool, str]:
    """Append the cleanup entry to the user's crontab.

    Returns ``(True, new_line)`` on success, or ``(False, message)`` when an
    entry already exists or writing the crontab fails.
    """
    # Read once so the duplicate check and the rewritten body are derived
    # from the same snapshot of the crontab.
    # NOTE(review): a non-zero `crontab -l` is treated as "no crontab yet";
    # confirm that cannot mask a transient read failure.
    current = _read_crontab() or ""
    existing = _find_cron_entry(current)
    if existing:
        return False, f"cron already installed: {existing}"

    new_line = build_cron_line()
    body = current.rstrip("\n")
    if body:
        body += "\n"
    body += new_line + "\n"

    error = _write_crontab(body)
    if error is not None:
        return False, error
    return True, new_line


def uninstall_cleanup_cron() -> tuple[bool, str]:
    """Remove every crontab line containing ``CRON_MARKER``.

    Returns ``(True, message)`` on success, or ``(False, message)`` when
    there is no crontab, no matching entry, or the write fails.
    """
    current = _read_crontab()
    if current is None:
        return False, "no crontab installed"

    lines = current.splitlines()
    kept = [line for line in lines if CRON_MARKER not in line]
    if len(kept) == len(lines):
        return False, "cleanup cron not found"

    body = "\n".join(kept).rstrip("\n")
    if body:
        body += "\n"

    error = _write_crontab(body)
    if error is not None:
        return False, error
    return True, "removed cleanup cron entry"