session-memory: error-body mining into digest (WP-0006 T01)

build_digest now extracts normalized error fingerprints + samples from failed
events (error kind + failing tool_result bodies) into a durable error_snippets
list — paths/numbers/uuids/addrs stripped so the same error collapses to one
fingerprint with a count; Python traceback header skipped in favour of the real
exception line. Durable in Tier 2 (survives Tier 1 eviction). SCHEMA_VERSION ->
2 (re-ingest needed to populate). 7 new tests; suite 95/95 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-07 12:45:01 +02:00
parent dbd212d2b1
commit 97379e9658
4 changed files with 160 additions and 2 deletions

View File

@@ -12,6 +12,7 @@ belongs to the Detect phase (PRD §6.2).
from __future__ import annotations
import collections
import re
from typing import Any
from .schema import Session, SessionEvent
@@ -21,6 +22,16 @@ _FAIL_HINTS = ("error", "failed", "exception", "traceback", "fatal", "non-zero")
# Substrings suggesting a clean test pass.
_PASS_HINTS = ("passed", "0 failed", "ok", "success")
# Normalization patterns so the same error collapses to one fingerprint
# regardless of paths / ids / counts (WP-0006 T01).
_UUID_RE = re.compile(r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.I)
_HEXADDR_RE = re.compile(r"\b0x[0-9a-f]+\b", re.I)
_PATH_RE = re.compile(r"(?:/[\w.\-]+)+/?|[A-Za-z]:\\[\w.\\\-]+")
_NUM_RE = re.compile(r"\b\d+\b")
_WS_RE = re.compile(r"\s+")
_ERR_SAMPLE_MAX = 200
_ERR_FP_MAX = 160
def infer_outcome(events: list[SessionEvent], blobs: dict[str, str] | None = None) -> str:
"""Heuristic outcome label across flavors (design OQ2).
@@ -100,6 +111,7 @@ def build_digest(session: Session, events: list[SessionEvent],
},
"first_prompt": _first_prompt(events, blobs),
"last_assistant": _last_assistant(events, blobs),
"error_snippets": _error_snippets(events, blobs),
"schema_version": session.schema_version,
}
@@ -148,6 +160,78 @@ def _last_assistant(events, blobs):
return None
def _error_line(text: str) -> str:
"""Pick the most error-like line from a body.
Prefers the *last* line matching a fail hint — in a Python traceback the
actual exception is the final line, while the bare ``Traceback (most recent
call last):`` header is just noise and is skipped.
"""
lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
matches = [ln for ln in lines
if any(h in ln.lower() for h in _FAIL_HINTS)
and not ln.lower().startswith("traceback")]
if matches:
return matches[-1]
# fall back to any fail-hint line (e.g. only the traceback header), else first
any_hint = [ln for ln in lines if any(h in ln.lower() for h in _FAIL_HINTS)]
return any_hint[-1] if any_hint else (lines[0] if lines else "")
def _error_fingerprint(text: str) -> str:
"""Stable, content-addressable key for an error, paths/ids/numbers removed."""
s = _error_line(text).lower()
s = _UUID_RE.sub("<uuid>", s)
s = _HEXADDR_RE.sub("<addr>", s)
s = _PATH_RE.sub("<path>", s)
s = _NUM_RE.sub("<n>", s)
return _WS_RE.sub(" ", s).strip()[:_ERR_FP_MAX]
def _error_body(event: SessionEvent, blobs: dict) -> str:
"""Best available text for a failed event."""
if event.payload_ref and event.payload_ref in blobs:
return blobs[event.payload_ref]
return event.summary or ""
def _is_failed(event: SessionEvent, blobs: dict) -> bool:
if event.kind == "error":
return True
if event.kind == "tool_result":
body = _error_body(event, blobs).lower()
return bool(body) and any(h in body for h in _FAIL_HINTS)
return False
def _error_snippets(events: list[SessionEvent], blobs: dict) -> list[dict]:
"""Collapse a session's failures into deduped, normalized error fingerprints.
Durable in Tier 2 (the raw blobs may be evicted): each entry is
``{fingerprint, sample, count, tool}`` with same-fingerprint occurrences
counted. Ordered by frequency (then first appearance) for stable output.
"""
agg: dict[str, dict] = {}
order: list[str] = []
for e in events:
if not _is_failed(e, blobs):
continue
body = _error_body(e, blobs)
if not body.strip():
continue
fp = _error_fingerprint(body)
if not fp:
continue
if fp not in agg:
agg[fp] = {"fingerprint": fp, "sample": _error_line(body)[:_ERR_SAMPLE_MAX],
"count": 0, "tool": e.tool}
order.append(fp)
agg[fp]["count"] += 1
snippets = [agg[fp] for fp in order]
snippets.sort(key=lambda s: (-s["count"], order.index(s["fingerprint"])))
return snippets
def _read_blob(store, ref):
row = store.db.execute("SELECT path FROM blobs WHERE ref=?", (ref,)).fetchone()
if not row:

View File

@@ -11,7 +11,7 @@ import json
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Optional
SCHEMA_VERSION = 1
SCHEMA_VERSION = 2 # v2: digest carries error_snippets (WP-0006 T01)
# Supported agent flavors. ``session_uid`` is always "<flavor>:<native id>".
FLAVORS = ("claude", "codex", "grok")

View File

@@ -0,0 +1,74 @@
"""Error-body mining into the digest (WP-0006 T01)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.core.digest import ( # noqa: E402
_error_fingerprint,
_error_snippets,
build_digest,
)
from session_memory.core.schema import SCHEMA_VERSION, Session, SessionEvent # noqa: E402
def _ev(seq, kind, **kw):
return SessionEvent(session_uid="claude:s", seq=seq, kind=kind, **kw)
def test_fingerprint_normalizes_paths_numbers_ids():
a = _error_fingerprint("ModuleNotFoundError: No module named 'foo' at /home/x/a.py:42")
b = _error_fingerprint("ModuleNotFoundError: No module named 'foo' at /srv/y/b.py:9991")
assert a == b # paths + line numbers stripped -> same fingerprint
assert "<path>" in a and "<n>" in a
def test_fingerprint_uuid_and_addr():
fp = _error_fingerprint("connection 0xDEADBEEF to 1972d1d9-fc35-4912-8126-1fe64cc51425 failed")
assert "<addr>" in fp and "<uuid>" in fp
def test_snippets_dedup_and_count():
blobs = {"b1": "Traceback...\nValueError: bad thing at /p/x.py:10",
"b2": "Traceback...\nValueError: bad thing at /q/y.py:99",
"b3": "KeyError: 'id'"}
events = [
_ev(0, "error", payload_ref="b1"),
_ev(1, "error", payload_ref="b2"), # same fingerprint as b1
_ev(2, "error", payload_ref="b3"),
]
snips = _error_snippets(events, blobs)
assert len(snips) == 2
top = snips[0]
assert top["count"] == 2 # the ValueError collapsed
assert "ValueError" in top["sample"]
def test_failed_tool_result_mined():
blobs = {"b1": "npm ERR! something failed with non-zero exit"}
events = [_ev(0, "tool_result", tool="Bash", payload_ref="b1")]
snips = _error_snippets(events, blobs)
assert len(snips) == 1
assert snips[0]["tool"] == "Bash"
def test_clean_tool_result_not_mined():
blobs = {"b1": "6 passed in 0.4s"}
events = [_ev(0, "tool_result", tool="Bash", payload_ref="b1")]
assert _error_snippets(events, blobs) == []
def test_build_digest_includes_error_snippets_and_v2():
s = Session(session_uid="claude:s", flavor="claude", native_session_id="s", repo="r")
events = [_ev(0, "user_msg"), _ev(1, "error", payload_ref="b1"), _ev(2, "assistant_msg")]
d = build_digest(s, events, {"b1": "RuntimeError: kaboom at /a/b.py:3"})
assert d["schema_version"] == SCHEMA_VERSION == 2
assert d["error_snippets"][0]["count"] == 1
assert "RuntimeError" in d["error_snippets"][0]["sample"]
def test_no_errors_empty_list():
s = Session(session_uid="claude:s", flavor="claude", native_session_id="s", repo="r")
d = build_digest(s, [_ev(0, "user_msg"), _ev(1, "assistant_msg")])
assert d["error_snippets"] == []

View File

@@ -31,7 +31,7 @@ event/blob bodies already ingested.
```task
id: AGENTIC-WP-0006-T01
status: todo
status: done
priority: high
state_hub_task_id: "136a0a73-61c2-4390-876c-de3880a967e6"
```