generated from coulomb/repo-seed
session-memory Phase 1: Codex adapter (T01) + multi-file merge (T03)
- adapters/common.py: shared Normalized + helpers (resolve_repo, classify_tool,
jsonl iter, etc.); claude.py refactored to use it (Normalized re-exported)
- adapters/codex.py: rollout {timestamp,type,payload} parser; session_meta/
response_item/event_msg mapping; flat call_id join; token_count cost;
registered in ingest dispatch
- core/store.py: ingest() now merges multi-file sessions by content
fingerprint, appends new events with offset seq (design OQ6); idempotent
- tests/test_codex_adapter.py, tests/test_merge.py
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -11,54 +11,23 @@ that the store persists out-of-line so Tier 1 rows stay light.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Iterable, Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
from ..core.schema import Cost, Session, SessionEvent
|
||||
from .common import ( # noqa: F401 (Normalized re-exported for back-compat)
|
||||
Normalized,
|
||||
classify_tool,
|
||||
first_line as _first_line,
|
||||
iter_jsonl as _iter_records,
|
||||
now_iso as _now,
|
||||
resolve_repo as _resolve_repo,
|
||||
seconds_between as _seconds_between,
|
||||
stringify as _stringify,
|
||||
)
|
||||
|
||||
FLAVOR = "claude"
|
||||
|
||||
# tool_use names that mutate files -> kind "edit"
|
||||
_EDIT_TOOLS = {"Edit", "Write", "NotebookEdit", "MultiEdit"}
|
||||
# crude test-runner detection inside Bash commands -> kind "test_run"
|
||||
_TEST_HINTS = ("pytest", "unittest", "npm test", "npm run test", "go test", "cargo test", "jest", "vitest")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Normalized:
|
||||
session: Session
|
||||
events: list[SessionEvent]
|
||||
blobs: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
|
||||
def _iter_records(path: str) -> Iterable[dict[str, Any]]:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
yield json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue # tolerate partial/corrupt trailing lines
|
||||
|
||||
|
||||
def _resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
|
||||
"""cwd -> (repo, domain). repo is the cwd basename; domain via map."""
|
||||
if not cwd:
|
||||
return None, None
|
||||
repo = os.path.basename(cwd.rstrip("/")) or None
|
||||
domain = repo_domain_map.get(repo) if repo else None
|
||||
return repo, domain
|
||||
|
||||
|
||||
def _is_test_command(text: str) -> bool:
|
||||
low = text.lower()
|
||||
return any(h in low for h in _TEST_HINTS)
|
||||
|
||||
|
||||
def _content_blocks(message: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
content = message.get("content")
|
||||
@@ -159,11 +128,8 @@ def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -
|
||||
name = b.get("name", "")
|
||||
inp = b.get("input", {})
|
||||
body = _stringify(inp)
|
||||
kind = "tool_call"
|
||||
if name in _EDIT_TOOLS:
|
||||
kind = "edit"
|
||||
elif name == "Bash" and _is_test_command(_stringify(inp.get("command", ""))):
|
||||
kind = "test_run"
|
||||
cmd = inp.get("command", "") if isinstance(inp, dict) else ""
|
||||
kind = classify_tool(name, _stringify(cmd))
|
||||
add_event(uuid, parent, ts, kind, role="assistant", tool=name,
|
||||
summary=f"{name}", body=body, sidechain=sidechain)
|
||||
|
||||
@@ -194,35 +160,3 @@ def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -
|
||||
discovered_at=_now(),
|
||||
)
|
||||
return Normalized(session=session, events=events, blobs=blobs)
|
||||
|
||||
|
||||
# ---- helpers ---------------------------------------------------------------
|
||||
|
||||
def _stringify(v: Any) -> str:
|
||||
if v is None:
|
||||
return ""
|
||||
if isinstance(v, str):
|
||||
return v
|
||||
try:
|
||||
return json.dumps(v, ensure_ascii=False)[:20000]
|
||||
except (TypeError, ValueError):
|
||||
return str(v)[:20000]
|
||||
|
||||
|
||||
def _first_line(text: str) -> str:
|
||||
return (text or "").strip().splitlines()[0] if (text or "").strip() else ""
|
||||
|
||||
|
||||
def _seconds_between(start: Optional[str], end: Optional[str]) -> float:
|
||||
if not start or not end:
|
||||
return 0.0
|
||||
try:
|
||||
a = datetime.fromisoformat(start.replace("Z", "+00:00"))
|
||||
b = datetime.fromisoformat(end.replace("Z", "+00:00"))
|
||||
return max(0.0, (b - a).total_seconds())
|
||||
except ValueError:
|
||||
return 0.0
|
||||
|
||||
|
||||
def _now() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
167
session_memory/adapters/codex.py
Normal file
167
session_memory/adapters/codex.py
Normal file
@@ -0,0 +1,167 @@
|
||||
"""OpenAI Codex CLI collector adapter — Tier 0 -> Tier 1 (design §2.2, §4.3).
|
||||
|
||||
Reads ``$CODEX_HOME/sessions/YYYY/MM/DD/rollout-*.jsonl``. Each line is a
|
||||
``RolloutLine`` wrapper ``{timestamp, type, payload}``; ``type`` discriminates
|
||||
``session_meta`` / ``response_item`` / ``event_msg`` / ``turn_context`` /
|
||||
``compacted``.
|
||||
|
||||
Codex is **flat** — tool calls and outputs are joined only by ``call_id`` with no
|
||||
parent-ref DAG — so ``seq`` is assigned by temporal (line) order and
|
||||
``parent_seq`` is set for ``function_call_output`` back to its ``function_call``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from typing import Any, Optional
|
||||
|
||||
from ..core.schema import Cost, Session, SessionEvent
|
||||
from .common import (
|
||||
Normalized,
|
||||
classify_tool,
|
||||
first_line,
|
||||
iter_jsonl,
|
||||
now_iso,
|
||||
resolve_repo,
|
||||
seconds_between,
|
||||
stringify,
|
||||
)
|
||||
|
||||
FLAVOR = "codex"
|
||||
|
||||
|
||||
def _message_text(payload: dict[str, Any]) -> str:
|
||||
content = payload.get("content")
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
parts = []
|
||||
if isinstance(content, list):
|
||||
for b in content:
|
||||
if isinstance(b, dict):
|
||||
parts.append(b.get("text") or b.get("output_text") or "")
|
||||
elif isinstance(b, str):
|
||||
parts.append(b)
|
||||
return "\n".join(p for p in parts if p)
|
||||
|
||||
|
||||
def _extract_tokens(payload: dict[str, Any]) -> tuple[int, int, int]:
|
||||
"""Best-effort (input, output, cache) from a token_count payload.
|
||||
|
||||
Field shapes vary across Codex versions; probe known locations, else recurse.
|
||||
"""
|
||||
for scope in (payload, payload.get("info") or {}, payload.get("usage") or {},
|
||||
(payload.get("info") or {}).get("total_token_usage") or {}):
|
||||
if isinstance(scope, dict):
|
||||
i = scope.get("input_tokens") or scope.get("prompt_tokens")
|
||||
o = scope.get("output_tokens") or scope.get("completion_tokens")
|
||||
if i is not None or o is not None:
|
||||
cache = scope.get("cached_input_tokens") or scope.get("cache_read_input_tokens") or 0
|
||||
return int(i or 0), int(o or 0), int(cache or 0)
|
||||
return 0, 0, 0
|
||||
|
||||
|
||||
def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -> Optional[Normalized]:
|
||||
repo_domain_map = repo_domain_map or {}
|
||||
records = list(iter_jsonl(path))
|
||||
if not records:
|
||||
return None
|
||||
|
||||
session_id: Optional[str] = None
|
||||
cwd = model = cli_version = None
|
||||
timestamps: list[str] = []
|
||||
events: list[SessionEvent] = []
|
||||
blobs: dict[str, str] = {}
|
||||
call_seq: dict[str, int] = {} # call_id -> seq of its function_call
|
||||
cost = Cost()
|
||||
seq = 0
|
||||
|
||||
def add_event(ts, kind, *, role=None, tool=None, summary=None, body=None,
|
||||
tokens=0, parent_seq=None) -> int:
|
||||
nonlocal seq
|
||||
s = seq
|
||||
seq += 1
|
||||
payload_ref = None
|
||||
if body:
|
||||
payload_ref = f"blob://{session_id}/{s}"
|
||||
blobs[payload_ref] = body
|
||||
events.append(SessionEvent(
|
||||
session_uid=Session.make_uid(FLAVOR, session_id or "unknown"),
|
||||
seq=s, parent_seq=parent_seq, ts=ts, kind=kind, role=role, tool=tool,
|
||||
summary=(summary or "")[:300] or None, payload_ref=payload_ref, tokens=tokens,
|
||||
))
|
||||
return s
|
||||
|
||||
for rec in records:
|
||||
rtype = rec.get("type")
|
||||
ts = rec.get("timestamp")
|
||||
if ts:
|
||||
timestamps.append(ts)
|
||||
payload = rec.get("payload") or {}
|
||||
|
||||
if rtype == "session_meta":
|
||||
session_id = session_id or payload.get("id")
|
||||
cwd = cwd or payload.get("cwd")
|
||||
model = model or payload.get("model")
|
||||
cli_version = cli_version or payload.get("cli_version")
|
||||
|
||||
elif rtype == "turn_context":
|
||||
model = model or payload.get("model")
|
||||
|
||||
elif rtype == "response_item":
|
||||
ptype = payload.get("type")
|
||||
if ptype == "message":
|
||||
role = payload.get("role", "assistant")
|
||||
text = _message_text(payload)
|
||||
kind = "assistant_msg" if role == "assistant" else "user_msg"
|
||||
add_event(ts, kind, role=role, summary=first_line(text), body=text)
|
||||
elif ptype == "function_call":
|
||||
name = payload.get("name", "")
|
||||
args = stringify(payload.get("arguments"))
|
||||
kind = classify_tool(name, args)
|
||||
s = add_event(ts, kind, role="assistant", tool=name,
|
||||
summary=name, body=args)
|
||||
call_id = payload.get("call_id")
|
||||
if call_id:
|
||||
call_seq[call_id] = s
|
||||
elif ptype == "function_call_output":
|
||||
call_id = payload.get("call_id")
|
||||
parent = call_seq.get(call_id)
|
||||
body = stringify(payload.get("output"))
|
||||
add_event(ts, "tool_result", role="tool", tool=None,
|
||||
summary="tool result", body=body, parent_seq=parent)
|
||||
elif ptype == "reasoning":
|
||||
body = _message_text(payload) or stringify(payload.get("summary"))
|
||||
add_event(ts, "thinking", role="assistant", summary="reasoning", body=body)
|
||||
|
||||
elif rtype == "event_msg":
|
||||
ptype = payload.get("type")
|
||||
if ptype == "task_started":
|
||||
add_event(ts, "lifecycle", summary="task_started")
|
||||
elif ptype == "task_complete":
|
||||
add_event(ts, "completion", summary="task_complete")
|
||||
elif ptype == "token_count":
|
||||
i, o, c = _extract_tokens(payload)
|
||||
cost.input_tokens += i
|
||||
cost.output_tokens += o
|
||||
cost.cache_tokens += c
|
||||
# user_message / agent_message echoes are duplicated by response_item
|
||||
# messages on modern Codex; skipped to avoid double counting.
|
||||
|
||||
if session_id is None:
|
||||
return None
|
||||
|
||||
cost.turns = sum(1 for e in events if e.kind == "user_msg")
|
||||
started = min(timestamps) if timestamps else None
|
||||
ended = max(timestamps) if timestamps else None
|
||||
cost.wall_clock_s = seconds_between(started, ended)
|
||||
|
||||
repo, domain = resolve_repo(cwd, repo_domain_map)
|
||||
session = Session(
|
||||
session_uid=Session.make_uid(FLAVOR, session_id),
|
||||
flavor=FLAVOR, native_session_id=session_id,
|
||||
repo=repo, domain=domain, cwd=cwd, model=model,
|
||||
started_at=started, ended_at=ended, outcome="unknown", cost=cost,
|
||||
source_path=path, source_bytes=os.path.getsize(path) if os.path.exists(path) else 0,
|
||||
discovered_at=now_iso(),
|
||||
)
|
||||
return Normalized(session=session, events=events, blobs=blobs)
|
||||
100
session_memory/adapters/common.py
Normal file
100
session_memory/adapters/common.py
Normal file
@@ -0,0 +1,100 @@
|
||||
"""Shared adapter helpers (Tier 0 -> Tier 1).
|
||||
|
||||
The ``Normalized`` bundle contract and small flavor-agnostic helpers used by every
|
||||
collector adapter. Per-flavor parsing lives in the individual adapter modules.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
from ..core.schema import Session, SessionEvent
|
||||
|
||||
# tool names that mutate files -> kind "edit" (union across flavors)
|
||||
EDIT_TOOLS = {
|
||||
"Edit", "Write", "NotebookEdit", "MultiEdit", # Claude
|
||||
"apply_patch", "write_file", "edit_file", # Codex / Grok variants
|
||||
}
|
||||
# substrings in a shell/tool command that indicate a test run -> kind "test_run"
|
||||
TEST_HINTS = (
|
||||
"pytest", "unittest", "npm test", "npm run test", "go test",
|
||||
"cargo test", "jest", "vitest", "make test", "tox",
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Normalized:
|
||||
session: Session
|
||||
events: list[SessionEvent]
|
||||
blobs: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
|
||||
def resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
|
||||
"""cwd -> (repo, domain). repo is the cwd basename; domain via map."""
|
||||
if not cwd:
|
||||
return None, None
|
||||
repo = os.path.basename(cwd.rstrip("/")) or None
|
||||
domain = repo_domain_map.get(repo) if repo else None
|
||||
return repo, domain
|
||||
|
||||
|
||||
def is_test_command(text: str) -> bool:
|
||||
low = (text or "").lower()
|
||||
return any(h in low for h in TEST_HINTS)
|
||||
|
||||
|
||||
def classify_tool(name: str, command_text: str = "") -> str:
|
||||
"""Map a tool invocation to an event kind: edit | test_run | tool_call."""
|
||||
if name in EDIT_TOOLS:
|
||||
return "edit"
|
||||
if is_test_command(command_text) or is_test_command(name):
|
||||
return "test_run"
|
||||
return "tool_call"
|
||||
|
||||
|
||||
def stringify(v: Any, limit: int = 20000) -> str:
|
||||
if v is None:
|
||||
return ""
|
||||
if isinstance(v, str):
|
||||
return v[:limit]
|
||||
try:
|
||||
return json.dumps(v, ensure_ascii=False)[:limit]
|
||||
except (TypeError, ValueError):
|
||||
return str(v)[:limit]
|
||||
|
||||
|
||||
def first_line(text: str) -> str:
|
||||
t = (text or "").strip()
|
||||
return t.splitlines()[0] if t else ""
|
||||
|
||||
|
||||
def seconds_between(start: Optional[str], end: Optional[str]) -> float:
|
||||
if not start or not end:
|
||||
return 0.0
|
||||
try:
|
||||
a = datetime.fromisoformat(start.replace("Z", "+00:00"))
|
||||
b = datetime.fromisoformat(end.replace("Z", "+00:00"))
|
||||
return max(0.0, (b - a).total_seconds())
|
||||
except ValueError:
|
||||
return 0.0
|
||||
|
||||
|
||||
def iter_jsonl(path: str):
|
||||
"""Yield parsed JSON objects from a JSONL file, tolerating bad lines."""
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
yield json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
|
||||
def now_iso() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
@@ -12,6 +12,7 @@ Tier 2 digest — the invariant that makes budget-based retention non-lossy.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
@@ -28,6 +29,18 @@ def _now() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
|
||||
def _fingerprint(ev: SessionEvent, body: Optional[str]) -> str:
|
||||
"""Stable content fingerprint, independent of seq/payload_ref, for dedup."""
|
||||
h = hashlib.sha1()
|
||||
parts = [ev.ts or "", ev.kind, ev.role or "", ev.tool or "", ev.summary or "",
|
||||
ev.role or "", str(ev.is_sidechain)]
|
||||
h.update("\x1f".join(parts).encode("utf-8"))
|
||||
if body is not None:
|
||||
h.update(b"\x1e")
|
||||
h.update(body.encode("utf-8"))
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
class Store:
|
||||
def __init__(self, db_path: str, blob_dir: str):
|
||||
self.db_path = db_path
|
||||
@@ -121,14 +134,75 @@ class Store:
|
||||
self.db.commit()
|
||||
return total
|
||||
|
||||
def ingest(self, bundle) -> None:
|
||||
"""Persist a full Normalized bundle (session + events + blobs)."""
|
||||
def ingest(self, bundle) -> int:
|
||||
"""Persist a Normalized bundle, merging into any existing session.
|
||||
|
||||
Multiple files can map to one ``session_uid`` (Claude resume/sidechains;
|
||||
Grok multi-file dirs). Events are de-duplicated by content fingerprint and
|
||||
genuinely-new events are appended with offset ``seq`` (design OQ6 / T03).
|
||||
Returns the number of new events written. Idempotent: re-ingesting the
|
||||
same bundle adds nothing.
|
||||
"""
|
||||
s = bundle.session
|
||||
if s.ingested_at is None:
|
||||
s.ingested_at = _now()
|
||||
self.upsert_session(s)
|
||||
self.upsert_events(bundle.events)
|
||||
self.write_blobs(s.session_uid, bundle.blobs)
|
||||
existing = self.get_session(s.session_uid)
|
||||
if existing is None:
|
||||
if s.ingested_at is None:
|
||||
s.ingested_at = _now()
|
||||
self.upsert_session(s)
|
||||
# known fingerprints + current max seq for this session
|
||||
seen = self._event_fingerprints(s.session_uid)
|
||||
next_seq = self._max_seq(s.session_uid) + 1
|
||||
|
||||
new_events: list[SessionEvent] = []
|
||||
new_blobs: dict[str, str] = {}
|
||||
old_to_new: dict[int, int] = {}
|
||||
for ev in bundle.events:
|
||||
body = bundle.blobs.get(ev.payload_ref) if ev.payload_ref else None
|
||||
fp = _fingerprint(ev, body)
|
||||
if fp in seen:
|
||||
continue # already stored (prior file or prior sweep)
|
||||
new_seq = next_seq
|
||||
next_seq += 1
|
||||
old_to_new[ev.seq] = new_seq
|
||||
# remap parent within this bundle; cross-file parents become None
|
||||
parent = old_to_new.get(ev.parent_seq) if ev.parent_seq is not None else None
|
||||
ref = None
|
||||
if body is not None:
|
||||
ref = f"blob://{s.session_uid}/{new_seq}"
|
||||
new_blobs[ref] = body
|
||||
merged = SessionEvent(
|
||||
session_uid=s.session_uid, seq=new_seq, parent_seq=parent, ts=ev.ts,
|
||||
kind=ev.kind, role=ev.role, tool=ev.tool, summary=ev.summary,
|
||||
payload_ref=ref, tokens=ev.tokens, is_sidechain=ev.is_sidechain,
|
||||
)
|
||||
new_events.append(merged)
|
||||
seen.add(fp)
|
||||
|
||||
if new_events:
|
||||
self.upsert_events(new_events)
|
||||
self.write_blobs(s.session_uid, new_blobs)
|
||||
return len(new_events)
|
||||
|
||||
def _max_seq(self, session_uid: str) -> int:
|
||||
row = self.db.execute(
|
||||
"SELECT COALESCE(MAX(seq), -1) m FROM events WHERE session_uid=?", (session_uid,)
|
||||
).fetchone()
|
||||
return int(row["m"])
|
||||
|
||||
def _event_fingerprints(self, session_uid: str) -> set[str]:
|
||||
fps: set[str] = set()
|
||||
for e in self.get_events(session_uid):
|
||||
body = None
|
||||
if e.payload_ref:
|
||||
r = self.db.execute("SELECT path FROM blobs WHERE ref=?", (e.payload_ref,)).fetchone()
|
||||
if r:
|
||||
try:
|
||||
with open(r["path"], "r", encoding="utf-8") as f:
|
||||
body = f.read()
|
||||
except OSError:
|
||||
body = None
|
||||
fps.add(_fingerprint(e, body))
|
||||
return fps
|
||||
|
||||
# ---- Tier 2 (digest) ---------------------------------------------------
|
||||
|
||||
|
||||
@@ -19,13 +19,17 @@ from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from .adapters import claude as claude_adapter
|
||||
from .adapters import codex as codex_adapter
|
||||
from .core import digest as digest_mod
|
||||
from .core.cursor import Cursors
|
||||
from .core.retention import RetentionConfig, sweep as retention_sweep
|
||||
from .core.store import Store
|
||||
|
||||
# adapter dispatch by source name
|
||||
_ADAPTERS = {"claude": claude_adapter.parse_session}
|
||||
_ADAPTERS = {
|
||||
"claude": claude_adapter.parse_session,
|
||||
"codex": codex_adapter.parse_session,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
Reference in New Issue
Block a user