generated from coulomb/repo-seed
- File permissions: os.chmod(cert, 0o600) after every sign in LocalCA and VaultCA; chmod(privkey, 0o600) and chmod(pubkey, 0o644) after generate_keypair - Scorecard: add check_file_permissions() that flags world/group-readable cert and key files; run_scorecard now returns 6 checks - warden status --state-dir: bypasses config loading entirely for operators who have a cert but no warden.yaml installed - tests/test_vault.py: 11 VaultCA unit tests covering success, HTTP 403, RequestError, missing token, missing role, missing pubkey, TTL enforcement, eviction, signatures log, and cert mode 600 - tests/test_ca.py: generate_keypair tests (paths, args, overwrite, error, permissions) and cert mode 600 assertion after sign - tests/test_scorecard.py: file_permissions check tests (pass, fail cert, fail keys dir); scorecard count updated to 6 - tests/test_cli.py: covers sign, issue, status, scorecard, inventory, log, cleanup commands using CliRunner and tmp config/inventory files - tests/test_integration.py: @pytest.mark.integration tests against real ssh-keygen; excluded from default suite via pyproject addopts - pyproject.toml: addopts = "-m 'not integration'", integration marker declared All 100 unit tests pass; 3 integration tests pass; ruff clean. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
396 lines
14 KiB
Python
396 lines
14 KiB
Python
"""Tests for warden CLI commands."""
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
from typer.testing import CliRunner
|
|
|
|
from warden.cli import app
|
|
from warden.models import CertRecord
|
|
|
|
# Shared Typer CliRunner; every test below invokes the warden ``app`` through it.
runner = CliRunner()
|
|
|
|
# Placeholder certificate text the tests write into ``*-cert.pub`` files; it is
# fake data, not a certificate produced by a real signing operation.
CERT_CONTENT = "ssh-ed25519-cert-v01@openssh.com AAAA_fake_cert_data"
|
|
|
|
|
|
def _write_config(tmp_path: Path, ca_key: Path, state_dir: Path, inventory_path: Path) -> Path:
|
|
cfg_path = tmp_path / "warden.yaml"
|
|
cfg_path.write_text(
|
|
f"backend: local\n"
|
|
f"ca_key: {ca_key}\n"
|
|
f"state_dir: {state_dir}\n"
|
|
f"inventory_path: {inventory_path}\n"
|
|
)
|
|
return cfg_path
|
|
|
|
|
|
def _write_inventory(path: Path, actors: list[dict]) -> None:
    """Dump *actors* to *path* as a YAML inventory keyed by actor name.

    Each actor dict must carry a ``"name"`` entry; that entry becomes the key
    under the top-level ``actors`` mapping and the remaining fields become its
    value. Missing parent directories of *path* are created.
    """
    import yaml

    path.parent.mkdir(parents=True, exist_ok=True)

    by_name = {}
    for actor in actors:
        fields = dict(actor)
        # Remove the name from the per-actor payload; it is used as the key.
        actor_name = fields.pop("name")
        by_name[actor_name] = fields

    path.write_text(yaml.dump({"actors": by_name}))
|
|
|
|
|
|
def _make_cert_record(tmp_path: Path, actor_name: str = "agt-test") -> CertRecord:
    """Create ``<actor_name>-cert.pub`` in *tmp_path* and return a CertRecord for it.

    The file receives ``CERT_CONTENT``. The record uses fixed timestamps
    (signed 2026-03-28 10:00 UTC, valid until 2030-01-01 UTC) and the single
    principal ``agt-task``.
    """
    cert_file = tmp_path / f"{actor_name}-cert.pub"
    cert_file.write_text(CERT_CONTENT)

    expires_at = datetime(2030, 1, 1, tzinfo=timezone.utc)
    issued_at = datetime(2026, 3, 28, 10, 0, 0, tzinfo=timezone.utc)
    return CertRecord(
        identity=actor_name,
        valid_before=expires_at,
        cert_path=cert_file,
        signed_at=issued_at,
        principals=["agt-task"],
        actor_name=actor_name,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# warden sign
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_sign_exits_0_outputs_cert(tmp_path, monkeypatch):
    """``sign`` with a mocked LocalCA exits 0 and its output contains the cert text."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()
    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")
    public_key = tmp_path / "key.pub"
    public_key.write_text("ssh-ed25519 AAAA")

    actor = {"name": "agt-test", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [actor])
    config_file = _write_config(tmp_path, signing_key, certs_dir, inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    signed = _make_cert_record(certs_dir)
    argv = ["sign", "agt-test", "--pubkey", str(public_key)]
    # LocalCA is replaced as seen from warden.cli, so its sign() returns the
    # prepared record.
    with patch("warden.cli.LocalCA") as ca_cls:
        ca_cls.return_value.sign.return_value = signed
        outcome = runner.invoke(app, argv)

    assert outcome.exit_code == 0
    assert CERT_CONTENT in outcome.output
|
|
|
|
|
|
def test_sign_exits_1_unknown_actor(tmp_path, monkeypatch):
    """``sign`` exits 1 when the requested actor is not in the (empty) inventory."""
    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")
    public_key = tmp_path / "key.pub"
    public_key.write_text("ssh-ed25519 AAAA")

    # The inventory is written with no actors at all.
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [])

    # The state directory is only named in the config; it is not created here.
    certs_dir = tmp_path / "state"
    config_file = _write_config(tmp_path, signing_key, certs_dir, inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    argv = ["sign", "agt-unknown", "--pubkey", str(public_key)]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 1
|
|
|
|
|
|
def test_sign_exits_1_on_config_error(tmp_path, monkeypatch):
    """``sign`` exits 1 when WARDEN_CONFIG names a file that was never created."""
    missing_config = tmp_path / "nonexistent.yaml"
    monkeypatch.setenv("WARDEN_CONFIG", str(missing_config))

    argv = ["sign", "agt-test", "--pubkey", "/tmp/k.pub"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# warden issue
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_issue_exits_1_on_vault_backend(tmp_path, monkeypatch):
    """``issue`` exits 1 and mentions "local" when the config selects the vault backend."""
    certs_dir = tmp_path / "state"
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [])

    # Hand-written config: _write_config always emits ``backend: local``.
    config_lines = [
        "backend: vault\n",
        f"state_dir: {certs_dir}\n",
        f"inventory_path: {inventory_file}\n",
        "vault:\n addr: http://127.0.0.1:8200\n",
    ]
    config_file = tmp_path / "warden.yaml"
    config_file.write_text("".join(config_lines))
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    outcome = runner.invoke(app, ["issue", "agt-test"])
    assert outcome.exit_code == 1
    assert "local" in outcome.output
|
|
|
|
|
|
def test_issue_exits_0_local_backend(tmp_path, monkeypatch):
    """``issue`` with a mocked LocalCA exits 0 and its output names the actor."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()
    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")

    actor = {"name": "agt-test", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [actor])
    config_file = _write_config(tmp_path, signing_key, certs_dir, inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    # Key paths handed back by the mocked generate_keypair(); the files
    # themselves are never created.
    keys_dir = certs_dir / "keys"
    private_key = keys_dir / "agt-test_ed25519"
    public_key = keys_dir / "agt-test_ed25519.pub"
    signed = _make_cert_record(certs_dir)

    with patch("warden.cli.LocalCA") as ca_cls:
        fake_ca = ca_cls.return_value
        fake_ca.generate_keypair.return_value = (private_key, public_key)
        fake_ca.sign.return_value = signed
        outcome = runner.invoke(app, ["issue", "agt-test"])

    assert outcome.exit_code == 0
    assert "agt-test" in outcome.output
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# warden status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_status_no_certs_empty_state_dir(tmp_path, monkeypatch):
    """``status`` exits 0 and prints "No certificates" for an empty state directory."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()
    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")

    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [])
    config_file = _write_config(tmp_path, signing_key, certs_dir, inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    outcome = runner.invoke(app, ["status"])
    assert outcome.exit_code == 0
    assert "No certificates" in outcome.output
|
|
|
|
|
|
def test_status_state_dir_override_no_config(tmp_path):
    """``status --state-dir`` works without any config file being set up."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()

    # No warden.yaml is written and WARDEN_CONFIG is not set by this test.
    argv = ["status", "--state-dir", str(certs_dir)]
    outcome = runner.invoke(app, argv)

    assert outcome.exit_code == 0
    assert "No certificates" in outcome.output
|
|
|
|
|
|
def test_status_exits_1_expired_cert(tmp_path, monkeypatch):
    """``status`` exits 1 when the parsed cert metadata has a 2020 ``valid_before``."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()
    (certs_dir / "agt-test-cert.pub").write_text(CERT_CONTENT)

    # Metadata parsing is mocked, so the fake cert file only has to exist.
    long_expired = datetime(2020, 1, 1, tzinfo=timezone.utc)
    parsed = {
        "identity": "agt-test",
        "valid_before": long_expired,
        "principals": ["agt-task"],
    }
    argv = ["status", "--state-dir", str(certs_dir)]
    with patch("warden.cli.parse_cert_metadata", return_value=parsed):
        outcome = runner.invoke(app, argv)

    assert outcome.exit_code == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# warden scorecard
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_scorecard_exits_0_clean(tmp_path, monkeypatch):
    """``scorecard`` exits 0 for an inventory holding one ``agt-bridge`` actor."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()
    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")

    actor = {"name": "agt-bridge", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [actor])
    config_file = _write_config(tmp_path, signing_key, certs_dir, inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    outcome = runner.invoke(app, ["scorecard"])
    assert outcome.exit_code == 0
|
|
|
|
|
|
def test_scorecard_exits_1_on_fail(tmp_path, monkeypatch):
    """``scorecard`` exits 1 when the inventory contains a badly named actor."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()
    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")

    # bad-name has wrong prefix for AGT type
    misnamed_actor = {"name": "bad-name", "type": "agt", "principals": ["x"], "ttl_hours": 24}
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [misnamed_actor])
    config_file = _write_config(tmp_path, signing_key, certs_dir, inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    outcome = runner.invoke(app, ["scorecard"])
    assert outcome.exit_code == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# warden inventory add / list / remove
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_inventory_round_trip(tmp_path, monkeypatch):
    """``inventory add`` → ``list`` → ``remove`` → ``list`` shows the actor, then not."""
    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [])
    config_file = _write_config(tmp_path, signing_key, tmp_path / "state", inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    def inventory(*args):
        # Run one ``warden inventory ...`` sub-command through the shared runner.
        return runner.invoke(app, ["inventory", *args])

    added = inventory("add", "agt-test", "--type", "agt")
    assert added.exit_code == 0

    listed = inventory("list")
    assert listed.exit_code == 0
    assert "agt-test" in listed.output

    removed = inventory("remove", "agt-test")
    assert removed.exit_code == 0

    # Only the output of the final listing is checked, not its exit code.
    listed_again = inventory("list")
    assert "agt-test" not in listed_again.output
|
|
|
|
|
|
def test_inventory_list_json(tmp_path, monkeypatch):
    """``inventory list --json`` exits 0 and emits JSON containing the actor name."""
    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")

    actor = {"name": "agt-bridge", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [actor])
    config_file = _write_config(tmp_path, signing_key, tmp_path / "state", inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    outcome = runner.invoke(app, ["inventory", "list", "--json"])
    assert outcome.exit_code == 0

    # The whole output must parse as a single JSON document.
    payload = json.loads(outcome.output)
    assert "agt-bridge" in payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# warden log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_log_exits_0_no_log(tmp_path, monkeypatch):
    """``log`` exits 0 when the state directory contains no signatures log."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()
    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")

    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [])
    config_file = _write_config(tmp_path, signing_key, certs_dir, inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    outcome = runner.invoke(app, ["log"])
    assert outcome.exit_code == 0
|
|
|
|
|
|
def test_log_json_parseable(tmp_path, monkeypatch):
    """``log --json`` exits 0 and emits a JSON list whose first entry is the logged actor."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()

    # Seed signatures.log with a single JSON-lines record.
    logged = {
        "timestamp": "2026-03-28T10:00:00+00:00",
        "actor": "agt-test",
        "actor_type": "agt",
        "identity": "agt-test",
        "principals": ["agt-task"],
        "ttl_hours": 24,
        "valid_before": "2026-03-29T10:00:00+00:00",
        "cert_path": "/tmp/cert.pub",
        "backend": "local",
    }
    signatures_log = certs_dir / "signatures.log"
    signatures_log.write_text(json.dumps(logged) + "\n")

    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [])
    config_file = _write_config(tmp_path, signing_key, certs_dir, inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    outcome = runner.invoke(app, ["log", "--json"])
    assert outcome.exit_code == 0

    records = json.loads(outcome.output)
    assert isinstance(records, list)
    assert records[0]["actor"] == "agt-test"
|
|
|
|
|
|
def test_log_filters_by_actor(tmp_path, monkeypatch):
    """``log <actor> --json`` returns only the entries for the requested actor.

    Two records (``agt-alpha`` and ``agt-beta``) are appended to
    ``signatures.log``; filtering on ``agt-alpha`` must yield exactly one.
    """
    state_dir = tmp_path / "state"
    state_dir.mkdir()
    log_path = state_dir / "signatures.log"

    # Write through a context manager so the handle is closed (and the data
    # flushed) before the CLI reads the log, rather than relying on garbage
    # collection of an anonymous file object.
    with log_path.open("a") as log_file:
        for actor in ["agt-alpha", "agt-beta"]:
            entry = {"timestamp": "2026-03-28T10:00:00+00:00", "actor": actor,
                     "actor_type": "agt", "identity": actor, "principals": [actor],
                     "ttl_hours": 24, "valid_before": "2026-03-29T10:00:00+00:00",
                     "cert_path": "/tmp/cert.pub", "backend": "local"}
            log_file.write(json.dumps(entry) + "\n")

    ca_key = tmp_path / "ca_key"
    ca_key.write_text("fake")
    inv_path = tmp_path / "inventory.yaml"
    _write_inventory(inv_path, [])
    cfg = _write_config(tmp_path, ca_key, state_dir, inv_path)
    monkeypatch.setenv("WARDEN_CONFIG", str(cfg))

    result = runner.invoke(app, ["log", "agt-alpha", "--json"])
    assert result.exit_code == 0
    data = json.loads(result.output)
    assert len(data) == 1
    assert data[0]["actor"] == "agt-alpha"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# warden cleanup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_cleanup_dry_run_no_delete(tmp_path, monkeypatch):
    """``cleanup --dry-run`` exits 0, prints "would remove", and keeps the cert file."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()
    stale_cert = certs_dir / "agt-test-cert.pub"
    stale_cert.write_text(CERT_CONTENT)

    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [])
    config_file = _write_config(tmp_path, signing_key, certs_dir, inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    # Mocked parse result with a valid_before date in 2020.
    long_expired = datetime(2020, 1, 1, tzinfo=timezone.utc)
    parsed = {
        "identity": "agt-test",
        "valid_before": long_expired,
        "principals": ["agt-task"],
    }
    with patch("warden.cli.parse_cert_metadata", return_value=parsed):
        outcome = runner.invoke(app, ["cleanup", "--dry-run"])

    assert outcome.exit_code == 0
    assert stale_cert.exists()  # not deleted
    assert "would remove" in outcome.output
|
|
|
|
|
|
def test_cleanup_removes_stale_cert(tmp_path, monkeypatch):
    """``cleanup`` exits 0 and deletes a cert whose mocked ``valid_before`` is in 2020."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()
    stale_cert = certs_dir / "agt-test-cert.pub"
    stale_cert.write_text(CERT_CONTENT)

    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [])
    config_file = _write_config(tmp_path, signing_key, certs_dir, inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    long_expired = datetime(2020, 1, 1, tzinfo=timezone.utc)
    parsed = {
        "identity": "agt-test",
        "valid_before": long_expired,
        "principals": ["agt-task"],
    }
    with patch("warden.cli.parse_cert_metadata", return_value=parsed):
        outcome = runner.invoke(app, ["cleanup"])

    assert outcome.exit_code == 0
    assert not stale_cert.exists()
|
|
|
|
|
|
def test_cleanup_exits_0_nothing_to_clean(tmp_path, monkeypatch):
    """``cleanup`` on an empty state directory exits 0 and prints "No stale"."""
    certs_dir = tmp_path / "state"
    certs_dir.mkdir()

    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake")
    inventory_file = tmp_path / "inventory.yaml"
    _write_inventory(inventory_file, [])
    config_file = _write_config(tmp_path, signing_key, certs_dir, inventory_file)
    monkeypatch.setenv("WARDEN_CONFIG", str(config_file))

    outcome = runner.invoke(app, ["cleanup"])
    assert outcome.exit_code == 0
    assert "No stale" in outcome.output
|