Link CCR approval to State Hub decision

This commit is contained in:
2026-06-28 00:00:02 +02:00
parent 52687d8b3e
commit 3706ff703e
6 changed files with 173 additions and 1 deletions

View File

@@ -6,6 +6,8 @@ import json
import os
import re
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
@@ -439,6 +441,11 @@ def status_payload(ccr: dict[str, Any], warnings: list[str]) -> dict[str, Any]:
"resolvable": frontdoor.get("resolvable") is True,
"command": frontdoor.get("command"),
},
"state_hub": {
"decision_id": ccr.get("state_hub", {}).get("decision_id"),
"decision_api_url": ccr.get("state_hub", {}).get("decision_api_url"),
"decision_dashboard_url": ccr.get("state_hub", {}).get("decision_dashboard_url"),
},
}
@@ -450,6 +457,12 @@ def render_status(payload: dict[str, Any]) -> str:
f"Resolvable: {payload['frontdoor_resolvable']}",
f"Apply allowed: {payload['apply_allowed']}",
]
decision = payload.get("state_hub", {}).get("decision_api_url")
dashboard = payload.get("state_hub", {}).get("decision_dashboard_url")
if decision:
lines.append(f"State Hub decision: {decision}")
if dashboard:
lines.append(f"Decision dashboard: {dashboard}")
command = payload["access_frontdoor"].get("command")
if command:
lines.append(f"Command: {command}")
@@ -510,6 +523,80 @@ def confirm_binding(path: Path, reviewer: str, comment: str) -> None:
dump_yaml(path, ccr)
STATE_HUB_DECISION_PREFIXES = (
("NEEDS_CHANGES", "needs_changes"),
("NEEDS CHANGES", "needs_changes"),
("REQUEST CHANGES", "needs_changes"),
("APPROVE", "approved"),
("APPROVED", "approved"),
("DENY", "denied"),
("DENIED", "denied"),
("REJECT", "denied"),
("REJECTED", "denied"),
)
def state_hub_get_json(base_url: str, path: str) -> dict[str, Any]:
url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
try:
with urllib.request.urlopen(url, timeout=10) as response:
data = json.load(response)
except urllib.error.HTTPError as exc:
fail(f"State Hub GET {url} failed with HTTP {exc.code}")
except OSError as exc:
fail(f"State Hub GET {url} failed: {exc}")
if not isinstance(data, dict):
fail(f"State Hub GET {url} returned non-object JSON")
return data
def state_hub_decision_status(ccr: dict[str, Any], base_url: str) -> dict[str, Any]:
decision_id = ccr.get("state_hub", {}).get("decision_id")
if not decision_id:
fail("CCR has no state_hub.decision_id")
return state_hub_get_json(base_url, f"/decisions/{decision_id}")
def ccr_status_from_state_hub_rationale(rationale: str) -> str:
normalized = rationale.strip().upper().replace("-", "_")
for prefix, status in STATE_HUB_DECISION_PREFIXES:
if normalized == prefix or normalized.startswith(f"{prefix}:"):
return status
fail(
"resolved State Hub decision rationale must start with "
"APPROVE:, DENY:, or NEEDS_CHANGES:"
)
def sync_state_hub_decision(path: Path, base_url: str) -> dict[str, Any]:
ccr, errors, warnings = validate_ccr(path)
if errors:
for error in errors:
print(f"[FAIL] {path.name}: {error}", file=sys.stderr)
raise SystemExit(1)
for warning in warnings:
print(f"[WARN] {path.name}: {warning}", file=sys.stderr)
decision = state_hub_decision_status(ccr, base_url)
if decision.get("status") != "resolved":
return decision
rationale = str(decision.get("rationale") or "")
status = ccr_status_from_state_hub_rationale(rationale)
reviewer = str(decision.get("decided_by") or "state-hub")
append_decision(
path,
status,
reviewer,
f"State Hub decision {decision['id']}: {rationale}",
)
updated = load_yaml(path)
state_hub = updated.setdefault("state_hub", {})
state_hub["decision_resolved_at"] = decision.get("decided_at")
dump_yaml(path, updated)
return decision
def command_validate(args: argparse.Namespace) -> int:
refs = args.refs or [str(path) for path in sorted(ccr_dir().glob("*.y*ml"))]
if not refs:
@@ -584,6 +671,19 @@ def command_confirm_binding(args: argparse.Namespace) -> int:
return 0
def command_sync_decision(args: argparse.Namespace) -> int:
path = resolve_ccr(args.ref)
decision = sync_state_hub_decision(path, args.state_hub_url)
if decision.get("status") == "resolved":
print(f"[OK] {path.name} <- State Hub decision {decision['id']}")
else:
print(
f"[WAIT] State Hub decision {decision['id']} is {decision.get('status')}; "
"resolve it before syncing CCR status"
)
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Validate, render, and review non-secret credential change requests."
@@ -633,6 +733,17 @@ def build_parser() -> argparse.ArgumentParser:
binding.add_argument("--comment", required=True)
binding.set_defaults(func=command_confirm_binding)
sync_decision = sub.add_parser(
"sync-decision",
help="Sync an approved/denied/needs_changes CCR decision from State Hub",
)
sync_decision.add_argument("ref")
sync_decision.add_argument(
"--state-hub-url",
default=os.environ.get("STATE_HUB_URL", "http://127.0.0.1:8000"),
)
sync_decision.set_defaults(func=command_sync_decision)
return parser