From 29fc211a140899ab1067af3dc7aeaedd5c536aac Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 6 Jun 2026 19:10:02 +0200 Subject: [PATCH] session-memory Phase 0: Tier1/Tier2 store (T03) - session_memory/core/store.py: SQLite rows + blob-dir bodies, idempotent ingest on (session_uid,seq), Tier1/Tier2 usage accounting, evict_raw that drops raw but preserves the digest; watermark columns authoritative - tests/test_store.py: ingest idempotency, accounting, eviction invariant Co-Authored-By: Claude Opus 4.8 --- session_memory/core/store.py | 225 ++++++++++++++++++ tests/test_store.py | 82 +++++++ .../AGENTIC-WP-0002-session-memory-phase0.md | 4 +- 3 files changed, 309 insertions(+), 2 deletions(-) create mode 100644 session_memory/core/store.py create mode 100644 tests/test_store.py diff --git a/session_memory/core/store.py b/session_memory/core/store.py new file mode 100644 index 0000000..3b98c09 --- /dev/null +++ b/session_memory/core/store.py @@ -0,0 +1,225 @@ +"""Two-tier store (design §3, §8). + +Tier 1 (bulky, evictable): ``Session`` + ``SessionEvent`` rows in SQLite, with +event bodies written out-of-line as files under a blob dir (referenced by +``payload_ref``). Tier 2 (compact, durable): per-session ``digest`` rows. + +Writes are idempotent on ``(session_uid, seq)`` for events and on +``session_uid`` for sessions/digests, so sweeps are safely re-runnable. Eviction +(:meth:`evict_raw`) deletes Tier 1 rows + blobs but keeps the session row and its +Tier 2 digest — the invariant that makes budget-based retention non-lossy. +""" + +from __future__ import annotations + +import json +import os +import re +import sqlite3 +from datetime import datetime, timezone +from typing import Any, Optional + +from .schema import Cost, Session, SessionEvent + +_SAFE = re.compile(r"[^A-Za-z0-9._-]+") + + +def _now() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +class Store: + def __init__(self, db_path: str, blob_dir: str): + self.db_path = db_path + self.blob_dir = blob_dir + os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True) + os.makedirs(blob_dir, exist_ok=True) + self.db = sqlite3.connect(db_path) + self.db.row_factory = sqlite3.Row + self.db.execute("PRAGMA journal_mode=WAL") + self._init_schema() + + def close(self) -> None: + self.db.close() + + def __enter__(self) -> "Store": + return self + + def __exit__(self, *exc) -> None: + self.close() + + def _init_schema(self) -> None: + self.db.executescript( + """ + CREATE TABLE IF NOT EXISTS sessions ( + session_uid TEXT PRIMARY KEY, + json TEXT NOT NULL, + analyzed_at TEXT, + evicted_at TEXT + ); + CREATE TABLE IF NOT EXISTS events ( + session_uid TEXT NOT NULL, + seq INTEGER NOT NULL, + json TEXT NOT NULL, + PRIMARY KEY (session_uid, seq) + ); + CREATE TABLE IF NOT EXISTS blobs ( + ref TEXT PRIMARY KEY, + session_uid TEXT NOT NULL, + path TEXT NOT NULL, + nbytes INTEGER NOT NULL + ); + CREATE TABLE IF NOT EXISTS digests ( + session_uid TEXT PRIMARY KEY, + json TEXT NOT NULL, + nbytes INTEGER NOT NULL + ); + CREATE INDEX IF NOT EXISTS ix_events_uid ON events(session_uid); + CREATE INDEX IF NOT EXISTS ix_blobs_uid ON blobs(session_uid); + """ + ) + self.db.commit() + + # ---- Tier 1 writes ----------------------------------------------------- + + def upsert_session(self, s: Session) -> None: + self.db.execute( + "INSERT INTO sessions(session_uid, json, analyzed_at, evicted_at) " + "VALUES(?,?,?,?) ON CONFLICT(session_uid) DO UPDATE SET " + "json=excluded.json, analyzed_at=excluded.analyzed_at, evicted_at=excluded.evicted_at", + (s.session_uid, s.to_json(), s.analyzed_at, s.evicted_at), + ) + self.db.commit() + + def upsert_events(self, events: list[SessionEvent]) -> int: + rows = [(e.session_uid, e.seq, e.to_json()) for e in events] + self.db.executemany( + "INSERT INTO events(session_uid, seq, json) VALUES(?,?,?) " + "ON CONFLICT(session_uid, seq) DO UPDATE SET json=excluded.json", + rows, + ) + self.db.commit() + return len(rows) + + def write_blobs(self, session_uid: str, blobs: dict[str, str]) -> int: + """Write event bodies as files; record path + size. Returns bytes written.""" + total = 0 + sub = os.path.join(self.blob_dir, _SAFE.sub("_", session_uid)) + os.makedirs(sub, exist_ok=True) + for ref, body in blobs.items(): + data = body.encode("utf-8") + fname = _SAFE.sub("_", ref) + ".txt" + path = os.path.join(sub, fname) + with open(path, "w", encoding="utf-8") as f: + f.write(body) + self.db.execute( + "INSERT INTO blobs(ref, session_uid, path, nbytes) VALUES(?,?,?,?) " + "ON CONFLICT(ref) DO UPDATE SET path=excluded.path, nbytes=excluded.nbytes", + (ref, session_uid, path, len(data)), + ) + total += len(data) + self.db.commit() + return total + + def ingest(self, bundle) -> None: + """Persist a full Normalized bundle (session + events + blobs).""" + s = bundle.session + if s.ingested_at is None: + s.ingested_at = _now() + self.upsert_session(s) + self.upsert_events(bundle.events) + self.write_blobs(s.session_uid, bundle.blobs) + + # ---- Tier 2 (digest) --------------------------------------------------- + + def write_digest(self, session_uid: str, digest: dict[str, Any], analyzed_at: Optional[str] = None) -> None: + payload = json.dumps(digest, sort_keys=True) + self.db.execute( + "INSERT INTO digests(session_uid, json, nbytes) VALUES(?,?,?) " + "ON CONFLICT(session_uid) DO UPDATE SET json=excluded.json, nbytes=excluded.nbytes", + (session_uid, payload, len(payload.encode("utf-8"))), + ) + self.db.execute( + "UPDATE sessions SET analyzed_at=? WHERE session_uid=?", + (analyzed_at or _now(), session_uid), + ) + self.db.commit() + + def get_digest(self, session_uid: str) -> Optional[dict[str, Any]]: + row = self.db.execute("SELECT json FROM digests WHERE session_uid=?", (session_uid,)).fetchone() + return json.loads(row["json"]) if row else None + + # ---- reads ------------------------------------------------------------- + + def get_session(self, session_uid: str) -> Optional[Session]: + row = self.db.execute( + "SELECT json, analyzed_at, evicted_at FROM sessions WHERE session_uid=?", (session_uid,) + ).fetchone() + return self._row_to_session(row) if row else None + + def list_sessions(self) -> list[Session]: + rows = self.db.execute("SELECT json, analyzed_at, evicted_at FROM sessions") + return [self._row_to_session(r) for r in rows] + + @staticmethod + def _row_to_session(row) -> Session: + """Rebuild a Session, treating the watermark columns as authoritative.""" + s = Session.from_json(row["json"]) + s.analyzed_at = row["analyzed_at"] + s.evicted_at = row["evicted_at"] + return s + + def get_events(self, session_uid: str) -> list[SessionEvent]: + rows = self.db.execute( + "SELECT json FROM events WHERE session_uid=? ORDER BY seq", (session_uid,) + ).fetchall() + return [SessionEvent.from_json(r["json"]) for r in rows] + + def count_events(self, session_uid: str) -> int: + return self.db.execute( + "SELECT COUNT(*) c FROM events WHERE session_uid=?", (session_uid,) + ).fetchone()["c"] + + # ---- usage accounting (drives retention) ------------------------------- + + def tier1_usage_bytes(self) -> int: + """Bytes held in Tier 1: event-row JSON + blob bytes for non-evicted sessions.""" + row = self.db.execute( + "SELECT COALESCE(SUM(LENGTH(json)),0) b FROM events e " + "WHERE NOT EXISTS (SELECT 1 FROM sessions s " + "WHERE s.session_uid=e.session_uid AND s.evicted_at IS NOT NULL)" + ).fetchone() + blob = self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM blobs").fetchone() + return int(row["b"]) + int(blob["b"]) + + def session_tier1_bytes(self, session_uid: str) -> int: + ev = self.db.execute( + "SELECT COALESCE(SUM(LENGTH(json)),0) b FROM events WHERE session_uid=?", (session_uid,) + ).fetchone()["b"] + bl = self.db.execute( + "SELECT COALESCE(SUM(nbytes),0) b FROM blobs WHERE session_uid=?", (session_uid,) + ).fetchone()["b"] + return int(ev) + int(bl) + + def tier2_usage_bytes(self) -> int: + return int(self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM digests").fetchone()["b"]) + + # ---- eviction ---------------------------------------------------------- + + def evict_raw(self, session_uid: str) -> int: + """Drop Tier 1 raw (events + blob files) for a session; keep digest + row. + + Sets ``evicted_at``. Returns bytes freed. Safe to call on an + already-evicted session (no-op-ish). + """ + freed = self.session_tier1_bytes(session_uid) + for r in self.db.execute("SELECT path FROM blobs WHERE session_uid=?", (session_uid,)).fetchall(): + try: + os.remove(r["path"]) + except FileNotFoundError: + pass + self.db.execute("DELETE FROM blobs WHERE session_uid=?", (session_uid,)) + self.db.execute("DELETE FROM events WHERE session_uid=?", (session_uid,)) + self.db.execute("UPDATE sessions SET evicted_at=? WHERE session_uid=?", (_now(), session_uid)) + self.db.commit() + return freed diff --git a/tests/test_store.py b/tests/test_store.py new file mode 100644 index 0000000..79076a1 --- /dev/null +++ b/tests/test_store.py @@ -0,0 +1,82 @@ +"""Store tests (T03): idempotent ingest, usage accounting, eviction.""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from session_memory.adapters.claude import Normalized # noqa: E402 +from session_memory.core.schema import Cost, Session, SessionEvent # noqa: E402 +from session_memory.core.store import Store # noqa: E402 + + +def _bundle(uid_native="s1", n_events=3): + s = Session( + session_uid=Session.make_uid("claude", uid_native), + flavor="claude", native_session_id=uid_native, + repo="agentic-resources", domain="helix_forge", + cost=Cost(input_tokens=10), + ) + events, blobs = [], {} + for i in range(n_events): + ref = f"blob://{uid_native}/{i}" + events.append(SessionEvent(session_uid=s.session_uid, seq=i, kind="assistant_msg", + payload_ref=ref, summary=f"msg {i}")) + blobs[ref] = "x" * 100 + return Normalized(session=s, events=events, blobs=blobs) + + +def _store(tmp_path): + return Store(str(tmp_path / "mem.db"), str(tmp_path / "blobs")) + + +def test_ingest_and_read_back(tmp_path): + st = _store(tmp_path) + b = _bundle("s1", 3) + st.ingest(b) + s = st.get_session(b.session.session_uid) + assert s is not None and s.ingested_at is not None + assert st.count_events(b.session.session_uid) == 3 + assert st.tier1_usage_bytes() > 0 + + +def test_ingest_is_idempotent(tmp_path): + st = _store(tmp_path) + b = _bundle("s1", 3) + st.ingest(b) + before = st.tier1_usage_bytes() + st.ingest(b) # re-run same sweep + assert st.count_events(b.session.session_uid) == 3 # no duplicate rows + assert st.tier1_usage_bytes() == before # blobs upserted, not doubled + + +def test_digest_sets_analyzed_and_tier2_bytes(tmp_path): + st = _store(tmp_path) + b = _bundle("s1", 2) + st.ingest(b) + assert st.get_session(b.session.session_uid).analyzed_at is None + st.write_digest(b.session.session_uid, {"outcome": "success", "tools": {"Edit": 1}}) + assert st.get_session(b.session.session_uid).analyzed_at is not None + assert st.tier2_usage_bytes() > 0 + assert st.get_digest(b.session.session_uid)["outcome"] == "success" + + +def test_evict_raw_keeps_digest_drops_raw(tmp_path): + st = _store(tmp_path) + b = _bundle("s1", 3) + st.ingest(b) + st.write_digest(b.session.session_uid, {"outcome": "unknown"}) + blob_dir_files_before = sum(len(f) for _, _, f in os.walk(str(tmp_path / "blobs"))) + assert blob_dir_files_before > 0 + + freed = st.evict_raw(b.session.session_uid) + assert freed > 0 + assert st.count_events(b.session.session_uid) == 0 # raw gone + assert st.get_events(b.session.session_uid) == [] + assert st.get_session(b.session.session_uid).evicted_at is not None + assert st.get_digest(b.session.session_uid) is not None # Tier 2 preserved + # blob files removed from disk + remaining = [f for _, _, fs in os.walk(str(tmp_path / "blobs")) for f in fs] + assert remaining == [] + # evicted session no longer counts toward Tier 1 usage + assert st.tier1_usage_bytes() == 0 diff --git a/workplans/AGENTIC-WP-0002-session-memory-phase0.md b/workplans/AGENTIC-WP-0002-session-memory-phase0.md index da380e0..51bf876 100644 --- a/workplans/AGENTIC-WP-0002-session-memory-phase0.md +++ b/workplans/AGENTIC-WP-0002-session-memory-phase0.md @@ -58,7 +58,7 @@ Codex/Grok work in Phase 0 (designed for, not built). ```task id: AGENTIC-WP-0002-T03 -status: progress +status: done priority: high state_hub_task_id: "2387258e-ba6d-4a41-919e-f2f4e0822110" ``` @@ -72,7 +72,7 @@ Tier 1 (rows + blobs) and Tier 2, used by retention. ```task id: AGENTIC-WP-0002-T04 -status: todo +status: progress priority: medium state_hub_task_id: "017d8e90-633a-49f2-b342-8690938798cd" ```