generated from coulomb/repo-seed
- adapters/common.py: shared Normalized + helpers (resolve_repo, classify_tool,
jsonl iter, etc.); claude.py refactored to use it (Normalized re-exported)
- adapters/codex.py: rollout {timestamp,type,payload} parser; session_meta/
response_item/event_msg mapping; flat call_id join; token_count cost;
registered in ingest dispatch
- core/store.py: ingest() now merges multi-file sessions by content
fingerprint, appends new events with offset seq (design OQ6); idempotent
- tests/test_codex_adapter.py, tests/test_merge.py
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
101 lines
2.9 KiB
Python
101 lines
2.9 KiB
Python
"""Shared adapter helpers (Tier 0 -> Tier 1).
|
|
|
|
The ``Normalized`` bundle contract and small flavor-agnostic helpers used by every
|
|
collector adapter. Per-flavor parsing lives in the individual adapter modules.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Optional
|
|
|
|
from ..core.schema import Session, SessionEvent
|
|
|
|
# Tool names whose invocation mutates files; such calls get kind "edit".
# The set is the union across all supported flavors.
EDIT_TOOLS = {
    # Claude
    "Edit",
    "Write",
    "NotebookEdit",
    "MultiEdit",
    # Codex / Grok variants
    "apply_patch",
    "write_file",
    "edit_file",
}

# Substrings that, when found in a shell/tool command, mark it as a test run
# (kind "test_run"). All entries are lowercase.
TEST_HINTS = (
    "pytest",
    "unittest",
    "npm test",
    "npm run test",
    "go test",
    "cargo test",
    "jest",
    "vitest",
    "make test",
    "tox",
)
|
|
|
|
|
|
@dataclass
class Normalized:
    """Bundle contract shared by the collector adapters.

    Groups one ``Session`` with its ordered ``SessionEvent`` list and an
    optional string-to-string ``blobs`` mapping.
    """

    # The session record this bundle describes.
    session: Session
    # Events belonging to ``session``.
    events: list[SessionEvent]
    # Auxiliary string payloads keyed by string; each instance gets its own
    # fresh empty dict when the caller does not supply one.
    # NOTE(review): key/value semantics are defined by the adapters - confirm there.
    blobs: dict[str, str] = field(default_factory=dict)
|
|
|
|
|
|
def resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
    """Map a working directory to ``(repo, domain)``.

    ``repo`` is the final path component of *cwd* (trailing path separators
    are ignored); ``domain`` is looked up for that name in *repo_domain_map*.

    Args:
        cwd: Working directory path, or ``None``/``""`` when unknown.
        repo_domain_map: Mapping from repo name to domain.

    Returns:
        ``(repo, domain)``. Both are ``None`` when *cwd* is falsy or has no
        final component (e.g. the filesystem root); ``domain`` is ``None``
        when the repo has no entry in the map.
    """
    if not cwd:
        return None, None
    # Strip every separator this platform recognizes, not only "/": on
    # Windows a trailing backslash would otherwise leave basename() empty.
    trailing_seps = "/" + os.sep + (os.altsep or "")
    repo = os.path.basename(cwd.rstrip(trailing_seps)) or None
    domain = repo_domain_map.get(repo) if repo else None
    return repo, domain
|
|
|
|
|
|
def is_test_command(text: str) -> bool:
    """Report whether *text* contains any ``TEST_HINTS`` substring.

    *text* is lowercased before matching; a falsy *text* never matches.
    """
    lowered = (text or "").lower()
    for hint in TEST_HINTS:
        if hint in lowered:
            return True
    return False
|
|
|
|
|
|
def classify_tool(name: str, command_text: str = "") -> str:
    """Classify a tool invocation as ``"edit"``, ``"test_run"`` or ``"tool_call"``.

    Args:
        name: Tool name as reported by the flavor.
        command_text: Command text associated with the call, if any.

    Returns:
        ``"edit"`` when *name* is in ``EDIT_TOOLS``; otherwise ``"test_run"``
        when *command_text* or *name* looks like a test command; otherwise
        ``"tool_call"``.
    """
    if name in EDIT_TOOLS:
        return "edit"
    # The command text is checked before the tool name itself.
    looks_like_test = any(
        is_test_command(candidate) for candidate in (command_text, name)
    )
    return "test_run" if looks_like_test else "tool_call"
|
|
|
|
|
|
def stringify(v: Any, limit: int = 20000) -> str:
    """Render *v* as a string of at most *limit* characters.

    ``None`` becomes ``""``; strings pass through; any other value is
    JSON-encoded (non-ASCII kept as-is), falling back to ``str(v)`` when it is
    not JSON-serializable.
    """
    if v is None:
        return ""
    if isinstance(v, str):
        rendered = v
    else:
        try:
            rendered = json.dumps(v, ensure_ascii=False)
        except (TypeError, ValueError):
            rendered = str(v)
    return rendered[:limit]
|
|
|
|
|
|
def first_line(text: str) -> str:
    """Return the first line of *text* after trimming surrounding whitespace.

    Falsy or whitespace-only input yields ``""``.
    """
    lines = (text or "").strip().splitlines()
    return lines[0] if lines else ""
|
|
|
|
|
|
def seconds_between(start: Optional[str], end: Optional[str]) -> float:
    """Return the non-negative number of seconds from *start* to *end*.

    Both arguments are ISO-8601 timestamp strings. ``Z`` is rewritten to
    ``+00:00`` before parsing because ``datetime.fromisoformat`` rejects the
    ``Z`` suffix before Python 3.11.

    Returns:
        The elapsed seconds, clamped to ``0.0`` when *end* precedes *start*.
        ``0.0`` is also returned when the span cannot be computed: either
        timestamp is missing, either fails to parse, or one carries a UTC
        offset while the other does not.
    """
    if not start or not end:
        return 0.0
    try:
        a = datetime.fromisoformat(start.replace("Z", "+00:00"))
        b = datetime.fromisoformat(end.replace("Z", "+00:00"))
        return max(0.0, (b - a).total_seconds())
    except (ValueError, TypeError):
        # ValueError: unparsable timestamp. TypeError: subtracting an
        # offset-naive datetime from an offset-aware one (or vice versa).
        return 0.0
|
|
|
|
|
|
def iter_jsonl(path: str):
    """Yield parsed JSON values from a JSONL file, tolerating bad lines.

    Blank lines and lines that are not valid JSON are skipped silently.

    Args:
        path: Path of the JSONL file to read.

    Yields:
        The value decoded from each parsable line, in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    # "utf-8-sig" drops a leading byte-order mark, which would otherwise make
    # json.loads reject (and this loop silently skip) the first record.
    # errors="replace" keeps one undecodable byte from aborting the whole
    # read; it becomes U+FFFD inside that line only.
    with open(path, "r", encoding="utf-8-sig", errors="replace") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                continue
|
|
|
|
|
|
def now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (whole seconds)."""
    current = datetime.now(timezone.utc)
    # isoformat() spells UTC as "+00:00"; this helper emits the "Z" form.
    return current.isoformat(timespec="seconds").replace("+00:00", "Z")
|