"""State Hub decision integration (FR-U4; T05). Every final promote/reject is recorded as an auditable decision so the rationale, the source candidate key, and an evidence snapshot are traceable. The catalog file remains the durable artifact (ADR-001); the decision is the audit trail. The recorder is **graceful under a hub outage** — exactly the condition hit during Phase 1, where statuses were synced after the fact. A pluggable ``sink`` does the actual write (HTTP to the hub, or the MCP ``record_decision`` tool driven by the operator). If the sink is absent or raises, the decision is appended to a local queue (``decisions.queue.jsonl``) and can be replayed later with :meth:`flush`. """ from __future__ import annotations import json import os from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Callable, Optional # A sink takes a hub-shaped decision payload and persists it (may raise on failure). Sink = Callable[[dict], None] def _now() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def build_decision(candidate: dict, action: str, rationale: str, *, workstream_id: Optional[str] = None, decided_by: str = "curator") -> dict: """Shape a curate decision as a State Hub ``record_decision`` payload.""" key = candidate["key"] verb = "Promote" if action == "approve" else "Reject" return { "title": f"{verb} pattern candidate {key}", "decision_type": "made", "workstream_id": workstream_id, "rationale": rationale, "decided_by": decided_by, "description": json.dumps({ "action": action, "source_key": key, "evidence": candidate, }, sort_keys=True), "recorded_at": _now(), } @dataclass class DecisionRecorder: """Records decisions through ``sink`` with a durable local-queue fallback.""" queue_path: str sink: Optional[Sink] = None workstream_id: Optional[str] = None decided_by: str = "curator" _queued: int = field(default=0, init=False) def record(self, candidate: dict, action: str, rationale: str) -> bool: """Record one decision. Returns True if the sink accepted it, else queued.""" payload = build_decision(candidate, action, rationale, workstream_id=self.workstream_id, decided_by=self.decided_by) if self.sink is not None: try: self.sink(payload) return True except Exception: # hub down / transient — fall through to the queue pass self._append(payload) return False def pending(self) -> list[dict]: if not os.path.exists(self.queue_path): return [] with open(self.queue_path, encoding="utf-8") as fh: return [json.loads(line) for line in fh if line.strip()] def flush(self, sink: Optional[Sink] = None) -> int: """Replay queued decisions through ``sink``. Returns count synced. Stops at the first failure so ordering is preserved; the unsynced tail is rewritten back to the queue. """ sink = sink or self.sink if sink is None: return 0 items = self.pending() synced = 0 for i, payload in enumerate(items): try: sink(payload) synced += 1 except Exception: self._rewrite(items[i:]) return synced self._rewrite([]) return synced # --- internals ---------------------------------------------------------- def _append(self, payload: dict) -> None: os.makedirs(os.path.dirname(self.queue_path) or ".", exist_ok=True) with open(self.queue_path, "a", encoding="utf-8") as fh: fh.write(json.dumps(payload, sort_keys=True)) fh.write("\n") self._queued += 1 def _rewrite(self, items: list[dict]) -> None: with open(self.queue_path, "w", encoding="utf-8") as fh: for payload in items: fh.write(json.dumps(payload, sort_keys=True)) fh.write("\n")