Add NET-WP-0017 T02 closure validator

This commit is contained in:
2026-06-02 00:24:18 +02:00
parent cd82285efe
commit 31e6d6660f
4 changed files with 257 additions and 5 deletions

View File

@@ -647,9 +647,10 @@ def print_status(data: dict[str, Any]) -> None:
print("2. custody-packet")
print("3. openbao-preflight")
print("4. handover-checklist")
print("5. metadata-template")
print("6. approve-custody-mode")
print("7. web-ui")
print("5. validate-t02")
print("6. metadata-template")
print("7. approve-custody-mode")
print("8. web-ui")
print("")
print("Refusal boundary")
print("This console will not run bao operator init or collect secret values.")
@@ -694,6 +695,168 @@ def print_validate_king_kit(data: dict[str, Any]) -> int:
return 1
def valid_review_date(value: Any) -> bool:
try:
datetime.strptime(str(value).strip(), "%Y-%m-%d")
except (TypeError, ValueError):
return False
return True
def independent_quorum_holder_ready(data: dict[str, Any]) -> bool:
holder = str(data.get("role_future_quorum_email") or "").strip().lower()
if not holder:
return False
current_holders = {
str(data.get(key) or "").strip().lower()
for key in (
"role_setup_operator_email",
"role_platform_custodian_email",
"role_identity_admin_email",
"role_openbao_operator_email",
"role_recovery_custodian_email",
"notification_contact",
)
}
current_holders.discard("")
return holder not in current_holders
def independent_quorum_holder_reason(data: dict[str, Any]) -> str:
holder = str(data.get("role_future_quorum_email") or "").strip()
if not holder:
return "Record the next independent escrow/quorum holder email before moving beyond temporary single-king custody."
if not independent_quorum_holder_ready(data):
return "Future quorum holder is recorded but is not independent from the current bootstrap/custody roles."
return "Future quorum holder is recorded as independent from the current bootstrap/custody roles."
def audit_core_posture_ready(data: dict[str, Any]) -> bool:
if yes(data, "audit_core_production_sink_ready"):
return True
return (
yes(data, "audit_core_bootstrap_risk_accepted")
and bool(str(data.get("audit_core_risk_owner") or "").strip())
and valid_review_date(data.get("audit_core_risk_review_date"))
and bool(str(data.get("audit_core_risk_note") or "").strip())
)
def audit_core_posture_reason(data: dict[str, Any]) -> str:
if yes(data, "audit_core_production_sink_ready"):
return "Production Audit Core retention is recorded as ready."
missing: list[str] = []
if not yes(data, "audit_core_bootstrap_risk_accepted"):
missing.append("risk acceptance")
if not str(data.get("audit_core_risk_owner") or "").strip():
missing.append("owner")
if not valid_review_date(data.get("audit_core_risk_review_date")):
missing.append("review date")
if not str(data.get("audit_core_risk_note") or "").strip():
missing.append("risk note")
if missing:
return "Production Audit Core retention is deferred; record " + ", ".join(missing) + "."
return "Temporary bootstrap audit-retention risk exception is recorded with owner and review date."
def compact_command_output(text: str) -> str:
lines = [line.strip() for line in text.splitlines() if line.strip()]
return lines[-1] if lines else "No validator output captured."
def resolve_cli_path(value: str | Path) -> Path:
path = Path(value).expanduser()
if not path.is_absolute():
path = (Path.cwd() / path).resolve()
return path
def evidence_validator_gate(
name: str,
railiance_path: Path,
target: str,
env_key: str,
evidence_path: Path,
) -> Gate:
if not railiance_path.is_dir():
return Gate(name, "blocked", f"Railiance repo not found: {railiance_path}")
if not evidence_path.exists():
return Gate(name, "blocked", f"Evidence file missing: {evidence_path}")
result = subprocess.run(
["make", target, f"{env_key}={evidence_path}"],
cwd=railiance_path,
text=True,
capture_output=True,
check=False,
)
output = compact_command_output(result.stdout + "\n" + result.stderr)
if result.returncode == 0:
return Gate(name, "done", output)
return Gate(name, "blocked", output)
def t02_metadata_gates(data: dict[str, Any]) -> list[Gate]:
return [
Gate(
"Restore drill metadata",
"done" if yes(data, "restore_drill_passed") else "blocked",
"Record the restore drill completion flag only after evidence exists and validates.",
),
Gate(
"Emergency seal/unseal metadata",
"done" if yes(data, "openbao_emergency_lockdown_drilled") else "blocked",
"Record the emergency drill completion flag only after an attended drill and validated evidence.",
),
Gate(
"Next independent escrow holder",
"done" if independent_quorum_holder_ready(data) else "blocked",
independent_quorum_holder_reason(data),
),
Gate(
"Audit Core retention posture",
"done" if audit_core_posture_ready(data) else "blocked",
audit_core_posture_reason(data),
),
]
def print_validate_t02(args: argparse.Namespace, data: dict[str, Any]) -> int:
merged = metadata_template()
merged.update(data)
railiance_path = resolve_cli_path(args.railiance_path)
restore_evidence = resolve_cli_path(args.restore_evidence)
emergency_evidence = resolve_cli_path(args.emergency_evidence)
gates = t02_metadata_gates(merged)
gates.extend(
[
evidence_validator_gate(
"Restore drill evidence file",
railiance_path,
"openbao-validate-restore-evidence",
"OPENBAO_RESTORE_EVIDENCE",
restore_evidence,
),
evidence_validator_gate(
"Emergency drill evidence file",
railiance_path,
"openbao-validate-emergency-evidence",
"OPENBAO_EMERGENCY_EVIDENCE",
emergency_evidence,
),
]
)
print("NET-WP-0017-T02 VALIDATION")
print("")
for gate in gates:
print(f"- {gate.status}: {gate.name} - {gate.reason}")
print("")
if all(gate.status == "done" for gate in gates):
print("NET-WP-0017-T02 evidence and metadata gates are complete.")
return 0
print("NET-WP-0017-T02 is still open.")
return 1
def merged_approval_metadata(
existing: dict[str, Any],
payload: dict[str, Any],
@@ -720,6 +883,9 @@ def merged_approval_metadata(
"mfa_enrollment_reference",
"custody_mode",
"root_token_disposition",
"audit_core_risk_owner",
"audit_core_risk_review_date",
"audit_core_risk_note",
"notes",
)
for field in text_fields:
@@ -754,6 +920,8 @@ def merged_approval_metadata(
"openbao_oidc_auth_configured",
"openbao_oidc_admin_login_verified",
"restore_drill_passed",
"audit_core_production_sink_ready",
"audit_core_bootstrap_risk_accepted",
"cleanup_complete",
"platform_reopened",
):
@@ -978,6 +1146,11 @@ def metadata_template() -> dict[str, Any]:
"openbao_oidc_admin_login_verified": False,
"root_token_disposition": "",
"restore_drill_passed": False,
"audit_core_production_sink_ready": False,
"audit_core_bootstrap_risk_accepted": False,
"audit_core_risk_owner": "",
"audit_core_risk_review_date": "",
"audit_core_risk_note": "",
"cleanup_complete": False,
"platform_reopened": False,
"review_date": "",
@@ -3106,6 +3279,24 @@ def ui_html() -> str:
<li>Cleanup means bootstrap-era passwords, service tokens, temporary admin paths, trial OpenBao material, and plaintext secret exposure have been rotated, retired, reset, or explicitly accepted as residual risk.</li>
<li>Reopen means the platform is intentionally operated again under the selected custody strategy, with break-glass and restore paths known.</li>
</ul>
<div class="choice-list">
<label class="choice"><input id="audit_core_production_sink_ready" type="checkbox"><span><strong>Production Audit Core sink ready</strong><span>Durable tenant-aware audit retention is available beyond the OpenBao audit PVC.</span></span></label>
<label class="choice"><input id="audit_core_bootstrap_risk_accepted" type="checkbox"><span><strong>Bootstrap audit-risk exception accepted</strong><span>Use only when ordinary onboarding proceeds before the production Audit Core sink exists.</span></span></label>
</div>
<div class="grid" style="margin-top: 14px;">
<label class="field">
<span class="label">Audit risk owner</span>
<input id="audit_core_risk_owner" type="text" autocomplete="off" title="Owner accountable for the temporary audit-retention risk exception.">
</label>
<label class="field">
<span class="label">Audit risk review date</span>
<input id="audit_core_risk_review_date" type="date" autocomplete="off" title="Date when the temporary audit-retention exception must be reviewed.">
</label>
</div>
<label class="field" style="margin-top: 14px;">
<span class="label">Audit risk note</span>
<input id="audit_core_risk_note" type="text" autocomplete="off" title="Non-secret note describing the accepted limitation and expected production Audit Core follow-up.">
</label>
<div class="choice-list">
<label class="choice"><input id="cleanup_complete" type="checkbox"><span><strong>Cleanup and hardening complete</strong><span>Bootstrap-era credentials, databases, access paths, and tainted materials have been reviewed and handled.</span></span></label>
<label class="choice"><input id="platform_reopened" type="checkbox"><span><strong>Platform reopened under custody</strong><span>The operator accepts that the platform is now running under the approved custody model.</span></span></label>
@@ -3200,6 +3391,11 @@ def ui_html() -> str:
"openbao_oidc_admin_login_verified",
"root_token_disposition",
"restore_drill_passed",
"audit_core_production_sink_ready",
"audit_core_bootstrap_risk_accepted",
"audit_core_risk_owner",
"audit_core_risk_review_date",
"audit_core_risk_note",
"cleanup_complete",
"platform_reopened"
];
@@ -3522,6 +3718,11 @@ def ui_html() -> str:
openbao_oidc_admin_login_verified: document.getElementById("openbao_oidc_admin_login_verified").checked,
root_token_disposition: document.getElementById("root_token_disposition").value,
restore_drill_passed: document.getElementById("restore_drill_passed").checked,
audit_core_production_sink_ready: document.getElementById("audit_core_production_sink_ready").checked,
audit_core_bootstrap_risk_accepted: document.getElementById("audit_core_bootstrap_risk_accepted").checked,
audit_core_risk_owner: document.getElementById("audit_core_risk_owner").value.trim(),
audit_core_risk_review_date: document.getElementById("audit_core_risk_review_date").value.trim(),
audit_core_risk_note: document.getElementById("audit_core_risk_note").value.trim(),
cleanup_complete: document.getElementById("cleanup_complete").checked,
platform_reopened: document.getElementById("platform_reopened").checked,
approval_phrase: document.getElementById("approval_phrase").value,
@@ -3777,6 +3978,22 @@ def build_parser() -> argparse.ArgumentParser:
sub.add_parser("status", help="Show trust stage, gates, and next safe action.")
sub.add_parser("king-kit", help="Print king credential kit checklist.")
sub.add_parser("validate-king-kit", help="Validate non-secret king credential metadata.")
validate_t02 = sub.add_parser("validate-t02", help="Validate NET-WP-0017-T02 evidence and metadata gates.")
validate_t02.add_argument(
"--railiance-path",
default="../railiance-platform",
help="Path to railiance-platform repo.",
)
validate_t02.add_argument(
"--restore-evidence",
default="/tmp/netkingdom-openbao-restore-drill/evidence.json",
help="Path to non-secret restore drill evidence JSON.",
)
validate_t02.add_argument(
"--emergency-evidence",
default="/tmp/netkingdom-openbao-emergency-drill/evidence.json",
help="Path to non-secret emergency seal/unseal drill evidence JSON.",
)
approve = sub.add_parser("approve-custody-mode", help="Approve a live-init-ready custody mode.")
approve.add_argument(
"--mode",
@@ -3831,7 +4048,7 @@ def build_parser() -> argparse.ArgumentParser:
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
metadata_commands = {"status", "validate-king-kit", "approve-custody-mode", "web-ui"}
metadata_commands = {"status", "validate-king-kit", "validate-t02", "approve-custody-mode", "web-ui"}
if args.command in metadata_commands and args.metadata is None:
args.metadata = DEFAULT_METADATA_PATH
data = load_metadata(args.metadata)
@@ -3844,6 +4061,8 @@ def main(argv: list[str] | None = None) -> int:
return 0
if args.command == "validate-king-kit":
return print_validate_king_kit(data)
if args.command == "validate-t02":
return print_validate_t02(args, data)
if args.command == "approve-custody-mode":
return print_approve_custody_mode(args, data)
if args.command == "custody-packet":