diff --git a/Makefile b/Makefile index 1ea4c88..6c8a3a8 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .DEFAULT_GOAL := help -.PHONY: help setup test lint install mcp-http mcp-stop +.PHONY: help setup test lint install mcp-http mcp-stop cron-install-cron cron-uninstall-cron help: ## List available make targets @awk 'BEGIN {FS = ":.*## "}; /^[a-zA-Z0-9_.-]+:.*## / {printf " %-16s %s\n", $$1, $$2}' $(MAKEFILE_LIST) @@ -23,3 +23,9 @@ mcp-http: ## Start MCP server in SSE mode (default port 8002) mcp-stop: ## Stop MCP server running on port 8002 @lsof -ti:$${BRIDGE_MCP_PORT:-8002} | xargs -r kill -TERM && echo "MCP server stopped" || echo "No MCP server running on port $${BRIDGE_MCP_PORT:-8002}" + +cron-install-cron: ## Install 03:00 nightly stale-forward cleanup cron + bridge maintenance install-cron + +cron-uninstall-cron: ## Remove nightly stale-forward cleanup cron + bridge maintenance uninstall-cron diff --git a/README.txt b/README.txt index 5fc5f6d..b588fe9 100644 --- a/README.txt +++ b/README.txt @@ -243,6 +243,31 @@ has not yet cleaned up the socket), so the next reconnect attempt hits "remote port forwarding failed" and exits with code 255. With ClientAlive enabled, sshd evicts stale sessions within ~90 seconds and frees the port. +NIGHTLY STALE-FORWARD CLEANUP +------------------------------ + +When a bridge client dies without tearing down its SSH session, the remote +host can keep port 18000 (etc.) bound to a zombie sshd listener. The port +accepts connections but never forwards them, which breaks in-cluster proxies +such as actcore-state-hub-bridge on railiance01. + +Install a 03:00 local-time cron job that probes each reverse tunnel's remote +forward, kills stale listeners when the local service is healthy but the +remote forward is not, and restarts the tunnel: + + bridge maintenance install-cron + +Manual run: + + bridge maintenance cleanup --restart + +Inspect or remove the cron entry: + + bridge maintenance show-cron + bridge maintenance uninstall-cron + +Logs append to ~/.local/state/bridge/cleanup.log + Apply and reload (no disconnect): sudo sed -i 's/#ClientAliveInterval 0/ClientAliveInterval 30/' /etc/ssh/sshd_config diff --git a/src/bridge/cleanup.py b/src/bridge/cleanup.py new file mode 100644 index 0000000..56150c1 --- /dev/null +++ b/src/bridge/cleanup.py @@ -0,0 +1,308 @@ +"""Nightly maintenance: detect and clear stale SSH remote port forwards.""" +from __future__ import annotations + +import subprocess +import time +from dataclasses import dataclass +from typing import Optional +from urllib.parse import urlparse, urlunparse + +import httpx + +from bridge.diagnostics import _remote_port_probe_command, check_tunnel +from bridge.manager import TunnelManager +from bridge.models import TunnelConfig +from bridge.state import StateManager + + +@dataclass +class CleanupAction: + tunnel: str + action: str # skipped | healthy | cleaned | cleaned_and_restarted | error + detail: str = "" + + +@dataclass +class CleanupReport: + actions: list[CleanupAction] + + @property + def cleaned_count(self) -> int: + return sum(1 for a in self.actions if a.action.startswith("cleaned")) + + +def remote_forward_health_url(cfg: TunnelConfig) -> Optional[str]: + """Map the local health_check URL to the remote forwarded port.""" + if cfg.health_check is None or cfg.direction == "local": + return None + parsed = urlparse(cfg.health_check.url) + if not parsed.hostname: + return None + netloc = f"{parsed.hostname}:{cfg.remote_port}" + return urlunparse(parsed._replace(netloc=netloc)) + + +def _ssh_base_cmd(cfg: TunnelConfig) -> list[str]: + from pathlib import Path + + return [ + "ssh", + "-i", + str(Path(cfg.ssh_key).expanduser()), + "-o", + "BatchMode=yes", + "-o", + "ConnectTimeout=10", + "-o", + "StrictHostKeyChecking=accept-new", + f"{cfg.ssh_user}@{cfg.host}", + ] + + +def _run_ssh(cfg: TunnelConfig, remote_command: str, *, timeout: float = 30) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [*_ssh_base_cmd(cfg), remote_command], + capture_output=True, + text=True, + timeout=timeout, + ) + + +def remote_port_listening(cfg: TunnelConfig) -> bool: + proc = _run_ssh(cfg, _remote_port_probe_command(cfg.remote_port), timeout=15) + return proc.stdout.strip() == "ok" + + +def probe_remote_forward(cfg: TunnelConfig) -> tuple[bool, str]: + """Return (healthy, detail) for the remote forwarded service.""" + url = remote_forward_health_url(cfg) + if url is None: + return True, "no remote health url configured" + timeout = cfg.health_check.timeout_seconds if cfg.health_check else 5 + remote_cmd = ( + f"curl -sf --max-time {timeout} {url!r} >/dev/null " + "&& echo ok || echo fail" + ) + try: + proc = _run_ssh(cfg, remote_cmd, timeout=timeout + 15) + except subprocess.TimeoutExpired: + return False, "remote health probe timed out" + output = proc.stdout.strip() + if output == "ok": + return True, "remote forward healthy" + if proc.returncode != 0 and proc.stderr.strip(): + return False, proc.stderr.strip() + return False, "remote forward unhealthy" + + +def local_service_healthy(cfg: TunnelConfig) -> Optional[bool]: + if cfg.health_check is None: + return None + try: + resp = httpx.get( + cfg.health_check.url, + timeout=cfg.health_check.timeout_seconds, + ) + return resp.is_success + except Exception: + return False + + +def _remote_cleanup_script(port: int) -> str: + return f"""set -eu +port={port} +pids="" +if command -v lsof >/dev/null 2>&1; then + pids=$(sudo -n lsof -t -iTCP:$port -sTCP:LISTEN 2>/dev/null || true) + if [ -z "$pids" ]; then + pids=$(lsof -t -iTCP:$port -sTCP:LISTEN 2>/dev/null || true) + fi +fi +if [ -z "$pids" ] && command -v fuser >/dev/null 2>&1; then + pids=$(fuser -n tcp $port 2>/dev/null | tr -s ' ' '\\n' | grep -E '^[0-9]+$' || true) +fi +if [ -z "$pids" ]; then + echo "no_listeners" + exit 0 +fi +echo "killing:$pids" +for pid in $pids; do + kill "$pid" 2>/dev/null || sudo -n kill "$pid" 2>/dev/null || true +done +sleep 1 +if ss -tln 2>/dev/null | grep -q ":$port "; then + echo "still_listening" +else + echo "cleared" +fi +""" + + +def clear_stale_remote_binding(cfg: TunnelConfig) -> tuple[bool, str]: + try: + proc = _run_ssh(cfg, _remote_cleanup_script(cfg.remote_port), timeout=30) + except subprocess.TimeoutExpired: + return False, "remote cleanup timed out" + output = proc.stdout.strip() + if "cleared" in output: + return True, output + if "no_listeners" in output: + return True, "no listeners found" + if "still_listening" in output: + return False, output + detail = output or proc.stderr.strip() or f"exit {proc.returncode}" + return False, detail + + +def should_cleanup_tunnel( + cfg: TunnelConfig, + state_mgr: StateManager, +) -> tuple[bool, str]: + """Decide whether a reverse tunnel's remote binding looks stale.""" + if cfg.direction == "local": + return False, "local tunnel" + + if not remote_port_listening(cfg): + return False, "remote port closed" + + remote_ok, remote_detail = probe_remote_forward(cfg) + if remote_ok: + return False, remote_detail + + check = check_tunnel(cfg, state_mgr) + local_ok = local_service_healthy(cfg) + + if local_ok is True and not remote_ok: + return True, f"stale forward: {remote_detail}" + + if check.ssh_process != "ok" and check.remote_port == "listening": + return True, f"orphan forward while ssh {check.ssh_process}: {remote_detail}" + + if check.ssh_process == "ok" and not remote_ok: + return True, f"broken forward with live client: {remote_detail}" + + return False, remote_detail + + +def cleanup_tunnel( + cfg: TunnelConfig, + state_mgr: StateManager, + *, + restart: bool, +) -> CleanupAction: + name = cfg.name + try: + needed, reason = should_cleanup_tunnel(cfg, state_mgr) + if not needed: + return CleanupAction(name, "healthy", reason) + + ok, detail = clear_stale_remote_binding(cfg) + if not ok: + return CleanupAction(name, "error", f"cleanup failed: {detail}") + + if not restart: + return CleanupAction(name, "cleaned", f"{reason}; {detail}") + + mgr = TunnelManager(cfg, state_dir=state_mgr._dir) + was_running = mgr.is_running() + if was_running: + mgr.stop() + mgr.start() + action = "cleaned_and_restarted" + verb = "restarted" if was_running else "started" + return CleanupAction(name, action, f"{reason}; {verb} tunnel; {detail}") + except Exception as exc: + return CleanupAction(name, "error", str(exc)) + + +def cleanup_all_tunnels( + cfg, + state_mgr: StateManager, + *, + restart: bool, + tunnel_name: Optional[str] = None, +) -> CleanupReport: + tunnels = cfg.tunnels.values() + if tunnel_name is not None: + if tunnel_name not in cfg.tunnels: + raise KeyError(tunnel_name) + tunnels = [cfg.tunnels[tunnel_name]] + + actions = [ + cleanup_tunnel(tcfg, state_mgr, restart=restart) + for tcfg in tunnels + if tcfg.direction != "local" + ] + return CleanupReport(actions=actions) + + +CRON_MARKER = "# ops-bridge: maintenance cleanup" +CRON_SCHEDULE = "0 3 * * *" +CRON_LOG = "~/.local/state/bridge/cleanup.log" + + +def build_cron_line() -> str: + bridge_bin = "~/.local/bin/bridge" + return ( + f"{CRON_SCHEDULE} BRIDGE_CONFIG=~/.config/bridge/tunnels.yaml " + f"{bridge_bin} maintenance cleanup --restart " + f">> {CRON_LOG} 2>&1 {CRON_MARKER}" + ) + + +def read_installed_cron() -> Optional[str]: + proc = subprocess.run(["crontab", "-l"], capture_output=True, text=True) + if proc.returncode != 0: + return None + for line in proc.stdout.splitlines(): + if CRON_MARKER in line: + return line.strip() + return None + + +def install_cleanup_cron() -> tuple[bool, str]: + existing = read_installed_cron() + if existing: + return False, f"cron already installed: {existing}" + + proc = subprocess.run(["crontab", "-l"], capture_output=True, text=True) + current = proc.stdout if proc.returncode == 0 else "" + new_line = build_cron_line() + body = current.rstrip("\n") + if body: + body += "\n" + body += new_line + "\n" + write = subprocess.run( + ["crontab", "-"], + input=body, + capture_output=True, + text=True, + ) + if write.returncode != 0: + return False, write.stderr.strip() or "crontab write failed" + return True, new_line + + +def uninstall_cleanup_cron() -> tuple[bool, str]: + proc = subprocess.run(["crontab", "-l"], capture_output=True, text=True) + if proc.returncode != 0: + return False, "no crontab installed" + kept = [ + line + for line in proc.stdout.splitlines() + if CRON_MARKER not in line + ] + if len(kept) == len(proc.stdout.splitlines()): + return False, "cleanup cron not found" + body = "\n".join(kept).rstrip("\n") + if body: + body += "\n" + write = subprocess.run( + ["crontab", "-"], + input=body, + capture_output=True, + text=True, + ) + if write.returncode != 0: + return False, write.stderr.strip() or "crontab write failed" + return True, "removed cleanup cron entry" \ No newline at end of file diff --git a/src/bridge/cli.py b/src/bridge/cli.py index 6807b33..2e89154 100644 --- a/src/bridge/cli.py +++ b/src/bridge/cli.py @@ -12,6 +12,13 @@ from typing import Optional import typer from bridge.audit import AuditLogger +from bridge.cleanup import ( + build_cron_line, + cleanup_all_tunnels, + install_cleanup_cron, + read_installed_cron, + uninstall_cleanup_cron, +) from bridge.config import ConfigError, load_config from bridge.diagnostics import check_all_tunnels, check_tunnel from bridge.manager import TunnelManager @@ -25,9 +32,11 @@ app = typer.Typer( targets_app = typer.Typer(help="Inspect infrastructure targets from the OpsCatalog.") catalog_app = typer.Typer(help="Inspect and validate the OpsCatalog.") +maintenance_app = typer.Typer(help="Scheduled maintenance for tunnel hygiene.") app.add_typer(targets_app, name="targets") app.add_typer(catalog_app, name="catalog") +app.add_typer(maintenance_app, name="maintenance") def _state_dir() -> Path: @@ -661,6 +670,90 @@ Full specification: """ +@maintenance_app.command("cleanup") +def maintenance_cleanup( + tunnel: Optional[str] = typer.Argument( + None, + help="Tunnel name (omit for all reverse tunnels)", + ), + restart: bool = typer.Option( + False, + "--restart", + help="Restart tunnels after clearing stale remote bindings", + ), + as_json: bool = typer.Option(False, "--json", help="Output as JSON"), +): + """Clear stale SSH remote port forwards that block tunnel reconnects.""" + cfg = _load_or_exit() + sd = _state_dir() + state_mgr = StateManager(state_dir=sd) + + try: + report = cleanup_all_tunnels( + cfg, + state_mgr, + restart=restart, + tunnel_name=tunnel, + ) + except KeyError: + typer.echo(f"Error: tunnel '{tunnel}' not found in config", err=True) + raise typer.Exit(1) + + if as_json: + payload = { + "cleaned_count": report.cleaned_count, + "actions": [ + {"tunnel": a.tunnel, "action": a.action, "detail": a.detail} + for a in report.actions + ], + } + typer.echo(json.dumps(payload, indent=2)) + return + + if not report.actions: + typer.echo("No reverse tunnels configured.") + return + + for action in report.actions: + typer.echo(f"{action.tunnel}: {action.action} — {action.detail}") + typer.echo(f"done ({report.cleaned_count} cleaned)") + + +@maintenance_app.command("install-cron") +def maintenance_install_cron(): + """Install a 03:00 daily cron job for `bridge maintenance cleanup --restart`.""" + installed, message = install_cleanup_cron() + if installed: + typer.echo("Installed nightly cleanup cron:") + typer.echo(f" {message}") + else: + typer.echo(message) + raise typer.Exit(2) + + +@maintenance_app.command("uninstall-cron") +def maintenance_uninstall_cron(): + """Remove the nightly cleanup cron job.""" + removed, message = uninstall_cleanup_cron() + if removed: + typer.echo(message) + else: + typer.echo(message) + raise typer.Exit(2) + + +@maintenance_app.command("show-cron") +def maintenance_show_cron(): + """Show the configured nightly cleanup cron line.""" + existing = read_installed_cron() + if existing: + typer.echo(existing) + else: + typer.echo("Nightly cleanup cron is not installed.") + typer.echo("Would install:") + typer.echo(f" {build_cron_line()}") + + @app.command() def conventions(): """Show the actor naming conventions enforced by tunnels.yaml.""" diff --git a/tests/test_cleanup.py b/tests/test_cleanup.py new file mode 100644 index 0000000..4ad4198 --- /dev/null +++ b/tests/test_cleanup.py @@ -0,0 +1,132 @@ +"""Tests for stale SSH forward cleanup.""" +from __future__ import annotations + +import textwrap +from unittest.mock import MagicMock, patch + +import pytest +from typer.testing import CliRunner + +from bridge.cleanup import ( + CleanupAction, + CleanupReport, + build_cron_line, + cleanup_all_tunnels, + remote_forward_health_url, + should_cleanup_tunnel, +) +from bridge.cli import app +from bridge.config import load_config +from bridge.models import HealthCheckConfig, TunnelConfig +from bridge.state import StateManager + + +def _tunnel(**overrides) -> TunnelConfig: + base = dict( + name="state-hub-railiance01", + host="92.205.62.239", + remote_port=18000, + local_port=8000, + ssh_user="tegwick", + ssh_key="~/.ssh/id_ops", + actor="agt-claude-railiance01", + health_check=HealthCheckConfig( + url="http://127.0.0.1:8000/state/health", + timeout_seconds=5, + ), + ) + base.update(overrides) + return TunnelConfig(**base) + + +class TestRemoteForwardHealthUrl: + def test_maps_local_port_to_remote(self): + cfg = _tunnel() + assert remote_forward_health_url(cfg) == "http://127.0.0.1:18000/state/health" + + def test_returns_none_for_local_tunnel(self): + cfg = _tunnel(direction="local") + assert remote_forward_health_url(cfg) is None + + +class TestShouldCleanupTunnel: + def test_skips_healthy_remote_forward(self, tmp_path): + cfg = _tunnel() + state_mgr = StateManager(state_dir=tmp_path) + with ( + patch("bridge.cleanup.remote_port_listening", return_value=True), + patch("bridge.cleanup.probe_remote_forward", return_value=(True, "ok")), + ): + needed, reason = should_cleanup_tunnel(cfg, state_mgr) + assert needed is False + + def test_detects_stale_forward_when_local_ok_remote_fails(self, tmp_path): + cfg = _tunnel() + state_mgr = StateManager(state_dir=tmp_path) + with ( + patch("bridge.cleanup.remote_port_listening", return_value=True), + patch("bridge.cleanup.probe_remote_forward", return_value=(False, "timeout")), + patch("bridge.cleanup.local_service_healthy", return_value=True), + patch( + "bridge.cleanup.check_tunnel", + return_value=MagicMock(ssh_process="ok", remote_port="listening"), + ), + ): + needed, reason = should_cleanup_tunnel(cfg, state_mgr) + assert needed is True + assert "stale forward" in reason + + +class TestCleanupAllTunnels: + def test_reports_cleaned_tunnel(self, tmp_path, monkeypatch): + monkeypatch.setenv("BRIDGE_CONFIG", str(tmp_path / "tunnels.yaml")) + (tmp_path / "tunnels.yaml").write_text( + textwrap.dedent( + """\ + tunnels: + state-hub-railiance01: + host: 92.205.62.239 + remote_port: 18000 + local_port: 8000 + ssh_user: tegwick + ssh_key: ~/.ssh/id_ops + actor: agt-claude-railiance01 + health_check: + url: http://127.0.0.1:8000/state/health + actors: + agt-claude-railiance01: + class: agt + """ + ) + ) + cfg = load_config() + state_mgr = StateManager(state_dir=tmp_path / "state") + with patch( + "bridge.cleanup.cleanup_tunnel", + return_value=CleanupAction("state-hub-railiance01", "cleaned", "cleared"), + ): + report = cleanup_all_tunnels(cfg, state_mgr, restart=False) + assert report.cleaned_count == 1 + assert report.actions[0].action == "cleaned" + + +class TestMaintenanceCli: + def test_cleanup_help(self): + runner = CliRunner() + result = runner.invoke(app, ["maintenance", "cleanup", "--help"]) + assert result.exit_code == 0 + assert "restart" in result.output.lower() + + def test_show_cron_prints_template_when_not_installed(self): + runner = CliRunner() + with patch("bridge.cli.read_installed_cron", return_value=None): + result = runner.invoke(app, ["maintenance", "show-cron"]) + assert result.exit_code == 0 + assert "0 3 * * *" in result.output + + +def test_build_cron_line_contains_marker(): + line = build_cron_line() + assert "0 3 * * *" in line + assert "maintenance cleanup --restart" in line + assert "ops-bridge: maintenance cleanup" in line \ No newline at end of file