From 4b7a628b6f0a4830816681272a6372331604b039 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sun, 7 Jun 2026 00:31:22 +0200 Subject: [PATCH] session-memory Phase 2: hub decision integration (T05) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit decisions.py: every final promote/reject becomes a record_decision-shaped payload (rationale + source key + evidence snapshot). DecisionRecorder degrades gracefully under a hub outage — pluggable sink with a durable local-queue fallback and ordered flush/replay (mirrors Phase 1's after-the-fact sync). Wired into review() via an optional recorder. 6 new tests; suite 70/70 green. Co-Authored-By: Claude Opus 4.8 --- session_memory/curate/decisions.py | 114 ++++++++++++++++++ session_memory/curate/review.py | 10 +- tests/test_curate_decisions.py | 70 +++++++++++ .../AGENTIC-WP-0004-session-memory-phase2.md | 2 +- 4 files changed, 193 insertions(+), 3 deletions(-) create mode 100644 session_memory/curate/decisions.py create mode 100644 tests/test_curate_decisions.py diff --git a/session_memory/curate/decisions.py b/session_memory/curate/decisions.py new file mode 100644 index 0000000..df6ec26 --- /dev/null +++ b/session_memory/curate/decisions.py @@ -0,0 +1,114 @@ +"""State Hub decision integration (FR-U4; T05). + +Every final promote/reject is recorded as an auditable decision so the rationale, +the source candidate key, and an evidence snapshot are traceable. The catalog +file remains the durable artifact (ADR-001); the decision is the audit trail. + +The recorder is **graceful under a hub outage** — exactly the condition hit during +Phase 1, where statuses were synced after the fact. A pluggable ``sink`` does the +actual write (HTTP to the hub, or the MCP ``record_decision`` tool driven by the +operator). If the sink is absent or raises, the decision is appended to a local +queue (``decisions.queue.jsonl``) and can be replayed later with :meth:`flush`. +""" + +from __future__ import annotations + +import json +import os +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Callable, Optional + +# A sink takes a hub-shaped decision payload and persists it (may raise on failure). +Sink = Callable[[dict], None] + + +def _now() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def build_decision(candidate: dict, action: str, rationale: str, + *, workstream_id: Optional[str] = None, + decided_by: str = "curator") -> dict: + """Shape a curate decision as a State Hub ``record_decision`` payload.""" + key = candidate["key"] + verb = "Promote" if action == "approve" else "Reject" + return { + "title": f"{verb} pattern candidate {key}", + "decision_type": "made", + "workstream_id": workstream_id, + "rationale": rationale, + "decided_by": decided_by, + "description": json.dumps({ + "action": action, + "source_key": key, + "evidence": candidate, + }, sort_keys=True), + "recorded_at": _now(), + } + + +@dataclass +class DecisionRecorder: + """Records decisions through ``sink`` with a durable local-queue fallback.""" + + queue_path: str + sink: Optional[Sink] = None + workstream_id: Optional[str] = None + decided_by: str = "curator" + _queued: int = field(default=0, init=False) + + def record(self, candidate: dict, action: str, rationale: str) -> bool: + """Record one decision. Returns True if the sink accepted it, else queued.""" + payload = build_decision(candidate, action, rationale, + workstream_id=self.workstream_id, decided_by=self.decided_by) + if self.sink is not None: + try: + self.sink(payload) + return True + except Exception: # hub down / transient — fall through to the queue + pass + self._append(payload) + return False + + def pending(self) -> list[dict]: + if not os.path.exists(self.queue_path): + return [] + with open(self.queue_path, encoding="utf-8") as fh: + return [json.loads(line) for line in fh if line.strip()] + + def flush(self, sink: Optional[Sink] = None) -> int: + """Replay queued decisions through ``sink``. Returns count synced. + + Stops at the first failure so ordering is preserved; the unsynced tail is + rewritten back to the queue. + """ + sink = sink or self.sink + if sink is None: + return 0 + items = self.pending() + synced = 0 + for i, payload in enumerate(items): + try: + sink(payload) + synced += 1 + except Exception: + self._rewrite(items[i:]) + return synced + self._rewrite([]) + return synced + + # --- internals ---------------------------------------------------------- + + def _append(self, payload: dict) -> None: + os.makedirs(os.path.dirname(self.queue_path) or ".", exist_ok=True) + with open(self.queue_path, "a", encoding="utf-8") as fh: + fh.write(json.dumps(payload, sort_keys=True)) + fh.write("\n") + self._queued += 1 + + def _rewrite(self, items: list[dict]) -> None: + with open(self.queue_path, "w", encoding="utf-8") as fh: + for payload in items: + fh.write(json.dumps(payload, sort_keys=True)) + fh.write("\n") diff --git a/session_memory/curate/review.py b/session_memory/curate/review.py index b0abfae..729ab5b 100644 --- a/session_memory/curate/review.py +++ b/session_memory/curate/review.py @@ -22,6 +22,7 @@ from datetime import datetime, timezone from typing import Callable, Optional from .catalog import Catalog +from .decisions import DecisionRecorder from .gating import GateConfig, evaluate from .schema import Provenance, Resolution, Scope, SolutionPattern @@ -119,13 +120,16 @@ class ReviewResult: def review(candidates: list[dict], decide: Decider, catalog: Catalog, - log: ReviewLog, gate: Optional[GateConfig] = None) -> ReviewResult: + log: ReviewLog, gate: Optional[GateConfig] = None, + recorder: Optional[DecisionRecorder] = None) -> ReviewResult: """Run each candidate through ``decide``; promote approvals into ``catalog``. When a ``gate`` (T04 evidence bar) is supplied, the promoted pattern's ``status``/``distribution_ready`` are set from the gate evaluation, so an approved-but-thin candidate lands as ``provisional`` rather than - distribution-ready. + distribution-ready. When a ``recorder`` (T05) is supplied, each final + promote/reject is logged as an auditable hub decision (queued if the hub is + down). """ result = ReviewResult() for cand in candidates: @@ -149,4 +153,6 @@ def review(candidates: list[dict], decide: Decider, catalog: Catalog, else: raise ValueError(f"unknown review action {action!r}") log.record(cand, action, rationale) + if recorder is not None: + recorder.record(cand, action, rationale) return result diff --git a/tests/test_curate_decisions.py b/tests/test_curate_decisions.py new file mode 100644 index 0000000..f5043e7 --- /dev/null +++ b/tests/test_curate_decisions.py @@ -0,0 +1,70 @@ +"""Hub decision integration tests (T05): payload shape + graceful queue/flush.""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from session_memory.curate.catalog import Catalog # noqa: E402 +from session_memory.curate.decisions import DecisionRecorder, build_decision # noqa: E402 +from session_memory.curate.review import APPROVE, REJECT, ReviewLog, review # noqa: E402 + + +def _candidate(key="success:clean_pass:outcome"): + return {"key": key, "frequency": 18, "sessions": ["a", "b"], + "cost_impact": 9.0, "cross_flavor": True, "flavors": ["claude", "grok"]} + + +def test_build_decision_payload_shape(): + d = build_decision(_candidate(), "approve", "looks solid", workstream_id="ws-1") + assert d["decision_type"] == "made" + assert d["workstream_id"] == "ws-1" + assert "Promote" in d["title"] + assert d["rationale"] == "looks solid" + assert "success:clean_pass:outcome" in d["description"] + + +def test_sink_accepts_decision(tmp_path): + captured = [] + rec = DecisionRecorder(str(tmp_path / "q.jsonl"), sink=captured.append) + assert rec.record(_candidate(), "approve", "ok") is True + assert rec.pending() == [] + assert len(captured) == 1 + + +def test_queues_when_sink_down(tmp_path): + def boom(_): + raise RuntimeError("hub down") + + rec = DecisionRecorder(str(tmp_path / "q.jsonl"), sink=boom) + assert rec.record(_candidate(), "reject", "noise") is False + assert len(rec.pending()) == 1 + + +def test_no_sink_defaults_to_queue(tmp_path): + rec = DecisionRecorder(str(tmp_path / "q.jsonl")) + rec.record(_candidate(), "approve", "ok") + assert len(rec.pending()) == 1 + + +def test_flush_replays_queue(tmp_path): + rec = DecisionRecorder(str(tmp_path / "q.jsonl")) # offline -> queue + rec.record(_candidate("problem:abandoned:outcome"), "reject", "x") + rec.record(_candidate("success:clean_pass:outcome"), "approve", "y") + captured = [] + assert rec.flush(sink=captured.append) == 2 + assert rec.pending() == [] + assert len(captured) == 2 + + +def test_review_records_each_final_decision(tmp_path): + cat = Catalog(str(tmp_path / "catalog")) + log = ReviewLog(str(tmp_path / "reviews.jsonl")) + captured = [] + rec = DecisionRecorder(str(tmp_path / "q.jsonl"), sink=captured.append, workstream_id="ws") + cands = [_candidate("success:clean_pass:outcome"), _candidate("problem:abandoned:outcome")] + review(cands, lambda c: (APPROVE if "success" in c["key"] else REJECT, "r"), cat, log, + recorder=rec) + assert len(captured) == 2 + actions = sorted("Promote" in d["title"] for d in captured) + assert actions == [False, True] diff --git a/workplans/AGENTIC-WP-0004-session-memory-phase2.md b/workplans/AGENTIC-WP-0004-session-memory-phase2.md index 3fccfc4..578bcc3 100644 --- a/workplans/AGENTIC-WP-0004-session-memory-phase2.md +++ b/workplans/AGENTIC-WP-0004-session-memory-phase2.md @@ -112,7 +112,7 @@ catalog stays lean and agent context budgets are protected. Knobs live in ```task id: AGENTIC-WP-0004-T05 -status: todo +status: done priority: medium state_hub_task_id: "449f12d4-fae0-450d-873f-143b3a570b5a" ```