Files
ops-warden/tests/test_policy.py
tegwick 8e9383a33a feat: opt-in flex-auth policy gate and OpenBao verify (WP-0007)
Add policy.py client that calls flex-auth /v1/check before sign/issue when
policy.enabled is true. Record policy_decision_id in signatures.log. Default
off preserves existing inventory-only behavior. Document production OpenBao
health probe and update config/wiki references.
2026-06-17 08:37:14 +02:00

140 lines
4.5 KiB
Python

"""Tests for warden.policy — flex-auth gate."""
from pathlib import Path
from unittest.mock import MagicMock, patch
import httpx
import pytest
from warden.ca import CAError
from warden.config import PolicyConfig
from warden.models import ActorType, CertSpec
from warden.policy import check_sign_policy, pubkey_fingerprint
def _spec(pubkey_path: Path) -> CertSpec:
    """Build the agent CertSpec used throughout these policy tests.

    Only the public-key location varies per test; the actor name, actor
    type, TTL and principals are fixed values.
    """
    fields = {
        "actor_name": "agt-state-hub-bridge",
        "actor_type": ActorType.AGT,
        "pubkey_path": pubkey_path,
        "ttl_hours": 24,
        "principals": ["agt-task-bridge"],
    }
    return CertSpec(**fields)
def test_pubkey_fingerprint(tmp_path):
    """The fingerprint is the ``sha256:`` prefix followed by 64 characters."""
    prefix = "sha256:"
    key_file = tmp_path / "key.pub"
    key_file.write_text("ssh-ed25519 AAAA test\n")

    fingerprint = pubkey_fingerprint(key_file)

    assert fingerprint.startswith(prefix)
    # Total length = prefix (7 characters) + 64 digest characters.
    assert len(fingerprint) == len(prefix) + 64
def test_disabled_returns_none(tmp_path):
    """A disabled policy gate returns None without calling flex-auth."""
    pubkey = tmp_path / "key.pub"
    pubkey.write_text("ssh-ed25519 AAAA\n")
    cfg = PolicyConfig(enabled=False)

    # Patch the HTTP call so the test proves the disabled path makes no
    # request at all, rather than only checking the return value.
    with patch("warden.policy.httpx.post") as post:
        assert check_sign_policy(cfg, _spec(pubkey)) is None

    post.assert_not_called()
def test_allow_returns_decision_id(tmp_path):
    """An ``allow`` effect yields the decision id; the sent request is checked."""
    key_file = tmp_path / "key.pub"
    key_file.write_text("ssh-ed25519 AAAA\n")
    policy_cfg = PolicyConfig(enabled=True, flex_auth_url="http://flex-auth.test")

    allow_reply = MagicMock()
    allow_reply.raise_for_status = MagicMock()
    allow_reply.json.return_value = {"effect": "allow", "id": "dec-123"}

    with patch("warden.policy.httpx.post", return_value=allow_reply) as post:
        decision_id = check_sign_policy(policy_cfg, _spec(key_file))

    assert decision_id == "dec-123"
    post.assert_called_once()

    # The recorded call unpacks into (positional args, keyword args): the
    # URL is the first positional argument, the payload is the ``json`` kwarg.
    sent_args, sent_kwargs = post.call_args
    assert sent_args[0] == "http://flex-auth.test/v1/check"

    payload = sent_kwargs["json"]
    assert payload["action"] == "sign"
    assert payload["resource"]["type"] == "ssh-certificate"
    assert payload["context"]["actor_name"] == "agt-state-hub-bridge"
def test_deny_raises_ca_error(tmp_path):
    """A ``deny`` effect in the flex-auth response raises CAError."""
    key_file = tmp_path / "key.pub"
    key_file.write_text("ssh-ed25519 AAAA\n")
    policy_cfg = PolicyConfig(enabled=True)

    deny_reply = MagicMock()
    deny_reply.json.return_value = {
        "effect": "deny",
        "reason": "actor not authorized",
    }
    deny_reply.raise_for_status = MagicMock()

    post_patch = patch("warden.policy.httpx.post", return_value=deny_reply)
    with post_patch, pytest.raises(CAError, match="denied SSH sign"):
        check_sign_policy(policy_cfg, _spec(key_file))
def test_unreachable_fail_closed_raises(tmp_path):
    """With ``fail_closed=True`` a connection error becomes a CAError."""
    key_file = tmp_path / "key.pub"
    key_file.write_text("ssh-ed25519 AAAA\n")
    policy_cfg = PolicyConfig(enabled=True, fail_closed=True)

    # The patched post raises this error instead of performing a request.
    refused = httpx.ConnectError("connection refused")
    post_patch = patch("warden.policy.httpx.post", side_effect=refused)

    with post_patch, pytest.raises(CAError, match="unreachable"):
        check_sign_policy(policy_cfg, _spec(key_file))
def test_unreachable_fail_open_returns_none(tmp_path):
    """With ``fail_closed=False`` a connection error results in None."""
    key_file = tmp_path / "key.pub"
    key_file.write_text("ssh-ed25519 AAAA\n")
    policy_cfg = PolicyConfig(enabled=True, fail_closed=False)

    # The patched post raises this error instead of performing a request.
    refused = httpx.ConnectError("connection refused")

    with patch("warden.policy.httpx.post", side_effect=refused):
        outcome = check_sign_policy(policy_cfg, _spec(key_file))

    assert outcome is None
def test_http_error_fail_closed_raises(tmp_path):
    """With ``fail_closed=True`` an HTTP status error raises CAError ("HTTP 403")."""
    key_file = tmp_path / "key.pub"
    key_file.write_text("ssh-ed25519 AAAA\n")
    policy_cfg = PolicyConfig(enabled=True, fail_closed=True)

    # Stand-in response whose only configured attribute is the status code.
    forbidden_reply = MagicMock(status_code=403)
    status_error = httpx.HTTPStatusError(
        "forbidden",
        request=MagicMock(),
        response=forbidden_reply,
    )

    post_patch = patch("warden.policy.httpx.post", side_effect=status_error)
    with post_patch, pytest.raises(CAError, match="HTTP 403"):
        check_sign_policy(policy_cfg, _spec(key_file))
def test_missing_pubkey_raises(tmp_path):
    """A spec whose public key file does not exist raises CAError."""
    policy_cfg = PolicyConfig(enabled=True)
    # This path is never written, so the key file is absent.
    absent_key = tmp_path / "missing.pub"
    cert_spec = _spec(absent_key)

    with pytest.raises(CAError, match="Public key not found"):
        check_sign_policy(policy_cfg, cert_spec)
def test_subject_from_env(tmp_path, monkeypatch):
    """The request's subject id is taken from the env var named by ``subject_env``."""
    key_file = tmp_path / "key.pub"
    key_file.write_text("ssh-ed25519 AAAA\n")
    policy_cfg = PolicyConfig(enabled=True, subject_env="WARDEN_POLICY_SUBJECT")
    monkeypatch.setenv("WARDEN_POLICY_SUBJECT", "iam:bernd")

    allow_reply = MagicMock()
    allow_reply.raise_for_status = MagicMock()
    allow_reply.json.return_value = {"effect": "allow", "id": "dec-456"}

    with patch("warden.policy.httpx.post", return_value=allow_reply) as post:
        check_sign_policy(policy_cfg, _spec(key_file))

    # Keyword arguments are the second element of the recorded call.
    _, sent_kwargs = post.call_args
    assert sent_kwargs["json"]["subject"]["id"] == "iam:bernd"