diff --git a/session_memory/core/store.py b/session_memory/core/store.py index 87d7689..aa37bba 100644 --- a/session_memory/core/store.py +++ b/session_memory/core/store.py @@ -223,6 +223,22 @@ class Store: row = self.db.execute("SELECT json FROM digests WHERE session_uid=?", (session_uid,)).fetchone() return json.loads(row["json"]) if row else None + def list_digests(self) -> list[dict[str, Any]]: + return [json.loads(r["json"]) for r in self.db.execute("SELECT json FROM digests")] + + def save_patterns(self, patterns: list[dict[str, Any]]) -> None: + """Persist candidate patterns to a Tier 2 table (replace prior run).""" + self.db.execute( + "CREATE TABLE IF NOT EXISTS patterns (" + "key TEXT PRIMARY KEY, json TEXT NOT NULL, detected_at TEXT NOT NULL)" + ) + self.db.execute("DELETE FROM patterns") + self.db.executemany( + "INSERT INTO patterns(key, json, detected_at) VALUES(?,?,?)", + [(p["key"], json.dumps(p, sort_keys=True), _now()) for p in patterns], + ) + self.db.commit() + # ---- reads ------------------------------------------------------------- def get_session(self, session_uid: str) -> Optional[Session]: diff --git a/session_memory/detect/__init__.py b/session_memory/detect/__init__.py new file mode 100644 index 0000000..64c6177 --- /dev/null +++ b/session_memory/detect/__init__.py @@ -0,0 +1 @@ +"""Detect: extract signals from sessions, cluster into candidate patterns.""" diff --git a/session_memory/detect/__main__.py b/session_memory/detect/__main__.py new file mode 100644 index 0000000..270a17c --- /dev/null +++ b/session_memory/detect/__main__.py @@ -0,0 +1,70 @@ +"""Detect entrypoint (T07): digests -> signals -> clusters -> report. + + python -m session_memory.detect [--config PATH] [--json] [--min-frequency N] + +Reads Tier 2 digests from the store, extracts signals, clusters them into +candidate patterns, persists the candidates, and prints a ranked report +(cross-flavor first) — the input to the Curate phase (Phase 2). +""" + +from __future__ import annotations + +import argparse +import json +import os + +from ..core.store import Store +from ..ingest import _expand, load_config +from .cluster import cluster +from .signals import extract_signals + + +def run_detect(config: dict, *, min_frequency: int = 2) -> list[dict]: + store_cfg = config.get("store", {}) + store = Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"])) + digests = store.list_digests() + signals = extract_signals(digests) + patterns = [p.to_dict() for p in cluster(signals, min_frequency=min_frequency)] + store.save_patterns(patterns) + store.close() + return patterns + + +def _format_report(patterns: list[dict], n_digests: int) -> str: + lines = [f"# Candidate Patterns ({len(patterns)} from {n_digests} sessions)", ""] + if not patterns: + lines.append("No recurring patterns above the frequency threshold yet.") + return "\n".join(lines) + for i, p in enumerate(patterns, 1): + flag = " [CROSS-FLAVOR]" if p["cross_flavor"] else "" + lines.append(f"{i}. {p['title']}{flag}") + lines.append(f" score={p['score']} freq={p['frequency']} " + f"impact={p['cost_impact']} flavors={','.join(p['flavors'])}") + lines.append(f" repos={','.join(p['repos']) or '-'} " + f"sessions={len(p['sessions'])}") + lines.append("") + return "\n".join(lines) + + +def main(argv=None) -> int: + here = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ap = argparse.ArgumentParser(description="Detect candidate patterns from session digests.") + ap.add_argument("--config", default=os.path.join(here, "config.toml")) + ap.add_argument("--min-frequency", type=int, default=2) + ap.add_argument("--json", action="store_true", help="emit machine-readable JSON") + args = ap.parse_args(argv) + + config = load_config(args.config) + store_cfg = config.get("store", {}) + n = len(Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"])).list_digests()) + patterns = run_detect(config, min_frequency=args.min_frequency) + + if args.json: + print(json.dumps(patterns, indent=2)) + else: + print(_format_report(patterns, n)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/session_memory/detect/cluster.py b/session_memory/detect/cluster.py new file mode 100644 index 0000000..63344f0 --- /dev/null +++ b/session_memory/detect/cluster.py @@ -0,0 +1,78 @@ +"""Pattern clusterer + evidence (PRD §5, §6.2; T05/T06). + +Groups recurring :class:`Signal`s into candidate ``Pattern`` records. Clustering +is deterministic and keyed on ``(polarity, signal-type, locus)`` — enough to +surface "the same thing keeps happening" without embeddings (a later option). + +Each candidate carries evidence (FR-D3): supporting sessions, frequency, affected +repos, affected **flavors**, and an estimated cost-impact score. Candidates whose +evidence spans more than one flavor are flagged ``cross_flavor`` (FR-D4) — the +highest-value reuse targets. +""" + +from __future__ import annotations + +import collections +from dataclasses import asdict, dataclass, field +from typing import Any + +from .signals import PROBLEM, Signal + + +@dataclass +class Pattern: + key: str # stable cluster key + polarity: str # problem | success + signal_type: str + locus: str + frequency: int # number of supporting signals + sessions: list[str] = field(default_factory=list) + repos: list[str] = field(default_factory=list) + flavors: list[str] = field(default_factory=list) + cross_flavor: bool = False + cost_impact: float = 0.0 # frequency-weighted magnitude + score: float = 0.0 # ranking score (impact x frequency) + title: str = "" + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + +def _key(s: Signal) -> str: + return f"{s.polarity}:{s.type}:{s.locus}" + + +def _title(polarity: str, signal_type: str, n_flavors: int) -> str: + scope = "cross-flavor " if n_flavors > 1 else "" + verb = "problem" if polarity == PROBLEM else "success" + return f"{scope}{verb}: {signal_type.replace('_', ' ')}" + + +def cluster(signals: list[Signal], *, min_frequency: int = 2) -> list[Pattern]: + """Group signals into candidate patterns; keep clusters >= min_frequency.""" + groups: dict[str, list[Signal]] = collections.defaultdict(list) + for s in signals: + groups[_key(s)].append(s) + + patterns: list[Pattern] = [] + for key, members in groups.items(): + if len(members) < min_frequency: + continue + sessions = sorted({m.session_uid for m in members}) + repos = sorted({m.repo for m in members if m.repo}) + flavors = sorted({m.flavor for m in members}) + cost_impact = sum(m.magnitude for m in members) + first = members[0] + p = Pattern( + key=key, polarity=first.polarity, signal_type=first.type, locus=first.locus, + frequency=len(members), sessions=sessions, repos=repos, flavors=flavors, + cross_flavor=len(flavors) > 1, cost_impact=round(cost_impact, 3), + title=_title(first.polarity, first.type, len(flavors)), + ) + # rank: impact x frequency, with a boost for cross-flavor reuse value + p.score = round(p.cost_impact * p.frequency * (1.5 if p.cross_flavor else 1.0), 3) + patterns.append(p) + + # cross-flavor first, then by score + patterns.sort(key=lambda p: (not p.cross_flavor, -p.score)) + return patterns diff --git a/session_memory/detect/signals.py b/session_memory/detect/signals.py new file mode 100644 index 0000000..8429cb8 --- /dev/null +++ b/session_memory/detect/signals.py @@ -0,0 +1,116 @@ +"""Signal extractors (PRD §6.2; T04). + +Pure functions over a session digest (Tier 2) — the compact, durable view. Each +extractor emits zero or more :class:`Signal`s. A signal records its source +session, a *locus* (what it's about), a *polarity* (problem vs. success), and a +*magnitude*. Signals are the atoms the clusterer groups into candidate patterns. + +No new capture happens here; everything is derived from digests already written +by the Capture layer, so detection is cheap and re-runnable. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Callable, Optional + +# polarity +PROBLEM = "problem" +SUCCESS = "success" + + +@dataclass +class Signal: + session_uid: str + flavor: str + repo: Optional[str] + type: str # e.g. "budget_overrun", "clean_pass" + polarity: str # PROBLEM | SUCCESS + locus: str # normalized subject key (tool, marker, ...) + magnitude: float = 1.0 # strength / cost weight + detail: dict[str, Any] = field(default_factory=dict) + + +# --- individual extractors -------------------------------------------------- +# Each takes (digest, ctx) and returns a list[Signal]. ctx carries corpus-level +# stats (e.g. cost percentiles) so extractors can compare a session to its peers. + +def _base(digest, type_, polarity, locus, magnitude=1.0, **detail) -> Signal: + return Signal( + session_uid=digest["session_uid"], flavor=digest["flavor"], + repo=digest.get("repo"), type=type_, polarity=polarity, locus=locus, + magnitude=magnitude, detail=detail, + ) + + +def sig_retry_storm(digest, ctx) -> list[Signal]: + retries = digest.get("markers", {}).get("retries", 0) + if retries >= ctx.get("retry_storm_threshold", 3): + return [_base(digest, "retry_storm", PROBLEM, "retries", float(retries), retries=retries)] + return [] + + +def sig_repeated_errors(digest, ctx) -> list[Signal]: + errors = digest.get("markers", {}).get("errors", 0) + if errors >= ctx.get("error_threshold", 3): + return [_base(digest, "repeated_errors", PROBLEM, "errors", float(errors), errors=errors)] + return [] + + +def sig_budget_overrun(digest, ctx) -> list[Signal]: + total = digest.get("cost", {}).get("input_tokens", 0) + digest.get("cost", {}).get("output_tokens", 0) + p90 = ctx.get("tokens_p90", 0) + if p90 and total > p90: + return [_base(digest, "budget_overrun", PROBLEM, "tokens", + float(total) / max(p90, 1), tokens=total, p90=p90)] + return [] + + +def sig_abandoned(digest, ctx) -> list[Signal]: + if digest.get("outcome") == "abandoned": + return [_base(digest, "abandoned", PROBLEM, "outcome", 1.0)] + return [] + + +def sig_clean_pass(digest, ctx) -> list[Signal]: + """Success: ended success, ran tests, no errors, modest cost.""" + m = digest.get("markers", {}) + if (digest.get("outcome") == "success" and m.get("test_runs", 0) >= 1 + and m.get("errors", 0) == 0 and m.get("retries", 0) == 0): + return [_base(digest, "clean_pass", SUCCESS, "outcome", 1.0, + test_runs=m.get("test_runs"))] + return [] + + +def sig_error_then_recovery(digest, ctx) -> list[Signal]: + """Success despite hitting errors — a recovery worth learning from.""" + m = digest.get("markers", {}) + if digest.get("outcome") == "success" and m.get("errors", 0) >= 1: + return [_base(digest, "error_then_recovery", SUCCESS, "errors", + float(m.get("errors", 1)), errors=m.get("errors"))] + return [] + + +EXTRACTORS: list[Callable] = [ + sig_retry_storm, sig_repeated_errors, sig_budget_overrun, sig_abandoned, + sig_clean_pass, sig_error_then_recovery, +] + + +def build_context(digests: list[dict]) -> dict[str, Any]: + """Corpus-level stats so extractors can compare a session to its peers.""" + totals = sorted( + d.get("cost", {}).get("input_tokens", 0) + d.get("cost", {}).get("output_tokens", 0) + for d in digests + ) + p90 = totals[int(0.9 * (len(totals) - 1))] if totals else 0 + return {"tokens_p90": p90, "retry_storm_threshold": 3, "error_threshold": 3} + + +def extract_signals(digests: list[dict], ctx: Optional[dict] = None) -> list[Signal]: + ctx = ctx or build_context(digests) + out: list[Signal] = [] + for d in digests: + for ex in EXTRACTORS: + out.extend(ex(d, ctx)) + return out diff --git a/tests/test_cluster.py b/tests/test_cluster.py new file mode 100644 index 0000000..afa65e7 --- /dev/null +++ b/tests/test_cluster.py @@ -0,0 +1,54 @@ +"""Clusterer + evidence + cross-flavor tests (T05/T06).""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from session_memory.detect.cluster import cluster # noqa: E402 +from session_memory.detect.signals import PROBLEM, SUCCESS, Signal # noqa: E402 + + +def _sig(uid, flavor, repo, type_, polarity, locus, mag=1.0): + return Signal(session_uid=uid, flavor=flavor, repo=repo, type=type_, + polarity=polarity, locus=locus, magnitude=mag) + + +def test_min_frequency_filters_singletons(): + sigs = [_sig("claude:a", "claude", "r1", "retry_storm", PROBLEM, "retries")] + assert cluster(sigs, min_frequency=2) == [] + + +def test_clusters_recurring_signal_with_evidence(): + sigs = [ + _sig("claude:a", "claude", "r1", "retry_storm", PROBLEM, "retries", 5), + _sig("claude:b", "claude", "r2", "retry_storm", PROBLEM, "retries", 3), + ] + pats = cluster(sigs, min_frequency=2) + assert len(pats) == 1 + p = pats[0] + assert p.frequency == 2 + assert p.sessions == ["claude:a", "claude:b"] + assert sorted(p.repos) == ["r1", "r2"] + assert p.flavors == ["claude"] + assert p.cross_flavor is False + assert p.cost_impact == 8.0 + + +def test_cross_flavor_flagged_and_ranked_first(): + sigs = [ + # cross-flavor problem (claude + codex) + _sig("claude:a", "claude", "r1", "repeated_errors", PROBLEM, "errors", 3), + _sig("codex:b", "codex", "r2", "repeated_errors", PROBLEM, "errors", 3), + # single-flavor success cluster with higher raw impact + _sig("grok:c", "grok", "r3", "clean_pass", SUCCESS, "outcome", 5), + _sig("grok:d", "grok", "r4", "clean_pass", SUCCESS, "outcome", 5), + ] + pats = cluster(sigs, min_frequency=2) + assert len(pats) == 2 + xf = next(p for p in pats if p.signal_type == "repeated_errors") + assert xf.cross_flavor is True + assert sorted(xf.flavors) == ["claude", "codex"] + # cross-flavor pattern is ranked first even if another has higher raw impact + assert pats[0].cross_flavor is True + assert "cross-flavor" in pats[0].title diff --git a/tests/test_detect_entrypoint.py b/tests/test_detect_entrypoint.py new file mode 100644 index 0000000..9cdc307 --- /dev/null +++ b/tests/test_detect_entrypoint.py @@ -0,0 +1,44 @@ +"""Detect entrypoint tests (T07): end-to-end digests -> patterns, persisted.""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from session_memory.core.store import Store # noqa: E402 +from session_memory.detect.__main__ import run_detect # noqa: E402 + + +def _digest(uid, flavor, repo, **markers): + return { + "session_uid": uid, "flavor": flavor, "repo": repo, "outcome": "fail", + "cost": {"input_tokens": 10, "output_tokens": 1}, + "markers": {"errors": markers.get("errors", 0), "retries": markers.get("retries", 0), + "test_runs": 0, "edits": 0, "human_interventions": 0}, + } + + +def _config(tmp_path): + return {"store": {"db_path": str(tmp_path / ".store/m.db"), + "blob_dir": str(tmp_path / ".store/blobs"), + "cursor": str(tmp_path / ".store/c.json")}} + + +def test_run_detect_persists_cross_flavor_pattern(tmp_path): + cfg = _config(tmp_path) + st = Store(cfg["store"]["db_path"], cfg["store"]["blob_dir"]) + # same problem (retry_storm) across two flavors -> cross-flavor candidate + st.write_digest("claude:a", _digest("claude:a", "claude", "r1", retries=5)) + st.write_digest("codex:b", _digest("codex:b", "codex", "r2", retries=4)) + st.close() + + patterns = run_detect(cfg, min_frequency=2) + assert len(patterns) == 1 + assert patterns[0]["cross_flavor"] is True + assert patterns[0]["signal_type"] == "retry_storm" + + # persisted to the Tier 2 patterns table + st2 = Store(cfg["store"]["db_path"], cfg["store"]["blob_dir"]) + rows = st2.db.execute("SELECT key FROM patterns").fetchall() + assert len(rows) == 1 + st2.close() diff --git a/tests/test_signals.py b/tests/test_signals.py new file mode 100644 index 0000000..5957ea5 --- /dev/null +++ b/tests/test_signals.py @@ -0,0 +1,53 @@ +"""Signal extractor tests (T04).""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from session_memory.detect.signals import ( # noqa: E402 + PROBLEM, SUCCESS, build_context, extract_signals, +) + + +def _digest(uid, flavor="claude", repo="r", outcome="success", tokens=100, + errors=0, retries=0, test_runs=0): + return { + "session_uid": uid, "flavor": flavor, "repo": repo, "outcome": outcome, + "cost": {"input_tokens": tokens, "output_tokens": 0}, + "markers": {"errors": errors, "retries": retries, "test_runs": test_runs, + "edits": 0, "human_interventions": 0}, + } + + +def test_problem_signals(): + digests = [ + _digest("claude:a", retries=5, outcome="fail"), + _digest("claude:b", errors=4), + _digest("claude:c", outcome="abandoned"), + ] + sigs = extract_signals(digests) + types = {(s.session_uid, s.type) for s in sigs} + assert ("claude:a", "retry_storm") in types + assert ("claude:b", "repeated_errors") in types + assert ("claude:c", "abandoned") in types + assert all(s.polarity == PROBLEM for s in sigs + if s.type in ("retry_storm", "repeated_errors", "abandoned")) + + +def test_success_signals(): + sigs = extract_signals([_digest("grok:x", outcome="success", test_runs=2)]) + assert any(s.type == "clean_pass" and s.polarity == SUCCESS for s in sigs) + + rec = extract_signals([_digest("codex:y", outcome="success", errors=2)]) + assert any(s.type == "error_then_recovery" and s.polarity == SUCCESS for s in rec) + + +def test_budget_overrun_uses_corpus_p90(): + digests = [_digest(f"claude:{i}", tokens=100) for i in range(10)] + digests.append(_digest("claude:big", tokens=100000)) + ctx = build_context(digests) + assert ctx["tokens_p90"] >= 100 + sigs = extract_signals(digests, ctx) + overruns = [s for s in sigs if s.type == "budget_overrun"] + assert overruns and overruns[0].session_uid == "claude:big" diff --git a/workplans/AGENTIC-WP-0003-session-memory-phase1.md b/workplans/AGENTIC-WP-0003-session-memory-phase1.md index eb4fe79..1ccfbfd 100644 --- a/workplans/AGENTIC-WP-0003-session-memory-phase1.md +++ b/workplans/AGENTIC-WP-0003-session-memory-phase1.md @@ -84,7 +84,7 @@ identity. Verify event counts are additive and idempotent on re-run. ```task id: AGENTIC-WP-0003-T04 -status: todo +status: done priority: high state_hub_task_id: "20920c5d-16f7-43bb-9ed7-9afbfeaf7207" ``` @@ -100,7 +100,7 @@ digests; no new capture. Unit-tested on synthetic sessions. ```task id: AGENTIC-WP-0003-T05 -status: todo +status: done priority: high state_hub_task_id: "f42d57f6-34dc-4a92-bf6a-4d8eab572467" ``` @@ -115,7 +115,7 @@ frequency and member session lists. ```task id: AGENTIC-WP-0003-T06 -status: todo +status: done priority: medium state_hub_task_id: "8fd502d6-d138-4a42-acd5-6f5921859605" ``` @@ -130,7 +130,7 @@ reuse targets. Persist candidates to a Tier 2 `patterns` store/table. ```task id: AGENTIC-WP-0003-T07 -status: todo +status: done priority: medium state_hub_task_id: "34a96d5d-9165-4761-b91e-3643b0401410" ```