diff --git a/session_memory/config.toml b/session_memory/config.toml new file mode 100644 index 0000000..a7f3be4 --- /dev/null +++ b/session_memory/config.toml @@ -0,0 +1,39 @@ +# Coding Session Memory — configuration (design §5.1, §8). +# Paths support ~ expansion. Edit caps to taste; see docs/DESIGN-session-memory.md. + +[store] +# Local store lives under the repo by default (gitignored). +db_path = "session_memory/.store/mem.db" +blob_dir = "session_memory/.store/blobs" +cursor = "session_memory/.store/cursors.json" + +[retention] +raw_soft_cap_bytes = 4294967296 # 4 GiB — begin evicting analyzed sessions above this +raw_hard_cap_bytes = 6442450944 # 6 GiB — absolute Tier 1 ceiling +raw_max_age_days = 45 # backstop: analyzed raw older than this is evictable +distilled_cap_bytes = 1073741824 # 1 GiB — Tier 2 ceiling (alert, never auto-drop) +cadence = "daily" # sweep trigger: daily | weekly | on-hook + +[sources.claude] +enabled = true +root = "~/.claude/projects" +# glob, relative to root; covers sessions and agent-* sidechains +glob = "*/*.jsonl" + +# Codex / Grok adapters land in Phase 1 (schemas confirmed in the design doc). +[sources.codex] +enabled = false +root = "~/.codex/sessions" +glob = "*/*/*/rollout-*.jsonl" + +[sources.grok] +enabled = false +root = "~/.grok/sessions" +glob = "*/*/chat_history.jsonl" + +# cwd basename -> domain slug. Used to tag sessions with their Custodian domain. +[repo_domain_map] +agentic-resources = "helix_forge" +the-custodian = "custodian" +state-hub = "custodian" +ops-bridge = "custodian" diff --git a/session_memory/core/cursor.py b/session_memory/core/cursor.py new file mode 100644 index 0000000..8be2b7e --- /dev/null +++ b/session_memory/core/cursor.py @@ -0,0 +1,49 @@ +"""Per-source ingest cursors (design §6; T06). + +Tracks ``(path -> size, mtime)`` so sweeps re-ingest only changed/grown files. +Persisted as a small JSON sidecar. 
class Cursors:
    """Per-file ingest cursors persisted as a small JSON sidecar.

    Maps each file path to the ``size`` and ``mtime`` recorded by the last
    :meth:`mark` call, so a sweep can skip files that have not changed.

    The cursor is an optimization only: an unreadable, unparsable or
    wrongly-shaped sidecar is treated as empty, which makes every file look
    changed instead of raising.
    """

    def __init__(self, path: str):
        """Load cursors from ``path`` if it exists; otherwise start empty.

        Args:
            path: Location of the JSON sidecar file.
        """
        self.path = path
        self._data: dict[str, dict] = {}
        if os.path.exists(path):
            try:
                with open(path, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
            except (OSError, ValueError):
                loaded = None
            # The sidecar may hold valid JSON of the wrong shape (e.g. ``[]``
            # or ``null``). Keep only object-valued entries so later ``.get``
            # calls cannot fail on a corrupt or hand-edited file.
            if isinstance(loaded, dict):
                self._data = {
                    key: entry for key, entry in loaded.items() if isinstance(entry, dict)
                }

    def is_changed(self, file_path: str) -> bool:
        """Return True if the file is new or its size/mtime differs from the cursor.

        A file that cannot be stat'ed (for example, it vanished) is reported
        as unchanged, so callers skip it.
        """
        try:
            stat = os.stat(file_path)
        except OSError:
            return False
        prev = self._data.get(file_path)
        return prev is None or prev.get("size") != stat.st_size or prev.get("mtime") != stat.st_mtime

    def mark(self, file_path: str) -> None:
        """Record the file's current size and mtime; no-op if it cannot be stat'ed."""
        try:
            stat = os.stat(file_path)
        except OSError:
            return
        self._data[file_path] = {"size": stat.st_size, "mtime": stat.st_mtime}

    def save(self) -> None:
        """Write the cursors to ``self.path``, creating the parent directory if needed."""
        os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True)
        # Write to a sibling temp file and rename it over the target, so a
        # reader never observes a partially written sidecar.
        tmp = self.path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(self._data, f)
        os.replace(tmp, self.path)
def run_sweep(config: dict[str, Any], *, dry_run: bool = False) -> SweepResult:
    """Run one sweep: discover -> parse -> store -> digest -> retention-evict.

    Args:
        config: Parsed configuration. Reads the ``store``, ``retention``,
            ``repo_domain_map`` and ``sources`` tables.
        dry_run: When True, files are only discovered and parsed; no store is
            created, cursors are not saved and retention does not run.
            ``ingested`` still counts every file that parsed to a bundle.

    Returns:
        A ``SweepResult`` with the counters, collected warnings and, outside
        dry-run, the retention result.

    Raises:
        KeyError: If a required ``store`` or source key is missing.

    A parse failure on one file is recorded as a warning and the sweep goes
    on. Errors from the store, digest or retention steps propagate, but the
    store is always closed first.
    """
    store_cfg = config.get("store", {})
    ret_cfg = config.get("retention", {})
    repo_map = config.get("repo_domain_map", {})
    res = SweepResult()

    # In dry-run we only discover + parse: no store is created or written.
    store = None if dry_run else Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"]))
    try:
        cursors = Cursors(_expand(store_cfg["cursor"]))

        for name, src in config.get("sources", {}).items():
            if not src.get("enabled"):
                continue
            parse = _ADAPTERS.get(name)
            if parse is None:
                res.warnings.append(f"no adapter for source {name!r} (Phase 1)")
                continue
            root = _expand(src["root"])
            for fp in sorted(glob.glob(os.path.join(root, src["glob"]))):
                res.discovered += 1
                if not cursors.is_changed(fp):
                    res.skipped_unchanged += 1
                    continue
                try:
                    bundle = parse(fp, repo_map)
                except Exception as e:  # one bad file must not abort the sweep
                    res.warnings.append(f"parse failed {fp}: {e}")
                    continue
                if bundle is None:
                    # Nothing to ingest; mark the file so it is skipped until it changes.
                    cursors.mark(fp)
                    continue
                if store is not None:
                    store.ingest(bundle)
                    digest_mod.analyze(store, bundle.session.session_uid)
                    res.analyzed += 1
                res.ingested += 1
                cursors.mark(fp)

        if store is not None:
            # Cursors are persisted before retention runs, so a retention
            # failure does not force a re-scan of files already ingested.
            cursors.save()
            rc = RetentionConfig(
                raw_soft_cap_bytes=int(ret_cfg.get("raw_soft_cap_bytes", RetentionConfig.raw_soft_cap_bytes)),
                raw_hard_cap_bytes=int(ret_cfg.get("raw_hard_cap_bytes", RetentionConfig.raw_hard_cap_bytes)),
                raw_max_age_days=int(ret_cfg.get("raw_max_age_days", RetentionConfig.raw_max_age_days)),
                distilled_cap_bytes=int(ret_cfg.get("distilled_cap_bytes", RetentionConfig.distilled_cap_bytes)),
            )
            res.retention = retention_sweep(store, rc, analyze_fn=digest_mod.analyze)
            res.warnings.extend(res.retention.warnings)
    finally:
        # Close the store even when ingest, digest or retention raises, so a
        # failed sweep does not leak the open store.
        if store is not None:
            store.close()
    return res
help="(default) run a single sweep") + args = ap.parse_args(argv) + + config = load_config(args.config) + res = run_sweep(config, dry_run=args.dry_run) + + print(f"discovered={res.discovered} ingested={res.ingested} " + f"skipped_unchanged={res.skipped_unchanged} analyzed={res.analyzed}") + if res.retention is not None: + r = res.retention + print(f"retention: freed={r.bytes_freed}B final_usage={r.final_usage_bytes}B " + f"backstop={len(r.backstop_evicted)} budget={len(r.budget_evicted)} " + f"overflow_analyzed={len(r.overflow_analyzed)} data_loss={len(r.overflow_data_loss)}") + for w in res.warnings: + print(f" WARN: {w}", file=sys.stderr) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_ingest.py b/tests/test_ingest.py new file mode 100644 index 0000000..21dd4bc --- /dev/null +++ b/tests/test_ingest.py @@ -0,0 +1,81 @@ +"""Ingest sweep + cursor tests (T06).""" + +import json +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from session_memory.core.cursor import Cursors # noqa: E402 +from session_memory.ingest import run_sweep # noqa: E402 + + +def test_cursor_change_detection(tmp_path): + f = tmp_path / "a.jsonl" + f.write_text("{}\n") + cur = Cursors(str(tmp_path / "cur.json")) + assert cur.is_changed(str(f)) is True + cur.mark(str(f)) + assert cur.is_changed(str(f)) is False + f.write_text("{}\n{}\n") # grow + assert cur.is_changed(str(f)) is True + + +def _claude_session_file(dir_path, native): + os.makedirs(dir_path, exist_ok=True) + p = os.path.join(dir_path, f"{native}.jsonl") + recs = [ + {"type": "user", "uuid": "u1", "sessionId": native, + "timestamp": "2026-06-06T10:00:00Z", "cwd": "/home/worsch/agentic-resources", + "gitBranch": "main", "message": {"role": "user", "content": "hi"}}, + {"type": "assistant", "uuid": "a1", "parentUuid": "u1", "sessionId": native, + "timestamp": "2026-06-06T10:00:02Z", + "message": {"role": "assistant", "model": 
"claude-opus-4-8", + "usage": {"input_tokens": 5, "output_tokens": 2}, + "content": [{"type": "text", "text": "hello"}]}}, + ] + with open(p, "w", encoding="utf-8") as f: + for r in recs: + f.write(json.dumps(r) + "\n") + return p + + +def _config(tmp_path, projects_dir): + return { + "store": { + "db_path": str(tmp_path / ".store/mem.db"), + "blob_dir": str(tmp_path / ".store/blobs"), + "cursor": str(tmp_path / ".store/cursors.json"), + }, + "retention": {"raw_soft_cap_bytes": 10**12, "raw_hard_cap_bytes": 10**12, + "raw_max_age_days": 10**6, "distilled_cap_bytes": 10**12}, + "sources": {"claude": {"enabled": True, "root": str(projects_dir), "glob": "*/*.jsonl"}}, + "repo_domain_map": {"agentic-resources": "helix_forge"}, + } + + +def test_run_sweep_end_to_end(tmp_path): + projects = tmp_path / "projects" + _claude_session_file(str(projects / "-home-worsch-agentic-resources"), "sess-aaa") + cfg = _config(tmp_path, projects) + + res = run_sweep(cfg) + assert res.discovered == 1 + assert res.ingested == 1 + assert res.analyzed == 1 + assert res.retention is not None + + # re-run: cursor skips the unchanged file (idempotent, cheap) + res2 = run_sweep(cfg) + assert res2.skipped_unchanged == 1 + assert res2.ingested == 0 + + +def test_dry_run_writes_nothing(tmp_path): + projects = tmp_path / "projects" + _claude_session_file(str(projects / "-home-worsch-agentic-resources"), "sess-bbb") + cfg = _config(tmp_path, projects) + res = run_sweep(cfg, dry_run=True) + assert res.discovered == 1 and res.ingested == 1 + assert res.retention is None + assert not os.path.exists(cfg["store"]["db_path"]) # no store created diff --git a/workplans/AGENTIC-WP-0002-session-memory-phase0.md b/workplans/AGENTIC-WP-0002-session-memory-phase0.md index 57d69bd..267e668 100644 --- a/workplans/AGENTIC-WP-0002-session-memory-phase0.md +++ b/workplans/AGENTIC-WP-0002-session-memory-phase0.md @@ -103,7 +103,7 @@ synthetic sessions and tiny caps. 
```task id: AGENTIC-WP-0002-T06 -status: progress +status: done priority: medium state_hub_task_id: "a4b35c76-154d-4e99-b6d0-61cb6e47ecc0" ``` @@ -118,7 +118,7 @@ intended `cadence` trigger (`/schedule` daily/weekly) in the repo docs. ```task id: AGENTIC-WP-0002-T07 -status: todo +status: progress priority: medium state_hub_task_id: "98d5cc7c-c285-4556-91a3-a85e0a2bb6df" ```