session-memory Phase 1: Detect pipeline (T04-T07)

- detect/signals.py: pure extractors over digests (retry storm, repeated
  errors, budget overrun vs corpus p90, abandoned, clean pass, recovery)
- detect/cluster.py: deterministic clustering into candidate Patterns with
  evidence (sessions/repos/flavors/cost impact) + cross-flavor flagging
- detect/__main__.py: python -m session_memory.detect, ranked report
  (cross-flavor first) + --json; persists candidates to Tier 2 patterns table
- core/store.py: list_digests + save_patterns
- tests for signals, cluster, detect entrypoint

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-06 22:31:13 +02:00
parent 06767ef924
commit 436a96dcd8
9 changed files with 436 additions and 4 deletions

View File

@@ -223,6 +223,22 @@ class Store:
row = self.db.execute("SELECT json FROM digests WHERE session_uid=?", (session_uid,)).fetchone()
return json.loads(row["json"]) if row else None
def list_digests(self) -> list[dict[str, Any]]:
return [json.loads(r["json"]) for r in self.db.execute("SELECT json FROM digests")]
def save_patterns(self, patterns: list[dict[str, Any]]) -> None:
"""Persist candidate patterns to a Tier 2 table (replace prior run)."""
self.db.execute(
"CREATE TABLE IF NOT EXISTS patterns ("
"key TEXT PRIMARY KEY, json TEXT NOT NULL, detected_at TEXT NOT NULL)"
)
self.db.execute("DELETE FROM patterns")
self.db.executemany(
"INSERT INTO patterns(key, json, detected_at) VALUES(?,?,?)",
[(p["key"], json.dumps(p, sort_keys=True), _now()) for p in patterns],
)
self.db.commit()
# ---- reads -------------------------------------------------------------
def get_session(self, session_uid: str) -> Optional[Session]:

View File

@@ -0,0 +1 @@
"""Detect: extract signals from sessions, cluster into candidate patterns."""

View File

@@ -0,0 +1,70 @@
"""Detect entrypoint (T07): digests -> signals -> clusters -> report.
python -m session_memory.detect [--config PATH] [--json] [--min-frequency N]
Reads Tier 2 digests from the store, extracts signals, clusters them into
candidate patterns, persists the candidates, and prints a ranked report
(cross-flavor first) — the input to the Curate phase (Phase 2).
"""
from __future__ import annotations
import argparse
import json
import os
from ..core.store import Store
from ..ingest import _expand, load_config
from .cluster import cluster
from .signals import extract_signals
def run_detect(config: dict, *, min_frequency: int = 2) -> list[dict]:
store_cfg = config.get("store", {})
store = Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"]))
digests = store.list_digests()
signals = extract_signals(digests)
patterns = [p.to_dict() for p in cluster(signals, min_frequency=min_frequency)]
store.save_patterns(patterns)
store.close()
return patterns
def _format_report(patterns: list[dict], n_digests: int) -> str:
lines = [f"# Candidate Patterns ({len(patterns)} from {n_digests} sessions)", ""]
if not patterns:
lines.append("No recurring patterns above the frequency threshold yet.")
return "\n".join(lines)
for i, p in enumerate(patterns, 1):
flag = " [CROSS-FLAVOR]" if p["cross_flavor"] else ""
lines.append(f"{i}. {p['title']}{flag}")
lines.append(f" score={p['score']} freq={p['frequency']} "
f"impact={p['cost_impact']} flavors={','.join(p['flavors'])}")
lines.append(f" repos={','.join(p['repos']) or '-'} "
f"sessions={len(p['sessions'])}")
lines.append("")
return "\n".join(lines)
def main(argv=None) -> int:
here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
ap = argparse.ArgumentParser(description="Detect candidate patterns from session digests.")
ap.add_argument("--config", default=os.path.join(here, "config.toml"))
ap.add_argument("--min-frequency", type=int, default=2)
ap.add_argument("--json", action="store_true", help="emit machine-readable JSON")
args = ap.parse_args(argv)
config = load_config(args.config)
store_cfg = config.get("store", {})
n = len(Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"])).list_digests())
patterns = run_detect(config, min_frequency=args.min_frequency)
if args.json:
print(json.dumps(patterns, indent=2))
else:
print(_format_report(patterns, n))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,78 @@
"""Pattern clusterer + evidence (PRD §5, §6.2; T05/T06).
Groups recurring :class:`Signal`s into candidate ``Pattern`` records. Clustering
is deterministic and keyed on ``(polarity, signal-type, locus)`` — enough to
surface "the same thing keeps happening" without embeddings (a later option).
Each candidate carries evidence (FR-D3): supporting sessions, frequency, affected
repos, affected **flavors**, and an estimated cost-impact score. Candidates whose
evidence spans more than one flavor are flagged ``cross_flavor`` (FR-D4) — the
highest-value reuse targets.
"""
from __future__ import annotations
import collections
from dataclasses import asdict, dataclass, field
from typing import Any
from .signals import PROBLEM, Signal
@dataclass
class Pattern:
key: str # stable cluster key
polarity: str # problem | success
signal_type: str
locus: str
frequency: int # number of supporting signals
sessions: list[str] = field(default_factory=list)
repos: list[str] = field(default_factory=list)
flavors: list[str] = field(default_factory=list)
cross_flavor: bool = False
cost_impact: float = 0.0 # frequency-weighted magnitude
score: float = 0.0 # ranking score (impact x frequency)
title: str = ""
def to_dict(self) -> dict[str, Any]:
return asdict(self)
def _key(s: Signal) -> str:
return f"{s.polarity}:{s.type}:{s.locus}"
def _title(polarity: str, signal_type: str, n_flavors: int) -> str:
scope = "cross-flavor " if n_flavors > 1 else ""
verb = "problem" if polarity == PROBLEM else "success"
return f"{scope}{verb}: {signal_type.replace('_', ' ')}"
def cluster(signals: list[Signal], *, min_frequency: int = 2) -> list[Pattern]:
"""Group signals into candidate patterns; keep clusters >= min_frequency."""
groups: dict[str, list[Signal]] = collections.defaultdict(list)
for s in signals:
groups[_key(s)].append(s)
patterns: list[Pattern] = []
for key, members in groups.items():
if len(members) < min_frequency:
continue
sessions = sorted({m.session_uid for m in members})
repos = sorted({m.repo for m in members if m.repo})
flavors = sorted({m.flavor for m in members})
cost_impact = sum(m.magnitude for m in members)
first = members[0]
p = Pattern(
key=key, polarity=first.polarity, signal_type=first.type, locus=first.locus,
frequency=len(members), sessions=sessions, repos=repos, flavors=flavors,
cross_flavor=len(flavors) > 1, cost_impact=round(cost_impact, 3),
title=_title(first.polarity, first.type, len(flavors)),
)
# rank: impact x frequency, with a boost for cross-flavor reuse value
p.score = round(p.cost_impact * p.frequency * (1.5 if p.cross_flavor else 1.0), 3)
patterns.append(p)
# cross-flavor first, then by score
patterns.sort(key=lambda p: (not p.cross_flavor, -p.score))
return patterns

View File

@@ -0,0 +1,116 @@
"""Signal extractors (PRD §6.2; T04).
Pure functions over a session digest (Tier 2) — the compact, durable view. Each
extractor emits zero or more :class:`Signal`s. A signal records its source
session, a *locus* (what it's about), a *polarity* (problem vs. success), and a
*magnitude*. Signals are the atoms the clusterer groups into candidate patterns.
No new capture happens here; everything is derived from digests already written
by the Capture layer, so detection is cheap and re-runnable.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
# polarity
PROBLEM = "problem"
SUCCESS = "success"
@dataclass
class Signal:
session_uid: str
flavor: str
repo: Optional[str]
type: str # e.g. "budget_overrun", "clean_pass"
polarity: str # PROBLEM | SUCCESS
locus: str # normalized subject key (tool, marker, ...)
magnitude: float = 1.0 # strength / cost weight
detail: dict[str, Any] = field(default_factory=dict)
# --- individual extractors --------------------------------------------------
# Each takes (digest, ctx) and returns a list[Signal]. ctx carries corpus-level
# stats (e.g. cost percentiles) so extractors can compare a session to its peers.
def _base(digest, type_, polarity, locus, magnitude=1.0, **detail) -> Signal:
return Signal(
session_uid=digest["session_uid"], flavor=digest["flavor"],
repo=digest.get("repo"), type=type_, polarity=polarity, locus=locus,
magnitude=magnitude, detail=detail,
)
def sig_retry_storm(digest, ctx) -> list[Signal]:
retries = digest.get("markers", {}).get("retries", 0)
if retries >= ctx.get("retry_storm_threshold", 3):
return [_base(digest, "retry_storm", PROBLEM, "retries", float(retries), retries=retries)]
return []
def sig_repeated_errors(digest, ctx) -> list[Signal]:
errors = digest.get("markers", {}).get("errors", 0)
if errors >= ctx.get("error_threshold", 3):
return [_base(digest, "repeated_errors", PROBLEM, "errors", float(errors), errors=errors)]
return []
def sig_budget_overrun(digest, ctx) -> list[Signal]:
total = digest.get("cost", {}).get("input_tokens", 0) + digest.get("cost", {}).get("output_tokens", 0)
p90 = ctx.get("tokens_p90", 0)
if p90 and total > p90:
return [_base(digest, "budget_overrun", PROBLEM, "tokens",
float(total) / max(p90, 1), tokens=total, p90=p90)]
return []
def sig_abandoned(digest, ctx) -> list[Signal]:
if digest.get("outcome") == "abandoned":
return [_base(digest, "abandoned", PROBLEM, "outcome", 1.0)]
return []
def sig_clean_pass(digest, ctx) -> list[Signal]:
"""Success: ended success, ran tests, no errors, modest cost."""
m = digest.get("markers", {})
if (digest.get("outcome") == "success" and m.get("test_runs", 0) >= 1
and m.get("errors", 0) == 0 and m.get("retries", 0) == 0):
return [_base(digest, "clean_pass", SUCCESS, "outcome", 1.0,
test_runs=m.get("test_runs"))]
return []
def sig_error_then_recovery(digest, ctx) -> list[Signal]:
"""Success despite hitting errors — a recovery worth learning from."""
m = digest.get("markers", {})
if digest.get("outcome") == "success" and m.get("errors", 0) >= 1:
return [_base(digest, "error_then_recovery", SUCCESS, "errors",
float(m.get("errors", 1)), errors=m.get("errors"))]
return []
EXTRACTORS: list[Callable] = [
sig_retry_storm, sig_repeated_errors, sig_budget_overrun, sig_abandoned,
sig_clean_pass, sig_error_then_recovery,
]
def build_context(digests: list[dict]) -> dict[str, Any]:
"""Corpus-level stats so extractors can compare a session to its peers."""
totals = sorted(
d.get("cost", {}).get("input_tokens", 0) + d.get("cost", {}).get("output_tokens", 0)
for d in digests
)
p90 = totals[int(0.9 * (len(totals) - 1))] if totals else 0
return {"tokens_p90": p90, "retry_storm_threshold": 3, "error_threshold": 3}
def extract_signals(digests: list[dict], ctx: Optional[dict] = None) -> list[Signal]:
ctx = ctx or build_context(digests)
out: list[Signal] = []
for d in digests:
for ex in EXTRACTORS:
out.extend(ex(d, ctx))
return out

54
tests/test_cluster.py Normal file
View File

@@ -0,0 +1,54 @@
"""Clusterer + evidence + cross-flavor tests (T05/T06)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.detect.cluster import cluster # noqa: E402
from session_memory.detect.signals import PROBLEM, SUCCESS, Signal # noqa: E402
def _sig(uid, flavor, repo, type_, polarity, locus, mag=1.0):
return Signal(session_uid=uid, flavor=flavor, repo=repo, type=type_,
polarity=polarity, locus=locus, magnitude=mag)
def test_min_frequency_filters_singletons():
sigs = [_sig("claude:a", "claude", "r1", "retry_storm", PROBLEM, "retries")]
assert cluster(sigs, min_frequency=2) == []
def test_clusters_recurring_signal_with_evidence():
sigs = [
_sig("claude:a", "claude", "r1", "retry_storm", PROBLEM, "retries", 5),
_sig("claude:b", "claude", "r2", "retry_storm", PROBLEM, "retries", 3),
]
pats = cluster(sigs, min_frequency=2)
assert len(pats) == 1
p = pats[0]
assert p.frequency == 2
assert p.sessions == ["claude:a", "claude:b"]
assert sorted(p.repos) == ["r1", "r2"]
assert p.flavors == ["claude"]
assert p.cross_flavor is False
assert p.cost_impact == 8.0
def test_cross_flavor_flagged_and_ranked_first():
sigs = [
# cross-flavor problem (claude + codex)
_sig("claude:a", "claude", "r1", "repeated_errors", PROBLEM, "errors", 3),
_sig("codex:b", "codex", "r2", "repeated_errors", PROBLEM, "errors", 3),
# single-flavor success cluster with higher raw impact
_sig("grok:c", "grok", "r3", "clean_pass", SUCCESS, "outcome", 5),
_sig("grok:d", "grok", "r4", "clean_pass", SUCCESS, "outcome", 5),
]
pats = cluster(sigs, min_frequency=2)
assert len(pats) == 2
xf = next(p for p in pats if p.signal_type == "repeated_errors")
assert xf.cross_flavor is True
assert sorted(xf.flavors) == ["claude", "codex"]
# cross-flavor pattern is ranked first even if another has higher raw impact
assert pats[0].cross_flavor is True
assert "cross-flavor" in pats[0].title

View File

@@ -0,0 +1,44 @@
"""Detect entrypoint tests (T07): end-to-end digests -> patterns, persisted."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.core.store import Store # noqa: E402
from session_memory.detect.__main__ import run_detect # noqa: E402
def _digest(uid, flavor, repo, **markers):
return {
"session_uid": uid, "flavor": flavor, "repo": repo, "outcome": "fail",
"cost": {"input_tokens": 10, "output_tokens": 1},
"markers": {"errors": markers.get("errors", 0), "retries": markers.get("retries", 0),
"test_runs": 0, "edits": 0, "human_interventions": 0},
}
def _config(tmp_path):
return {"store": {"db_path": str(tmp_path / ".store/m.db"),
"blob_dir": str(tmp_path / ".store/blobs"),
"cursor": str(tmp_path / ".store/c.json")}}
def test_run_detect_persists_cross_flavor_pattern(tmp_path):
cfg = _config(tmp_path)
st = Store(cfg["store"]["db_path"], cfg["store"]["blob_dir"])
# same problem (retry_storm) across two flavors -> cross-flavor candidate
st.write_digest("claude:a", _digest("claude:a", "claude", "r1", retries=5))
st.write_digest("codex:b", _digest("codex:b", "codex", "r2", retries=4))
st.close()
patterns = run_detect(cfg, min_frequency=2)
assert len(patterns) == 1
assert patterns[0]["cross_flavor"] is True
assert patterns[0]["signal_type"] == "retry_storm"
# persisted to the Tier 2 patterns table
st2 = Store(cfg["store"]["db_path"], cfg["store"]["blob_dir"])
rows = st2.db.execute("SELECT key FROM patterns").fetchall()
assert len(rows) == 1
st2.close()

53
tests/test_signals.py Normal file
View File

@@ -0,0 +1,53 @@
"""Signal extractor tests (T04)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.detect.signals import ( # noqa: E402
PROBLEM, SUCCESS, build_context, extract_signals,
)
def _digest(uid, flavor="claude", repo="r", outcome="success", tokens=100,
errors=0, retries=0, test_runs=0):
return {
"session_uid": uid, "flavor": flavor, "repo": repo, "outcome": outcome,
"cost": {"input_tokens": tokens, "output_tokens": 0},
"markers": {"errors": errors, "retries": retries, "test_runs": test_runs,
"edits": 0, "human_interventions": 0},
}
def test_problem_signals():
digests = [
_digest("claude:a", retries=5, outcome="fail"),
_digest("claude:b", errors=4),
_digest("claude:c", outcome="abandoned"),
]
sigs = extract_signals(digests)
types = {(s.session_uid, s.type) for s in sigs}
assert ("claude:a", "retry_storm") in types
assert ("claude:b", "repeated_errors") in types
assert ("claude:c", "abandoned") in types
assert all(s.polarity == PROBLEM for s in sigs
if s.type in ("retry_storm", "repeated_errors", "abandoned"))
def test_success_signals():
sigs = extract_signals([_digest("grok:x", outcome="success", test_runs=2)])
assert any(s.type == "clean_pass" and s.polarity == SUCCESS for s in sigs)
rec = extract_signals([_digest("codex:y", outcome="success", errors=2)])
assert any(s.type == "error_then_recovery" and s.polarity == SUCCESS for s in rec)
def test_budget_overrun_uses_corpus_p90():
digests = [_digest(f"claude:{i}", tokens=100) for i in range(10)]
digests.append(_digest("claude:big", tokens=100000))
ctx = build_context(digests)
assert ctx["tokens_p90"] >= 100
sigs = extract_signals(digests, ctx)
overruns = [s for s in sigs if s.type == "budget_overrun"]
assert overruns and overruns[0].session_uid == "claude:big"

View File

@@ -84,7 +84,7 @@ identity. Verify event counts are additive and idempotent on re-run.
```task
id: AGENTIC-WP-0003-T04
status: todo
status: done
priority: high
state_hub_task_id: "20920c5d-16f7-43bb-9ed7-9afbfeaf7207"
```
@@ -100,7 +100,7 @@ digests; no new capture. Unit-tested on synthetic sessions.
```task
id: AGENTIC-WP-0003-T05
status: todo
status: done
priority: high
state_hub_task_id: "f42d57f6-34dc-4a92-bf6a-4d8eab572467"
```
@@ -115,7 +115,7 @@ frequency and member session lists.
```task
id: AGENTIC-WP-0003-T06
status: todo
status: done
priority: medium
state_hub_task_id: "8fd502d6-d138-4a42-acd5-6f5921859605"
```
@@ -130,7 +130,7 @@ reuse targets. Persist candidates to a Tier 2 `patterns` store/table.
```task
id: AGENTIC-WP-0003-T07
status: todo
status: done
priority: medium
state_hub_task_id: "34a96d5d-9165-4761-b91e-3643b0401410"
```