Show compromised OpenBao paths as tainted

This commit is contained in:
2026-05-25 14:57:53 +02:00
parent 907675b4f4
commit 9afe30f49f
2 changed files with 236 additions and 127 deletions

View File

@@ -911,6 +911,28 @@ def state_value(ok: bool, set_value: bool = False, err: bool = False) -> str:
return "nil"
def openbao_trial_taint(data: dict[str, Any], relation: str = "downstream") -> dict[str, Any]:
if not yes(data, "openbao_trial_material_exposed") or yes(data, "openbao_compromise_response_complete"):
return {}
relation_text = "Directly marked" if relation == "direct" else "Downstream"
return {
"tainted": True,
"taint_source": "Trial key material exposed",
"taint_reference": "Usecases & Runbooks / Trial key material exposed",
"taint_reason": (
f"{relation_text} from recorded OpenBao trial key-material exposure. "
"Operator may proceed, but resulting evidence and work should be treated as tainted "
"until rotation, reset, or another compromise response is recorded."
),
}
def add_taint(row: dict[str, Any], taint: dict[str, Any]) -> dict[str, Any]:
if taint:
row.update(taint)
return row
def subsystem_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
public_key = extract_age_public_key(data.get("custodian_age_public_key"))
state = bootstrap_secret_state()
@@ -980,6 +1002,7 @@ def subsystem_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
def integration_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
openbao_direct_taint = openbao_trial_taint(data, "direct")
return [
{
"name": "LLDAP admin group assignment",
@@ -1017,15 +1040,18 @@ def integration_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
"location": "../railiance-platform",
"state": state_value(yes(data, "openbao_preflight_passed"), yes(data, "custody_mode_approved")),
},
{
"name": "OpenBao init/unseal ceremony",
"description": "Attended ceremony creates unseal shares and initial root token outside this UI.",
"subsystem": "Railiance OpenBao",
"responsibility": "openbao-ceremony-operator",
"email": role_email(data, "role_openbao_operator_email"),
"location": "operator shell with custody packet present",
"state": state_value(yes(data, "openbao_initialized"), yes(data, "openbao_preflight_passed")),
},
add_taint(
{
"name": "OpenBao init/unseal ceremony",
"description": "Attended ceremony creates unseal shares and initial root token outside this UI.",
"subsystem": "Railiance OpenBao",
"responsibility": "openbao-ceremony-operator",
"email": role_email(data, "role_openbao_operator_email"),
"location": "operator shell with custody packet present",
"state": state_value(yes(data, "openbao_initialized"), yes(data, "openbao_preflight_passed")),
},
openbao_direct_taint,
),
]
@@ -1034,6 +1060,7 @@ def artifact_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
state = bootstrap_secret_state()
root_disposition = str(data.get("root_token_disposition") or "")
init_output = yes(data, "openbao_init_output_produced")
openbao_direct_taint = openbao_trial_taint(data, "direct")
return [
{
"name": "platform-root",
@@ -1125,24 +1152,30 @@ def artifact_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
"location": "offline ceremony packet",
"state": state_value(yes(data, "custody_packet_prepared")),
},
{
"name": "unseal shares",
"description": "Real OpenBao shares produced by init. They must be routed directly to approved custody locations.",
"subsystem": "Railiance OpenBao",
"responsibility": "openbao-ceremony-operator",
"email": role_email(data, "role_openbao_operator_email"),
"location": "created during attended init; not stored here",
"state": state_value(yes(data, "openbao_unseal_keys_rotated"), init_output or yes(data, "openbao_initialized")),
},
{
"name": "initial root token",
"description": "OpenBao bootstrap token produced by init. Use only for first configuration and disposition.",
"subsystem": "Railiance OpenBao",
"responsibility": "openbao-ceremony-operator",
"email": role_email(data, "role_openbao_operator_email"),
"location": "created during attended init; never pasted here",
"state": state_value(root_disposition in {"revoked", "offline-sealed"}, init_output or yes(data, "openbao_initialized")),
},
add_taint(
{
"name": "unseal shares",
"description": "Real OpenBao shares produced by init. They must be routed directly to approved custody locations.",
"subsystem": "Railiance OpenBao",
"responsibility": "openbao-ceremony-operator",
"email": role_email(data, "role_openbao_operator_email"),
"location": "created during attended init; not stored here",
"state": state_value(yes(data, "openbao_unseal_keys_rotated"), init_output or yes(data, "openbao_initialized")),
},
openbao_direct_taint,
),
add_taint(
{
"name": "initial root token",
"description": "OpenBao bootstrap token produced by init. Use only for first configuration and disposition.",
"subsystem": "Railiance OpenBao",
"responsibility": "openbao-ceremony-operator",
"email": role_email(data, "role_openbao_operator_email"),
"location": "created during attended init; never pasted here",
"state": state_value(root_disposition in {"revoked", "offline-sealed"}, init_output or yes(data, "openbao_initialized")),
},
openbao_direct_taint,
),
]
@@ -1153,8 +1186,11 @@ def command_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
initialized = yes(data, "openbao_initialized")
post_unseal_verified = yes(data, "openbao_post_unseal_verified")
trial_exposed = yes(data, "openbao_trial_material_exposed")
response_complete = yes(data, "openbao_compromise_response_complete")
keys_rotated = yes(data, "openbao_unseal_keys_rotated")
root_disposed = data.get("root_token_disposition") in {"revoked", "offline-sealed"}
openbao_direct_taint = openbao_trial_taint(data, "direct")
openbao_downstream_taint = openbao_trial_taint(data, "downstream")
status_state = "todo"
status_reason = "Run any time to inspect the current OpenBao deployment state."
@@ -1189,17 +1225,15 @@ def command_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
unseal_state = "blocked"
unseal_reason = "OpenBao init output must be produced first."
if trial_exposed and initialized and not keys_rotated:
config_state = "done" if root_disposed else "todo"
config_reason = "Initial configuration and root-token disposition are recorded."
if not root_disposed:
config_reason = "Configure OpenBao, then revoke or offline-seal the root token."
if trial_exposed and initialized and not response_complete:
config_reason = "Tainted by trial key-material exposure. Operator may proceed, but record the taint and complete rotation, reset, or another compromise response before production trust."
if not initialized:
config_state = "blocked"
config_reason = "Trial key material is exposed; rotate unseal keys or reset before configuration."
else:
config_state = "done" if root_disposed else "todo"
config_reason = "Initial configuration and root-token disposition are recorded."
if not root_disposed:
config_reason = "Configure OpenBao, then revoke or offline-seal the root token."
if not initialized:
config_state = "blocked"
config_reason = "OpenBao must be initialized and unsealed first."
config_reason = "OpenBao must be initialized and unsealed first."
verify_state = "done" if post_unseal_verified else "todo"
verify_reason = "Post-unseal readiness has been verified."
@@ -1227,34 +1261,46 @@ def command_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
"openbao-preflight --railiance-path ../railiance-platform --run"
),
},
{
"name": "OpenBao init ceremony",
"description": "Creates real unseal shares and the initial root token. Run once, attended.",
"status": init_state,
"status_reason": init_reason,
"command": "kubectl exec -n openbao openbao-0 -- bao operator init -key-shares=3 -key-threshold=2",
},
{
"name": "OpenBao unseal prompt",
"description": "Enter unseal shares by interactive terminal prompt. Do not place shares on the command line.",
"status": unseal_state,
"status_reason": unseal_reason,
"command": "kubectl exec -it -n openbao openbao-0 -- bao operator unseal",
},
{
"name": "OpenBao initial configuration",
"description": "Apply first audit, auth, mount, and policy configuration after unseal.",
"status": config_state,
"status_reason": config_reason,
"command": "make -C ../railiance-platform openbao-configure-initial",
},
{
"name": "OpenBao post-unseal verification",
"description": "Verify filesystem and post-unseal readiness before live secrets move in.",
"status": verify_state,
"status_reason": verify_reason,
"command": "make -C ../railiance-platform openbao-verify-post-unseal",
},
add_taint(
{
"name": "OpenBao init ceremony",
"description": "Creates real unseal shares and the initial root token. Run once, attended.",
"status": init_state,
"status_reason": init_reason,
"command": "kubectl exec -n openbao openbao-0 -- bao operator init -key-shares=3 -key-threshold=2",
},
openbao_direct_taint if init_output or initialized else {},
),
add_taint(
{
"name": "OpenBao unseal prompt",
"description": "Enter unseal shares by interactive terminal prompt. Do not place shares on the command line.",
"status": unseal_state,
"status_reason": unseal_reason,
"command": "kubectl exec -it -n openbao openbao-0 -- bao operator unseal",
},
openbao_direct_taint if initialized else {},
),
add_taint(
{
"name": "OpenBao initial configuration",
"description": "Apply first audit, auth, mount, and policy configuration after unseal.",
"status": config_state,
"status_reason": config_reason,
"command": "make -C ../railiance-platform openbao-configure-initial",
},
openbao_downstream_taint if initialized else {},
),
add_taint(
{
"name": "OpenBao post-unseal verification",
"description": "Verify filesystem and post-unseal readiness before live secrets move in.",
"status": verify_state,
"status_reason": verify_reason,
"command": "make -C ../railiance-platform openbao-verify-post-unseal",
},
openbao_downstream_taint if initialized else {},
),
]
@@ -1264,6 +1310,8 @@ def runbook_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
trial_exposed = yes(data, "openbao_trial_material_exposed")
response_complete = yes(data, "openbao_compromise_response_complete")
keys_rotated = yes(data, "openbao_unseal_keys_rotated")
openbao_direct_taint = openbao_trial_taint(data, "direct")
openbao_downstream_taint = openbao_trial_taint(data, "downstream")
key_compromise_status = "done" if response_complete else "todo"
key_compromise_location = "Use for trial output exposure, screenshots, chat paste, shell history, or lost custody."
@@ -1281,24 +1329,30 @@ def runbook_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
rotate_location = "Record the key-compromise condition or schedule a normal rotation first."
return [
{
"name": "Key material compromised",
"description": "Respond when init output, unseal shares, or root-token material escaped the custody boundary.",
"subsystem": "Railiance OpenBao",
"responsibility": "openbao-ceremony-operator",
"email": role_email(data, "role_openbao_operator_email"),
"location": key_compromise_location,
"state": key_compromise_status,
},
{
"name": "Generate new unseal keys",
"description": "Rotate OpenBao Shamir unseal shares after a trial exposure or planned custody migration.",
"subsystem": "Railiance OpenBao",
"responsibility": "openbao-ceremony-operator",
"email": role_email(data, "role_openbao_operator_email"),
"location": rotate_location,
"state": rotate_status,
},
add_taint(
{
"name": "Key material compromised",
"description": "Respond when init output, unseal shares, or root-token material escaped the custody boundary.",
"subsystem": "Railiance OpenBao",
"responsibility": "openbao-ceremony-operator",
"email": role_email(data, "role_openbao_operator_email"),
"location": key_compromise_location,
"state": key_compromise_status,
},
openbao_direct_taint if trial_exposed and not response_complete else {},
),
add_taint(
{
"name": "Generate new unseal keys",
"description": "Rotate OpenBao Shamir unseal shares after a trial exposure or planned custody migration.",
"subsystem": "Railiance OpenBao",
"responsibility": "openbao-ceremony-operator",
"email": role_email(data, "role_openbao_operator_email"),
"location": rotate_location,
"state": rotate_status,
},
openbao_downstream_taint if trial_exposed and not response_complete else {},
),
]
@@ -1308,6 +1362,8 @@ def runbook_command_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
trial_exposed = yes(data, "openbao_trial_material_exposed")
response_complete = yes(data, "openbao_compromise_response_complete")
keys_rotated = yes(data, "openbao_unseal_keys_rotated")
openbao_direct_taint = openbao_trial_taint(data, "direct")
openbao_downstream_taint = openbao_trial_taint(data, "downstream")
exposure_status = "done" if trial_exposed else "todo"
exposure_reason = "Trial key-material exposure is recorded in non-secret metadata." if trial_exposed else "Record that the trial init output escaped custody before using affected material."
@@ -1334,48 +1390,66 @@ def runbook_command_payloads(data: dict[str, Any]) -> list[dict[str, str]]:
rotate_reason = "Record exposure or schedule a normal rotation before generating new shares."
return [
{
"name": "Record key exposure",
"description": "Non-secret metadata checkbox in this UI; do not paste exposed values.",
"status": exposure_status,
"status_reason": exposure_reason,
"command": "Use the checkbox: Trial key material exposed",
},
{
"name": "Unseal by prompt",
"description": "Provide threshold shares interactively. Never put shares on the command line.",
"status": unseal_status,
"status_reason": unseal_reason,
"command": "kubectl exec -it -n openbao openbao-0 -- bao operator unseal",
},
{
"name": "Start unseal-key rotation",
"description": "Generate a new 3-share, threshold-2 Shamir split after compromise or planned migration.",
"status": rotate_status,
"status_reason": rotate_reason,
"command": "kubectl exec -it -n openbao openbao-0 -- bao operator rotate-keys -init -key-shares=3 -key-threshold=2",
},
{
"name": "Submit current shares for rotation",
"description": "Repeat by prompt until the required threshold completes. Use the nonce from rotation init.",
"status": rotate_status,
"status_reason": rotate_reason,
"command": "kubectl exec -it -n openbao openbao-0 -- bao operator rotate-keys -nonce=<nonce-from-rotation-init>",
},
{
"name": "Cancel key rotation",
"description": "Abort a started rotation if the nonce, share handling, or ceremony context is wrong.",
"status": "todo" if initialized and not keys_rotated else "blocked",
"status_reason": "Available while a rotation is in progress." if initialized and not keys_rotated else "No active rotation expected.",
"command": "kubectl exec -it -n openbao openbao-0 -- bao operator rotate-keys -cancel",
},
{
"name": "Record compromise response complete",
"description": "Non-secret metadata checkbox after exposed material is rotated or the trial environment is reset.",
"status": response_status,
"status_reason": response_reason,
"command": "Use the checkbox: Compromise response complete",
},
add_taint(
{
"name": "Record key exposure",
"description": "Non-secret metadata checkbox in this UI; do not paste exposed values.",
"status": exposure_status,
"status_reason": exposure_reason,
"command": "Use the checkbox: Trial key material exposed",
},
openbao_direct_taint if trial_exposed and not response_complete else {},
),
add_taint(
{
"name": "Unseal by prompt",
"description": "Provide threshold shares interactively. Never put shares on the command line.",
"status": unseal_status,
"status_reason": unseal_reason,
"command": "kubectl exec -it -n openbao openbao-0 -- bao operator unseal",
},
openbao_direct_taint if initialized else {},
),
add_taint(
{
"name": "Start unseal-key rotation",
"description": "Generate a new 3-share, threshold-2 Shamir split after compromise or planned migration.",
"status": rotate_status,
"status_reason": rotate_reason,
"command": "kubectl exec -it -n openbao openbao-0 -- bao operator rotate-keys -init -key-shares=3 -key-threshold=2",
},
openbao_downstream_taint if trial_exposed and not response_complete else {},
),
add_taint(
{
"name": "Submit current shares for rotation",
"description": "Repeat by prompt until the required threshold completes. Use the nonce from rotation init.",
"status": rotate_status,
"status_reason": rotate_reason,
"command": "kubectl exec -it -n openbao openbao-0 -- bao operator rotate-keys -nonce=<nonce-from-rotation-init>",
},
openbao_downstream_taint if trial_exposed and not response_complete else {},
),
add_taint(
{
"name": "Cancel key rotation",
"description": "Abort a started rotation if the nonce, share handling, or ceremony context is wrong.",
"status": "todo" if initialized and not keys_rotated else "blocked",
"status_reason": "Available while a rotation is in progress." if initialized and not keys_rotated else "No active rotation expected.",
"command": "kubectl exec -it -n openbao openbao-0 -- bao operator rotate-keys -cancel",
},
openbao_downstream_taint if trial_exposed and not response_complete else {},
),
add_taint(
{
"name": "Record compromise response complete",
"description": "Non-secret metadata checkbox after exposed material is rotated or the trial environment was reset.",
"status": response_status,
"status_reason": response_reason,
"command": "Use the checkbox: Compromise response complete",
},
openbao_downstream_taint if trial_exposed and not response_complete else {},
),
]
@@ -1641,6 +1715,8 @@ def ui_html() -> str:
--warn: #fff2b8;
--human: #e6ecf7;
--bad: #f4d6d0;
--taint: #fde8e3;
--taint-line: #ba6b61;
}
* { box-sizing: border-box; }
body {
@@ -1887,6 +1963,10 @@ def ui_html() -> str:
background: #ffffff;
padding: 10px;
}
.record-row.tainted, .command-row.tainted {
background: var(--taint);
border-color: var(--taint-line);
}
.record-name {
font-weight: 650;
line-height: 1.2;
@@ -1898,6 +1978,15 @@ def ui_html() -> str:
margin-top: 3px;
overflow-wrap: anywhere;
}
.taint-note {
border-left: 4px solid var(--taint-line);
color: var(--ink);
font-size: 13px;
line-height: 1.35;
margin-top: 7px;
padding-left: 8px;
overflow-wrap: anywhere;
}
.record-context {
display: grid;
gap: 6px;
@@ -2377,12 +2466,23 @@ def ui_html() -> str:
return chip;
}
function makeTaintNote(item) {
if (!item || !item.tainted) return null;
const note = document.createElement("div");
note.className = "taint-note";
const source = item.taint_source || "upstream taint";
const reference = item.taint_reference ? " (" + item.taint_reference + ")" : "";
const reason = item.taint_reason ? ": " + item.taint_reason : "";
note.textContent = "Tainted from " + source + reference + reason;
return note;
}
function renderRecords(target, rows) {
const root = document.getElementById(target);
root.replaceChildren();
for (const record of rows || []) {
const row = document.createElement("div");
row.className = "record-row";
row.className = "record-row" + (record.tainted ? " tainted" : "");
row.append(makeStateBadge(record.state));
const identity = document.createElement("div");
@@ -2393,6 +2493,8 @@ def ui_html() -> str:
description.className = "record-description";
description.textContent = record.description;
identity.append(name, description);
const taintNote = makeTaintNote(record);
if (taintNote) identity.append(taintNote);
const context = document.createElement("div");
context.className = "record-context";
@@ -2431,7 +2533,7 @@ def ui_html() -> str:
root.replaceChildren();
for (const item of commands || []) {
const row = document.createElement("div");
row.className = "command-row";
row.className = "command-row" + (item.tainted ? " tainted" : "");
const head = document.createElement("div");
head.className = "command-head";
@@ -2446,6 +2548,8 @@ def ui_html() -> str:
statusReason.className = "record-description";
statusReason.textContent = item.status_reason || "";
title.append(name, description, statusReason);
const taintNote = makeTaintNote(item);
if (taintNote) title.append(taintNote);
const button = document.createElement("button");
button.className = "copy-button secondary";