generated from coulomb/repo-seed
decisions.py: every final promote/reject becomes a record_decision-shaped payload (rationale + source key + evidence snapshot). DecisionRecorder degrades gracefully under a hub outage — pluggable sink with a durable local-queue fallback and ordered flush/replay (mirrors Phase 1's after-the-fact sync). Wired into review() via an optional recorder. 6 new tests; suite 70/70 green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
115 lines
4.1 KiB
Python
115 lines
4.1 KiB
Python
"""State Hub decision integration (FR-U4; T05).
|
|
|
|
Every final promote/reject is recorded as an auditable decision so the rationale,
|
|
the source candidate key, and an evidence snapshot are traceable. The catalog
|
|
file remains the durable artifact (ADR-001); the decision is the audit trail.
|
|
|
|
The recorder is **graceful under a hub outage** — exactly the condition hit during
|
|
Phase 1, where statuses were synced after the fact. A pluggable ``sink`` does the
|
|
actual write (HTTP to the hub, or the MCP ``record_decision`` tool driven by the
|
|
operator). If the sink is absent or raises, the decision is appended to a local
|
|
queue (``decisions.queue.jsonl``) and can be replayed later with :meth:`flush`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from typing import Callable, Optional
|
|
|
|
# A sink takes a hub-shaped decision payload and persists it (may raise on failure).
|
|
Sink = Callable[[dict], None]
|
|
|
|
|
|
def _now() -> str:
|
|
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def build_decision(candidate: dict, action: str, rationale: str,
|
|
*, workstream_id: Optional[str] = None,
|
|
decided_by: str = "curator") -> dict:
|
|
"""Shape a curate decision as a State Hub ``record_decision`` payload."""
|
|
key = candidate["key"]
|
|
verb = "Promote" if action == "approve" else "Reject"
|
|
return {
|
|
"title": f"{verb} pattern candidate {key}",
|
|
"decision_type": "made",
|
|
"workstream_id": workstream_id,
|
|
"rationale": rationale,
|
|
"decided_by": decided_by,
|
|
"description": json.dumps({
|
|
"action": action,
|
|
"source_key": key,
|
|
"evidence": candidate,
|
|
}, sort_keys=True),
|
|
"recorded_at": _now(),
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class DecisionRecorder:
|
|
"""Records decisions through ``sink`` with a durable local-queue fallback."""
|
|
|
|
queue_path: str
|
|
sink: Optional[Sink] = None
|
|
workstream_id: Optional[str] = None
|
|
decided_by: str = "curator"
|
|
_queued: int = field(default=0, init=False)
|
|
|
|
def record(self, candidate: dict, action: str, rationale: str) -> bool:
|
|
"""Record one decision. Returns True if the sink accepted it, else queued."""
|
|
payload = build_decision(candidate, action, rationale,
|
|
workstream_id=self.workstream_id, decided_by=self.decided_by)
|
|
if self.sink is not None:
|
|
try:
|
|
self.sink(payload)
|
|
return True
|
|
except Exception: # hub down / transient — fall through to the queue
|
|
pass
|
|
self._append(payload)
|
|
return False
|
|
|
|
def pending(self) -> list[dict]:
|
|
if not os.path.exists(self.queue_path):
|
|
return []
|
|
with open(self.queue_path, encoding="utf-8") as fh:
|
|
return [json.loads(line) for line in fh if line.strip()]
|
|
|
|
def flush(self, sink: Optional[Sink] = None) -> int:
|
|
"""Replay queued decisions through ``sink``. Returns count synced.
|
|
|
|
Stops at the first failure so ordering is preserved; the unsynced tail is
|
|
rewritten back to the queue.
|
|
"""
|
|
sink = sink or self.sink
|
|
if sink is None:
|
|
return 0
|
|
items = self.pending()
|
|
synced = 0
|
|
for i, payload in enumerate(items):
|
|
try:
|
|
sink(payload)
|
|
synced += 1
|
|
except Exception:
|
|
self._rewrite(items[i:])
|
|
return synced
|
|
self._rewrite([])
|
|
return synced
|
|
|
|
# --- internals ----------------------------------------------------------
|
|
|
|
def _append(self, payload: dict) -> None:
|
|
os.makedirs(os.path.dirname(self.queue_path) or ".", exist_ok=True)
|
|
with open(self.queue_path, "a", encoding="utf-8") as fh:
|
|
fh.write(json.dumps(payload, sort_keys=True))
|
|
fh.write("\n")
|
|
self._queued += 1
|
|
|
|
def _rewrite(self, items: list[dict]) -> None:
|
|
with open(self.queue_path, "w", encoding="utf-8") as fh:
|
|
for payload in items:
|
|
fh.write(json.dumps(payload, sort_keys=True))
|
|
fh.write("\n")
|