generated from coulomb/repo-seed
Implements the full BRIDGE-WP-0003 workplan: 188 tests passing, 0 lint errors. ## What's added **Capability registry** (`src/bridge/capabilities.py`): - 10 capabilities with required_access_modes (cli/mcp/skill) - Single source of truth for what OpsBridge does and where **MCP server** (`src/bridge/mcp_server/server.py`): - 10 FastMCP tools: bridge_up/down/restart/status/logs + 5 catalog_* tools - 3 resources: bridge://status, catalog://domains, catalog://targets - `.mcp.json` for project-scope auto-registration - `scripts/register_mcp.py` for user-scope machine-global registration **Skill** (`~/.claude/plugins/ops-bridge/bridge-status.md`): - /bridge-status: health table with emoji indicators + remediation advice **Cross-mode test coverage enforcement**: - `tests/conftest.py`: capability/access_mode marks + collect_capability_coverage() - `tests/test_mcp.py`: 31 FastMCP in-process client tests (Client(mcp) pattern) - `tests/test_skill.py`: static skill lint against capability registry - `tests/test_coverage_completeness.py`: meta-test that fails if any required (capability × mode) pair lacks a test; also validates CLI commands and MCP tools are registered in the capability registry **ADR** (`architecture/adr-001-cross-mode-capability-registry.md`): - Documents the registry pattern and FastMCP 3.x testing approach Key implementation note: FastMCP 3.x in-process results are in result.content[0].text (JSON string), not result.data directly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
108 lines
3.6 KiB
Python
108 lines
3.6 KiB
Python
"""Tests for TunnelManager."""
|
|
import os
|
|
import signal
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from bridge.models import BridgeState, ReconnectPolicy, TunnelConfig
|
|
from bridge.manager import TunnelManager, build_ssh_command
|
|
|
|
|
|
@pytest.fixture
|
|
def tunnel_cfg():
|
|
return TunnelConfig(
|
|
name="test-tunnel",
|
|
host="host.local",
|
|
remote_port=18000,
|
|
local_port=8000,
|
|
ssh_user="ubuntu",
|
|
ssh_key="~/.ssh/id_ops",
|
|
actor="operator.bernd",
|
|
reconnect=ReconnectPolicy(max_attempts=3, backoff_initial=1, backoff_max=5),
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def state_dir(tmp_path):
|
|
return tmp_path / "bridge"
|
|
|
|
|
|
class TestBuildSshCommand:
|
|
def test_basic_command(self, tunnel_cfg):
|
|
cmd = build_ssh_command(tunnel_cfg)
|
|
assert cmd[0] == "ssh"
|
|
assert "-N" in cmd
|
|
assert "-R" in cmd
|
|
assert "18000:127.0.0.1:8000" in cmd
|
|
assert "-i" in cmd
|
|
assert "ubuntu@host.local" in cmd
|
|
|
|
def test_server_alive_options(self, tunnel_cfg):
|
|
cmd = build_ssh_command(tunnel_cfg)
|
|
assert "-o" in cmd
|
|
assert "ServerAliveInterval=10" in cmd
|
|
assert "ExitOnForwardFailure=yes" in cmd
|
|
|
|
def test_ssh_key_expanded(self, tunnel_cfg):
|
|
cmd = build_ssh_command(tunnel_cfg)
|
|
key_idx = cmd.index("-i") + 1
|
|
assert not cmd[key_idx].startswith("~")
|
|
|
|
|
|
class TestTunnelManager:
|
|
def test_get_state_initial(self, tunnel_cfg, state_dir):
|
|
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
|
|
assert mgr.get_state() == BridgeState.STOPPED
|
|
|
|
def test_stop_when_not_running_is_noop(self, tunnel_cfg, state_dir):
|
|
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
|
|
# Should not raise
|
|
mgr.stop()
|
|
assert mgr.get_state() == BridgeState.STOPPED
|
|
|
|
def test_stop_kills_pid(self, tunnel_cfg, state_dir):
|
|
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
|
|
# Write a fake PID of our own process to simulate running
|
|
mgr._state.write_pid(tunnel_cfg.name, os.getpid())
|
|
mgr._state.write_state(tunnel_cfg.name, BridgeState.CONNECTED)
|
|
|
|
with patch("os.kill") as mock_kill:
|
|
mgr.stop()
|
|
|
|
# Should have sent SIGTERM
|
|
mock_kill.assert_any_call(os.getpid(), signal.SIGTERM)
|
|
assert mgr.get_state() == BridgeState.STOPPED
|
|
|
|
def test_backoff_calculation(self, tunnel_cfg, state_dir):
|
|
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
|
|
# First backoff = initial
|
|
assert mgr._next_backoff(0) == 1
|
|
# Doubles each time up to max
|
|
assert mgr._next_backoff(1) == 2
|
|
assert mgr._next_backoff(2) == 4
|
|
assert mgr._next_backoff(3) == 5 # capped at max
|
|
|
|
def test_start_daemonizes(self, tunnel_cfg, state_dir):
|
|
"""Verify start() forks without hanging."""
|
|
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
|
|
|
|
# We can't actually fork in tests; verify state transitions via mock
|
|
with patch("subprocess.Popen") as mock_popen, \
|
|
patch("os.fork", return_value=1234), \
|
|
patch("os.setsid"), \
|
|
patch("os._exit"):
|
|
mock_proc = MagicMock()
|
|
mock_proc.pid = 9999
|
|
mock_popen.return_value = mock_proc
|
|
|
|
# When fork returns non-zero we're the parent — just check PID written
|
|
mgr.start()
|
|
|
|
# After start the state should be STARTING (set before fork)
|
|
# and PID file should exist (written in parent branch)
|
|
|
|
def test_is_running_false_initially(self, tunnel_cfg, state_dir):
|
|
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
|
|
assert not mgr.is_running()
|