generated from coulomb/repo-seed
Compare commits
7 Commits
0f8382b505
...
7c6f4358ee
| Author | SHA1 | Date | |
|---|---|---|---|
| 7c6f4358ee | |||
| 586ed90948 | |||
| 451fb8f1f3 | |||
| abb888f3ef | |||
| 29fc211a14 | |||
| 1c29a94fa9 | |||
| ffe191d44e |
6
.gitignore
vendored
6
.gitignore
vendored
@@ -174,3 +174,9 @@ cython_debug/
|
||||
# PyPI configuration file
|
||||
.pypirc
|
||||
|
||||
|
||||
# session-memory local store
|
||||
session_memory/.store/
|
||||
__pycache__/
|
||||
*.pyc
|
||||
.pytest_cache/
|
||||
|
||||
380
docs/DESIGN-session-memory.md
Normal file
380
docs/DESIGN-session-memory.md
Normal file
@@ -0,0 +1,380 @@
|
||||
# Design Document — Coding Session Memory
|
||||
|
||||
**Domain:** helix_forge
|
||||
**Repo:** agentic-resources
|
||||
**Status:** Draft v0.1
|
||||
**Author:** Claude (drafted with Bernd Worsch)
|
||||
**Created:** 2026-06-06
|
||||
**Updated:** 2026-06-06
|
||||
**Related:** [PRD-helix-forge.md](./PRD-helix-forge.md) (this is the Capture + storage layer, FR-C* / §8)
|
||||
|
||||
---
|
||||
|
||||
## 1. Purpose
|
||||
|
||||
Helix Forge's loop (Capture → Detect → Curate → Distribute → Measure) needs a
|
||||
durable, bounded **memory of coding sessions**. This document specifies that
|
||||
memory: how we **access** each coding agent's session protocol, how we
|
||||
**normalize** those protocols into one schema, where we **store** the result, and
|
||||
how we **age it out** — preferring a *storage-budget-based* eviction that drops
|
||||
old raw content once it has been analyzed or no longer fits, rather than a naive
|
||||
fixed time window.
|
||||
|
||||
The guiding asymmetry: **raw transcripts are bulky and re-derivable; the distilled
|
||||
analysis is small and precious.** So we keep a *bounded cache* of raw sessions and
|
||||
a *durable, compact* layer of extracted digests/signals. Eviction targets the
|
||||
former, never the latter.
|
||||
|
||||
## 2. Research — How to Access Each Agent's Session Protocol
|
||||
|
||||
All three families persist sessions to the local filesystem as JSONL (plus, for
|
||||
Grok, a per-session directory). All findings below were verified against the live
|
||||
installs on this workstation (`~/.claude`, `~/.grok`) and public docs (Codex; not
|
||||
installed here).
|
||||
|
||||
### 2.1 Claude Code ✅ verified on disk
|
||||
|
||||
| Aspect | Finding |
|
||||
|--------|---------|
|
||||
| Session transcripts | `~/.claude/projects/<url-encoded-cwd>/<session-uuid>.jsonl` — one JSONL per session |
|
||||
| Subagent sidechains | same dir, `agent-<id>.jsonl`; records carry `isSidechain: true` |
|
||||
| Global prompt history | `~/.claude/history.jsonl` |
|
||||
| Record format | one JSON object per line; **`type`** discriminates: `user`, `assistant`, `attachment`, `queue-operation`, `ai-title`, `last-prompt`, `summary`, plus tool-result records |
|
||||
| Key fields | `type`, `timestamp`, `sessionId`, `uuid`, `parentUuid` (turn DAG), `message` (`role` + content blocks: `text`/`thinking`/`tool_use`/`tool_result`), `cwd`, `gitBranch`, `version`, `requestId`, `toolUseResult`, `userType` |
|
||||
| Token usage | inside assistant `message.usage` (input/output/cache tokens) |
|
||||
| Model | `message.model` (e.g. `claude-opus-4-8`) |
|
||||
| Side data | `~/.claude/todos/`, `~/.claude/tasks/`, `~/.claude/file-history/`, `~/.claude/shell-snapshots/` |
|
||||
| Live capture hook | Claude Code **SessionEnd / Stop / SessionStart hooks** can fire our ingest on session close (push), in addition to batch scanning (pull) |
|
||||
|
||||
The turn DAG (`uuid`/`parentUuid`) lets us reconstruct branching, retries, and
|
||||
sidechains exactly.
|
||||
|
||||
### 2.2 OpenAI Codex CLI ✅ schema confirmed from source (not installed locally)
|
||||
|
||||
Schema confirmed from the openai/codex source (`codex-rs/protocol/src/protocol.rs`
|
||||
via DeepWiki) and a reverse-engineering writeup with real example lines — the two
|
||||
cross-agree.
|
||||
|
||||
| Aspect | Finding |
|
||||
|--------|---------|
|
||||
| Session ("rollout") files | `$CODEX_HOME/sessions/YYYY/MM/DD/rollout-*.jsonl` (default `$CODEX_HOME = ~/.codex`) |
|
||||
| Line wrapper (`RolloutLine`) | every line: **`{timestamp, type, payload}`** (UTC ts + a `RolloutItem`) |
|
||||
| `type` discriminator | `session_meta` · `response_item` · `event_msg` · `turn_context` · `compacted` |
|
||||
| `session_meta` | `{id, source, cwd, model_provider, cli_version}` (+ model) — restores env |
|
||||
| `turn_context` | `{model, approval_policy, sandbox_policy}` — per-turn settings snapshot |
|
||||
| `response_item` | raw model output / tool calls; `payload.type` ∈ `message` · `function_call` · `function_call_output` · `reasoning` |
|
||||
| → `message` | `{role: developer\|user\|assistant, content:[{type:"output_text"\|…, text}]}` |
|
||||
| → `function_call` | `{name, arguments (JSON string), call_id}` |
|
||||
| → `function_call_output` | `{call_id, output}` |
|
||||
| `event_msg` | protocol events; `payload.type` ∈ `task_started` · `task_complete` · `user_message` · `agent_message` · `token_count` · lifecycle |
|
||||
| Token usage | `event_msg` with `payload.type = token_count`, interspersed (no fixed cadence) |
|
||||
| Turn linkage | **flat — tool calls/outputs linked by `call_id`, no parent-ref DAG**; causality inferred from temporal order (unlike Claude's `uuid`/`parentUuid`) |
|
||||
| Schema versions | older installs differ ("new ≥0.44 / mid / oldest 2025/08"); adapter version-detects on `session_meta.cli_version` |
|
||||
| Naming / resume | filenames + `session_id` auto-generated; `codex resume --last`; `codex exec` for headless (trajectory-JSON is gh issue #2288) |
|
||||
| Override location | `CODEX_HOME` env var |
|
||||
|
||||
**Adapter notes:** map `event_msg/task_started|task_complete` → `lifecycle`
|
||||
events and outcome; `response_item/message` → `user_msg`/`assistant_msg`;
|
||||
`function_call`+`function_call_output` → `tool_call`/`tool_result` joined on
|
||||
`call_id`; `response_item/reasoning` → `thinking`; `event_msg/token_count` → cost
|
||||
block. Because there is no parent-ref DAG, the adapter assigns `seq`/`parent_seq`
|
||||
from temporal order rather than native links.
|
||||
|
||||
### 2.3 Grok CLI (xAI) ✅ verified on disk
|
||||
|
||||
Grok stores **a directory per session**, which is the richest source of the three.
|
||||
|
||||
| Aspect | Finding |
|
||||
|--------|---------|
|
||||
| Session dir | `~/.grok/sessions/<url-encoded-cwd>/<session-uuid>/` |
|
||||
| `chat_history.jsonl` | full conversation; `type` = `system`/`user`/`assistant` + content |
|
||||
| `events.jsonl` | **structured lifecycle events** — `{ts, type, session_id, turn_number, model_id, yolo_mode, conversation_message_count, session_relationship, schema_version}`; types like `turn_started`, `loop_started` |
|
||||
| `updates.jsonl` | streaming incremental updates |
|
||||
| `summary.json` | `{id, cwd, session_summary, created_at, updated_at}` |
|
||||
| `prompt_context.json` | injected context, incl. which AGENTS.md/CLAUDE.md files were loaded |
|
||||
| `system_prompt.txt` | exact system prompt for the session |
|
||||
| `rewind_points.jsonl`, `plan_mode.json` | rewind/plan-mode state |
|
||||
| Per-cwd prompt history | `~/.grok/sessions/<cwd>/prompt_history.jsonl` — `{timestamp, session_id, prompt, is_bash}` |
|
||||
| Global structured log | `~/.grok/logs/unified.jsonl` — `{ts, src, pid, lvl, msg, ctx, sid, ver}` |
|
||||
| Search index | `~/.grok/sessions/session_search.sqlite` — `session_docs(session_id, cwd, updated_at, title)` + FTS5 (`session_docs_fts`) we can query directly |
|
||||
| Integration surfaces | Grok exposes **ACP (Agent Client Protocol)**, **headless mode** (`grok -p`), and **hooks** (`~/.grok/docs/user-guide/10-hooks.md`) — push-capture options |
|
||||
|
||||
### 2.4 Cross-family summary
|
||||
|
||||
| | Claude Code | Codex CLI | Grok CLI |
|
||||
|--|--|--|--|
|
||||
| Root | `~/.claude/projects/` | `~/.codex/sessions/` | `~/.grok/sessions/` |
|
||||
| Unit | one `.jsonl`/session | one `rollout-*.jsonl`/session | one **dir**/session |
|
||||
| Layout | flat per-cwd dir | date-partitioned `YYYY/MM/DD` | per-cwd, per-session dir |
|
||||
| Discriminator | `type` | `type` (version-dependent) | `type` (in `chat_history`/`events`) |
|
||||
| Lifecycle events | inferred from records | inferred from records | **explicit** `events.jsonl` |
|
||||
| Token usage | `message.usage` | per-line usage | from events/updates |
|
||||
| Push capture | Stop/SessionEnd hooks | `codex exec` wrappers | hooks / ACP |
|
||||
| Pull capture | scan dir by mtime | scan date partitions | scan dirs / query FTS sqlite |
|
||||
|
||||
**Implication:** the common denominator is *"JSONL records discriminated by a
|
||||
`type` field, with a session id, timestamps, turn linkage, tool calls, and token
|
||||
usage."* That maps cleanly onto one normalized schema (§4). Per-family quirks
|
||||
(Grok's explicit `events.jsonl`, Codex's schema versions, Claude's sidechains) are
|
||||
handled inside each adapter.
|
||||
|
||||
## 3. Tiered Storage Model
|
||||
|
||||
```
|
||||
Tier 0 SOURCE (agents' own logs) read-only, never mutated
|
||||
~/.claude/projects ~/.codex/sessions ~/.grok/sessions
|
||||
│ collector adapters (per family) + ingest cursor
|
||||
▼
|
||||
Tier 1 RAW CACHE (bounded, EVICTABLE) normalized Session + Event records
|
||||
│ signal extractors / digesters
|
||||
▼
|
||||
Tier 2 DISTILLED MEMORY (durable, small) session digests + signals + pattern evidence
|
||||
```
|
||||
|
||||
- **Tier 0 — Source.** The agents' own logs. We treat them as read-only. We keep a
|
||||
small **ingest cursor** per source so re-scans are incremental (see §6).
|
||||
- **Tier 1 — Raw cache.** Normalized copies of sessions/events. This is the bulky
|
||||
tier and the *only* tier subject to budget eviction.
|
||||
- **Tier 2 — Distilled memory.** Per-session **digest** (outcome, costs, tool
|
||||
histogram, error/retry/intervention markers, key snippets) plus extracted
|
||||
**signals** and **pattern evidence pointers**. Compact and durable. A session can
|
||||
be fully evicted from Tier 1 once its Tier 2 digest exists.
|
||||
|
||||
This is what makes "drop old content once it has been analyzed" safe: analysis
|
||||
*promotes* the valuable bits into Tier 2 before the raw bytes are dropped.
|
||||
|
||||
### 3.1 Per-session lifecycle / watermarks
|
||||
|
||||
Each session row carries timestamps that drive eviction:
|
||||
|
||||
```
|
||||
discovered_at → ingested_at → analyzed_at → [evictable] → evicted_at
|
||||
```
|
||||
|
||||
- `ingested_at` set when normalized into Tier 1.
|
||||
- `analyzed_at` set when the Tier 2 digest is written. **A session is evictable iff
|
||||
`analyzed_at` is set.**
|
||||
- `evicted_at` set when raw bytes are dropped from Tier 1 (Tier 2 digest remains).
|
||||
|
||||
## 4. Normalized Schema (Tier 1)
|
||||
|
||||
Two record kinds. Field names are stable across all adapters.
|
||||
|
||||
### 4.1 `Session`
|
||||
|
||||
```jsonc
|
||||
{
|
||||
"session_uid": "claude:17092961-…", // "<flavor>:<native id>", globally unique
|
||||
"flavor": "claude" | "codex" | "grok",
|
||||
"native_session_id": "17092961-…",
|
||||
"repo": "agentic-resources", // resolved from cwd
|
||||
"domain": "helix_forge", // resolved from repo→domain map
|
||||
"cwd": "/home/worsch/agentic-resources",
|
||||
"git_branch": "main",
|
||||
"model": "claude-opus-4-8",
|
||||
"started_at": "2026-06-05T21:59:30Z",
|
||||
"ended_at": "2026-06-05T22:14:00Z",
|
||||
"outcome": "success|fail|abandoned|unknown",
|
||||
"cost": { "input_tokens": 0, "output_tokens": 0, "cache_tokens": 0,
|
||||
"wall_clock_s": 0, "turns": 0, "retries": 0 },
|
||||
"task_ref": "AGENTIC-WP-0002-T01", // if derivable; else null
|
||||
"source_path": "~/.claude/projects/…/….jsonl",
|
||||
"source_bytes": 0,
|
||||
"schema_version": 1,
|
||||
"ingested_at": "…", "analyzed_at": null, "evicted_at": null
|
||||
}
|
||||
```
|
||||
|
||||
### 4.2 `SessionEvent`
|
||||
|
||||
```jsonc
|
||||
{
|
||||
"session_uid": "claude:17092961-…",
|
||||
"seq": 12, // monotonic within session
|
||||
"parent_seq": 11, // turn DAG (Claude uuid/parentUuid)
|
||||
"ts": "2026-06-05T22:01:13Z",
|
||||
"kind": "user_msg | assistant_msg | thinking | tool_call | tool_result"
|
||||
+ "| error | test_run | edit | retry | human_intervention | decision"
|
||||
+ "| lifecycle | completion",
|
||||
"role": "user|assistant|system|tool",
|
||||
"tool": "Bash|Edit|Read|…", // when kind=tool_call/result
|
||||
"summary": "ran pytest -q", // short, human-readable
|
||||
"payload_ref": "blob://…", // pointer to full content in Tier 1 blob store
|
||||
"tokens": 0,
|
||||
"is_sidechain": false
|
||||
}
|
||||
```
|
||||
|
||||
Adapters map native records onto `kind`. Grok's `events.jsonl` populates
|
||||
`lifecycle`/`turn` events directly; Claude/Codex lifecycle is inferred from the
|
||||
record stream. Bulky bodies live behind `payload_ref` so Tier 1 rows stay light
|
||||
and blobs can be evicted independently.
|
||||
|
||||
### 4.3 Native → `kind` mapping (all three families)
|
||||
|
||||
Each cell is the native record/discriminator an adapter reads to emit that
|
||||
`SessionEvent.kind`. `—` = not natively present; the adapter synthesizes or omits.
|
||||
|
||||
| `kind` | Claude Code (`type` / `message`) | Codex CLI (`type` → `payload.type`) | Grok CLI (file → `type`) |
|
||||
|--------|----------------------------------|--------------------------------------|---------------------------|
|
||||
| `user_msg` | `user`, `message.role=user` | `response_item` → `message` `role=user`/`developer` | `chat_history` → `user` |
|
||||
| `assistant_msg` | `assistant`, `message.role=assistant`, content `text` | `response_item` → `message` `role=assistant` (`output_text`) | `chat_history` → `assistant` |
|
||||
| `thinking` | `assistant` content block `type=thinking` | `response_item` → `reasoning` | `chat_history`/`updates` reasoning block |
|
||||
| `tool_call` | `assistant` content block `type=tool_use` (`name`,`input`) | `response_item` → `function_call` (`name`,`arguments`,`call_id`) | `chat_history`/`updates` tool-call entry |
|
||||
| `tool_result` | `user`/tool record `type=tool_result` + `toolUseResult` | `response_item` → `function_call_output` (join on `call_id`) | `updates` tool-result entry |
|
||||
| `test_run` | derived from `tool_call` (Bash running tests) | derived from `function_call` (`exec_command`) | derived from tool-call entry |
|
||||
| `edit` | `tool_use` where `name` ∈ Edit/Write/NotebookEdit | `function_call` apply-patch/file-write tool | tool-call entry (edit/write) |
|
||||
| `error` | `toolUseResult` error / non-zero result | `function_call_output` error / `event_msg` error | `events.jsonl` error / failed update |
|
||||
| `retry` | repeated `tool_use` after error (inferred via DAG) | repeated `function_call` after error (inferred, temporal) | `events.jsonl` loop/retry event |
|
||||
| `human_intervention` | `user` record mid-turn (interrupt), `userType` | `event_msg` → `user_message` mid-task | `prompt_history` mid-session / `events.jsonl` |
|
||||
| `decision` | recorded out-of-band (State Hub `/decisions`) | recorded out-of-band (State Hub) | recorded out-of-band (State Hub) |
|
||||
| `lifecycle` | inferred: first/last record, `summary`, `queue-operation` | `event_msg` → `task_started` / `task_complete` | **`events.jsonl`** → `turn_started`/`loop_started`/… (explicit) |
|
||||
| `completion` | inferred: last `assistant` + `Stop`/`SessionEnd` hook | `event_msg` → `task_complete` | `events.jsonl` turn end + `summary.json` |
|
||||
|
||||
**Linkage note (drives `seq`/`parent_seq`):** Claude has a true turn DAG
|
||||
(`uuid`/`parentUuid`) — preserve it directly. Codex is **flat**, joined only by
|
||||
`call_id`; assign `seq` by temporal order. Grok carries explicit `turn_number` in
|
||||
`events.jsonl`; key `seq` off that plus record order.
|
||||
|
||||
**Cost block sources:** Claude `message.usage`; Codex `event_msg/token_count`;
|
||||
Grok `events.jsonl` / `updates.jsonl` token fields.
|
||||
|
||||
## 5. Retention & Eviction
|
||||
|
||||
The user's stated preference: **storage-budget-based**, dropping old content once
|
||||
it has been analyzed or once it no longer fits — *better than* a fixed daily/weekly
|
||||
window. We implement budget-based as primary, with a time backstop and a scheduled
|
||||
cadence as the trigger.
|
||||
|
||||
### 5.1 Configurable knobs
|
||||
|
||||
```toml
|
||||
[session_memory.retention]
|
||||
raw_soft_cap_bytes = "4GiB" # begin evicting analyzed sessions above this
|
||||
raw_hard_cap_bytes = "6GiB" # absolute ceiling for Tier 1
|
||||
raw_max_age_days = 45 # backstop: analyzed raw older than this is evictable regardless of space
|
||||
distilled_cap_bytes = "1GiB" # Tier 2 ceiling (should grow slowly; alert, don't auto-drop)
|
||||
cadence = "daily" # ingest+analyze+evict sweep: daily | weekly | on-hook
|
||||
```
|
||||
|
||||
### 5.2 Eviction algorithm (runs after each ingest+analyze sweep)
|
||||
|
||||
1. **Compute** current Tier 1 usage.
|
||||
2. **Backstop pass:** evict any session where `analyzed_at` is set AND
|
||||
`age > raw_max_age_days`.
|
||||
3. **Budget pass:** while `usage > raw_soft_cap_bytes`:
|
||||
- pick the **oldest `analyzed_at`** session that is not yet evicted;
|
||||
- drop its Tier 1 raw rows + blobs (Tier 2 digest is kept), set `evicted_at`;
|
||||
- if **no analyzed-but-unevicted session remains**, stop the budget pass
|
||||
(we will not destroy un-analyzed data to free space) and go to step 4.
|
||||
4. **Back-pressure / overflow:** if `usage > raw_hard_cap_bytes` and the only
|
||||
remaining bulk is **un-analyzed**:
|
||||
- first try to **analyze now** (run extraction) to make those sessions
|
||||
evictable, then re-run the budget pass;
|
||||
- if still over hard cap (analysis can't keep up or fails), evict the **oldest
|
||||
un-analyzed** sessions as a last resort and emit a
|
||||
`session_memory.data_loss` warning event + a State Hub progress note. This is
|
||||
the only path that loses un-analyzed data, and it is always reported.
|
||||
5. **Tier 2 guard:** if distilled usage > `distilled_cap_bytes`, **do not
|
||||
auto-drop**; flag for human/curation review (digests are the product).
|
||||
|
||||
**Invariant:** *no session's raw bytes are dropped before its Tier 2 digest
|
||||
exists, except the explicitly-reported hard-cap overflow path.*
|
||||
|
||||
### 5.3 Why budget-based beats fixed-window
|
||||
|
||||
A fixed daily/weekly drop either deletes data we never analyzed (lossy) or hoards
|
||||
data we already distilled (wasteful). Budget + `analyzed_at` watermark ties
|
||||
deletion to **two** real conditions the user named — *"once it has been analyzed"*
|
||||
(promoted to Tier 2) and *"doesn't fit any longer"* (over budget) — and only falls
|
||||
back to time as a backstop.
|
||||
|
||||
## 6. Ingest Cursors (incremental, idempotent)
|
||||
|
||||
Per source, persist a small cursor so sweeps are cheap and re-runnable:
|
||||
|
||||
- **Claude / Grok (per-cwd dirs):** track `(file_path, size, mtime)` and last
|
||||
parsed line offset; re-ingest only grown/changed files. `session_uid` dedupes.
|
||||
- **Codex (date partitions):** track last-seen `YYYY/MM/DD` + per-file offset.
|
||||
- Ingest is **idempotent** keyed on `(session_uid, seq)` — safe to re-run after a
|
||||
crash or partial sweep.
|
||||
|
||||
## 7. Capture Modes
|
||||
|
||||
- **Pull (default, portable):** scheduled sweep scans Tier 0 by mtime/partition.
|
||||
Works for all three families with zero coupling to the agent. Triggered on the
|
||||
configured `cadence` via the repo's scheduler (`/schedule`, cron, or `/loop`).
|
||||
- **Push (optional, low-latency):** wire the agent's own hooks to ping the ingester
|
||||
on session close — Claude `Stop`/`SessionEnd` hooks, Grok hooks/ACP, Codex
|
||||
`exec` wrappers. Push just enqueues; the same idempotent pull pipeline does the
|
||||
work.
|
||||
|
||||
Capture must be **non-blocking** (PRD FR-C5): we read copies of logs out-of-band;
|
||||
we never sit in the agent's critical path.
|
||||
|
||||
## 8. Component Layout (proposed, in-repo)
|
||||
|
||||
```
|
||||
session-memory/
|
||||
adapters/
|
||||
claude.py # Tier0→Tier1 normalizer (verified schema)
|
||||
codex.py # version-detecting normalizer (confirm against real rollout)
|
||||
grok.py # reads session dir incl. events.jsonl
|
||||
core/
|
||||
schema.py # Session / SessionEvent dataclasses + versioning
|
||||
store.py # Tier1 (rows+blobs) and Tier2 (digests) — SQLite to start
|
||||
cursor.py # per-source ingest cursors
|
||||
retention.py # §5 eviction algorithm
|
||||
digest.py # Tier1→Tier2 session digest + signal stubs
|
||||
ingest.py # one sweep: discover → normalize → analyze → evict
|
||||
config.toml # §5.1 knobs + repo→domain map + source paths
|
||||
```
|
||||
|
||||
Storage starts as **SQLite + a blob dir** (rows in SQLite, bulky payloads as files
|
||||
under `payload_ref`); graduate to Postgres alongside the State Hub only if volume
|
||||
demands. Digests/decisions are also surfaced to the hub per ADR-001 (files-first;
|
||||
hub indexes).
|
||||
|
||||
## 9. Privacy / Safety
|
||||
|
||||
- Tier 0 logs can contain secrets (the Grok `auth.json` and Claude `.credentials`
|
||||
live in the same trees). The ingester reads **only** session transcripts, never
|
||||
credential files, and **redacts** obvious secret patterns into `payload_ref`
|
||||
blobs.
|
||||
- All data is local; nothing leaves the workstation. Eviction of Tier 1 is a real
|
||||
delete (not just an index drop) so the bounded cache is also a privacy bound.
|
||||
|
||||
## 10. Open Questions
|
||||
|
||||
- ~~**OQ1** Confirm Codex `rollout-*.jsonl` per-line schema.~~ **Resolved** (§2.2):
|
||||
`{timestamp,type,payload}` lines, `type` ∈ `session_meta`/`response_item`/`event_msg`/`turn_context`/`compacted`,
|
||||
tool calls flat-linked by `call_id`, tokens via `event_msg/token_count`. Remaining
|
||||
sub-item: verify the `token_count` payload field names against a real install when
|
||||
Codex is present (older-version variance only).
|
||||
- **OQ2** Outcome inference: how do we reliably label `success/fail/abandoned`
|
||||
across flavors (exit signals differ)? Start heuristic (last-turn + test results +
|
||||
human-intervention markers), refine in Detect phase.
|
||||
- **OQ3** `task_ref` resolution — can we always map a session to a workplan task
|
||||
(via cwd + branch + state-hub), or only sometimes?
|
||||
- ~~**OQ4** Right default for `raw_soft_cap_bytes`.~~ **Measured** (Phase 0, 85
|
||||
real local Claude files / 63 distinct sessions): source bytes per session
|
||||
min 396 · **median ~49 KB** · max 48 MB (one outlier) · ~103 MB total. Claude
|
||||
defaults (4 GiB soft / 6 GiB hard) leave ample headroom; revisit once Grok dirs
|
||||
(heavier, multi-file) are ingested in Phase 1.
|
||||
- **OQ6 (new, found in Phase 0)** Multi-file sessions: ~84 transcript files mapped
|
||||
to ~63 `session_uid`s — some sessions span multiple files (resume/sidechain
|
||||
sharing a `sessionId`). Current behavior upserts (last file wins per
|
||||
`(session_uid, seq)`); a future refinement is to *merge* events across files of
|
||||
one session rather than overwrite. Acceptable for Phase 0.
|
||||
- **OQ5** Should push-hooks be opt-in per machine to avoid surprising the agents?
|
||||
|
||||
---
|
||||
|
||||
*Next step: [AGENTIC-WP-0002] implements Phase 0 — the schema, the Claude
|
||||
collector, the Tier1/Tier2 store, and the budget-based eviction sweep.*
|
||||
|
||||
## Sources
|
||||
|
||||
- Claude Code session format — verified on disk: `~/.claude/projects/*/*.jsonl`, `~/.claude/history.jsonl`.
|
||||
- Grok CLI session format — verified on disk: `~/.grok/sessions/`, `~/.grok/logs/unified.jsonl`, `~/.grok/sessions/session_search.sqlite`; `~/.grok/README.md` (ACP/headless/hooks).
|
||||
- Codex CLI session format — [ccusage Codex guide](https://ccusage.com/guide/codex/), [Codex advanced config](https://developers.openai.com/codex/config-advanced), [codex-trace](https://github.com/PixelPaw-Labs/codex-trace), [codex-logs](https://github.com/wondercoms/codex-logs), [Session/Rollout Files discussion #3827](https://github.com/openai/codex/discussions/3827), [trajectory-JSON issue #2288](https://github.com/openai/codex/issues/2288).
|
||||
279
docs/PRD-helix-forge.md
Normal file
279
docs/PRD-helix-forge.md
Normal file
@@ -0,0 +1,279 @@
|
||||
# Product Requirements Document — Helix Forge
|
||||
|
||||
**Domain:** helix_forge
|
||||
**Repo:** agentic-resources
|
||||
**Status:** Draft v0.1
|
||||
**Author:** Claude (drafted with Bernd Worsch)
|
||||
**Created:** 2026-06-06
|
||||
**Updated:** 2026-06-06
|
||||
|
||||
---
|
||||
|
||||
## 1. Summary
|
||||
|
||||
Helix Forge is a system for **handling a collection of repositories and evolving
|
||||
the utility of what those repositories provide**, by treating the coding sessions
|
||||
run against them as a first-class data source.
|
||||
|
||||
Concretely: across a fleet of repos worked on by multiple coding agents (Claude,
|
||||
Codex, GrokBuild), Helix Forge **inspects the sessions**, **collects data about the
|
||||
problems agents hit and the moves that resolved them**, and turns that data into
|
||||
**reusable solution patterns** that can be discussed, implemented, and re-applied —
|
||||
across every agent flavor, not just the one that discovered the pattern.
|
||||
|
||||
The name is the metaphor: a *helix* of repeated turns (session → pattern → improved
|
||||
session) feeding a *forge* where the tooling, environments, and instructions for our
|
||||
agents are hammered into better shape over time. This is the operational engine
|
||||
behind the INTENT.md goal of an *antifragile, continuously-optimizing agentic
|
||||
ecosystem*.
|
||||
|
||||
## 2. Problem Statement
|
||||
|
||||
We run many coding sessions, across many repos, with several different agents. Today
|
||||
the value of each session is **trapped in that session**:
|
||||
|
||||
- When an agent solves a tricky problem, the solution is not captured in a form
|
||||
another agent (or the same agent next week) can reuse.
|
||||
- When an agent fails, struggles, or burns excess budget on a problem, that failure
|
||||
signal is lost — we re-encounter the same friction repeatedly.
|
||||
- Each agent flavor (Claude, Codex, GrokBuild) has its own environment, instruction
|
||||
format, and extension mechanism, so a fix discovered for one is **not portable** to
|
||||
the others without manual translation.
|
||||
- We have no systematic, evidence-based answer to "what is actually slowing our
|
||||
agents down, and what consistently makes them faster?" — decisions about tooling,
|
||||
prompts, and environments are made on anecdote.
|
||||
|
||||
**The cost:** repeated mistakes, non-transferable wins, slow and uneven improvement
|
||||
of agent performance, and no feedback loop from real session data back into the
|
||||
tools/environments/instructions that shape future sessions.
|
||||
|
||||
## 3. Goals & Non-Goals
|
||||
|
||||
### 3.1 Goals
|
||||
|
||||
| # | Goal |
|
||||
|---|------|
|
||||
| G1 | **Capture** coding sessions from Claude, Codex, and GrokBuild in a normalized, comparable form. |
|
||||
| G2 | **Detect** recurring *problem patterns* (failure, friction, wasted budget) and *success patterns* (efficient resolutions) from that data. |
|
||||
| G3 | **Curate** detected patterns into a reviewed catalog of *solution patterns* that humans and agents can discuss and approve. |
|
||||
| G4 | **Distribute** approved patterns back into agent environments — as instructions, tools, or extensions — in a per-flavor-appropriate form. |
|
||||
| G5 | **Measure** whether distributed patterns actually improved subsequent sessions (close the loop). |
|
||||
| G6 | Keep the whole loop **agent-flavor-agnostic at the core**, with thin per-flavor adapters at the edges. |
|
||||
|
||||
### 3.2 Non-Goals (initial release)
|
||||
|
||||
- Not a replacement for the coding agents themselves; Helix Forge observes and
|
||||
improves them, it does not execute coding tasks.
|
||||
- Not a general APM/observability product; scope is coding-session improvement, not
|
||||
arbitrary infrastructure monitoring.
|
||||
- Not an autonomous self-modifying system — pattern promotion into live agent
|
||||
environments requires human approval (HITL) for the first release.
|
||||
- Not building new model training/fine-tuning pipelines; we optimize *context,
|
||||
tooling, and environment*, not model weights.
|
||||
- Not replacing the Custodian State Hub; Helix Forge is a producer/consumer of hub
|
||||
state, not a competing system of record. (See §9.)
|
||||
|
||||
## 4. Users & Personas
|
||||
|
||||
| Persona | Description | What they need from Helix Forge |
|
||||
|---------|-------------|----------------------------------|
|
||||
| **Operator (Bernd)** | Owns the agentic ecosystem; decides which patterns become standards. | A reviewable catalog of patterns with evidence; control over what ships to agents. |
|
||||
| **Coding agent (Claude / Codex / GrokBuild)** | Runs tasks in a repo; both the *source* of session data and the *consumer* of patterns. | To emit session data cheaply; to receive applicable patterns in its native format at session start. |
|
||||
| **Repo maintainer agent** | The per-repo agent persona (e.g. `agentic-resources`) following AGENTS.md conventions. | Patterns scoped to its repo/domain; integration via existing workplan + state-hub flow. |
|
||||
| **Reviewer (human or kaizen agent)** | Evaluates candidate patterns before they become standards. | Clear pattern proposals, supporting evidence, and a discuss/approve/reject workflow. |
|
||||
|
||||
## 5. Core Concepts (Domain Model)
|
||||
|
||||
- **Session** — one bounded run of a coding agent against a repo. Has an agent flavor,
|
||||
repo, task reference, timeline of events, outcome, and cost (tokens/time).
|
||||
- **Session Event** — a normalized atomic record within a session: tool call, edit,
|
||||
test run, error, retry, human intervention, decision, completion.
|
||||
- **Signal** — a derived indicator extracted from sessions: e.g. *repeated test
|
||||
failure on same file*, *budget overrun*, *fast clean resolution*, *retry storm*,
|
||||
*human escalation*.
|
||||
- **Problem Pattern** — a recurring negative signal cluster ("agents repeatedly fail
|
||||
X because Y").
|
||||
- **Success Pattern** — a recurring positive resolution ("doing Z reliably resolves X
|
||||
cheaply").
|
||||
- **Solution Pattern** — a curated, reviewed artifact pairing a problem with one or
|
||||
more recommended resolutions, written agent-flavor-agnostically, with per-flavor
|
||||
rendering hints.
|
||||
- **Pattern Application** — the act of distributing a solution pattern into a specific
|
||||
agent environment (an instruction snippet, a tool, an extension), plus the record of
|
||||
its effect on later sessions.
|
||||
|
||||
## 6. Functional Requirements
|
||||
|
||||
### 6.1 Capture (G1)
|
||||
|
||||
- **FR-C1** Ingest session transcripts/logs from each supported agent flavor via a
|
||||
per-flavor **collector adapter**.
|
||||
- **FR-C2** Normalize raw logs into the common `Session` + `Session Event` schema,
|
||||
regardless of source flavor.
|
||||
- **FR-C3** Tag every session with: agent flavor, repo, domain, task/workplan id (if
|
||||
any), outcome (success/fail/abandoned), and cost metrics (tokens, wall-clock,
|
||||
retries).
|
||||
- **FR-C4** Support both **batch import** (historical logs) and **incremental ingest**
|
||||
(new sessions as they close).
|
||||
- **FR-C5** Collection must be low-friction and non-blocking — an agent emitting
|
||||
session data must never slow or break the actual coding task.
|
||||
|
||||
### 6.2 Detect (G2)
|
||||
|
||||
- **FR-D1** Run signal extractors over normalized sessions to surface problem and
|
||||
success signals.
|
||||
- **FR-D2** Cluster recurring signals across sessions/repos/flavors into candidate
|
||||
Problem Patterns and Success Patterns.
|
||||
- **FR-D3** For each candidate pattern, attach **evidence**: the supporting sessions,
|
||||
frequency, affected repos, affected flavors, and estimated cost impact.
|
||||
- **FR-D4** Flag **cross-flavor** patterns explicitly (a problem seen in Claude that
|
||||
Codex also hits) — these are the highest-value reuse targets.
|
||||
|
||||
### 6.3 Curate (G3)
|
||||
|
||||
- **FR-U1** Present candidate patterns for review with their evidence in a
|
||||
discuss/approve/reject workflow.
|
||||
- **FR-U2** Allow a reviewer (human or kaizen agent) to promote a candidate into a
|
||||
**Solution Pattern**: a named, versioned artifact with problem description,
|
||||
recommended resolution(s), applicability scope, and per-flavor rendering hints.
|
||||
- **FR-U3** Maintain a **Pattern Catalog** as the source of truth for approved
|
||||
solution patterns, versioned and stored as files in-repo (consistent with ADR-001:
|
||||
files originate work, the hub indexes them).
|
||||
- **FR-U4** Record pattern decisions through the State Hub decision mechanism so
|
||||
rationale is auditable.
|
||||
|
||||
### 6.4 Distribute (G4)
|
||||
|
||||
- **FR-X1** Render each approved solution pattern into per-flavor artifacts via
|
||||
**distributor adapters**:
|
||||
- Claude → `CLAUDE.md` snippets, skills, or settings/hooks.
|
||||
- Codex → `AGENTS.md` snippets / repo conventions.
|
||||
- GrokBuild → its native instruction/extension format.
|
||||
- **FR-X2** Scope distribution by repo and domain, so a pattern only lands where it
|
||||
applies.
|
||||
- **FR-X3** Distribution is **proposed, not auto-applied** in v1 — output is a
|
||||
reviewable change (e.g. a workplan or PR), gated by human approval.
|
||||
- **FR-X4** Track which patterns are currently active in which environments.
|
||||
|
||||
### 6.5 Measure (G5)
|
||||
|
||||
- **FR-M1** After a pattern is applied, compare subsequent sessions touching the same
|
||||
signal against the pre-application baseline (cost, retry rate, success rate,
|
||||
human-intervention rate).
|
||||
- **FR-M2** Surface per-pattern **effectiveness** so ineffective patterns can be
|
||||
revised or retired.
|
||||
- **FR-M3** Provide a fleet-level view: are sessions across the collection getting
|
||||
cheaper / more reliable over time? (the helix turning.)
|
||||
|
||||
### 6.6 Multi-Agent Support (G6)
|
||||
|
||||
- **FR-A1** The core schema, detection, catalog, and measurement are **flavor-agnostic**.
|
||||
- **FR-A2** All flavor-specific knowledge lives in **collector adapters** (input) and
|
||||
**distributor adapters** (output). Adding a fourth agent = adding one collector +
|
||||
one distributor, no core changes.
|
||||
- **FR-A3** A successful pattern discovered via one flavor MUST be expressible for all
|
||||
other supported flavors.
|
||||
|
||||
## 7. Architecture Overview
|
||||
|
||||
```
|
||||
┌──────────── per-flavor edges ────────────┐ ┌──── flavor-agnostic core ────┐
|
||||
│ │ │ │
|
||||
Claude ─┐ │ │ │
|
||||
Codex ─┼─► Collector Adapters ──► Normalizer ─┼────────►│ Session + Event Store │
|
||||
Grok ─┘ │ │ │ │
|
||||
│ │ ▼ │
|
||||
│ │ Signal Extractors │
|
||||
│ │ │ │
|
||||
│ │ ▼ │
|
||||
│ │ Pattern Detector / Clusterer│
|
||||
│ │ │ │
|
||||
│ │ ▼ │
|
||||
│ │ Curation + Pattern Catalog │ ◄─ reviewer (human/kaizen)
|
||||
│ │ │ │
|
||||
Claude ◄┐ │ │ ▼ │
|
||||
Codex ◄┼── Distributor Adapters ◄────────────┼─────────│ Effectiveness Measurement │
|
||||
Grok ◄┘ │ │ │
|
||||
└───────────────────────────────────────────┘ └──────────────────────────────┘
|
||||
▲ feeds back into ▲ tools / environments / instructions
|
||||
```
|
||||
|
||||
**Design principle:** *agnostic core, thin adapters at the edges.* The expensive,
|
||||
reusable intelligence (normalized sessions, detection, catalog, measurement) is built
|
||||
once; each agent flavor only needs an input adapter and an output adapter.
|
||||
|
||||
## 8. Data & Storage
|
||||
|
||||
- **Pattern Catalog** and **workplans**: files in `agentic-resources` (per ADR-001 in
|
||||
AGENTS.md — files are the source of truth, the hub indexes them).
|
||||
- **Session/event data**: a local store (start simple: structured files / SQLite;
|
||||
graduate to Postgres alongside the State Hub if volume warrants).
|
||||
- **Decisions & progress**: recorded through the Custodian State Hub so the broader
|
||||
ecosystem stays aware of Helix Forge's activity.
|
||||
|
||||
## 9. Integration with the Custodian State Hub
|
||||
|
||||
Helix Forge runs inside the `helix_forge` domain and is **not** a competing system of
|
||||
record:
|
||||
|
||||
- Work originates as **workplans** in this repo (`AGENTIC-WP-NNNN`), synced via
|
||||
`make fix-consistency REPO=agentic-resources`.
|
||||
- Pattern-promotion and distribution decisions are logged via the hub's decision API.
|
||||
- Each Helix Forge run logs at least one `add_progress_event()` / `POST /progress/`.
|
||||
- The hub remains a **read model**; Helix Forge writes its durable artifacts as files
|
||||
and lets the hub index them.
|
||||
|
||||
## 10. Success Metrics
|
||||
|
||||
| Metric | Meaning | Target (directional, v1) |
|
||||
|--------|---------|--------------------------|
|
||||
| Sessions captured | Coverage of real work | ≥ 90% of sessions across the 3 flavors normalized |
|
||||
| Patterns cataloged | Knowledge made reusable | A growing, non-trivial catalog of reviewed solution patterns |
|
||||
| Cross-flavor patterns | Reuse leverage | ≥ 1 pattern proven to transfer across flavors |
|
||||
| Pattern effectiveness | Loop is closing | Applied patterns show measurable cost/reliability improvement vs. baseline |
|
||||
| Fleet trend | The helix turns | Median session cost ↓ and success rate ↑ over time |
|
||||
| Repeated-failure rate | Friction eliminated | Known problem patterns recur less after distribution |
|
||||
|
||||
## 11. Phasing / Roadmap
|
||||
|
||||
- **Phase 0 — Foundations.** Define the Session/Event schema and Pattern Catalog
|
||||
format. One collector adapter (Claude) + batch import. Manual inspection only.
|
||||
- **Phase 1 — Detect.** Signal extractors + pattern clustering over captured sessions;
|
||||
candidate patterns surfaced with evidence. Add Codex + GrokBuild collectors.
|
||||
- **Phase 2 — Curate.** Review workflow + versioned Pattern Catalog, wired to hub
|
||||
decisions.
|
||||
- **Phase 3 — Distribute.** Distributor adapters for all three flavors; patterns ship
|
||||
as reviewable workplans/PRs (HITL).
|
||||
- **Phase 4 — Measure.** Baseline-vs-after effectiveness and fleet-level trend
|
||||
reporting; retire ineffective patterns. Loop is closed.
|
||||
|
||||
## 12. Open Questions
|
||||
|
||||
- **OQ1** What is the canonical raw log format available from each of Claude, Codex,
|
||||
and GrokBuild today, and how lossy is normalization from each?
|
||||
- **OQ2** How are sessions reliably bounded and attributed to a repo/task across the
|
||||
three flavors?
|
||||
- **OQ3** Where does detection logic run — local batch jobs, hub-side, or a dedicated
|
||||
service? What volume do we actually expect?
|
||||
- **OQ4** Pattern format: how do we keep one agnostic representation while giving each
|
||||
distributor enough to render high-quality native artifacts?
|
||||
- **OQ5** What's the minimum trustworthy evidence bar before a pattern is allowed to be
|
||||
distributed to live agent environments?
|
||||
- **OQ6** How do we prevent pattern bloat — too many low-value instructions degrading
|
||||
agent context budgets (cf. the token-budget policy in global instructions)?
|
||||
|
||||
## 13. Risks
|
||||
|
||||
| Risk | Mitigation |
|
||||
|------|------------|
|
||||
| Capture overhead slows real coding sessions | Async, non-blocking collection (FR-C5); never in the agent's critical path. |
|
||||
| Patterns become noise / context bloat | Effectiveness gating (FR-M2) + retirement; measure before broad distribution. |
|
||||
| Over-fitting to one flavor | Agnostic core + explicit cross-flavor flagging (FR-D4, FR-A3). |
|
||||
| Bad pattern degrades agents | HITL approval before distribution (FR-X3); baseline measurement to catch regressions. |
|
||||
| Drift from State Hub conventions | Files-first per ADR-001; log via hub; no competing source of record. |
|
||||
|
||||
---
|
||||
|
||||
*This PRD is a draft for discussion. Next step: a `proposed` workplan
|
||||
(`AGENTIC-WP-0002`) scoping Phase 0 — the Session/Event schema and the first
|
||||
(Claude) collector adapter.*
|
||||
75
session_memory/README.md
Normal file
75
session_memory/README.md
Normal file
@@ -0,0 +1,75 @@
|
||||
# session_memory
|
||||
|
||||
Capture + retention layer for Helix Forge — the **Capture** stage of the loop in
|
||||
[../docs/PRD-helix-forge.md](../docs/PRD-helix-forge.md), built to the
|
||||
[../docs/DESIGN-session-memory.md](../docs/DESIGN-session-memory.md) spec.
|
||||
|
||||
It scans coding-agent session logs, normalizes them into one schema, distills a
|
||||
compact per-session digest, and ages out raw bulk under a **storage budget**
|
||||
(dropping sessions once analyzed and once space is needed) rather than a fixed
|
||||
time window.
|
||||
|
||||
## Layout
|
||||
|
||||
```
|
||||
session_memory/
|
||||
adapters/claude.py # Tier0 -> Tier1 normalizer (Codex/Grok land in Phase 1)
|
||||
core/schema.py # Session / SessionEvent / Cost
|
||||
core/store.py # SQLite rows + blob-dir bodies (Tier1) + digests (Tier2)
|
||||
core/cursor.py # incremental ingest cursors
|
||||
core/digest.py # Tier1 -> Tier2 promotion + outcome heuristic
|
||||
core/retention.py # budget-based eviction sweep
|
||||
ingest.py # one sweep: discover -> normalize -> store -> digest -> evict
|
||||
config.toml # store paths, retention caps, sources, repo->domain map
|
||||
```
|
||||
|
||||
The local store lives under `session_memory/.store/` (gitignored).
|
||||
|
||||
## Run a sweep
|
||||
|
||||
```bash
|
||||
# from the repo root
|
||||
python -m session_memory.ingest # ingest + analyze + evict
|
||||
python -m session_memory.ingest --dry-run # discover + parse only, writes nothing
|
||||
python -m session_memory.ingest --config path/to/config.toml
|
||||
```
|
||||
|
||||
Output reports `discovered / ingested / skipped_unchanged / analyzed` and a
|
||||
retention line (`freed`, `final_usage`, and per-pass eviction counts). Sweeps are
|
||||
idempotent — re-running skips unchanged files via the cursor.
|
||||
|
||||
## Scheduling (cadence)
|
||||
|
||||
Retention is budget-based; the `cadence` in `config.toml` only decides how often
|
||||
the sweep *runs*. Trigger it with the repo scheduler, e.g. daily:
|
||||
|
||||
```bash
|
||||
# Claude Code: schedule a daily routine that runs the sweep
|
||||
/schedule "daily session-memory sweep" -- python -m session_memory.ingest
|
||||
```
|
||||
|
||||
or a cron entry / `/loop` on a timer. Push-capture (agent Stop/SessionEnd hooks)
|
||||
can also enqueue a sweep; see design §7.
|
||||
|
||||
## Retention knobs (`[retention]` in config.toml)
|
||||
|
||||
| Key | Meaning |
|
||||
|-----|---------|
|
||||
| `raw_soft_cap_bytes` | begin evicting **analyzed** sessions above this (oldest first) |
|
||||
| `raw_hard_cap_bytes` | absolute Tier 1 ceiling; overflow path may, as a last resort, evict un-analyzed sessions and report `data_loss` |
|
||||
| `raw_max_age_days` | backstop: analyzed raw older than this is evictable regardless of space |
|
||||
| `distilled_cap_bytes` | Tier 2 ceiling — **alert only**, never auto-dropped |
|
||||
|
||||
**Invariant:** a session's raw bytes are never dropped before its Tier 2 digest
|
||||
exists, except the explicitly-reported hard-cap overflow path.
|
||||
|
||||
## Tests
|
||||
|
||||
```bash
|
||||
python -m pytest # 26 tests: schema, adapter, store, digest, retention, ingest
|
||||
```
|
||||
|
||||
## Status
|
||||
|
||||
Phase 0 (AGENTIC-WP-0002): Claude adapter only, end to end. Codex and Grok
|
||||
adapters are designed (schemas confirmed in the design doc) and land in Phase 1.
|
||||
7
session_memory/__init__.py
Normal file
7
session_memory/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
"""Coding Session Memory — Helix Forge capture + retention layer.
|
||||
|
||||
See docs/DESIGN-session-memory.md. Importable package name uses an underscore
|
||||
(``session_memory``) where the design doc writes ``session-memory/``.
|
||||
"""
|
||||
|
||||
__all__ = ["core", "adapters"]
|
||||
1
session_memory/adapters/__init__.py
Normal file
1
session_memory/adapters/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Per-flavor collector adapters (Tier 0 -> Tier 1 normalization)."""
|
||||
228
session_memory/adapters/claude.py
Normal file
228
session_memory/adapters/claude.py
Normal file
@@ -0,0 +1,228 @@
|
||||
"""Claude Code collector adapter — Tier 0 -> Tier 1 (design §2.1, §4.3).
|
||||
|
||||
Reads ``~/.claude/projects/<url-encoded-cwd>/<session-uuid>.jsonl`` (and
|
||||
``agent-*.jsonl`` sidechains), discriminates on the record ``type``, reconstructs
|
||||
the turn DAG via ``uuid``/``parentUuid``, and emits normalized records.
|
||||
|
||||
Returns a :class:`Normalized` bundle: the ``Session``, its ordered
|
||||
``SessionEvent`` list, and a ``blobs`` map (``payload_ref -> full text body``)
|
||||
that the store persists out-of-line so Tier 1 rows stay light.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Iterable, Optional
|
||||
|
||||
from ..core.schema import Cost, Session, SessionEvent
|
||||
|
||||
FLAVOR = "claude"
|
||||
|
||||
# tool_use names that mutate files -> kind "edit"
|
||||
_EDIT_TOOLS = {"Edit", "Write", "NotebookEdit", "MultiEdit"}
|
||||
# crude test-runner detection inside Bash commands -> kind "test_run"
|
||||
_TEST_HINTS = ("pytest", "unittest", "npm test", "npm run test", "go test", "cargo test", "jest", "vitest")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Normalized:
|
||||
session: Session
|
||||
events: list[SessionEvent]
|
||||
blobs: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
|
||||
def _iter_records(path: str) -> Iterable[dict[str, Any]]:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
yield json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue # tolerate partial/corrupt trailing lines
|
||||
|
||||
|
||||
def _resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
|
||||
"""cwd -> (repo, domain). repo is the cwd basename; domain via map."""
|
||||
if not cwd:
|
||||
return None, None
|
||||
repo = os.path.basename(cwd.rstrip("/")) or None
|
||||
domain = repo_domain_map.get(repo) if repo else None
|
||||
return repo, domain
|
||||
|
||||
|
||||
def _is_test_command(text: str) -> bool:
|
||||
low = text.lower()
|
||||
return any(h in low for h in _TEST_HINTS)
|
||||
|
||||
|
||||
def _content_blocks(message: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
content = message.get("content")
|
||||
if isinstance(content, str):
|
||||
return [{"type": "text", "text": content}]
|
||||
if isinstance(content, list):
|
||||
return [b for b in content if isinstance(b, dict)]
|
||||
return []
|
||||
|
||||
|
||||
def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -> Optional[Normalized]:
|
||||
"""Parse one Claude transcript file into a Normalized bundle.
|
||||
|
||||
Returns None if the file has no usable session records.
|
||||
"""
|
||||
repo_domain_map = repo_domain_map or {}
|
||||
records = list(_iter_records(path))
|
||||
if not records:
|
||||
return None
|
||||
|
||||
session_id: Optional[str] = None
|
||||
cwd = git_branch = version = model = None
|
||||
timestamps: list[str] = []
|
||||
file_is_sidechain = os.path.basename(path).startswith("agent-")
|
||||
|
||||
events: list[SessionEvent] = []
|
||||
blobs: dict[str, str] = {}
|
||||
uuid_to_seq: dict[str, int] = {}
|
||||
cost = Cost()
|
||||
seq = 0
|
||||
|
||||
def add_event(uuid: Optional[str], parent_uuid: Optional[str], ts, kind, *,
|
||||
role=None, tool=None, summary=None, body=None, tokens=0, sidechain=False):
|
||||
nonlocal seq
|
||||
s = seq
|
||||
seq += 1
|
||||
if uuid:
|
||||
uuid_to_seq[uuid] = s
|
||||
parent_seq = uuid_to_seq.get(parent_uuid) if parent_uuid else None
|
||||
payload_ref = None
|
||||
if body:
|
||||
payload_ref = f"blob://{session_id}/{s}"
|
||||
blobs[payload_ref] = body
|
||||
events.append(SessionEvent(
|
||||
session_uid=Session.make_uid(FLAVOR, session_id or "unknown"),
|
||||
seq=s, parent_seq=parent_seq, ts=ts, kind=kind, role=role, tool=tool,
|
||||
summary=(summary or "")[:300] or None, payload_ref=payload_ref,
|
||||
tokens=tokens, is_sidechain=sidechain or file_is_sidechain,
|
||||
))
|
||||
|
||||
for rec in records:
|
||||
rtype = rec.get("type")
|
||||
ts = rec.get("timestamp")
|
||||
if ts:
|
||||
timestamps.append(ts)
|
||||
session_id = session_id or rec.get("sessionId")
|
||||
cwd = cwd or rec.get("cwd")
|
||||
git_branch = git_branch or rec.get("gitBranch")
|
||||
version = version or rec.get("version")
|
||||
uuid = rec.get("uuid")
|
||||
parent = rec.get("parentUuid")
|
||||
sidechain = bool(rec.get("isSidechain"))
|
||||
|
||||
if rtype == "user":
|
||||
msg = rec.get("message", {})
|
||||
for b in _content_blocks(msg):
|
||||
bt = b.get("type")
|
||||
if bt == "tool_result":
|
||||
body = _stringify(b.get("content"))
|
||||
add_event(uuid, parent, ts, "tool_result", role="tool",
|
||||
summary="tool result", body=body, sidechain=sidechain)
|
||||
else:
|
||||
text = b.get("text", "")
|
||||
add_event(uuid, parent, ts, "user_msg", role="user",
|
||||
summary=_first_line(text), body=text, sidechain=sidechain)
|
||||
|
||||
elif rtype == "assistant":
|
||||
msg = rec.get("message", {})
|
||||
model = model or msg.get("model")
|
||||
usage = msg.get("usage") or {}
|
||||
cost.input_tokens += int(usage.get("input_tokens", 0) or 0)
|
||||
cost.output_tokens += int(usage.get("output_tokens", 0) or 0)
|
||||
cost.cache_tokens += int(
|
||||
(usage.get("cache_read_input_tokens", 0) or 0)
|
||||
+ (usage.get("cache_creation_input_tokens", 0) or 0)
|
||||
)
|
||||
out_tokens = int(usage.get("output_tokens", 0) or 0)
|
||||
for b in _content_blocks(msg):
|
||||
bt = b.get("type")
|
||||
if bt == "thinking":
|
||||
add_event(uuid, parent, ts, "thinking", role="assistant",
|
||||
summary="thinking", body=b.get("thinking", ""), sidechain=sidechain)
|
||||
elif bt == "text":
|
||||
text = b.get("text", "")
|
||||
add_event(uuid, parent, ts, "assistant_msg", role="assistant",
|
||||
summary=_first_line(text), body=text, tokens=out_tokens, sidechain=sidechain)
|
||||
elif bt == "tool_use":
|
||||
name = b.get("name", "")
|
||||
inp = b.get("input", {})
|
||||
body = _stringify(inp)
|
||||
kind = "tool_call"
|
||||
if name in _EDIT_TOOLS:
|
||||
kind = "edit"
|
||||
elif name == "Bash" and _is_test_command(_stringify(inp.get("command", ""))):
|
||||
kind = "test_run"
|
||||
add_event(uuid, parent, ts, kind, role="assistant", tool=name,
|
||||
summary=f"{name}", body=body, sidechain=sidechain)
|
||||
|
||||
elif rtype == "summary":
|
||||
add_event(uuid, parent, ts, "lifecycle", summary="summary",
|
||||
body=_stringify(rec.get("summary")), sidechain=sidechain)
|
||||
# queue-operation / ai-title / last-prompt / attachment: skipped as events
|
||||
|
||||
if session_id is None:
|
||||
return None
|
||||
|
||||
cost.turns = sum(1 for e in events if e.kind == "user_msg")
|
||||
started = min(timestamps) if timestamps else None
|
||||
ended = max(timestamps) if timestamps else None
|
||||
cost.wall_clock_s = _seconds_between(started, ended)
|
||||
|
||||
repo, domain = _resolve_repo(cwd, repo_domain_map)
|
||||
session = Session(
|
||||
session_uid=Session.make_uid(FLAVOR, session_id),
|
||||
flavor=FLAVOR,
|
||||
native_session_id=session_id,
|
||||
repo=repo, domain=domain, cwd=cwd, git_branch=git_branch,
|
||||
model=model, started_at=started, ended_at=ended,
|
||||
outcome="unknown", # outcome inference happens in the digest step (T04)
|
||||
cost=cost,
|
||||
source_path=path,
|
||||
source_bytes=os.path.getsize(path) if os.path.exists(path) else 0,
|
||||
discovered_at=_now(),
|
||||
)
|
||||
return Normalized(session=session, events=events, blobs=blobs)
|
||||
|
||||
|
||||
# ---- helpers ---------------------------------------------------------------
|
||||
|
||||
def _stringify(v: Any) -> str:
|
||||
if v is None:
|
||||
return ""
|
||||
if isinstance(v, str):
|
||||
return v
|
||||
try:
|
||||
return json.dumps(v, ensure_ascii=False)[:20000]
|
||||
except (TypeError, ValueError):
|
||||
return str(v)[:20000]
|
||||
|
||||
|
||||
def _first_line(text: str) -> str:
|
||||
return (text or "").strip().splitlines()[0] if (text or "").strip() else ""
|
||||
|
||||
|
||||
def _seconds_between(start: Optional[str], end: Optional[str]) -> float:
|
||||
if not start or not end:
|
||||
return 0.0
|
||||
try:
|
||||
a = datetime.fromisoformat(start.replace("Z", "+00:00"))
|
||||
b = datetime.fromisoformat(end.replace("Z", "+00:00"))
|
||||
return max(0.0, (b - a).total_seconds())
|
||||
except ValueError:
|
||||
return 0.0
|
||||
|
||||
|
||||
def _now() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
39
session_memory/config.toml
Normal file
39
session_memory/config.toml
Normal file
@@ -0,0 +1,39 @@
|
||||
# Coding Session Memory — configuration (design §5.1, §8).
|
||||
# Paths support ~ expansion. Edit caps to taste; see docs/DESIGN-session-memory.md.
|
||||
|
||||
[store]
|
||||
# Local store lives under the repo by default (gitignored).
|
||||
db_path = "session_memory/.store/mem.db"
|
||||
blob_dir = "session_memory/.store/blobs"
|
||||
cursor = "session_memory/.store/cursors.json"
|
||||
|
||||
[retention]
|
||||
raw_soft_cap_bytes = 4294967296 # 4 GiB — begin evicting analyzed sessions above this
|
||||
raw_hard_cap_bytes = 6442450944 # 6 GiB — absolute Tier 1 ceiling
|
||||
raw_max_age_days = 45 # backstop: analyzed raw older than this is evictable
|
||||
distilled_cap_bytes = 1073741824 # 1 GiB — Tier 2 ceiling (alert, never auto-drop)
|
||||
cadence = "daily" # sweep trigger: daily | weekly | on-hook
|
||||
|
||||
[sources.claude]
|
||||
enabled = true
|
||||
root = "~/.claude/projects"
|
||||
# glob, relative to root; covers sessions and agent-* sidechains
|
||||
glob = "*/*.jsonl"
|
||||
|
||||
# Codex / Grok adapters land in Phase 1 (schemas confirmed in the design doc).
|
||||
[sources.codex]
|
||||
enabled = false
|
||||
root = "~/.codex/sessions"
|
||||
glob = "*/*/*/rollout-*.jsonl"
|
||||
|
||||
[sources.grok]
|
||||
enabled = false
|
||||
root = "~/.grok/sessions"
|
||||
glob = "*/*/chat_history.jsonl"
|
||||
|
||||
# cwd basename -> domain slug. Used to tag sessions with their Custodian domain.
|
||||
[repo_domain_map]
|
||||
agentic-resources = "helix_forge"
|
||||
the-custodian = "custodian"
|
||||
state-hub = "custodian"
|
||||
ops-bridge = "custodian"
|
||||
1
session_memory/core/__init__.py
Normal file
1
session_memory/core/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Flavor-agnostic core: schema, store, cursor, digest, retention."""
|
||||
49
session_memory/core/cursor.py
Normal file
49
session_memory/core/cursor.py
Normal file
@@ -0,0 +1,49 @@
|
||||
"""Per-source ingest cursors (design §6; T06).
|
||||
|
||||
Tracks ``(path -> size, mtime)`` so sweeps re-ingest only changed/grown files.
|
||||
Persisted as a small JSON sidecar. Ingest itself is idempotent on
|
||||
``(session_uid, seq)`` in the store, so the cursor is an optimization, not a
|
||||
correctness requirement — a lost cursor just means a full (still-idempotent)
|
||||
re-scan.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class Cursors:
|
||||
def __init__(self, path: str):
|
||||
self.path = path
|
||||
self._data: dict[str, dict] = {}
|
||||
if os.path.exists(path):
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
self._data = json.load(f)
|
||||
except (OSError, ValueError):
|
||||
self._data = {}
|
||||
|
||||
def is_changed(self, file_path: str) -> bool:
|
||||
"""True if the file is new or has changed size/mtime since last seen."""
|
||||
try:
|
||||
stat = os.stat(file_path)
|
||||
except OSError:
|
||||
return False
|
||||
prev = self._data.get(file_path)
|
||||
return prev is None or prev.get("size") != stat.st_size or prev.get("mtime") != stat.st_mtime
|
||||
|
||||
def mark(self, file_path: str) -> None:
|
||||
try:
|
||||
stat = os.stat(file_path)
|
||||
except OSError:
|
||||
return
|
||||
self._data[file_path] = {"size": stat.st_size, "mtime": stat.st_mtime}
|
||||
|
||||
def save(self) -> None:
|
||||
os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True)
|
||||
tmp = self.path + ".tmp"
|
||||
with open(tmp, "w", encoding="utf-8") as f:
|
||||
json.dump(self._data, f)
|
||||
os.replace(tmp, self.path)
|
||||
159
session_memory/core/digest.py
Normal file
159
session_memory/core/digest.py
Normal file
@@ -0,0 +1,159 @@
|
||||
"""Session digest — Tier 1 -> Tier 2 promotion (design §3, §4; T04).
|
||||
|
||||
Compresses a session's events into a small, durable digest: outcome heuristic,
|
||||
cost totals, tool histogram, and counts of error/retry/test/edit/human markers,
|
||||
plus a few key snippets. Writing the digest sets ``analyzed_at``, which is what
|
||||
makes a session evictable under budget-based retention (design §5).
|
||||
|
||||
Signal extraction beyond this digest is intentionally out of scope here — it
|
||||
belongs to the Detect phase (PRD §6.2).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import collections
|
||||
from typing import Any
|
||||
|
||||
from .schema import Session, SessionEvent
|
||||
|
||||
# Substrings in tool_result bodies / summaries that suggest a failure.
|
||||
_FAIL_HINTS = ("error", "failed", "exception", "traceback", "fatal", "non-zero")
|
||||
# Substrings suggesting a clean test pass.
|
||||
_PASS_HINTS = ("passed", "0 failed", "ok", "success")
|
||||
|
||||
|
||||
def infer_outcome(events: list[SessionEvent], blobs: dict[str, str] | None = None) -> str:
|
||||
"""Heuristic outcome label across flavors (design OQ2).
|
||||
|
||||
- ``abandoned`` if the session has no assistant output at all.
|
||||
- ``fail`` if the last substantive signal is an error / failing test.
|
||||
- ``success`` if it ends on assistant output or a passing test.
|
||||
- ``unknown`` otherwise.
|
||||
"""
|
||||
blobs = blobs or {}
|
||||
assistant = [e for e in events if e.kind == "assistant_msg"]
|
||||
if not assistant:
|
||||
return "abandoned"
|
||||
|
||||
# Look at error and test signals; weight the latest ones.
|
||||
last_fail = _last_index(events, lambda e: e.kind == "error")
|
||||
last_test = _last_index(events, lambda e: e.kind == "test_run")
|
||||
last_completion = _last_index(events, lambda e: e.kind in ("completion", "assistant_msg"))
|
||||
|
||||
test_passed = None
|
||||
if last_test is not None:
|
||||
# inspect the nearest following tool_result body for pass/fail hints
|
||||
body = _nearby_result_body(events, last_test, blobs)
|
||||
if body:
|
||||
low = body.lower()
|
||||
if any(h in low for h in _FAIL_HINTS):
|
||||
test_passed = False
|
||||
elif any(h in low for h in _PASS_HINTS):
|
||||
test_passed = True
|
||||
|
||||
if test_passed is False and (last_test or 0) >= (last_completion or 0):
|
||||
return "fail"
|
||||
if last_fail is not None and last_completion is not None and last_fail > last_completion:
|
||||
return "fail"
|
||||
if test_passed is True:
|
||||
return "success"
|
||||
if last_completion is not None:
|
||||
return "success"
|
||||
return "unknown"
|
||||
|
||||
|
||||
def build_digest(session: Session, events: list[SessionEvent],
|
||||
blobs: dict[str, str] | None = None) -> dict[str, Any]:
|
||||
"""Produce the compact Tier 2 digest dict for a session."""
|
||||
blobs = blobs or {}
|
||||
kind_counts = collections.Counter(e.kind for e in events)
|
||||
tool_hist = collections.Counter(e.tool for e in events if e.tool)
|
||||
retries = kind_counts.get("retry", 0)
|
||||
outcome = infer_outcome(events, blobs)
|
||||
|
||||
return {
|
||||
"session_uid": session.session_uid,
|
||||
"flavor": session.flavor,
|
||||
"repo": session.repo,
|
||||
"domain": session.domain,
|
||||
"model": session.model,
|
||||
"started_at": session.started_at,
|
||||
"ended_at": session.ended_at,
|
||||
"outcome": outcome,
|
||||
"cost": {
|
||||
"input_tokens": session.cost.input_tokens,
|
||||
"output_tokens": session.cost.output_tokens,
|
||||
"cache_tokens": session.cost.cache_tokens,
|
||||
"wall_clock_s": session.cost.wall_clock_s,
|
||||
"turns": session.cost.turns,
|
||||
"retries": retries,
|
||||
},
|
||||
"event_count": len(events),
|
||||
"kind_counts": dict(kind_counts),
|
||||
"tool_histogram": dict(tool_hist),
|
||||
"markers": {
|
||||
"errors": kind_counts.get("error", 0),
|
||||
"retries": retries,
|
||||
"test_runs": kind_counts.get("test_run", 0),
|
||||
"edits": kind_counts.get("edit", 0),
|
||||
"human_interventions": kind_counts.get("human_intervention", 0),
|
||||
},
|
||||
"first_prompt": _first_prompt(events, blobs),
|
||||
"last_assistant": _last_assistant(events, blobs),
|
||||
"schema_version": session.schema_version,
|
||||
}
|
||||
|
||||
|
||||
def analyze(store, session_uid: str) -> dict[str, Any]:
|
||||
"""Read a session from the store, write its digest, return the digest."""
|
||||
session = store.get_session(session_uid)
|
||||
if session is None:
|
||||
raise KeyError(session_uid)
|
||||
events = store.get_events(session_uid)
|
||||
blobs = {e.payload_ref: _read_blob(store, e.payload_ref)
|
||||
for e in events if e.payload_ref}
|
||||
digest = build_digest(session, events, blobs)
|
||||
store.write_digest(session_uid, digest)
|
||||
return digest
|
||||
|
||||
|
||||
# ---- helpers ---------------------------------------------------------------
|
||||
|
||||
def _last_index(events, pred):
|
||||
idx = None
|
||||
for i, e in enumerate(events):
|
||||
if pred(e):
|
||||
idx = i
|
||||
return idx
|
||||
|
||||
|
||||
def _nearby_result_body(events, idx, blobs):
|
||||
for e in events[idx + 1: idx + 4]:
|
||||
if e.kind == "tool_result" and e.payload_ref in blobs:
|
||||
return blobs[e.payload_ref]
|
||||
return None
|
||||
|
||||
|
||||
def _first_prompt(events, blobs):
|
||||
for e in events:
|
||||
if e.kind == "user_msg":
|
||||
return (blobs.get(e.payload_ref) or e.summary or "")[:280]
|
||||
return None
|
||||
|
||||
|
||||
def _last_assistant(events, blobs):
|
||||
for e in reversed(events):
|
||||
if e.kind == "assistant_msg":
|
||||
return (blobs.get(e.payload_ref) or e.summary or "")[:280]
|
||||
return None
|
||||
|
||||
|
||||
def _read_blob(store, ref):
|
||||
row = store.db.execute("SELECT path FROM blobs WHERE ref=?", (ref,)).fetchone()
|
||||
if not row:
|
||||
return ""
|
||||
try:
|
||||
with open(row["path"], "r", encoding="utf-8") as f:
|
||||
return f.read()
|
||||
except OSError:
|
||||
return ""
|
||||
144
session_memory/core/retention.py
Normal file
144
session_memory/core/retention.py
Normal file
@@ -0,0 +1,144 @@
|
||||
"""Budget-based retention sweep (design §5; T05).
|
||||
|
||||
Eviction is tied to the two conditions the design names — a session is dropped
|
||||
from Tier 1 once it has been *analyzed* (its digest is in Tier 2) **and** space is
|
||||
needed, with a max-age backstop. The invariant: raw bytes are never dropped
|
||||
before the Tier 2 digest exists, except the explicitly-reported hard-cap overflow
|
||||
path.
|
||||
|
||||
Order of passes per sweep:
|
||||
1. backstop — evict analyzed sessions older than ``raw_max_age_days``
|
||||
2. budget — while over ``raw_soft_cap_bytes``, evict oldest-analyzed first
|
||||
3. overflow — if still over ``raw_hard_cap_bytes`` and only un-analyzed bulk
|
||||
remains: analyze-now, retry budget; last resort evict oldest
|
||||
un-analyzed and emit a reported ``data_loss`` event.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Callable, Optional
|
||||
|
||||
from .schema import Session
|
||||
|
||||
|
||||
@dataclass
|
||||
class RetentionConfig:
|
||||
raw_soft_cap_bytes: int = 4 * 1024**3 # 4 GiB
|
||||
raw_hard_cap_bytes: int = 6 * 1024**3 # 6 GiB
|
||||
raw_max_age_days: int = 45
|
||||
distilled_cap_bytes: int = 1 * 1024**3 # 1 GiB (alert only, never auto-drop)
|
||||
|
||||
|
||||
@dataclass
|
||||
class EvictionReport:
|
||||
backstop_evicted: list[str] = field(default_factory=list)
|
||||
budget_evicted: list[str] = field(default_factory=list)
|
||||
overflow_analyzed: list[str] = field(default_factory=list)
|
||||
overflow_data_loss: list[str] = field(default_factory=list)
|
||||
bytes_freed: int = 0
|
||||
final_usage_bytes: int = 0
|
||||
over_hard_cap: bool = False
|
||||
tier2_over_cap: bool = False
|
||||
warnings: list[str] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def lost_data(self) -> bool:
|
||||
return bool(self.overflow_data_loss)
|
||||
|
||||
|
||||
def _parse_ts(ts: Optional[str]) -> Optional[datetime]:
|
||||
if not ts:
|
||||
return None
|
||||
try:
|
||||
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def _age_days(s: Session, now: datetime) -> Optional[float]:
|
||||
ref = _parse_ts(s.ended_at) or _parse_ts(s.started_at) or _parse_ts(s.ingested_at)
|
||||
if ref is None:
|
||||
return None
|
||||
if ref.tzinfo is None:
|
||||
ref = ref.replace(tzinfo=timezone.utc)
|
||||
return (now - ref).total_seconds() / 86400.0
|
||||
|
||||
|
||||
def _sort_key(s: Session) -> str:
|
||||
# oldest-analyzed-first; fall back through timestamps
|
||||
return s.analyzed_at or s.ended_at or s.ingested_at or ""
|
||||
|
||||
|
||||
def sweep(store, config: RetentionConfig, *,
|
||||
analyze_fn: Optional[Callable[[object, str], object]] = None,
|
||||
now: Optional[datetime] = None) -> EvictionReport:
|
||||
"""Run one retention sweep against ``store``. Returns an EvictionReport.
|
||||
|
||||
``analyze_fn(store, session_uid)`` is used by the overflow path to make
|
||||
un-analyzed sessions evictable; pass ``digest.analyze``.
|
||||
"""
|
||||
now = now or datetime.now(timezone.utc)
|
||||
report = EvictionReport()
|
||||
|
||||
def live_sessions() -> list[Session]:
|
||||
return [s for s in store.list_sessions() if s.evicted_at is None]
|
||||
|
||||
# 1. backstop pass — analyzed + older than max age
|
||||
for s in sorted(live_sessions(), key=_sort_key):
|
||||
age = _age_days(s, now)
|
||||
if s.is_evictable and age is not None and age > config.raw_max_age_days:
|
||||
report.bytes_freed += store.evict_raw(s.session_uid)
|
||||
report.backstop_evicted.append(s.session_uid)
|
||||
|
||||
# 2. budget pass — evict oldest analyzed while over soft cap
|
||||
while store.tier1_usage_bytes() > config.raw_soft_cap_bytes:
|
||||
candidates = [s for s in live_sessions() if s.is_evictable]
|
||||
if not candidates:
|
||||
break # will not destroy un-analyzed data for space
|
||||
victim = min(candidates, key=_sort_key)
|
||||
report.bytes_freed += store.evict_raw(victim.session_uid)
|
||||
report.budget_evicted.append(victim.session_uid)
|
||||
|
||||
# 3. overflow path — only if still over HARD cap with un-analyzed bulk left
|
||||
if store.tier1_usage_bytes() > config.raw_hard_cap_bytes:
|
||||
# 3a. try to analyze now so those sessions become evictable
|
||||
if analyze_fn is not None:
|
||||
for s in sorted(live_sessions(), key=_sort_key):
|
||||
if not s.is_evictable:
|
||||
try:
|
||||
analyze_fn(store, s.session_uid)
|
||||
report.overflow_analyzed.append(s.session_uid)
|
||||
except Exception as e: # analysis may fail; keep going
|
||||
report.warnings.append(f"analyze failed for {s.session_uid}: {e}")
|
||||
# retry budget pass on the freshly-analyzed sessions
|
||||
while store.tier1_usage_bytes() > config.raw_soft_cap_bytes:
|
||||
candidates = [s for s in live_sessions() if s.is_evictable]
|
||||
if not candidates:
|
||||
break
|
||||
victim = min(candidates, key=_sort_key)
|
||||
report.bytes_freed += store.evict_raw(victim.session_uid)
|
||||
report.budget_evicted.append(victim.session_uid)
|
||||
|
||||
# 3b. last resort — evict oldest un-analyzed, REPORTED as data loss
|
||||
while store.tier1_usage_bytes() > config.raw_hard_cap_bytes:
|
||||
remaining = [s for s in live_sessions() if not s.is_evictable]
|
||||
if not remaining:
|
||||
break
|
||||
victim = min(remaining, key=_sort_key)
|
||||
report.bytes_freed += store.evict_raw(victim.session_uid)
|
||||
report.overflow_data_loss.append(victim.session_uid)
|
||||
report.warnings.append(
|
||||
f"data_loss: evicted un-analyzed {victim.session_uid} to stay under hard cap"
|
||||
)
|
||||
|
||||
usage = store.tier1_usage_bytes()
|
||||
report.final_usage_bytes = usage
|
||||
report.over_hard_cap = usage > config.raw_hard_cap_bytes
|
||||
report.tier2_over_cap = store.tier2_usage_bytes() > config.distilled_cap_bytes
|
||||
if report.tier2_over_cap:
|
||||
report.warnings.append(
|
||||
"tier2 distilled store over cap — flag for curation review (do not auto-drop)"
|
||||
)
|
||||
return report
|
||||
156
session_memory/core/schema.py
Normal file
156
session_memory/core/schema.py
Normal file
@@ -0,0 +1,156 @@
|
||||
"""Normalized session schema (Tier 1) — design doc §4.
|
||||
|
||||
Two record kinds, ``Session`` and ``SessionEvent``, plus the small enums every
|
||||
adapter targets. Field names here are the stable contract; per-flavor quirks are
|
||||
absorbed inside each adapter (see design §4.3 native -> kind mapping).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import asdict, dataclass, field, fields
|
||||
from typing import Any, Optional
|
||||
|
||||
SCHEMA_VERSION = 1
|
||||
|
||||
# Supported agent flavors. ``session_uid`` is always "<flavor>:<native id>".
|
||||
FLAVORS = ("claude", "codex", "grok")
|
||||
|
||||
# SessionEvent.kind universe (design §4.2 / §4.3).
|
||||
KINDS = (
|
||||
"user_msg",
|
||||
"assistant_msg",
|
||||
"thinking",
|
||||
"tool_call",
|
||||
"tool_result",
|
||||
"error",
|
||||
"test_run",
|
||||
"edit",
|
||||
"retry",
|
||||
"human_intervention",
|
||||
"decision",
|
||||
"lifecycle",
|
||||
"completion",
|
||||
)
|
||||
|
||||
# Session.outcome universe.
|
||||
OUTCOMES = ("success", "fail", "abandoned", "unknown")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Cost:
|
||||
"""Token + effort accounting for a session."""
|
||||
|
||||
input_tokens: int = 0
|
||||
output_tokens: int = 0
|
||||
cache_tokens: int = 0
|
||||
wall_clock_s: float = 0.0
|
||||
turns: int = 0
|
||||
retries: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class Session:
|
||||
"""One bounded run of a coding agent against a repo (design §4.1)."""
|
||||
|
||||
session_uid: str # "<flavor>:<native id>" — globally unique
|
||||
flavor: str
|
||||
native_session_id: str
|
||||
repo: Optional[str] = None
|
||||
domain: Optional[str] = None
|
||||
cwd: Optional[str] = None
|
||||
git_branch: Optional[str] = None
|
||||
model: Optional[str] = None
|
||||
started_at: Optional[str] = None # ISO-8601 UTC
|
||||
ended_at: Optional[str] = None
|
||||
outcome: str = "unknown"
|
||||
cost: Cost = field(default_factory=Cost)
|
||||
task_ref: Optional[str] = None
|
||||
source_path: Optional[str] = None
|
||||
source_bytes: int = 0
|
||||
schema_version: int = SCHEMA_VERSION
|
||||
# watermarks (design §3.1): discovered -> ingested -> analyzed -> evicted
|
||||
discovered_at: Optional[str] = None
|
||||
ingested_at: Optional[str] = None
|
||||
analyzed_at: Optional[str] = None
|
||||
evicted_at: Optional[str] = None
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.flavor not in FLAVORS:
|
||||
raise ValueError(f"unknown flavor {self.flavor!r}; expected one of {FLAVORS}")
|
||||
if self.outcome not in OUTCOMES:
|
||||
raise ValueError(f"unknown outcome {self.outcome!r}; expected one of {OUTCOMES}")
|
||||
expected_prefix = f"{self.flavor}:"
|
||||
if not self.session_uid.startswith(expected_prefix):
|
||||
raise ValueError(
|
||||
f"session_uid {self.session_uid!r} must start with {expected_prefix!r}"
|
||||
)
|
||||
|
||||
@property
|
||||
def is_evictable(self) -> bool:
|
||||
"""A session may be evicted from Tier 1 only once analyzed (design §3.1)."""
|
||||
return self.analyzed_at is not None and self.evicted_at is None
|
||||
|
||||
@staticmethod
|
||||
def make_uid(flavor: str, native_session_id: str) -> str:
|
||||
return f"{flavor}:{native_session_id}"
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
d = asdict(self)
|
||||
return d
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self.to_dict(), sort_keys=True)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: dict[str, Any]) -> "Session":
|
||||
d = dict(d)
|
||||
cost = d.pop("cost", None)
|
||||
obj = cls(**{k: v for k, v in d.items() if k in _SESSION_FIELDS})
|
||||
if cost is not None:
|
||||
obj.cost = Cost(**{k: v for k, v in cost.items() if k in _COST_FIELDS})
|
||||
return obj
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, s: str) -> "Session":
|
||||
return cls.from_dict(json.loads(s))
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionEvent:
|
||||
"""One atomic record within a session (design §4.2)."""
|
||||
|
||||
session_uid: str
|
||||
seq: int # monotonic within session
|
||||
ts: Optional[str] = None
|
||||
kind: str = "lifecycle"
|
||||
parent_seq: Optional[int] = None # turn DAG (Claude); None for flat flavors
|
||||
role: Optional[str] = None # user|assistant|system|tool
|
||||
tool: Optional[str] = None # when kind in {tool_call, tool_result}
|
||||
summary: Optional[str] = None # short, human-readable
|
||||
payload_ref: Optional[str] = None # pointer to full body in Tier 1 blob store
|
||||
tokens: int = 0
|
||||
is_sidechain: bool = False
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.kind not in KINDS:
|
||||
raise ValueError(f"unknown kind {self.kind!r}; expected one of {KINDS}")
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self.to_dict(), sort_keys=True)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: dict[str, Any]) -> "SessionEvent":
|
||||
return cls(**{k: v for k, v in d.items() if k in _EVENT_FIELDS})
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, s: str) -> "SessionEvent":
|
||||
return cls.from_dict(json.loads(s))
|
||||
|
||||
|
||||
_SESSION_FIELDS = {f.name for f in fields(Session)}
|
||||
_COST_FIELDS = {f.name for f in fields(Cost)}
|
||||
_EVENT_FIELDS = {f.name for f in fields(SessionEvent)}
|
||||
225
session_memory/core/store.py
Normal file
225
session_memory/core/store.py
Normal file
@@ -0,0 +1,225 @@
|
||||
"""Two-tier store (design §3, §8).
|
||||
|
||||
Tier 1 (bulky, evictable): ``Session`` + ``SessionEvent`` rows in SQLite, with
|
||||
event bodies written out-of-line as files under a blob dir (referenced by
|
||||
``payload_ref``). Tier 2 (compact, durable): per-session ``digest`` rows.
|
||||
|
||||
Writes are idempotent on ``(session_uid, seq)`` for events and on
|
||||
``session_uid`` for sessions/digests, so sweeps are safely re-runnable. Eviction
|
||||
(:meth:`evict_raw`) deletes Tier 1 rows + blobs but keeps the session row and its
|
||||
Tier 2 digest — the invariant that makes budget-based retention non-lossy.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
from .schema import Cost, Session, SessionEvent
|
||||
|
||||
_SAFE = re.compile(r"[^A-Za-z0-9._-]+")
|
||||
|
||||
|
||||
def _now() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
|
||||
class Store:
|
||||
def __init__(self, db_path: str, blob_dir: str):
|
||||
self.db_path = db_path
|
||||
self.blob_dir = blob_dir
|
||||
os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
|
||||
os.makedirs(blob_dir, exist_ok=True)
|
||||
self.db = sqlite3.connect(db_path)
|
||||
self.db.row_factory = sqlite3.Row
|
||||
self.db.execute("PRAGMA journal_mode=WAL")
|
||||
self._init_schema()
|
||||
|
||||
def close(self) -> None:
|
||||
self.db.close()
|
||||
|
||||
def __enter__(self) -> "Store":
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc) -> None:
|
||||
self.close()
|
||||
|
||||
def _init_schema(self) -> None:
|
||||
self.db.executescript(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS sessions (
|
||||
session_uid TEXT PRIMARY KEY,
|
||||
json TEXT NOT NULL,
|
||||
analyzed_at TEXT,
|
||||
evicted_at TEXT
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
session_uid TEXT NOT NULL,
|
||||
seq INTEGER NOT NULL,
|
||||
json TEXT NOT NULL,
|
||||
PRIMARY KEY (session_uid, seq)
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS blobs (
|
||||
ref TEXT PRIMARY KEY,
|
||||
session_uid TEXT NOT NULL,
|
||||
path TEXT NOT NULL,
|
||||
nbytes INTEGER NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS digests (
|
||||
session_uid TEXT PRIMARY KEY,
|
||||
json TEXT NOT NULL,
|
||||
nbytes INTEGER NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS ix_events_uid ON events(session_uid);
|
||||
CREATE INDEX IF NOT EXISTS ix_blobs_uid ON blobs(session_uid);
|
||||
"""
|
||||
)
|
||||
self.db.commit()
|
||||
|
||||
# ---- Tier 1 writes -----------------------------------------------------
|
||||
|
||||
def upsert_session(self, s: Session) -> None:
|
||||
self.db.execute(
|
||||
"INSERT INTO sessions(session_uid, json, analyzed_at, evicted_at) "
|
||||
"VALUES(?,?,?,?) ON CONFLICT(session_uid) DO UPDATE SET "
|
||||
"json=excluded.json, analyzed_at=excluded.analyzed_at, evicted_at=excluded.evicted_at",
|
||||
(s.session_uid, s.to_json(), s.analyzed_at, s.evicted_at),
|
||||
)
|
||||
self.db.commit()
|
||||
|
||||
def upsert_events(self, events: list[SessionEvent]) -> int:
|
||||
rows = [(e.session_uid, e.seq, e.to_json()) for e in events]
|
||||
self.db.executemany(
|
||||
"INSERT INTO events(session_uid, seq, json) VALUES(?,?,?) "
|
||||
"ON CONFLICT(session_uid, seq) DO UPDATE SET json=excluded.json",
|
||||
rows,
|
||||
)
|
||||
self.db.commit()
|
||||
return len(rows)
|
||||
|
||||
def write_blobs(self, session_uid: str, blobs: dict[str, str]) -> int:
|
||||
"""Write event bodies as files; record path + size. Returns bytes written."""
|
||||
total = 0
|
||||
sub = os.path.join(self.blob_dir, _SAFE.sub("_", session_uid))
|
||||
os.makedirs(sub, exist_ok=True)
|
||||
for ref, body in blobs.items():
|
||||
data = body.encode("utf-8")
|
||||
fname = _SAFE.sub("_", ref) + ".txt"
|
||||
path = os.path.join(sub, fname)
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
f.write(body)
|
||||
self.db.execute(
|
||||
"INSERT INTO blobs(ref, session_uid, path, nbytes) VALUES(?,?,?,?) "
|
||||
"ON CONFLICT(ref) DO UPDATE SET path=excluded.path, nbytes=excluded.nbytes",
|
||||
(ref, session_uid, path, len(data)),
|
||||
)
|
||||
total += len(data)
|
||||
self.db.commit()
|
||||
return total
|
||||
|
||||
def ingest(self, bundle) -> None:
|
||||
"""Persist a full Normalized bundle (session + events + blobs)."""
|
||||
s = bundle.session
|
||||
if s.ingested_at is None:
|
||||
s.ingested_at = _now()
|
||||
self.upsert_session(s)
|
||||
self.upsert_events(bundle.events)
|
||||
self.write_blobs(s.session_uid, bundle.blobs)
|
||||
|
||||
# ---- Tier 2 (digest) ---------------------------------------------------
|
||||
|
||||
def write_digest(self, session_uid: str, digest: dict[str, Any], analyzed_at: Optional[str] = None) -> None:
|
||||
payload = json.dumps(digest, sort_keys=True)
|
||||
self.db.execute(
|
||||
"INSERT INTO digests(session_uid, json, nbytes) VALUES(?,?,?) "
|
||||
"ON CONFLICT(session_uid) DO UPDATE SET json=excluded.json, nbytes=excluded.nbytes",
|
||||
(session_uid, payload, len(payload.encode("utf-8"))),
|
||||
)
|
||||
self.db.execute(
|
||||
"UPDATE sessions SET analyzed_at=? WHERE session_uid=?",
|
||||
(analyzed_at or _now(), session_uid),
|
||||
)
|
||||
self.db.commit()
|
||||
|
||||
def get_digest(self, session_uid: str) -> Optional[dict[str, Any]]:
|
||||
row = self.db.execute("SELECT json FROM digests WHERE session_uid=?", (session_uid,)).fetchone()
|
||||
return json.loads(row["json"]) if row else None
|
||||
|
||||
# ---- reads -------------------------------------------------------------
|
||||
|
||||
def get_session(self, session_uid: str) -> Optional[Session]:
|
||||
row = self.db.execute(
|
||||
"SELECT json, analyzed_at, evicted_at FROM sessions WHERE session_uid=?", (session_uid,)
|
||||
).fetchone()
|
||||
return self._row_to_session(row) if row else None
|
||||
|
||||
def list_sessions(self) -> list[Session]:
|
||||
rows = self.db.execute("SELECT json, analyzed_at, evicted_at FROM sessions")
|
||||
return [self._row_to_session(r) for r in rows]
|
||||
|
||||
@staticmethod
|
||||
def _row_to_session(row) -> Session:
|
||||
"""Rebuild a Session, treating the watermark columns as authoritative."""
|
||||
s = Session.from_json(row["json"])
|
||||
s.analyzed_at = row["analyzed_at"]
|
||||
s.evicted_at = row["evicted_at"]
|
||||
return s
|
||||
|
||||
def get_events(self, session_uid: str) -> list[SessionEvent]:
|
||||
rows = self.db.execute(
|
||||
"SELECT json FROM events WHERE session_uid=? ORDER BY seq", (session_uid,)
|
||||
).fetchall()
|
||||
return [SessionEvent.from_json(r["json"]) for r in rows]
|
||||
|
||||
def count_events(self, session_uid: str) -> int:
|
||||
return self.db.execute(
|
||||
"SELECT COUNT(*) c FROM events WHERE session_uid=?", (session_uid,)
|
||||
).fetchone()["c"]
|
||||
|
||||
# ---- usage accounting (drives retention) -------------------------------
|
||||
|
||||
def tier1_usage_bytes(self) -> int:
|
||||
"""Bytes held in Tier 1: event-row JSON + blob bytes for non-evicted sessions."""
|
||||
row = self.db.execute(
|
||||
"SELECT COALESCE(SUM(LENGTH(json)),0) b FROM events e "
|
||||
"WHERE NOT EXISTS (SELECT 1 FROM sessions s "
|
||||
"WHERE s.session_uid=e.session_uid AND s.evicted_at IS NOT NULL)"
|
||||
).fetchone()
|
||||
blob = self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM blobs").fetchone()
|
||||
return int(row["b"]) + int(blob["b"])
|
||||
|
||||
def session_tier1_bytes(self, session_uid: str) -> int:
|
||||
ev = self.db.execute(
|
||||
"SELECT COALESCE(SUM(LENGTH(json)),0) b FROM events WHERE session_uid=?", (session_uid,)
|
||||
).fetchone()["b"]
|
||||
bl = self.db.execute(
|
||||
"SELECT COALESCE(SUM(nbytes),0) b FROM blobs WHERE session_uid=?", (session_uid,)
|
||||
).fetchone()["b"]
|
||||
return int(ev) + int(bl)
|
||||
|
||||
def tier2_usage_bytes(self) -> int:
|
||||
return int(self.db.execute("SELECT COALESCE(SUM(nbytes),0) b FROM digests").fetchone()["b"])
|
||||
|
||||
# ---- eviction ----------------------------------------------------------
|
||||
|
||||
def evict_raw(self, session_uid: str) -> int:
|
||||
"""Drop Tier 1 raw (events + blob files) for a session; keep digest + row.
|
||||
|
||||
Sets ``evicted_at``. Returns bytes freed. Safe to call on an
|
||||
already-evicted session (no-op-ish).
|
||||
"""
|
||||
freed = self.session_tier1_bytes(session_uid)
|
||||
for r in self.db.execute("SELECT path FROM blobs WHERE session_uid=?", (session_uid,)).fetchall():
|
||||
try:
|
||||
os.remove(r["path"])
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
self.db.execute("DELETE FROM blobs WHERE session_uid=?", (session_uid,))
|
||||
self.db.execute("DELETE FROM events WHERE session_uid=?", (session_uid,))
|
||||
self.db.execute("UPDATE sessions SET evicted_at=? WHERE session_uid=?", (_now(), session_uid))
|
||||
self.db.commit()
|
||||
return freed
|
||||
128
session_memory/ingest.py
Normal file
128
session_memory/ingest.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""Session-memory sweep entrypoint (design §7; T06).
|
||||
|
||||
One sweep: discover (per enabled source) -> normalize (adapter) -> store ->
|
||||
digest -> retention-evict. Idempotent and re-runnable; intended to be triggered
|
||||
on the configured cadence (``/schedule`` daily/weekly) or by an agent hook.
|
||||
|
||||
Usage:
|
||||
python -m session_memory.ingest [--config PATH] [--once] [--dry-run]
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import glob
|
||||
import os
|
||||
import sys
|
||||
import tomllib
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from .adapters import claude as claude_adapter
|
||||
from .core import digest as digest_mod
|
||||
from .core.cursor import Cursors
|
||||
from .core.retention import RetentionConfig, sweep as retention_sweep
|
||||
from .core.store import Store
|
||||
|
||||
# adapter dispatch by source name
|
||||
_ADAPTERS = {"claude": claude_adapter.parse_session}
|
||||
|
||||
|
||||
@dataclass
|
||||
class SweepResult:
|
||||
discovered: int = 0
|
||||
ingested: int = 0
|
||||
skipped_unchanged: int = 0
|
||||
analyzed: int = 0
|
||||
warnings: list[str] = field(default_factory=list)
|
||||
retention: Any = None
|
||||
|
||||
|
||||
def _expand(p: str) -> str:
|
||||
return os.path.expanduser(p)
|
||||
|
||||
|
||||
def load_config(path: str) -> dict[str, Any]:
|
||||
with open(path, "rb") as f:
|
||||
return tomllib.load(f)
|
||||
|
||||
|
||||
def run_sweep(config: dict[str, Any], *, dry_run: bool = False) -> SweepResult:
|
||||
store_cfg = config.get("store", {})
|
||||
ret_cfg = config.get("retention", {})
|
||||
repo_map = config.get("repo_domain_map", {})
|
||||
res = SweepResult()
|
||||
|
||||
# In dry-run we only discover + parse: no store is created or written.
|
||||
store = None if dry_run else Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"]))
|
||||
cursors = Cursors(_expand(store_cfg["cursor"]))
|
||||
|
||||
for name, src in config.get("sources", {}).items():
|
||||
if not src.get("enabled"):
|
||||
continue
|
||||
parse = _ADAPTERS.get(name)
|
||||
if parse is None:
|
||||
res.warnings.append(f"no adapter for source {name!r} (Phase 1)")
|
||||
continue
|
||||
root = _expand(src["root"])
|
||||
for fp in sorted(glob.glob(os.path.join(root, src["glob"]))):
|
||||
res.discovered += 1
|
||||
if not cursors.is_changed(fp):
|
||||
res.skipped_unchanged += 1
|
||||
continue
|
||||
try:
|
||||
bundle = parse(fp, repo_map)
|
||||
except Exception as e: # one bad file must not abort the sweep
|
||||
res.warnings.append(f"parse failed {fp}: {e}")
|
||||
continue
|
||||
if bundle is None:
|
||||
cursors.mark(fp)
|
||||
continue
|
||||
if not dry_run:
|
||||
store.ingest(bundle)
|
||||
digest_mod.analyze(store, bundle.session.session_uid)
|
||||
res.analyzed += 1
|
||||
res.ingested += 1
|
||||
cursors.mark(fp)
|
||||
|
||||
if not dry_run and store is not None:
|
||||
cursors.save()
|
||||
rc = RetentionConfig(
|
||||
raw_soft_cap_bytes=int(ret_cfg.get("raw_soft_cap_bytes", RetentionConfig.raw_soft_cap_bytes)),
|
||||
raw_hard_cap_bytes=int(ret_cfg.get("raw_hard_cap_bytes", RetentionConfig.raw_hard_cap_bytes)),
|
||||
raw_max_age_days=int(ret_cfg.get("raw_max_age_days", RetentionConfig.raw_max_age_days)),
|
||||
distilled_cap_bytes=int(ret_cfg.get("distilled_cap_bytes", RetentionConfig.distilled_cap_bytes)),
|
||||
)
|
||||
res.retention = retention_sweep(store, rc, analyze_fn=digest_mod.analyze)
|
||||
res.warnings.extend(res.retention.warnings)
|
||||
|
||||
if store is not None:
|
||||
store.close()
|
||||
return res
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
here = os.path.dirname(os.path.abspath(__file__))
|
||||
ap = argparse.ArgumentParser(description="Run one coding-session-memory sweep.")
|
||||
ap.add_argument("--config", default=os.path.join(here, "config.toml"))
|
||||
ap.add_argument("--dry-run", action="store_true", help="discover + parse, but do not write or evict")
|
||||
ap.add_argument("--once", action="store_true", help="(default) run a single sweep")
|
||||
args = ap.parse_args(argv)
|
||||
|
||||
config = load_config(args.config)
|
||||
res = run_sweep(config, dry_run=args.dry_run)
|
||||
|
||||
print(f"discovered={res.discovered} ingested={res.ingested} "
|
||||
f"skipped_unchanged={res.skipped_unchanged} analyzed={res.analyzed}")
|
||||
if res.retention is not None:
|
||||
r = res.retention
|
||||
print(f"retention: freed={r.bytes_freed}B final_usage={r.final_usage_bytes}B "
|
||||
f"backstop={len(r.backstop_evicted)} budget={len(r.budget_evicted)} "
|
||||
f"overflow_analyzed={len(r.overflow_analyzed)} data_loss={len(r.overflow_data_loss)}")
|
||||
for w in res.warnings:
|
||||
print(f" WARN: {w}", file=sys.stderr)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
99
tests/test_claude_adapter.py
Normal file
99
tests/test_claude_adapter.py
Normal file
@@ -0,0 +1,99 @@
|
||||
"""Claude adapter tests (T02): synthetic fixture + a real on-disk session."""
|
||||
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.adapters.claude import parse_session # noqa: E402
|
||||
|
||||
REPO_MAP = {"agentic-resources": "helix_forge"}
|
||||
|
||||
|
||||
def _write_jsonl(path, records):
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
for r in records:
|
||||
f.write(json.dumps(r) + "\n")
|
||||
|
||||
|
||||
def test_synthetic_session(tmp_path):
|
||||
p = tmp_path / "11111111-2222-3333-4444-555555555555.jsonl"
|
||||
_write_jsonl(p, [
|
||||
{"type": "user", "uuid": "u1", "parentUuid": None,
|
||||
"timestamp": "2026-06-06T10:00:00Z", "sessionId": "sess-1",
|
||||
"cwd": "/home/worsch/agentic-resources", "gitBranch": "main",
|
||||
"version": "1.0", "message": {"role": "user", "content": "fix the bug"}},
|
||||
{"type": "assistant", "uuid": "a1", "parentUuid": "u1",
|
||||
"timestamp": "2026-06-06T10:00:05Z", "sessionId": "sess-1",
|
||||
"message": {"role": "assistant", "model": "claude-opus-4-8",
|
||||
"usage": {"input_tokens": 100, "output_tokens": 20,
|
||||
"cache_read_input_tokens": 10},
|
||||
"content": [
|
||||
{"type": "thinking", "thinking": "let me look"},
|
||||
{"type": "text", "text": "I'll edit the file."},
|
||||
{"type": "tool_use", "name": "Edit",
|
||||
"input": {"file_path": "x.py", "old_string": "a", "new_string": "b"}},
|
||||
{"type": "tool_use", "name": "Bash",
|
||||
"input": {"command": "pytest -q"}},
|
||||
]}},
|
||||
{"type": "user", "uuid": "u2", "parentUuid": "a1",
|
||||
"timestamp": "2026-06-06T10:00:10Z", "sessionId": "sess-1",
|
||||
"message": {"role": "user",
|
||||
"content": [{"type": "tool_result", "content": "6 passed"}]}},
|
||||
])
|
||||
|
||||
norm = parse_session(str(p), REPO_MAP)
|
||||
assert norm is not None
|
||||
s = norm.session
|
||||
assert s.session_uid == "claude:sess-1"
|
||||
assert s.repo == "agentic-resources" and s.domain == "helix_forge"
|
||||
assert s.model == "claude-opus-4-8"
|
||||
assert s.cost.input_tokens == 100 and s.cost.output_tokens == 20
|
||||
assert s.cost.cache_tokens == 10
|
||||
assert s.cost.turns == 1
|
||||
assert s.cost.wall_clock_s == 10.0
|
||||
|
||||
kinds = [e.kind for e in norm.events]
|
||||
assert kinds == ["user_msg", "thinking", "assistant_msg", "edit", "test_run", "tool_result"]
|
||||
|
||||
# turn DAG: assistant events link back to the first user msg (seq 0)
|
||||
edit_ev = next(e for e in norm.events if e.kind == "edit")
|
||||
assert edit_ev.parent_seq == 0
|
||||
assert edit_ev.tool == "Edit"
|
||||
|
||||
# bodies captured as blobs, referenced by payload_ref
|
||||
assert edit_ev.payload_ref in norm.blobs
|
||||
assert "x.py" in norm.blobs[edit_ev.payload_ref]
|
||||
|
||||
|
||||
def test_sidechain_filename_marks_events(tmp_path):
|
||||
p = tmp_path / "agent-deadbeef.jsonl"
|
||||
_write_jsonl(p, [
|
||||
{"type": "assistant", "uuid": "a1", "sessionId": "side-1",
|
||||
"timestamp": "2026-06-06T10:00:00Z",
|
||||
"message": {"role": "assistant", "content": [{"type": "text", "text": "hi"}]}},
|
||||
])
|
||||
norm = parse_session(str(p), REPO_MAP)
|
||||
assert norm.events[0].is_sidechain is True
|
||||
|
||||
|
||||
def test_real_local_session_if_available():
|
||||
"""Smoke-parse a real Claude transcript on this workstation, if present."""
|
||||
base = os.path.expanduser("~/.claude/projects/-home-worsch-agentic-resources")
|
||||
files = sorted(glob.glob(os.path.join(base, "*.jsonl")))
|
||||
if not files:
|
||||
return # environment without local sessions; synthetic tests cover logic
|
||||
parsed = 0
|
||||
for fp in files:
|
||||
norm = parse_session(fp, REPO_MAP)
|
||||
if norm is None:
|
||||
continue
|
||||
parsed += 1
|
||||
assert norm.session.session_uid.startswith("claude:")
|
||||
# seq is monotonic and unique
|
||||
seqs = [e.seq for e in norm.events]
|
||||
assert seqs == sorted(seqs)
|
||||
assert len(seqs) == len(set(seqs))
|
||||
assert parsed >= 1
|
||||
82
tests/test_digest.py
Normal file
82
tests/test_digest.py
Normal file
@@ -0,0 +1,82 @@
|
||||
"""Digest tests (T04): outcome heuristic + Tier 2 promotion."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.adapters.claude import Normalized # noqa: E402
|
||||
from session_memory.core.digest import analyze, build_digest, infer_outcome # noqa: E402
|
||||
from session_memory.core.schema import Cost, Session, SessionEvent # noqa: E402
|
||||
from session_memory.core.store import Store # noqa: E402
|
||||
|
||||
|
||||
def _ev(uid, seq, kind, **kw):
|
||||
return SessionEvent(session_uid=uid, seq=seq, kind=kind, **kw)
|
||||
|
||||
|
||||
def test_infer_outcome_abandoned():
|
||||
uid = "claude:s"
|
||||
assert infer_outcome([_ev(uid, 0, "user_msg")]) == "abandoned"
|
||||
|
||||
|
||||
def test_infer_outcome_success_on_passing_test():
|
||||
uid = "claude:s"
|
||||
events = [
|
||||
_ev(uid, 0, "user_msg"),
|
||||
_ev(uid, 1, "assistant_msg"),
|
||||
_ev(uid, 2, "test_run", tool="Bash"),
|
||||
_ev(uid, 3, "tool_result", payload_ref="b3"),
|
||||
]
|
||||
assert infer_outcome(events, {"b3": "6 passed in 0.4s"}) == "success"
|
||||
|
||||
|
||||
def test_infer_outcome_fail_on_failing_test():
|
||||
uid = "claude:s"
|
||||
events = [
|
||||
_ev(uid, 0, "user_msg"),
|
||||
_ev(uid, 1, "assistant_msg"),
|
||||
_ev(uid, 2, "test_run", tool="Bash"),
|
||||
_ev(uid, 3, "tool_result", payload_ref="b3"),
|
||||
]
|
||||
assert infer_outcome(events, {"b3": "1 failed, traceback ..."}) == "fail"
|
||||
|
||||
|
||||
def test_build_digest_histograms_and_markers():
|
||||
uid = "claude:s"
|
||||
s = Session(session_uid=uid, flavor="claude", native_session_id="s",
|
||||
repo="agentic-resources", cost=Cost(input_tokens=100, output_tokens=40, turns=2))
|
||||
events = [
|
||||
_ev(uid, 0, "user_msg"),
|
||||
_ev(uid, 1, "edit", tool="Edit"),
|
||||
_ev(uid, 2, "edit", tool="Write"),
|
||||
_ev(uid, 3, "test_run", tool="Bash"),
|
||||
_ev(uid, 4, "error"),
|
||||
_ev(uid, 5, "assistant_msg"),
|
||||
]
|
||||
d = build_digest(s, events)
|
||||
assert d["tool_histogram"] == {"Edit": 1, "Write": 1, "Bash": 1}
|
||||
assert d["markers"]["edits"] == 2
|
||||
assert d["markers"]["errors"] == 1
|
||||
assert d["markers"]["test_runs"] == 1
|
||||
assert d["event_count"] == 6
|
||||
assert d["cost"]["input_tokens"] == 100
|
||||
|
||||
|
||||
def test_analyze_writes_digest_and_sets_analyzed(tmp_path):
|
||||
st = Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
|
||||
uid = Session.make_uid("claude", "s1")
|
||||
s = Session(session_uid=uid, flavor="claude", native_session_id="s1")
|
||||
events = [
|
||||
SessionEvent(session_uid=uid, seq=0, kind="user_msg", payload_ref="b0"),
|
||||
SessionEvent(session_uid=uid, seq=1, kind="assistant_msg", payload_ref="b1"),
|
||||
]
|
||||
blobs = {"b0": "please help", "b1": "done"}
|
||||
st.ingest(Normalized(session=s, events=events, blobs=blobs))
|
||||
|
||||
assert st.get_session(uid).is_evictable is False
|
||||
d = analyze(st, uid)
|
||||
assert d["outcome"] == "success"
|
||||
assert d["first_prompt"] == "please help"
|
||||
assert st.get_session(uid).analyzed_at is not None
|
||||
assert st.get_session(uid).is_evictable is True # now promoted -> evictable
|
||||
81
tests/test_ingest.py
Normal file
81
tests/test_ingest.py
Normal file
@@ -0,0 +1,81 @@
|
||||
"""Ingest sweep + cursor tests (T06)."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.core.cursor import Cursors # noqa: E402
|
||||
from session_memory.ingest import run_sweep # noqa: E402
|
||||
|
||||
|
||||
def test_cursor_change_detection(tmp_path):
|
||||
f = tmp_path / "a.jsonl"
|
||||
f.write_text("{}\n")
|
||||
cur = Cursors(str(tmp_path / "cur.json"))
|
||||
assert cur.is_changed(str(f)) is True
|
||||
cur.mark(str(f))
|
||||
assert cur.is_changed(str(f)) is False
|
||||
f.write_text("{}\n{}\n") # grow
|
||||
assert cur.is_changed(str(f)) is True
|
||||
|
||||
|
||||
def _claude_session_file(dir_path, native):
|
||||
os.makedirs(dir_path, exist_ok=True)
|
||||
p = os.path.join(dir_path, f"{native}.jsonl")
|
||||
recs = [
|
||||
{"type": "user", "uuid": "u1", "sessionId": native,
|
||||
"timestamp": "2026-06-06T10:00:00Z", "cwd": "/home/worsch/agentic-resources",
|
||||
"gitBranch": "main", "message": {"role": "user", "content": "hi"}},
|
||||
{"type": "assistant", "uuid": "a1", "parentUuid": "u1", "sessionId": native,
|
||||
"timestamp": "2026-06-06T10:00:02Z",
|
||||
"message": {"role": "assistant", "model": "claude-opus-4-8",
|
||||
"usage": {"input_tokens": 5, "output_tokens": 2},
|
||||
"content": [{"type": "text", "text": "hello"}]}},
|
||||
]
|
||||
with open(p, "w", encoding="utf-8") as f:
|
||||
for r in recs:
|
||||
f.write(json.dumps(r) + "\n")
|
||||
return p
|
||||
|
||||
|
||||
def _config(tmp_path, projects_dir):
|
||||
return {
|
||||
"store": {
|
||||
"db_path": str(tmp_path / ".store/mem.db"),
|
||||
"blob_dir": str(tmp_path / ".store/blobs"),
|
||||
"cursor": str(tmp_path / ".store/cursors.json"),
|
||||
},
|
||||
"retention": {"raw_soft_cap_bytes": 10**12, "raw_hard_cap_bytes": 10**12,
|
||||
"raw_max_age_days": 10**6, "distilled_cap_bytes": 10**12},
|
||||
"sources": {"claude": {"enabled": True, "root": str(projects_dir), "glob": "*/*.jsonl"}},
|
||||
"repo_domain_map": {"agentic-resources": "helix_forge"},
|
||||
}
|
||||
|
||||
|
||||
def test_run_sweep_end_to_end(tmp_path):
|
||||
projects = tmp_path / "projects"
|
||||
_claude_session_file(str(projects / "-home-worsch-agentic-resources"), "sess-aaa")
|
||||
cfg = _config(tmp_path, projects)
|
||||
|
||||
res = run_sweep(cfg)
|
||||
assert res.discovered == 1
|
||||
assert res.ingested == 1
|
||||
assert res.analyzed == 1
|
||||
assert res.retention is not None
|
||||
|
||||
# re-run: cursor skips the unchanged file (idempotent, cheap)
|
||||
res2 = run_sweep(cfg)
|
||||
assert res2.skipped_unchanged == 1
|
||||
assert res2.ingested == 0
|
||||
|
||||
|
||||
def test_dry_run_writes_nothing(tmp_path):
|
||||
projects = tmp_path / "projects"
|
||||
_claude_session_file(str(projects / "-home-worsch-agentic-resources"), "sess-bbb")
|
||||
cfg = _config(tmp_path, projects)
|
||||
res = run_sweep(cfg, dry_run=True)
|
||||
assert res.discovered == 1 and res.ingested == 1
|
||||
assert res.retention is None
|
||||
assert not os.path.exists(cfg["store"]["db_path"]) # no store created
|
||||
107
tests/test_retention.py
Normal file
107
tests/test_retention.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""Retention tests (T05): each pass of the budget-based eviction, with tiny caps."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.adapters.claude import Normalized # noqa: E402
|
||||
from session_memory.core import digest as digest_mod # noqa: E402
|
||||
from session_memory.core.retention import RetentionConfig, sweep # noqa: E402
|
||||
from session_memory.core.schema import Cost, Session, SessionEvent # noqa: E402
|
||||
from session_memory.core.store import Store # noqa: E402
|
||||
|
||||
NOW = datetime(2026, 6, 6, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
def _ingest(st, native, *, body_bytes=1000, ended=None, analyze=False):
|
||||
uid = Session.make_uid("claude", native)
|
||||
s = Session(session_uid=uid, flavor="claude", native_session_id=native,
|
||||
ended_at=(ended or NOW).strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||||
ingested_at=NOW.strftime("%Y-%m-%dT%H:%M:%SZ"))
|
||||
ref = f"blob://{native}/0"
|
||||
events = [SessionEvent(session_uid=uid, seq=0, kind="assistant_msg", payload_ref=ref)]
|
||||
st.ingest(Normalized(session=s, events=events, blobs={ref: "x" * body_bytes}))
|
||||
if analyze:
|
||||
digest_mod.analyze(st, uid)
|
||||
return uid
|
||||
|
||||
|
||||
def _store(tmp_path):
|
||||
return Store(str(tmp_path / "m.db"), str(tmp_path / "blobs"))
|
||||
|
||||
|
||||
def test_backstop_evicts_old_analyzed_only(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
old = _ingest(st, "old", ended=NOW - timedelta(days=60), analyze=True)
|
||||
young = _ingest(st, "young", ended=NOW - timedelta(days=1), analyze=True)
|
||||
unanalyzed_old = _ingest(st, "oldraw", ended=NOW - timedelta(days=60), analyze=False)
|
||||
|
||||
cfg = RetentionConfig(raw_soft_cap_bytes=10**12, raw_hard_cap_bytes=10**12, raw_max_age_days=45)
|
||||
rep = sweep(st, cfg, now=NOW)
|
||||
|
||||
assert old in rep.backstop_evicted
|
||||
assert young not in rep.backstop_evicted # too recent
|
||||
assert unanalyzed_old not in rep.backstop_evicted # not analyzed -> protected
|
||||
assert st.get_session(old).evicted_at is not None
|
||||
assert st.get_session(unanalyzed_old).evicted_at is None
|
||||
|
||||
|
||||
def test_budget_pass_evicts_oldest_analyzed_first(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
a = _ingest(st, "a", body_bytes=2000, ended=NOW - timedelta(days=3), analyze=True)
|
||||
b = _ingest(st, "b", body_bytes=2000, ended=NOW - timedelta(days=2), analyze=True)
|
||||
c = _ingest(st, "c", body_bytes=2000, ended=NOW - timedelta(days=1), analyze=True)
|
||||
|
||||
# soft cap that forces evicting ~two of the three
|
||||
cfg = RetentionConfig(raw_soft_cap_bytes=2500, raw_hard_cap_bytes=10**9, raw_max_age_days=10**6)
|
||||
rep = sweep(st, cfg, now=NOW)
|
||||
|
||||
assert rep.budget_evicted[:2] == [a, b] # oldest-first
|
||||
assert st.get_session(c).evicted_at is None # newest survives
|
||||
assert st.tier1_usage_bytes() <= cfg.raw_soft_cap_bytes
|
||||
|
||||
|
||||
def test_budget_pass_never_touches_unanalyzed(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
raw1 = _ingest(st, "r1", body_bytes=5000, analyze=False)
|
||||
raw2 = _ingest(st, "r2", body_bytes=5000, analyze=False)
|
||||
|
||||
cfg = RetentionConfig(raw_soft_cap_bytes=100, raw_hard_cap_bytes=10**9, raw_max_age_days=10**6)
|
||||
rep = sweep(st, cfg, now=NOW)
|
||||
|
||||
# over soft cap but nothing analyzed -> no eviction, no data loss
|
||||
assert rep.budget_evicted == []
|
||||
assert rep.lost_data is False
|
||||
assert st.get_session(raw1).evicted_at is None
|
||||
assert st.get_session(raw2).evicted_at is None
|
||||
assert st.tier1_usage_bytes() > cfg.raw_soft_cap_bytes # tolerated, not destroyed
|
||||
|
||||
|
||||
def test_overflow_analyzes_then_evicts_without_data_loss(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
r1 = _ingest(st, "r1", body_bytes=4000, ended=NOW - timedelta(days=2), analyze=False)
|
||||
r2 = _ingest(st, "r2", body_bytes=4000, ended=NOW - timedelta(days=1), analyze=False)
|
||||
|
||||
cfg = RetentionConfig(raw_soft_cap_bytes=3000, raw_hard_cap_bytes=5000, raw_max_age_days=10**6)
|
||||
rep = sweep(st, cfg, now=NOW, analyze_fn=digest_mod.analyze)
|
||||
|
||||
# overflow path analyzed the un-analyzed sessions, then budget-evicted
|
||||
assert set(rep.overflow_analyzed) == {r1, r2}
|
||||
assert rep.lost_data is False # analysis avoided data loss
|
||||
assert st.tier1_usage_bytes() <= cfg.raw_soft_cap_bytes
|
||||
|
||||
|
||||
def test_overflow_last_resort_reports_data_loss(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
# one un-analyzed session bigger than the hard cap, analysis disabled (no fn)
|
||||
big = _ingest(st, "big", body_bytes=20000, analyze=False)
|
||||
|
||||
cfg = RetentionConfig(raw_soft_cap_bytes=1000, raw_hard_cap_bytes=2000, raw_max_age_days=10**6)
|
||||
rep = sweep(st, cfg, now=NOW, analyze_fn=None)
|
||||
|
||||
assert big in rep.overflow_data_loss
|
||||
assert rep.lost_data is True
|
||||
assert any("data_loss" in w for w in rep.warnings)
|
||||
assert st.get_session(big).evicted_at is not None
|
||||
97
tests/test_schema.py
Normal file
97
tests/test_schema.py
Normal file
@@ -0,0 +1,97 @@
|
||||
"""Round-trip + validation tests for the normalized schema (T01)."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.core.schema import ( # noqa: E402
|
||||
SCHEMA_VERSION,
|
||||
Cost,
|
||||
Session,
|
||||
SessionEvent,
|
||||
)
|
||||
|
||||
|
||||
def _sample_session() -> Session:
|
||||
return Session(
|
||||
session_uid=Session.make_uid("claude", "abc-123"),
|
||||
flavor="claude",
|
||||
native_session_id="abc-123",
|
||||
repo="agentic-resources",
|
||||
domain="helix_forge",
|
||||
cwd="/home/worsch/agentic-resources",
|
||||
git_branch="main",
|
||||
model="claude-opus-4-8",
|
||||
started_at="2026-06-06T10:00:00Z",
|
||||
ended_at="2026-06-06T10:15:00Z",
|
||||
outcome="success",
|
||||
cost=Cost(input_tokens=100, output_tokens=50, turns=3, retries=1),
|
||||
task_ref="AGENTIC-WP-0002-T01",
|
||||
source_path="~/.claude/projects/x/abc-123.jsonl",
|
||||
source_bytes=2048,
|
||||
ingested_at="2026-06-06T10:16:00Z",
|
||||
)
|
||||
|
||||
|
||||
def test_session_round_trip():
|
||||
s = _sample_session()
|
||||
restored = Session.from_json(s.to_json())
|
||||
assert restored == s
|
||||
assert restored.cost == s.cost
|
||||
assert restored.schema_version == SCHEMA_VERSION
|
||||
|
||||
|
||||
def test_session_uid_helper_and_prefix_enforced():
|
||||
assert Session.make_uid("grok", "z9") == "grok:z9"
|
||||
with pytest.raises(ValueError):
|
||||
Session(session_uid="codex:wrong", flavor="claude", native_session_id="wrong")
|
||||
|
||||
|
||||
def test_unknown_flavor_and_outcome_rejected():
|
||||
with pytest.raises(ValueError):
|
||||
Session(session_uid="x:1", flavor="x", native_session_id="1")
|
||||
with pytest.raises(ValueError):
|
||||
Session(
|
||||
session_uid="claude:1",
|
||||
flavor="claude",
|
||||
native_session_id="1",
|
||||
outcome="bogus",
|
||||
)
|
||||
|
||||
|
||||
def test_is_evictable_requires_analyzed_not_evicted():
|
||||
s = _sample_session()
|
||||
assert s.is_evictable is False # not analyzed yet
|
||||
s.analyzed_at = "2026-06-06T10:17:00Z"
|
||||
assert s.is_evictable is True
|
||||
s.evicted_at = "2026-06-06T11:00:00Z"
|
||||
assert s.is_evictable is False # already evicted
|
||||
|
||||
|
||||
def test_event_round_trip_and_kind_validation():
|
||||
e = SessionEvent(
|
||||
session_uid="claude:abc-123",
|
||||
seq=4,
|
||||
parent_seq=3,
|
||||
ts="2026-06-06T10:01:00Z",
|
||||
kind="tool_call",
|
||||
role="assistant",
|
||||
tool="Bash",
|
||||
summary="ran pytest -q",
|
||||
payload_ref="blob://abc-123/4",
|
||||
tokens=12,
|
||||
)
|
||||
assert SessionEvent.from_json(e.to_json()) == e
|
||||
with pytest.raises(ValueError):
|
||||
SessionEvent(session_uid="claude:1", seq=0, kind="not_a_kind")
|
||||
|
||||
|
||||
def test_from_dict_ignores_unknown_fields():
|
||||
d = _sample_session().to_dict()
|
||||
d["future_field"] = "ignored"
|
||||
d["cost"]["future_cost"] = 999
|
||||
restored = Session.from_dict(d)
|
||||
assert restored.repo == "agentic-resources"
|
||||
82
tests/test_store.py
Normal file
82
tests/test_store.py
Normal file
@@ -0,0 +1,82 @@
|
||||
"""Store tests (T03): idempotent ingest, usage accounting, eviction."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.adapters.claude import Normalized # noqa: E402
|
||||
from session_memory.core.schema import Cost, Session, SessionEvent # noqa: E402
|
||||
from session_memory.core.store import Store # noqa: E402
|
||||
|
||||
|
||||
def _bundle(uid_native="s1", n_events=3):
|
||||
s = Session(
|
||||
session_uid=Session.make_uid("claude", uid_native),
|
||||
flavor="claude", native_session_id=uid_native,
|
||||
repo="agentic-resources", domain="helix_forge",
|
||||
cost=Cost(input_tokens=10),
|
||||
)
|
||||
events, blobs = [], {}
|
||||
for i in range(n_events):
|
||||
ref = f"blob://{uid_native}/{i}"
|
||||
events.append(SessionEvent(session_uid=s.session_uid, seq=i, kind="assistant_msg",
|
||||
payload_ref=ref, summary=f"msg {i}"))
|
||||
blobs[ref] = "x" * 100
|
||||
return Normalized(session=s, events=events, blobs=blobs)
|
||||
|
||||
|
||||
def _store(tmp_path):
|
||||
return Store(str(tmp_path / "mem.db"), str(tmp_path / "blobs"))
|
||||
|
||||
|
||||
def test_ingest_and_read_back(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
b = _bundle("s1", 3)
|
||||
st.ingest(b)
|
||||
s = st.get_session(b.session.session_uid)
|
||||
assert s is not None and s.ingested_at is not None
|
||||
assert st.count_events(b.session.session_uid) == 3
|
||||
assert st.tier1_usage_bytes() > 0
|
||||
|
||||
|
||||
def test_ingest_is_idempotent(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
b = _bundle("s1", 3)
|
||||
st.ingest(b)
|
||||
before = st.tier1_usage_bytes()
|
||||
st.ingest(b) # re-run same sweep
|
||||
assert st.count_events(b.session.session_uid) == 3 # no duplicate rows
|
||||
assert st.tier1_usage_bytes() == before # blobs upserted, not doubled
|
||||
|
||||
|
||||
def test_digest_sets_analyzed_and_tier2_bytes(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
b = _bundle("s1", 2)
|
||||
st.ingest(b)
|
||||
assert st.get_session(b.session.session_uid).analyzed_at is None
|
||||
st.write_digest(b.session.session_uid, {"outcome": "success", "tools": {"Edit": 1}})
|
||||
assert st.get_session(b.session.session_uid).analyzed_at is not None
|
||||
assert st.tier2_usage_bytes() > 0
|
||||
assert st.get_digest(b.session.session_uid)["outcome"] == "success"
|
||||
|
||||
|
||||
def test_evict_raw_keeps_digest_drops_raw(tmp_path):
|
||||
st = _store(tmp_path)
|
||||
b = _bundle("s1", 3)
|
||||
st.ingest(b)
|
||||
st.write_digest(b.session.session_uid, {"outcome": "unknown"})
|
||||
blob_dir_files_before = sum(len(f) for _, _, f in os.walk(str(tmp_path / "blobs")))
|
||||
assert blob_dir_files_before > 0
|
||||
|
||||
freed = st.evict_raw(b.session.session_uid)
|
||||
assert freed > 0
|
||||
assert st.count_events(b.session.session_uid) == 0 # raw gone
|
||||
assert st.get_events(b.session.session_uid) == []
|
||||
assert st.get_session(b.session.session_uid).evicted_at is not None
|
||||
assert st.get_digest(b.session.session_uid) is not None # Tier 2 preserved
|
||||
# blob files removed from disk
|
||||
remaining = [f for _, _, fs in os.walk(str(tmp_path / "blobs")) for f in fs]
|
||||
assert remaining == []
|
||||
# evicted session no longer counts toward Tier 1 usage
|
||||
assert st.tier1_usage_bytes() == 0
|
||||
143
workplans/AGENTIC-WP-0002-session-memory-phase0.md
Normal file
143
workplans/AGENTIC-WP-0002-session-memory-phase0.md
Normal file
@@ -0,0 +1,143 @@
|
||||
---
|
||||
id: AGENTIC-WP-0002
|
||||
type: workplan
|
||||
title: "Coding Session Memory — Phase 0 (Capture + budget-based retention)"
|
||||
domain: helix_forge
|
||||
repo: agentic-resources
|
||||
status: finished
|
||||
owner: codex
|
||||
topic_slug: helix-forge
|
||||
created: "2026-06-06"
|
||||
updated: "2026-06-06"
|
||||
state_hub_workstream_id: "06e6726d-057d-47d8-84f4-0974858f6288"
|
||||
---
|
||||
|
||||
# Coding Session Memory — Phase 0
|
||||
|
||||
Implements Phase 0 of [PRD-helix-forge](../docs/PRD-helix-forge.md) per the
|
||||
[session-memory design](../docs/DESIGN-session-memory.md): a normalized session
|
||||
schema, the first (Claude) collector, a two-tier store, and a budget-based
|
||||
eviction sweep that drops analyzed/over-budget raw content while preserving
|
||||
compact digests.
|
||||
|
||||
Scope is deliberately one agent flavor (Claude, schema verified on disk) end to
|
||||
end, so the agnostic-core / thin-adapter boundary is proven before Codex and Grok
|
||||
adapters land in Phase 1.
|
||||
|
||||
## Define Normalized Session Schema
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0002-T01
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "61297a16-257c-4579-bd1f-3db035781258"
|
||||
```
|
||||
|
||||
Implement `core/schema.py` with the `Session` and `SessionEvent` dataclasses from
|
||||
design §4, including `schema_version`, the `flavor`-prefixed `session_uid`, the
|
||||
cost block, and the `discovered/ingested/analyzed/evicted` watermarks. Add
|
||||
round-trip (de)serialization tests. This is the contract every adapter targets.
|
||||
|
||||
## Claude Collector Adapter
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0002-T02
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "3b4e6b35-b4f3-40dc-a845-7ac78aa20d62"
|
||||
```
|
||||
|
||||
Implement `adapters/claude.py`: read `~/.claude/projects/<cwd>/<uuid>.jsonl`,
|
||||
discriminate on `type`, reconstruct the turn DAG via `uuid`/`parentUuid`, map
|
||||
records onto `SessionEvent.kind`, capture `message.usage` into the cost block,
|
||||
handle `agent-*.jsonl` sidechains (`is_sidechain`), and resolve `repo`/`domain`
|
||||
from `cwd`. Verify against real local sessions in this repo's project dir. No
|
||||
Codex/Grok work in Phase 0 (designed for, not built).
|
||||
|
||||
## Tier 1 / Tier 2 Store
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0002-T03
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "2387258e-ba6d-4a41-919e-f2f4e0822110"
|
||||
```
|
||||
|
||||
Implement `core/store.py`: SQLite for `Session`/`SessionEvent` rows plus a blob
|
||||
dir for `payload_ref` bodies (Tier 1), and a compact `digest` table (Tier 2).
|
||||
Writes are idempotent on `(session_uid, seq)`. Provide usage-bytes accounting for
|
||||
Tier 1 (rows + blobs) and Tier 2, used by retention.
|
||||
|
||||
## Session Digest (Tier 1 → Tier 2)
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0002-T04
|
||||
status: done
|
||||
priority: medium
|
||||
state_hub_task_id: "017d8e90-633a-49f2-b342-8690938798cd"
|
||||
```
|
||||
|
||||
Implement `core/digest.py`: produce a per-session digest (outcome heuristic, cost
|
||||
totals, tool histogram, error/retry/human-intervention markers, key snippets) and
|
||||
set `analyzed_at`. This is the promotion step that makes a session evictable.
|
||||
Signal extraction beyond the digest stays stubbed for the Detect phase.
|
||||
|
||||
## Budget-Based Retention Sweep
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0002-T05
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "89177c79-528e-4023-a7eb-67f8e0276ba9"
|
||||
```
|
||||
|
||||
Implement `core/retention.py` per design §5: backstop pass (`raw_max_age_days`),
|
||||
budget pass (evict oldest `analyzed_at` first while over `raw_soft_cap_bytes`,
|
||||
never touching un-analyzed sessions), and the hard-cap overflow path (analyze-now,
|
||||
then last-resort evict oldest un-analyzed with a reported `data_loss` event).
|
||||
Enforce the invariant: raw bytes are never dropped before the Tier 2 digest
|
||||
exists (except the reported overflow path). Cover each branch with tests using
|
||||
synthetic sessions and tiny caps.
|
||||
|
||||
## Ingest Cursor + Sweep Entrypoint
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0002-T06
|
||||
status: done
|
||||
priority: medium
|
||||
state_hub_task_id: "a4b35c76-154d-4e99-b6d0-61cb6e47ecc0"
|
||||
```
|
||||
|
||||
Implement `core/cursor.py` (per-source `(path,size,mtime,offset)` cursors,
|
||||
idempotent re-runs) and `ingest.py` wiring one sweep: discover → normalize
|
||||
(Claude) → store → digest → evict. Add `config.toml` with the §5.1 retention
|
||||
knobs, source paths, and repo→domain map. Document running a sweep and the
|
||||
intended `cadence` trigger (`/schedule` daily/weekly) in the repo docs.
|
||||
|
||||
## Verify End-to-End on Real Sessions
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0002-T07
|
||||
status: done
|
||||
priority: medium
|
||||
state_hub_task_id: "98d5cc7c-c285-4556-91a3-a85e0a2bb6df"
|
||||
```
|
||||
|
||||
Run the full sweep against this workstation's real Claude sessions; confirm
|
||||
normalized rows, digests, idempotent re-run, and an eviction cycle under a small
|
||||
test cap (analyzed dropped, un-analyzed preserved, overflow reported). Record
|
||||
results and update the design doc's open questions (esp. OQ4 real per-session
|
||||
sizes). After workplan file updates, notify the custodian operator to run from
|
||||
`~/state-hub`:
|
||||
|
||||
```bash
|
||||
make fix-consistency REPO=agentic-resources
|
||||
```
|
||||
|
||||
**Verification results (2026-06-06):** full suite 26/26 green. Live sweep over 85
|
||||
real local Claude transcripts → 84 parsed (1 empty, tolerated) → 63 distinct
|
||||
sessions ingested + analyzed. Eviction under a 2 MB soft cap freed 26 MB, ended at
|
||||
1.34 MB (under cap), zero data loss, and every evicted session kept its Tier 2
|
||||
digest (invariant holds). Idempotent re-run skipped 84 unchanged, re-ingested 1
|
||||
(the live session). Outcomes 52 success / 11 abandoned. OQ4 sizes recorded in the
|
||||
design doc; OQ6 (multi-file sessions) noted as a Phase-1 refinement.
|
||||
Reference in New Issue
Block a user