"""Session digest — Tier 1 -> Tier 2 promotion (design §3, §4; T04). Compresses a session's events into a small, durable digest: outcome heuristic, cost totals, tool histogram, and counts of error/retry/test/edit/human markers, plus a few key snippets. Writing the digest sets ``analyzed_at``, which is what makes a session evictable under budget-based retention (design §5). Signal extraction beyond this digest is intentionally out of scope here — it belongs to the Detect phase (PRD §6.2). """ from __future__ import annotations import collections import json import re from typing import Any from .schema import Session, SessionEvent # Substrings in tool_result bodies / summaries that suggest a failure. _FAIL_HINTS = ("error", "failed", "exception", "traceback", "fatal", "non-zero") # Substrings suggesting a clean test pass. _PASS_HINTS = ("passed", "0 failed", "ok", "success") # A line that is numbered source content from a Read result (`cat -n` style), # e.g. "229\t raise InfospaceError(" — code text, never a runtime error. _NUMBERED_LINE_RE = re.compile(r"^\s*\d+\t") # Top-level keys that mark a JSON tool-result as an actual error (vs. success). _JSON_ERROR_KEYS = ("error", "errors", "detail") # Normalization patterns so the same error collapses to one fingerprint # regardless of paths / ids / counts (WP-0006 T01). _UUID_RE = re.compile(r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.I) _HEXADDR_RE = re.compile(r"\b0x[0-9a-f]+\b", re.I) _PATH_RE = re.compile(r"(?:/[\w.\-]+)+/?|[A-Za-z]:\\[\w.\\\-]+") _NUM_RE = re.compile(r"\b\d+\b") _WS_RE = re.compile(r"\s+") _ERR_SAMPLE_MAX = 200 _ERR_FP_MAX = 160 def infer_outcome(events: list[SessionEvent], blobs: dict[str, str] | None = None) -> str: """Heuristic outcome label across flavors (design OQ2). - ``abandoned`` if the session has no assistant output at all. - ``fail`` if the last substantive signal is an error / failing test. - ``success`` if it ends on assistant output or a passing test. - ``unknown`` otherwise. """ blobs = blobs or {} assistant = [e for e in events if e.kind == "assistant_msg"] if not assistant: return "abandoned" # Look at error and test signals; weight the latest ones. last_fail = _last_index(events, lambda e: e.kind == "error") last_test = _last_index(events, lambda e: e.kind == "test_run") last_completion = _last_index(events, lambda e: e.kind in ("completion", "assistant_msg")) test_passed = None if last_test is not None: # inspect the nearest following tool_result body for pass/fail hints body = _nearby_result_body(events, last_test, blobs) if body: low = body.lower() if any(h in low for h in _FAIL_HINTS): test_passed = False elif any(h in low for h in _PASS_HINTS): test_passed = True if test_passed is False and (last_test or 0) >= (last_completion or 0): return "fail" if last_fail is not None and last_completion is not None and last_fail > last_completion: return "fail" if test_passed is True: return "success" if last_completion is not None: return "success" return "unknown" def build_digest(session: Session, events: list[SessionEvent], blobs: dict[str, str] | None = None) -> dict[str, Any]: """Produce the compact Tier 2 digest dict for a session.""" blobs = blobs or {} kind_counts = collections.Counter(e.kind for e in events) tool_hist = collections.Counter(e.tool for e in events if e.tool) retries = kind_counts.get("retry", 0) outcome = infer_outcome(events, blobs) return { "session_uid": session.session_uid, "flavor": session.flavor, "repo": session.repo, "domain": session.domain, "model": session.model, "started_at": session.started_at, "ended_at": session.ended_at, "outcome": outcome, "cost": { "input_tokens": session.cost.input_tokens, "output_tokens": session.cost.output_tokens, "cache_tokens": session.cost.cache_tokens, "wall_clock_s": session.cost.wall_clock_s, "turns": session.cost.turns, "retries": retries, }, "event_count": len(events), "kind_counts": dict(kind_counts), "tool_histogram": dict(tool_hist), "markers": { "errors": kind_counts.get("error", 0), "retries": retries, "test_runs": kind_counts.get("test_run", 0), "edits": kind_counts.get("edit", 0), "human_interventions": kind_counts.get("human_intervention", 0), }, "first_prompt": _first_prompt(events, blobs), "last_assistant": _last_assistant(events, blobs), "error_snippets": _error_snippets(events, blobs), "schema_version": session.schema_version, } def analyze(store, session_uid: str) -> dict[str, Any]: """Read a session from the store, write its digest, return the digest.""" session = store.get_session(session_uid) if session is None: raise KeyError(session_uid) events = store.get_events(session_uid) blobs = {e.payload_ref: _read_blob(store, e.payload_ref) for e in events if e.payload_ref} digest = build_digest(session, events, blobs) store.write_digest(session_uid, digest) return digest # ---- helpers --------------------------------------------------------------- def _last_index(events, pred): idx = None for i, e in enumerate(events): if pred(e): idx = i return idx def _nearby_result_body(events, idx, blobs): for e in events[idx + 1: idx + 4]: if e.kind == "tool_result" and e.payload_ref in blobs: return blobs[e.payload_ref] return None def _first_prompt(events, blobs): for e in events: if e.kind == "user_msg": return (blobs.get(e.payload_ref) or e.summary or "")[:280] return None def _last_assistant(events, blobs): for e in reversed(events): if e.kind == "assistant_msg": return (blobs.get(e.payload_ref) or e.summary or "")[:280] return None def _error_line(text: str) -> str: """Pick the most error-like line from a body. Prefers the *last* line matching a fail hint — in a Python traceback the actual exception is the final line, while the bare ``Traceback (most recent call last):`` header is just noise and is skipped. """ lines = [ln.strip() for ln in text.splitlines() if ln.strip()] matches = [ln for ln in lines if any(h in ln.lower() for h in _FAIL_HINTS) and not ln.lower().startswith("traceback")] if matches: return matches[-1] # fall back to any fail-hint line (e.g. only the traceback header), else first any_hint = [ln for ln in lines if any(h in ln.lower() for h in _FAIL_HINTS)] return any_hint[-1] if any_hint else (lines[0] if lines else "") def _error_fingerprint(text: str) -> str: """Stable, content-addressable key for an error, paths/ids/numbers removed.""" s = _error_line(text).lower() s = _UUID_RE.sub("", s) s = _HEXADDR_RE.sub("", s) s = _PATH_RE.sub("", s) s = _NUM_RE.sub("", s) return _WS_RE.sub(" ", s).strip()[:_ERR_FP_MAX] def _error_body(event: SessionEvent, blobs: dict) -> str: """Best available text for a failed event.""" if event.payload_ref and event.payload_ref in blobs: return blobs[event.payload_ref] return event.summary or "" def _looks_like_file_read(body: str) -> bool: """True if the body is mostly numbered source lines (a Read result), not an error.""" lines = [ln for ln in body.splitlines() if ln.strip()] if not lines: return False numbered = sum(1 for ln in lines if _NUMBERED_LINE_RE.match(ln)) return numbered >= max(3, len(lines) // 2) def _json_verdict(body: str): """Classify a JSON tool-result body: 'error', 'success', or None (not JSON). Hub MCP successes look like ``{"result": "..."}`` and mention 'error' deep inside summaries but are not failures ('success'). A payload with a top-level error key (``{"detail": ...}`` / ``{"error": ...}``) is 'error'. Non-JSON text returns None so the plain fail-hint heuristic still applies. """ s = body.strip() if not s or s[0] not in "{[": return None try: obj = json.loads(s) except (ValueError, TypeError): return None if isinstance(obj, dict) and any(k in obj for k in _JSON_ERROR_KEYS): return "error" return "success" def _is_failed(event: SessionEvent, blobs: dict) -> bool: if event.kind == "error": return True if event.kind == "tool_result": body = _error_body(event, blobs) if not body.strip(): return False if _looks_like_file_read(body): return False verdict = _json_verdict(body) if verdict is not None: return verdict == "error" return any(h in body.lower() for h in _FAIL_HINTS) return False def _error_snippets(events: list[SessionEvent], blobs: dict) -> list[dict]: """Collapse a session's failures into deduped, normalized error fingerprints. Durable in Tier 2 (the raw blobs may be evicted): each entry is ``{fingerprint, sample, count, tool}`` with same-fingerprint occurrences counted. Ordered by frequency (then first appearance) for stable output. """ agg: dict[str, dict] = {} order: list[str] = [] for e in events: if not _is_failed(e, blobs): continue body = _error_body(e, blobs) if not body.strip(): continue fp = _error_fingerprint(body) if not fp: continue if fp not in agg: agg[fp] = {"fingerprint": fp, "sample": _error_line(body)[:_ERR_SAMPLE_MAX], "count": 0, "tool": e.tool} order.append(fp) agg[fp]["count"] += 1 snippets = [agg[fp] for fp in order] snippets.sort(key=lambda s: (-s["count"], order.index(s["fingerprint"]))) return snippets def _read_blob(store, ref): row = store.db.execute("SELECT path FROM blobs WHERE ref=?", (ref,)).fetchone() if not row: return "" try: with open(row["path"], "r", encoding="utf-8") as f: return f.read() except OSError: return ""