Compare commits

...

5 Commits

Author SHA1 Message Date
055713aa4f session-memory Phase 1: T08 verify across all three flavors + docs
Marks AGENTIC-WP-0003 finished. Full suite 40/40 green; live pipeline
over real local sessions (Codex via fixtures) surfaced 3 candidate
patterns, 2 cross-flavor (Claude+Grok) — PRD success metric met.
README documents the detect entrypoint and Phase 0/1/next status.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 23:39:37 +02:00
436a96dcd8 session-memory Phase 1: Detect pipeline (T04-T07)
- detect/signals.py: pure extractors over digests (retry storm, repeated
  errors, budget overrun vs corpus p90, abandoned, clean pass, recovery)
- detect/cluster.py: deterministic clustering into candidate Patterns with
  evidence (sessions/repos/flavors/cost impact) + cross-flavor flagging
- detect/__main__.py: python -m session_memory.detect, ranked report
  (cross-flavor first) + --json; persists candidates to Tier 2 patterns table
- core/store.py: list_digests + save_patterns
- tests for signals, cluster, detect entrypoint

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 22:31:13 +02:00
06767ef924 session-memory Phase 1: Grok adapter (T02)
- adapters/grok.py: reads the per-session dir (summary.json + chat_history.jsonl
  + events.jsonl + updates.jsonl); conversation from chat_history, lifecycle/
  turn from events, tool-call names paired in order from updates ACP stream
- registered in ingest dispatch; codex+grok sources enabled in config.toml
- tests/test_grok_adapter.py (synthetic + real local sessions)
- live multi-flavor dry-run discovers 89 sessions across flavors

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 22:12:30 +02:00
bc11cb9aec session-memory Phase 1: Codex adapter (T01) + multi-file merge (T03)
- adapters/common.py: shared Normalized + helpers (resolve_repo, classify_tool,
  jsonl iter, etc.); claude.py refactored to use it (Normalized re-exported)
- adapters/codex.py: rollout {timestamp,type,payload} parser; session_meta/
  response_item/event_msg mapping; flat call_id join; token_count cost;
  registered in ingest dispatch
- core/store.py: ingest() now merges multi-file sessions by content
  fingerprint, appends new events with offset seq (design OQ6); idempotent
- tests/test_codex_adapter.py, tests/test_merge.py

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 21:55:32 +02:00
5aea22f24f Register AGENTIC-WP-0003 (session-memory Phase 1) with State Hub
Codex + Grok adapters, multi-file session merge, and the Detect pipeline
(signals -> clustering -> evidence -> candidate report).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 21:50:23 +02:00
19 changed files with 1428 additions and 94 deletions

View File

@@ -13,13 +13,19 @@ time window.
```
session_memory/
adapters/claude.py # Tier0 -> Tier1 normalizer (Codex/Grok land in Phase 1)
adapters/common.py # shared Normalized bundle + helpers
adapters/claude.py # Tier0 -> Tier1 normalizers, one per flavor
adapters/codex.py # (rollout {timestamp,type,payload}, flat call_id join)
adapters/grok.py # (per-session dir: chat_history + events + updates)
core/schema.py # Session / SessionEvent / Cost
core/store.py # SQLite rows + blob-dir bodies (Tier1) + digests (Tier2)
core/store.py # SQLite rows + blob-dir bodies (Tier1) + digests/patterns (Tier2)
core/cursor.py # incremental ingest cursors
core/digest.py # Tier1 -> Tier2 promotion + outcome heuristic
core/retention.py # budget-based eviction sweep
ingest.py # one sweep: discover -> normalize -> store -> digest -> evict
detect/signals.py # signal extractors over digests
detect/cluster.py # cluster signals -> candidate patterns + cross-flavor flag
detect/__main__.py # python -m session_memory.detect (ranked report)
config.toml # store paths, retention caps, sources, repo->domain map
```
@@ -51,6 +57,20 @@ the sweep *runs*. Trigger it with the repo scheduler, e.g. daily:
or a cron entry / `/loop` on a timer. Push-capture (agent Stop/SessionEnd hooks)
can also enqueue a sweep; see design §7.
## Detect candidate patterns
After ingesting, mine the digests for recurring problem/success patterns:
```bash
python -m session_memory.detect # ranked report, cross-flavor first
python -m session_memory.detect --json # machine-readable candidates
python -m session_memory.detect --min-frequency 3
```
Candidates are persisted to a Tier 2 `patterns` table and are the input to the
Curate phase (Phase 2). Patterns whose evidence spans more than one agent flavor
are flagged `[CROSS-FLAVOR]` — the highest-value reuse targets.
## Retention knobs (`[retention]` in config.toml)
| Key | Meaning |
@@ -71,5 +91,9 @@ python -m pytest # 26 tests: schema, adapter, store, digest, retention,
## Status
Phase 0 (AGENTIC-WP-0002): Claude adapter only, end to end. Codex and Grok
adapters are designed (schemas confirmed in the design doc) and land in Phase 1.
- **Phase 0** (AGENTIC-WP-0002): schema, store, digest, budget retention, Claude
adapter, ingest sweep.
- **Phase 1** (AGENTIC-WP-0003): Codex + Grok adapters, multi-file session merge,
and the Detect pipeline (signals → clustering → cross-flavor candidate patterns).
- **Next — Phase 2 (Curate):** review/approve candidates into a versioned pattern
catalog. **Phase 3 (Distribute) / Phase 4 (Measure)** follow per the PRD.

View File

@@ -11,54 +11,23 @@ that the store persists out-of-line so Tier 1 rows stay light.
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Iterable, Optional
from typing import Any, Optional
from ..core.schema import Cost, Session, SessionEvent
from .common import ( # noqa: F401 (Normalized re-exported for back-compat)
Normalized,
classify_tool,
first_line as _first_line,
iter_jsonl as _iter_records,
now_iso as _now,
resolve_repo as _resolve_repo,
seconds_between as _seconds_between,
stringify as _stringify,
)
FLAVOR = "claude"
# tool_use names that mutate files -> kind "edit"
_EDIT_TOOLS = {"Edit", "Write", "NotebookEdit", "MultiEdit"}
# crude test-runner detection inside Bash commands -> kind "test_run"
_TEST_HINTS = ("pytest", "unittest", "npm test", "npm run test", "go test", "cargo test", "jest", "vitest")
@dataclass
class Normalized:
session: Session
events: list[SessionEvent]
blobs: dict[str, str] = field(default_factory=dict)
def _iter_records(path: str) -> Iterable[dict[str, Any]]:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError:
continue # tolerate partial/corrupt trailing lines
def _resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
"""cwd -> (repo, domain). repo is the cwd basename; domain via map."""
if not cwd:
return None, None
repo = os.path.basename(cwd.rstrip("/")) or None
domain = repo_domain_map.get(repo) if repo else None
return repo, domain
def _is_test_command(text: str) -> bool:
low = text.lower()
return any(h in low for h in _TEST_HINTS)
def _content_blocks(message: dict[str, Any]) -> list[dict[str, Any]]:
content = message.get("content")
@@ -159,11 +128,8 @@ def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -
name = b.get("name", "")
inp = b.get("input", {})
body = _stringify(inp)
kind = "tool_call"
if name in _EDIT_TOOLS:
kind = "edit"
elif name == "Bash" and _is_test_command(_stringify(inp.get("command", ""))):
kind = "test_run"
cmd = inp.get("command", "") if isinstance(inp, dict) else ""
kind = classify_tool(name, _stringify(cmd))
add_event(uuid, parent, ts, kind, role="assistant", tool=name,
summary=f"{name}", body=body, sidechain=sidechain)
@@ -194,35 +160,3 @@ def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -
discovered_at=_now(),
)
return Normalized(session=session, events=events, blobs=blobs)
# ---- helpers ---------------------------------------------------------------
def _stringify(v: Any) -> str:
if v is None:
return ""
if isinstance(v, str):
return v
try:
return json.dumps(v, ensure_ascii=False)[:20000]
except (TypeError, ValueError):
return str(v)[:20000]
def _first_line(text: str) -> str:
return (text or "").strip().splitlines()[0] if (text or "").strip() else ""
def _seconds_between(start: Optional[str], end: Optional[str]) -> float:
if not start or not end:
return 0.0
try:
a = datetime.fromisoformat(start.replace("Z", "+00:00"))
b = datetime.fromisoformat(end.replace("Z", "+00:00"))
return max(0.0, (b - a).total_seconds())
except ValueError:
return 0.0
def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

View File

@@ -0,0 +1,167 @@
"""OpenAI Codex CLI collector adapter — Tier 0 -> Tier 1 (design §2.2, §4.3).
Reads ``$CODEX_HOME/sessions/YYYY/MM/DD/rollout-*.jsonl``. Each line is a
``RolloutLine`` wrapper ``{timestamp, type, payload}``; ``type`` discriminates
``session_meta`` / ``response_item`` / ``event_msg`` / ``turn_context`` /
``compacted``.
Codex is **flat** — tool calls and outputs are joined only by ``call_id`` with no
parent-ref DAG — so ``seq`` is assigned by temporal (line) order and
``parent_seq`` is set for ``function_call_output`` back to its ``function_call``.
"""
from __future__ import annotations
import os
from typing import Any, Optional
from ..core.schema import Cost, Session, SessionEvent
from .common import (
Normalized,
classify_tool,
first_line,
iter_jsonl,
now_iso,
resolve_repo,
seconds_between,
stringify,
)
FLAVOR = "codex"
def _message_text(payload: dict[str, Any]) -> str:
content = payload.get("content")
if isinstance(content, str):
return content
parts = []
if isinstance(content, list):
for b in content:
if isinstance(b, dict):
parts.append(b.get("text") or b.get("output_text") or "")
elif isinstance(b, str):
parts.append(b)
return "\n".join(p for p in parts if p)
def _extract_tokens(payload: dict[str, Any]) -> tuple[int, int, int]:
"""Best-effort (input, output, cache) from a token_count payload.
Field shapes vary across Codex versions; probe known locations, else recurse.
"""
for scope in (payload, payload.get("info") or {}, payload.get("usage") or {},
(payload.get("info") or {}).get("total_token_usage") or {}):
if isinstance(scope, dict):
i = scope.get("input_tokens") or scope.get("prompt_tokens")
o = scope.get("output_tokens") or scope.get("completion_tokens")
if i is not None or o is not None:
cache = scope.get("cached_input_tokens") or scope.get("cache_read_input_tokens") or 0
return int(i or 0), int(o or 0), int(cache or 0)
return 0, 0, 0
def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -> Optional[Normalized]:
repo_domain_map = repo_domain_map or {}
records = list(iter_jsonl(path))
if not records:
return None
session_id: Optional[str] = None
cwd = model = cli_version = None
timestamps: list[str] = []
events: list[SessionEvent] = []
blobs: dict[str, str] = {}
call_seq: dict[str, int] = {} # call_id -> seq of its function_call
cost = Cost()
seq = 0
def add_event(ts, kind, *, role=None, tool=None, summary=None, body=None,
tokens=0, parent_seq=None) -> int:
nonlocal seq
s = seq
seq += 1
payload_ref = None
if body:
payload_ref = f"blob://{session_id}/{s}"
blobs[payload_ref] = body
events.append(SessionEvent(
session_uid=Session.make_uid(FLAVOR, session_id or "unknown"),
seq=s, parent_seq=parent_seq, ts=ts, kind=kind, role=role, tool=tool,
summary=(summary or "")[:300] or None, payload_ref=payload_ref, tokens=tokens,
))
return s
for rec in records:
rtype = rec.get("type")
ts = rec.get("timestamp")
if ts:
timestamps.append(ts)
payload = rec.get("payload") or {}
if rtype == "session_meta":
session_id = session_id or payload.get("id")
cwd = cwd or payload.get("cwd")
model = model or payload.get("model")
cli_version = cli_version or payload.get("cli_version")
elif rtype == "turn_context":
model = model or payload.get("model")
elif rtype == "response_item":
ptype = payload.get("type")
if ptype == "message":
role = payload.get("role", "assistant")
text = _message_text(payload)
kind = "assistant_msg" if role == "assistant" else "user_msg"
add_event(ts, kind, role=role, summary=first_line(text), body=text)
elif ptype == "function_call":
name = payload.get("name", "")
args = stringify(payload.get("arguments"))
kind = classify_tool(name, args)
s = add_event(ts, kind, role="assistant", tool=name,
summary=name, body=args)
call_id = payload.get("call_id")
if call_id:
call_seq[call_id] = s
elif ptype == "function_call_output":
call_id = payload.get("call_id")
parent = call_seq.get(call_id)
body = stringify(payload.get("output"))
add_event(ts, "tool_result", role="tool", tool=None,
summary="tool result", body=body, parent_seq=parent)
elif ptype == "reasoning":
body = _message_text(payload) or stringify(payload.get("summary"))
add_event(ts, "thinking", role="assistant", summary="reasoning", body=body)
elif rtype == "event_msg":
ptype = payload.get("type")
if ptype == "task_started":
add_event(ts, "lifecycle", summary="task_started")
elif ptype == "task_complete":
add_event(ts, "completion", summary="task_complete")
elif ptype == "token_count":
i, o, c = _extract_tokens(payload)
cost.input_tokens += i
cost.output_tokens += o
cost.cache_tokens += c
# user_message / agent_message echoes are duplicated by response_item
# messages on modern Codex; skipped to avoid double counting.
if session_id is None:
return None
cost.turns = sum(1 for e in events if e.kind == "user_msg")
started = min(timestamps) if timestamps else None
ended = max(timestamps) if timestamps else None
cost.wall_clock_s = seconds_between(started, ended)
repo, domain = resolve_repo(cwd, repo_domain_map)
session = Session(
session_uid=Session.make_uid(FLAVOR, session_id),
flavor=FLAVOR, native_session_id=session_id,
repo=repo, domain=domain, cwd=cwd, model=model,
started_at=started, ended_at=ended, outcome="unknown", cost=cost,
source_path=path, source_bytes=os.path.getsize(path) if os.path.exists(path) else 0,
discovered_at=now_iso(),
)
return Normalized(session=session, events=events, blobs=blobs)

View File

@@ -0,0 +1,100 @@
"""Shared adapter helpers (Tier 0 -> Tier 1).
The ``Normalized`` bundle contract and small flavor-agnostic helpers used by every
collector adapter. Per-flavor parsing lives in the individual adapter modules.
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional
from ..core.schema import Session, SessionEvent
# tool names that mutate files -> kind "edit" (union across flavors)
EDIT_TOOLS = {
"Edit", "Write", "NotebookEdit", "MultiEdit", # Claude
"apply_patch", "write_file", "edit_file", # Codex / Grok variants
}
# substrings in a shell/tool command that indicate a test run -> kind "test_run"
TEST_HINTS = (
"pytest", "unittest", "npm test", "npm run test", "go test",
"cargo test", "jest", "vitest", "make test", "tox",
)
@dataclass
class Normalized:
session: Session
events: list[SessionEvent]
blobs: dict[str, str] = field(default_factory=dict)
def resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
"""cwd -> (repo, domain). repo is the cwd basename; domain via map."""
if not cwd:
return None, None
repo = os.path.basename(cwd.rstrip("/")) or None
domain = repo_domain_map.get(repo) if repo else None
return repo, domain
def is_test_command(text: str) -> bool:
low = (text or "").lower()
return any(h in low for h in TEST_HINTS)
def classify_tool(name: str, command_text: str = "") -> str:
"""Map a tool invocation to an event kind: edit | test_run | tool_call."""
if name in EDIT_TOOLS:
return "edit"
if is_test_command(command_text) or is_test_command(name):
return "test_run"
return "tool_call"
def stringify(v: Any, limit: int = 20000) -> str:
if v is None:
return ""
if isinstance(v, str):
return v[:limit]
try:
return json.dumps(v, ensure_ascii=False)[:limit]
except (TypeError, ValueError):
return str(v)[:limit]
def first_line(text: str) -> str:
t = (text or "").strip()
return t.splitlines()[0] if t else ""
def seconds_between(start: Optional[str], end: Optional[str]) -> float:
if not start or not end:
return 0.0
try:
a = datetime.fromisoformat(start.replace("Z", "+00:00"))
b = datetime.fromisoformat(end.replace("Z", "+00:00"))
return max(0.0, (b - a).total_seconds())
except ValueError:
return 0.0
def iter_jsonl(path: str):
"""Yield parsed JSON objects from a JSONL file, tolerating bad lines."""
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError:
continue
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

View File

@@ -0,0 +1,182 @@
"""Grok CLI collector adapter — Tier 0 -> Tier 1 (design §2.3, §4.3).
A Grok session is a *directory* ``~/.grok/sessions/<enc-cwd>/<uuid>/`` containing
``summary.json`` (metadata), ``chat_history.jsonl`` (the canonical transcript),
``events.jsonl`` (explicit lifecycle + ``turn_number``), and ``updates.jsonl``
(ACP ``session/update`` stream, which carries tool-call names/args).
The ingest glob matches ``chat_history.jsonl``; this adapter derives its sibling
files from the same directory. Conversation order is taken from
``chat_history.jsonl``; tool-call names are paired, in order, from
``updates.jsonl`` ``tool_call`` entries to classify edits/test runs.
"""
from __future__ import annotations
import json
import os
from typing import Any, Optional
from ..core.schema import Cost, Session, SessionEvent
from .common import (
Normalized,
classify_tool,
first_line,
iter_jsonl,
now_iso,
resolve_repo,
seconds_between,
stringify,
)
FLAVOR = "grok"
def _text_content(content: Any) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
return "\n".join(
(b.get("text") or "") for b in content if isinstance(b, dict)
)
return ""
def _tool_calls_in_order(session_dir: str) -> list[dict[str, Any]]:
"""Ordered list of {title, rawInput} from updates.jsonl tool_call entries."""
calls: list[dict[str, Any]] = []
upd = os.path.join(session_dir, "updates.jsonl")
if not os.path.exists(upd):
return calls
for rec in iter_jsonl(upd):
u = (rec.get("params") or {}).get("update") or {}
if u.get("sessionUpdate") == "tool_call":
calls.append({"title": u.get("title") or "", "rawInput": u.get("rawInput") or {},
"id": u.get("toolCallId")})
return calls
def _session_meta(session_dir: str) -> dict[str, Any]:
p = os.path.join(session_dir, "summary.json")
if not os.path.exists(p):
return {}
try:
with open(p, "r", encoding="utf-8") as f:
return json.load(f)
except (OSError, ValueError):
return {}
def _lifecycle(session_dir: str) -> tuple[list[dict[str, Any]], Optional[str]]:
"""events.jsonl records + the model id seen there."""
evs, model = [], None
p = os.path.join(session_dir, "events.jsonl")
if os.path.exists(p):
for rec in iter_jsonl(p):
evs.append(rec)
model = model or rec.get("model_id")
return evs, model
def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -> Optional[Normalized]:
repo_domain_map = repo_domain_map or {}
# accept either the chat_history.jsonl path or the session dir
session_dir = path if os.path.isdir(path) else os.path.dirname(path)
chat = os.path.join(session_dir, "chat_history.jsonl")
if not os.path.exists(chat):
return None
meta = _session_meta(session_dir)
info = meta.get("info") or {}
session_id = info.get("id") or os.path.basename(session_dir.rstrip("/"))
cwd = info.get("cwd") or meta.get("git_root_dir")
life_events, life_model = _lifecycle(session_dir)
model = meta.get("current_model_id") or life_model
pending_calls = _tool_calls_in_order(session_dir)
call_idx = 0
events: list[SessionEvent] = []
blobs: dict[str, str] = {}
seq = 0
def add(kind, *, role=None, tool=None, summary=None, body=None, parent_seq=None) -> int:
nonlocal seq
s = seq
seq += 1
ref = None
if body:
ref = f"blob://{session_id}/{s}"
blobs[ref] = body
events.append(SessionEvent(
session_uid=Session.make_uid(FLAVOR, session_id), seq=s, parent_seq=parent_seq,
ts=None, kind=kind, role=role, tool=tool,
summary=(summary or "")[:300] or None, payload_ref=ref,
))
return s
# explicit lifecycle first (turn_started/turn_ended carry no bodies)
for le in life_events:
t = le.get("type")
if t in ("turn_started", "loop_started", "turn_ended", "phase_changed"):
add("lifecycle", summary=t)
for rec in iter_jsonl(chat):
rtype = rec.get("type")
content = rec.get("content")
if rtype == "user":
text = _text_content(content)
if text.strip():
add("user_msg", role="user", summary=first_line(text), body=text)
elif rtype == "reasoning":
text = _text_content(content)
if text.strip():
add("thinking", role="assistant", summary="reasoning", body=text)
elif rtype == "assistant":
text = _text_content(content)
if text.strip():
add("assistant_msg", role="assistant", summary=first_line(text), body=text)
elif rtype == "tool_result":
# pair with the next tool_call (in order) to recover name/args
tool = None
parent = None
if call_idx < len(pending_calls):
call = pending_calls[call_idx]
call_idx += 1
tool = call["title"]
cmd = stringify(call["rawInput"])
kind = classify_tool(tool, cmd)
parent = add(kind, role="assistant", tool=tool, summary=tool, body=cmd)
body = _text_content(content) if not isinstance(content, str) else content
add("tool_result", role="tool", tool=tool, summary="tool result",
body=stringify(body), parent_seq=parent)
if not events:
return None
cost = Cost(turns=sum(1 for e in events if e.kind == "user_msg"))
started = info.get("created_at") or meta.get("created_at")
ended = meta.get("last_active_at") or info.get("updated_at") or meta.get("updated_at")
cost.wall_clock_s = seconds_between(started, ended)
repo, domain = resolve_repo(cwd, repo_domain_map)
session = Session(
session_uid=Session.make_uid(FLAVOR, session_id), flavor=FLAVOR,
native_session_id=session_id, repo=repo, domain=domain, cwd=cwd,
git_branch=meta.get("head_branch"), model=model,
started_at=started, ended_at=ended, outcome="unknown", cost=cost,
source_path=chat,
source_bytes=_dir_bytes(session_dir),
discovered_at=now_iso(),
)
return Normalized(session=session, events=events, blobs=blobs)
def _dir_bytes(d: str) -> int:
total = 0
for root, _, files in os.walk(d):
for f in files:
try:
total += os.path.getsize(os.path.join(root, f))
except OSError:
pass
return total

View File

@@ -20,14 +20,14 @@ root = "~/.claude/projects"
# glob, relative to root; covers sessions and agent-* sidechains
glob = "*/*.jsonl"
# Codex / Grok adapters land in Phase 1 (schemas confirmed in the design doc).
# Codex / Grok adapters added in Phase 1 (AGENTIC-WP-0003).
[sources.codex]
enabled = false
enabled = true
root = "~/.codex/sessions"
glob = "*/*/*/rollout-*.jsonl"
[sources.grok]
enabled = false
enabled = true
root = "~/.grok/sessions"
glob = "*/*/chat_history.jsonl"
@@ -37,3 +37,5 @@ agentic-resources = "helix_forge"
the-custodian = "custodian"
state-hub = "custodian"
ops-bridge = "custodian"
net-kingdom = "netkingdom"
can-you-assist = "coulomb_social"

View File

@@ -12,6 +12,7 @@ Tier 2 digest — the invariant that makes budget-based retention non-lossy.
from __future__ import annotations
import hashlib
import json
import os
import re
@@ -28,6 +29,18 @@ def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _fingerprint(ev: SessionEvent, body: Optional[str]) -> str:
"""Stable content fingerprint, independent of seq/payload_ref, for dedup."""
h = hashlib.sha1()
parts = [ev.ts or "", ev.kind, ev.role or "", ev.tool or "", ev.summary or "",
ev.role or "", str(ev.is_sidechain)]
h.update("\x1f".join(parts).encode("utf-8"))
if body is not None:
h.update(b"\x1e")
h.update(body.encode("utf-8"))
return h.hexdigest()
class Store:
def __init__(self, db_path: str, blob_dir: str):
self.db_path = db_path
@@ -121,14 +134,75 @@ class Store:
self.db.commit()
return total
def ingest(self, bundle) -> None:
"""Persist a full Normalized bundle (session + events + blobs)."""
def ingest(self, bundle) -> int:
"""Persist a Normalized bundle, merging into any existing session.
Multiple files can map to one ``session_uid`` (Claude resume/sidechains;
Grok multi-file dirs). Events are de-duplicated by content fingerprint and
genuinely-new events are appended with offset ``seq`` (design OQ6 / T03).
Returns the number of new events written. Idempotent: re-ingesting the
same bundle adds nothing.
"""
s = bundle.session
if s.ingested_at is None:
s.ingested_at = _now()
self.upsert_session(s)
self.upsert_events(bundle.events)
self.write_blobs(s.session_uid, bundle.blobs)
existing = self.get_session(s.session_uid)
if existing is None:
if s.ingested_at is None:
s.ingested_at = _now()
self.upsert_session(s)
# known fingerprints + current max seq for this session
seen = self._event_fingerprints(s.session_uid)
next_seq = self._max_seq(s.session_uid) + 1
new_events: list[SessionEvent] = []
new_blobs: dict[str, str] = {}
old_to_new: dict[int, int] = {}
for ev in bundle.events:
body = bundle.blobs.get(ev.payload_ref) if ev.payload_ref else None
fp = _fingerprint(ev, body)
if fp in seen:
continue # already stored (prior file or prior sweep)
new_seq = next_seq
next_seq += 1
old_to_new[ev.seq] = new_seq
# remap parent within this bundle; cross-file parents become None
parent = old_to_new.get(ev.parent_seq) if ev.parent_seq is not None else None
ref = None
if body is not None:
ref = f"blob://{s.session_uid}/{new_seq}"
new_blobs[ref] = body
merged = SessionEvent(
session_uid=s.session_uid, seq=new_seq, parent_seq=parent, ts=ev.ts,
kind=ev.kind, role=ev.role, tool=ev.tool, summary=ev.summary,
payload_ref=ref, tokens=ev.tokens, is_sidechain=ev.is_sidechain,
)
new_events.append(merged)
seen.add(fp)
if new_events:
self.upsert_events(new_events)
self.write_blobs(s.session_uid, new_blobs)
return len(new_events)
def _max_seq(self, session_uid: str) -> int:
row = self.db.execute(
"SELECT COALESCE(MAX(seq), -1) m FROM events WHERE session_uid=?", (session_uid,)
).fetchone()
return int(row["m"])
def _event_fingerprints(self, session_uid: str) -> set[str]:
fps: set[str] = set()
for e in self.get_events(session_uid):
body = None
if e.payload_ref:
r = self.db.execute("SELECT path FROM blobs WHERE ref=?", (e.payload_ref,)).fetchone()
if r:
try:
with open(r["path"], "r", encoding="utf-8") as f:
body = f.read()
except OSError:
body = None
fps.add(_fingerprint(e, body))
return fps
# ---- Tier 2 (digest) ---------------------------------------------------
@@ -149,6 +223,22 @@ class Store:
row = self.db.execute("SELECT json FROM digests WHERE session_uid=?", (session_uid,)).fetchone()
return json.loads(row["json"]) if row else None
def list_digests(self) -> list[dict[str, Any]]:
return [json.loads(r["json"]) for r in self.db.execute("SELECT json FROM digests")]
def save_patterns(self, patterns: list[dict[str, Any]]) -> None:
"""Persist candidate patterns to a Tier 2 table (replace prior run)."""
self.db.execute(
"CREATE TABLE IF NOT EXISTS patterns ("
"key TEXT PRIMARY KEY, json TEXT NOT NULL, detected_at TEXT NOT NULL)"
)
self.db.execute("DELETE FROM patterns")
self.db.executemany(
"INSERT INTO patterns(key, json, detected_at) VALUES(?,?,?)",
[(p["key"], json.dumps(p, sort_keys=True), _now()) for p in patterns],
)
self.db.commit()
# ---- reads -------------------------------------------------------------
def get_session(self, session_uid: str) -> Optional[Session]:

View File

@@ -0,0 +1 @@
"""Detect: extract signals from sessions, cluster into candidate patterns."""

View File

@@ -0,0 +1,70 @@
"""Detect entrypoint (T07): digests -> signals -> clusters -> report.
python -m session_memory.detect [--config PATH] [--json] [--min-frequency N]
Reads Tier 2 digests from the store, extracts signals, clusters them into
candidate patterns, persists the candidates, and prints a ranked report
(cross-flavor first) — the input to the Curate phase (Phase 2).
"""
from __future__ import annotations
import argparse
import json
import os
from ..core.store import Store
from ..ingest import _expand, load_config
from .cluster import cluster
from .signals import extract_signals
def run_detect(config: dict, *, min_frequency: int = 2) -> list[dict]:
store_cfg = config.get("store", {})
store = Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"]))
digests = store.list_digests()
signals = extract_signals(digests)
patterns = [p.to_dict() for p in cluster(signals, min_frequency=min_frequency)]
store.save_patterns(patterns)
store.close()
return patterns
def _format_report(patterns: list[dict], n_digests: int) -> str:
lines = [f"# Candidate Patterns ({len(patterns)} from {n_digests} sessions)", ""]
if not patterns:
lines.append("No recurring patterns above the frequency threshold yet.")
return "\n".join(lines)
for i, p in enumerate(patterns, 1):
flag = " [CROSS-FLAVOR]" if p["cross_flavor"] else ""
lines.append(f"{i}. {p['title']}{flag}")
lines.append(f" score={p['score']} freq={p['frequency']} "
f"impact={p['cost_impact']} flavors={','.join(p['flavors'])}")
lines.append(f" repos={','.join(p['repos']) or '-'} "
f"sessions={len(p['sessions'])}")
lines.append("")
return "\n".join(lines)
def main(argv=None) -> int:
here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
ap = argparse.ArgumentParser(description="Detect candidate patterns from session digests.")
ap.add_argument("--config", default=os.path.join(here, "config.toml"))
ap.add_argument("--min-frequency", type=int, default=2)
ap.add_argument("--json", action="store_true", help="emit machine-readable JSON")
args = ap.parse_args(argv)
config = load_config(args.config)
store_cfg = config.get("store", {})
n = len(Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"])).list_digests())
patterns = run_detect(config, min_frequency=args.min_frequency)
if args.json:
print(json.dumps(patterns, indent=2))
else:
print(_format_report(patterns, n))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,78 @@
"""Pattern clusterer + evidence (PRD §5, §6.2; T05/T06).
Groups recurring :class:`Signal`s into candidate ``Pattern`` records. Clustering
is deterministic and keyed on ``(polarity, signal-type, locus)`` — enough to
surface "the same thing keeps happening" without embeddings (a later option).
Each candidate carries evidence (FR-D3): supporting sessions, frequency, affected
repos, affected **flavors**, and an estimated cost-impact score. Candidates whose
evidence spans more than one flavor are flagged ``cross_flavor`` (FR-D4) — the
highest-value reuse targets.
"""
from __future__ import annotations
import collections
from dataclasses import asdict, dataclass, field
from typing import Any
from .signals import PROBLEM, Signal
@dataclass
class Pattern:
key: str # stable cluster key
polarity: str # problem | success
signal_type: str
locus: str
frequency: int # number of supporting signals
sessions: list[str] = field(default_factory=list)
repos: list[str] = field(default_factory=list)
flavors: list[str] = field(default_factory=list)
cross_flavor: bool = False
cost_impact: float = 0.0 # frequency-weighted magnitude
score: float = 0.0 # ranking score (impact x frequency)
title: str = ""
def to_dict(self) -> dict[str, Any]:
return asdict(self)
def _key(s: Signal) -> str:
return f"{s.polarity}:{s.type}:{s.locus}"
def _title(polarity: str, signal_type: str, n_flavors: int) -> str:
scope = "cross-flavor " if n_flavors > 1 else ""
verb = "problem" if polarity == PROBLEM else "success"
return f"{scope}{verb}: {signal_type.replace('_', ' ')}"
def cluster(signals: list[Signal], *, min_frequency: int = 2) -> list[Pattern]:
"""Group signals into candidate patterns; keep clusters >= min_frequency."""
groups: dict[str, list[Signal]] = collections.defaultdict(list)
for s in signals:
groups[_key(s)].append(s)
patterns: list[Pattern] = []
for key, members in groups.items():
if len(members) < min_frequency:
continue
sessions = sorted({m.session_uid for m in members})
repos = sorted({m.repo for m in members if m.repo})
flavors = sorted({m.flavor for m in members})
cost_impact = sum(m.magnitude for m in members)
first = members[0]
p = Pattern(
key=key, polarity=first.polarity, signal_type=first.type, locus=first.locus,
frequency=len(members), sessions=sessions, repos=repos, flavors=flavors,
cross_flavor=len(flavors) > 1, cost_impact=round(cost_impact, 3),
title=_title(first.polarity, first.type, len(flavors)),
)
# rank: impact x frequency, with a boost for cross-flavor reuse value
p.score = round(p.cost_impact * p.frequency * (1.5 if p.cross_flavor else 1.0), 3)
patterns.append(p)
# cross-flavor first, then by score
patterns.sort(key=lambda p: (not p.cross_flavor, -p.score))
return patterns

View File

@@ -0,0 +1,116 @@
"""Signal extractors (PRD §6.2; T04).
Pure functions over a session digest (Tier 2) — the compact, durable view. Each
extractor emits zero or more :class:`Signal`s. A signal records its source
session, a *locus* (what it's about), a *polarity* (problem vs. success), and a
*magnitude*. Signals are the atoms the clusterer groups into candidate patterns.
No new capture happens here; everything is derived from digests already written
by the Capture layer, so detection is cheap and re-runnable.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
# polarity
PROBLEM = "problem"
SUCCESS = "success"
@dataclass
class Signal:
session_uid: str
flavor: str
repo: Optional[str]
type: str # e.g. "budget_overrun", "clean_pass"
polarity: str # PROBLEM | SUCCESS
locus: str # normalized subject key (tool, marker, ...)
magnitude: float = 1.0 # strength / cost weight
detail: dict[str, Any] = field(default_factory=dict)
# --- individual extractors --------------------------------------------------
# Each takes (digest, ctx) and returns a list[Signal]. ctx carries corpus-level
# stats (e.g. cost percentiles) so extractors can compare a session to its peers.
def _base(digest, type_, polarity, locus, magnitude=1.0, **detail) -> Signal:
return Signal(
session_uid=digest["session_uid"], flavor=digest["flavor"],
repo=digest.get("repo"), type=type_, polarity=polarity, locus=locus,
magnitude=magnitude, detail=detail,
)
def sig_retry_storm(digest, ctx) -> list[Signal]:
retries = digest.get("markers", {}).get("retries", 0)
if retries >= ctx.get("retry_storm_threshold", 3):
return [_base(digest, "retry_storm", PROBLEM, "retries", float(retries), retries=retries)]
return []
def sig_repeated_errors(digest, ctx) -> list[Signal]:
errors = digest.get("markers", {}).get("errors", 0)
if errors >= ctx.get("error_threshold", 3):
return [_base(digest, "repeated_errors", PROBLEM, "errors", float(errors), errors=errors)]
return []
def sig_budget_overrun(digest, ctx) -> list[Signal]:
total = digest.get("cost", {}).get("input_tokens", 0) + digest.get("cost", {}).get("output_tokens", 0)
p90 = ctx.get("tokens_p90", 0)
if p90 and total > p90:
return [_base(digest, "budget_overrun", PROBLEM, "tokens",
float(total) / max(p90, 1), tokens=total, p90=p90)]
return []
def sig_abandoned(digest, ctx) -> list[Signal]:
if digest.get("outcome") == "abandoned":
return [_base(digest, "abandoned", PROBLEM, "outcome", 1.0)]
return []
def sig_clean_pass(digest, ctx) -> list[Signal]:
"""Success: ended success, ran tests, no errors, modest cost."""
m = digest.get("markers", {})
if (digest.get("outcome") == "success" and m.get("test_runs", 0) >= 1
and m.get("errors", 0) == 0 and m.get("retries", 0) == 0):
return [_base(digest, "clean_pass", SUCCESS, "outcome", 1.0,
test_runs=m.get("test_runs"))]
return []
def sig_error_then_recovery(digest, ctx) -> list[Signal]:
"""Success despite hitting errors — a recovery worth learning from."""
m = digest.get("markers", {})
if digest.get("outcome") == "success" and m.get("errors", 0) >= 1:
return [_base(digest, "error_then_recovery", SUCCESS, "errors",
float(m.get("errors", 1)), errors=m.get("errors"))]
return []
EXTRACTORS: list[Callable] = [
sig_retry_storm, sig_repeated_errors, sig_budget_overrun, sig_abandoned,
sig_clean_pass, sig_error_then_recovery,
]
def build_context(digests: list[dict]) -> dict[str, Any]:
"""Corpus-level stats so extractors can compare a session to its peers."""
totals = sorted(
d.get("cost", {}).get("input_tokens", 0) + d.get("cost", {}).get("output_tokens", 0)
for d in digests
)
p90 = totals[int(0.9 * (len(totals) - 1))] if totals else 0
return {"tokens_p90": p90, "retry_storm_threshold": 3, "error_threshold": 3}
def extract_signals(digests: list[dict], ctx: Optional[dict] = None) -> list[Signal]:
ctx = ctx or build_context(digests)
out: list[Signal] = []
for d in digests:
for ex in EXTRACTORS:
out.extend(ex(d, ctx))
return out

View File

@@ -19,13 +19,19 @@ from dataclasses import dataclass, field
from typing import Any
from .adapters import claude as claude_adapter
from .adapters import codex as codex_adapter
from .adapters import grok as grok_adapter
from .core import digest as digest_mod
from .core.cursor import Cursors
from .core.retention import RetentionConfig, sweep as retention_sweep
from .core.store import Store
# adapter dispatch by source name
_ADAPTERS = {"claude": claude_adapter.parse_session}
_ADAPTERS = {
"claude": claude_adapter.parse_session,
"codex": codex_adapter.parse_session,
"grok": grok_adapter.parse_session,
}
@dataclass

54
tests/test_cluster.py Normal file
View File

@@ -0,0 +1,54 @@
"""Clusterer + evidence + cross-flavor tests (T05/T06)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.detect.cluster import cluster # noqa: E402
from session_memory.detect.signals import PROBLEM, SUCCESS, Signal # noqa: E402
def _sig(uid, flavor, repo, type_, polarity, locus, mag=1.0):
return Signal(session_uid=uid, flavor=flavor, repo=repo, type=type_,
polarity=polarity, locus=locus, magnitude=mag)
def test_min_frequency_filters_singletons():
sigs = [_sig("claude:a", "claude", "r1", "retry_storm", PROBLEM, "retries")]
assert cluster(sigs, min_frequency=2) == []
def test_clusters_recurring_signal_with_evidence():
sigs = [
_sig("claude:a", "claude", "r1", "retry_storm", PROBLEM, "retries", 5),
_sig("claude:b", "claude", "r2", "retry_storm", PROBLEM, "retries", 3),
]
pats = cluster(sigs, min_frequency=2)
assert len(pats) == 1
p = pats[0]
assert p.frequency == 2
assert p.sessions == ["claude:a", "claude:b"]
assert sorted(p.repos) == ["r1", "r2"]
assert p.flavors == ["claude"]
assert p.cross_flavor is False
assert p.cost_impact == 8.0
def test_cross_flavor_flagged_and_ranked_first():
sigs = [
# cross-flavor problem (claude + codex)
_sig("claude:a", "claude", "r1", "repeated_errors", PROBLEM, "errors", 3),
_sig("codex:b", "codex", "r2", "repeated_errors", PROBLEM, "errors", 3),
# single-flavor success cluster with higher raw impact
_sig("grok:c", "grok", "r3", "clean_pass", SUCCESS, "outcome", 5),
_sig("grok:d", "grok", "r4", "clean_pass", SUCCESS, "outcome", 5),
]
pats = cluster(sigs, min_frequency=2)
assert len(pats) == 2
xf = next(p for p in pats if p.signal_type == "repeated_errors")
assert xf.cross_flavor is True
assert sorted(xf.flavors) == ["claude", "codex"]
# cross-flavor pattern is ranked first even if another has higher raw impact
assert pats[0].cross_flavor is True
assert "cross-flavor" in pats[0].title

View File

@@ -0,0 +1,86 @@
"""Codex adapter tests (T01): synthetic rollout fixture."""
import json
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.adapters.codex import parse_session # noqa: E402
REPO_MAP = {"agentic-resources": "helix_forge"}
def _rollout(path, lines):
with open(path, "w", encoding="utf-8") as f:
for ln in lines:
f.write(json.dumps(ln) + "\n")
def test_codex_rollout_parse(tmp_path):
p = tmp_path / "rollout-2026-06-06-abc.jsonl"
_rollout(p, [
{"timestamp": "2026-06-06T10:00:00Z", "type": "session_meta",
"payload": {"id": "cdx-1", "cwd": "/home/worsch/agentic-resources",
"model_provider": "openai", "cli_version": "0.44.0", "model": "gpt-5-codex"}},
{"timestamp": "2026-06-06T10:00:01Z", "type": "turn_context",
"payload": {"model": "gpt-5-codex", "approval_policy": "on-request"}},
{"timestamp": "2026-06-06T10:00:02Z", "type": "event_msg",
"payload": {"type": "task_started"}},
{"timestamp": "2026-06-06T10:00:03Z", "type": "response_item",
"payload": {"type": "message", "role": "user",
"content": [{"type": "input_text", "text": "fix the bug"}]}},
{"timestamp": "2026-06-06T10:00:04Z", "type": "response_item",
"payload": {"type": "reasoning", "summary": "think about it"}},
{"timestamp": "2026-06-06T10:00:05Z", "type": "response_item",
"payload": {"type": "function_call", "name": "apply_patch",
"arguments": "{\"path\":\"x.py\"}", "call_id": "call_1"}},
{"timestamp": "2026-06-06T10:00:06Z", "type": "response_item",
"payload": {"type": "function_call", "name": "shell",
"arguments": "{\"command\":\"pytest -q\"}", "call_id": "call_2"}},
{"timestamp": "2026-06-06T10:00:07Z", "type": "response_item",
"payload": {"type": "function_call_output", "call_id": "call_2", "output": "2 passed"}},
{"timestamp": "2026-06-06T10:00:08Z", "type": "response_item",
"payload": {"type": "message", "role": "assistant",
"content": [{"type": "output_text", "text": "done"}]}},
{"timestamp": "2026-06-06T10:00:09Z", "type": "event_msg",
"payload": {"type": "token_count",
"info": {"total_token_usage": {"input_tokens": 200, "output_tokens": 30,
"cached_input_tokens": 15}}}},
{"timestamp": "2026-06-06T10:00:10Z", "type": "event_msg",
"payload": {"type": "task_complete"}},
])
norm = parse_session(str(p), REPO_MAP)
assert norm is not None
s = norm.session
assert s.session_uid == "codex:cdx-1"
assert s.flavor == "codex"
assert s.repo == "agentic-resources" and s.domain == "helix_forge"
assert s.model == "gpt-5-codex"
assert s.cost.input_tokens == 200 and s.cost.output_tokens == 30 and s.cost.cache_tokens == 15
assert s.cost.turns == 1
assert s.cost.wall_clock_s == 10.0
kinds = [e.kind for e in norm.events]
assert kinds == ["lifecycle", "user_msg", "thinking", "edit", "test_run",
"tool_result", "assistant_msg", "completion"]
# flat linkage: function_call_output links to its function_call by call_id
out = next(e for e in norm.events if e.kind == "tool_result")
test_call = next(e for e in norm.events if e.kind == "test_run")
assert out.parent_seq == test_call.seq
# apply_patch classified as edit; pytest as test_run
edit = next(e for e in norm.events if e.kind == "edit")
assert edit.tool == "apply_patch"
def test_codex_empty_or_no_meta_returns_none(tmp_path):
p = tmp_path / "rollout-empty.jsonl"
p.write_text("")
assert parse_session(str(p), REPO_MAP) is None
p2 = tmp_path / "rollout-nometa.jsonl"
_rollout(p2, [{"timestamp": "t", "type": "event_msg", "payload": {"type": "task_started"}}])
assert parse_session(str(p2), REPO_MAP) is None # no session_meta -> no id

View File

@@ -0,0 +1,44 @@
"""Detect entrypoint tests (T07): end-to-end digests -> patterns, persisted."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.core.store import Store # noqa: E402
from session_memory.detect.__main__ import run_detect # noqa: E402
def _digest(uid, flavor, repo, **markers):
return {
"session_uid": uid, "flavor": flavor, "repo": repo, "outcome": "fail",
"cost": {"input_tokens": 10, "output_tokens": 1},
"markers": {"errors": markers.get("errors", 0), "retries": markers.get("retries", 0),
"test_runs": 0, "edits": 0, "human_interventions": 0},
}
def _config(tmp_path):
return {"store": {"db_path": str(tmp_path / ".store/m.db"),
"blob_dir": str(tmp_path / ".store/blobs"),
"cursor": str(tmp_path / ".store/c.json")}}
def test_run_detect_persists_cross_flavor_pattern(tmp_path):
cfg = _config(tmp_path)
st = Store(cfg["store"]["db_path"], cfg["store"]["blob_dir"])
# same problem (retry_storm) across two flavors -> cross-flavor candidate
st.write_digest("claude:a", _digest("claude:a", "claude", "r1", retries=5))
st.write_digest("codex:b", _digest("codex:b", "codex", "r2", retries=4))
st.close()
patterns = run_detect(cfg, min_frequency=2)
assert len(patterns) == 1
assert patterns[0]["cross_flavor"] is True
assert patterns[0]["signal_type"] == "retry_storm"
# persisted to the Tier 2 patterns table
st2 = Store(cfg["store"]["db_path"], cfg["store"]["blob_dir"])
rows = st2.db.execute("SELECT key FROM patterns").fetchall()
assert len(rows) == 1
st2.close()

View File

@@ -0,0 +1,92 @@
"""Grok adapter tests (T02): synthetic session dir + real local sessions."""
import glob
import json
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.adapters.grok import parse_session # noqa: E402
REPO_MAP = {"agentic-resources": "helix_forge", "net-kingdom": "netkingdom",
"can-you-assist": "coulomb_social"}
def _mk_session(dir_path, sid):
os.makedirs(dir_path, exist_ok=True)
with open(os.path.join(dir_path, "summary.json"), "w") as f:
json.dump({"info": {"id": sid, "cwd": "/home/worsch/agentic-resources"},
"created_at": "2026-06-06T10:00:00Z",
"last_active_at": "2026-06-06T10:05:00Z",
"current_model_id": "grok-build", "head_branch": "main"}, f)
with open(os.path.join(dir_path, "events.jsonl"), "w") as f:
f.write(json.dumps({"ts": "2026-06-06T10:00:00Z", "type": "turn_started",
"turn_number": 0, "model_id": "grok-build"}) + "\n")
f.write(json.dumps({"ts": "2026-06-06T10:05:00Z", "type": "turn_ended",
"turn_number": 0}) + "\n")
with open(os.path.join(dir_path, "chat_history.jsonl"), "w") as f:
for rec in [
{"type": "system", "content": "sys prompt"},
{"type": "user", "content": [{"type": "text", "text": "fix the bug"}]},
{"type": "reasoning", "content": [{"type": "text", "text": "thinking..."}]},
{"type": "assistant", "content": ""}, # empty -> skipped
{"type": "tool_result", "content": "The file x.py has been updated"},
{"type": "assistant", "content": "done"},
{"type": "tool_result", "content": "6 passed"},
]:
f.write(json.dumps(rec) + "\n")
with open(os.path.join(dir_path, "updates.jsonl"), "w") as f:
for u in [
{"sessionUpdate": "tool_call", "toolCallId": "c1", "title": "edit_file",
"rawInput": {"target_file": "x.py"}},
{"sessionUpdate": "tool_call", "toolCallId": "c2", "title": "shell",
"rawInput": {"command": "pytest -q"}},
]:
f.write(json.dumps({"timestamp": "t", "method": "session/update",
"params": {"sessionId": sid, "update": u}}) + "\n")
def test_grok_synthetic_dir(tmp_path):
d = tmp_path / "%2Fhome%2Fworsch%2Fagentic-resources" / "sid-1"
_mk_session(str(d), "sid-1")
norm = parse_session(str(d / "chat_history.jsonl"), REPO_MAP)
assert norm is not None
s = norm.session
assert s.session_uid == "grok:sid-1"
assert s.flavor == "grok"
assert s.repo == "agentic-resources" and s.domain == "helix_forge"
assert s.model == "grok-build"
assert s.git_branch == "main"
assert s.cost.turns == 1
assert s.cost.wall_clock_s == 300.0
kinds = [e.kind for e in norm.events]
# 4 lifecycle from events.jsonl? no: turn_started + turn_ended = 2 lifecycle
assert kinds.count("lifecycle") == 2
assert "user_msg" in kinds and "thinking" in kinds and "assistant_msg" in kinds
# paired tool calls recovered names -> edit + test_run, each followed by tool_result
assert "edit" in kinds and "test_run" in kinds
edit = next(e for e in norm.events if e.kind == "edit")
assert edit.tool == "edit_file"
# tool_result after test_run links to it
tr = [e for e in norm.events if e.kind == "tool_result"]
assert len(tr) == 2
def test_real_local_grok_sessions_if_available():
base = os.path.expanduser("~/.grok/sessions")
chats = glob.glob(os.path.join(base, "*", "*", "chat_history.jsonl"))
if not chats:
return
parsed = 0
for c in chats:
norm = parse_session(c, REPO_MAP)
if norm is None:
continue
parsed += 1
assert norm.session.session_uid.startswith("grok:")
seqs = [e.seq for e in norm.events]
assert seqs == sorted(seqs) and len(seqs) == len(set(seqs))
assert parsed >= 1

66
tests/test_merge.py Normal file
View File

@@ -0,0 +1,66 @@
"""Multi-file session merge tests (T03)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.adapters.common import Normalized # noqa: E402
from session_memory.core.schema import Session, SessionEvent # noqa: E402
from session_memory.core.store import Store # noqa: E402
def _part(native, kinds, base_blob="b"):
uid = Session.make_uid("claude", native)
s = Session(session_uid=uid, flavor="claude", native_session_id=native)
events, blobs = [], {}
for i, k in enumerate(kinds):
ref = f"blob://{native}/{i}"
events.append(SessionEvent(session_uid=uid, seq=i, parent_seq=(i - 1 if i else None),
kind=k, ts=f"2026-06-06T10:0{i}:00Z", payload_ref=ref))
blobs[ref] = f"{base_blob}-{k}-{i}"
return Normalized(session=s, events=events, blobs=blobs)
def test_second_file_appends_not_overwrites(tmp_path):
st = Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
uid = Session.make_uid("claude", "s1")
# file 1: 3 events (seq 0..2)
n1 = _part("s1", ["user_msg", "assistant_msg", "tool_call"])
added1 = st.ingest(n1)
assert added1 == 3
assert st.count_events(uid) == 3
# file 2 for the SAME session: repeats event 0 + adds 2 new (continuation)
n2 = _part("s1", ["user_msg", "edit", "completion"])
# make the first event identical to file1's first event so it dedups
n2.events[0].kind = "user_msg"
n2.events[0].ts = "2026-06-06T10:00:00Z"
n2.blobs[n2.events[0].payload_ref] = "b-user_msg-0"
added2 = st.ingest(n2)
# only the 2 genuinely-new events appended; total grows additively
assert added2 == 2
assert st.count_events(uid) == 5
seqs = [e.seq for e in st.get_events(uid)]
assert seqs == [0, 1, 2, 3, 4] # contiguous, offset
def test_reingest_same_bundle_is_idempotent(tmp_path):
st = Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
uid = Session.make_uid("claude", "s2")
n = _part("s2", ["user_msg", "assistant_msg"])
assert st.ingest(n) == 2
assert st.ingest(n) == 0 # nothing new on re-run
assert st.count_events(uid) == 2
def test_appended_event_parent_remapped_within_part(tmp_path):
st = Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
uid = Session.make_uid("claude", "s3")
st.ingest(_part("s3", ["user_msg", "assistant_msg"])) # seq 0,1
st.ingest(_part("s3", ["x_unused"]) if False else _part("s3", ["thinking", "edit"])) # new 2,3
events = {e.seq: e for e in st.get_events(uid)}
# the 'edit' (seq 3) had parent_seq=0 within its part -> remapped to its part's first new seq (2)
assert events[3].parent_seq == 2

53
tests/test_signals.py Normal file
View File

@@ -0,0 +1,53 @@
"""Signal extractor tests (T04)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.detect.signals import ( # noqa: E402
PROBLEM, SUCCESS, build_context, extract_signals,
)
def _digest(uid, flavor="claude", repo="r", outcome="success", tokens=100,
errors=0, retries=0, test_runs=0):
return {
"session_uid": uid, "flavor": flavor, "repo": repo, "outcome": outcome,
"cost": {"input_tokens": tokens, "output_tokens": 0},
"markers": {"errors": errors, "retries": retries, "test_runs": test_runs,
"edits": 0, "human_interventions": 0},
}
def test_problem_signals():
digests = [
_digest("claude:a", retries=5, outcome="fail"),
_digest("claude:b", errors=4),
_digest("claude:c", outcome="abandoned"),
]
sigs = extract_signals(digests)
types = {(s.session_uid, s.type) for s in sigs}
assert ("claude:a", "retry_storm") in types
assert ("claude:b", "repeated_errors") in types
assert ("claude:c", "abandoned") in types
assert all(s.polarity == PROBLEM for s in sigs
if s.type in ("retry_storm", "repeated_errors", "abandoned"))
def test_success_signals():
sigs = extract_signals([_digest("grok:x", outcome="success", test_runs=2)])
assert any(s.type == "clean_pass" and s.polarity == SUCCESS for s in sigs)
rec = extract_signals([_digest("codex:y", outcome="success", errors=2)])
assert any(s.type == "error_then_recovery" and s.polarity == SUCCESS for s in rec)
def test_budget_overrun_uses_corpus_p90():
digests = [_digest(f"claude:{i}", tokens=100) for i in range(10)]
digests.append(_digest("claude:big", tokens=100000))
ctx = build_context(digests)
assert ctx["tokens_p90"] >= 100
sigs = extract_signals(digests, ctx)
overruns = [s for s in sigs if s.type == "budget_overrun"]
assert overruns and overruns[0].session_uid == "claude:big"

View File

@@ -0,0 +1,169 @@
---
id: AGENTIC-WP-0003
type: workplan
title: "Coding Session Memory — Phase 1 (Codex + Grok adapters, Detect)"
domain: helix_forge
repo: agentic-resources
status: finished
owner: codex
topic_slug: helix-forge
created: "2026-06-06"
updated: "2026-06-06"
state_hub_workstream_id: "88c75b47-1c89-43bc-bb3e-739ec3c8f7d4"
---
# Coding Session Memory — Phase 1
Extends Phase 0 ([AGENTIC-WP-0002](AGENTIC-WP-0002-session-memory-phase0.md)) along
two axes of [PRD-helix-forge](../docs/PRD-helix-forge.md):
1. **Multi-flavor capture (G1/G6):** add the Codex and Grok collector adapters so
the agnostic core ingests all three families through thin edges.
2. **Detect (PRD §6.2):** run signal extractors over normalized sessions, cluster
recurring signals into candidate problem/success patterns, attach evidence, and
flag cross-flavor patterns.
Both flavors' on-disk schemas are already confirmed in
[DESIGN-session-memory.md](../docs/DESIGN-session-memory.md) §2.2 (Codex) and §2.3
(Grok), with the native→`kind` mapping in §4.3 — so the adapters are written
against known structures, not discovered ones.
## Codex Collector Adapter
```task
id: AGENTIC-WP-0003-T01
status: done
priority: high
state_hub_task_id: "91264fd4-ba99-4add-b317-e2320c3c932c"
```
Implement `adapters/codex.py` reading `~/.codex/sessions/YYYY/MM/DD/rollout-*.jsonl`
per design §2.2: line wrapper `{timestamp,type,payload}`; map `session_meta`→Session
fields, `turn_context`→model, `response_item/message``user_msg`/`assistant_msg`,
`function_call`+`function_call_output` (joined on `call_id`)→`tool_call`/`tool_result`,
`reasoning``thinking`, `event_msg/task_*``lifecycle`/`completion`,
`event_msg/token_count`→cost. Codex is flat: assign `seq`/`parent_seq` by temporal
order (no native DAG). Version-detect on `session_meta.cli_version`. Reuse the
`Normalized` bundle contract. Tests use synthetic rollout fixtures; confirm the
`token_count` payload field names against a real install if Codex is present
(design OQ1 residual).
## Grok Collector Adapter
```task
id: AGENTIC-WP-0003-T02
status: done
priority: high
state_hub_task_id: "fe3d7d1c-110e-4f16-8d56-062fa4a651aa"
```
Implement `adapters/grok.py` reading the per-session directory
`~/.grok/sessions/<cwd>/<uuid>/` per design §2.3: `summary.json`→Session
id/cwd/timestamps, `chat_history.jsonl`→messages, `events.jsonl`→explicit
`lifecycle` events and `turn_number` (key `seq` off it), tool calls/results from
`chat_history`/`updates.jsonl`, token fields from events/updates. Resolve the
url-encoded cwd dir name back to a path. Tests against the real local Grok
sessions on this workstation plus a synthetic dir fixture.
## Multi-File / Multi-Part Session Merge
```task
id: AGENTIC-WP-0003-T03
status: done
priority: medium
state_hub_task_id: "c4acfb63-84cd-4299-a44d-91bb6857fa88"
```
Address design OQ6 (surfaced in Phase 0): several files can map to one
`session_uid` (resume, sidechains; Grok dirs are inherently multi-file). Change
the store/ingest path to **merge** events across parts of one session rather than
last-file-wins upsert — stable event ordering and de-duplication keyed on native
identity. Verify event counts are additive and idempotent on re-run.
## Signal Extractors
```task
id: AGENTIC-WP-0003-T04
status: done
priority: high
state_hub_task_id: "20920c5d-16f7-43bb-9ed7-9afbfeaf7207"
```
Implement `detect/signals.py`: derive `Signal`s from normalized sessions/digests —
e.g. repeated test failure on the same target, budget overrun (cost vs. peers),
retry storm, fast clean resolution, human escalation, error-then-recovery. Each
signal carries its source `session_uid`, locus (file/tool/task), polarity
(problem|success), and magnitude. Pure functions over Tier 1 events + Tier 2
digests; no new capture. Unit-tested on synthetic sessions.
## Pattern Clusterer
```task
id: AGENTIC-WP-0003-T05
status: done
priority: high
state_hub_task_id: "f42d57f6-34dc-4a92-bf6a-4d8eab572467"
```
Implement `detect/cluster.py`: group recurring signals across sessions/repos/
flavors into candidate `ProblemPattern`/`SuccessPattern` records (PRD §5). Start
with deterministic keyed clustering (locus + signal-type + normalized message);
leave embedding-based similarity as a later option. Output candidates with
frequency and member session lists.
## Pattern Evidence + Cross-Flavor Flagging
```task
id: AGENTIC-WP-0003-T06
status: done
priority: medium
state_hub_task_id: "8fd502d6-d138-4a42-acd5-6f5921859605"
```
For each candidate pattern (PRD §6.2 FR-D3/FR-D4) attach evidence: supporting
sessions, frequency, affected repos, affected **flavors**, and estimated cost
impact (token/retry deltas vs. baseline). Explicitly flag candidates whose
evidence spans more than one flavor as `cross_flavor: true` — the highest-value
reuse targets. Persist candidates to a Tier 2 `patterns` store/table.
## Candidate Pattern Report
```task
id: AGENTIC-WP-0003-T07
status: done
priority: medium
state_hub_task_id: "34a96d5d-9165-4761-b91e-3643b0401410"
```
Add a `detect` entrypoint (`python -m session_memory.detect`) that runs extractors
→ clusterer → evidence and emits a human-readable candidate report (ranked by
cost impact × frequency, cross-flavor first), plus machine-readable JSON. This is
the input to the Curate phase (Phase 2) review workflow. Document usage in the
session_memory README.
## Verify Across All Three Flavors
```task
id: AGENTIC-WP-0003-T08
status: done
priority: medium
state_hub_task_id: "b272c3fa-af81-4a6c-9ed9-7b42173efa81"
```
Run the full pipeline (ingest all enabled sources → digest → detect) against the
real local Claude and Grok sessions on this workstation (Codex via fixtures if not
installed). Confirm: normalized rows for each flavor, at least one candidate
pattern surfaced, and at least one **cross-flavor** pattern detected if the data
supports it (PRD success metric). Record results and refresh design open
questions. After workplan file updates, notify the custodian operator to run from
`~/state-hub`:
```bash
make fix-consistency REPO=agentic-resources
```
**Verification results (2026-06-06):** full suite 40/40 green. Live pipeline over
real local sessions (Codex not installed → fixtures): ingested 88 → 67 digests
(63 Claude + 4 Grok); detect surfaced 3 candidate patterns, **2 cross-flavor**
(Claude+Grok) — "clean pass" success across 18 sessions and "abandoned" problem
across 13 — plus a Claude-only budget-overrun. PRD cross-flavor success metric met.