"""Tests for warden CLI commands.""" import json from datetime import datetime, timezone from pathlib import Path from unittest.mock import patch from typer.testing import CliRunner from warden.cli import app from warden.models import CertRecord runner = CliRunner() CERT_CONTENT = "ssh-ed25519-cert-v01@openssh.com AAAA_fake_cert_data" def _write_config(tmp_path: Path, ca_key: Path, state_dir: Path, inventory_path: Path) -> Path: cfg_path = tmp_path / "warden.yaml" cfg_path.write_text( f"backend: local\n" f"ca_key: {ca_key}\n" f"state_dir: {state_dir}\n" f"inventory_path: {inventory_path}\n" ) return cfg_path def _write_inventory(path: Path, actors: list[dict]) -> None: import yaml path.parent.mkdir(parents=True, exist_ok=True) actors_dict = {a["name"]: {k: v for k, v in a.items() if k != "name"} for a in actors} path.write_text(yaml.dump({"actors": actors_dict})) def _make_cert_record(tmp_path: Path, actor_name: str = "agt-test") -> CertRecord: cert_path = tmp_path / f"{actor_name}-cert.pub" cert_path.write_text(CERT_CONTENT) return CertRecord( identity=actor_name, valid_before=datetime(2030, 1, 1, tzinfo=timezone.utc), cert_path=cert_path, signed_at=datetime(2026, 3, 28, 10, 0, 0, tzinfo=timezone.utc), principals=["agt-task"], actor_name=actor_name, ) # --------------------------------------------------------------------------- # warden sign # --------------------------------------------------------------------------- def test_sign_exits_0_outputs_cert(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() ca_key = tmp_path / "ca_key" ca_key.write_text("fake") pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, [{"name": "agt-test", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}]) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) record = _make_cert_record(state_dir) with patch("warden.cli.LocalCA") as MockCA: MockCA.return_value.sign.return_value = record result = runner.invoke(app, ["sign", "agt-test", "--pubkey", str(pubkey)]) assert result.exit_code == 0 assert CERT_CONTENT in result.output def test_sign_exits_1_unknown_actor(tmp_path, monkeypatch): state_dir = tmp_path / "state" ca_key = tmp_path / "ca_key" ca_key.write_text("fake") pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, []) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) result = runner.invoke(app, ["sign", "agt-unknown", "--pubkey", str(pubkey)]) assert result.exit_code == 1 def test_sign_exits_1_on_config_error(tmp_path, monkeypatch): monkeypatch.setenv("WARDEN_CONFIG", str(tmp_path / "nonexistent.yaml")) result = runner.invoke(app, ["sign", "agt-test", "--pubkey", "/tmp/k.pub"]) assert result.exit_code == 1 # --------------------------------------------------------------------------- # warden issue # --------------------------------------------------------------------------- def test_issue_exits_1_on_vault_backend(tmp_path, monkeypatch): state_dir = tmp_path / "state" inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, []) cfg_path = tmp_path / "warden.yaml" cfg_path.write_text( f"backend: vault\n" f"state_dir: {state_dir}\n" f"inventory_path: {inv_path}\n" f"vault:\n addr: http://127.0.0.1:8200\n" ) monkeypatch.setenv("WARDEN_CONFIG", str(cfg_path)) result = runner.invoke(app, ["issue", "agt-test"]) assert result.exit_code == 1 assert "local" in result.output def test_issue_exits_0_local_backend(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, [{"name": "agt-test", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}]) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) pubkey = state_dir / "keys" / "agt-test_ed25519.pub" record = _make_cert_record(state_dir) with patch("warden.cli.LocalCA") as MockCA: MockCA.return_value.generate_keypair.return_value = ( state_dir / "keys" / "agt-test_ed25519", pubkey, ) MockCA.return_value.sign.return_value = record result = runner.invoke(app, ["issue", "agt-test"]) assert result.exit_code == 0 assert "agt-test" in result.output # --------------------------------------------------------------------------- # warden status # --------------------------------------------------------------------------- def test_status_no_certs_empty_state_dir(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, []) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) result = runner.invoke(app, ["status"]) assert result.exit_code == 0 assert "No certificates" in result.output def test_status_state_dir_override_no_config(tmp_path): """--state-dir bypasses config loading entirely.""" state_dir = tmp_path / "state" state_dir.mkdir() result = runner.invoke(app, ["status", "--state-dir", str(state_dir)]) assert result.exit_code == 0 assert "No certificates" in result.output def test_status_exits_1_expired_cert(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() cert = state_dir / "agt-test-cert.pub" cert.write_text(CERT_CONTENT) meta = { "identity": "agt-test", "valid_before": datetime(2020, 1, 1, tzinfo=timezone.utc), "principals": ["agt-task"], } with patch("warden.cli.parse_cert_metadata", return_value=meta): result = runner.invoke(app, ["status", "--state-dir", str(state_dir)]) assert result.exit_code == 1 # --------------------------------------------------------------------------- # warden scorecard # --------------------------------------------------------------------------- def test_scorecard_exits_0_clean(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, [{"name": "agt-bridge", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}]) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) result = runner.invoke(app, ["scorecard"]) assert result.exit_code == 0 def test_scorecard_exits_1_on_fail(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" # bad-name has wrong prefix for AGT type _write_inventory(inv_path, [{"name": "bad-name", "type": "agt", "principals": ["x"], "ttl_hours": 24}]) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) result = runner.invoke(app, ["scorecard"]) assert result.exit_code == 1 # --------------------------------------------------------------------------- # warden inventory add / list / remove # --------------------------------------------------------------------------- def test_inventory_round_trip(tmp_path, monkeypatch): state_dir = tmp_path / "state" ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, []) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) add_result = runner.invoke(app, ["inventory", "add", "agt-test", "--type", "agt"]) assert add_result.exit_code == 0 list_result = runner.invoke(app, ["inventory", "list"]) assert list_result.exit_code == 0 assert "agt-test" in list_result.output remove_result = runner.invoke(app, ["inventory", "remove", "agt-test"]) assert remove_result.exit_code == 0 list_result2 = runner.invoke(app, ["inventory", "list"]) assert "agt-test" not in list_result2.output def test_inventory_list_json(tmp_path, monkeypatch): state_dir = tmp_path / "state" ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, [{"name": "agt-bridge", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}]) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) result = runner.invoke(app, ["inventory", "list", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "agt-bridge" in data # --------------------------------------------------------------------------- # warden log # --------------------------------------------------------------------------- def test_log_exits_0_no_log(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, []) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) result = runner.invoke(app, ["log"]) assert result.exit_code == 0 def test_log_json_parseable(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() log_path = state_dir / "signatures.log" entry = {"timestamp": "2026-03-28T10:00:00+00:00", "actor": "agt-test", "actor_type": "agt", "identity": "agt-test", "principals": ["agt-task"], "ttl_hours": 24, "valid_before": "2026-03-29T10:00:00+00:00", "cert_path": "/tmp/cert.pub", "backend": "local"} log_path.write_text(json.dumps(entry) + "\n") ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, []) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) result = runner.invoke(app, ["log", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data, list) assert data[0]["actor"] == "agt-test" def test_log_filters_by_actor(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() log_path = state_dir / "signatures.log" for actor in ["agt-alpha", "agt-beta"]: entry = {"timestamp": "2026-03-28T10:00:00+00:00", "actor": actor, "actor_type": "agt", "identity": actor, "principals": [actor], "ttl_hours": 24, "valid_before": "2026-03-29T10:00:00+00:00", "cert_path": "/tmp/cert.pub", "backend": "local"} log_path.open("a").write(json.dumps(entry) + "\n") ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, []) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) result = runner.invoke(app, ["log", "agt-alpha", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert len(data) == 1 assert data[0]["actor"] == "agt-alpha" # --------------------------------------------------------------------------- # warden cleanup # --------------------------------------------------------------------------- def test_cleanup_dry_run_no_delete(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() cert = state_dir / "agt-test-cert.pub" cert.write_text(CERT_CONTENT) ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, []) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) meta = { "identity": "agt-test", "valid_before": datetime(2020, 1, 1, tzinfo=timezone.utc), "principals": ["agt-task"], } with patch("warden.cli.parse_cert_metadata", return_value=meta): result = runner.invoke(app, ["cleanup", "--dry-run"]) assert result.exit_code == 0 assert cert.exists() # not deleted assert "would remove" in result.output def test_cleanup_removes_stale_cert(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() cert = state_dir / "agt-test-cert.pub" cert.write_text(CERT_CONTENT) ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, []) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) meta = { "identity": "agt-test", "valid_before": datetime(2020, 1, 1, tzinfo=timezone.utc), "principals": ["agt-task"], } with patch("warden.cli.parse_cert_metadata", return_value=meta): result = runner.invoke(app, ["cleanup"]) assert result.exit_code == 0 assert not cert.exists() def test_cleanup_exits_0_nothing_to_clean(tmp_path, monkeypatch): state_dir = tmp_path / "state" state_dir.mkdir() ca_key = tmp_path / "ca_key" ca_key.write_text("fake") inv_path = tmp_path / "inventory.yaml" _write_inventory(inv_path, []) cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) result = runner.invoke(app, ["cleanup"]) assert result.exit_code == 0 assert "No stale" in result.output