generated from coulomb/repo-seed
Compare commits
5 Commits
7c6f4358ee
...
055713aa4f
| Author | SHA1 | Date | |
|---|---|---|---|
| 055713aa4f | |||
| 436a96dcd8 | |||
| 06767ef924 | |||
| bc11cb9aec | |||
| 5aea22f24f |
@@ -13,13 +13,19 @@ time window.
|
||||
|
||||
```
|
||||
session_memory/
|
||||
adapters/claude.py # Tier0 -> Tier1 normalizer (Codex/Grok land in Phase 1)
|
||||
adapters/common.py # shared Normalized bundle + helpers
|
||||
adapters/claude.py # Tier0 -> Tier1 normalizers, one per flavor
|
||||
adapters/codex.py # (rollout {timestamp,type,payload}, flat call_id join)
|
||||
adapters/grok.py # (per-session dir: chat_history + events + updates)
|
||||
core/schema.py # Session / SessionEvent / Cost
|
||||
core/store.py # SQLite rows + blob-dir bodies (Tier1) + digests (Tier2)
|
||||
core/store.py # SQLite rows + blob-dir bodies (Tier1) + digests/patterns (Tier2)
|
||||
core/cursor.py # incremental ingest cursors
|
||||
core/digest.py # Tier1 -> Tier2 promotion + outcome heuristic
|
||||
core/retention.py # budget-based eviction sweep
|
||||
ingest.py # one sweep: discover -> normalize -> store -> digest -> evict
|
||||
detect/signals.py # signal extractors over digests
|
||||
detect/cluster.py # cluster signals -> candidate patterns + cross-flavor flag
|
||||
detect/__main__.py # python -m session_memory.detect (ranked report)
|
||||
config.toml # store paths, retention caps, sources, repo->domain map
|
||||
```
|
||||
|
||||
@@ -51,6 +57,20 @@ the sweep *runs*. Trigger it with the repo scheduler, e.g. daily:
|
||||
or a cron entry / `/loop` on a timer. Push-capture (agent Stop/SessionEnd hooks)
|
||||
can also enqueue a sweep; see design §7.
|
||||
|
||||
## Detect candidate patterns
|
||||
|
||||
After ingesting, mine the digests for recurring problem/success patterns:
|
||||
|
||||
```bash
|
||||
python -m session_memory.detect # ranked report, cross-flavor first
|
||||
python -m session_memory.detect --json # machine-readable candidates
|
||||
python -m session_memory.detect --min-frequency 3
|
||||
```
|
||||
|
||||
Candidates are persisted to a Tier 2 `patterns` table and are the input to the
|
||||
Curate phase (Phase 2). Patterns whose evidence spans more than one agent flavor
|
||||
are flagged `[CROSS-FLAVOR]` — the highest-value reuse targets.
|
||||
|
||||
## Retention knobs (`[retention]` in config.toml)
|
||||
|
||||
| Key | Meaning |
|
||||
@@ -71,5 +91,9 @@ python -m pytest # 26 tests: schema, adapter, store, digest, retention,
|
||||
|
||||
## Status
|
||||
|
||||
Phase 0 (AGENTIC-WP-0002): Claude adapter only, end to end. Codex and Grok
|
||||
adapters are designed (schemas confirmed in the design doc) and land in Phase 1.
|
||||
- **Phase 0** (AGENTIC-WP-0002): schema, store, digest, budget retention, Claude
|
||||
adapter, ingest sweep.
|
||||
- **Phase 1** (AGENTIC-WP-0003): Codex + Grok adapters, multi-file session merge,
|
||||
and the Detect pipeline (signals → clustering → cross-flavor candidate patterns).
|
||||
- **Next — Phase 2 (Curate):** review/approve candidates into a versioned pattern
|
||||
catalog. **Phase 3 (Distribute) / Phase 4 (Measure)** follow per the PRD.
|
||||
|
||||
@@ -11,54 +11,23 @@ that the store persists out-of-line so Tier 1 rows stay light.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Iterable, Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
from ..core.schema import Cost, Session, SessionEvent
|
||||
from .common import ( # noqa: F401 (Normalized re-exported for back-compat)
|
||||
Normalized,
|
||||
classify_tool,
|
||||
first_line as _first_line,
|
||||
iter_jsonl as _iter_records,
|
||||
now_iso as _now,
|
||||
resolve_repo as _resolve_repo,
|
||||
seconds_between as _seconds_between,
|
||||
stringify as _stringify,
|
||||
)
|
||||
|
||||
FLAVOR = "claude"
|
||||
|
||||
# tool_use names that mutate files -> kind "edit"
|
||||
_EDIT_TOOLS = {"Edit", "Write", "NotebookEdit", "MultiEdit"}
|
||||
# crude test-runner detection inside Bash commands -> kind "test_run"
|
||||
_TEST_HINTS = ("pytest", "unittest", "npm test", "npm run test", "go test", "cargo test", "jest", "vitest")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Normalized:
|
||||
session: Session
|
||||
events: list[SessionEvent]
|
||||
blobs: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
|
||||
def _iter_records(path: str) -> Iterable[dict[str, Any]]:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
yield json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue # tolerate partial/corrupt trailing lines
|
||||
|
||||
|
||||
def _resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
|
||||
"""cwd -> (repo, domain). repo is the cwd basename; domain via map."""
|
||||
if not cwd:
|
||||
return None, None
|
||||
repo = os.path.basename(cwd.rstrip("/")) or None
|
||||
domain = repo_domain_map.get(repo) if repo else None
|
||||
return repo, domain
|
||||
|
||||
|
||||
def _is_test_command(text: str) -> bool:
|
||||
low = text.lower()
|
||||
return any(h in low for h in _TEST_HINTS)
|
||||
|
||||
|
||||
def _content_blocks(message: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
content = message.get("content")
|
||||
@@ -159,11 +128,8 @@ def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -
|
||||
name = b.get("name", "")
|
||||
inp = b.get("input", {})
|
||||
body = _stringify(inp)
|
||||
kind = "tool_call"
|
||||
if name in _EDIT_TOOLS:
|
||||
kind = "edit"
|
||||
elif name == "Bash" and _is_test_command(_stringify(inp.get("command", ""))):
|
||||
kind = "test_run"
|
||||
cmd = inp.get("command", "") if isinstance(inp, dict) else ""
|
||||
kind = classify_tool(name, _stringify(cmd))
|
||||
add_event(uuid, parent, ts, kind, role="assistant", tool=name,
|
||||
summary=f"{name}", body=body, sidechain=sidechain)
|
||||
|
||||
@@ -194,35 +160,3 @@ def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -
|
||||
discovered_at=_now(),
|
||||
)
|
||||
return Normalized(session=session, events=events, blobs=blobs)
|
||||
|
||||
|
||||
# ---- helpers ---------------------------------------------------------------
|
||||
|
||||
def _stringify(v: Any) -> str:
|
||||
if v is None:
|
||||
return ""
|
||||
if isinstance(v, str):
|
||||
return v
|
||||
try:
|
||||
return json.dumps(v, ensure_ascii=False)[:20000]
|
||||
except (TypeError, ValueError):
|
||||
return str(v)[:20000]
|
||||
|
||||
|
||||
def _first_line(text: str) -> str:
|
||||
return (text or "").strip().splitlines()[0] if (text or "").strip() else ""
|
||||
|
||||
|
||||
def _seconds_between(start: Optional[str], end: Optional[str]) -> float:
|
||||
if not start or not end:
|
||||
return 0.0
|
||||
try:
|
||||
a = datetime.fromisoformat(start.replace("Z", "+00:00"))
|
||||
b = datetime.fromisoformat(end.replace("Z", "+00:00"))
|
||||
return max(0.0, (b - a).total_seconds())
|
||||
except ValueError:
|
||||
return 0.0
|
||||
|
||||
|
||||
def _now() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
167
session_memory/adapters/codex.py
Normal file
167
session_memory/adapters/codex.py
Normal file
@@ -0,0 +1,167 @@
|
||||
"""OpenAI Codex CLI collector adapter — Tier 0 -> Tier 1 (design §2.2, §4.3).
|
||||
|
||||
Reads ``$CODEX_HOME/sessions/YYYY/MM/DD/rollout-*.jsonl``. Each line is a
|
||||
``RolloutLine`` wrapper ``{timestamp, type, payload}``; ``type`` discriminates
|
||||
``session_meta`` / ``response_item`` / ``event_msg`` / ``turn_context`` /
|
||||
``compacted``.
|
||||
|
||||
Codex is **flat** — tool calls and outputs are joined only by ``call_id`` with no
|
||||
parent-ref DAG — so ``seq`` is assigned by temporal (line) order and
|
||||
``parent_seq`` is set for ``function_call_output`` back to its ``function_call``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from typing import Any, Optional
|
||||
|
||||
from ..core.schema import Cost, Session, SessionEvent
|
||||
from .common import (
|
||||
Normalized,
|
||||
classify_tool,
|
||||
first_line,
|
||||
iter_jsonl,
|
||||
now_iso,
|
||||
resolve_repo,
|
||||
seconds_between,
|
||||
stringify,
|
||||
)
|
||||
|
||||
FLAVOR = "codex"
|
||||
|
||||
|
||||
def _message_text(payload: dict[str, Any]) -> str:
|
||||
content = payload.get("content")
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
parts = []
|
||||
if isinstance(content, list):
|
||||
for b in content:
|
||||
if isinstance(b, dict):
|
||||
parts.append(b.get("text") or b.get("output_text") or "")
|
||||
elif isinstance(b, str):
|
||||
parts.append(b)
|
||||
return "\n".join(p for p in parts if p)
|
||||
|
||||
|
||||
def _extract_tokens(payload: dict[str, Any]) -> tuple[int, int, int]:
|
||||
"""Best-effort (input, output, cache) from a token_count payload.
|
||||
|
||||
Field shapes vary across Codex versions; probe known locations, else recurse.
|
||||
"""
|
||||
for scope in (payload, payload.get("info") or {}, payload.get("usage") or {},
|
||||
(payload.get("info") or {}).get("total_token_usage") or {}):
|
||||
if isinstance(scope, dict):
|
||||
i = scope.get("input_tokens") or scope.get("prompt_tokens")
|
||||
o = scope.get("output_tokens") or scope.get("completion_tokens")
|
||||
if i is not None or o is not None:
|
||||
cache = scope.get("cached_input_tokens") or scope.get("cache_read_input_tokens") or 0
|
||||
return int(i or 0), int(o or 0), int(cache or 0)
|
||||
return 0, 0, 0
|
||||
|
||||
|
||||
def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -> Optional[Normalized]:
    """Normalize one Codex rollout JSONL file into a ``Normalized`` bundle.

    Args:
        path: rollout file to read.
        repo_domain_map: repo name -> domain lookup handed to ``resolve_repo``.

    Returns:
        The bundle, or ``None`` when the file yields no usable records or no
        ``session_meta`` record carries a session id.

    The session id is resolved before any event is built, so every event and
    blob ref carries the real id even if events precede ``session_meta`` in the
    file. ``seq`` follows line order; a ``function_call_output`` points back to
    its ``function_call`` through ``call_id``.
    """
    repo_domain_map = repo_domain_map or {}
    # Only {timestamp, type, payload} wrapper objects are usable; skip anything else.
    records = [r for r in iter_jsonl(path) if isinstance(r, dict)]
    if not records:
        return None

    session_id = _first_session_id(records)
    if session_id is None:
        return None
    session_uid = Session.make_uid(FLAVOR, session_id)

    cwd = model = None
    timestamps: list[str] = []
    events: list[SessionEvent] = []
    blobs: dict[str, str] = {}
    call_seq: dict[str, int] = {}  # call_id -> seq of its function_call
    cost = Cost()
    seq = 0

    def add_event(ts, kind, *, role=None, tool=None, summary=None, body=None,
                  tokens=0, parent_seq=None) -> int:
        """Append one event (plus its blob when a body is given); return its seq."""
        nonlocal seq
        s = seq
        seq += 1
        payload_ref = None
        if body:
            payload_ref = f"blob://{session_id}/{s}"
            blobs[payload_ref] = body
        events.append(SessionEvent(
            session_uid=session_uid,
            seq=s, parent_seq=parent_seq, ts=ts, kind=kind, role=role, tool=tool,
            summary=(summary or "")[:300] or None, payload_ref=payload_ref, tokens=tokens,
        ))
        return s

    for rec in records:
        rtype = rec.get("type")
        ts = rec.get("timestamp")
        if ts and isinstance(ts, str):
            timestamps.append(ts)
        payload = rec.get("payload")
        if not isinstance(payload, dict):
            payload = {}

        if rtype == "session_meta":
            cwd = cwd or payload.get("cwd")
            model = model or payload.get("model")

        elif rtype == "turn_context":
            model = model or payload.get("model")

        elif rtype == "response_item":
            ptype = payload.get("type")
            if ptype == "message":
                role = payload.get("role", "assistant")
                text = _message_text(payload)
                # NOTE(review): every non-assistant role is recorded as user_msg and
                # therefore counts as a turn below — confirm that is intended.
                kind = "assistant_msg" if role == "assistant" else "user_msg"
                add_event(ts, kind, role=role, summary=first_line(text), body=text)
            elif ptype == "function_call":
                name = payload.get("name", "")
                args = stringify(payload.get("arguments"))
                s = add_event(ts, classify_tool(name, args), role="assistant", tool=name,
                              summary=name, body=args)
                call_id = payload.get("call_id")
                if call_id:
                    call_seq[call_id] = s
            elif ptype == "function_call_output":
                parent = call_seq.get(payload.get("call_id"))
                body = stringify(payload.get("output"))
                add_event(ts, "tool_result", role="tool", tool=None,
                          summary="tool result", body=body, parent_seq=parent)
            elif ptype == "reasoning":
                body = _message_text(payload) or stringify(payload.get("summary"))
                add_event(ts, "thinking", role="assistant", summary="reasoning", body=body)

        elif rtype == "event_msg":
            ptype = payload.get("type")
            if ptype == "task_started":
                add_event(ts, "lifecycle", summary="task_started")
            elif ptype == "task_complete":
                add_event(ts, "completion", summary="task_complete")
            elif ptype == "token_count":
                # NOTE(review): counts from every token_count record are summed; if a
                # record reports a running total this over-counts — confirm.
                i, o, c = _extract_tokens(payload)
                cost.input_tokens += i
                cost.output_tokens += o
                cost.cache_tokens += c
            # user_message / agent_message echoes are duplicated by response_item
            # messages on modern Codex; skipped to avoid double counting.

    cost.turns = sum(1 for e in events if e.kind == "user_msg")
    started = min(timestamps) if timestamps else None
    ended = max(timestamps) if timestamps else None
    cost.wall_clock_s = seconds_between(started, ended)

    repo, domain = resolve_repo(cwd, repo_domain_map)
    session = Session(
        session_uid=session_uid,
        flavor=FLAVOR, native_session_id=session_id,
        repo=repo, domain=domain, cwd=cwd, model=model,
        started_at=started, ended_at=ended, outcome="unknown", cost=cost,
        source_path=path, source_bytes=os.path.getsize(path) if os.path.exists(path) else 0,
        discovered_at=now_iso(),
    )
    return Normalized(session=session, events=events, blobs=blobs)


def _first_session_id(records: list[dict[str, Any]]) -> Optional[str]:
    """Return the first non-empty ``payload.id`` among ``session_meta`` records."""
    for rec in records:
        if rec.get("type") != "session_meta":
            continue
        payload = rec.get("payload")
        if isinstance(payload, dict) and payload.get("id"):
            return payload["id"]
    return None
|
||||
100
session_memory/adapters/common.py
Normal file
100
session_memory/adapters/common.py
Normal file
@@ -0,0 +1,100 @@
|
||||
"""Shared adapter helpers (Tier 0 -> Tier 1).
|
||||
|
||||
The ``Normalized`` bundle contract and small flavor-agnostic helpers used by every
|
||||
collector adapter. Per-flavor parsing lives in the individual adapter modules.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
from ..core.schema import Session, SessionEvent
|
||||
|
||||
# tool names that mutate files -> kind "edit" (union across flavors)
|
||||
EDIT_TOOLS = {
|
||||
"Edit", "Write", "NotebookEdit", "MultiEdit", # Claude
|
||||
"apply_patch", "write_file", "edit_file", # Codex / Grok variants
|
||||
}
|
||||
# substrings in a shell/tool command that indicate a test run -> kind "test_run"
|
||||
TEST_HINTS = (
|
||||
"pytest", "unittest", "npm test", "npm run test", "go test",
|
||||
"cargo test", "jest", "vitest", "make test", "tox",
|
||||
)
|
||||
|
||||
|
||||
@dataclass
class Normalized:
    """Result bundle an adapter returns: one session, its events, and blob bodies."""

    # the session-level record
    session: Session
    # events in the order the adapter emitted them
    events: list[SessionEvent]
    # payload_ref -> body text, for events that carry a body
    blobs: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
|
||||
def resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
    """Derive ``(repo, domain)`` from a working directory.

    ``repo`` is the last path component of ``cwd`` (trailing ``/`` ignored) and
    ``domain`` is whatever ``repo_domain_map`` holds for it. Both are ``None``
    when ``cwd`` is empty or has no final component.
    """
    if cwd:
        name = os.path.basename(cwd.rstrip("/"))
        if name:
            return name, repo_domain_map.get(name)
    return None, None
|
||||
|
||||
|
||||
def is_test_command(text: str) -> bool:
    """Report whether ``text`` contains any ``TEST_HINTS`` entry.

    The check is a case-insensitive plain substring match; ``None`` or empty
    text never matches.
    """
    lowered = (text or "").lower()
    for hint in TEST_HINTS:
        if hint in lowered:
            return True
    return False
|
||||
|
||||
|
||||
def classify_tool(name: str, command_text: str = "") -> str:
    """Map a tool invocation to an event kind: edit | test_run | tool_call.

    A tool named in ``EDIT_TOOLS`` is always an ``edit``. Otherwise the call is
    a ``test_run`` when either its command text or its name looks like a test
    command, and a plain ``tool_call`` if not.
    """
    if name in EDIT_TOOLS:
        return "edit"
    looks_like_test = is_test_command(command_text) or is_test_command(name)
    return "test_run" if looks_like_test else "tool_call"
|
||||
|
||||
|
||||
def stringify(v: Any, limit: int = 20000) -> str:
    """Render ``v`` as text, cut to at most ``limit`` characters.

    ``None`` becomes an empty string and strings pass through. Anything else is
    JSON-encoded (non-ASCII kept as-is), falling back to ``str(v)`` when it is
    not JSON-serializable.
    """
    if v is None:
        return ""
    if isinstance(v, str):
        text = v
    else:
        try:
            text = json.dumps(v, ensure_ascii=False)
        except (TypeError, ValueError):
            text = str(v)
    return text[:limit]
|
||||
|
||||
|
||||
def first_line(text: str) -> str:
    """Return the first line of ``text`` after trimming surrounding whitespace.

    Yields an empty string for ``None``, empty, or whitespace-only input.
    """
    stripped = (text or "").strip()
    if not stripped:
        return ""
    return stripped.splitlines()[0]
|
||||
|
||||
|
||||
def seconds_between(start: Optional[str], end: Optional[str]) -> float:
    """Return the non-negative number of seconds from ``start`` to ``end``.

    Both arguments are ISO-8601 strings; a trailing ``Z`` is accepted. A
    timestamp without an offset is assumed to be UTC so that a naive and an
    aware value can be compared instead of raising ``TypeError``.

    Returns ``0.0`` when either value is missing, is not a string, cannot be
    parsed, or when ``end`` precedes ``start``.
    """
    a = _parse_iso_utc(start)
    b = _parse_iso_utc(end)
    if a is None or b is None:
        return 0.0
    return max(0.0, (b - a).total_seconds())


def _parse_iso_utc(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 string into an aware datetime, or ``None`` if unusable."""
    if not value or not isinstance(value, str):
        return None
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # assumption: offset-less stamps are UTC, matching what now_iso() emits
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
|
||||
|
||||
|
||||
def iter_jsonl(path: str):
    """Yield parsed JSON objects from a JSONL file, tolerating bad lines.

    Blank lines, lines that are not valid JSON, and lines whose JSON value is
    not an object (callers index every record with ``.get``) are skipped.
    Undecodable bytes are replaced rather than raised, so a line cut off in the
    middle of a multi-byte character does not abort the read; such a line then
    simply fails JSON parsing and is skipped like any other bad line.
    """
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue  # partial or corrupt line
            if isinstance(record, dict):
                yield record
|
||||
|
||||
|
||||
def now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    current = datetime.now(timezone.utc)
    return current.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
182
session_memory/adapters/grok.py
Normal file
182
session_memory/adapters/grok.py
Normal file
@@ -0,0 +1,182 @@
|
||||
"""Grok CLI collector adapter — Tier 0 -> Tier 1 (design §2.3, §4.3).
|
||||
|
||||
A Grok session is a *directory* ``~/.grok/sessions/<enc-cwd>/<uuid>/`` containing
|
||||
``summary.json`` (metadata), ``chat_history.jsonl`` (the canonical transcript),
|
||||
``events.jsonl`` (explicit lifecycle + ``turn_number``), and ``updates.jsonl``
|
||||
(ACP ``session/update`` stream, which carries tool-call names/args).
|
||||
|
||||
The ingest glob matches ``chat_history.jsonl``; this adapter derives its sibling
|
||||
files from the same directory. Conversation order is taken from
|
||||
``chat_history.jsonl``; tool-call names are paired, in order, from
|
||||
``updates.jsonl`` ``tool_call`` entries to classify edits/test runs.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Optional
|
||||
|
||||
from ..core.schema import Cost, Session, SessionEvent
|
||||
from .common import (
|
||||
Normalized,
|
||||
classify_tool,
|
||||
first_line,
|
||||
iter_jsonl,
|
||||
now_iso,
|
||||
resolve_repo,
|
||||
seconds_between,
|
||||
stringify,
|
||||
)
|
||||
|
||||
FLAVOR = "grok"
|
||||
|
||||
|
||||
def _text_content(content: Any) -> str:
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if isinstance(content, list):
|
||||
return "\n".join(
|
||||
(b.get("text") or "") for b in content if isinstance(b, dict)
|
||||
)
|
||||
return ""
|
||||
|
||||
|
||||
def _tool_calls_in_order(session_dir: str) -> list[dict[str, Any]]:
    """Collect ``tool_call`` updates from ``updates.jsonl`` in file order.

    Each entry is ``{"title", "rawInput", "id"}`` taken from the update's
    ``title``, ``rawInput`` and ``toolCallId``. Returns an empty list when the
    session directory has no ``updates.jsonl``.
    """
    updates_path = os.path.join(session_dir, "updates.jsonl")
    if not os.path.exists(updates_path):
        return []

    ordered: list[dict[str, Any]] = []
    for record in iter_jsonl(updates_path):
        params = record.get("params") or {}
        update = params.get("update") or {}
        if update.get("sessionUpdate") != "tool_call":
            continue
        ordered.append({
            "title": update.get("title") or "",
            "rawInput": update.get("rawInput") or {},
            "id": update.get("toolCallId"),
        })
    return ordered
|
||||
|
||||
|
||||
def _session_meta(session_dir: str) -> dict[str, Any]:
|
||||
p = os.path.join(session_dir, "summary.json")
|
||||
if not os.path.exists(p):
|
||||
return {}
|
||||
try:
|
||||
with open(p, "r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
except (OSError, ValueError):
|
||||
return {}
|
||||
|
||||
|
||||
def _lifecycle(session_dir: str) -> tuple[list[dict[str, Any]], Optional[str]]:
    """Read ``events.jsonl`` and return ``(records, model_id)``.

    ``model_id`` is the first non-empty ``model_id`` found among the records.
    A session directory without ``events.jsonl`` gives ``([], None)``.
    """
    events_path = os.path.join(session_dir, "events.jsonl")
    if not os.path.exists(events_path):
        return [], None

    records = []
    model_id = None
    for record in iter_jsonl(events_path):
        records.append(record)
        if not model_id:
            model_id = record.get("model_id")
    return records, model_id
|
||||
|
||||
|
||||
def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -> Optional[Normalized]:
    """Normalize one Grok session directory into a ``Normalized`` bundle.

    Args:
        path: the session directory, or the ``chat_history.jsonl`` inside it.
        repo_domain_map: repo name -> domain lookup handed to ``resolve_repo``.

    Returns:
        The bundle, or ``None`` when there is no ``chat_history.jsonl`` or the
        session produced no events.

    Lifecycle records from ``events.jsonl`` are emitted first, then the chat
    transcript in file order. Each ``tool_result`` consumes the next unused
    ``tool_call`` from ``updates.jsonl`` to recover the tool name and arguments,
    and is linked to that call through ``parent_seq``. Events carry no
    timestamp (``ts`` is ``None``).
    """
    repo_domain_map = repo_domain_map or {}
    # accept either the chat_history.jsonl path or the session dir
    session_dir = path if os.path.isdir(path) else os.path.dirname(path)
    chat = os.path.join(session_dir, "chat_history.jsonl")
    if not os.path.exists(chat):
        return None

    meta = _session_meta(session_dir)
    if not isinstance(meta, dict):
        meta = {}  # summary.json may hold any JSON value
    info = meta.get("info")
    if not isinstance(info, dict):
        info = {}
    session_id = info.get("id") or os.path.basename(session_dir.rstrip("/"))
    session_uid = Session.make_uid(FLAVOR, session_id)
    cwd = info.get("cwd") or meta.get("git_root_dir")
    life_events, life_model = _lifecycle(session_dir)
    model = meta.get("current_model_id") or life_model
    pending_calls = iter(_tool_calls_in_order(session_dir))

    events: list[SessionEvent] = []
    blobs: dict[str, str] = {}
    seq = 0

    def add(kind, *, role=None, tool=None, summary=None, body=None, parent_seq=None) -> int:
        """Append one event (plus its blob when a body is given); return its seq."""
        nonlocal seq
        s = seq
        seq += 1
        ref = None
        if body:
            ref = f"blob://{session_id}/{s}"
            blobs[ref] = body
        events.append(SessionEvent(
            session_uid=session_uid, seq=s, parent_seq=parent_seq,
            ts=None, kind=kind, role=role, tool=tool,
            summary=(summary or "")[:300] or None, payload_ref=ref,
        ))
        return s

    # explicit lifecycle first (turn_started/turn_ended carry no bodies)
    for le in life_events:
        t = le.get("type") if isinstance(le, dict) else None
        if t in ("turn_started", "loop_started", "turn_ended", "phase_changed"):
            add("lifecycle", summary=t)

    for rec in iter_jsonl(chat):
        if not isinstance(rec, dict):
            continue
        rtype = rec.get("type")
        content = rec.get("content")
        if rtype == "user":
            text = _text_content(content)
            if text.strip():
                add("user_msg", role="user", summary=first_line(text), body=text)
        elif rtype == "reasoning":
            text = _text_content(content)
            if text.strip():
                add("thinking", role="assistant", summary="reasoning", body=text)
        elif rtype == "assistant":
            text = _text_content(content)
            if text.strip():
                add("assistant_msg", role="assistant", summary=first_line(text), body=text)
        elif rtype == "tool_result":
            # pair with the next tool_call (in order) to recover name/args
            tool = None
            parent = None
            call = next(pending_calls, None)
            if call is not None:
                tool = call["title"]
                cmd = stringify(call["rawInput"])
                parent = add(classify_tool(tool, cmd), role="assistant", tool=tool,
                             summary=tool, body=cmd)
            body = _text_content(content)
            if not body.strip() and content:
                # structured output with no text blocks: keep it as JSON rather
                # than storing an empty result
                body = stringify(content)
            add("tool_result", role="tool", tool=tool, summary="tool result",
                body=stringify(body), parent_seq=parent)

    if not events:
        return None

    cost = Cost(turns=sum(1 for e in events if e.kind == "user_msg"))
    started = info.get("created_at") or meta.get("created_at")
    ended = meta.get("last_active_at") or info.get("updated_at") or meta.get("updated_at")
    cost.wall_clock_s = seconds_between(started, ended)

    repo, domain = resolve_repo(cwd, repo_domain_map)
    session = Session(
        session_uid=session_uid, flavor=FLAVOR,
        native_session_id=session_id, repo=repo, domain=domain, cwd=cwd,
        git_branch=meta.get("head_branch"), model=model,
        started_at=started, ended_at=ended, outcome="unknown", cost=cost,
        source_path=chat,
        source_bytes=_dir_bytes(session_dir),
        discovered_at=now_iso(),
    )
    return Normalized(session=session, events=events, blobs=blobs)
|
||||
|
||||
|
||||
def _dir_bytes(d: str) -> int:
|
||||
total = 0
|
||||
for root, _, files in os.walk(d):
|
||||
for f in files:
|
||||
try:
|
||||
total += os.path.getsize(os.path.join(root, f))
|
||||
except OSError:
|
||||
pass
|
||||
return total
|
||||
@@ -20,14 +20,14 @@ root = "~/.claude/projects"
|
||||
# glob, relative to root; covers sessions and agent-* sidechains
|
||||
glob = "*/*.jsonl"
|
||||
|
||||
# Codex / Grok adapters land in Phase 1 (schemas confirmed in the design doc).
|
||||
# Codex / Grok adapters added in Phase 1 (AGENTIC-WP-0003).
|
||||
[sources.codex]
|
||||
enabled = false
|
||||
enabled = true
|
||||
root = "~/.codex/sessions"
|
||||
glob = "*/*/*/rollout-*.jsonl"
|
||||
|
||||
[sources.grok]
|
||||
enabled = false
|
||||
enabled = true
|
||||
root = "~/.grok/sessions"
|
||||
glob = "*/*/chat_history.jsonl"
|
||||
|
||||
@@ -37,3 +37,5 @@ agentic-resources = "helix_forge"
|
||||
the-custodian = "custodian"
|
||||
state-hub = "custodian"
|
||||
ops-bridge = "custodian"
|
||||
net-kingdom = "netkingdom"
|
||||
can-you-assist = "coulomb_social"
|
||||
|
||||
@@ -12,6 +12,7 @@ Tier 2 digest — the invariant that makes budget-based retention non-lossy.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
@@ -28,6 +29,18 @@ def _now() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
|
||||
def _fingerprint(ev: SessionEvent, body: Optional[str]) -> str:
|
||||
"""Stable content fingerprint, independent of seq/payload_ref, for dedup."""
|
||||
h = hashlib.sha1()
|
||||
parts = [ev.ts or "", ev.kind, ev.role or "", ev.tool or "", ev.summary or "",
|
||||
ev.role or "", str(ev.is_sidechain)]
|
||||
h.update("\x1f".join(parts).encode("utf-8"))
|
||||
if body is not None:
|
||||
h.update(b"\x1e")
|
||||
h.update(body.encode("utf-8"))
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
class Store:
|
||||
def __init__(self, db_path: str, blob_dir: str):
|
||||
self.db_path = db_path
|
||||
@@ -121,14 +134,75 @@ class Store:
|
||||
self.db.commit()
|
||||
return total
|
||||
|
||||
def ingest(self, bundle) -> int:
    """Persist a Normalized bundle, merging into any existing session.

    Multiple files can map to one ``session_uid`` (Claude resume/sidechains;
    Grok multi-file dirs). Events are de-duplicated by content fingerprint and
    genuinely-new events are appended with offset ``seq`` (design OQ6 / T03).

    De-duplication is occurrence-aware: the n-th event with a given
    fingerprint in the bundle is skipped only when at least n events with that
    fingerprint are already stored. Events that legitimately repeat inside
    one bundle (same kind/summary, no timestamp, no body) are therefore all
    kept, while re-ingesting the same bundle still adds nothing.

    A new event whose parent is in this bundle keeps the link even when the
    parent was stored by an earlier sweep; a parent that is not part of this
    bundle becomes ``None``.

    The session row is written only when the session is not stored yet.

    Returns the number of new events written.
    """
    s = bundle.session
    if self.get_session(s.session_uid) is None:
        if s.ingested_at is None:
            s.ingested_at = _now()
        self.upsert_session(s)

    # fingerprint -> seqs already stored under it (ascending), plus the next free seq
    stored = self._stored_fingerprint_seqs(s.session_uid)
    next_seq = self._max_seq(s.session_uid) + 1

    new_events: list[SessionEvent] = []
    new_blobs: dict[str, str] = {}
    old_to_new: dict[int, int] = {}   # bundle seq -> stored seq
    occurrences: dict[str, int] = {}  # fingerprint -> times seen so far in this bundle
    for ev in bundle.events:
        body = bundle.blobs.get(ev.payload_ref) if ev.payload_ref else None
        fp = _fingerprint(ev, body)
        nth = occurrences.get(fp, 0)
        occurrences[fp] = nth + 1

        known = stored.get(fp, ())
        if nth < len(known):
            # already stored (prior file or prior sweep); remember where, so
            # children that are new can still point at it
            old_to_new[ev.seq] = known[nth]
            continue

        new_seq = next_seq
        next_seq += 1
        old_to_new[ev.seq] = new_seq
        # remap parent within this bundle; cross-file parents become None
        parent = old_to_new.get(ev.parent_seq) if ev.parent_seq is not None else None
        ref = None
        if body is not None:
            ref = f"blob://{s.session_uid}/{new_seq}"
            new_blobs[ref] = body
        new_events.append(SessionEvent(
            session_uid=s.session_uid, seq=new_seq, parent_seq=parent, ts=ev.ts,
            kind=ev.kind, role=ev.role, tool=ev.tool, summary=ev.summary,
            payload_ref=ref, tokens=ev.tokens, is_sidechain=ev.is_sidechain,
        ))

    if new_events:
        self.upsert_events(new_events)
        self.write_blobs(s.session_uid, new_blobs)
    return len(new_events)


def _max_seq(self, session_uid: str) -> int:
    """Return the highest stored ``seq`` for the session, or -1 if it has no events."""
    row = self.db.execute(
        "SELECT COALESCE(MAX(seq), -1) m FROM events WHERE session_uid=?", (session_uid,)
    ).fetchone()
    return int(row["m"])


def _event_fingerprints(self, session_uid: str) -> set[str]:
    """Return the set of content fingerprints of the session's stored events."""
    return set(self._stored_fingerprint_seqs(session_uid))


def _stored_fingerprint_seqs(self, session_uid: str) -> dict[str, list[int]]:
    """Map each stored event fingerprint to the ascending seqs that carry it.

    The body is read back from the blob file registered for the event's
    ``payload_ref``; when there is no such row or the file cannot be read, the
    event is fingerprinted as if it had no body.
    """
    index: dict[str, list[int]] = {}
    for e in self.get_events(session_uid):
        body = None
        if e.payload_ref:
            r = self.db.execute("SELECT path FROM blobs WHERE ref=?", (e.payload_ref,)).fetchone()
            if r:
                try:
                    # newline="" returns the on-disk characters untouched; the
                    # default mode would rewrite \r\n and \r to \n and change
                    # the fingerprint.
                    # NOTE(review): assumes write_blobs stores bodies without
                    # newline translation — confirm.
                    with open(r["path"], "r", encoding="utf-8", newline="") as f:
                        body = f.read()
                except OSError:
                    # NOTE(review): an event whose blob is gone will not match
                    # an incoming copy that still has its body — confirm this
                    # is acceptable for evicted sessions.
                    body = None
        index.setdefault(_fingerprint(e, body), []).append(e.seq)
    for seqs in index.values():
        seqs.sort()
    return index
|
||||
|
||||
# ---- Tier 2 (digest) ---------------------------------------------------
|
||||
|
||||
@@ -149,6 +223,22 @@ class Store:
|
||||
row = self.db.execute("SELECT json FROM digests WHERE session_uid=?", (session_uid,)).fetchone()
|
||||
return json.loads(row["json"]) if row else None
|
||||
|
||||
def list_digests(self) -> list[dict[str, Any]]:
    """Return every stored digest, decoded from the ``json`` column of ``digests``."""
    digests = []
    for row in self.db.execute("SELECT json FROM digests"):
        digests.append(json.loads(row["json"]))
    return digests
|
||||
|
||||
def save_patterns(self, patterns: list[dict[str, Any]]) -> None:
    """Persist candidate patterns to a Tier 2 table (replace prior run).

    Creates the ``patterns`` table on first use, then swaps its contents for
    ``patterns`` (each stored as sorted-key JSON under its ``"key"``). The
    delete and the inserts run as one transaction: if an insert fails, the
    previous run's rows are restored and the error propagates.
    """
    self.db.execute(
        "CREATE TABLE IF NOT EXISTS patterns ("
        "key TEXT PRIMARY KEY, json TEXT NOT NULL, detected_at TEXT NOT NULL)"
    )
    detected_at = _now()  # one timestamp for the whole run
    rows = [(p["key"], json.dumps(p, sort_keys=True), detected_at) for p in patterns]
    # the connection as context manager commits on success, rolls back on error
    with self.db:
        self.db.execute("DELETE FROM patterns")
        self.db.executemany(
            "INSERT INTO patterns(key, json, detected_at) VALUES(?,?,?)",
            rows,
        )
|
||||
|
||||
# ---- reads -------------------------------------------------------------
|
||||
|
||||
def get_session(self, session_uid: str) -> Optional[Session]:
|
||||
|
||||
1
session_memory/detect/__init__.py
Normal file
1
session_memory/detect/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Detect: extract signals from sessions, cluster into candidate patterns."""
|
||||
70
session_memory/detect/__main__.py
Normal file
70
session_memory/detect/__main__.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""Detect entrypoint (T07): digests -> signals -> clusters -> report.
|
||||
|
||||
python -m session_memory.detect [--config PATH] [--json] [--min-frequency N]
|
||||
|
||||
Reads Tier 2 digests from the store, extracts signals, clusters them into
|
||||
candidate patterns, persists the candidates, and prints a ranked report
|
||||
(cross-flavor first) — the input to the Curate phase (Phase 2).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
|
||||
from ..core.store import Store
|
||||
from ..ingest import _expand, load_config
|
||||
from .cluster import cluster
|
||||
from .signals import extract_signals
|
||||
|
||||
|
||||
def run_detect(config: dict, *, min_frequency: int = 2) -> list[dict]:
    """Run the detect pipeline once and return the candidate patterns.

    Opens the store named by ``config["store"]``, reads its digests, extracts
    signals, clusters them with ``min_frequency``, and saves the resulting
    pattern dicts back to the store. The store is closed even when a step
    raises.
    """
    store_cfg = config.get("store", {})
    store = Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"]))
    try:
        digests = store.list_digests()
        signals = extract_signals(digests)
        patterns = [p.to_dict() for p in cluster(signals, min_frequency=min_frequency)]
        store.save_patterns(patterns)
    finally:
        store.close()
    return patterns
|
||||
|
||||
|
||||
def _format_report(patterns: list[dict], n_digests: int) -> str:
|
||||
lines = [f"# Candidate Patterns ({len(patterns)} from {n_digests} sessions)", ""]
|
||||
if not patterns:
|
||||
lines.append("No recurring patterns above the frequency threshold yet.")
|
||||
return "\n".join(lines)
|
||||
for i, p in enumerate(patterns, 1):
|
||||
flag = " [CROSS-FLAVOR]" if p["cross_flavor"] else ""
|
||||
lines.append(f"{i}. {p['title']}{flag}")
|
||||
lines.append(f" score={p['score']} freq={p['frequency']} "
|
||||
f"impact={p['cost_impact']} flavors={','.join(p['flavors'])}")
|
||||
lines.append(f" repos={','.join(p['repos']) or '-'} "
|
||||
f"sessions={len(p['sessions'])}")
|
||||
lines.append("")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main(argv=None) -> int:
    """Command-line entry point: run detection and print the result.

    Args:
        argv: argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Process exit status, always ``0`` when the run completes.
    """
    here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    ap = argparse.ArgumentParser(description="Detect candidate patterns from session digests.")
    ap.add_argument("--config", default=os.path.join(here, "config.toml"))
    ap.add_argument("--min-frequency", type=int, default=2)
    ap.add_argument("--json", action="store_true", help="emit machine-readable JSON")
    args = ap.parse_args(argv)

    config = load_config(args.config)
    store_cfg = config.get("store", {})
    # The digest count only feeds the report header; close this short-lived
    # store handle instead of leaking it for the rest of the process.
    counting_store = Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"]))
    try:
        n = len(counting_store.list_digests())
    finally:
        counting_store.close()
    patterns = run_detect(config, min_frequency=args.min_frequency)

    if args.json:
        print(json.dumps(patterns, indent=2))
    else:
        print(_format_report(patterns, n))
    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
78
session_memory/detect/cluster.py
Normal file
78
session_memory/detect/cluster.py
Normal file
@@ -0,0 +1,78 @@
|
||||
"""Pattern clusterer + evidence (PRD §5, §6.2; T05/T06).
|
||||
|
||||
Groups recurring :class:`Signal`s into candidate ``Pattern`` records. Clustering
|
||||
is deterministic and keyed on ``(polarity, signal-type, locus)`` — enough to
|
||||
surface "the same thing keeps happening" without embeddings (a later option).
|
||||
|
||||
Each candidate carries evidence (FR-D3): supporting sessions, frequency, affected
|
||||
repos, affected **flavors**, and an estimated cost-impact score. Candidates whose
|
||||
evidence spans more than one flavor are flagged ``cross_flavor`` (FR-D4) — the
|
||||
highest-value reuse targets.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import collections
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from .signals import PROBLEM, Signal
|
||||
|
||||
|
||||
@dataclass
class Pattern:
    """Candidate pattern: one cluster of recurring signals plus its evidence.

    Attributes:
        key: stable cluster key.
        polarity: ``problem`` or ``success``.
        signal_type: signal type shared by the cluster.
        locus: subject shared by the cluster.
        frequency: number of supporting signals.
        sessions: supporting sessions.
        repos: affected repos.
        flavors: affected flavors.
        cross_flavor: whether the evidence spans more than one flavor.
        cost_impact: frequency-weighted magnitude.
        score: ranking score (impact x frequency).
        title: short human-readable label.
    """

    key: str
    polarity: str
    signal_type: str
    locus: str
    frequency: int
    sessions: list[str] = field(default_factory=list)
    repos: list[str] = field(default_factory=list)
    flavors: list[str] = field(default_factory=list)
    cross_flavor: bool = False
    cost_impact: float = 0.0
    score: float = 0.0
    title: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return every field of this pattern as a plain dict."""
        as_mapping: dict[str, Any] = asdict(self)
        return as_mapping
|
||||
|
||||
|
||||
def _key(s: Signal) -> str:
|
||||
return f"{s.polarity}:{s.type}:{s.locus}"
|
||||
|
||||
|
||||
def _title(polarity: str, signal_type: str, n_flavors: int) -> str:
    """Compose the display title for a pattern.

    The title is ``"<scope><verb>: <signal type>"`` where the scope prefix is
    ``"cross-flavor "`` only when more than one flavor is involved, the verb is
    ``"problem"`` or ``"success"``, and underscores in the signal type are
    shown as spaces.
    """
    if n_flavors > 1:
        scope = "cross-flavor "
    else:
        scope = ""
    if polarity == PROBLEM:
        verb = "problem"
    else:
        verb = "success"
    readable_type = signal_type.replace("_", " ")
    return f"{scope}{verb}: {readable_type}"
|
||||
|
||||
|
||||
def cluster(signals: list[Signal], *, min_frequency: int = 2) -> list[Pattern]:
    """Bucket signals by cluster key and rank the buckets that recur enough.

    Args:
        signals: signals to group; grouping uses ``_key``.
        min_frequency: buckets with fewer signals than this are dropped.

    Returns:
        One ``Pattern`` per surviving bucket, cross-flavor patterns first and
        higher scores first within each group.
    """
    buckets: dict[str, list[Signal]] = collections.defaultdict(list)
    for sig in signals:
        buckets[_key(sig)].append(sig)

    candidates: list[Pattern] = []
    for bucket_key, group in buckets.items():
        size = len(group)
        if size < min_frequency:
            continue

        # Evidence: distinct sessions / repos / flavors, each sorted.
        session_ids = sorted({sig.session_uid for sig in group})
        repo_names = sorted({sig.repo for sig in group if sig.repo})
        flavor_names = sorted({sig.flavor for sig in group})
        impact = round(sum(sig.magnitude for sig in group), 3)

        # Every member shares polarity/type/locus, so the first one is representative.
        head = group[0]
        spans_flavors = len(flavor_names) > 1
        # rank: impact x frequency, with a boost for cross-flavor reuse value
        boost = 1.5 if spans_flavors else 1.0
        candidates.append(
            Pattern(
                key=bucket_key,
                polarity=head.polarity,
                signal_type=head.type,
                locus=head.locus,
                frequency=size,
                sessions=session_ids,
                repos=repo_names,
                flavors=flavor_names,
                cross_flavor=spans_flavors,
                cost_impact=impact,
                score=round(impact * size * boost, 3),
                title=_title(head.polarity, head.type, len(flavor_names)),
            )
        )

    # cross-flavor first, then by score (False sorts before True)
    return sorted(candidates, key=lambda pat: (not pat.cross_flavor, -pat.score))
|
||||
116
session_memory/detect/signals.py
Normal file
116
session_memory/detect/signals.py
Normal file
@@ -0,0 +1,116 @@
|
||||
"""Signal extractors (PRD §6.2; T04).
|
||||
|
||||
Pure functions over a session digest (Tier 2) — the compact, durable view. Each
|
||||
extractor emits zero or more :class:`Signal`s. A signal records its source
|
||||
session, a *locus* (what it's about), a *polarity* (problem vs. success), and a
|
||||
*magnitude*. Signals are the atoms the clusterer groups into candidate patterns.
|
||||
|
||||
No new capture happens here; everything is derived from digests already written
|
||||
by the Capture layer, so detection is cheap and re-runnable.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
# polarity
|
||||
PROBLEM = "problem"
|
||||
SUCCESS = "success"
|
||||
|
||||
|
||||
@dataclass
class Signal:
    """One observation extracted from a single session digest.

    Attributes:
        session_uid: session the signal was extracted from.
        flavor: flavor of that session.
        repo: repo of that session, if any.
        type: signal type, e.g. ``"budget_overrun"`` or ``"clean_pass"``.
        polarity: ``PROBLEM`` or ``SUCCESS``.
        locus: normalized subject key (tool, marker, ...).
        magnitude: strength / cost weight.
        detail: extra key/value context for the signal.
    """

    session_uid: str
    flavor: str
    repo: Optional[str]
    type: str
    polarity: str
    locus: str
    magnitude: float = 1.0
    detail: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
# --- individual extractors --------------------------------------------------
|
||||
# Each takes (digest, ctx) and returns a list[Signal]. ctx carries corpus-level
|
||||
# stats (e.g. cost percentiles) so extractors can compare a session to its peers.
|
||||
|
||||
def _base(digest, type_, polarity, locus, magnitude=1.0, **detail) -> Signal:
    """Create a ``Signal`` whose session fields are copied from ``digest``.

    ``session_uid`` and ``flavor`` are required digest keys; ``repo`` is
    optional. Extra keyword arguments become the signal's ``detail`` dict.
    """
    session_uid = digest["session_uid"]
    flavor = digest["flavor"]
    repo = digest.get("repo")
    return Signal(
        session_uid=session_uid,
        flavor=flavor,
        repo=repo,
        type=type_,
        polarity=polarity,
        locus=locus,
        magnitude=magnitude,
        detail=detail,
    )
|
||||
|
||||
|
||||
def sig_retry_storm(digest, ctx) -> list[Signal]:
    """Problem signal: the session's retry count reached the storm threshold.

    The threshold comes from ``ctx["retry_storm_threshold"]`` (3 when absent);
    the signal's magnitude is the retry count.
    """
    retry_count = digest.get("markers", {}).get("retries", 0)
    limit = ctx.get("retry_storm_threshold", 3)
    if not retry_count >= limit:
        return []
    signal = _base(digest, "retry_storm", PROBLEM, "retries", float(retry_count),
                   retries=retry_count)
    return [signal]
|
||||
|
||||
|
||||
def sig_repeated_errors(digest, ctx) -> list[Signal]:
    """Problem signal: the session's error count reached the error threshold.

    The threshold comes from ``ctx["error_threshold"]`` (3 when absent); the
    signal's magnitude is the error count.
    """
    error_count = digest.get("markers", {}).get("errors", 0)
    limit = ctx.get("error_threshold", 3)
    if not error_count >= limit:
        return []
    signal = _base(digest, "repeated_errors", PROBLEM, "errors", float(error_count),
                   errors=error_count)
    return [signal]
|
||||
|
||||
|
||||
def sig_budget_overrun(digest, ctx) -> list[Signal]:
    """Problem signal: the session used more tokens than the corpus p90.

    Tokens are input + output tokens from the digest's ``cost``. Nothing is
    emitted when ``ctx["tokens_p90"]`` is missing or zero. The magnitude is the
    ratio of the session's tokens to the p90.
    """
    cost = digest.get("cost", {})
    spent = cost.get("input_tokens", 0) + cost.get("output_tokens", 0)
    ceiling = ctx.get("tokens_p90", 0)
    if not (ceiling and spent > ceiling):
        return []
    ratio = float(spent) / max(ceiling, 1)
    return [_base(digest, "budget_overrun", PROBLEM, "tokens", ratio,
                  tokens=spent, p90=ceiling)]
|
||||
|
||||
|
||||
def sig_abandoned(digest, ctx) -> list[Signal]:
    """Problem signal: the digest's outcome is ``"abandoned"``."""
    was_abandoned = digest.get("outcome") == "abandoned"
    return [_base(digest, "abandoned", PROBLEM, "outcome", 1.0)] if was_abandoned else []
|
||||
|
||||
|
||||
def sig_clean_pass(digest, ctx) -> list[Signal]:
    """Success signal: outcome is success, tests ran, and nothing went wrong.

    Requires at least one test run, zero errors and zero retries.
    """
    markers = digest.get("markers", {})
    if digest.get("outcome") != "success":
        return []
    if markers.get("test_runs", 0) < 1:
        return []
    if markers.get("errors", 0) != 0 or markers.get("retries", 0) != 0:
        return []
    return [_base(digest, "clean_pass", SUCCESS, "outcome", 1.0,
                  test_runs=markers.get("test_runs"))]
|
||||
|
||||
|
||||
def sig_error_then_recovery(digest, ctx) -> list[Signal]:
    """Success signal: the session succeeded although it recorded errors.

    Emitted when the outcome is success and at least one error was counted;
    the magnitude is the error count.
    """
    markers = digest.get("markers", {})
    succeeded = digest.get("outcome") == "success"
    if not (succeeded and markers.get("errors", 0) >= 1):
        return []
    magnitude = float(markers.get("errors", 1))
    return [_base(digest, "error_then_recovery", SUCCESS, "errors", magnitude,
                  errors=markers.get("errors"))]
|
||||
|
||||
|
||||
EXTRACTORS: list[Callable] = [
|
||||
sig_retry_storm, sig_repeated_errors, sig_budget_overrun, sig_abandoned,
|
||||
sig_clean_pass, sig_error_then_recovery,
|
||||
]
|
||||
|
||||
|
||||
def build_context(digests: list[dict]) -> dict[str, Any]:
    """Compute the corpus-wide context shared by all extractors.

    Returns a dict with ``tokens_p90`` (the input + output token total found
    at index ``int(0.9 * (n - 1))`` of the sorted totals, or 0 for an empty
    corpus) and the fixed thresholds ``retry_storm_threshold`` and
    ``error_threshold`` (both 3).
    """
    totals = []
    for digest in digests:
        input_tokens = digest.get("cost", {}).get("input_tokens", 0)
        output_tokens = digest.get("cost", {}).get("output_tokens", 0)
        totals.append(input_tokens + output_tokens)
    totals.sort()

    if totals:
        p90 = totals[int(0.9 * (len(totals) - 1))]
    else:
        p90 = 0
    return {"tokens_p90": p90, "retry_storm_threshold": 3, "error_threshold": 3}
|
||||
|
||||
|
||||
def extract_signals(digests: list[dict], ctx: Optional[dict] = None) -> list[Signal]:
    """Run every extractor in ``EXTRACTORS`` over every digest.

    Args:
        digests: session digests to scan.
        ctx: corpus-level context for the extractors. When ``None`` it is
            built from ``digests`` with ``build_context``.

    Returns:
        All emitted signals, in digest order and then extractor order.
    """
    # Test for None explicitly: a caller-supplied context must be honoured
    # even when it is an empty dict, which `ctx or ...` would silently replace.
    if ctx is None:
        ctx = build_context(digests)
    out: list[Signal] = []
    for d in digests:
        for ex in EXTRACTORS:
            out.extend(ex(d, ctx))
    return out
|
||||
@@ -19,13 +19,19 @@ from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from .adapters import claude as claude_adapter
|
||||
from .adapters import codex as codex_adapter
|
||||
from .adapters import grok as grok_adapter
|
||||
from .core import digest as digest_mod
|
||||
from .core.cursor import Cursors
|
||||
from .core.retention import RetentionConfig, sweep as retention_sweep
|
||||
from .core.store import Store
|
||||
|
||||
# adapter dispatch by source name
|
||||
_ADAPTERS = {"claude": claude_adapter.parse_session}
|
||||
_ADAPTERS = {
|
||||
"claude": claude_adapter.parse_session,
|
||||
"codex": codex_adapter.parse_session,
|
||||
"grok": grok_adapter.parse_session,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
54
tests/test_cluster.py
Normal file
54
tests/test_cluster.py
Normal file
@@ -0,0 +1,54 @@
|
||||
"""Clusterer + evidence + cross-flavor tests (T05/T06)."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.detect.cluster import cluster # noqa: E402
|
||||
from session_memory.detect.signals import PROBLEM, SUCCESS, Signal # noqa: E402
|
||||
|
||||
|
||||
def _sig(uid, flavor, repo, type_, polarity, locus, mag=1.0):
|
||||
return Signal(session_uid=uid, flavor=flavor, repo=repo, type=type_,
|
||||
polarity=polarity, locus=locus, magnitude=mag)
|
||||
|
||||
|
||||
def test_min_frequency_filters_singletons():
|
||||
sigs = [_sig("claude:a", "claude", "r1", "retry_storm", PROBLEM, "retries")]
|
||||
assert cluster(sigs, min_frequency=2) == []
|
||||
|
||||
|
||||
def test_clusters_recurring_signal_with_evidence():
|
||||
sigs = [
|
||||
_sig("claude:a", "claude", "r1", "retry_storm", PROBLEM, "retries", 5),
|
||||
_sig("claude:b", "claude", "r2", "retry_storm", PROBLEM, "retries", 3),
|
||||
]
|
||||
pats = cluster(sigs, min_frequency=2)
|
||||
assert len(pats) == 1
|
||||
p = pats[0]
|
||||
assert p.frequency == 2
|
||||
assert p.sessions == ["claude:a", "claude:b"]
|
||||
assert sorted(p.repos) == ["r1", "r2"]
|
||||
assert p.flavors == ["claude"]
|
||||
assert p.cross_flavor is False
|
||||
assert p.cost_impact == 8.0
|
||||
|
||||
|
||||
def test_cross_flavor_flagged_and_ranked_first():
|
||||
sigs = [
|
||||
# cross-flavor problem (claude + codex)
|
||||
_sig("claude:a", "claude", "r1", "repeated_errors", PROBLEM, "errors", 3),
|
||||
_sig("codex:b", "codex", "r2", "repeated_errors", PROBLEM, "errors", 3),
|
||||
# single-flavor success cluster with higher raw impact
|
||||
_sig("grok:c", "grok", "r3", "clean_pass", SUCCESS, "outcome", 5),
|
||||
_sig("grok:d", "grok", "r4", "clean_pass", SUCCESS, "outcome", 5),
|
||||
]
|
||||
pats = cluster(sigs, min_frequency=2)
|
||||
assert len(pats) == 2
|
||||
xf = next(p for p in pats if p.signal_type == "repeated_errors")
|
||||
assert xf.cross_flavor is True
|
||||
assert sorted(xf.flavors) == ["claude", "codex"]
|
||||
# cross-flavor pattern is ranked first even if another has higher raw impact
|
||||
assert pats[0].cross_flavor is True
|
||||
assert "cross-flavor" in pats[0].title
|
||||
86
tests/test_codex_adapter.py
Normal file
86
tests/test_codex_adapter.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""Codex adapter tests (T01): synthetic rollout fixture."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.adapters.codex import parse_session # noqa: E402
|
||||
|
||||
REPO_MAP = {"agentic-resources": "helix_forge"}
|
||||
|
||||
|
||||
def _rollout(path, lines):
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
for ln in lines:
|
||||
f.write(json.dumps(ln) + "\n")
|
||||
|
||||
|
||||
def test_codex_rollout_parse(tmp_path):
|
||||
p = tmp_path / "rollout-2026-06-06-abc.jsonl"
|
||||
_rollout(p, [
|
||||
{"timestamp": "2026-06-06T10:00:00Z", "type": "session_meta",
|
||||
"payload": {"id": "cdx-1", "cwd": "/home/worsch/agentic-resources",
|
||||
"model_provider": "openai", "cli_version": "0.44.0", "model": "gpt-5-codex"}},
|
||||
{"timestamp": "2026-06-06T10:00:01Z", "type": "turn_context",
|
||||
"payload": {"model": "gpt-5-codex", "approval_policy": "on-request"}},
|
||||
{"timestamp": "2026-06-06T10:00:02Z", "type": "event_msg",
|
||||
"payload": {"type": "task_started"}},
|
||||
{"timestamp": "2026-06-06T10:00:03Z", "type": "response_item",
|
||||
"payload": {"type": "message", "role": "user",
|
||||
"content": [{"type": "input_text", "text": "fix the bug"}]}},
|
||||
{"timestamp": "2026-06-06T10:00:04Z", "type": "response_item",
|
||||
"payload": {"type": "reasoning", "summary": "think about it"}},
|
||||
{"timestamp": "2026-06-06T10:00:05Z", "type": "response_item",
|
||||
"payload": {"type": "function_call", "name": "apply_patch",
|
||||
"arguments": "{\"path\":\"x.py\"}", "call_id": "call_1"}},
|
||||
{"timestamp": "2026-06-06T10:00:06Z", "type": "response_item",
|
||||
"payload": {"type": "function_call", "name": "shell",
|
||||
"arguments": "{\"command\":\"pytest -q\"}", "call_id": "call_2"}},
|
||||
{"timestamp": "2026-06-06T10:00:07Z", "type": "response_item",
|
||||
"payload": {"type": "function_call_output", "call_id": "call_2", "output": "2 passed"}},
|
||||
{"timestamp": "2026-06-06T10:00:08Z", "type": "response_item",
|
||||
"payload": {"type": "message", "role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "done"}]}},
|
||||
{"timestamp": "2026-06-06T10:00:09Z", "type": "event_msg",
|
||||
"payload": {"type": "token_count",
|
||||
"info": {"total_token_usage": {"input_tokens": 200, "output_tokens": 30,
|
||||
"cached_input_tokens": 15}}}},
|
||||
{"timestamp": "2026-06-06T10:00:10Z", "type": "event_msg",
|
||||
"payload": {"type": "task_complete"}},
|
||||
])
|
||||
|
||||
norm = parse_session(str(p), REPO_MAP)
|
||||
assert norm is not None
|
||||
s = norm.session
|
||||
assert s.session_uid == "codex:cdx-1"
|
||||
assert s.flavor == "codex"
|
||||
assert s.repo == "agentic-resources" and s.domain == "helix_forge"
|
||||
assert s.model == "gpt-5-codex"
|
||||
assert s.cost.input_tokens == 200 and s.cost.output_tokens == 30 and s.cost.cache_tokens == 15
|
||||
assert s.cost.turns == 1
|
||||
assert s.cost.wall_clock_s == 10.0
|
||||
|
||||
kinds = [e.kind for e in norm.events]
|
||||
assert kinds == ["lifecycle", "user_msg", "thinking", "edit", "test_run",
|
||||
"tool_result", "assistant_msg", "completion"]
|
||||
|
||||
# flat linkage: function_call_output links to its function_call by call_id
|
||||
out = next(e for e in norm.events if e.kind == "tool_result")
|
||||
test_call = next(e for e in norm.events if e.kind == "test_run")
|
||||
assert out.parent_seq == test_call.seq
|
||||
|
||||
# apply_patch classified as edit; pytest as test_run
|
||||
edit = next(e for e in norm.events if e.kind == "edit")
|
||||
assert edit.tool == "apply_patch"
|
||||
|
||||
|
||||
def test_codex_empty_or_no_meta_returns_none(tmp_path):
|
||||
p = tmp_path / "rollout-empty.jsonl"
|
||||
p.write_text("")
|
||||
assert parse_session(str(p), REPO_MAP) is None
|
||||
|
||||
p2 = tmp_path / "rollout-nometa.jsonl"
|
||||
_rollout(p2, [{"timestamp": "t", "type": "event_msg", "payload": {"type": "task_started"}}])
|
||||
assert parse_session(str(p2), REPO_MAP) is None # no session_meta -> no id
|
||||
44
tests/test_detect_entrypoint.py
Normal file
44
tests/test_detect_entrypoint.py
Normal file
@@ -0,0 +1,44 @@
|
||||
"""Detect entrypoint tests (T07): end-to-end digests -> patterns, persisted."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.core.store import Store # noqa: E402
|
||||
from session_memory.detect.__main__ import run_detect # noqa: E402
|
||||
|
||||
|
||||
def _digest(uid, flavor, repo, **markers):
|
||||
return {
|
||||
"session_uid": uid, "flavor": flavor, "repo": repo, "outcome": "fail",
|
||||
"cost": {"input_tokens": 10, "output_tokens": 1},
|
||||
"markers": {"errors": markers.get("errors", 0), "retries": markers.get("retries", 0),
|
||||
"test_runs": 0, "edits": 0, "human_interventions": 0},
|
||||
}
|
||||
|
||||
|
||||
def _config(tmp_path):
|
||||
return {"store": {"db_path": str(tmp_path / ".store/m.db"),
|
||||
"blob_dir": str(tmp_path / ".store/blobs"),
|
||||
"cursor": str(tmp_path / ".store/c.json")}}
|
||||
|
||||
|
||||
def test_run_detect_persists_cross_flavor_pattern(tmp_path):
|
||||
cfg = _config(tmp_path)
|
||||
st = Store(cfg["store"]["db_path"], cfg["store"]["blob_dir"])
|
||||
# same problem (retry_storm) across two flavors -> cross-flavor candidate
|
||||
st.write_digest("claude:a", _digest("claude:a", "claude", "r1", retries=5))
|
||||
st.write_digest("codex:b", _digest("codex:b", "codex", "r2", retries=4))
|
||||
st.close()
|
||||
|
||||
patterns = run_detect(cfg, min_frequency=2)
|
||||
assert len(patterns) == 1
|
||||
assert patterns[0]["cross_flavor"] is True
|
||||
assert patterns[0]["signal_type"] == "retry_storm"
|
||||
|
||||
# persisted to the Tier 2 patterns table
|
||||
st2 = Store(cfg["store"]["db_path"], cfg["store"]["blob_dir"])
|
||||
rows = st2.db.execute("SELECT key FROM patterns").fetchall()
|
||||
assert len(rows) == 1
|
||||
st2.close()
|
||||
92
tests/test_grok_adapter.py
Normal file
92
tests/test_grok_adapter.py
Normal file
@@ -0,0 +1,92 @@
|
||||
"""Grok adapter tests (T02): synthetic session dir + real local sessions."""
|
||||
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.adapters.grok import parse_session # noqa: E402
|
||||
|
||||
REPO_MAP = {"agentic-resources": "helix_forge", "net-kingdom": "netkingdom",
|
||||
"can-you-assist": "coulomb_social"}
|
||||
|
||||
|
||||
def _mk_session(dir_path, sid):
|
||||
os.makedirs(dir_path, exist_ok=True)
|
||||
with open(os.path.join(dir_path, "summary.json"), "w") as f:
|
||||
json.dump({"info": {"id": sid, "cwd": "/home/worsch/agentic-resources"},
|
||||
"created_at": "2026-06-06T10:00:00Z",
|
||||
"last_active_at": "2026-06-06T10:05:00Z",
|
||||
"current_model_id": "grok-build", "head_branch": "main"}, f)
|
||||
with open(os.path.join(dir_path, "events.jsonl"), "w") as f:
|
||||
f.write(json.dumps({"ts": "2026-06-06T10:00:00Z", "type": "turn_started",
|
||||
"turn_number": 0, "model_id": "grok-build"}) + "\n")
|
||||
f.write(json.dumps({"ts": "2026-06-06T10:05:00Z", "type": "turn_ended",
|
||||
"turn_number": 0}) + "\n")
|
||||
with open(os.path.join(dir_path, "chat_history.jsonl"), "w") as f:
|
||||
for rec in [
|
||||
{"type": "system", "content": "sys prompt"},
|
||||
{"type": "user", "content": [{"type": "text", "text": "fix the bug"}]},
|
||||
{"type": "reasoning", "content": [{"type": "text", "text": "thinking..."}]},
|
||||
{"type": "assistant", "content": ""}, # empty -> skipped
|
||||
{"type": "tool_result", "content": "The file x.py has been updated"},
|
||||
{"type": "assistant", "content": "done"},
|
||||
{"type": "tool_result", "content": "6 passed"},
|
||||
]:
|
||||
f.write(json.dumps(rec) + "\n")
|
||||
with open(os.path.join(dir_path, "updates.jsonl"), "w") as f:
|
||||
for u in [
|
||||
{"sessionUpdate": "tool_call", "toolCallId": "c1", "title": "edit_file",
|
||||
"rawInput": {"target_file": "x.py"}},
|
||||
{"sessionUpdate": "tool_call", "toolCallId": "c2", "title": "shell",
|
||||
"rawInput": {"command": "pytest -q"}},
|
||||
]:
|
||||
f.write(json.dumps({"timestamp": "t", "method": "session/update",
|
||||
"params": {"sessionId": sid, "update": u}}) + "\n")
|
||||
|
||||
|
||||
def test_grok_synthetic_dir(tmp_path):
|
||||
d = tmp_path / "%2Fhome%2Fworsch%2Fagentic-resources" / "sid-1"
|
||||
_mk_session(str(d), "sid-1")
|
||||
|
||||
norm = parse_session(str(d / "chat_history.jsonl"), REPO_MAP)
|
||||
assert norm is not None
|
||||
s = norm.session
|
||||
assert s.session_uid == "grok:sid-1"
|
||||
assert s.flavor == "grok"
|
||||
assert s.repo == "agentic-resources" and s.domain == "helix_forge"
|
||||
assert s.model == "grok-build"
|
||||
assert s.git_branch == "main"
|
||||
assert s.cost.turns == 1
|
||||
assert s.cost.wall_clock_s == 300.0
|
||||
|
||||
kinds = [e.kind for e in norm.events]
|
||||
# 4 lifecycle from events.jsonl? no: turn_started + turn_ended = 2 lifecycle
|
||||
assert kinds.count("lifecycle") == 2
|
||||
assert "user_msg" in kinds and "thinking" in kinds and "assistant_msg" in kinds
|
||||
# paired tool calls recovered names -> edit + test_run, each followed by tool_result
|
||||
assert "edit" in kinds and "test_run" in kinds
|
||||
edit = next(e for e in norm.events if e.kind == "edit")
|
||||
assert edit.tool == "edit_file"
|
||||
# tool_result after test_run links to it
|
||||
tr = [e for e in norm.events if e.kind == "tool_result"]
|
||||
assert len(tr) == 2
|
||||
|
||||
|
||||
def test_real_local_grok_sessions_if_available():
|
||||
base = os.path.expanduser("~/.grok/sessions")
|
||||
chats = glob.glob(os.path.join(base, "*", "*", "chat_history.jsonl"))
|
||||
if not chats:
|
||||
return
|
||||
parsed = 0
|
||||
for c in chats:
|
||||
norm = parse_session(c, REPO_MAP)
|
||||
if norm is None:
|
||||
continue
|
||||
parsed += 1
|
||||
assert norm.session.session_uid.startswith("grok:")
|
||||
seqs = [e.seq for e in norm.events]
|
||||
assert seqs == sorted(seqs) and len(seqs) == len(set(seqs))
|
||||
assert parsed >= 1
|
||||
66
tests/test_merge.py
Normal file
66
tests/test_merge.py
Normal file
@@ -0,0 +1,66 @@
|
||||
"""Multi-file session merge tests (T03)."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.adapters.common import Normalized # noqa: E402
|
||||
from session_memory.core.schema import Session, SessionEvent # noqa: E402
|
||||
from session_memory.core.store import Store # noqa: E402
|
||||
|
||||
|
||||
def _part(native, kinds, base_blob="b"):
|
||||
uid = Session.make_uid("claude", native)
|
||||
s = Session(session_uid=uid, flavor="claude", native_session_id=native)
|
||||
events, blobs = [], {}
|
||||
for i, k in enumerate(kinds):
|
||||
ref = f"blob://{native}/{i}"
|
||||
events.append(SessionEvent(session_uid=uid, seq=i, parent_seq=(i - 1 if i else None),
|
||||
kind=k, ts=f"2026-06-06T10:0{i}:00Z", payload_ref=ref))
|
||||
blobs[ref] = f"{base_blob}-{k}-{i}"
|
||||
return Normalized(session=s, events=events, blobs=blobs)
|
||||
|
||||
|
||||
def test_second_file_appends_not_overwrites(tmp_path):
|
||||
st = Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
|
||||
uid = Session.make_uid("claude", "s1")
|
||||
|
||||
# file 1: 3 events (seq 0..2)
|
||||
n1 = _part("s1", ["user_msg", "assistant_msg", "tool_call"])
|
||||
added1 = st.ingest(n1)
|
||||
assert added1 == 3
|
||||
assert st.count_events(uid) == 3
|
||||
|
||||
# file 2 for the SAME session: repeats event 0 + adds 2 new (continuation)
|
||||
n2 = _part("s1", ["user_msg", "edit", "completion"])
|
||||
# make the first event identical to file1's first event so it dedups
|
||||
n2.events[0].kind = "user_msg"
|
||||
n2.events[0].ts = "2026-06-06T10:00:00Z"
|
||||
n2.blobs[n2.events[0].payload_ref] = "b-user_msg-0"
|
||||
added2 = st.ingest(n2)
|
||||
|
||||
# only the 2 genuinely-new events appended; total grows additively
|
||||
assert added2 == 2
|
||||
assert st.count_events(uid) == 5
|
||||
seqs = [e.seq for e in st.get_events(uid)]
|
||||
assert seqs == [0, 1, 2, 3, 4] # contiguous, offset
|
||||
|
||||
|
||||
def test_reingest_same_bundle_is_idempotent(tmp_path):
|
||||
st = Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
|
||||
uid = Session.make_uid("claude", "s2")
|
||||
n = _part("s2", ["user_msg", "assistant_msg"])
|
||||
assert st.ingest(n) == 2
|
||||
assert st.ingest(n) == 0 # nothing new on re-run
|
||||
assert st.count_events(uid) == 2
|
||||
|
||||
|
||||
def test_appended_event_parent_remapped_within_part(tmp_path):
    """A parent link inside a later part is shifted by that part's seq offset."""
    st = Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
    uid = Session.make_uid("claude", "s3")
    st.ingest(_part("s3", ["user_msg", "assistant_msg"]))  # seq 0,1
    # Second part of the same session; the former `... if False else ...`
    # wrapper was dead code and always evaluated to this call.
    st.ingest(_part("s3", ["thinking", "edit"]))  # new 2,3
    events = {e.seq: e for e in st.get_events(uid)}
    # the 'edit' (seq 3) had parent_seq=0 within its part -> remapped to its part's first new seq (2)
    assert events[3].parent_seq == 2
|
||||
53
tests/test_signals.py
Normal file
53
tests/test_signals.py
Normal file
@@ -0,0 +1,53 @@
|
||||
"""Signal extractor tests (T04)."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.detect.signals import ( # noqa: E402
|
||||
PROBLEM, SUCCESS, build_context, extract_signals,
|
||||
)
|
||||
|
||||
|
||||
def _digest(uid, flavor="claude", repo="r", outcome="success", tokens=100,
|
||||
errors=0, retries=0, test_runs=0):
|
||||
return {
|
||||
"session_uid": uid, "flavor": flavor, "repo": repo, "outcome": outcome,
|
||||
"cost": {"input_tokens": tokens, "output_tokens": 0},
|
||||
"markers": {"errors": errors, "retries": retries, "test_runs": test_runs,
|
||||
"edits": 0, "human_interventions": 0},
|
||||
}
|
||||
|
||||
|
||||
def test_problem_signals():
|
||||
digests = [
|
||||
_digest("claude:a", retries=5, outcome="fail"),
|
||||
_digest("claude:b", errors=4),
|
||||
_digest("claude:c", outcome="abandoned"),
|
||||
]
|
||||
sigs = extract_signals(digests)
|
||||
types = {(s.session_uid, s.type) for s in sigs}
|
||||
assert ("claude:a", "retry_storm") in types
|
||||
assert ("claude:b", "repeated_errors") in types
|
||||
assert ("claude:c", "abandoned") in types
|
||||
assert all(s.polarity == PROBLEM for s in sigs
|
||||
if s.type in ("retry_storm", "repeated_errors", "abandoned"))
|
||||
|
||||
|
||||
def test_success_signals():
|
||||
sigs = extract_signals([_digest("grok:x", outcome="success", test_runs=2)])
|
||||
assert any(s.type == "clean_pass" and s.polarity == SUCCESS for s in sigs)
|
||||
|
||||
rec = extract_signals([_digest("codex:y", outcome="success", errors=2)])
|
||||
assert any(s.type == "error_then_recovery" and s.polarity == SUCCESS for s in rec)
|
||||
|
||||
|
||||
def test_budget_overrun_uses_corpus_p90():
|
||||
digests = [_digest(f"claude:{i}", tokens=100) for i in range(10)]
|
||||
digests.append(_digest("claude:big", tokens=100000))
|
||||
ctx = build_context(digests)
|
||||
assert ctx["tokens_p90"] >= 100
|
||||
sigs = extract_signals(digests, ctx)
|
||||
overruns = [s for s in sigs if s.type == "budget_overrun"]
|
||||
assert overruns and overruns[0].session_uid == "claude:big"
|
||||
169
workplans/AGENTIC-WP-0003-session-memory-phase1.md
Normal file
169
workplans/AGENTIC-WP-0003-session-memory-phase1.md
Normal file
@@ -0,0 +1,169 @@
|
||||
---
|
||||
id: AGENTIC-WP-0003
|
||||
type: workplan
|
||||
title: "Coding Session Memory — Phase 1 (Codex + Grok adapters, Detect)"
|
||||
domain: helix_forge
|
||||
repo: agentic-resources
|
||||
status: finished
|
||||
owner: codex
|
||||
topic_slug: helix-forge
|
||||
created: "2026-06-06"
|
||||
updated: "2026-06-06"
|
||||
state_hub_workstream_id: "88c75b47-1c89-43bc-bb3e-739ec3c8f7d4"
|
||||
---
|
||||
|
||||
# Coding Session Memory — Phase 1
|
||||
|
||||
Extends Phase 0 ([AGENTIC-WP-0002](AGENTIC-WP-0002-session-memory-phase0.md)) along
|
||||
two axes of [PRD-helix-forge](../docs/PRD-helix-forge.md):
|
||||
|
||||
1. **Multi-flavor capture (G1/G6):** add the Codex and Grok collector adapters so
|
||||
the agnostic core ingests all three families through thin edges.
|
||||
2. **Detect (PRD §6.2):** run signal extractors over normalized sessions, cluster
|
||||
recurring signals into candidate problem/success patterns, attach evidence, and
|
||||
flag cross-flavor patterns.
|
||||
|
||||
Both flavors' on-disk schemas are already confirmed in
|
||||
[DESIGN-session-memory.md](../docs/DESIGN-session-memory.md) §2.2 (Codex) and §2.3
|
||||
(Grok), with the native→`kind` mapping in §4.3 — so the adapters are written
|
||||
against known structures, not discovered ones.
|
||||
|
||||
## Codex Collector Adapter
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0003-T01
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "91264fd4-ba99-4add-b317-e2320c3c932c"
|
||||
```
|
||||
|
||||
Implement `adapters/codex.py` reading `~/.codex/sessions/YYYY/MM/DD/rollout-*.jsonl`
|
||||
per design §2.2: line wrapper `{timestamp,type,payload}`; map `session_meta`→Session
|
||||
fields, `turn_context`→model, `response_item/message`→`user_msg`/`assistant_msg`,
|
||||
`function_call`+`function_call_output` (joined on `call_id`)→`tool_call`/`tool_result`,
|
||||
`reasoning`→`thinking`, `event_msg/task_*`→`lifecycle`/`completion`,
|
||||
`event_msg/token_count`→cost. Codex is flat: assign `seq`/`parent_seq` by temporal
|
||||
order (no native DAG). Version-detect on `session_meta.cli_version`. Reuse the
|
||||
`Normalized` bundle contract. Tests use synthetic rollout fixtures; confirm the
|
||||
`token_count` payload field names against a real install if Codex is present
|
||||
(design OQ1 residual).
|
||||
|
||||
## Grok Collector Adapter
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0003-T02
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "fe3d7d1c-110e-4f16-8d56-062fa4a651aa"
|
||||
```
|
||||
|
||||
Implement `adapters/grok.py` reading the per-session directory
|
||||
`~/.grok/sessions/<cwd>/<uuid>/` per design §2.3: `summary.json`→Session
|
||||
id/cwd/timestamps, `chat_history.jsonl`→messages, `events.jsonl`→explicit
|
||||
`lifecycle` events and `turn_number` (key `seq` off it), tool calls/results from
|
||||
`chat_history`/`updates.jsonl`, token fields from events/updates. Resolve the
|
||||
url-encoded cwd dir name back to a path. Tests run against the real local Grok
|
||||
sessions on this workstation plus a synthetic dir fixture.
|
||||
|
||||
## Multi-File / Multi-Part Session Merge
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0003-T03
|
||||
status: done
|
||||
priority: medium
|
||||
state_hub_task_id: "c4acfb63-84cd-4299-a44d-91bb6857fa88"
|
||||
```
|
||||
|
||||
Address design OQ6 (surfaced in Phase 0): several files can map to one
|
||||
`session_uid` (resume, sidechains; Grok dirs are inherently multi-file). Change
|
||||
the store/ingest path to **merge** events across parts of one session rather than
|
||||
last-file-wins upsert — stable event ordering and de-duplication keyed on native
|
||||
identity. Verify event counts are additive and idempotent on re-run.
|
||||
|
||||
## Signal Extractors
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0003-T04
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "20920c5d-16f7-43bb-9ed7-9afbfeaf7207"
|
||||
```
|
||||
|
||||
Implement `detect/signals.py`: derive `Signal`s from normalized sessions/digests —
|
||||
e.g. repeated test failure on the same target, budget overrun (cost vs. peers),
|
||||
retry storm, fast clean resolution, human escalation, error-then-recovery. Each
|
||||
signal carries its source `session_uid`, locus (file/tool/task), polarity
|
||||
(problem|success), and magnitude. Pure functions over Tier 1 events + Tier 2
|
||||
digests; no new capture. Unit-tested on synthetic sessions.
|
||||
|
||||
## Pattern Clusterer
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0003-T05
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "f42d57f6-34dc-4a92-bf6a-4d8eab572467"
|
||||
```
|
||||
|
||||
Implement `detect/cluster.py`: group recurring signals across sessions/repos/
|
||||
flavors into candidate `ProblemPattern`/`SuccessPattern` records (PRD §5). Start
|
||||
with deterministic keyed clustering (locus + signal-type + normalized message);
|
||||
leave embedding-based similarity as a later option. Output candidates with
|
||||
frequency and member session lists.
|
||||
|
||||
## Pattern Evidence + Cross-Flavor Flagging
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0003-T06
|
||||
status: done
|
||||
priority: medium
|
||||
state_hub_task_id: "8fd502d6-d138-4a42-acd5-6f5921859605"
|
||||
```
|
||||
|
||||
For each candidate pattern (PRD §6.2 FR-D3/FR-D4) attach evidence: supporting
|
||||
sessions, frequency, affected repos, affected **flavors**, and estimated cost
|
||||
impact (token/retry deltas vs. baseline). Explicitly flag candidates whose
|
||||
evidence spans more than one flavor as `cross_flavor: true` — the highest-value
|
||||
reuse targets. Persist candidates to a Tier 2 `patterns` store/table.
|
||||
|
||||
## Candidate Pattern Report
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0003-T07
|
||||
status: done
|
||||
priority: medium
|
||||
state_hub_task_id: "34a96d5d-9165-4761-b91e-3643b0401410"
|
||||
```
|
||||
|
||||
Add a `detect` entrypoint (`python -m session_memory.detect`) that runs extractors
|
||||
→ clusterer → evidence and emits a human-readable candidate report (ranked by
|
||||
cost impact × frequency, cross-flavor first), plus machine-readable JSON. This is
|
||||
the input to the Curate phase (Phase 2) review workflow. Document usage in the
|
||||
session_memory README.
|
||||
|
||||
## Verify Across All Three Flavors
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0003-T08
|
||||
status: done
|
||||
priority: medium
|
||||
state_hub_task_id: "b272c3fa-af81-4a6c-9ed9-7b42173efa81"
|
||||
```
|
||||
|
||||
Run the full pipeline (ingest all enabled sources → digest → detect) against the
|
||||
real local Claude and Grok sessions on this workstation (Codex via fixtures if not
|
||||
installed). Confirm: normalized rows for each flavor, at least one candidate
|
||||
pattern surfaced, and at least one **cross-flavor** pattern detected if the data
|
||||
supports it (PRD success metric). Record results and refresh design open
|
||||
questions. After workplan file updates, notify the custodian operator to run from
|
||||
`~/state-hub`:
|
||||
|
||||
```bash
|
||||
make fix-consistency REPO=agentic-resources
|
||||
```
|
||||
|
||||
**Verification results (2026-06-06):** full suite 40/40 green. Live pipeline over
|
||||
real local sessions (Codex not installed → fixtures): ingested 88 → 67 digests
|
||||
(63 Claude + 4 Grok); detect surfaced 3 candidate patterns, **2 cross-flavor**
|
||||
(Claude+Grok) — "clean pass" success across 18 sessions and "abandoned" problem
|
||||
across 13 — plus a Claude-only budget-overrun. PRD cross-flavor success metric met.
|
||||
Reference in New Issue
Block a user