"""Claude Code collector adapter — Tier 0 -> Tier 1 (design §2.1, §4.3). Reads ``~/.claude/projects//.jsonl`` (and ``agent-*.jsonl`` sidechains), discriminates on the record ``type``, reconstructs the turn DAG via ``uuid``/``parentUuid``, and emits normalized records. Returns a :class:`Normalized` bundle: the ``Session``, its ordered ``SessionEvent`` list, and a ``blobs`` map (``payload_ref -> full text body``) that the store persists out-of-line so Tier 1 rows stay light. """ from __future__ import annotations import json import os from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Iterable, Optional from ..core.schema import Cost, Session, SessionEvent FLAVOR = "claude" # tool_use names that mutate files -> kind "edit" _EDIT_TOOLS = {"Edit", "Write", "NotebookEdit", "MultiEdit"} # crude test-runner detection inside Bash commands -> kind "test_run" _TEST_HINTS = ("pytest", "unittest", "npm test", "npm run test", "go test", "cargo test", "jest", "vitest") @dataclass class Normalized: session: Session events: list[SessionEvent] blobs: dict[str, str] = field(default_factory=dict) def _iter_records(path: str) -> Iterable[dict[str, Any]]: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: yield json.loads(line) except json.JSONDecodeError: continue # tolerate partial/corrupt trailing lines def _resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]: """cwd -> (repo, domain). repo is the cwd basename; domain via map.""" if not cwd: return None, None repo = os.path.basename(cwd.rstrip("/")) or None domain = repo_domain_map.get(repo) if repo else None return repo, domain def _is_test_command(text: str) -> bool: low = text.lower() return any(h in low for h in _TEST_HINTS) def _content_blocks(message: dict[str, Any]) -> list[dict[str, Any]]: content = message.get("content") if isinstance(content, str): return [{"type": "text", "text": content}] if isinstance(content, list): return [b for b in content if isinstance(b, dict)] return [] def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -> Optional[Normalized]: """Parse one Claude transcript file into a Normalized bundle. Returns None if the file has no usable session records. """ repo_domain_map = repo_domain_map or {} records = list(_iter_records(path)) if not records: return None session_id: Optional[str] = None cwd = git_branch = version = model = None timestamps: list[str] = [] file_is_sidechain = os.path.basename(path).startswith("agent-") events: list[SessionEvent] = [] blobs: dict[str, str] = {} uuid_to_seq: dict[str, int] = {} cost = Cost() seq = 0 def add_event(uuid: Optional[str], parent_uuid: Optional[str], ts, kind, *, role=None, tool=None, summary=None, body=None, tokens=0, sidechain=False): nonlocal seq s = seq seq += 1 if uuid: uuid_to_seq[uuid] = s parent_seq = uuid_to_seq.get(parent_uuid) if parent_uuid else None payload_ref = None if body: payload_ref = f"blob://{session_id}/{s}" blobs[payload_ref] = body events.append(SessionEvent( session_uid=Session.make_uid(FLAVOR, session_id or "unknown"), seq=s, parent_seq=parent_seq, ts=ts, kind=kind, role=role, tool=tool, summary=(summary or "")[:300] or None, payload_ref=payload_ref, tokens=tokens, is_sidechain=sidechain or file_is_sidechain, )) for rec in records: rtype = rec.get("type") ts = rec.get("timestamp") if ts: timestamps.append(ts) session_id = session_id or rec.get("sessionId") cwd = cwd or rec.get("cwd") git_branch = git_branch or rec.get("gitBranch") version = version or rec.get("version") uuid = rec.get("uuid") parent = rec.get("parentUuid") sidechain = bool(rec.get("isSidechain")) if rtype == "user": msg = rec.get("message", {}) for b in _content_blocks(msg): bt = b.get("type") if bt == "tool_result": body = _stringify(b.get("content")) add_event(uuid, parent, ts, "tool_result", role="tool", summary="tool result", body=body, sidechain=sidechain) else: text = b.get("text", "") add_event(uuid, parent, ts, "user_msg", role="user", summary=_first_line(text), body=text, sidechain=sidechain) elif rtype == "assistant": msg = rec.get("message", {}) model = model or msg.get("model") usage = msg.get("usage") or {} cost.input_tokens += int(usage.get("input_tokens", 0) or 0) cost.output_tokens += int(usage.get("output_tokens", 0) or 0) cost.cache_tokens += int( (usage.get("cache_read_input_tokens", 0) or 0) + (usage.get("cache_creation_input_tokens", 0) or 0) ) out_tokens = int(usage.get("output_tokens", 0) or 0) for b in _content_blocks(msg): bt = b.get("type") if bt == "thinking": add_event(uuid, parent, ts, "thinking", role="assistant", summary="thinking", body=b.get("thinking", ""), sidechain=sidechain) elif bt == "text": text = b.get("text", "") add_event(uuid, parent, ts, "assistant_msg", role="assistant", summary=_first_line(text), body=text, tokens=out_tokens, sidechain=sidechain) elif bt == "tool_use": name = b.get("name", "") inp = b.get("input", {}) body = _stringify(inp) kind = "tool_call" if name in _EDIT_TOOLS: kind = "edit" elif name == "Bash" and _is_test_command(_stringify(inp.get("command", ""))): kind = "test_run" add_event(uuid, parent, ts, kind, role="assistant", tool=name, summary=f"{name}", body=body, sidechain=sidechain) elif rtype == "summary": add_event(uuid, parent, ts, "lifecycle", summary="summary", body=_stringify(rec.get("summary")), sidechain=sidechain) # queue-operation / ai-title / last-prompt / attachment: skipped as events if session_id is None: return None cost.turns = sum(1 for e in events if e.kind == "user_msg") started = min(timestamps) if timestamps else None ended = max(timestamps) if timestamps else None cost.wall_clock_s = _seconds_between(started, ended) repo, domain = _resolve_repo(cwd, repo_domain_map) session = Session( session_uid=Session.make_uid(FLAVOR, session_id), flavor=FLAVOR, native_session_id=session_id, repo=repo, domain=domain, cwd=cwd, git_branch=git_branch, model=model, started_at=started, ended_at=ended, outcome="unknown", # outcome inference happens in the digest step (T04) cost=cost, source_path=path, source_bytes=os.path.getsize(path) if os.path.exists(path) else 0, discovered_at=_now(), ) return Normalized(session=session, events=events, blobs=blobs) # ---- helpers --------------------------------------------------------------- def _stringify(v: Any) -> str: if v is None: return "" if isinstance(v, str): return v try: return json.dumps(v, ensure_ascii=False)[:20000] except (TypeError, ValueError): return str(v)[:20000] def _first_line(text: str) -> str: return (text or "").strip().splitlines()[0] if (text or "").strip() else "" def _seconds_between(start: Optional[str], end: Optional[str]) -> float: if not start or not end: return 0.0 try: a = datetime.fromisoformat(start.replace("Z", "+00:00")) b = datetime.fromisoformat(end.replace("Z", "+00:00")) return max(0.0, (b - a).total_seconds()) except ValueError: return 0.0 def _now() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")