673 lines
31 KiB
Python
673 lines
31 KiB
Python
from __future__ import annotations
|
|
|
|
import importlib.util
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
REPO_DIR = Path(__file__).resolve().parents[1]
|
|
SPEC = importlib.util.spec_from_file_location(
|
|
"credential_change", REPO_DIR / "scripts/credential-change.py"
|
|
)
|
|
credential_change = importlib.util.module_from_spec(SPEC)
|
|
assert SPEC.loader is not None
|
|
sys.modules[SPEC.name] = credential_change
|
|
SPEC.loader.exec_module(credential_change)
|
|
|
|
|
|
class CredentialChangeTests(unittest.TestCase):
|
|
def setUp(self) -> None:
|
|
self.sample = (
|
|
REPO_DIR
|
|
/ "credential-change-requests/CCR-2026-0001-whynot-design-npm-publish.yaml"
|
|
)
|
|
self.issue_core = (
|
|
REPO_DIR
|
|
/ "credential-change-requests/CCR-2026-0002-issue-core-ingestion-api-key.yaml"
|
|
)
|
|
|
|
def test_sample_ccr_validates_without_bound_claim_warning(self) -> None:
|
|
ccr, errors, warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
self.assertEqual(warnings, [])
|
|
self.assertTrue(ccr["openbao"]["auth"]["bound_claims_confirmed"])
|
|
|
|
def test_issue_core_ccr_has_confirmed_eso_binding(self) -> None:
|
|
ccr, errors, warnings = credential_change.validate_ccr(self.issue_core)
|
|
self.assertEqual(errors, [])
|
|
self.assertEqual(warnings, [])
|
|
self.assertEqual(ccr["openbao"]["auth"]["role"], "external-secrets-issue-core")
|
|
|
|
def test_all_repo_ccrs_validate(self) -> None:
|
|
for path in sorted((REPO_DIR / "credential-change-requests").glob("*.yaml")):
|
|
with self.subTest(path=path.name):
|
|
_ccr, errors, _warnings = credential_change.validate_ccr(path)
|
|
self.assertEqual(errors, [])
|
|
|
|
def test_render_summary_contains_review_fields(self) -> None:
|
|
ccr, _errors, warnings = credential_change.validate_ccr(self.sample)
|
|
rendered = credential_change.render_summary(ccr, warnings)
|
|
self.assertIn("whynot-design npm publish token lane", rendered)
|
|
self.assertIn("platform/workloads/coulomb/whynot-design/npm-publish", rendered)
|
|
self.assertIn("whynot-design-npm-publish", rendered)
|
|
self.assertIn("readiness: ready resolvable=True", rendered)
|
|
self.assertIn("approve | deny | needs_changes", rendered)
|
|
|
|
def test_status_payload_marks_active_ready_resolvable(self) -> None:
|
|
ccr, _errors, warnings = credential_change.validate_ccr(self.sample)
|
|
payload = credential_change.status_payload(ccr, warnings)
|
|
self.assertFalse(payload["apply_allowed"])
|
|
self.assertTrue(payload["apply_complete"])
|
|
self.assertTrue(payload["frontdoor_resolvable"])
|
|
self.assertEqual(payload["status"], "active")
|
|
self.assertEqual(payload["access_frontdoor"]["readiness"], "ready")
|
|
self.assertEqual(payload["access_frontdoor"]["catalog_id"], "whynot-design-npm-publish")
|
|
self.assertEqual(payload["apply_blockers"], [])
|
|
self.assertEqual(payload["frontdoor_blockers"], [])
|
|
self.assertEqual(payload["warnings"], [])
|
|
self.assertEqual(
|
|
payload["state_hub"]["decision_id"],
|
|
"e6381a56-6b04-4fd5-b2de-f3ef59cde888",
|
|
)
|
|
|
|
def test_state_hub_rationale_prefix_maps_to_ccr_status(self) -> None:
|
|
cases = {
|
|
"APPROVE: scoped path and binding are correct": "approved",
|
|
"DENY: wrong tenant": "denied",
|
|
"NEEDS_CHANGES: use a read-only token": "needs_changes",
|
|
"request changes: clarify service account": "needs_changes",
|
|
}
|
|
for rationale, expected in cases.items():
|
|
with self.subTest(rationale=rationale):
|
|
self.assertEqual(
|
|
credential_change.ccr_status_from_state_hub_rationale(rationale),
|
|
expected,
|
|
)
|
|
with self.assertRaises(SystemExit):
|
|
credential_change.ccr_status_from_state_hub_rationale("looks good")
|
|
|
|
def test_sync_state_hub_decision_updates_ccr_status(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
copied = Path(tmp) / self.sample.name
|
|
shutil.copy2(self.sample, copied)
|
|
copied_ccr = credential_change.load_yaml(copied)
|
|
copied_ccr["status"] = "proposed"
|
|
copied_ccr["access_frontdoor"]["readiness"] = "template"
|
|
copied_ccr["access_frontdoor"]["resolvable"] = False
|
|
copied_ccr["access_frontdoor"]["activation"] = "pending-review"
|
|
copied_ccr.setdefault("state_hub", {})[
|
|
"decision_id"
|
|
] = "250669d0-8475-4527-9624-cd072249f9a9"
|
|
credential_change.dump_yaml(copied, copied_ccr)
|
|
original = credential_change.state_hub_decision_status
|
|
try:
|
|
credential_change.state_hub_decision_status = lambda _ccr, _url: {
|
|
"id": "250669d0-8475-4527-9624-cd072249f9a9",
|
|
"status": "resolved",
|
|
"rationale": "APPROVE: scoped path and confirmed binding are acceptable",
|
|
"decided_by": "unit-test",
|
|
"decided_at": "2026-06-27T22:00:00Z",
|
|
}
|
|
credential_change.sync_state_hub_decision(copied, "http://state-hub.test")
|
|
finally:
|
|
credential_change.state_hub_decision_status = original
|
|
ccr, errors, warnings = credential_change.validate_ccr(copied)
|
|
self.assertEqual(errors, [])
|
|
self.assertEqual(warnings, [])
|
|
self.assertEqual(ccr["status"], "approved")
|
|
self.assertEqual(ccr["review"]["comments"][-1]["reviewer"], "unit-test")
|
|
self.assertIn("State Hub decision", ccr["review"]["comments"][-1]["comment"])
|
|
self.assertEqual(ccr["state_hub"]["decision_resolved_at"], "2026-06-27T22:00:00Z")
|
|
|
|
def test_kubernetes_auth_payload_uses_service_account_bounds(self) -> None:
|
|
ccr, errors, _warnings = credential_change.validate_ccr(self.issue_core)
|
|
self.assertEqual(errors, [])
|
|
payload = credential_change.auth_payload(ccr)
|
|
self.assertEqual(payload["bound_service_account_names"], ["external-secrets"])
|
|
self.assertEqual(payload["bound_service_account_namespaces"], ["external-secrets"])
|
|
self.assertNotIn("bound_claims", payload)
|
|
|
|
def test_oidc_auth_payload_includes_redirect_uris(self) -> None:
|
|
ccr, errors, _warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
payload = credential_change.auth_payload(ccr)
|
|
self.assertEqual(
|
|
payload["allowed_redirect_uris"],
|
|
[
|
|
"https://bao.coulomb.social/ui/vault/auth/netkingdom/oidc/callback",
|
|
"http://localhost:8250/oidc/callback",
|
|
"http://127.0.0.1:8250/oidc/callback",
|
|
],
|
|
)
|
|
self.assertEqual(
|
|
payload["oidc_scopes"],
|
|
["openid", "profile", "email", "groups"],
|
|
)
|
|
|
|
def test_apply_plan_refuses_unapproved_ccr(self) -> None:
|
|
with self.assertRaises(SystemExit):
|
|
credential_change.command_apply_plan(type("Args", (), {"ref": str(self.issue_core)})())
|
|
|
|
def test_plan_includes_source_artifact_diff_status(self) -> None:
|
|
ccr, errors, _warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
rendered = credential_change.render_plan(ccr)
|
|
self.assertIn("Source artifact diff:", rendered)
|
|
self.assertIn("artifact status: matches", rendered)
|
|
|
|
def test_decision_templates_prefill_review_context(self) -> None:
|
|
ccr, errors, _warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
rendered = credential_change.render_decision_templates(ccr)
|
|
self.assertIn("APPROVE: CCR-2026-0001", rendered)
|
|
self.assertIn("DENY: CCR-2026-0001", rendered)
|
|
self.assertIn("NEEDS_CHANGES: CCR-2026-0001", rendered)
|
|
self.assertIn("platform/workloads/coulomb/whynot-design/npm-publish", rendered)
|
|
self.assertIn("workload-kv-read-whynot-design-npm-publish", rendered)
|
|
self.assertIn("auth/netkingdom/role/whynot-design-workload-kv-read", rendered)
|
|
|
|
def test_invalid_state_hub_rationale_shows_templates(self) -> None:
|
|
ccr, errors, _warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
with self.assertRaises(SystemExit) as raised:
|
|
credential_change.ccr_status_from_state_hub_rationale("looks good", ccr)
|
|
self.assertIn("APPROVE: CCR-2026-0001", str(raised.exception))
|
|
self.assertIn("NEEDS_CHANGES: CCR-2026-0001", str(raised.exception))
|
|
|
|
def test_decision_command_can_record_state_hub_event(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
copied = Path(tmp) / self.issue_core.name
|
|
shutil.copy2(self.issue_core, copied)
|
|
events = []
|
|
original = credential_change.state_hub_post_json
|
|
try:
|
|
credential_change.state_hub_post_json = (
|
|
lambda _base_url, _path, payload: events.append(payload) or {"id": "event-1"}
|
|
)
|
|
exit_code = credential_change.command_decision(
|
|
type(
|
|
"Args",
|
|
(),
|
|
{
|
|
"ref": str(copied),
|
|
"reviewer": "unit-test",
|
|
"comment": "scoped metadata looks correct",
|
|
"record_state_hub": True,
|
|
"state_hub_url": "http://state-hub.test",
|
|
},
|
|
)(),
|
|
"approved",
|
|
)
|
|
finally:
|
|
credential_change.state_hub_post_json = original
|
|
self.assertEqual(exit_code, 0)
|
|
self.assertEqual(events[0]["event_type"], "credential_change_decision")
|
|
self.assertIn("CCR-2026-0002", events[0]["summary"])
|
|
self.assertIn("ISSUE_CORE_API_KEY", events[0]["summary"])
|
|
|
|
def test_operator_commands_render_non_secret_policy_and_role_handoff(self) -> None:
|
|
ccr, errors, warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
self.assertEqual(warnings, [])
|
|
rendered = credential_change.render_operator_commands(ccr)
|
|
self.assertIn(
|
|
"bao policy write workload-kv-read-whynot-design-npm-publish",
|
|
rendered,
|
|
)
|
|
self.assertIn(
|
|
"bao write auth/netkingdom/role/whynot-design-workload-kv-read",
|
|
rendered,
|
|
)
|
|
self.assertIn("# Do not paste this shell block into the OpenBao Browser CLI.", rendered)
|
|
self.assertIn(
|
|
"# Web UI API Explorer path for the role JSON body: /v1/auth/netkingdom/role/whynot-design-workload-kv-read",
|
|
rendered,
|
|
)
|
|
self.assertIn('role_payload_file="$(mktemp)"', rendered)
|
|
self.assertIn('"bound_claims": {', rendered)
|
|
self.assertIn('"allowed_redirect_uris": [', rendered)
|
|
self.assertIn('"oidc_scopes": [', rendered)
|
|
self.assertIn('"groups"', rendered)
|
|
self.assertIn(
|
|
'"https://bao.coulomb.social/ui/vault/auth/netkingdom/oidc/callback"',
|
|
rendered,
|
|
)
|
|
self.assertIn(
|
|
'bao write auth/netkingdom/role/whynot-design-workload-kv-read @"$role_payload_file"',
|
|
rendered,
|
|
)
|
|
self.assertIn(
|
|
"# bao kv put platform/workloads/coulomb/whynot-design/npm-publish",
|
|
rendered,
|
|
)
|
|
self.assertIn("NPM_AUTH_TOKEN=<enter-through-approved-custody>", rendered)
|
|
self.assertNotIn("npm_", rendered)
|
|
|
|
def test_operator_commands_refuse_unapproved_ccr(self) -> None:
|
|
with self.assertRaises(SystemExit):
|
|
credential_change.command_operator_commands(
|
|
type("Args", (), {"ref": str(self.issue_core)})()
|
|
)
|
|
|
|
def test_approve_records_comment_but_unconfirmed_claim_still_blocks_apply(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_path = Path(tmp)
|
|
ccr_dir = tmp_path / "ccrs"
|
|
ccr_dir.mkdir()
|
|
copied = ccr_dir / self.issue_core.name
|
|
shutil.copy2(self.issue_core, copied)
|
|
old_ccr_dir = os.environ.get("CCR_DIR")
|
|
os.environ["CCR_DIR"] = str(ccr_dir)
|
|
try:
|
|
credential_change.append_decision(
|
|
copied, "approved", "unit-test", "looks right"
|
|
)
|
|
copied_data = credential_change.load_yaml(copied)
|
|
copied_data["openbao"]["auth"]["bound_claims_confirmed"] = False
|
|
credential_change.dump_yaml(copied, copied_data)
|
|
ccr, errors, _warnings = credential_change.validate_ccr(copied)
|
|
self.assertEqual(errors, [])
|
|
self.assertEqual(ccr["status"], "approved")
|
|
self.assertEqual(ccr["review"]["comments"][-1]["comment"], "looks right")
|
|
with self.assertRaises(SystemExit):
|
|
credential_change.command_apply_plan(
|
|
type("Args", (), {"ref": "CCR-2026-0002"})()
|
|
)
|
|
finally:
|
|
if old_ccr_dir is None:
|
|
os.environ.pop("CCR_DIR", None)
|
|
else:
|
|
os.environ["CCR_DIR"] = old_ccr_dir
|
|
|
|
def test_confirm_binding_records_comment_and_clears_warning(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
copied = Path(tmp) / self.issue_core.name
|
|
shutil.copy2(self.issue_core, copied)
|
|
credential_change.confirm_binding(
|
|
copied, "unit-test", "service account binding confirmed"
|
|
)
|
|
ccr, errors, warnings = credential_change.validate_ccr(copied)
|
|
self.assertEqual(errors, [])
|
|
self.assertEqual(warnings, [])
|
|
self.assertTrue(ccr["openbao"]["auth"]["bound_claims_confirmed"])
|
|
self.assertEqual(ccr["review"]["comments"][-1]["decision"], "binding_confirmed")
|
|
|
|
def test_generated_policy_is_narrow(self) -> None:
|
|
ccr, _errors, _warnings = credential_change.validate_ccr(self.sample)
|
|
policy = credential_change.generated_policy_hcl(ccr)
|
|
self.assertIn('path "platform/data/workloads/coulomb/whynot-design/npm-publish"', policy)
|
|
self.assertNotIn("*", policy)
|
|
self.assertNotIn("delete", policy)
|
|
|
|
def test_applier_dry_run_succeeds_for_active_ccr(self) -> None:
|
|
ccr, errors, warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
self.assertEqual(warnings, [])
|
|
self.assertEqual(credential_change.applier_readiness_blockers(ccr), [])
|
|
payload = credential_change.applier_dry_run_payload(ccr, warnings)
|
|
self.assertEqual(payload["source_artifacts"]["policy"]["status"], "matches")
|
|
self.assertEqual(
|
|
payload["mutations"][0]["openbao_path"],
|
|
"sys/policies/acl/workload-kv-read-whynot-design-npm-publish",
|
|
)
|
|
self.assertEqual(
|
|
payload["mutations"][1]["openbao_path"],
|
|
"auth/netkingdom/role/whynot-design-workload-kv-read",
|
|
)
|
|
rendered = credential_change.render_applier_dry_run(payload)
|
|
self.assertIn("Allowed metadata mutations", rendered)
|
|
self.assertIn("secret value writes", rendered)
|
|
self.assertNotIn("<enter-through-approved-custody>", rendered)
|
|
|
|
def test_applier_dry_run_refuses_unapproved_ccr(self) -> None:
|
|
exit_code = credential_change.command_applier_dry_run(
|
|
type("Args", (), {"ref": str(self.issue_core), "json": False})()
|
|
)
|
|
self.assertEqual(exit_code, 1)
|
|
|
|
def test_applier_dry_run_rejects_out_of_policy_policy_name(self) -> None:
|
|
ccr, errors, _warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
ccr["status"] = "approved"
|
|
ccr["openbao"]["policy_name"] = "platform-admin"
|
|
ccr["openbao"]["auth"]["policies"] = ["platform-admin"]
|
|
blockers = credential_change.applier_readiness_blockers(ccr)
|
|
self.assertTrue(
|
|
any("disallowed" in blocker for blocker in blockers),
|
|
blockers,
|
|
)
|
|
|
|
def test_applier_dry_run_rejects_out_of_policy_auth_role(self) -> None:
|
|
ccr, errors, _warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
ccr["status"] = "approved"
|
|
ccr["openbao"]["auth"]["role"] = "platform-admin"
|
|
blockers = credential_change.applier_readiness_blockers(ccr)
|
|
self.assertTrue(
|
|
any("auth.role is disallowed" in blocker for blocker in blockers),
|
|
blockers,
|
|
)
|
|
|
|
def test_applier_dry_run_rejects_out_of_scope_mount_and_path(self) -> None:
|
|
ccr, errors, _warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
ccr["status"] = "approved"
|
|
ccr["openbao"]["mount"] = "secret"
|
|
ccr["openbao"]["kv_path"] = "secret/platform-admin"
|
|
blockers = credential_change.applier_readiness_blockers(ccr)
|
|
self.assertIn("openbao.mount must be platform, got secret", blockers)
|
|
self.assertIn("openbao.kv_path must stay under platform/workloads/", blockers)
|
|
|
|
def test_nonprod_applier_policy_remains_metadata_only(self) -> None:
|
|
policy = (
|
|
REPO_DIR / "openbao/policies/credential-change-nonprod-applier.hcl"
|
|
).read_text(encoding="utf-8")
|
|
self.assertIn('path "sys/policies/acl/workload-kv-read-*"', policy)
|
|
self.assertIn('path "auth/kubernetes/role/*"', policy)
|
|
self.assertNotIn('path "platform/data/', policy)
|
|
self.assertNotIn('path "platform/metadata/', policy)
|
|
|
|
def test_applier_apply_plan_renders_confirmation(self) -> None:
|
|
ccr, errors, warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
rendered = credential_change.render_applier_apply_plan(ccr, warnings)
|
|
self.assertIn("DELEGATED APPLY CCR-2026-0001", rendered)
|
|
self.assertIn("applier-apply CCR-2026-0001", rendered)
|
|
self.assertIn("secret value writes", rendered)
|
|
|
|
def test_applier_apply_refuses_unapproved_ccr(self) -> None:
|
|
exit_code = credential_change.command_applier_apply(
|
|
type(
|
|
"Args",
|
|
(),
|
|
{
|
|
"ref": str(self.issue_core),
|
|
"actor": "unit-test",
|
|
"confirm": None,
|
|
"bao_bin": "bao",
|
|
"plan_only": False,
|
|
"json": False,
|
|
"quiet": True,
|
|
"record_state_hub": False,
|
|
"state_hub_url": "http://state-hub.test",
|
|
},
|
|
)()
|
|
)
|
|
self.assertEqual(exit_code, 1)
|
|
|
|
def test_applier_apply_records_metadata_evidence(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
copied = Path(tmp) / self.sample.name
|
|
shutil.copy2(self.sample, copied)
|
|
ccr = credential_change.load_yaml(copied)
|
|
ccr["status"] = "approved"
|
|
ccr["access_frontdoor"]["readiness"] = "approved-pending-apply"
|
|
ccr["access_frontdoor"]["resolvable"] = False
|
|
credential_change.dump_yaml(copied, ccr)
|
|
calls = []
|
|
events = []
|
|
original_apply = credential_change.run_bao_metadata_apply
|
|
original_post = credential_change.state_hub_post_json
|
|
try:
|
|
credential_change.run_bao_metadata_apply = lambda ccr, bao_bin: calls.append((ccr["id"], bao_bin))
|
|
credential_change.state_hub_post_json = (
|
|
lambda _base_url, _path, payload: events.append(payload) or {"id": "event-1"}
|
|
)
|
|
exit_code = credential_change.command_applier_apply(
|
|
type(
|
|
"Args",
|
|
(),
|
|
{
|
|
"ref": str(copied),
|
|
"actor": "unit-test",
|
|
"confirm": "DELEGATED APPLY CCR-2026-0001",
|
|
"bao_bin": "bao-test",
|
|
"plan_only": False,
|
|
"json": False,
|
|
"quiet": True,
|
|
"record_state_hub": True,
|
|
"state_hub_url": "http://state-hub.test",
|
|
},
|
|
)()
|
|
)
|
|
finally:
|
|
credential_change.run_bao_metadata_apply = original_apply
|
|
credential_change.state_hub_post_json = original_post
|
|
self.assertEqual(exit_code, 0)
|
|
self.assertEqual(calls, [("CCR-2026-0001", "bao-test")])
|
|
updated = credential_change.load_yaml(copied)
|
|
self.assertEqual(updated["status"], "applied")
|
|
self.assertEqual(updated["verification"]["evidence"][-1]["kind"], "delegated_metadata_apply")
|
|
self.assertEqual(events[0]["event_type"], "credential_change_evidence")
|
|
self.assertIn("delegated_metadata_apply", events[0]["summary"])
|
|
|
|
def test_applier_apply_requires_exact_confirmation(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
copied = Path(tmp) / self.sample.name
|
|
shutil.copy2(self.sample, copied)
|
|
ccr = credential_change.load_yaml(copied)
|
|
ccr["status"] = "approved"
|
|
credential_change.dump_yaml(copied, ccr)
|
|
with self.assertRaises(SystemExit):
|
|
credential_change.command_applier_apply(
|
|
type(
|
|
"Args",
|
|
(),
|
|
{
|
|
"ref": str(copied),
|
|
"actor": "unit-test",
|
|
"confirm": "apply it",
|
|
"bao_bin": "bao",
|
|
"plan_only": False,
|
|
"json": False,
|
|
"quiet": True,
|
|
"record_state_hub": False,
|
|
"state_hub_url": "http://state-hub.test",
|
|
},
|
|
)()
|
|
)
|
|
|
|
def test_runbook_renders_apply_verify_guidance(self) -> None:
|
|
ccr, errors, warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
payload = credential_change.runbook_payload(ccr, warnings)
|
|
rendered = credential_change.render_runbook(payload)
|
|
self.assertIn("APPLY CCR-2026-0001", rendered)
|
|
self.assertIn("runbook <CCR> --execute-metadata", rendered)
|
|
self.assertIn("record-evidence <CCR>", rendered)
|
|
self.assertIn("Field presence checked without printing values", rendered)
|
|
self.assertNotIn("npm_", rendered)
|
|
|
|
def test_runbook_refuses_unapproved_ccr(self) -> None:
|
|
exit_code = credential_change.command_runbook(
|
|
type(
|
|
"Args",
|
|
(),
|
|
{
|
|
"ref": str(self.issue_core),
|
|
"json": False,
|
|
"execute_metadata": False,
|
|
"actor": "unit-test",
|
|
"confirm": None,
|
|
"bao_bin": "bao",
|
|
"record_state_hub": False,
|
|
"state_hub_url": "http://state-hub.test",
|
|
},
|
|
)()
|
|
)
|
|
self.assertEqual(exit_code, 1)
|
|
|
|
def test_record_evidence_appends_non_secret_entry_and_status(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
copied = Path(tmp) / self.sample.name
|
|
shutil.copy2(self.sample, copied)
|
|
ccr = credential_change.load_yaml(copied)
|
|
ccr["status"] = "approved"
|
|
ccr["access_frontdoor"]["readiness"] = "approved-pending-apply"
|
|
ccr["access_frontdoor"]["resolvable"] = False
|
|
credential_change.dump_yaml(copied, ccr)
|
|
updated = credential_change.append_evidence(
|
|
copied,
|
|
"unit-test",
|
|
"metadata_apply",
|
|
"passed",
|
|
["OpenBao audit timestamp recorded without secret values"],
|
|
set_status="applied",
|
|
)
|
|
self.assertEqual(updated["status"], "applied")
|
|
self.assertEqual(updated["verification"]["evidence"][-1]["kind"], "metadata_apply")
|
|
self.assertEqual(
|
|
updated["verification"]["evidence"][-1]["details"],
|
|
["OpenBao audit timestamp recorded without secret values"],
|
|
)
|
|
|
|
def test_record_evidence_can_mark_frontdoor_ready(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
copied = Path(tmp) / self.sample.name
|
|
shutil.copy2(self.sample, copied)
|
|
updated = credential_change.append_evidence(
|
|
copied,
|
|
"unit-test",
|
|
"frontdoor_activation",
|
|
"passed",
|
|
["Catalog readiness checked without secret values"],
|
|
set_status="active",
|
|
frontdoor_ready=True,
|
|
)
|
|
self.assertEqual(updated["status"], "active")
|
|
self.assertEqual(updated["access_frontdoor"]["readiness"], "ready")
|
|
self.assertTrue(updated["access_frontdoor"]["resolvable"])
|
|
|
|
def test_record_evidence_rejects_secret_markers(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
copied = Path(tmp) / self.sample.name
|
|
shutil.copy2(self.sample, copied)
|
|
with self.assertRaises(SystemExit):
|
|
credential_change.append_evidence(
|
|
copied,
|
|
"unit-test",
|
|
"positive_verification",
|
|
"passed",
|
|
["accidentally pasted sk-test"],
|
|
)
|
|
|
|
def test_lifecycle_plan_renders_deactivation_steps(self) -> None:
|
|
ccr, errors, _warnings = credential_change.validate_ccr(self.sample)
|
|
self.assertEqual(errors, [])
|
|
payload = credential_change.lifecycle_payload(ccr, "deactivate")
|
|
rendered = credential_change.render_lifecycle_plan(payload)
|
|
self.assertIn("lifecycle plan: deactivate", rendered)
|
|
self.assertIn("readiness=disabled resolvable=False", rendered)
|
|
self.assertIn("bao delete auth/netkingdom/role/whynot-design-workload-kv-read", rendered)
|
|
self.assertIn("bao policy delete workload-kv-read-whynot-design-npm-publish", rendered)
|
|
self.assertNotIn("NPM_AUTH_TOKEN=", rendered)
|
|
|
|
def test_lifecycle_event_marks_deactivated_and_disables_frontdoor(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
copied = Path(tmp) / self.sample.name
|
|
shutil.copy2(self.sample, copied)
|
|
updated = credential_change.append_lifecycle_event(
|
|
copied,
|
|
"unit-test",
|
|
"deactivate",
|
|
"No longer needed",
|
|
["Front door disabled in catalog"],
|
|
)
|
|
self.assertEqual(updated["status"], "deactivated")
|
|
self.assertEqual(updated["access_frontdoor"]["readiness"], "disabled")
|
|
self.assertFalse(updated["access_frontdoor"]["resolvable"])
|
|
self.assertEqual(updated["lifecycle"]["events"][-1]["action"], "deactivate")
|
|
|
|
def test_lifecycle_event_records_compromise_blast_radius_and_follow_up(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
copied = Path(tmp) / self.sample.name
|
|
shutil.copy2(self.sample, copied)
|
|
updated = credential_change.append_lifecycle_event(
|
|
copied,
|
|
"unit-test",
|
|
"compromise",
|
|
"Unexpected exposure signal",
|
|
["Access disabled before rotation"],
|
|
blast_radius=["npm publishing lane only"],
|
|
follow_up=["incident-task-1"],
|
|
)
|
|
event = updated["lifecycle"]["events"][-1]
|
|
self.assertEqual(updated["status"], "compromised")
|
|
self.assertEqual(updated["access_frontdoor"]["readiness"], "compromised")
|
|
self.assertEqual(event["blast_radius"], ["npm publishing lane only"])
|
|
self.assertEqual(event["follow_up"], ["incident-task-1"])
|
|
|
|
def test_lifecycle_event_rejects_secret_markers(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
copied = Path(tmp) / self.sample.name
|
|
shutil.copy2(self.sample, copied)
|
|
with self.assertRaises(SystemExit):
|
|
credential_change.append_lifecycle_event(
|
|
copied,
|
|
"unit-test",
|
|
"rotate",
|
|
"accidentally pasted ghp_bad",
|
|
["rotation needed"],
|
|
)
|
|
|
|
def test_import_inventory_writes_non_secret_ccr_and_policy(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_path = Path(tmp)
|
|
output_dir = tmp_path / "ccrs"
|
|
policy_file = tmp_path / "policies" / "workload-kv-read-imported-lane.hcl"
|
|
args = type(
|
|
"Args",
|
|
(),
|
|
{
|
|
"id": "CCR-2099-0001",
|
|
"title": "imported lane",
|
|
"tenant": "coulomb",
|
|
"workload": "imported",
|
|
"environment": "production",
|
|
"purpose": "runtime token",
|
|
"mount": "platform",
|
|
"kv_path": "platform/workloads/coulomb/imported/runtime-token",
|
|
"field": ["RUNTIME_TOKEN"],
|
|
"policy_name": "workload-kv-read-imported-lane",
|
|
"policy_file": str(policy_file),
|
|
"auth_method": "oidc",
|
|
"auth_mount": "netkingdom",
|
|
"auth_role": "imported-workload-kv-read",
|
|
"bound_claim": ["groups=imported"],
|
|
"service_account": None,
|
|
"service_account_namespace": None,
|
|
"bound_claims_confirmed": True,
|
|
"ttl": "15m",
|
|
"frontdoor_type": "ops-warden",
|
|
"catalog_id": "imported-runtime-token",
|
|
"selector": None,
|
|
"command": None,
|
|
"status": "active",
|
|
"readiness": "ready",
|
|
"resolvable": True,
|
|
"risk": "high",
|
|
"positive_check": "Authorized caller can fetch RUNTIME_TOKEN with output suppressed.",
|
|
"negative_check": "Unauthorized caller cannot read the imported path.",
|
|
"requester_agent": "unit-test",
|
|
"actor": "unit-test",
|
|
"reason": "Imported existing lane without secret values",
|
|
"output_dir": str(output_dir),
|
|
"write_policy": True,
|
|
},
|
|
)()
|
|
path = credential_change.write_inventory_ccr(args)
|
|
self.assertTrue(path.exists())
|
|
self.assertTrue(policy_file.exists())
|
|
ccr, errors, warnings = credential_change.validate_ccr(path)
|
|
self.assertEqual(errors, [])
|
|
self.assertEqual(warnings, [])
|
|
self.assertEqual(ccr["openbao"]["fields"], ["RUNTIME_TOKEN"])
|
|
self.assertNotIn("ghp_", path.read_text(encoding="utf-8"))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|