Files
railiance-platform/scripts/credential-grants-validate.py

358 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import re
import sys
from pathlib import Path
from typing import Any
import yaml
SECRET_MARKERS = [
"AGE-SECRET-KEY-1",
"-----BEGIN PRIVATE KEY-----",
"-----BEGIN OPENSSH PRIVATE KEY-----",
"OPENBAO_ROOT_TOKEN=",
"VAULT_TOKEN=",
"BAO_TOKEN=",
"hvs.",
"hvb.",
"hvc.",
]
REQUIRED_DENIED_MODES = {
"chat",
"state-hub-body",
"git",
"command-line-token-argument",
"llm-prompt",
}
ALLOWED_CREDENTIAL_TYPES = {"openbao-token"}
ALLOWED_GRANT_CLASSES = {"self-service", "approval-required", "break-glass"}
ALLOWED_GRANT_STATUSES = {"pilot", "active", "deprecated", "disabled"}
DISALLOWED_POLICIES = {"root", "platform-admin"}
TTL_RE = re.compile(r"^([1-9][0-9]*)([smhd])$")
def fail(message: str) -> None:
print(f"[FAIL] {message}", file=sys.stderr)
def ttl_seconds(value: Any, field: str, errors: list[str]) -> int | None:
if not isinstance(value, str):
errors.append(f"{field} must be a string TTL such as 15m")
return None
match = TTL_RE.match(value)
if not match:
errors.append(f"{field} must match <positive integer><s|m|h|d>: {value!r}")
return None
amount = int(match.group(1))
unit = match.group(2)
multiplier = {"s": 1, "m": 60, "h": 3600, "d": 86400}[unit]
return amount * multiplier
def require_dict(value: Any, field: str, errors: list[str]) -> dict[str, Any]:
if not isinstance(value, dict):
errors.append(f"{field} must be an object")
return {}
return value
def require_list(value: Any, field: str, errors: list[str]) -> list[Any]:
if not isinstance(value, list):
errors.append(f"{field} must be a list")
return []
return value
def require_nonempty_string(value: Any, field: str, errors: list[str]) -> str:
if not isinstance(value, str) or not value.strip():
errors.append(f"{field} must be a non-empty string")
return ""
return value.strip()
def validate_grant(
grant: Any, index: int, catalog: dict[str, Any], errors: list[str]
) -> str:
prefix = f"grants[{index}]"
grant_obj = require_dict(grant, prefix, errors)
if not grant_obj:
return ""
grant_id = require_nonempty_string(grant_obj.get("id"), f"{prefix}.id", errors)
require_nonempty_string(grant_obj.get("title"), f"{prefix}.title", errors)
require_nonempty_string(
grant_obj.get("description"), f"{prefix}.description", errors
)
status = require_nonempty_string(
grant_obj.get("status"), f"{prefix}.status", errors
)
if status and status not in ALLOWED_GRANT_STATUSES:
errors.append(
f"{prefix}.status must be one of {sorted(ALLOWED_GRANT_STATUSES)}"
)
grant_class = require_nonempty_string(
grant_obj.get("grant_class"), f"{prefix}.grant_class", errors
)
if grant_class and grant_class not in ALLOWED_GRANT_CLASSES:
errors.append(
f"{prefix}.grant_class must be one of {sorted(ALLOWED_GRANT_CLASSES)}"
)
credential_type = require_nonempty_string(
grant_obj.get("credential_type"), f"{prefix}.credential_type", errors
)
if credential_type and credential_type not in ALLOWED_CREDENTIAL_TYPES:
errors.append(
f"{prefix}.credential_type must be one of {sorted(ALLOWED_CREDENTIAL_TYPES)}"
)
require_nonempty_string(grant_obj.get("issuer"), f"{prefix}.issuer", errors)
require_nonempty_string(grant_obj.get("audience"), f"{prefix}.audience", errors)
openbao = require_dict(grant_obj.get("openbao"), f"{prefix}.openbao", errors)
policies = [
str(policy)
for policy in require_list(
openbao.get("policies"), f"{prefix}.openbao.policies", errors
)
]
if not policies:
errors.append(f"{prefix}.openbao.policies must contain at least one policy")
for policy in policies:
if not policy or policy in DISALLOWED_POLICIES:
errors.append(
f"{prefix}.openbao.policies contains disallowed policy: {policy!r}"
)
configured_disallowed = set(
str(policy)
for policy in require_list(
openbao.get("disallowed_policies"),
f"{prefix}.openbao.disallowed_policies",
errors,
)
)
missing_disallowed = DISALLOWED_POLICIES - configured_disallowed
if missing_disallowed:
errors.append(
f"{prefix}.openbao.disallowed_policies missing {sorted(missing_disallowed)}"
)
require_nonempty_string(
openbao.get("token_role"), f"{prefix}.openbao.token_role", errors
)
require_nonempty_string(
openbao.get("issuer_policy"), f"{prefix}.openbao.issuer_policy", errors
)
require_list(openbao.get("mount_paths"), f"{prefix}.openbao.mount_paths", errors)
ttl = require_dict(grant_obj.get("ttl"), f"{prefix}.ttl", errors)
default_ttl = ttl_seconds(ttl.get("default"), f"{prefix}.ttl.default", errors)
max_ttl = ttl_seconds(ttl.get("max"), f"{prefix}.ttl.max", errors)
if default_ttl is not None and max_ttl is not None and default_ttl > max_ttl:
errors.append(f"{prefix}.ttl.default must not exceed ttl.max")
if ttl.get("renewable") is not False:
errors.append(f"{prefix}.ttl.renewable must be false for the pilot")
actors = require_dict(grant_obj.get("actors"), f"{prefix}.actors", errors)
allowed_actor_types = require_list(
actors.get("allowed_types"), f"{prefix}.actors.allowed_types", errors
)
if not allowed_actor_types:
errors.append(f"{prefix}.actors.allowed_types must not be empty")
require_nonempty_string(
actors.get("required_subject_binding"),
f"{prefix}.actors.required_subject_binding",
errors,
)
authorization = require_dict(
grant_obj.get("authorization"), f"{prefix}.authorization", errors
)
if authorization.get("purpose_required") is not True:
errors.append(f"{prefix}.authorization.purpose_required must be true")
require_list(
authorization.get("allowed_purpose_examples"),
f"{prefix}.authorization.allowed_purpose_examples",
errors,
)
delivery = require_dict(grant_obj.get("delivery"), f"{prefix}.delivery", errors)
modes = catalog.get("delivery_modes", {})
allowed_known = set(
str(mode)
for mode in require_list(
modes.get("allowed_known"), "delivery_modes.allowed_known", errors
)
)
denied_known = set(
str(mode)
for mode in require_list(
modes.get("denied_known"), "delivery_modes.denied_known", errors
)
)
allowed = set(
str(mode)
for mode in require_list(
delivery.get("allowed"), f"{prefix}.delivery.allowed", errors
)
)
denied = set(
str(mode)
for mode in require_list(
delivery.get("denied"), f"{prefix}.delivery.denied", errors
)
)
if allowed - allowed_known:
errors.append(
f"{prefix}.delivery.allowed has unknown modes: {sorted(allowed - allowed_known)}"
)
if denied - denied_known:
errors.append(
f"{prefix}.delivery.denied has unknown modes: {sorted(denied - denied_known)}"
)
if allowed & denied:
errors.append(
f"{prefix}.delivery modes both allowed and denied: {sorted(allowed & denied)}"
)
missing_denied = REQUIRED_DENIED_MODES - denied
if missing_denied:
errors.append(f"{prefix}.delivery.denied missing {sorted(missing_denied)}")
preferred = require_nonempty_string(
delivery.get("preferred"), f"{prefix}.delivery.preferred", errors
)
if preferred and preferred not in allowed:
errors.append(f"{prefix}.delivery.preferred must be in delivery.allowed")
if "local-token-file" in allowed:
local_file = require_dict(
delivery.get("local_token_file"),
f"{prefix}.delivery.local_token_file",
errors,
)
if local_file.get("directory") != ".local/credential-leases":
errors.append(
f"{prefix}.delivery.local_token_file.directory must be .local/credential-leases"
)
if str(local_file.get("mode")) != "0600":
errors.append(f"{prefix}.delivery.local_token_file.mode must be 0600")
audit = require_dict(grant_obj.get("audit"), f"{prefix}.audit", errors)
if audit.get("openbao_audit_required") is not True:
errors.append(f"{prefix}.audit.openbao_audit_required must be true")
if audit.get("record_secret_values") is not False:
errors.append(f"{prefix}.audit.record_secret_values must be false")
revocation = require_dict(
grant_obj.get("revocation"), f"{prefix}.revocation", errors
)
if revocation.get("required") is not True:
errors.append(f"{prefix}.revocation.required must be true")
if revocation.get("by_accessor") is not True:
errors.append(f"{prefix}.revocation.by_accessor must be true")
return grant_id
def main() -> int:
parser = argparse.ArgumentParser(
description="Validate non-secret credential grant catalog."
)
parser.add_argument(
"catalog",
nargs="?",
default="credential-grants/catalog.yaml",
help="Path to catalog YAML",
)
args = parser.parse_args()
path = Path(args.catalog)
if not path.exists():
fail(f"catalog file is missing: {path}")
return 1
raw = path.read_text(encoding="utf-8")
errors: list[str] = []
for marker in SECRET_MARKERS:
if marker in raw:
errors.append(f"secret-looking marker present: {marker}")
try:
catalog = yaml.safe_load(raw)
except yaml.YAMLError as exc:
fail(f"catalog YAML is invalid: {exc}")
return 1
catalog_obj = require_dict(catalog, "catalog", errors)
if catalog_obj.get("version") != 1:
errors.append("version must be 1")
require_nonempty_string(catalog_obj.get("updated"), "updated", errors)
require_nonempty_string(catalog_obj.get("owner_repo"), "owner_repo", errors)
require_nonempty_string(catalog_obj.get("workplan_id"), "workplan_id", errors)
delivery_modes = require_dict(
catalog_obj.get("delivery_modes"), "delivery_modes", errors
)
allowed_known = set(
str(mode)
for mode in require_list(
delivery_modes.get("allowed_known"), "delivery_modes.allowed_known", errors
)
)
denied_known = set(
str(mode)
for mode in require_list(
delivery_modes.get("denied_known"), "delivery_modes.denied_known", errors
)
)
if not REQUIRED_DENIED_MODES.issubset(denied_known):
errors.append(
f"delivery_modes.denied_known missing {sorted(REQUIRED_DENIED_MODES - denied_known)}"
)
if allowed_known & denied_known:
errors.append(
f"delivery_modes overlap between allowed and denied: {sorted(allowed_known & denied_known)}"
)
grant_classes = set(
str(item)
for item in require_list(
catalog_obj.get("grant_classes"), "grant_classes", errors
)
)
if grant_classes != ALLOWED_GRANT_CLASSES:
errors.append(f"grant_classes must be exactly {sorted(ALLOWED_GRANT_CLASSES)}")
grants = require_list(catalog_obj.get("grants"), "grants", errors)
if not grants:
errors.append("grants must not be empty")
seen: set[str] = set()
for index, grant in enumerate(grants):
grant_id = validate_grant(grant, index, catalog_obj, errors)
if grant_id:
if grant_id in seen:
errors.append(f"duplicate grant id: {grant_id}")
seen.add(grant_id)
if "ops-warden/warden-sign" not in seen:
errors.append("initial grant ops-warden/warden-sign is required")
if errors:
for error in errors:
fail(error)
return 1
print(f"[OK] credential grant catalog is valid: {path}")
print(f"[OK] grants: {len(grants)}")
for grant in grants:
print(f"[OK] {grant['id']}: {grant['grant_class']} {grant['credential_type']}")
return 0
if __name__ == "__main__":
raise SystemExit(main())