Add credential CCR operator handoff

This commit is contained in:
2026-06-28 00:21:02 +02:00
parent a27a114491
commit 248bc58b6a
4 changed files with 104 additions and 7 deletions

View File

@@ -213,6 +213,9 @@ credential-change-sync-decision: ## Sync resolved State Hub decision back into a
credential-change-apply-plan: ## Render approved-only operator apply plan
scripts/credential-change.py apply-plan $(CREDENTIAL_CHANGE)
credential-change-operator-commands: ## Render approved-only non-secret OpenBao operator commands
scripts/credential-change.py operator-commands $(CREDENTIAL_CHANGE)
openbao-token-grants-dry-run: ## Dry-run OpenBao token roles and issuer policies for credential grants
scripts/openbao-apply-token-grants.py --dry-run $(OPENBAO_TOKEN_GRANT_ARGS)

View File

@@ -155,10 +155,14 @@ scripts/credential-change.py deny CCR-2026-0001 --reviewer <name> --comment "...
scripts/credential-change.py needs-changes CCR-2026-0001 --reviewer <name> --comment "..."
make credential-change-sync-decision CREDENTIAL_CHANGE=CCR-2026-0001
make credential-change-apply-plan CREDENTIAL_CHANGE=CCR-2026-0001
make credential-change-operator-commands CREDENTIAL_CHANGE=CCR-2026-0001
```
`apply-plan` is intentionally guarded: it refuses anything not approved and
refuses unconfirmed auth bindings.
`apply-plan` and `operator-commands` are intentionally guarded: they refuse
anything not approved and refuse unconfirmed auth bindings. `operator-commands`
renders the reviewed non-secret `bao policy write` and `bao write auth/.../role`
commands for a platform operator; the actual secret value is still provisioned
through approved OpenBao/operator custody only.
The same operations can be exposed through chat by having the agent create the
proposal, show the rendered summary, then call the CLI only after the human

View File

@@ -5,6 +5,7 @@ import argparse
import json
import os
import re
import shlex
import sys
import urllib.error
import urllib.request
@@ -374,6 +375,46 @@ def render_plan(ccr: dict[str, Any]) -> str:
return "\n".join(lines)
def shell_kv_arg(key: str, value: Any) -> str:
if isinstance(value, (dict, list)):
encoded = json.dumps(value, sort_keys=True, separators=(",", ":"))
elif isinstance(value, bool):
encoded = "true" if value else "false"
elif value is None:
encoded = ""
else:
encoded = str(value)
return shlex.quote(f"{key}={encoded}")
def render_operator_commands(ccr: dict[str, Any]) -> str:
openbao = ccr["openbao"]
auth = openbao["auth"]
auth_path = f"auth/{auth['mount']}/role/{auth['role']}"
payload = auth_payload(ccr)
role_args = " ".join(
shell_kv_arg(key, payload[key]) for key in sorted(payload)
)
secret_args = " ".join(
shlex.quote(f"{field}=<enter-through-approved-custody>")
for field in openbao["fields"]
)
lines = [
f"# Operator handoff for {ccr['id']}: {ccr['title']}",
"# Run from the railiance-platform repo with an approved OpenBao operator token.",
"set -euo pipefail",
f"bao policy write {shlex.quote(openbao['policy_name'])} {shlex.quote(openbao['policy_file'])}",
f"bao write {shlex.quote(auth_path)} {role_args}",
"",
"# Secret provisioning remains under approved OpenBao/operator custody.",
"# Do not paste secret values into Git, State Hub, workplans, logs, or chat.",
f"# bao kv put {shlex.quote(openbao['kv_path'])} {secret_args}",
"",
"# After provisioning, run positive and negative verification without printing secret values.",
]
return "\n".join(lines)
def validate_or_exit(path: Path) -> tuple[dict[str, Any], list[str]]:
ccr, errors, warnings = validate_ccr(path)
for warning in warnings:
@@ -645,18 +686,30 @@ def command_status(args: argparse.Namespace) -> int:
return 0
def require_apply_ready(ccr: dict[str, Any], command_name: str) -> None:
if ccr.get("status") not in APPLY_ALLOWED_STATUSES:
fail(f"{command_name} requires status approved, got {ccr.get('status')}")
auth = ccr["openbao"]["auth"]
if auth.get("bound_claims_confirmed") is not True:
fail(f"{command_name} requires openbao.auth.bound_claims_confirmed=true")
def command_apply_plan(args: argparse.Namespace) -> int:
path = resolve_ccr(args.ref)
ccr, _warnings = validate_or_exit(path)
if ccr.get("status") not in APPLY_ALLOWED_STATUSES:
fail(f"apply-plan requires status approved, got {ccr.get('status')}")
auth = ccr["openbao"]["auth"]
if auth.get("bound_claims_confirmed") is not True:
fail("apply-plan requires openbao.auth.bound_claims_confirmed=true")
require_apply_ready(ccr, "apply-plan")
print(render_plan(ccr))
return 0
def command_operator_commands(args: argparse.Namespace) -> int:
path = resolve_ccr(args.ref)
ccr, _warnings = validate_or_exit(path)
require_apply_ready(ccr, "operator-commands")
print(render_operator_commands(ccr))
return 0
def command_decision(args: argparse.Namespace, status: str) -> int:
path = resolve_ccr(args.ref)
append_decision(path, status, args.reviewer, args.comment)
@@ -713,6 +766,13 @@ def build_parser() -> argparse.ArgumentParser:
apply_plan.add_argument("ref")
apply_plan.set_defaults(func=command_apply_plan)
operator_commands = sub.add_parser(
"operator-commands",
help="Render reviewed non-secret OpenBao commands for an approved CCR",
)
operator_commands.add_argument("ref")
operator_commands.set_defaults(func=command_operator_commands)
for name, status in (
("approve", "approved"),
("deny", "denied"),

View File

@@ -126,6 +126,36 @@ class CredentialChangeTests(unittest.TestCase):
with self.assertRaises(SystemExit):
credential_change.command_apply_plan(type("Args", (), {"ref": str(self.issue_core)})())
def test_operator_commands_render_non_secret_policy_and_role_handoff(self) -> None:
ccr, errors, warnings = credential_change.validate_ccr(self.sample)
self.assertEqual(errors, [])
self.assertEqual(warnings, [])
rendered = credential_change.render_operator_commands(ccr)
self.assertIn(
"bao policy write workload-kv-read-whynot-design-npm-publish",
rendered,
)
self.assertIn(
"bao write auth/netkingdom/role/whynot-design-workload-kv-read",
rendered,
)
self.assertIn(
'bound_claims={"groups":["whynot-design"]}',
rendered,
)
self.assertIn(
"# bao kv put platform/workloads/whynot-design/whynot-design/npm-publish",
rendered,
)
self.assertIn("NPM_AUTH_TOKEN=<enter-through-approved-custody>", rendered)
self.assertNotIn("npm_", rendered)
def test_operator_commands_refuse_unapproved_ccr(self) -> None:
with self.assertRaises(SystemExit):
credential_change.command_operator_commands(
type("Args", (), {"ref": str(self.issue_core)})()
)
def test_approve_records_comment_but_unconfirmed_claim_still_blocks_apply(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)