#!/usr/bin/env python3
"""Validate, render, and review non-secret credential change requests (CCRs).

A CCR is a YAML document stored under ``credential-change-requests/`` (or the
directory named by the ``CCR_DIR`` environment variable).  This tool offers
the sub-commands ``validate``, ``render``, ``plan``, ``status``,
``apply-plan``, ``approve``, ``deny`` and ``needs-changes``.  It only reads
and rewrites the CCR files themselves; the apply plan it renders is text for
an operator and is never executed here.
"""
from __future__ import annotations

import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, NoReturn

import yaml

REPO_DIR = Path(__file__).resolve().parents[1]
DEFAULT_CCR_DIR = REPO_DIR / "credential-change-requests"

ALLOWED_STATUSES = {
    "draft",
    "proposed",
    "needs_changes",
    "approved",
    "denied",
    "apply_pending",
    "applied",
    "verified",
    "active",
    "deactivated",
    "rotated",
    "compromised",
    "superseded",
    "cancelled",
}
APPLY_ALLOWED_STATUSES = {"approved"}

# Substrings whose presence anywhere in a CCR file causes it to be rejected.
SECRET_MARKERS = [
    "AGE-SECRET-KEY-1",
    "-----BEGIN PRIVATE KEY-----",
    "-----BEGIN OPENSSH PRIVATE KEY-----",
    "OPENBAO_ROOT_TOKEN=",
    "VAULT_TOKEN=",
    "BAO_TOKEN=",
    "hvb.",
    "hvc.",
    "hvs.",
    "npm_",
    "ghp_",
    "sk-",
]
DISALLOWED_POLICY_NAMES = {"root", "platform-admin"}
FRONTDOOR_READINESS = {
    "template",
    "pending-review",
    "approved-pending-apply",
    "applied-pending-verify",
    "ready",
    "disabled",
    "compromised",
}
SAFE_ID_RE = re.compile(r"^[A-Z0-9][A-Z0-9_.-]*$")
TTL_RE = re.compile(r"^[1-9][0-9]*[smhd]$")


def fail(message: str) -> NoReturn:
    """Abort the program by raising ``SystemExit`` with an ``ERROR:`` message."""
    raise SystemExit(f"ERROR: {message}")


def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string without microseconds."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


def resolve_repo_path(path: str | Path) -> Path:
    """Expand ``~`` in *path*; resolve relative paths against ``REPO_DIR``.

    Absolute paths are returned as-is (after ``~`` expansion).
    """
    p = Path(path).expanduser()
    if p.is_absolute():
        return p
    return (REPO_DIR / p).resolve()


def load_yaml(path: Path) -> dict[str, Any]:
    """Parse *path* with ``yaml.safe_load``; abort unless the root is a mapping."""
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        fail(f"YAML root must be an object: {path}")
    return data


def dump_yaml(path: Path, data: dict[str, Any]) -> None:
    """Write *data* to *path* as YAML, preserving key order, ASCII-escaped."""
    path.write_text(
        yaml.safe_dump(data, sort_keys=False, allow_unicode=False),
        encoding="utf-8",
    )


def ccr_dir() -> Path:
    """Return the CCR directory: ``$CCR_DIR`` if set, else ``DEFAULT_CCR_DIR``."""
    return resolve_repo_path(os.environ.get("CCR_DIR", str(DEFAULT_CCR_DIR)))


def resolve_ccr(ref: str) -> Path:
    """Resolve *ref* to a CCR file.

    *ref* is first tried as a path (relative paths are repo-relative).  If no
    such path exists it is treated as a file-name prefix inside ``ccr_dir()``
    and must match exactly one ``*.y*ml`` file; otherwise the program aborts.
    """
    candidate = resolve_repo_path(ref)
    if candidate.exists():
        return candidate
    matches = sorted(ccr_dir().glob(f"{ref}*.y*ml"))
    if len(matches) == 1:
        return matches[0]
    if len(matches) > 1:
        fail(f"CCR reference is ambiguous: {ref} -> {[m.name for m in matches]}")
    fail(f"CCR not found by path or id prefix: {ref}")


def require_object(value: Any, field: str, errors: list[str]) -> dict[str, Any]:
    """Return *value* if it is a dict; else record an error and return ``{}``."""
    if not isinstance(value, dict):
        errors.append(f"{field} must be an object")
        return {}
    return value


def require_list(value: Any, field: str, errors: list[str]) -> list[Any]:
    """Return *value* if it is a list; else record an error and return ``[]``."""
    if not isinstance(value, list):
        errors.append(f"{field} must be a list")
        return []
    return value


def require_string(value: Any, field: str, errors: list[str]) -> str:
    """Return *value* stripped if it is a non-blank string.

    Otherwise an error is appended to *errors* and ``""`` is returned.
    """
    if not isinstance(value, str) or not value.strip():
        errors.append(f"{field} must be a non-empty string")
        return ""
    return value.strip()


def scan_for_secrets(path: Path, errors: list[str]) -> None:
    """Append one error per ``SECRET_MARKERS`` entry found in the raw file text."""
    text = path.read_text(encoding="utf-8")
    for marker in SECRET_MARKERS:
        if marker in text:
            errors.append(f"{path.name} contains rejected secret marker {marker!r}")


def validate_workload_kv_read(
    ccr: dict[str, Any], errors: list[str], warnings: list[str]
) -> None:
    """Validate the ``workload-kv-read`` specific sections of *ccr*.

    Problems are appended to *errors*; non-fatal findings to *warnings*.
    Nothing is returned and *ccr* is not modified.
    """
    # --- target ---------------------------------------------------------
    target = require_object(ccr.get("target"), "target", errors)
    for field in ("domain", "tenant", "workload", "environment", "purpose"):
        require_string(target.get(field), f"target.{field}", errors)

    # --- openbao --------------------------------------------------------
    openbao = require_object(ccr.get("openbao"), "openbao", errors)
    mount = require_string(openbao.get("mount"), "openbao.mount", errors)
    kv_path = require_string(openbao.get("kv_path"), "openbao.kv_path", errors)
    policy_name = require_string(
        openbao.get("policy_name"), "openbao.policy_name", errors
    )
    policy_file = require_string(
        openbao.get("policy_file"), "openbao.policy_file", errors
    )
    fields = [
        str(field)
        for field in require_list(openbao.get("fields"), "openbao.fields", errors)
    ]
    if not fields:
        errors.append("openbao.fields must contain at least one field")
    if mount and kv_path and not kv_path.startswith(f"{mount}/"):
        errors.append("openbao.kv_path must start with the declared mount")
    if any(fragment in kv_path for fragment in ("*", "..")):
        errors.append("openbao.kv_path must not contain '*' or '..'")
    if policy_name in DISALLOWED_POLICY_NAMES:
        errors.append(f"openbao.policy_name is disallowed: {policy_name}")
    if policy_file:
        resolved_policy = resolve_repo_path(policy_file)
        if not resolved_policy.exists():
            errors.append(f"openbao.policy_file does not exist: {policy_file}")

    # --- openbao.auth ---------------------------------------------------
    auth = require_object(openbao.get("auth"), "openbao.auth", errors)
    method = require_string(auth.get("method"), "openbao.auth.method", errors)
    if method not in {"oidc", "kubernetes"}:
        errors.append("openbao.auth.method must be oidc or kubernetes")
    require_string(auth.get("mount"), "openbao.auth.mount", errors)
    require_string(auth.get("role"), "openbao.auth.role", errors)
    policies = [
        str(policy)
        for policy in require_list(
            auth.get("policies"), "openbao.auth.policies", errors
        )
    ]
    if policies != [policy_name]:
        errors.append("openbao.auth.policies must contain exactly openbao.policy_name")
    for policy in policies:
        if policy in DISALLOWED_POLICY_NAMES:
            errors.append(f"openbao.auth.policies contains disallowed policy {policy}")
    ttl = auth.get("ttl")
    # fullmatch: a "$"-anchored match() would also accept a trailing newline.
    if ttl is not None and (not isinstance(ttl, str) or not TTL_RE.fullmatch(ttl)):
        errors.append(
            "openbao.auth.ttl must match <positive integer><s|m|h|d> (e.g. 15m)"
        )
    bound_claims = require_object(
        auth.get("bound_claims"), "openbao.auth.bound_claims", errors
    )
    if not bound_claims:
        errors.append("openbao.auth.bound_claims must not be empty")
    if auth.get("bound_claims_confirmed") is not True:
        warnings.append("OIDC/Kubernetes bound claim is not confirmed; apply is blocked")

    # --- access_frontdoor -----------------------------------------------
    frontdoor = require_object(ccr.get("access_frontdoor"), "access_frontdoor", errors)
    require_string(frontdoor.get("type"), "access_frontdoor.type", errors)
    require_string(frontdoor.get("catalog_id"), "access_frontdoor.catalog_id", errors)
    readiness = require_string(
        frontdoor.get("readiness"), "access_frontdoor.readiness", errors
    )
    if readiness and readiness not in FRONTDOOR_READINESS:
        errors.append(
            f"access_frontdoor.readiness must be one of {sorted(FRONTDOOR_READINESS)}"
        )
    resolvable = frontdoor.get("resolvable")
    if not isinstance(resolvable, bool):
        errors.append("access_frontdoor.resolvable must be a boolean")
    if resolvable is True and ccr.get("status") != "active":
        errors.append("access_frontdoor.resolvable=true requires status active")
    command = frontdoor.get("command")
    if command is not None and not isinstance(command, str):
        errors.append("access_frontdoor.command must be a string when present")

    # --- risk -----------------------------------------------------------
    risk = require_object(ccr.get("risk"), "risk", errors)
    require_string(risk.get("classification"), "risk.classification", errors)
    require_list(risk.get("notes"), "risk.notes", errors)

    # --- verification ---------------------------------------------------
    verification = require_object(ccr.get("verification"), "verification", errors)
    for field in ("positive", "negative", "activation_conditions"):
        values = require_list(verification.get(field), f"verification.{field}", errors)
        if not values:
            errors.append(f"verification.{field} must not be empty")

    # --- lifecycle ------------------------------------------------------
    lifecycle = require_object(ccr.get("lifecycle"), "lifecycle", errors)
    for field in ("deactivate", "rotate", "compromised"):
        require_string(lifecycle.get(field), f"lifecycle.{field}", errors)


def validate_ccr(path: Path) -> tuple[dict[str, Any], list[str], list[str]]:
    """Load and validate the CCR at *path*.

    Returns ``(ccr, errors, warnings)``.  The file is first scanned for
    secret markers, then parsed, then checked field by field.  Aborts (via
    ``load_yaml``) when the YAML root is not a mapping.
    """
    errors: list[str] = []
    warnings: list[str] = []
    scan_for_secrets(path, errors)
    ccr = load_yaml(path)

    for field in (
        "id",
        "kind",
        "schema_version",
        "request_type",
        "title",
        "status",
        "created",
        "updated",
        "requester",
    ):
        if field == "schema_version":
            if ccr.get(field) != 1:
                errors.append("schema_version must be 1")
        elif field == "requester":
            require_object(ccr.get(field), field, errors)
        else:
            require_string(ccr.get(field), field, errors)

    if ccr.get("kind") != "credential-change-request":
        errors.append("kind must be credential-change-request")

    ccr_id = ccr.get("id")
    # fullmatch: a "$"-anchored match() would also accept a trailing newline.
    if isinstance(ccr_id, str) and not SAFE_ID_RE.fullmatch(ccr_id):
        errors.append(
            "id must contain only uppercase letters, digits, dot, dash, or underscore"
        )

    status = ccr.get("status")
    if isinstance(status, str) and status not in ALLOWED_STATUSES:
        errors.append(f"status must be one of {sorted(ALLOWED_STATUSES)}")

    request_type = ccr.get("request_type")
    if request_type != "workload-kv-read":
        errors.append("request_type must be workload-kv-read")
    else:
        validate_workload_kv_read(ccr, errors, warnings)
    return ccr, errors, warnings


def render_summary(ccr: dict[str, Any], warnings: list[str]) -> str:
    """Render a plain-text review summary of an already validated *ccr*.

    Raises ``KeyError`` if required sections are missing, so callers are
    expected to validate first.
    """
    openbao = ccr["openbao"]
    auth = openbao["auth"]
    frontdoor = ccr["access_frontdoor"]
    risk = ccr["risk"]
    verification = ccr["verification"]
    target = ccr["target"]
    # Validation only checks the stringified values, so YAML scalars such as
    # integers can reach this point; stringify before joining.
    fields = ", ".join(str(field) for field in openbao["fields"])
    claim_bits = ", ".join(
        f"{key}={value}" for key, value in auth.get("bound_claims", {}).items()
    )
    lines = [
        f"Request: {ccr['title']}",
        f"CCR: {ccr['id']} ({ccr['status']})",
        f"Type: {ccr['request_type']}",
        f"Target: {target['tenant']}/{target['workload']} ({target['environment']})",
        "Mount/path/field:",
        f" {openbao['kv_path']}",
        f" {fields}",
        "Policy:",
        f" {openbao['policy_name']}",
        "Auth binding:",
        f" {auth['mount']} {auth['method']} role {auth['role']}",
        f" bound claims: {claim_bits}",
        f" confirmed: {auth.get('bound_claims_confirmed') is True}",
        "Access front door:",
        f" {frontdoor['type']} {frontdoor['catalog_id']}",
        f" readiness: {frontdoor.get('readiness')} "
        f"resolvable={frontdoor.get('resolvable') is True}",
    ]
    if frontdoor.get("command"):
        lines.append(f" command: {frontdoor['command']}")
    lines.append(f"Risk: {risk['classification']}")
    lines.extend(f" - {note}" for note in risk.get("notes", []))
    lines.append("Checks:")
    lines.extend(f" + {check}" for check in verification.get("positive", []))
    lines.extend(f" - {check}" for check in verification.get("negative", []))
    if warnings:
        lines.append("Warnings:")
        lines.extend(f" ! {warning}" for warning in warnings)
    lines.extend(
        [
            "Decision:",
            " approve | deny | needs_changes",
            "Comment:",
            " free text; do not include secret values",
        ]
    )
    return "\n".join(lines)


def generated_policy_hcl(ccr: dict[str, Any]) -> str:
    """Return read-only policy HCL for the CCR's ``data`` and ``metadata`` paths.

    The path suffix is ``kv_path`` with the leading ``"<mount>/"`` removed;
    validation guarantees ``kv_path`` starts with that prefix.
    """
    openbao = ccr["openbao"]
    mount = openbao["mount"]
    suffix = openbao["kv_path"][len(mount) + 1 :]
    return (
        f'path "{mount}/data/{suffix}" {{\n'
        ' capabilities = ["read"]\n'
        "}\n\n"
        f'path "{mount}/metadata/{suffix}" {{\n'
        ' capabilities = ["read"]\n'
        "}\n"
    )


def auth_payload(ccr: dict[str, Any]) -> dict[str, Any]:
    """Build the auth-role payload for the CCR's auth method.

    ``kubernetes`` produces service-account bindings taken from
    ``bound_claims``; any other method produces an OIDC role payload.  The
    TTL defaults to ``"15m"`` when the CCR does not set one.
    """
    auth = ccr["openbao"]["auth"]
    # Stringify: non-string YAML scalars pass validation (it compares str()).
    policies = ",".join(str(policy) for policy in auth["policies"])
    if auth["method"] == "kubernetes":
        claims = auth["bound_claims"]
        return {
            "bound_service_account_names": claims.get("service_account_names", []),
            "bound_service_account_namespaces": claims.get(
                "service_account_namespaces", []
            ),
            "policies": policies,
            "ttl": auth.get("ttl", "15m"),
        }
    payload: dict[str, Any] = {
        "role_type": "oidc",
        "user_claim": auth.get("user_claim", "sub"),
        "policies": policies,
        "ttl": auth.get("ttl", "15m"),
        "bound_claims": auth["bound_claims"],
    }
    if auth.get("groups_claim"):
        payload["groups_claim"] = auth["groups_claim"]
    return payload


def render_plan(ccr: dict[str, Any]) -> str:
    """Render the operator apply plan (policy, auth role, provisioning, verify)."""
    openbao = ccr["openbao"]
    auth = openbao["auth"]
    payload = auth_payload(ccr)
    # Stringify: non-string YAML scalars pass validation (it compares str()).
    fields = ", ".join(str(field) for field in openbao["fields"])
    lines = [
        f"CCR {ccr['id']} apply plan",
        "",
        "1. Write policy HCL:",
        f" policy: {openbao['policy_name']}",
        f" source: {openbao['policy_file']}",
        "",
        generated_policy_hcl(ccr).rstrip(),
        "",
        "2. Create/update auth role payload:",
        f" path: auth/{auth['mount']}/role/{auth['role']}",
        json.dumps(payload, indent=2, sort_keys=True),
        "",
        "3. Provision secret value out-of-band:",
        f" path: {openbao['kv_path']}",
        f" fields: {fields}",
        "",
        "4. Verify positive and negative access without printing secret values.",
    ]
    return "\n".join(lines)


def validate_or_exit(path: Path) -> tuple[dict[str, Any], list[str]]:
    """Validate *path*, print findings to stderr, and exit 1 on any error.

    Returns ``(ccr, warnings)`` when validation produced no errors.
    """
    ccr, errors, warnings = validate_ccr(path)
    for warning in warnings:
        print(f"[WARN] {path.name}: {warning}", file=sys.stderr)
    if errors:
        for error in errors:
            print(f"[FAIL] {path.name}: {error}", file=sys.stderr)
        raise SystemExit(1)
    return ccr, warnings


def apply_blockers(ccr: dict[str, Any]) -> list[str]:
    """List the reasons *ccr* may not be applied (empty list means allowed)."""
    blockers: list[str] = []
    if ccr.get("status") not in APPLY_ALLOWED_STATUSES:
        blockers.append(f"apply requires status approved, got {ccr.get('status')}")
    if ccr["openbao"]["auth"].get("bound_claims_confirmed") is not True:
        blockers.append("apply requires confirmed OpenBao auth binding")
    return blockers


def frontdoor_blockers(ccr: dict[str, Any]) -> list[str]:
    """List the reasons the access front door is not resolvable for *ccr*."""
    frontdoor = ccr["access_frontdoor"]
    blockers: list[str] = []
    if ccr.get("status") != "active":
        blockers.append(f"front door requires CCR status active, got {ccr.get('status')}")
    if frontdoor.get("readiness") != "ready":
        blockers.append(
            f"front door readiness must be ready, got {frontdoor.get('readiness')}"
        )
    if frontdoor.get("resolvable") is not True:
        blockers.append("front door is marked resolvable=false")
    return blockers


def status_payload(ccr: dict[str, Any], warnings: list[str]) -> dict[str, Any]:
    """Build the JSON-serialisable readiness status for a validated *ccr*."""
    apply_blocked_by = apply_blockers(ccr)
    frontdoor_blocked_by = frontdoor_blockers(ccr)
    frontdoor = ccr["access_frontdoor"]
    openbao = ccr["openbao"]
    auth = openbao["auth"]
    return {
        "id": ccr["id"],
        "title": ccr["title"],
        "status": ccr["status"],
        "request_type": ccr["request_type"],
        "apply_allowed": not apply_blocked_by,
        "apply_blockers": apply_blocked_by,
        "frontdoor_resolvable": not frontdoor_blocked_by,
        "frontdoor_blockers": frontdoor_blocked_by,
        "warnings": warnings,
        "openbao": {
            "mount": openbao["mount"],
            "kv_path": openbao["kv_path"],
            "fields": openbao["fields"],
            "policy_name": openbao["policy_name"],
            "auth_mount": auth["mount"],
            "auth_method": auth["method"],
            "auth_role": auth["role"],
            "bound_claims_confirmed": auth.get("bound_claims_confirmed") is True,
        },
        "access_frontdoor": {
            "type": frontdoor["type"],
            "catalog_id": frontdoor["catalog_id"],
            "readiness": frontdoor.get("readiness"),
            "resolvable": frontdoor.get("resolvable") is True,
            "command": frontdoor.get("command"),
        },
    }


def render_status(payload: dict[str, Any]) -> str:
    """Render a ``status_payload`` result as human-readable text."""
    lines = [
        f"CCR: {payload['id']} ({payload['status']})",
        f"Catalog: {payload['access_frontdoor']['catalog_id']}",
        f"Readiness: {payload['access_frontdoor']['readiness']}",
        f"Resolvable: {payload['frontdoor_resolvable']}",
        f"Apply allowed: {payload['apply_allowed']}",
    ]
    command = payload["access_frontdoor"].get("command")
    if command:
        lines.append(f"Command: {command}")
    if payload["apply_blockers"]:
        lines.append("Apply blockers:")
        lines.extend(f" - {blocker}" for blocker in payload["apply_blockers"])
    if payload["frontdoor_blockers"]:
        lines.append("Front-door blockers:")
        lines.extend(f" - {blocker}" for blocker in payload["frontdoor_blockers"])
    if payload["warnings"]:
        lines.append("Warnings:")
        lines.extend(f" - {warning}" for warning in payload["warnings"])
    return "\n".join(lines)


def append_decision(path: Path, status: str, reviewer: str, comment: str) -> None:
    """Record a review decision in the CCR at *path* and rewrite the file.

    The CCR must validate cleanly (otherwise the process exits).  A comment
    entry is appended to ``review.comments``, ``status`` is set to *status*
    and ``updated`` to today's UTC date.  A missing or null ``review`` /
    ``review.comments`` is created; any other non-mapping / non-list value
    aborts with an error instead of an ``AttributeError`` traceback.
    """
    ccr, _warnings = validate_or_exit(path)

    review = ccr.get("review")
    if review is None:
        review = ccr["review"] = {}
    elif not isinstance(review, dict):
        fail("review must be an object")

    comments = review.get("comments")
    if comments is None:
        comments = review["comments"] = []
    elif not isinstance(comments, list):
        fail("review.comments must be a list")

    comments.append(
        {
            "at": utc_now(),
            "reviewer": reviewer,
            "decision": status,
            "comment": comment,
        }
    )
    ccr["status"] = status
    ccr["updated"] = datetime.now(timezone.utc).date().isoformat()
    dump_yaml(path, ccr)


def command_validate(args: argparse.Namespace) -> int:
    """``validate``: check the given refs, or every CCR file when none given.

    Returns 0 when every file is valid, 1 otherwise.
    """
    refs = args.refs or [str(path) for path in sorted(ccr_dir().glob("*.y*ml"))]
    if not refs:
        fail(f"no CCR files found in {ccr_dir()}")
    ok = True
    for ref in refs:
        path = resolve_ccr(ref)
        _ccr, errors, warnings = validate_ccr(path)
        for warning in warnings:
            print(f"[WARN] {path.name}: {warning}", file=sys.stderr)
        if errors:
            ok = False
            for error in errors:
                print(f"[FAIL] {path.name}: {error}", file=sys.stderr)
        else:
            print(f"[OK] {path.name}")
    return 0 if ok else 1


def command_render(args: argparse.Namespace) -> int:
    """``render``: print the human review summary for one CCR."""
    path = resolve_ccr(args.ref)
    ccr, warnings = validate_or_exit(path)
    print(render_summary(ccr, warnings))
    return 0


def command_plan(args: argparse.Namespace) -> int:
    """``plan``: print the generated apply plan for review (any status)."""
    path = resolve_ccr(args.ref)
    ccr, _warnings = validate_or_exit(path)
    print(render_plan(ccr))
    return 0


def command_status(args: argparse.Namespace) -> int:
    """``status``: print readiness status as text, or JSON with ``--json``."""
    path = resolve_ccr(args.ref)
    ccr, errors, warnings = validate_ccr(path)
    if errors:
        for error in errors:
            print(f"[FAIL] {path.name}: {error}", file=sys.stderr)
        return 1
    payload = status_payload(ccr, warnings)
    if args.json:
        print(json.dumps(payload, indent=2, sort_keys=True))
    else:
        print(render_status(payload))
    return 0


def command_apply_plan(args: argparse.Namespace) -> int:
    """``apply-plan``: print the plan only for approved, confirmed CCRs."""
    path = resolve_ccr(args.ref)
    ccr, _warnings = validate_or_exit(path)
    if ccr.get("status") not in APPLY_ALLOWED_STATUSES:
        fail(f"apply-plan requires status approved, got {ccr.get('status')}")
    auth = ccr["openbao"]["auth"]
    if auth.get("bound_claims_confirmed") is not True:
        fail("apply-plan requires openbao.auth.bound_claims_confirmed=true")
    print(render_plan(ccr))
    return 0


def command_decision(args: argparse.Namespace, status: str) -> int:
    """Shared handler for ``approve`` / ``deny`` / ``needs-changes``."""
    path = resolve_ccr(args.ref)
    append_decision(path, status, args.reviewer, args.comment)
    print(f"[OK] {path.name} -> {status}")
    return 0


def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser with one sub-command per operation."""
    parser = argparse.ArgumentParser(
        description="Validate, render, and review non-secret credential change requests."
    )
    sub = parser.add_subparsers(dest="command", required=True)

    validate = sub.add_parser("validate", help="Validate CCR files")
    validate.add_argument("refs", nargs="*")
    validate.set_defaults(func=command_validate)

    render = sub.add_parser("render", help="Render a human review summary")
    render.add_argument("ref")
    render.set_defaults(func=command_render)

    plan = sub.add_parser("plan", help="Render the generated apply plan for review")
    plan.add_argument("ref")
    plan.set_defaults(func=command_plan)

    status = sub.add_parser("status", help="Render machine-readable readiness status")
    status.add_argument("ref")
    status.add_argument("--json", action="store_true")
    status.set_defaults(func=command_status)

    apply_plan = sub.add_parser(
        "apply-plan", help="Render an operator apply plan only for approved CCRs"
    )
    apply_plan.add_argument("ref")
    apply_plan.set_defaults(func=command_apply_plan)

    for name, decision_status in (
        ("approve", "approved"),
        ("deny", "denied"),
        ("needs-changes", "needs_changes"),
    ):
        decision = sub.add_parser(name, help=f"Record {decision_status} decision")
        decision.add_argument("ref")
        decision.add_argument("--reviewer", required=True)
        decision.add_argument("--comment", required=True)
        # Bind the status as a default argument so each sub-command keeps its
        # own value rather than the loop's final one.
        decision.set_defaults(
            func=lambda args, status=decision_status: command_decision(args, status)
        )
    return parser


def main() -> int:
    """Parse the command line and dispatch to the selected sub-command."""
    parser = build_parser()
    args = parser.parse_args()
    return int(args.func(args))


if __name__ == "__main__":
    raise SystemExit(main())