"""Session-memory sweep entrypoint (design ยง7; T06). One sweep: discover (per enabled source) -> normalize (adapter) -> store -> digest -> retention-evict. Idempotent and re-runnable; intended to be triggered on the configured cadence (``/schedule`` daily/weekly) or by an agent hook. Usage: python -m session_memory.ingest [--config PATH] [--once] [--dry-run] """ from __future__ import annotations import argparse import glob import os import sys import tomllib from dataclasses import dataclass, field from typing import Any from .adapters import claude as claude_adapter from .core import digest as digest_mod from .core.cursor import Cursors from .core.retention import RetentionConfig, sweep as retention_sweep from .core.store import Store # adapter dispatch by source name _ADAPTERS = {"claude": claude_adapter.parse_session} @dataclass class SweepResult: discovered: int = 0 ingested: int = 0 skipped_unchanged: int = 0 analyzed: int = 0 warnings: list[str] = field(default_factory=list) retention: Any = None def _expand(p: str) -> str: return os.path.expanduser(p) def load_config(path: str) -> dict[str, Any]: with open(path, "rb") as f: return tomllib.load(f) def run_sweep(config: dict[str, Any], *, dry_run: bool = False) -> SweepResult: store_cfg = config.get("store", {}) ret_cfg = config.get("retention", {}) repo_map = config.get("repo_domain_map", {}) res = SweepResult() # In dry-run we only discover + parse: no store is created or written. store = None if dry_run else Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"])) cursors = Cursors(_expand(store_cfg["cursor"])) for name, src in config.get("sources", {}).items(): if not src.get("enabled"): continue parse = _ADAPTERS.get(name) if parse is None: res.warnings.append(f"no adapter for source {name!r} (Phase 1)") continue root = _expand(src["root"]) for fp in sorted(glob.glob(os.path.join(root, src["glob"]))): res.discovered += 1 if not cursors.is_changed(fp): res.skipped_unchanged += 1 continue try: bundle = parse(fp, repo_map) except Exception as e: # one bad file must not abort the sweep res.warnings.append(f"parse failed {fp}: {e}") continue if bundle is None: cursors.mark(fp) continue if not dry_run: store.ingest(bundle) digest_mod.analyze(store, bundle.session.session_uid) res.analyzed += 1 res.ingested += 1 cursors.mark(fp) if not dry_run and store is not None: cursors.save() rc = RetentionConfig( raw_soft_cap_bytes=int(ret_cfg.get("raw_soft_cap_bytes", RetentionConfig.raw_soft_cap_bytes)), raw_hard_cap_bytes=int(ret_cfg.get("raw_hard_cap_bytes", RetentionConfig.raw_hard_cap_bytes)), raw_max_age_days=int(ret_cfg.get("raw_max_age_days", RetentionConfig.raw_max_age_days)), distilled_cap_bytes=int(ret_cfg.get("distilled_cap_bytes", RetentionConfig.distilled_cap_bytes)), ) res.retention = retention_sweep(store, rc, analyze_fn=digest_mod.analyze) res.warnings.extend(res.retention.warnings) if store is not None: store.close() return res def main(argv: list[str] | None = None) -> int: here = os.path.dirname(os.path.abspath(__file__)) ap = argparse.ArgumentParser(description="Run one coding-session-memory sweep.") ap.add_argument("--config", default=os.path.join(here, "config.toml")) ap.add_argument("--dry-run", action="store_true", help="discover + parse, but do not write or evict") ap.add_argument("--once", action="store_true", help="(default) run a single sweep") args = ap.parse_args(argv) config = load_config(args.config) res = run_sweep(config, dry_run=args.dry_run) print(f"discovered={res.discovered} ingested={res.ingested} " f"skipped_unchanged={res.skipped_unchanged} analyzed={res.analyzed}") if res.retention is not None: r = res.retention print(f"retention: freed={r.bytes_freed}B final_usage={r.final_usage_bytes}B " f"backstop={len(r.backstop_evicted)} budget={len(r.budget_evicted)} " f"overflow_analyzed={len(r.overflow_analyzed)} data_loss={len(r.overflow_data_loss)}") for w in res.warnings: print(f" WARN: {w}", file=sys.stderr) return 0 if __name__ == "__main__": raise SystemExit(main())