"""Tests for warden.ca — LocalCA and parse_cert_metadata.""" from datetime import datetime, timezone from pathlib import Path from unittest.mock import MagicMock, patch import pytest from warden.ca import CAError, LocalCA, parse_cert_metadata from warden.models import ActorType, CertSpec SAMPLE_SSHKEYGEN_L = """\ /tmp/key-cert.pub: Type: ssh-ed25519-cert-v01@openssh.com user certificate Public key: ED25519-CERT SHA256:abc123 Signing CA: ED25519 SHA256:xyz (using ssh-ed25519) Key ID: "agt-state-hub-bridge" Serial: 0 Valid: from 2026-03-28T10:00:00 to 2026-03-29T10:00:00 Principals: agt-task-bridge Critical Options: (none) Extensions: permit-pty """ CERT_CONTENT = "ssh-ed25519-cert-v01@openssh.com AAAA_fake_cert_data" def _mock_run_factory(cert_content: str): """Returns a mock subprocess.run that writes the cert file on sign and returns SAMPLE_SSHKEYGEN_L on -L.""" def mock_run(cmd, **kwargs): result = MagicMock() result.returncode = 0 result.stdout = "" result.stderr = "" if not isinstance(cmd, list) or not cmd: return result if cmd[0] == "ssh-keygen" and "-s" in cmd: # Signing: write cert next to the pubkey copy (last arg) pubkey_path = Path(cmd[-1]) cert_path = pubkey_path.parent / (pubkey_path.stem + "-cert.pub") cert_path.write_text(cert_content) elif cmd[0] == "ssh-keygen" and "-L" in cmd: result.stdout = SAMPLE_SSHKEYGEN_L return result return mock_run # --------------------------------------------------------------------------- # parse_cert_metadata # --------------------------------------------------------------------------- def test_parse_cert_metadata(tmp_path): cert_path = tmp_path / "key-cert.pub" cert_path.write_text(CERT_CONTENT) mock_result = MagicMock(returncode=0, stdout=SAMPLE_SSHKEYGEN_L, stderr="") with patch("warden.ca.subprocess.run", return_value=mock_result): meta = parse_cert_metadata(cert_path) assert meta["identity"] == "agt-state-hub-bridge" assert meta["principals"] == ["agt-task-bridge"] assert meta["valid_before"] == datetime(2026, 3, 29, 10, 0, 0, tzinfo=timezone.utc) def test_parse_cert_metadata_failure(tmp_path): cert_path = tmp_path / "key-cert.pub" cert_path.write_text("not a cert") mock_result = MagicMock(returncode=1, stdout="", stderr="not a certificate") with patch("warden.ca.subprocess.run", return_value=mock_result): with pytest.raises(CAError, match="ssh-keygen -L failed"): parse_cert_metadata(cert_path) def test_parse_cert_metadata_missing_valid_before(tmp_path): cert_path = tmp_path / "key-cert.pub" cert_path.write_text(CERT_CONTENT) output_no_valid = SAMPLE_SSHKEYGEN_L.replace( " Valid: from 2026-03-28T10:00:00 to 2026-03-29T10:00:00\n", "" ) mock_result = MagicMock(returncode=0, stdout=output_no_valid, stderr="") with patch("warden.ca.subprocess.run", return_value=mock_result): with pytest.raises(CAError, match="valid_before"): parse_cert_metadata(cert_path) # --------------------------------------------------------------------------- # LocalCA.sign # --------------------------------------------------------------------------- def test_local_ca_sign(tmp_path): ca_key = tmp_path / "ca_key" ca_key.write_text("fake-ca-private-key") pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA actor-key") spec = CertSpec( actor_name="agt-state-hub-bridge", actor_type=ActorType.AGT, pubkey_path=pubkey, ttl_hours=24, principals=["agt-task-bridge"], identity="agt-state-hub-bridge", ) with patch("warden.ca.subprocess.run", side_effect=_mock_run_factory(CERT_CONTENT)): ca = LocalCA(ca_key, tmp_path / "state") record = ca.sign(spec) assert record.identity == "agt-state-hub-bridge" assert record.actor_name == "agt-state-hub-bridge" assert record.principals == ["agt-task-bridge"] cert_dest = tmp_path / "state" / "agt-state-hub-bridge-cert.pub" assert cert_dest.exists() assert cert_dest.read_text().strip() == CERT_CONTENT def test_local_ca_sign_missing_pubkey(tmp_path): ca_key = tmp_path / "ca_key" ca_key.write_text("fake-ca") spec = CertSpec( actor_name="agt-test", actor_type=ActorType.AGT, pubkey_path=tmp_path / "nonexistent.pub", ttl_hours=24, principals=["agt-test"], ) ca = LocalCA(ca_key, tmp_path / "state") with pytest.raises(CAError, match="Public key not found"): ca.sign(spec) def test_local_ca_sign_missing_ca_key(tmp_path): pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA") spec = CertSpec( actor_name="agt-test", actor_type=ActorType.AGT, pubkey_path=pubkey, ttl_hours=24, principals=["agt-test"], ) ca = LocalCA(tmp_path / "nonexistent_ca", tmp_path / "state") with pytest.raises(CAError, match="CA key not found"): ca.sign(spec) def test_local_ca_sign_ssh_keygen_failure(tmp_path): ca_key = tmp_path / "ca_key" ca_key.write_text("fake-ca") pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA") spec = CertSpec( actor_name="agt-test", actor_type=ActorType.AGT, pubkey_path=pubkey, ttl_hours=24, principals=["agt-test"], ) def fail_run(cmd, **kwargs): result = MagicMock() result.returncode = 1 result.stderr = "load key: invalid format" result.stdout = "" return result ca = LocalCA(ca_key, tmp_path / "state") with patch("warden.ca.subprocess.run", side_effect=fail_run): with pytest.raises(CAError, match="Signing failed"): ca.sign(spec)