"""Parse Codex session rollout files (``*.jsonl``) into token-usage records."""

from __future__ import annotations

import json
from datetime import datetime
from pathlib import Path
from typing import Any

from api.services.token_sources.base import TokenSourceRecord, parse_iso

PARSER_VERSION = "codex-desktop-v1"


def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_int(value: Any) -> int:
    """Coerce a JSON counter to ``int``; missing or unusable values become 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        # Non-numeric strings, containers, NaN and Infinity all land here.
        return 0


def iter_codex_session_files(codex_home: Path) -> list[Path]:
    """List the session files found under *codex_home*.

    Files matching ``sessions/*/*/*/*.jsonl`` come first, followed by
    ``archived_sessions/*.jsonl``; each group is sorted by path. A directory
    that does not exist simply contributes nothing.
    """
    files: list[Path] = []
    for dirname, pattern in (
        ("sessions", "*/*/*/*.jsonl"),
        ("archived_sessions", "*.jsonl"),
    ):
        root = codex_home / dirname
        if root.is_dir():
            files.extend(sorted(root.glob(pattern)))
    return files


def parse_codex_session(path: Path, since: datetime) -> TokenSourceRecord | None:
    """Aggregate the token usage recorded in one session file.

    Args:
        path: JSONL session file to read.
        since: Lower bound; ``token_count`` events without a parseable
            timestamp, or with one earlier than this, are not counted.

    Returns:
        A ``TokenSourceRecord`` summing the counted ``last_token_usage``
        entries, or ``None`` when the file cannot be opened or no usage was
        counted.
    """
    # The file name (minus the "rollout-" prefix) identifies the session until
    # a session_meta entry supplies an explicit id.
    session_id = path.stem.removeprefix("rollout-")
    started_at: datetime | None = None
    last_at: datetime | None = None
    cwd: str | None = None
    model: str | None = None
    tokens_in = tokens_out = 0
    cached_input_tokens = reasoning_output_tokens = 0
    raw_total_tokens = 0
    usage_records = 0
    malformed_lines = 0

    try:
        handle = path.open("r", encoding="utf-8", errors="ignore")
    except OSError:
        return None

    with handle:
        for line in handle:
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                malformed_lines += 1
                continue
            if not isinstance(entry, dict):
                # Valid JSON but not an object (list, string, number, ...):
                # there is nothing to read from it, so count it as malformed
                # instead of crashing on ``.get``.
                malformed_lines += 1
                continue

            ts = entry.get("timestamp")
            parsed_ts = parse_iso(ts) if isinstance(ts, str) else None
            if parsed_ts:
                last_at = parsed_ts
                started_at = started_at or parsed_ts

            payload = _as_dict(entry.get("payload"))
            entry_type = entry.get("type")

            if entry_type == "session_meta":
                meta_id = payload.get("id")
                if meta_id:
                    session_id = str(meta_id)
                cwd = payload.get("cwd") or cwd
                meta_ts = payload.get("timestamp")
                if isinstance(meta_ts, str):
                    # The meta timestamp takes precedence, but an unparsable
                    # one must not erase a start time that is already known.
                    started_at = parse_iso(meta_ts) or started_at
            elif entry_type == "turn_context":
                cwd = payload.get("cwd") or cwd
                model = payload.get("model") or model
            elif entry_type == "event_msg" and payload.get("type") == "token_count":
                # NOTE(review): this comparison assumes parse_iso and `since`
                # agree on timezone awareness -- confirm against callers.
                if parsed_ts is None or parsed_ts < since:
                    continue
                info = _as_dict(payload.get("info"))
                last = _as_dict(info.get("last_token_usage"))
                input_tokens = _as_int(last.get("input_tokens"))
                output_tokens = _as_int(last.get("output_tokens"))
                if input_tokens == 0 and output_tokens == 0:
                    continue
                tokens_in += input_tokens
                tokens_out += output_tokens
                cached_input_tokens += _as_int(last.get("cached_input_tokens"))
                reasoning_output_tokens += _as_int(
                    last.get("reasoning_output_tokens")
                )
                # Fall back to input + output when no usable total is given.
                raw_total_tokens += _as_int(last.get("total_tokens")) or (
                    input_tokens + output_tokens
                )
                usage_records += 1
                last_at = parsed_ts

    if usage_records == 0 or tokens_in + tokens_out == 0:
        return None

    return TokenSourceRecord(
        source_provider="codex_session",
        source_id=f"codex:{session_id}",
        source_path=path,
        source_created_at=last_at,
        session_id=session_id,
        cwd=cwd,
        model=model,
        agent="codex",
        tokens_in=tokens_in,
        tokens_out=tokens_out,
        cached_input_tokens=cached_input_tokens,
        reasoning_output_tokens=reasoning_output_tokens,
        raw_total_tokens=raw_total_tokens or None,
        parser_version=PARSER_VERSION,
        confidence=1.0,
        raw_metadata={
            "started_at": started_at.isoformat() if started_at else None,
            "usage_records": usage_records,
            "malformed_lines": malformed_lines,
            "source_file_name": path.name,
        },
    )


def collect_codex_sessions(codex_home: Path, since: datetime) -> list[TokenSourceRecord]:
    """Parse every session file under *codex_home* into one record per source id.

    When several files yield the same ``source_id``, the record with the
    larger ``tokens_total`` is kept (the first one seen wins a tie). The
    result is sorted by ``source_created_at``; records without one sort first.
    """
    by_id: dict[str, TokenSourceRecord] = {}
    for path in iter_codex_session_files(codex_home):
        parsed = parse_codex_session(path, since)
        if parsed is None:
            continue
        current = by_id.get(parsed.source_id)
        if current is None or parsed.tokens_total > current.tokens_total:
            by_id[parsed.source_id] = parsed

    # Sort key for records lacking a timestamp; it borrows the tzinfo of
    # `since` so it can be compared with the other records' timestamps.
    earliest = datetime.min.replace(tzinfo=since.tzinfo)
    return sorted(
        by_id.values(),
        key=lambda item: item.source_created_at or earliest,
    )