#!/usr/bin/env python3 """Backfill State Hub token events from local Codex session logs. The parser lives in ``api.services.token_sources.codex`` so this CLI only handles operator flags, repo attribution, idempotent writes, and fallback cleanup. """ from __future__ import annotations import argparse import json import os import sys import urllib.parse import urllib.request from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parent.parent if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from api.services.token_sources import collect_codex_sessions, parse_iso # noqa: E402 from api.services.token_sources.attribution import repo_refs_from_api, resolve_repo # noqa: E402 DEFAULT_API = os.environ.get("STATE_HUB_API", "http://127.0.0.1:8000") BACKFILL_NOTE = "backfill:codex-session" SUPERSEDED_HEURISTIC_NOTE = "heuristic_superseded_by_codex_backfill" def http_json(api_base: str, method: str, path: str, body: dict[str, Any] | None = None) -> Any: url = f"{api_base.rstrip('/')}/{path.lstrip('/')}" data = None headers = {"Content-Type": "application/json"} if body is not None: data = json.dumps(body).encode("utf-8") req = urllib.request.Request(url, data=data, headers=headers, method=method) with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read() or b"null") def find_codex_home(explicit: str | None) -> Path: candidates: list[Path] = [] if explicit: candidates.append(Path(explicit)) env_home = os.environ.get("CODEX_HOME") if env_home: candidates.append(Path(env_home)) candidates.extend( [ Path.home() / ".codex", Path("/mnt/c/Users/bernd.worsch/.codex"), ] ) for candidate in candidates: if candidate.is_dir(): return candidate raise SystemExit("Could not find Codex home; pass --codex-home") def list_events(api_base: str, params: dict[str, Any]) -> list[dict[str, Any]]: events: list[dict[str, Any]] = [] offset = 0 while True: page_params = {**params, "limit": 1000, "offset": offset} encoded = urllib.parse.urlencode(page_params) page = http_json(api_base, "GET", f"/token-events/?{encoded}") if not isinstance(page, list) or not page: break events.extend(page) if len(page) < 1000: break offset += 1000 return events def existing_codex_events(api_base: str) -> dict[str, dict[str, Any]]: events = list_events( api_base, {"source_provider": "codex_session", "include_superseded": "true"}, ) by_source: dict[str, dict[str, Any]] = {} for event in events: source_id = event.get("source_id") or event.get("ref_id") if isinstance(source_id, str): by_source[source_id] = event return by_source def fetch_heuristics(api_base: str, since: str) -> list[dict[str, Any]]: return list_events( api_base, { "source_provider": "task_fallback", "note": "heuristic", "since": since, "include_superseded": "false", }, ) def patch_superseded_heuristic(api_base: str, event_id: str) -> None: http_json( api_base, "PATCH", f"/token-events/{event_id}", { "tokens_in": 0, "tokens_out": 0, "note": SUPERSEDED_HEURISTIC_NOTE, "measurement_kind": "superseded", "source_provider": "task_fallback", "confidence": 0.0, "raw_total_tokens": 0, }, ) def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--since", default="2026-05-19", help="UTC date/time to backfill from") parser.add_argument("--api-base", default=DEFAULT_API) parser.add_argument("--codex-home") parser.add_argument("--apply", action="store_true", help="write backfill events") parser.add_argument( "--zero-heuristics", action="store_true", help="set post-since heuristic task fallback events to zero after backfill", ) args = parser.parse_args() since = parse_iso(args.since) since_param = since.isoformat() codex_home = find_codex_home(args.codex_home) repo_refs = repo_refs_from_api(http_json(args.api_base, "GET", "/repos/")) existing = existing_codex_events(args.api_base) sessions = collect_codex_sessions(codex_home, since) planned: list[tuple[str, Any, str | None, str | None]] = [] by_repo: dict[str, list[int]] = {} for session in sessions: event = existing.get(session.source_id) existing_total = (event.get("tokens_in", 0) + event.get("tokens_out", 0)) if event else 0 action = "create" if event is None else ("update" if session.tokens_total > existing_total else "skip") match = resolve_repo(session.cwd, repo_refs) repo_id = match.repo_id if match else None repo_slug = match.slug if match else None if action != "skip": planned.append((action, session, repo_id, repo_slug)) label = repo_slug or "(unattributed)" totals = by_repo.setdefault(label, [0, 0, 0]) totals[0] += 1 totals[1] += session.tokens_in totals[2] += session.tokens_out heuristics = fetch_heuristics(args.api_base, since_param) if args.zero_heuristics else [] print(f"codex_home: {codex_home}") print(f"since: {since.isoformat()}") print(f"sessions found: {len(sessions)}") print(f"backfill events to create: {sum(1 for action, *_ in planned if action == 'create')}") print(f"backfill events to update: {sum(1 for action, *_ in planned if action == 'update')}") for repo_slug, (count, tokens_in, tokens_out) in sorted(by_repo.items()): print(f" {repo_slug}: {count} sessions, {tokens_in + tokens_out:,} tokens") if args.zero_heuristics: total = sum((e.get("tokens_in") or 0) + (e.get("tokens_out") or 0) for e in heuristics) print(f"heuristic events to zero: {len(heuristics)} ({total:,} tokens)") if not args.apply: print("dry run only; pass --apply to write changes") return 0 for _action, session, repo_id, repo_slug in planned: payload = session.to_token_event_payload(repo_id=repo_id) payload["note"] = BACKFILL_NOTE payload["raw_metadata"] = { **payload.get("raw_metadata", {}), "repo_slug": repo_slug, "attribution_method": resolve_repo(session.cwd, repo_refs).method if resolve_repo(session.cwd, repo_refs) else None, } http_json(args.api_base, "POST", "/token-events/upsert", payload) for event in heuristics: patch_superseded_heuristic(args.api_base, event["id"]) print(f"upserted {len(planned)} backfill events") if args.zero_heuristics: print(f"zeroed {len(heuristics)} heuristic events") return 0 if __name__ == "__main__": raise SystemExit(main())