session-memory Phase 0: normalized schema (T01) + Claude adapter (T02)

- session_memory/core/schema.py: Session/SessionEvent/Cost dataclasses,
  flavor-prefixed uids, watermarks, kind/outcome validation (T01)
- session_memory/adapters/claude.py: JSONL -> Normalized bundle, turn DAG
  via uuid/parentUuid, kind mapping, cost from message.usage (T02)
- tests: schema round-trip + adapter (synthetic + real local session)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-06 19:06:10 +02:00
parent ffe191d44e
commit 1c29a94fa9
9 changed files with 598 additions and 3 deletions

6
.gitignore vendored
View File

@@ -174,3 +174,9 @@ cython_debug/
# PyPI configuration file
.pypirc
# session-memory local store
session_memory/.store/
__pycache__/
*.pyc
.pytest_cache/

View File

@@ -0,0 +1,7 @@
"""Coding Session Memory — Helix Forge capture + retention layer.
See docs/DESIGN-session-memory.md. Importable package name uses an underscore
(``session_memory``) where the design doc writes ``session-memory/``.
"""
__all__ = ["core", "adapters"]

View File

@@ -0,0 +1 @@
"""Per-flavor collector adapters (Tier 0 -> Tier 1 normalization)."""

View File

@@ -0,0 +1,228 @@
"""Claude Code collector adapter — Tier 0 -> Tier 1 (design §2.1, §4.3).
Reads ``~/.claude/projects/<url-encoded-cwd>/<session-uuid>.jsonl`` (and
``agent-*.jsonl`` sidechains), discriminates on the record ``type``, reconstructs
the turn DAG via ``uuid``/``parentUuid``, and emits normalized records.
Returns a :class:`Normalized` bundle: the ``Session``, its ordered
``SessionEvent`` list, and a ``blobs`` map (``payload_ref -> full text body``)
that the store persists out-of-line so Tier 1 rows stay light.
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Iterable, Optional
from ..core.schema import Cost, Session, SessionEvent
FLAVOR = "claude"
# tool_use names that mutate files -> kind "edit"
_EDIT_TOOLS = {"Edit", "Write", "NotebookEdit", "MultiEdit"}
# crude test-runner detection inside Bash commands -> kind "test_run"
_TEST_HINTS = ("pytest", "unittest", "npm test", "npm run test", "go test", "cargo test", "jest", "vitest")
@dataclass
class Normalized:
session: Session
events: list[SessionEvent]
blobs: dict[str, str] = field(default_factory=dict)
def _iter_records(path: str) -> Iterable[dict[str, Any]]:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError:
continue # tolerate partial/corrupt trailing lines
def _resolve_repo(cwd: Optional[str], repo_domain_map: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
"""cwd -> (repo, domain). repo is the cwd basename; domain via map."""
if not cwd:
return None, None
repo = os.path.basename(cwd.rstrip("/")) or None
domain = repo_domain_map.get(repo) if repo else None
return repo, domain
def _is_test_command(text: str) -> bool:
low = text.lower()
return any(h in low for h in _TEST_HINTS)
def _content_blocks(message: dict[str, Any]) -> list[dict[str, Any]]:
content = message.get("content")
if isinstance(content, str):
return [{"type": "text", "text": content}]
if isinstance(content, list):
return [b for b in content if isinstance(b, dict)]
return []
def parse_session(path: str, repo_domain_map: Optional[dict[str, str]] = None) -> Optional[Normalized]:
"""Parse one Claude transcript file into a Normalized bundle.
Returns None if the file has no usable session records.
"""
repo_domain_map = repo_domain_map or {}
records = list(_iter_records(path))
if not records:
return None
session_id: Optional[str] = None
cwd = git_branch = version = model = None
timestamps: list[str] = []
file_is_sidechain = os.path.basename(path).startswith("agent-")
events: list[SessionEvent] = []
blobs: dict[str, str] = {}
uuid_to_seq: dict[str, int] = {}
cost = Cost()
seq = 0
def add_event(uuid: Optional[str], parent_uuid: Optional[str], ts, kind, *,
role=None, tool=None, summary=None, body=None, tokens=0, sidechain=False):
nonlocal seq
s = seq
seq += 1
if uuid:
uuid_to_seq[uuid] = s
parent_seq = uuid_to_seq.get(parent_uuid) if parent_uuid else None
payload_ref = None
if body:
payload_ref = f"blob://{session_id}/{s}"
blobs[payload_ref] = body
events.append(SessionEvent(
session_uid=Session.make_uid(FLAVOR, session_id or "unknown"),
seq=s, parent_seq=parent_seq, ts=ts, kind=kind, role=role, tool=tool,
summary=(summary or "")[:300] or None, payload_ref=payload_ref,
tokens=tokens, is_sidechain=sidechain or file_is_sidechain,
))
for rec in records:
rtype = rec.get("type")
ts = rec.get("timestamp")
if ts:
timestamps.append(ts)
session_id = session_id or rec.get("sessionId")
cwd = cwd or rec.get("cwd")
git_branch = git_branch or rec.get("gitBranch")
version = version or rec.get("version")
uuid = rec.get("uuid")
parent = rec.get("parentUuid")
sidechain = bool(rec.get("isSidechain"))
if rtype == "user":
msg = rec.get("message", {})
for b in _content_blocks(msg):
bt = b.get("type")
if bt == "tool_result":
body = _stringify(b.get("content"))
add_event(uuid, parent, ts, "tool_result", role="tool",
summary="tool result", body=body, sidechain=sidechain)
else:
text = b.get("text", "")
add_event(uuid, parent, ts, "user_msg", role="user",
summary=_first_line(text), body=text, sidechain=sidechain)
elif rtype == "assistant":
msg = rec.get("message", {})
model = model or msg.get("model")
usage = msg.get("usage") or {}
cost.input_tokens += int(usage.get("input_tokens", 0) or 0)
cost.output_tokens += int(usage.get("output_tokens", 0) or 0)
cost.cache_tokens += int(
(usage.get("cache_read_input_tokens", 0) or 0)
+ (usage.get("cache_creation_input_tokens", 0) or 0)
)
out_tokens = int(usage.get("output_tokens", 0) or 0)
for b in _content_blocks(msg):
bt = b.get("type")
if bt == "thinking":
add_event(uuid, parent, ts, "thinking", role="assistant",
summary="thinking", body=b.get("thinking", ""), sidechain=sidechain)
elif bt == "text":
text = b.get("text", "")
add_event(uuid, parent, ts, "assistant_msg", role="assistant",
summary=_first_line(text), body=text, tokens=out_tokens, sidechain=sidechain)
elif bt == "tool_use":
name = b.get("name", "")
inp = b.get("input", {})
body = _stringify(inp)
kind = "tool_call"
if name in _EDIT_TOOLS:
kind = "edit"
elif name == "Bash" and _is_test_command(_stringify(inp.get("command", ""))):
kind = "test_run"
add_event(uuid, parent, ts, kind, role="assistant", tool=name,
summary=f"{name}", body=body, sidechain=sidechain)
elif rtype == "summary":
add_event(uuid, parent, ts, "lifecycle", summary="summary",
body=_stringify(rec.get("summary")), sidechain=sidechain)
# queue-operation / ai-title / last-prompt / attachment: skipped as events
if session_id is None:
return None
cost.turns = sum(1 for e in events if e.kind == "user_msg")
started = min(timestamps) if timestamps else None
ended = max(timestamps) if timestamps else None
cost.wall_clock_s = _seconds_between(started, ended)
repo, domain = _resolve_repo(cwd, repo_domain_map)
session = Session(
session_uid=Session.make_uid(FLAVOR, session_id),
flavor=FLAVOR,
native_session_id=session_id,
repo=repo, domain=domain, cwd=cwd, git_branch=git_branch,
model=model, started_at=started, ended_at=ended,
outcome="unknown", # outcome inference happens in the digest step (T04)
cost=cost,
source_path=path,
source_bytes=os.path.getsize(path) if os.path.exists(path) else 0,
discovered_at=_now(),
)
return Normalized(session=session, events=events, blobs=blobs)
# ---- helpers ---------------------------------------------------------------
def _stringify(v: Any) -> str:
if v is None:
return ""
if isinstance(v, str):
return v
try:
return json.dumps(v, ensure_ascii=False)[:20000]
except (TypeError, ValueError):
return str(v)[:20000]
def _first_line(text: str) -> str:
return (text or "").strip().splitlines()[0] if (text or "").strip() else ""
def _seconds_between(start: Optional[str], end: Optional[str]) -> float:
if not start or not end:
return 0.0
try:
a = datetime.fromisoformat(start.replace("Z", "+00:00"))
b = datetime.fromisoformat(end.replace("Z", "+00:00"))
return max(0.0, (b - a).total_seconds())
except ValueError:
return 0.0
def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

View File

@@ -0,0 +1 @@
"""Flavor-agnostic core: schema, store, cursor, digest, retention."""

View File

@@ -0,0 +1,156 @@
"""Normalized session schema (Tier 1) — design doc §4.
Two record kinds, ``Session`` and ``SessionEvent``, plus the small enums every
adapter targets. Field names here are the stable contract; per-flavor quirks are
absorbed inside each adapter (see design §4.3 native -> kind mapping).
"""
from __future__ import annotations
import json
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Optional
SCHEMA_VERSION = 1
# Supported agent flavors. ``session_uid`` is always "<flavor>:<native id>".
FLAVORS = ("claude", "codex", "grok")
# SessionEvent.kind universe (design §4.2 / §4.3).
KINDS = (
"user_msg",
"assistant_msg",
"thinking",
"tool_call",
"tool_result",
"error",
"test_run",
"edit",
"retry",
"human_intervention",
"decision",
"lifecycle",
"completion",
)
# Session.outcome universe.
OUTCOMES = ("success", "fail", "abandoned", "unknown")
@dataclass
class Cost:
"""Token + effort accounting for a session."""
input_tokens: int = 0
output_tokens: int = 0
cache_tokens: int = 0
wall_clock_s: float = 0.0
turns: int = 0
retries: int = 0
@dataclass
class Session:
"""One bounded run of a coding agent against a repo (design §4.1)."""
session_uid: str # "<flavor>:<native id>" — globally unique
flavor: str
native_session_id: str
repo: Optional[str] = None
domain: Optional[str] = None
cwd: Optional[str] = None
git_branch: Optional[str] = None
model: Optional[str] = None
started_at: Optional[str] = None # ISO-8601 UTC
ended_at: Optional[str] = None
outcome: str = "unknown"
cost: Cost = field(default_factory=Cost)
task_ref: Optional[str] = None
source_path: Optional[str] = None
source_bytes: int = 0
schema_version: int = SCHEMA_VERSION
# watermarks (design §3.1): discovered -> ingested -> analyzed -> evicted
discovered_at: Optional[str] = None
ingested_at: Optional[str] = None
analyzed_at: Optional[str] = None
evicted_at: Optional[str] = None
def __post_init__(self) -> None:
if self.flavor not in FLAVORS:
raise ValueError(f"unknown flavor {self.flavor!r}; expected one of {FLAVORS}")
if self.outcome not in OUTCOMES:
raise ValueError(f"unknown outcome {self.outcome!r}; expected one of {OUTCOMES}")
expected_prefix = f"{self.flavor}:"
if not self.session_uid.startswith(expected_prefix):
raise ValueError(
f"session_uid {self.session_uid!r} must start with {expected_prefix!r}"
)
@property
def is_evictable(self) -> bool:
"""A session may be evicted from Tier 1 only once analyzed (design §3.1)."""
return self.analyzed_at is not None and self.evicted_at is None
@staticmethod
def make_uid(flavor: str, native_session_id: str) -> str:
return f"{flavor}:{native_session_id}"
def to_dict(self) -> dict[str, Any]:
d = asdict(self)
return d
def to_json(self) -> str:
return json.dumps(self.to_dict(), sort_keys=True)
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "Session":
d = dict(d)
cost = d.pop("cost", None)
obj = cls(**{k: v for k, v in d.items() if k in _SESSION_FIELDS})
if cost is not None:
obj.cost = Cost(**{k: v for k, v in cost.items() if k in _COST_FIELDS})
return obj
@classmethod
def from_json(cls, s: str) -> "Session":
return cls.from_dict(json.loads(s))
@dataclass
class SessionEvent:
"""One atomic record within a session (design §4.2)."""
session_uid: str
seq: int # monotonic within session
ts: Optional[str] = None
kind: str = "lifecycle"
parent_seq: Optional[int] = None # turn DAG (Claude); None for flat flavors
role: Optional[str] = None # user|assistant|system|tool
tool: Optional[str] = None # when kind in {tool_call, tool_result}
summary: Optional[str] = None # short, human-readable
payload_ref: Optional[str] = None # pointer to full body in Tier 1 blob store
tokens: int = 0
is_sidechain: bool = False
def __post_init__(self) -> None:
if self.kind not in KINDS:
raise ValueError(f"unknown kind {self.kind!r}; expected one of {KINDS}")
def to_dict(self) -> dict[str, Any]:
return asdict(self)
def to_json(self) -> str:
return json.dumps(self.to_dict(), sort_keys=True)
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "SessionEvent":
return cls(**{k: v for k, v in d.items() if k in _EVENT_FIELDS})
@classmethod
def from_json(cls, s: str) -> "SessionEvent":
return cls.from_dict(json.loads(s))
_SESSION_FIELDS = {f.name for f in fields(Session)}
_COST_FIELDS = {f.name for f in fields(Cost)}
_EVENT_FIELDS = {f.name for f in fields(SessionEvent)}

View File

@@ -0,0 +1,99 @@
"""Claude adapter tests (T02): synthetic fixture + a real on-disk session."""
import glob
import json
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.adapters.claude import parse_session # noqa: E402
REPO_MAP = {"agentic-resources": "helix_forge"}
def _write_jsonl(path, records):
with open(path, "w", encoding="utf-8") as f:
for r in records:
f.write(json.dumps(r) + "\n")
def test_synthetic_session(tmp_path):
p = tmp_path / "11111111-2222-3333-4444-555555555555.jsonl"
_write_jsonl(p, [
{"type": "user", "uuid": "u1", "parentUuid": None,
"timestamp": "2026-06-06T10:00:00Z", "sessionId": "sess-1",
"cwd": "/home/worsch/agentic-resources", "gitBranch": "main",
"version": "1.0", "message": {"role": "user", "content": "fix the bug"}},
{"type": "assistant", "uuid": "a1", "parentUuid": "u1",
"timestamp": "2026-06-06T10:00:05Z", "sessionId": "sess-1",
"message": {"role": "assistant", "model": "claude-opus-4-8",
"usage": {"input_tokens": 100, "output_tokens": 20,
"cache_read_input_tokens": 10},
"content": [
{"type": "thinking", "thinking": "let me look"},
{"type": "text", "text": "I'll edit the file."},
{"type": "tool_use", "name": "Edit",
"input": {"file_path": "x.py", "old_string": "a", "new_string": "b"}},
{"type": "tool_use", "name": "Bash",
"input": {"command": "pytest -q"}},
]}},
{"type": "user", "uuid": "u2", "parentUuid": "a1",
"timestamp": "2026-06-06T10:00:10Z", "sessionId": "sess-1",
"message": {"role": "user",
"content": [{"type": "tool_result", "content": "6 passed"}]}},
])
norm = parse_session(str(p), REPO_MAP)
assert norm is not None
s = norm.session
assert s.session_uid == "claude:sess-1"
assert s.repo == "agentic-resources" and s.domain == "helix_forge"
assert s.model == "claude-opus-4-8"
assert s.cost.input_tokens == 100 and s.cost.output_tokens == 20
assert s.cost.cache_tokens == 10
assert s.cost.turns == 1
assert s.cost.wall_clock_s == 10.0
kinds = [e.kind for e in norm.events]
assert kinds == ["user_msg", "thinking", "assistant_msg", "edit", "test_run", "tool_result"]
# turn DAG: assistant events link back to the first user msg (seq 0)
edit_ev = next(e for e in norm.events if e.kind == "edit")
assert edit_ev.parent_seq == 0
assert edit_ev.tool == "Edit"
# bodies captured as blobs, referenced by payload_ref
assert edit_ev.payload_ref in norm.blobs
assert "x.py" in norm.blobs[edit_ev.payload_ref]
def test_sidechain_filename_marks_events(tmp_path):
p = tmp_path / "agent-deadbeef.jsonl"
_write_jsonl(p, [
{"type": "assistant", "uuid": "a1", "sessionId": "side-1",
"timestamp": "2026-06-06T10:00:00Z",
"message": {"role": "assistant", "content": [{"type": "text", "text": "hi"}]}},
])
norm = parse_session(str(p), REPO_MAP)
assert norm.events[0].is_sidechain is True
def test_real_local_session_if_available():
"""Smoke-parse a real Claude transcript on this workstation, if present."""
base = os.path.expanduser("~/.claude/projects/-home-worsch-agentic-resources")
files = sorted(glob.glob(os.path.join(base, "*.jsonl")))
if not files:
return # environment without local sessions; synthetic tests cover logic
parsed = 0
for fp in files:
norm = parse_session(fp, REPO_MAP)
if norm is None:
continue
parsed += 1
assert norm.session.session_uid.startswith("claude:")
# seq is monotonic and unique
seqs = [e.seq for e in norm.events]
assert seqs == sorted(seqs)
assert len(seqs) == len(set(seqs))
assert parsed >= 1

97
tests/test_schema.py Normal file
View File

@@ -0,0 +1,97 @@
"""Round-trip + validation tests for the normalized schema (T01)."""
import os
import sys
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.core.schema import ( # noqa: E402
SCHEMA_VERSION,
Cost,
Session,
SessionEvent,
)
def _sample_session() -> Session:
return Session(
session_uid=Session.make_uid("claude", "abc-123"),
flavor="claude",
native_session_id="abc-123",
repo="agentic-resources",
domain="helix_forge",
cwd="/home/worsch/agentic-resources",
git_branch="main",
model="claude-opus-4-8",
started_at="2026-06-06T10:00:00Z",
ended_at="2026-06-06T10:15:00Z",
outcome="success",
cost=Cost(input_tokens=100, output_tokens=50, turns=3, retries=1),
task_ref="AGENTIC-WP-0002-T01",
source_path="~/.claude/projects/x/abc-123.jsonl",
source_bytes=2048,
ingested_at="2026-06-06T10:16:00Z",
)
def test_session_round_trip():
s = _sample_session()
restored = Session.from_json(s.to_json())
assert restored == s
assert restored.cost == s.cost
assert restored.schema_version == SCHEMA_VERSION
def test_session_uid_helper_and_prefix_enforced():
assert Session.make_uid("grok", "z9") == "grok:z9"
with pytest.raises(ValueError):
Session(session_uid="codex:wrong", flavor="claude", native_session_id="wrong")
def test_unknown_flavor_and_outcome_rejected():
with pytest.raises(ValueError):
Session(session_uid="x:1", flavor="x", native_session_id="1")
with pytest.raises(ValueError):
Session(
session_uid="claude:1",
flavor="claude",
native_session_id="1",
outcome="bogus",
)
def test_is_evictable_requires_analyzed_not_evicted():
s = _sample_session()
assert s.is_evictable is False # not analyzed yet
s.analyzed_at = "2026-06-06T10:17:00Z"
assert s.is_evictable is True
s.evicted_at = "2026-06-06T11:00:00Z"
assert s.is_evictable is False # already evicted
def test_event_round_trip_and_kind_validation():
e = SessionEvent(
session_uid="claude:abc-123",
seq=4,
parent_seq=3,
ts="2026-06-06T10:01:00Z",
kind="tool_call",
role="assistant",
tool="Bash",
summary="ran pytest -q",
payload_ref="blob://abc-123/4",
tokens=12,
)
assert SessionEvent.from_json(e.to_json()) == e
with pytest.raises(ValueError):
SessionEvent(session_uid="claude:1", seq=0, kind="not_a_kind")
def test_from_dict_ignores_unknown_fields():
d = _sample_session().to_dict()
d["future_field"] = "ignored"
d["cost"]["future_cost"] = 999
restored = Session.from_dict(d)
assert restored.repo == "agentic-resources"

View File

@@ -28,7 +28,7 @@ adapters land in Phase 1.
```task
id: AGENTIC-WP-0002-T01
status: todo
status: done
priority: high
state_hub_task_id: "61297a16-257c-4579-bd1f-3db035781258"
```
@@ -42,7 +42,7 @@ round-trip (de)serialization tests. This is the contract every adapter targets.
```task
id: AGENTIC-WP-0002-T02
status: todo
status: done
priority: high
state_hub_task_id: "3b4e6b35-b4f3-40dc-a845-7ac78aa20d62"
```
@@ -58,7 +58,7 @@ Codex/Grok work in Phase 0 (designed for, not built).
```task
id: AGENTIC-WP-0002-T03
status: todo
status: progress
priority: high
state_hub_task_id: "2387258e-ba6d-4a41-919e-f2f4e0822110"
```