"""Tests for warden.policy — flex-auth gate.""" from pathlib import Path from unittest.mock import MagicMock, patch import httpx import pytest from warden.ca import CAError from warden.config import PolicyConfig from warden.models import ActorType, CertSpec from warden.policy import check_sign_policy, pubkey_fingerprint def _spec(pubkey_path: Path) -> CertSpec: return CertSpec( actor_name="agt-state-hub-bridge", actor_type=ActorType.AGT, pubkey_path=pubkey_path, ttl_hours=24, principals=["agt-task-bridge"], ) def test_pubkey_fingerprint(tmp_path): pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA test\n") fp = pubkey_fingerprint(pubkey) assert fp.startswith("sha256:") assert len(fp) == 7 + 64 def test_disabled_returns_none(tmp_path): pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA\n") cfg = PolicyConfig(enabled=False) assert check_sign_policy(cfg, _spec(pubkey)) is None def test_allow_returns_decision_id(tmp_path): pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA\n") cfg = PolicyConfig(enabled=True, flex_auth_url="http://flex-auth.test") mock_response = MagicMock() mock_response.json.return_value = {"effect": "allow", "id": "dec-123"} mock_response.raise_for_status = MagicMock() with patch("warden.policy.httpx.post", return_value=mock_response) as post: result = check_sign_policy(cfg, _spec(pubkey)) assert result == "dec-123" post.assert_called_once() call_kwargs = post.call_args assert call_kwargs[0][0] == "http://flex-auth.test/v1/check" body = call_kwargs[1]["json"] assert body["action"] == "sign" assert body["resource"]["type"] == "ssh-certificate" assert body["context"]["actor_name"] == "agt-state-hub-bridge" def test_deny_raises_ca_error(tmp_path): pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA\n") cfg = PolicyConfig(enabled=True) mock_response = MagicMock() mock_response.json.return_value = { "effect": "deny", "reason": "actor not authorized", } mock_response.raise_for_status = MagicMock() with patch("warden.policy.httpx.post", return_value=mock_response): with pytest.raises(CAError, match="denied SSH sign"): check_sign_policy(cfg, _spec(pubkey)) def test_unreachable_fail_closed_raises(tmp_path): pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA\n") cfg = PolicyConfig(enabled=True, fail_closed=True) with patch( "warden.policy.httpx.post", side_effect=httpx.ConnectError("connection refused"), ): with pytest.raises(CAError, match="unreachable"): check_sign_policy(cfg, _spec(pubkey)) def test_unreachable_fail_open_returns_none(tmp_path): pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA\n") cfg = PolicyConfig(enabled=True, fail_closed=False) with patch( "warden.policy.httpx.post", side_effect=httpx.ConnectError("connection refused"), ): assert check_sign_policy(cfg, _spec(pubkey)) is None def test_http_error_fail_closed_raises(tmp_path): pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA\n") cfg = PolicyConfig(enabled=True, fail_closed=True) mock_response = MagicMock() mock_response.status_code = 403 error = httpx.HTTPStatusError( "forbidden", request=MagicMock(), response=mock_response ) with patch("warden.policy.httpx.post", side_effect=error): with pytest.raises(CAError, match="HTTP 403"): check_sign_policy(cfg, _spec(pubkey)) def test_missing_pubkey_raises(tmp_path): cfg = PolicyConfig(enabled=True) spec = _spec(tmp_path / "missing.pub") with pytest.raises(CAError, match="Public key not found"): check_sign_policy(cfg, spec) def test_subject_from_env(tmp_path, monkeypatch): pubkey = tmp_path / "key.pub" pubkey.write_text("ssh-ed25519 AAAA\n") cfg = PolicyConfig(enabled=True, subject_env="WARDEN_POLICY_SUBJECT") monkeypatch.setenv("WARDEN_POLICY_SUBJECT", "iam:bernd") mock_response = MagicMock() mock_response.json.return_value = {"effect": "allow", "id": "dec-456"} mock_response.raise_for_status = MagicMock() with patch("warden.policy.httpx.post", return_value=mock_response) as post: check_sign_policy(cfg, _spec(pubkey)) body = post.call_args[1]["json"] assert body["subject"]["id"] == "iam:bernd"