#!/usr/bin/env python3 """Non-secret security bootstrap console. This tool is intentionally conservative. It prints status, gates, checklists, and templates. It does not collect or store secret values, and it refuses to run live OpenBao initialization. """ from __future__ import annotations import argparse import base64 import hashlib import html import json import shlex import subprocess import sys import urllib.error import urllib.parse import urllib.request from dataclasses import dataclass from datetime import datetime, timezone from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any DEFAULT_STAGE = "S1 - Low-trust assembly" STAGE_ORDER = ("S1", "S2", "S3", "S4", "S5", "S6") DEFAULT_METADATA_PATH = Path("/tmp/net-kingdom-security-bootstrap.json") REPO_ROOT = Path(__file__).resolve().parents[2] APPROVAL_PHRASE = "approve custody mode" VALID_STORAGE_CLASSES = {"password-safe", "offline-packet", "hardware-token"} VALID_MFA_CLASSES = {"totp", "webauthn", "hardware-token"} VALID_MFA_ENROLLMENT_SOURCES = { "identity-provider", "external-verifier", "hardware-registration", "deferred", } VALID_CUSTODY_MODES = {"temporary-single-king", "two-of-three-planned", "two-of-three-ready"} CUSTODY_APPROVAL_MODES = {"temporary-single-king", "two-of-three-ready"} KEYCAPE_ISSUER = "https://kc.coulomb.social" OIDC_CLIENT_ID = "netkingdom-bootstrap-console" OIDC_SCOPE = "openid profile email groups" OIDC_CODE_VERIFIER = "netkingdom-bootstrap-local-oidc-verifier-2026-v1" KEYCAPE_OPENBAO_CLIENT_ID = "openbao-admin" KEYCAPE_OPENBAO_CLIENT_CONFIG = REPO_ROOT / "sso-mfa/k8s/keycape/create-secrets.sh" KEYCAPE_OPENBAO_CLIENT_REDIRECTS = ( "http://localhost:8250/oidc/callback", "http://127.0.0.1:8250/oidc/callback", ) AGE_PUBLIC_PREFIX = "age1" AGE_PRIVATE_MARKER = "AGE-SECRET-KEY-1" @dataclass(frozen=True) class Gate: name: str status: str reason: str def load_metadata(path: Path | None) -> dict[str, Any]: if path is None or not path.exists(): return {} try: data = json.loads(path.read_text()) except json.JSONDecodeError as exc: raise SystemExit(f"metadata is not valid JSON: {path}: {exc}") from exc if not isinstance(data, dict): raise SystemExit(f"metadata root must be an object: {path}") return data def write_metadata(path: Path, data: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp_path = path.with_name(f".{path.name}.tmp") tmp_path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n") tmp_path.replace(path) def utc_now() -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat() def normalize_storage_classes(value: Any) -> list[str]: if isinstance(value, str): raw_values = [item.strip() for item in value.split(",")] elif isinstance(value, list): raw_values = [str(item).strip() for item in value] else: raw_values = [] values = [item for item in raw_values if item] return sorted(set(values)) def yes(data: dict[str, Any], key: str) -> bool: return data.get(key) is True def keycape_openbao_client_source_ready() -> bool: try: text = KEYCAPE_OPENBAO_CLIENT_CONFIG.read_text() except OSError: return False required = [ f'clientId: "{KEYCAPE_OPENBAO_CLIENT_ID}"', 'allowedScopes: ["openid", "profile", "email", "groups"]', ] required.extend(f'"{uri}"' for uri in KEYCAPE_OPENBAO_CLIENT_REDIRECTS) return all(item in text for item in required) def second_factor_ready(data: dict[str, Any]) -> bool: return ( data.get("mfa_class") in VALID_MFA_CLASSES and yes(data, "mfa_enrolled_confirmed") and data.get("mfa_enrollment_source") in VALID_MFA_ENROLLMENT_SOURCES - {"deferred"} ) def second_factor_reason(data: dict[str, Any]) -> str: if data.get("mfa_class") not in VALID_MFA_CLASSES: return "Select TOTP, WebAuthn, or hardware-token." if data.get("mfa_enrollment_source") == "deferred": return "Deferred factor enrollment blocks live OpenBao custody." if not yes(data, "mfa_enrolled_confirmed"): return "Confirm the factor was enrolled with the authority that will verify it; record no seed." if data.get("mfa_enrollment_source") not in VALID_MFA_ENROLLMENT_SOURCES: return "Record the non-secret enrollment source." return "Second factor enrollment is confirmed without recording seed material." def identity_account_ready(data: dict[str, Any]) -> bool: return ( yes(data, "identity_account_created") and bool(data.get("identity_account_reference")) and yes(data, "identity_group_confirmed") and bool(data.get("identity_group_reference")) ) def identity_account_reason(data: dict[str, Any]) -> str: if not yes(data, "identity_account_created"): return "Create the dedicated platform-root account in LLDAP first." if not data.get("identity_account_reference"): return "Record a non-secret account reference such as platform-root@lldap." if not yes(data, "identity_group_confirmed"): return "Confirm the account is assigned to the required LLDAP admin group." if not data.get("identity_group_reference"): return "Record the non-secret group reference such as net-kingdom-admins." return "Dedicated identity account is recorded without storing its password." def password_safe_ready(data: dict[str, Any]) -> bool: return yes(data, "password_safe_confirmed") def identity_login_ready(data: dict[str, Any]) -> bool: return yes(data, "oidc_login_verified") def recovery_material_ready(data: dict[str, Any]) -> bool: return yes(data, "recovery_confirmed") def recovery_material_reason(data: dict[str, Any]) -> str: if recovery_material_ready(data): return "Recovery references are prepared outside this UI." return ( "Prepare password recovery, MFA recovery/re-enrollment, custodian age-key " "recovery, and encrypted bootstrap bundle recovery references." ) def custody_packet_ready(data: dict[str, Any]) -> bool: return yes(data, "custody_packet_prepared") def custody_packet_reason(data: dict[str, Any]) -> str: if custody_packet_ready(data): return "The offline ceremony packet is ready without recording secret values here." return ( "Prepare the OpenBao ceremony packet: selected custody mode, recovery references, " "share assignment slots, root-token disposition plan, and signature/date." ) def metadata_secret_boundary_issue(data: dict[str, Any]) -> str: state = bootstrap_secret_state() if state["plaintext_secrets_present"]: return "Plaintext bootstrap secrets directory is present; remove it before custody approval." encoded = json.dumps(data, sort_keys=True) secret_markers = ( AGE_PRIVATE_MARKER, "-----BEGIN PRIVATE KEY-----", "-----BEGIN OPENSSH PRIVATE KEY-----", "OPENBAO_ROOT_TOKEN", "VAULT_TOKEN", ) for marker in secret_markers: if marker in encoded: return f"Metadata contains a secret-looking marker: {marker}." return "" def secret_boundary_ready(data: dict[str, Any]) -> bool: return metadata_secret_boundary_issue(data) == "" def secret_boundary_reason(data: dict[str, Any]) -> str: issue = metadata_secret_boundary_issue(data) if issue: return issue return "The control surface stores only non-secret references; no user attestation is required." def extract_age_public_key(value: Any) -> str: if value is None: return "" text = str(value).strip() if AGE_PRIVATE_MARKER in text: return "" for token in text.replace(",", " ").split(): clean = token.strip().strip('"').strip("'") if clean.startswith(AGE_PUBLIC_PREFIX): return clean return "" def age_public_key_fingerprint(public_key: str) -> str: if not public_key: return "" digest = hashlib.sha256(public_key.encode("utf-8")).hexdigest() return f"sha256:{digest[:16]}" def bootstrap_secret_state() -> dict[str, Any]: root = Path.cwd() bootstrap_dir = root / "sso-mfa" / "bootstrap" encrypted_dir = bootstrap_dir / "secrets.enc" plaintext_dir = bootstrap_dir / "secrets" encrypted_files = sorted(encrypted_dir.glob("*/*.age")) if encrypted_dir.exists() else [] plaintext_files = sorted(path for path in plaintext_dir.glob("*/*") if path.is_file()) if plaintext_dir.exists() else [] return { "bootstrap_dir": str(bootstrap_dir), "encrypted_bundle_path": str(encrypted_dir), "encrypted_bundle_exists": encrypted_dir.exists(), "encrypted_file_count": len(encrypted_files), "plaintext_secrets_path": str(plaintext_dir), "plaintext_secrets_present": plaintext_dir.exists(), "plaintext_file_count": len(plaintext_files), } def key_custody_validation(data: dict[str, Any]) -> list[Gate]: public_key = extract_age_public_key(data.get("custodian_age_public_key")) state = bootstrap_secret_state() plaintext_present = bool(state["plaintext_secrets_present"]) return [ Gate( "Custodian public key", "done" if public_key and yes(data, "custodian_age_public_key_confirmed") else "blocked", "Register the custodian age public key used to encrypt bootstrap bundles.", ), Gate( "Private key custody", "done" if data.get("custodian_age_private_key_reference") and yes(data, "custodian_age_private_key_confirmed") else "blocked", "Record only where the private key is held; never paste it into this UI.", ), Gate( "Encrypted bundle", "done" if state["encrypted_bundle_exists"] and state["encrypted_file_count"] else "blocked", "Encrypted bootstrap secrets should live under sso-mfa/bootstrap/secrets.enc/.", ), Gate( "Plaintext exposure", "blocked" if plaintext_present else "done", "Plaintext sso-mfa/bootstrap/secrets/ is present; shred it after any apply ceremony." if plaintext_present else "No plaintext bootstrap secrets directory is present.", ), ] def kit_validation(data: dict[str, Any]) -> list[Gate]: storage_classes = data.get("storage_classes", []) if not isinstance(storage_classes, list): storage_classes = [] storage_values = {str(item) for item in storage_classes} custody_mode = data.get("custody_mode", "") return [ Gate( "Credential label", "done" if data.get("credential_label") else "blocked", "Use a dedicated label such as platform-root.", ), Gate( "Identity account", "done" if identity_account_ready(data) else "blocked", identity_account_reason(data), ), Gate( "Setup operator/contact", "done" if data.get("setup_operator") and data.get("notification_contact") else "blocked", "Record non-secret setup operator and notification contact.", ), Gate( "Storage class", "done" if storage_values & VALID_STORAGE_CLASSES else "blocked", "Select where the credential is held; hardware is optional policy, not a default requirement.", ), Gate( "Password safe storage", "done" if password_safe_ready(data) else "blocked", "Confirm the credential password is stored in the password safe without recording it here.", ), Gate( "Second factor", "done" if second_factor_ready(data) else "blocked", second_factor_reason(data), ), Gate( "Identity login path", "done" if identity_login_ready(data) else "blocked", "Verify the dedicated account can complete the NetKingdom login path.", ), Gate( "Custody strategy selected", "done" if custody_mode in VALID_CUSTODY_MODES else "blocked", "Choose how the OpenBao init ceremony will be controlled.", ), Gate( "Recovery material", "done" if recovery_material_ready(data) else "blocked", recovery_material_reason(data), ), Gate( "Custody packet", "done" if custody_packet_ready(data) else "blocked", custody_packet_reason(data), ), Gate( "Control-surface secret boundary", "done" if secret_boundary_ready(data) else "blocked", secret_boundary_reason(data), ), ] def king_kit_ready(data: dict[str, Any]) -> bool: gates = kit_validation(data) return all(gate.status == "done" for gate in gates) def custody_mode_approved(data: dict[str, Any]) -> bool: return data.get("custody_mode") in CUSTODY_APPROVAL_MODES and yes(data, "custody_mode_approved") def custody_mode_reason(data: dict[str, Any]) -> str: mode = data.get("custody_mode") if mode in CUSTODY_APPROVAL_MODES and yes(data, "custody_mode_approved"): return "Approved for the next gate under the selected custody mode." if mode == "two-of-three-planned": return "Two-of-three is recorded as the target, but live init stays blocked until it is ready." if mode in CUSTODY_APPROVAL_MODES and not yes(data, "custody_mode_approved"): return "Strategy is selected; explicit approval is still pending." return "Choose temporary-single-king or two-of-three-ready for live OpenBao custody." def derive_stage(data: dict[str, Any]) -> str: if yes(data, "platform_reopened"): return "S6 - Reopen under custody" if yes(data, "cleanup_complete") or yes(data, "openbao_oidc_admin_login_verified"): return "S5 - Cleanup and hardening" if yes(data, "openbao_initial_config_applied"): return "S4 - Admin identity integration" if yes(data, "openbao_initialized"): return "S3 - OpenBao bootstrap" if yes(data, "king_credential_ready") or king_kit_ready(data): return "S2 - King credential preparation" return DEFAULT_STAGE def stage_payloads(data: dict[str, Any]) -> list[dict[str, str]]: current = derive_stage(data).split(" - ", 1)[0] try: current_index = STAGE_ORDER.index(current) except ValueError: current_index = 0 rows = [ ( "S1", "Low-trust assembly", "Install and connect subsystems using low-trust setup accounts and non-secret metadata.", "Complete the king credential kit.", ), ( "S2", "King credential preparation", "Create platform-root, enroll MFA, approve custody strategy, and prepare recovery material.", "Run OpenBao preflight and init under the selected custody strategy.", ), ( "S3", "OpenBao bootstrap", "Initialize, unseal, rotate trial material, configure OpenBao, and create a temporary admin bridge.", "Bind normal OpenBao admin access to NetKingdom identity.", ), ( "S4", "Admin identity integration", "Deploy the code-defined KeyCape client, map verified NetKingdom admin claims to OpenBao policy, and verify login.", "Apply live KeyCape config, configure OpenBao OIDC auth, and verify admin login.", ), ( "S5", "Cleanup and hardening", "Retire bootstrap-era access, resolve taint, record root-token disposition, prove restore, and document residual risk.", "Complete cleanup and mark the platform ready to reopen.", ), ( "S6", "Reopen under custody", "Operate under the approved custody model with break-glass and recovery paths known.", "Review related workplans.", ), ] payload = [] for index, (stage_id, name, description, next_step) in enumerate(rows): status = "pending" if index < current_index: status = "done" elif index == current_index: status = "active" payload.append( { "id": stage_id, "name": name, "description": description, "next": next_step, "status": status, } ) return payload def build_gates(data: dict[str, Any]) -> list[Gate]: return [ Gate( "King credential kit", "done" if yes(data, "king_credential_ready") or king_kit_ready(data) else "blocked", "Dedicated king credential, second factor, and recovery storage.", ), Gate( "Custody strategy approval", "done" if custody_mode_approved(data) else "blocked", custody_mode_reason(data), ), Gate( "OpenBao preflight", "done" if yes(data, "openbao_preflight_passed") else "blocked", "Run safe Railiance OpenBao status and verification checks.", ), Gate( "OpenBao init ceremony", "human" if not yes(data, "openbao_initialized") else "done", "Human-attended ceremony only. This console will not run init.", ), Gate( "OpenBao initial configuration", ( "done" if yes(data, "openbao_initial_config_applied") else "human" if yes(data, "openbao_initialized") else "blocked" ), "Apply first auth, mount, and policy configuration; audit may be a declarative follow-up.", ), Gate( "KeyCape OpenBao client definition", "done" if keycape_openbao_client_source_ready() else "blocked", "The non-secret openbao-admin client is defined in KeyCape source/config generation.", ), Gate( "KeyCape OpenBao client deployed", ( "done" if yes(data, "openbao_oidc_client_registered") else "human" if keycape_openbao_client_source_ready() and yes(data, "openbao_initial_config_applied") else "blocked" ), "Apply the code-defined client to the live KeyCape keycape-config Secret and restart KeyCape.", ), Gate( "OpenBao OIDC auth", ( "done" if yes(data, "openbao_oidc_auth_configured") else "human" if yes(data, "openbao_oidc_client_registered") else "blocked" ), "OpenBao OIDC/JWT auth is configured against KeyCape and maps claims to policy.", ), Gate( "OIDC admin login", ( "done" if yes(data, "openbao_oidc_admin_login_verified") else "human" if yes(data, "openbao_oidc_auth_configured") else "blocked" ), "platform-root can obtain OpenBao platform-admin access through KeyCape/MFA.", ), Gate( "Root-token disposition", "done" if data.get("root_token_disposition") in {"revoked", "offline-sealed"} else "blocked", "Root token is revoked or sealed offline without recording value.", ), Gate( "Restore drill", "done" if yes(data, "restore_drill_passed") else "blocked", "Snapshot and isolated restore proof before live secrets.", ), Gate( "Cleanup and rotation", "done" if yes(data, "cleanup_complete") else "blocked", "Bootstrap-era credentials, databases, and access paths reviewed.", ), Gate( "Platform reopen", "done" if yes(data, "platform_reopened") else "blocked", "Final operator confirmation that the platform is reopened under custody.", ), ] def kit_next_action(kit_gates: list[Gate]) -> str: labels = { "Credential label": "Set credential label", "Identity account": "Create platform-root account", "Setup operator/contact": "Record setup contact", "Storage class": "Select recovery storage", "Password safe storage": "Confirm password safe entry", "Second factor": "Enroll MFA factor", "Identity login path": "Verify OIDC login", "Custody strategy selected": "Select custody strategy", "Recovery material": "Prepare recovery material", "Custody packet": "Prepare custody packet", "Control-surface secret boundary": "Confirm secret boundary", } for gate in kit_gates: if gate.status != "done": return labels.get(gate.name, "Define king credential kit") return "Define king credential kit" def next_action( gates: list[Gate], kit_gates: list[Gate] | None = None, data: dict[str, Any] | None = None, ) -> str: for gate in gates: if gate.status == "human": if gate.name == "OpenBao init ceremony": if data and yes(data, "openbao_init_output_produced") and not yes(data, "openbao_initialized"): return "Run OpenBao unseal prompt" return "Run attended OpenBao init ceremony" if gate.name == "KeyCape OpenBao client deployed": return "Apply KeyCape OpenBao client config" if gate.name == "OpenBao OIDC auth": return "Run OpenBao OIDC auth setup" if gate.name == "OIDC admin login": if data and yes(data, "openbao_oidc_auth_configured") and not yes(data, "openbao_oidc_admin_login_verified"): if not yes(data, "openbao_oidc_client_registered"): return "Check KeyCape public route" return "Verify OpenBao OIDC admin login" return "Verify OpenBao OIDC admin login" return gate.name if gate.status == "blocked": if gate.name == "King credential kit": if kit_gates is not None: return kit_next_action(kit_gates) return "Define king credential kit" if gate.name == "Custody strategy approval": return "Approve custody strategy" if gate.name == "OpenBao preflight": return "Run OpenBao preflight" if gate.name == "KeyCape OpenBao client definition": return "Ship KeyCape OpenBao client definition" if gate.name == "KeyCape OpenBao client deployed": return "Apply KeyCape OpenBao client config" if gate.name == "OpenBao OIDC auth": return "Run OpenBao OIDC auth setup" if gate.name == "OIDC admin login": if data and yes(data, "openbao_oidc_auth_configured") and not yes(data, "openbao_oidc_admin_login_verified"): if not yes(data, "openbao_oidc_client_registered"): return "Check KeyCape public route" return "Verify OpenBao OIDC admin login" return "Verify OpenBao OIDC admin login" if gate.name == "Root-token disposition": return "Record root-token disposition" if gate.name == "Restore drill": return "Run restore drill" if gate.name == "Cleanup and rotation": return "Complete handover cleanup" if gate.name == "Platform reopen": return "Reopen platform under custody" return "Review related workplans" def print_status(data: dict[str, Any]) -> None: merged = metadata_template() merged.update(data) gates = build_gates(merged) key_gates = key_custody_validation(merged) kit_gates = kit_validation(merged) state = bootstrap_secret_state() print("SECURITY BOOTSTRAP") print("") print("Stage") print(derive_stage(merged)) print("") print("Next safe action") print(next_action(gates, kit_gates, merged)) print("") print("Key custody") public_key = extract_age_public_key(merged.get("custodian_age_public_key")) print(f"- fingerprint: {age_public_key_fingerprint(public_key) or 'not registered'}") print(f"- encrypted bundle files: {state['encrypted_file_count']} at {state['encrypted_bundle_path']}") print(f"- plaintext secrets present: {state['plaintext_secrets_present']}") for gate in key_gates: print(f"- {gate.status}: {gate.name} - {gate.reason}") print("") print("Gates") for gate in gates: print(f"- {gate.status}: {gate.name} - {gate.reason}") print("") print("Available actions") print("1. king-kit") print("2. custody-packet") print("3. openbao-preflight") print("4. handover-checklist") print("5. metadata-template") print("6. approve-custody-mode") print("7. web-ui") print("") print("Refusal boundary") print("This console will not run bao operator init or collect secret values.") def print_king_kit() -> None: print("KING CREDENTIAL KIT") print("") rows = [ "Name the credential, for example platform-root.", "Choose storage: password safe, offline packet, hardware-backed, or a combination.", "Add a second factor: TOTP, WebAuthn, or hardware token.", "Prepare recovery material without recording values in software.", "Select custody mode: temporary-single-king, two-of-three-planned, or two-of-three-ready.", "Print or prepare the offline custody packet.", "Record only non-secret metadata.", ] for index, row in enumerate(rows, start=1): print(f"{index}. {row}") def print_validate_king_kit(data: dict[str, Any]) -> int: print("KING CREDENTIAL KIT VALIDATION") print("") if not data: print("No metadata loaded. Use --metadata with a non-secret JSON file.") print("Run metadata-template for the expected shape.") return 2 gates = kit_validation(data) for gate in gates: print(f"- {gate.status}: {gate.name} - {gate.reason}") print("") if king_kit_ready(data) and custody_mode_approved(data): print("Kit definition and custody-mode approval are complete.") print("Live OpenBao init remains a separate human-attended ceremony.") return 0 if king_kit_ready(data): print("Kit definition is complete except custody-mode approval.") print("Live OpenBao init is still blocked until T03 approves custody mode.") return 0 print("Kit definition is incomplete.") return 1 def merged_approval_metadata( existing: dict[str, Any], payload: dict[str, Any], ) -> dict[str, Any]: data = metadata_template() data.update(existing) text_fields = ( "credential_label", "bootstrap_mode", "identity_account_home", "identity_account_reference", "identity_group_reference", "custodian_age_private_key_reference", "setup_operator", "notification_contact", "role_setup_operator_email", "role_platform_custodian_email", "role_identity_admin_email", "role_openbao_operator_email", "role_recovery_custodian_email", "role_future_quorum_email", "mfa_class", "mfa_enrollment_source", "mfa_enrollment_reference", "custody_mode", "root_token_disposition", "notes", ) for field in text_fields: if field in payload and payload[field] is not None: value = str(payload[field]).strip() data[field] = "" if AGE_PRIVATE_MARKER in value else value if "custodian_age_public_key" in payload: data["custodian_age_public_key"] = extract_age_public_key(payload["custodian_age_public_key"]) if "storage_classes" in payload: data["storage_classes"] = normalize_storage_classes(payload["storage_classes"]) for field in ( "custodian_age_public_key_confirmed", "custodian_age_private_key_confirmed", "recovery_confirmed", "custody_packet_prepared", "no_secret_capture_confirmed", "mfa_enrolled_confirmed", "identity_account_created", "identity_group_confirmed", "oidc_login_verified", "password_safe_confirmed", "openbao_preflight_passed", "openbao_init_output_produced", "openbao_initialized", "openbao_post_unseal_verified", "openbao_initial_config_applied", "openbao_trial_material_exposed", "openbao_compromise_response_complete", "openbao_unseal_keys_rotated", "openbao_emergency_lockdown_drilled", "openbao_oidc_client_registered", "openbao_oidc_auth_configured", "openbao_oidc_admin_login_verified", "restore_drill_passed", "cleanup_complete", "platform_reopened", ): if field in payload: data[field] = payload[field] is True return data def save_progress_metadata(existing: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]: data = merged_approval_metadata(existing, payload) data["metadata_updated_at"] = utc_now() data["progress_scope"] = "Non-secret local bootstrap progress only." return data def validate_custody_approval( data: dict[str, Any], approval_phrase: str, ) -> list[str]: errors: list[str] = [] mode = data.get("custody_mode") if approval_phrase.strip().lower() != APPROVAL_PHRASE: errors.append(f'Type "{APPROVAL_PHRASE}" to approve the selected custody strategy.') if mode not in VALID_CUSTODY_MODES: errors.append("Select a custody mode.") elif mode not in CUSTODY_APPROVAL_MODES: errors.append( "two-of-three-planned is a target state, not live-init approval. " "Use temporary-single-king now or two-of-three-ready when shares exist." ) for gate in kit_validation(data): if gate.name == "Custody strategy selected": continue if gate.status != "done": errors.append(f"{gate.name}: {gate.reason}") return errors def approve_custody_metadata( existing: dict[str, Any], payload: dict[str, Any], approval_phrase: str, approver: str, ) -> tuple[dict[str, Any], list[str]]: data = merged_approval_metadata(existing, payload) errors = validate_custody_approval(data, approval_phrase) if errors: return data, errors data["king_credential_ready"] = True data["custody_mode_approved"] = True data["custody_approved_at"] = utc_now() data["custody_approved_by"] = approver or data.get("setup_operator", "") data["approval_scope"] = "Non-secret local custody-mode approval only. Does not run OpenBao init." return data, [] def print_approve_custody_mode(args: argparse.Namespace, data: dict[str, Any]) -> int: if args.metadata is None: print("ERROR: approve-custody-mode requires --metadata /path/to/non-secret.json", file=sys.stderr) return 2 approval_phrase = args.approval_phrase or "" if args.yes: approval_phrase = APPROVAL_PHRASE elif not approval_phrase: print("This writes non-secret custody approval metadata only.") print("It will not run OpenBao init and will not store secret values.") try: approval_phrase = input(f'Type "{APPROVAL_PHRASE}" to continue: ') except EOFError: approval_phrase = "" payload: dict[str, Any] = { "custody_mode": args.mode, } for key in ( "credential_label", "identity_account_home", "identity_account_reference", "identity_group_reference", "setup_operator", "notification_contact", "mfa_class", "mfa_enrollment_source", "mfa_enrollment_reference", "notes", ): value = getattr(args, key) if value is not None: payload[key] = value if args.storage_class: payload["storage_classes"] = args.storage_class for field in ( "recovery_confirmed", "custody_packet_prepared", "no_secret_capture_confirmed", "mfa_enrolled_confirmed", "identity_account_created", "identity_group_confirmed", "oidc_login_verified", "password_safe_confirmed", ): if getattr(args, field): payload[field] = True approved, errors = approve_custody_metadata( data, payload, approval_phrase, args.approved_by or "", ) if errors: print("CUSTODY MODE NOT APPROVED") print("") for error in errors: print(f"- {error}") return 1 write_metadata(args.metadata, approved) print("CUSTODY MODE APPROVED") print("") print(f"Metadata: {args.metadata}") print(f"Mode: {approved['custody_mode']}") print(f"Approved by: {approved.get('custody_approved_by', '')}") print(f"Approved at: {approved.get('custody_approved_at', '')}") print("") print("OpenBao init remains a separate human-attended ceremony.") return 0 def print_custody_packet() -> None: print("CUSTODY PACKET TEMPLATE") print("") print("Credential label:") print("Date:") print("Setup operator/contact:") print("Custody mode:") print("Notification contact:") print("") print("Storage location description:") print("Second-factor location description:") print("Recovery material location description:") print("") print("OpenBao share assignment rows:") print("- Share A:") print("- Share B:") print("- Share C:") print("") print("Root-token disposition:") print("Signature/date:") print("") print("Do not write this packet into Git, State Hub, chat, tickets, or email.") def print_handover_checklist() -> None: print("HANDOVER CHECKLIST") print("") rows = [ "King credential kit complete.", "OpenBao initialized and unsealed under approved custody mode.", "Root token revoked or sealed offline.", "Non-root platform admin path verified.", "Bootstrap-era database credentials rotated.", "Temporary admin accounts reviewed and removed or scoped.", "Kubernetes service accounts and privileged bindings reviewed.", "SOPS/age recipients and emergency bundle reviewed.", "Backup snapshot exists.", "Restore drill passed.", "Audit handling known.", "Remaining risk exceptions recorded with owner and date.", ] for row in rows: print(f"- {row}") def metadata_template() -> dict[str, Any]: return { "bootstrap_mode": "custody", "custodian_age_public_key": "", "custodian_age_public_key_confirmed": False, "custodian_age_private_key_reference": "", "custodian_age_private_key_confirmed": False, "credential_label": "platform-root", "identity_account_home": "lldap", "identity_account_reference": "", "identity_account_created": False, "identity_group_reference": "net-kingdom-admins", "identity_group_confirmed": False, "setup_operator": "tegwick", "notification_contact": "bernd.worsch@gmail.com", "role_setup_operator_email": "bernd.worsch@gmail.com", "role_platform_custodian_email": "bernd.worsch@gmail.com", "role_identity_admin_email": "bernd.worsch@gmail.com", "role_openbao_operator_email": "bernd.worsch@gmail.com", "role_recovery_custodian_email": "bernd.worsch@gmail.com", "role_future_quorum_email": "", "storage_classes": ["password-safe", "offline-packet"], "password_safe_confirmed": False, "mfa_class": "totp", "mfa_enrolled_confirmed": False, "mfa_enrollment_source": "deferred", "mfa_enrollment_reference": "", "recovery_confirmed": False, "custody_packet_prepared": False, "no_secret_capture_confirmed": False, "king_credential_ready": False, "custody_mode": "", "custody_mode_approved": False, "custody_approved_at": "", "custody_approved_by": "", "approval_scope": "", "oidc_login_verified": False, "metadata_updated_at": "", "progress_scope": "", "openbao_preflight_passed": False, "openbao_init_output_produced": False, "openbao_initialized": False, "openbao_post_unseal_verified": False, "openbao_initial_config_applied": False, "openbao_trial_material_exposed": False, "openbao_compromise_response_complete": False, "openbao_unseal_keys_rotated": False, "openbao_emergency_lockdown_drilled": False, "openbao_oidc_client_registered": False, "openbao_oidc_auth_configured": False, "openbao_oidc_admin_login_verified": False, "root_token_disposition": "", "restore_drill_passed": False, "cleanup_complete": False, "platform_reopened": False, "review_date": "", "notes": "Non-secret metadata only.", } def print_openbao_preflight(args: argparse.Namespace) -> int: print("OPENBAO PREFLIGHT") print("") print("Safe commands:") print(f"make -C {args.railiance_path} openbao-status") print(f"make -C {args.railiance_path} openbao-verify") print("") if not args.run: print("Dry run only. Pass --run to execute safe preflight commands.") return 0 railiance_path = Path(args.railiance_path).expanduser().resolve() if not railiance_path.is_dir(): print(f"ERROR: Railiance path not found: {railiance_path}", file=sys.stderr) return 2 for target in ("openbao-status", "openbao-verify"): result = subprocess.run( ["make", "-C", str(railiance_path), target], check=False, ) if result.returncode != 0: return result.returncode return 0 def gate_payload(gate: Gate) -> dict[str, str]: return { "name": gate.name, "status": gate.status, "reason": gate.reason, } def role_email(data: dict[str, Any], role_key: str) -> str: fallback = str(data.get("notification_contact") or "").strip() return str(data.get(role_key) or fallback).strip() def role_payloads(data: dict[str, Any]) -> list[dict[str, str]]: rows = [ ( "setup-operator", "Runs bootstrap commands and records non-secret evidence.", "NetKingdom bootstrap", "role_setup_operator_email", ), ( "platform-root-custodian", "Holds the dedicated platform-root credential and approves custody gates.", "NetKingdom identity", "role_platform_custodian_email", ), ( "identity-admin", "Administers LLDAP, privacyIDEA, and login repair during bootstrap.", "Identity stack", "role_identity_admin_email", ), ( "openbao-ceremony-operator", "Runs the attended OpenBao ceremony without copying secret output into the control surface.", "Railiance OpenBao", "role_openbao_operator_email", ), ( "recovery-custodian", "Can recover the platform-root credential and encrypted bootstrap bundle outside this UI.", "Custody packet", "role_recovery_custodian_email", ), ( "future-quorum-custodian", "Reserved for later two-of-three custody migration.", "Custody strategy", "role_future_quorum_email", ), ] payloads: list[dict[str, str]] = [] for name, description, subsystem, key in rows: email = role_email(data, key) payloads.append( { "name": name, "description": description, "subsystem": subsystem, "responsibility": name, "email": email, "location": "Role assignment in local bootstrap metadata." if email else "Not assigned yet.", "state": "set" if email else "nil", } ) return payloads def state_value(ok: bool, set_value: bool = False, err: bool = False) -> str: if err: return "err" if ok: return "ok" if set_value: return "set" return "nil" def openbao_trial_taint( data: dict[str, Any], relation: str = "downstream", material: str = "general", ) -> dict[str, Any]: if not yes(data, "openbao_trial_material_exposed") or yes(data, "openbao_compromise_response_complete"): return {} unseal_resolved = yes(data, "openbao_unseal_keys_rotated") root_resolved = data.get("root_token_disposition") == "revoked" if material == "unseal" and unseal_resolved: return {} if material == "root-token" and root_resolved: return {} relation_text = "Directly marked" if relation == "direct" else "Downstream" unresolved = [] if not unseal_resolved: unresolved.append("exposed unseal shares need rotation") if not root_resolved: unresolved.append("exposed initial root token needs revocation") if unresolved: resolution = " Remaining: " + "; ".join(unresolved) + "." else: resolution = ( " Exposed unseal shares are rotated and the exposed initial root token is revoked; " "only residual downstream review remains before marking the compromise response complete." ) return { "tainted": True, "taint_source": "Trial key material exposed", "taint_reference": "Usecases & Runbooks / Trial key material exposed", "taint_reason": ( f"{relation_text} from recorded OpenBao trial key-material exposure. " "Operator may proceed, but resulting evidence and work should be treated as tainted " "until rotation, revocation, reset, or another compromise response is recorded." + resolution ), } def add_taint(row: dict[str, Any], taint: dict[str, Any]) -> dict[str, Any]: if taint: row.update(taint) return row def subsystem_payloads(data: dict[str, Any]) -> list[dict[str, str]]: public_key = extract_age_public_key(data.get("custodian_age_public_key")) state = bootstrap_secret_state() return [ { "name": "age recipient", "description": "Public age recipient used to encrypt bootstrap bundles.", "subsystem": "custodian age envelope", "responsibility": "recovery-custodian", "email": role_email(data, "role_recovery_custodian_email"), "location": age_public_key_fingerprint(public_key) or "No public recipient registered.", "state": state_value( bool(public_key) and yes(data, "custodian_age_public_key_confirmed"), bool(public_key), ), }, { "name": "platform-root user", "description": "Dedicated LLDAP account used as the current king credential.", "subsystem": "LLDAP", "responsibility": "identity-admin", "email": role_email(data, "role_identity_admin_email"), "location": str(data.get("identity_account_reference") or "Not recorded."), "state": state_value(identity_account_ready(data), bool(data.get("identity_account_reference"))), }, { "name": "platform-root MFA token", "description": "Second factor enrolled with the authority that verifies login.", "subsystem": "privacyIDEA", "responsibility": "identity-admin", "email": role_email(data, "role_identity_admin_email"), "location": str(data.get("mfa_enrollment_reference") or "Not recorded."), "state": state_value(second_factor_ready(data), yes(data, "mfa_enrolled_confirmed")), }, { "name": "bootstrap OIDC client", "description": "KeyCape login path used to verify platform-root can authenticate with MFA.", "subsystem": "KeyCape", "responsibility": "identity-admin", "email": role_email(data, "role_identity_admin_email"), "location": KEYCAPE_ISSUER, "state": state_value(identity_login_ready(data), bool(data.get("identity_account_reference"))), }, { "name": "openbao-0", "description": "Railiance OpenBao pod, services, PVCs, and sealed pre-init state.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": "namespace=openbao, pod=openbao-0", "state": state_value(yes(data, "openbao_preflight_passed"), yes(data, "custody_mode_approved")), }, { "name": "encrypted bootstrap bundle", "description": "Encrypted bootstrap secret bundle; plaintext directory must be absent.", "subsystem": "sso-mfa/bootstrap", "responsibility": "recovery-custodian", "email": role_email(data, "role_recovery_custodian_email"), "location": str(state["encrypted_bundle_path"]), "state": state_value( bool(state["encrypted_bundle_exists"]) and not bool(state["plaintext_secrets_present"]), bool(state["encrypted_bundle_exists"]), bool(state["plaintext_secrets_present"]), ), }, ] def integration_payloads(data: dict[str, Any]) -> list[dict[str, str]]: openbao_direct_taint = openbao_trial_taint(data, "direct") return [ { "name": "LLDAP admin group assignment", "description": "platform-root is assigned to the current NetKingdom admin group.", "subsystem": "LLDAP", "responsibility": "identity-admin", "email": role_email(data, "role_identity_admin_email"), "location": str(data.get("identity_group_reference") or "Not recorded."), "state": state_value(yes(data, "identity_group_confirmed"), bool(data.get("identity_group_reference"))), }, { "name": "privacyIDEA MFA verification", "description": "The same platform-root account has an enrolled second factor.", "subsystem": "privacyIDEA", "responsibility": "identity-admin", "email": role_email(data, "role_identity_admin_email"), "location": str(data.get("mfa_enrollment_reference") or "Not recorded."), "state": state_value(second_factor_ready(data), yes(data, "mfa_enrolled_confirmed")), }, { "name": "KeyCape OIDC login", "description": "platform-root completed the OIDC login check through KeyCape.", "subsystem": "KeyCape", "responsibility": "identity-admin", "email": role_email(data, "role_identity_admin_email"), "location": "local bootstrap callback", "state": state_value(yes(data, "oidc_login_verified")), }, { "name": "OpenBao preflight", "description": "Railiance status and verify targets passed in the approved pre-init state.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": "../railiance-platform", "state": state_value(yes(data, "openbao_preflight_passed"), yes(data, "custody_mode_approved")), }, add_taint( { "name": "OpenBao init/unseal ceremony", "description": "Attended ceremony creates unseal shares and initial root token outside this UI.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": "operator shell with custody packet present", "state": state_value(yes(data, "openbao_initialized"), yes(data, "openbao_preflight_passed")), }, openbao_direct_taint, ), add_taint( { "name": "OpenBao initial configuration", "description": "First auth, mount, and policy configuration after unseal.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": "../railiance-platform openbao-configure-initial", "state": state_value(yes(data, "openbao_initial_config_applied"), yes(data, "openbao_initialized")), }, openbao_trial_taint(data, "downstream") if yes(data, "openbao_initialized") else {}, ), ] def admin_identity_payloads(data: dict[str, Any]) -> list[dict[str, str]]: downstream_taint = openbao_trial_taint(data, "downstream") if yes(data, "openbao_initialized") else {} source_ready = keycape_openbao_client_source_ready() return [ { "name": "openbao-admin client definition", "description": "Code-defined KeyCape public PKCE client for Railiance OpenBao CLI login. This is development/configuration, not a manual registration step.", "subsystem": "KeyCape source", "responsibility": "identity-admin", "email": role_email(data, "role_identity_admin_email"), "location": "sso-mfa/k8s/keycape/create-secrets.sh", "state": state_value(source_ready), }, add_taint( { "name": "openbao-admin client deployed", "description": "The code-defined KeyCape client is applied to the live keycape-config Secret and KeyCape was restarted.", "subsystem": "KeyCape", "responsibility": "identity-admin", "email": role_email(data, "role_identity_admin_email"), "location": "clientId=openbao-admin; CLI redirects localhost/127.0.0.1:8250", "state": state_value(yes(data, "openbao_oidc_client_registered"), source_ready and yes(data, "openbao_initial_config_applied")), }, downstream_taint, ), add_taint( { "name": "OpenBao OIDC auth method", "description": "OpenBao trusts KeyCape discovery and maps the net-kingdom-admins group to the platform-admin policy.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": "auth/keycape or equivalent OpenBao OIDC/JWT mount", "state": state_value(yes(data, "openbao_oidc_auth_configured"), yes(data, "openbao_oidc_client_registered")), }, downstream_taint, ), add_taint( { "name": "OIDC admin login", "description": "platform-root can obtain OpenBao platform-admin access through KeyCape MFA instead of a manually minted token.", "subsystem": "KeyCape -> OpenBao", "responsibility": "platform-root-custodian", "email": role_email(data, "role_platform_custodian_email"), "location": "bao login -method=oidc -path=keycape role=platform-admin", "state": state_value(yes(data, "openbao_oidc_admin_login_verified"), yes(data, "openbao_oidc_auth_configured")), }, downstream_taint, ), ] def admin_identity_command_payloads(data: dict[str, Any]) -> list[dict[str, str]]: source_ready = keycape_openbao_client_source_ready() client_deployed = yes(data, "openbao_oidc_client_registered") auth_configured = yes(data, "openbao_oidc_auth_configured") login_verified = yes(data, "openbao_oidc_admin_login_verified") initial_config_applied = yes(data, "openbao_initial_config_applied") downstream_taint = openbao_trial_taint(data, "downstream") if yes(data, "openbao_initialized") else {} def action( name: str, description: str, status: str, status_reason: str, command: str, taint: dict[str, str] | None = None, ) -> dict[str, str]: return add_taint( { "name": name, "description": description, "status": status, "status_reason": status_reason, "command": command, }, taint or {}, ) deploy_state = "done" if client_deployed else "todo" if source_ready and initial_config_applied else "blocked" deploy_reason = "Live KeyCape is recorded as carrying the code-defined openbao-admin client." if deploy_state == "todo": deploy_reason = "Operator action: patch the live keycape-config Secret in place and restart KeyCape. No bootstrap secret bundle decryption is required." if deploy_state == "blocked": deploy_reason = "Blocked until OpenBao initial configuration exists and the KeyCape client definition is present in source." auth_state = "done" if auth_configured else "todo" if client_deployed else "blocked" auth_reason = "OpenBao OIDC/JWT auth is recorded as configured." if auth_state == "todo": auth_reason = "Operator action: run the helper script and enter a root/sudo-capable OpenBao token at the pod TTY prompt. The token value is not recorded." if auth_state == "blocked": auth_reason = "Apply and confirm the live KeyCape openbao-admin client before configuring OpenBao auth." login_state = "done" if login_verified else "todo" if auth_configured else "blocked" login_reason = "OIDC-backed OpenBao platform-admin login is recorded as verified." if login_state == "todo": login_reason = "Human verification: complete the browser MFA flow and confirm the resulting token has platform-admin policy." if login_state == "blocked": login_reason = "Configure OpenBao OIDC auth before testing the login path." keycape_dir = shlex.quote(str(KEYCAPE_OPENBAO_CLIENT_CONFIG.parent)) kubectl_bin = "/home/worsch/.local/bin/kubectl" deploy_command = ( "bash <<'NETKINGDOM_KEYCAPE_APPLY'\n" "set -euo pipefail\n" f"cd {keycape_dir}\n" "bash ./patch-openbao-client.sh\n" "kubectl rollout restart deployment/keycape -n sso\n" "kubectl rollout status deployment/keycape -n sso --timeout=60s\n" "bash ./verify-openbao-client.sh\n" "NETKINGDOM_KEYCAPE_APPLY\n" ) configure_command = f"bash {shlex.quote(str(KEYCAPE_OPENBAO_CLIENT_CONFIG.parent / 'configure-openbao-oidc.sh'))}" public_route_state = "done" if client_deployed else "todo" if source_ready else "blocked" public_route_reason = "The live KeyCape verifier has proven the public route, discovery endpoint, and openbao-admin client." if public_route_state == "todo": public_route_reason = "Operator action: confirm public DNS routes kc.coulomb.social to the patched KeyCape ingress and that /authorize recognizes openbao-admin." if public_route_state == "blocked": public_route_reason = "Ship the code-defined KeyCape OpenBao client before probing the public login route." public_route_command = ( "bash <<'NETKINGDOM_KEYCAPE_PUBLIC_ROUTE'\n" "set -euo pipefail\n" f"cd {keycape_dir}\n" f"KUBECTL={kubectl_bin} bash ./verify-openbao-client.sh\n" "printf 'DNS addresses for kc.coulomb.social:\\n'\n" "getent ahosts kc.coulomb.social | awk '{print $1}' | sort -u\n" "printf 'KeyCape ingress address from Kubernetes:\\n'\n" f"{kubectl_bin} get ingress keycape -n sso -o jsonpath='{{.status.loadBalancer.ingress[0].ip}}{{\"\\n\"}}'\n" "NETKINGDOM_KEYCAPE_PUBLIC_ROUTE\n" ) login_command = ( "# Terminal 1: bridge the browser callback to the bao CLI running in the OpenBao pod.\n" "kubectl -n openbao port-forward pod/openbao-0 8250:8250\n\n" "# Terminal 2: run the pod-bundled bao CLI, then copy the printed login URL into your local browser if needed.\n" "kubectl exec -it -n openbao openbao-0 -- sh -lc '\n" " export BAO_ADDR=http://127.0.0.1:8200\n" " bao login -method=oidc -path=keycape role=platform-admin \\\n" " skip_browser=true listenaddress=0.0.0.0 callbackhost=127.0.0.1 port=8250\n" " bao token lookup\n" "'" ) return [ action( "Apply code-defined KeyCape OpenBao client", "Deployment action for the non-secret openbao-admin client already present in source. Patches the live KeyCape Secret without decrypting the bootstrap secret bundle.", deploy_state, deploy_reason, deploy_command, downstream_taint if yes(data, "openbao_initialized") else {}, ), action( "Configure OpenBao OIDC auth", "Create or update the auth/keycape mount and platform-admin role so KeyCape group claims map to OpenBao platform-admin policy. The helper uses a non-secret compatibility client-secret value because OpenBao requires the field while the current KeyCape client is public PKCE.", auth_state, auth_reason, configure_command, downstream_taint if yes(data, "openbao_initialized") else {}, ), action( "Check KeyCape public route for OpenBao", "Verify that public DNS and ingress route kc.coulomb.social to the patched KeyCape instance before attempting the browser-based OpenBao login.", public_route_state, public_route_reason, public_route_command, downstream_taint if yes(data, "openbao_initialized") else {}, ), action( "Verify OIDC-backed OpenBao admin login", "Use the bao CLI already present in the OpenBao pod, bridge its localhost callback to your workstation, complete the KeyCape MFA browser flow, and verify the returned token before checking the confirmation box.", login_state, login_reason, login_command, downstream_taint if yes(data, "openbao_initialized") else {}, ), ] def artifact_payloads(data: dict[str, Any]) -> list[dict[str, str]]: public_key = extract_age_public_key(data.get("custodian_age_public_key")) state = bootstrap_secret_state() root_disposition = str(data.get("root_token_disposition") or "") init_output = yes(data, "openbao_init_output_produced") openbao_unseal_taint = openbao_trial_taint(data, "direct", "unseal") openbao_root_taint = openbao_trial_taint(data, "direct", "root-token") return [ { "name": "platform-root", "description": "Dedicated LLDAP user for the king credential.", "subsystem": "LLDAP", "responsibility": "platform-root-custodian", "email": role_email(data, "role_platform_custodian_email"), "location": str(data.get("identity_account_reference") or "Not recorded."), "state": state_value(identity_account_ready(data), bool(data.get("identity_account_reference"))), }, { "name": "net-kingdom-admins", "description": "Current lightweight admin group for the platform-root identity.", "subsystem": "LLDAP", "responsibility": "identity-admin", "email": role_email(data, "role_identity_admin_email"), "location": str(data.get("identity_group_reference") or "Not recorded."), "state": state_value(yes(data, "identity_group_confirmed"), bool(data.get("identity_group_reference"))), }, { "name": "platform-root password entry", "description": "Password-safe entry for the dedicated identity password. Value is never stored here.", "subsystem": "password safe", "responsibility": "platform-root-custodian", "email": role_email(data, "role_platform_custodian_email"), "location": "operator password safe / offline packet", "state": state_value(yes(data, "password_safe_confirmed")), }, { "name": "TOTP token", "description": "privacyIDEA token for the platform-root login path.", "subsystem": "privacyIDEA", "responsibility": "platform-root-custodian", "email": role_email(data, "role_platform_custodian_email"), "location": str(data.get("mfa_enrollment_reference") or "Not recorded."), "state": state_value(second_factor_ready(data), yes(data, "mfa_enrolled_confirmed")), }, { "name": "age recipient", "description": "Public recipient used for encrypted bootstrap bundles.", "subsystem": "custodian age envelope", "responsibility": "recovery-custodian", "email": role_email(data, "role_recovery_custodian_email"), "location": age_public_key_fingerprint(public_key) or "Not recorded.", "state": state_value(bool(public_key) and yes(data, "custodian_age_public_key_confirmed"), bool(public_key)), }, { "name": "age private key reference", "description": "Non-secret pointer to the private age key location.", "subsystem": "custodian age envelope", "responsibility": "recovery-custodian", "email": role_email(data, "role_recovery_custodian_email"), "location": str(data.get("custodian_age_private_key_reference") or "Not recorded."), "state": state_value(yes(data, "custodian_age_private_key_confirmed"), bool(data.get("custodian_age_private_key_reference"))), }, { "name": "secrets.enc", "description": "Encrypted bootstrap bundle.", "subsystem": "sso-mfa/bootstrap", "responsibility": "recovery-custodian", "email": role_email(data, "role_recovery_custodian_email"), "location": str(state["encrypted_bundle_path"]), "state": state_value(bool(state["encrypted_bundle_exists"]), False, bool(state["plaintext_secrets_present"])), }, { "name": "custody strategy", "description": "Selected OpenBao ceremony control model.", "subsystem": "custody model", "responsibility": "platform-root-custodian", "email": role_email(data, "role_platform_custodian_email"), "location": str(data.get("custody_mode") or "Not selected."), "state": state_value(yes(data, "custody_mode_approved"), data.get("custody_mode") in VALID_CUSTODY_MODES), }, { "name": "recovery material", "description": "Recovery references for identity, MFA, age key, and encrypted bootstrap bundle.", "subsystem": "custody packet", "responsibility": "recovery-custodian", "email": role_email(data, "role_recovery_custodian_email"), "location": "offline packet / password safe references", "state": state_value(yes(data, "recovery_confirmed")), }, { "name": "OpenBao custody packet", "description": "Ceremony envelope with share assignment slots and root-token disposition plan.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": "offline ceremony packet", "state": state_value(yes(data, "custody_packet_prepared")), }, add_taint( { "name": "unseal shares", "description": "Real OpenBao shares produced by init. They must be routed directly to approved custody locations.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": "created during attended init; not stored here", "state": state_value(yes(data, "openbao_unseal_keys_rotated"), init_output or yes(data, "openbao_initialized")), }, openbao_unseal_taint, ), add_taint( { "name": "initial root token", "description": "OpenBao bootstrap token produced by init. Use only for first configuration and disposition.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": "created during attended init; never pasted here", "state": state_value(root_disposition in {"revoked", "offline-sealed"}, init_output or yes(data, "openbao_initialized")), }, openbao_root_taint, ), ] def command_payloads(data: dict[str, Any]) -> list[dict[str, str]]: preflight_done = yes(data, "openbao_preflight_passed") custody_approved = custody_mode_approved(data) init_output = yes(data, "openbao_init_output_produced") initialized = yes(data, "openbao_initialized") post_unseal_verified = yes(data, "openbao_post_unseal_verified") initial_config_applied = yes(data, "openbao_initial_config_applied") trial_exposed = yes(data, "openbao_trial_material_exposed") response_complete = yes(data, "openbao_compromise_response_complete") openbao_direct_taint = openbao_trial_taint(data, "direct") openbao_downstream_taint = openbao_trial_taint(data, "downstream") preflight_state = "done" if preflight_done else "todo" preflight_reason = "Safe preflight passed." if not preflight_done: preflight_reason = "Run after custody approval and before init." if not custody_approved: preflight_state = "blocked" preflight_reason = "Approve the selected custody strategy first." init_state = "done" if init_output or initialized else "todo" init_reason = "Init output was produced. Do not paste unseal shares or root token here." if not (init_output or initialized): init_reason = "Run once, attended, after OpenBao preflight." if not preflight_done: init_state = "blocked" init_reason = "OpenBao preflight must pass first." config_state = "done" if initial_config_applied else "todo" config_reason = "Initial configuration is recorded. Root-token disposition remains a separate gate." if not initial_config_applied: config_reason = "Configure OpenBao, then record this non-secret completion flag." if trial_exposed and initialized and not response_complete: if initial_config_applied: config_reason = "Initial configuration is recorded on a tainted workpath. Complete root-token disposition and compromise response before production trust." else: config_reason = "Tainted by trial key-material exposure. Operator may proceed, but record the taint and complete rotation, reset, or another compromise response before production trust." if not initialized: config_state = "blocked" config_reason = "OpenBao must be initialized and unsealed first." verify_state = "done" if post_unseal_verified else "todo" verify_reason = "Post-unseal readiness has been verified." if not post_unseal_verified: verify_reason = "Verify filesystem and post-unseal readiness before live secrets move in." if not initialized: verify_state = "blocked" verify_reason = "OpenBao must be initialized and unsealed first." return [ { "name": "OpenBao preflight", "description": "Run safe status and verification checks. Does not initialize OpenBao.", "status": preflight_state, "status_reason": preflight_reason, "command": ( "python3 tools/security-bootstrap-console/security_bootstrap_console.py " "openbao-preflight --railiance-path ../railiance-platform --run" ), }, add_taint( { "name": "OpenBao init ceremony", "description": "Creates real unseal shares and the initial root token. Run once, attended.", "status": init_state, "status_reason": init_reason, "command": "kubectl exec -n openbao openbao-0 -- bao operator init -key-shares=3 -key-threshold=2", }, openbao_direct_taint if init_output or initialized else {}, ), add_taint( { "name": "OpenBao initial configuration", "description": "Apply first audit, auth, mount, and policy configuration after unseal.", "status": config_state, "status_reason": config_reason, "command": "make -C ../railiance-platform openbao-configure-initial", }, openbao_downstream_taint if initialized else {}, ), add_taint( { "name": "OpenBao post-unseal verification", "description": "Verify filesystem and post-unseal readiness before live secrets move in.", "status": verify_state, "status_reason": verify_reason, "command": "make -C ../railiance-platform openbao-verify-post-unseal", }, openbao_downstream_taint if initialized else {}, ), ] def runbook_payloads(data: dict[str, Any]) -> list[dict[str, str]]: initialized = yes(data, "openbao_initialized") initial_config_applied = yes(data, "openbao_initial_config_applied") trial_exposed = yes(data, "openbao_trial_material_exposed") response_complete = yes(data, "openbao_compromise_response_complete") keys_rotated = yes(data, "openbao_unseal_keys_rotated") openbao_direct_taint = openbao_trial_taint(data, "direct") openbao_downstream_taint = openbao_trial_taint(data, "downstream") openbao_unseal_taint = openbao_trial_taint(data, "downstream", "unseal") key_compromise_location = "Template: record the exposure, choose reset versus rotation, inspect affected paths, and record only the non-secret outcome." if trial_exposed and not response_complete: key_compromise_location = "Active template: trial material is marked exposed; choose reset versus rotation before production trust." elif response_complete: key_compromise_location = "Template retained for future incidents; the current non-secret compromise response is marked complete." rotate_location = "Template: unseal OpenBao, start rotate-keys, submit current shares by prompt, route new shares to custody holders, then record confirmation." if not initialized: rotate_location = "Template: prepare now; execution needs an unsealed OpenBao instance and a quorum of current shares." lockdown_location = "Template: use a root/sudo-capable token, run emergency seal, confirm Sealed true, then record the drill or incident outcome." if not initialized: lockdown_location = "Template: prepare now; execution only changes availability while OpenBao is unsealed and serving requests." restore_location = "Template: prepare workspace, snapshot, encrypt to age recipient, restore in isolation, verify, destroy drill environment, then record evidence." if not initial_config_applied: restore_location = "Template: prepare now; execute after initial OpenBao configuration exists and before live secrets move in." return [ add_taint( { "name": "Key material compromised", "description": "Respond when init output, unseal shares, or root-token material escaped the custody boundary.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": key_compromise_location, "state": "template", }, openbao_direct_taint if trial_exposed and not response_complete else {}, ), add_taint( { "name": "Generate new unseal keys", "description": "Rotate OpenBao Shamir unseal shares after a trial exposure or planned custody migration.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": rotate_location, "state": "template", }, openbao_unseal_taint if trial_exposed and not response_complete and not keys_rotated else {}, ), add_taint( { "name": "Emergency lock-down", "description": "Seal Railiance OpenBao to stop access to stored OpenBao assets until a later unseal quorum reopens it.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": lockdown_location, "state": "template", }, openbao_downstream_taint if initialized else {}, ), add_taint( { "name": "Restore drill", "description": "Prove that Railiance OpenBao can be snapshotted, restored into isolation, unsealed, and verified before production trust.", "subsystem": "Railiance OpenBao", "responsibility": "openbao-ceremony-operator", "email": role_email(data, "role_openbao_operator_email"), "location": restore_location, "state": "template", }, openbao_downstream_taint if initialized else {}, ), ] def runbook_command_payloads(data: dict[str, Any]) -> list[dict[str, str]]: initialized = yes(data, "openbao_initialized") trial_exposed = yes(data, "openbao_trial_material_exposed") response_complete = yes(data, "openbao_compromise_response_complete") openbao_direct_taint = openbao_trial_taint(data, "direct") openbao_downstream_taint = openbao_trial_taint(data, "downstream") def token_prompt_command(bao_command: str) -> str: return ( "printf 'OpenBao token: ' >&2\n" "read -rs OPENBAO_TOKEN\n" "printf '\\n' >&2\n" "printf '%s\\n' \"$OPENBAO_TOKEN\" | kubectl exec -i -n openbao openbao-0 -- " f"sh -c 'read -r BAO_TOKEN; export BAO_TOKEN; {bao_command}'\n" "unset OPENBAO_TOKEN" ) def interactive_token_command(bao_command: str, prompt_nonce: bool = False) -> str: nonce_prompt = ( ' printf "Rotation nonce: " >&2\n' ' read -r ROTATION_NONCE\n' if prompt_nonce else "" ) return ( "kubectl exec -it -n openbao openbao-0 -- sh -lc '\n" " restore_tty() { stty echo 2>/dev/null || true; }\n" " trap restore_tty EXIT INT TERM\n" f"{nonce_prompt}" " printf \"OpenBao token: \" >&2\n" " stty -echo\n" " read -r BAO_TOKEN\n" " stty echo\n" " printf \"\\n\" >&2\n" " export BAO_TOKEN\n" f" {bao_command}\n" " unset BAO_TOKEN\n" "'" ) def action(name: str, description: str, command: str, taint: dict[str, str] | None = None) -> dict[str, str]: return add_taint( { "name": name, "description": description, "command": command, }, taint or {}, ) seal_command = token_prompt_command("bao operator seal") audit_list_command = token_prompt_command("bao audit list") secrets_list_command = token_prompt_command("bao secrets list") auth_list_command = token_prompt_command("bao auth list") platform_admin_token_command = token_prompt_command( "bao token create -policy=platform-admin -period=24h -orphan" ) rotate_init_command = interactive_token_command( "bao operator rotate-keys -init -key-shares=3 -key-threshold=2" ) rotate_status_command = interactive_token_command("bao operator rotate-keys -status") rotate_submit_command = interactive_token_command( 'bao operator rotate-keys -nonce="$ROTATION_NONCE"', prompt_nonce=True, ) rotate_cancel_command = interactive_token_command("bao operator rotate-keys -cancel") openbao_status_command = "kubectl exec -n openbao openbao-0 -- bao status" direct_taint = openbao_direct_taint if initialized else {} downstream_taint = openbao_downstream_taint if initialized else {} compromise_taint = openbao_downstream_taint if trial_exposed and not response_complete else {} public_key = extract_age_public_key(data.get("custodian_age_public_key")) quoted_public_key = shlex.quote(public_key if public_key else "") snapshot_workspace_command = ( 'export RESTORE_DRILL_DIR="${RESTORE_DRILL_DIR:-/tmp/netkingdom-openbao-restore-drill}"\n' 'mkdir -p "$RESTORE_DRILL_DIR"\n' 'chmod 700 "$RESTORE_DRILL_DIR"\n' 'printf "Restore drill workspace: %s\\n" "$RESTORE_DRILL_DIR"' ) snapshot_command = ( 'export RESTORE_DRILL_DIR="${RESTORE_DRILL_DIR:-/tmp/netkingdom-openbao-restore-drill}"\n' 'mkdir -p "$RESTORE_DRILL_DIR"\n' 'chmod 700 "$RESTORE_DRILL_DIR"\n' "printf 'OpenBao token: ' >&2\n" "read -rs OPENBAO_TOKEN\n" "printf '\\n' >&2\n" 'printf \'%s\\n\' "$OPENBAO_TOKEN" | kubectl exec -i -n openbao openbao-0 -- ' "sh -c 'read -r BAO_TOKEN; export BAO_TOKEN; bao operator raft snapshot save /tmp/openbao-raft.snap'\n" "unset OPENBAO_TOKEN\n" 'kubectl cp openbao/openbao-0:/tmp/openbao-raft.snap "$RESTORE_DRILL_DIR/openbao-raft.snap"\n' 'kubectl exec -n openbao openbao-0 -- rm -f /tmp/openbao-raft.snap\n' 'sha256sum "$RESTORE_DRILL_DIR/openbao-raft.snap" | tee "$RESTORE_DRILL_DIR/openbao-raft.snap.sha256"' ) encrypt_snapshot_command = ( 'export RESTORE_DRILL_DIR="${RESTORE_DRILL_DIR:-/tmp/netkingdom-openbao-restore-drill}"\n' f'age -r {quoted_public_key} -o "$RESTORE_DRILL_DIR/openbao-raft.snap.age" "$RESTORE_DRILL_DIR/openbao-raft.snap"\n' 'sha256sum "$RESTORE_DRILL_DIR/openbao-raft.snap.age" | tee "$RESTORE_DRILL_DIR/openbao-raft.snap.age.sha256"\n' 'if command -v shred >/dev/null 2>&1; then\n' ' shred -u "$RESTORE_DRILL_DIR/openbao-raft.snap"\n' "else\n" ' rm -f "$RESTORE_DRILL_DIR/openbao-raft.snap"\n' "fi" ) isolated_restore_command = ( "cat <<'RESTORE_DRILL'\n" "Isolated Railiance OpenBao restore drill evidence required:\n" "1. Use a disposable cluster, VM, or namespace. Do not restore into namespace=openbao.\n" "2. Deploy a fresh OpenBao instance with empty storage.\n" "3. Decrypt the encrypted snapshot only inside the isolated drill workspace.\n" "4. Restore with: bao operator raft snapshot restore -force /tmp/openbao-raft.snap\n" "5. Unseal the isolated instance with the current trial/drill shares.\n" "6. Verify status, mounts, auth methods, policies, and a non-production test secret read.\n" "7. Destroy the isolated environment and record only non-secret evidence in this UI.\n" "RESTORE_DRILL" ) return [ action( "OpenBao status", "Show seal, initialization, storage, and HA state for the OpenBao pod. This command does not require a token.", openbao_status_command, downstream_taint, ), action( "Unseal by prompt", "Provide threshold shares interactively. Never put shares on the command line.", "kubectl exec -it -n openbao openbao-0 -- bao operator unseal", direct_taint, ), action( "bao audit list", "List OpenBao audit devices using a token entered by local hidden prompt.", audit_list_command, downstream_taint, ), action( "bao secrets list", "List enabled OpenBao secrets engines using a token entered by local hidden prompt.", secrets_list_command, downstream_taint, ), action( "bao auth list", "List enabled OpenBao auth methods using a token entered by local hidden prompt.", auth_list_command, downstream_taint, ), action( "Create platform-admin token", "Create a renewable 24-hour non-root OpenBao token with the platform-admin policy. The emitted token is secret; store it immediately through the approved operator secret path.", platform_admin_token_command, downstream_taint, ), action( "Start unseal-key rotation", "Run once to start a new 3-share, threshold-2 rotation. If rotation is already in progress, do not rerun init; check status and submit existing shares.", rotate_init_command, compromise_taint, ), action( "Check unseal-key rotation status", "Inspect the active rotation without submitting a share. Use after init or when OpenBao says rotation is already in progress.", rotate_status_command, compromise_taint, ), action( "Submit current shares for rotation", "Enter the nonce from rotation init, then token and existing unseal-share prompts. Repeat with distinct current shares until progress reaches the threshold.", rotate_submit_command, compromise_taint, ), action( "Cancel key rotation", "Abort a started rotation if the nonce, share handling, or ceremony context is wrong. Requires a root/sudo-capable token.", rotate_cancel_command, compromise_taint, ), action( "Emergency seal OpenBao", "Prompt locally for an OpenBao token and seal Railiance OpenBao without placing the token on the command line.", seal_command, downstream_taint, ), action( "Confirm sealed status", "Check that Railiance OpenBao reports Sealed true after an emergency seal.", openbao_status_command, downstream_taint, ), action( "Prepare restore drill workspace", "Create a local restricted directory for temporary snapshot evidence.", snapshot_workspace_command, downstream_taint, ), action( "Create encrypted-restore snapshot source", "Prompt locally for an OpenBao token, create a Raft snapshot in the pod, copy it out, remove the pod copy, and record a plaintext hash before encryption.", snapshot_command, downstream_taint, ), action( "Encrypt restore snapshot", "Encrypt the Raft snapshot to the custodian age recipient and remove the local plaintext snapshot. Replace if no recipient is recorded.", encrypt_snapshot_command, downstream_taint, ), action( "Run isolated restore proof", "Checklist for proving the snapshot can restore into an isolated OpenBao instance before live secrets move in.", isolated_restore_command, downstream_taint, ), action( "Run post-restore readiness check", "Re-run the Railiance post-unseal checks after restore evidence has been captured.", "make -C ../railiance-platform openbao-verify-post-unseal", downstream_taint, ), ] def section_gate_payloads(data: dict[str, Any]) -> list[dict[str, str]]: role_rows = role_payloads(data) role_ok = all(row["state"] != "nil" for row in role_rows[:5]) subsystem_rows = subsystem_payloads(data) integration_rows = integration_payloads(data) admin_rows = admin_identity_payloads(data) artifact_rows = artifact_payloads(data) cleanup_done = yes(data, "cleanup_complete") reopened = yes(data, "platform_reopened") return [ { "key": "intro", "name": "Introduction & Actors", "status": "ok", "reason": "NetKingdom purpose, global actors, and supported custody shapes are visible.", }, { "key": "subsystems", "name": "Subsystems & Scopes", "status": "ok" if all(row["state"] in {"ok", "set"} for row in subsystem_rows) else "err", "reason": "Subsystems have install/access evidence." if all(row["state"] != "nil" for row in subsystem_rows) else "Complete subsystem setup fields and confirmations.", }, { "key": "roles", "name": "Roles & Responsibilities", "status": "ok" if role_ok else "set", "reason": "All active bootstrap roles have a designated email." if role_ok else "Assign an email to each active bootstrap role.", }, { "key": "integrations", "name": "Integration & Tests", "status": "ok" if all(row["state"] == "ok" for row in integration_rows[:4]) else "set", "reason": "Identity and OpenBao preflight checks are done." if all(row["state"] == "ok" for row in integration_rows[:4]) else "Run or confirm the remaining integration checks.", }, { "key": "admin-identity", "name": "Admin Identity Integration", "status": "ok" if all(row["state"] == "ok" for row in admin_rows) else "set", "reason": "OpenBao admin access is bound to NetKingdom OIDC claims." if all(row["state"] == "ok" for row in admin_rows) else "Run only the remaining operator cards: live KeyCape deploy, protected OpenBao auth setup, or login verification.", }, { "key": "artifacts", "name": "Artefacts & Locations", "status": "ok" if all(row["state"] != "nil" for row in artifact_rows[:10]) else "set", "reason": "Core custody artefacts have locations or confirmations." if all(row["state"] != "nil" for row in artifact_rows[:10]) else "Record missing artefact locations and confirmations.", }, { "key": "runbooks", "name": "Usecases & Runbooks", "status": "ok", "reason": "Reusable actions and runbook templates are available; execution state is tracked by Integration & Tests and explicit confirmations.", }, { "key": "handover", "name": "Final Handover", "status": "ok" if reopened else "set" if cleanup_done else "err", "reason": ( "Platform is marked reopened under custody." if reopened else "Cleanup is complete; confirm the platform has reopened under custody." if cleanup_done else "Complete cleanup, taint response, and hardening before reopening." ), }, { "key": "terminology", "name": "Terminology & Patterns", "status": "ok", "reason": "Shared NetKingdom security terms and patterns are documented for operators.", }, ] def status_payload(data: dict[str, Any], metadata_path: Path) -> dict[str, Any]: merged = metadata_template() merged.update(data) gates = build_gates(merged) metadata_view = dict(merged) public_key = extract_age_public_key(metadata_view.get("custodian_age_public_key")) metadata_view["custodian_age_public_key"] = public_key metadata_view["custodian_age_public_key_fingerprint"] = age_public_key_fingerprint(public_key) for role_key in ( "role_setup_operator_email", "role_platform_custodian_email", "role_identity_admin_email", "role_openbao_operator_email", "role_recovery_custodian_email", "role_future_quorum_email", ): if not metadata_view.get(role_key): metadata_view[role_key] = role_email(merged, role_key) return { "metadata_path": str(metadata_path), "stage": derive_stage(merged), "stage_steps": stage_payloads(merged), "next_action": next_action(gates, kit_validation(merged), merged), "gates": [gate_payload(gate) for gate in gates], "key_custody_gates": [gate_payload(gate) for gate in key_custody_validation(merged)], "kit_gates": [gate_payload(gate) for gate in kit_validation(merged)], "section_gates": section_gate_payloads(merged), "roles": role_payloads(merged), "subsystems": subsystem_payloads(merged), "integrations": integration_payloads(merged), "admin_identity_integrations": admin_identity_payloads(merged), "admin_identity_commands": admin_identity_command_payloads(merged), "runbooks": runbook_payloads(merged), "artifacts": artifact_payloads(merged), "commands": command_payloads(merged), "runbook_commands": runbook_command_payloads(merged), "bootstrap_secret_state": bootstrap_secret_state(), "metadata": metadata_view, "approval_phrase": APPROVAL_PHRASE, "custody_approval_modes": sorted(CUSTODY_APPROVAL_MODES), } def oidc_code_challenge() -> str: digest = hashlib.sha256(OIDC_CODE_VERIFIER.encode("ascii")).digest() return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") def local_oidc_redirect_uri(host: str) -> str: clean_host = host.strip() or "127.0.0.1:8876" return f"http://{clean_host}/oidc/callback" def local_oidc_start_url(host: str) -> str: params = { "response_type": "code", "client_id": OIDC_CLIENT_ID, "redirect_uri": local_oidc_redirect_uri(host), "scope": OIDC_SCOPE, "state": "netkingdom-bootstrap-login-check", "code_challenge": oidc_code_challenge(), "code_challenge_method": "S256", } return f"{KEYCAPE_ISSUER}/authorize?{urllib.parse.urlencode(params)}" def decode_jwt_payload(token: str) -> dict[str, Any]: parts = token.split(".") if len(parts) < 2: return {} payload = parts[1] payload += "=" * (-len(payload) % 4) try: decoded = base64.urlsafe_b64decode(payload.encode("ascii")) claims = json.loads(decoded) except (ValueError, json.JSONDecodeError): return {} return claims if isinstance(claims, dict) else {} def exchange_oidc_code(code: str, host: str) -> dict[str, Any]: form = urllib.parse.urlencode( { "grant_type": "authorization_code", "client_id": OIDC_CLIENT_ID, "code": code, "code_verifier": OIDC_CODE_VERIFIER, "redirect_uri": local_oidc_redirect_uri(host), } ).encode("utf-8") request = urllib.request.Request( f"{KEYCAPE_ISSUER}/token", data=form, headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ) with urllib.request.urlopen(request, timeout=10) as response: payload = json.loads(response.read().decode("utf-8")) if not isinstance(payload, dict): raise ValueError("token endpoint returned a non-object JSON payload") return payload def oidc_result_html(query: str, host: str) -> str: params = urllib.parse.parse_qs(query) error = params.get("error", [""])[0] description = params.get("error_description", [""])[0] code = params.get("code", [""])[0] state = params.get("state", [""])[0] title = "OIDC Login Check" status = "Waiting for callback result." rows: list[tuple[str, str]] = [] note = ( "No tokens or OTP values are stored by this local page. If token exchange " "succeeds, only non-secret claims are shown." ) if error: status = "Login did not complete." rows.append(("Error", error)) if description: rows.append(("Description", description)) elif not code: status = "No authorization code was returned." note = ( "Start the check from the bootstrap console. If the browser never " "returns here, KeyCape may still need its public Authelia redirect " "configuration or a browser OTP prompt." ) else: try: token_payload = exchange_oidc_code(code, host) claims = decode_jwt_payload(str(token_payload.get("access_token", ""))) status = "OIDC login path completed." rows.extend( [ ("State", state or "(none)"), ("Issuer", str(claims.get("iss", ""))), ("Audience", str(claims.get("aud", ""))), ("Subject", str(claims.get("sub", ""))), ("Username", str(claims.get("preferred_username", ""))), ("Email", str(claims.get("email", ""))), ("Groups", json.dumps(claims.get("groups", []))), ] ) note = ( "Return to the bootstrap console, check OIDC login verified for " "the same account, and save progress." ) except urllib.error.HTTPError as exc: body = exc.read(1000).decode("utf-8", "replace") status = "Authorization returned, but token exchange failed." rows.extend( [ ("HTTP status", str(exc.code)), ("Endpoint", f"{KEYCAPE_ISSUER}/token"), ("Response", body), ] ) note = ( "This usually means the live KeyCape config has not yet registered " "this local callback URI, the code expired, or the OTP browser " "prompt path is still incomplete." ) except Exception as exc: # noqa: BLE001 - local diagnostic page status = "Authorization returned, but token exchange could not run." rows.append(("Error", str(exc))) table_rows = "\n".join( f"{html.escape(label)}{html.escape(value)}" for label, value in rows ) return f""" {html.escape(title)}

{html.escape(status)}

{html.escape(note)}

{table_rows}
Return to bootstrap console
""" def ui_html() -> str: return """ NetKingdom Security Bootstrap
NetKingdom control surface

Guided security bootstrap

Stage Loading
Next safe action Loading
Metadata Loading
1. Introduction & Actorsnil
Loading introduction gate.

NetKingdom is the operating frame for accountable digital sovereignty: identity, custody, secrets, approvals, recovery, and emergency control are made explicit before subsystems are trusted with live assets.

role
King / CEO
Ultimate accountable authority for the kingdom, final custody intent, and existential risk decisions.
Global actor
Always present
role
Guardian / CSO
Owns security posture, emergency lock-down readiness, incident response, and custody-risk review.
Global actor
Required for quorum setups
role
Armorer / CTO
Owns technical implementation, platform architecture, automation, and operational correctness.
Global actor
Required for quorum setups
role
Steward / COO
Owns process reliability, onboarding, offboarding, runbook hygiene, and day-to-day operating cadence.
Global actor
Five-person setup
role
Treasurer / CFO
Owns financial authorization, asset inventory, economic risk, and continuity of valuable holdings.
Global actor
Five-person setup
AOne Person KingdomKing / CEO only. Best for early bootstrap and solo custody; fast but has no human quorum.
BThree Person KingdomKing / CEO plus two others. Decisions and access control target a two-of-three quorum.
CFive Person KingdomAll global actors active. Decisions and access control target a three-of-five quorum.
3. Roles & Responsibilitiesnil
Loading role gate.

Define who is accountable for each bootstrap role before touching subsystem-specific controls. Role chips in every record show the role name; hover them to see the designated email.

Change responsibilitiesset
2. Subsystems & Scopesnil
Loading subsystem gate.

This section is about installing each subsystem and establishing initial user access. Integration checks come later.

4. Integration & Testsnil
Loading integration gate.

This section tracks stateful integration runbooks and gates. Status belongs to these tasks; reusable command-only actions live below in Usecases & Runbooks.

5. Admin Identity Integrationnil
Loading admin identity gate.

This stage replaces manually minted OpenBao admin tokens as the normal path. Development-owned client definitions are shipped in source; operator-owned cards below apply live config, use protected OpenBao prompts, or verify login.

  • Development/config: openbao-admin is defined in sso-mfa/k8s/keycape/create-secrets.sh; no manual KeyCape registration is expected.
  • Operator deployment: apply the updated KeyCape config to live keycape-config and restart KeyCape if the live client is missing.
  • Protected OpenBao step: configure auth/keycape with a hidden root/sudo token prompt, then verify platform-root can complete MFA-backed login.
6. Artefacts & Locationsnil
Loading artefact gate.

This is the final overview of what has been established. Locations are references only; passwords, OTP seeds, age private keys, unseal shares, and root tokens are never recorded here.

  • Recovery material: password recovery, MFA recovery/re-enrollment, custodian age-key recovery, encrypted bundle recovery, and notification contact references.
  • Custody packet: selected strategy, recovery references, OpenBao init checklist, unseal-share assignment slots, quorum plan, root-token disposition plan, and signature/date line.
Secret capture is enforced by architecture: the control surface does not request secrets, and the gate checks local metadata plus plaintext bootstrap-secret presence.
Approval phrase for selected strategy
7. Usecases & Runbooksnil
Loading runbook gate.

This section contains reusable actions and runbook templates. Action cards are copyable command references without task status; runbook task state belongs to Integration & Tests or to the explicit confirmation gates.

8. Final Handovernil
Loading final handover gate.

This is the line between trial/bootstrap and operating under custody. Mark these only after root-token disposition, restore proof, taint response, and cleanup have been handled outside this UI.

  • Cleanup means bootstrap-era passwords, service tokens, temporary admin paths, trial OpenBao material, and plaintext secret exposure have been rotated, retired, reset, or explicitly accepted as residual risk.
  • Reopen means the platform is intentionally operated again under the selected custody strategy, with break-glass and restore paths known.
9. Terminology & Patternsnil
Loading terminology gate.

These terms apply across NetKingdom. Subsystems may have their own names, but the control surface keeps the cross-subsystem security pattern visible.

term
Subsystem
A bounded tool or service with its own admin surface, state, and security model.
Pattern
LLDAP, privacyIDEA, KeyCape, OpenBao
term
Artefact
A named credential, key, token, group, policy, packet, bundle, or evidence item with an owner and location.
Pattern
Track name, subsystem, role, location, state
term
Custody
The human and technical arrangement that controls who can access or recover critical material.
Pattern
Single king, two-of-three, three-of-five
term
Quorum
A threshold of actors or shares required before sensitive decisions or recovery actions can proceed.
Pattern
2 of 3, 3 of 5
term
Taint
A warning that work is downstream of a compromised, exposed, or trial-only artefact. Taint informs; it does not hard-block the operator.
Pattern
Keeps source reference visible
term
Break-glass
Emergency material that can restore control but is not part of normal daily operation.
Pattern
Retrieve, use, review, rotate if needed
term
Seal and unseal
For Railiance OpenBao, sealing stops access to stored OpenBao assets; unsealing requires the configured threshold of unseal shares.
Pattern
Emergency lock-down and reopening
term
Root token vs unseal shares
The Railiance OpenBao initial root token is a superuser API credential after unseal. Unseal shares control whether OpenBao can decrypt and serve requests at all.
Pattern
Different layers, different custody
term
Least privilege
Daily access should use scoped credentials; root, quorum, and break-glass paths remain exceptional and reviewable.
Pattern
Delegate, expire, audit

Actions

Waiting for local approval.
""" def make_ui_handler(metadata_path: Path) -> type[BaseHTTPRequestHandler]: class SecurityBootstrapUIHandler(BaseHTTPRequestHandler): server_version = "SecurityBootstrapUI/1.0" def log_message(self, format: str, *args: Any) -> None: print(f"{self.address_string()} - {format % args}") def send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None: body = json.dumps(payload, indent=2).encode("utf-8") self.send_response(status.value) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Cache-Control", "no-store") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def send_html(self, status: HTTPStatus, body: str) -> None: encoded = body.encode("utf-8") self.send_response(status.value) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Cache-Control", "no-store") self.send_header("Content-Length", str(len(encoded))) self.end_headers() self.wfile.write(encoded) def do_GET(self) -> None: parsed = urllib.parse.urlparse(self.path) if parsed.path == "/" or parsed.path == "/index.html": self.send_html(HTTPStatus.OK, ui_html()) return if parsed.path == "/api/status": data = load_metadata(metadata_path) if not data: data = metadata_template() self.send_json(HTTPStatus.OK, status_payload(data, metadata_path)) return if parsed.path == "/oidc/start": host = self.headers.get("Host", "127.0.0.1:8876") self.send_response(HTTPStatus.FOUND.value) self.send_header("Location", local_oidc_start_url(host)) self.send_header("Cache-Control", "no-store") self.end_headers() return if parsed.path == "/oidc/callback": host = self.headers.get("Host", "127.0.0.1:8876") self.send_html(HTTPStatus.OK, oidc_result_html(parsed.query, host)) return self.send_error(HTTPStatus.NOT_FOUND.value) def do_POST(self) -> None: if self.path not in {"/api/approve-custody", "/api/save-progress"}: self.send_error(HTTPStatus.NOT_FOUND.value) return try: length = int(self.headers.get("Content-Length", "0")) except ValueError: self.send_json(HTTPStatus.BAD_REQUEST, {"errors": ["Invalid content length."]}) return if length > 65536: self.send_json(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, {"errors": ["Request too large."]}) return try: payload = json.loads(self.rfile.read(length) or b"{}") except json.JSONDecodeError as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"errors": [f"Invalid JSON: {exc}"]}) return if not isinstance(payload, dict): self.send_json(HTTPStatus.BAD_REQUEST, {"errors": ["JSON body must be an object."]}) return existing = load_metadata(metadata_path) if self.path == "/api/save-progress": saved = save_progress_metadata(existing, payload) write_metadata(metadata_path, saved) response = status_payload(saved, metadata_path) response["message"] = "Progress saved." self.send_json(HTTPStatus.OK, response) return approval_phrase = str(payload.get("approval_phrase", "")) approved_by = str(payload.get("approved_by", "")) approved, errors = approve_custody_metadata(existing, payload, approval_phrase, approved_by) if errors: response = status_payload(approved, metadata_path) response["errors"] = errors self.send_json(HTTPStatus.BAD_REQUEST, response) return write_metadata(metadata_path, approved) response = status_payload(approved, metadata_path) response["message"] = "Selected custody strategy approved." self.send_json(HTTPStatus.OK, response) return SecurityBootstrapUIHandler def serve_web_ui(args: argparse.Namespace) -> int: metadata_path = args.metadata or DEFAULT_METADATA_PATH handler = make_ui_handler(metadata_path) httpd = ThreadingHTTPServer((args.host, args.port), handler) host = html.escape(args.host) print("SECURITY BOOTSTRAP UI") print("") print(f"URL: http://{host}:{args.port}") print(f"Metadata: {metadata_path}") print("") print("Local non-secret custody approval only. Press Ctrl+C to stop.") try: httpd.serve_forever() except KeyboardInterrupt: print("") print("Stopped.") finally: httpd.server_close() return 0 def refuse_live_init() -> int: print("REFUSED") print("") print("This console does not run bao operator init.") print("Use the human-attended Railiance OpenBao ceremony after all gates pass.") return 2 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Non-secret NetKingdom security bootstrap console.", ) parser.add_argument( "--metadata", type=Path, help="Optional non-secret metadata JSON file.", ) sub = parser.add_subparsers(dest="command", required=True) sub.add_parser("status", help="Show trust stage, gates, and next safe action.") sub.add_parser("king-kit", help="Print king credential kit checklist.") sub.add_parser("validate-king-kit", help="Validate non-secret king credential metadata.") approve = sub.add_parser("approve-custody-mode", help="Approve a live-init-ready custody mode.") approve.add_argument( "--mode", choices=sorted(VALID_CUSTODY_MODES), default="temporary-single-king", help="Custody mode to record. two-of-three-planned cannot approve live init.", ) approve.add_argument("--credential-label", dest="credential_label") approve.add_argument("--setup-operator", dest="setup_operator") approve.add_argument("--notification-contact", dest="notification_contact") approve.add_argument("--storage-class", dest="storage_class", action="append") approve.add_argument("--mfa-class", dest="mfa_class", choices=sorted(VALID_MFA_CLASSES)) approve.add_argument("--mfa-enrolled-confirmed", action="store_true") approve.add_argument( "--mfa-enrollment-source", choices=sorted(VALID_MFA_ENROLLMENT_SOURCES), default=None, ) approve.add_argument("--mfa-enrollment-reference") approve.add_argument("--recovery-confirmed", action="store_true") approve.add_argument("--custody-packet-prepared", action="store_true") approve.add_argument("--no-secret-capture-confirmed", action="store_true") approve.add_argument("--approval-phrase", default="") approve.add_argument("--approved-by", default="") approve.add_argument("--notes") approve.add_argument( "--yes", action="store_true", help=f'Use the required approval phrase "{APPROVAL_PHRASE}" non-interactively.', ) sub.add_parser("custody-packet", help="Print blank offline custody packet template.") sub.add_parser("handover-checklist", help="Print handover and cleanup checklist.") sub.add_parser("metadata-template", help="Print non-secret metadata JSON template.") sub.add_parser("refuse-live-init", help="Explain why live OpenBao init is refused.") web = sub.add_parser("web-ui", help="Serve a local custody approval UI.") web.add_argument("--host", default="127.0.0.1", help="Bind host. Defaults to localhost.") web.add_argument("--port", type=int, default=8765, help="Bind port. Defaults to 8765.") preflight = sub.add_parser("openbao-preflight", help="Show or run safe OpenBao preflight.") preflight.add_argument( "--railiance-path", default="../railiance-platform", help="Path to railiance-platform repo.", ) preflight.add_argument( "--run", action="store_true", help="Run safe preflight targets. Does not run OpenBao init.", ) return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) data = load_metadata(args.metadata) if args.command == "status": print_status(data) return 0 if args.command == "king-kit": print_king_kit() return 0 if args.command == "validate-king-kit": return print_validate_king_kit(data) if args.command == "approve-custody-mode": return print_approve_custody_mode(args, data) if args.command == "custody-packet": print_custody_packet() return 0 if args.command == "handover-checklist": print_handover_checklist() return 0 if args.command == "metadata-template": print(json.dumps(metadata_template(), indent=2)) return 0 if args.command == "openbao-preflight": return print_openbao_preflight(args) if args.command == "web-ui": return serve_web_ui(args) if args.command == "refuse-live-init": return refuse_live_init() parser.error(f"unknown command: {args.command}") return 2 if __name__ == "__main__": raise SystemExit(main())