from __future__ import annotations import importlib.util import os import shutil import sys import tempfile import unittest from pathlib import Path REPO_DIR = Path(__file__).resolve().parents[1] SPEC = importlib.util.spec_from_file_location( "credential_change", REPO_DIR / "scripts/credential-change.py" ) credential_change = importlib.util.module_from_spec(SPEC) assert SPEC.loader is not None sys.modules[SPEC.name] = credential_change SPEC.loader.exec_module(credential_change) class CredentialChangeTests(unittest.TestCase): def setUp(self) -> None: self.sample = ( REPO_DIR / "credential-change-requests/CCR-2026-0001-whynot-design-npm-publish.yaml" ) self.issue_core = ( REPO_DIR / "credential-change-requests/CCR-2026-0002-issue-core-ingestion-api-key.yaml" ) def test_sample_ccr_validates_without_bound_claim_warning(self) -> None: ccr, errors, warnings = credential_change.validate_ccr(self.sample) self.assertEqual(errors, []) self.assertEqual(warnings, []) self.assertTrue(ccr["openbao"]["auth"]["bound_claims_confirmed"]) def test_unconfirmed_sibling_ccr_keeps_bound_claim_warning(self) -> None: _ccr, errors, warnings = credential_change.validate_ccr(self.issue_core) self.assertEqual(errors, []) self.assertIn("bound claim is not confirmed", warnings[0]) def test_all_repo_ccrs_validate(self) -> None: for path in sorted((REPO_DIR / "credential-change-requests").glob("*.yaml")): with self.subTest(path=path.name): _ccr, errors, _warnings = credential_change.validate_ccr(path) self.assertEqual(errors, []) def test_render_summary_contains_review_fields(self) -> None: ccr, _errors, warnings = credential_change.validate_ccr(self.sample) rendered = credential_change.render_summary(ccr, warnings) self.assertIn("whynot-design npm publish token lane", rendered) self.assertIn("platform/workloads/coulomb/whynot-design/npm-publish", rendered) self.assertIn("whynot-design-npm-publish", rendered) self.assertIn("readiness: template resolvable=False", rendered) self.assertIn("approve | deny | needs_changes", rendered) def test_status_payload_marks_template_not_resolvable(self) -> None: ccr, _errors, warnings = credential_change.validate_ccr(self.sample) payload = credential_change.status_payload(ccr, warnings) self.assertFalse(payload["apply_allowed"]) self.assertFalse(payload["frontdoor_resolvable"]) self.assertEqual(payload["access_frontdoor"]["readiness"], "template") self.assertEqual(payload["access_frontdoor"]["catalog_id"], "whynot-design-npm-publish") self.assertEqual(payload["apply_blockers"], ["apply requires status approved, got proposed"]) self.assertEqual(payload["warnings"], []) self.assertIsNone(payload["state_hub"]["decision_id"]) self.assertIn( "front door requires CCR status active, got proposed", payload["frontdoor_blockers"], ) self.assertIn("front door is marked resolvable=false", payload["frontdoor_blockers"]) def test_state_hub_rationale_prefix_maps_to_ccr_status(self) -> None: cases = { "APPROVE: scoped path and binding are correct": "approved", "DENY: wrong tenant": "denied", "NEEDS_CHANGES: use a read-only token": "needs_changes", "request changes: clarify service account": "needs_changes", } for rationale, expected in cases.items(): with self.subTest(rationale=rationale): self.assertEqual( credential_change.ccr_status_from_state_hub_rationale(rationale), expected, ) with self.assertRaises(SystemExit): credential_change.ccr_status_from_state_hub_rationale("looks good") def test_sync_state_hub_decision_updates_ccr_status(self) -> None: with tempfile.TemporaryDirectory() as tmp: copied = Path(tmp) / self.sample.name shutil.copy2(self.sample, copied) copied_ccr = credential_change.load_yaml(copied) copied_ccr.setdefault("state_hub", {})[ "decision_id" ] = "250669d0-8475-4527-9624-cd072249f9a9" credential_change.dump_yaml(copied, copied_ccr) original = credential_change.state_hub_decision_status try: credential_change.state_hub_decision_status = lambda _ccr, _url: { "id": "250669d0-8475-4527-9624-cd072249f9a9", "status": "resolved", "rationale": "APPROVE: scoped path and confirmed binding are acceptable", "decided_by": "unit-test", "decided_at": "2026-06-27T22:00:00Z", } credential_change.sync_state_hub_decision(copied, "http://state-hub.test") finally: credential_change.state_hub_decision_status = original ccr, errors, warnings = credential_change.validate_ccr(copied) self.assertEqual(errors, []) self.assertEqual(warnings, []) self.assertEqual(ccr["status"], "approved") self.assertEqual(ccr["review"]["comments"][-1]["reviewer"], "unit-test") self.assertIn("State Hub decision", ccr["review"]["comments"][-1]["comment"]) self.assertEqual(ccr["state_hub"]["decision_resolved_at"], "2026-06-27T22:00:00Z") def test_kubernetes_auth_payload_uses_service_account_bounds(self) -> None: ccr, errors, _warnings = credential_change.validate_ccr(self.issue_core) self.assertEqual(errors, []) payload = credential_change.auth_payload(ccr) self.assertEqual(payload["bound_service_account_names"], ["issue-core"]) self.assertEqual(payload["bound_service_account_namespaces"], ["issue-core"]) self.assertNotIn("bound_claims", payload) def test_apply_plan_refuses_unapproved_ccr(self) -> None: with self.assertRaises(SystemExit): credential_change.command_apply_plan(type("Args", (), {"ref": str(self.issue_core)})()) def test_operator_commands_render_non_secret_policy_and_role_handoff(self) -> None: ccr, errors, warnings = credential_change.validate_ccr(self.sample) self.assertEqual(errors, []) self.assertEqual(warnings, []) rendered = credential_change.render_operator_commands(ccr) self.assertIn( "bao policy write workload-kv-read-whynot-design-npm-publish", rendered, ) self.assertIn( "bao write auth/netkingdom/role/whynot-design-workload-kv-read", rendered, ) self.assertIn( 'bound_claims={"groups":["whynot-design"]}', rendered, ) self.assertIn( "# bao kv put platform/workloads/coulomb/whynot-design/npm-publish", rendered, ) self.assertIn("NPM_AUTH_TOKEN=", rendered) self.assertNotIn("npm_", rendered) def test_operator_commands_refuse_unapproved_ccr(self) -> None: with self.assertRaises(SystemExit): credential_change.command_operator_commands( type("Args", (), {"ref": str(self.issue_core)})() ) def test_approve_records_comment_but_unconfirmed_claim_still_blocks_apply(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) ccr_dir = tmp_path / "ccrs" ccr_dir.mkdir() copied = ccr_dir / self.issue_core.name shutil.copy2(self.issue_core, copied) old_ccr_dir = os.environ.get("CCR_DIR") os.environ["CCR_DIR"] = str(ccr_dir) try: credential_change.append_decision( copied, "approved", "unit-test", "looks right" ) ccr, errors, _warnings = credential_change.validate_ccr(copied) self.assertEqual(errors, []) self.assertEqual(ccr["status"], "approved") self.assertEqual(ccr["review"]["comments"][-1]["comment"], "looks right") with self.assertRaises(SystemExit): credential_change.command_apply_plan( type("Args", (), {"ref": "CCR-2026-0002"})() ) finally: if old_ccr_dir is None: os.environ.pop("CCR_DIR", None) else: os.environ["CCR_DIR"] = old_ccr_dir def test_confirm_binding_records_comment_and_clears_warning(self) -> None: with tempfile.TemporaryDirectory() as tmp: copied = Path(tmp) / self.issue_core.name shutil.copy2(self.issue_core, copied) credential_change.confirm_binding( copied, "unit-test", "service account binding confirmed" ) ccr, errors, warnings = credential_change.validate_ccr(copied) self.assertEqual(errors, []) self.assertEqual(warnings, []) self.assertTrue(ccr["openbao"]["auth"]["bound_claims_confirmed"]) self.assertEqual(ccr["review"]["comments"][-1]["decision"], "binding_confirmed") def test_generated_policy_is_narrow(self) -> None: ccr, _errors, _warnings = credential_change.validate_ccr(self.sample) policy = credential_change.generated_policy_hcl(ccr) self.assertIn('path "platform/data/workloads/coulomb/whynot-design/npm-publish"', policy) self.assertNotIn("*", policy) self.assertNotIn("delete", policy) if __name__ == "__main__": unittest.main()