Initial Commit

This commit is contained in:
2026-03-28 00:45:43 +00:00
parent a436a7569d
commit 5ae6b988aa
23 changed files with 2400 additions and 0 deletions

0
tests/__init__.py Normal file
View File

180
tests/test_ca.py Normal file
View File

@@ -0,0 +1,180 @@
"""Tests for warden.ca — LocalCA and parse_cert_metadata."""
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from warden.ca import CAError, LocalCA, parse_cert_metadata
from warden.models import ActorType, CertSpec
SAMPLE_SSHKEYGEN_L = """\
/tmp/key-cert.pub:
Type: ssh-ed25519-cert-v01@openssh.com user certificate
Public key: ED25519-CERT SHA256:abc123
Signing CA: ED25519 SHA256:xyz (using ssh-ed25519)
Key ID: "agt-state-hub-bridge"
Serial: 0
Valid: from 2026-03-28T10:00:00 to 2026-03-29T10:00:00
Principals:
agt-task-bridge
Critical Options: (none)
Extensions:
permit-pty
"""
CERT_CONTENT = "ssh-ed25519-cert-v01@openssh.com AAAA_fake_cert_data"
def _mock_run_factory(cert_content: str):
"""Returns a mock subprocess.run that writes the cert file on sign and returns
SAMPLE_SSHKEYGEN_L on -L."""
def mock_run(cmd, **kwargs):
result = MagicMock()
result.returncode = 0
result.stdout = ""
result.stderr = ""
if not isinstance(cmd, list) or not cmd:
return result
if cmd[0] == "ssh-keygen" and "-s" in cmd:
# Signing: write cert next to the pubkey copy (last arg)
pubkey_path = Path(cmd[-1])
cert_path = pubkey_path.parent / (pubkey_path.stem + "-cert.pub")
cert_path.write_text(cert_content)
elif cmd[0] == "ssh-keygen" and "-L" in cmd:
result.stdout = SAMPLE_SSHKEYGEN_L
return result
return mock_run
# ---------------------------------------------------------------------------
# parse_cert_metadata
# ---------------------------------------------------------------------------
def test_parse_cert_metadata(tmp_path):
cert_path = tmp_path / "key-cert.pub"
cert_path.write_text(CERT_CONTENT)
mock_result = MagicMock(returncode=0, stdout=SAMPLE_SSHKEYGEN_L, stderr="")
with patch("warden.ca.subprocess.run", return_value=mock_result):
meta = parse_cert_metadata(cert_path)
assert meta["identity"] == "agt-state-hub-bridge"
assert meta["principals"] == ["agt-task-bridge"]
assert meta["valid_before"] == datetime(2026, 3, 29, 10, 0, 0, tzinfo=timezone.utc)
def test_parse_cert_metadata_failure(tmp_path):
cert_path = tmp_path / "key-cert.pub"
cert_path.write_text("not a cert")
mock_result = MagicMock(returncode=1, stdout="", stderr="not a certificate")
with patch("warden.ca.subprocess.run", return_value=mock_result):
with pytest.raises(CAError, match="ssh-keygen -L failed"):
parse_cert_metadata(cert_path)
def test_parse_cert_metadata_missing_valid_before(tmp_path):
cert_path = tmp_path / "key-cert.pub"
cert_path.write_text(CERT_CONTENT)
output_no_valid = SAMPLE_SSHKEYGEN_L.replace(
" Valid: from 2026-03-28T10:00:00 to 2026-03-29T10:00:00\n", ""
)
mock_result = MagicMock(returncode=0, stdout=output_no_valid, stderr="")
with patch("warden.ca.subprocess.run", return_value=mock_result):
with pytest.raises(CAError, match="valid_before"):
parse_cert_metadata(cert_path)
# ---------------------------------------------------------------------------
# LocalCA.sign
# ---------------------------------------------------------------------------
def test_local_ca_sign(tmp_path):
ca_key = tmp_path / "ca_key"
ca_key.write_text("fake-ca-private-key")
pubkey = tmp_path / "key.pub"
pubkey.write_text("ssh-ed25519 AAAA actor-key")
spec = CertSpec(
actor_name="agt-state-hub-bridge",
actor_type=ActorType.AGT,
pubkey_path=pubkey,
ttl_hours=24,
principals=["agt-task-bridge"],
identity="agt-state-hub-bridge",
)
with patch("warden.ca.subprocess.run", side_effect=_mock_run_factory(CERT_CONTENT)):
ca = LocalCA(ca_key, tmp_path / "state")
record = ca.sign(spec)
assert record.identity == "agt-state-hub-bridge"
assert record.actor_name == "agt-state-hub-bridge"
assert record.principals == ["agt-task-bridge"]
cert_dest = tmp_path / "state" / "agt-state-hub-bridge-cert.pub"
assert cert_dest.exists()
assert cert_dest.read_text().strip() == CERT_CONTENT
def test_local_ca_sign_missing_pubkey(tmp_path):
ca_key = tmp_path / "ca_key"
ca_key.write_text("fake-ca")
spec = CertSpec(
actor_name="agt-test",
actor_type=ActorType.AGT,
pubkey_path=tmp_path / "nonexistent.pub",
ttl_hours=24,
principals=["agt-test"],
)
ca = LocalCA(ca_key, tmp_path / "state")
with pytest.raises(CAError, match="Public key not found"):
ca.sign(spec)
def test_local_ca_sign_missing_ca_key(tmp_path):
pubkey = tmp_path / "key.pub"
pubkey.write_text("ssh-ed25519 AAAA")
spec = CertSpec(
actor_name="agt-test",
actor_type=ActorType.AGT,
pubkey_path=pubkey,
ttl_hours=24,
principals=["agt-test"],
)
ca = LocalCA(tmp_path / "nonexistent_ca", tmp_path / "state")
with pytest.raises(CAError, match="CA key not found"):
ca.sign(spec)
def test_local_ca_sign_ssh_keygen_failure(tmp_path):
ca_key = tmp_path / "ca_key"
ca_key.write_text("fake-ca")
pubkey = tmp_path / "key.pub"
pubkey.write_text("ssh-ed25519 AAAA")
spec = CertSpec(
actor_name="agt-test",
actor_type=ActorType.AGT,
pubkey_path=pubkey,
ttl_hours=24,
principals=["agt-test"],
)
def fail_run(cmd, **kwargs):
result = MagicMock()
result.returncode = 1
result.stderr = "load key: invalid format"
result.stdout = ""
return result
ca = LocalCA(ca_key, tmp_path / "state")
with patch("warden.ca.subprocess.run", side_effect=fail_run):
with pytest.raises(CAError, match="Signing failed"):
ca.sign(spec)

84
tests/test_config.py Normal file
View File

@@ -0,0 +1,84 @@
"""Tests for warden.config."""
from pathlib import Path
import pytest
import yaml
from warden.config import ConfigError, load_config
def write_yaml(path: Path, content: dict) -> None:
with path.open("w") as f:
yaml.dump(content, f)
def test_load_local_config(tmp_path):
cfg_path = tmp_path / "warden.yaml"
write_yaml(cfg_path, {"backend": "local", "ca_key": str(tmp_path / "ca")})
cfg = load_config(cfg_path)
assert cfg.backend == "local"
assert cfg.ca_key == tmp_path / "ca"
def test_local_backend_missing_ca_key_raises(tmp_path):
cfg_path = tmp_path / "warden.yaml"
write_yaml(cfg_path, {"backend": "local"})
with pytest.raises(ConfigError, match="ca_key"):
load_config(cfg_path)
def test_invalid_backend_raises(tmp_path):
cfg_path = tmp_path / "warden.yaml"
write_yaml(cfg_path, {"backend": "magic", "ca_key": "/tmp/ca"})
with pytest.raises(ConfigError, match="backend"):
load_config(cfg_path)
def test_vault_backend(tmp_path):
cfg_path = tmp_path / "warden.yaml"
write_yaml(cfg_path, {
"backend": "vault",
"vault": {
"addr": "https://vault.example.com",
"role_map": {"adm": "adm-role", "agt": "agt-role", "atm": "atm-role"},
},
})
cfg = load_config(cfg_path)
assert cfg.backend == "vault"
assert cfg.vault is not None
assert cfg.vault.addr == "https://vault.example.com"
assert cfg.vault.role_map["agt"] == "agt-role"
def test_vault_backend_missing_addr_raises(tmp_path):
cfg_path = tmp_path / "warden.yaml"
write_yaml(cfg_path, {"backend": "vault", "vault": {}})
with pytest.raises(ConfigError, match="addr"):
load_config(cfg_path)
def test_missing_config_raises():
with pytest.raises(ConfigError, match="not found"):
load_config(Path("/nonexistent/path/warden.yaml"))
def test_custom_state_dir(tmp_path):
cfg_path = tmp_path / "warden.yaml"
custom_state = tmp_path / "my-state"
write_yaml(cfg_path, {
"backend": "local",
"ca_key": str(tmp_path / "ca"),
"state_dir": str(custom_state),
})
cfg = load_config(cfg_path)
assert cfg.state_dir == custom_state
def test_default_vault_token_env(tmp_path):
cfg_path = tmp_path / "warden.yaml"
write_yaml(cfg_path, {
"backend": "vault",
"vault": {"addr": "https://vault.example.com"},
})
cfg = load_config(cfg_path)
assert cfg.vault.token_env == "VAULT_TOKEN"

87
tests/test_inventory.py Normal file
View File

@@ -0,0 +1,87 @@
"""Tests for warden.inventory."""
from pathlib import Path
import pytest
from warden.inventory import (
ActorEntry,
InventoryError,
PrincipalsInventory,
load_inventory,
save_inventory,
)
from warden.models import ActorType
def test_empty_inventory_on_missing_file(tmp_path):
inv = load_inventory(tmp_path / "nonexistent.yaml")
assert inv.actors == {}
assert inv.hosts == {}
def test_roundtrip(tmp_path):
inv = PrincipalsInventory()
inv.actors["agt-test"] = ActorEntry(
name="agt-test",
actor_type=ActorType.AGT,
principals=["agt-task-test"],
ttl_hours=24,
description="test actor",
)
path = tmp_path / "inventory.yaml"
save_inventory(inv, path)
loaded = load_inventory(path)
assert "agt-test" in loaded.actors
entry = loaded.actors["agt-test"]
assert entry.actor_type == ActorType.AGT
assert entry.principals == ["agt-task-test"]
assert entry.ttl_hours == 24
assert entry.description == "test actor"
def test_roundtrip_multiple_actors(tmp_path):
inv = PrincipalsInventory()
inv.actors["adm-bernd"] = ActorEntry("adm-bernd", ActorType.ADM, ["adm-full"], 48)
inv.actors["atm-backup"] = ActorEntry("atm-backup", ActorType.ATM, ["atm-backup-daily"], 8)
path = tmp_path / "inventory.yaml"
save_inventory(inv, path)
loaded = load_inventory(path)
assert set(loaded.actors) == {"adm-bernd", "atm-backup"}
assert loaded.actors["adm-bernd"].actor_type == ActorType.ADM
def test_invalid_actor_type_raises(tmp_path):
path = tmp_path / "inventory.yaml"
path.write_text("actors:\n agt-test:\n type: bogus\n principals: []\n")
with pytest.raises(InventoryError, match="invalid type"):
load_inventory(path)
def test_actor_name_prefix_violation_raises(tmp_path):
path = tmp_path / "inventory.yaml"
path.write_text("actors:\n wrong-name:\n type: agt\n principals: [x]\n")
with pytest.raises(InventoryError):
load_inventory(path)
def test_default_principal_is_actor_name(tmp_path):
path = tmp_path / "inventory.yaml"
path.write_text("actors:\n agt-bridge:\n type: agt\n")
inv = load_inventory(path)
assert inv.actors["agt-bridge"].principals == ["agt-bridge"]
def test_default_ttl_applied(tmp_path):
path = tmp_path / "inventory.yaml"
path.write_text("actors:\n atm-cron:\n type: atm\n principals: [atm-cron]\n")
inv = load_inventory(path)
assert inv.actors["atm-cron"].ttl_hours == 8 # DEFAULT_TTL_HOURS[ATM]
def test_invalid_yaml_raises(tmp_path):
path = tmp_path / "inventory.yaml"
path.write_text(": : : invalid yaml :::")
with pytest.raises(InventoryError, match="Invalid YAML"):
load_inventory(path)

67
tests/test_models.py Normal file
View File

@@ -0,0 +1,67 @@
"""Tests for warden.models."""
from pathlib import Path
import pytest
from warden.models import (
ACTOR_PREFIX,
DEFAULT_TTL_HOURS,
ActorType,
CertSpec,
validate_actor_name,
)
def test_default_ttl_per_type():
assert DEFAULT_TTL_HOURS[ActorType.ADM] == 48
assert DEFAULT_TTL_HOURS[ActorType.AGT] == 24
assert DEFAULT_TTL_HOURS[ActorType.ATM] == 8
def test_actor_prefix_map():
assert ACTOR_PREFIX[ActorType.ADM] == "adm-"
assert ACTOR_PREFIX[ActorType.AGT] == "agt-"
assert ACTOR_PREFIX[ActorType.ATM] == "atm-"
@pytest.mark.parametrize("name,actor_type", [
("adm-bernd", ActorType.ADM),
("agt-incident-resolver-v2", ActorType.AGT),
("atm-backup-daily", ActorType.ATM),
])
def test_validate_actor_name_valid(name, actor_type):
validate_actor_name(name, actor_type) # should not raise
@pytest.mark.parametrize("name,actor_type", [
("bernd", ActorType.ADM),
("automation-backup", ActorType.ATM),
("agt-bridge", ActorType.ADM), # wrong type for prefix
("atm-backup", ActorType.AGT),
])
def test_validate_actor_name_invalid(name, actor_type):
with pytest.raises(ValueError, match="must start with"):
validate_actor_name(name, actor_type)
def test_certspec_default_identity():
spec = CertSpec(
actor_name="agt-test",
actor_type=ActorType.AGT,
pubkey_path=Path("/tmp/key.pub"),
ttl_hours=24,
principals=["agt-task-bridge"],
)
assert spec.identity == "agt-test"
def test_certspec_explicit_identity():
spec = CertSpec(
actor_name="agt-test",
actor_type=ActorType.AGT,
pubkey_path=Path("/tmp/key.pub"),
ttl_hours=24,
principals=["agt-task-bridge"],
identity="custom-identity",
)
assert spec.identity == "custom-identity"

100
tests/test_scorecard.py Normal file
View File

@@ -0,0 +1,100 @@
"""Tests for warden.scorecard."""
from pathlib import Path
import pytest
from warden.inventory import ActorEntry, PrincipalsInventory
from warden.models import ActorType
from warden.scorecard import (
check_actor_name_prefixes,
check_all_actors_have_principals,
check_no_stale_certs,
check_no_expired_certs,
run_scorecard,
)
def make_inventory(*actors):
inv = PrincipalsInventory()
for name, atype, principals in actors:
inv.actors[name] = ActorEntry(
name=name, actor_type=atype, principals=principals, ttl_hours=24
)
return inv
# ---------------------------------------------------------------------------
# check_actor_name_prefixes
# ---------------------------------------------------------------------------
def test_prefix_check_pass():
inv = make_inventory(
("adm-bernd", ActorType.ADM, ["adm-full"]),
("agt-bridge", ActorType.AGT, ["agt-task-bridge"]),
("atm-cron", ActorType.ATM, ["atm-cron"]),
)
result = check_actor_name_prefixes(inv)
assert result.passed
def test_prefix_check_fail_bad_name():
# Bypass validate_actor_name by inserting directly
inv = PrincipalsInventory()
inv.actors["bad-name"] = ActorEntry(
name="bad-name", actor_type=ActorType.AGT, principals=["x"], ttl_hours=24
)
result = check_actor_name_prefixes(inv)
assert not result.passed
assert "bad-name" in result.detail
# ---------------------------------------------------------------------------
# check_all_actors_have_principals
# ---------------------------------------------------------------------------
def test_principals_check_pass():
inv = make_inventory(("agt-bridge", ActorType.AGT, ["agt-task-bridge"]))
result = check_all_actors_have_principals(inv)
assert result.passed
def test_principals_check_fail_empty():
inv = PrincipalsInventory()
inv.actors["agt-bridge"] = ActorEntry(
name="agt-bridge", actor_type=ActorType.AGT, principals=[], ttl_hours=24
)
result = check_all_actors_have_principals(inv)
assert not result.passed
assert "agt-bridge" in result.detail
# ---------------------------------------------------------------------------
# check_no_stale_certs
# ---------------------------------------------------------------------------
def test_no_stale_certs_nonexistent_dir():
result = check_no_stale_certs(Path("/nonexistent/state/dir"))
assert result.passed
def test_no_stale_certs_empty_dir(tmp_path):
result = check_no_stale_certs(tmp_path)
assert result.passed
def test_no_expired_certs_empty_dir(tmp_path):
result = check_no_expired_certs(tmp_path)
assert result.passed
# ---------------------------------------------------------------------------
# run_scorecard
# ---------------------------------------------------------------------------
def test_run_scorecard_clean(tmp_path):
inv = make_inventory(
("agt-bridge", ActorType.AGT, ["agt-task-bridge"]),
)
results = run_scorecard(tmp_path, inv)
assert all(r.passed for r in results)
assert len(results) == 4