#!/usr/bin/env python3 """Read-only readiness gate for an ops-bridge cert_command pilot (WARDEN-WP-0016 T1). Before an operator migrates a tunnel from a static SSH key to a warden-signed certificate (see ``wiki/playbooks/ops-bridge-tunnel-cert.md``), this script asserts the **ops-warden side is ready** — *without signing anything*: * warden.yaml loads and names a known backend (local | vault), * the actor exists in the inventory with a valid type and resolvable TTL, * the public key file exists and is structurally a public key (no private key), * the actor has at least one principal, * (optional) the actor's principals are deployed in railiance-infra's ``ssh_principals.yaml`` (mirrors ``scripts/check_principals_drift.py``). Exit 0 = ready, 1 = not ready (a check failed), 2 = bad input (missing/invalid files). It never signs, reads a private key, or prints a secret. The actual cert_command smoke is the opt-in ``--sign-smoke`` step (WP-0016 T2), kept separate because it issues a cert. Usage: python scripts/check_tunnel_cert_readiness.py \\ --actor agt-state-hub-bridge \\ --pubkey ~/.ssh/agt-state-hub-bridge_ed25519.pub \\ --config ~/.config/warden/warden.yaml \\ [--infra ~/railiance-infra/ansible/inventory/ssh_principals.yaml] """ from __future__ import annotations import argparse import sys from pathlib import Path from typing import Any, List, Optional, Tuple _SRC = Path(__file__).resolve().parent.parent / "src" if _SRC.is_dir() and str(_SRC) not in sys.path: sys.path.insert(0, str(_SRC)) import yaml # noqa: E402 from warden.config import ConfigError, WardenConfig, load_config # noqa: E402 from warden.inventory import ActorEntry, InventoryError, load_inventory # noqa: E402 from warden.models import MAX_TTL_HOURS, CertSpec # noqa: E402 # A check result: status in {"ok", "fail", "skip"}, a short label, and a detail line. Check = Tuple[str, str, str] # Public-key prefixes we accept for a cert_command pubkey (never a private key). 
_PUBKEY_PREFIXES = ("ssh-ed25519 ", "ssh-rsa ", "ecdsa-sha2-", "sk-ssh-", "ssh-dss ")


def build_cert_command(actor: str, pubkey: Path) -> str:
    """The cert_command an ops-bridge tunnel config would carry for this actor."""
    return f"warden sign {actor} --pubkey {pubkey}"


def check_pubkey(pubkey: Path) -> Check:
    """Check that ``pubkey`` exists, is readable and looks like an SSH public key.

    Returns a ``(status, label, detail)`` tuple with label ``"public key"``.
    The check fails when the path is missing, cannot be read (for example it
    is a directory), contains the text ``PRIVATE KEY``, or does not start with
    one of ``_PUBKEY_PREFIXES``.
    """
    if not pubkey.exists():
        return ("fail", "public key", f"{pubkey} does not exist")
    try:
        text = pubkey.read_text(errors="replace").strip()
    except OSError as e:
        # exists() is also true for directories and unreadable files; report
        # those as a failed check instead of letting the OSError escape.
        return ("fail", "public key", f"{pubkey} cannot be read: {e}")
    if "PRIVATE KEY" in text:
        return ("fail", "public key", f"{pubkey} looks like a PRIVATE key — use the .pub")
    if not text.startswith(_PUBKEY_PREFIXES):
        return ("fail", "public key", f"{pubkey} is not a recognized SSH public key")
    # Only the key-type token is echoed, never the key material itself.
    return ("ok", "public key", f"{pubkey} ({text.split()[0]})")


def check_actor(inventory_actors: dict, actor: str) -> Tuple[Check, Optional[ActorEntry]]:
    """Look ``actor`` up in the inventory and validate its TTL.

    Returns ``(check, entry)``. ``entry`` is ``None`` only when the actor is
    not in ``inventory_actors``; on a TTL failure the entry is still returned
    so the caller can run the remaining per-actor checks.
    """
    entry = inventory_actors.get(actor)
    if entry is None:
        return (("fail", "inventory", f"actor {actor!r} not in inventory"), None)
    max_ttl = MAX_TTL_HOURS.get(entry.actor_type)
    if not entry.ttl_hours or entry.ttl_hours <= 0:
        return (("fail", "inventory", f"actor {actor!r} has no resolvable TTL"), entry)
    # A missing (or falsy) max for the actor type means no upper bound is enforced here.
    if max_ttl and entry.ttl_hours > max_ttl:
        return (
            ("fail", "inventory",
             f"actor {actor!r} TTL {entry.ttl_hours}h exceeds "
             f"{entry.actor_type.value} max {max_ttl}h"),
            entry,
        )
    return (
        ("ok", "inventory", f"{actor} type={entry.actor_type.value} ttl={entry.ttl_hours}h"),
        entry,
    )


def check_principals(entry: ActorEntry) -> Check:
    """Fail when the actor has no principals; otherwise list them."""
    if not entry.principals:
        return ("fail", "principals", f"actor {entry.name!r} has no principals")
    return ("ok", "principals", ", ".join(entry.principals))


def _infra_principals(infra: dict[str, Any]) -> set[str]:
    """Collect every principal listed under ``ssh_principals.<host>.users.<user>``.

    Entries that are empty or not shaped as expected are skipped, so a
    malformed section makes the caller report principals as not deployed
    rather than crash.
    """
    # Mirrors scripts/check_principals_drift.py._infra_principals.
    principals: set[str] = set()
    hosts = infra.get("ssh_principals")
    if not isinstance(hosts, dict):
        return principals
    for host_data in hosts.values():
        # A host with an empty body loads from YAML as None.
        users = host_data.get("users") if isinstance(host_data, dict) else None
        if not isinstance(users, dict):
            continue
        for user_principals in users.values():
            if isinstance(user_principals, str):
                # A scalar is treated as one principal; set.update() would
                # otherwise split the string into single characters.
                principals.add(user_principals)
            elif user_principals:
                principals.update(user_principals)
    return principals


def check_infra_principal(entry: ActorEntry, infra_path: Optional[Path]) -> Check:
    """Check that every principal of ``entry`` appears in the infra principals file.

    Returns a ``skip`` check when ``infra_path`` is ``None`` and a ``fail``
    check when the file is missing or a principal is not listed in it.

    Raises:
        ValueError: the file parses as YAML but its top level is not a mapping.
        yaml.YAMLError: the file is not valid YAML.
    """
    if infra_path is None:
        return ("skip", "infra principals", "no --infra given (host-side check skipped)")
    if not infra_path.exists():
        return ("fail", "infra principals", f"{infra_path} not found")
    infra = yaml.safe_load(infra_path.read_text()) or {}
    if not isinstance(infra, dict):
        # main() maps ValueError to the "bad input" exit code.
        raise ValueError(f"{infra_path} is not a YAML mapping")
    deployed = _infra_principals(infra)
    missing = [p for p in entry.principals if p not in deployed]
    if missing:
        return (
            "fail",
            "infra principals",
            f"not deployed in {infra_path.name}: {', '.join(missing)}",
        )
    return ("ok", "infra principals", f"all deployed in {infra_path.name}")


def run_checks(
    cfg: WardenConfig,
    actor: str,
    pubkey: Path,
    infra_path: Optional[Path],
) -> List[Check]:
    """Run every readiness check and return the result list (pure; no signing)."""
    checks: List[Check] = [
        ("ok", "config", f"backend={cfg.backend}, inventory={cfg.inventory_path}")
    ]
    inventory = load_inventory(cfg.inventory_path)
    actor_check, entry = check_actor(inventory.actors, actor)
    checks.append(actor_check)
    checks.append(check_pubkey(pubkey))
    # The per-actor checks need the inventory entry, so they are dropped when
    # the actor is unknown (that case is already reported by actor_check).
    if entry is not None:
        checks.append(check_principals(entry))
        checks.append(check_infra_principal(entry, infra_path))
    return checks


def sign_smoke(cfg: WardenConfig, actor: str, pubkey: Path) -> List[Check]:
    """Opt-in cert_command contract smoke against the LOCAL backend (WP-0016 T2).

    Actually runs the cert_command (issues a short-lived local cert) and validates the
    emitted certificate: identity matches the actor, principals match inventory, and
    the validity window is within the actor type's max TTL. Requires ``ssh-keygen`` and
    a local backend — it must not touch production OpenBao.

    Nothing is signed when the actor is unknown or when ``pubkey`` does not pass
    ``check_pubkey``; a single failed ``sign smoke`` check is returned instead.

    Raises:
        ValueError: the configured backend is not ``local``.
    """
    from warden.ca import CAError, LocalCA, parse_cert_metadata

    if cfg.backend != "local":
        raise ValueError(
            f"--sign-smoke runs offline against the local backend, but config backend is "
            f"{cfg.backend!r}. Point --config at a local warden.yaml for the smoke."
        )

    inventory = load_inventory(cfg.inventory_path)
    entry = inventory.actors.get(actor)
    if entry is None:
        return [("fail", "sign smoke", f"actor {actor!r} not in inventory")]

    # Never hand the signer a path that failed the public-key check (it may be
    # a private key or not a key at all).
    pubkey_status, _, pubkey_detail = check_pubkey(pubkey)
    if pubkey_status == "fail":
        return [("fail", "sign smoke", f"not signing: {pubkey_detail}")]

    spec = CertSpec(
        actor_name=actor,
        actor_type=entry.actor_type,
        pubkey_path=pubkey,
        ttl_hours=entry.ttl_hours,
        principals=entry.principals,
        identity=actor,
    )
    try:
        record = LocalCA(cfg.ca_key, cfg.state_dir).sign(spec)
    except CAError as e:
        return [("fail", "sign smoke", f"signing failed: {e}")]

    checks: List[Check] = []
    if record.identity == actor:
        checks.append(("ok", "cert identity", record.identity))
    else:
        checks.append(("fail", "cert identity", f"{record.identity!r} != {actor!r}"))

    # Principal order is not significant, so compare as sets.
    if set(record.principals) == set(entry.principals):
        checks.append(("ok", "cert principals", ", ".join(record.principals)))
    else:
        checks.append(
            ("fail", "cert principals", f"{record.principals} != inventory {entry.principals}")
        )

    # Measure the validity window from the cert's own from→to so it is independent of
    # how ssh-keygen renders the timezone (parse_cert_metadata reads both the same way).
    max_ttl = MAX_TTL_HOURS.get(entry.actor_type)
    meta = parse_cert_metadata(record.cert_path)
    valid_from = meta.get("valid_from")
    valid_before = meta.get("valid_before")
    if valid_from is None or valid_before is None:
        # Fall back to the signing record when either bound is missing from
        # the parsed metadata.
        window_h = (record.valid_before - record.signed_at).total_seconds() / 3600
    else:
        window_h = (valid_before - valid_from).total_seconds() / 3600
    # 0.1h of slack on top of the max TTL when comparing the measured window.
    if max_ttl is None or window_h <= max_ttl + 0.1:
        checks.append(("ok", "cert validity", f"~{window_h:.1f}h (max {max_ttl}h)"))
    else:
        checks.append(("fail", "cert validity", f"~{window_h:.1f}h exceeds max {max_ttl}h"))
    return checks


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: argument list to parse; ``None`` (the default) means ``sys.argv[1:]``.

    Returns:
        0 when every check passed, 1 when at least one check failed, 2 on bad
        input (config, inventory, YAML or file-access errors).
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the bullet list and usage block of the module docstring intact in --help.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--actor", required=True)
    parser.add_argument("--pubkey", type=Path, required=True)
    parser.add_argument("--config", type=Path, default=None, help="warden.yaml (or WARDEN_CONFIG)")
    parser.add_argument("--infra", type=Path, default=None,
                        help="railiance-infra ssh_principals.yaml")
    parser.add_argument(
        "--sign-smoke", action="store_true",
        help="Also run the cert_command against the local backend and validate the cert (WP-0016 T2)",
    )
    args = parser.parse_args(argv)

    try:
        cfg = load_config(args.config)
    except ConfigError as e:
        print(f"config error: {e}", file=sys.stderr)
        return 2

    pubkey = args.pubkey.expanduser()
    # Expand "~" for --infra the same way as for --pubkey.
    infra_path = args.infra.expanduser() if args.infra is not None else None
    try:
        checks = run_checks(cfg, args.actor, pubkey, infra_path)
        if args.sign_smoke:
            checks += sign_smoke(cfg, args.actor, pubkey)
    except (InventoryError, ValueError, yaml.YAMLError, OSError) as e:
        # Unreadable or malformed input files are bad input, not a failed check.
        print(f"input error: {e}", file=sys.stderr)
        return 2

    glyph = {"ok": "✓", "fail": "✗", "skip": "·"}
    print(f"cert_command readiness — actor {args.actor!r}\n")
    for status, label, detail in checks:
        print(f" {glyph[status]} {label}: {detail}")
    # The command is shown with the path exactly as the operator typed it.
    print(f"\n cert_command: {build_cert_command(args.actor, args.pubkey)}")

    failed = [c for c in checks if c[0] == "fail"]
    if failed:
        print(f"\nNOT READY — {len(failed)} check(s) failed. See "
              "wiki/playbooks/ops-bridge-tunnel-cert.md")
        return 1
    print("\nREADY — ops-warden side is set. Next: cert_command smoke (--sign-smoke), "
          "then hand the cutover to ops-bridge.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())