Files
ops-bridge/tests/test_models.py
tegwick bd169a07e2 feat(directive): implement BRIDGE-WP-0004 AccessManagementDirective alignment
- ActorType enum (adm/agt/atm) replaces actor_class string; config validates
  naming convention (adm-*/agt-*/atm-*) with hard ConfigError on mismatch;
  legacy 'human'/'automation' values accepted with DeprecationWarning
- cert_command: pluggable shell string run before each SSH launch; cert written
  to state dir; -i cert appended to SSH command alongside -i key
- TTL-aware cert refresh: parses Valid-to via ssh-keygen -L; pre-emptive restart
  5 min before expiry (no backoff, no attempt increment); CERT_EXPIRING logged
- CertAcquisitionError: cert failures trigger normal backoff/retry loop
- cert_identity: Key ID parsed from cert and recorded in BRIDGE_CONNECTED event
- bridge cert-status: new CLI command; exit 1 on expired cert; --json flag
- 233 tests passing, ruff clean

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 09:38:29 +02:00

76 lines
2.1 KiB
Python

"""Tests for domain models."""
from bridge.models import (
ActorInfo,
BridgeState,
HealthCheckConfig,
ReconnectPolicy,
TunnelConfig,
)
class TestBridgeState:
    """Checks on the members of the ``BridgeState`` enum."""

    def test_all_states_defined(self):
        """The enum's member values are exactly the expected set of names."""
        expected = {
            "stopped",
            "starting",
            "connected",
            "degraded",
            "reconnecting",
            "failed",
        }
        observed = {member.value for member in BridgeState}
        assert observed == expected

    def test_state_is_string(self):
        """A member compares equal to its plain string value."""
        assert BridgeState.STOPPED == "stopped"
class TestReconnectPolicy:
    """Checks on ``ReconnectPolicy`` defaults and explicit overrides."""

    def test_defaults(self):
        """A policy built with no arguments exposes the default values."""
        policy = ReconnectPolicy()
        assert policy.max_attempts == 0
        assert policy.backoff_initial == 5
        assert policy.backoff_max == 60

    def test_custom(self):
        """Every value passed to the constructor is readable back unchanged."""
        policy = ReconnectPolicy(max_attempts=3, backoff_initial=2, backoff_max=30)
        assert policy.max_attempts == 3
        # Check the backoff overrides as well; asserting only max_attempts
        # would let a regression in either backoff field pass silently.
        assert policy.backoff_initial == 2
        assert policy.backoff_max == 30
class TestHealthCheckConfig:
    """Checks on ``HealthCheckConfig`` construction."""

    def test_required_url(self):
        """Passing only ``url`` keeps it and yields interval 30 and timeout 5."""
        endpoint = "http://127.0.0.1:18000/health"
        config = HealthCheckConfig(url=endpoint)
        assert config.url == endpoint
        assert config.interval_seconds == 30
        assert config.timeout_seconds == 5
class TestTunnelConfig:
    """Checks on ``TunnelConfig`` construction."""

    def test_minimal(self):
        """Without ``health_check`` it is None and ``reconnect`` is a policy."""
        minimal_fields = {
            "name": "test-tunnel",
            "host": "host.local",
            "remote_port": 18000,
            "local_port": 8000,
            "ssh_user": "ubuntu",
            "ssh_key": "~/.ssh/id_ops",
            "actor": "operator.bernd",
        }
        tunnel = TunnelConfig(**minimal_fields)
        assert tunnel.name == "test-tunnel"
        assert tunnel.health_check is None
        assert isinstance(tunnel.reconnect, ReconnectPolicy)

    def test_with_health_check(self):
        """A supplied health-check object is stored as the same instance."""
        health = HealthCheckConfig(url="http://127.0.0.1:18000/health")
        tunnel = TunnelConfig(
            name="test",
            host="h",
            remote_port=1,
            local_port=2,
            ssh_user="u",
            ssh_key="k",
            actor="a",
            health_check=health,
        )
        # Identity, not equality: the config must keep the object it was given.
        assert tunnel.health_check is health
class TestActorInfo:
    """Checks on ``ActorInfo`` construction."""

    def test_fields(self):
        """``name`` and ``actor_type`` given to the constructor read back equal."""
        from bridge.models import ActorType

        expected_type = ActorType.ADM
        actor = ActorInfo(name="adm-bernd", actor_type=expected_type, description="Bernd")
        assert actor.name == "adm-bernd"
        assert actor.actor_type == expected_type