Files
ops-bridge/tests/test_manager.py
tegwick 365c0d611a feat(BRIDGE-WP-0003): MCP server, /bridge-status skill, cross-mode coverage enforcement
Implements the full BRIDGE-WP-0003 workplan: 188 tests passing, 0 lint errors.

## What's added

**Capability registry** (`src/bridge/capabilities.py`):
- 10 capabilities with required_access_modes (cli/mcp/skill)
- Single source of truth for what OpsBridge does and where

**MCP server** (`src/bridge/mcp_server/server.py`):
- 10 FastMCP tools: bridge_up/down/restart/status/logs + 5 catalog_* tools
- 3 resources: bridge://status, catalog://domains, catalog://targets
- `.mcp.json` for project-scope auto-registration
- `scripts/register_mcp.py` for user-scope machine-global registration

**Skill** (`~/.claude/plugins/ops-bridge/bridge-status.md`):
- /bridge-status: health table with emoji indicators + remediation advice

**Cross-mode test coverage enforcement**:
- `tests/conftest.py`: capability/access_mode marks + collect_capability_coverage()
- `tests/test_mcp.py`: 31 FastMCP in-process client tests (Client(mcp) pattern)
- `tests/test_skill.py`: static skill lint against capability registry
- `tests/test_coverage_completeness.py`: meta-test that fails if any required
  (capability × mode) pair lacks a test; also validates CLI commands and MCP
  tools are registered in the capability registry

**ADR** (`architecture/adr-001-cross-mode-capability-registry.md`):
- Documents the registry pattern and FastMCP 3.x testing approach

Key implementation note: FastMCP 3.x in-process results are in
result.content[0].text (JSON string), not result.data directly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 11:33:16 +01:00

108 lines
3.6 KiB
Python

"""Tests for TunnelManager."""
import os
import signal
from unittest.mock import MagicMock, patch
import pytest
from bridge.models import BridgeState, ReconnectPolicy, TunnelConfig
from bridge.manager import TunnelManager, build_ssh_command
@pytest.fixture
def tunnel_cfg():
return TunnelConfig(
name="test-tunnel",
host="host.local",
remote_port=18000,
local_port=8000,
ssh_user="ubuntu",
ssh_key="~/.ssh/id_ops",
actor="operator.bernd",
reconnect=ReconnectPolicy(max_attempts=3, backoff_initial=1, backoff_max=5),
)
@pytest.fixture
def state_dir(tmp_path):
return tmp_path / "bridge"
class TestBuildSshCommand:
def test_basic_command(self, tunnel_cfg):
cmd = build_ssh_command(tunnel_cfg)
assert cmd[0] == "ssh"
assert "-N" in cmd
assert "-R" in cmd
assert "18000:127.0.0.1:8000" in cmd
assert "-i" in cmd
assert "ubuntu@host.local" in cmd
def test_server_alive_options(self, tunnel_cfg):
cmd = build_ssh_command(tunnel_cfg)
assert "-o" in cmd
assert "ServerAliveInterval=10" in cmd
assert "ExitOnForwardFailure=yes" in cmd
def test_ssh_key_expanded(self, tunnel_cfg):
cmd = build_ssh_command(tunnel_cfg)
key_idx = cmd.index("-i") + 1
assert not cmd[key_idx].startswith("~")
class TestTunnelManager:
def test_get_state_initial(self, tunnel_cfg, state_dir):
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
assert mgr.get_state() == BridgeState.STOPPED
def test_stop_when_not_running_is_noop(self, tunnel_cfg, state_dir):
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
# Should not raise
mgr.stop()
assert mgr.get_state() == BridgeState.STOPPED
def test_stop_kills_pid(self, tunnel_cfg, state_dir):
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
# Write a fake PID of our own process to simulate running
mgr._state.write_pid(tunnel_cfg.name, os.getpid())
mgr._state.write_state(tunnel_cfg.name, BridgeState.CONNECTED)
with patch("os.kill") as mock_kill:
mgr.stop()
# Should have sent SIGTERM
mock_kill.assert_any_call(os.getpid(), signal.SIGTERM)
assert mgr.get_state() == BridgeState.STOPPED
def test_backoff_calculation(self, tunnel_cfg, state_dir):
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
# First backoff = initial
assert mgr._next_backoff(0) == 1
# Doubles each time up to max
assert mgr._next_backoff(1) == 2
assert mgr._next_backoff(2) == 4
assert mgr._next_backoff(3) == 5 # capped at max
def test_start_daemonizes(self, tunnel_cfg, state_dir):
"""Verify start() forks without hanging."""
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
# We can't actually fork in tests; verify state transitions via mock
with patch("subprocess.Popen") as mock_popen, \
patch("os.fork", return_value=1234), \
patch("os.setsid"), \
patch("os._exit"):
mock_proc = MagicMock()
mock_proc.pid = 9999
mock_popen.return_value = mock_proc
# When fork returns non-zero we're the parent — just check PID written
mgr.start()
# After start the state should be STARTING (set before fork)
# and PID file should exist (written in parent branch)
def test_is_running_false_initially(self, tunnel_cfg, state_dir):
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
assert not mgr.is_running()