"""Hub decision integration tests (T05): payload shape + graceful queue/flush.""" import os import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from session_memory.curate.catalog import Catalog # noqa: E402 from session_memory.curate.decisions import DecisionRecorder, build_decision # noqa: E402 from session_memory.curate.review import APPROVE, REJECT, ReviewLog, review # noqa: E402 def _candidate(key="success:clean_pass:outcome"): return {"key": key, "frequency": 18, "sessions": ["a", "b"], "cost_impact": 9.0, "cross_flavor": True, "flavors": ["claude", "grok"]} def test_build_decision_payload_shape(): d = build_decision(_candidate(), "approve", "looks solid", workstream_id="ws-1") assert d["decision_type"] == "made" assert d["workstream_id"] == "ws-1" assert "Promote" in d["title"] assert d["rationale"] == "looks solid" assert "success:clean_pass:outcome" in d["description"] def test_sink_accepts_decision(tmp_path): captured = [] rec = DecisionRecorder(str(tmp_path / "q.jsonl"), sink=captured.append) assert rec.record(_candidate(), "approve", "ok") is True assert rec.pending() == [] assert len(captured) == 1 def test_queues_when_sink_down(tmp_path): def boom(_): raise RuntimeError("hub down") rec = DecisionRecorder(str(tmp_path / "q.jsonl"), sink=boom) assert rec.record(_candidate(), "reject", "noise") is False assert len(rec.pending()) == 1 def test_no_sink_defaults_to_queue(tmp_path): rec = DecisionRecorder(str(tmp_path / "q.jsonl")) rec.record(_candidate(), "approve", "ok") assert len(rec.pending()) == 1 def test_flush_replays_queue(tmp_path): rec = DecisionRecorder(str(tmp_path / "q.jsonl")) # offline -> queue rec.record(_candidate("problem:abandoned:outcome"), "reject", "x") rec.record(_candidate("success:clean_pass:outcome"), "approve", "y") captured = [] assert rec.flush(sink=captured.append) == 2 assert rec.pending() == [] assert len(captured) == 2 def test_review_records_each_final_decision(tmp_path): cat = Catalog(str(tmp_path / "catalog")) log = ReviewLog(str(tmp_path / "reviews.jsonl")) captured = [] rec = DecisionRecorder(str(tmp_path / "q.jsonl"), sink=captured.append, workstream_id="ws") cands = [_candidate("success:clean_pass:outcome"), _candidate("problem:abandoned:outcome")] review(cands, lambda c: (APPROVE if "success" in c["key"] else REJECT, "r"), cat, log, recorder=rec) assert len(captured) == 2 actions = sorted("Promote" in d["title"] for d in captured) assert actions == [False, True]