Files
tegwick 1b6081cd88 session-memory: denoise error fingerprints (WP-0006 follow-up)
Tighten _is_failed: exclude successful hub JSON responses (top-level no-error
payloads) and file-read snapshots (numbered cat -n source lines) that were
polluting error_snippets. JSON verdict classifies error vs success payloads
directly. Cuts distinct fingerprints 444 -> 269 (~40%) over the real corpus with
the top errors unchanged. Assessment caveat updated. 5 new tests; suite 102/102.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 13:39:08 +02:00

287 lines
10 KiB
Python

"""Session digest — Tier 1 -> Tier 2 promotion (design §3, §4; T04).
Compresses a session's events into a small, durable digest: outcome heuristic,
cost totals, tool histogram, and counts of error/retry/test/edit/human markers,
plus a few key snippets. Writing the digest sets ``analyzed_at``, which is what
makes a session evictable under budget-based retention (design §5).
Signal extraction beyond this digest is intentionally out of scope here — it
belongs to the Detect phase (PRD §6.2).
"""
from __future__ import annotations
import collections
import json
import re
from typing import Any
from .schema import Session, SessionEvent
# Substrings in tool_result bodies / summaries that suggest a failure.
_FAIL_HINTS = ("error", "failed", "exception", "traceback", "fatal", "non-zero")
# Substrings suggesting a clean test pass.
_PASS_HINTS = ("passed", "0 failed", "ok", "success")
# A line that is numbered source content from a Read result (`cat -n` style),
# e.g. "229\t raise InfospaceError(" — code text, never a runtime error.
_NUMBERED_LINE_RE = re.compile(r"^\s*\d+\t")
# Top-level keys that mark a JSON tool-result as an actual error (vs. success).
_JSON_ERROR_KEYS = ("error", "errors", "detail")
# Normalization patterns so the same error collapses to one fingerprint
# regardless of paths / ids / counts (WP-0006 T01).
_UUID_RE = re.compile(r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.I)
_HEXADDR_RE = re.compile(r"\b0x[0-9a-f]+\b", re.I)
_PATH_RE = re.compile(r"(?:/[\w.\-]+)+/?|[A-Za-z]:\\[\w.\\\-]+")
_NUM_RE = re.compile(r"\b\d+\b")
_WS_RE = re.compile(r"\s+")
_ERR_SAMPLE_MAX = 200
_ERR_FP_MAX = 160
def infer_outcome(events: list[SessionEvent], blobs: dict[str, str] | None = None) -> str:
"""Heuristic outcome label across flavors (design OQ2).
- ``abandoned`` if the session has no assistant output at all.
- ``fail`` if the last substantive signal is an error / failing test.
- ``success`` if it ends on assistant output or a passing test.
- ``unknown`` otherwise.
"""
blobs = blobs or {}
assistant = [e for e in events if e.kind == "assistant_msg"]
if not assistant:
return "abandoned"
# Look at error and test signals; weight the latest ones.
last_fail = _last_index(events, lambda e: e.kind == "error")
last_test = _last_index(events, lambda e: e.kind == "test_run")
last_completion = _last_index(events, lambda e: e.kind in ("completion", "assistant_msg"))
test_passed = None
if last_test is not None:
# inspect the nearest following tool_result body for pass/fail hints
body = _nearby_result_body(events, last_test, blobs)
if body:
low = body.lower()
if any(h in low for h in _FAIL_HINTS):
test_passed = False
elif any(h in low for h in _PASS_HINTS):
test_passed = True
if test_passed is False and (last_test or 0) >= (last_completion or 0):
return "fail"
if last_fail is not None and last_completion is not None and last_fail > last_completion:
return "fail"
if test_passed is True:
return "success"
if last_completion is not None:
return "success"
return "unknown"
def build_digest(session: Session, events: list[SessionEvent],
blobs: dict[str, str] | None = None) -> dict[str, Any]:
"""Produce the compact Tier 2 digest dict for a session."""
blobs = blobs or {}
kind_counts = collections.Counter(e.kind for e in events)
tool_hist = collections.Counter(e.tool for e in events if e.tool)
retries = kind_counts.get("retry", 0)
outcome = infer_outcome(events, blobs)
return {
"session_uid": session.session_uid,
"flavor": session.flavor,
"repo": session.repo,
"domain": session.domain,
"model": session.model,
"started_at": session.started_at,
"ended_at": session.ended_at,
"outcome": outcome,
"cost": {
"input_tokens": session.cost.input_tokens,
"output_tokens": session.cost.output_tokens,
"cache_tokens": session.cost.cache_tokens,
"wall_clock_s": session.cost.wall_clock_s,
"turns": session.cost.turns,
"retries": retries,
},
"event_count": len(events),
"kind_counts": dict(kind_counts),
"tool_histogram": dict(tool_hist),
"markers": {
"errors": kind_counts.get("error", 0),
"retries": retries,
"test_runs": kind_counts.get("test_run", 0),
"edits": kind_counts.get("edit", 0),
"human_interventions": kind_counts.get("human_intervention", 0),
},
"first_prompt": _first_prompt(events, blobs),
"last_assistant": _last_assistant(events, blobs),
"error_snippets": _error_snippets(events, blobs),
"schema_version": session.schema_version,
}
def analyze(store, session_uid: str) -> dict[str, Any]:
"""Read a session from the store, write its digest, return the digest."""
session = store.get_session(session_uid)
if session is None:
raise KeyError(session_uid)
events = store.get_events(session_uid)
blobs = {e.payload_ref: _read_blob(store, e.payload_ref)
for e in events if e.payload_ref}
digest = build_digest(session, events, blobs)
store.write_digest(session_uid, digest)
return digest
# ---- helpers ---------------------------------------------------------------
def _last_index(events, pred):
idx = None
for i, e in enumerate(events):
if pred(e):
idx = i
return idx
def _nearby_result_body(events, idx, blobs):
for e in events[idx + 1: idx + 4]:
if e.kind == "tool_result" and e.payload_ref in blobs:
return blobs[e.payload_ref]
return None
def _first_prompt(events, blobs):
for e in events:
if e.kind == "user_msg":
return (blobs.get(e.payload_ref) or e.summary or "")[:280]
return None
def _last_assistant(events, blobs):
for e in reversed(events):
if e.kind == "assistant_msg":
return (blobs.get(e.payload_ref) or e.summary or "")[:280]
return None
def _error_line(text: str) -> str:
"""Pick the most error-like line from a body.
Prefers the *last* line matching a fail hint — in a Python traceback the
actual exception is the final line, while the bare ``Traceback (most recent
call last):`` header is just noise and is skipped.
"""
lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
matches = [ln for ln in lines
if any(h in ln.lower() for h in _FAIL_HINTS)
and not ln.lower().startswith("traceback")]
if matches:
return matches[-1]
# fall back to any fail-hint line (e.g. only the traceback header), else first
any_hint = [ln for ln in lines if any(h in ln.lower() for h in _FAIL_HINTS)]
return any_hint[-1] if any_hint else (lines[0] if lines else "")
def _error_fingerprint(text: str) -> str:
"""Stable, content-addressable key for an error, paths/ids/numbers removed."""
s = _error_line(text).lower()
s = _UUID_RE.sub("<uuid>", s)
s = _HEXADDR_RE.sub("<addr>", s)
s = _PATH_RE.sub("<path>", s)
s = _NUM_RE.sub("<n>", s)
return _WS_RE.sub(" ", s).strip()[:_ERR_FP_MAX]
def _error_body(event: SessionEvent, blobs: dict) -> str:
"""Best available text for a failed event."""
if event.payload_ref and event.payload_ref in blobs:
return blobs[event.payload_ref]
return event.summary or ""
def _looks_like_file_read(body: str) -> bool:
"""True if the body is mostly numbered source lines (a Read result), not an error."""
lines = [ln for ln in body.splitlines() if ln.strip()]
if not lines:
return False
numbered = sum(1 for ln in lines if _NUMBERED_LINE_RE.match(ln))
return numbered >= max(3, len(lines) // 2)
def _json_verdict(body: str):
"""Classify a JSON tool-result body: 'error', 'success', or None (not JSON).
Hub MCP successes look like ``{"result": "..."}`` and mention 'error' deep
inside summaries but are not failures ('success'). A payload with a top-level
error key (``{"detail": ...}`` / ``{"error": ...}``) is 'error'. Non-JSON text
returns None so the plain fail-hint heuristic still applies.
"""
s = body.strip()
if not s or s[0] not in "{[":
return None
try:
obj = json.loads(s)
except (ValueError, TypeError):
return None
if isinstance(obj, dict) and any(k in obj for k in _JSON_ERROR_KEYS):
return "error"
return "success"
def _is_failed(event: SessionEvent, blobs: dict) -> bool:
if event.kind == "error":
return True
if event.kind == "tool_result":
body = _error_body(event, blobs)
if not body.strip():
return False
if _looks_like_file_read(body):
return False
verdict = _json_verdict(body)
if verdict is not None:
return verdict == "error"
return any(h in body.lower() for h in _FAIL_HINTS)
return False
def _error_snippets(events: list[SessionEvent], blobs: dict) -> list[dict]:
"""Collapse a session's failures into deduped, normalized error fingerprints.
Durable in Tier 2 (the raw blobs may be evicted): each entry is
``{fingerprint, sample, count, tool}`` with same-fingerprint occurrences
counted. Ordered by frequency (then first appearance) for stable output.
"""
agg: dict[str, dict] = {}
order: list[str] = []
for e in events:
if not _is_failed(e, blobs):
continue
body = _error_body(e, blobs)
if not body.strip():
continue
fp = _error_fingerprint(body)
if not fp:
continue
if fp not in agg:
agg[fp] = {"fingerprint": fp, "sample": _error_line(body)[:_ERR_SAMPLE_MAX],
"count": 0, "tool": e.tool}
order.append(fp)
agg[fp]["count"] += 1
snippets = [agg[fp] for fp in order]
snippets.sort(key=lambda s: (-s["count"], order.index(s["fingerprint"])))
return snippets
def _read_blob(store, ref):
row = store.db.execute("SELECT path FROM blobs WHERE ref=?", (ref,)).fetchone()
if not row:
return ""
try:
with open(row["path"], "r", encoding="utf-8") as f:
return f.read()
except OSError:
return ""