diff --git a/session_memory/README.md b/session_memory/README.md index 24e15fb..65ec1e3 100644 --- a/session_memory/README.md +++ b/session_memory/README.md @@ -39,6 +39,9 @@ session_memory/ distribute/grok.py # native instruction renderer } different targets) distribute/proposals.py # scoping + proposed-not-applied output + active registry distribute/__main__.py # python -m session_memory.distribute + measure/metrics.py # fleet metrics + persisted baseline snapshots + measure/effect.py # before/after per-pattern effectiveness + measure/__main__.py # python -m session_memory.measure config.toml # store paths, retention caps, sources, repo->domain map, curate gate ``` @@ -141,6 +144,25 @@ python -m session_memory.distribute --json `distribute/active_patterns.json` records which pattern+version is proposed in which `(repo, flavor)` (FR-X4). +## Measure effectiveness (closing the loop) + +Track whether the fleet is getting cheaper / more reliable, and whether a +distributed pattern actually helped. + +```bash +python -m session_memory.measure --label "baseline" # snapshot + trend +python -m session_memory.measure --since 2026-06-07 # before/after a change +python -m session_memory.measure --no-save --json +``` + +- A **snapshot** (infra-overhead share, error rate, schema-thrash, token + percentiles, success rate) is appended to `measure/baselines.jsonl` to build a + trend (FR-M3). +- `--since DATE` splits sessions before/after a change and diffs the metrics, with + an `improved` verdict per metric (FR-M1/FR-M2) — so ineffective patterns can be + retired. Recorded pre-fix baseline (2026-06-07): 27 sessions, infra-overhead + median 11.7 %, error rate 0.96, schema-thrash 8 sessions. 
+ ## Retention knobs (`[retention]` in config.toml) | Key | Meaning | @@ -174,4 +196,6 @@ python -m pytest # schema, adapters, store, digest, retention, ingest, - **Phase 3** (AGENTIC-WP-0007): Distribute — per-flavor distributor adapters render approved patterns into proposed (HITL) artifacts, scoped by repo/domain, with an active-pattern registry. -- **Next — Phase 4 (Measure)** closes the loop per the PRD. +- **Phase 4** (AGENTIC-WP-0009): Measure — fleet baseline/trend + before/after + per-pattern effectiveness. The Capture → Detect → Curate → Distribute → Measure + loop is closed. diff --git a/session_memory/config.toml b/session_memory/config.toml index 7458157..9ff43b1 100644 --- a/session_memory/config.toml +++ b/session_memory/config.toml @@ -39,6 +39,10 @@ min_substantive = 3 # require >= this many substantive (edit/read/shell) tool min_prompt_len = 25 # first prompt shorter than this is treated as trivial # Curate phase (AGENTIC-WP-0004): catalog location + promotion evidence bar. +# Measure phase (AGENTIC-WP-0009): persisted baseline/trend of fleet metrics. +[measure] +baselines = "session_memory/measure/baselines.jsonl" # timestamped metric snapshots (committed) + # Distribute phase (AGENTIC-WP-0007): where per-flavor proposals + the active # registry are written. Proposals are HITL — reviewed, never auto-applied. [distribute] diff --git a/session_memory/measure/__init__.py b/session_memory/measure/__init__.py new file mode 100644 index 0000000..3ced92d --- /dev/null +++ b/session_memory/measure/__init__.py @@ -0,0 +1,9 @@ +"""Measure phase (PRD §6.5) — the loop-closer. + + metrics.py fleet metrics + persisted baseline snapshots (T01) + effect.py before/after per-pattern effectiveness (T02) + __main__.py python -m session_memory.measure (T03) + +Computation over existing digests (reusing WP-0005 tool buckets + WP-0006 error +mining); no new capture. 
+""" diff --git a/session_memory/measure/__main__.py b/session_memory/measure/__main__.py new file mode 100644 index 0000000..8bc4041 --- /dev/null +++ b/session_memory/measure/__main__.py @@ -0,0 +1,101 @@ +"""Measure entrypoint (T03): fleet trend + per-pattern effectiveness. + + python -m session_memory.measure [--config PATH] [--label L] [--since DATE] + [--no-save] [--json] + +Computes current fleet metrics over the real (quality-filtered) sessions, appends +them to the baseline trend, and reports whether the fleet is getting cheaper / +more reliable over time (FR-M3). With ``--since DATE`` it also reports before/after +effectiveness around a change (FR-M1/FR-M2). +""" + +from __future__ import annotations + +import argparse +import json +import os + +from ..core.store import Store +from ..detect.quality import filter_real, quality_config +from ..ingest import _expand, load_config +from .effect import effectiveness +from .metrics import load_baselines, save_baseline, snapshot + +_TREND_KEYS = ("infra_overhead_share_median", "error_rate", "schema_thrash_sessions", + "tokens_p50", "success_rate") + + +def real_digests(config: dict) -> list[dict]: + s = config.get("store", {}) + store = Store(_expand(s["db_path"]), _expand(s["blob_dir"])) + out = filter_real(store.list_digests(), quality_config(config)) + store.close() + return out + + +def _fmt_trend(baselines: list[dict]) -> str: + if not baselines: + return " (no prior snapshots)" + lines = [] + recent = baselines[-5:] + for b in recent: + when = (b.get("captured_at") or "")[:10] + lbl = f" {b['label']}" if b.get("label") else "" + lines.append(f" {when}{lbl}: overhead_med={b.get('infra_overhead_share_median')} " + f"err_rate={b.get('error_rate')} schema_thrash={b.get('schema_thrash_sessions')} " + f"tok_p50={b.get('tokens_p50')} success={b.get('success_rate')} " + f"(n={b.get('n_sessions')})") + return "\n".join(lines) + + +def _report(current: dict, baselines: list[dict], eff: dict | None) -> str: + lines = 
[f"# Fleet metrics (n={current.get('n_sessions')} real sessions)"] + for k in _TREND_KEYS: + lines.append(f" {k} = {current.get(k)}") + lines.append("\n## Trend (recent snapshots)") + lines.append(_fmt_trend(baselines)) + if eff is not None: + lines.append(f"\n## Effectiveness since {eff['applied_at']} " + f"(before={eff['n_before']}, after={eff['n_after']})") + if eff["insufficient_data"]: + lines.append(" insufficient data on one side of the date") + else: + for k in _TREND_KEYS: + d = eff["deltas"].get(k, {}) + mark = {True: "improved", False: "worse", None: "—"}[d.get("improved")] + lines.append(f" {k}: {d.get('before')} -> {d.get('after')} " + f"({d.get('change'):+}) {mark}") + return "\n".join(lines) + + +def main(argv=None) -> int: + here = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ap = argparse.ArgumentParser(description="Measure fleet metrics + per-pattern effectiveness.") + ap.add_argument("--config", default=os.path.join(here, "config.toml")) + ap.add_argument("--label", default="") + ap.add_argument("--since", default=None, help="ISO date for before/after effectiveness") + ap.add_argument("--no-save", action="store_true", help="don't append to the baseline trend") + ap.add_argument("--json", action="store_true") + args = ap.parse_args(argv) + + config = load_config(args.config) + digests = real_digests(config) + current = snapshot(digests, label=args.label) + + path = _expand(config.get("measure", {}).get("baselines", "session_memory/measure/baselines.jsonl")) + prior = load_baselines(path) + if not args.no_save: + save_baseline(current, path) + + eff = effectiveness(digests, args.since, label=args.label) if args.since else None + + if args.json: + print(json.dumps({"current": current, "trend": prior + [current], "effectiveness": eff}, + indent=2)) + else: + print(_report(current, prior + [current], eff)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/session_memory/measure/baselines.jsonl 
# Metrics where a *lower* value after the change means improvement.
_LOWER_IS_BETTER = {
    "infra_overhead_share_median", "infra_overhead_share_p90", "error_rate",
    "recurring_error_occurrences", "schema_thrash_sessions", "tokens_p50", "tokens_p90",
}
# Metrics where a *higher* value is improvement.
_HIGHER_IS_BETTER = {"success_rate"}


def split_by_date(digests: list[dict], applied_at: str) -> tuple[list[dict], list[dict]]:
    """Partition digests into (before, after) by ``started_at`` vs ``applied_at``.

    The comparison is a plain string comparison, so both sides are expected to
    be ISO-style timestamps. A digest starting exactly at ``applied_at`` counts
    as *after*; a digest with no ``started_at`` is placed in *before*.
    """
    before, after = [], []
    for d in digests:
        ts = d.get("started_at") or ""
        (after if ts and ts >= applied_at else before).append(d)
    return before, after


def _delta(metric: str, before: float, after: float) -> dict:
    """Diff one metric and judge the direction of the change.

    Returns:
        ``{"before", "after", "change", "improved"}`` where ``change`` is
        ``after - before`` rounded to 3 decimals and ``improved`` is ``True`` /
        ``False`` for a real move in a metric with a known direction, or
        ``None`` when the metric did not move or its direction is unknown.
    """
    change = round(after - before, 3)
    if change == 0:
        # No movement is neither an improvement nor a regression; reporting
        # False here would be rendered as "worse".
        improved = None
    elif metric in _LOWER_IS_BETTER:
        improved = change < 0
    elif metric in _HIGHER_IS_BETTER:
        improved = change > 0
    else:
        improved = None
    return {"before": before, "after": after, "change": change, "improved": improved}


def effectiveness(digests: list[dict], applied_at: str, *, label: str = "") -> dict:
    """Compare fleet metrics after ``applied_at`` against the prior period.

    Args:
        digests: session digests to split and aggregate.
        applied_at: date/timestamp string of the change being evaluated.
        label: free-form name echoed back in the result.

    Returns:
        A dict with the split sizes, both aggregates, per-metric ``deltas`` and
        ``insufficient_data`` (true when either side of the split is empty, in
        which case ``deltas`` is empty).
    """
    before, after = split_by_date(digests, applied_at)
    b_agg, a_agg = aggregate(before), aggregate(after)
    deltas = {}
    if before and after:
        # sorted(): set iteration order is not stable across runs, which made
        # the emitted JSON key order nondeterministic.
        for m in sorted(_LOWER_IS_BETTER | _HIGHER_IS_BETTER):
            deltas[m] = _delta(m, b_agg.get(m, 0.0), a_agg.get(m, 0.0))
    return {
        "label": label,
        "applied_at": applied_at,
        "n_before": len(before),
        "n_after": len(after),
        "before": b_agg,
        "after": a_agg,
        "deltas": deltas,
        "insufficient_data": not (before and after),
    }
def _now() -> str:
    """Current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _pct(values: list[float], q: float) -> float:
    """Return the ``q`` quantile (0..1) of ``values``, rounded to 3 decimals.

    Picks the element at index ``int(q * (len - 1))`` of the sorted values (no
    interpolation, index truncated downwards). Empty input yields ``0.0``.
    """
    if not values:
        return 0.0
    s = sorted(values)
    return round(s[int(q * (len(s) - 1))], 3)


def _median(values: list[float]) -> float:
    """Median via :func:`_pct` (lower of the two middle values for even sizes)."""
    return _pct(values, 0.5)


def _buckets(digest: dict) -> collections.Counter:
    """Sum the digest's ``tool_histogram`` counts per ``tool_bucket`` category."""
    b: collections.Counter = collections.Counter()
    for tool, n in (digest.get("tool_histogram") or {}).items():
        b[tool_bucket(tool)] += n
    return b


def session_metrics(digest: dict) -> dict:
    """Per-session metrics used to build fleet aggregates.

    Returns a dict with ``infra_overhead_share`` (share of tool calls in the
    ``statehub_mcp`` / ``task_mgmt`` / ``schema_load`` buckets), ``tool_calls``,
    ``schema_load``, ``error_occurrences``, ``has_error``, ``tokens`` (input +
    output) and ``success``.
    """
    b = _buckets(digest)
    total = sum(b.values())
    overhead = b["statehub_mcp"] + b["task_mgmt"] + b["schema_load"]
    # Tolerate a null cost block / null token counts, like the other fields.
    cost = digest.get("cost") or {}
    tokens = (cost.get("input_tokens") or 0) + (cost.get("output_tokens") or 0)
    return {
        # Guard the division only; tool_calls must stay 0 for an empty session.
        "infra_overhead_share": overhead / total if total else 0.0,
        "tool_calls": total,
        "schema_load": b["schema_load"],
        "error_occurrences": sum(s.get("count", 1) for s in (digest.get("error_snippets") or [])),
        "has_error": bool(digest.get("error_snippets")),
        "tokens": tokens,
        "success": digest.get("outcome") == "success",
    }


def aggregate(digests: list[dict], *, schema_thrash_threshold: int = 5) -> dict:
    """Fleet-level metrics over a set of (already quality-filtered) digests.

    Args:
        digests: session digests to aggregate.
        schema_thrash_threshold: a session counts as schema-thrashing when its
            ``schema_load`` bucket reaches this many calls.

    Returns:
        ``{"n_sessions": 0}`` for empty input, otherwise the headline metrics
        (rates rounded to 3 decimals).
    """
    per = [session_metrics(d) for d in digests]
    n = len(per)
    if n == 0:
        return {"n_sessions": 0}
    shares = [m["infra_overhead_share"] for m in per]
    tokens = [m["tokens"] for m in per]
    return {
        "n_sessions": n,
        "infra_overhead_share_median": _median(shares),
        "infra_overhead_share_p90": _pct(shares, 0.9),
        "error_rate": round(sum(m["has_error"] for m in per) / n, 3),
        "recurring_error_occurrences": sum(m["error_occurrences"] for m in per),
        "schema_thrash_sessions": sum(1 for m in per if m["schema_load"] >= schema_thrash_threshold),
        "tokens_p50": _pct(tokens, 0.5),
        "tokens_p90": _pct(tokens, 0.9),
        "success_rate": round(sum(m["success"] for m in per) / n, 3),
    }


def snapshot(digests: list[dict], *, label: str = "") -> dict:
    """Aggregate ``digests`` and stamp the result with ``captured_at`` and ``label``."""
    m = aggregate(digests)
    m["captured_at"] = _now()
    m["label"] = label
    return m


def save_baseline(metrics: dict, path: str) -> None:
    """Append a metrics snapshot to the baseline JSONL trend file."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(metrics, sort_keys=True))
        fh.write("\n")


def load_baselines(path: str) -> list[dict]:
    """Read every snapshot from the JSONL trend file; ``[]`` if it does not exist.

    Blank lines are skipped. A malformed line raises ``json.JSONDecodeError``.
    """
    try:
        with open(path, encoding="utf-8") as fh:
            return [json.loads(line) for line in fh if line.strip()]
    except FileNotFoundError:
        return []
test_effectiveness_detects_improvement(): + # before: lots of errors + hub overhead; after: clean + before = [_digest("2026-06-01", tools={"mcp__state-hub__x": 8, "Bash": 2}, errors=3) + for _ in range(3)] + after = [_digest("2026-06-10", tools={"Bash": 10}, errors=0) for _ in range(3)] + e = effectiveness(before + after, "2026-06-05", label="read-before-edit") + assert not e["insufficient_data"] + assert e["n_before"] == 3 and e["n_after"] == 3 + assert e["deltas"]["error_rate"]["improved"] is True + assert e["deltas"]["infra_overhead_share_median"]["improved"] is True + assert e["deltas"]["error_rate"]["change"] < 0 + + +def test_effectiveness_insufficient_data(): + e = effectiveness([_digest("2026-06-01")], "2026-06-05") + assert e["insufficient_data"] is True + assert e["deltas"] == {} + + +def test_success_rate_higher_is_better(): + before = [_digest("2026-06-01", outcome="fail") for _ in range(2)] + after = [_digest("2026-06-10", outcome="success") for _ in range(2)] + e = effectiveness(before + after, "2026-06-05") + assert e["deltas"]["success_rate"]["improved"] is True diff --git a/tests/test_measure_entrypoint.py b/tests/test_measure_entrypoint.py new file mode 100644 index 0000000..ec9e24f --- /dev/null +++ b/tests/test_measure_entrypoint.py @@ -0,0 +1,79 @@ +"""Measure entrypoint tests (WP-0009 T03).""" + +import json +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from session_memory.core.store import Store # noqa: E402 +from session_memory.measure.__main__ import main, real_digests # noqa: E402 +from session_memory.measure.metrics import load_baselines # noqa: E402 + + +def _digest(uid, ts, tools=None): + return { + "session_uid": uid, "flavor": "claude", "repo": "agentic-resources", + "outcome": "success", "started_at": ts, + "cost": {"input_tokens": 100, "output_tokens": 10}, + "event_count": 40, "first_prompt": "Implement the measure entrypoint cleanly", + "tool_histogram": tools or 
def _write_config(tmp_path) -> tuple[str, str]:
    """Write a minimal ``config.toml`` under ``tmp_path`` for the entrypoint tests.

    Returns:
        ``(config_path, store_dir)`` as strings.
    """
    store = tmp_path / ".store"
    toml = tmp_path / "config.toml"
    # as_posix(): inside a TOML basic ("...") string a backslash starts an
    # escape sequence, so a raw Windows path would make the file unparsable.
    db_path, blob_dir, cursor = ((store / name).as_posix() for name in ("m.db", "blobs", "c.json"))
    baselines = (tmp_path / "baselines.jsonl").as_posix()
    toml.write_text(
        f'[store]\ndb_path = "{db_path}"\nblob_dir = "{blob_dir}"\n'
        f'cursor = "{cursor}"\n'
        f'[measure]\nbaselines = "{baselines}"\n')
    return str(toml), str(store)
+ assert data["effectiveness"]["n_after"] == 1 diff --git a/tests/test_measure_metrics.py b/tests/test_measure_metrics.py new file mode 100644 index 0000000..fbeac1a --- /dev/null +++ b/tests/test_measure_metrics.py @@ -0,0 +1,63 @@ +"""Fleet metrics + baseline tests (WP-0009 T01).""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from session_memory.measure.metrics import ( # noqa: E402 + aggregate, + load_baselines, + save_baseline, + session_metrics, + snapshot, +) + + +def _digest(tools=None, errors=0, tokens=100, outcome="success"): + return { + "outcome": outcome, + "cost": {"input_tokens": tokens, "output_tokens": 0}, + "tool_histogram": tools or {"Bash": 10, "Edit": 5}, + "error_snippets": [{"fingerprint": f"e{i}", "count": 1} for i in range(errors)], + } + + +def test_session_metrics_overhead_and_errors(): + m = session_metrics(_digest(tools={"mcp__state-hub__create_task": 6, "Bash": 4}, errors=2)) + assert abs(m["infra_overhead_share"] - 0.6) < 1e-9 + assert m["error_occurrences"] == 2 + assert m["has_error"] is True + + +def test_aggregate_rates_and_percentiles(): + digs = [ + _digest(tools={"mcp__state-hub__x": 8, "Bash": 2}, errors=1, tokens=50), # 80% overhead + _digest(tools={"Bash": 9, "Edit": 1}, errors=0, tokens=200), # 0% overhead + _digest(tools={"ToolSearch": 6, "Bash": 4}, errors=0, tokens=100, outcome="fail"), + ] + a = aggregate(digs) + assert a["n_sessions"] == 3 + assert a["error_rate"] == round(1 / 3, 3) + assert a["success_rate"] == round(2 / 3, 3) + assert a["schema_thrash_sessions"] == 1 # the ToolSearch=6 session + assert 0 <= a["infra_overhead_share_median"] <= 1 + + +def test_aggregate_empty(): + assert aggregate([]) == {"n_sessions": 0} + + +def test_snapshot_has_timestamp_and_label(): + s = snapshot([_digest()], label="baseline") + assert s["label"] == "baseline" + assert "captured_at" in s and s["n_sessions"] == 1 + + +def test_baseline_roundtrip_appends(tmp_path): + 
path = str(tmp_path / "baselines.jsonl") + save_baseline(snapshot([_digest()], label="a"), path) + save_baseline(snapshot([_digest(), _digest()], label="b"), path) + rows = load_baselines(path) + assert [r["label"] for r in rows] == ["a", "b"] + assert rows[1]["n_sessions"] == 2 diff --git a/workplans/AGENTIC-WP-0009-session-memory-phase4.md b/workplans/AGENTIC-WP-0009-session-memory-phase4.md index 0342c06..6766a7c 100644 --- a/workplans/AGENTIC-WP-0009-session-memory-phase4.md +++ b/workplans/AGENTIC-WP-0009-session-memory-phase4.md @@ -4,7 +4,7 @@ type: workplan title: "Coding Session Memory — Phase 4 (Measure: effectiveness + fleet trend)" domain: helix_forge repo: agentic-resources -status: ready +status: finished owner: codex topic_slug: helix-forge created: "2026-06-07" @@ -27,7 +27,7 @@ this is computation over existing digests, not new capture. ```task id: AGENTIC-WP-0009-T01 -status: todo +status: done priority: high state_hub_task_id: "e5c2016a-2d51-4382-a013-7153e053e8ed" ``` @@ -41,7 +41,7 @@ percentiles) and persist a **timestamped baseline snapshot**. Reuses ```task id: AGENTIC-WP-0009-T02 -status: todo +status: done priority: high state_hub_task_id: "aa097a00-3462-41da-a137-67e1d61d8d33" ``` @@ -55,7 +55,7 @@ retired (FR-M1/FR-M2). Unit-tested. ```task id: AGENTIC-WP-0009-T03 -status: todo +status: done priority: medium state_hub_task_id: "f1147d59-2fb7-4d35-baec-b8f001bb9d62" ```