Compare commits

...

7 Commits

Author SHA1 Message Date
7c6f4358ee session-memory Phase 0: end-to-end verification + docs (T07)
- verified full sweep over 85 real local Claude transcripts: 63 sessions
  ingested+analyzed, eviction under tiny cap freed 26MB with zero data loss,
  digest-preservation invariant holds, idempotent re-run
- session_memory/README.md: usage, scheduling, retention knobs
- design doc: OQ4 resolved (median ~49KB/session), OQ6 (multi-file sessions)
- workplan AGENTIC-WP-0002 finished

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 21:44:46 +02:00
586ed90948 session-memory Phase 0: ingest cursor + sweep entrypoint + config (T06)
- session_memory/core/cursor.py: size/mtime change detection sidecar
- session_memory/config.toml: store paths, retention caps, per-source
  globs (claude on, codex/grok off for Phase 1), repo->domain map
- session_memory/ingest.py: discover->normalize->store->digest->evict;
  --dry-run creates/writes nothing; python -m session_memory.ingest
- tests/test_ingest.py; live dry-run parsed 84/85 real local sessions

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 21:41:59 +02:00
451fb8f1f3 session-memory Phase 0: budget-based retention sweep (T05)
- session_memory/core/retention.py: RetentionConfig + sweep() with backstop,
  budget (oldest-analyzed-first, never touches un-analyzed), and hard-cap
  overflow (analyze-now then reported last-resort data_loss); EvictionReport
- tests/test_retention.py covers all four branches

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 21:37:40 +02:00
abb888f3ef session-memory Phase 0: session digest + outcome heuristic (T04)
- session_memory/core/digest.py: build_digest (cost totals, kind/tool
  histograms, markers, snippets) + cross-flavor infer_outcome heuristic;
  analyze() promotes Tier1->Tier2 and sets analyzed_at (-> evictable)
- tests/test_digest.py

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 20:03:04 +02:00
29fc211a14 session-memory Phase 0: Tier1/Tier2 store (T03)
- session_memory/core/store.py: SQLite rows + blob-dir bodies, idempotent
  ingest on (session_uid,seq), Tier1/Tier2 usage accounting, evict_raw that
  drops raw but preserves the digest; watermark columns authoritative
- tests/test_store.py: ingest idempotency, accounting, eviction invariant

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 19:10:02 +02:00
1c29a94fa9 session-memory Phase 0: normalized schema (T01) + Claude adapter (T02)
- session_memory/core/schema.py: Session/SessionEvent/Cost dataclasses,
  flavor-prefixed uids, watermarks, kind/outcome validation (T01)
- session_memory/adapters/claude.py: JSONL -> Normalized bundle, turn DAG
  via uuid/parentUuid, kind mapping, cost from message.usage (T02)
- tests: schema round-trip + adapter (synthetic + real local session)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 19:06:10 +02:00
ffe191d44e Add Helix Forge PRD, session-memory design, and Phase 0 workplan
- docs/PRD-helix-forge.md: Capture→Detect→Curate→Distribute→Measure loop
- docs/DESIGN-session-memory.md: tiered store + budget-based eviction;
  verified session-log schemas for Claude/Codex/Grok
- workplans/AGENTIC-WP-0002: Phase 0 (registered with State Hub)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 19:00:30 +02:00
22 changed files with 2568 additions and 0 deletions

6
.gitignore vendored
View File

@@ -174,3 +174,9 @@ cython_debug/
# PyPI configuration file
.pypirc
# session-memory local store
session_memory/.store/
__pycache__/
*.pyc
.pytest_cache/

View File

@@ -0,0 +1,380 @@
# Design Document — Coding Session Memory
**Domain:** helix_forge
**Repo:** agentic-resources
**Status:** Draft v0.1
**Author:** Claude (drafted with Bernd Worsch)
**Created:** 2026-06-06
**Updated:** 2026-06-06
**Related:** [PRD-helix-forge.md](./PRD-helix-forge.md) (this is the Capture + storage layer, FR-C* / §8)
---
## 1. Purpose
Helix Forge's loop (Capture → Detect → Curate → Distribute → Measure) needs a
durable, bounded **memory of coding sessions**. This document specifies that
memory: how we **access** each coding agent's session protocol, how we
**normalize** those protocols into one schema, where we **store** the result, and
how we **age it out** — preferring a *storage-budget-based* eviction that drops
old raw content once it has been analyzed or no longer fits, rather than a naive
fixed time window.
The guiding asymmetry: **raw transcripts are bulky and re-derivable; the distilled
analysis is small and precious.** So we keep a *bounded cache* of raw sessions and
a *durable, compact* layer of extracted digests/signals. Eviction targets the
former, never the latter.
## 2. Research — How to Access Each Agent's Session Protocol
All three families persist sessions to the local filesystem as JSONL (plus, for
Grok, a per-session directory). All findings below were verified against the live
installs on this workstation (`~/.claude`, `~/.grok`) and public docs (Codex; not
installed here).
### 2.1 Claude Code ✅ verified on disk
| Aspect | Finding |
|--------|---------|
| Session transcripts | `~/.claude/projects/<url-encoded-cwd>/<session-uuid>.jsonl` — one JSONL per session |
| Subagent sidechains | same dir, `agent-<id>.jsonl`; records carry `isSidechain: true` |
| Global prompt history | `~/.claude/history.jsonl` |
| Record format | one JSON object per line; **`type`** discriminates: `user`, `assistant`, `attachment`, `queue-operation`, `ai-title`, `last-prompt`, `summary`, plus tool-result records |
| Key fields | `type`, `timestamp`, `sessionId`, `uuid`, `parentUuid` (turn DAG), `message` (`role` + content blocks: `text`/`thinking`/`tool_use`/`tool_result`), `cwd`, `gitBranch`, `version`, `requestId`, `toolUseResult`, `userType` |
| Token usage | inside assistant `message.usage` (input/output/cache tokens) |
| Model | `message.model` (e.g. `claude-opus-4-8`) |
| Side data | `~/.claude/todos/`, `~/.claude/tasks/`, `~/.claude/file-history/`, `~/.claude/shell-snapshots/` |
| Live capture hook | Claude Code **SessionEnd / Stop / SessionStart hooks** can fire our ingest on session close (push), in addition to batch scanning (pull) |
The turn DAG (`uuid`/`parentUuid`) lets us reconstruct branching, retries, and
sidechains exactly.
### 2.2 OpenAI Codex CLI ✅ schema confirmed from source (not installed locally)
Schema confirmed from the openai/codex source (`codex-rs/protocol/src/protocol.rs`
via DeepWiki) and a reverse-engineering writeup with real example lines — the two
cross-agree.
| Aspect | Finding |
|--------|---------|
| Session ("rollout") files | `$CODEX_HOME/sessions/YYYY/MM/DD/rollout-*.jsonl` (default `$CODEX_HOME = ~/.codex`) |
| Line wrapper (`RolloutLine`) | every line: **`{timestamp, type, payload}`** (UTC ts + a `RolloutItem`) |
| `type` discriminator | `session_meta` · `response_item` · `event_msg` · `turn_context` · `compacted` |
| `session_meta` | `{id, source, cwd, model_provider, cli_version}` (+ model) — restores env |
| `turn_context` | `{model, approval_policy, sandbox_policy}` — per-turn settings snapshot |
| `response_item` | raw model output / tool calls; `payload.type``message` · `function_call` · `function_call_output` · `reasoning` |
| → `message` | `{role: developer\|user\|assistant, content:[{type:"output_text"\|…, text}]}` |
| → `function_call` | `{name, arguments (JSON string), call_id}` |
| → `function_call_output` | `{call_id, output}` |
| `event_msg` | protocol events; `payload.type``task_started` · `task_complete` · `user_message` · `agent_message` · `token_count` · lifecycle |
| Token usage | `event_msg` with `payload.type = token_count`, interspersed (no fixed cadence) |
| Turn linkage | **flat — tool calls/outputs linked by `call_id`, no parent-ref DAG**; causality inferred from temporal order (unlike Claude's `uuid`/`parentUuid`) |
| Schema versions | older installs differ ("new ≥0.44 / mid / oldest 2025/08"); adapter version-detects on `session_meta.cli_version` |
| Naming / resume | filenames + `session_id` auto-generated; `codex resume --last`; `codex exec` for headless (trajectory-JSON is gh issue #2288) |
| Override location | `CODEX_HOME` env var |
**Adapter notes:** map `event_msg/task_started|task_complete``lifecycle`
events and outcome; `response_item/message``user_msg`/`assistant_msg`;
`function_call`+`function_call_output``tool_call`/`tool_result` joined on
`call_id`; `response_item/reasoning``thinking`; `event_msg/token_count` → cost
block. Because there is no parent-ref DAG, the adapter assigns `seq`/`parent_seq`
from temporal order rather than native links.
### 2.3 Grok CLI (xAI) ✅ verified on disk
Grok stores **a directory per session**, which is the richest source of the three.
| Aspect | Finding |
|--------|---------|
| Session dir | `~/.grok/sessions/<url-encoded-cwd>/<session-uuid>/` |
| `chat_history.jsonl` | full conversation; `type` = `system`/`user`/`assistant` + content |
| `events.jsonl` | **structured lifecycle events**`{ts, type, session_id, turn_number, model_id, yolo_mode, conversation_message_count, session_relationship, schema_version}`; types like `turn_started`, `loop_started` |
| `updates.jsonl` | streaming incremental updates |
| `summary.json` | `{id, cwd, session_summary, created_at, updated_at}` |
| `prompt_context.json` | injected context, incl. which AGENTS.md/CLAUDE.md files were loaded |
| `system_prompt.txt` | exact system prompt for the session |
| `rewind_points.jsonl`, `plan_mode.json` | rewind/plan-mode state |
| Per-cwd prompt history | `~/.grok/sessions/<cwd>/prompt_history.jsonl``{timestamp, session_id, prompt, is_bash}` |
| Global structured log | `~/.grok/logs/unified.jsonl``{ts, src, pid, lvl, msg, ctx, sid, ver}` |
| Search index | `~/.grok/sessions/session_search.sqlite``session_docs(session_id, cwd, updated_at, title)` + FTS5 (`session_docs_fts`) we can query directly |
| Integration surfaces | Grok exposes **ACP (Agent Client Protocol)**, **headless mode** (`grok -p`), and **hooks** (`~/.grok/docs/user-guide/10-hooks.md`) — push-capture options |
### 2.4 Cross-family summary
| | Claude Code | Codex CLI | Grok CLI |
|--|--|--|--|
| Root | `~/.claude/projects/` | `~/.codex/sessions/` | `~/.grok/sessions/` |
| Unit | one `.jsonl`/session | one `rollout-*.jsonl`/session | one **dir**/session |
| Layout | flat per-cwd dir | date-partitioned `YYYY/MM/DD` | per-cwd, per-session dir |
| Discriminator | `type` | `type` (version-dependent) | `type` (in `chat_history`/`events`) |
| Lifecycle events | inferred from records | inferred from records | **explicit** `events.jsonl` |
| Token usage | `message.usage` | per-line usage | from events/updates |
| Push capture | Stop/SessionEnd hooks | `codex exec` wrappers | hooks / ACP |
| Pull capture | scan dir by mtime | scan date partitions | scan dirs / query FTS sqlite |
**Implication:** the common denominator is *"JSONL records discriminated by a
`type` field, with a session id, timestamps, turn linkage, tool calls, and token
usage."* That maps cleanly onto one normalized schema (§4). Per-family quirks
(Grok's explicit `events.jsonl`, Codex's schema versions, Claude's sidechains) are
handled inside each adapter.
## 3. Tiered Storage Model
```
Tier 0 SOURCE (agents' own logs) read-only, never mutated
~/.claude/projects ~/.codex/sessions ~/.grok/sessions
│ collector adapters (per family) + ingest cursor
Tier 1 RAW CACHE (bounded, EVICTABLE) normalized Session + Event records
│ signal extractors / digesters
Tier 2 DISTILLED MEMORY (durable, small) session digests + signals + pattern evidence
```
- **Tier 0 — Source.** The agents' own logs. We treat them as read-only. We keep a
small **ingest cursor** per source so re-scans are incremental (see §6).
- **Tier 1 — Raw cache.** Normalized copies of sessions/events. This is the bulky
tier and the *only* tier subject to budget eviction.
- **Tier 2 — Distilled memory.** Per-session **digest** (outcome, costs, tool
histogram, error/retry/intervention markers, key snippets) plus extracted
**signals** and **pattern evidence pointers**. Compact and durable. A session can
be fully evicted from Tier 1 once its Tier 2 digest exists.
This is what makes "drop old content once it has been analyzed" safe: analysis
*promotes* the valuable bits into Tier 2 before the raw bytes are dropped.
### 3.1 Per-session lifecycle / watermarks
Each session row carries timestamps that drive eviction:
```
discovered_at → ingested_at → analyzed_at → [evictable] → evicted_at
```
- `ingested_at` set when normalized into Tier 1.
- `analyzed_at` set when the Tier 2 digest is written. **A session is evictable iff
`analyzed_at` is set.**
- `evicted_at` set when raw bytes are dropped from Tier 1 (Tier 2 digest remains).
## 4. Normalized Schema (Tier 1)
Two record kinds. Field names are stable across all adapters.
### 4.1 `Session`
```jsonc
{
"session_uid": "claude:17092961-…", // "<flavor>:<native id>", globally unique
"flavor": "claude" | "codex" | "grok",
"native_session_id": "17092961-…",
"repo": "agentic-resources", // resolved from cwd
"domain": "helix_forge", // resolved from repo→domain map
"cwd": "/home/worsch/agentic-resources",
"git_branch": "main",
"model": "claude-opus-4-8",
"started_at": "2026-06-05T21:59:30Z",
"ended_at": "2026-06-05T22:14:00Z",
"outcome": "success|fail|abandoned|unknown",
"cost": { "input_tokens": 0, "output_tokens": 0, "cache_tokens": 0,
"wall_clock_s": 0, "turns": 0, "retries": 0 },
"task_ref": "AGENTIC-WP-0002-T01", // if derivable; else null
"source_path": "~/.claude/projects/…/….jsonl",
"source_bytes": 0,
"schema_version": 1,
"ingested_at": "…", "analyzed_at": null, "evicted_at": null
}
```
### 4.2 `SessionEvent`
```jsonc
{
"session_uid": "claude:17092961-…",
"seq": 12, // monotonic within session
"parent_seq": 11, // turn DAG (Claude uuid/parentUuid)
"ts": "2026-06-05T22:01:13Z",
"kind": "user_msg | assistant_msg | thinking | tool_call | tool_result"
+ "| error | test_run | edit | retry | human_intervention | decision"
+ "| lifecycle | completion",
"role": "user|assistant|system|tool",
"tool": "Bash|Edit|Read|…", // when kind=tool_call/result
"summary": "ran pytest -q", // short, human-readable
"payload_ref": "blob://…", // pointer to full content in Tier 1 blob store
"tokens": 0,
"is_sidechain": false
}
```
Adapters map native records onto `kind`. Grok's `events.jsonl` populates
`lifecycle`/`turn` events directly; Claude/Codex lifecycle is inferred from the
record stream. Bulky bodies live behind `payload_ref` so Tier 1 rows stay light
and blobs can be evicted independently.
### 4.3 Native → `kind` mapping (all three families)
Each cell is the native record/discriminator an adapter reads to emit that
`SessionEvent.kind`. `—` = not natively present; the adapter synthesizes or omits.
| `kind` | Claude Code (`type` / `message`) | Codex CLI (`type``payload.type`) | Grok CLI (file → `type`) |
|--------|----------------------------------|--------------------------------------|---------------------------|
| `user_msg` | `user`, `message.role=user` | `response_item``message` `role=user`/`developer` | `chat_history``user` |
| `assistant_msg` | `assistant`, `message.role=assistant`, content `text` | `response_item``message` `role=assistant` (`output_text`) | `chat_history``assistant` |
| `thinking` | `assistant` content block `type=thinking` | `response_item``reasoning` | `chat_history`/`updates` reasoning block |
| `tool_call` | `assistant` content block `type=tool_use` (`name`,`input`) | `response_item``function_call` (`name`,`arguments`,`call_id`) | `chat_history`/`updates` tool-call entry |
| `tool_result` | `user`/tool record `type=tool_result` + `toolUseResult` | `response_item``function_call_output` (join on `call_id`) | `updates` tool-result entry |
| `test_run` | derived from `tool_call` (Bash running tests) | derived from `function_call` (`exec_command`) | derived from tool-call entry |
| `edit` | `tool_use` where `name` ∈ Edit/Write/NotebookEdit | `function_call` apply-patch/file-write tool | tool-call entry (edit/write) |
| `error` | `toolUseResult` error / non-zero result | `function_call_output` error / `event_msg` error | `events.jsonl` error / failed update |
| `retry` | repeated `tool_use` after error (inferred via DAG) | repeated `function_call` after error (inferred, temporal) | `events.jsonl` loop/retry event |
| `human_intervention` | `user` record mid-turn (interrupt), `userType` | `event_msg``user_message` mid-task | `prompt_history` mid-session / `events.jsonl` |
| `decision` | recorded out-of-band (State Hub `/decisions`) | recorded out-of-band (State Hub) | recorded out-of-band (State Hub) |
| `lifecycle` | inferred: first/last record, `summary`, `queue-operation` | `event_msg``task_started` / `task_complete` | **`events.jsonl`** → `turn_started`/`loop_started`/… (explicit) |
| `completion` | inferred: last `assistant` + `Stop`/`SessionEnd` hook | `event_msg``task_complete` | `events.jsonl` turn end + `summary.json` |
**Linkage note (drives `seq`/`parent_seq`):** Claude has a true turn DAG
(`uuid`/`parentUuid`) — preserve it directly. Codex is **flat**, joined only by
`call_id`; assign `seq` by temporal order. Grok carries explicit `turn_number` in
`events.jsonl`; key `seq` off that plus record order.
**Cost block sources:** Claude `message.usage`; Codex `event_msg/token_count`;
Grok `events.jsonl` / `updates.jsonl` token fields.
## 5. Retention & Eviction
The user's stated preference: **storage-budget-based**, dropping old content once
it has been analyzed or once it no longer fits — *better than* a fixed daily/weekly
window. We implement budget-based as primary, with a time backstop and a scheduled
cadence as the trigger.
### 5.1 Configurable knobs
```toml
[session_memory.retention]
raw_soft_cap_bytes = "4GiB" # begin evicting analyzed sessions above this
raw_hard_cap_bytes = "6GiB" # absolute ceiling for Tier 1
raw_max_age_days = 45 # backstop: analyzed raw older than this is evictable regardless of space
distilled_cap_bytes = "1GiB" # Tier 2 ceiling (should grow slowly; alert, don't auto-drop)
cadence = "daily" # ingest+analyze+evict sweep: daily | weekly | on-hook
```
### 5.2 Eviction algorithm (runs after each ingest+analyze sweep)
1. **Compute** current Tier 1 usage.
2. **Backstop pass:** evict any session where `analyzed_at` is set AND
`age > raw_max_age_days`.
3. **Budget pass:** while `usage > raw_soft_cap_bytes`:
- pick the **oldest `analyzed_at`** session that is not yet evicted;
- drop its Tier 1 raw rows + blobs (Tier 2 digest is kept), set `evicted_at`;
- if **no analyzed-but-unevicted session remains**, stop the budget pass
(we will not destroy un-analyzed data to free space) and go to step 4.
4. **Back-pressure / overflow:** if `usage > raw_hard_cap_bytes` and the only
remaining bulk is **un-analyzed**:
- first try to **analyze now** (run extraction) to make those sessions
evictable, then re-run the budget pass;
- if still over hard cap (analysis can't keep up or fails), evict the **oldest
un-analyzed** sessions as a last resort and emit a
`session_memory.data_loss` warning event + a State Hub progress note. This is
the only path that loses un-analyzed data, and it is always reported.
5. **Tier 2 guard:** if distilled usage > `distilled_cap_bytes`, **do not
auto-drop**; flag for human/curation review (digests are the product).
**Invariant:** *no session's raw bytes are dropped before its Tier 2 digest
exists, except the explicitly-reported hard-cap overflow path.*
### 5.3 Why budget-based beats fixed-window
A fixed daily/weekly drop either deletes data we never analyzed (lossy) or hoards
data we already distilled (wasteful). Budget + `analyzed_at` watermark ties
deletion to **two** real conditions the user named — *"once it has been analyzed"*
(promoted to Tier 2) and *"doesn't fit any longer"* (over budget) — and only falls
back to time as a backstop.
## 6. Ingest Cursors (incremental, idempotent)
Per source, persist a small cursor so sweeps are cheap and re-runnable:
- **Claude / Grok (per-cwd dirs):** track `(file_path, size, mtime)` and last
parsed line offset; re-ingest only grown/changed files. `session_uid` dedupes.
- **Codex (date partitions):** track last-seen `YYYY/MM/DD` + per-file offset.
- Ingest is **idempotent** keyed on `(session_uid, seq)` — safe to re-run after a
crash or partial sweep.
## 7. Capture Modes
- **Pull (default, portable):** scheduled sweep scans Tier 0 by mtime/partition.
Works for all three families with zero coupling to the agent. Triggered on the
configured `cadence` via the repo's scheduler (`/schedule`, cron, or `/loop`).
- **Push (optional, low-latency):** wire the agent's own hooks to ping the ingester
on session close — Claude `Stop`/`SessionEnd` hooks, Grok hooks/ACP, Codex
`exec` wrappers. Push just enqueues; the same idempotent pull pipeline does the
work.
Capture must be **non-blocking** (PRD FR-C5): we read copies of logs out-of-band;
we never sit in the agent's critical path.
## 8. Component Layout (proposed, in-repo)
```
session-memory/
adapters/
claude.py # Tier0→Tier1 normalizer (verified schema)
codex.py # version-detecting normalizer (confirm against real rollout)
grok.py # reads session dir incl. events.jsonl
core/
schema.py # Session / SessionEvent dataclasses + versioning
store.py # Tier1 (rows+blobs) and Tier2 (digests) — SQLite to start
cursor.py # per-source ingest cursors
retention.py # §5 eviction algorithm
digest.py # Tier1→Tier2 session digest + signal stubs
ingest.py # one sweep: discover → normalize → analyze → evict
config.toml # §5.1 knobs + repo→domain map + source paths
```
Storage starts as **SQLite + a blob dir** (rows in SQLite, bulky payloads as files
under `payload_ref`); graduate to Postgres alongside the State Hub only if volume
demands. Digests/decisions are also surfaced to the hub per ADR-001 (files-first;
hub indexes).
## 9. Privacy / Safety
- Tier 0 logs can contain secrets (the Grok `auth.json` and Claude `.credentials`
live in the same trees). The ingester reads **only** session transcripts, never
credential files, and **redacts** obvious secret patterns into `payload_ref`
blobs.
- All data is local; nothing leaves the workstation. Eviction of Tier 1 is a real
delete (not just an index drop) so the bounded cache is also a privacy bound.
## 10. Open Questions
- ~~**OQ1** Confirm Codex `rollout-*.jsonl` per-line schema.~~ **Resolved** (§2.2):
`{timestamp,type,payload}` lines, `type``session_meta`/`response_item`/`event_msg`/`turn_context`/`compacted`,
tool calls flat-linked by `call_id`, tokens via `event_msg/token_count`. Remaining
sub-item: verify the `token_count` payload field names against a real install when
Codex is present (older-version variance only).
- **OQ2** Outcome inference: how do we reliably label `success/fail/abandoned`
across flavors (exit signals differ)? Start heuristic (last-turn + test results +
human-intervention markers), refine in Detect phase.
- **OQ3** `task_ref` resolution — can we always map a session to a workplan task
(via cwd + branch + state-hub), or only sometimes?
- ~~**OQ4** Right default for `raw_soft_cap_bytes`.~~ **Measured** (Phase 0, 85
real local Claude files / 63 distinct sessions): source bytes per session
min 396 · **median ~49 KB** · max 48 MB (one outlier) · ~103 MB total. Claude
defaults (4 GiB soft / 6 GiB hard) leave ample headroom; revisit once Grok dirs
(heavier, multi-file) are ingested in Phase 1.
- **OQ6 (new, found in Phase 0)** Multi-file sessions: ~84 transcript files mapped
to ~63 `session_uid`s — some sessions span multiple files (resume/sidechain
sharing a `sessionId`). Current behavior upserts (last file wins per
`(session_uid, seq)`); a future refinement is to *merge* events across files of
one session rather than overwrite. Acceptable for Phase 0.
- **OQ5** Should push-hooks be opt-in per machine to avoid surprising the agents?
---
*Next step: [AGENTIC-WP-0002] implements Phase 0 — the schema, the Claude
collector, the Tier1/Tier2 store, and the budget-based eviction sweep.*
## Sources
- Claude Code session format — verified on disk: `~/.claude/projects/*/*.jsonl`, `~/.claude/history.jsonl`.
- Grok CLI session format — verified on disk: `~/.grok/sessions/`, `~/.grok/logs/unified.jsonl`, `~/.grok/sessions/session_search.sqlite`; `~/.grok/README.md` (ACP/headless/hooks).
- Codex CLI session format — [ccusage Codex guide](https://ccusage.com/guide/codex/), [Codex advanced config](https://developers.openai.com/codex/config-advanced), [codex-trace](https://github.com/PixelPaw-Labs/codex-trace), [codex-logs](https://github.com/wondercoms/codex-logs), [Session/Rollout Files discussion #3827](https://github.com/openai/codex/discussions/3827), [trajectory-JSON issue #2288](https://github.com/openai/codex/issues/2288).

279
docs/PRD-helix-forge.md Normal file
View File

@@ -0,0 +1,279 @@
# Product Requirements Document — Helix Forge
**Domain:** helix_forge
**Repo:** agentic-resources
**Status:** Draft v0.1
**Author:** Claude (drafted with Bernd Worsch)
**Created:** 2026-06-06
**Updated:** 2026-06-06
---
## 1. Summary
Helix Forge is a system for **handling a collection of repositories and evolving
the utility of what those repositories provide**, by treating the coding sessions
run against them as a first-class data source.
Concretely: across a fleet of repos worked on by multiple coding agents (Claude,
Codex, GrokBuild), Helix Forge **inspects the sessions**, **collects data about the
problems agents hit and the moves that resolved them**, and turns that data into
**reusable solution patterns** that can be discussed, implemented, and re-applied —
across every agent flavor, not just the one that discovered the pattern.
The name is the metaphor: a *helix* of repeated turns (session → pattern → improved
session) feeding a *forge* where the tooling, environments, and instructions for our
agents are hammered into better shape over time. This is the operational engine
behind the INTENT.md goal of an *antifragile, continuously-optimizing agentic
ecosystem*.
## 2. Problem Statement
We run many coding sessions, across many repos, with several different agents. Today
the value of each session is **trapped in that session**:
- When an agent solves a tricky problem, the solution is not captured in a form
another agent (or the same agent next week) can reuse.
- When an agent fails, struggles, or burns excess budget on a problem, that failure
signal is lost — we re-encounter the same friction repeatedly.
- Each agent flavor (Claude, Codex, GrokBuild) has its own environment, instruction
format, and extension mechanism, so a fix discovered for one is **not portable** to
the others without manual translation.
- We have no systematic, evidence-based answer to "what is actually slowing our
agents down, and what consistently makes them faster?" — decisions about tooling,
prompts, and environments are made on anecdote.
**The cost:** repeated mistakes, non-transferable wins, slow and uneven improvement
of agent performance, and no feedback loop from real session data back into the
tools/environments/instructions that shape future sessions.
## 3. Goals & Non-Goals
### 3.1 Goals
| # | Goal |
|---|------|
| G1 | **Capture** coding sessions from Claude, Codex, and GrokBuild in a normalized, comparable form. |
| G2 | **Detect** recurring *problem patterns* (failure, friction, wasted budget) and *success patterns* (efficient resolutions) from that data. |
| G3 | **Curate** detected patterns into a reviewed catalog of *solution patterns* that humans and agents can discuss and approve. |
| G4 | **Distribute** approved patterns back into agent environments — as instructions, tools, or extensions — in a per-flavor-appropriate form. |
| G5 | **Measure** whether distributed patterns actually improved subsequent sessions (close the loop). |
| G6 | Keep the whole loop **agent-flavor-agnostic at the core**, with thin per-flavor adapters at the edges. |
### 3.2 Non-Goals (initial release)
- Not a replacement for the coding agents themselves; Helix Forge observes and
improves them, it does not execute coding tasks.
- Not a general APM/observability product; scope is coding-session improvement, not
arbitrary infrastructure monitoring.
- Not an autonomous self-modifying system — pattern promotion into live agent
environments requires human approval (HITL) for the first release.
- Not building new model training/fine-tuning pipelines; we optimize *context,
tooling, and environment*, not model weights.
- Not replacing the Custodian State Hub; Helix Forge is a producer/consumer of hub
state, not a competing system of record. (See §9.)
## 4. Users & Personas
| Persona | Description | What they need from Helix Forge |
|---------|-------------|----------------------------------|
| **Operator (Bernd)** | Owns the agentic ecosystem; decides which patterns become standards. | A reviewable catalog of patterns with evidence; control over what ships to agents. |
| **Coding agent (Claude / Codex / GrokBuild)** | Runs tasks in a repo; both the *source* of session data and the *consumer* of patterns. | To emit session data cheaply; to receive applicable patterns in its native format at session start. |
| **Repo maintainer agent** | The per-repo agent persona (e.g. `agentic-resources`) following AGENTS.md conventions. | Patterns scoped to its repo/domain; integration via existing workplan + state-hub flow. |
| **Reviewer (human or kaizen agent)** | Evaluates candidate patterns before they become standards. | Clear pattern proposals, supporting evidence, and a discuss/approve/reject workflow. |
## 5. Core Concepts (Domain Model)
- **Session** — one bounded run of a coding agent against a repo. Has an agent flavor,
repo, task reference, timeline of events, outcome, and cost (tokens/time).
- **Session Event** — a normalized atomic record within a session: tool call, edit,
test run, error, retry, human intervention, decision, completion.
- **Signal** — a derived indicator extracted from sessions: e.g. *repeated test
failure on same file*, *budget overrun*, *fast clean resolution*, *retry storm*,
*human escalation*.
- **Problem Pattern** — a recurring negative signal cluster ("agents repeatedly fail
X because Y").
- **Success Pattern** — a recurring positive resolution ("doing Z reliably resolves X
cheaply").
- **Solution Pattern** — a curated, reviewed artifact pairing a problem with one or
more recommended resolutions, written agent-flavor-agnostically, with per-flavor
rendering hints.
- **Pattern Application** — the act of distributing a solution pattern into a specific
agent environment (an instruction snippet, a tool, an extension), plus the record of
its effect on later sessions.
## 6. Functional Requirements
### 6.1 Capture (G1)
- **FR-C1** Ingest session transcripts/logs from each supported agent flavor via a
per-flavor **collector adapter**.
- **FR-C2** Normalize raw logs into the common `Session` + `Session Event` schema,
regardless of source flavor.
- **FR-C3** Tag every session with: agent flavor, repo, domain, task/workplan id (if
any), outcome (success/fail/abandoned), and cost metrics (tokens, wall-clock,
retries).
- **FR-C4** Support both **batch import** (historical logs) and **incremental ingest**
(new sessions as they close).
- **FR-C5** Collection must be low-friction and non-blocking — an agent emitting
session data must never slow or break the actual coding task.
### 6.2 Detect (G2)
- **FR-D1** Run signal extractors over normalized sessions to surface problem and
success signals.
- **FR-D2** Cluster recurring signals across sessions/repos/flavors into candidate
Problem Patterns and Success Patterns.
- **FR-D3** For each candidate pattern, attach **evidence**: the supporting sessions,
frequency, affected repos, affected flavors, and estimated cost impact.
- **FR-D4** Flag **cross-flavor** patterns explicitly (a problem seen in Claude that
Codex also hits) — these are the highest-value reuse targets.
### 6.3 Curate (G3)
- **FR-U1** Present candidate patterns for review with their evidence in a
discuss/approve/reject workflow.
- **FR-U2** Allow a reviewer (human or kaizen agent) to promote a candidate into a
**Solution Pattern**: a named, versioned artifact with problem description,
recommended resolution(s), applicability scope, and per-flavor rendering hints.
- **FR-U3** Maintain a **Pattern Catalog** as the source of truth for approved
solution patterns, versioned and stored as files in-repo (consistent with ADR-001:
files originate work, the hub indexes them).
- **FR-U4** Record pattern decisions through the State Hub decision mechanism so
rationale is auditable.
### 6.4 Distribute (G4)
- **FR-X1** Render each approved solution pattern into per-flavor artifacts via
**distributor adapters**:
- Claude → `CLAUDE.md` snippets, skills, or settings/hooks.
- Codex → `AGENTS.md` snippets / repo conventions.
- GrokBuild → its native instruction/extension format.
- **FR-X2** Scope distribution by repo and domain, so a pattern only lands where it
applies.
- **FR-X3** Distribution is **proposed, not auto-applied** in v1 — output is a
reviewable change (e.g. a workplan or PR), gated by human approval.
- **FR-X4** Track which patterns are currently active in which environments.
### 6.5 Measure (G5)
- **FR-M1** After a pattern is applied, compare subsequent sessions touching the same
signal against the pre-application baseline (cost, retry rate, success rate,
human-intervention rate).
- **FR-M2** Surface per-pattern **effectiveness** so ineffective patterns can be
revised or retired.
- **FR-M3** Provide a fleet-level view: are sessions across the collection getting
cheaper / more reliable over time? (the helix turning.)
### 6.6 Multi-Agent Support (G6)
- **FR-A1** The core schema, detection, catalog, and measurement are **flavor-agnostic**.
- **FR-A2** All flavor-specific knowledge lives in **collector adapters** (input) and
**distributor adapters** (output). Adding a fourth agent = adding one collector +
one distributor, no core changes.
- **FR-A3** A successful pattern discovered via one flavor MUST be expressible for all
other supported flavors.
## 7. Architecture Overview
```
┌──────────── per-flavor edges ────────────┐ ┌──── flavor-agnostic core ────┐
│ │ │ │
Claude ─┐ │ │ │
Codex ─┼─► Collector Adapters ──► Normalizer ─┼────────►│ Session + Event Store │
Grok ─┘ │ │ │ │
│ │ ▼ │
│ │ Signal Extractors │
│ │ │ │
│ │ ▼ │
│ │ Pattern Detector / Clusterer│
│ │ │ │
│ │ ▼ │
│ │ Curation + Pattern Catalog │ ◄─ reviewer (human/kaizen)
│ │ │ │
Claude ◄┐ │ │ ▼ │
Codex ◄┼── Distributor Adapters ◄────────────┼─────────│ Effectiveness Measurement │
Grok ◄┘ │ │ │
└───────────────────────────────────────────┘ └──────────────────────────────┘
▲ feeds back into ▲ tools / environments / instructions
```
**Design principle:** *agnostic core, thin adapters at the edges.* The expensive,
reusable intelligence (normalized sessions, detection, catalog, measurement) is built
once; each agent flavor only needs an input adapter and an output adapter.
## 8. Data & Storage
- **Pattern Catalog** and **workplans**: files in `agentic-resources` (per ADR-001 in
AGENTS.md — files are the source of truth, the hub indexes them).
- **Session/event data**: a local store (start simple: structured files / SQLite;
graduate to Postgres alongside the State Hub if volume warrants).
- **Decisions & progress**: recorded through the Custodian State Hub so the broader
ecosystem stays aware of Helix Forge's activity.
## 9. Integration with the Custodian State Hub
Helix Forge runs inside the `helix_forge` domain and is **not** a competing system of
record:
- Work originates as **workplans** in this repo (`AGENTIC-WP-NNNN`), synced via
`make fix-consistency REPO=agentic-resources`.
- Pattern-promotion and distribution decisions are logged via the hub's decision API.
- Each Helix Forge run logs at least one `add_progress_event()` / `POST /progress/`.
- The hub remains a **read model**; Helix Forge writes its durable artifacts as files
and lets the hub index them.
## 10. Success Metrics
| Metric | Meaning | Target (directional, v1) |
|--------|---------|--------------------------|
| Sessions captured | Coverage of real work | ≥ 90% of sessions across the 3 flavors normalized |
| Patterns cataloged | Knowledge made reusable | A growing, non-trivial catalog of reviewed solution patterns |
| Cross-flavor patterns | Reuse leverage | ≥ 1 pattern proven to transfer across flavors |
| Pattern effectiveness | Loop is closing | Applied patterns show measurable cost/reliability improvement vs. baseline |
| Fleet trend | The helix turns | Median session cost ↓ and success rate ↑ over time |
| Repeated-failure rate | Friction eliminated | Known problem patterns recur less after distribution |
## 11. Phasing / Roadmap
- **Phase 0 — Foundations.** Define the Session/Event schema and Pattern Catalog
format. One collector adapter (Claude) + batch import. Manual inspection only.
- **Phase 1 — Detect.** Signal extractors + pattern clustering over captured sessions;
candidate patterns surfaced with evidence. Add Codex + GrokBuild collectors.
- **Phase 2 — Curate.** Review workflow + versioned Pattern Catalog, wired to hub
decisions.
- **Phase 3 — Distribute.** Distributor adapters for all three flavors; patterns ship
as reviewable workplans/PRs (HITL).
- **Phase 4 — Measure.** Baseline-vs-after effectiveness and fleet-level trend
reporting; retire ineffective patterns. Loop is closed.
## 12. Open Questions
- **OQ1** What is the canonical raw log format available from each of Claude, Codex,
and GrokBuild today, and how lossy is normalization from each?
- **OQ2** How are sessions reliably bounded and attributed to a repo/task across the
three flavors?
- **OQ3** Where does detection logic run — local batch jobs, hub-side, or a dedicated
service? What volume do we actually expect?
- **OQ4** Pattern format: how do we keep one agnostic representation while giving each
distributor enough to render high-quality native artifacts?
- **OQ5** What's the minimum trustworthy evidence bar before a pattern is allowed to be
distributed to live agent environments?
- **OQ6** How do we prevent pattern bloat — too many low-value instructions degrading
agent context budgets (cf. the token-budget policy in global instructions)?
## 13. Risks
| Risk | Mitigation |
|------|------------|
| Capture overhead slows real coding sessions | Async, non-blocking collection (FR-C5); never in the agent's critical path. |
| Patterns become noise / context bloat | Effectiveness gating (FR-M2) + retirement; measure before broad distribution. |
| Over-fitting to one flavor | Agnostic core + explicit cross-flavor flagging (FR-D4, FR-A3). |
| Bad pattern degrades agents | HITL approval before distribution (FR-X3); baseline measurement to catch regressions. |
| Drift from State Hub conventions | Files-first per ADR-001; log via hub; no competing source of record. |
---
*This PRD is a draft for discussion. Next step: a `proposed` workplan
(`AGENTIC-WP-0002`) scoping Phase 0 — the Session/Event schema and the first
(Claude) collector adapter.*

75
session_memory/README.md Normal file
View File

@@ -0,0 +1,75 @@
# session_memory
Capture + retention layer for Helix Forge — the **Capture** stage of the loop in
[../docs/PRD-helix-forge.md](../docs/PRD-helix-forge.md), built to the
[../docs/DESIGN-session-memory.md](../docs/DESIGN-session-memory.md) spec.
It scans coding-agent session logs, normalizes them into one schema, distills a
compact per-session digest, and ages out raw bulk under a **storage budget**
(dropping sessions once analyzed and once space is needed) rather than a fixed
time window.
## Layout
```
session_memory/
adapters/claude.py # Tier0 -> Tier1 normalizer (Codex/Grok land in Phase 1)
core/schema.py # Session / SessionEvent / Cost
core/store.py # SQLite rows + blob-dir bodies (Tier1) + digests (Tier2)
core/cursor.py # incremental ingest cursors
core/digest.py # Tier1 -> Tier2 promotion + outcome heuristic
core/retention.py # budget-based eviction sweep
ingest.py # one sweep: discover -> normalize -> store -> digest -> evict
config.toml # store paths, retention caps, sources, repo->domain map
```
The local store lives under `session_memory/.store/` (gitignored).
## Run a sweep
```bash
# from the repo root
python -m session_memory.ingest # ingest + analyze + evict
python -m session_memory.ingest --dry-run # discover + parse only, writes nothing
python -m session_memory.ingest --config path/to/config.toml
```
Output reports `discovered / ingested / skipped_unchanged / analyzed` and a
retention line (`freed`, `final_usage`, and per-pass eviction counts). Sweeps are
idempotent — re-running skips unchanged files via the cursor.
## Scheduling (cadence)
Retention is budget-based; the `cadence` in `config.toml` only decides how often
the sweep *runs*. Trigger it with the repo scheduler, e.g. daily:
```bash
# Claude Code: schedule a daily routine that runs the sweep
/schedule "daily session-memory sweep" -- python -m session_memory.ingest
```
or a cron entry / `/loop` on a timer. Push-capture (agent Stop/SessionEnd hooks)
can also enqueue a sweep; see design §7.
## Retention knobs (`[retention]` in config.toml)
| Key | Meaning |
|-----|---------|
| `raw_soft_cap_bytes` | begin evicting **analyzed** sessions above this (oldest first) |
| `raw_hard_cap_bytes` | absolute Tier 1 ceiling; overflow path may, as a last resort, evict un-analyzed sessions and report `data_loss` |
| `raw_max_age_days` | backstop: analyzed raw older than this is evictable regardless of space |
| `distilled_cap_bytes` | Tier 2 ceiling — **alert only**, never auto-dropped |
**Invariant:** a session's raw bytes are never dropped before its Tier 2 digest
exists, except the explicitly-reported hard-cap overflow path.
## Tests
```bash
python -m pytest # 26 tests: schema, adapter, store, digest, retention, ingest
```
## Status
Phase 0 (AGENTIC-WP-0002): Claude adapter only, end to end. Codex and Grok
adapters are designed (schemas confirmed in the design doc) and land in Phase 1.

View File

@@ -0,0 +1,7 @@
"""Coding Session Memory — Helix Forge capture + retention layer.
See docs/DESIGN-session-memory.md. Importable package name uses an underscore
(``session_memory``) where the design doc writes ``session-memory/``.
"""
__all__ = ["core", "adapters"]

View File

@@ -0,0 +1 @@
"""Per-flavor collector adapters (Tier 0 -> Tier 1 normalization)."""

View File

@@ -0,0 +1,228 @@
"""Claude Code collector adapter — Tier 0 -> Tier 1 (design §2.1, §4.3).
Reads ``~/.claude/projects/<url-encoded-cwd>/<session-uuid>.jsonl`` (and
``agent-*.jsonl`` sidechains), discriminates on the record ``type``, reconstructs
the turn DAG via ``uuid``/``parentUuid``, and emits normalized records.
Returns a :class:`Normalized` bundle: the ``Session``, its ordered
``SessionEvent`` list, and a ``blobs`` map (``payload_ref -> full text body``)
that the store persists out-of-line so Tier 1 rows stay light.
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Iterable, Optional
from ..core.schema import Cost, Session, SessionEvent
FLAVOR = "claude"
# tool_use names that mutate files -> kind "edit"
_EDIT_TOOLS = {"Edit", "Write", "NotebookEdit", "MultiEdit"}
# crude test-runner detection inside Bash commands -> kind "test_run"
_TEST_HINTS = ("pytest", "unittest", "npm test", "npm run test", "go test", "cargo test", "jest", "vitest")
@dataclass
class Normalized:
session: Session
events: list[SessionEvent]
blobs: dict[str, str] = field(default_factory=dict)
def _iter_records(path: str) -> Iterable[dict[str, Any]]:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError:
continue # tolerate partial/corrupt trailing lines
def _resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
"""cwd -> (repo, domain). repo is the cwd basename; domain via map."""
if not cwd:
return None, None
repo = os.path.basename(cwd.rstrip("/")) or None
domain = repo_domain_map.get(repo) if repo else None
return repo, domain
def _is_test_command(text: str) -> bool:
low = text.lower()
return any(h in low for h in _TEST_HINTS)
def _content_blocks(message: dict[str, Any]) -> list[dict[str, Any]]:
content = message.get("content")
if isinstance(content, str):
return [{"type": "text", "text": content}]
if isinstance(content, list):
return [b for b in content if isinstance(b, dict)]
return []
def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -> Optional[Normalized]:
"""Parse one Claude transcript file into a Normalized bundle.
Returns None if the file has no usable session records.
"""
repo_domain_map = repo_domain_map or {}
records = list(_iter_records(path))
if not records:
return None
session_id: Optional[str] = None
cwd = git_branch = version = model = None
timestamps: list[str] = []
file_is_sidechain = os.path.basename(path).startswith("agent-")
events: list[SessionEvent] = []
blobs: dict[str, str] = {}
uuid_to_seq: dict[str, int] = {}
cost = Cost()
seq = 0
def add_event(uuid: Optional[str], parent_uuid: Optional[str], ts, kind, *,
role=None, tool=None, summary=None, body=None, tokens=0, sidechain=False):
nonlocal seq
s = seq
seq += 1
if uuid:
uuid_to_seq[uuid] = s
parent_seq = uuid_to_seq.get(parent_uuid) if parent_uuid else None
payload_ref = None
if body:
payload_ref = f"blob://{session_id}/{s}"
blobs[payload_ref] = body
events.append(SessionEvent(
session_uid=Session.make_uid(FLAVOR, session_id or "unknown"),
seq=s, parent_seq=parent_seq, ts=ts, kind=kind, role=role, tool=tool,
summary=(summary or "")[:300] or None, payload_ref=payload_ref,
tokens=tokens, is_sidechain=sidechain or file_is_sidechain,
))
for rec in records:
rtype = rec.get("type")
ts = rec.get("timestamp")
if ts:
timestamps.append(ts)
session_id = session_id or rec.get("sessionId")
cwd = cwd or rec.get("cwd")
git_branch = git_branch or rec.get("gitBranch")
version = version or rec.get("version")
uuid = rec.get("uuid")
parent = rec.get("parentUuid")
sidechain = bool(rec.get("isSidechain"))
if rtype == "user":
msg = rec.get("message", {})
for b in _content_blocks(msg):
bt = b.get("type")
if bt == "tool_result":
body = _stringify(b.get("content"))
add_event(uuid, parent, ts, "tool_result", role="tool",
summary="tool result", body=body, sidechain=sidechain)
else:
text = b.get("text", "")
add_event(uuid, parent, ts, "user_msg", role="user",
summary=_first_line(text), body=text, sidechain=sidechain)
elif rtype == "assistant":
msg = rec.get("message", {})
model = model or msg.get("model")
usage = msg.get("usage") or {}
cost.input_tokens += int(usage.get("input_tokens", 0) or 0)
cost.output_tokens += int(usage.get("output_tokens", 0) or 0)
cost.cache_tokens += int(
(usage.get("cache_read_input_tokens", 0) or 0)
+ (usage.get("cache_creation_input_tokens", 0) or 0)
)
out_tokens = int(usage.get("output_tokens", 0) or 0)
for b in _content_blocks(msg):
bt = b.get("type")
if bt == "thinking":
add_event(uuid, parent, ts, "thinking", role="assistant",
summary="thinking", body=b.get("thinking", ""), sidechain=sidechain)
elif bt == "text":
text = b.get("text", "")
add_event(uuid, parent, ts, "assistant_msg", role="assistant",
summary=_first_line(text), body=text, tokens=out_tokens, sidechain=sidechain)
elif bt == "tool_use":
name = b.get("name", "")
inp = b.get("input", {})
body = _stringify(inp)
kind = "tool_call"
if name in _EDIT_TOOLS:
kind = "edit"
elif name == "Bash" and _is_test_command(_stringify(inp.get("command", ""))):
kind = "test_run"
add_event(uuid, parent, ts, kind, role="assistant", tool=name,
summary=f"{name}", body=body, sidechain=sidechain)
elif rtype == "summary":
add_event(uuid, parent, ts, "lifecycle", summary="summary",
body=_stringify(rec.get("summary")), sidechain=sidechain)
# queue-operation / ai-title / last-prompt / attachment: skipped as events
if session_id is None:
return None
cost.turns = sum(1 for e in events if e.kind == "user_msg")
started = min(timestamps) if timestamps else None
ended = max(timestamps) if timestamps else None
cost.wall_clock_s = _seconds_between(started, ended)
repo, domain = _resolve_repo(cwd, repo_domain_map)
session = Session(
session_uid=Session.make_uid(FLAVOR, session_id),
flavor=FLAVOR,
native_session_id=session_id,
repo=repo, domain=domain, cwd=cwd, git_branch=git_branch,
model=model, started_at=started, ended_at=ended,
outcome="unknown", # outcome inference happens in the digest step (T04)
cost=cost,
source_path=path,
source_bytes=os.path.getsize(path) if os.path.exists(path) else 0,
discovered_at=_now(),
)
return Normalized(session=session, events=events, blobs=blobs)
# ---- helpers ---------------------------------------------------------------
def _stringify(v: Any) -> str:
if v is None:
return ""
if isinstance(v, str):
return v
try:
return json.dumps(v, ensure_ascii=False)[:20000]
except (TypeError, ValueError):
return str(v)[:20000]
def _first_line(text: str) -> str:
return (text or "").strip().splitlines()[0] if (text or "").strip() else ""
def _seconds_between(start: Optional[str], end: Optional[str]) -> float:
if not start or not end:
return 0.0
try:
a = datetime.fromisoformat(start.replace("Z", "+00:00"))
b = datetime.fromisoformat(end.replace("Z", "+00:00"))
return max(0.0, (b - a).total_seconds())
except ValueError:
return 0.0
def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

View File

@@ -0,0 +1,39 @@
# Coding Session Memory — configuration (design §5.1, §8).
# Paths support ~ expansion. Edit caps to taste; see docs/DESIGN-session-memory.md.
[store]
# Local store lives under the repo by default (gitignored).
db_path = "session_memory/.store/mem.db"
blob_dir = "session_memory/.store/blobs"
cursor = "session_memory/.store/cursors.json"
[retention]
raw_soft_cap_bytes = 4294967296 # 4 GiB — begin evicting analyzed sessions above this
raw_hard_cap_bytes = 6442450944 # 6 GiB — absolute Tier 1 ceiling
raw_max_age_days = 45 # backstop: analyzed raw older than this is evictable
distilled_cap_bytes = 1073741824 # 1 GiB — Tier 2 ceiling (alert, never auto-drop)
cadence = "daily" # sweep trigger: daily | weekly | on-hook
[sources.claude]
enabled = true
root = "~/.claude/projects"
# glob, relative to root; covers sessions and agent-* sidechains
glob = "*/*.jsonl"
# Codex / Grok adapters land in Phase 1 (schemas confirmed in the design doc).
[sources.codex]
enabled = false
root = "~/.codex/sessions"
glob = "*/*/*/rollout-*.jsonl"
[sources.grok]
enabled = false
root = "~/.grok/sessions"
glob = "*/*/chat_history.jsonl"
# cwd basename -> domain slug. Used to tag sessions with their Custodian domain.
[repo_domain_map]
agentic-resources = "helix_forge"
the-custodian = "custodian"
state-hub = "custodian"
ops-bridge = "custodian"

View File

@@ -0,0 +1 @@
"""Flavor-agnostic core: schema, store, cursor, digest, retention."""

View File

@@ -0,0 +1,49 @@
"""Per-source ingest cursors (design §6; T06).
Tracks ``(path -> size, mtime)`` so sweeps re-ingest only changed/grown files.
Persisted as a small JSON sidecar. Ingest itself is idempotent on
``(session_uid, seq)`` in the store, so the cursor is an optimization, not a
correctness requirement — a lost cursor just means a full (still-idempotent)
re-scan.
"""
from __future__ import annotations
import json
import os
from typing import Optional
class Cursors:
def __init__(self, path: str):
self.path = path
self._data: dict[str, dict] = {}
if os.path.exists(path):
try:
with open(path, "r", encoding="utf-8") as f:
self._data = json.load(f)
except (OSError, ValueError):
self._data = {}
def is_changed(self, file_path: str) -> bool:
"""True if the file is new or has changed size/mtime since last seen."""
try:
stat = os.stat(file_path)
except OSError:
return False
prev = self._data.get(file_path)
return prev is None or prev.get("size") != stat.st_size or prev.get("mtime") != stat.st_mtime
def mark(self, file_path: str) -> None:
try:
stat = os.stat(file_path)
except OSError:
return
self._data[file_path] = {"size": stat.st_size, "mtime": stat.st_mtime}
def save(self) -> None:
os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True)
tmp = self.path + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
json.dump(self._data, f)
os.replace(tmp, self.path)

View File

@@ -0,0 +1,159 @@
"""Session digest — Tier 1 -> Tier 2 promotion (design §3, §4; T04).
Compresses a session's events into a small, durable digest: outcome heuristic,
cost totals, tool histogram, and counts of error/retry/test/edit/human markers,
plus a few key snippets. Writing the digest sets ``analyzed_at``, which is what
makes a session evictable under budget-based retention (design §5).
Signal extraction beyond this digest is intentionally out of scope here — it
belongs to the Detect phase (PRD §6.2).
"""
from __future__ import annotations
import collections
from typing import Any
from .schema import Session, SessionEvent
# Substrings in tool_result bodies / summaries that suggest a failure.
_FAIL_HINTS = ("error", "failed", "exception", "traceback", "fatal", "non-zero")
# Substrings suggesting a clean test pass.
_PASS_HINTS = ("passed", "0 failed", "ok", "success")
def infer_outcome(events: list[SessionEvent], blobs: dict[str, str] | None = None) -> str:
"""Heuristic outcome label across flavors (design OQ2).
- ``abandoned`` if the session has no assistant output at all.
- ``fail`` if the last substantive signal is an error / failing test.
- ``success`` if it ends on assistant output or a passing test.
- ``unknown`` otherwise.
"""
blobs = blobs or {}
assistant = [e for e in events if e.kind == "assistant_msg"]
if not assistant:
return "abandoned"
# Look at error and test signals; weight the latest ones.
last_fail = _last_index(events, lambda e: e.kind == "error")
last_test = _last_index(events, lambda e: e.kind == "test_run")
last_completion = _last_index(events, lambda e: e.kind in ("completion", "assistant_msg"))
test_passed = None
if last_test is not None:
# inspect the nearest following tool_result body for pass/fail hints
body = _nearby_result_body(events, last_test, blobs)
if body:
low = body.lower()
if any(h in low for h in _FAIL_HINTS):
test_passed = False
elif any(h in low for h in _PASS_HINTS):
test_passed = True
if test_passed is False and (last_test or 0) >= (last_completion or 0):
return "fail"
if last_fail is not None and last_completion is not None and last_fail > last_completion:
return "fail"
if test_passed is True:
return "success"
if last_completion is not None:
return "success"
return "unknown"
def build_digest(session: Session, events: list[SessionEvent],
blobs: dict[str, str] | None = None) -> dict[str, Any]:
"""Produce the compact Tier 2 digest dict for a session."""
blobs = blobs or {}
kind_counts = collections.Counter(e.kind for e in events)
tool_hist = collections.Counter(e.tool for e in events if e.tool)
retries = kind_counts.get("retry", 0)
outcome = infer_outcome(events, blobs)
return {
"session_uid": session.session_uid,
"flavor": session.flavor,
"repo": session.repo,
"domain": session.domain,
"model": session.model,
"started_at": session.started_at,
"ended_at": session.ended_at,
"outcome": outcome,
"cost": {
"input_tokens": session.cost.input_tokens,
"output_tokens": session.cost.output_tokens,
"cache_tokens": session.cost.cache_tokens,
"wall_clock_s": session.cost.wall_clock_s,
"turns": session.cost.turns,
"retries": retries,
},
"event_count": len(events),
"kind_counts": dict(kind_counts),
"tool_histogram": dict(tool_hist),
"markers": {
"errors": kind_counts.get("error", 0),
"retries": retries,
"test_runs": kind_counts.get("test_run", 0),
"edits": kind_counts.get("edit", 0),
"human_interventions": kind_counts.get("human_intervention", 0),
},
"first_prompt": _first_prompt(events, blobs),
"last_assistant": _last_assistant(events, blobs),
"schema_version": session.schema_version,
}
def analyze(store, session_uid: str) -> dict[str, Any]:
"""Read a session from the store, write its digest, return the digest."""
session = store.get_session(session_uid)
if session is None:
raise KeyError(session_uid)
events = store.get_events(session_uid)
blobs = {e.payload_ref: _read_blob(store, e.payload_ref)
for e in events if e.payload_ref}
digest = build_digest(session, events, blobs)
store.write_digest(session_uid, digest)
return digest
# ---- helpers ---------------------------------------------------------------
def _last_index(events, pred):
idx = None
for i, e in enumerate(events):
if pred(e):
idx = i
return idx
def _nearby_result_body(events, idx, blobs):
for e in events[idx + 1: idx + 4]:
if e.kind == "tool_result" and e.payload_ref in blobs:
return blobs[e.payload_ref]
return None
def _first_prompt(events, blobs):
for e in events:
if e.kind == "user_msg":
return (blobs.get(e.payload_ref) or e.summary or "")[:280]
return None
def _last_assistant(events, blobs):
for e in reversed(events):
if e.kind == "assistant_msg":
return (blobs.get(e.payload_ref) or e.summary or "")[:280]
return None
def _read_blob(store, ref):
row = store.db.execute("SELECT path FROM blobs WHERE ref=?", (ref,)).fetchone()
if not row:
return ""
try:
with open(row["path"], "r", encoding="utf-8") as f:
return f.read()
except OSError:
return ""

View File

@@ -0,0 +1,144 @@
"""Budget-based retention sweep (design §5; T05).
Eviction is tied to the two conditions the design names — a session is dropped
from Tier 1 once it has been *analyzed* (its digest is in Tier 2) **and** space is
needed, with a max-age backstop. The invariant: raw bytes are never dropped
before the Tier 2 digest exists, except the explicitly-reported hard-cap overflow
path.
Order of passes per sweep:
1. backstop — evict analyzed sessions older than ``raw_max_age_days``
2. budget — while over ``raw_soft_cap_bytes``, evict oldest-analyzed first
3. overflow — if still over ``raw_hard_cap_bytes`` and only un-analyzed bulk
remains: analyze-now, retry budget; last resort evict oldest
un-analyzed and emit a reported ``data_loss`` event.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, Optional
from .schema import Session
@dataclass
class RetentionConfig:
raw_soft_cap_bytes: int = 4 * 1024**3 # 4 GiB
raw_hard_cap_bytes: int = 6 * 1024**3 # 6 GiB
raw_max_age_days: int = 45
distilled_cap_bytes: int = 1 * 1024**3 # 1 GiB (alert only, never auto-drop)
@dataclass
class EvictionReport:
backstop_evicted: list[str] = field(default_factory=list)
budget_evicted: list[str] = field(default_factory=list)
overflow_analyzed: list[str] = field(default_factory=list)
overflow_data_loss: list[str] = field(default_factory=list)
bytes_freed: int = 0
final_usage_bytes: int = 0
over_hard_cap: bool = False
tier2_over_cap: bool = False
warnings: list[str] = field(default_factory=list)
@property
def lost_data(self) -> bool:
return bool(self.overflow_data_loss)
def _parse_ts(ts: Optional[str]) -> Optional[datetime]:
if not ts:
return None
try:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except ValueError:
return None
def _age_days(s: Session, now: datetime) -> Optional[float]:
ref = _parse_ts(s.ended_at) or _parse_ts(s.started_at) or _parse_ts(s.ingested_at)
if ref is None:
return None
if ref.tzinfo is None:
ref = ref.replace(tzinfo=timezone.utc)
return (now - ref).total_seconds() / 86400.0
def _sort_key(s: Session) -> str:
# oldest-analyzed-first; fall back through timestamps
return s.analyzed_at or s.ended_at or s.ingested_at or ""
def sweep(store, config: RetentionConfig, *,
analyze_fn: Optional[Callable[[object, str], object]] = None,
now: Optional[datetime] = None) -> EvictionReport:
"""Run one retention sweep against ``store``. Returns an EvictionReport.
``analyze_fn(store, session_uid)`` is used by the overflow path to make
un-analyzed sessions evictable; pass ``digest.analyze``.
"""
now = now or datetime.now(timezone.utc)
report = EvictionReport()
def live_sessions() -> list[Session]:
return [s for s in store.list_sessions() if s.evicted_at is None]
# 1. backstop pass — analyzed + older than max age
for s in sorted(live_sessions(), key=_sort_key):
age = _age_days(s, now)
if s.is_evictable and age is not None and age > config.raw_max_age_days:
report.bytes_freed += store.evict_raw(s.session_uid)
report.backstop_evicted.append(s.session_uid)
# 2. budget pass — evict oldest analyzed while over soft cap
while store.tier1_usage_bytes() > config.raw_soft_cap_bytes:
candidates = [s for s in live_sessions() if s.is_evictable]
if not candidates:
break # will not destroy un-analyzed data for space
victim = min(candidates, key=_sort_key)
report.bytes_freed += store.evict_raw(victim.session_uid)
report.budget_evicted.append(victim.session_uid)
# 3. overflow path — only if still over HARD cap with un-analyzed bulk left
if store.tier1_usage_bytes() > config.raw_hard_cap_bytes:
# 3a. try to analyze now so those sessions become evictable
if analyze_fn is not None:
for s in sorted(live_sessions(), key=_sort_key):
if not s.is_evictable:
try:
analyze_fn(store, s.session_uid)
report.overflow_analyzed.append(s.session_uid)
except Exception as e: # analysis may fail; keep going
report.warnings.append(f"analyze failed for {s.session_uid}: {e}")
# retry budget pass on the freshly-analyzed sessions
while store.tier1_usage_bytes() > config.raw_soft_cap_bytes:
candidates = [s for s in live_sessions() if s.is_evictable]
if not candidates:
break
victim = min(candidates, key=_sort_key)
report.bytes_freed += store.evict_raw(victim.session_uid)
report.budget_evicted.append(victim.session_uid)
# 3b. last resort — evict oldest un-analyzed, REPORTED as data loss
while store.tier1_usage_bytes() > config.raw_hard_cap_bytes:
remaining = [s for s in live_sessions() if not s.is_evictable]
if not remaining:
break
victim = min(remaining, key=_sort_key)
report.bytes_freed += store.evict_raw(victim.session_uid)
report.overflow_data_loss.append(victim.session_uid)
report.warnings.append(
f"data_loss: evicted un-analyzed {victim.session_uid} to stay under hard cap"
)
usage = store.tier1_usage_bytes()
report.final_usage_bytes = usage
report.over_hard_cap = usage > config.raw_hard_cap_bytes
report.tier2_over_cap = store.tier2_usage_bytes() > config.distilled_cap_bytes
if report.tier2_over_cap:
report.warnings.append(
"tier2 distilled store over cap — flag for curation review (do not auto-drop)"
)
return report

View File

@@ -0,0 +1,156 @@
"""Normalized session schema (Tier 1) — design doc §4.
Two record kinds, ``Session`` and ``SessionEvent``, plus the small enums every
adapter targets. Field names here are the stable contract; per-flavor quirks are
absorbed inside each adapter (see design §4.3 native -> kind mapping).
"""
from __future__ import annotations
import json
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Optional
SCHEMA_VERSION = 1
# Supported agent flavors. ``session_uid`` is always "<flavor>:<native id>".
FLAVORS = ("claude", "codex", "grok")
# SessionEvent.kind universe (design §4.2 / §4.3).
KINDS = (
"user_msg",
"assistant_msg",
"thinking",
"tool_call",
"tool_result",
"error",
"test_run",
"edit",
"retry",
"human_intervention",
"decision",
"lifecycle",
"completion",
)
# Session.outcome universe.
OUTCOMES = ("success", "fail", "abandoned", "unknown")
@dataclass
class Cost:
"""Token + effort accounting for a session."""
input_tokens: int = 0
output_tokens: int = 0
cache_tokens: int = 0
wall_clock_s: float = 0.0
turns: int = 0
retries: int = 0
@dataclass
class Session:
"""One bounded run of a coding agent against a repo (design §4.1)."""
session_uid: str # "<flavor>:<native id>" — globally unique
flavor: str
native_session_id: str
repo: Optional[str] = None
domain: Optional[str] = None
cwd: Optional[str] = None
git_branch: Optional[str] = None
model: Optional[str] = None
started_at: Optional[str] = None # ISO-8601 UTC
ended_at: Optional[str] = None
outcome: str = "unknown"
cost: Cost = field(default_factory=Cost)
task_ref: Optional[str] = None
source_path: Optional[str] = None
source_bytes: int = 0
schema_version: int = SCHEMA_VERSION
# watermarks (design §3.1): discovered -> ingested -> analyzed -> evicted
discovered_at: Optional[str] = None
ingested_at: Optional[str] = None
analyzed_at: Optional[str] = None
evicted_at: Optional[str] = None
def __post_init__(self) -> None:
if self.flavor not in FLAVORS:
raise ValueError(f"unknown flavor {self.flavor!r}; expected one of {FLAVORS}")
if self.outcome not in OUTCOMES:
raise ValueError(f"unknown outcome {self.outcome!r}; expected one of {OUTCOMES}")
expected_prefix = f"{self.flavor}:"
if not self.session_uid.startswith(expected_prefix):
raise ValueError(
f"session_uid {self.session_uid!r} must start with {expected_prefix!r}"
)
@property
def is_evictable(self) -> bool:
"""A session may be evicted from Tier 1 only once analyzed (design §3.1)."""
return self.analyzed_at is not None and self.evicted_at is None
@staticmethod
def make_uid(flavor: str, native_session_id: str) -> str:
return f"{flavor}:{native_session_id}"
def to_dict(self) -> dict[str, Any]:
d = asdict(self)
return d
def to_json(self) -> str:
return json.dumps(self.to_dict(), sort_keys=True)
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "Session":
d = dict(d)
cost = d.pop("cost", None)
obj = cls(**{k: v for k, v in d.items() if k in _SESSION_FIELDS})
if cost is not None:
obj.cost = Cost(**{k: v for k, v in cost.items() if k in _COST_FIELDS})
return obj
@classmethod
def from_json(cls, s: str) -> "Session":
return cls.from_dict(json.loads(s))
@dataclass
class SessionEvent:
"""One atomic record within a session (design §4.2)."""
session_uid: str
seq: int # monotonic within session
ts: Optional[str] = None
kind: str = "lifecycle"
parent_seq: Optional[int] = None # turn DAG (Claude); None for flat flavors
role: Optional[str] = None # user|assistant|system|tool
tool: Optional[str] = None # when kind in {tool_call, tool_result}
summary: Optional[str] = None # short, human-readable
payload_ref: Optional[str] = None # pointer to full body in Tier 1 blob store
tokens: int = 0
is_sidechain: bool = False
def __post_init__(self) -> None:
if self.kind not in KINDS:
raise ValueError(f"unknown kind {self.kind!r}; expected one of {KINDS}")
def to_dict(self) -> dict[str, Any]:
return asdict(self)
def to_json(self) -> str:
return json.dumps(self.to_dict(), sort_keys=True)
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "SessionEvent":
return cls(**{k: v for k, v in d.items() if k in _EVENT_FIELDS})
@classmethod
def from_json(cls, s: str) -> "SessionEvent":
return cls.from_dict(json.loads(s))
_SESSION_FIELDS = {f.name for f in fields(Session)}
_COST_FIELDS = {f.name for f in fields(Cost)}
_EVENT_FIELDS = {f.name for f in fields(SessionEvent)}

View File

@@ -0,0 +1,225 @@
"""Two-tier store (design §3, §8).
Tier 1 (bulky, evictable): ``Session`` + ``SessionEvent`` rows in SQLite, with
event bodies written out-of-line as files under a blob dir (referenced by
``payload_ref``). Tier 2 (compact, durable): per-session ``digest`` rows.
Writes are idempotent on ``(session_uid, seq)`` for events and on
``session_uid`` for sessions/digests, so sweeps are safely re-runnable. Eviction
(:meth:`evict_raw`) deletes Tier 1 rows + blobs but keeps the session row and its
Tier 2 digest — the invariant that makes budget-based retention non-lossy.
"""
from __future__ import annotations
import json
import os
import re
import sqlite3
from datetime import datetime, timezone
from typing import Any, Optional
from .schema import Cost, Session, SessionEvent
_SAFE = re.compile(r"[^A-Za-z0-9._-]+")
def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
class Store:
def __init__(self, db_path: str, blob_dir: str):
self.db_path = db_path
self.blob_dir = blob_dir
os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
os.makedirs(blob_dir, exist_ok=True)
self.db = sqlite3.connect(db_path)
self.db.row_factory = sqlite3.Row
self.db.execute("PRAGMA journal_mode=WAL")
self._init_schema()
def close(self) -> None:
self.db.close()
def __enter__(self) -> "Store":
return self
def __exit__(self, *exc) -> None:
self.close()
def _init_schema(self) -> None:
self.db.executescript(
"""
CREATE TABLE IF NOT EXISTS sessions (
session_uid TEXT PRIMARY KEY,
json TEXT NOT NULL,
analyzed_at TEXT,
evicted_at TEXT
);
CREATE TABLE IF NOT EXISTS events (
session_uid TEXT NOT NULL,
seq INTEGER NOT NULL,
json TEXT NOT NULL,
PRIMARY KEY (session_uid, seq)
);
CREATE TABLE IF NOT EXISTS blobs (
ref TEXT PRIMARY KEY,
session_uid TEXT NOT NULL,
path TEXT NOT NULL,
nbytes INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS digests (
session_uid TEXT PRIMARY KEY,
json TEXT NOT NULL,
nbytes INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_events_uid ON events(session_uid);
CREATE INDEX IF NOT EXISTS ix_blobs_uid ON blobs(session_uid);
"""
)
self.db.commit()
# ---- Tier 1 writes -----------------------------------------------------
def upsert_session(self, s: Session) -> None:
self.db.execute(
"INSERT INTO sessions(session_uid, json, analyzed_at, evicted_at) "
"VALUES(?,?,?,?) ON CONFLICT(session_uid) DO UPDATE SET "
"json=excluded.json, analyzed_at=excluded.analyzed_at, evicted_at=excluded.evicted_at",
(s.session_uid, s.to_json(), s.analyzed_at, s.evicted_at),
)
self.db.commit()
def upsert_events(self, events: list[SessionEvent]) -> int:
rows = [(e.session_uid, e.seq, e.to_json()) for e in events]
self.db.executemany(
"INSERT INTO events(session_uid, seq, json) VALUES(?,?,?) "
"ON CONFLICT(session_uid, seq) DO UPDATE SET json=excluded.json",
rows,
)
self.db.commit()
return len(rows)
def write_blobs(self, session_uid: str, blobs: dict[str, str]) -> int:
"""Write event bodies as files; record path + size. Returns bytes written."""
total = 0
sub = os.path.join(self.blob_dir, _SAFE.sub("_", session_uid))
os.makedirs(sub, exist_ok=True)
for ref, body in blobs.items():
data = body.encode("utf-8")
fname = _SAFE.sub("_", ref) + ".txt"
path = os.path.join(sub, fname)
with open(path, "w", encoding="utf-8") as f:
f.write(body)
self.db.execute(
"INSERT INTO blobs(ref, session_uid, path, nbytes) VALUES(?,?,?,?) "
"ON CONFLICT(ref) DO UPDATE SET path=excluded.path, nbytes=excluded.nbytes",
(ref, session_uid, path, len(data)),
)
total += len(data)
self.db.commit()
return total
def ingest(self, bundle) -> None:
"""Persist a full Normalized bundle (session + events + blobs)."""
s = bundle.session
if s.ingested_at is None:
s.ingested_at = _now()
self.upsert_session(s)
self.upsert_events(bundle.events)
self.write_blobs(s.session_uid, bundle.blobs)
# ---- Tier 2 (digest) ---------------------------------------------------
def write_digest(self, session_uid: str, digest: dict[str, Any], analyzed_at: Optional[str] = None) -> None:
payload = json.dumps(digest, sort_keys=True)
self.db.execute(
"INSERT INTO digests(session_uid, json, nbytes) VALUES(?,?,?) "
"ON CONFLICT(session_uid) DO UPDATE SET json=excluded.json, nbytes=excluded.nbytes",
(session_uid, payload, len(payload.encode("utf-8"))),
)
self.db.execute(
"UPDATE sessions SET analyzed_at=? WHERE session_uid=?",
(analyzed_at or _now(), session_uid),
)
self.db.commit()
def get_digest(self, session_uid: str) -> Optional[dict[str, Any]]:
row = self.db.execute("SELECT json FROM digests WHERE session_uid=?", (session_uid,)).fetchone()
return json.loads(row["json"]) if row else None
# ---- reads -------------------------------------------------------------
def get_session(self, session_uid: str) -> Optional[Session]:
row = self.db.execute(
"SELECT json, analyzed_at, evicted_at FROM sessions WHERE session_uid=?", (session_uid,)
).fetchone()
return self._row_to_session(row) if row else None
def list_sessions(self) -> list[Session]:
rows = self.db.execute("SELECT json, analyzed_at, evicted_at FROM sessions")
return [self._row_to_session(r) for r in rows]
@staticmethod
def _row_to_session(row) -> Session:
"""Rebuild a Session, treating the watermark columns as authoritative."""
s = Session.from_json(row["json"])
s.analyzed_at = row["analyzed_at"]
s.evicted_at = row["evicted_at"]
return s
def get_events(self, session_uid: str) -> list[SessionEvent]:
rows = self.db.execute(
"SELECT json FROM events WHERE session_uid=? ORDER BY seq", (session_uid,)
).fetchall()
return [SessionEvent.from_json(r["json"]) for r in rows]
def count_events(self, session_uid: str) -> int:
return self.db.execute(
"SELECT COUNT(*) c FROM events WHERE session_uid=?", (session_uid,)
).fetchone()["c"]
# ---- usage accounting (drives retention) -------------------------------
def tier1_usage_bytes(self) -> int:
"""Bytes held in Tier 1: event-row JSON + blob bytes for non-evicted sessions."""
row = self.db.execute(
"SELECT COALESCE(SUM(LENGTH(json)),0) b FROM events e "
"WHERE NOT EXISTS (SELECT 1 FROM sessions s "
"WHERE s.session_uid=e.session_uid AND s.evicted_at IS NOT NULL)"
).fetchone()
blob = self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM blobs").fetchone()
return int(row["b"]) + int(blob["b"])
def session_tier1_bytes(self, session_uid: str) -> int:
ev = self.db.execute(
"SELECT COALESCE(SUM(LENGTH(json)),0) b FROM events WHERE session_uid=?", (session_uid,)
).fetchone()["b"]
bl = self.db.execute(
"SELECT COALESCE(SUM(nbytes),0) b FROM blobs WHERE session_uid=?", (session_uid,)
).fetchone()["b"]
return int(ev) + int(bl)
def tier2_usage_bytes(self) -> int:
return int(self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM digests").fetchone()["b"])
# ---- eviction ----------------------------------------------------------
def evict_raw(self, session_uid: str) -> int:
"""Drop Tier 1 raw (events + blob files) for a session; keep digest + row.
Sets ``evicted_at``. Returns bytes freed. Safe to call on an
already-evicted session (no-op-ish).
"""
freed = self.session_tier1_bytes(session_uid)
for r in self.db.execute("SELECT path FROM blobs WHERE session_uid=?", (session_uid,)).fetchall():
try:
os.remove(r["path"])
except FileNotFoundError:
pass
self.db.execute("DELETE FROM blobs WHERE session_uid=?", (session_uid,))
self.db.execute("DELETE FROM events WHERE session_uid=?", (session_uid,))
self.db.execute("UPDATE sessions SET evicted_at=? WHERE session_uid=?", (_now(), session_uid))
self.db.commit()
return freed

128
session_memory/ingest.py Normal file
View File

@@ -0,0 +1,128 @@
"""Session-memory sweep entrypoint (design §7; T06).
One sweep: discover (per enabled source) -> normalize (adapter) -> store ->
digest -> retention-evict. Idempotent and re-runnable; intended to be triggered
on the configured cadence (``/schedule`` daily/weekly) or by an agent hook.
Usage:
python -m session_memory.ingest [--config PATH] [--once] [--dry-run]
"""
from __future__ import annotations
import argparse
import glob
import os
import sys
import tomllib
from dataclasses import dataclass, field
from typing import Any
from .adapters import claude as claude_adapter
from .core import digest as digest_mod
from .core.cursor import Cursors
from .core.retention import RetentionConfig, sweep as retention_sweep
from .core.store import Store
# adapter dispatch by source name
_ADAPTERS = {"claude": claude_adapter.parse_session}
@dataclass
class SweepResult:
discovered: int = 0
ingested: int = 0
skipped_unchanged: int = 0
analyzed: int = 0
warnings: list[str] = field(default_factory=list)
retention: Any = None
def _expand(p: str) -> str:
return os.path.expanduser(p)
def load_config(path: str) -> dict[str, Any]:
with open(path, "rb") as f:
return tomllib.load(f)
def run_sweep(config: dict[str, Any], *, dry_run: bool = False) -> SweepResult:
store_cfg = config.get("store", {})
ret_cfg = config.get("retention", {})
repo_map = config.get("repo_domain_map", {})
res = SweepResult()
# In dry-run we only discover + parse: no store is created or written.
store = None if dry_run else Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"]))
cursors = Cursors(_expand(store_cfg["cursor"]))
for name, src in config.get("sources", {}).items():
if not src.get("enabled"):
continue
parse = _ADAPTERS.get(name)
if parse is None:
res.warnings.append(f"no adapter for source {name!r} (Phase 1)")
continue
root = _expand(src["root"])
for fp in sorted(glob.glob(os.path.join(root, src["glob"]))):
res.discovered += 1
if not cursors.is_changed(fp):
res.skipped_unchanged += 1
continue
try:
bundle = parse(fp, repo_map)
except Exception as e: # one bad file must not abort the sweep
res.warnings.append(f"parse failed {fp}: {e}")
continue
if bundle is None:
cursors.mark(fp)
continue
if not dry_run:
store.ingest(bundle)
digest_mod.analyze(store, bundle.session.session_uid)
res.analyzed += 1
res.ingested += 1
cursors.mark(fp)
if not dry_run and store is not None:
cursors.save()
rc = RetentionConfig(
raw_soft_cap_bytes=int(ret_cfg.get("raw_soft_cap_bytes", RetentionConfig.raw_soft_cap_bytes)),
raw_hard_cap_bytes=int(ret_cfg.get("raw_hard_cap_bytes", RetentionConfig.raw_hard_cap_bytes)),
raw_max_age_days=int(ret_cfg.get("raw_max_age_days", RetentionConfig.raw_max_age_days)),
distilled_cap_bytes=int(ret_cfg.get("distilled_cap_bytes", RetentionConfig.distilled_cap_bytes)),
)
res.retention = retention_sweep(store, rc, analyze_fn=digest_mod.analyze)
res.warnings.extend(res.retention.warnings)
if store is not None:
store.close()
return res
def main(argv: list[str] | None = None) -> int:
here = os.path.dirname(os.path.abspath(__file__))
ap = argparse.ArgumentParser(description="Run one coding-session-memory sweep.")
ap.add_argument("--config", default=os.path.join(here, "config.toml"))
ap.add_argument("--dry-run", action="store_true", help="discover + parse, but do not write or evict")
ap.add_argument("--once", action="store_true", help="(default) run a single sweep")
args = ap.parse_args(argv)
config = load_config(args.config)
res = run_sweep(config, dry_run=args.dry_run)
print(f"discovered={res.discovered} ingested={res.ingested} "
f"skipped_unchanged={res.skipped_unchanged} analyzed={res.analyzed}")
if res.retention is not None:
r = res.retention
print(f"retention: freed={r.bytes_freed}B final_usage={r.final_usage_bytes}B "
f"backstop={len(r.backstop_evicted)} budget={len(r.budget_evicted)} "
f"overflow_analyzed={len(r.overflow_analyzed)} data_loss={len(r.overflow_data_loss)}")
for w in res.warnings:
print(f" WARN: {w}", file=sys.stderr)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,99 @@
"""Claude adapter tests (T02): synthetic fixture + a real on-disk session."""
import glob
import json
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.adapters.claude import parse_session # noqa: E402
REPO_MAP = {"agentic-resources": "helix_forge"}
def _write_jsonl(path, records):
with open(path, "w", encoding="utf-8") as f:
for r in records:
f.write(json.dumps(r) + "\n")
def test_synthetic_session(tmp_path):
p = tmp_path / "11111111-2222-3333-4444-555555555555.jsonl"
_write_jsonl(p, [
{"type": "user", "uuid": "u1", "parentUuid": None,
"timestamp": "2026-06-06T10:00:00Z", "sessionId": "sess-1",
"cwd": "/home/worsch/agentic-resources", "gitBranch": "main",
"version": "1.0", "message": {"role": "user", "content": "fix the bug"}},
{"type": "assistant", "uuid": "a1", "parentUuid": "u1",
"timestamp": "2026-06-06T10:00:05Z", "sessionId": "sess-1",
"message": {"role": "assistant", "model": "claude-opus-4-8",
"usage": {"input_tokens": 100, "output_tokens": 20,
"cache_read_input_tokens": 10},
"content": [
{"type": "thinking", "thinking": "let me look"},
{"type": "text", "text": "I'll edit the file."},
{"type": "tool_use", "name": "Edit",
"input": {"file_path": "x.py", "old_string": "a", "new_string": "b"}},
{"type": "tool_use", "name": "Bash",
"input": {"command": "pytest -q"}},
]}},
{"type": "user", "uuid": "u2", "parentUuid": "a1",
"timestamp": "2026-06-06T10:00:10Z", "sessionId": "sess-1",
"message": {"role": "user",
"content": [{"type": "tool_result", "content": "6 passed"}]}},
])
norm = parse_session(str(p), REPO_MAP)
assert norm is not None
s = norm.session
assert s.session_uid == "claude:sess-1"
assert s.repo == "agentic-resources" and s.domain == "helix_forge"
assert s.model == "claude-opus-4-8"
assert s.cost.input_tokens == 100 and s.cost.output_tokens == 20
assert s.cost.cache_tokens == 10
assert s.cost.turns == 1
assert s.cost.wall_clock_s == 10.0
kinds = [e.kind for e in norm.events]
assert kinds == ["user_msg", "thinking", "assistant_msg", "edit", "test_run", "tool_result"]
# turn DAG: assistant events link back to the first user msg (seq 0)
edit_ev = next(e for e in norm.events if e.kind == "edit")
assert edit_ev.parent_seq == 0
assert edit_ev.tool == "Edit"
# bodies captured as blobs, referenced by payload_ref
assert edit_ev.payload_ref in norm.blobs
assert "x.py" in norm.blobs[edit_ev.payload_ref]
def test_sidechain_filename_marks_events(tmp_path):
p = tmp_path / "agent-deadbeef.jsonl"
_write_jsonl(p, [
{"type": "assistant", "uuid": "a1", "sessionId": "side-1",
"timestamp": "2026-06-06T10:00:00Z",
"message": {"role": "assistant", "content": [{"type": "text", "text": "hi"}]}},
])
norm = parse_session(str(p), REPO_MAP)
assert norm.events[0].is_sidechain is True
def test_real_local_session_if_available():
"""Smoke-parse a real Claude transcript on this workstation, if present."""
base = os.path.expanduser("~/.claude/projects/-home-worsch-agentic-resources")
files = sorted(glob.glob(os.path.join(base, "*.jsonl")))
if not files:
return # environment without local sessions; synthetic tests cover logic
parsed = 0
for fp in files:
norm = parse_session(fp, REPO_MAP)
if norm is None:
continue
parsed += 1
assert norm.session.session_uid.startswith("claude:")
# seq is monotonic and unique
seqs = [e.seq for e in norm.events]
assert seqs == sorted(seqs)
assert len(seqs) == len(set(seqs))
assert parsed >= 1

82
tests/test_digest.py Normal file
View File

@@ -0,0 +1,82 @@
"""Digest tests (T04): outcome heuristic + Tier 2 promotion."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.adapters.claude import Normalized # noqa: E402
from session_memory.core.digest import analyze, build_digest, infer_outcome # noqa: E402
from session_memory.core.schema import Cost, Session, SessionEvent # noqa: E402
from session_memory.core.store import Store # noqa: E402
def _ev(uid, seq, kind, **kw):
return SessionEvent(session_uid=uid, seq=seq, kind=kind, **kw)
def test_infer_outcome_abandoned():
uid = "claude:s"
assert infer_outcome([_ev(uid, 0, "user_msg")]) == "abandoned"
def test_infer_outcome_success_on_passing_test():
uid = "claude:s"
events = [
_ev(uid, 0, "user_msg"),
_ev(uid, 1, "assistant_msg"),
_ev(uid, 2, "test_run", tool="Bash"),
_ev(uid, 3, "tool_result", payload_ref="b3"),
]
assert infer_outcome(events, {"b3": "6 passed in 0.4s"}) == "success"
def test_infer_outcome_fail_on_failing_test():
uid = "claude:s"
events = [
_ev(uid, 0, "user_msg"),
_ev(uid, 1, "assistant_msg"),
_ev(uid, 2, "test_run", tool="Bash"),
_ev(uid, 3, "tool_result", payload_ref="b3"),
]
assert infer_outcome(events, {"b3": "1 failed, traceback ..."}) == "fail"
def test_build_digest_histograms_and_markers():
uid = "claude:s"
s = Session(session_uid=uid, flavor="claude", native_session_id="s",
repo="agentic-resources", cost=Cost(input_tokens=100, output_tokens=40, turns=2))
events = [
_ev(uid, 0, "user_msg"),
_ev(uid, 1, "edit", tool="Edit"),
_ev(uid, 2, "edit", tool="Write"),
_ev(uid, 3, "test_run", tool="Bash"),
_ev(uid, 4, "error"),
_ev(uid, 5, "assistant_msg"),
]
d = build_digest(s, events)
assert d["tool_histogram"] == {"Edit": 1, "Write": 1, "Bash": 1}
assert d["markers"]["edits"] == 2
assert d["markers"]["errors"] == 1
assert d["markers"]["test_runs"] == 1
assert d["event_count"] == 6
assert d["cost"]["input_tokens"] == 100
def test_analyze_writes_digest_and_sets_analyzed(tmp_path):
st = Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
uid = Session.make_uid("claude", "s1")
s = Session(session_uid=uid, flavor="claude", native_session_id="s1")
events = [
SessionEvent(session_uid=uid, seq=0, kind="user_msg", payload_ref="b0"),
SessionEvent(session_uid=uid, seq=1, kind="assistant_msg", payload_ref="b1"),
]
blobs = {"b0": "please help", "b1": "done"}
st.ingest(Normalized(session=s, events=events, blobs=blobs))
assert st.get_session(uid).is_evictable is False
d = analyze(st, uid)
assert d["outcome"] == "success"
assert d["first_prompt"] == "please help"
assert st.get_session(uid).analyzed_at is not None
assert st.get_session(uid).is_evictable is True # now promoted -> evictable

81
tests/test_ingest.py Normal file
View File

@@ -0,0 +1,81 @@
"""Ingest sweep + cursor tests (T06)."""
import json
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.core.cursor import Cursors # noqa: E402
from session_memory.ingest import run_sweep # noqa: E402
def test_cursor_change_detection(tmp_path):
f = tmp_path / "a.jsonl"
f.write_text("{}\n")
cur = Cursors(str(tmp_path / "cur.json"))
assert cur.is_changed(str(f)) is True
cur.mark(str(f))
assert cur.is_changed(str(f)) is False
f.write_text("{}\n{}\n") # grow
assert cur.is_changed(str(f)) is True
def _claude_session_file(dir_path, native):
os.makedirs(dir_path, exist_ok=True)
p = os.path.join(dir_path, f"{native}.jsonl")
recs = [
{"type": "user", "uuid": "u1", "sessionId": native,
"timestamp": "2026-06-06T10:00:00Z", "cwd": "/home/worsch/agentic-resources",
"gitBranch": "main", "message": {"role": "user", "content": "hi"}},
{"type": "assistant", "uuid": "a1", "parentUuid": "u1", "sessionId": native,
"timestamp": "2026-06-06T10:00:02Z",
"message": {"role": "assistant", "model": "claude-opus-4-8",
"usage": {"input_tokens": 5, "output_tokens": 2},
"content": [{"type": "text", "text": "hello"}]}},
]
with open(p, "w", encoding="utf-8") as f:
for r in recs:
f.write(json.dumps(r) + "\n")
return p
def _config(tmp_path, projects_dir):
return {
"store": {
"db_path": str(tmp_path / ".store/mem.db"),
"blob_dir": str(tmp_path / ".store/blobs"),
"cursor": str(tmp_path / ".store/cursors.json"),
},
"retention": {"raw_soft_cap_bytes": 10**12, "raw_hard_cap_bytes": 10**12,
"raw_max_age_days": 10**6, "distilled_cap_bytes": 10**12},
"sources": {"claude": {"enabled": True, "root": str(projects_dir), "glob": "*/*.jsonl"}},
"repo_domain_map": {"agentic-resources": "helix_forge"},
}
def test_run_sweep_end_to_end(tmp_path):
projects = tmp_path / "projects"
_claude_session_file(str(projects / "-home-worsch-agentic-resources"), "sess-aaa")
cfg = _config(tmp_path, projects)
res = run_sweep(cfg)
assert res.discovered == 1
assert res.ingested == 1
assert res.analyzed == 1
assert res.retention is not None
# re-run: cursor skips the unchanged file (idempotent, cheap)
res2 = run_sweep(cfg)
assert res2.skipped_unchanged == 1
assert res2.ingested == 0
def test_dry_run_writes_nothing(tmp_path):
projects = tmp_path / "projects"
_claude_session_file(str(projects / "-home-worsch-agentic-resources"), "sess-bbb")
cfg = _config(tmp_path, projects)
res = run_sweep(cfg, dry_run=True)
assert res.discovered == 1 and res.ingested == 1
assert res.retention is None
assert not os.path.exists(cfg["store"]["db_path"]) # no store created

107
tests/test_retention.py Normal file
View File

@@ -0,0 +1,107 @@
"""Retention tests (T05): each pass of the budget-based eviction, with tiny caps."""
import os
import sys
from datetime import datetime, timedelta, timezone
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.adapters.claude import Normalized # noqa: E402
from session_memory.core import digest as digest_mod # noqa: E402
from session_memory.core.retention import RetentionConfig, sweep # noqa: E402
from session_memory.core.schema import Cost, Session, SessionEvent # noqa: E402
from session_memory.core.store import Store # noqa: E402
NOW = datetime(2026, 6, 6, tzinfo=timezone.utc)
def _ingest(st, native, *, body_bytes=1000, ended=None, analyze=False):
uid = Session.make_uid("claude", native)
s = Session(session_uid=uid, flavor="claude", native_session_id=native,
ended_at=(ended or NOW).strftime("%Y-%m-%dT%H:%M:%SZ"),
ingested_at=NOW.strftime("%Y-%m-%dT%H:%M:%SZ"))
ref = f"blob://{native}/0"
events = [SessionEvent(session_uid=uid, seq=0, kind="assistant_msg", payload_ref=ref)]
st.ingest(Normalized(session=s, events=events, blobs={ref: "x" * body_bytes}))
if analyze:
digest_mod.analyze(st, uid)
return uid
def _store(tmp_path):
return Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
def test_backstop_evicts_old_analyzed_only(tmp_path):
st = _store(tmp_path)
old = _ingest(st, "old", ended=NOW - timedelta(days=60), analyze=True)
young = _ingest(st, "young", ended=NOW - timedelta(days=1), analyze=True)
unanalyzed_old = _ingest(st, "oldraw", ended=NOW - timedelta(days=60), analyze=False)
cfg = RetentionConfig(raw_soft_cap_bytes=10**12, raw_hard_cap_bytes=10**12, raw_max_age_days=45)
rep = sweep(st, cfg, now=NOW)
assert old in rep.backstop_evicted
assert young not in rep.backstop_evicted # too recent
assert unanalyzed_old not in rep.backstop_evicted # not analyzed -> protected
assert st.get_session(old).evicted_at is not None
assert st.get_session(unanalyzed_old).evicted_at is None
def test_budget_pass_evicts_oldest_analyzed_first(tmp_path):
st = _store(tmp_path)
a = _ingest(st, "a", body_bytes=2000, ended=NOW - timedelta(days=3), analyze=True)
b = _ingest(st, "b", body_bytes=2000, ended=NOW - timedelta(days=2), analyze=True)
c = _ingest(st, "c", body_bytes=2000, ended=NOW - timedelta(days=1), analyze=True)
# soft cap that forces evicting ~two of the three
cfg = RetentionConfig(raw_soft_cap_bytes=2500, raw_hard_cap_bytes=10**9, raw_max_age_days=10**6)
rep = sweep(st, cfg, now=NOW)
assert rep.budget_evicted[:2] == [a, b] # oldest-first
assert st.get_session(c).evicted_at is None # newest survives
assert st.tier1_usage_bytes() <= cfg.raw_soft_cap_bytes
def test_budget_pass_never_touches_unanalyzed(tmp_path):
st = _store(tmp_path)
raw1 = _ingest(st, "r1", body_bytes=5000, analyze=False)
raw2 = _ingest(st, "r2", body_bytes=5000, analyze=False)
cfg = RetentionConfig(raw_soft_cap_bytes=100, raw_hard_cap_bytes=10**9, raw_max_age_days=10**6)
rep = sweep(st, cfg, now=NOW)
# over soft cap but nothing analyzed -> no eviction, no data loss
assert rep.budget_evicted == []
assert rep.lost_data is False
assert st.get_session(raw1).evicted_at is None
assert st.get_session(raw2).evicted_at is None
assert st.tier1_usage_bytes() > cfg.raw_soft_cap_bytes # tolerated, not destroyed
def test_overflow_analyzes_then_evicts_without_data_loss(tmp_path):
st = _store(tmp_path)
r1 = _ingest(st, "r1", body_bytes=4000, ended=NOW - timedelta(days=2), analyze=False)
r2 = _ingest(st, "r2", body_bytes=4000, ended=NOW - timedelta(days=1), analyze=False)
cfg = RetentionConfig(raw_soft_cap_bytes=3000, raw_hard_cap_bytes=5000, raw_max_age_days=10**6)
rep = sweep(st, cfg, now=NOW, analyze_fn=digest_mod.analyze)
# overflow path analyzed the un-analyzed sessions, then budget-evicted
assert set(rep.overflow_analyzed) == {r1, r2}
assert rep.lost_data is False # analysis avoided data loss
assert st.tier1_usage_bytes() <= cfg.raw_soft_cap_bytes
def test_overflow_last_resort_reports_data_loss(tmp_path):
st = _store(tmp_path)
# one un-analyzed session bigger than the hard cap, analysis disabled (no fn)
big = _ingest(st, "big", body_bytes=20000, analyze=False)
cfg = RetentionConfig(raw_soft_cap_bytes=1000, raw_hard_cap_bytes=2000, raw_max_age_days=10**6)
rep = sweep(st, cfg, now=NOW, analyze_fn=None)
assert big in rep.overflow_data_loss
assert rep.lost_data is True
assert any("data_loss" in w for w in rep.warnings)
assert st.get_session(big).evicted_at is not None

97
tests/test_schema.py Normal file
View File

@@ -0,0 +1,97 @@
"""Round-trip + validation tests for the normalized schema (T01)."""
import os
import sys
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.core.schema import ( # noqa: E402
SCHEMA_VERSION,
Cost,
Session,
SessionEvent,
)
def _sample_session() -> Session:
return Session(
session_uid=Session.make_uid("claude", "abc-123"),
flavor="claude",
native_session_id="abc-123",
repo="agentic-resources",
domain="helix_forge",
cwd="/home/worsch/agentic-resources",
git_branch="main",
model="claude-opus-4-8",
started_at="2026-06-06T10:00:00Z",
ended_at="2026-06-06T10:15:00Z",
outcome="success",
cost=Cost(input_tokens=100, output_tokens=50, turns=3, retries=1),
task_ref="AGENTIC-WP-0002-T01",
source_path="~/.claude/projects/x/abc-123.jsonl",
source_bytes=2048,
ingested_at="2026-06-06T10:16:00Z",
)
def test_session_round_trip():
s = _sample_session()
restored = Session.from_json(s.to_json())
assert restored == s
assert restored.cost == s.cost
assert restored.schema_version == SCHEMA_VERSION
def test_session_uid_helper_and_prefix_enforced():
assert Session.make_uid("grok", "z9") == "grok:z9"
with pytest.raises(ValueError):
Session(session_uid="codex:wrong", flavor="claude", native_session_id="wrong")
def test_unknown_flavor_and_outcome_rejected():
with pytest.raises(ValueError):
Session(session_uid="x:1", flavor="x", native_session_id="1")
with pytest.raises(ValueError):
Session(
session_uid="claude:1",
flavor="claude",
native_session_id="1",
outcome="bogus",
)
def test_is_evictable_requires_analyzed_not_evicted():
s = _sample_session()
assert s.is_evictable is False # not analyzed yet
s.analyzed_at = "2026-06-06T10:17:00Z"
assert s.is_evictable is True
s.evicted_at = "2026-06-06T11:00:00Z"
assert s.is_evictable is False # already evicted
def test_event_round_trip_and_kind_validation():
e = SessionEvent(
session_uid="claude:abc-123",
seq=4,
parent_seq=3,
ts="2026-06-06T10:01:00Z",
kind="tool_call",
role="assistant",
tool="Bash",
summary="ran pytest -q",
payload_ref="blob://abc-123/4",
tokens=12,
)
assert SessionEvent.from_json(e.to_json()) == e
with pytest.raises(ValueError):
SessionEvent(session_uid="claude:1", seq=0, kind="not_a_kind")
def test_from_dict_ignores_unknown_fields():
d = _sample_session().to_dict()
d["future_field"] = "ignored"
d["cost"]["future_cost"] = 999
restored = Session.from_dict(d)
assert restored.repo == "agentic-resources"

82
tests/test_store.py Normal file
View File

@@ -0,0 +1,82 @@
"""Store tests (T03): idempotent ingest, usage accounting, eviction."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.adapters.claude import Normalized # noqa: E402
from session_memory.core.schema import Cost, Session, SessionEvent # noqa: E402
from session_memory.core.store import Store # noqa: E402
def _bundle(uid_native="s1", n_events=3):
s = Session(
session_uid=Session.make_uid("claude", uid_native),
flavor="claude", native_session_id=uid_native,
repo="agentic-resources", domain="helix_forge",
cost=Cost(input_tokens=10),
)
events, blobs = [], {}
for i in range(n_events):
ref = f"blob://{uid_native}/{i}"
events.append(SessionEvent(session_uid=s.session_uid, seq=i, kind="assistant_msg",
payload_ref=ref, summary=f"msg {i}"))
blobs[ref] = "x" * 100
return Normalized(session=s, events=events, blobs=blobs)
def _store(tmp_path):
return Store(str(tmp_path / "mem.db"), str(tmp_path / "blobs"))
def test_ingest_and_read_back(tmp_path):
st = _store(tmp_path)
b = _bundle("s1", 3)
st.ingest(b)
s = st.get_session(b.session.session_uid)
assert s is not None and s.ingested_at is not None
assert st.count_events(b.session.session_uid) == 3
assert st.tier1_usage_bytes() > 0
def test_ingest_is_idempotent(tmp_path):
st = _store(tmp_path)
b = _bundle("s1", 3)
st.ingest(b)
before = st.tier1_usage_bytes()
st.ingest(b) # re-run same sweep
assert st.count_events(b.session.session_uid) == 3 # no duplicate rows
assert st.tier1_usage_bytes() == before # blobs upserted, not doubled
def test_digest_sets_analyzed_and_tier2_bytes(tmp_path):
st = _store(tmp_path)
b = _bundle("s1", 2)
st.ingest(b)
assert st.get_session(b.session.session_uid).analyzed_at is None
st.write_digest(b.session.session_uid, {"outcome": "success", "tools": {"Edit": 1}})
assert st.get_session(b.session.session_uid).analyzed_at is not None
assert st.tier2_usage_bytes() > 0
assert st.get_digest(b.session.session_uid)["outcome"] == "success"
def test_evict_raw_keeps_digest_drops_raw(tmp_path):
st = _store(tmp_path)
b = _bundle("s1", 3)
st.ingest(b)
st.write_digest(b.session.session_uid, {"outcome": "unknown"})
blob_dir_files_before = sum(len(f) for _, _, f in os.walk(str(tmp_path / "blobs")))
assert blob_dir_files_before > 0
freed = st.evict_raw(b.session.session_uid)
assert freed > 0
assert st.count_events(b.session.session_uid) == 0 # raw gone
assert st.get_events(b.session.session_uid) == []
assert st.get_session(b.session.session_uid).evicted_at is not None
assert st.get_digest(b.session.session_uid) is not None # Tier 2 preserved
# blob files removed from disk
remaining = [f for _, _, fs in os.walk(str(tmp_path / "blobs")) for f in fs]
assert remaining == []
# evicted session no longer counts toward Tier 1 usage
assert st.tier1_usage_bytes() == 0

View File

@@ -0,0 +1,143 @@
---
id: AGENTIC-WP-0002
type: workplan
title: "Coding Session Memory — Phase 0 (Capture + budget-based retention)"
domain: helix_forge
repo: agentic-resources
status: finished
owner: codex
topic_slug: helix-forge
created: "2026-06-06"
updated: "2026-06-06"
state_hub_workstream_id: "06e6726d-057d-47d8-84f4-0974858f6288"
---
# Coding Session Memory — Phase 0
Implements Phase 0 of [PRD-helix-forge](../docs/PRD-helix-forge.md) per the
[session-memory design](../docs/DESIGN-session-memory.md): a normalized session
schema, the first (Claude) collector, a two-tier store, and a budget-based
eviction sweep that drops analyzed/over-budget raw content while preserving
compact digests.
Scope is deliberately one agent flavor (Claude, schema verified on disk) end to
end, so the agnostic-core / thin-adapter boundary is proven before Codex and Grok
adapters land in Phase 1.
## Define Normalized Session Schema
```task
id: AGENTIC-WP-0002-T01
status: done
priority: high
state_hub_task_id: "61297a16-257c-4579-bd1f-3db035781258"
```
Implement `core/schema.py` with the `Session` and `SessionEvent` dataclasses from
design §4, including `schema_version`, the `flavor`-prefixed `session_uid`, the
cost block, and the `discovered/ingested/analyzed/evicted` watermarks. Add
round-trip (de)serialization tests. This is the contract every adapter targets.
## Claude Collector Adapter
```task
id: AGENTIC-WP-0002-T02
status: done
priority: high
state_hub_task_id: "3b4e6b35-b4f3-40dc-a845-7ac78aa20d62"
```
Implement `adapters/claude.py`: read `~/.claude/projects/<cwd>/<uuid>.jsonl`,
discriminate on `type`, reconstruct the turn DAG via `uuid`/`parentUuid`, map
records onto `SessionEvent.kind`, capture `message.usage` into the cost block,
handle `agent-*.jsonl` sidechains (`is_sidechain`), and resolve `repo`/`domain`
from `cwd`. Verify against real local sessions in this repo's project dir. No
Codex/Grok work in Phase 0 (designed for, not built).
## Tier 1 / Tier 2 Store
```task
id: AGENTIC-WP-0002-T03
status: done
priority: high
state_hub_task_id: "2387258e-ba6d-4a41-919e-f2f4e0822110"
```
Implement `core/store.py`: SQLite for `Session`/`SessionEvent` rows plus a blob
dir for `payload_ref` bodies (Tier 1), and a compact `digest` table (Tier 2).
Writes are idempotent on `(session_uid, seq)`. Provide usage-bytes accounting for
Tier 1 (rows + blobs) and Tier 2, used by retention.
## Session Digest (Tier 1 → Tier 2)
```task
id: AGENTIC-WP-0002-T04
status: done
priority: medium
state_hub_task_id: "017d8e90-633a-49f2-b342-8690938798cd"
```
Implement `core/digest.py`: produce a per-session digest (outcome heuristic, cost
totals, tool histogram, error/retry/human-intervention markers, key snippets) and
set `analyzed_at`. This is the promotion step that makes a session evictable.
Signal extraction beyond the digest stays stubbed for the Detect phase.
## Budget-Based Retention Sweep
```task
id: AGENTIC-WP-0002-T05
status: done
priority: high
state_hub_task_id: "89177c79-528e-4023-a7eb-67f8e0276ba9"
```
Implement `core/retention.py` per design §5: backstop pass (`raw_max_age_days`),
budget pass (evict oldest `analyzed_at` first while over `raw_soft_cap_bytes`,
never touching un-analyzed sessions), and the hard-cap overflow path (analyze-now,
then last-resort evict oldest un-analyzed with a reported `data_loss` event).
Enforce the invariant: raw bytes are never dropped before the Tier 2 digest
exists (except the reported overflow path). Cover each branch with tests using
synthetic sessions and tiny caps.
## Ingest Cursor + Sweep Entrypoint
```task
id: AGENTIC-WP-0002-T06
status: done
priority: medium
state_hub_task_id: "a4b35c76-154d-4e99-b6d0-61cb6e47ecc0"
```
Implement `core/cursor.py` (per-source `(path,size,mtime,offset)` cursors,
idempotent re-runs) and `ingest.py` wiring one sweep: discover → normalize
(Claude) → store → digest → evict. Add `config.toml` with the §5.1 retention
knobs, source paths, and repo→domain map. Document running a sweep and the
intended `cadence` trigger (`/schedule` daily/weekly) in the repo docs.
## Verify End-to-End on Real Sessions
```task
id: AGENTIC-WP-0002-T07
status: done
priority: medium
state_hub_task_id: "98d5cc7c-c285-4556-91a3-a85e0a2bb6df"
```
Run the full sweep against this workstation's real Claude sessions; confirm
normalized rows, digests, idempotent re-run, and an eviction cycle under a small
test cap (analyzed dropped, un-analyzed preserved, overflow reported). Record
results and update the design doc's open questions (esp. OQ4 real per-session
sizes). After workplan file updates, notify the custodian operator to run from
`~/state-hub`:
```bash
make fix-consistency REPO=agentic-resources
```
**Verification results (2026-06-06):** full suite 26/26 green. Live sweep over 85
real local Claude transcripts → 84 parsed (1 empty, tolerated) → 63 distinct
sessions ingested + analyzed. Eviction under a 2 MB soft cap freed 26 MB, ended at
1.34 MB (under cap), zero data loss, and every evicted session kept its Tier 2
digest (invariant holds). Idempotent re-run skipped 84 unchanged, re-ingested 1
(the live session). Outcomes 52 success / 11 abandoned. OQ4 sizes recorded in the
design doc; OQ6 (multi-file sessions) noted as a Phase-1 refinement.