#!/usr/bin/env python3 from __future__ import annotations import argparse import difflib import json import os import re import shlex import subprocess import sys import tempfile import urllib.error import urllib.request from datetime import datetime, timezone from pathlib import Path from typing import Any import yaml REPO_DIR = Path(__file__).resolve().parents[1] DEFAULT_CCR_DIR = REPO_DIR / "credential-change-requests" ALLOWED_STATUSES = { "draft", "proposed", "needs_changes", "approved", "denied", "apply_pending", "applied", "verified", "active", "deactivated", "rotated", "compromised", "superseded", "cancelled", } APPLY_ALLOWED_STATUSES = {"approved"} POST_APPLY_STATUSES = {"applied", "verified", "active"} SECRET_MARKERS = [ "AGE-SECRET-KEY-1", "-----BEGIN PRIVATE KEY-----", "-----BEGIN OPENSSH PRIVATE KEY-----", "OPENBAO_ROOT_TOKEN=", "VAULT_TOKEN=", "BAO_TOKEN=", "hvb.", "hvc.", "hvs.", "npm_", "ghp_", "sk-", ] DISALLOWED_POLICY_NAMES = {"root", "platform-admin"} FRONTDOOR_READINESS = { "template", "pending-review", "approved-pending-apply", "applied-pending-verify", "ready", "disabled", "compromised", } SAFE_ID_RE = re.compile(r"^[A-Z0-9][A-Z0-9_.-]*$") TTL_RE = re.compile(r"^[1-9][0-9]*[smhd]$") LOWER_SAFE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9-]*$") FIELD_NAME_RE = re.compile(r"^[A-Z][A-Z0-9_]*$") APPLIER_DRY_RUN_ALLOWED_STATUSES = APPLY_ALLOWED_STATUSES | POST_APPLY_STATUSES APPLIER_ALLOWED_ENVIRONMENTS = { "build", "development", "test", "staging", "production", } WORKLOAD_KV_POLICY_PREFIX = "workload-kv-read-" OIDC_WORKLOAD_ROLE_SUFFIX = "-workload-kv-read" KUBERNETES_ROLE_SUFFIXES = ("-workload-kv-read", "-secrets-read") KUBERNETES_ROLE_PREFIXES = ("external-secrets-",) EVIDENCE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9_-]*$") RUNBOOK_ALLOWED_STATUSES = APPLY_ALLOWED_STATUSES | POST_APPLY_STATUSES LIFECYCLE_ACTIONS = { "deactivate": { "status": "deactivated", "readiness": "disabled", "resolvable": False, "kind": "deactivation", }, "rotate": { "status": "rotated", "readiness": "applied-pending-verify", "resolvable": False, "kind": "rotation", }, "compromise": { "status": "compromised", "readiness": "compromised", "resolvable": False, "kind": "compromise", }, } def fail(message: str) -> None: raise SystemExit(f"ERROR: {message}") def utc_now() -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat() def resolve_repo_path(path: str | Path) -> Path: p = Path(path).expanduser() if p.is_absolute(): return p return (REPO_DIR / p).resolve() def load_yaml(path: Path) -> dict[str, Any]: data = yaml.safe_load(path.read_text(encoding="utf-8")) if not isinstance(data, dict): fail(f"YAML root must be an object: {path}") return data def dump_yaml(path: Path, data: dict[str, Any]) -> None: path.write_text( yaml.safe_dump(data, sort_keys=False, allow_unicode=False), encoding="utf-8", ) def ccr_dir() -> Path: return resolve_repo_path(os.environ.get("CCR_DIR", str(DEFAULT_CCR_DIR))) def resolve_ccr(ref: str) -> Path: candidate = resolve_repo_path(ref) if candidate.exists(): return candidate matches = sorted(ccr_dir().glob(f"{ref}*.y*ml")) if len(matches) == 1: return matches[0] if len(matches) > 1: fail(f"CCR reference is ambiguous: {ref} -> {[m.name for m in matches]}") fail(f"CCR not found by path or id prefix: {ref}") def require_object(value: Any, field: str, errors: list[str]) -> dict[str, Any]: if not isinstance(value, dict): errors.append(f"{field} must be an object") return {} return value def require_list(value: Any, field: str, errors: list[str]) -> list[Any]: if not isinstance(value, list): errors.append(f"{field} must be a list") return [] return value def require_string(value: Any, field: str, errors: list[str]) -> str: if not isinstance(value, str) or not value.strip(): errors.append(f"{field} must be a non-empty string") return "" return value.strip() def contains_secret_marker(text: str, marker: str) -> bool: if marker == "sk-": return re.search(r"(? None: text = path.read_text(encoding="utf-8") for marker in SECRET_MARKERS: if contains_secret_marker(text, marker): errors.append(f"{path.name} contains rejected secret marker {marker!r}") def reject_secret_text(text: str, field: str) -> None: for marker in SECRET_MARKERS: if contains_secret_marker(text, marker): fail(f"{field} contains rejected secret marker {marker!r}") def validate_workload_kv_read(ccr: dict[str, Any], errors: list[str], warnings: list[str]) -> None: target = require_object(ccr.get("target"), "target", errors) for field in ("domain", "tenant", "workload", "environment", "purpose"): require_string(target.get(field), f"target.{field}", errors) openbao = require_object(ccr.get("openbao"), "openbao", errors) mount = require_string(openbao.get("mount"), "openbao.mount", errors) kv_path = require_string(openbao.get("kv_path"), "openbao.kv_path", errors) policy_name = require_string( openbao.get("policy_name"), "openbao.policy_name", errors ) policy_file = require_string( openbao.get("policy_file"), "openbao.policy_file", errors ) fields = [str(field) for field in require_list(openbao.get("fields"), "openbao.fields", errors)] if not fields: errors.append("openbao.fields must contain at least one field") if mount and kv_path and not kv_path.startswith(f"{mount}/"): errors.append("openbao.kv_path must start with the declared mount") if any(fragment in kv_path for fragment in ("*", "..")): errors.append("openbao.kv_path must not contain '*' or '..'") if policy_name in DISALLOWED_POLICY_NAMES: errors.append(f"openbao.policy_name is disallowed: {policy_name}") if policy_file: resolved_policy = resolve_repo_path(policy_file) if not resolved_policy.exists(): errors.append(f"openbao.policy_file does not exist: {policy_file}") auth = require_object(openbao.get("auth"), "openbao.auth", errors) method = require_string(auth.get("method"), "openbao.auth.method", errors) if method not in {"oidc", "kubernetes"}: errors.append("openbao.auth.method must be oidc or kubernetes") require_string(auth.get("mount"), "openbao.auth.mount", errors) require_string(auth.get("role"), "openbao.auth.role", errors) if method == "oidc": redirect_uris = require_list( auth.get("allowed_redirect_uris"), "openbao.auth.allowed_redirect_uris", errors, ) if not redirect_uris: errors.append("openbao.auth.allowed_redirect_uris must not be empty for oidc") for index, uri in enumerate(redirect_uris): if not isinstance(uri, str) or not uri.strip(): errors.append( f"openbao.auth.allowed_redirect_uris[{index}] must be a non-empty string" ) if auth.get("groups_claim"): oidc_scopes = require_list( auth.get("oidc_scopes"), "openbao.auth.oidc_scopes", errors ) if "groups" not in oidc_scopes: errors.append( "openbao.auth.oidc_scopes must include 'groups' when groups_claim is set" ) for index, scope in enumerate(oidc_scopes): if not isinstance(scope, str) or not scope.strip(): errors.append( f"openbao.auth.oidc_scopes[{index}] must be a non-empty string" ) policies = [str(policy) for policy in require_list(auth.get("policies"), "openbao.auth.policies", errors)] if policies != [policy_name]: errors.append("openbao.auth.policies must contain exactly openbao.policy_name") for policy in policies: if policy in DISALLOWED_POLICY_NAMES: errors.append(f"openbao.auth.policies contains disallowed policy {policy}") ttl = auth.get("ttl") if ttl is not None and (not isinstance(ttl, str) or not TTL_RE.match(ttl)): errors.append("openbao.auth.ttl must match ") bound_claims = require_object( auth.get("bound_claims"), "openbao.auth.bound_claims", errors ) if not bound_claims: errors.append("openbao.auth.bound_claims must not be empty") if auth.get("bound_claims_confirmed") is not True: warnings.append("OIDC/Kubernetes bound claim is not confirmed; apply is blocked") frontdoor = require_object(ccr.get("access_frontdoor"), "access_frontdoor", errors) require_string(frontdoor.get("type"), "access_frontdoor.type", errors) require_string(frontdoor.get("catalog_id"), "access_frontdoor.catalog_id", errors) readiness = require_string(frontdoor.get("readiness"), "access_frontdoor.readiness", errors) if readiness and readiness not in FRONTDOOR_READINESS: errors.append( f"access_frontdoor.readiness must be one of {sorted(FRONTDOOR_READINESS)}" ) resolvable = frontdoor.get("resolvable") if not isinstance(resolvable, bool): errors.append("access_frontdoor.resolvable must be a boolean") if resolvable is True and ccr.get("status") != "active": errors.append("access_frontdoor.resolvable=true requires status active") command = frontdoor.get("command") if command is not None and not isinstance(command, str): errors.append("access_frontdoor.command must be a string when present") risk = require_object(ccr.get("risk"), "risk", errors) require_string(risk.get("classification"), "risk.classification", errors) require_list(risk.get("notes"), "risk.notes", errors) verification = require_object(ccr.get("verification"), "verification", errors) for field in ("positive", "negative", "activation_conditions"): values = require_list(verification.get(field), f"verification.{field}", errors) if not values: errors.append(f"verification.{field} must not be empty") lifecycle = require_object(ccr.get("lifecycle"), "lifecycle", errors) for field in ("deactivate", "rotate", "compromised"): require_string(lifecycle.get(field), f"lifecycle.{field}", errors) def validate_ccr(path: Path) -> tuple[dict[str, Any], list[str], list[str]]: errors: list[str] = [] warnings: list[str] = [] scan_for_secrets(path, errors) ccr = load_yaml(path) for field in ( "id", "kind", "schema_version", "request_type", "title", "status", "created", "updated", "requester", ): if field == "schema_version": if ccr.get(field) != 1: errors.append("schema_version must be 1") elif field == "requester": require_object(ccr.get(field), field, errors) else: require_string(ccr.get(field), field, errors) if ccr.get("kind") != "credential-change-request": errors.append("kind must be credential-change-request") ccr_id = ccr.get("id") if isinstance(ccr_id, str) and not SAFE_ID_RE.match(ccr_id): errors.append("id must contain only uppercase letters, digits, dot, dash, or underscore") status = ccr.get("status") if isinstance(status, str) and status not in ALLOWED_STATUSES: errors.append(f"status must be one of {sorted(ALLOWED_STATUSES)}") request_type = ccr.get("request_type") if request_type != "workload-kv-read": errors.append("request_type must be workload-kv-read") else: validate_workload_kv_read(ccr, errors, warnings) return ccr, errors, warnings def render_summary(ccr: dict[str, Any], warnings: list[str]) -> str: openbao = ccr["openbao"] auth = openbao["auth"] frontdoor = ccr["access_frontdoor"] risk = ccr["risk"] verification = ccr["verification"] fields = ", ".join(openbao["fields"]) claim_bits = ", ".join( f"{key}={value}" for key, value in auth.get("bound_claims", {}).items() ) lines = [ f"Request: {ccr['title']}", f"CCR: {ccr['id']} ({ccr['status']})", f"Type: {ccr['request_type']}", f"Target: {ccr['target']['tenant']}/{ccr['target']['workload']} ({ccr['target']['environment']})", "Mount/path/field:", f" {openbao['kv_path']}", f" {fields}", "Policy:", f" {openbao['policy_name']}", "Auth binding:", f" {auth['mount']} {auth['method']} role {auth['role']}", f" bound claims: {claim_bits}", f" confirmed: {auth.get('bound_claims_confirmed') is True}", "Access front door:", f" {frontdoor['type']} {frontdoor['catalog_id']}", f" readiness: {frontdoor.get('readiness')} resolvable={frontdoor.get('resolvable') is True}", ] if frontdoor.get("command"): lines.append(f" command: {frontdoor['command']}") lines.append(f"Risk: {risk['classification']}") for note in risk.get("notes", []): lines.append(f" - {note}") lines.append("Checks:") for check in verification.get("positive", []): lines.append(f" + {check}") for check in verification.get("negative", []): lines.append(f" - {check}") if warnings: lines.append("Warnings:") for warning in warnings: lines.append(f" ! {warning}") lines.extend( [ "Decision:", " approve | deny | needs_changes", "Comment:", " free text; do not include secret values", ] ) return "\n".join(lines) def generated_policy_hcl(ccr: dict[str, Any]) -> str: openbao = ccr["openbao"] mount = openbao["mount"] suffix = openbao["kv_path"][len(mount) + 1 :] return ( f'path "{mount}/data/{suffix}" {{\n' ' capabilities = ["read"]\n' "}\n\n" f'path "{mount}/metadata/{suffix}" {{\n' ' capabilities = ["read"]\n' "}\n" ) def display_repo_path(path: Path) -> str: try: return str(path.resolve().relative_to(REPO_DIR)) except ValueError: return str(path) def policy_artifact_diff(ccr: dict[str, Any]) -> dict[str, Any]: policy_path = resolve_repo_path(ccr["openbao"]["policy_file"]) generated = generated_policy_hcl(ccr) generated_lines = generated.rstrip().splitlines() result: dict[str, Any] = { "path": display_repo_path(policy_path), "status": "missing", "matches": False, "diff": [], } if policy_path.exists(): source = policy_path.read_text(encoding="utf-8") source_lines = source.rstrip().splitlines() result["matches"] = normalized_policy_body(source) == normalized_policy_body( generated ) result["status"] = "matches" if result["matches"] else "differs" else: source_lines = [] if not result["matches"]: result["diff"] = list( difflib.unified_diff( source_lines, generated_lines, fromfile=result["path"], tofile=f"generated/{ccr['openbao']['policy_name']}.hcl", lineterm="", ) ) return result def render_policy_artifact_diff(ccr: dict[str, Any], indent: str = "") -> list[str]: artifact = policy_artifact_diff(ccr) lines = [ f"{indent}source artifact: {artifact['path']}", f"{indent}artifact status: {artifact['status']}", ] if artifact["matches"]: lines.append(f"{indent}diff: none; source artifact matches generated policy body") return lines lines.append(f"{indent}diff:") for line in artifact["diff"]: lines.append(f"{indent}{line}") return lines def auth_payload(ccr: dict[str, Any]) -> dict[str, Any]: auth = ccr["openbao"]["auth"] if auth["method"] == "kubernetes": claims = auth["bound_claims"] return { "bound_service_account_names": claims.get("service_account_names", []), "bound_service_account_namespaces": claims.get( "service_account_namespaces", [] ), "policies": ",".join(auth["policies"]), "ttl": auth.get("ttl", "15m"), } payload: dict[str, Any] = { "role_type": "oidc", "user_claim": auth.get("user_claim", "sub"), "policies": ",".join(auth["policies"]), "ttl": auth.get("ttl", "15m"), "bound_claims": auth["bound_claims"], } if auth.get("groups_claim"): payload["groups_claim"] = auth["groups_claim"] if auth.get("allowed_redirect_uris"): payload["allowed_redirect_uris"] = auth["allowed_redirect_uris"] if auth.get("oidc_scopes"): payload["oidc_scopes"] = auth["oidc_scopes"] return payload def render_plan(ccr: dict[str, Any]) -> str: openbao = ccr["openbao"] auth = openbao["auth"] payload = auth_payload(ccr) lines = [ f"CCR {ccr['id']} apply plan", "", "1. Write policy HCL:", f" policy: {openbao['policy_name']}", f" source: {openbao['policy_file']}", "", generated_policy_hcl(ccr).rstrip(), "", " Source artifact diff:", *render_policy_artifact_diff(ccr, indent=" "), "", "2. Create/update auth role payload:", f" path: auth/{auth['mount']}/role/{auth['role']}", json.dumps(payload, indent=2, sort_keys=True), "", "3. Provision secret value out-of-band:", f" path: {openbao['kv_path']}", f" fields: {', '.join(openbao['fields'])}", "", "4. Verify positive and negative access without printing secret values.", ] return "\n".join(lines) def render_operator_commands(ccr: dict[str, Any]) -> str: openbao = ccr["openbao"] auth = openbao["auth"] auth_path = f"auth/{auth['mount']}/role/{auth['role']}" payload = auth_payload(ccr) role_payload = json.dumps(payload, indent=2, sort_keys=True) secret_args = " ".join( shlex.quote(f"{field}=") for field in openbao["fields"] ) lines = [ f"# Operator handoff for {ccr['id']}: {ccr['title']}", "# Run from the railiance-platform repo with an approved OpenBao operator token.", "# Do not paste this shell block into the OpenBao Browser CLI.", f"# Web UI API Explorer path for the role JSON body: /v1/{auth_path}", "set -euo pipefail", f"bao policy write {shlex.quote(openbao['policy_name'])} {shlex.quote(openbao['policy_file'])}", 'role_payload_file="$(mktemp)"', 'trap \'rm -f "$role_payload_file"\' EXIT', 'cat >"$role_payload_file" <<\'JSON\'', role_payload, "JSON", f"bao write {shlex.quote(auth_path)} @\"$role_payload_file\"", "", "# Secret provisioning remains under approved OpenBao/operator custody.", "# Do not paste secret values into Git, State Hub, workplans, logs, or chat.", f"# bao kv put {shlex.quote(openbao['kv_path'])} {secret_args}", "", "# After provisioning, run positive and negative verification without printing secret values.", ] return "\n".join(lines) def normalized_policy_body(text: str) -> str: lines: list[str] = [] for line in text.splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#"): continue lines.append(stripped) return "\n".join(lines) def string_values(value: Any) -> list[str]: if isinstance(value, str): return [value] if isinstance(value, list): values: list[str] = [] for item in value: values.extend(string_values(item)) return values if isinstance(value, dict): values = [] for item in value.values(): values.extend(string_values(item)) return values return [] def applier_policy_violations(ccr: dict[str, Any]) -> list[str]: violations: list[str] = [] if ccr.get("request_type") != "workload-kv-read": return ["delegated applier only supports workload-kv-read CCRs"] target = ccr.get("target", {}) environment = str(target.get("environment") or "") if environment not in APPLIER_ALLOWED_ENVIRONMENTS: violations.append(f"target.environment is outside delegated applier scope: {environment}") openbao = ccr.get("openbao", {}) mount = str(openbao.get("mount") or "") kv_path = str(openbao.get("kv_path") or "") policy_name = str(openbao.get("policy_name") or "") policy_file = str(openbao.get("policy_file") or "") fields = openbao.get("fields") or [] if mount != "platform": violations.append(f"openbao.mount must be platform, got {mount}") if not kv_path.startswith("platform/workloads/"): violations.append("openbao.kv_path must stay under platform/workloads/") if any(fragment in kv_path for fragment in ("*", "..", "//")): violations.append("openbao.kv_path must not contain wildcard, parent, or empty segments") if "/data/" in kv_path or "/metadata/" in kv_path: violations.append("openbao.kv_path must be the logical KV path, not a KV-v2 API path") for field in fields: field_name = str(field) if not FIELD_NAME_RE.match(field_name): violations.append(f"openbao.fields contains unsafe field name: {field_name}") if policy_name in DISALLOWED_POLICY_NAMES: violations.append(f"openbao.policy_name is disallowed: {policy_name}") if not policy_name.startswith(WORKLOAD_KV_POLICY_PREFIX): violations.append( f"openbao.policy_name must start with {WORKLOAD_KV_POLICY_PREFIX}" ) if not LOWER_SAFE_ID_RE.match(policy_name): violations.append(f"openbao.policy_name contains unsafe characters: {policy_name}") if policy_file: resolved_policy = resolve_repo_path(policy_file) policy_dir = (REPO_DIR / "openbao" / "policies").resolve() try: resolved_policy.relative_to(policy_dir) except ValueError: violations.append("openbao.policy_file must stay under openbao/policies") expected_name = f"{policy_name}.hcl" if resolved_policy.name != expected_name: violations.append( f"openbao.policy_file name must match policy name: {expected_name}" ) if resolved_policy.exists(): source = normalized_policy_body(resolved_policy.read_text(encoding="utf-8")) generated = normalized_policy_body(generated_policy_hcl(ccr)) if source != generated: violations.append("openbao.policy_file does not match generated CCR policy") auth = openbao.get("auth", {}) method = str(auth.get("method") or "") auth_mount = str(auth.get("mount") or "") role = str(auth.get("role") or "") if method == "oidc" and auth_mount != "netkingdom": violations.append("OIDC workload CCRs may only mutate auth/netkingdom roles") elif method == "kubernetes" and auth_mount != "kubernetes": violations.append("Kubernetes workload CCRs may only mutate auth/kubernetes roles") elif method not in {"oidc", "kubernetes"}: violations.append(f"unsupported auth method for delegated applier: {method}") if not LOWER_SAFE_ID_RE.match(role): violations.append(f"openbao.auth.role contains unsafe characters: {role}") if role in DISALLOWED_POLICY_NAMES: violations.append(f"openbao.auth.role is disallowed: {role}") if method == "oidc" and not role.endswith(OIDC_WORKLOAD_ROLE_SUFFIX): violations.append( f"OIDC workload role must end with {OIDC_WORKLOAD_ROLE_SUFFIX}" ) if method == "kubernetes" and not ( role.endswith(KUBERNETES_ROLE_SUFFIXES) or role.startswith(KUBERNETES_ROLE_PREFIXES) ): allowed_roles = list(KUBERNETES_ROLE_SUFFIXES) + [ f"{prefix}*" for prefix in KUBERNETES_ROLE_PREFIXES ] violations.append( "Kubernetes workload role must end with/start with " + " or ".join(allowed_roles) ) for policy in auth.get("policies") or []: policy_value = str(policy) if policy_value != policy_name: violations.append("openbao.auth.policies must contain only openbao.policy_name") if policy_value in DISALLOWED_POLICY_NAMES: violations.append(f"openbao.auth.policies contains disallowed policy: {policy_value}") for value in string_values(auth.get("bound_claims") or {}): if not value.strip() or "*" in value or ".." in value: violations.append("openbao.auth.bound_claims contains an unsafe value") break return violations def applier_readiness_blockers(ccr: dict[str, Any]) -> list[str]: blockers: list[str] = [] status = ccr.get("status") if status not in APPLIER_DRY_RUN_ALLOWED_STATUSES: blockers.append( "applier dry-run requires approved, applied, verified, or active " f"CCR status, got {status}" ) if ccr["openbao"]["auth"].get("bound_claims_confirmed") is not True: blockers.append("applier dry-run requires confirmed OpenBao auth binding") blockers.extend(applier_policy_violations(ccr)) return blockers def applier_dry_run_payload(ccr: dict[str, Any], warnings: list[str]) -> dict[str, Any]: openbao = ccr["openbao"] auth = openbao["auth"] auth_path = f"auth/{auth['mount']}/role/{auth['role']}" return { "id": ccr["id"], "title": ccr["title"], "status": ccr["status"], "environment": ccr["target"]["environment"], "warnings": warnings, "source_artifacts": { "policy": policy_artifact_diff(ccr), }, "mutations": [ { "kind": "policy_write", "openbao_path": f"sys/policies/acl/{openbao['policy_name']}", "policy_name": openbao["policy_name"], "source": openbao["policy_file"], "body": generated_policy_hcl(ccr).rstrip(), }, { "kind": "auth_role_write", "openbao_path": auth_path, "payload": auth_payload(ccr), }, ], "out_of_scope": [ "secret value writes", "secret reads", "front-door activation before verification", ], "required_evidence": [ "CCR id and approval reference", "policy name and auth role path", "OpenBao request id or timestamp", "positive and negative verification references", ], } def render_applier_dry_run(payload: dict[str, Any]) -> str: lines = [ f"CCR {payload['id']} delegated applier dry-run", f"Status: {payload['status']}", f"Environment: {payload['environment']}", "", "Allowed metadata mutations:", ] for mutation in payload["mutations"]: lines.append(f"- {mutation['kind']}: {mutation['openbao_path']}") if mutation["kind"] == "policy_write": lines.append(f" source: {mutation['source']}") lines.append(" body:") for line in mutation["body"].splitlines(): lines.append(f" {line}") else: lines.append(" payload:") rendered_payload = json.dumps(mutation["payload"], indent=4, sort_keys=True) for line in rendered_payload.splitlines(): lines.append(f" {line}") lines.append("") lines.append("Out of scope:") for item in payload["out_of_scope"]: lines.append(f"- {item}") lines.append("Required non-secret evidence:") for item in payload["required_evidence"]: lines.append(f"- {item}") policy_artifact = payload.get("source_artifacts", {}).get("policy") if policy_artifact: lines.append("Source artifact checks:") lines.append(f"- policy: {policy_artifact['path']} ({policy_artifact['status']})") if policy_artifact.get("diff"): lines.append(" diff:") for line in policy_artifact["diff"]: lines.append(f" {line}") if payload["warnings"]: lines.append("Warnings:") for warning in payload["warnings"]: lines.append(f"- {warning}") return "\n".join(lines) def applier_confirmation_phrase(ccr: dict[str, Any]) -> str: return f"DELEGATED APPLY {ccr['id']}" def delegated_apply_details(ccr: dict[str, Any], actor: str) -> list[str]: openbao = ccr["openbao"] auth = openbao["auth"] return [ f"Delegated metadata applier ran as {actor} using local bao CLI ambient authority.", f"Policy metadata write: sys/policies/acl/{openbao['policy_name']}", f"Auth role metadata write: auth/{auth['mount']}/role/{auth['role']}", "No secret values were read, written, printed, or accepted in argv.", ] def render_applier_apply_plan(ccr: dict[str, Any], warnings: list[str]) -> str: payload = applier_dry_run_payload(ccr, warnings) lines = [render_applier_dry_run(payload), "", "Delegated apply confirmation:"] lines.append(f" {applier_confirmation_phrase(ccr)}") lines.extend( [ "", "Live apply command:", f" scripts/credential-change.py applier-apply {ccr['id']} --actor --confirm \"{applier_confirmation_phrase(ccr)}\" --record-state-hub", "", "The command uses the local bao CLI and ambient delegated applier identity.", "It does not accept OpenBao tokens in argv and never writes secret values.", ] ) return "\n".join(lines) def runbook_readiness_blockers(ccr: dict[str, Any]) -> list[str]: blockers: list[str] = [] status = ccr.get("status") if status not in RUNBOOK_ALLOWED_STATUSES: blockers.append( "runbook requires approved, applied, verified, or active CCR status, " f"got {status}" ) if ccr["openbao"]["auth"].get("bound_claims_confirmed") is not True: blockers.append("runbook requires confirmed OpenBao auth binding") return blockers def runbook_confirmation_phrase(ccr: dict[str, Any]) -> str: return f"APPLY {ccr['id']}" def runbook_payload(ccr: dict[str, Any], warnings: list[str]) -> dict[str, Any]: openbao = ccr["openbao"] frontdoor = ccr["access_frontdoor"] verification = ccr["verification"] auth = openbao["auth"] return { "id": ccr["id"], "title": ccr["title"], "status": ccr["status"], "target": ccr["target"], "warnings": warnings, "decision_link": ccr.get("state_hub", {}).get("decision_api_url") or ccr.get("state_hub", {}).get("decision_dashboard_url"), "confirmation_phrase": runbook_confirmation_phrase(ccr), "policy": { "name": openbao["policy_name"], "source": openbao["policy_file"], "artifact": policy_artifact_diff(ccr), }, "auth_role": { "path": f"auth/{auth['mount']}/role/{auth['role']}", "method": auth["method"], }, "secret_provisioning": { "path": openbao["kv_path"], "fields": openbao["fields"], "instruction": ( "Enter or rotate secret values only through approved OpenBao/operator " "custody; do not paste values into Git, State Hub, prompts, chat, " "argv, or logs." ), }, "verification": { "positive": verification.get("positive", []), "negative": verification.get("negative", []), "activation_conditions": verification.get("activation_conditions", []), }, "frontdoor": { "type": frontdoor["type"], "catalog_id": frontdoor["catalog_id"], "readiness": frontdoor.get("readiness"), "resolvable": frontdoor.get("resolvable") is True, "command": frontdoor.get("command"), }, } def render_runbook(payload: dict[str, Any]) -> str: fields = ", ".join(payload["secret_provisioning"]["fields"]) lines = [ f"CCR {payload['id']} operator runbook", f"Title: {payload['title']}", f"Status: {payload['status']}", f"Target: {payload['target']['tenant']}/{payload['target']['workload']} ({payload['target']['environment']})", ] if payload.get("decision_link"): lines.append(f"Decision: {payload['decision_link']}") lines.extend( [ "", "Final attended confirmation:", f" {payload['confirmation_phrase']}", "", "1. Review generated metadata plan:", f" policy: {payload['policy']['name']}", f" source: {payload['policy']['source']}", f" artifact: {payload['policy']['artifact']['status']}", f" auth role: {payload['auth_role']['path']}", "", "2. Apply non-secret metadata:", " scripts/credential-change.py runbook --execute-metadata --actor ", " The command uses the local bao CLI and ambient approved operator authority;", " it does not accept OpenBao tokens in argv and it does not write secret values.", "", "3. Provision secret value through approved custody:", f" path: {payload['secret_provisioning']['path']}", f" fields: {fields}", f" {payload['secret_provisioning']['instruction']}", "", "4. Positive verification:", ] ) for item in payload["verification"]["positive"]: lines.append(f" - {item}") lines.append("") lines.append("5. Negative verification:") for item in payload["verification"]["negative"]: lines.append(f" - {item}") lines.append("") lines.append("6. Record non-secret evidence:") lines.extend( [ " scripts/credential-change.py record-evidence --actor --kind metadata_apply --result passed --detail \"OpenBao request id or audit timestamp: \" --status applied --record-state-hub", " scripts/credential-change.py record-evidence --actor --kind secret_provisioned --result passed --detail \"Field presence checked without printing values\" --record-state-hub", " scripts/credential-change.py record-evidence --actor --kind positive_verification --result passed --detail \"Positive verification reference: \" --record-state-hub", " scripts/credential-change.py record-evidence --actor --kind negative_verification --result passed --detail \"Negative verification reference: \" --status verified --record-state-hub", " scripts/credential-change.py record-evidence --actor --kind frontdoor_activation --result passed --detail \"Front door ready/resolvable after verification\" --status active --frontdoor-ready --record-state-hub", "", "Activation conditions:", ] ) for item in payload["verification"]["activation_conditions"]: lines.append(f" - {item}") if payload["frontdoor"].get("command"): lines.extend(["", f"Front-door command: {payload['frontdoor']['command']}"]) if payload["warnings"]: lines.append("") lines.append("Warnings:") for warning in payload["warnings"]: lines.append(f" - {warning}") return "\n".join(lines) def run_bao_metadata_apply(ccr: dict[str, Any], bao_bin: str) -> None: openbao = ccr["openbao"] auth = openbao["auth"] auth_path = f"auth/{auth['mount']}/role/{auth['role']}" with tempfile.NamedTemporaryFile("w", encoding="utf-8", delete=False) as role_file: role_file.write(json.dumps(auth_payload(ccr), indent=2, sort_keys=True)) role_file.write("\n") role_path = Path(role_file.name) try: commands = [ [bao_bin, "policy", "write", openbao["policy_name"], openbao["policy_file"]], [bao_bin, "write", auth_path, f"@{role_path}"], ] for command in commands: subprocess.run(command, cwd=REPO_DIR, check=True) finally: try: role_path.unlink() except FileNotFoundError: pass def validate_evidence_text(kind: str, result: str, details: list[str]) -> None: if not EVIDENCE_ID_RE.match(kind): fail("evidence kind must contain only lowercase letters, digits, underscore, or dash") if not EVIDENCE_ID_RE.match(result): fail("evidence result must contain only lowercase letters, digits, underscore, or dash") reject_secret_text(kind, "evidence kind") reject_secret_text(result, "evidence result") for detail in details: reject_secret_text(detail, "evidence detail") def append_evidence( path: Path, actor: str, kind: str, result: str, details: list[str], set_status: str | None = None, frontdoor_ready: bool = False, ) -> dict[str, Any]: validate_evidence_text(kind, result, details) reject_secret_text(actor, "evidence actor") ccr, errors, warnings = validate_ccr(path) if errors: for error in errors: print(f"[FAIL] {path.name}: {error}", file=sys.stderr) raise SystemExit(1) for warning in warnings: print(f"[WARN] {path.name}: {warning}", file=sys.stderr) verification = ccr.setdefault("verification", {}) evidence = verification.setdefault("evidence", []) if not isinstance(evidence, list): fail("verification.evidence must be a list") evidence.append( { "at": utc_now(), "actor": actor, "kind": kind, "result": result, "details": details, } ) if set_status: if set_status not in ALLOWED_STATUSES: fail(f"status must be one of {sorted(ALLOWED_STATUSES)}") ccr["status"] = set_status if frontdoor_ready: frontdoor = ccr.setdefault("access_frontdoor", {}) frontdoor["readiness"] = "ready" frontdoor["resolvable"] = True ccr["updated"] = datetime.now(timezone.utc).date().isoformat() dump_yaml(path, ccr) return ccr def record_evidence_state_hub( ccr: dict[str, Any], base_url: str, actor: str, kind: str, result: str, details: list[str] ) -> dict[str, Any]: openbao = ccr["openbao"] summary = ( f"CCR {ccr['id']} evidence {kind}/{result} by {actor}: " f"status={ccr['status']} path={openbao['kv_path']} " f"policy={openbao['policy_name']}; " + "; ".join(details) ) return state_hub_post_json( base_url, "/progress/", { "summary": summary, "event_type": "credential_change_evidence", "author": actor, }, ) def slugify(value: str) -> str: slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") return slug or "item" def reject_secret_values(values: list[str], field: str) -> None: for value in values: reject_secret_text(value, field) def parse_key_values(values: list[str]) -> dict[str, list[str]]: parsed: dict[str, list[str]] = {} for raw in values: reject_secret_text(raw, "bound claim") key, separator, value = raw.partition("=") key = key.strip() value = value.strip() if separator != "=" or not key or not value: fail("bound claims must use key=value syntax") parsed.setdefault(key, []).append(value) return parsed def lifecycle_action_config(action: str) -> dict[str, Any]: try: return LIFECYCLE_ACTIONS[action] except KeyError: fail(f"lifecycle action must be one of {sorted(LIFECYCLE_ACTIONS)}") def lifecycle_payload(ccr: dict[str, Any], action: str) -> dict[str, Any]: config = lifecycle_action_config(action) openbao = ccr["openbao"] auth = openbao["auth"] frontdoor = ccr["access_frontdoor"] auth_role_path = f"auth/{auth['mount']}/role/{auth['role']}" disable_commands = [ f"bao delete {shlex.quote(auth_role_path)}", f"bao policy delete {shlex.quote(openbao['policy_name'])}", ] return { "id": ccr["id"], "title": ccr["title"], "action": action, "target_status": config["status"], "target_readiness": config["readiness"], "target_resolvable": config["resolvable"], "frontdoor": { "type": frontdoor["type"], "catalog_id": frontdoor["catalog_id"], "command": frontdoor.get("command"), }, "openbao": { "secret_path": openbao["kv_path"], "fields": openbao["fields"], "policy_name": openbao["policy_name"], "auth_role_path": auth_role_path, "disable_commands": disable_commands, }, "record_command": ( "scripts/credential-change.py lifecycle-event " f"{ccr['id']} --action {action} --actor " "--reason \"\" --detail \"\" " "--record-state-hub" ), } def render_lifecycle_plan(payload: dict[str, Any]) -> str: action = payload["action"] lines = [ f"CCR {payload['id']} lifecycle plan: {action}", f"Title: {payload['title']}", f"Target CCR status: {payload['target_status']}", f"Target front door: readiness={payload['target_readiness']} resolvable={payload['target_resolvable']}", "", "1. Record lifecycle event:", f" {payload['record_command']}", "", "2. Front-door action:", f" Mark {payload['frontdoor']['type']} catalog {payload['frontdoor']['catalog_id']} as {payload['target_readiness']} before any further use.", ] if payload["frontdoor"].get("command"): lines.append(f" Existing command to disable/check externally: {payload['frontdoor']['command']}") lines.extend(["", "3. OpenBao metadata action:"]) if action in {"deactivate", "compromise"}: lines.append(" Disable caller access by removing the auth role and read policy with approved operator authority:") for command in payload["openbao"]["disable_commands"]: lines.append(f" {command}") lines.append(" Secret values are not printed or copied; rotate/delete values only through approved custody.") elif action == "rotate": lines.append(" Keep the front door non-resolvable while the replacement value is entered through approved custody.") lines.append(f" Secret path: {payload['openbao']['secret_path']}") lines.append(f" Fields: {', '.join(payload['openbao']['fields'])}") lines.append(" After positive and negative verification, record front-door activation evidence to return the lane to active.") lines.extend(["", "4. Required non-secret notes:"]) if action == "compromise": lines.append(" Include blast-radius notes and follow-up task references; never include the exposed value.") elif action == "rotate": lines.append(" Include old-value revocation evidence, new-value field presence evidence, and verification references.") else: lines.append(" Include the reason for disablement, OpenBao audit/request reference, and front-door disable reference.") return "\n".join(lines) def append_lifecycle_event( path: Path, actor: str, action: str, reason: str, details: list[str], blast_radius: list[str] | None = None, follow_up: list[str] | None = None, ) -> dict[str, Any]: config = lifecycle_action_config(action) reject_secret_text(actor, "lifecycle actor") reject_secret_text(reason, "lifecycle reason") reject_secret_values(details, "lifecycle detail") blast_radius = blast_radius or [] follow_up = follow_up or [] reject_secret_values(blast_radius, "lifecycle blast radius") reject_secret_values(follow_up, "lifecycle follow-up") ccr, errors, warnings = validate_ccr(path) if errors: for error in errors: print(f"[FAIL] {path.name}: {error}", file=sys.stderr) raise SystemExit(1) for warning in warnings: print(f"[WARN] {path.name}: {warning}", file=sys.stderr) lifecycle = ccr.setdefault("lifecycle", {}) events = lifecycle.setdefault("events", []) if not isinstance(events, list): fail("lifecycle.events must be a list") event = { "at": utc_now(), "actor": actor, "action": action, "status": config["status"], "reason": reason, "details": details, } if blast_radius: event["blast_radius"] = blast_radius if follow_up: event["follow_up"] = follow_up events.append(event) ccr["status"] = config["status"] frontdoor = ccr.setdefault("access_frontdoor", {}) frontdoor["readiness"] = config["readiness"] frontdoor["resolvable"] = config["resolvable"] ccr["updated"] = datetime.now(timezone.utc).date().isoformat() dump_yaml(path, ccr) return ccr def record_lifecycle_state_hub( ccr: dict[str, Any], base_url: str, actor: str, action: str, reason: str, details: list[str] ) -> dict[str, Any]: openbao = ccr["openbao"] frontdoor = ccr["access_frontdoor"] summary = ( f"CCR {ccr['id']} lifecycle {action} by {actor}: " f"status={ccr['status']} readiness={frontdoor.get('readiness')} " f"resolvable={frontdoor.get('resolvable') is True} path={openbao['kv_path']} " f"policy={openbao['policy_name']}; reason={reason}; " + "; ".join(details) ) return state_hub_post_json( base_url, "/progress/", { "summary": summary, "event_type": "credential_change_lifecycle", "author": actor, }, ) def inventory_ccr_from_args(args: argparse.Namespace) -> dict[str, Any]: fields = list(args.field or []) if not fields: fail("at least one --field is required") reject_secret_values(fields, "inventory field") for value in ( args.id, args.title, args.tenant, args.workload, args.environment, args.purpose, args.kv_path, args.policy_name or "", args.policy_file or "", args.auth_mount, args.auth_role, args.frontdoor_type, args.catalog_id, args.reason, ): reject_secret_text(str(value), "inventory metadata") policy_name = args.policy_name or f"workload-kv-read-{slugify(args.workload)}-{slugify(args.purpose)}" policy_file = args.policy_file or f"openbao/policies/{policy_name}.hcl" bound_claims = parse_key_values(args.bound_claim or []) if args.auth_method == "kubernetes": if args.service_account: bound_claims["service_account_names"] = list(args.service_account) if args.service_account_namespace: bound_claims["service_account_namespaces"] = list(args.service_account_namespace) if not bound_claims: fail("at least one --bound-claim or Kubernetes service account binding is required") allowed_redirect_uris = list(getattr(args, "redirect_uri", None) or []) if args.auth_method == "oidc" and not allowed_redirect_uris: allowed_redirect_uris = [ "https://bao.coulomb.social/ui/vault/auth/netkingdom/oidc/callback", "http://localhost:8250/oidc/callback", "http://127.0.0.1:8250/oidc/callback", ] ccr = { "id": args.id, "kind": "credential-change-request", "schema_version": 1, "request_type": "workload-kv-read", "title": args.title, "status": args.status, "created": datetime.now(timezone.utc).date().isoformat(), "updated": datetime.now(timezone.utc).date().isoformat(), "requester": { "agent": args.requester_agent, "reason": args.reason, }, "review": { "required": True, "required_approvers": ["platform-operator"], "comments": [ { "at": utc_now(), "reviewer": args.actor, "decision": "inventory_imported", "comment": args.reason, } ], }, "target": { "domain": "financials", "tenant": args.tenant, "workload": args.workload, "environment": args.environment, "purpose": args.purpose, }, "openbao": { "mount": args.mount, "kv_path": args.kv_path, "fields": fields, "policy_name": policy_name, "policy_file": policy_file, "auth": { "method": args.auth_method, "mount": args.auth_mount, "role": args.auth_role, "bound_claims": bound_claims, "bound_claims_confirmed": args.bound_claims_confirmed, "policies": [policy_name], "ttl": args.ttl, }, }, "access_frontdoor": { "type": args.frontdoor_type, "catalog_id": args.catalog_id, "readiness": args.readiness, "resolvable": args.resolvable, "activation": "imported-existing-inventory", }, "risk": { "classification": args.risk, "notes": ["Imported existing credential lane as non-secret CCR-backed inventory."], }, "verification": { "positive": [args.positive_check], "negative": [args.negative_check], "activation_conditions": [ "Existing policy/auth metadata confirmed without printing secret values.", "Existing secret value remains under approved OpenBao/operator custody.", ], "evidence": [], }, "lifecycle": { "deactivate": "Disable the access front door and remove or detach auth role policy.", "rotate": "Replace the secret value through approved custody and re-run verification.", "compromised": "Immediately disable access, rotate value, record blast-radius notes, and open follow-up tasks.", "events": [], }, } if args.auth_method == "oidc": auth = ccr["openbao"]["auth"] auth["allowed_redirect_uris"] = allowed_redirect_uris auth["oidc_scopes"] = ["openid", "profile", "email", "groups"] auth["user_claim"] = "sub" if "groups" in bound_claims: auth["groups_claim"] = "groups" if args.command: reject_secret_text(args.command, "inventory command") ccr["access_frontdoor"]["command"] = args.command if args.selector: reject_secret_text(args.selector, "inventory selector") ccr["access_frontdoor"]["selector"] = args.selector return ccr def inventory_output_path(ccr: dict[str, Any], output_dir: str) -> Path: output = resolve_repo_path(output_dir) filename = f"{ccr['id']}-{slugify(ccr['title'])}.yaml" return output / filename def write_inventory_ccr(args: argparse.Namespace) -> Path: ccr = inventory_ccr_from_args(args) output_path = inventory_output_path(ccr, args.output_dir) output_path.parent.mkdir(parents=True, exist_ok=True) policy_path = resolve_repo_path(ccr["openbao"]["policy_file"]) if args.write_policy: policy_path.parent.mkdir(parents=True, exist_ok=True) if not policy_path.exists(): policy_path.write_text(generated_policy_hcl(ccr), encoding="utf-8") dump_yaml(output_path, ccr) _ccr, errors, warnings = validate_ccr(output_path) for warning in warnings: print(f"[WARN] {output_path.name}: {warning}", file=sys.stderr) if errors: for error in errors: print(f"[FAIL] {output_path.name}: {error}", file=sys.stderr) raise SystemExit(1) return output_path def validate_or_exit(path: Path) -> tuple[dict[str, Any], list[str]]: ccr, errors, warnings = validate_ccr(path) for warning in warnings: print(f"[WARN] {path.name}: {warning}", file=sys.stderr) if errors: for error in errors: print(f"[FAIL] {path.name}: {error}", file=sys.stderr) raise SystemExit(1) return ccr, warnings def apply_blockers(ccr: dict[str, Any]) -> list[str]: blockers: list[str] = [] status = ccr.get("status") if status in POST_APPLY_STATUSES: return blockers if status not in APPLY_ALLOWED_STATUSES: blockers.append(f"apply requires status approved, got {status}") if ccr["openbao"]["auth"].get("bound_claims_confirmed") is not True: blockers.append("apply requires confirmed OpenBao auth binding") return blockers def frontdoor_blockers(ccr: dict[str, Any]) -> list[str]: frontdoor = ccr["access_frontdoor"] blockers: list[str] = [] if ccr.get("status") != "active": blockers.append(f"front door requires CCR status active, got {ccr.get('status')}") if frontdoor.get("readiness") != "ready": blockers.append( f"front door readiness must be ready, got {frontdoor.get('readiness')}" ) if frontdoor.get("resolvable") is not True: blockers.append("front door is marked resolvable=false") return blockers def status_payload(ccr: dict[str, Any], warnings: list[str]) -> dict[str, Any]: apply_blocked_by = apply_blockers(ccr) frontdoor_blocked_by = frontdoor_blockers(ccr) frontdoor = ccr["access_frontdoor"] openbao = ccr["openbao"] auth = openbao["auth"] return { "id": ccr["id"], "title": ccr["title"], "status": ccr["status"], "request_type": ccr["request_type"], "apply_allowed": ccr.get("status") in APPLY_ALLOWED_STATUSES and not apply_blocked_by, "apply_complete": ccr.get("status") in POST_APPLY_STATUSES, "apply_blockers": apply_blocked_by, "frontdoor_resolvable": not frontdoor_blocked_by, "frontdoor_blockers": frontdoor_blocked_by, "warnings": warnings, "openbao": { "mount": openbao["mount"], "kv_path": openbao["kv_path"], "fields": openbao["fields"], "policy_name": openbao["policy_name"], "auth_mount": auth["mount"], "auth_method": auth["method"], "auth_role": auth["role"], "bound_claims_confirmed": auth.get("bound_claims_confirmed") is True, }, "access_frontdoor": { "type": frontdoor["type"], "catalog_id": frontdoor["catalog_id"], "readiness": frontdoor.get("readiness"), "resolvable": frontdoor.get("resolvable") is True, "command": frontdoor.get("command"), }, "state_hub": { "decision_id": ccr.get("state_hub", {}).get("decision_id"), "decision_api_url": ccr.get("state_hub", {}).get("decision_api_url"), "decision_dashboard_url": ccr.get("state_hub", {}).get("decision_dashboard_url"), }, } def render_status(payload: dict[str, Any]) -> str: lines = [ f"CCR: {payload['id']} ({payload['status']})", f"Catalog: {payload['access_frontdoor']['catalog_id']}", f"Readiness: {payload['access_frontdoor']['readiness']}", f"Resolvable: {payload['frontdoor_resolvable']}", f"Apply allowed: {payload['apply_allowed']}", f"Apply complete: {payload.get('apply_complete') is True}", ] decision = payload.get("state_hub", {}).get("decision_api_url") dashboard = payload.get("state_hub", {}).get("decision_dashboard_url") if decision: lines.append(f"State Hub decision: {decision}") if dashboard: lines.append(f"Decision dashboard: {dashboard}") command = payload["access_frontdoor"].get("command") if command: lines.append(f"Command: {command}") if payload["apply_blockers"]: lines.append("Apply blockers:") for blocker in payload["apply_blockers"]: lines.append(f" - {blocker}") if payload["frontdoor_blockers"]: lines.append("Front-door blockers:") for blocker in payload["frontdoor_blockers"]: lines.append(f" - {blocker}") if payload["warnings"]: lines.append("Warnings:") for warning in payload["warnings"]: lines.append(f" - {warning}") return "\n".join(lines) def append_decision(path: Path, status: str, reviewer: str, comment: str) -> dict[str, Any]: reject_secret_text(comment, "review comment") ccr, _warnings = validate_or_exit(path) review = ccr.setdefault("review", {}) comments = review.setdefault("comments", []) if not isinstance(comments, list): fail("review.comments must be a list") comments.append( { "at": utc_now(), "reviewer": reviewer, "decision": status, "comment": comment, } ) ccr["status"] = status ccr["updated"] = datetime.now(timezone.utc).date().isoformat() dump_yaml(path, ccr) return ccr def confirm_binding(path: Path, reviewer: str, comment: str) -> None: reject_secret_text(comment, "binding comment") ccr, errors, _warnings = validate_ccr(path) if errors: for error in errors: print(f"[FAIL] {path.name}: {error}", file=sys.stderr) raise SystemExit(1) ccr["openbao"]["auth"]["bound_claims_confirmed"] = True review = ccr.setdefault("review", {}) comments = review.setdefault("comments", []) if not isinstance(comments, list): fail("review.comments must be a list") comments.append( { "at": utc_now(), "reviewer": reviewer, "decision": "binding_confirmed", "comment": comment, } ) ccr["updated"] = datetime.now(timezone.utc).date().isoformat() dump_yaml(path, ccr) STATE_HUB_DECISION_PREFIXES = ( ("NEEDS_CHANGES", "needs_changes"), ("NEEDS CHANGES", "needs_changes"), ("REQUEST CHANGES", "needs_changes"), ("APPROVE", "approved"), ("APPROVED", "approved"), ("DENY", "denied"), ("DENIED", "denied"), ("REJECT", "denied"), ("REJECTED", "denied"), ) def state_hub_get_json(base_url: str, path: str) -> dict[str, Any]: url = f"{base_url.rstrip('/')}/{path.lstrip('/')}" try: with urllib.request.urlopen(url, timeout=10) as response: data = json.load(response) except urllib.error.HTTPError as exc: fail(f"State Hub GET {url} failed with HTTP {exc.code}") except OSError as exc: fail(f"State Hub GET {url} failed: {exc}") if not isinstance(data, dict): fail(f"State Hub GET {url} returned non-object JSON") return data def state_hub_post_json(base_url: str, path: str, payload: dict[str, Any]) -> dict[str, Any]: url = f"{base_url.rstrip('/')}/{path.lstrip('/')}" body = json.dumps(payload).encode("utf-8") request = urllib.request.Request( url, data=body, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(request, timeout=10) as response: data = json.load(response) except urllib.error.HTTPError as exc: fail(f"State Hub POST {url} failed with HTTP {exc.code}") except OSError as exc: fail(f"State Hub POST {url} failed: {exc}") if not isinstance(data, dict): fail(f"State Hub POST {url} returned non-object JSON") return data def decision_template_context(ccr: dict[str, Any]) -> dict[str, str]: openbao = ccr["openbao"] auth = openbao["auth"] state_hub = ccr.get("state_hub", {}) return { "id": ccr["id"], "kv_path": openbao["kv_path"], "policy_name": openbao["policy_name"], "auth_role_path": f"auth/{auth['mount']}/role/{auth['role']}", "decision_link": state_hub.get("decision_api_url") or state_hub.get("decision_dashboard_url") or "", } def decision_templates(ccr: dict[str, Any] | None = None) -> dict[str, str]: if ccr: context = decision_template_context(ccr) else: context = { "id": "", "kv_path": "", "policy_name": "", "auth_role_path": "auth//role/", "decision_link": "", } scope = ( f"{context['id']} path={context['kv_path']} " f"policy={context['policy_name']} auth_role={context['auth_role_path']}" ) return { "approve": f"APPROVE: {scope}; rationale=", "deny": f"DENY: {scope}; rationale=", "needs_changes": f"NEEDS_CHANGES: {scope}; rationale=", } def render_decision_templates(ccr: dict[str, Any]) -> str: context = decision_template_context(ccr) templates = decision_templates(ccr) lines = [ f"CCR {context['id']} decision templates", f"Decision link: {context['decision_link']}", "Use one of these accepted prefixes exactly:", ] for key in ("approve", "deny", "needs_changes"): lines.append(f"- {templates[key]}") return "\n".join(lines) def invalid_decision_template_message(ccr: dict[str, Any] | None = None) -> str: templates = decision_templates(ccr) return ( "resolved State Hub decision rationale must start with a recognized " "decision template:\n" f" {templates['approve']}\n" f" {templates['deny']}\n" f" {templates['needs_changes']}" ) def state_hub_decision_status(ccr: dict[str, Any], base_url: str) -> dict[str, Any]: decision_id = ccr.get("state_hub", {}).get("decision_id") if not decision_id: fail("CCR has no state_hub.decision_id") return state_hub_get_json(base_url, f"/decisions/{decision_id}") def ccr_status_from_state_hub_rationale( rationale: str, ccr: dict[str, Any] | None = None ) -> str: normalized = rationale.strip().upper().replace("-", "_") for prefix, status in STATE_HUB_DECISION_PREFIXES: if normalized == prefix or normalized.startswith(f"{prefix}:"): return status fail(invalid_decision_template_message(ccr)) def sync_state_hub_decision(path: Path, base_url: str) -> dict[str, Any]: ccr, errors, warnings = validate_ccr(path) if errors: for error in errors: print(f"[FAIL] {path.name}: {error}", file=sys.stderr) raise SystemExit(1) for warning in warnings: print(f"[WARN] {path.name}: {warning}", file=sys.stderr) decision = state_hub_decision_status(ccr, base_url) if decision.get("status") != "resolved": return decision rationale = str(decision.get("rationale") or "") status = ccr_status_from_state_hub_rationale(rationale, ccr) reviewer = str(decision.get("decided_by") or "state-hub") append_decision( path, status, reviewer, f"State Hub decision {decision['id']}: {rationale}", ) updated = load_yaml(path) state_hub = updated.setdefault("state_hub", {}) state_hub["decision_resolved_at"] = decision.get("decided_at") dump_yaml(path, updated) return decision def command_validate(args: argparse.Namespace) -> int: refs = args.refs or [str(path) for path in sorted(ccr_dir().glob("*.y*ml"))] if not refs: fail(f"no CCR files found in {ccr_dir()}") ok = True for ref in refs: path = resolve_ccr(ref) _ccr, errors, warnings = validate_ccr(path) for warning in warnings: print(f"[WARN] {path.name}: {warning}", file=sys.stderr) if errors: ok = False for error in errors: print(f"[FAIL] {path.name}: {error}", file=sys.stderr) else: print(f"[OK] {path.name}") return 0 if ok else 1 def command_render(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr, warnings = validate_or_exit(path) print(render_summary(ccr, warnings)) return 0 def command_plan(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr, _warnings = validate_or_exit(path) print(render_plan(ccr)) return 0 def command_decision_templates(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr, _warnings = validate_or_exit(path) print(render_decision_templates(ccr)) return 0 def command_status(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr, errors, warnings = validate_ccr(path) if errors: for error in errors: print(f"[FAIL] {path.name}: {error}", file=sys.stderr) return 1 payload = status_payload(ccr, warnings) if args.json: print(json.dumps(payload, indent=2, sort_keys=True)) else: print(render_status(payload)) return 0 def require_apply_ready(ccr: dict[str, Any], command_name: str) -> None: if ccr.get("status") not in APPLY_ALLOWED_STATUSES: fail(f"{command_name} requires status approved, got {ccr.get('status')}") auth = ccr["openbao"]["auth"] if auth.get("bound_claims_confirmed") is not True: fail(f"{command_name} requires openbao.auth.bound_claims_confirmed=true") def command_apply_plan(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr, _warnings = validate_or_exit(path) require_apply_ready(ccr, "apply-plan") print(render_plan(ccr)) return 0 def command_operator_commands(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr, _warnings = validate_or_exit(path) require_apply_ready(ccr, "operator-commands") print(render_operator_commands(ccr)) return 0 def command_applier_dry_run(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr, warnings = validate_or_exit(path) blockers = applier_readiness_blockers(ccr) if blockers: for blocker in blockers: print(f"[BLOCK] {path.name}: {blocker}", file=sys.stderr) return 1 payload = applier_dry_run_payload(ccr, warnings) if args.json: print(json.dumps(payload, indent=2, sort_keys=True)) else: print(render_applier_dry_run(payload)) return 0 def command_applier_apply(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr, warnings = validate_or_exit(path) blockers = applier_readiness_blockers(ccr) if blockers: for blocker in blockers: print(f"[BLOCK] {path.name}: {blocker}", file=sys.stderr) return 1 if args.json: print(json.dumps(applier_dry_run_payload(ccr, warnings), indent=2, sort_keys=True)) elif not args.quiet: print(render_applier_apply_plan(ccr, warnings)) if args.plan_only: return 0 expected = applier_confirmation_phrase(ccr) phrase = args.confirm or input("Type delegated apply confirmation phrase: ") if phrase != expected: fail(f"confirmation phrase mismatch; expected {expected!r}") run_bao_metadata_apply(ccr, args.bao_bin) set_status = "applied" if ccr.get("status") == "approved" else None details = delegated_apply_details(ccr, args.actor) ccr = append_evidence( path, args.actor, "delegated_metadata_apply", "passed", details, set_status=set_status, ) print(f"[OK] {path.name} delegated metadata apply recorded") if args.record_state_hub: event = record_evidence_state_hub( ccr, args.state_hub_url, args.actor, "delegated_metadata_apply", "passed", details, ) print(f"[OK] State Hub progress event {event.get('id', '')}") return 0 def command_runbook(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr, warnings = validate_or_exit(path) blockers = runbook_readiness_blockers(ccr) if blockers: for blocker in blockers: print(f"[BLOCK] {path.name}: {blocker}", file=sys.stderr) return 1 payload = runbook_payload(ccr, warnings) if args.json: print(json.dumps(payload, indent=2, sort_keys=True)) else: print(render_runbook(payload)) if args.execute_metadata: require_apply_ready(ccr, "runbook --execute-metadata") phrase = args.confirm or input("Type final confirmation phrase: ") expected = runbook_confirmation_phrase(ccr) if phrase != expected: fail(f"confirmation phrase mismatch; expected {expected!r}") run_bao_metadata_apply(ccr, args.bao_bin) details = ["OpenBao policy and auth-role metadata apply completed without secret values"] ccr = append_evidence( path, args.actor, "metadata_apply", "passed", details, set_status="applied", ) print(f"[OK] {path.name} metadata applied and evidence recorded") if args.record_state_hub: event = record_evidence_state_hub( ccr, args.state_hub_url, args.actor, "metadata_apply", "passed", details ) print(f"[OK] State Hub progress event {event.get('id', '')}") return 0 def command_record_evidence(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr = append_evidence( path, args.actor, args.kind, args.result, args.detail, set_status=args.status, frontdoor_ready=args.frontdoor_ready, ) print(f"[OK] {path.name} evidence {args.kind}/{args.result} recorded") if args.record_state_hub: event = record_evidence_state_hub( ccr, args.state_hub_url, args.actor, args.kind, args.result, args.detail ) print(f"[OK] State Hub progress event {event.get('id', '')}") return 0 def command_decision(args: argparse.Namespace, status: str) -> int: path = resolve_ccr(args.ref) ccr = append_decision(path, status, args.reviewer, args.comment) print(f"[OK] {path.name} -> {status}") if args.record_state_hub: openbao = ccr["openbao"] auth = openbao["auth"] event = state_hub_post_json( args.state_hub_url, "/progress/", { "summary": ( f"CCR {ccr['id']} decision {status} by {args.reviewer}: " f"path={openbao['kv_path']} policy={openbao['policy_name']} " f"fields={','.join(openbao['fields'])} " f"auth_role=auth/{auth['mount']}/role/{auth['role']}; " f"{args.comment}" ), "event_type": "credential_change_decision", "author": args.reviewer, }, ) print(f"[OK] State Hub progress event {event.get('id', '')}") return 0 def command_confirm_binding(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) confirm_binding(path, args.reviewer, args.comment) print(f"[OK] {path.name} -> binding_confirmed") return 0 def command_lifecycle_plan(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr, _warnings = validate_or_exit(path) payload = lifecycle_payload(ccr, args.action) if args.json: print(json.dumps(payload, indent=2, sort_keys=True)) else: print(render_lifecycle_plan(payload)) return 0 def command_lifecycle_event(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) ccr = append_lifecycle_event( path, args.actor, args.action, args.reason, args.detail, blast_radius=args.blast_radius, follow_up=args.follow_up, ) print(f"[OK] {path.name} lifecycle {args.action} -> {ccr['status']}") if args.record_state_hub: event = record_lifecycle_state_hub( ccr, args.state_hub_url, args.actor, args.action, args.reason, args.detail ) print(f"[OK] State Hub progress event {event.get('id', '')}") return 0 def command_import_inventory(args: argparse.Namespace) -> int: path = write_inventory_ccr(args) print(f"[OK] inventory CCR written: {display_repo_path(path)}") return 0 def command_sync_decision(args: argparse.Namespace) -> int: path = resolve_ccr(args.ref) decision = sync_state_hub_decision(path, args.state_hub_url) if decision.get("status") == "resolved": print(f"[OK] {path.name} <- State Hub decision {decision['id']}") else: print( f"[WAIT] State Hub decision {decision['id']} is {decision.get('status')}; " "resolve it before syncing CCR status" ) return 0 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Validate, render, and review non-secret credential change requests." ) sub = parser.add_subparsers(dest="command", required=True) validate = sub.add_parser("validate", help="Validate CCR files") validate.add_argument("refs", nargs="*") validate.set_defaults(func=command_validate) render = sub.add_parser("render", help="Render a human review summary") render.add_argument("ref") render.set_defaults(func=command_render) plan = sub.add_parser("plan", help="Render the generated apply plan for review") plan.add_argument("ref") plan.set_defaults(func=command_plan) templates = sub.add_parser( "decision-templates", help="Render State Hub/chat decision rationale templates", ) templates.add_argument("ref") templates.set_defaults(func=command_decision_templates) status = sub.add_parser("status", help="Render machine-readable readiness status") status.add_argument("ref") status.add_argument("--json", action="store_true") status.set_defaults(func=command_status) apply_plan = sub.add_parser( "apply-plan", help="Render an operator apply plan only for approved CCRs" ) apply_plan.add_argument("ref") apply_plan.set_defaults(func=command_apply_plan) operator_commands = sub.add_parser( "operator-commands", help="Render reviewed non-secret OpenBao commands for an approved CCR", ) operator_commands.add_argument("ref") operator_commands.set_defaults(func=command_operator_commands) applier_dry_run = sub.add_parser( "applier-dry-run", help="Validate and render delegated OpenBao metadata mutations", ) applier_dry_run.add_argument("ref") applier_dry_run.add_argument("--json", action="store_true") applier_dry_run.set_defaults(func=command_applier_dry_run) applier_apply = sub.add_parser( "applier-apply", help="Apply delegated OpenBao metadata after dry-run guardrails", ) applier_apply.add_argument("ref") applier_apply.add_argument("--actor", default=os.environ.get("USER", "delegated-applier")) applier_apply.add_argument("--confirm") applier_apply.add_argument("--bao-bin", default=os.environ.get("BAO_BIN", "bao")) applier_apply.add_argument("--plan-only", action="store_true") applier_apply.add_argument("--json", action="store_true") applier_apply.add_argument("--quiet", action="store_true") applier_apply.add_argument("--record-state-hub", action="store_true") applier_apply.add_argument( "--state-hub-url", default=os.environ.get("STATE_HUB_URL", "http://127.0.0.1:8000"), ) applier_apply.set_defaults(func=command_applier_apply) runbook = sub.add_parser( "runbook", help="Render or execute the attended operator apply/verify runbook", ) runbook.add_argument("ref") runbook.add_argument("--json", action="store_true") runbook.add_argument("--execute-metadata", action="store_true") runbook.add_argument("--actor", default=os.environ.get("USER", "operator")) runbook.add_argument("--confirm") runbook.add_argument("--bao-bin", default=os.environ.get("BAO_BIN", "bao")) runbook.add_argument("--record-state-hub", action="store_true") runbook.add_argument( "--state-hub-url", default=os.environ.get("STATE_HUB_URL", "http://127.0.0.1:8000"), ) runbook.set_defaults(func=command_runbook) evidence = sub.add_parser( "record-evidence", help="Append non-secret apply/verification evidence to a CCR", ) evidence.add_argument("ref") evidence.add_argument("--actor", required=True) evidence.add_argument("--kind", required=True) evidence.add_argument("--result", required=True) evidence.add_argument("--detail", action="append", required=True) evidence.add_argument("--status", choices=sorted(ALLOWED_STATUSES)) evidence.add_argument("--frontdoor-ready", action="store_true") evidence.add_argument("--record-state-hub", action="store_true") evidence.add_argument( "--state-hub-url", default=os.environ.get("STATE_HUB_URL", "http://127.0.0.1:8000"), ) evidence.set_defaults(func=command_record_evidence) lifecycle_plan = sub.add_parser( "lifecycle-plan", help="Render deactivation, rotation, or compromise lifecycle guidance", ) lifecycle_plan.add_argument("ref") lifecycle_plan.add_argument("--action", choices=sorted(LIFECYCLE_ACTIONS), required=True) lifecycle_plan.add_argument("--json", action="store_true") lifecycle_plan.set_defaults(func=command_lifecycle_plan) lifecycle_event = sub.add_parser( "lifecycle-event", help="Record a non-secret deactivation, rotation, or compromise event", ) lifecycle_event.add_argument("ref") lifecycle_event.add_argument("--action", choices=sorted(LIFECYCLE_ACTIONS), required=True) lifecycle_event.add_argument("--actor", required=True) lifecycle_event.add_argument("--reason", required=True) lifecycle_event.add_argument("--detail", action="append", required=True) lifecycle_event.add_argument("--blast-radius", action="append", default=[]) lifecycle_event.add_argument("--follow-up", action="append", default=[]) lifecycle_event.add_argument("--record-state-hub", action="store_true") lifecycle_event.add_argument( "--state-hub-url", default=os.environ.get("STATE_HUB_URL", "http://127.0.0.1:8000"), ) lifecycle_event.set_defaults(func=command_lifecycle_event) inventory = sub.add_parser( "import-inventory", help="Create a non-secret CCR for an existing credential lane", ) inventory.add_argument("id") inventory.add_argument("--title", required=True) inventory.add_argument("--tenant", required=True) inventory.add_argument("--workload", required=True) inventory.add_argument("--environment", required=True) inventory.add_argument("--purpose", required=True) inventory.add_argument("--mount", default="platform") inventory.add_argument("--kv-path", required=True) inventory.add_argument("--field", action="append", required=True) inventory.add_argument("--policy-name") inventory.add_argument("--policy-file") inventory.add_argument("--auth-method", choices=("oidc", "kubernetes"), required=True) inventory.add_argument("--auth-mount", required=True) inventory.add_argument("--auth-role", required=True) inventory.add_argument("--bound-claim", action="append", default=[]) inventory.add_argument("--redirect-uri", action="append") inventory.add_argument("--service-account", action="append") inventory.add_argument("--service-account-namespace", action="append") inventory.add_argument("--bound-claims-confirmed", action="store_true") inventory.add_argument("--ttl", default="15m") inventory.add_argument("--frontdoor-type", required=True) inventory.add_argument("--catalog-id", required=True) inventory.add_argument("--selector") inventory.add_argument("--command") inventory.add_argument("--status", choices=sorted(ALLOWED_STATUSES), default="active") inventory.add_argument("--readiness", choices=sorted(FRONTDOOR_READINESS), default="ready") inventory.add_argument("--resolvable", action="store_true") inventory.add_argument("--risk", default="high") inventory.add_argument("--positive-check", default="Authorized caller can fetch the named field without printing the value.") inventory.add_argument("--negative-check", default="Unauthorized caller cannot read the path or field.") inventory.add_argument("--requester-agent", default="codex") inventory.add_argument("--actor", default=os.environ.get("USER", "operator")) inventory.add_argument("--reason", required=True) inventory.add_argument("--output-dir", default=str(DEFAULT_CCR_DIR)) inventory.add_argument("--write-policy", action=argparse.BooleanOptionalAction, default=True) inventory.set_defaults(func=command_import_inventory) for name, status in ( ("approve", "approved"), ("deny", "denied"), ("needs-changes", "needs_changes"), ): decision = sub.add_parser(name, help=f"Record {status} decision") decision.add_argument("ref") decision.add_argument("--reviewer", required=True) decision.add_argument("--comment", required=True) decision.add_argument("--record-state-hub", action="store_true") decision.add_argument( "--state-hub-url", default=os.environ.get("STATE_HUB_URL", "http://127.0.0.1:8000"), ) decision.set_defaults(func=lambda args, status=status: command_decision(args, status)) binding = sub.add_parser( "confirm-binding", help="Record that the non-secret OpenBao auth binding was confirmed", ) binding.add_argument("ref") binding.add_argument("--reviewer", required=True) binding.add_argument("--comment", required=True) binding.set_defaults(func=command_confirm_binding) sync_decision = sub.add_parser( "sync-decision", help="Sync an approved/denied/needs_changes CCR decision from State Hub", ) sync_decision.add_argument("ref") sync_decision.add_argument( "--state-hub-url", default=os.environ.get("STATE_HUB_URL", "http://127.0.0.1:8000"), ) sync_decision.set_defaults(func=command_sync_decision) return parser def main() -> int: parser = build_parser() args = parser.parse_args() return int(args.func(args)) if __name__ == "__main__": raise SystemExit(main())