generated from coulomb/repo-seed
session-memory Phase 2: hub decision integration (T05)
decisions.py: every final promote/reject becomes a record_decision-shaped payload (rationale + source key + evidence snapshot). DecisionRecorder degrades gracefully under a hub outage — pluggable sink with a durable local-queue fallback and ordered flush/replay (mirrors Phase 1's after-the-fact sync). Wired into review() via an optional recorder. 6 new tests; suite 70/70 green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
114
session_memory/curate/decisions.py
Normal file
114
session_memory/curate/decisions.py
Normal file
@@ -0,0 +1,114 @@
|
||||
"""State Hub decision integration (FR-U4; T05).
|
||||
|
||||
Every final promote/reject is recorded as an auditable decision so the rationale,
|
||||
the source candidate key, and an evidence snapshot are traceable. The catalog
|
||||
file remains the durable artifact (ADR-001); the decision is the audit trail.
|
||||
|
||||
The recorder is **graceful under a hub outage** — exactly the condition hit during
|
||||
Phase 1, where statuses were synced after the fact. A pluggable ``sink`` does the
|
||||
actual write (HTTP to the hub, or the MCP ``record_decision`` tool driven by the
|
||||
operator). If the sink is absent or raises, the decision is appended to a local
|
||||
queue (``decisions.queue.jsonl``) and can be replayed later with :meth:`flush`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Callable, Optional
|
||||
|
||||
# A sink takes a hub-shaped decision payload and persists it (may raise on failure).
|
||||
Sink = Callable[[dict], None]
|
||||
|
||||
|
||||
def _now() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
|
||||
def build_decision(candidate: dict, action: str, rationale: str,
|
||||
*, workstream_id: Optional[str] = None,
|
||||
decided_by: str = "curator") -> dict:
|
||||
"""Shape a curate decision as a State Hub ``record_decision`` payload."""
|
||||
key = candidate["key"]
|
||||
verb = "Promote" if action == "approve" else "Reject"
|
||||
return {
|
||||
"title": f"{verb} pattern candidate {key}",
|
||||
"decision_type": "made",
|
||||
"workstream_id": workstream_id,
|
||||
"rationale": rationale,
|
||||
"decided_by": decided_by,
|
||||
"description": json.dumps({
|
||||
"action": action,
|
||||
"source_key": key,
|
||||
"evidence": candidate,
|
||||
}, sort_keys=True),
|
||||
"recorded_at": _now(),
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class DecisionRecorder:
|
||||
"""Records decisions through ``sink`` with a durable local-queue fallback."""
|
||||
|
||||
queue_path: str
|
||||
sink: Optional[Sink] = None
|
||||
workstream_id: Optional[str] = None
|
||||
decided_by: str = "curator"
|
||||
_queued: int = field(default=0, init=False)
|
||||
|
||||
def record(self, candidate: dict, action: str, rationale: str) -> bool:
|
||||
"""Record one decision. Returns True if the sink accepted it, else queued."""
|
||||
payload = build_decision(candidate, action, rationale,
|
||||
workstream_id=self.workstream_id, decided_by=self.decided_by)
|
||||
if self.sink is not None:
|
||||
try:
|
||||
self.sink(payload)
|
||||
return True
|
||||
except Exception: # hub down / transient — fall through to the queue
|
||||
pass
|
||||
self._append(payload)
|
||||
return False
|
||||
|
||||
def pending(self) -> list[dict]:
|
||||
if not os.path.exists(self.queue_path):
|
||||
return []
|
||||
with open(self.queue_path, encoding="utf-8") as fh:
|
||||
return [json.loads(line) for line in fh if line.strip()]
|
||||
|
||||
def flush(self, sink: Optional[Sink] = None) -> int:
|
||||
"""Replay queued decisions through ``sink``. Returns count synced.
|
||||
|
||||
Stops at the first failure so ordering is preserved; the unsynced tail is
|
||||
rewritten back to the queue.
|
||||
"""
|
||||
sink = sink or self.sink
|
||||
if sink is None:
|
||||
return 0
|
||||
items = self.pending()
|
||||
synced = 0
|
||||
for i, payload in enumerate(items):
|
||||
try:
|
||||
sink(payload)
|
||||
synced += 1
|
||||
except Exception:
|
||||
self._rewrite(items[i:])
|
||||
return synced
|
||||
self._rewrite([])
|
||||
return synced
|
||||
|
||||
# --- internals ----------------------------------------------------------
|
||||
|
||||
def _append(self, payload: dict) -> None:
|
||||
os.makedirs(os.path.dirname(self.queue_path) or ".", exist_ok=True)
|
||||
with open(self.queue_path, "a", encoding="utf-8") as fh:
|
||||
fh.write(json.dumps(payload, sort_keys=True))
|
||||
fh.write("\n")
|
||||
self._queued += 1
|
||||
|
||||
def _rewrite(self, items: list[dict]) -> None:
|
||||
with open(self.queue_path, "w", encoding="utf-8") as fh:
|
||||
for payload in items:
|
||||
fh.write(json.dumps(payload, sort_keys=True))
|
||||
fh.write("\n")
|
||||
@@ -22,6 +22,7 @@ from datetime import datetime, timezone
|
||||
from typing import Callable, Optional
|
||||
|
||||
from .catalog import Catalog
|
||||
from .decisions import DecisionRecorder
|
||||
from .gating import GateConfig, evaluate
|
||||
from .schema import Provenance, Resolution, Scope, SolutionPattern
|
||||
|
||||
@@ -119,13 +120,16 @@ class ReviewResult:
|
||||
|
||||
|
||||
def review(candidates: list[dict], decide: Decider, catalog: Catalog,
|
||||
log: ReviewLog, gate: Optional[GateConfig] = None) -> ReviewResult:
|
||||
log: ReviewLog, gate: Optional[GateConfig] = None,
|
||||
recorder: Optional[DecisionRecorder] = None) -> ReviewResult:
|
||||
"""Run each candidate through ``decide``; promote approvals into ``catalog``.
|
||||
|
||||
When a ``gate`` (T04 evidence bar) is supplied, the promoted pattern's
|
||||
``status``/``distribution_ready`` are set from the gate evaluation, so an
|
||||
approved-but-thin candidate lands as ``provisional`` rather than
|
||||
distribution-ready.
|
||||
distribution-ready. When a ``recorder`` (T05) is supplied, each final
|
||||
promote/reject is logged as an auditable hub decision (queued if the hub is
|
||||
down).
|
||||
"""
|
||||
result = ReviewResult()
|
||||
for cand in candidates:
|
||||
@@ -149,4 +153,6 @@ def review(candidates: list[dict], decide: Decider, catalog: Catalog,
|
||||
else:
|
||||
raise ValueError(f"unknown review action {action!r}")
|
||||
log.record(cand, action, rationale)
|
||||
if recorder is not None:
|
||||
recorder.record(cand, action, rationale)
|
||||
return result
|
||||
|
||||
70
tests/test_curate_decisions.py
Normal file
70
tests/test_curate_decisions.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""Hub decision integration tests (T05): payload shape + graceful queue/flush."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.curate.catalog import Catalog # noqa: E402
|
||||
from session_memory.curate.decisions import DecisionRecorder, build_decision # noqa: E402
|
||||
from session_memory.curate.review import APPROVE, REJECT, ReviewLog, review # noqa: E402
|
||||
|
||||
|
||||
def _candidate(key="success:clean_pass:outcome"):
|
||||
return {"key": key, "frequency": 18, "sessions": ["a", "b"],
|
||||
"cost_impact": 9.0, "cross_flavor": True, "flavors": ["claude", "grok"]}
|
||||
|
||||
|
||||
def test_build_decision_payload_shape():
|
||||
d = build_decision(_candidate(), "approve", "looks solid", workstream_id="ws-1")
|
||||
assert d["decision_type"] == "made"
|
||||
assert d["workstream_id"] == "ws-1"
|
||||
assert "Promote" in d["title"]
|
||||
assert d["rationale"] == "looks solid"
|
||||
assert "success:clean_pass:outcome" in d["description"]
|
||||
|
||||
|
||||
def test_sink_accepts_decision(tmp_path):
|
||||
captured = []
|
||||
rec = DecisionRecorder(str(tmp_path / "q.jsonl"), sink=captured.append)
|
||||
assert rec.record(_candidate(), "approve", "ok") is True
|
||||
assert rec.pending() == []
|
||||
assert len(captured) == 1
|
||||
|
||||
|
||||
def test_queues_when_sink_down(tmp_path):
|
||||
def boom(_):
|
||||
raise RuntimeError("hub down")
|
||||
|
||||
rec = DecisionRecorder(str(tmp_path / "q.jsonl"), sink=boom)
|
||||
assert rec.record(_candidate(), "reject", "noise") is False
|
||||
assert len(rec.pending()) == 1
|
||||
|
||||
|
||||
def test_no_sink_defaults_to_queue(tmp_path):
|
||||
rec = DecisionRecorder(str(tmp_path / "q.jsonl"))
|
||||
rec.record(_candidate(), "approve", "ok")
|
||||
assert len(rec.pending()) == 1
|
||||
|
||||
|
||||
def test_flush_replays_queue(tmp_path):
|
||||
rec = DecisionRecorder(str(tmp_path / "q.jsonl")) # offline -> queue
|
||||
rec.record(_candidate("problem:abandoned:outcome"), "reject", "x")
|
||||
rec.record(_candidate("success:clean_pass:outcome"), "approve", "y")
|
||||
captured = []
|
||||
assert rec.flush(sink=captured.append) == 2
|
||||
assert rec.pending() == []
|
||||
assert len(captured) == 2
|
||||
|
||||
|
||||
def test_review_records_each_final_decision(tmp_path):
|
||||
cat = Catalog(str(tmp_path / "catalog"))
|
||||
log = ReviewLog(str(tmp_path / "reviews.jsonl"))
|
||||
captured = []
|
||||
rec = DecisionRecorder(str(tmp_path / "q.jsonl"), sink=captured.append, workstream_id="ws")
|
||||
cands = [_candidate("success:clean_pass:outcome"), _candidate("problem:abandoned:outcome")]
|
||||
review(cands, lambda c: (APPROVE if "success" in c["key"] else REJECT, "r"), cat, log,
|
||||
recorder=rec)
|
||||
assert len(captured) == 2
|
||||
actions = sorted("Promote" in d["title"] for d in captured)
|
||||
assert actions == [False, True]
|
||||
@@ -112,7 +112,7 @@ catalog stays lean and agent context budgets are protected. Knobs live in
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0004-T05
|
||||
status: todo
|
||||
status: done
|
||||
priority: medium
|
||||
state_hub_task_id: "449f12d4-fae0-450d-873f-143b3a570b5a"
|
||||
```
|
||||
|
||||
Reference in New Issue
Block a user