{html.escape(status)}
{html.escape(note)}
#!/usr/bin/env python3 """Non-secret security bootstrap console. This tool is intentionally conservative. It prints status, gates, checklists, and templates. It does not collect or store secret values, and it refuses to run live OpenBao initialization. """ from __future__ import annotations import argparse import base64 import hashlib import html import json import subprocess import sys import urllib.error import urllib.parse import urllib.request from dataclasses import dataclass from datetime import datetime, timezone from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any DEFAULT_STAGE = "S1 - Low-trust assembly" DEFAULT_METADATA_PATH = Path("/tmp/net-kingdom-security-bootstrap.json") APPROVAL_PHRASE = "approve custody mode" VALID_STORAGE_CLASSES = {"password-safe", "offline-packet", "hardware-token"} VALID_MFA_CLASSES = {"totp", "webauthn", "hardware-token"} VALID_MFA_ENROLLMENT_SOURCES = { "identity-provider", "external-verifier", "hardware-registration", "deferred", } VALID_CUSTODY_MODES = {"temporary-single-king", "two-of-three-planned", "two-of-three-ready"} CUSTODY_APPROVAL_MODES = {"temporary-single-king", "two-of-three-ready"} KEYCAPE_ISSUER = "https://kc.coulomb.social" OIDC_CLIENT_ID = "netkingdom-bootstrap-console" OIDC_SCOPE = "openid profile email groups" OIDC_CODE_VERIFIER = "netkingdom-bootstrap-local-oidc-verifier-2026-v1" AGE_PUBLIC_PREFIX = "age1" AGE_PRIVATE_MARKER = "AGE-SECRET-KEY-1" @dataclass(frozen=True) class Gate: name: str status: str reason: str def load_metadata(path: Path | None) -> dict[str, Any]: if path is None or not path.exists(): return {} try: data = json.loads(path.read_text()) except json.JSONDecodeError as exc: raise SystemExit(f"metadata is not valid JSON: {path}: {exc}") from exc if not isinstance(data, dict): raise SystemExit(f"metadata root must be an object: {path}") return data def write_metadata(path: Path, data: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp_path = path.with_name(f".{path.name}.tmp") tmp_path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n") tmp_path.replace(path) def utc_now() -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat() def normalize_storage_classes(value: Any) -> list[str]: if isinstance(value, str): raw_values = [item.strip() for item in value.split(",")] elif isinstance(value, list): raw_values = [str(item).strip() for item in value] else: raw_values = [] values = [item for item in raw_values if item] return sorted(set(values)) def yes(data: dict[str, Any], key: str) -> bool: return data.get(key) is True def second_factor_ready(data: dict[str, Any]) -> bool: return ( data.get("mfa_class") in VALID_MFA_CLASSES and yes(data, "mfa_enrolled_confirmed") and data.get("mfa_enrollment_source") in VALID_MFA_ENROLLMENT_SOURCES - {"deferred"} ) def second_factor_reason(data: dict[str, Any]) -> str: if data.get("mfa_class") not in VALID_MFA_CLASSES: return "Select TOTP, WebAuthn, or hardware-token." if data.get("mfa_enrollment_source") == "deferred": return "Deferred factor enrollment blocks live OpenBao custody." if not yes(data, "mfa_enrolled_confirmed"): return "Confirm the factor was enrolled with the authority that will verify it; record no seed." if data.get("mfa_enrollment_source") not in VALID_MFA_ENROLLMENT_SOURCES: return "Record the non-secret enrollment source." return "Second factor enrollment is confirmed without recording seed material." def identity_account_ready(data: dict[str, Any]) -> bool: return ( yes(data, "identity_account_created") and bool(data.get("identity_account_reference")) and yes(data, "identity_group_confirmed") and bool(data.get("identity_group_reference")) ) def identity_account_reason(data: dict[str, Any]) -> str: if not yes(data, "identity_account_created"): return "Create the dedicated platform-root account in LLDAP first." if not data.get("identity_account_reference"): return "Record a non-secret account reference such as platform-root@lldap." if not yes(data, "identity_group_confirmed"): return "Confirm the account is assigned to the required LLDAP admin group." if not data.get("identity_group_reference"): return "Record the non-secret group reference such as net-kingdom-admins." return "Dedicated identity account is recorded without storing its password." def password_safe_ready(data: dict[str, Any]) -> bool: return yes(data, "password_safe_confirmed") def identity_login_ready(data: dict[str, Any]) -> bool: return yes(data, "oidc_login_verified") def extract_age_public_key(value: Any) -> str: if value is None: return "" text = str(value).strip() if AGE_PRIVATE_MARKER in text: return "" for token in text.replace(",", " ").split(): clean = token.strip().strip('"').strip("'") if clean.startswith(AGE_PUBLIC_PREFIX): return clean return "" def age_public_key_fingerprint(public_key: str) -> str: if not public_key: return "" digest = hashlib.sha256(public_key.encode("utf-8")).hexdigest() return f"sha256:{digest[:16]}" def bootstrap_secret_state() -> dict[str, Any]: root = Path.cwd() bootstrap_dir = root / "sso-mfa" / "bootstrap" encrypted_dir = bootstrap_dir / "secrets.enc" plaintext_dir = bootstrap_dir / "secrets" encrypted_files = sorted(encrypted_dir.glob("*/*.age")) if encrypted_dir.exists() else [] plaintext_files = sorted(path for path in plaintext_dir.glob("*/*") if path.is_file()) if plaintext_dir.exists() else [] return { "bootstrap_dir": str(bootstrap_dir), "encrypted_bundle_path": str(encrypted_dir), "encrypted_bundle_exists": encrypted_dir.exists(), "encrypted_file_count": len(encrypted_files), "plaintext_secrets_path": str(plaintext_dir), "plaintext_secrets_present": plaintext_dir.exists(), "plaintext_file_count": len(plaintext_files), } def key_custody_validation(data: dict[str, Any]) -> list[Gate]: public_key = extract_age_public_key(data.get("custodian_age_public_key")) state = bootstrap_secret_state() plaintext_present = bool(state["plaintext_secrets_present"]) return [ Gate( "Custodian public key", "done" if public_key and yes(data, "custodian_age_public_key_confirmed") else "blocked", "Register the custodian age public key used to encrypt bootstrap bundles.", ), Gate( "Private key custody", "done" if data.get("custodian_age_private_key_reference") and yes(data, "custodian_age_private_key_confirmed") else "blocked", "Record only where the private key is held; never paste it into this UI.", ), Gate( "Encrypted bundle", "done" if state["encrypted_bundle_exists"] and state["encrypted_file_count"] else "blocked", "Encrypted bootstrap secrets should live under sso-mfa/bootstrap/secrets.enc/.", ), Gate( "Plaintext exposure", "blocked" if plaintext_present else "done", "Plaintext sso-mfa/bootstrap/secrets/ is present; shred it after any apply ceremony." if plaintext_present else "No plaintext bootstrap secrets directory is present.", ), ] def kit_validation(data: dict[str, Any]) -> list[Gate]: storage_classes = data.get("storage_classes", []) if not isinstance(storage_classes, list): storage_classes = [] storage_values = {str(item) for item in storage_classes} custody_mode = data.get("custody_mode", "") return [ Gate( "Credential label", "done" if data.get("credential_label") else "blocked", "Use a dedicated label such as platform-root.", ), Gate( "Identity account", "done" if identity_account_ready(data) else "blocked", identity_account_reason(data), ), Gate( "Setup operator/contact", "done" if data.get("setup_operator") and data.get("notification_contact") else "blocked", "Record non-secret setup operator and notification contact.", ), Gate( "Storage class", "done" if storage_values & VALID_STORAGE_CLASSES else "blocked", "Select password-safe, offline-packet, hardware-token, or a combination.", ), Gate( "Password safe storage", "done" if password_safe_ready(data) else "blocked", "Confirm the credential password is stored in the password safe without recording it here.", ), Gate( "Second factor", "done" if second_factor_ready(data) else "blocked", second_factor_reason(data), ), Gate( "Identity login path", "done" if identity_login_ready(data) else "blocked", "Verify the dedicated account can complete the NetKingdom login path.", ), Gate( "Recovery material", "done" if yes(data, "recovery_confirmed") else "blocked", "Confirm recovery material exists without recording values.", ), Gate( "Custody packet", "done" if yes(data, "custody_packet_prepared") else "blocked", "Prepare the offline custody packet.", ), Gate( "No secret capture", "done" if yes(data, "no_secret_capture_confirmed") else "blocked", "Confirm no secret values were stored in metadata, Git, State Hub, chat, tickets, or email.", ), Gate( "Custody mode", "done" if custody_mode in VALID_CUSTODY_MODES else "blocked", "Approve temporary-single-king, two-of-three-planned, or two-of-three-ready.", ), ] def king_kit_ready(data: dict[str, Any]) -> bool: gates = kit_validation(data) required = [gate for gate in gates if gate.name != "Custody mode"] return all(gate.status == "done" for gate in required) def custody_mode_approved(data: dict[str, Any]) -> bool: return data.get("custody_mode") in CUSTODY_APPROVAL_MODES and yes(data, "custody_mode_approved") def custody_mode_reason(data: dict[str, Any]) -> str: mode = data.get("custody_mode") if mode in CUSTODY_APPROVAL_MODES and yes(data, "custody_mode_approved"): return "Approved for the next gate under the selected custody mode." if mode == "two-of-three-planned": return "Two-of-three is recorded as the target, but live init stays blocked until it is ready." if mode in CUSTODY_APPROVAL_MODES and not yes(data, "custody_mode_approved"): return "Mode is selected but not yet explicitly approved." return "Choose temporary-single-king or two-of-three-ready for live OpenBao custody." def derive_stage(data: dict[str, Any]) -> str: if yes(data, "platform_reopened"): return "S5 - Reopen under custody" if yes(data, "cleanup_complete"): return "S4 - Cleanup and hardening" if yes(data, "openbao_initialized"): return "S3 - OpenBao bootstrap" if yes(data, "king_credential_ready") or king_kit_ready(data): return "S2 - King credential preparation" return DEFAULT_STAGE def build_gates(data: dict[str, Any]) -> list[Gate]: return [ Gate( "King credential kit", "done" if yes(data, "king_credential_ready") or king_kit_ready(data) else "blocked", "Dedicated king credential, second factor, and recovery storage.", ), Gate( "Custody mode", "done" if custody_mode_approved(data) else "blocked", custody_mode_reason(data), ), Gate( "OpenBao preflight", "done" if yes(data, "openbao_preflight_passed") else "blocked", "Run safe Railiance OpenBao status and verification checks.", ), Gate( "OpenBao init ceremony", "human" if not yes(data, "openbao_initialized") else "done", "Human-attended ceremony only. This console will not run init.", ), Gate( "Root-token disposition", "done" if data.get("root_token_disposition") in {"revoked", "offline-sealed"} else "blocked", "Root token is revoked or sealed offline without recording value.", ), Gate( "Restore drill", "done" if yes(data, "restore_drill_passed") else "blocked", "Snapshot and isolated restore proof before live secrets.", ), Gate( "Cleanup and rotation", "done" if yes(data, "cleanup_complete") else "blocked", "Bootstrap-era credentials, databases, and access paths reviewed.", ), ] def next_action(gates: list[Gate]) -> str: for gate in gates: if gate.status == "blocked": if gate.name == "King credential kit": return "Define king credential kit" if gate.name == "Custody mode": return "Choose custody mode" if gate.name == "OpenBao preflight": return "Run OpenBao preflight" if gate.name == "Root-token disposition": return "Record root-token disposition" if gate.name == "Restore drill": return "Run restore drill" if gate.name == "Cleanup and rotation": return "Complete handover cleanup" return "Review related workplans" def print_status(data: dict[str, Any]) -> None: merged = metadata_template() merged.update(data) gates = build_gates(merged) key_gates = key_custody_validation(merged) state = bootstrap_secret_state() print("SECURITY BOOTSTRAP") print("") print("Stage") print(derive_stage(merged)) print("") print("Next safe action") print(next_action(gates)) print("") print("Key custody") public_key = extract_age_public_key(merged.get("custodian_age_public_key")) print(f"- fingerprint: {age_public_key_fingerprint(public_key) or 'not registered'}") print(f"- encrypted bundle files: {state['encrypted_file_count']} at {state['encrypted_bundle_path']}") print(f"- plaintext secrets present: {state['plaintext_secrets_present']}") for gate in key_gates: print(f"- {gate.status}: {gate.name} - {gate.reason}") print("") print("Gates") for gate in gates: print(f"- {gate.status}: {gate.name} - {gate.reason}") print("") print("Available actions") print("1. king-kit") print("2. custody-packet") print("3. openbao-preflight") print("4. handover-checklist") print("5. metadata-template") print("6. approve-custody-mode") print("7. web-ui") print("") print("Refusal boundary") print("This console will not run bao operator init or collect secret values.") def print_king_kit() -> None: print("KING CREDENTIAL KIT") print("") rows = [ "Name the credential, for example platform-root.", "Choose storage: password safe, offline packet, hardware-backed, or a combination.", "Add a second factor: TOTP, WebAuthn, or hardware token.", "Prepare recovery material without recording values in software.", "Select custody mode: temporary-single-king, two-of-three-planned, or two-of-three-ready.", "Print or prepare the offline custody packet.", "Record only non-secret metadata.", ] for index, row in enumerate(rows, start=1): print(f"{index}. {row}") def print_validate_king_kit(data: dict[str, Any]) -> int: print("KING CREDENTIAL KIT VALIDATION") print("") if not data: print("No metadata loaded. Use --metadata with a non-secret JSON file.") print("Run metadata-template for the expected shape.") return 2 gates = kit_validation(data) for gate in gates: print(f"- {gate.status}: {gate.name} - {gate.reason}") print("") if king_kit_ready(data) and custody_mode_approved(data): print("Kit definition and custody-mode approval are complete.") print("Live OpenBao init remains a separate human-attended ceremony.") return 0 if king_kit_ready(data): print("Kit definition is complete except custody-mode approval.") print("Live OpenBao init is still blocked until T03 approves custody mode.") return 0 print("Kit definition is incomplete.") return 1 def merged_approval_metadata( existing: dict[str, Any], payload: dict[str, Any], ) -> dict[str, Any]: data = metadata_template() data.update(existing) text_fields = ( "credential_label", "bootstrap_mode", "identity_account_home", "identity_account_reference", "identity_group_reference", "custodian_age_private_key_reference", "setup_operator", "notification_contact", "mfa_class", "mfa_enrollment_source", "mfa_enrollment_reference", "custody_mode", "notes", ) for field in text_fields: if field in payload and payload[field] is not None: value = str(payload[field]).strip() data[field] = "" if AGE_PRIVATE_MARKER in value else value if "custodian_age_public_key" in payload: data["custodian_age_public_key"] = extract_age_public_key(payload["custodian_age_public_key"]) if "storage_classes" in payload: data["storage_classes"] = normalize_storage_classes(payload["storage_classes"]) for field in ( "custodian_age_public_key_confirmed", "custodian_age_private_key_confirmed", "recovery_confirmed", "custody_packet_prepared", "no_secret_capture_confirmed", "mfa_enrolled_confirmed", "identity_account_created", "identity_group_confirmed", "oidc_login_verified", "password_safe_confirmed", ): if field in payload: data[field] = payload[field] is True return data def save_progress_metadata(existing: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]: data = merged_approval_metadata(existing, payload) data["metadata_updated_at"] = utc_now() data["progress_scope"] = "Non-secret local bootstrap progress only." return data def validate_custody_approval( data: dict[str, Any], approval_phrase: str, ) -> list[str]: errors: list[str] = [] mode = data.get("custody_mode") if approval_phrase.strip().lower() != APPROVAL_PHRASE: errors.append(f'Type "{APPROVAL_PHRASE}" to approve custody mode.') if mode not in VALID_CUSTODY_MODES: errors.append("Select a custody mode.") elif mode not in CUSTODY_APPROVAL_MODES: errors.append( "two-of-three-planned is a target state, not live-init approval. " "Use temporary-single-king now or two-of-three-ready when shares exist." ) for gate in kit_validation(data): if gate.name == "Custody mode": continue if gate.status != "done": errors.append(f"{gate.name}: {gate.reason}") return errors def approve_custody_metadata( existing: dict[str, Any], payload: dict[str, Any], approval_phrase: str, approver: str, ) -> tuple[dict[str, Any], list[str]]: data = merged_approval_metadata(existing, payload) errors = validate_custody_approval(data, approval_phrase) if errors: return data, errors data["king_credential_ready"] = True data["custody_mode_approved"] = True data["custody_approved_at"] = utc_now() data["custody_approved_by"] = approver or data.get("setup_operator", "") data["approval_scope"] = "Non-secret local custody-mode approval only. Does not run OpenBao init." return data, [] def print_approve_custody_mode(args: argparse.Namespace, data: dict[str, Any]) -> int: if args.metadata is None: print("ERROR: approve-custody-mode requires --metadata /path/to/non-secret.json", file=sys.stderr) return 2 approval_phrase = args.approval_phrase or "" if args.yes: approval_phrase = APPROVAL_PHRASE elif not approval_phrase: print("This writes non-secret custody approval metadata only.") print("It will not run OpenBao init and will not store secret values.") try: approval_phrase = input(f'Type "{APPROVAL_PHRASE}" to continue: ') except EOFError: approval_phrase = "" payload: dict[str, Any] = { "custody_mode": args.mode, } for key in ( "credential_label", "identity_account_home", "identity_account_reference", "identity_group_reference", "setup_operator", "notification_contact", "mfa_class", "mfa_enrollment_source", "mfa_enrollment_reference", "notes", ): value = getattr(args, key) if value is not None: payload[key] = value if args.storage_class: payload["storage_classes"] = args.storage_class for field in ( "recovery_confirmed", "custody_packet_prepared", "no_secret_capture_confirmed", "mfa_enrolled_confirmed", "identity_account_created", "identity_group_confirmed", "oidc_login_verified", "password_safe_confirmed", ): if getattr(args, field): payload[field] = True approved, errors = approve_custody_metadata( data, payload, approval_phrase, args.approved_by or "", ) if errors: print("CUSTODY MODE NOT APPROVED") print("") for error in errors: print(f"- {error}") return 1 write_metadata(args.metadata, approved) print("CUSTODY MODE APPROVED") print("") print(f"Metadata: {args.metadata}") print(f"Mode: {approved['custody_mode']}") print(f"Approved by: {approved.get('custody_approved_by', '')}") print(f"Approved at: {approved.get('custody_approved_at', '')}") print("") print("OpenBao init remains a separate human-attended ceremony.") return 0 def print_custody_packet() -> None: print("CUSTODY PACKET TEMPLATE") print("") print("Credential label:") print("Date:") print("Setup operator/contact:") print("Custody mode:") print("Notification contact:") print("") print("Storage location description:") print("Second-factor location description:") print("Recovery material location description:") print("") print("OpenBao share assignment rows:") print("- Share A:") print("- Share B:") print("- Share C:") print("") print("Root-token disposition:") print("Signature/date:") print("") print("Do not write this packet into Git, State Hub, chat, tickets, or email.") def print_handover_checklist() -> None: print("HANDOVER CHECKLIST") print("") rows = [ "King credential kit complete.", "OpenBao initialized and unsealed under approved custody mode.", "Root token revoked or sealed offline.", "Non-root platform admin path verified.", "Bootstrap-era database credentials rotated.", "Temporary admin accounts reviewed and removed or scoped.", "Kubernetes service accounts and privileged bindings reviewed.", "SOPS/age recipients and emergency bundle reviewed.", "Backup snapshot exists.", "Restore drill passed.", "Audit handling known.", "Remaining risk exceptions recorded with owner and date.", ] for row in rows: print(f"- {row}") def metadata_template() -> dict[str, Any]: return { "bootstrap_mode": "custody", "custodian_age_public_key": "", "custodian_age_public_key_confirmed": False, "custodian_age_private_key_reference": "", "custodian_age_private_key_confirmed": False, "credential_label": "platform-root", "identity_account_home": "lldap", "identity_account_reference": "", "identity_account_created": False, "identity_group_reference": "net-kingdom-admins", "identity_group_confirmed": False, "setup_operator": "tegwick", "notification_contact": "bernd.worsch@gmail.com", "storage_classes": ["password-safe", "offline-packet"], "password_safe_confirmed": False, "mfa_class": "totp", "mfa_enrolled_confirmed": False, "mfa_enrollment_source": "deferred", "mfa_enrollment_reference": "", "recovery_confirmed": False, "custody_packet_prepared": False, "no_secret_capture_confirmed": False, "king_credential_ready": False, "custody_mode": "", "custody_mode_approved": False, "custody_approved_at": "", "custody_approved_by": "", "approval_scope": "", "oidc_login_verified": False, "metadata_updated_at": "", "progress_scope": "", "openbao_preflight_passed": False, "openbao_initialized": False, "root_token_disposition": "", "restore_drill_passed": False, "cleanup_complete": False, "platform_reopened": False, "review_date": "", "notes": "Non-secret metadata only.", } def print_openbao_preflight(args: argparse.Namespace) -> int: print("OPENBAO PREFLIGHT") print("") print("Safe commands:") print(f"make -C {args.railiance_path} openbao-status") print(f"make -C {args.railiance_path} openbao-verify") print("") if not args.run: print("Dry run only. Pass --run to execute safe preflight commands.") return 0 railiance_path = Path(args.railiance_path).expanduser().resolve() if not railiance_path.is_dir(): print(f"ERROR: Railiance path not found: {railiance_path}", file=sys.stderr) return 2 for target in ("openbao-status", "openbao-verify"): result = subprocess.run( ["make", "-C", str(railiance_path), target], check=False, ) if result.returncode != 0: return result.returncode return 0 def gate_payload(gate: Gate) -> dict[str, str]: return { "name": gate.name, "status": gate.status, "reason": gate.reason, } def status_payload(data: dict[str, Any], metadata_path: Path) -> dict[str, Any]: merged = metadata_template() merged.update(data) gates = build_gates(merged) metadata_view = dict(merged) public_key = extract_age_public_key(metadata_view.get("custodian_age_public_key")) metadata_view["custodian_age_public_key"] = public_key metadata_view["custodian_age_public_key_fingerprint"] = age_public_key_fingerprint(public_key) return { "metadata_path": str(metadata_path), "stage": derive_stage(merged), "next_action": next_action(gates), "gates": [gate_payload(gate) for gate in gates], "key_custody_gates": [gate_payload(gate) for gate in key_custody_validation(merged)], "kit_gates": [gate_payload(gate) for gate in kit_validation(merged)], "bootstrap_secret_state": bootstrap_secret_state(), "metadata": metadata_view, "approval_phrase": APPROVAL_PHRASE, "custody_approval_modes": sorted(CUSTODY_APPROVAL_MODES), } def oidc_code_challenge() -> str: digest = hashlib.sha256(OIDC_CODE_VERIFIER.encode("ascii")).digest() return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") def local_oidc_redirect_uri(host: str) -> str: clean_host = host.strip() or "127.0.0.1:8876" return f"http://{clean_host}/oidc/callback" def local_oidc_start_url(host: str) -> str: params = { "response_type": "code", "client_id": OIDC_CLIENT_ID, "redirect_uri": local_oidc_redirect_uri(host), "scope": OIDC_SCOPE, "state": "netkingdom-bootstrap-login-check", "code_challenge": oidc_code_challenge(), "code_challenge_method": "S256", } return f"{KEYCAPE_ISSUER}/authorize?{urllib.parse.urlencode(params)}" def decode_jwt_payload(token: str) -> dict[str, Any]: parts = token.split(".") if len(parts) < 2: return {} payload = parts[1] payload += "=" * (-len(payload) % 4) try: decoded = base64.urlsafe_b64decode(payload.encode("ascii")) claims = json.loads(decoded) except (ValueError, json.JSONDecodeError): return {} return claims if isinstance(claims, dict) else {} def exchange_oidc_code(code: str, host: str) -> dict[str, Any]: form = urllib.parse.urlencode( { "grant_type": "authorization_code", "client_id": OIDC_CLIENT_ID, "code": code, "code_verifier": OIDC_CODE_VERIFIER, "redirect_uri": local_oidc_redirect_uri(host), } ).encode("utf-8") request = urllib.request.Request( f"{KEYCAPE_ISSUER}/token", data=form, headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ) with urllib.request.urlopen(request, timeout=10) as response: payload = json.loads(response.read().decode("utf-8")) if not isinstance(payload, dict): raise ValueError("token endpoint returned a non-object JSON payload") return payload def oidc_result_html(query: str, host: str) -> str: params = urllib.parse.parse_qs(query) error = params.get("error", [""])[0] description = params.get("error_description", [""])[0] code = params.get("code", [""])[0] state = params.get("state", [""])[0] title = "OIDC Login Check" status = "Waiting for callback result." rows: list[tuple[str, str]] = [] note = ( "No tokens or OTP values are stored by this local page. If token exchange " "succeeds, only non-secret claims are shown." ) if error: status = "Login did not complete." rows.append(("Error", error)) if description: rows.append(("Description", description)) elif not code: status = "No authorization code was returned." note = ( "Start the check from the bootstrap console. If the browser never " "returns here, KeyCape may still need its public Authelia redirect " "configuration or a browser OTP prompt." ) else: try: token_payload = exchange_oidc_code(code, host) claims = decode_jwt_payload(str(token_payload.get("access_token", ""))) status = "OIDC login path completed." rows.extend( [ ("State", state or "(none)"), ("Issuer", str(claims.get("iss", ""))), ("Audience", str(claims.get("aud", ""))), ("Subject", str(claims.get("sub", ""))), ("Username", str(claims.get("preferred_username", ""))), ("Email", str(claims.get("email", ""))), ("Groups", json.dumps(claims.get("groups", []))), ] ) note = ( "Return to the bootstrap console, check OIDC login verified for " "the same account, and save progress." ) except urllib.error.HTTPError as exc: body = exc.read(1000).decode("utf-8", "replace") status = "Authorization returned, but token exchange failed." rows.extend( [ ("HTTP status", str(exc.code)), ("Endpoint", f"{KEYCAPE_ISSUER}/token"), ("Response", body), ] ) note = ( "This usually means the live KeyCape config has not yet registered " "this local callback URI, the code expired, or the OTP browser " "prompt path is still incomplete." ) except Exception as exc: # noqa: BLE001 - local diagnostic page status = "Authorization returned, but token exchange could not run." rows.append(("Error", str(exc))) table_rows = "\n".join( f"
{html.escape(note)}
Loading