generated from coulomb/repo-seed
- detect/signals.py: pure extractors over digests (retry storm, repeated errors, budget overrun vs corpus p90, abandoned, clean pass, recovery) - detect/cluster.py: deterministic clustering into candidate Patterns with evidence (sessions/repos/flavors/cost impact) + cross-flavor flagging - detect/__main__.py: python -m session_memory.detect, ranked report (cross-flavor first) + --json; persists candidates to Tier 2 patterns table - core/store.py: list_digests + save_patterns - tests for signals, cluster, detect entrypoint Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
316 lines
13 KiB
Python
316 lines
13 KiB
Python
"""Two-tier store (design §3, §8).
|
|
|
|
Tier 1 (bulky, evictable): ``Session`` + ``SessionEvent`` rows in SQLite, with
|
|
event bodies written out-of-line as files under a blob dir (referenced by
|
|
``payload_ref``). Tier 2 (compact, durable): per-session ``digest`` rows.
|
|
|
|
Writes are idempotent on ``(session_uid, seq)`` for events and on
|
|
``session_uid`` for sessions/digests, so sweeps are safely re-runnable. Eviction
|
|
(:meth:`evict_raw`) deletes Tier 1 rows + blobs but keeps the session row and its
|
|
Tier 2 digest — the invariant that makes budget-based retention non-lossy.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Optional
|
|
|
|
from .schema import Cost, Session, SessionEvent
|
|
|
|
# Matches every run of characters outside ``[A-Za-z0-9._-]``. ``Store.write_blobs``
# replaces each match with "_" when turning a session uid or blob ref into a
# directory / file name.
_SAFE = re.compile(r"[^A-Za-z0-9._-]+")
|
|
|
|
|
|
def _now() -> str:
    """Return the current UTC time as a ``YYYY-MM-DDTHH:MM:SSZ`` string."""
    utc_now = datetime.now(tz=timezone.utc)
    # Second resolution; the trailing "Z" is a literal in the format string.
    return utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def _fingerprint(ev: SessionEvent, body: Optional[str]) -> str:
    """Return a SHA-1 hex digest identifying an event by content, for dedup.

    The digest covers ``ts``, ``kind``, ``role``, ``tool``, ``summary`` and
    ``is_sidechain`` (joined with ``\\x1f``) and, when *body* is not ``None``,
    a ``\\x1e`` separator followed by the UTF-8 body. ``seq`` and
    ``payload_ref`` are not hashed, so the same event keeps its fingerprint
    after being renumbered.
    """
    role = ev.role or ""
    # NOTE(review): ``role`` occupies two slots in the hashed field list. It is
    # kept that way so the digests stay byte-identical to existing behavior.
    fields = (
        ev.ts or "",
        ev.kind,
        role,
        ev.tool or "",
        ev.summary or "",
        role,
        str(ev.is_sidechain),
    )
    material = "\x1f".join(fields).encode("utf-8")
    if body is not None:
        # An empty body still contributes the separator, so "" != None.
        material += b"\x1e" + body.encode("utf-8")
    return hashlib.sha1(material).hexdigest()
|
|
|
|
|
|
class Store:
    """Two-tier SQLite store for sessions, events, blobs and digests.

    Tier 1 (evictable): ``events`` rows plus event bodies kept as files under
    ``blob_dir`` and indexed by the ``blobs`` table. Tier 2 (durable):
    ``digests`` rows and the ``patterns`` table. Every write method commits
    before returning. Usable as a context manager; leaving the ``with`` block
    closes the connection.
    """

    def __init__(self, db_path: str, blob_dir: str):
        """Open (creating if needed) the database and blob directory.

        Args:
            db_path: SQLite file path; its parent directory is created.
            blob_dir: Directory that receives out-of-line event bodies.
        """
        self.db_path = db_path
        self.blob_dir = blob_dir
        os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
        os.makedirs(blob_dir, exist_ok=True)
        self.db = sqlite3.connect(db_path)
        self.db.row_factory = sqlite3.Row  # rows are addressed by column name below
        self.db.execute("PRAGMA journal_mode=WAL")
        self._init_schema()

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self.db.close()

    def __enter__(self) -> "Store":
        return self

    def __exit__(self, *exc) -> None:
        self.close()

    def _init_schema(self) -> None:
        """Create the sessions/events/blobs/digests tables and indexes if absent."""
        self.db.executescript(
            """
            CREATE TABLE IF NOT EXISTS sessions (
                session_uid TEXT PRIMARY KEY,
                json TEXT NOT NULL,
                analyzed_at TEXT,
                evicted_at TEXT
            );
            CREATE TABLE IF NOT EXISTS events (
                session_uid TEXT NOT NULL,
                seq INTEGER NOT NULL,
                json TEXT NOT NULL,
                PRIMARY KEY (session_uid, seq)
            );
            CREATE TABLE IF NOT EXISTS blobs (
                ref TEXT PRIMARY KEY,
                session_uid TEXT NOT NULL,
                path TEXT NOT NULL,
                nbytes INTEGER NOT NULL
            );
            CREATE TABLE IF NOT EXISTS digests (
                session_uid TEXT PRIMARY KEY,
                json TEXT NOT NULL,
                nbytes INTEGER NOT NULL
            );
            CREATE INDEX IF NOT EXISTS ix_events_uid ON events(session_uid);
            CREATE INDEX IF NOT EXISTS ix_blobs_uid ON blobs(session_uid);
            """
        )
        self.db.commit()

    # ---- Tier 1 writes -----------------------------------------------------

    def upsert_session(self, s: Session) -> None:
        """Insert or fully overwrite the row for ``s.session_uid``.

        On conflict the stored JSON and both watermark columns
        (``analyzed_at``, ``evicted_at``) are replaced by the values on *s*.
        """
        self.db.execute(
            "INSERT INTO sessions(session_uid, json, analyzed_at, evicted_at) "
            "VALUES(?,?,?,?) ON CONFLICT(session_uid) DO UPDATE SET "
            "json=excluded.json, analyzed_at=excluded.analyzed_at, evicted_at=excluded.evicted_at",
            (s.session_uid, s.to_json(), s.analyzed_at, s.evicted_at),
        )
        self.db.commit()

    def upsert_events(self, events: list[SessionEvent]) -> int:
        """Insert events, overwriting any row with the same ``(session_uid, seq)``.

        Returns the number of events submitted (not the number newly created).
        """
        rows = [(e.session_uid, e.seq, e.to_json()) for e in events]
        self.db.executemany(
            "INSERT INTO events(session_uid, seq, json) VALUES(?,?,?) "
            "ON CONFLICT(session_uid, seq) DO UPDATE SET json=excluded.json",
            rows,
        )
        self.db.commit()
        return len(rows)

    def write_blobs(self, session_uid: str, blobs: dict[str, str]) -> int:
        """Write event bodies as files and record their path and size.

        Args:
            session_uid: Owning session; sanitized into the sub-directory name.
            blobs: Mapping of blob ref -> body text.

        Returns:
            Total number of UTF-8 bytes written.
        """
        total = 0
        sub = os.path.join(self.blob_dir, _SAFE.sub("_", session_uid))
        os.makedirs(sub, exist_ok=True)
        for ref, body in blobs.items():
            data = body.encode("utf-8")
            path = os.path.join(sub, _SAFE.sub("_", ref) + ".txt")
            # Binary mode: the bytes on disk are exactly ``data``, so the
            # recorded ``nbytes`` is accurate and no newline translation can
            # alter the body that dedup fingerprints are later computed from.
            with open(path, "wb") as f:
                f.write(data)
            self.db.execute(
                "INSERT INTO blobs(ref, session_uid, path, nbytes) VALUES(?,?,?,?) "
                "ON CONFLICT(ref) DO UPDATE SET path=excluded.path, nbytes=excluded.nbytes",
                (ref, session_uid, path, len(data)),
            )
            total += len(data)
        self.db.commit()
        return total

    def ingest(self, bundle) -> int:
        """Persist a Normalized bundle, merging into any existing session.

        Multiple files can map to one ``session_uid`` (Claude resume/sidechains;
        Grok multi-file dirs). Events are de-duplicated by content fingerprint and
        genuinely-new events are appended with offset ``seq`` (design OQ6 / T03).
        Reads ``bundle.session``, ``bundle.events`` and ``bundle.blobs``.

        Returns the number of new events written. Idempotent: re-ingesting the
        same bundle adds nothing.
        """
        s = bundle.session
        if self.get_session(s.session_uid) is None:
            if s.ingested_at is None:
                s.ingested_at = _now()
            self.upsert_session(s)
        # NOTE(review): for a session whose raw data was evicted, no events
        # remain to match against, so a re-ingest stores them again while
        # ``evicted_at`` stays set. Confirm callers skip evicted sessions.

        # known fingerprints + current max seq for this session
        seen = self._event_fingerprints(s.session_uid)
        next_seq = self._max_seq(s.session_uid) + 1

        new_events: list[SessionEvent] = []
        new_blobs: dict[str, str] = {}
        old_to_new: dict[int, int] = {}
        for ev in bundle.events:
            body = bundle.blobs.get(ev.payload_ref) if ev.payload_ref else None
            fp = _fingerprint(ev, body)
            if fp in seen:
                continue  # already stored (prior file or prior sweep)
            new_seq = next_seq
            next_seq += 1
            old_to_new[ev.seq] = new_seq
            # remap parent within this bundle; cross-file parents become None
            parent = old_to_new.get(ev.parent_seq) if ev.parent_seq is not None else None
            ref = None
            if body is not None:
                ref = f"blob://{s.session_uid}/{new_seq}"
                new_blobs[ref] = body
            new_events.append(
                SessionEvent(
                    session_uid=s.session_uid, seq=new_seq, parent_seq=parent, ts=ev.ts,
                    kind=ev.kind, role=ev.role, tool=ev.tool, summary=ev.summary,
                    payload_ref=ref, tokens=ev.tokens, is_sidechain=ev.is_sidechain,
                )
            )
            seen.add(fp)

        if new_events:
            # Blobs first: if the process stops between the two commits, the
            # orphaned blob rows are simply overwritten on the next ingest
            # (same seq -> same ref). The reverse order would leave events
            # pointing at missing bodies, which fingerprint differently and
            # would be stored a second time.
            self.write_blobs(s.session_uid, new_blobs)
            self.upsert_events(new_events)
        return len(new_events)

    def _max_seq(self, session_uid: str) -> int:
        """Return the highest stored ``seq`` for the session, or -1 if none."""
        row = self.db.execute(
            "SELECT COALESCE(MAX(seq), -1) m FROM events WHERE session_uid=?", (session_uid,)
        ).fetchone()
        return int(row["m"])

    @staticmethod
    def _read_blob(path: str) -> Optional[str]:
        """Return a blob file's text exactly as written, or ``None`` if unreadable.

        Reads bytes and decodes explicitly so ``\\r\\n`` / ``\\r`` are not
        folded to ``\\n``; the body must round-trip unchanged for dedup
        fingerprints to match.
        """
        try:
            with open(path, "rb") as f:
                return f.read().decode("utf-8")
        except OSError:
            return None

    def _event_fingerprints(self, session_uid: str) -> set[str]:
        """Return content fingerprints of every stored event in the session.

        An event whose blob row or file is missing is fingerprinted with no body.
        """
        fps: set[str] = set()
        for e in self.get_events(session_uid):
            body = None
            if e.payload_ref:
                r = self.db.execute("SELECT path FROM blobs WHERE ref=?", (e.payload_ref,)).fetchone()
                if r:
                    body = self._read_blob(r["path"])
            fps.add(_fingerprint(e, body))
        return fps

    # ---- Tier 2 (digest) ---------------------------------------------------

    def write_digest(self, session_uid: str, digest: dict[str, Any], analyzed_at: Optional[str] = None) -> None:
        """Store (or replace) the session's digest and stamp ``analyzed_at``.

        Args:
            session_uid: Session the digest belongs to.
            digest: JSON-serializable mapping; stored with sorted keys.
            analyzed_at: Timestamp to record on the session row; defaults to now.
        """
        payload = json.dumps(digest, sort_keys=True)
        self.db.execute(
            "INSERT INTO digests(session_uid, json, nbytes) VALUES(?,?,?) "
            "ON CONFLICT(session_uid) DO UPDATE SET json=excluded.json, nbytes=excluded.nbytes",
            (session_uid, payload, len(payload.encode("utf-8"))),
        )
        self.db.execute(
            "UPDATE sessions SET analyzed_at=? WHERE session_uid=?",
            (analyzed_at or _now(), session_uid),
        )
        self.db.commit()

    def get_digest(self, session_uid: str) -> Optional[dict[str, Any]]:
        """Return the decoded digest for the session, or ``None`` if absent."""
        row = self.db.execute("SELECT json FROM digests WHERE session_uid=?", (session_uid,)).fetchone()
        return json.loads(row["json"]) if row else None

    def list_digests(self) -> list[dict[str, Any]]:
        """Return every stored digest, decoded."""
        return [json.loads(r["json"]) for r in self.db.execute("SELECT json FROM digests")]

    def save_patterns(self, patterns: list[dict[str, Any]]) -> None:
        """Persist candidate patterns to a Tier 2 table (replace prior run).

        Each pattern must carry a unique ``"key"``. The replace is atomic: if
        any insert fails, the previous run's rows are left in place and the
        exception propagates.
        """
        detected_at = _now()  # one timestamp for the whole run
        rows = [(p["key"], json.dumps(p, sort_keys=True), detected_at) for p in patterns]
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS patterns ("
            "key TEXT PRIMARY KEY, json TEXT NOT NULL, detected_at TEXT NOT NULL)"
        )
        # The connection context manager commits on success and rolls back on
        # error, so a failed insert cannot leave the DELETE pending for some
        # later, unrelated commit.
        with self.db:
            self.db.execute("DELETE FROM patterns")
            self.db.executemany(
                "INSERT INTO patterns(key, json, detected_at) VALUES(?,?,?)",
                rows,
            )

    # ---- reads -------------------------------------------------------------

    def get_session(self, session_uid: str) -> Optional[Session]:
        """Return the stored session, or ``None`` if the uid is unknown."""
        row = self.db.execute(
            "SELECT json, analyzed_at, evicted_at FROM sessions WHERE session_uid=?", (session_uid,)
        ).fetchone()
        return self._row_to_session(row) if row else None

    def list_sessions(self) -> list[Session]:
        """Return every stored session."""
        rows = self.db.execute("SELECT json, analyzed_at, evicted_at FROM sessions")
        return [self._row_to_session(r) for r in rows]

    @staticmethod
    def _row_to_session(row) -> Session:
        """Rebuild a Session, treating the watermark columns as authoritative."""
        s = Session.from_json(row["json"])
        # The columns are updated independently of the JSON (write_digest,
        # evict_raw), so they override whatever the JSON carries.
        s.analyzed_at = row["analyzed_at"]
        s.evicted_at = row["evicted_at"]
        return s

    def get_events(self, session_uid: str) -> list[SessionEvent]:
        """Return the session's events ordered by ``seq``."""
        rows = self.db.execute(
            "SELECT json FROM events WHERE session_uid=? ORDER BY seq", (session_uid,)
        ).fetchall()
        return [SessionEvent.from_json(r["json"]) for r in rows]

    def count_events(self, session_uid: str) -> int:
        """Return the number of stored events for the session."""
        return self.db.execute(
            "SELECT COUNT(*) c FROM events WHERE session_uid=?", (session_uid,)
        ).fetchone()["c"]

    # ---- usage accounting (drives retention) -------------------------------

    def tier1_usage_bytes(self) -> int:
        """Bytes held in Tier 1: event-row JSON + blob bytes.

        Event JSON is counted only for sessions not marked evicted; blob sizes
        are summed over every row in ``blobs``.
        """
        # LENGTH() on TEXT counts characters; casting to BLOB makes it count
        # the encoded bytes, which is what a byte budget needs.
        row = self.db.execute(
            "SELECT COALESCE(SUM(LENGTH(CAST(json AS BLOB))),0) b FROM events e "
            "WHERE NOT EXISTS (SELECT 1 FROM sessions s "
            "WHERE s.session_uid=e.session_uid AND s.evicted_at IS NOT NULL)"
        ).fetchone()
        blob = self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM blobs").fetchone()
        return int(row["b"]) + int(blob["b"])

    def session_tier1_bytes(self, session_uid: str) -> int:
        """Return Tier 1 bytes (event JSON + blob sizes) held by one session."""
        ev = self.db.execute(
            # CAST AS BLOB: count encoded bytes, not characters.
            "SELECT COALESCE(SUM(LENGTH(CAST(json AS BLOB))),0) b FROM events WHERE session_uid=?",
            (session_uid,),
        ).fetchone()["b"]
        bl = self.db.execute(
            "SELECT COALESCE(SUM(nbytes),0) b FROM blobs WHERE session_uid=?", (session_uid,)
        ).fetchone()["b"]
        return int(ev) + int(bl)

    def tier2_usage_bytes(self) -> int:
        """Return the summed recorded size of all digests."""
        return int(self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM digests").fetchone()["b"])

    # ---- eviction ----------------------------------------------------------

    def evict_raw(self, session_uid: str) -> int:
        """Drop Tier 1 raw (events + blob files) for a session; keep digest + row.

        Sets ``evicted_at``. Returns bytes freed. Safe to call on an
        already-evicted session (no-op-ish).
        """
        freed = self.session_tier1_bytes(session_uid)  # measure before deleting
        for r in self.db.execute("SELECT path FROM blobs WHERE session_uid=?", (session_uid,)).fetchall():
            try:
                os.remove(r["path"])
            except FileNotFoundError:
                pass  # file already gone; the row is dropped below regardless
        self.db.execute("DELETE FROM blobs WHERE session_uid=?", (session_uid,))
        self.db.execute("DELETE FROM events WHERE session_uid=?", (session_uid,))
        self.db.execute("UPDATE sessions SET evicted_at=? WHERE session_uid=?", (_now(), session_uid))
        self.db.commit()
        return freed
|