Files
tegwick bc11cb9aec session-memory Phase 1: Codex adapter (T01) + multi-file merge (T03)
- adapters/common.py: shared Normalized + helpers (resolve_repo, classify_tool,
  jsonl iter, etc.); claude.py refactored to use it (Normalized re-exported)
- adapters/codex.py: rollout {timestamp,type,payload} parser; session_meta/
  response_item/event_msg mapping; flat call_id join; token_count cost;
  registered in ingest dispatch
- core/store.py: ingest() now merges multi-file sessions by content
  fingerprint, appends new events with offset seq (design OQ6); idempotent
- tests/test_codex_adapter.py, tests/test_merge.py

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 21:55:32 +02:00

101 lines
2.9 KiB
Python

"""Shared adapter helpers (Tier 0 -> Tier 1).
The ``Normalized`` bundle contract and small flavor-agnostic helpers used by every
collector adapter. Per-flavor parsing lives in the individual adapter modules.
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional
from ..core.schema import Session, SessionEvent
# Names of tools whose invocation modifies files on disk. A call to any of
# these is classified as kind "edit". The set is the union across flavors.
EDIT_TOOLS = {
    # Claude
    "Edit",
    "Write",
    "NotebookEdit",
    "MultiEdit",
    # Codex / Grok variants
    "apply_patch",
    "write_file",
    "edit_file",
}

# Substrings that mark a shell/tool command as a test run (kind "test_run").
# Matching is done against the lower-cased command text.
TEST_HINTS = (
    "pytest",
    "unittest",
    "npm test",
    "npm run test",
    "go test",
    "cargo test",
    "jest",
    "vitest",
    "make test",
    "tox",
)
@dataclass
class Normalized:
    """Bundle returned by an adapter after normalizing one raw session.

    Groups the session record, its ordered events, and any side-channel
    blobs so they can be handed around as a single value.
    """

    # The session-level record.
    session: Session
    # Events belonging to the session.
    events: list[SessionEvent]
    # String blobs keyed by a string identifier; a fresh dict per instance.
    blobs: dict[str, str] = field(default_factory=dict)
def resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
    """Derive ``(repo, domain)`` from a working directory.

    The repo name is the last path component of ``cwd`` (trailing ``/``
    characters are ignored). The domain is whatever ``repo_domain_map`` holds
    for that repo name, or ``None`` when it has no entry. When ``cwd`` is
    empty or has no usable last component, ``(None, None)`` is returned.
    """
    if cwd:
        name = os.path.basename(cwd.rstrip("/"))
        if name:
            return name, repo_domain_map.get(name)
    return None, None
def is_test_command(text: str) -> bool:
    """Return True when ``text`` contains any ``TEST_HINTS`` substring.

    The comparison is case-insensitive; an empty or ``None`` ``text`` never
    matches.
    """
    haystack = text.lower() if text else ""
    for hint in TEST_HINTS:
        if hint in haystack:
            return True
    return False
def classify_tool(name: str, command_text: str = "") -> str:
    """Classify a tool invocation as ``"edit"``, ``"test_run"`` or ``"tool_call"``.

    A tool listed in ``EDIT_TOOLS`` is always an edit. Otherwise the call is a
    test run when either the command text or the tool name looks like a test
    command; anything else is a generic tool call.
    """
    if name in EDIT_TOOLS:
        return "edit"
    looks_like_test = is_test_command(command_text) or is_test_command(name)
    return "test_run" if looks_like_test else "tool_call"
def stringify(v: Any, limit: int = 20000) -> str:
    """Render ``v`` as text, truncated to at most ``limit`` characters.

    ``None`` becomes the empty string and strings pass through unchanged.
    Any other value is JSON-encoded (non-ASCII characters kept as-is); values
    that JSON cannot encode fall back to ``str(v)``.
    """
    if v is None:
        return ""
    if isinstance(v, str):
        rendered = v
    else:
        try:
            rendered = json.dumps(v, ensure_ascii=False)
        except (TypeError, ValueError):
            rendered = str(v)
    return rendered[:limit]
def first_line(text: str) -> str:
    """Return the first line of ``text`` after stripping surrounding whitespace.

    Empty, whitespace-only or ``None`` input yields the empty string.
    """
    stripped = (text or "").strip()
    lines = stripped.splitlines()
    if lines:
        return lines[0]
    return ""
def seconds_between(start: Optional[str], end: Optional[str]) -> float:
    """Return the non-negative number of seconds from ``start`` to ``end``.

    Both arguments are ISO-8601 timestamps; a ``Z`` suffix is accepted as
    UTC. Timestamps without an offset are interpreted as UTC so that a naive
    value can be compared with an offset-carrying one.

    Returns ``0.0`` when either argument is missing or unparseable, or when
    ``end`` precedes ``start``.
    """
    if not start or not end:
        return 0.0
    try:
        a = datetime.fromisoformat(start.replace("Z", "+00:00"))
        b = datetime.fromisoformat(end.replace("Z", "+00:00"))
    except ValueError:
        return 0.0
    # Subtracting a naive datetime from an aware one raises TypeError, so
    # give any offset-less timestamp an explicit UTC offset first.
    if a.tzinfo is None:
        a = a.replace(tzinfo=timezone.utc)
    if b.tzinfo is None:
        b = b.replace(tzinfo=timezone.utc)
    return max(0.0, (b - a).total_seconds())
def iter_jsonl(path: str):
    """Yield parsed JSON objects from a JSONL file, tolerating bad lines.

    Blank lines and lines that are not valid JSON are skipped. The file is
    decoded as UTF-8; a leading byte-order mark is ignored and undecodable
    bytes are replaced with U+FFFD rather than aborting the iteration.

    Raises ``OSError`` if the file cannot be opened.
    """
    # "utf-8-sig" drops a leading BOM, which would otherwise make the first
    # record fail to parse and be silently skipped. errors="replace" keeps a
    # single corrupt byte from raising UnicodeDecodeError mid-file.
    with open(path, "r", encoding="utf-8-sig", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                continue
def now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    current = datetime.now(timezone.utc)
    stamp = current.strftime("%Y-%m-%dT%H:%M:%SZ")
    return stamp