Files
ops-warden/scripts/check_tunnel_cert_readiness.py
tegwick 8bbd22285e feat(WARDEN-WP-0016): ops-bridge cert_command readiness gate + handoff
Close ops-warden's side of the last Partial INTENT criterion (ops-bridge integrates
via a stable cert_command). The migration playbook and contract already existed; what
was missing was an automated readiness gate before touching tunnel config.

T1 — scripts/check_tunnel_cert_readiness.py: read-only preflight that asserts the
cert_command path is ready without signing — config/backend, actor inventory + TTL
within type max, pubkey exists/parses/not-private, principals present, and optional
host-principal deployment (mirrors check_principals_drift). Exit 0/1/2.

T2 — opt-in --sign-smoke: runs the cert_command against the local backend and validates
identity/principals/TTL of the emitted cert; refuses a vault backend. Window measured
from the cert's own valid_from->valid_before so it's timezone-robust (fixes a CEST
off-by-2h artifact). integration-marked test + a vault-refusal unit test.

T3 — playbook now leads with Step 0 readiness gate; ops-bridge handoff message sent.
T4 — SCOPE INTENT row: Partial -> Pilot-ready; known-gaps + SSH-lane list updated.

9 unit + 1 integration test, 209 default passing, lint clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 19:50:28 +02:00

244 lines
9.8 KiB
Python

#!/usr/bin/env python3
"""Read-only readiness gate for an ops-bridge cert_command pilot (WARDEN-WP-0016 T1).
Before an operator migrates a tunnel from a static SSH key to a warden-signed
certificate (see ``wiki/playbooks/ops-bridge-tunnel-cert.md``), this script asserts the
**ops-warden side is ready** — *without signing anything*:
* warden.yaml loads and names a known backend (local | vault),
* the actor exists in the inventory with a valid type and resolvable TTL,
* the public key file exists and is structurally a public key (no private key),
* the actor has at least one principal,
* (optional) the actor's principals are deployed in railiance-infra's
``ssh_principals.yaml`` (mirrors ``scripts/check_principals_drift.py``).
Exit 0 = ready, 1 = not ready (a check failed), 2 = bad input (missing/invalid files).
It never signs, reads a private key, or prints a secret. The actual cert_command smoke
is the opt-in ``--sign-smoke`` step (WP-0016 T2), kept separate because it issues a cert.
Usage:
python scripts/check_tunnel_cert_readiness.py \\
--actor agt-state-hub-bridge \\
--pubkey ~/.ssh/agt-state-hub-bridge_ed25519.pub \\
--config ~/.config/warden/warden.yaml \\
[--infra ~/railiance-infra/ansible/inventory/ssh_principals.yaml]
"""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from typing import Any, List, Optional, Tuple
_SRC = Path(__file__).resolve().parent.parent / "src"
if _SRC.is_dir() and str(_SRC) not in sys.path:
sys.path.insert(0, str(_SRC))
import yaml # noqa: E402
from warden.config import ConfigError, WardenConfig, load_config # noqa: E402
from warden.inventory import ActorEntry, InventoryError, load_inventory # noqa: E402
from warden.models import MAX_TTL_HOURS, CertSpec # noqa: E402
# A check result: status in {"ok", "fail", "skip"}, a short label, and a detail line.
Check = Tuple[str, str, str]
# Key-type prefixes accepted at the start of a cert_command pubkey file (never a private
# key). Both FIDO/security-key spellings are listed: OpenSSH names them
# "sk-ssh-ed25519@openssh.com" and "sk-ecdsa-sha2-nistp256@openssh.com", and the latter
# is not matched by the plain "ecdsa-sha2-" prefix because matching is done from the
# start of the line.
_PUBKEY_PREFIXES = (
    "ssh-ed25519 ",
    "ssh-rsa ",
    "ecdsa-sha2-",
    "sk-ssh-",
    "sk-ecdsa-sha2-",
    "ssh-dss ",
)
def build_cert_command(actor: str, pubkey: Path) -> str:
    """Return the cert_command an ops-bridge tunnel config would carry for this actor.

    The actor name and the public-key path are shell-quoted, so a path containing
    spaces or shell metacharacters still forms a single ``--pubkey`` argument whether
    the string is later split with ``shlex.split`` or handed to a shell. Values made
    only of shell-safe characters are emitted unchanged.

    Args:
        actor: Name of the actor the certificate is requested for.
        pubkey: Path to the actor's SSH public key file.

    Returns:
        The command line ``warden sign <actor> --pubkey <pubkey>`` as one string.
    """
    import shlex  # local import: only this helper needs it

    path_text = str(pubkey)
    if path_text.startswith("~/"):
        # Keep the leading "~/" outside the quotes so a shell can still expand it.
        quoted_path = "~/" + shlex.quote(path_text[2:])
    else:
        quoted_path = shlex.quote(path_text)
    return f"warden sign {shlex.quote(actor)} --pubkey {quoted_path}"
def check_pubkey(pubkey: Path) -> Check:
    """Check that *pubkey* is a readable file that looks like an SSH public key.

    The check is structural only: the file must exist, be a regular file, must not
    contain the marker ``PRIVATE KEY``, and its stripped content must start with one of
    ``_PUBKEY_PREFIXES``. Nothing from the file beyond its key-type token is echoed.

    Args:
        pubkey: Path to the candidate public key file.

    Returns:
        A ``(status, label, detail)`` check with status ``"ok"`` or ``"fail"``. Problems
        with the file (missing, a directory, unreadable) are reported as ``"fail"``
        rather than raised.
    """
    label = "public key"
    if not pubkey.exists():
        return ("fail", label, f"{pubkey} does not exist")
    if not pubkey.is_file():
        # A directory (or other non-file) passes exists() but cannot be read as text.
        return ("fail", label, f"{pubkey} is not a regular file")
    try:
        # errors="replace" keeps binary junk from raising; it simply fails the prefix test.
        text = pubkey.read_text(errors="replace").strip()
    except OSError as exc:
        return ("fail", label, f"{pubkey} could not be read: {exc}")
    if "PRIVATE KEY" in text:
        return ("fail", label, f"{pubkey} looks like a PRIVATE key — use the .pub")
    if not text.startswith(_PUBKEY_PREFIXES):
        return ("fail", label, f"{pubkey} is not a recognized SSH public key")
    # startswith() succeeded, so the text is non-empty and split()[0] is the key type.
    return ("ok", label, f"{pubkey} ({text.split()[0]})")
def check_actor(inventory_actors: dict, actor: str) -> Tuple[Check, Optional[ActorEntry]]:
    """Look *actor* up in the inventory and vet its TTL against the type's ceiling.

    Args:
        inventory_actors: Mapping of actor name to inventory entry.
        actor: Name of the actor to look up.

    Returns:
        A ``(check, entry)`` pair. ``entry`` is ``None`` only when the actor is absent;
        on a TTL failure the entry is still returned next to the failing check. When
        ``MAX_TTL_HOURS`` has no (or a falsy) value for the actor's type, no ceiling is
        applied.
    """
    label = "inventory"
    found = inventory_actors.get(actor)
    if found is None:
        return ("fail", label, f"actor {actor!r} not in inventory"), None

    ceiling = MAX_TTL_HOURS.get(found.actor_type)
    ttl = found.ttl_hours
    # Missing, zero and negative TTLs are all treated as "not resolvable".
    if not ttl or ttl <= 0:
        return ("fail", label, f"actor {actor!r} has no resolvable TTL"), found

    if ceiling and ttl > ceiling:
        too_long = (
            f"actor {actor!r} TTL {ttl}h exceeds "
            f"{found.actor_type.value} max {ceiling}h"
        )
        return ("fail", label, too_long), found

    summary = f"{actor} type={found.actor_type.value} ttl={ttl}h"
    return ("ok", label, summary), found
def check_principals(entry: ActorEntry) -> Check:
    """Report whether the inventory entry carries at least one principal.

    Returns an ``"ok"`` check listing the principals comma-separated, or a ``"fail"``
    check naming the actor when ``entry.principals`` is empty.
    """
    granted = entry.principals
    if granted:
        return ("ok", "principals", ", ".join(granted))
    return ("fail", "principals", f"actor {entry.name!r} has no principals")
def _infra_principals(infra: dict[str, Any]) -> set[str]:
    """Collect every principal listed under ``ssh_principals`` in *infra*.

    Walks ``ssh_principals -> <host> -> users -> <user> -> principals`` and returns the
    union of all principals found. Mirrors
    ``scripts/check_principals_drift.py._infra_principals`` for well-formed input, and
    additionally tolerates the empty nodes YAML produces for keys without a value.

    Args:
        infra: Parsed content of the infra ``ssh_principals.yaml`` file.

    Returns:
        The set of principal names; empty when nothing usable is present. Malformed
        branches are skipped, which makes a dependent check report the principal as
        not deployed rather than crash.
    """
    principals: set[str] = set()
    hosts = infra.get("ssh_principals") or {}
    if not isinstance(hosts, dict):
        return principals
    for host_data in hosts.values():
        # "host:" with no value loads as None; skip it and anything else non-mapping.
        if not isinstance(host_data, dict):
            continue
        users = host_data.get("users") or {}
        if not isinstance(users, dict):
            continue
        for user_principals in users.values():
            if not user_principals:
                continue
            if isinstance(user_principals, str):
                # A scalar is one principal; set.update() would split it into characters.
                principals.add(user_principals)
            else:
                principals.update(user_principals)
    return principals
def check_infra_principal(entry: ActorEntry, infra_path: Optional[Path]) -> Check:
    """Check that every principal of *entry* is deployed in the infra principals file.

    Args:
        entry: Inventory entry whose ``principals`` must all be deployed.
        infra_path: Path to the infra ``ssh_principals.yaml``, or ``None`` to skip the
            host-side check.

    Returns:
        A ``"skip"`` check when no path is given, ``"fail"`` when the path is missing,
        not a regular file, or lacks some principals, and ``"ok"`` otherwise.

    Raises:
        ValueError: If the file parses but its top level is not a YAML mapping.
        yaml.YAMLError: If the file is not valid YAML.
    """
    label = "infra principals"
    if infra_path is None:
        return ("skip", label, "no --infra given (host-side check skipped)")
    if not infra_path.exists():
        return ("fail", label, f"{infra_path} not found")
    if not infra_path.is_file():
        # A directory passes exists() but cannot be read as a YAML document.
        return ("fail", label, f"{infra_path} is not a regular file")
    infra = yaml.safe_load(infra_path.read_text(encoding="utf-8")) or {}
    if not isinstance(infra, dict):
        # Surface a malformed document as bad input instead of an AttributeError later.
        raise ValueError(
            f"{infra_path} must contain a YAML mapping at the top level, "
            f"got {type(infra).__name__}"
        )
    deployed = _infra_principals(infra)
    missing = [p for p in entry.principals if p not in deployed]
    if missing:
        return (
            "fail",
            label,
            f"not deployed in {infra_path.name}: {', '.join(missing)}",
        )
    return ("ok", label, f"all deployed in {infra_path.name}")
def run_checks(
    cfg: WardenConfig,
    actor: str,
    pubkey: Path,
    infra_path: Optional[Path],
) -> List[Check]:
    """Collect the readiness checks for *actor* in report order; nothing is signed.

    The result always starts with the config, inventory and public-key checks. The
    principals and infra-principals checks follow only when the actor was found in the
    inventory, since both need its entry.
    """
    config_line: Check = (
        "ok",
        "config",
        f"backend={cfg.backend}, inventory={cfg.inventory_path}",
    )
    actors = load_inventory(cfg.inventory_path).actors
    actor_line, entry = check_actor(actors, actor)
    results: List[Check] = [config_line, actor_line, check_pubkey(pubkey)]
    if entry is None:
        return results
    results.append(check_principals(entry))
    results.append(check_infra_principal(entry, infra_path))
    return results
def sign_smoke(cfg: WardenConfig, actor: str, pubkey: Path) -> List[Check]:
    """Run the opt-in cert_command contract smoke on the LOCAL backend (WP-0016 T2).

    Unlike the read-only checks, this signs a certificate through ``LocalCA`` and then
    inspects the result: the identity must equal the actor name, the principals must
    equal the inventory's (compared as sets), and the validity window must not exceed
    the actor type's ``MAX_TTL_HOURS`` value (with a 0.1h tolerance).

    Returns:
        The list of checks. A single ``"fail"`` check is returned when the actor is not
        in the inventory or when signing raises ``CAError``.

    Raises:
        ValueError: If the configured backend is anything other than ``"local"``.
    """
    from warden.ca import CAError, LocalCA, parse_cert_metadata

    backend = cfg.backend
    if backend != "local":
        raise ValueError(
            "--sign-smoke runs offline against the local backend, but config backend is "
            f"{backend!r}. Point --config at a local warden.yaml for the smoke."
        )

    entry = load_inventory(cfg.inventory_path).actors.get(actor)
    if entry is None:
        return [("fail", "sign smoke", f"actor {actor!r} not in inventory")]

    request = CertSpec(
        actor_name=actor,
        actor_type=entry.actor_type,
        pubkey_path=pubkey,
        ttl_hours=entry.ttl_hours,
        principals=entry.principals,
        identity=actor,
    )
    try:
        issued = LocalCA(cfg.ca_key, cfg.state_dir).sign(request)
    except CAError as exc:
        return [("fail", "sign smoke", f"signing failed: {exc}")]

    results: List[Check] = []

    identity_ok = issued.identity == actor
    identity_detail = issued.identity if identity_ok else f"{issued.identity!r} != {actor!r}"
    results.append(("ok" if identity_ok else "fail", "cert identity", identity_detail))

    principals_ok = set(issued.principals) == set(entry.principals)
    if principals_ok:
        principals_detail = ", ".join(issued.principals)
    else:
        principals_detail = f"{issued.principals} != inventory {entry.principals}"
    results.append(("ok" if principals_ok else "fail", "cert principals", principals_detail))

    # Prefer the window the certificate itself states (both ends parsed the same way),
    # so the measurement does not depend on how ssh-keygen renders the timezone.
    ceiling = MAX_TTL_HOURS.get(entry.actor_type)
    meta = parse_cert_metadata(issued.cert_path)
    starts = meta.get("valid_from")
    if starts is not None:
        span = meta["valid_before"] - starts
    else:
        # No start time in the parsed metadata: fall back to the signing record.
        span = issued.valid_before - issued.signed_at
    window_h = span.total_seconds() / 3600

    if ceiling is None or window_h <= ceiling + 0.1:
        results.append(("ok", "cert validity", f"~{window_h:.1f}h (max {ceiling}h)"))
    else:
        results.append(("fail", "cert validity", f"~{window_h:.1f}h exceeds max {ceiling}h"))
    return results
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point: run the readiness checks and print a report.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse read
            ``sys.argv[1:]``, so existing ``main()`` callers are unaffected.

    Returns:
        0 when no check failed, 1 when at least one check failed, 2 when the config,
        inventory or another input file could not be loaded or read.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring carries a laid-out usage example; keep its line breaks.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--actor", required=True)
    parser.add_argument("--pubkey", type=Path, required=True)
    parser.add_argument("--config", type=Path, default=None, help="warden.yaml (or WARDEN_CONFIG)")
    parser.add_argument("--infra", type=Path, default=None, help="railiance-infra ssh_principals.yaml")
    parser.add_argument(
        "--sign-smoke",
        action="store_true",
        help="Also run the cert_command against the local backend and validate the cert (WP-0016 T2)",
    )
    args = parser.parse_args(argv)

    try:
        cfg = load_config(args.config)
    except ConfigError as e:
        print(f"config error: {e}", file=sys.stderr)
        return 2

    # Expand "~" ourselves: a quoted or "--opt=~/path" argument reaches us unexpanded.
    pubkey = args.pubkey.expanduser()
    infra_path = args.infra.expanduser() if args.infra is not None else None

    try:
        checks = run_checks(cfg, args.actor, pubkey, infra_path)
        if args.sign_smoke:
            checks += sign_smoke(cfg, args.actor, pubkey)
    except (InventoryError, ValueError, OSError, yaml.YAMLError) as e:
        # OSError covers unreadable input files, reported as bad input, not a traceback.
        print(f"input error: {e}", file=sys.stderr)
        return 2

    # Distinct markers per status so a failing line stands out from a passing one.
    glyph = {"ok": "✓", "fail": "✗", "skip": "·"}
    print(f"cert_command readiness — actor {args.actor!r}\n")
    for status, label, detail in checks:
        print(f" {glyph[status]} {label}: {detail}")
    print(f"\n cert_command: {build_cert_command(args.actor, args.pubkey)}")

    failed = [c for c in checks if c[0] == "fail"]
    if failed:
        print(f"\nNOT READY — {len(failed)} check(s) failed. See "
              "wiki/playbooks/ops-bridge-tunnel-cert.md")
        return 1
    print("\nREADY — ops-warden side is set. Next: cert_command smoke (--sign-smoke), "
          "then hand the cutover to ops-bridge.")
    return 0
# Script entry point: hand main()'s return value to the interpreter as the exit status.
if __name__ == "__main__":
    sys.exit(main())