355 lines
12 KiB
Python
Executable File
355 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
# Substrings that must never appear in the raw catalog text. main() scans the
# file for each one, in this order, and reports every hit as a
# "secret-looking marker".
SECRET_MARKERS = [
    "AGE-SECRET-KEY-1",
    "-----BEGIN PRIVATE KEY-----",
    "-----BEGIN OPENSSH PRIVATE KEY-----",
    "OPENBAO_ROOT_TOKEN=",
    "VAULT_TOKEN=",
    "BAO_TOKEN=",
    "hvs.",
    "hvb.",
    "hvc.",
]

# Delivery modes that must be listed both in the catalog-level
# delivery_modes.denied_known and in every grant's delivery.denied.
REQUIRED_DENIED_MODES = {
    "chat",
    "state-hub-body",
    "git",
    "command-line-token-argument",
    "llm-prompt",
}

# Closed vocabularies accepted for the corresponding grant fields.
ALLOWED_CREDENTIAL_TYPES = {
    "openbao-token",
}
ALLOWED_GRANT_CLASSES = {
    "self-service",
    "approval-required",
    "break-glass",
}
ALLOWED_GRANT_STATUSES = {
    "pilot",
    "active",
    "deprecated",
    "disabled",
}

# Policies a grant may never request in openbao.policies and must list
# explicitly in openbao.disallowed_policies.
DISALLOWED_POLICIES = {
    "root",
    "platform-admin",
}

# TTL syntax: a positive integer without leading zeros followed by exactly one
# unit letter (s, m, h or d).
TTL_RE = re.compile(r"^([1-9][0-9]*)([smhd])$")
|
|
|
|
|
|
def fail(message: str) -> None:
    """Write ``message`` to standard error as a single ``[FAIL]`` line."""
    print("[FAIL]", message, file=sys.stderr)
|
|
|
|
|
|
def ttl_seconds(value: Any, field: str, errors: list[str]) -> int | None:
    """Convert a TTL string such as ``15m`` into a number of seconds.

    Args:
        value: Candidate TTL. Must be a string of the form
            ``<positive integer><s|m|h|d>``.
        field: Dotted field name used as the prefix of any error message.
        errors: Accumulator that receives one message when ``value`` is
            rejected.

    Returns:
        The TTL expressed in seconds, or ``None`` when ``value`` is not a
        valid TTL (one message has then been appended to ``errors``).
    """
    if not isinstance(value, str):
        errors.append(f"{field} must be a string TTL such as 15m")
        return None

    # fullmatch rather than match: the pattern ends in ``$``, which also
    # matches just before a trailing newline, so match() would let a value
    # like "15m" followed by a newline through.
    match = TTL_RE.fullmatch(value)
    if not match:
        errors.append(f"{field} must match <positive integer><s|m|h|d>: {value!r}")
        return None

    amount = int(match.group(1))
    unit = match.group(2)
    multiplier = {"s": 1, "m": 60, "h": 3600, "d": 86400}[unit]
    return amount * multiplier
|
|
|
|
|
|
def require_dict(value: Any, field: str, errors: list[str]) -> dict[str, Any]:
    """Return ``value`` if it is a dict, otherwise record an error.

    Args:
        value: Object to check.
        field: Dotted field name used in the error message.
        errors: Accumulator for validation messages.

    Returns:
        ``value`` itself when it is a dict; a new empty dict otherwise, so
        callers can keep calling ``.get`` without further checks.
    """
    if isinstance(value, dict):
        return value
    errors.append(f"{field} must be an object")
    return {}
|
|
|
|
|
|
def require_list(value: Any, field: str, errors: list[str]) -> list[Any]:
    """Return ``value`` if it is a list, otherwise record an error.

    Args:
        value: Object to check.
        field: Dotted field name used in the error message.
        errors: Accumulator for validation messages.

    Returns:
        ``value`` itself when it is a list; a new empty list otherwise, so
        callers can iterate the result unconditionally.
    """
    if isinstance(value, list):
        return value
    errors.append(f"{field} must be a list")
    return []
|
|
|
|
|
|
def require_nonempty_string(value: Any, field: str, errors: list[str]) -> str:
    """Return ``value`` stripped of surrounding whitespace, or record an error.

    Args:
        value: Object to check.
        field: Dotted field name used in the error message.
        errors: Accumulator for validation messages.

    Returns:
        The stripped string when ``value`` is a string containing at least one
        non-whitespace character; ``""`` otherwise (after appending a message).
    """
    trimmed = value.strip() if isinstance(value, str) else ""
    if trimmed:
        return trimmed
    errors.append(f"{field} must be a non-empty string")
    return ""
|
|
|
|
|
|
def _string_set(value: Any, field: str, errors: list[str]) -> set[str]:
    """Return the items of list ``value`` as a set of strings (empty if not a list)."""
    return {str(item) for item in require_list(value, field, errors)}


def _require_choice(
    value: Any, field: str, choices: set[str], errors: list[str]
) -> str:
    """Require a non-empty string that is one of ``choices``; return it stripped."""
    text = require_nonempty_string(value, field, errors)
    if text and text not in choices:
        errors.append(f"{field} must be one of {sorted(choices)}")
    return text


def validate_grant(
    grant: Any, index: int, catalog: dict[str, Any], errors: list[str]
) -> str:
    """Validate one entry of the catalog's ``grants`` list.

    Checks the grant's identity fields, its status / class / credential type
    against the allowed vocabularies, and the ``openbao``, ``ttl``, ``actors``,
    ``authorization``, ``delivery``, ``audit`` and ``revocation`` sections.
    Every problem is appended to ``errors``; nothing is raised.

    Args:
        grant: The raw grant entry as loaded from YAML.
        index: Position of the entry in ``grants``; used to build the
            ``grants[<index>]`` prefix of every message.
        catalog: The whole catalog mapping; only ``delivery_modes`` is read.
        errors: Accumulator for validation messages.

    Returns:
        The grant's stripped ``id``, or ``""`` when the entry is not a mapping
        or has no usable ``id``.
    """
    prefix = f"grants[{index}]"
    grant_obj = require_dict(grant, prefix, errors)
    # Bail out only for non-mappings. An empty mapping must still run through
    # the field checks below so that it is reported instead of being silently
    # accepted.
    if not isinstance(grant, dict):
        return ""

    grant_id = require_nonempty_string(grant_obj.get("id"), f"{prefix}.id", errors)
    require_nonempty_string(grant_obj.get("title"), f"{prefix}.title", errors)
    require_nonempty_string(
        grant_obj.get("description"), f"{prefix}.description", errors
    )

    _require_choice(
        grant_obj.get("status"), f"{prefix}.status", ALLOWED_GRANT_STATUSES, errors
    )
    _require_choice(
        grant_obj.get("grant_class"),
        f"{prefix}.grant_class",
        ALLOWED_GRANT_CLASSES,
        errors,
    )
    _require_choice(
        grant_obj.get("credential_type"),
        f"{prefix}.credential_type",
        ALLOWED_CREDENTIAL_TYPES,
        errors,
    )

    require_nonempty_string(grant_obj.get("issuer"), f"{prefix}.issuer", errors)
    require_nonempty_string(grant_obj.get("audience"), f"{prefix}.audience", errors)

    # --- openbao section -------------------------------------------------
    openbao = require_dict(grant_obj.get("openbao"), f"{prefix}.openbao", errors)
    policies = require_list(
        openbao.get("policies"), f"{prefix}.openbao.policies", errors
    )
    if not policies:
        errors.append(f"{prefix}.openbao.policies must contain at least one policy")
    for policy in policies:
        # Inspect the raw item: stringifying first would turn e.g. a YAML null
        # into the acceptable-looking name "None".
        name = policy.strip() if isinstance(policy, str) else ""
        if not name or name in DISALLOWED_POLICIES:
            errors.append(
                f"{prefix}.openbao.policies contains disallowed policy: {policy!r}"
            )
    configured_disallowed = _string_set(
        openbao.get("disallowed_policies"),
        f"{prefix}.openbao.disallowed_policies",
        errors,
    )
    missing_disallowed = DISALLOWED_POLICIES - configured_disallowed
    if missing_disallowed:
        errors.append(
            f"{prefix}.openbao.disallowed_policies missing {sorted(missing_disallowed)}"
        )
    require_nonempty_string(
        openbao.get("token_role"), f"{prefix}.openbao.token_role", errors
    )
    require_list(openbao.get("mount_paths"), f"{prefix}.openbao.mount_paths", errors)

    # --- ttl section -----------------------------------------------------
    ttl = require_dict(grant_obj.get("ttl"), f"{prefix}.ttl", errors)
    default_ttl = ttl_seconds(ttl.get("default"), f"{prefix}.ttl.default", errors)
    max_ttl = ttl_seconds(ttl.get("max"), f"{prefix}.ttl.max", errors)
    if default_ttl is not None and max_ttl is not None and default_ttl > max_ttl:
        errors.append(f"{prefix}.ttl.default must not exceed ttl.max")
    if ttl.get("renewable") is not False:
        errors.append(f"{prefix}.ttl.renewable must be false for the pilot")

    # --- actors section --------------------------------------------------
    actors = require_dict(grant_obj.get("actors"), f"{prefix}.actors", errors)
    allowed_actor_types = require_list(
        actors.get("allowed_types"), f"{prefix}.actors.allowed_types", errors
    )
    if not allowed_actor_types:
        errors.append(f"{prefix}.actors.allowed_types must not be empty")
    require_nonempty_string(
        actors.get("required_subject_binding"),
        f"{prefix}.actors.required_subject_binding",
        errors,
    )

    # --- authorization section ------------------------------------------
    authorization = require_dict(
        grant_obj.get("authorization"), f"{prefix}.authorization", errors
    )
    if authorization.get("purpose_required") is not True:
        errors.append(f"{prefix}.authorization.purpose_required must be true")
    require_list(
        authorization.get("allowed_purpose_examples"),
        f"{prefix}.authorization.allowed_purpose_examples",
        errors,
    )

    # --- delivery section ------------------------------------------------
    delivery = require_dict(grant_obj.get("delivery"), f"{prefix}.delivery", errors)
    modes = catalog.get("delivery_modes")
    if not isinstance(modes, dict):
        # A missing or malformed delivery_modes must not crash the per-grant
        # checks with AttributeError; treat it as empty.
        modes = {}
    allowed_known = _string_set(
        modes.get("allowed_known"), "delivery_modes.allowed_known", errors
    )
    denied_known = _string_set(
        modes.get("denied_known"), "delivery_modes.denied_known", errors
    )
    allowed = _string_set(
        delivery.get("allowed"), f"{prefix}.delivery.allowed", errors
    )
    denied = _string_set(delivery.get("denied"), f"{prefix}.delivery.denied", errors)
    if allowed - allowed_known:
        errors.append(
            f"{prefix}.delivery.allowed has unknown modes: {sorted(allowed - allowed_known)}"
        )
    if denied - denied_known:
        errors.append(
            f"{prefix}.delivery.denied has unknown modes: {sorted(denied - denied_known)}"
        )
    if allowed & denied:
        errors.append(
            f"{prefix}.delivery modes both allowed and denied: {sorted(allowed & denied)}"
        )
    missing_denied = REQUIRED_DENIED_MODES - denied
    if missing_denied:
        errors.append(f"{prefix}.delivery.denied missing {sorted(missing_denied)}")
    preferred = require_nonempty_string(
        delivery.get("preferred"), f"{prefix}.delivery.preferred", errors
    )
    if preferred and preferred not in allowed:
        errors.append(f"{prefix}.delivery.preferred must be in delivery.allowed")
    if "local-token-file" in allowed:
        local_file = require_dict(
            delivery.get("local_token_file"),
            f"{prefix}.delivery.local_token_file",
            errors,
        )
        if local_file.get("directory") != ".local/credential-leases":
            errors.append(
                f"{prefix}.delivery.local_token_file.directory must be .local/credential-leases"
            )
        if str(local_file.get("mode")) != "0600":
            errors.append(f"{prefix}.delivery.local_token_file.mode must be 0600")

    # --- audit section ---------------------------------------------------
    audit = require_dict(grant_obj.get("audit"), f"{prefix}.audit", errors)
    if audit.get("openbao_audit_required") is not True:
        errors.append(f"{prefix}.audit.openbao_audit_required must be true")
    if audit.get("record_secret_values") is not False:
        errors.append(f"{prefix}.audit.record_secret_values must be false")

    # --- revocation section ---------------------------------------------
    revocation = require_dict(
        grant_obj.get("revocation"), f"{prefix}.revocation", errors
    )
    if revocation.get("required") is not True:
        errors.append(f"{prefix}.revocation.required must be true")
    if revocation.get("by_accessor") is not True:
        errors.append(f"{prefix}.revocation.by_accessor must be true")

    return grant_id
|
|
|
|
|
|
def main() -> int:
    """Validate the credential grant catalog named on the command line.

    Reads the catalog (default ``credential-grants/catalog.yaml``), scans the
    raw text for secret-looking markers, parses it as YAML and validates the
    top-level fields and every grant. Each problem is printed to standard
    error as a ``[FAIL]`` line; on success a short ``[OK]`` summary is printed
    to standard output.

    Returns:
        ``0`` when the catalog is valid, ``1`` otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Validate non-secret credential grant catalog."
    )
    parser.add_argument(
        "catalog",
        nargs="?",
        default="credential-grants/catalog.yaml",
        help="Path to catalog YAML",
    )
    args = parser.parse_args()

    path = Path(args.catalog)
    # Read first and handle the failure, instead of checking exists() and then
    # crashing on directories, unreadable files or bytes that are not UTF-8.
    try:
        raw = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        fail(f"catalog file is missing: {path}")
        return 1
    except (OSError, UnicodeDecodeError) as exc:
        fail(f"catalog file could not be read: {path}: {exc}")
        return 1

    errors: list[str] = [
        f"secret-looking marker present: {marker}"
        for marker in SECRET_MARKERS
        if marker in raw
    ]

    try:
        catalog = yaml.safe_load(raw)
    except yaml.YAMLError as exc:
        # Report the parse failure together with any secret markers already
        # found, rather than dropping those findings.
        errors.append(f"catalog YAML is invalid: {exc}")
        for error in errors:
            fail(error)
        return 1

    catalog_obj = require_dict(catalog, "catalog", errors)
    version = catalog_obj.get("version")
    # bool is a subclass of int and True == 1, so a bare "!= 1" comparison
    # would accept "version: true".
    if isinstance(version, bool) or not isinstance(version, int) or version != 1:
        errors.append("version must be 1")
    require_nonempty_string(catalog_obj.get("updated"), "updated", errors)
    require_nonempty_string(catalog_obj.get("owner_repo"), "owner_repo", errors)
    require_nonempty_string(catalog_obj.get("workplan_id"), "workplan_id", errors)

    delivery_modes = require_dict(
        catalog_obj.get("delivery_modes"), "delivery_modes", errors
    )
    allowed_known = {
        str(mode)
        for mode in require_list(
            delivery_modes.get("allowed_known"), "delivery_modes.allowed_known", errors
        )
    }
    denied_known = {
        str(mode)
        for mode in require_list(
            delivery_modes.get("denied_known"), "delivery_modes.denied_known", errors
        )
    }
    if not REQUIRED_DENIED_MODES.issubset(denied_known):
        errors.append(
            f"delivery_modes.denied_known missing {sorted(REQUIRED_DENIED_MODES - denied_known)}"
        )
    if allowed_known & denied_known:
        errors.append(
            f"delivery_modes overlap between allowed and denied: {sorted(allowed_known & denied_known)}"
        )

    grant_classes = {
        str(item)
        for item in require_list(
            catalog_obj.get("grant_classes"), "grant_classes", errors
        )
    }
    if grant_classes != ALLOWED_GRANT_CLASSES:
        errors.append(f"grant_classes must be exactly {sorted(ALLOWED_GRANT_CLASSES)}")

    grants = require_list(catalog_obj.get("grants"), "grants", errors)
    if not grants:
        errors.append("grants must not be empty")
    seen: set[str] = set()
    for index, grant in enumerate(grants):
        grant_id = validate_grant(grant, index, catalog_obj, errors)
        if grant_id:
            if grant_id in seen:
                errors.append(f"duplicate grant id: {grant_id}")
            seen.add(grant_id)

    if "ops-warden/warden-sign" not in seen:
        errors.append("initial grant ops-warden/warden-sign is required")

    if errors:
        for error in errors:
            fail(error)
        return 1

    print(f"[OK] credential grant catalog is valid: {path}")
    print(f"[OK] grants: {len(grants)}")
    for grant in grants:
        print(f"[OK] {grant['id']}: {grant['grant_class']} {grant['credential_type']}")
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|