Files
tegwick bc11cb9aec session-memory Phase 1: Codex adapter (T01) + multi-file merge (T03)
- adapters/common.py: shared Normalized + helpers (resolve_repo, classify_tool,
  jsonl iter, etc.); claude.py refactored to use it (Normalized re-exported)
- adapters/codex.py: rollout {timestamp,type,payload} parser; session_meta/
  response_item/event_msg mapping; flat call_id join; token_count cost;
  registered in ingest dispatch
- core/store.py: ingest() now merges multi-file sessions by content
  fingerprint, appends new events with offset seq (design OQ6); idempotent
- tests/test_codex_adapter.py, tests/test_merge.py

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 21:55:32 +02:00

163 lines
6.5 KiB
Python

"""Claude Code collector adapter — Tier 0 -> Tier 1 (design §2.1, §4.3).
Reads ``~/.claude/projects/<url-encoded-cwd>/<session-uuid>.jsonl`` (and
``agent-*.jsonl`` sidechains), discriminates on the record ``type``, reconstructs
the turn DAG via ``uuid``/``parentUuid``, and emits normalized records.
Returns a :class:`Normalized` bundle: the ``Session``, its ordered
``SessionEvent`` list, and a ``blobs`` map (``payload_ref -> full text body``)
that the store persists out-of-line so Tier 1 rows stay light.
"""
from __future__ import annotations
import os
from typing import Any, Optional
from ..core.schema import Cost, Session, SessionEvent
from .common import ( # noqa: F401 (Normalized re-exported for back-compat)
Normalized,
classify_tool,
first_line as _first_line,
iter_jsonl as _iter_records,
now_iso as _now,
resolve_repo as _resolve_repo,
seconds_between as _seconds_between,
stringify as _stringify,
)
# Flavor tag for this adapter: passed to Session.make_uid() when building
# session uids and stored as Session.flavor on the parsed session.
FLAVOR = "claude"
def _content_blocks(message: dict[str, Any]) -> list[dict[str, Any]]:
content = message.get("content")
if isinstance(content, str):
return [{"type": "text", "text": content}]
if isinstance(content, list):
return [b for b in content if isinstance(b, dict)]
return []
def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -> Optional[Normalized]:
    """Parse one Claude transcript file into a Normalized bundle.

    Records are discriminated on ``type``: ``user`` and ``assistant`` records
    emit one event per content block, ``summary`` records emit a single
    ``lifecycle`` event, and every other record type contributes session
    metadata only (timestamp, cwd, git branch).

    Args:
        path: Transcript ``.jsonl`` file. A basename starting with ``agent-``
            marks every event of the file as sidechain.
        repo_domain_map: Optional mapping forwarded to ``resolve_repo``
            together with the session cwd; defaults to an empty dict.

    Returns:
        The ``Normalized`` bundle (session, ordered events, out-of-line
        blobs), or None if the file has no usable records or none of them
        carries a ``sessionId``.
    """
    repo_domain_map = repo_domain_map or {}
    # Every access below is rec.get(...), so drop anything that is not a
    # JSON object instead of crashing on it.
    records = [rec for rec in _iter_records(path) if isinstance(rec, dict)]
    if not records:
        return None

    # Resolve the session id before emitting anything. If it were picked up
    # lazily inside the main loop, events produced before the first record
    # carrying ``sessionId`` would get a uid built from "unknown" and a
    # "blob://None/<seq>" payload ref, disagreeing with the final Session.
    session_id: Optional[str] = next(
        (rec["sessionId"] for rec in records if rec.get("sessionId")), None
    )
    if session_id is None:
        return None
    # NOTE(review): assumes Session.make_uid is deterministic, so one uid can
    # be shared by the Session and all of its events — confirm.
    session_uid = Session.make_uid(FLAVOR, session_id)

    cwd = git_branch = model = None
    timestamps: list[str] = []
    file_is_sidechain = os.path.basename(path).startswith("agent-")
    events: list[SessionEvent] = []
    blobs: dict[str, str] = {}
    uuid_to_seq: dict[str, int] = {}
    cost = Cost()
    seq = 0

    def message_of(rec: dict[str, Any]) -> dict[str, Any]:
        # An explicit ``"message": null`` (or any non-object) becomes {}.
        msg = rec.get("message")
        return msg if isinstance(msg, dict) else {}

    def add_event(uuid: Optional[str], parent_uuid: Optional[str], ts, kind, *,
                  role=None, tool=None, summary=None, body=None, tokens=0, sidechain=False):
        """Append one SessionEvent, storing a non-empty body out-of-line."""
        nonlocal seq
        s = seq
        seq += 1
        if uuid:
            # Several blocks of one record share a uuid; the last block's seq
            # is what children of that record will resolve to.
            uuid_to_seq[uuid] = s
        parent_seq = uuid_to_seq.get(parent_uuid) if parent_uuid else None
        payload_ref = None
        if body:
            payload_ref = f"blob://{session_id}/{s}"
            blobs[payload_ref] = body
        events.append(SessionEvent(
            session_uid=session_uid,
            seq=s, parent_seq=parent_seq, ts=ts, kind=kind, role=role, tool=tool,
            # Summaries are capped at 300 chars; an empty one is stored as None.
            summary=(summary or "")[:300] or None, payload_ref=payload_ref,
            tokens=tokens, is_sidechain=sidechain or file_is_sidechain,
        ))

    for rec in records:
        rtype = rec.get("type")
        ts = rec.get("timestamp")
        if ts:
            timestamps.append(ts)
        # First non-empty value wins for session-level metadata.
        cwd = cwd or rec.get("cwd")
        git_branch = git_branch or rec.get("gitBranch")
        uuid = rec.get("uuid")
        parent = rec.get("parentUuid")
        sidechain = bool(rec.get("isSidechain"))

        if rtype == "user":
            for block in _content_blocks(message_of(rec)):
                if block.get("type") == "tool_result":
                    add_event(uuid, parent, ts, "tool_result", role="tool",
                              summary="tool result",
                              body=_stringify(block.get("content")),
                              sidechain=sidechain)
                else:
                    text = block.get("text", "")
                    add_event(uuid, parent, ts, "user_msg", role="user",
                              summary=_first_line(text), body=text,
                              sidechain=sidechain)
        elif rtype == "assistant":
            msg = message_of(rec)
            model = model or msg.get("model")
            usage = msg.get("usage") or {}
            out_tokens = int(usage.get("output_tokens", 0) or 0)
            cost.input_tokens += int(usage.get("input_tokens", 0) or 0)
            cost.output_tokens += out_tokens
            # Cache reads and cache creations are folded into one counter.
            cost.cache_tokens += int(
                (usage.get("cache_read_input_tokens", 0) or 0)
                + (usage.get("cache_creation_input_tokens", 0) or 0)
            )
            for block in _content_blocks(msg):
                btype = block.get("type")
                if btype == "thinking":
                    add_event(uuid, parent, ts, "thinking", role="assistant",
                              summary="thinking", body=block.get("thinking", ""),
                              sidechain=sidechain)
                elif btype == "text":
                    text = block.get("text", "")
                    add_event(uuid, parent, ts, "assistant_msg", role="assistant",
                              summary=_first_line(text), body=text,
                              tokens=out_tokens, sidechain=sidechain)
                elif btype == "tool_use":
                    name = block.get("name", "")
                    inp = block.get("input", {})
                    # The event kind is classified from the tool name plus
                    # its ``command`` input, when the input is a dict.
                    cmd = inp.get("command", "") if isinstance(inp, dict) else ""
                    kind = classify_tool(name, _stringify(cmd))
                    add_event(uuid, parent, ts, kind, role="assistant", tool=name,
                              summary=f"{name}", body=_stringify(inp),
                              sidechain=sidechain)
        elif rtype == "summary":
            add_event(uuid, parent, ts, "lifecycle", summary="summary",
                      body=_stringify(rec.get("summary")), sidechain=sidechain)
        # queue-operation / ai-title / last-prompt / attachment: skipped as events

    cost.turns = sum(1 for e in events if e.kind == "user_msg")
    started = min(timestamps) if timestamps else None
    ended = max(timestamps) if timestamps else None
    cost.wall_clock_s = _seconds_between(started, ended)
    repo, domain = _resolve_repo(cwd, repo_domain_map)
    session = Session(
        session_uid=session_uid,
        flavor=FLAVOR,
        native_session_id=session_id,
        repo=repo, domain=domain, cwd=cwd, git_branch=git_branch,
        model=model, started_at=started, ended_at=ended,
        outcome="unknown",  # outcome inference happens in the digest step (T04)
        cost=cost,
        source_path=path,
        source_bytes=os.path.getsize(path) if os.path.exists(path) else 0,
        discovered_at=_now(),
    )
    return Normalized(session=session, events=events, blobs=blobs)