From 451fb8f1f36d7f45843285a5af9f1a93210eeeda Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 6 Jun 2026 21:37:40 +0200 Subject: [PATCH] session-memory Phase 0: budget-based retention sweep (T05) - session_memory/core/retention.py: RetentionConfig + sweep() with backstop, budget (oldest-analyzed-first, never touches un-analyzed), and hard-cap overflow (analyze-now then reported last-resort data_loss); EvictionReport - tests/test_retention.py covers all four branches Co-Authored-By: Claude Opus 4.8 --- session_memory/core/retention.py | 144 ++++++++++++++++++ tests/test_retention.py | 107 +++++++++++++ .../AGENTIC-WP-0002-session-memory-phase0.md | 4 +- 3 files changed, 253 insertions(+), 2 deletions(-) create mode 100644 session_memory/core/retention.py create mode 100644 tests/test_retention.py diff --git a/session_memory/core/retention.py b/session_memory/core/retention.py new file mode 100644 index 0000000..17cd6ef --- /dev/null +++ b/session_memory/core/retention.py @@ -0,0 +1,144 @@ +"""Budget-based retention sweep (design §5; T05). + +Eviction is tied to the two conditions the design names — a session is dropped +from Tier 1 once it has been *analyzed* (its digest is in Tier 2) **and** space is +needed, with a max-age backstop. The invariant: raw bytes are never dropped +before the Tier 2 digest exists, except the explicitly-reported hard-cap overflow +path. + +Order of passes per sweep: + 1. backstop — evict analyzed sessions older than ``raw_max_age_days`` + 2. budget — while over ``raw_soft_cap_bytes``, evict oldest-analyzed first + 3. overflow — if still over ``raw_hard_cap_bytes`` and only un-analyzed bulk + remains: analyze-now, retry budget; last resort evict oldest + un-analyzed and emit a reported ``data_loss`` event. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Callable, Optional + +from .schema import Session + + +@dataclass +class RetentionConfig: + raw_soft_cap_bytes: int = 4 * 1024**3 # 4 GiB + raw_hard_cap_bytes: int = 6 * 1024**3 # 6 GiB + raw_max_age_days: int = 45 + distilled_cap_bytes: int = 1 * 1024**3 # 1 GiB (alert only, never auto-drop) + + +@dataclass +class EvictionReport: + backstop_evicted: list[str] = field(default_factory=list) + budget_evicted: list[str] = field(default_factory=list) + overflow_analyzed: list[str] = field(default_factory=list) + overflow_data_loss: list[str] = field(default_factory=list) + bytes_freed: int = 0 + final_usage_bytes: int = 0 + over_hard_cap: bool = False + tier2_over_cap: bool = False + warnings: list[str] = field(default_factory=list) + + @property + def lost_data(self) -> bool: + return bool(self.overflow_data_loss) + + +def _parse_ts(ts: Optional[str]) -> Optional[datetime]: + if not ts: + return None + try: + return datetime.fromisoformat(ts.replace("Z", "+00:00")) + except ValueError: + return None + + +def _age_days(s: Session, now: datetime) -> Optional[float]: + ref = _parse_ts(s.ended_at) or _parse_ts(s.started_at) or _parse_ts(s.ingested_at) + if ref is None: + return None + if ref.tzinfo is None: + ref = ref.replace(tzinfo=timezone.utc) + return (now - ref).total_seconds() / 86400.0 + + +def _sort_key(s: Session) -> str: + # oldest-analyzed-first; fall back through timestamps + return s.analyzed_at or s.ended_at or s.ingested_at or "" + + +def sweep(store, config: RetentionConfig, *, + analyze_fn: Optional[Callable[[object, str], object]] = None, + now: Optional[datetime] = None) -> EvictionReport: + """Run one retention sweep against ``store``. Returns an EvictionReport. + + ``analyze_fn(store, session_uid)`` is used by the overflow path to make + un-analyzed sessions evictable; pass ``digest.analyze``. + """ + now = now or datetime.now(timezone.utc) + report = EvictionReport() + + def live_sessions() -> list[Session]: + return [s for s in store.list_sessions() if s.evicted_at is None] + + # 1. backstop pass — analyzed + older than max age + for s in sorted(live_sessions(), key=_sort_key): + age = _age_days(s, now) + if s.is_evictable and age is not None and age > config.raw_max_age_days: + report.bytes_freed += store.evict_raw(s.session_uid) + report.backstop_evicted.append(s.session_uid) + + # 2. budget pass — evict oldest analyzed while over soft cap + while store.tier1_usage_bytes() > config.raw_soft_cap_bytes: + candidates = [s for s in live_sessions() if s.is_evictable] + if not candidates: + break # will not destroy un-analyzed data for space + victim = min(candidates, key=_sort_key) + report.bytes_freed += store.evict_raw(victim.session_uid) + report.budget_evicted.append(victim.session_uid) + + # 3. overflow path — only if still over HARD cap with un-analyzed bulk left + if store.tier1_usage_bytes() > config.raw_hard_cap_bytes: + # 3a. try to analyze now so those sessions become evictable + if analyze_fn is not None: + for s in sorted(live_sessions(), key=_sort_key): + if not s.is_evictable: + try: + analyze_fn(store, s.session_uid) + report.overflow_analyzed.append(s.session_uid) + except Exception as e: # analysis may fail; keep going + report.warnings.append(f"analyze failed for {s.session_uid}: {e}") + # retry budget pass on the freshly-analyzed sessions + while store.tier1_usage_bytes() > config.raw_soft_cap_bytes: + candidates = [s for s in live_sessions() if s.is_evictable] + if not candidates: + break + victim = min(candidates, key=_sort_key) + report.bytes_freed += store.evict_raw(victim.session_uid) + report.budget_evicted.append(victim.session_uid) + + # 3b. last resort — evict oldest un-analyzed, REPORTED as data loss + while store.tier1_usage_bytes() > config.raw_hard_cap_bytes: + remaining = [s for s in live_sessions() if not s.is_evictable] + if not remaining: + break + victim = min(remaining, key=_sort_key) + report.bytes_freed += store.evict_raw(victim.session_uid) + report.overflow_data_loss.append(victim.session_uid) + report.warnings.append( + f"data_loss: evicted un-analyzed {victim.session_uid} to stay under hard cap" + ) + + usage = store.tier1_usage_bytes() + report.final_usage_bytes = usage + report.over_hard_cap = usage > config.raw_hard_cap_bytes + report.tier2_over_cap = store.tier2_usage_bytes() > config.distilled_cap_bytes + if report.tier2_over_cap: + report.warnings.append( + "tier2 distilled store over cap — flag for curation review (do not auto-drop)" + ) + return report diff --git a/tests/test_retention.py b/tests/test_retention.py new file mode 100644 index 0000000..3f57177 --- /dev/null +++ b/tests/test_retention.py @@ -0,0 +1,107 @@ +"""Retention tests (T05): each pass of the budget-based eviction, with tiny caps.""" + +import os +import sys +from datetime import datetime, timedelta, timezone + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from session_memory.adapters.claude import Normalized # noqa: E402 +from session_memory.core import digest as digest_mod # noqa: E402 +from session_memory.core.retention import RetentionConfig, sweep # noqa: E402 +from session_memory.core.schema import Cost, Session, SessionEvent # noqa: E402 +from session_memory.core.store import Store # noqa: E402 + +NOW = datetime(2026, 6, 6, tzinfo=timezone.utc) + + +def _ingest(st, native, *, body_bytes=1000, ended=None, analyze=False): + uid = Session.make_uid("claude", native) + s = Session(session_uid=uid, flavor="claude", native_session_id=native, + ended_at=(ended or NOW).strftime("%Y-%m-%dT%H:%M:%SZ"), + ingested_at=NOW.strftime("%Y-%m-%dT%H:%M:%SZ")) + ref = f"blob://{native}/0" + events = [SessionEvent(session_uid=uid, seq=0, kind="assistant_msg", payload_ref=ref)] + st.ingest(Normalized(session=s, events=events, blobs={ref: "x" * body_bytes})) + if analyze: + digest_mod.analyze(st, uid) + return uid + + +def _store(tmp_path): + return Store(str(tmp_path / "m.db"), str(tmp_path / "blobs")) + + +def test_backstop_evicts_old_analyzed_only(tmp_path): + st = _store(tmp_path) + old = _ingest(st, "old", ended=NOW - timedelta(days=60), analyze=True) + young = _ingest(st, "young", ended=NOW - timedelta(days=1), analyze=True) + unanalyzed_old = _ingest(st, "oldraw", ended=NOW - timedelta(days=60), analyze=False) + + cfg = RetentionConfig(raw_soft_cap_bytes=10**12, raw_hard_cap_bytes=10**12, raw_max_age_days=45) + rep = sweep(st, cfg, now=NOW) + + assert old in rep.backstop_evicted + assert young not in rep.backstop_evicted # too recent + assert unanalyzed_old not in rep.backstop_evicted # not analyzed -> protected + assert st.get_session(old).evicted_at is not None + assert st.get_session(unanalyzed_old).evicted_at is None + + +def test_budget_pass_evicts_oldest_analyzed_first(tmp_path): + st = _store(tmp_path) + a = _ingest(st, "a", body_bytes=2000, ended=NOW - timedelta(days=3), analyze=True) + b = _ingest(st, "b", body_bytes=2000, ended=NOW - timedelta(days=2), analyze=True) + c = _ingest(st, "c", body_bytes=2000, ended=NOW - timedelta(days=1), analyze=True) + + # soft cap that forces evicting ~two of the three + cfg = RetentionConfig(raw_soft_cap_bytes=2500, raw_hard_cap_bytes=10**9, raw_max_age_days=10**6) + rep = sweep(st, cfg, now=NOW) + + assert rep.budget_evicted[:2] == [a, b] # oldest-first + assert st.get_session(c).evicted_at is None # newest survives + assert st.tier1_usage_bytes() <= cfg.raw_soft_cap_bytes + + +def test_budget_pass_never_touches_unanalyzed(tmp_path): + st = _store(tmp_path) + raw1 = _ingest(st, "r1", body_bytes=5000, analyze=False) + raw2 = _ingest(st, "r2", body_bytes=5000, analyze=False) + + cfg = RetentionConfig(raw_soft_cap_bytes=100, raw_hard_cap_bytes=10**9, raw_max_age_days=10**6) + rep = sweep(st, cfg, now=NOW) + + # over soft cap but nothing analyzed -> no eviction, no data loss + assert rep.budget_evicted == [] + assert rep.lost_data is False + assert st.get_session(raw1).evicted_at is None + assert st.get_session(raw2).evicted_at is None + assert st.tier1_usage_bytes() > cfg.raw_soft_cap_bytes # tolerated, not destroyed + + +def test_overflow_analyzes_then_evicts_without_data_loss(tmp_path): + st = _store(tmp_path) + r1 = _ingest(st, "r1", body_bytes=4000, ended=NOW - timedelta(days=2), analyze=False) + r2 = _ingest(st, "r2", body_bytes=4000, ended=NOW - timedelta(days=1), analyze=False) + + cfg = RetentionConfig(raw_soft_cap_bytes=3000, raw_hard_cap_bytes=5000, raw_max_age_days=10**6) + rep = sweep(st, cfg, now=NOW, analyze_fn=digest_mod.analyze) + + # overflow path analyzed the un-analyzed sessions, then budget-evicted + assert set(rep.overflow_analyzed) == {r1, r2} + assert rep.lost_data is False # analysis avoided data loss + assert st.tier1_usage_bytes() <= cfg.raw_soft_cap_bytes + + +def test_overflow_last_resort_reports_data_loss(tmp_path): + st = _store(tmp_path) + # one un-analyzed session bigger than the hard cap, analysis disabled (no fn) + big = _ingest(st, "big", body_bytes=20000, analyze=False) + + cfg = RetentionConfig(raw_soft_cap_bytes=1000, raw_hard_cap_bytes=2000, raw_max_age_days=10**6) + rep = sweep(st, cfg, now=NOW, analyze_fn=None) + + assert big in rep.overflow_data_loss + assert rep.lost_data is True + assert any("data_loss" in w for w in rep.warnings) + assert st.get_session(big).evicted_at is not None diff --git a/workplans/AGENTIC-WP-0002-session-memory-phase0.md b/workplans/AGENTIC-WP-0002-session-memory-phase0.md index 6f1ba99..57d69bd 100644 --- a/workplans/AGENTIC-WP-0002-session-memory-phase0.md +++ b/workplans/AGENTIC-WP-0002-session-memory-phase0.md @@ -86,7 +86,7 @@ Signal extraction beyond the digest stays stubbed for the Detect phase. ```task id: AGENTIC-WP-0002-T05 -status: progress +status: done priority: high state_hub_task_id: "89177c79-528e-4023-a7eb-67f8e0276ba9" ``` @@ -103,7 +103,7 @@ synthetic sessions and tiny caps. ```task id: AGENTIC-WP-0002-T06 -status: todo +status: progress priority: medium state_hub_task_id: "a4b35c76-154d-4e99-b6d0-61cb6e47ecc0" ```