generated from coulomb/repo-seed
- session_memory/core/retention.py: RetentionConfig + sweep() with backstop, budget (oldest-analyzed-first, never touches un-analyzed), and hard-cap overflow (analyze-now then reported last-resort data_loss); EvictionReport - tests/test_retention.py covers all four branches Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
145 lines
5.9 KiB
Python
145 lines
5.9 KiB
Python
"""Budget-based retention sweep (design §5; T05).
|
|
|
|
Eviction is tied to the two conditions the design names — a session is dropped
|
|
from Tier 1 once it has been *analyzed* (its digest is in Tier 2) **and** space is
|
|
needed, with a max-age backstop. The invariant: raw bytes are never dropped
|
|
before the Tier 2 digest exists, except the explicitly-reported hard-cap overflow
|
|
path.
|
|
|
|
Order of passes per sweep:
|
|
1. backstop — evict analyzed sessions older than ``raw_max_age_days``
|
|
2. budget — while over ``raw_soft_cap_bytes``, evict oldest-analyzed first
|
|
3. overflow — if still over ``raw_hard_cap_bytes`` and only un-analyzed bulk
|
|
remains: analyze-now, retry budget; last resort evict oldest
|
|
un-analyzed and emit a reported ``data_loss`` event.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from typing import Callable, Optional
|
|
|
|
from .schema import Session
|
|
|
|
|
|
@dataclass
class RetentionConfig:
    """Thresholds that drive one retention ``sweep``.

    Every ``*_bytes`` field is a byte count; ``raw_max_age_days`` is in days.
    """

    # Tier 1 budget: the budget pass evicts analyzed sessions until usage is at or below this.
    raw_soft_cap_bytes: int = 4 << 30  # 4 GiB
    # Tier 1 ceiling: usage above this after the budget pass triggers the overflow path.
    raw_hard_cap_bytes: int = 6 << 30  # 6 GiB
    # Backstop: analyzed sessions older than this many days are evicted regardless of usage.
    raw_max_age_days: int = 45
    # Tier 2 threshold: exceeding it only raises a warning; Tier 2 is never auto-dropped.
    distilled_cap_bytes: int = 1 << 30  # 1 GiB
|
|
|
|
|
|
@dataclass
class EvictionReport:
    """Result of a single retention ``sweep``: what was evicted, why, and the final state."""

    # Session uids evicted by the max-age backstop pass, in eviction order.
    backstop_evicted: list[str] = field(default_factory=list)
    # Session uids evicted to get under the soft cap (includes the overflow-path retry).
    budget_evicted: list[str] = field(default_factory=list)
    # Session uids for which the overflow path's analyze_fn returned without raising.
    overflow_analyzed: list[str] = field(default_factory=list)
    # Un-analyzed session uids evicted as a last resort to get under the hard cap.
    overflow_data_loss: list[str] = field(default_factory=list)
    # Sum of the values returned by store.evict_raw for every eviction.
    bytes_freed: int = 0
    # Tier 1 usage measured after all passes.
    final_usage_bytes: int = 0
    # True when Tier 1 usage is still above the hard cap after all passes.
    over_hard_cap: bool = False
    # True when Tier 2 usage exceeds the distilled cap (alert only).
    tier2_over_cap: bool = False
    # Human-readable messages: analysis failures, data-loss notices, Tier 2 alerts.
    warnings: list[str] = field(default_factory=list)

    @property
    def lost_data(self) -> bool:
        """Whether any un-analyzed session had to be dropped during this sweep."""
        dropped = self.overflow_data_loss
        return bool(dropped)
|
|
|
|
|
|
def _parse_ts(ts: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp string, returning ``None`` when it is absent or invalid.

    A ``Z`` is rewritten to ``+00:00`` so that ``datetime.fromisoformat`` accepts
    UTC-suffixed strings on Python versions that do not understand ``Z``.
    """
    if ts:
        normalized = ts.replace("Z", "+00:00")
        try:
            return datetime.fromisoformat(normalized)
        except ValueError:
            pass  # malformed timestamp: treat as missing
    return None
|
|
|
|
|
|
def _age_days(s: Session, now: datetime) -> Optional[float]:
    """Return the age of session ``s`` in (fractional) days, measured at ``now``.

    The reference timestamp is the first of ``ended_at``, ``started_at`` and
    ``ingested_at`` that parses; if none parses, returns ``None``. The result is
    negative when the reference lies after ``now``.

    Naive datetimes are interpreted as UTC. This applies both to the session's
    timestamp and to ``now``, so a mix of naive and aware values cannot raise
    ``TypeError`` on subtraction.
    """
    ref = _parse_ts(s.ended_at) or _parse_ts(s.started_at) or _parse_ts(s.ingested_at)
    if ref is None:
        return None
    if ref.tzinfo is None:
        ref = ref.replace(tzinfo=timezone.utc)
    if now.tzinfo is None:
        # sweep() forwards a caller-supplied ``now``. Without this, a naive ``now``
        # minus an aware ``ref`` raises TypeError and aborts the backstop pass.
        now = now.replace(tzinfo=timezone.utc)
    return (now - ref).total_seconds() / 86400.0
|
|
|
|
|
|
def _sort_key(s: Session) -> str:
    """Ordering key for "oldest first" eviction.

    Returns the first non-empty value among ``analyzed_at``, ``ended_at`` and
    ``ingested_at``, or ``""`` when all are empty. The key is compared as a raw
    string, so it orders correctly only when the timestamps share one
    ISO-8601 format.
    """
    for stamp in (s.analyzed_at, s.ended_at, s.ingested_at):
        if stamp:
            return stamp
    return ""
|
|
|
|
|
|
def _evict_oldest_until(store, cap: int, candidates: list[Session]) -> tuple[list[str], int]:
    """Evict ``candidates`` oldest-first (by ``_sort_key``) until Tier 1 usage <= ``cap``.

    Returns ``(evicted_uids, bytes_freed)``. The candidates are sorted once and
    each is evicted at most once. The loop therefore always terminates, even if
    ``store.evict_raw`` frees nothing or does not mark the session evicted, and
    the store is not re-listed for every eviction.
    """
    evicted: list[str] = []
    freed = 0
    # sorted() is stable, so ties resolve the same way min() over the list would.
    for s in sorted(candidates, key=_sort_key):
        if store.tier1_usage_bytes() <= cap:
            break
        freed += store.evict_raw(s.session_uid)
        evicted.append(s.session_uid)
    return evicted, freed


def sweep(store, config: RetentionConfig, *,
          analyze_fn: Optional[Callable[[object, str], object]] = None,
          now: Optional[datetime] = None) -> EvictionReport:
    """Run one retention sweep against ``store``. Returns an EvictionReport.

    ``analyze_fn(store, session_uid)`` is used by the overflow path to make
    un-analyzed sessions evictable; pass ``digest.analyze``.

    Passes, in order ("evictable" means ``session.is_evictable`` is true):

    1. backstop — evict evictable sessions whose age (``_age_days``) exceeds
       ``config.raw_max_age_days``. Sessions with no parseable timestamp are kept.
    2. budget — while Tier 1 usage exceeds ``config.raw_soft_cap_bytes``, evict
       evictable sessions oldest-first. Non-evictable sessions are never touched.
    3. overflow — only if usage still exceeds ``config.raw_hard_cap_bytes``:
       a. if ``analyze_fn`` is given, call it on every non-evictable session
          (a failure becomes a warning), then repeat the budget pass;
       b. while still over the hard cap, evict non-evictable sessions
          oldest-first. Each is recorded in ``overflow_data_loss`` and as a
          ``data_loss`` warning.

    Finally the report records Tier 1 usage and whether it is still over the
    hard cap. It also records whether Tier 2 usage exceeds
    ``config.distilled_cap_bytes``; that case is only warned about, and nothing
    in Tier 2 is dropped.

    ``store`` is duck-typed; this function calls ``list_sessions()``,
    ``evict_raw(uid)`` (its return value is summed into ``bytes_freed``),
    ``tier1_usage_bytes()`` and ``tier2_usage_bytes()``.
    """
    now = now or datetime.now(timezone.utc)
    report = EvictionReport()

    def live_sessions() -> list[Session]:
        # Sessions whose raw Tier 1 bytes are still present.
        return [s for s in store.list_sessions() if s.evicted_at is None]

    def evictable() -> list[Session]:
        return [s for s in live_sessions() if s.is_evictable]

    def unevictable() -> list[Session]:
        return [s for s in live_sessions() if not s.is_evictable]

    def evict_down_to(cap: int, candidates: list[Session], sink: list[str]) -> list[str]:
        # Evict candidates oldest-first until under ``cap``, accounting into the report.
        uids, freed = _evict_oldest_until(store, cap, candidates)
        report.bytes_freed += freed
        sink.extend(uids)
        return uids

    # 1. backstop pass — analyzed + older than max age
    for s in sorted(live_sessions(), key=_sort_key):
        age = _age_days(s, now)
        if s.is_evictable and age is not None and age > config.raw_max_age_days:
            report.bytes_freed += store.evict_raw(s.session_uid)
            report.backstop_evicted.append(s.session_uid)

    # 2. budget pass — evict oldest analyzed while over soft cap; un-analyzed
    #    data is never destroyed here just to make space.
    evict_down_to(config.raw_soft_cap_bytes, evictable(), report.budget_evicted)

    # 3. overflow path — only if still over HARD cap with un-analyzed bulk left
    if store.tier1_usage_bytes() > config.raw_hard_cap_bytes:
        # 3a. try to analyze now so those sessions become evictable
        if analyze_fn is not None:
            for s in sorted(unevictable(), key=_sort_key):
                try:
                    analyze_fn(store, s.session_uid)
                except Exception as e:  # analysis may fail; keep going
                    report.warnings.append(f"analyze failed for {s.session_uid}: {e}")
                else:
                    report.overflow_analyzed.append(s.session_uid)
            # retry budget pass on the freshly-analyzed sessions
            evict_down_to(config.raw_soft_cap_bytes, evictable(), report.budget_evicted)

        # 3b. last resort — evict oldest un-analyzed, REPORTED as data loss
        lost = evict_down_to(config.raw_hard_cap_bytes, unevictable(),
                             report.overflow_data_loss)
        for uid in lost:
            report.warnings.append(
                f"data_loss: evicted un-analyzed {uid} to stay under hard cap"
            )

    usage = store.tier1_usage_bytes()
    report.final_usage_bytes = usage
    report.over_hard_cap = usage > config.raw_hard_cap_bytes
    report.tier2_over_cap = store.tier2_usage_bytes() > config.distilled_cap_bytes
    if report.tier2_over_cap:
        report.warnings.append(
            "tier2 distilled store over cap — flag for curation review (do not auto-drop)"
        )
    return report
|