#!/usr/bin/env python3 from __future__ import annotations import argparse import re import sys from pathlib import Path from typing import Any import yaml SECRET_MARKERS = [ "AGE-SECRET-KEY-1", "-----BEGIN PRIVATE KEY-----", "-----BEGIN OPENSSH PRIVATE KEY-----", "OPENBAO_ROOT_TOKEN=", "VAULT_TOKEN=", "BAO_TOKEN=", "hvs.", "hvb.", "hvc.", ] REQUIRED_DENIED_MODES = { "chat", "state-hub-body", "git", "command-line-token-argument", "llm-prompt", } ALLOWED_CREDENTIAL_TYPES = {"openbao-token"} ALLOWED_GRANT_CLASSES = {"self-service", "approval-required", "break-glass"} ALLOWED_GRANT_STATUSES = {"pilot", "active", "deprecated", "disabled"} DISALLOWED_POLICIES = {"root", "platform-admin"} TTL_RE = re.compile(r"^([1-9][0-9]*)([smhd])$") def fail(message: str) -> None: print(f"[FAIL] {message}", file=sys.stderr) def ttl_seconds(value: Any, field: str, errors: list[str]) -> int | None: if not isinstance(value, str): errors.append(f"{field} must be a string TTL such as 15m") return None match = TTL_RE.match(value) if not match: errors.append(f"{field} must match : {value!r}") return None amount = int(match.group(1)) unit = match.group(2) multiplier = {"s": 1, "m": 60, "h": 3600, "d": 86400}[unit] return amount * multiplier def require_dict(value: Any, field: str, errors: list[str]) -> dict[str, Any]: if not isinstance(value, dict): errors.append(f"{field} must be an object") return {} return value def require_list(value: Any, field: str, errors: list[str]) -> list[Any]: if not isinstance(value, list): errors.append(f"{field} must be a list") return [] return value def require_nonempty_string(value: Any, field: str, errors: list[str]) -> str: if not isinstance(value, str) or not value.strip(): errors.append(f"{field} must be a non-empty string") return "" return value.strip() def validate_grant( grant: Any, index: int, catalog: dict[str, Any], errors: list[str] ) -> str: prefix = f"grants[{index}]" grant_obj = require_dict(grant, prefix, errors) if not grant_obj: return "" grant_id = require_nonempty_string(grant_obj.get("id"), f"{prefix}.id", errors) require_nonempty_string(grant_obj.get("title"), f"{prefix}.title", errors) require_nonempty_string( grant_obj.get("description"), f"{prefix}.description", errors ) status = require_nonempty_string( grant_obj.get("status"), f"{prefix}.status", errors ) if status and status not in ALLOWED_GRANT_STATUSES: errors.append( f"{prefix}.status must be one of {sorted(ALLOWED_GRANT_STATUSES)}" ) grant_class = require_nonempty_string( grant_obj.get("grant_class"), f"{prefix}.grant_class", errors ) if grant_class and grant_class not in ALLOWED_GRANT_CLASSES: errors.append( f"{prefix}.grant_class must be one of {sorted(ALLOWED_GRANT_CLASSES)}" ) credential_type = require_nonempty_string( grant_obj.get("credential_type"), f"{prefix}.credential_type", errors ) if credential_type and credential_type not in ALLOWED_CREDENTIAL_TYPES: errors.append( f"{prefix}.credential_type must be one of {sorted(ALLOWED_CREDENTIAL_TYPES)}" ) require_nonempty_string(grant_obj.get("issuer"), f"{prefix}.issuer", errors) require_nonempty_string(grant_obj.get("audience"), f"{prefix}.audience", errors) openbao = require_dict(grant_obj.get("openbao"), f"{prefix}.openbao", errors) policies = [ str(policy) for policy in require_list( openbao.get("policies"), f"{prefix}.openbao.policies", errors ) ] if not policies: errors.append(f"{prefix}.openbao.policies must contain at least one policy") for policy in policies: if not policy or policy in DISALLOWED_POLICIES: errors.append( f"{prefix}.openbao.policies contains disallowed policy: {policy!r}" ) configured_disallowed = set( str(policy) for policy in require_list( openbao.get("disallowed_policies"), f"{prefix}.openbao.disallowed_policies", errors, ) ) missing_disallowed = DISALLOWED_POLICIES - configured_disallowed if missing_disallowed: errors.append( f"{prefix}.openbao.disallowed_policies missing {sorted(missing_disallowed)}" ) require_nonempty_string( openbao.get("token_role"), f"{prefix}.openbao.token_role", errors ) require_nonempty_string( openbao.get("issuer_policy"), f"{prefix}.openbao.issuer_policy", errors ) require_list(openbao.get("mount_paths"), f"{prefix}.openbao.mount_paths", errors) ttl = require_dict(grant_obj.get("ttl"), f"{prefix}.ttl", errors) default_ttl = ttl_seconds(ttl.get("default"), f"{prefix}.ttl.default", errors) max_ttl = ttl_seconds(ttl.get("max"), f"{prefix}.ttl.max", errors) if default_ttl is not None and max_ttl is not None and default_ttl > max_ttl: errors.append(f"{prefix}.ttl.default must not exceed ttl.max") if ttl.get("renewable") is not False: errors.append(f"{prefix}.ttl.renewable must be false for the pilot") actors = require_dict(grant_obj.get("actors"), f"{prefix}.actors", errors) allowed_actor_types = require_list( actors.get("allowed_types"), f"{prefix}.actors.allowed_types", errors ) if not allowed_actor_types: errors.append(f"{prefix}.actors.allowed_types must not be empty") require_nonempty_string( actors.get("required_subject_binding"), f"{prefix}.actors.required_subject_binding", errors, ) authorization = require_dict( grant_obj.get("authorization"), f"{prefix}.authorization", errors ) if authorization.get("purpose_required") is not True: errors.append(f"{prefix}.authorization.purpose_required must be true") require_list( authorization.get("allowed_purpose_examples"), f"{prefix}.authorization.allowed_purpose_examples", errors, ) delivery = require_dict(grant_obj.get("delivery"), f"{prefix}.delivery", errors) modes = catalog.get("delivery_modes", {}) allowed_known = set( str(mode) for mode in require_list( modes.get("allowed_known"), "delivery_modes.allowed_known", errors ) ) denied_known = set( str(mode) for mode in require_list( modes.get("denied_known"), "delivery_modes.denied_known", errors ) ) allowed = set( str(mode) for mode in require_list( delivery.get("allowed"), f"{prefix}.delivery.allowed", errors ) ) denied = set( str(mode) for mode in require_list( delivery.get("denied"), f"{prefix}.delivery.denied", errors ) ) if allowed - allowed_known: errors.append( f"{prefix}.delivery.allowed has unknown modes: {sorted(allowed - allowed_known)}" ) if denied - denied_known: errors.append( f"{prefix}.delivery.denied has unknown modes: {sorted(denied - denied_known)}" ) if allowed & denied: errors.append( f"{prefix}.delivery modes both allowed and denied: {sorted(allowed & denied)}" ) missing_denied = REQUIRED_DENIED_MODES - denied if missing_denied: errors.append(f"{prefix}.delivery.denied missing {sorted(missing_denied)}") preferred = require_nonempty_string( delivery.get("preferred"), f"{prefix}.delivery.preferred", errors ) if preferred and preferred not in allowed: errors.append(f"{prefix}.delivery.preferred must be in delivery.allowed") if "local-token-file" in allowed: local_file = require_dict( delivery.get("local_token_file"), f"{prefix}.delivery.local_token_file", errors, ) if local_file.get("directory") != ".local/credential-leases": errors.append( f"{prefix}.delivery.local_token_file.directory must be .local/credential-leases" ) if str(local_file.get("mode")) != "0600": errors.append(f"{prefix}.delivery.local_token_file.mode must be 0600") if "kubernetes-auth" in allowed: kubernetes_auth = require_dict( delivery.get("kubernetes_auth"), f"{prefix}.delivery.kubernetes_auth", errors, ) require_nonempty_string( kubernetes_auth.get("mount"), f"{prefix}.delivery.kubernetes_auth.mount", errors, ) require_nonempty_string( kubernetes_auth.get("role"), f"{prefix}.delivery.kubernetes_auth.role", errors, ) if not require_list( kubernetes_auth.get("service_account_names"), f"{prefix}.delivery.kubernetes_auth.service_account_names", errors, ): errors.append( f"{prefix}.delivery.kubernetes_auth.service_account_names must not be empty" ) if not require_list( kubernetes_auth.get("namespaces"), f"{prefix}.delivery.kubernetes_auth.namespaces", errors, ): errors.append( f"{prefix}.delivery.kubernetes_auth.namespaces must not be empty" ) audit = require_dict(grant_obj.get("audit"), f"{prefix}.audit", errors) if audit.get("openbao_audit_required") is not True: errors.append(f"{prefix}.audit.openbao_audit_required must be true") if audit.get("record_secret_values") is not False: errors.append(f"{prefix}.audit.record_secret_values must be false") revocation = require_dict( grant_obj.get("revocation"), f"{prefix}.revocation", errors ) if revocation.get("required") is not True: errors.append(f"{prefix}.revocation.required must be true") if revocation.get("by_accessor") is not True: errors.append(f"{prefix}.revocation.by_accessor must be true") return grant_id def main() -> int: parser = argparse.ArgumentParser( description="Validate non-secret credential grant catalog." ) parser.add_argument( "catalog", nargs="?", default="credential-grants/catalog.yaml", help="Path to catalog YAML", ) args = parser.parse_args() path = Path(args.catalog) if not path.exists(): fail(f"catalog file is missing: {path}") return 1 raw = path.read_text(encoding="utf-8") errors: list[str] = [] for marker in SECRET_MARKERS: if marker in raw: errors.append(f"secret-looking marker present: {marker}") try: catalog = yaml.safe_load(raw) except yaml.YAMLError as exc: fail(f"catalog YAML is invalid: {exc}") return 1 catalog_obj = require_dict(catalog, "catalog", errors) if catalog_obj.get("version") != 1: errors.append("version must be 1") require_nonempty_string(catalog_obj.get("updated"), "updated", errors) require_nonempty_string(catalog_obj.get("owner_repo"), "owner_repo", errors) require_nonempty_string(catalog_obj.get("workplan_id"), "workplan_id", errors) delivery_modes = require_dict( catalog_obj.get("delivery_modes"), "delivery_modes", errors ) allowed_known = set( str(mode) for mode in require_list( delivery_modes.get("allowed_known"), "delivery_modes.allowed_known", errors ) ) denied_known = set( str(mode) for mode in require_list( delivery_modes.get("denied_known"), "delivery_modes.denied_known", errors ) ) if not REQUIRED_DENIED_MODES.issubset(denied_known): errors.append( f"delivery_modes.denied_known missing {sorted(REQUIRED_DENIED_MODES - denied_known)}" ) if allowed_known & denied_known: errors.append( f"delivery_modes overlap between allowed and denied: {sorted(allowed_known & denied_known)}" ) grant_classes = set( str(item) for item in require_list( catalog_obj.get("grant_classes"), "grant_classes", errors ) ) if grant_classes != ALLOWED_GRANT_CLASSES: errors.append(f"grant_classes must be exactly {sorted(ALLOWED_GRANT_CLASSES)}") grants = require_list(catalog_obj.get("grants"), "grants", errors) if not grants: errors.append("grants must not be empty") seen: set[str] = set() for index, grant in enumerate(grants): grant_id = validate_grant(grant, index, catalog_obj, errors) if grant_id: if grant_id in seen: errors.append(f"duplicate grant id: {grant_id}") seen.add(grant_id) if "ops-warden/warden-sign" not in seen: errors.append("initial grant ops-warden/warden-sign is required") if errors: for error in errors: fail(error) return 1 print(f"[OK] credential grant catalog is valid: {path}") print(f"[OK] grants: {len(grants)}") for grant in grants: print(f"[OK] {grant['id']}: {grant['grant_class']} {grant['credential_type']}") return 0 if __name__ == "__main__": raise SystemExit(main())