feat(directive): implement BRIDGE-WP-0004 AccessManagementDirective alignment

- ActorType enum (adm/agt/atm) replaces actor_class string; config validates
  naming convention (adm-*/agt-*/atm-*) with hard ConfigError on mismatch;
  legacy 'human'/'automation' values accepted with DeprecationWarning
- cert_command: pluggable shell string run before each SSH launch; cert written
  to state dir; -i cert appended to SSH command alongside -i key
- TTL-aware cert refresh: parses Valid-to via ssh-keygen -L; pre-emptive restart
  5 min before expiry (no backoff, no attempt increment); CERT_EXPIRING logged
- CertAcquisitionError: cert failures trigger normal backoff/retry loop
- cert_identity: Key ID parsed from cert and recorded in BRIDGE_CONNECTED event
- bridge cert-status: new CLI command; exit 1 on expired cert; --json flag
- 233 tests passing, ruff clean

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-15 09:38:29 +02:00
parent 22601ef3e6
commit bd169a07e2
17 changed files with 730 additions and 145 deletions

View File

@@ -105,3 +105,99 @@ class TestTunnelManager:
def test_is_running_false_initially(self, tunnel_cfg, state_dir):
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
assert not mgr.is_running()
class TestBuildSshCommandWithCert:
def test_no_cert_path_omits_extra_i(self, tunnel_cfg):
cmd = build_ssh_command(tunnel_cfg)
assert cmd.count("-i") == 1
def test_cert_path_appends_after_key(self, tunnel_cfg, tmp_path):
cert = tmp_path / "test-cert.pub"
cert.write_text("cert")
cmd = build_ssh_command(tunnel_cfg, cert_path=cert)
i_indices = [i for i, x in enumerate(cmd) if x == "-i"]
assert len(i_indices) == 2
key_idx, cert_idx = i_indices
assert not cmd[key_idx + 1].endswith("-cert.pub") # key comes first
assert cmd[cert_idx + 1] == str(cert)
class TestRunCertCommand:
def test_returns_none_when_no_cert_command(self, tunnel_cfg, tmp_path):
from bridge.manager import _run_cert_command
assert _run_cert_command(tunnel_cfg, tmp_path) is None
def test_writes_cert_and_returns_path(self, tunnel_cfg, tmp_path):
from bridge.manager import _run_cert_command
tunnel_cfg.cert_command = "echo 'ssh-rsa-cert AAAA'"
path = _run_cert_command(tunnel_cfg, tmp_path)
assert path is not None
assert path.exists()
assert "ssh-rsa-cert" in path.read_text()
def test_raises_on_nonzero_exit(self, tunnel_cfg, tmp_path):
from bridge.manager import _run_cert_command
from bridge.models import CertAcquisitionError
tunnel_cfg.cert_command = "exit 1"
with pytest.raises(CertAcquisitionError):
_run_cert_command(tunnel_cfg, tmp_path)
class TestActorTypeFromName:
def test_adm_prefix(self):
from bridge.manager import _actor_type_from_name
assert _actor_type_from_name("adm-bernd") == "adm"
def test_agt_prefix(self):
from bridge.manager import _actor_type_from_name
assert _actor_type_from_name("agt-claude") == "agt"
def test_atm_prefix(self):
from bridge.manager import _actor_type_from_name
assert _actor_type_from_name("atm-cron") == "atm"
def test_unknown_prefix(self):
from bridge.manager import _actor_type_from_name
assert _actor_type_from_name("operator.bernd") == "unknown"
class TestTtlRefresh:
def test_parse_cert_expiry_returns_none_for_missing_file(self, tmp_path):
from bridge.manager import _parse_cert_expiry
missing = tmp_path / "no.pub"
result = _parse_cert_expiry(missing)
assert result is None
def test_parse_cert_identity_returns_none_for_missing_file(self, tmp_path):
from bridge.manager import _parse_cert_identity
missing = tmp_path / "no.pub"
result = _parse_cert_identity(missing)
assert result is None
def test_parse_cert_identity_from_keygen_output(self, tmp_path):
from unittest.mock import patch, MagicMock
from bridge.manager import _parse_cert_identity
cert = tmp_path / "test.pub"
cert.write_text("fake")
with patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(
stdout='test.pub:\n Key ID: "agt-bridge"\n',
returncode=0,
)
result = _parse_cert_identity(cert)
assert result == "agt-bridge"
def test_parse_cert_expiry_from_keygen_output(self, tmp_path):
from unittest.mock import patch, MagicMock
from bridge.manager import _parse_cert_expiry
cert = tmp_path / "test.pub"
cert.write_text("fake")
with patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(
stdout="test.pub:\n Valid: from 2026-05-15T10:00:00 to 2030-05-15T22:00:00\n",
returncode=0,
)
result = _parse_cert_expiry(cert)
assert result is not None
assert result.year == 2030