Files
railiance-platform/scripts/openbao-apply-credential-change-appliers.py

218 lines
7.0 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import getpass
import os
import shlex
import subprocess
import sys
from pathlib import Path
from typing import Any
REPO_DIR = Path(__file__).resolve().parents[1]
APPLIERS: dict[str, dict[str, Any]] = {
"nonprod": {
"title": "Credential change non-production metadata applier",
"policy_name": "credential-change-nonprod-applier",
"policy_file": "openbao/policies/credential-change-nonprod-applier.hcl",
"token_role": "credential-change-nonprod-applier",
"max_ttl": "1h",
},
"prod": {
"title": "Credential change production metadata applier",
"policy_name": "credential-change-prod-applier",
"policy_file": "openbao/policies/credential-change-prod-applier.hcl",
"token_role": "credential-change-prod-applier",
"max_ttl": "30m",
},
}
DISALLOWED_POLICIES = ("root", "platform-admin")
class BaoRunner:
def __init__(
self,
*,
kubectl: str,
namespace: str,
release: str,
dry_run: bool,
use_token_helper: bool,
token: str | None,
) -> None:
self.kubectl_parts = shlex.split(kubectl)
self.namespace = namespace
self.pod = f"{release}-0"
self.dry_run = dry_run
self.use_token_helper = use_token_helper
self.token = token
def run(
self, args: list[str], input_text: str | None = None
) -> subprocess.CompletedProcess[str]:
rendered = "bao " + shlex.join(args)
if self.dry_run:
print(f"DRY-RUN: {rendered}")
return subprocess.CompletedProcess(args, 0, "", "")
if self.use_token_helper:
cmd = (
self.kubectl_parts
+ ["exec", "-i", "-n", self.namespace, self.pod, "--", "bao"]
+ args
)
proc_input = input_text
else:
if not self.token:
raise RuntimeError(
"OpenBao token is required unless --use-token-helper is set"
)
cmd = (
self.kubectl_parts
+ [
"exec",
"-i",
"-n",
self.namespace,
self.pod,
"--",
"sh",
"-c",
'read -r BAO_TOKEN; export BAO_TOKEN; exec bao "$@"',
"sh",
]
+ args
)
proc_input = self.token + "\n" + (input_text or "")
result = subprocess.run(cmd, input=proc_input, capture_output=True, text=True)
if result.stdout:
print(result.stdout, end="")
if result.returncode != 0:
if result.stderr:
print(result.stderr, file=sys.stderr, end="")
raise SystemExit(result.returncode)
if result.stderr:
print(result.stderr, file=sys.stderr, end="")
return result
def read_token(
token_file: str | None, dry_run: bool, use_token_helper: bool
) -> str | None:
if dry_run or use_token_helper:
return None
if token_file:
path = Path(token_file)
if not path.exists():
raise SystemExit(f"ERROR: OPENBAO_TOKEN_FILE does not exist: {path}")
lines = path.read_text(encoding="utf-8").splitlines()
token = lines[0].strip() if lines else ""
if not token:
raise SystemExit(f"ERROR: OPENBAO_TOKEN_FILE is empty: {path}")
return token
token = getpass.getpass("OpenBao token: ")
if not token:
raise SystemExit("ERROR: empty OpenBao token")
return token
def selected_appliers(selector: str) -> list[dict[str, Any]]:
if selector == "all":
return [APPLIERS["nonprod"], APPLIERS["prod"]]
try:
return [APPLIERS[selector]]
except KeyError:
raise SystemExit(f"ERROR: applier must be one of {sorted(APPLIERS) + ['all']}")
def role_args(applier: dict[str, Any]) -> list[str]:
return [
"write",
f"auth/token/roles/{applier['token_role']}",
f"allowed_policies={applier['policy_name']}",
f"disallowed_policies={','.join(DISALLOWED_POLICIES)}",
"orphan=true",
"renewable=false",
f"token_explicit_max_ttl={applier['max_ttl']}",
"token_no_default_policy=true",
"token_type=service",
]
def write_policy(runner: BaoRunner, applier: dict[str, Any], policy_dir: Path) -> None:
policy_file = policy_dir / Path(applier["policy_file"]).name
if not policy_file.exists():
raise SystemExit(f"ERROR: missing policy file: {policy_file}")
if runner.dry_run:
print(f"DRY-RUN: bao policy write {applier['policy_name']} {policy_file}")
return
runner.run(
["policy", "write", applier["policy_name"], "-"],
input_text=policy_file.read_text(encoding="utf-8"),
)
print(f"OK: policy {applier['policy_name']} applied")
def apply_applier(runner: BaoRunner, applier: dict[str, Any], policy_dir: Path) -> None:
write_policy(runner, applier, policy_dir)
runner.run(role_args(applier))
runner.run(["read", f"auth/token/roles/{applier['token_role']}"])
print(
"OK: applier role "
f"{applier['token_role']} configured for policy {applier['policy_name']}"
)
def main() -> int:
parser = argparse.ArgumentParser(
description="Apply OpenBao credential-change delegated applier policies and token roles."
)
parser.add_argument(
"--applier", choices=["nonprod", "prod", "all"], default="all"
)
parser.add_argument("--policy-dir", default="openbao/policies")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--use-token-helper",
action="store_true",
help="Use the OpenBao CLI token helper inside the pod",
)
parser.add_argument("--namespace", default=None)
parser.add_argument("--release", default=None)
parser.add_argument("--kubectl", default=None)
args = parser.parse_args()
namespace = args.namespace or os.environ.get("OPENBAO_NAMESPACE", "openbao")
release = args.release or os.environ.get("OPENBAO_RELEASE", "openbao")
kubectl = args.kubectl or os.environ.get("KUBECTL", "kubectl")
token_file = os.environ.get("OPENBAO_TOKEN_FILE")
token = read_token(token_file, args.dry_run, args.use_token_helper)
runner = BaoRunner(
kubectl=kubectl,
namespace=namespace,
release=release,
dry_run=args.dry_run,
use_token_helper=args.use_token_helper,
token=token,
)
if not args.dry_run:
runner.run(["status"])
policy_dir = REPO_DIR / args.policy_dir
for applier in selected_appliers(args.applier):
apply_applier(runner, applier, policy_dir)
print("NEXT: issue short-lived child tokens through an approved custody path only.")
print("NEXT: run scripts/credential-change.py applier-apply <CCR> with that ambient delegated authority.")
return 0
if __name__ == "__main__":
raise SystemExit(main())