session-memory Phase 1: Grok adapter (T02)

- adapters/grok.py: reads the per-session dir (summary.json + chat_history.jsonl
  + events.jsonl + updates.jsonl); conversation from chat_history, lifecycle/
  turn from events, tool-call names paired in order from updates ACP stream
- registered in ingest dispatch; codex+grok sources enabled in config.toml
- tests/test_grok_adapter.py (synthetic + real local sessions)
- live multi-flavor dry-run discovers 89 sessions across flavors

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-06 22:12:30 +02:00
parent bc11cb9aec
commit 06767ef924
5 changed files with 282 additions and 4 deletions

View File

@@ -0,0 +1,182 @@
"""Grok CLI collector adapter — Tier 0 -> Tier 1 (design §2.3, §4.3).
A Grok session is a *directory* ``~/.grok/sessions/<enc-cwd>/<uuid>/`` containing
``summary.json`` (metadata), ``chat_history.jsonl`` (the canonical transcript),
``events.jsonl`` (explicit lifecycle + ``turn_number``), and ``updates.jsonl``
(ACP ``session/update`` stream, which carries tool-call names/args).
The ingest glob matches ``chat_history.jsonl``; this adapter derives its sibling
files from the same directory. Conversation order is taken from
``chat_history.jsonl``; tool-call names are paired, in order, from
``updates.jsonl`` ``tool_call`` entries to classify edits/test runs.
"""
from __future__ import annotations
import json
import os
from typing import Any, Optional
from ..core.schema import Cost, Session, SessionEvent
from .common import (
Normalized,
classify_tool,
first_line,
iter_jsonl,
now_iso,
resolve_repo,
seconds_between,
stringify,
)
# Flavor tag of this adapter: passed to Session.make_uid() when building
# session/event UIDs and stored as Session.flavor.
FLAVOR = "grok"
def _text_content(content: Any) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
return "\n".join(
(b.get("text") or "") for b in content if isinstance(b, dict)
)
return ""
def _tool_calls_in_order(session_dir: str) -> list[dict[str, Any]]:
"""Ordered list of {title, rawInput} from updates.jsonl tool_call entries."""
calls: list[dict[str, Any]] = []
upd = os.path.join(session_dir, "updates.jsonl")
if not os.path.exists(upd):
return calls
for rec in iter_jsonl(upd):
u = (rec.get("params") or {}).get("update") or {}
if u.get("sessionUpdate") == "tool_call":
calls.append({"title": u.get("title") or "", "rawInput": u.get("rawInput") or {},
"id": u.get("toolCallId")})
return calls
def _session_meta(session_dir: str) -> dict[str, Any]:
p = os.path.join(session_dir, "summary.json")
if not os.path.exists(p):
return {}
try:
with open(p, "r", encoding="utf-8") as f:
return json.load(f)
except (OSError, ValueError):
return {}
def _lifecycle(session_dir: str) -> tuple[list[dict[str, Any]], Optional[str]]:
"""events.jsonl records + the model id seen there."""
evs, model = [], None
p = os.path.join(session_dir, "events.jsonl")
if os.path.exists(p):
for rec in iter_jsonl(p):
evs.append(rec)
model = model or rec.get("model_id")
return evs, model
def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -> Optional[Normalized]:
    """Normalize one Grok session directory into a ``Normalized`` record.

    Args:
        path: Either the session directory or the ``chat_history.jsonl``
            file inside it.
        repo_domain_map: Optional mapping handed to ``resolve_repo`` together
            with the session's working directory.

    Returns:
        ``None`` when ``chat_history.jsonl`` is missing or no event could be
        extracted; otherwise a ``Normalized`` holding the session, its
        events, and the event bodies keyed by ``blob://<session-id>/<seq>``.

    Event order: lifecycle markers from ``events.jsonl`` are emitted first,
    followed by the conversation in ``chat_history.jsonl`` order. Each
    ``tool_result`` consumes the next unused ``tool_call`` from
    ``updates.jsonl`` (purely positional pairing) and emits it as a
    tool-call event that becomes the result's parent.
    """
    repo_domain_map = repo_domain_map or {}
    # Accept either the chat_history.jsonl path or the session dir.
    session_dir = path if os.path.isdir(path) else os.path.dirname(path)
    chat = os.path.join(session_dir, "chat_history.jsonl")
    if not os.path.exists(chat):
        return None

    # summary.json is on-disk data: a top-level (or "info") value that is
    # not a JSON object must not crash the .get() lookups below.
    meta = _session_meta(session_dir)
    if not isinstance(meta, dict):
        meta = {}
    info = meta.get("info")
    if not isinstance(info, dict):
        info = {}

    # Strip the platform separator as well as "/" so a trailing separator
    # never yields an empty basename.
    session_id = info.get("id") or os.path.basename(session_dir.rstrip("/" + os.sep))
    cwd = info.get("cwd") or meta.get("git_root_dir")
    life_events, life_model = _lifecycle(session_dir)
    model = meta.get("current_model_id") or life_model

    # The UID is the same for the session and every event: compute it once.
    session_uid = Session.make_uid(FLAVOR, session_id)

    # Tool calls are handed out one by one, in updates.jsonl order.
    pending_calls = iter(_tool_calls_in_order(session_dir))

    events: list[SessionEvent] = []
    blobs: dict[str, str] = {}
    seq = 0

    def add(kind, *, role=None, tool=None, summary=None, body=None, parent_seq=None) -> int:
        """Append one event, store a non-empty *body* as a blob, return its seq."""
        nonlocal seq
        s = seq
        seq += 1
        ref = None
        if body:
            ref = f"blob://{session_id}/{s}"
            blobs[ref] = body
        events.append(SessionEvent(
            session_uid=session_uid, seq=s, parent_seq=parent_seq,
            ts=None, kind=kind, role=role, tool=tool,
            # Summaries are capped at 300 chars; empty ones are stored as None.
            summary=(summary or "")[:300] or None, payload_ref=ref,
        ))
        return s

    # Explicit lifecycle first (turn_started/turn_ended carry no bodies).
    for le in life_events:
        t = le.get("type")
        if t in ("turn_started", "loop_started", "turn_ended", "phase_changed"):
            add("lifecycle", summary=t)

    for rec in iter_jsonl(chat):
        rtype = rec.get("type")
        content = rec.get("content")
        if rtype == "user":
            text = _text_content(content)
            if text.strip():
                add("user_msg", role="user", summary=first_line(text), body=text)
        elif rtype == "reasoning":
            text = _text_content(content)
            if text.strip():
                add("thinking", role="assistant", summary="reasoning", body=text)
        elif rtype == "assistant":
            text = _text_content(content)
            if text.strip():
                add("assistant_msg", role="assistant", summary=first_line(text), body=text)
        elif rtype == "tool_result":
            # Pair with the next tool_call (in order) to recover name/args.
            tool = None
            parent = None
            call = next(pending_calls, None)
            if call is not None:
                tool = call["title"]
                cmd = stringify(call["rawInput"])
                # classify_tool picks the event kind from the name and args.
                kind = classify_tool(tool, cmd)
                parent = add(kind, role="assistant", tool=tool, summary=tool, body=cmd)
            if isinstance(content, dict) and content:
                # A mapping has no text blocks to flatten; serialize it whole
                # so the result payload is not silently dropped.
                body = content
            else:
                body = _text_content(content)
            add("tool_result", role="tool", tool=tool, summary="tool result",
                body=stringify(body), parent_seq=parent)

    if not events:
        return None

    # One turn per user message.
    cost = Cost(turns=sum(1 for e in events if e.kind == "user_msg"))
    started = info.get("created_at") or meta.get("created_at")
    ended = meta.get("last_active_at") or info.get("updated_at") or meta.get("updated_at")
    cost.wall_clock_s = seconds_between(started, ended)
    repo, domain = resolve_repo(cwd, repo_domain_map)
    session = Session(
        session_uid=session_uid, flavor=FLAVOR,
        native_session_id=session_id, repo=repo, domain=domain, cwd=cwd,
        git_branch=meta.get("head_branch"), model=model,
        started_at=started, ended_at=ended, outcome="unknown", cost=cost,
        source_path=chat,
        # Size of the whole session directory, not just chat_history.jsonl.
        source_bytes=_dir_bytes(session_dir),
        discovered_at=now_iso(),
    )
    return Normalized(session=session, events=events, blobs=blobs)
def _dir_bytes(d: str) -> int:
total = 0
for root, _, files in os.walk(d):
for f in files:
try:
total += os.path.getsize(os.path.join(root, f))
except OSError:
pass
return total

View File

@@ -20,14 +20,14 @@ root = "~/.claude/projects"
# glob, relative to root; covers sessions and agent-* sidechains
glob = "*/*.jsonl"
# Codex / Grok adapters land in Phase 1 (schemas confirmed in the design doc).
# Codex / Grok adapters added in Phase 1 (AGENTIC-WP-0003).
[sources.codex]
enabled = false
enabled = true
root = "~/.codex/sessions"
glob = "*/*/*/rollout-*.jsonl"
[sources.grok]
enabled = false
enabled = true
root = "~/.grok/sessions"
glob = "*/*/chat_history.jsonl"
@@ -37,3 +37,5 @@ agentic-resources = "helix_forge"
the-custodian = "custodian"
state-hub = "custodian"
ops-bridge = "custodian"
net-kingdom = "netkingdom"
can-you-assist = "coulomb_social"

View File

@@ -20,6 +20,7 @@ from typing import Any
from .adapters import claude as claude_adapter
from .adapters import codex as codex_adapter
from .adapters import grok as grok_adapter
from .core import digest as digest_mod
from .core.cursor import Cursors
from .core.retention import RetentionConfig, sweep as retention_sweep
@@ -29,6 +30,7 @@ from .core.store import Store
# Adapter registry: flavor name -> that adapter module's parse_session function.
# NOTE(review): keys presumably match the [sources.<name>] tables in config.toml — confirm.
_ADAPTERS = {
"claude": claude_adapter.parse_session,
"codex": codex_adapter.parse_session,
"grok": grok_adapter.parse_session,
}