Files
agentic-resources/session_memory/curate/decisions.py
tegwick 4b7a628b6f session-memory Phase 2: hub decision integration (T05)
decisions.py: every final promote/reject becomes a record_decision-shaped
payload (rationale + source key + evidence snapshot). DecisionRecorder degrades
gracefully under a hub outage — pluggable sink with a durable local-queue
fallback and ordered flush/replay (mirrors Phase 1's after-the-fact sync).
Wired into review() via an optional recorder. 6 new tests; suite 70/70 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 00:31:22 +02:00

115 lines
4.1 KiB
Python

"""State Hub decision integration (FR-U4; T05).
Every final promote/reject is recorded as an auditable decision so the rationale,
the source candidate key, and an evidence snapshot are traceable. The catalog
file remains the durable artifact (ADR-001); the decision is the audit trail.
The recorder is **graceful under a hub outage** — exactly the condition hit during
Phase 1, where statuses were synced after the fact. A pluggable ``sink`` does the
actual write (HTTP to the hub, or the MCP ``record_decision`` tool driven by the
operator). If the sink is absent or raises, the decision is appended to a local
queue (``decisions.queue.jsonl``) and can be replayed later with :meth:`flush`.
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, Optional
# A sink takes a hub-shaped decision payload and persists it (may raise on failure).
Sink = Callable[[dict], None]
def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def build_decision(candidate: dict, action: str, rationale: str,
*, workstream_id: Optional[str] = None,
decided_by: str = "curator") -> dict:
"""Shape a curate decision as a State Hub ``record_decision`` payload."""
key = candidate["key"]
verb = "Promote" if action == "approve" else "Reject"
return {
"title": f"{verb} pattern candidate {key}",
"decision_type": "made",
"workstream_id": workstream_id,
"rationale": rationale,
"decided_by": decided_by,
"description": json.dumps({
"action": action,
"source_key": key,
"evidence": candidate,
}, sort_keys=True),
"recorded_at": _now(),
}
@dataclass
class DecisionRecorder:
"""Records decisions through ``sink`` with a durable local-queue fallback."""
queue_path: str
sink: Optional[Sink] = None
workstream_id: Optional[str] = None
decided_by: str = "curator"
_queued: int = field(default=0, init=False)
def record(self, candidate: dict, action: str, rationale: str) -> bool:
"""Record one decision. Returns True if the sink accepted it, else queued."""
payload = build_decision(candidate, action, rationale,
workstream_id=self.workstream_id, decided_by=self.decided_by)
if self.sink is not None:
try:
self.sink(payload)
return True
except Exception: # hub down / transient — fall through to the queue
pass
self._append(payload)
return False
def pending(self) -> list[dict]:
if not os.path.exists(self.queue_path):
return []
with open(self.queue_path, encoding="utf-8") as fh:
return [json.loads(line) for line in fh if line.strip()]
def flush(self, sink: Optional[Sink] = None) -> int:
"""Replay queued decisions through ``sink``. Returns count synced.
Stops at the first failure so ordering is preserved; the unsynced tail is
rewritten back to the queue.
"""
sink = sink or self.sink
if sink is None:
return 0
items = self.pending()
synced = 0
for i, payload in enumerate(items):
try:
sink(payload)
synced += 1
except Exception:
self._rewrite(items[i:])
return synced
self._rewrite([])
return synced
# --- internals ----------------------------------------------------------
def _append(self, payload: dict) -> None:
os.makedirs(os.path.dirname(self.queue_path) or ".", exist_ok=True)
with open(self.queue_path, "a", encoding="utf-8") as fh:
fh.write(json.dumps(payload, sort_keys=True))
fh.write("\n")
self._queued += 1
def _rewrite(self, items: list[dict]) -> None:
with open(self.queue_path, "w", encoding="utf-8") as fh:
for payload in items:
fh.write(json.dumps(payload, sort_keys=True))
fh.write("\n")