generated from coulomb/repo-seed
session-memory Phase 0: Tier1/Tier2 store (T03)
- session_memory/core/store.py: SQLite rows + blob-dir bodies, idempotent ingest on (session_uid,seq), Tier1/Tier2 usage accounting, evict_raw that drops raw but preserves the digest; watermark columns authoritative - tests/test_store.py: ingest idempotency, accounting, eviction invariant Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
225
session_memory/core/store.py
Normal file
225
session_memory/core/store.py
Normal file
@@ -0,0 +1,225 @@
|
||||
"""Two-tier store (design §3, §8).
|
||||
|
||||
Tier 1 (bulky, evictable): ``Session`` + ``SessionEvent`` rows in SQLite, with
|
||||
event bodies written out-of-line as files under a blob dir (referenced by
|
||||
``payload_ref``). Tier 2 (compact, durable): per-session ``digest`` rows.
|
||||
|
||||
Writes are idempotent on ``(session_uid, seq)`` for events and on
|
||||
``session_uid`` for sessions/digests, so sweeps are safely re-runnable. Eviction
|
||||
(:meth:`evict_raw`) deletes Tier 1 rows + blobs but keeps the session row and its
|
||||
Tier 2 digest — the invariant that makes budget-based retention non-lossy.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
from .schema import Cost, Session, SessionEvent
|
||||
|
||||
_SAFE = re.compile(r"[^A-Za-z0-9._-]+")
|
||||
|
||||
|
||||
def _now() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
|
||||
class Store:
|
||||
def __init__(self, db_path: str, blob_dir: str):
|
||||
self.db_path = db_path
|
||||
self.blob_dir = blob_dir
|
||||
os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
|
||||
os.makedirs(blob_dir, exist_ok=True)
|
||||
self.db = sqlite3.connect(db_path)
|
||||
self.db.row_factory = sqlite3.Row
|
||||
self.db.execute("PRAGMA journal_mode=WAL")
|
||||
self._init_schema()
|
||||
|
||||
def close(self) -> None:
|
||||
self.db.close()
|
||||
|
||||
def __enter__(self) -> "Store":
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc) -> None:
|
||||
self.close()
|
||||
|
||||
def _init_schema(self) -> None:
|
||||
self.db.executescript(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS sessions (
|
||||
session_uid TEXT PRIMARY KEY,
|
||||
json TEXT NOT NULL,
|
||||
analyzed_at TEXT,
|
||||
evicted_at TEXT
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
session_uid TEXT NOT NULL,
|
||||
seq INTEGER NOT NULL,
|
||||
json TEXT NOT NULL,
|
||||
PRIMARY KEY (session_uid, seq)
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS blobs (
|
||||
ref TEXT PRIMARY KEY,
|
||||
session_uid TEXT NOT NULL,
|
||||
path TEXT NOT NULL,
|
||||
nbytes INTEGER NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS digests (
|
||||
session_uid TEXT PRIMARY KEY,
|
||||
json TEXT NOT NULL,
|
||||
nbytes INTEGER NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS ix_events_uid ON events(session_uid);
|
||||
CREATE INDEX IF NOT EXISTS ix_blobs_uid ON blobs(session_uid);
|
||||
"""
|
||||
)
|
||||
self.db.commit()
|
||||
|
||||
# ---- Tier 1 writes -----------------------------------------------------
|
||||
|
||||
def upsert_session(self, s: Session) -> None:
|
||||
self.db.execute(
|
||||
"INSERT INTO sessions(session_uid, json, analyzed_at, evicted_at) "
|
||||
"VALUES(?,?,?,?) ON CONFLICT(session_uid) DO UPDATE SET "
|
||||
"json=excluded.json, analyzed_at=excluded.analyzed_at, evicted_at=excluded.evicted_at",
|
||||
(s.session_uid, s.to_json(), s.analyzed_at, s.evicted_at),
|
||||
)
|
||||
self.db.commit()
|
||||
|
||||
def upsert_events(self, events: list[SessionEvent]) -> int:
|
||||
rows = [(e.session_uid, e.seq, e.to_json()) for e in events]
|
||||
self.db.executemany(
|
||||
"INSERT INTO events(session_uid, seq, json) VALUES(?,?,?) "
|
||||
"ON CONFLICT(session_uid, seq) DO UPDATE SET json=excluded.json",
|
||||
rows,
|
||||
)
|
||||
self.db.commit()
|
||||
return len(rows)
|
||||
|
||||
def write_blobs(self, session_uid: str, blobs: dict[str, str]) -> int:
|
||||
"""Write event bodies as files; record path + size. Returns bytes written."""
|
||||
total = 0
|
||||
sub = os.path.join(self.blob_dir, _SAFE.sub("_", session_uid))
|
||||
os.makedirs(sub, exist_ok=True)
|
||||
for ref, body in blobs.items():
|
||||
data = body.encode("utf-8")
|
||||
fname = _SAFE.sub("_", ref) + ".txt"
|
||||
path = os.path.join(sub, fname)
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
f.write(body)
|
||||
self.db.execute(
|
||||
"INSERT INTO blobs(ref, session_uid, path, nbytes) VALUES(?,?,?,?) "
|
||||
"ON CONFLICT(ref) DO UPDATE SET path=excluded.path, nbytes=excluded.nbytes",
|
||||
(ref, session_uid, path, len(data)),
|
||||
)
|
||||
total += len(data)
|
||||
self.db.commit()
|
||||
return total
|
||||
|
||||
def ingest(self, bundle) -> None:
|
||||
"""Persist a full Normalized bundle (session + events + blobs)."""
|
||||
s = bundle.session
|
||||
if s.ingested_at is None:
|
||||
s.ingested_at = _now()
|
||||
self.upsert_session(s)
|
||||
self.upsert_events(bundle.events)
|
||||
self.write_blobs(s.session_uid, bundle.blobs)
|
||||
|
||||
# ---- Tier 2 (digest) ---------------------------------------------------
|
||||
|
||||
def write_digest(self, session_uid: str, digest: dict[str, Any], analyzed_at: Optional[str] = None) -> None:
|
||||
payload = json.dumps(digest, sort_keys=True)
|
||||
self.db.execute(
|
||||
"INSERT INTO digests(session_uid, json, nbytes) VALUES(?,?,?) "
|
||||
"ON CONFLICT(session_uid) DO UPDATE SET json=excluded.json, nbytes=excluded.nbytes",
|
||||
(session_uid, payload, len(payload.encode("utf-8"))),
|
||||
)
|
||||
self.db.execute(
|
||||
"UPDATE sessions SET analyzed_at=? WHERE session_uid=?",
|
||||
(analyzed_at or _now(), session_uid),
|
||||
)
|
||||
self.db.commit()
|
||||
|
||||
def get_digest(self, session_uid: str) -> Optional[dict[str, Any]]:
|
||||
row = self.db.execute("SELECT json FROM digests WHERE session_uid=?", (session_uid,)).fetchone()
|
||||
return json.loads(row["json"]) if row else None
|
||||
|
||||
# ---- reads -------------------------------------------------------------
|
||||
|
||||
def get_session(self, session_uid: str) -> Optional[Session]:
|
||||
row = self.db.execute(
|
||||
"SELECT json, analyzed_at, evicted_at FROM sessions WHERE session_uid=?", (session_uid,)
|
||||
).fetchone()
|
||||
return self._row_to_session(row) if row else None
|
||||
|
||||
def list_sessions(self) -> list[Session]:
|
||||
rows = self.db.execute("SELECT json, analyzed_at, evicted_at FROM sessions")
|
||||
return [self._row_to_session(r) for r in rows]
|
||||
|
||||
@staticmethod
|
||||
def _row_to_session(row) -> Session:
|
||||
"""Rebuild a Session, treating the watermark columns as authoritative."""
|
||||
s = Session.from_json(row["json"])
|
||||
s.analyzed_at = row["analyzed_at"]
|
||||
s.evicted_at = row["evicted_at"]
|
||||
return s
|
||||
|
||||
def get_events(self, session_uid: str) -> list[SessionEvent]:
|
||||
rows = self.db.execute(
|
||||
"SELECT json FROM events WHERE session_uid=? ORDER BY seq", (session_uid,)
|
||||
).fetchall()
|
||||
return [SessionEvent.from_json(r["json"]) for r in rows]
|
||||
|
||||
def count_events(self, session_uid: str) -> int:
|
||||
return self.db.execute(
|
||||
"SELECT COUNT(*) c FROM events WHERE session_uid=?", (session_uid,)
|
||||
).fetchone()["c"]
|
||||
|
||||
# ---- usage accounting (drives retention) -------------------------------
|
||||
|
||||
def tier1_usage_bytes(self) -> int:
|
||||
"""Bytes held in Tier 1: event-row JSON + blob bytes for non-evicted sessions."""
|
||||
row = self.db.execute(
|
||||
"SELECT COALESCE(SUM(LENGTH(json)),0) b FROM events e "
|
||||
"WHERE NOT EXISTS (SELECT 1 FROM sessions s "
|
||||
"WHERE s.session_uid=e.session_uid AND s.evicted_at IS NOT NULL)"
|
||||
).fetchone()
|
||||
blob = self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM blobs").fetchone()
|
||||
return int(row["b"]) + int(blob["b"])
|
||||
|
||||
def session_tier1_bytes(self, session_uid: str) -> int:
|
||||
ev = self.db.execute(
|
||||
"SELECT COALESCE(SUM(LENGTH(json)),0) b FROM events WHERE session_uid=?", (session_uid,)
|
||||
).fetchone()["b"]
|
||||
bl = self.db.execute(
|
||||
"SELECT COALESCE(SUM(nbytes),0) b FROM blobs WHERE session_uid=?", (session_uid,)
|
||||
).fetchone()["b"]
|
||||
return int(ev) + int(bl)
|
||||
|
||||
def tier2_usage_bytes(self) -> int:
|
||||
return int(self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM digests").fetchone()["b"])
|
||||
|
||||
# ---- eviction ----------------------------------------------------------
|
||||
|
||||
def evict_raw(self, session_uid: str) -> int:
|
||||
"""Drop Tier 1 raw (events + blob files) for a session; keep digest + row.
|
||||
|
||||
Sets ``evicted_at``. Returns bytes freed. Safe to call on an
|
||||
already-evicted session (no-op-ish).
|
||||
"""
|
||||
freed = self.session_tier1_bytes(session_uid)
|
||||
for r in self.db.execute("SELECT path FROM blobs WHERE session_uid=?", (session_uid,)).fetchall():
|
||||
try:
|
||||
os.remove(r["path"])
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
self.db.execute("DELETE FROM blobs WHERE session_uid=?", (session_uid,))
|
||||
self.db.execute("DELETE FROM events WHERE session_uid=?", (session_uid,))
|
||||
self.db.execute("UPDATE sessions SET evicted_at=? WHERE session_uid=?", (_now(), session_uid))
|
||||
self.db.commit()
|
||||
return freed
|
||||
82
tests/test_store.py
Normal file
82
tests/test_store.py
Normal file
@@ -0,0 +1,82 @@
|
||||
"""Store tests (T03): idempotent ingest, usage accounting, eviction."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.adapters.claude import Normalized # noqa: E402
|
||||
from session_memory.core.schema import Cost, Session, SessionEvent # noqa: E402
|
||||
from session_memory.core.store import Store # noqa: E402
|
||||
|
||||
|
||||
def _bundle(uid_native="s1", n_events=3):
|
||||
s = Session(
|
||||
session_uid=Session.make_uid("claude", uid_native),
|
||||
flavor="claude", native_session_id=uid_native,
|
||||
repo="agentic-resources", domain="helix_forge",
|
||||
cost=Cost(input_tokens=10),
|
||||
)
|
||||
events, blobs = [], {}
|
||||
for i in range(n_events):
|
||||
ref = f"blob://{uid_native}/{i}"
|
||||
events.append(SessionEvent(session_uid=s.session_uid, seq=i, kind="assistant_msg",
|
||||
payload_ref=ref, summary=f"msg {i}"))
|
||||
blobs[ref] = "x" * 100
|
||||
return Normalized(session=s, events=events, blobs=blobs)
|
||||
|
||||
|
||||
def _store(tmp_path):
|
||||
return Store(str(tmp_path / "mem.db"), str(tmp_path / "blobs"))
|
||||
|
||||
|
||||
def test_ingest_and_read_back(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
b = _bundle("s1", 3)
|
||||
st.ingest(b)
|
||||
s = st.get_session(b.session.session_uid)
|
||||
assert s is not None and s.ingested_at is not None
|
||||
assert st.count_events(b.session.session_uid) == 3
|
||||
assert st.tier1_usage_bytes() > 0
|
||||
|
||||
|
||||
def test_ingest_is_idempotent(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
b = _bundle("s1", 3)
|
||||
st.ingest(b)
|
||||
before = st.tier1_usage_bytes()
|
||||
st.ingest(b) # re-run same sweep
|
||||
assert st.count_events(b.session.session_uid) == 3 # no duplicate rows
|
||||
assert st.tier1_usage_bytes() == before # blobs upserted, not doubled
|
||||
|
||||
|
||||
def test_digest_sets_analyzed_and_tier2_bytes(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
b = _bundle("s1", 2)
|
||||
st.ingest(b)
|
||||
assert st.get_session(b.session.session_uid).analyzed_at is None
|
||||
st.write_digest(b.session.session_uid, {"outcome": "success", "tools": {"Edit": 1}})
|
||||
assert st.get_session(b.session.session_uid).analyzed_at is not None
|
||||
assert st.tier2_usage_bytes() > 0
|
||||
assert st.get_digest(b.session.session_uid)["outcome"] == "success"
|
||||
|
||||
|
||||
def test_evict_raw_keeps_digest_drops_raw(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
b = _bundle("s1", 3)
|
||||
st.ingest(b)
|
||||
st.write_digest(b.session.session_uid, {"outcome": "unknown"})
|
||||
blob_dir_files_before = sum(len(f) for _, _, f in os.walk(str(tmp_path / "blobs")))
|
||||
assert blob_dir_files_before > 0
|
||||
|
||||
freed = st.evict_raw(b.session.session_uid)
|
||||
assert freed > 0
|
||||
assert st.count_events(b.session.session_uid) == 0 # raw gone
|
||||
assert st.get_events(b.session.session_uid) == []
|
||||
assert st.get_session(b.session.session_uid).evicted_at is not None
|
||||
assert st.get_digest(b.session.session_uid) is not None # Tier 2 preserved
|
||||
# blob files removed from disk
|
||||
remaining = [f for _, _, fs in os.walk(str(tmp_path / "blobs")) for f in fs]
|
||||
assert remaining == []
|
||||
# evicted session no longer counts toward Tier 1 usage
|
||||
assert st.tier1_usage_bytes() == 0
|
||||
@@ -58,7 +58,7 @@ Codex/Grok work in Phase 0 (designed for, not built).
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0002-T03
|
||||
status: progress
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "2387258e-ba6d-4a41-919e-f2f4e0822110"
|
||||
```
|
||||
@@ -72,7 +72,7 @@ Tier 1 (rows + blobs) and Tier 2, used by retention.
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0002-T04
|
||||
status: todo
|
||||
status: progress
|
||||
priority: medium
|
||||
state_hub_task_id: "017d8e90-633a-49f2-b342-8690938798cd"
|
||||
```
|
||||
|
||||
Reference in New Issue
Block a user