"""Two-tier store (design §3, §8). Tier 1 (bulky, evictable): ``Session`` + ``SessionEvent`` rows in SQLite, with event bodies written out-of-line as files under a blob dir (referenced by ``payload_ref``). Tier 2 (compact, durable): per-session ``digest`` rows. Writes are idempotent on ``(session_uid, seq)`` for events and on ``session_uid`` for sessions/digests, so sweeps are safely re-runnable. Eviction (:meth:`evict_raw`) deletes Tier 1 rows + blobs but keeps the session row and its Tier 2 digest — the invariant that makes budget-based retention non-lossy. """ from __future__ import annotations import hashlib import json import os import re import sqlite3 from datetime import datetime, timezone from typing import Any, Optional from .schema import Cost, Session, SessionEvent _SAFE = re.compile(r"[^A-Za-z0-9._-]+") def _now() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def _fingerprint(ev: SessionEvent, body: Optional[str]) -> str: """Stable content fingerprint, independent of seq/payload_ref, for dedup.""" h = hashlib.sha1() parts = [ev.ts or "", ev.kind, ev.role or "", ev.tool or "", ev.summary or "", ev.role or "", str(ev.is_sidechain)] h.update("\x1f".join(parts).encode("utf-8")) if body is not None: h.update(b"\x1e") h.update(body.encode("utf-8")) return h.hexdigest() class Store: def __init__(self, db_path: str, blob_dir: str): self.db_path = db_path self.blob_dir = blob_dir os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True) os.makedirs(blob_dir, exist_ok=True) self.db = sqlite3.connect(db_path) self.db.row_factory = sqlite3.Row self.db.execute("PRAGMA journal_mode=WAL") self._init_schema() def close(self) -> None: self.db.close() def __enter__(self) -> "Store": return self def __exit__(self, *exc) -> None: self.close() def _init_schema(self) -> None: self.db.executescript( """ CREATE TABLE IF NOT EXISTS sessions ( session_uid TEXT PRIMARY KEY, json TEXT NOT NULL, analyzed_at TEXT, evicted_at TEXT ); CREATE TABLE IF NOT EXISTS events ( session_uid TEXT NOT NULL, seq INTEGER NOT NULL, json TEXT NOT NULL, PRIMARY KEY (session_uid, seq) ); CREATE TABLE IF NOT EXISTS blobs ( ref TEXT PRIMARY KEY, session_uid TEXT NOT NULL, path TEXT NOT NULL, nbytes INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS digests ( session_uid TEXT PRIMARY KEY, json TEXT NOT NULL, nbytes INTEGER NOT NULL ); CREATE INDEX IF NOT EXISTS ix_events_uid ON events(session_uid); CREATE INDEX IF NOT EXISTS ix_blobs_uid ON blobs(session_uid); """ ) self.db.commit() # ---- Tier 1 writes ----------------------------------------------------- def upsert_session(self, s: Session) -> None: self.db.execute( "INSERT INTO sessions(session_uid, json, analyzed_at, evicted_at) " "VALUES(?,?,?,?) ON CONFLICT(session_uid) DO UPDATE SET " "json=excluded.json, analyzed_at=excluded.analyzed_at, evicted_at=excluded.evicted_at", (s.session_uid, s.to_json(), s.analyzed_at, s.evicted_at), ) self.db.commit() def upsert_events(self, events: list[SessionEvent]) -> int: rows = [(e.session_uid, e.seq, e.to_json()) for e in events] self.db.executemany( "INSERT INTO events(session_uid, seq, json) VALUES(?,?,?) " "ON CONFLICT(session_uid, seq) DO UPDATE SET json=excluded.json", rows, ) self.db.commit() return len(rows) def write_blobs(self, session_uid: str, blobs: dict[str, str]) -> int: """Write event bodies as files; record path + size. Returns bytes written.""" total = 0 sub = os.path.join(self.blob_dir, _SAFE.sub("_", session_uid)) os.makedirs(sub, exist_ok=True) for ref, body in blobs.items(): data = body.encode("utf-8") fname = _SAFE.sub("_", ref) + ".txt" path = os.path.join(sub, fname) with open(path, "w", encoding="utf-8") as f: f.write(body) self.db.execute( "INSERT INTO blobs(ref, session_uid, path, nbytes) VALUES(?,?,?,?) " "ON CONFLICT(ref) DO UPDATE SET path=excluded.path, nbytes=excluded.nbytes", (ref, session_uid, path, len(data)), ) total += len(data) self.db.commit() return total def ingest(self, bundle) -> int: """Persist a Normalized bundle, merging into any existing session. Multiple files can map to one ``session_uid`` (Claude resume/sidechains; Grok multi-file dirs). Events are de-duplicated by content fingerprint and genuinely-new events are appended with offset ``seq`` (design OQ6 / T03). Returns the number of new events written. Idempotent: re-ingesting the same bundle adds nothing. """ s = bundle.session existing = self.get_session(s.session_uid) if existing is None: if s.ingested_at is None: s.ingested_at = _now() self.upsert_session(s) # known fingerprints + current max seq for this session seen = self._event_fingerprints(s.session_uid) next_seq = self._max_seq(s.session_uid) + 1 new_events: list[SessionEvent] = [] new_blobs: dict[str, str] = {} old_to_new: dict[int, int] = {} for ev in bundle.events: body = bundle.blobs.get(ev.payload_ref) if ev.payload_ref else None fp = _fingerprint(ev, body) if fp in seen: continue # already stored (prior file or prior sweep) new_seq = next_seq next_seq += 1 old_to_new[ev.seq] = new_seq # remap parent within this bundle; cross-file parents become None parent = old_to_new.get(ev.parent_seq) if ev.parent_seq is not None else None ref = None if body is not None: ref = f"blob://{s.session_uid}/{new_seq}" new_blobs[ref] = body merged = SessionEvent( session_uid=s.session_uid, seq=new_seq, parent_seq=parent, ts=ev.ts, kind=ev.kind, role=ev.role, tool=ev.tool, summary=ev.summary, payload_ref=ref, tokens=ev.tokens, is_sidechain=ev.is_sidechain, ) new_events.append(merged) seen.add(fp) if new_events: self.upsert_events(new_events) self.write_blobs(s.session_uid, new_blobs) return len(new_events) def _max_seq(self, session_uid: str) -> int: row = self.db.execute( "SELECT COALESCE(MAX(seq), -1) m FROM events WHERE session_uid=?", (session_uid,) ).fetchone() return int(row["m"]) def _event_fingerprints(self, session_uid: str) -> set[str]: fps: set[str] = set() for e in self.get_events(session_uid): body = None if e.payload_ref: r = self.db.execute("SELECT path FROM blobs WHERE ref=?", (e.payload_ref,)).fetchone() if r: try: with open(r["path"], "r", encoding="utf-8") as f: body = f.read() except OSError: body = None fps.add(_fingerprint(e, body)) return fps # ---- Tier 2 (digest) --------------------------------------------------- def write_digest(self, session_uid: str, digest: dict[str, Any], analyzed_at: Optional[str] = None) -> None: payload = json.dumps(digest, sort_keys=True) self.db.execute( "INSERT INTO digests(session_uid, json, nbytes) VALUES(?,?,?) " "ON CONFLICT(session_uid) DO UPDATE SET json=excluded.json, nbytes=excluded.nbytes", (session_uid, payload, len(payload.encode("utf-8"))), ) self.db.execute( "UPDATE sessions SET analyzed_at=? WHERE session_uid=?", (analyzed_at or _now(), session_uid), ) self.db.commit() def get_digest(self, session_uid: str) -> Optional[dict[str, Any]]: row = self.db.execute("SELECT json FROM digests WHERE session_uid=?", (session_uid,)).fetchone() return json.loads(row["json"]) if row else None def list_digests(self) -> list[dict[str, Any]]: return [json.loads(r["json"]) for r in self.db.execute("SELECT json FROM digests")] def save_patterns(self, patterns: list[dict[str, Any]]) -> None: """Persist candidate patterns to a Tier 2 table (replace prior run).""" self.db.execute( "CREATE TABLE IF NOT EXISTS patterns (" "key TEXT PRIMARY KEY, json TEXT NOT NULL, detected_at TEXT NOT NULL)" ) self.db.execute("DELETE FROM patterns") self.db.executemany( "INSERT INTO patterns(key, json, detected_at) VALUES(?,?,?)", [(p["key"], json.dumps(p, sort_keys=True), _now()) for p in patterns], ) self.db.commit() # ---- reads ------------------------------------------------------------- def get_session(self, session_uid: str) -> Optional[Session]: row = self.db.execute( "SELECT json, analyzed_at, evicted_at FROM sessions WHERE session_uid=?", (session_uid,) ).fetchone() return self._row_to_session(row) if row else None def list_sessions(self) -> list[Session]: rows = self.db.execute("SELECT json, analyzed_at, evicted_at FROM sessions") return [self._row_to_session(r) for r in rows] @staticmethod def _row_to_session(row) -> Session: """Rebuild a Session, treating the watermark columns as authoritative.""" s = Session.from_json(row["json"]) s.analyzed_at = row["analyzed_at"] s.evicted_at = row["evicted_at"] return s def get_events(self, session_uid: str) -> list[SessionEvent]: rows = self.db.execute( "SELECT json FROM events WHERE session_uid=? ORDER BY seq", (session_uid,) ).fetchall() return [SessionEvent.from_json(r["json"]) for r in rows] def count_events(self, session_uid: str) -> int: return self.db.execute( "SELECT COUNT(*) c FROM events WHERE session_uid=?", (session_uid,) ).fetchone()["c"] # ---- usage accounting (drives retention) ------------------------------- def tier1_usage_bytes(self) -> int: """Bytes held in Tier 1: event-row JSON + blob bytes for non-evicted sessions.""" row = self.db.execute( "SELECT COALESCE(SUM(LENGTH(json)),0) b FROM events e " "WHERE NOT EXISTS (SELECT 1 FROM sessions s " "WHERE s.session_uid=e.session_uid AND s.evicted_at IS NOT NULL)" ).fetchone() blob = self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM blobs").fetchone() return int(row["b"]) + int(blob["b"]) def session_tier1_bytes(self, session_uid: str) -> int: ev = self.db.execute( "SELECT COALESCE(SUM(LENGTH(json)),0) b FROM events WHERE session_uid=?", (session_uid,) ).fetchone()["b"] bl = self.db.execute( "SELECT COALESCE(SUM(nbytes),0) b FROM blobs WHERE session_uid=?", (session_uid,) ).fetchone()["b"] return int(ev) + int(bl) def tier2_usage_bytes(self) -> int: return int(self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM digests").fetchone()["b"]) # ---- eviction ---------------------------------------------------------- def evict_raw(self, session_uid: str) -> int: """Drop Tier 1 raw (events + blob files) for a session; keep digest + row. Sets ``evicted_at``. Returns bytes freed. Safe to call on an already-evicted session (no-op-ish). """ freed = self.session_tier1_bytes(session_uid) for r in self.db.execute("SELECT path FROM blobs WHERE session_uid=?", (session_uid,)).fetchall(): try: os.remove(r["path"]) except FileNotFoundError: pass self.db.execute("DELETE FROM blobs WHERE session_uid=?", (session_uid,)) self.db.execute("DELETE FROM events WHERE session_uid=?", (session_uid,)) self.db.execute("UPDATE sessions SET evicted_at=? WHERE session_uid=?", (_now(), session_uid)) self.db.commit() return freed