generated from coulomb/repo-seed
session-memory Phase 0: budget-based retention sweep (T05)
- session_memory/core/retention.py: RetentionConfig + sweep() with backstop, budget (oldest-analyzed-first, never touches un-analyzed), and hard-cap overflow (analyze-now then reported last-resort data_loss); EvictionReport - tests/test_retention.py covers all four branches Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
144
session_memory/core/retention.py
Normal file
144
session_memory/core/retention.py
Normal file
@@ -0,0 +1,144 @@
|
||||
"""Budget-based retention sweep (design §5; T05).
|
||||
|
||||
Eviction is tied to the two conditions the design names — a session is dropped
|
||||
from Tier 1 once it has been *analyzed* (its digest is in Tier 2) **and** space is
|
||||
needed, with a max-age backstop. The invariant: raw bytes are never dropped
|
||||
before the Tier 2 digest exists, except the explicitly-reported hard-cap overflow
|
||||
path.
|
||||
|
||||
Order of passes per sweep:
|
||||
1. backstop — evict analyzed sessions older than ``raw_max_age_days``
|
||||
2. budget — while over ``raw_soft_cap_bytes``, evict oldest-analyzed first
|
||||
3. overflow — if still over ``raw_hard_cap_bytes`` and only un-analyzed bulk
|
||||
remains: analyze-now, retry budget; last resort evict oldest
|
||||
un-analyzed and emit a reported ``data_loss`` event.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Callable, Optional
|
||||
|
||||
from .schema import Session
|
||||
|
||||
|
||||
@dataclass
|
||||
class RetentionConfig:
|
||||
raw_soft_cap_bytes: int = 4 * 1024**3 # 4 GiB
|
||||
raw_hard_cap_bytes: int = 6 * 1024**3 # 6 GiB
|
||||
raw_max_age_days: int = 45
|
||||
distilled_cap_bytes: int = 1 * 1024**3 # 1 GiB (alert only, never auto-drop)
|
||||
|
||||
|
||||
@dataclass
|
||||
class EvictionReport:
|
||||
backstop_evicted: list[str] = field(default_factory=list)
|
||||
budget_evicted: list[str] = field(default_factory=list)
|
||||
overflow_analyzed: list[str] = field(default_factory=list)
|
||||
overflow_data_loss: list[str] = field(default_factory=list)
|
||||
bytes_freed: int = 0
|
||||
final_usage_bytes: int = 0
|
||||
over_hard_cap: bool = False
|
||||
tier2_over_cap: bool = False
|
||||
warnings: list[str] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def lost_data(self) -> bool:
|
||||
return bool(self.overflow_data_loss)
|
||||
|
||||
|
||||
def _parse_ts(ts: Optional[str]) -> Optional[datetime]:
|
||||
if not ts:
|
||||
return None
|
||||
try:
|
||||
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def _age_days(s: Session, now: datetime) -> Optional[float]:
|
||||
ref = _parse_ts(s.ended_at) or _parse_ts(s.started_at) or _parse_ts(s.ingested_at)
|
||||
if ref is None:
|
||||
return None
|
||||
if ref.tzinfo is None:
|
||||
ref = ref.replace(tzinfo=timezone.utc)
|
||||
return (now - ref).total_seconds() / 86400.0
|
||||
|
||||
|
||||
def _sort_key(s: Session) -> str:
|
||||
# oldest-analyzed-first; fall back through timestamps
|
||||
return s.analyzed_at or s.ended_at or s.ingested_at or ""
|
||||
|
||||
|
||||
def sweep(store, config: RetentionConfig, *,
|
||||
analyze_fn: Optional[Callable[[object, str], object]] = None,
|
||||
now: Optional[datetime] = None) -> EvictionReport:
|
||||
"""Run one retention sweep against ``store``. Returns an EvictionReport.
|
||||
|
||||
``analyze_fn(store, session_uid)`` is used by the overflow path to make
|
||||
un-analyzed sessions evictable; pass ``digest.analyze``.
|
||||
"""
|
||||
now = now or datetime.now(timezone.utc)
|
||||
report = EvictionReport()
|
||||
|
||||
def live_sessions() -> list[Session]:
|
||||
return [s for s in store.list_sessions() if s.evicted_at is None]
|
||||
|
||||
# 1. backstop pass — analyzed + older than max age
|
||||
for s in sorted(live_sessions(), key=_sort_key):
|
||||
age = _age_days(s, now)
|
||||
if s.is_evictable and age is not None and age > config.raw_max_age_days:
|
||||
report.bytes_freed += store.evict_raw(s.session_uid)
|
||||
report.backstop_evicted.append(s.session_uid)
|
||||
|
||||
# 2. budget pass — evict oldest analyzed while over soft cap
|
||||
while store.tier1_usage_bytes() > config.raw_soft_cap_bytes:
|
||||
candidates = [s for s in live_sessions() if s.is_evictable]
|
||||
if not candidates:
|
||||
break # will not destroy un-analyzed data for space
|
||||
victim = min(candidates, key=_sort_key)
|
||||
report.bytes_freed += store.evict_raw(victim.session_uid)
|
||||
report.budget_evicted.append(victim.session_uid)
|
||||
|
||||
# 3. overflow path — only if still over HARD cap with un-analyzed bulk left
|
||||
if store.tier1_usage_bytes() > config.raw_hard_cap_bytes:
|
||||
# 3a. try to analyze now so those sessions become evictable
|
||||
if analyze_fn is not None:
|
||||
for s in sorted(live_sessions(), key=_sort_key):
|
||||
if not s.is_evictable:
|
||||
try:
|
||||
analyze_fn(store, s.session_uid)
|
||||
report.overflow_analyzed.append(s.session_uid)
|
||||
except Exception as e: # analysis may fail; keep going
|
||||
report.warnings.append(f"analyze failed for {s.session_uid}: {e}")
|
||||
# retry budget pass on the freshly-analyzed sessions
|
||||
while store.tier1_usage_bytes() > config.raw_soft_cap_bytes:
|
||||
candidates = [s for s in live_sessions() if s.is_evictable]
|
||||
if not candidates:
|
||||
break
|
||||
victim = min(candidates, key=_sort_key)
|
||||
report.bytes_freed += store.evict_raw(victim.session_uid)
|
||||
report.budget_evicted.append(victim.session_uid)
|
||||
|
||||
# 3b. last resort — evict oldest un-analyzed, REPORTED as data loss
|
||||
while store.tier1_usage_bytes() > config.raw_hard_cap_bytes:
|
||||
remaining = [s for s in live_sessions() if not s.is_evictable]
|
||||
if not remaining:
|
||||
break
|
||||
victim = min(remaining, key=_sort_key)
|
||||
report.bytes_freed += store.evict_raw(victim.session_uid)
|
||||
report.overflow_data_loss.append(victim.session_uid)
|
||||
report.warnings.append(
|
||||
f"data_loss: evicted un-analyzed {victim.session_uid} to stay under hard cap"
|
||||
)
|
||||
|
||||
usage = store.tier1_usage_bytes()
|
||||
report.final_usage_bytes = usage
|
||||
report.over_hard_cap = usage > config.raw_hard_cap_bytes
|
||||
report.tier2_over_cap = store.tier2_usage_bytes() > config.distilled_cap_bytes
|
||||
if report.tier2_over_cap:
|
||||
report.warnings.append(
|
||||
"tier2 distilled store over cap — flag for curation review (do not auto-drop)"
|
||||
)
|
||||
return report
|
||||
107
tests/test_retention.py
Normal file
107
tests/test_retention.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""Retention tests (T05): each pass of the budget-based eviction, with tiny caps."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.adapters.claude import Normalized # noqa: E402
|
||||
from session_memory.core import digest as digest_mod # noqa: E402
|
||||
from session_memory.core.retention import RetentionConfig, sweep # noqa: E402
|
||||
from session_memory.core.schema import Cost, Session, SessionEvent # noqa: E402
|
||||
from session_memory.core.store import Store # noqa: E402
|
||||
|
||||
NOW = datetime(2026, 6, 6, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
def _ingest(st, native, *, body_bytes=1000, ended=None, analyze=False):
|
||||
uid = Session.make_uid("claude", native)
|
||||
s = Session(session_uid=uid, flavor="claude", native_session_id=native,
|
||||
ended_at=(ended or NOW).strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||||
ingested_at=NOW.strftime("%Y-%m-%dT%H:%M:%SZ"))
|
||||
ref = f"blob://{native}/0"
|
||||
events = [SessionEvent(session_uid=uid, seq=0, kind="assistant_msg", payload_ref=ref)]
|
||||
st.ingest(Normalized(session=s, events=events, blobs={ref: "x" * body_bytes}))
|
||||
if analyze:
|
||||
digest_mod.analyze(st, uid)
|
||||
return uid
|
||||
|
||||
|
||||
def _store(tmp_path):
|
||||
return Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
|
||||
|
||||
|
||||
def test_backstop_evicts_old_analyzed_only(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
old = _ingest(st, "old", ended=NOW - timedelta(days=60), analyze=True)
|
||||
young = _ingest(st, "young", ended=NOW - timedelta(days=1), analyze=True)
|
||||
unanalyzed_old = _ingest(st, "oldraw", ended=NOW - timedelta(days=60), analyze=False)
|
||||
|
||||
cfg = RetentionConfig(raw_soft_cap_bytes=10**12, raw_hard_cap_bytes=10**12, raw_max_age_days=45)
|
||||
rep = sweep(st, cfg, now=NOW)
|
||||
|
||||
assert old in rep.backstop_evicted
|
||||
assert young not in rep.backstop_evicted # too recent
|
||||
assert unanalyzed_old not in rep.backstop_evicted # not analyzed -> protected
|
||||
assert st.get_session(old).evicted_at is not None
|
||||
assert st.get_session(unanalyzed_old).evicted_at is None
|
||||
|
||||
|
||||
def test_budget_pass_evicts_oldest_analyzed_first(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
a = _ingest(st, "a", body_bytes=2000, ended=NOW - timedelta(days=3), analyze=True)
|
||||
b = _ingest(st, "b", body_bytes=2000, ended=NOW - timedelta(days=2), analyze=True)
|
||||
c = _ingest(st, "c", body_bytes=2000, ended=NOW - timedelta(days=1), analyze=True)
|
||||
|
||||
# soft cap that forces evicting ~two of the three
|
||||
cfg = RetentionConfig(raw_soft_cap_bytes=2500, raw_hard_cap_bytes=10**9, raw_max_age_days=10**6)
|
||||
rep = sweep(st, cfg, now=NOW)
|
||||
|
||||
assert rep.budget_evicted[:2] == [a, b] # oldest-first
|
||||
assert st.get_session(c).evicted_at is None # newest survives
|
||||
assert st.tier1_usage_bytes() <= cfg.raw_soft_cap_bytes
|
||||
|
||||
|
||||
def test_budget_pass_never_touches_unanalyzed(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
raw1 = _ingest(st, "r1", body_bytes=5000, analyze=False)
|
||||
raw2 = _ingest(st, "r2", body_bytes=5000, analyze=False)
|
||||
|
||||
cfg = RetentionConfig(raw_soft_cap_bytes=100, raw_hard_cap_bytes=10**9, raw_max_age_days=10**6)
|
||||
rep = sweep(st, cfg, now=NOW)
|
||||
|
||||
# over soft cap but nothing analyzed -> no eviction, no data loss
|
||||
assert rep.budget_evicted == []
|
||||
assert rep.lost_data is False
|
||||
assert st.get_session(raw1).evicted_at is None
|
||||
assert st.get_session(raw2).evicted_at is None
|
||||
assert st.tier1_usage_bytes() > cfg.raw_soft_cap_bytes # tolerated, not destroyed
|
||||
|
||||
|
||||
def test_overflow_analyzes_then_evicts_without_data_loss(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
r1 = _ingest(st, "r1", body_bytes=4000, ended=NOW - timedelta(days=2), analyze=False)
|
||||
r2 = _ingest(st, "r2", body_bytes=4000, ended=NOW - timedelta(days=1), analyze=False)
|
||||
|
||||
cfg = RetentionConfig(raw_soft_cap_bytes=3000, raw_hard_cap_bytes=5000, raw_max_age_days=10**6)
|
||||
rep = sweep(st, cfg, now=NOW, analyze_fn=digest_mod.analyze)
|
||||
|
||||
# overflow path analyzed the un-analyzed sessions, then budget-evicted
|
||||
assert set(rep.overflow_analyzed) == {r1, r2}
|
||||
assert rep.lost_data is False # analysis avoided data loss
|
||||
assert st.tier1_usage_bytes() <= cfg.raw_soft_cap_bytes
|
||||
|
||||
|
||||
def test_overflow_last_resort_reports_data_loss(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
# one un-analyzed session bigger than the hard cap, analysis disabled (no fn)
|
||||
big = _ingest(st, "big", body_bytes=20000, analyze=False)
|
||||
|
||||
cfg = RetentionConfig(raw_soft_cap_bytes=1000, raw_hard_cap_bytes=2000, raw_max_age_days=10**6)
|
||||
rep = sweep(st, cfg, now=NOW, analyze_fn=None)
|
||||
|
||||
assert big in rep.overflow_data_loss
|
||||
assert rep.lost_data is True
|
||||
assert any("data_loss" in w for w in rep.warnings)
|
||||
assert st.get_session(big).evicted_at is not None
|
||||
@@ -86,7 +86,7 @@ Signal extraction beyond the digest stays stubbed for the Detect phase.
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0002-T05
|
||||
status: progress
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "89177c79-528e-4023-a7eb-67f8e0276ba9"
|
||||
```
|
||||
@@ -103,7 +103,7 @@ synthetic sessions and tiny caps.
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0002-T06
|
||||
status: todo
|
||||
status: progress
|
||||
priority: medium
|
||||
state_hub_task_id: "a4b35c76-154d-4e99-b6d0-61cb6e47ecc0"
|
||||
```
|
||||
|
||||
Reference in New Issue
Block a user