From 4e9882909f2c6b336e8cb967ce864639ac05866f Mon Sep 17 00:00:00 2001 From: tegwick Date: Fri, 19 Jun 2026 15:59:27 +0200 Subject: [PATCH] feat(maintenance): nightly stale SSH forward cleanup at 03:00 Add bridge maintenance cleanup to detect reverse tunnels whose remote port is bound but no longer forwards (zombie sshd sessions), kill the stale listeners on the remote host, and optionally restart the tunnel. Includes install-cron/uninstall-cron/show-cron helpers and README notes for the actcore-state-hub-bridge failure mode we hit on railiance01. --- Makefile | 8 +- README.txt | 25 ++++ src/bridge/cleanup.py | 308 ++++++++++++++++++++++++++++++++++++++++++ src/bridge/cli.py | 93 +++++++++++++ tests/test_cleanup.py | 132 ++++++++++++++++++ 5 files changed, 565 insertions(+), 1 deletion(-) create mode 100644 src/bridge/cleanup.py create mode 100644 tests/test_cleanup.py diff --git a/Makefile b/Makefile index 1ea4c88..6c8a3a8 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .DEFAULT_GOAL := help -.PHONY: help setup test lint install mcp-http mcp-stop +.PHONY: help setup test lint install mcp-http mcp-stop cron-install-cron cron-uninstall-cron help: ## List available make targets @awk 'BEGIN {FS = ":.*## "}; /^[a-zA-Z0-9_.-]+:.*## / {printf " %-16s %s\n", $$1, $$2}' $(MAKEFILE_LIST) @@ -23,3 +23,9 @@ mcp-http: ## Start MCP server in SSE mode (default port 8002) mcp-stop: ## Stop MCP server running on port 8002 @lsof -ti:$${BRIDGE_MCP_PORT:-8002} | xargs -r kill -TERM && echo "MCP server stopped" || echo "No MCP server running on port $${BRIDGE_MCP_PORT:-8002}" + +cron-install-cron: ## Install 03:00 nightly stale-forward cleanup cron + bridge maintenance install-cron + +cron-uninstall-cron: ## Remove nightly stale-forward cleanup cron + bridge maintenance uninstall-cron diff --git a/README.txt b/README.txt index 5fc5f6d..b588fe9 100644 --- a/README.txt +++ b/README.txt @@ -243,6 +243,31 @@ has not yet cleaned up the socket), so the next reconnect attempt hits "remote port forwarding failed" and exits with code 255. With ClientAlive enabled, sshd evicts stale sessions within ~90 seconds and frees the port. +NIGHTLY STALE-FORWARD CLEANUP +------------------------------ + +When a bridge client dies without tearing down its SSH session, the remote +host can keep port 18000 (etc.) bound to a zombie sshd listener. The port +accepts connections but never forwards them, which breaks in-cluster proxies +such as actcore-state-hub-bridge on railiance01. + +Install a 03:00 local-time cron job that probes each reverse tunnel's remote +forward, kills stale listeners when the local service is healthy but the +remote forward is not, and restarts the tunnel: + + bridge maintenance install-cron + +Manual run: + + bridge maintenance cleanup --restart + +Inspect or remove the cron entry: + + bridge maintenance show-cron + bridge maintenance uninstall-cron + +Logs append to ~/.local/state/bridge/cleanup.log + Apply and reload (no disconnect): sudo sed -i 's/#ClientAliveInterval 0/ClientAliveInterval 30/' /etc/ssh/sshd_config diff --git a/src/bridge/cleanup.py b/src/bridge/cleanup.py new file mode 100644 index 0000000..56150c1 --- /dev/null +++ b/src/bridge/cleanup.py @@ -0,0 +1,308 @@ +"""Nightly maintenance: detect and clear stale SSH remote port forwards.""" +from __future__ import annotations + +import subprocess +import time +from dataclasses import dataclass +from typing import Optional +from urllib.parse import urlparse, urlunparse + +import httpx + +from bridge.diagnostics import _remote_port_probe_command, check_tunnel +from bridge.manager import TunnelManager +from bridge.models import TunnelConfig +from bridge.state import StateManager + + +@dataclass +class CleanupAction: + tunnel: str + action: str # skipped | healthy | cleaned | cleaned_and_restarted | error + detail: str = "" + + +@dataclass +class CleanupReport: + actions: list[CleanupAction] + + @property + def cleaned_count(self) -> int: + return sum(1 for a in self.actions if a.action.startswith("cleaned")) + + +def remote_forward_health_url(cfg: TunnelConfig) -> Optional[str]: + """Map the local health_check URL to the remote forwarded port.""" + if cfg.health_check is None or cfg.direction == "local": + return None + parsed = urlparse(cfg.health_check.url) + if not parsed.hostname: + return None + netloc = f"{parsed.hostname}:{cfg.remote_port}" + return urlunparse(parsed._replace(netloc=netloc)) + + +def _ssh_base_cmd(cfg: TunnelConfig) -> list[str]: + from pathlib import Path + + return [ + "ssh", + "-i", + str(Path(cfg.ssh_key).expanduser()), + "-o", + "BatchMode=yes", + "-o", + "ConnectTimeout=10", + "-o", + "StrictHostKeyChecking=accept-new", + f"{cfg.ssh_user}@{cfg.host}", + ] + + +def _run_ssh(cfg: TunnelConfig, remote_command: str, *, timeout: float = 30) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [*_ssh_base_cmd(cfg), remote_command], + capture_output=True, + text=True, + timeout=timeout, + ) + + +def remote_port_listening(cfg: TunnelConfig) -> bool: + proc = _run_ssh(cfg, _remote_port_probe_command(cfg.remote_port), timeout=15) + return proc.stdout.strip() == "ok" + + +def probe_remote_forward(cfg: TunnelConfig) -> tuple[bool, str]: + """Return (healthy, detail) for the remote forwarded service.""" + url = remote_forward_health_url(cfg) + if url is None: + return True, "no remote health url configured" + timeout = cfg.health_check.timeout_seconds if cfg.health_check else 5 + remote_cmd = ( + f"curl -sf --max-time {timeout} {url!r} >/dev/null " + "&& echo ok || echo fail" + ) + try: + proc = _run_ssh(cfg, remote_cmd, timeout=timeout + 15) + except subprocess.TimeoutExpired: + return False, "remote health probe timed out" + output = proc.stdout.strip() + if output == "ok": + return True, "remote forward healthy" + if proc.returncode != 0 and proc.stderr.strip(): + return False, proc.stderr.strip() + return False, "remote forward unhealthy" + + +def local_service_healthy(cfg: TunnelConfig) -> Optional[bool]: + if cfg.health_check is None: + return None + try: + resp = httpx.get( + cfg.health_check.url, + timeout=cfg.health_check.timeout_seconds, + ) + return resp.is_success + except Exception: + return False + + +def _remote_cleanup_script(port: int) -> str: + return f"""set -eu +port={port} +pids="" +if command -v lsof >/dev/null 2>&1; then + pids=$(sudo -n lsof -t -iTCP:$port -sTCP:LISTEN 2>/dev/null || true) + if [ -z "$pids" ]; then + pids=$(lsof -t -iTCP:$port -sTCP:LISTEN 2>/dev/null || true) + fi +fi +if [ -z "$pids" ] && command -v fuser >/dev/null 2>&1; then + pids=$(fuser -n tcp $port 2>/dev/null | tr -s ' ' '\\n' | grep -E '^[0-9]+$' || true) +fi +if [ -z "$pids" ]; then + echo "no_listeners" + exit 0 +fi +echo "killing:$pids" +for pid in $pids; do + kill "$pid" 2>/dev/null || sudo -n kill "$pid" 2>/dev/null || true +done +sleep 1 +if ss -tln 2>/dev/null | grep -q ":$port "; then + echo "still_listening" +else + echo "cleared" +fi +""" + + +def clear_stale_remote_binding(cfg: TunnelConfig) -> tuple[bool, str]: + try: + proc = _run_ssh(cfg, _remote_cleanup_script(cfg.remote_port), timeout=30) + except subprocess.TimeoutExpired: + return False, "remote cleanup timed out" + output = proc.stdout.strip() + if "cleared" in output: + return True, output + if "no_listeners" in output: + return True, "no listeners found" + if "still_listening" in output: + return False, output + detail = output or proc.stderr.strip() or f"exit {proc.returncode}" + return False, detail + + +def should_cleanup_tunnel( + cfg: TunnelConfig, + state_mgr: StateManager, +) -> tuple[bool, str]: + """Decide whether a reverse tunnel's remote binding looks stale.""" + if cfg.direction == "local": + return False, "local tunnel" + + if not remote_port_listening(cfg): + return False, "remote port closed" + + remote_ok, remote_detail = probe_remote_forward(cfg) + if remote_ok: + return False, remote_detail + + check = check_tunnel(cfg, state_mgr) + local_ok = local_service_healthy(cfg) + + if local_ok is True and not remote_ok: + return True, f"stale forward: {remote_detail}" + + if check.ssh_process != "ok" and check.remote_port == "listening": + return True, f"orphan forward while ssh {check.ssh_process}: {remote_detail}" + + if check.ssh_process == "ok" and not remote_ok: + return True, f"broken forward with live client: {remote_detail}" + + return False, remote_detail + + +def cleanup_tunnel( + cfg: TunnelConfig, + state_mgr: StateManager, + *, + restart: bool, +) -> CleanupAction: + name = cfg.name + try: + needed, reason = should_cleanup_tunnel(cfg, state_mgr) + if not needed: + return CleanupAction(name, "healthy", reason) + + ok, detail = clear_stale_remote_binding(cfg) + if not ok: + return CleanupAction(name, "error", f"cleanup failed: {detail}") + + if not restart: + return CleanupAction(name, "cleaned", f"{reason}; {detail}") + + mgr = TunnelManager(cfg, state_dir=state_mgr._dir) + was_running = mgr.is_running() + if was_running: + mgr.stop() + mgr.start() + action = "cleaned_and_restarted" + verb = "restarted" if was_running else "started" + return CleanupAction(name, action, f"{reason}; {verb} tunnel; {detail}") + except Exception as exc: + return CleanupAction(name, "error", str(exc)) + + +def cleanup_all_tunnels( + cfg, + state_mgr: StateManager, + *, + restart: bool, + tunnel_name: Optional[str] = None, +) -> CleanupReport: + tunnels = cfg.tunnels.values() + if tunnel_name is not None: + if tunnel_name not in cfg.tunnels: + raise KeyError(tunnel_name) + tunnels = [cfg.tunnels[tunnel_name]] + + actions = [ + cleanup_tunnel(tcfg, state_mgr, restart=restart) + for tcfg in tunnels + if tcfg.direction != "local" + ] + return CleanupReport(actions=actions) + + +CRON_MARKER = "# ops-bridge: maintenance cleanup" +CRON_SCHEDULE = "0 3 * * *" +CRON_LOG = "~/.local/state/bridge/cleanup.log" + + +def build_cron_line() -> str: + bridge_bin = "~/.local/bin/bridge" + return ( + f"{CRON_SCHEDULE} BRIDGE_CONFIG=~/.config/bridge/tunnels.yaml " + f"{bridge_bin} maintenance cleanup --restart " + f">> {CRON_LOG} 2>&1 {CRON_MARKER}" + ) + + +def read_installed_cron() -> Optional[str]: + proc = subprocess.run(["crontab", "-l"], capture_output=True, text=True) + if proc.returncode != 0: + return None + for line in proc.stdout.splitlines(): + if CRON_MARKER in line: + return line.strip() + return None + + +def install_cleanup_cron() -> tuple[bool, str]: + existing = read_installed_cron() + if existing: + return False, f"cron already installed: {existing}" + + proc = subprocess.run(["crontab", "-l"], capture_output=True, text=True) + current = proc.stdout if proc.returncode == 0 else "" + new_line = build_cron_line() + body = current.rstrip("\n") + if body: + body += "\n" + body += new_line + "\n" + write = subprocess.run( + ["crontab", "-"], + input=body, + capture_output=True, + text=True, + ) + if write.returncode != 0: + return False, write.stderr.strip() or "crontab write failed" + return True, new_line + + +def uninstall_cleanup_cron() -> tuple[bool, str]: + proc = subprocess.run(["crontab", "-l"], capture_output=True, text=True) + if proc.returncode != 0: + return False, "no crontab installed" + kept = [ + line + for line in proc.stdout.splitlines() + if CRON_MARKER not in line + ] + if len(kept) == len(proc.stdout.splitlines()): + return False, "cleanup cron not found" + body = "\n".join(kept).rstrip("\n") + if body: + body += "\n" + write = subprocess.run( + ["crontab", "-"], + input=body, + capture_output=True, + text=True, + ) + if write.returncode != 0: + return False, write.stderr.strip() or "crontab write failed" + return True, "removed cleanup cron entry" \ No newline at end of file diff --git a/src/bridge/cli.py b/src/bridge/cli.py index 6807b33..2e89154 100644 --- a/src/bridge/cli.py +++ b/src/bridge/cli.py @@ -12,6 +12,13 @@ from typing import Optional import typer from bridge.audit import AuditLogger +from bridge.cleanup import ( + build_cron_line, + cleanup_all_tunnels, + install_cleanup_cron, + read_installed_cron, + uninstall_cleanup_cron, +) from bridge.config import ConfigError, load_config from bridge.diagnostics import check_all_tunnels, check_tunnel from bridge.manager import TunnelManager @@ -25,9 +32,11 @@ app = typer.Typer( targets_app = typer.Typer(help="Inspect infrastructure targets from the OpsCatalog.") catalog_app = typer.Typer(help="Inspect and validate the OpsCatalog.") +maintenance_app = typer.Typer(help="Scheduled maintenance for tunnel hygiene.") app.add_typer(targets_app, name="targets") app.add_typer(catalog_app, name="catalog") +app.add_typer(maintenance_app, name="maintenance") def _state_dir() -> Path: @@ -661,6 +670,90 @@ Full specification: """ +@maintenance_app.command("cleanup") +def maintenance_cleanup( + tunnel: Optional[str] = typer.Argument( + None, + help="Tunnel name (omit for all reverse tunnels)", + ), + restart: bool = typer.Option( + False, + "--restart", + help="Restart tunnels after clearing stale remote bindings", + ), + as_json: bool = typer.Option(False, "--json", help="Output as JSON"), +): + """Clear stale SSH remote port forwards that block tunnel reconnects.""" + cfg = _load_or_exit() + sd = _state_dir() + state_mgr = StateManager(state_dir=sd) + + try: + report = cleanup_all_tunnels( + cfg, + state_mgr, + restart=restart, + tunnel_name=tunnel, + ) + except KeyError: + typer.echo(f"Error: tunnel '{tunnel}' not found in config", err=True) + raise typer.Exit(1) + + if as_json: + payload = { + "cleaned_count": report.cleaned_count, + "actions": [ + {"tunnel": a.tunnel, "action": a.action, "detail": a.detail} + for a in report.actions + ], + } + typer.echo(json.dumps(payload, indent=2)) + return + + if not report.actions: + typer.echo("No reverse tunnels configured.") + return + + for action in report.actions: + typer.echo(f"{action.tunnel}: {action.action} — {action.detail}") + typer.echo(f"done ({report.cleaned_count} cleaned)") + + +@maintenance_app.command("install-cron") +def maintenance_install_cron(): + """Install a 03:00 daily cron job for `bridge maintenance cleanup --restart`.""" + installed, message = install_cleanup_cron() + if installed: + typer.echo("Installed nightly cleanup cron:") + typer.echo(f" {message}") + else: + typer.echo(message) + raise typer.Exit(2) + + +@maintenance_app.command("uninstall-cron") +def maintenance_uninstall_cron(): + """Remove the nightly cleanup cron job.""" + removed, message = uninstall_cleanup_cron() + if removed: + typer.echo(message) + else: + typer.echo(message) + raise typer.Exit(2) + + +@maintenance_app.command("show-cron") +def maintenance_show_cron(): + """Show the configured nightly cleanup cron line.""" + existing = read_installed_cron() + if existing: + typer.echo(existing) + else: + typer.echo("Nightly cleanup cron is not installed.") + typer.echo("Would install:") + typer.echo(f" {build_cron_line()}") + + @app.command() def conventions(): """Show the actor naming conventions enforced by tunnels.yaml.""" diff --git a/tests/test_cleanup.py b/tests/test_cleanup.py new file mode 100644 index 0000000..4ad4198 --- /dev/null +++ b/tests/test_cleanup.py @@ -0,0 +1,132 @@ +"""Tests for stale SSH forward cleanup.""" +from __future__ import annotations + +import textwrap +from unittest.mock import MagicMock, patch + +import pytest +from typer.testing import CliRunner + +from bridge.cleanup import ( + CleanupAction, + CleanupReport, + build_cron_line, + cleanup_all_tunnels, + remote_forward_health_url, + should_cleanup_tunnel, +) +from bridge.cli import app +from bridge.config import load_config +from bridge.models import HealthCheckConfig, TunnelConfig +from bridge.state import StateManager + + +def _tunnel(**overrides) -> TunnelConfig: + base = dict( + name="state-hub-railiance01", + host="92.205.62.239", + remote_port=18000, + local_port=8000, + ssh_user="tegwick", + ssh_key="~/.ssh/id_ops", + actor="agt-claude-railiance01", + health_check=HealthCheckConfig( + url="http://127.0.0.1:8000/state/health", + timeout_seconds=5, + ), + ) + base.update(overrides) + return TunnelConfig(**base) + + +class TestRemoteForwardHealthUrl: + def test_maps_local_port_to_remote(self): + cfg = _tunnel() + assert remote_forward_health_url(cfg) == "http://127.0.0.1:18000/state/health" + + def test_returns_none_for_local_tunnel(self): + cfg = _tunnel(direction="local") + assert remote_forward_health_url(cfg) is None + + +class TestShouldCleanupTunnel: + def test_skips_healthy_remote_forward(self, tmp_path): + cfg = _tunnel() + state_mgr = StateManager(state_dir=tmp_path) + with ( + patch("bridge.cleanup.remote_port_listening", return_value=True), + patch("bridge.cleanup.probe_remote_forward", return_value=(True, "ok")), + ): + needed, reason = should_cleanup_tunnel(cfg, state_mgr) + assert needed is False + + def test_detects_stale_forward_when_local_ok_remote_fails(self, tmp_path): + cfg = _tunnel() + state_mgr = StateManager(state_dir=tmp_path) + with ( + patch("bridge.cleanup.remote_port_listening", return_value=True), + patch("bridge.cleanup.probe_remote_forward", return_value=(False, "timeout")), + patch("bridge.cleanup.local_service_healthy", return_value=True), + patch( + "bridge.cleanup.check_tunnel", + return_value=MagicMock(ssh_process="ok", remote_port="listening"), + ), + ): + needed, reason = should_cleanup_tunnel(cfg, state_mgr) + assert needed is True + assert "stale forward" in reason + + +class TestCleanupAllTunnels: + def test_reports_cleaned_tunnel(self, tmp_path, monkeypatch): + monkeypatch.setenv("BRIDGE_CONFIG", str(tmp_path / "tunnels.yaml")) + (tmp_path / "tunnels.yaml").write_text( + textwrap.dedent( + """\ + tunnels: + state-hub-railiance01: + host: 92.205.62.239 + remote_port: 18000 + local_port: 8000 + ssh_user: tegwick + ssh_key: ~/.ssh/id_ops + actor: agt-claude-railiance01 + health_check: + url: http://127.0.0.1:8000/state/health + actors: + agt-claude-railiance01: + class: agt + """ + ) + ) + cfg = load_config() + state_mgr = StateManager(state_dir=tmp_path / "state") + with patch( + "bridge.cleanup.cleanup_tunnel", + return_value=CleanupAction("state-hub-railiance01", "cleaned", "cleared"), + ): + report = cleanup_all_tunnels(cfg, state_mgr, restart=False) + assert report.cleaned_count == 1 + assert report.actions[0].action == "cleaned" + + +class TestMaintenanceCli: + def test_cleanup_help(self): + runner = CliRunner() + result = runner.invoke(app, ["maintenance", "cleanup", "--help"]) + assert result.exit_code == 0 + assert "restart" in result.output.lower() + + def test_show_cron_prints_template_when_not_installed(self): + runner = CliRunner() + with patch("bridge.cli.read_installed_cron", return_value=None): + result = runner.invoke(app, ["maintenance", "show-cron"]) + assert result.exit_code == 0 + assert "0 3 * * *" in result.output + + +def test_build_cron_line_contains_marker(): + line = build_cron_line() + assert "0 3 * * *" in line + assert "maintenance cleanup --restart" in line + assert "ops-bridge: maintenance cleanup" in line \ No newline at end of file