"""VaultCA backend — HashiCorp Vault SSH engine.""" from __future__ import annotations import os import tempfile from datetime import datetime, timezone from pathlib import Path import httpx from warden.ca import CABackend, CAError, _append_signature_log, _enforce_ttl, _evict_cert, parse_cert_metadata from warden.config import VaultConfig from warden.models import CertRecord, CertSpec class VaultCA(CABackend): """CA backend that signs via HashiCorp Vault SSH secrets engine.""" def __init__(self, vault_cfg: VaultConfig, state_dir: Path) -> None: self._cfg = vault_cfg self._state_dir = Path(os.path.expanduser(str(state_dir))) def _token(self) -> str: token = os.environ.get(self._cfg.token_env, "") if not token: raise CAError( f"Vault token not found. Set the {self._cfg.token_env!r} " f"environment variable, or run: vault login" ) return token def sign(self, spec: CertSpec) -> CertRecord: """Sign the public key via Vault SSH engine. Returns a CertRecord.""" _enforce_ttl(spec) pubkey_path = Path(os.path.expanduser(str(spec.pubkey_path))) if not pubkey_path.exists(): raise CAError(f"Public key not found: {pubkey_path}") pubkey_text = pubkey_path.read_text().strip() role = self._cfg.role_map.get(spec.actor_type.value) if not role: raise CAError( f"No Vault role mapped for actor type {spec.actor_type.value!r}. " f"Add it to vault.role_map in warden.yaml." ) url = f"{self._cfg.addr}/v1/{self._cfg.mount}/sign/{role}" try: response = httpx.post( url, json={ "public_key": pubkey_text, "valid_principals": ",".join(spec.principals), "ttl": f"{spec.ttl_hours}h", "cert_type": "user", "key_id": spec.identity, }, headers={"X-Vault-Token": self._token()}, timeout=10.0, ) response.raise_for_status() except httpx.HTTPStatusError as e: raise CAError( f"Vault signing failed (HTTP {e.response.status_code}): " f"{e.response.text}" ) from e except httpx.RequestError as e: raise CAError( f"Vault unreachable at {self._cfg.addr}. " f"Is Vault running? Consider --backend local as a fallback.\n{e}" ) from e cert_text = response.json()["data"]["signed_key"].strip() self._state_dir.mkdir(parents=True, exist_ok=True) _evict_cert(spec.actor_name, self._state_dir) dest = self._state_dir / f"{spec.actor_name}-cert.pub" dest.write_text(cert_text + "\n") os.chmod(dest, 0o600) # Parse metadata by writing to a tempfile and running ssh-keygen -L with tempfile.NamedTemporaryFile( suffix="-cert.pub", mode="w", delete=False ) as f: f.write(cert_text + "\n") tmp_cert = Path(f.name) try: meta = parse_cert_metadata(tmp_cert) finally: tmp_cert.unlink(missing_ok=True) record = CertRecord( identity=meta["identity"] or spec.identity, valid_before=meta["valid_before"], cert_path=dest, signed_at=datetime.now(timezone.utc), principals=meta["principals"], actor_name=spec.actor_name, ) _append_signature_log(record, spec, self._state_dir, "vault") return record