From abb888f3efd72b7abc4e06f3f9aadd0de3d6be8b Mon Sep 17 00:00:00 2001
From: tegwick
Date: Sat, 6 Jun 2026 20:03:04 +0200
Subject: [PATCH] session-memory Phase 0: session digest + outcome heuristic (T04)

- session_memory/core/digest.py: build_digest (cost totals, kind/tool
  histograms, markers, snippets) + cross-flavor infer_outcome heuristic;
  analyze() promotes Tier1->Tier2 and sets analyzed_at (-> evictable)
- tests/test_digest.py

Co-Authored-By: Claude Opus 4.8
---
Note: hunks hand-edited after format-patch (line breaks restored, fixes to
infer_outcome / _read_blob, one regression test); index blob ids are stale.

 session_memory/core/digest.py                 | 179 ++++++++++++++++++
 tests/test_digest.py                          |  93 +++++++++
 .../AGENTIC-WP-0002-session-memory-phase0.md  |   4 +-
 3 files changed, 274 insertions(+), 2 deletions(-)
 create mode 100644 session_memory/core/digest.py
 create mode 100644 tests/test_digest.py

diff --git a/session_memory/core/digest.py b/session_memory/core/digest.py
new file mode 100644
index 0000000..2aa708d
--- /dev/null
+++ b/session_memory/core/digest.py
@@ -0,0 +1,179 @@
+"""Session digest — Tier 1 -> Tier 2 promotion (design §3, §4; T04).
+
+Compresses a session's events into a small, durable digest: outcome heuristic,
+cost totals, tool histogram, and counts of error/retry/test/edit/human markers,
+plus a few key snippets. Writing the digest sets ``analyzed_at``, which is what
+makes a session evictable under budget-based retention (design §5).
+
+Signal extraction beyond this digest is intentionally out of scope here — it
+belongs to the Detect phase (PRD §6.2).
+"""
+
+from __future__ import annotations
+
+import collections
+import re
+from typing import Any
+
+from .schema import Session, SessionEvent
+
+# Substrings in tool_result bodies / summaries that suggest a failure.
+_FAIL_HINTS = ("error", "failed", "exception", "traceback", "fatal", "non-zero")
+# Whole words suggesting a clean test pass. Matched on word boundaries so that
+# "ok" does not fire inside unrelated words such as "token" or "broken".
+_PASS_RE = re.compile(r"\b(?:passed|ok|success\w*)\b")
+# Zero-count failure phrases ("0 failed", "no errors"). They contain a fail
+# hint as a substring, so they are stripped before the fail hints are applied.
+_ZERO_FAIL_RE = re.compile(r"\b(?:0|no)\s+(?:failed|failures?|errors?)\b")
+
+
+def infer_outcome(events: list[SessionEvent], blobs: dict[str, str] | None = None) -> str:
+    """Heuristic outcome label across flavors (design OQ2).
+
+    - ``abandoned`` if the session has no assistant output at all.
+    - ``fail`` if the last substantive signal is an error / failing test.
+    - ``success`` if it ends on assistant output or a passing test.
+    - ``unknown`` otherwise.
+    """
+    blobs = blobs or {}
+    assistant = [e for e in events if e.kind == "assistant_msg"]
+    if not assistant:
+        return "abandoned"
+
+    # Look at error and test signals; weight the latest ones.
+    last_fail = _last_index(events, lambda e: e.kind == "error")
+    last_test = _last_index(events, lambda e: e.kind == "test_run")
+    last_completion = _last_index(events, lambda e: e.kind in ("completion", "assistant_msg"))
+
+    test_passed = None
+    if last_test is not None:
+        # inspect the nearest following tool_result body for pass/fail hints
+        body = _nearby_result_body(events, last_test, blobs)
+        if body:
+            test_passed = _classify_test_body(body)
+
+    if test_passed is False and (last_test or 0) >= (last_completion or 0):
+        return "fail"
+    if last_fail is not None and last_completion is not None and last_fail > last_completion:
+        return "fail"
+    if test_passed is True:
+        return "success"
+    if last_completion is not None:
+        return "success"
+    return "unknown"
+
+
+def build_digest(session: Session, events: list[SessionEvent],
+                 blobs: dict[str, str] | None = None) -> dict[str, Any]:
+    """Produce the compact Tier 2 digest dict for a session."""
+    blobs = blobs or {}
+    kind_counts = collections.Counter(e.kind for e in events)
+    tool_hist = collections.Counter(e.tool for e in events if e.tool)
+    retries = kind_counts.get("retry", 0)
+    outcome = infer_outcome(events, blobs)
+
+    return {
+        "session_uid": session.session_uid,
+        "flavor": session.flavor,
+        "repo": session.repo,
+        "domain": session.domain,
+        "model": session.model,
+        "started_at": session.started_at,
+        "ended_at": session.ended_at,
+        "outcome": outcome,
+        "cost": {
+            "input_tokens": session.cost.input_tokens,
+            "output_tokens": session.cost.output_tokens,
+            "cache_tokens": session.cost.cache_tokens,
+            "wall_clock_s": session.cost.wall_clock_s,
+            "turns": session.cost.turns,
+            "retries": retries,
+        },
+        "event_count": len(events),
+        "kind_counts": dict(kind_counts),
+        "tool_histogram": dict(tool_hist),
+        "markers": {
+            "errors": kind_counts.get("error", 0),
+            "retries": retries,
+            "test_runs": kind_counts.get("test_run", 0),
+            "edits": kind_counts.get("edit", 0),
+            "human_interventions": kind_counts.get("human_intervention", 0),
+        },
+        "first_prompt": _first_prompt(events, blobs),
+        "last_assistant": _last_assistant(events, blobs),
+        "schema_version": session.schema_version,
+    }
+
+
+def analyze(store, session_uid: str) -> dict[str, Any]:
+    """Read a session from the store, write its digest, return the digest.
+
+    Raises ``KeyError`` if the store has no session with ``session_uid``.
+    """
+    session = store.get_session(session_uid)
+    if session is None:
+        raise KeyError(session_uid)
+    events = store.get_events(session_uid)
+    blobs = {e.payload_ref: _read_blob(store, e.payload_ref)
+             for e in events if e.payload_ref}
+    digest = build_digest(session, events, blobs)
+    store.write_digest(session_uid, digest)
+    return digest
+
+
+# ---- helpers ---------------------------------------------------------------
+
+def _last_index(events, pred):
+    """Return the index of the last event satisfying *pred*, or None."""
+    idx = None
+    for i, e in enumerate(events):
+        if pred(e):
+            idx = i
+    return idx
+
+
+def _nearby_result_body(events, idx, blobs):
+    """Return the first known tool_result body in the 3 events after *idx*, else None."""
+    for e in events[idx + 1: idx + 4]:
+        if e.kind == "tool_result" and e.payload_ref in blobs:
+            return blobs[e.payload_ref]
+    return None
+
+
+def _classify_test_body(body):
+    """Return True / False / None for a passing / failing / unclear test body."""
+    scrubbed, zero_failures = _ZERO_FAIL_RE.subn(" ", body.lower())
+    if any(h in scrubbed for h in _FAIL_HINTS):
+        return False
+    if zero_failures or _PASS_RE.search(scrubbed):
+        return True
+    return None
+
+
+def _first_prompt(events, blobs):
+    """Return the first user_msg text (blob, else summary), max 280 chars, or None."""
+    for e in events:
+        if e.kind == "user_msg":
+            return (blobs.get(e.payload_ref) or e.summary or "")[:280]
+    return None
+
+
+def _last_assistant(events, blobs):
+    """Return the last assistant text (blob, else summary), max 280 chars, or None."""
+    for e in reversed(events):
+        if e.kind == "assistant_msg":
+            return (blobs.get(e.payload_ref) or e.summary or "")[:280]
+    return None
+
+
+def _read_blob(store, ref):
+    """Return the text of blob *ref*, or "" when it is unknown or unreadable."""
+    row = store.db.execute("SELECT path FROM blobs WHERE ref=?", (ref,)).fetchone()
+    if not row:
+        return ""
+    try:
+        # errors="replace": a blob that is not valid UTF-8 must not abort analysis
+        with open(row["path"], "r", encoding="utf-8", errors="replace") as f:
+            return f.read()
+    except OSError:
+        return ""
diff --git a/tests/test_digest.py b/tests/test_digest.py
new file mode 100644
index 0000000..49a2bd8
--- /dev/null
+++ b/tests/test_digest.py
@@ -0,0 +1,93 @@
+"""Digest tests (T04): outcome heuristic + Tier 2 promotion."""
+
+import os
+import sys
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from session_memory.adapters.claude import Normalized  # noqa: E402
+from session_memory.core.digest import analyze, build_digest, infer_outcome  # noqa: E402
+from session_memory.core.schema import Cost, Session, SessionEvent  # noqa: E402
+from session_memory.core.store import Store  # noqa: E402
+
+
+def _ev(uid, seq, kind, **kw):
+    return SessionEvent(session_uid=uid, seq=seq, kind=kind, **kw)
+
+
+def test_infer_outcome_abandoned():
+    uid = "claude:s"
+    assert infer_outcome([_ev(uid, 0, "user_msg")]) == "abandoned"
+
+
+def test_infer_outcome_success_on_passing_test():
+    uid = "claude:s"
+    events = [
+        _ev(uid, 0, "user_msg"),
+        _ev(uid, 1, "assistant_msg"),
+        _ev(uid, 2, "test_run", tool="Bash"),
+        _ev(uid, 3, "tool_result", payload_ref="b3"),
+    ]
+    assert infer_outcome(events, {"b3": "6 passed in 0.4s"}) == "success"
+
+
+def test_infer_outcome_fail_on_failing_test():
+    uid = "claude:s"
+    events = [
+        _ev(uid, 0, "user_msg"),
+        _ev(uid, 1, "assistant_msg"),
+        _ev(uid, 2, "test_run", tool="Bash"),
+        _ev(uid, 3, "tool_result", payload_ref="b3"),
+    ]
+    assert infer_outcome(events, {"b3": "1 failed, traceback ..."}) == "fail"
+
+
+def test_infer_outcome_success_on_zero_failed_summary():
+    uid = "claude:s"
+    events = [
+        _ev(uid, 0, "user_msg"),
+        _ev(uid, 1, "assistant_msg"),
+        _ev(uid, 2, "test_run", tool="Bash"),
+        _ev(uid, 3, "tool_result", payload_ref="b3"),
+    ]
+    assert infer_outcome(events, {"b3": "6 passed, 0 failed"}) == "success"
+
+
+def test_build_digest_histograms_and_markers():
+    uid = "claude:s"
+    s = Session(session_uid=uid, flavor="claude", native_session_id="s",
+                repo="agentic-resources", cost=Cost(input_tokens=100, output_tokens=40, turns=2))
+    events = [
+        _ev(uid, 0, "user_msg"),
+        _ev(uid, 1, "edit", tool="Edit"),
+        _ev(uid, 2, "edit", tool="Write"),
+        _ev(uid, 3, "test_run", tool="Bash"),
+        _ev(uid, 4, "error"),
+        _ev(uid, 5, "assistant_msg"),
+    ]
+    d = build_digest(s, events)
+    assert d["tool_histogram"] == {"Edit": 1, "Write": 1, "Bash": 1}
+    assert d["markers"]["edits"] == 2
+    assert d["markers"]["errors"] == 1
+    assert d["markers"]["test_runs"] == 1
+    assert d["event_count"] == 6
+    assert d["cost"]["input_tokens"] == 100
+
+
+def test_analyze_writes_digest_and_sets_analyzed(tmp_path):
+    st = Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
+    uid = Session.make_uid("claude", "s1")
+    s = Session(session_uid=uid, flavor="claude", native_session_id="s1")
+    events = [
+        SessionEvent(session_uid=uid, seq=0, kind="user_msg", payload_ref="b0"),
+        SessionEvent(session_uid=uid, seq=1, kind="assistant_msg", payload_ref="b1"),
+    ]
+    blobs = {"b0": "please help", "b1": "done"}
+    st.ingest(Normalized(session=s, events=events, blobs=blobs))
+
+    assert st.get_session(uid).is_evictable is False
+    d = analyze(st, uid)
+    assert d["outcome"] == "success"
+    assert d["first_prompt"] == "please help"
+    assert st.get_session(uid).analyzed_at is not None
+    assert st.get_session(uid).is_evictable is True  # now promoted -> evictable
diff --git a/workplans/AGENTIC-WP-0002-session-memory-phase0.md b/workplans/AGENTIC-WP-0002-session-memory-phase0.md
index 51bf876..6f1ba99 100644
--- a/workplans/AGENTIC-WP-0002-session-memory-phase0.md
+++ b/workplans/AGENTIC-WP-0002-session-memory-phase0.md
@@ -72,7 +72,7 @@ Tier 1 (rows + blobs) and Tier 2, used by retention.
 
 ```task
 id: AGENTIC-WP-0002-T04
-status: progress
+status: done
 priority: medium
 state_hub_task_id: "017d8e90-633a-49f2-b342-8690938798cd"
 ```
@@ -86,7 +86,7 @@ Signal extraction beyond the digest stays stubbed for the Detect phase.
 
 ```task
 id: AGENTIC-WP-0002-T05
-status: todo
+status: progress
 priority: high
 state_hub_task_id: "89177c79-528e-4023-a7eb-67f8e0276ba9"
 ```