diff --git a/pyproject.toml b/pyproject.toml index 9f84eff..b10d037 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,8 @@ packages = ["src/warden"] [tool.pytest.ini_options] testpaths = ["tests"] pythonpath = ["src"] +addopts = "-m 'not integration'" +markers = ["integration: requires ssh-keygen binary; run with pytest -m integration"] [tool.ruff] line-length = 88 diff --git a/src/warden/ca.py b/src/warden/ca.py index a7507bb..4cdc307 100644 --- a/src/warden/ca.py +++ b/src/warden/ca.py @@ -175,6 +175,7 @@ class LocalCA(CABackend): _evict_cert(spec.actor_name, self._state_dir) dest = self._state_dir / f"{spec.actor_name}-cert.pub" shutil.copy2(cert_path_tmp, dest) + os.chmod(dest, 0o600) record = CertRecord( identity=meta["identity"] or spec.identity, @@ -211,4 +212,6 @@ class LocalCA(CABackend): if result.returncode != 0: raise CAError(f"Key generation failed: {result.stderr.strip()}") + os.chmod(privkey, 0o600) + os.chmod(pubkey, 0o644) return privkey, pubkey diff --git a/src/warden/cli.py b/src/warden/cli.py index 9874b5a..702e21e 100644 --- a/src/warden/cli.py +++ b/src/warden/cli.py @@ -173,16 +173,22 @@ def issue( def status( actor_name: Annotated[Optional[str], typer.Argument(help="Actor name (omit for all)")] = None, output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False, + state_dir_override: Annotated[Optional[Path], typer.Option("--state-dir", help="State dir path (bypasses config)")] = None, ) -> None: """Show certificate status. Exits 1 if any cert is expired.""" - cfg = _load_cfg() now = datetime.now(timezone.utc) + if state_dir_override is not None: + state_dir = state_dir_override + else: + cfg = _load_cfg() + state_dir = cfg.state_dir + if actor_name: - cert_path = cfg.state_dir / f"{actor_name}-cert.pub" + cert_path = state_dir / f"{actor_name}-cert.pub" paths = [cert_path] if cert_path.exists() else [] else: - paths = sorted(cfg.state_dir.glob("*-cert.pub")) if cfg.state_dir.exists() else [] + paths = sorted(state_dir.glob("*-cert.pub")) if state_dir.exists() else [] if not paths: msg = ( diff --git a/src/warden/scorecard.py b/src/warden/scorecard.py index 69543b4..e479580 100644 --- a/src/warden/scorecard.py +++ b/src/warden/scorecard.py @@ -126,6 +126,32 @@ def check_ttl_policy(state_dir: Path, inventory: PrincipalsInventory) -> CheckRe ) +def check_file_permissions(state_dir: Path) -> CheckResult: + """Cert files and keys must not be group- or world-readable (mode 600/644).""" + if not state_dir.exists(): + return CheckResult("file_permissions", passed=True, detail="no state dir") + + bad = [] + for cert_path in state_dir.glob("*-cert.pub"): + if cert_path.stat().st_mode & 0o044: + bad.append(cert_path.name) + + keys_dir = state_dir / "keys" + if keys_dir.exists(): + for key_path in keys_dir.iterdir(): + if key_path.stat().st_mode & 0o044: + bad.append(f"keys/{key_path.name}") + + return CheckResult( + name="file_permissions", + passed=len(bad) == 0, + detail=( + f"world/group readable files: {bad} — run 'chmod 600'" if bad + else "all cert/key files have restricted permissions" + ), + ) + + def run_scorecard(state_dir: Path, inventory: PrincipalsInventory) -> List[CheckResult]: """Run all cert-side scorecard checks. Returns list of CheckResult.""" return [ @@ -134,4 +160,5 @@ def run_scorecard(state_dir: Path, inventory: PrincipalsInventory) -> List[Check check_no_expired_certs(state_dir), check_no_stale_certs(state_dir), check_ttl_policy(state_dir, inventory), + check_file_permissions(state_dir), ] diff --git a/src/warden/vault.py b/src/warden/vault.py index 9029398..04e0d73 100644 --- a/src/warden/vault.py +++ b/src/warden/vault.py @@ -76,6 +76,7 @@ class VaultCA(CABackend): _evict_cert(spec.actor_name, self._state_dir) dest = self._state_dir / f"{spec.actor_name}-cert.pub" dest.write_text(cert_text + "\n") + os.chmod(dest, 0o600) # Parse metadata by writing to a tempfile and running ssh-keygen -L with tempfile.NamedTemporaryFile( diff --git a/tests/test_ca.py b/tests/test_ca.py index 9934678..70bb249 100644 --- a/tests/test_ca.py +++ b/tests/test_ca.py @@ -335,6 +335,28 @@ def test_local_ca_sign_evicts_existing_cert(tmp_path): assert len(list(state.glob("agt-state-hub-bridge-cert.pub"))) == 1 +def test_local_ca_sign_cert_mode_600(tmp_path): + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake-ca") + pubkey = tmp_path / "key.pub" + pubkey.write_text("ssh-ed25519 AAAA actor-key") + state = tmp_path / "state" + + spec = CertSpec( + actor_name="agt-state-hub-bridge", + actor_type=ActorType.AGT, + pubkey_path=pubkey, + ttl_hours=24, + principals=["agt-task-bridge"], + identity="agt-state-hub-bridge", + ) + with patch("warden.ca.subprocess.run", side_effect=_mock_run_factory(CERT_CONTENT)): + ca = LocalCA(ca_key, state) + record = ca.sign(spec) + + assert oct(record.cert_path.stat().st_mode & 0o777) == oct(0o600) + + def test_local_ca_sign_writes_signature_log(tmp_path): ca_key = tmp_path / "ca_key" ca_key.write_text("fake-ca") @@ -360,3 +382,102 @@ def test_local_ca_sign_writes_signature_log(tmp_path): assert entry["actor"] == "agt-state-hub-bridge" assert entry["backend"] == "local" assert entry["ttl_hours"] == 24 + + +# --------------------------------------------------------------------------- +# LocalCA.generate_keypair +# --------------------------------------------------------------------------- + +def _mock_keygen_gen(cmd, **kwargs): + """Mock for generate_keypair: writes privkey and pubkey based on -f arg.""" + result = MagicMock() + result.returncode = 0 + result.stdout = "" + result.stderr = "" + if "-f" in cmd: + idx = cmd.index("-f") + privkey = Path(cmd[idx + 1]) + privkey.parent.mkdir(parents=True, exist_ok=True) + privkey.write_text("fake private key") + (privkey.parent / (privkey.name + ".pub")).write_text("ssh-ed25519 AAAA pubkey") + return result + + +def test_generate_keypair_returns_paths(tmp_path): + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake-ca") + ca = LocalCA(ca_key, tmp_path / "state") + + with patch("warden.ca.subprocess.run", side_effect=_mock_keygen_gen): + privkey, pubkey = ca.generate_keypair("agt-test") + + assert privkey.name == "agt-test_ed25519" + assert pubkey.name == "agt-test_ed25519.pub" + assert str(privkey).endswith("state/keys/agt-test_ed25519") + + +def test_generate_keypair_ed25519_no_passphrase(tmp_path): + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake-ca") + ca = LocalCA(ca_key, tmp_path / "state") + + calls = [] + + def capturing_mock(cmd, **kwargs): + calls.append(cmd) + return _mock_keygen_gen(cmd, **kwargs) + + with patch("warden.ca.subprocess.run", side_effect=capturing_mock): + ca.generate_keypair("agt-test") + + assert len(calls) == 1 + cmd = calls[0] + assert "-t" in cmd and cmd[cmd.index("-t") + 1] == "ed25519" + assert "-N" in cmd and cmd[cmd.index("-N") + 1] == "" + assert "-C" in cmd and cmd[cmd.index("-C") + 1] == "agt-test" + + +def test_generate_keypair_overwrites_existing(tmp_path): + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake-ca") + state = tmp_path / "state" + keys_dir = state / "keys" + keys_dir.mkdir(parents=True) + old_priv = keys_dir / "agt-test_ed25519" + old_pub = keys_dir / "agt-test_ed25519.pub" + old_priv.write_text("old key") + old_pub.write_text("old pubkey") + + ca = LocalCA(ca_key, state) + with patch("warden.ca.subprocess.run", side_effect=_mock_keygen_gen): + privkey, pubkey = ca.generate_keypair("agt-test") + + assert privkey.read_text() == "fake private key" + + +def test_generate_keypair_ca_error_on_failure(tmp_path): + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake-ca") + ca = LocalCA(ca_key, tmp_path / "state") + + def fail_run(cmd, **kwargs): + result = MagicMock() + result.returncode = 1 + result.stderr = "failed to generate key" + return result + + with patch("warden.ca.subprocess.run", side_effect=fail_run): + with pytest.raises(CAError, match="Key generation failed"): + ca.generate_keypair("agt-test") + + +def test_generate_keypair_sets_permissions(tmp_path): + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake-ca") + ca = LocalCA(ca_key, tmp_path / "state") + + with patch("warden.ca.subprocess.run", side_effect=_mock_keygen_gen): + privkey, pubkey = ca.generate_keypair("agt-test") + + assert oct(privkey.stat().st_mode & 0o777) == oct(0o600) + assert oct(pubkey.stat().st_mode & 0o777) == oct(0o644) diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..36e7971 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,395 @@ +"""Tests for warden CLI commands.""" +import json +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import patch + +from typer.testing import CliRunner + +from warden.cli import app +from warden.models import CertRecord + +runner = CliRunner() + +CERT_CONTENT = "ssh-ed25519-cert-v01@openssh.com AAAA_fake_cert_data" + + +def _write_config(tmp_path: Path, ca_key: Path, state_dir: Path, inventory_path: Path) -> Path: + cfg_path = tmp_path / "warden.yaml" + cfg_path.write_text( + f"backend: local\n" + f"ca_key: {ca_key}\n" + f"state_dir: {state_dir}\n" + f"inventory_path: {inventory_path}\n" + ) + return cfg_path + + +def _write_inventory(path: Path, actors: list[dict]) -> None: + import yaml + path.parent.mkdir(parents=True, exist_ok=True) + actors_dict = {a["name"]: {k: v for k, v in a.items() if k != "name"} for a in actors} + path.write_text(yaml.dump({"actors": actors_dict})) + + +def _make_cert_record(tmp_path: Path, actor_name: str = "agt-test") -> CertRecord: + cert_path = tmp_path / f"{actor_name}-cert.pub" + cert_path.write_text(CERT_CONTENT) + return CertRecord( + identity=actor_name, + valid_before=datetime(2030, 1, 1, tzinfo=timezone.utc), + cert_path=cert_path, + signed_at=datetime(2026, 3, 28, 10, 0, 0, tzinfo=timezone.utc), + principals=["agt-task"], + actor_name=actor_name, + ) + + +# --------------------------------------------------------------------------- +# warden sign +# --------------------------------------------------------------------------- + +def test_sign_exits_0_outputs_cert(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + pubkey = tmp_path / "key.pub" + pubkey.write_text("ssh-ed25519 AAAA") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, [{"name": "agt-test", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}]) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + record = _make_cert_record(state_dir) + with patch("warden.cli.LocalCA") as MockCA: + MockCA.return_value.sign.return_value = record + result = runner.invoke(app, ["sign", "agt-test", "--pubkey", str(pubkey)]) + + assert result.exit_code == 0 + assert CERT_CONTENT in result.output + + +def test_sign_exits_1_unknown_actor(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + pubkey = tmp_path / "key.pub" + pubkey.write_text("ssh-ed25519 AAAA") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, []) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + result = runner.invoke(app, ["sign", "agt-unknown", "--pubkey", str(pubkey)]) + assert result.exit_code == 1 + + +def test_sign_exits_1_on_config_error(tmp_path, monkeypatch): + monkeypatch.setenv("WARDEN_CONFIG", str(tmp_path / "nonexistent.yaml")) + result = runner.invoke(app, ["sign", "agt-test", "--pubkey", "/tmp/k.pub"]) + assert result.exit_code == 1 + + +# --------------------------------------------------------------------------- +# warden issue +# --------------------------------------------------------------------------- + +def test_issue_exits_1_on_vault_backend(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, []) + cfg_path = tmp_path / "warden.yaml" + cfg_path.write_text( + f"backend: vault\n" + f"state_dir: {state_dir}\n" + f"inventory_path: {inv_path}\n" + f"vault:\n addr: http://127.0.0.1:8200\n" + ) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg_path)) + + result = runner.invoke(app, ["issue", "agt-test"]) + assert result.exit_code == 1 + assert "local" in result.output + + +def test_issue_exits_0_local_backend(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, [{"name": "agt-test", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}]) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + pubkey = state_dir / "keys" / "agt-test_ed25519.pub" + record = _make_cert_record(state_dir) + with patch("warden.cli.LocalCA") as MockCA: + MockCA.return_value.generate_keypair.return_value = ( + state_dir / "keys" / "agt-test_ed25519", + pubkey, + ) + MockCA.return_value.sign.return_value = record + result = runner.invoke(app, ["issue", "agt-test"]) + + assert result.exit_code == 0 + assert "agt-test" in result.output + + +# --------------------------------------------------------------------------- +# warden status +# --------------------------------------------------------------------------- + +def test_status_no_certs_empty_state_dir(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, []) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + result = runner.invoke(app, ["status"]) + assert result.exit_code == 0 + assert "No certificates" in result.output + + +def test_status_state_dir_override_no_config(tmp_path): + """--state-dir bypasses config loading entirely.""" + state_dir = tmp_path / "state" + state_dir.mkdir() + result = runner.invoke(app, ["status", "--state-dir", str(state_dir)]) + assert result.exit_code == 0 + assert "No certificates" in result.output + + +def test_status_exits_1_expired_cert(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + cert = state_dir / "agt-test-cert.pub" + cert.write_text(CERT_CONTENT) + + meta = { + "identity": "agt-test", + "valid_before": datetime(2020, 1, 1, tzinfo=timezone.utc), + "principals": ["agt-task"], + } + with patch("warden.cli.parse_cert_metadata", return_value=meta): + result = runner.invoke(app, ["status", "--state-dir", str(state_dir)]) + + assert result.exit_code == 1 + + +# --------------------------------------------------------------------------- +# warden scorecard +# --------------------------------------------------------------------------- + +def test_scorecard_exits_0_clean(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, [{"name": "agt-bridge", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}]) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + result = runner.invoke(app, ["scorecard"]) + assert result.exit_code == 0 + + +def test_scorecard_exits_1_on_fail(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + # bad-name has wrong prefix for AGT type + _write_inventory(inv_path, [{"name": "bad-name", "type": "agt", "principals": ["x"], "ttl_hours": 24}]) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + result = runner.invoke(app, ["scorecard"]) + assert result.exit_code == 1 + + +# --------------------------------------------------------------------------- +# warden inventory add / list / remove +# --------------------------------------------------------------------------- + +def test_inventory_round_trip(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, []) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + add_result = runner.invoke(app, ["inventory", "add", "agt-test", "--type", "agt"]) + assert add_result.exit_code == 0 + + list_result = runner.invoke(app, ["inventory", "list"]) + assert list_result.exit_code == 0 + assert "agt-test" in list_result.output + + remove_result = runner.invoke(app, ["inventory", "remove", "agt-test"]) + assert remove_result.exit_code == 0 + + list_result2 = runner.invoke(app, ["inventory", "list"]) + assert "agt-test" not in list_result2.output + + +def test_inventory_list_json(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, [{"name": "agt-bridge", "type": "agt", "principals": ["agt-task"], "ttl_hours": 24}]) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + result = runner.invoke(app, ["inventory", "list", "--json"]) + assert result.exit_code == 0 + data = json.loads(result.output) + assert "agt-bridge" in data + + +# --------------------------------------------------------------------------- +# warden log +# --------------------------------------------------------------------------- + +def test_log_exits_0_no_log(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, []) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + result = runner.invoke(app, ["log"]) + assert result.exit_code == 0 + + +def test_log_json_parseable(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + log_path = state_dir / "signatures.log" + entry = {"timestamp": "2026-03-28T10:00:00+00:00", "actor": "agt-test", + "actor_type": "agt", "identity": "agt-test", "principals": ["agt-task"], + "ttl_hours": 24, "valid_before": "2026-03-29T10:00:00+00:00", + "cert_path": "/tmp/cert.pub", "backend": "local"} + log_path.write_text(json.dumps(entry) + "\n") + + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, []) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + result = runner.invoke(app, ["log", "--json"]) + assert result.exit_code == 0 + data = json.loads(result.output) + assert isinstance(data, list) + assert data[0]["actor"] == "agt-test" + + +def test_log_filters_by_actor(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + log_path = state_dir / "signatures.log" + for actor in ["agt-alpha", "agt-beta"]: + entry = {"timestamp": "2026-03-28T10:00:00+00:00", "actor": actor, + "actor_type": "agt", "identity": actor, "principals": [actor], + "ttl_hours": 24, "valid_before": "2026-03-29T10:00:00+00:00", + "cert_path": "/tmp/cert.pub", "backend": "local"} + log_path.open("a").write(json.dumps(entry) + "\n") + + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, []) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + result = runner.invoke(app, ["log", "agt-alpha", "--json"]) + assert result.exit_code == 0 + data = json.loads(result.output) + assert len(data) == 1 + assert data[0]["actor"] == "agt-alpha" + + +# --------------------------------------------------------------------------- +# warden cleanup +# --------------------------------------------------------------------------- + +def test_cleanup_dry_run_no_delete(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + cert = state_dir / "agt-test-cert.pub" + cert.write_text(CERT_CONTENT) + + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, []) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + meta = { + "identity": "agt-test", + "valid_before": datetime(2020, 1, 1, tzinfo=timezone.utc), + "principals": ["agt-task"], + } + with patch("warden.cli.parse_cert_metadata", return_value=meta): + result = runner.invoke(app, ["cleanup", "--dry-run"]) + + assert result.exit_code == 0 + assert cert.exists() # not deleted + assert "would remove" in result.output + + +def test_cleanup_removes_stale_cert(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + cert = state_dir / "agt-test-cert.pub" + cert.write_text(CERT_CONTENT) + + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, []) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + meta = { + "identity": "agt-test", + "valid_before": datetime(2020, 1, 1, tzinfo=timezone.utc), + "principals": ["agt-task"], + } + with patch("warden.cli.parse_cert_metadata", return_value=meta): + result = runner.invoke(app, ["cleanup"]) + + assert result.exit_code == 0 + assert not cert.exists() + + +def test_cleanup_exits_0_nothing_to_clean(tmp_path, monkeypatch): + state_dir = tmp_path / "state" + state_dir.mkdir() + + ca_key = tmp_path / "ca_key" + ca_key.write_text("fake") + inv_path = tmp_path / "inventory.yaml" + _write_inventory(inv_path, []) + cfg = _write_config(tmp_path, ca_key, state_dir, inv_path) + monkeypatch.setenv("WARDEN_CONFIG", str(cfg)) + + result = runner.invoke(app, ["cleanup"]) + assert result.exit_code == 0 + assert "No stale" in result.output diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..4af4447 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,104 @@ +"""Integration tests: real ssh-keygen, no mocking. + +Run with: uv run pytest -m integration +Excluded from the default unit suite by pyproject.toml addopts. +""" +import shutil +import subprocess +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from warden.ca import LocalCA, parse_cert_metadata +from warden.models import ActorType, CertSpec + +pytestmark = pytest.mark.integration + + +@pytest.fixture(autouse=True) +def require_ssh_keygen(): + if shutil.which("ssh-keygen") is None: + pytest.skip("ssh-keygen not found in PATH") + + +def _generate_ca_key(tmp_path: Path) -> Path: + ca_key = tmp_path / "ca_key" + subprocess.run( + ["ssh-keygen", "-t", "ed25519", "-f", str(ca_key), "-N", "", "-C", "test-ca"], + check=True, + capture_output=True, + ) + return ca_key + + +def _generate_actor_key(tmp_path: Path) -> Path: + key = tmp_path / "actor_key" + subprocess.run( + ["ssh-keygen", "-t", "ed25519", "-f", str(key), "-N", "", "-C", "actor"], + check=True, + capture_output=True, + ) + return key + + +def test_local_ca_sign_real_ssh_keygen(tmp_path): + ca_key = _generate_ca_key(tmp_path) + actor_key = _generate_actor_key(tmp_path) + pubkey = Path(str(actor_key) + ".pub") + + spec = CertSpec( + actor_name="agt-integration-test", + actor_type=ActorType.AGT, + pubkey_path=pubkey, + ttl_hours=1, + principals=["agt-integration-test"], + identity="agt-integration-test", + ) + + state_dir = tmp_path / "state" + ca = LocalCA(ca_key, state_dir) + record = ca.sign(spec) + + assert record.cert_path.exists() + assert record.valid_before > datetime.now(timezone.utc) + assert record.identity == "agt-integration-test" + assert record.principals == ["agt-integration-test"] + + # Re-parse without any mocking + meta = parse_cert_metadata(record.cert_path) + assert meta["identity"] == "agt-integration-test" + assert meta["valid_before"] > datetime.now(timezone.utc) + + +def test_local_ca_sign_cert_file_mode_600(tmp_path): + ca_key = _generate_ca_key(tmp_path) + actor_key = _generate_actor_key(tmp_path) + pubkey = Path(str(actor_key) + ".pub") + + spec = CertSpec( + actor_name="agt-integration-test", + actor_type=ActorType.AGT, + pubkey_path=pubkey, + ttl_hours=1, + principals=["agt-integration-test"], + identity="agt-integration-test", + ) + + ca = LocalCA(ca_key, tmp_path / "state") + record = ca.sign(spec) + + assert oct(record.cert_path.stat().st_mode & 0o777) == oct(0o600) + + +def test_generate_keypair_real_ssh_keygen(tmp_path): + ca_key = _generate_ca_key(tmp_path) + ca = LocalCA(ca_key, tmp_path / "state") + + privkey, pubkey = ca.generate_keypair("agt-integration-test") + + assert privkey.exists() + assert pubkey.exists() + assert oct(privkey.stat().st_mode & 0o777) == oct(0o600) + assert oct(pubkey.stat().st_mode & 0o777) == oct(0o644) + assert "ssh-ed25519" in pubkey.read_text() diff --git a/tests/test_scorecard.py b/tests/test_scorecard.py index 2ff1694..1b0fb06 100644 --- a/tests/test_scorecard.py +++ b/tests/test_scorecard.py @@ -10,6 +10,7 @@ from datetime import datetime, timezone from warden.scorecard import ( check_actor_name_prefixes, check_all_actors_have_principals, + check_file_permissions, check_no_stale_certs, check_no_expired_certs, check_ttl_policy, @@ -100,7 +101,7 @@ def test_run_scorecard_clean(tmp_path): ) results = run_scorecard(tmp_path, inv) assert all(r.passed for r in results) - assert len(results) == 5 + assert len(results) == 6 # --------------------------------------------------------------------------- @@ -160,6 +161,52 @@ def test_ttl_policy_skips_unknown_actor(tmp_path): assert result.passed # unknown actor skipped, not a violation +# --------------------------------------------------------------------------- +# check_file_permissions +# --------------------------------------------------------------------------- + +def test_file_permissions_no_state_dir(): + result = check_file_permissions(Path("/nonexistent/state/dir")) + assert result.passed + + +def test_file_permissions_empty_dir(tmp_path): + result = check_file_permissions(tmp_path) + assert result.passed + + +def test_file_permissions_pass(tmp_path): + cert = tmp_path / "agt-bridge-cert.pub" + cert.write_text("fake") + cert.chmod(0o600) + result = check_file_permissions(tmp_path) + assert result.passed + + +def test_file_permissions_fail_world_readable(tmp_path): + cert = tmp_path / "agt-bridge-cert.pub" + cert.write_text("fake") + cert.chmod(0o644) + result = check_file_permissions(tmp_path) + assert not result.passed + assert "agt-bridge-cert.pub" in result.detail + + +def test_file_permissions_keys_dir(tmp_path): + keys_dir = tmp_path / "keys" + keys_dir.mkdir() + key = keys_dir / "agt-test_ed25519" + key.write_text("fake key") + key.chmod(0o644) + result = check_file_permissions(tmp_path) + assert not result.passed + assert "keys/agt-test_ed25519" in result.detail + + +# --------------------------------------------------------------------------- +# (continuation) +# --------------------------------------------------------------------------- + def test_stale_certs_detail_suggests_cleanup(tmp_path): cert_path = tmp_path / "agt-bridge-cert.pub" cert_path.write_text("fake") diff --git a/tests/test_vault.py b/tests/test_vault.py new file mode 100644 index 0000000..a27c287 --- /dev/null +++ b/tests/test_vault.py @@ -0,0 +1,213 @@ +"""Tests for warden.vault — VaultCA backend.""" +from unittest.mock import MagicMock, patch + +import httpx +import pytest + +from warden.ca import CAError +from warden.config import VaultConfig +from warden.models import ActorType, CertSpec +from warden.vault import VaultCA + +SAMPLE_CERT = "ssh-ed25519-cert-v01@openssh.com AAAA_fake_cert_data" + +SAMPLE_SSHKEYGEN_L = """\ +/tmp/key-cert.pub: + Type: ssh-ed25519-cert-v01@openssh.com user certificate + Public key: ED25519-CERT SHA256:abc123 + Signing CA: ED25519 SHA256:xyz (using ssh-ed25519) + Key ID: "agt-test" + Serial: 0 + Valid: from 2026-03-28T10:00:00 to 2026-03-29T10:00:00 + Principals: + agt-task + Critical Options: (none) + Extensions: + permit-pty +""" + + +def _make_cfg(**overrides): + defaults = { + "addr": "http://127.0.0.1:8200", + "mount": "ssh", + "token_env": "VAULT_TOKEN", + "role_map": {"agt": "agt-role", "adm": "adm-role", "atm": "atm-role"}, + } + defaults.update(overrides) + return VaultConfig(**defaults) + + +def _make_spec(tmp_path, **overrides): + pubkey = tmp_path / "key.pub" + pubkey.write_text("ssh-ed25519 AAAA actor-key") + defaults = { + "actor_name": "agt-test", + "actor_type": ActorType.AGT, + "pubkey_path": pubkey, + "ttl_hours": 24, + "principals": ["agt-task"], + "identity": "agt-test", + } + defaults.update(overrides) + return CertSpec(**defaults) + + +def _mock_httpx_post(signed_key: str): + resp = MagicMock() + resp.json.return_value = {"data": {"signed_key": signed_key}} + resp.raise_for_status.return_value = None + return resp + + +def _mock_ssh_keygen_L(cmd, **kwargs): + result = MagicMock() + result.returncode = 0 + result.stdout = SAMPLE_SSHKEYGEN_L + result.stderr = "" + return result + + +# --------------------------------------------------------------------------- +# VaultCA.sign — success +# --------------------------------------------------------------------------- + +def test_vault_ca_sign_success(tmp_path, monkeypatch): + monkeypatch.setenv("VAULT_TOKEN", "fake-token") + spec = _make_spec(tmp_path) + cfg = _make_cfg() + ca = VaultCA(cfg, tmp_path / "state") + + with ( + patch("warden.vault.httpx.post", return_value=_mock_httpx_post(SAMPLE_CERT)), + patch("warden.ca.subprocess.run", side_effect=_mock_ssh_keygen_L), + ): + record = ca.sign(spec) + + assert record.actor_name == "agt-test" + assert record.identity == "agt-test" + assert record.principals == ["agt-task"] + dest = tmp_path / "state" / "agt-test-cert.pub" + assert dest.exists() + assert SAMPLE_CERT in dest.read_text() + + +def test_vault_ca_sign_cert_mode_600(tmp_path, monkeypatch): + monkeypatch.setenv("VAULT_TOKEN", "fake-token") + spec = _make_spec(tmp_path) + ca = VaultCA(_make_cfg(), tmp_path / "state") + + with ( + patch("warden.vault.httpx.post", return_value=_mock_httpx_post(SAMPLE_CERT)), + patch("warden.ca.subprocess.run", side_effect=_mock_ssh_keygen_L), + ): + record = ca.sign(spec) + + assert oct(record.cert_path.stat().st_mode & 0o777) == oct(0o600) + + +def test_vault_ca_sign_writes_signature_log(tmp_path, monkeypatch): + import json + monkeypatch.setenv("VAULT_TOKEN", "fake-token") + spec = _make_spec(tmp_path) + ca = VaultCA(_make_cfg(), tmp_path / "state") + + with ( + patch("warden.vault.httpx.post", return_value=_mock_httpx_post(SAMPLE_CERT)), + patch("warden.ca.subprocess.run", side_effect=_mock_ssh_keygen_L), + ): + ca.sign(spec) + + log_path = tmp_path / "state" / "signatures.log" + assert log_path.exists() + entry = json.loads(log_path.read_text().strip()) + assert entry["backend"] == "vault" + assert entry["actor"] == "agt-test" + + +# --------------------------------------------------------------------------- +# VaultCA.sign — failure paths +# --------------------------------------------------------------------------- + +def test_vault_ca_sign_http_403(tmp_path, monkeypatch): + monkeypatch.setenv("VAULT_TOKEN", "bad-token") + spec = _make_spec(tmp_path) + ca = VaultCA(_make_cfg(), tmp_path / "state") + + request = httpx.Request("POST", "http://127.0.0.1:8200/v1/ssh/sign/agt-role") + response = httpx.Response(403, request=request, text="permission denied") + exc = httpx.HTTPStatusError("403", request=request, response=response) + + with patch("warden.vault.httpx.post", side_effect=exc): + with pytest.raises(CAError, match="403"): + ca.sign(spec) + + +def test_vault_ca_sign_request_error(tmp_path, monkeypatch): + monkeypatch.setenv("VAULT_TOKEN", "fake-token") + spec = _make_spec(tmp_path) + ca = VaultCA(_make_cfg(), tmp_path / "state") + + request = httpx.Request("POST", "http://127.0.0.1:8200/v1/ssh/sign/agt-role") + exc = httpx.ConnectError("connection refused", request=request) + + with patch("warden.vault.httpx.post", side_effect=exc): + with pytest.raises(CAError, match="unreachable"): + ca.sign(spec) + + +def test_vault_ca_sign_missing_token(tmp_path, monkeypatch): + monkeypatch.delenv("VAULT_TOKEN", raising=False) + spec = _make_spec(tmp_path) + ca = VaultCA(_make_cfg(), tmp_path / "state") + + with pytest.raises(CAError, match="VAULT_TOKEN"): + ca.sign(spec) + + +def test_vault_ca_sign_missing_role(tmp_path, monkeypatch): + monkeypatch.setenv("VAULT_TOKEN", "fake-token") + cfg = _make_cfg(role_map={}) # no roles mapped + spec = _make_spec(tmp_path) + ca = VaultCA(cfg, tmp_path / "state") + + with pytest.raises(CAError, match="role_map"): + ca.sign(spec) + + +def test_vault_ca_sign_missing_pubkey(tmp_path, monkeypatch): + monkeypatch.setenv("VAULT_TOKEN", "fake-token") + spec = _make_spec(tmp_path, pubkey_path=tmp_path / "nonexistent.pub") + ca = VaultCA(_make_cfg(), tmp_path / "state") + + with pytest.raises(CAError, match="Public key not found"): + ca.sign(spec) + + +def test_vault_ca_sign_ttl_enforcement(tmp_path, monkeypatch): + monkeypatch.setenv("VAULT_TOKEN", "fake-token") + spec = _make_spec(tmp_path, ttl_hours=100) # AGT max is 24h + ca = VaultCA(_make_cfg(), tmp_path / "state") + + with pytest.raises(CAError, match="exceeds maximum"): + ca.sign(spec) + + +def test_vault_ca_sign_evicts_existing_cert(tmp_path, monkeypatch): + monkeypatch.setenv("VAULT_TOKEN", "fake-token") + state = tmp_path / "state" + state.mkdir() + old_cert = state / "agt-test-cert.pub" + old_cert.write_text("old cert") + + spec = _make_spec(tmp_path) + ca = VaultCA(_make_cfg(), state) + + with ( + patch("warden.vault.httpx.post", return_value=_mock_httpx_post(SAMPLE_CERT)), + patch("warden.ca.subprocess.run", side_effect=_mock_ssh_keygen_L), + ): + record = ca.sign(spec) + + assert record.cert_path.read_text().strip() == SAMPLE_CERT + assert len(list(state.glob("agt-test-cert.pub"))) == 1 diff --git a/workplans/WARDEN-WP-0003-test-coverage-and-quality.md b/workplans/WARDEN-WP-0003-test-coverage-and-quality.md index bce26e4..54e65d5 100644 --- a/workplans/WARDEN-WP-0003-test-coverage-and-quality.md +++ b/workplans/WARDEN-WP-0003-test-coverage-and-quality.md @@ -4,7 +4,7 @@ type: workplan title: "OpsWarden Test Coverage and Code Quality" domain: custodian repo: ops-warden -status: active +status: done owner: Bernd topic_slug: custodian planning_priority: medium @@ -84,57 +84,57 @@ received a cert via ops-bridge but have no warden installation. ```task id: WARDEN-WP-0003-T1 state_hub_task_id: eff074ce-c027-4df5-8006-0990296592ac -status: todo +status: done priority: high ``` -- [ ] Create `tests/test_vault.py` -- [ ] Test `VaultCA.sign()` success: mock `httpx.post` returning a valid +- [x] Create `tests/test_vault.py` +- [x] Test `VaultCA.sign()` success: mock `httpx.post` returning a valid `signed_key`; assert `CertRecord` fields; assert cert file written to state_dir -- [ ] Test HTTP 403: `httpx.HTTPStatusError` → `CAError` with status code in message -- [ ] Test unreachable Vault: `httpx.RequestError` → `CAError` with fallback hint -- [ ] Test missing `VAULT_TOKEN`: `_token()` raises `CAError` before HTTP call -- [ ] Test missing role in `role_map`: `CAError` before HTTP call -- [ ] Test missing pubkey file: `CAError` before HTTP call +- [x] Test HTTP 403: `httpx.HTTPStatusError` → `CAError` with status code in message +- [x] Test unreachable Vault: `httpx.RequestError` → `CAError` with fallback hint +- [x] Test missing `VAULT_TOKEN`: `_token()` raises `CAError` before HTTP call +- [x] Test missing role in `role_map`: `CAError` before HTTP call +- [x] Test missing pubkey file: `CAError` before HTTP call ### T2 — LocalCA.generate_keypair tests ```task id: WARDEN-WP-0003-T2 state_hub_task_id: ddfe5331-0a3b-4783-bdf4-f5ebcdf7965c -status: todo +status: done priority: medium ``` -- [ ] Add `TestGenerateKeypair` class to `tests/test_ca.py` -- [ ] Test success: mock `subprocess.run`; assert privkey and pubkey paths returned -- [ ] Test ssh-keygen called with `-t ed25519`, `-N ""`, `-C actor_name` -- [ ] Test existing files are unlinked before generation (write dummy files first) -- [ ] Test `CAError` raised on non-zero ssh-keygen exit code -- [ ] Test output files land in `state_dir/keys/` +- [x] Add `TestGenerateKeypair` class to `tests/test_ca.py` +- [x] Test success: mock `subprocess.run`; assert privkey and pubkey paths returned +- [x] Test ssh-keygen called with `-t ed25519`, `-N ""`, `-C actor_name` +- [x] Test existing files are unlinked before generation (write dummy files first) +- [x] Test `CAError` raised on non-zero ssh-keygen exit code +- [x] Test output files land in `state_dir/keys/` ### T3 — CLI tests ```task id: WARDEN-WP-0003-T3 state_hub_task_id: 040ce3a1-0efb-4816-a2d9-357162dd1612 -status: todo +status: done priority: high ``` -- [ ] Create `tests/test_cli.py` using `typer.testing.CliRunner` -- [ ] `warden sign`: exits 0 and stdout is cert text (mock CA); exits 1 on +- [x] Create `tests/test_cli.py` using `typer.testing.CliRunner` +- [x] `warden sign`: exits 0 and stdout is cert text (mock CA); exits 1 on unknown actor; exits 1 on config error -- [ ] `warden issue`: exits 1 on vault backend; exits 0 on local backend (mock CA) -- [ ] `warden status`: exits 0 and prints "no cert" message when state_dir empty; +- [x] `warden issue`: exits 1 on vault backend; exits 0 on local backend (mock CA) +- [x] `warden status`: exits 0 and prints "no cert" message when state_dir empty; exits 1 when cert is expired (mock `parse_cert_metadata`) -- [ ] `warden scorecard`: exits 0 on clean inventory + empty state_dir; +- [x] `warden scorecard`: exits 0 on clean inventory + empty state_dir; exits 1 when a check fails -- [ ] `warden inventory add / list / remove`: round-trip via tmp inventory file -- [ ] `warden log`: exits 0 with empty output when no log; `--json` is valid JSON +- [x] `warden inventory add / list / remove`: round-trip via tmp inventory file +- [x] `warden log`: exits 0 with empty output when no log; `--json` is valid JSON (after WARDEN-WP-0002 T3 adds the log command) -- [ ] `warden cleanup --dry-run`: exits 0, no files deleted +- [x] `warden cleanup --dry-run`: exits 0, no files deleted (after WARDEN-WP-0002 T2 adds cleanup) ### T4 — Real ssh-keygen integration test @@ -142,38 +142,38 @@ priority: high ```task id: WARDEN-WP-0003-T4 state_hub_task_id: 434fb008-103f-410c-85fd-e77b33e61fe4 -status: todo +status: done priority: medium ``` -- [ ] Create `tests/test_integration.py` -- [ ] Mark all tests `@pytest.mark.integration` -- [ ] Add `pytest.ini_options` to `pyproject.toml`: +- [x] Create `tests/test_integration.py` +- [x] Mark all tests `@pytest.mark.integration` +- [x] Add `pytest.ini_options` to `pyproject.toml`: `addopts = "-m 'not integration'"` so unit suite skips them by default -- [ ] Test `LocalCA.sign()` end-to-end: generate a real CA keypair and actor +- [x] Test `LocalCA.sign()` end-to-end: generate a real CA keypair and actor keypair via subprocess ssh-keygen in tmp_path; call `LocalCA.sign()`; assert `CertRecord.valid_before > datetime.now(utc)`; assert cert file exists; assert `parse_cert_metadata()` succeeds on it without mocking -- [ ] Skip test if `shutil.which("ssh-keygen") is None` -- [ ] Document in README: `uv run pytest -m integration` to run real-CA tests +- [x] Skip test if `shutil.which("ssh-keygen") is None` +- [x] Document in README: `uv run pytest -m integration` to run real-CA tests ### T5 — File permissions enforcement (mode 600) ```task id: WARDEN-WP-0003-T5 state_hub_task_id: ac146fe6-d1fd-4186-91bd-6f098de72449 -status: todo +status: done priority: medium ``` -- [ ] `ca.py` `LocalCA.sign()`: call `os.chmod(dest, 0o600)` after `shutil.copy2` -- [ ] `ca.py` `LocalCA.generate_keypair()`: call `os.chmod(privkey, 0o600)` and +- [x] `ca.py` `LocalCA.sign()`: call `os.chmod(dest, 0o600)` after `shutil.copy2` +- [x] `ca.py` `LocalCA.generate_keypair()`: call `os.chmod(privkey, 0o600)` and `os.chmod(pubkey, 0o644)` after generation -- [ ] `vault.py` `VaultCA.sign()`: call `os.chmod(dest, 0o600)` after `dest.write_text` -- [ ] `scorecard.py`: add `check_file_permissions(state_dir)` — flag any +- [x] `vault.py` `VaultCA.sign()`: call `os.chmod(dest, 0o600)` after `dest.write_text` +- [x] `scorecard.py`: add `check_file_permissions(state_dir)` — flag any `*-cert.pub` or `keys/*` file where `stat().st_mode & 0o044 != 0` -- [ ] Add `check_file_permissions` to `run_scorecard()` -- [ ] Update `test_ca.py`: assert `os.chmod` called with correct mode after sign +- [x] Add `check_file_permissions` to `run_scorecard()` +- [x] Update `test_ca.py`: assert `os.chmod` called with correct mode after sign and generate_keypair (patch os.chmod or check stat on actual files in tmp_path) @@ -182,28 +182,28 @@ priority: medium ```task id: WARDEN-WP-0003-T6 state_hub_task_id: 1c9f1987-7b11-43c1-a5e3-c2fd8d1c1589 -status: todo +status: done priority: low ``` -- [ ] `cli.py` `status()`: add +- [x] `cli.py` `status()`: add `state_dir_override: Annotated[Optional[Path], typer.Option("--state-dir")] = None` -- [ ] When `--state-dir` is provided: use it directly, skip `_load_cfg()` entirely -- [ ] When absent: load config as today -- [ ] Add test in `test_cli.py`: invoke `warden status --state-dir ` +- [x] When `--state-dir` is provided: use it directly, skip `_load_cfg()` entirely +- [x] When absent: load config as today +- [x] Add test in `test_cli.py`: invoke `warden status --state-dir ` without a config file; assert exit 0 --- ## Acceptance Criteria -- [ ] `uv run pytest` runs unit suite only; all pass; VaultCA and generate_keypair +- [x] `uv run pytest` runs unit suite only; all pass; VaultCA and generate_keypair covered -- [ ] `uv run pytest -m integration` succeeds (requires ssh-keygen in PATH) -- [ ] `test_cli.py` covers all commands; no mocked subprocess in CLI tests where +- [x] `uv run pytest -m integration` succeeds (requires ssh-keygen in PATH) +- [x] `test_cli.py` covers all commands; no mocked subprocess in CLI tests where avoidable (use tmp inventory files and mocked CA) -- [ ] `ls -la ~/.local/state/warden/*.pub` shows mode 600 for newly signed certs -- [ ] Scorecard `file_permissions` check passes on a clean state dir; fails on a +- [x] `ls -la ~/.local/state/warden/*.pub` shows mode 600 for newly signed certs +- [x] Scorecard `file_permissions` check passes on a clean state dir; fails on a world-readable cert -- [ ] `warden status --state-dir /tmp/some-dir` runs without a `warden.yaml` -- [ ] All lints pass: `uv run ruff check .` +- [x] `warden status --state-dir /tmp/some-dir` runs without a `warden.yaml` +- [x] All lints pass: `uv run ruff check .`