220 lines
6.9 KiB
Python
Executable File
220 lines
6.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import getpass
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
# Directory one level above the folder containing this script; main() resolves
# the --catalog and --policy-dir arguments relative to it.
REPO_DIR = Path(__file__).resolve().parents[1]
|
|
|
|
|
|
class BaoRunner:
    """Run ``bao`` CLI commands inside a pod through ``kubectl exec``.

    The target pod is always ``<release>-0`` in the configured namespace.
    In dry-run mode nothing is executed; the command is only printed.
    """

    def __init__(
        self,
        *,
        kubectl: str,
        namespace: str,
        release: str,
        dry_run: bool,
        use_token_helper: bool,
        token: str | None,
    ) -> None:
        """Store the execution settings.

        Args:
            kubectl: kubectl command line; split with shell-like rules so it
                may carry extra flags.
            namespace: Namespace passed to ``kubectl exec -n``.
            release: Release name; the pod addressed is ``<release>-0``.
            dry_run: When true, ``run`` prints commands instead of executing.
            use_token_helper: When true, ``bao`` is invoked directly and no
                token is sent by this class.
            token: Token fed to the pod on stdin when the token helper is
                not used.
        """
        self.kubectl_parts = shlex.split(kubectl)
        self.namespace = namespace
        self.pod = f"{release}-0"
        self.dry_run = dry_run
        self.use_token_helper = use_token_helper
        self.token = token

    def run(
        self, args: list[str], input_text: str | None = None
    ) -> subprocess.CompletedProcess[str]:
        """Execute ``bao <args>`` in the pod and echo its output.

        Args:
            args: Arguments placed after ``bao``.
            input_text: Optional text supplied to ``bao`` on stdin.

        Returns:
            The completed process; in dry-run mode a synthetic result with
            return code 0 and empty output.

        Raises:
            RuntimeError: No token is set and the token helper is disabled.
            SystemExit: The command exited non-zero (same exit code).
        """
        display = "bao " + shlex.join(args)
        if self.dry_run:
            print(f"DRY-RUN: {display}")
            return subprocess.CompletedProcess(args, 0, "", "")

        exec_prefix = self.kubectl_parts + [
            "exec",
            "-i",
            "-n",
            self.namespace,
            self.pod,
            "--",
        ]
        if self.use_token_helper:
            command = exec_prefix + ["bao"] + args
            stdin_text = input_text
        elif not self.token:
            raise RuntimeError(
                "OpenBao token is required unless --use-token-helper is set"
            )
        else:
            # The token travels on the first stdin line and is exported by
            # the in-pod shell, so it never appears in the argument list.
            token_shell = [
                "sh",
                "-c",
                'read -r BAO_TOKEN; export BAO_TOKEN; exec bao "$@"',
                "sh",
            ]
            command = exec_prefix + token_shell + args
            stdin_text = "\n".join([self.token, input_text or ""])

        outcome = subprocess.run(
            command, input=stdin_text, capture_output=True, text=True
        )
        if outcome.stdout:
            print(outcome.stdout, end="")
        # stderr is echoed whether or not the command succeeded.
        if outcome.stderr:
            print(outcome.stderr, file=sys.stderr, end="")
        if outcome.returncode != 0:
            raise SystemExit(outcome.returncode)
        return outcome
|
|
|
|
|
|
def read_token(
    token_file: str | None, dry_run: bool, use_token_helper: bool
) -> str | None:
    """Obtain the OpenBao token, from a file or an interactive prompt.

    Args:
        token_file: Path of a file whose first line holds the token, or a
            falsy value to prompt on the terminal instead.
        dry_run: When true no token is needed and ``None`` is returned.
        use_token_helper: When true no token is needed and ``None`` is
            returned.

    Returns:
        The token with surrounding whitespace removed, or ``None`` when no
        token is required.

    Raises:
        SystemExit: The token file is missing, unreadable or empty, or the
            prompted token is empty.
    """
    if dry_run or use_token_helper:
        return None

    if token_file:
        path = Path(token_file)
        # Read directly instead of checking exists() first: a directory or an
        # unreadable file must produce a clean error, not a traceback.
        try:
            content = path.read_text(encoding="utf-8")
        except FileNotFoundError:
            raise SystemExit(
                f"ERROR: OPENBAO_TOKEN_FILE does not exist: {path}"
            ) from None
        except (OSError, UnicodeDecodeError) as err:
            raise SystemExit(
                f"ERROR: cannot read OPENBAO_TOKEN_FILE {path}: {err}"
            ) from err
        lines = content.splitlines()
        # Only the first line is used as the token.
        token = lines[0].strip() if lines else ""
        if not token:
            raise SystemExit(f"ERROR: OPENBAO_TOKEN_FILE is empty: {path}")
        return token

    # Strip like the file path does, so whitespace-only input counts as empty.
    token = getpass.getpass("OpenBao token: ").strip()
    if not token:
        raise SystemExit("ERROR: empty OpenBao token")
    return token
|
|
|
|
|
|
def load_catalog(path: Path) -> dict[str, Any]:
    """Parse the YAML catalog at *path* and return its top-level mapping.

    Raises:
        SystemExit: The document root is not a mapping.
    """
    with path.open("r", encoding="utf-8") as stream:
        document = yaml.safe_load(stream)
    if isinstance(document, dict):
        return document
    raise SystemExit(f"ERROR: catalog root must be an object: {path}")
|
|
|
|
|
|
def selected_grants(
    catalog: dict[str, Any], grant_id: str | None
) -> list[dict[str, Any]]:
    """Return the catalog grants to process, optionally limited to one id.

    Args:
        catalog: Parsed catalog mapping; grants are read from its ``grants``
            key (a missing or empty value means no grants).
        grant_id: When set, only grants whose ``id`` equals it are returned.

    Returns:
        A new list of grant mappings in catalog order.

    Raises:
        SystemExit: ``grants`` is not a list, an entry is not a mapping, or
            *grant_id* matches no entry.
    """
    grants = catalog.get("grants") or []
    if not isinstance(grants, list):
        raise SystemExit("ERROR: catalog grants must be a list")

    # Reject malformed entries up front; otherwise they surface later as an
    # AttributeError traceback when ``.get`` is called on them.
    for entry in grants:
        if not isinstance(entry, dict):
            raise SystemExit(
                f"ERROR: catalog grant entries must be objects, got: {entry!r}"
            )

    if not grant_id:
        return list(grants)

    selected = [grant for grant in grants if grant.get("id") == grant_id]
    if not selected:
        raise SystemExit(f"ERROR: grant not found in catalog: {grant_id}")
    return selected
|
|
|
|
|
|
def role_args(grant: dict[str, Any]) -> list[str]:
    """Build the ``bao write`` arguments that configure a grant's token role.

    Reads ``grant["openbao"]`` (``policies``, optional
    ``disallowed_policies``, ``token_role``) and ``grant["ttl"]["max"]``.
    The remaining role settings are fixed values.
    """
    role_cfg = grant["openbao"]
    ttl_cfg = grant["ttl"]
    allowed = ",".join(role_cfg["policies"])
    # A missing or empty disallowed list yields an empty value.
    denied = ",".join(role_cfg.get("disallowed_policies") or [])
    role_path = f"auth/token/roles/{role_cfg['token_role']}"
    max_ttl = f"token_explicit_max_ttl={ttl_cfg['max']}"
    return [
        "write",
        role_path,
        f"allowed_policies={allowed}",
        f"disallowed_policies={denied}",
        "orphan=true",
        "renewable=false",
        max_ttl,
        "token_no_default_policy=true",
        "token_type=service",
    ]
|
|
|
|
|
|
def write_policy(runner: BaoRunner, name: str, policy_file: Path) -> None:
    """Upload the policy in *policy_file* under *name* via the runner.

    The file content is sent on stdin to ``bao policy write <name> -``.
    When the runner is in dry-run mode the command is only printed.

    Raises:
        SystemExit: *policy_file* does not exist.
    """
    if not policy_file.exists():
        raise SystemExit(f"ERROR: missing policy file: {policy_file}")

    if not runner.dry_run:
        policy_text = policy_file.read_text(encoding="utf-8")
        runner.run(["policy", "write", name, "-"], input_text=policy_text)
        print(f"OK: policy {name} applied")
        return

    # Dry-run: show the local file path rather than the stdin form.
    print(f"DRY-RUN: bao policy write {name} {policy_file}")
|
|
|
|
|
|
def apply_grant(runner: BaoRunner, grant: dict[str, Any], policy_dir: Path) -> None:
    """Apply one grant: write its issuer policy, then configure its token role.

    Grants whose ``credential_type`` is not ``openbao-token`` are skipped
    with a message. The policy file is ``<policy_dir>/<issuer_policy>.hcl``.
    After writing the role it is read back through the runner.
    """
    if grant.get("credential_type") != "openbao-token":
        print(f"SKIP: {grant.get('id')} is not an openbao-token grant")
        return

    settings = grant["openbao"]
    policy_name = settings["issuer_policy"]
    write_policy(runner, policy_name, policy_dir / f"{policy_name}.hcl")

    runner.run(role_args(grant))
    role_path = f"auth/token/roles/{settings['token_role']}"
    runner.run(["read", role_path])
    print(f"OK: token role {settings['token_role']} configured for grant {grant['id']}")
|
|
|
|
|
|
def main() -> int:
    """Parse the command line and apply the selected grants.

    Namespace, release and kubectl command come from their flags, falling
    back to the ``OPENBAO_NAMESPACE``, ``OPENBAO_RELEASE`` and ``KUBECTL``
    environment variables and then to built-in defaults. The token file path
    is read from ``OPENBAO_TOKEN_FILE``. Outside dry-run mode ``bao status``
    is run once before any grant is applied.

    Returns:
        0 after all selected grants have been processed.
    """
    import os

    def setting(cli_value: str | None, env_name: str, fallback: str) -> str:
        # An explicit flag wins; otherwise the environment, then the default.
        return cli_value or os.environ.get(env_name, fallback)

    parser = argparse.ArgumentParser(
        description="Apply OpenBao token roles for credential grants."
    )
    parser.add_argument("--catalog", default="credential-grants/catalog.yaml")
    parser.add_argument("--policy-dir", default="openbao/policies")
    parser.add_argument("--grant", help="Limit to one grant id")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument(
        "--use-token-helper",
        action="store_true",
        help="Use the OpenBao CLI token helper inside the pod",
    )
    for flag in ("--namespace", "--release", "--kubectl"):
        parser.add_argument(flag, default=None)
    opts = parser.parse_args()

    namespace = setting(opts.namespace, "OPENBAO_NAMESPACE", "openbao")
    release = setting(opts.release, "OPENBAO_RELEASE", "openbao")
    kubectl = setting(opts.kubectl, "KUBECTL", "kubectl")
    token = read_token(
        os.environ.get("OPENBAO_TOKEN_FILE"), opts.dry_run, opts.use_token_helper
    )

    catalog = load_catalog(REPO_DIR / opts.catalog)
    runner = BaoRunner(
        kubectl=kubectl,
        namespace=namespace,
        release=release,
        dry_run=opts.dry_run,
        use_token_helper=opts.use_token_helper,
        token=token,
    )

    if not opts.dry_run:
        runner.run(["status"])

    policy_dir = REPO_DIR / opts.policy_dir
    for grant in selected_grants(catalog, opts.grant):
        apply_grant(runner, grant, policy_dir)

    return 0
|
|
|
|
|
|
# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    sys.exit(main())
|