Files
ops-bridge/tests/test_integration.py
tegwick bd169a07e2 feat(directive): implement BRIDGE-WP-0004 AccessManagementDirective alignment
- ActorType enum (adm/agt/atm) replaces actor_class string; config validates
  naming convention (adm-*/agt-*/atm-*) with hard ConfigError on mismatch;
  legacy 'human'/'automation' values accepted with DeprecationWarning
- cert_command: pluggable shell string run before each SSH launch; cert written
  to state dir; -i cert appended to SSH command alongside -i key
- TTL-aware cert refresh: parses Valid-to via ssh-keygen -L; pre-emptive restart
  5 min before expiry (no backoff, no attempt increment); CERT_EXPIRING logged
- CertAcquisitionError: cert failures trigger normal backoff/retry loop
- cert_identity: Key ID parsed from cert and recorded in BRIDGE_CONNECTED event
- bridge cert-status: new CLI command; exit 1 on expired cert; --json flag
- 233 tests passing, ruff clean

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 09:38:29 +02:00

214 lines
6.8 KiB
Python

"""Integration tests for OpsBridge."""
import textwrap
from unittest.mock import MagicMock, patch
import pytest
from bridge.config import load_config
from bridge.manager import TunnelManager
from bridge.models import BridgeState, ReconnectPolicy, TunnelConfig
from bridge.state import StateManager
# Smallest config used by the round-trip test: one tunnel ("local-test") with a
# short reconnect policy, plus the actor entry that the tunnel refers to.
# The nesting matters: the tunnel must sit under ``tunnels`` and the policy
# under ``reconnect`` for ``cfg.tunnels["local-test"].reconnect`` to resolve.
MINIMAL_CONFIG = textwrap.dedent("""\
    tunnels:
      local-test:
        host: 127.0.0.1
        remote_port: 19000
        local_port: 8000
        ssh_user: testuser
        ssh_key: ~/.ssh/id_rsa
        actor: adm-bernd
        reconnect:
          max_attempts: 2
          backoff_initial: 1
          backoff_max: 2
    actors:
      adm-bernd:
        class: adm
        description: Bernd
""")
@pytest.fixture
def config_file(tmp_path):
    """Write MINIMAL_CONFIG to ``tunnels.yaml`` under ``tmp_path`` and return that path."""
    target = tmp_path.joinpath("tunnels.yaml")
    target.write_text(MINIMAL_CONFIG)
    return target
@pytest.fixture
def state_dir(tmp_path):
    """Return the path ``tmp_path / "bridge"``; the directory is not created here."""
    return tmp_path.joinpath("bridge")
@pytest.fixture
def tunnel_cfg():
    """Build the ``local-test`` TunnelConfig used by the manager tests."""
    # Small limits so the reconnect loop terminates after a couple of attempts.
    policy = ReconnectPolicy(max_attempts=2, backoff_initial=1, backoff_max=2)
    return TunnelConfig(
        name="local-test",
        host="127.0.0.1",
        remote_port=19000,
        local_port=8000,
        ssh_user="testuser",
        ssh_key="~/.ssh/id_rsa",
        actor="adm-bernd",
        reconnect=policy,
    )
class TestConfigRoundtrip:
    """Config written to disk can be loaded back with the expected values."""

    def test_load_config_from_file(self, config_file, monkeypatch):
        """load_config(), called with BRIDGE_CONFIG set to the fixture file, exposes the tunnel."""
        monkeypatch.setenv("BRIDGE_CONFIG", str(config_file))
        loaded = load_config()

        assert "local-test" in loaded.tunnels
        tunnel = loaded.tunnels["local-test"]
        assert tunnel.host == "127.0.0.1"

        policy = tunnel.reconnect
        assert policy.max_attempts == 2
        assert policy.backoff_initial == 1
class TestStateRoundtrip:
    """State written under ``state_dir`` is visible to later manager instances."""

    def test_state_persists_across_manager_instances(self, state_dir, tunnel_cfg):
        """A state written through one manager is read back by a second one on the same dir."""
        writer = TunnelManager(tunnel_cfg, state_dir=state_dir)
        writer._state.write_state(tunnel_cfg.name, BridgeState.CONNECTED)

        reader = TunnelManager(tunnel_cfg, state_dir=state_dir)
        assert reader.get_state() == BridgeState.CONNECTED

    def test_stale_pid_cleanup(self, state_dir, tunnel_cfg):
        """is_running() is False when the recorded PID belongs to a process that has exited."""
        import subprocess
        import sys

        # A hard-coded PID (e.g. 999999) can belong to a live process on hosts
        # with a large PID range, which would make this test flaky. Use the PID
        # of a child that has already exited and been reaped instead.
        with subprocess.Popen([sys.executable, "-c", "pass"]) as child:
            child.wait()
        dead_pid = child.pid

        sm = StateManager(state_dir=state_dir)
        sm.write_pid(tunnel_cfg.name, dead_pid)
        sm.write_state(tunnel_cfg.name, BridgeState.CONNECTED)

        # The persisted state says CONNECTED, but the PID behind it is gone.
        mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
        assert not mgr.is_running()
class TestReconnectLoop:
    """Reconnect loop behaviour when every launched SSH process exits at once."""

    def test_reconnect_loop_gives_up_after_max_attempts(self, state_dir, tunnel_cfg):
        """Manager should set FAILED state after exhausting max_attempts."""
        manager = TunnelManager(tunnel_cfg, state_dir=state_dir)
        launches = []  # one entry per faked Popen call

        def fake_popen(cmd, **kwargs):
            # poll() reporting an exit code right away makes the process look dead.
            dead = MagicMock(returncode=1, **{"poll.return_value": 1})
            launches.append(cmd)
            return dead

        with patch("subprocess.Popen", side_effect=fake_popen):
            with patch("time.sleep"):  # skip sleeps for speed
                manager._run_loop()

        assert len(launches) >= 1
        assert manager.get_state() == BridgeState.FAILED

    def test_reconnect_logs_events(self, state_dir, tunnel_cfg):
        """Audit log should contain at least one lifecycle/reconnect event."""
        manager = TunnelManager(tunnel_cfg, state_dir=state_dir)

        def fake_popen(cmd, **kwargs):
            return MagicMock(returncode=1, **{"poll.return_value": 1})

        with patch("subprocess.Popen", side_effect=fake_popen):
            with patch("time.sleep"):
                manager._run_loop()

        logged = [entry["event"] for entry in manager._audit.read_events(tunnel_cfg.name)]
        accepted = ("bridge_started", "bridge_reconnecting", "bridge_disconnected")
        assert any(name in logged for name in accepted)
class TestHealthCheckDegradedPath:
    """Manager run in which the tunnel's health check always fails."""

    def test_degraded_state_on_health_failure(self, state_dir):
        """A failing health check (or the following disconnect) shows up in the audit log.

        The first faked SSH process looks alive for one poll and dead afterwards;
        any later process is dead immediately. Every health check yields a failed
        result. The test does not assert the DEGRADED state directly; it checks
        the audit log for a health-check failure or a disconnect event.
        """
        from bridge.health import HealthResult

        hc_cfg = MagicMock()
        hc_cfg.url = "http://127.0.0.1:19001/health"
        hc_cfg.interval_seconds = 0
        hc_cfg.timeout_seconds = 1
        tunnel_cfg = TunnelConfig(
            name="hc-test",
            host="127.0.0.1",
            remote_port=19001,
            local_port=8001,
            ssh_user="u",
            ssh_key="k",
            actor="adm-bernd",
            reconnect=ReconnectPolicy(max_attempts=1, backoff_initial=1, backoff_max=1),
            health_check=hc_cfg,
        )
        mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
        proc_call_count = [0]

        def fake_popen(cmd, **kwargs):
            proc = MagicMock()
            proc.returncode = 1
            proc_call_count[0] += 1
            if proc_call_count[0] == 1:
                # First launch: poll() returns None (alive) once, then an exit
                # code. The finite tail also bounds the loop: a manager that
                # polls without end runs out of values instead of hanging.
                proc.poll.side_effect = [None, 1] + [1] * 100
            else:
                proc.poll.return_value = 1
            return proc

        failed_result = HealthResult(ok=False, error="connection refused")

        with patch("subprocess.Popen", side_effect=fake_popen), \
                patch("time.sleep"), \
                patch("bridge.manager.HealthChecker") as mock_hc_cls, \
                patch("asyncio.run", side_effect=lambda coro: failed_result):
            # asyncio.run is patched too, so whatever check() hands to it is
            # never awaited; the failed result is returned directly.
            mock_checker = MagicMock()
            mock_checker.check = MagicMock(return_value=failed_result)
            mock_hc_cls.return_value = mock_checker
            mgr._run_loop()

        # Should have set degraded at some point — check audit log
        events = mgr._audit.read_events("hc-test")
        event_types = [e["event"] for e in events]
        assert "health_check_failed" in event_types or "bridge_disconnected" in event_types
class TestAuditTrail:
    """Shape of the audit records produced by one manager run."""

    def test_full_lifecycle_logged(self, state_dir, tunnel_cfg):
        """A start + immediate-exit SSH yields at least two events with the required fields."""
        manager = TunnelManager(tunnel_cfg, state_dir=state_dir)

        def fake_popen(cmd, **kwargs):
            # Process that has already exited with code 1.
            return MagicMock(returncode=1, **{"poll.return_value": 1})

        with patch("subprocess.Popen", side_effect=fake_popen):
            with patch("time.sleep"):
                manager._run_loop()

        records = manager._audit.read_events(tunnel_cfg.name)
        assert len(records) >= 2

        # Each event has required fields
        required = ("timestamp", "tunnel", "actor", "event")
        for record in records:
            for field in required:
                assert field in record