Implement credential change request review flow

This commit is contained in:
2026-06-27 22:57:21 +02:00
parent 8c1e64d5e0
commit 815b124ab1
7 changed files with 772 additions and 14 deletions

466
scripts/credential-change.py Executable file
View File

@@ -0,0 +1,466 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
REPO_DIR = Path(__file__).resolve().parents[1]
DEFAULT_CCR_DIR = REPO_DIR / "credential-change-requests"
ALLOWED_STATUSES = {
"draft",
"proposed",
"needs_changes",
"approved",
"denied",
"apply_pending",
"applied",
"verified",
"active",
"deactivated",
"rotated",
"compromised",
"superseded",
"cancelled",
}
APPLY_ALLOWED_STATUSES = {"approved"}
SECRET_MARKERS = [
"AGE-SECRET-KEY-1",
"-----BEGIN PRIVATE KEY-----",
"-----BEGIN OPENSSH PRIVATE KEY-----",
"OPENBAO_ROOT_TOKEN=",
"VAULT_TOKEN=",
"BAO_TOKEN=",
"hvb.",
"hvc.",
"hvs.",
"npm_",
"ghp_",
"sk-",
]
DISALLOWED_POLICY_NAMES = {"root", "platform-admin"}
SAFE_ID_RE = re.compile(r"^[A-Z0-9][A-Z0-9_.-]*$")
TTL_RE = re.compile(r"^[1-9][0-9]*[smhd]$")
def fail(message: str) -> None:
raise SystemExit(f"ERROR: {message}")
def utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def resolve_repo_path(path: str | Path) -> Path:
p = Path(path).expanduser()
if p.is_absolute():
return p
return (REPO_DIR / p).resolve()
def load_yaml(path: Path) -> dict[str, Any]:
data = yaml.safe_load(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
fail(f"YAML root must be an object: {path}")
return data
def dump_yaml(path: Path, data: dict[str, Any]) -> None:
path.write_text(
yaml.safe_dump(data, sort_keys=False, allow_unicode=False),
encoding="utf-8",
)
def ccr_dir() -> Path:
return resolve_repo_path(os.environ.get("CCR_DIR", str(DEFAULT_CCR_DIR)))
def resolve_ccr(ref: str) -> Path:
candidate = resolve_repo_path(ref)
if candidate.exists():
return candidate
matches = sorted(ccr_dir().glob(f"{ref}*.y*ml"))
if len(matches) == 1:
return matches[0]
if len(matches) > 1:
fail(f"CCR reference is ambiguous: {ref} -> {[m.name for m in matches]}")
fail(f"CCR not found by path or id prefix: {ref}")
def require_object(value: Any, field: str, errors: list[str]) -> dict[str, Any]:
if not isinstance(value, dict):
errors.append(f"{field} must be an object")
return {}
return value
def require_list(value: Any, field: str, errors: list[str]) -> list[Any]:
if not isinstance(value, list):
errors.append(f"{field} must be a list")
return []
return value
def require_string(value: Any, field: str, errors: list[str]) -> str:
if not isinstance(value, str) or not value.strip():
errors.append(f"{field} must be a non-empty string")
return ""
return value.strip()
def scan_for_secrets(path: Path, errors: list[str]) -> None:
text = path.read_text(encoding="utf-8")
for marker in SECRET_MARKERS:
if marker in text:
errors.append(f"{path.name} contains rejected secret marker {marker!r}")
def validate_workload_kv_read(ccr: dict[str, Any], errors: list[str], warnings: list[str]) -> None:
target = require_object(ccr.get("target"), "target", errors)
for field in ("domain", "tenant", "workload", "environment", "purpose"):
require_string(target.get(field), f"target.{field}", errors)
openbao = require_object(ccr.get("openbao"), "openbao", errors)
mount = require_string(openbao.get("mount"), "openbao.mount", errors)
kv_path = require_string(openbao.get("kv_path"), "openbao.kv_path", errors)
policy_name = require_string(
openbao.get("policy_name"), "openbao.policy_name", errors
)
policy_file = require_string(
openbao.get("policy_file"), "openbao.policy_file", errors
)
fields = [str(field) for field in require_list(openbao.get("fields"), "openbao.fields", errors)]
if not fields:
errors.append("openbao.fields must contain at least one field")
if mount and kv_path and not kv_path.startswith(f"{mount}/"):
errors.append("openbao.kv_path must start with the declared mount")
if any(fragment in kv_path for fragment in ("*", "..")):
errors.append("openbao.kv_path must not contain '*' or '..'")
if policy_name in DISALLOWED_POLICY_NAMES:
errors.append(f"openbao.policy_name is disallowed: {policy_name}")
if policy_file:
resolved_policy = resolve_repo_path(policy_file)
if not resolved_policy.exists():
errors.append(f"openbao.policy_file does not exist: {policy_file}")
auth = require_object(openbao.get("auth"), "openbao.auth", errors)
method = require_string(auth.get("method"), "openbao.auth.method", errors)
if method not in {"oidc", "kubernetes"}:
errors.append("openbao.auth.method must be oidc or kubernetes")
require_string(auth.get("mount"), "openbao.auth.mount", errors)
require_string(auth.get("role"), "openbao.auth.role", errors)
policies = [str(policy) for policy in require_list(auth.get("policies"), "openbao.auth.policies", errors)]
if policies != [policy_name]:
errors.append("openbao.auth.policies must contain exactly openbao.policy_name")
for policy in policies:
if policy in DISALLOWED_POLICY_NAMES:
errors.append(f"openbao.auth.policies contains disallowed policy {policy}")
ttl = auth.get("ttl")
if ttl is not None and (not isinstance(ttl, str) or not TTL_RE.match(ttl)):
errors.append("openbao.auth.ttl must match <positive integer><s|m|h|d>")
bound_claims = require_object(
auth.get("bound_claims"), "openbao.auth.bound_claims", errors
)
if not bound_claims:
errors.append("openbao.auth.bound_claims must not be empty")
if auth.get("bound_claims_confirmed") is not True:
warnings.append("OIDC/Kubernetes bound claim is not confirmed; apply is blocked")
frontdoor = require_object(ccr.get("access_frontdoor"), "access_frontdoor", errors)
require_string(frontdoor.get("type"), "access_frontdoor.type", errors)
require_string(frontdoor.get("catalog_id"), "access_frontdoor.catalog_id", errors)
risk = require_object(ccr.get("risk"), "risk", errors)
require_string(risk.get("classification"), "risk.classification", errors)
require_list(risk.get("notes"), "risk.notes", errors)
verification = require_object(ccr.get("verification"), "verification", errors)
for field in ("positive", "negative", "activation_conditions"):
values = require_list(verification.get(field), f"verification.{field}", errors)
if not values:
errors.append(f"verification.{field} must not be empty")
lifecycle = require_object(ccr.get("lifecycle"), "lifecycle", errors)
for field in ("deactivate", "rotate", "compromised"):
require_string(lifecycle.get(field), f"lifecycle.{field}", errors)
def validate_ccr(path: Path) -> tuple[dict[str, Any], list[str], list[str]]:
errors: list[str] = []
warnings: list[str] = []
scan_for_secrets(path, errors)
ccr = load_yaml(path)
for field in (
"id",
"kind",
"schema_version",
"request_type",
"title",
"status",
"created",
"updated",
"requester",
):
if field == "schema_version":
if ccr.get(field) != 1:
errors.append("schema_version must be 1")
elif field == "requester":
require_object(ccr.get(field), field, errors)
else:
require_string(ccr.get(field), field, errors)
if ccr.get("kind") != "credential-change-request":
errors.append("kind must be credential-change-request")
ccr_id = ccr.get("id")
if isinstance(ccr_id, str) and not SAFE_ID_RE.match(ccr_id):
errors.append("id must contain only uppercase letters, digits, dot, dash, or underscore")
status = ccr.get("status")
if isinstance(status, str) and status not in ALLOWED_STATUSES:
errors.append(f"status must be one of {sorted(ALLOWED_STATUSES)}")
request_type = ccr.get("request_type")
if request_type != "workload-kv-read":
errors.append("request_type must be workload-kv-read")
else:
validate_workload_kv_read(ccr, errors, warnings)
return ccr, errors, warnings
def render_summary(ccr: dict[str, Any], warnings: list[str]) -> str:
openbao = ccr["openbao"]
auth = openbao["auth"]
frontdoor = ccr["access_frontdoor"]
risk = ccr["risk"]
verification = ccr["verification"]
fields = ", ".join(openbao["fields"])
claim_bits = ", ".join(
f"{key}={value}" for key, value in auth.get("bound_claims", {}).items()
)
lines = [
f"Request: {ccr['title']}",
f"CCR: {ccr['id']} ({ccr['status']})",
f"Type: {ccr['request_type']}",
f"Target: {ccr['target']['tenant']}/{ccr['target']['workload']} ({ccr['target']['environment']})",
"Mount/path/field:",
f" {openbao['kv_path']}",
f" {fields}",
"Policy:",
f" {openbao['policy_name']}",
"Auth binding:",
f" {auth['mount']} {auth['method']} role {auth['role']}",
f" bound claims: {claim_bits}",
f" confirmed: {auth.get('bound_claims_confirmed') is True}",
"Access front door:",
f" {frontdoor['type']} {frontdoor['catalog_id']}",
f"Risk: {risk['classification']}",
]
for note in risk.get("notes", []):
lines.append(f" - {note}")
lines.append("Checks:")
for check in verification.get("positive", []):
lines.append(f" + {check}")
for check in verification.get("negative", []):
lines.append(f" - {check}")
if warnings:
lines.append("Warnings:")
for warning in warnings:
lines.append(f" ! {warning}")
lines.extend(
[
"Decision:",
" approve | deny | needs_changes",
"Comment:",
" free text; do not include secret values",
]
)
return "\n".join(lines)
def generated_policy_hcl(ccr: dict[str, Any]) -> str:
openbao = ccr["openbao"]
mount = openbao["mount"]
suffix = openbao["kv_path"][len(mount) + 1 :]
return (
f'path "{mount}/data/{suffix}" {{\n'
' capabilities = ["read"]\n'
"}\n\n"
f'path "{mount}/metadata/{suffix}" {{\n'
' capabilities = ["read"]\n'
"}\n"
)
def auth_payload(ccr: dict[str, Any]) -> dict[str, Any]:
auth = ccr["openbao"]["auth"]
payload: dict[str, Any] = {
"role_type": "oidc" if auth["method"] == "oidc" else "jwt",
"user_claim": auth.get("user_claim", "sub"),
"policies": ",".join(auth["policies"]),
"ttl": auth.get("ttl", "15m"),
"bound_claims": auth["bound_claims"],
}
if auth.get("groups_claim"):
payload["groups_claim"] = auth["groups_claim"]
return payload
def render_plan(ccr: dict[str, Any]) -> str:
openbao = ccr["openbao"]
auth = openbao["auth"]
payload = auth_payload(ccr)
lines = [
f"CCR {ccr['id']} apply plan",
"",
"1. Write policy HCL:",
f" policy: {openbao['policy_name']}",
f" source: {openbao['policy_file']}",
"",
generated_policy_hcl(ccr).rstrip(),
"",
"2. Create/update auth role payload:",
f" path: auth/{auth['mount']}/role/{auth['role']}",
json.dumps(payload, indent=2, sort_keys=True),
"",
"3. Provision secret value out-of-band:",
f" path: {openbao['kv_path']}",
f" fields: {', '.join(openbao['fields'])}",
"",
"4. Verify positive and negative access without printing secret values.",
]
return "\n".join(lines)
def validate_or_exit(path: Path) -> tuple[dict[str, Any], list[str]]:
ccr, errors, warnings = validate_ccr(path)
for warning in warnings:
print(f"[WARN] {path.name}: {warning}", file=sys.stderr)
if errors:
for error in errors:
print(f"[FAIL] {path.name}: {error}", file=sys.stderr)
raise SystemExit(1)
return ccr, warnings
def append_decision(path: Path, status: str, reviewer: str, comment: str) -> None:
ccr, _warnings = validate_or_exit(path)
review = ccr.setdefault("review", {})
comments = review.setdefault("comments", [])
if not isinstance(comments, list):
fail("review.comments must be a list")
comments.append(
{
"at": utc_now(),
"reviewer": reviewer,
"decision": status,
"comment": comment,
}
)
ccr["status"] = status
ccr["updated"] = datetime.now(timezone.utc).date().isoformat()
dump_yaml(path, ccr)
def command_validate(args: argparse.Namespace) -> int:
refs = args.refs or [str(path) for path in sorted(ccr_dir().glob("*.y*ml"))]
if not refs:
fail(f"no CCR files found in {ccr_dir()}")
ok = True
for ref in refs:
path = resolve_ccr(ref)
_ccr, errors, warnings = validate_ccr(path)
for warning in warnings:
print(f"[WARN] {path.name}: {warning}", file=sys.stderr)
if errors:
ok = False
for error in errors:
print(f"[FAIL] {path.name}: {error}", file=sys.stderr)
else:
print(f"[OK] {path.name}")
return 0 if ok else 1
def command_render(args: argparse.Namespace) -> int:
path = resolve_ccr(args.ref)
ccr, warnings = validate_or_exit(path)
print(render_summary(ccr, warnings))
return 0
def command_plan(args: argparse.Namespace) -> int:
path = resolve_ccr(args.ref)
ccr, _warnings = validate_or_exit(path)
print(render_plan(ccr))
return 0
def command_apply_plan(args: argparse.Namespace) -> int:
path = resolve_ccr(args.ref)
ccr, _warnings = validate_or_exit(path)
if ccr.get("status") not in APPLY_ALLOWED_STATUSES:
fail(f"apply-plan requires status approved, got {ccr.get('status')}")
auth = ccr["openbao"]["auth"]
if auth.get("bound_claims_confirmed") is not True:
fail("apply-plan requires openbao.auth.bound_claims_confirmed=true")
print(render_plan(ccr))
return 0
def command_decision(args: argparse.Namespace, status: str) -> int:
path = resolve_ccr(args.ref)
append_decision(path, status, args.reviewer, args.comment)
print(f"[OK] {path.name} -> {status}")
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Validate, render, and review non-secret credential change requests."
)
sub = parser.add_subparsers(dest="command", required=True)
validate = sub.add_parser("validate", help="Validate CCR files")
validate.add_argument("refs", nargs="*")
validate.set_defaults(func=command_validate)
render = sub.add_parser("render", help="Render a human review summary")
render.add_argument("ref")
render.set_defaults(func=command_render)
plan = sub.add_parser("plan", help="Render the generated apply plan for review")
plan.add_argument("ref")
plan.set_defaults(func=command_plan)
apply_plan = sub.add_parser(
"apply-plan", help="Render an operator apply plan only for approved CCRs"
)
apply_plan.add_argument("ref")
apply_plan.set_defaults(func=command_apply_plan)
for name, status in (
("approve", "approved"),
("deny", "denied"),
("needs-changes", "needs_changes"),
):
decision = sub.add_parser(name, help=f"Record {status} decision")
decision.add_argument("ref")
decision.add_argument("--reviewer", required=True)
decision.add_argument("--comment", required=True)
decision.set_defaults(func=lambda args, status=status: command_decision(args, status))
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
return int(args.func(args))
if __name__ == "__main__":
raise SystemExit(main())