Files
railiance-platform/tests/test_credential_change.py

221 lines
10 KiB
Python

from __future__ import annotations
import importlib.util
import os
import shutil
import sys
import tempfile
import unittest
from pathlib import Path
REPO_DIR = Path(__file__).resolve().parents[1]
SPEC = importlib.util.spec_from_file_location(
"credential_change", REPO_DIR / "scripts/credential-change.py"
)
credential_change = importlib.util.module_from_spec(SPEC)
assert SPEC.loader is not None
sys.modules[SPEC.name] = credential_change
SPEC.loader.exec_module(credential_change)
class CredentialChangeTests(unittest.TestCase):
def setUp(self) -> None:
self.sample = (
REPO_DIR
/ "credential-change-requests/CCR-2026-0001-whynot-design-npm-publish.yaml"
)
self.issue_core = (
REPO_DIR
/ "credential-change-requests/CCR-2026-0002-issue-core-ingestion-api-key.yaml"
)
def test_sample_ccr_validates_without_bound_claim_warning(self) -> None:
ccr, errors, warnings = credential_change.validate_ccr(self.sample)
self.assertEqual(errors, [])
self.assertEqual(warnings, [])
self.assertTrue(ccr["openbao"]["auth"]["bound_claims_confirmed"])
def test_unconfirmed_sibling_ccr_keeps_bound_claim_warning(self) -> None:
_ccr, errors, warnings = credential_change.validate_ccr(self.issue_core)
self.assertEqual(errors, [])
self.assertIn("bound claim is not confirmed", warnings[0])
def test_all_repo_ccrs_validate(self) -> None:
for path in sorted((REPO_DIR / "credential-change-requests").glob("*.yaml")):
with self.subTest(path=path.name):
_ccr, errors, _warnings = credential_change.validate_ccr(path)
self.assertEqual(errors, [])
def test_render_summary_contains_review_fields(self) -> None:
ccr, _errors, warnings = credential_change.validate_ccr(self.sample)
rendered = credential_change.render_summary(ccr, warnings)
self.assertIn("whynot-design npm publish token lane", rendered)
self.assertIn("platform/workloads/coulomb/whynot-design/npm-publish", rendered)
self.assertIn("whynot-design-npm-publish", rendered)
self.assertIn("readiness: template resolvable=False", rendered)
self.assertIn("approve | deny | needs_changes", rendered)
def test_status_payload_marks_template_not_resolvable(self) -> None:
ccr, _errors, warnings = credential_change.validate_ccr(self.sample)
payload = credential_change.status_payload(ccr, warnings)
self.assertTrue(payload["apply_allowed"])
self.assertFalse(payload["frontdoor_resolvable"])
self.assertEqual(payload["access_frontdoor"]["readiness"], "template")
self.assertEqual(payload["access_frontdoor"]["catalog_id"], "whynot-design-npm-publish")
self.assertEqual(payload["apply_blockers"], [])
self.assertEqual(payload["warnings"], [])
self.assertEqual(
payload["state_hub"]["decision_id"],
"e6381a56-6b04-4fd5-b2de-f3ef59cde888",
)
self.assertIn(
"front door requires CCR status active, got approved",
payload["frontdoor_blockers"],
)
self.assertIn("front door is marked resolvable=false", payload["frontdoor_blockers"])
def test_state_hub_rationale_prefix_maps_to_ccr_status(self) -> None:
cases = {
"APPROVE: scoped path and binding are correct": "approved",
"DENY: wrong tenant": "denied",
"NEEDS_CHANGES: use a read-only token": "needs_changes",
"request changes: clarify service account": "needs_changes",
}
for rationale, expected in cases.items():
with self.subTest(rationale=rationale):
self.assertEqual(
credential_change.ccr_status_from_state_hub_rationale(rationale),
expected,
)
with self.assertRaises(SystemExit):
credential_change.ccr_status_from_state_hub_rationale("looks good")
def test_sync_state_hub_decision_updates_ccr_status(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
copied = Path(tmp) / self.sample.name
shutil.copy2(self.sample, copied)
copied_ccr = credential_change.load_yaml(copied)
copied_ccr.setdefault("state_hub", {})[
"decision_id"
] = "250669d0-8475-4527-9624-cd072249f9a9"
credential_change.dump_yaml(copied, copied_ccr)
original = credential_change.state_hub_decision_status
try:
credential_change.state_hub_decision_status = lambda _ccr, _url: {
"id": "250669d0-8475-4527-9624-cd072249f9a9",
"status": "resolved",
"rationale": "APPROVE: scoped path and confirmed binding are acceptable",
"decided_by": "unit-test",
"decided_at": "2026-06-27T22:00:00Z",
}
credential_change.sync_state_hub_decision(copied, "http://state-hub.test")
finally:
credential_change.state_hub_decision_status = original
ccr, errors, warnings = credential_change.validate_ccr(copied)
self.assertEqual(errors, [])
self.assertEqual(warnings, [])
self.assertEqual(ccr["status"], "approved")
self.assertEqual(ccr["review"]["comments"][-1]["reviewer"], "unit-test")
self.assertIn("State Hub decision", ccr["review"]["comments"][-1]["comment"])
self.assertEqual(ccr["state_hub"]["decision_resolved_at"], "2026-06-27T22:00:00Z")
def test_kubernetes_auth_payload_uses_service_account_bounds(self) -> None:
ccr, errors, _warnings = credential_change.validate_ccr(self.issue_core)
self.assertEqual(errors, [])
payload = credential_change.auth_payload(ccr)
self.assertEqual(payload["bound_service_account_names"], ["issue-core"])
self.assertEqual(payload["bound_service_account_namespaces"], ["issue-core"])
self.assertNotIn("bound_claims", payload)
def test_apply_plan_refuses_unapproved_ccr(self) -> None:
with self.assertRaises(SystemExit):
credential_change.command_apply_plan(type("Args", (), {"ref": str(self.issue_core)})())
def test_operator_commands_render_non_secret_policy_and_role_handoff(self) -> None:
ccr, errors, warnings = credential_change.validate_ccr(self.sample)
self.assertEqual(errors, [])
self.assertEqual(warnings, [])
rendered = credential_change.render_operator_commands(ccr)
self.assertIn(
"bao policy write workload-kv-read-whynot-design-npm-publish",
rendered,
)
self.assertIn(
"bao write auth/netkingdom/role/whynot-design-workload-kv-read",
rendered,
)
self.assertIn("# Do not paste this shell block into the OpenBao Browser CLI.", rendered)
self.assertIn(
"# Web UI API Explorer path for the role JSON body: /v1/auth/netkingdom/role/whynot-design-workload-kv-read",
rendered,
)
self.assertIn('role_payload_file="$(mktemp)"', rendered)
self.assertIn('"bound_claims": {', rendered)
self.assertIn(
'bao write auth/netkingdom/role/whynot-design-workload-kv-read @"$role_payload_file"',
rendered,
)
self.assertIn(
"# bao kv put platform/workloads/coulomb/whynot-design/npm-publish",
rendered,
)
self.assertIn("NPM_AUTH_TOKEN=<enter-through-approved-custody>", rendered)
self.assertNotIn("npm_", rendered)
def test_operator_commands_refuse_unapproved_ccr(self) -> None:
with self.assertRaises(SystemExit):
credential_change.command_operator_commands(
type("Args", (), {"ref": str(self.issue_core)})()
)
def test_approve_records_comment_but_unconfirmed_claim_still_blocks_apply(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
ccr_dir = tmp_path / "ccrs"
ccr_dir.mkdir()
copied = ccr_dir / self.issue_core.name
shutil.copy2(self.issue_core, copied)
old_ccr_dir = os.environ.get("CCR_DIR")
os.environ["CCR_DIR"] = str(ccr_dir)
try:
credential_change.append_decision(
copied, "approved", "unit-test", "looks right"
)
ccr, errors, _warnings = credential_change.validate_ccr(copied)
self.assertEqual(errors, [])
self.assertEqual(ccr["status"], "approved")
self.assertEqual(ccr["review"]["comments"][-1]["comment"], "looks right")
with self.assertRaises(SystemExit):
credential_change.command_apply_plan(
type("Args", (), {"ref": "CCR-2026-0002"})()
)
finally:
if old_ccr_dir is None:
os.environ.pop("CCR_DIR", None)
else:
os.environ["CCR_DIR"] = old_ccr_dir
def test_confirm_binding_records_comment_and_clears_warning(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
copied = Path(tmp) / self.issue_core.name
shutil.copy2(self.issue_core, copied)
credential_change.confirm_binding(
copied, "unit-test", "service account binding confirmed"
)
ccr, errors, warnings = credential_change.validate_ccr(copied)
self.assertEqual(errors, [])
self.assertEqual(warnings, [])
self.assertTrue(ccr["openbao"]["auth"]["bound_claims_confirmed"])
self.assertEqual(ccr["review"]["comments"][-1]["decision"], "binding_confirmed")
def test_generated_policy_is_narrow(self) -> None:
ccr, _errors, _warnings = credential_change.validate_ccr(self.sample)
policy = credential_change.generated_policy_hcl(ccr)
self.assertIn('path "platform/data/workloads/coulomb/whynot-design/npm-publish"', policy)
self.assertNotIn("*", policy)
self.assertNotIn("delete", policy)
if __name__ == "__main__":
unittest.main()