"""Tests for TunnelManager.""" import os import signal from unittest.mock import MagicMock, patch import pytest from bridge.models import BridgeState, ReconnectPolicy, TunnelConfig from bridge.manager import TunnelManager, build_ssh_command @pytest.fixture def tunnel_cfg(): return TunnelConfig( name="test-tunnel", host="host.local", remote_port=18000, local_port=8000, ssh_user="ubuntu", ssh_key="~/.ssh/id_ops", actor="operator.bernd", reconnect=ReconnectPolicy(max_attempts=3, backoff_initial=1, backoff_max=5), ) @pytest.fixture def state_dir(tmp_path): return tmp_path / "bridge" class TestBuildSshCommand: def test_basic_command(self, tunnel_cfg): cmd = build_ssh_command(tunnel_cfg) assert cmd[0] == "ssh" assert "-N" in cmd assert "-R" in cmd assert "18000:127.0.0.1:8000" in cmd assert "-i" in cmd assert "ubuntu@host.local" in cmd def test_server_alive_options(self, tunnel_cfg): cmd = build_ssh_command(tunnel_cfg) assert "-o" in cmd assert "ServerAliveInterval=10" in cmd assert "ExitOnForwardFailure=yes" in cmd def test_ssh_key_expanded(self, tunnel_cfg): cmd = build_ssh_command(tunnel_cfg) key_idx = cmd.index("-i") + 1 assert not cmd[key_idx].startswith("~") class TestTunnelManager: def test_get_state_initial(self, tunnel_cfg, state_dir): mgr = TunnelManager(tunnel_cfg, state_dir=state_dir) assert mgr.get_state() == BridgeState.STOPPED def test_stop_when_not_running_is_noop(self, tunnel_cfg, state_dir): mgr = TunnelManager(tunnel_cfg, state_dir=state_dir) # Should not raise mgr.stop() assert mgr.get_state() == BridgeState.STOPPED def test_stop_kills_pid(self, tunnel_cfg, state_dir): mgr = TunnelManager(tunnel_cfg, state_dir=state_dir) # Write a fake PID of our own process to simulate running mgr._state.write_pid(tunnel_cfg.name, os.getpid()) mgr._state.write_state(tunnel_cfg.name, BridgeState.CONNECTED) with patch("os.kill") as mock_kill: mgr.stop() # Should have sent SIGTERM mock_kill.assert_any_call(os.getpid(), signal.SIGTERM) assert mgr.get_state() == BridgeState.STOPPED def test_backoff_calculation(self, tunnel_cfg, state_dir): mgr = TunnelManager(tunnel_cfg, state_dir=state_dir) # First backoff = initial assert mgr._next_backoff(0) == 1 # Doubles each time up to max assert mgr._next_backoff(1) == 2 assert mgr._next_backoff(2) == 4 assert mgr._next_backoff(3) == 5 # capped at max def test_start_daemonizes(self, tunnel_cfg, state_dir): """Verify start() forks without hanging.""" mgr = TunnelManager(tunnel_cfg, state_dir=state_dir) # We can't actually fork in tests; verify state transitions via mock with patch("subprocess.Popen") as mock_popen, \ patch("os.fork", return_value=1234), \ patch("os.setsid"), \ patch("os._exit"): mock_proc = MagicMock() mock_proc.pid = 9999 mock_popen.return_value = mock_proc # When fork returns non-zero we're the parent — just check PID written mgr.start() # After start the state should be STARTING (set before fork) # and PID file should exist (written in parent branch) def test_is_running_false_initially(self, tunnel_cfg, state_dir): mgr = TunnelManager(tunnel_cfg, state_dir=state_dir) assert not mgr.is_running()