"""Tests for the ops-bridge cert_command readiness gate (WARDEN-WP-0016 T1/T2).""" from __future__ import annotations import importlib.util import shutil import subprocess from pathlib import Path import pytest from warden.config import WardenConfig _SCRIPT = Path(__file__).resolve().parent.parent / "scripts" / "check_tunnel_cert_readiness.py" _spec = importlib.util.spec_from_file_location("check_tunnel_cert_readiness", _SCRIPT) readiness = importlib.util.module_from_spec(_spec) _spec.loader.exec_module(readiness) PUBKEY = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFakeKeyMaterialForTestsOnly comment\n" def _status(checks, label): return next(s for s, lab, _ in checks if lab == label) @pytest.fixture def setup(tmp_path): inv = tmp_path / "inventory.yaml" inv.write_text( "actors:\n" " agt-state-hub-bridge:\n" " type: agt\n" " principals: [agt-task-bridge]\n" " ttl_hours: 24\n" ) pub = tmp_path / "agt.pub" pub.write_text(PUBKEY) cfg = WardenConfig( backend="local", ca_key=tmp_path / "ca", inventory_path=inv, state_dir=tmp_path / "state", ) return cfg, pub, tmp_path def test_all_ready(setup): cfg, pub, _ = setup checks = readiness.run_checks(cfg, "agt-state-hub-bridge", pub, None) assert _status(checks, "inventory") == "ok" assert _status(checks, "public key") == "ok" assert _status(checks, "principals") == "ok" assert _status(checks, "infra principals") == "skip" # no --infra def test_unknown_actor_fails(setup): cfg, pub, _ = setup checks = readiness.run_checks(cfg, "agt-ghost", pub, None) assert _status(checks, "inventory") == "fail" def test_missing_pubkey_fails(setup): cfg, _, tmp = setup checks = readiness.run_checks(cfg, "agt-state-hub-bridge", tmp / "nope.pub", None) assert _status(checks, "public key") == "fail" def test_private_key_rejected(setup): cfg, _, tmp = setup priv = tmp / "id.pub" priv.write_text("-----BEGIN OPENSSH PRIVATE KEY-----\nxxx\n-----END OPENSSH PRIVATE KEY-----\n") checks = readiness.run_checks(cfg, "agt-state-hub-bridge", priv, None) assert _status(checks, "public key") == "fail" def test_infra_principal_missing(setup): cfg, pub, tmp = setup infra = tmp / "ssh_principals.yaml" infra.write_text("ssh_principals:\n host1:\n users:\n agt: [some-other-principal]\n") checks = readiness.run_checks(cfg, "agt-state-hub-bridge", pub, infra) assert _status(checks, "infra principals") == "fail" def test_infra_principal_present(setup): cfg, pub, tmp = setup infra = tmp / "ssh_principals.yaml" infra.write_text("ssh_principals:\n host1:\n users:\n agt: [agt-task-bridge]\n") checks = readiness.run_checks(cfg, "agt-state-hub-bridge", pub, infra) assert _status(checks, "infra principals") == "ok" def test_ttl_over_max_fails(tmp_path): inv = tmp_path / "inventory.yaml" # agt max TTL is 24h; load_inventory clamps? No — it preserves; the check flags it. inv.write_text("actors:\n agt-x:\n type: agt\n principals: [p]\n ttl_hours: 999\n") pub = tmp_path / "k.pub" pub.write_text(PUBKEY) cfg = WardenConfig(backend="local", ca_key=tmp_path / "ca", inventory_path=inv, state_dir=tmp_path) checks = readiness.run_checks(cfg, "agt-x", pub, None) assert _status(checks, "inventory") == "fail" def test_build_cert_command(): cmd = readiness.build_cert_command("agt-state-hub-bridge", Path("/k.pub")) assert cmd == "warden sign agt-state-hub-bridge --pubkey /k.pub" def test_sign_smoke_rejects_vault_backend(tmp_path): cfg = WardenConfig(backend="vault", inventory_path=tmp_path / "i.yaml", state_dir=tmp_path) with pytest.raises(ValueError, match="local backend"): readiness.sign_smoke(cfg, "agt-x", tmp_path / "k.pub") @pytest.mark.integration def test_sign_smoke_validates_real_cert(setup): """Opt-in: requires ssh-keygen. Issues a real local cert and validates it.""" if shutil.which("ssh-keygen") is None: pytest.skip("ssh-keygen not available") cfg, _, tmp = setup # Generate a real CA key and a real actor pubkey. ca = tmp / "ca" subprocess.run(["ssh-keygen", "-t", "ed25519", "-f", str(ca), "-N", "", "-q"], check=True) actor_key = tmp / "actor" subprocess.run(["ssh-keygen", "-t", "ed25519", "-f", str(actor_key), "-N", "", "-q"], check=True) checks = readiness.sign_smoke(cfg, "agt-state-hub-bridge", actor_key.with_suffix(".pub")) statuses = {lab: s for s, lab, _ in checks} assert statuses.get("cert identity") == "ok" assert statuses.get("cert principals") == "ok" assert statuses.get("cert validity") == "ok"