Compare commits

...

7 Commits

Author SHA1 Message Date
4f28cd67cf session-memory: Phase 4 Measure — baseline, effectiveness, trend (WP-0009)
Closes the loop. metrics.py: fleet metrics (infra-overhead share, error rate,
schema-thrash, token percentiles, success) + persisted baseline trend. effect.py:
before/after per-pattern effectiveness with an improved verdict per metric.
measure entrypoint with trend + --since effectiveness + JSON. Recorded pre-fix
baseline: 27 sessions, overhead median 11.7%, error rate 0.96, schema-thrash 8.
13 new tests; suite 139/139. Capture->Detect->Curate->Distribute->Measure complete.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 15:49:22 +02:00
035c7a20d3 session-memory: Read-before-Edit reflex + curated pattern (WP-0008)
Acts on the #1 friction finding. T01: added a data-cited Read-before-Edit /
re-read-on-stale reflex to AGENTS.md (top error: 'File has not been read yet',
12/27 sessions). T02: captured it as a curated SolutionPattern
(sp-problem-file_not_read-edit, approved/distribution_ready) with real
resolutions + per-flavor hints, so Distribute proposes it across repos/flavors —
closing assess->curate->distribute on a real pattern. Suite 126/126.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 15:27:22 +02:00
59632e94db session-memory: distribute entrypoint + live verify (WP-0007 T05)
python -m session_memory.distribute: reads approved catalog patterns, builds
targets from repo->domain map x flavors, renders scoped per-flavor proposals
(HITL) + active registry. Live verify against the real catalog: 12 renders
across 5 repos, idempotent, provisional skipped. proposals/ gitignored
(regenerated); active_patterns.json committed. README documents detect->curate->
distribute. Phase 3 finished; suite 126/126.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 15:25:20 +02:00
00e8958540 session-memory: scoping + proposals + active registry (WP-0007 T04)
distribute/proposals.py: Scope-aware targeting (FR-X2, empty axis = any), render
distributable (approved+distribution_ready) patterns into a proposals/ tree
mirroring target paths — proposed not applied (FR-X3, HITL), idempotent on re-run.
ActiveRegistry (FR-X4) records which pattern+version is proposed in which
(repo,flavor). 6 new tests; suite 123/123.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 15:09:40 +02:00
9e28b1b806 session-memory: Claude + Codex + Grok distributors + registry (WP-0007 T02/T03)
Thin per-flavor distributors over the shared base: Claude (CLAUDE.md, optional
skill-stub mode), Codex (AGENTS.md), Grok (.grok/instructions.md). registry maps
flavor->distributor — adding a flavor is one entry + one module. Same agnostic
body renders to distinct per-flavor targets (FR-A3). 7 new tests; suite 117/117.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 15:06:15 +02:00
7646cbc358 session-memory: distributor base + Artifact (WP-0007 T01)
distribute/base.py: Artifact dataclass + Distributor protocol + idempotent
BEGIN/END snippet markers (upsert_block replaces a pattern's block in place so
re-distribution doesn't duplicate) + agnostic markdown body rendering from
SolutionPattern fields. BaseDistributor honours per-flavor body/target hints.
8 new tests; suite 110/110.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 15:02:47 +02:00
9e6f8a6e08 Register WP-0007 (Distribute), WP-0008 (Read-before-Edit), WP-0009 (Measure)
Three workplans queued and registered with the State Hub (via REST — MCP write
layer is erroring this session):
- AGENTIC-WP-0007 Phase 3 Distribute: per-flavor distributor adapters render
  approved catalog patterns into proposed (HITL) artifacts, scoped by repo/domain.
- AGENTIC-WP-0008 Read-before-Edit reflex: act on the #1 friction finding.
- AGENTIC-WP-0009 Phase 4 Measure: baseline + before/after effectiveness + trend.
Proceeding in that order.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 14:58:03 +02:00
30 changed files with 1713 additions and 1 deletions

2
.gitignore vendored
View File

@@ -177,6 +177,8 @@ cython_debug/
# session-memory local store
session_memory/.store/
# generated per-flavor distribution proposals (HITL, regenerated each run)
session_memory/proposals/
__pycache__/
*.pyc
.pytest_cache/

View File

@@ -34,6 +34,18 @@ green), and for pipeline changes do a live `ingest → detect → curate` pass a
the local store. See `session_memory/README.md` for the full layout and the
detect → curate → distribute flow.
### Editing files — Read before you Edit
**Read a file (or the region you'll touch) before Edit/Write.** The most common
error across our own captured coding sessions was *"File has not been read yet.
Read it first before writing to it"* — 12 of 27 real sessions, 8 repos
(`docs/ASSESSMENT-infra-friction.md`). Two cheap reflexes eliminate it:
- **Read → then Edit/Write.** Don't blind-write a file you haven't read this
session; the edit tools reject it and the retry wastes a turn.
- **On `File has been modified since read`, re-Read then re-Edit.** A stale read
means the file changed under you — refresh before retrying, don't loop.
---
## State Hub Integration

View File

@@ -33,6 +33,15 @@ session_memory/
curate/decisions.py # hub decision audit trail (graceful local-queue fallback)
curate/__main__.py # python -m session_memory.curate (interactive / --auto-approve)
catalog/ # the committed Pattern Catalog (source of truth)
distribute/base.py # Artifact + Distributor protocol + idempotent snippet markers
distribute/claude.py # CLAUDE.md (or skill) renderer } per-flavor edges
distribute/codex.py # AGENTS.md renderer } (agnostic body,
distribute/grok.py # native instruction renderer } different targets)
distribute/proposals.py # scoping + proposed-not-applied output + active registry
distribute/__main__.py # python -m session_memory.distribute
measure/metrics.py # fleet metrics + persisted baseline snapshots
measure/effect.py # before/after per-pattern effectiveness
measure/__main__.py # python -m session_memory.measure
config.toml # store paths, retention caps, sources, repo->domain map, curate gate
```
@@ -114,6 +123,46 @@ python -m session_memory.curate --json # machine-readable result
| `dist_require_cross_flavor` | require cross-flavor evidence to be distribution-eligible |
| `dist_min_frequency` / `dist_min_cost_impact` | stricter floor for `distribution_ready` |
## Distribute patterns as per-flavor proposals
Render approved catalog patterns into per-flavor artifacts — **proposed, never
auto-applied** (HITL). Completes the loop: **detect → curate → distribute**.
```bash
python -m session_memory.distribute # proposals for all repos/flavors
python -m session_memory.distribute --repo state-hub --flavor claude
python -m session_memory.distribute --json
```
- Only `approved` + `distribution_ready` patterns are rendered; each pattern's
`Scope` (repos/domains/flavors) decides where it lands (FR-X2).
- Each flavor renders the **same agnostic body** to its own target (Claude →
`CLAUDE.md`/skill, Codex → `AGENTS.md`, Grok → native) via `rendering_hints`
(FR-A3); blocks carry stable `BEGIN/END` markers so re-running updates in place.
- Output goes to `session_memory/proposals/<repo>/<target>` (gitignored,
regenerated) — a reviewable diff a human applies (FR-X3). The committed
`distribute/active_patterns.json` records which pattern+version is proposed in
which `(repo, flavor)` (FR-X4).
## Measure effectiveness (closing the loop)
Track whether the fleet is getting cheaper / more reliable, and whether a
distributed pattern actually helped.
```bash
python -m session_memory.measure --label "baseline" # snapshot + trend
python -m session_memory.measure --since 2026-06-07 # before/after a change
python -m session_memory.measure --no-save --json
```
- A **snapshot** (infra-overhead share, error rate, schema-thrash, token
percentiles, success rate) is appended to `measure/baselines.jsonl` to build a
trend (FR-M3).
- `--since DATE` splits sessions before/after a change and diffs the metrics, with
an `improved` verdict per metric (FR-M1/FR-M2) — so ineffective patterns can be
retired. Recorded pre-fix baseline (2026-06-07): 27 sessions, infra-overhead
median 11.7 %, error rate 0.96, schema-thrash 8 sessions.
## Retention knobs (`[retention]` in config.toml)
| Key | Meaning |
@@ -141,4 +190,12 @@ python -m pytest # schema, adapters, store, digest, retention, ingest,
- **Phase 2** (AGENTIC-WP-0004): Curate — Solution Pattern schema, versioned
files-first Pattern Catalog, discuss/approve/reject review with an evidence bar +
bloat guard, and hub-decision audit trail.
- **Next — Phase 3 (Distribute) / Phase 4 (Measure)** follow per the PRD.
- **Detect hardening** (AGENTIC-WP-0005): session-quality filter + tool-mix /
infra-overhead signals. **Error mining** (AGENTIC-WP-0006): recurring error
fingerprints → root-cause patterns.
- **Phase 3** (AGENTIC-WP-0007): Distribute — per-flavor distributor adapters
render approved patterns into proposed (HITL) artifacts, scoped by repo/domain,
with an active-pattern registry.
- **Phase 4** (AGENTIC-WP-0009): Measure — fleet baseline/trend + before/after
per-pattern effectiveness. The Capture → Detect → Curate → Distribute → Measure
loop is closed.

View File

@@ -0,0 +1,58 @@
{
"created_at": "2026-06-07T13:26:25Z",
"distribution_ready": true,
"id": "sp-problem-file_not_read-edit",
"name": "Read before you Edit",
"polarity": "problem",
"problem": "Agents call Edit/Write on a file they have not read in the current session, or after it changed under them. The edit tools reject this ('File has not been read yet' / 'File has been modified since read'), and the retry burns a turn. Top recurring error in the corpus (12/27 sessions, 8 repos).",
"provenance": {
"detected_at": null,
"evidence": {
"frequency": 32,
"origin": "AGENTIC-WP-0006 error mining / ASSESSMENT-infra-friction.md",
"polarity": "problem",
"repos": 8,
"sessions": 12
},
"promoted_at": null,
"source_key": "problem:file_not_read:edit"
},
"rendering_hints": {
"claude": {
"target": "CLAUDE.md"
},
"codex": {
"target": "AGENTS.md"
},
"grok": {
"target": ".grok/instructions.md"
}
},
"resolutions": [
{
"detail": "Never blind-write a file you haven't read this session.",
"steps": [
"Read the target file",
"Then Edit/Write"
],
"summary": "Read the file (or the region you'll touch) before Edit/Write"
},
{
"detail": "A stale read means the file changed under you; refresh, don't loop.",
"steps": [
"Re-Read the file",
"Re-apply the Edit"
],
"summary": "On 'modified since read', re-Read then re-Edit"
}
],
"schema_version": 1,
"scope": {
"domains": [],
"flavors": [],
"repos": []
},
"status": "approved",
"updated_at": "2026-06-07T13:26:25Z",
"version": "1.0.0"
}

View File

@@ -39,6 +39,16 @@ min_substantive = 3 # require >= this many substantive (edit/read/shell) tool
min_prompt_len = 25 # first prompt shorter than this is treated as trivial
# Curate phase (AGENTIC-WP-0004): catalog location + promotion evidence bar.
# Measure phase (AGENTIC-WP-0009): persisted baseline/trend of fleet metrics.
[measure]
baselines = "session_memory/measure/baselines.jsonl" # timestamped metric snapshots (committed)
# Distribute phase (AGENTIC-WP-0007): where per-flavor proposals + the active
# registry are written. Proposals are HITL — reviewed, never auto-applied.
[distribute]
proposals_dir = "session_memory/proposals" # reviewable proposals (gitignored, regenerated)
active_registry = "session_memory/distribute/active_patterns.json" # what's proposed/active where (committed)
[curate]
catalog_dir = "session_memory/catalog" # files-first Pattern Catalog (committed)
review_log = "session_memory/.store/reviews.jsonl" # remembered decisions (gitignored)

View File

@@ -0,0 +1,9 @@
"""Distribute phase (PRD §6.4) — render approved Solution Patterns into per-flavor
artifacts. Mirror of the collector design: agnostic core, thin distributor edges.
base.py Artifact + Distributor protocol + idempotent snippet markers (T01)
claude.py CLAUDE.md snippet distributor (T02)
codex.py AGENTS.md snippet distributor (T03)
grok.py native instruction distributor (T03)
__main__.py `python -m session_memory.distribute` (T05)
"""

View File

@@ -0,0 +1,89 @@
"""Distribute entrypoint (T05): catalog -> per-flavor proposals (HITL).
python -m session_memory.distribute [--config PATH] [--repo R] [--flavor F] [--json]
Reads approved / distribution-ready Solution Patterns from the Pattern Catalog and
renders them into per-flavor **proposals** (never auto-applied) scoped by
repo/domain, recording what is proposed where in the active-pattern registry.
Targets are the repo->domain map in ``config.toml`` crossed with the known
distributor flavors; each pattern's own ``Scope`` filters where it actually lands.
"""
from __future__ import annotations
import argparse
import json
import os
from ..curate.catalog import Catalog
from ..ingest import _expand, load_config
from .proposals import ActiveRegistry, Target, propose
from .registry import all_flavors
def build_targets(config: dict, repo_filter=None, flavor_filter=None) -> list[Target]:
repo_map = config.get("repo_domain_map", {})
flavors = [flavor_filter] if flavor_filter else all_flavors()
targets = []
for repo, domain in repo_map.items():
if repo_filter and repo != repo_filter:
continue
for flavor in flavors:
targets.append(Target(repo=repo, domain=domain, flavor=flavor))
return targets
def run_distribute(config: dict, *, repo_filter=None, flavor_filter=None):
cur = config.get("curate", {})
dist = config.get("distribute", {})
catalog = Catalog(_expand(cur.get("catalog_dir", "session_memory/catalog")))
patterns = catalog.list()
targets = build_targets(config, repo_filter, flavor_filter)
registry = ActiveRegistry(_expand(dist.get("active_registry",
"session_memory/distribute/active_patterns.json")))
out_dir = _expand(dist.get("proposals_dir", "session_memory/proposals"))
return propose(patterns, targets, out_dir, registry)
def _summary(res) -> str:
by_repo = {}
for repo, flavor, pid, _ in res.proposals:
by_repo.setdefault(repo, []).append(f"{pid}[{flavor}]")
lines = [f"# Distribute proposals ({len(res.proposals)} renders, "
f"{len(res.files_written)} files)"]
for repo in sorted(by_repo):
lines.append(f" {repo}: {', '.join(sorted(by_repo[repo]))}")
if res.skipped_not_distributable:
lines.append(f" skipped (not distribution-ready): "
f"{len(set(res.skipped_not_distributable))} pattern(s)")
if not res.proposals:
lines.append(" (no approved/distribution-ready patterns matched any target)")
return "\n".join(lines)
def main(argv=None) -> int:
here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
ap = argparse.ArgumentParser(description="Distribute approved patterns as per-flavor proposals.")
ap.add_argument("--config", default=os.path.join(here, "config.toml"))
ap.add_argument("--repo", default=None, help="limit to one target repo")
ap.add_argument("--flavor", default=None, help="limit to one flavor")
ap.add_argument("--json", action="store_true")
args = ap.parse_args(argv)
config = load_config(args.config)
res = run_distribute(config, repo_filter=args.repo, flavor_filter=args.flavor)
if args.json:
print(json.dumps({
"proposals": [{"repo": r, "flavor": f, "pattern_id": p, "path": path}
for r, f, p, path in res.proposals],
"files_written": res.files_written,
"skipped": sorted(set(res.skipped_not_distributable)),
}, indent=2))
else:
print(_summary(res))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,106 @@
[
{
"flavor": "claude",
"pattern_id": "sp-problem-file_not_read-edit",
"repo": "agentic-resources",
"status": "proposed",
"updated_at": "2026-06-07T13:26:26Z",
"version": "1.0.0"
},
{
"flavor": "claude",
"pattern_id": "sp-problem-schema_thrash-schema_load",
"repo": "ops-bridge",
"status": "proposed",
"updated_at": "2026-06-07T13:20:58Z",
"version": "1.0.0"
},
{
"flavor": "claude",
"pattern_id": "sp-problem-tool_thrash-tool-bash",
"repo": "state-hub",
"status": "proposed",
"updated_at": "2026-06-07T13:20:58Z",
"version": "1.0.0"
},
{
"flavor": "claude",
"pattern_id": "sp-success-clean_pass-outcome",
"repo": "agentic-resources",
"status": "proposed",
"updated_at": "2026-06-07T13:26:26Z",
"version": "1.0.0"
},
{
"flavor": "grok",
"pattern_id": "sp-success-clean_pass-outcome",
"repo": "agentic-resources",
"status": "proposed",
"updated_at": "2026-06-07T13:20:58Z",
"version": "1.0.0"
},
{
"flavor": "claude",
"pattern_id": "sp-success-clean_pass-outcome",
"repo": "can-you-assist",
"status": "proposed",
"updated_at": "2026-06-07T13:20:58Z",
"version": "1.0.0"
},
{
"flavor": "grok",
"pattern_id": "sp-success-clean_pass-outcome",
"repo": "can-you-assist",
"status": "proposed",
"updated_at": "2026-06-07T13:20:58Z",
"version": "1.0.0"
},
{
"flavor": "claude",
"pattern_id": "sp-success-clean_pass-outcome",
"repo": "ops-bridge",
"status": "proposed",
"updated_at": "2026-06-07T13:20:58Z",
"version": "1.0.0"
},
{
"flavor": "grok",
"pattern_id": "sp-success-clean_pass-outcome",
"repo": "ops-bridge",
"status": "proposed",
"updated_at": "2026-06-07T13:20:58Z",
"version": "1.0.0"
},
{
"flavor": "claude",
"pattern_id": "sp-success-clean_pass-outcome",
"repo": "state-hub",
"status": "proposed",
"updated_at": "2026-06-07T13:20:58Z",
"version": "1.0.0"
},
{
"flavor": "grok",
"pattern_id": "sp-success-clean_pass-outcome",
"repo": "state-hub",
"status": "proposed",
"updated_at": "2026-06-07T13:20:58Z",
"version": "1.0.0"
},
{
"flavor": "claude",
"pattern_id": "sp-success-clean_pass-outcome",
"repo": "the-custodian",
"status": "proposed",
"updated_at": "2026-06-07T13:20:58Z",
"version": "1.0.0"
},
{
"flavor": "grok",
"pattern_id": "sp-success-clean_pass-outcome",
"repo": "the-custodian",
"status": "proposed",
"updated_at": "2026-06-07T13:20:58Z",
"version": "1.0.0"
}
]

View File

@@ -0,0 +1,115 @@
"""Distributor base — Artifact, the Distributor protocol, and idempotent markers
(PRD §6.4 FR-X1; T01).
A **distributor** turns one agnostic :class:`SolutionPattern` into a per-flavor
:class:`Artifact` (a target path + a snippet of content). Everything flavor-neutral
lives here; each flavor adapter (T02/T03) only supplies its target filename and may
override the rendered body using the pattern's ``rendering_hints``.
Snippets carry stable ``BEGIN/END`` markers keyed on the pattern id, so
re-distributing a pattern **updates its block in place** instead of duplicating it
— the property that lets Distribute run repeatedly (HITL) without drift.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Any, Optional, Protocol, runtime_checkable
from ..curate.schema import SolutionPattern
@dataclass
class Artifact:
"""A proposed per-flavor rendering of a pattern (FR-X1/FR-X3 — proposed, not applied)."""
flavor: str
target_path: str # repo-relative file the snippet belongs in (e.g. "CLAUDE.md")
pattern_id: str
content: str # the marker-wrapped snippet block
@runtime_checkable
class Distributor(Protocol):
flavor: str
target_path: str
def render(self, pattern: SolutionPattern) -> Artifact: ...
# --- idempotent snippet markers ---------------------------------------------
_MARK = "helix-forge pattern"
def begin_marker(pattern_id: str) -> str:
return f"<!-- BEGIN {_MARK}:{pattern_id} -->"
def end_marker(pattern_id: str) -> str:
return f"<!-- END {_MARK}:{pattern_id} -->"
def wrap_block(pattern_id: str, body: str, version: str = "") -> str:
"""Wrap a rendered body in stable BEGIN/END markers."""
ver = f" v{version}" if version else ""
return f"{begin_marker(pattern_id)}{ver}\n{body.strip()}\n{end_marker(pattern_id)}"
def upsert_block(doc_text: str, pattern_id: str, block: str) -> str:
"""Insert or replace a pattern's marked block within a document (idempotent)."""
pat = re.compile(
re.escape(begin_marker(pattern_id)) + r".*?" + re.escape(end_marker(pattern_id)),
re.DOTALL,
)
if pat.search(doc_text):
return pat.sub(block, doc_text)
sep = "" if doc_text.endswith("\n\n") or not doc_text else "\n\n"
return f"{doc_text}{sep}{block}\n"
# --- agnostic body rendering ------------------------------------------------
def render_markdown_body(pattern: SolutionPattern) -> str:
"""Default flavor-neutral snippet body from the agnostic pattern fields."""
label = "Avoid" if pattern.polarity == "problem" else "Prefer"
lines = [f"### {pattern.name}", "", pattern.problem.strip(), ""]
if pattern.resolutions:
lines.append(f"**{label}:**")
for r in pattern.resolutions:
detail = f"{r.detail}" if r.detail else ""
lines.append(f"- {r.summary}{detail}")
for step in r.steps:
lines.append(f" - {step}")
return "\n".join(lines).strip()
def hint(pattern: SolutionPattern, flavor: str, key: str, default: Any = None) -> Any:
"""Read a per-flavor rendering hint, falling back to ``default``."""
return (pattern.rendering_hints.get(flavor) or {}).get(key, default)
class BaseDistributor:
"""Shared distributor: renders the agnostic body, honouring a ``body`` hint
override and a ``target`` hint, then wraps it in idempotent markers."""
flavor: str = ""
target_path: str = ""
def __init__(self, flavor: Optional[str] = None, target_path: Optional[str] = None) -> None:
if flavor is not None:
self.flavor = flavor
if target_path is not None:
self.target_path = target_path
def body(self, pattern: SolutionPattern) -> str:
return hint(pattern, self.flavor, "body") or render_markdown_body(pattern)
def target(self, pattern: SolutionPattern) -> str:
return hint(pattern, self.flavor, "target") or self.target_path
def render(self, pattern: SolutionPattern) -> Artifact:
block = wrap_block(pattern.id, self.body(pattern), pattern.version)
return Artifact(flavor=self.flavor, target_path=self.target(pattern),
pattern_id=pattern.id, content=block)

View File

@@ -0,0 +1,42 @@
"""Claude distributor (PRD §6.4 FR-X1; T02).
Renders an approved Solution Pattern into a ``CLAUDE.md`` snippet block. Most logic
is inherited from :class:`BaseDistributor`; the Claude-specific touch is an
optional **skill** rendering mode (``rendering_hints["claude"]["as"] == "skill"``)
that emits a skill-style stub instead of a plain instruction snippet — Claude's
native distribution targets are CLAUDE.md snippets, skills, or hooks.
"""
from __future__ import annotations
from ..curate.schema import SolutionPattern
from .base import BaseDistributor, hint, render_markdown_body
class ClaudeDistributor(BaseDistributor):
flavor = "claude"
target_path = "CLAUDE.md"
def body(self, pattern: SolutionPattern) -> str:
override = hint(pattern, self.flavor, "body")
if override:
return override
if hint(pattern, self.flavor, "as") == "skill":
return self._skill_stub(pattern)
return render_markdown_body(pattern)
@staticmethod
def _skill_stub(pattern: SolutionPattern) -> str:
trigger = "avoid" if pattern.polarity == "problem" else "apply"
lines = [
f"## Skill: {pattern.name}",
"",
f"**When:** situations where you would {trigger}{pattern.problem.strip()}",
"",
"**Steps:**",
]
for r in pattern.resolutions:
lines.append(f"- {r.summary}" + (f"{r.detail}" if r.detail else ""))
for step in r.steps:
lines.append(f" - {step}")
return "\n".join(lines).strip()

View File

@@ -0,0 +1,15 @@
"""Codex distributor (PRD §6.4 FR-X1; T03).
Renders an approved Solution Pattern into an ``AGENTS.md`` snippet — Codex's native
repo-convention surface. Identical agnostic body to the other flavors (FR-A3: one
pattern, expressible everywhere); only the target file differs.
"""
from __future__ import annotations
from .base import BaseDistributor
class CodexDistributor(BaseDistributor):
flavor = "codex"
target_path = "AGENTS.md"

View File

@@ -0,0 +1,15 @@
"""Grok distributor (PRD §6.4 FR-X1; T03).
Renders an approved Solution Pattern into Grok's native instruction format. Defaults
to a ``.grok/instructions.md`` snippet; the same agnostic body as the other flavors
(FR-A3), overridable via ``rendering_hints["grok"]``.
"""
from __future__ import annotations
from .base import BaseDistributor
class GrokDistributor(BaseDistributor):
flavor = "grok"
target_path = ".grok/instructions.md"

View File

@@ -0,0 +1,136 @@
"""Scoping, proposed-not-applied output, and the active-pattern registry
(PRD §6.4 FR-X2/FR-X3/FR-X4; T04).
* **Scope (FR-X2):** a pattern lands in a target environment only if the target's
repo/domain/flavor are within the pattern's :class:`Scope` (an empty scope list
means "unrestricted on that axis").
* **Proposed, not applied (FR-X3):** rendered artifacts are written under a
``proposals/`` tree mirroring the target path — a reviewable diff a human applies,
never auto-written into the live file. Re-running upserts each pattern's block in
place (idempotent), so proposals don't accumulate duplicates.
* **Active-pattern registry (FR-X4):** a JSON record of which pattern (and version)
is proposed/active in which (repo, flavor) environment.
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from ..curate.schema import SolutionPattern
from .base import upsert_block
from .registry import get_distributor
def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
@dataclass(frozen=True)
class Target:
"""An environment a pattern could be distributed to."""
repo: str
domain: str = ""
flavor: str = "claude"
def applies(pattern: SolutionPattern, target: Target) -> bool:
"""True if ``target`` is within the pattern's scope (empty axis == any)."""
sc = pattern.scope
if sc.repos and target.repo not in sc.repos:
return False
if sc.domains and target.domain and target.domain not in sc.domains:
return False
if sc.flavors and target.flavor not in sc.flavors:
return False
return True
def is_distributable(pattern: SolutionPattern) -> bool:
return pattern.status == "approved" and pattern.distribution_ready
class ActiveRegistry:
"""JSON record of patterns proposed/active per (repo, flavor) — FR-X4."""
def __init__(self, path: str) -> None:
self.path = path
self._entries: dict[str, dict] = {}
if os.path.exists(path):
with open(path, encoding="utf-8") as fh:
for e in json.load(fh):
self._entries[self._key(e["pattern_id"], e["repo"], e["flavor"])] = e
@staticmethod
def _key(pid: str, repo: str, flavor: str) -> str:
return f"{pid}|{repo}|{flavor}"
def record(self, pid: str, repo: str, flavor: str, version: str,
status: str = "proposed") -> None:
self._entries[self._key(pid, repo, flavor)] = {
"pattern_id": pid, "repo": repo, "flavor": flavor,
"version": version, "status": status, "updated_at": _now(),
}
def entries(self) -> list[dict]:
return [self._entries[k] for k in sorted(self._entries)]
def save(self) -> None:
os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True)
with open(self.path, "w", encoding="utf-8") as fh:
json.dump(self.entries(), fh, indent=2, sort_keys=True)
fh.write("\n")
@dataclass
class ProposalResult:
proposals: list = None # (repo, flavor, pattern_id, proposal_path)
files_written: list = None # absolute proposal paths
skipped_not_distributable: list = None # pattern ids
def __post_init__(self):
self.proposals = self.proposals or []
self.files_written = self.files_written or []
self.skipped_not_distributable = self.skipped_not_distributable or []
def propose(patterns: list[SolutionPattern], targets: list[Target], out_dir: str,
registry: ActiveRegistry) -> ProposalResult:
"""Render in-scope, distributable patterns into per-target proposal files."""
result = ProposalResult()
pending: dict[str, str] = {} # proposal path -> accumulated content
for p in patterns:
if not is_distributable(p):
result.skipped_not_distributable.append(p.id)
continue
for t in targets:
dist = get_distributor(t.flavor)
if dist is None or not applies(p, t):
continue
art = dist.render(p)
path = os.path.join(out_dir, t.repo, art.target_path)
if path not in pending:
pending[path] = _read(path)
pending[path] = upsert_block(pending[path], p.id, art.content)
registry.record(p.id, t.repo, t.flavor, p.version)
result.proposals.append((t.repo, t.flavor, p.id, path))
for path, content in pending.items():
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as fh:
fh.write(content if content.endswith("\n") else content + "\n")
result.files_written.append(path)
registry.save()
return result
def _read(path: str) -> str:
if os.path.exists(path):
with open(path, encoding="utf-8") as fh:
return fh.read()
return ""

View File

@@ -0,0 +1,26 @@
"""Distributor registry (T03) — flavor -> distributor, the one place that knows
about all flavor edges. Adding a flavor = one entry here + one adapter module.
"""
from __future__ import annotations
from typing import Optional
from .base import BaseDistributor
from .claude import ClaudeDistributor
from .codex import CodexDistributor
from .grok import GrokDistributor
_REGISTRY: dict[str, BaseDistributor] = {
"claude": ClaudeDistributor(),
"codex": CodexDistributor(),
"grok": GrokDistributor(),
}
def get_distributor(flavor: str) -> Optional[BaseDistributor]:
return _REGISTRY.get(flavor)
def all_flavors() -> list[str]:
return list(_REGISTRY)

View File

@@ -0,0 +1,9 @@
"""Measure phase (PRD §6.5) — the loop-closer.
metrics.py fleet metrics + persisted baseline snapshots (T01)
effect.py before/after per-pattern effectiveness (T02)
__main__.py python -m session_memory.measure (T03)
Computation over existing digests (reusing WP-0005 tool buckets + WP-0006 error
mining); no new capture.
"""

View File

@@ -0,0 +1,101 @@
"""Measure entrypoint (T03): fleet trend + per-pattern effectiveness.
python -m session_memory.measure [--config PATH] [--label L] [--since DATE]
[--no-save] [--json]
Computes current fleet metrics over the real (quality-filtered) sessions, appends
them to the baseline trend, and reports whether the fleet is getting cheaper /
more reliable over time (FR-M3). With ``--since DATE`` it also reports before/after
effectiveness around a change (FR-M1/FR-M2).
"""
from __future__ import annotations
import argparse
import json
import os
from ..core.store import Store
from ..detect.quality import filter_real, quality_config
from ..ingest import _expand, load_config
from .effect import effectiveness
from .metrics import load_baselines, save_baseline, snapshot
_TREND_KEYS = ("infra_overhead_share_median", "error_rate", "schema_thrash_sessions",
"tokens_p50", "success_rate")
def real_digests(config: dict) -> list[dict]:
s = config.get("store", {})
store = Store(_expand(s["db_path"]), _expand(s["blob_dir"]))
out = filter_real(store.list_digests(), quality_config(config))
store.close()
return out
def _fmt_trend(baselines: list[dict]) -> str:
if not baselines:
return " (no prior snapshots)"
lines = []
recent = baselines[-5:]
for b in recent:
when = (b.get("captured_at") or "")[:10]
lbl = f" {b['label']}" if b.get("label") else ""
lines.append(f" {when}{lbl}: overhead_med={b.get('infra_overhead_share_median')} "
f"err_rate={b.get('error_rate')} schema_thrash={b.get('schema_thrash_sessions')} "
f"tok_p50={b.get('tokens_p50')} success={b.get('success_rate')} "
f"(n={b.get('n_sessions')})")
return "\n".join(lines)
def _report(current: dict, baselines: list[dict], eff: dict | None) -> str:
lines = [f"# Fleet metrics (n={current.get('n_sessions')} real sessions)"]
for k in _TREND_KEYS:
lines.append(f" {k} = {current.get(k)}")
lines.append("\n## Trend (recent snapshots)")
lines.append(_fmt_trend(baselines))
if eff is not None:
lines.append(f"\n## Effectiveness since {eff['applied_at']} "
f"(before={eff['n_before']}, after={eff['n_after']})")
if eff["insufficient_data"]:
lines.append(" insufficient data on one side of the date")
else:
for k in _TREND_KEYS:
d = eff["deltas"].get(k, {})
mark = {True: "improved", False: "worse", None: ""}[d.get("improved")]
lines.append(f" {k}: {d.get('before')} -> {d.get('after')} "
f"({d.get('change'):+}) {mark}")
return "\n".join(lines)
def main(argv=None) -> int:
here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
ap = argparse.ArgumentParser(description="Measure fleet metrics + per-pattern effectiveness.")
ap.add_argument("--config", default=os.path.join(here, "config.toml"))
ap.add_argument("--label", default="")
ap.add_argument("--since", default=None, help="ISO date for before/after effectiveness")
ap.add_argument("--no-save", action="store_true", help="don't append to the baseline trend")
ap.add_argument("--json", action="store_true")
args = ap.parse_args(argv)
config = load_config(args.config)
digests = real_digests(config)
current = snapshot(digests, label=args.label)
path = _expand(config.get("measure", {}).get("baselines", "session_memory/measure/baselines.jsonl"))
prior = load_baselines(path)
if not args.no_save:
save_baseline(current, path)
eff = effectiveness(digests, args.since, label=args.label) if args.since else None
if args.json:
print(json.dumps({"current": current, "trend": prior + [current], "effectiveness": eff},
indent=2))
else:
print(_report(current, prior + [current], eff))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1 @@
{"captured_at": "2026-06-07T13:30:14Z", "error_rate": 0.963, "infra_overhead_share_median": 0.117, "infra_overhead_share_p90": 0.261, "label": "phase4-baseline (pre-fixes)", "n_sessions": 27, "recurring_error_occurrences": 505, "schema_thrash_sessions": 8, "success_rate": 1.0, "tokens_p50": 250725, "tokens_p90": 1423966}

View File

@@ -0,0 +1,60 @@
"""Before/after per-pattern effectiveness (PRD §6.5 FR-M1/FR-M2; T02).
Given a change/pattern with an ``applied_at`` date, split sessions into *before*
and *after* by their start time, aggregate each side, and diff the headline
metrics — so we can say whether a distributed pattern (e.g. the Read-before-Edit
reflex, or the State Hub skill) actually moved the numbers, and retire it if not.
"""
from __future__ import annotations
from .metrics import aggregate
# Metrics where a *lower* value after the change means improvement.
_LOWER_IS_BETTER = {
"infra_overhead_share_median", "infra_overhead_share_p90", "error_rate",
"recurring_error_occurrences", "schema_thrash_sessions", "tokens_p50", "tokens_p90",
}
# Metrics where a *higher* value is improvement.
_HIGHER_IS_BETTER = {"success_rate"}
def split_by_date(digests: list[dict], applied_at: str) -> tuple[list[dict], list[dict]]:
"""Partition digests into (before, after) by ``started_at`` vs ``applied_at``."""
before, after = [], []
for d in digests:
ts = d.get("started_at") or ""
(after if ts and ts >= applied_at else before).append(d)
return before, after
def _delta(metric: str, before: float, after: float) -> dict:
change = round(after - before, 3)
if metric in _LOWER_IS_BETTER:
improved = change < 0
elif metric in _HIGHER_IS_BETTER:
improved = change > 0
else:
improved = None
return {"before": before, "after": after, "change": change, "improved": improved}
def effectiveness(digests: list[dict], applied_at: str, *, label: str = "") -> dict:
"""Compare fleet metrics after ``applied_at`` against the prior period."""
before, after = split_by_date(digests, applied_at)
b_agg, a_agg = aggregate(before), aggregate(after)
metrics = (_LOWER_IS_BETTER | _HIGHER_IS_BETTER)
deltas = {}
if before and after:
for m in metrics:
deltas[m] = _delta(m, b_agg.get(m, 0.0), a_agg.get(m, 0.0))
return {
"label": label,
"applied_at": applied_at,
"n_before": len(before),
"n_after": len(after),
"before": b_agg,
"after": a_agg,
"deltas": deltas,
"insufficient_data": not (before and after),
}

View File

@@ -0,0 +1,102 @@
"""Fleet metrics + persisted baselines (PRD §6.5 FR-M3; T01).
Computes the headline health metrics of the captured corpus — the same quantities
the friction assessment reported — so they can be tracked over time and compared
before/after a change. Reuses :func:`detect.signals.tool_bucket` (WP-0005) and the
digest ``error_snippets`` (WP-0006); no new capture.
A **baseline** is a timestamped metrics snapshot appended to a JSONL file, so
successive runs build a trend the entrypoint (T03) can chart.
"""
from __future__ import annotations
import collections
import json
import os
from datetime import datetime, timezone
from ..detect.signals import tool_bucket
def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _pct(values: list[float], q: float) -> float:
if not values:
return 0.0
s = sorted(values)
return round(s[int(q * (len(s) - 1))], 3)
def _median(values: list[float]) -> float:
return _pct(values, 0.5)
def _buckets(digest: dict) -> collections.Counter:
b: collections.Counter = collections.Counter()
for tool, n in (digest.get("tool_histogram") or {}).items():
b[tool_bucket(tool)] += n
return b
def session_metrics(digest: dict) -> dict:
"""Per-session metrics used to build fleet aggregates."""
b = _buckets(digest)
total = sum(b.values()) or 1
overhead = b["statehub_mcp"] + b["task_mgmt"] + b["schema_load"]
cost = digest.get("cost", {})
tokens = cost.get("input_tokens", 0) + cost.get("output_tokens", 0)
return {
"infra_overhead_share": overhead / total,
"tool_calls": total,
"schema_load": b["schema_load"],
"error_occurrences": sum(s.get("count", 1) for s in (digest.get("error_snippets") or [])),
"has_error": bool(digest.get("error_snippets")),
"tokens": tokens,
"success": digest.get("outcome") == "success",
}
def aggregate(digests: list[dict], *, schema_thrash_threshold: int = 5) -> dict:
"""Fleet-level metrics over a set of (already quality-filtered) digests."""
per = [session_metrics(d) for d in digests]
n = len(per)
if n == 0:
return {"n_sessions": 0}
shares = [m["infra_overhead_share"] for m in per]
tokens = [m["tokens"] for m in per]
return {
"n_sessions": n,
"infra_overhead_share_median": _median(shares),
"infra_overhead_share_p90": _pct(shares, 0.9),
"error_rate": round(sum(m["has_error"] for m in per) / n, 3),
"recurring_error_occurrences": sum(m["error_occurrences"] for m in per),
"schema_thrash_sessions": sum(1 for m in per if m["schema_load"] >= schema_thrash_threshold),
"tokens_p50": _pct(tokens, 0.5),
"tokens_p90": _pct(tokens, 0.9),
"success_rate": round(sum(m["success"] for m in per) / n, 3),
}
def snapshot(digests: list[dict], *, label: str = "") -> dict:
m = aggregate(digests)
m["captured_at"] = _now()
m["label"] = label
return m
def save_baseline(metrics: dict, path: str) -> None:
"""Append a metrics snapshot to the baseline JSONL trend file."""
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
with open(path, "a", encoding="utf-8") as fh:
fh.write(json.dumps(metrics, sort_keys=True))
fh.write("\n")
def load_baselines(path: str) -> list[dict]:
if not os.path.exists(path):
return []
with open(path, encoding="utf-8") as fh:
return [json.loads(line) for line in fh if line.strip()]

View File

@@ -0,0 +1,88 @@
"""Distributor base tests (WP-0007 T01): markers, idempotent upsert, rendering."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.curate.schema import Resolution, SolutionPattern # noqa: E402
from session_memory.distribute.base import ( # noqa: E402
Artifact,
BaseDistributor,
Distributor,
render_markdown_body,
upsert_block,
wrap_block,
)
def _pattern(pid="sp-x", polarity="problem"):
return SolutionPattern(
id=pid, name="Read before edit", version="1.2.0", polarity=polarity,
problem="Agents edit files they have not read.",
resolutions=[Resolution(summary="Read the file first", detail="then Edit",
steps=["Read", "Edit"])],
rendering_hints={"claude": {"target": "CLAUDE.md"}},
)
def test_render_markdown_body_has_problem_and_resolution():
body = render_markdown_body(_pattern())
assert "### Read before edit" in body
assert "Agents edit files" in body
assert "**Avoid:**" in body # problem polarity
assert "- Read the file first — then Edit" in body
assert " - Read" in body
def test_success_polarity_label():
assert "**Prefer:**" in render_markdown_body(_pattern(polarity="success"))
def test_wrap_block_has_markers_and_version():
block = wrap_block("sp-x", "hello", "1.2.0")
assert block.startswith("<!-- BEGIN helix-forge pattern:sp-x --> v1.2.0")
assert block.rstrip().endswith("<!-- END helix-forge pattern:sp-x -->")
def test_upsert_inserts_then_replaces_in_place():
doc = "# Title\n\nsome text\n"
b1 = wrap_block("sp-x", "first", "1")
once = upsert_block(doc, "sp-x", b1)
assert "first" in once and once.count("BEGIN helix-forge pattern:sp-x") == 1
# re-distributing the same id replaces, does not duplicate
b2 = wrap_block("sp-x", "second", "2")
twice = upsert_block(once, "sp-x", b2)
assert "second" in twice and "first" not in twice
assert twice.count("BEGIN helix-forge pattern:sp-x") == 1
def test_upsert_keeps_other_patterns():
doc = upsert_block("", "sp-a", wrap_block("sp-a", "A"))
doc = upsert_block(doc, "sp-b", wrap_block("sp-b", "B"))
assert "sp-a" in doc and "sp-b" in doc
def test_base_distributor_renders_artifact():
d = BaseDistributor(flavor="claude", target_path="CLAUDE.md")
art = d.render(_pattern())
assert isinstance(art, Artifact)
assert isinstance(d, Distributor) # satisfies the protocol
assert art.flavor == "claude"
assert art.target_path == "CLAUDE.md"
assert "BEGIN helix-forge pattern:sp-x" in art.content
assert "Read before edit" in art.content
def test_body_hint_overrides_default():
p = _pattern()
p.rendering_hints["claude"]["body"] = "custom claude body"
d = BaseDistributor(flavor="claude", target_path="CLAUDE.md")
assert "custom claude body" in d.render(p).content
def test_target_hint_overrides_default():
p = _pattern()
p.rendering_hints["claude"]["target"] = "docs/CLAUDE.md"
d = BaseDistributor(flavor="claude", target_path="CLAUDE.md")
assert d.render(p).target_path == "docs/CLAUDE.md"

View File

@@ -0,0 +1,40 @@
"""Claude distributor tests (WP-0007 T02)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.curate.schema import Resolution, SolutionPattern # noqa: E402
from session_memory.distribute.claude import ClaudeDistributor # noqa: E402
def _pattern(hints=None):
return SolutionPattern(
id="sp-read-before-edit", name="Read before edit", version="1.0.0",
polarity="problem", problem="Agents edit files they have not read.",
resolutions=[Resolution(summary="Read the file first", steps=["Read", "Edit"])],
rendering_hints=hints or {"claude": {}},
)
def test_default_targets_claude_md():
art = ClaudeDistributor().render(_pattern())
assert art.flavor == "claude"
assert art.target_path == "CLAUDE.md"
assert "BEGIN helix-forge pattern:sp-read-before-edit" in art.content
assert "### Read before edit" in art.content
def test_skill_mode_emits_skill_stub():
art = ClaudeDistributor().render(_pattern({"claude": {"as": "skill"}}))
assert "## Skill: Read before edit" in art.content
assert "**When:**" in art.content
assert " - Read" in art.content
def test_idempotent_marker_present_for_reupsert():
art = ClaudeDistributor().render(_pattern())
# same id in both renders -> caller can upsert in place
art2 = ClaudeDistributor().render(_pattern())
assert art.pattern_id == art2.pattern_id == "sp-read-before-edit"

View File

@@ -0,0 +1,49 @@
"""Codex + Grok distributor + registry tests (WP-0007 T03)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.curate.schema import Resolution, SolutionPattern # noqa: E402
from session_memory.distribute.codex import CodexDistributor # noqa: E402
from session_memory.distribute.grok import GrokDistributor # noqa: E402
from session_memory.distribute.registry import all_flavors, get_distributor # noqa: E402
def _pattern():
return SolutionPattern(
id="sp-x", name="Read before edit", version="1.0.0", polarity="problem",
problem="Agents edit files they have not read.",
resolutions=[Resolution(summary="Read the file first")],
)
def test_codex_targets_agents_md():
art = CodexDistributor().render(_pattern())
assert art.flavor == "codex" and art.target_path == "AGENTS.md"
assert "Read before edit" in art.content
def test_grok_targets_native_instructions():
art = GrokDistributor().render(_pattern())
assert art.flavor == "grok" and art.target_path == ".grok/instructions.md"
def test_same_pattern_expressible_for_all_flavors():
# FR-A3: one pattern, rendered for every flavor (same body, different targets)
p = _pattern()
bodies = {}
for f in all_flavors():
art = get_distributor(f).render(p)
# strip markers -> compare agnostic body
inner = art.content.split("\n", 1)[1].rsplit("\n", 1)[0]
bodies[f] = inner
targets = {get_distributor(f).render(p).target_path for f in all_flavors()}
assert len(targets) == 3 # distinct per-flavor targets
assert len(set(bodies.values())) == 1 # identical agnostic body
def test_registry_unknown_flavor():
assert get_distributor("gpt") is None
assert set(all_flavors()) == {"claude", "codex", "grok"}

View File

@@ -0,0 +1,76 @@
"""Distribute entrypoint tests (WP-0007 T05)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.curate.catalog import Catalog # noqa: E402
from session_memory.curate.schema import Resolution, Scope, SolutionPattern # noqa: E402
from session_memory.distribute.__main__ import build_targets, main, run_distribute # noqa: E402
def _pattern(pid, repos, flavors, status="approved", ready=True):
return SolutionPattern(
id=pid, name=pid, version="1.0.0", polarity="problem", problem="p",
resolutions=[Resolution(summary="do x")],
scope=Scope(repos=repos, flavors=flavors), status=status, distribution_ready=ready,
)
def _config(tmp_path):
return {
"repo_domain_map": {"agentic-resources": "helix_forge", "state-hub": "custodian"},
"curate": {"catalog_dir": str(tmp_path / "catalog")},
"distribute": {"proposals_dir": str(tmp_path / "proposals"),
"active_registry": str(tmp_path / "active.json")},
}
def test_build_targets_crosses_repos_and_flavors():
cfg = {"repo_domain_map": {"r1": "d1", "r2": "d2"}}
targets = build_targets(cfg)
assert len(targets) == 2 * 3 # 2 repos x 3 flavors
assert build_targets(cfg, repo_filter="r1") and all(t.repo == "r1"
for t in build_targets(cfg, repo_filter="r1"))
assert all(t.flavor == "claude" for t in build_targets(cfg, flavor_filter="claude"))
def test_run_distribute_scopes_to_catalog(tmp_path):
cfg = _config(tmp_path)
cat = Catalog(cfg["curate"]["catalog_dir"])
# in-scope for agentic-resources/claude only
cat.upsert(_pattern("sp-a", ["agentic-resources"], ["claude"]))
# provisional -> must be skipped
cat.upsert(_pattern("sp-prov", [], [], status="provisional", ready=False))
res = run_distribute(cfg)
rendered = {pid for _, _, pid, _ in res.proposals}
assert "sp-a" in rendered
assert "sp-prov" not in rendered
assert "sp-prov" in res.skipped_not_distributable
# landed only in the agentic-resources/CLAUDE.md proposal
p = os.path.join(cfg["distribute"]["proposals_dir"], "agentic-resources", "CLAUDE.md")
assert os.path.exists(p)
assert not os.path.exists(
os.path.join(cfg["distribute"]["proposals_dir"], "state-hub", "CLAUDE.md"))
def test_main_runs_json(tmp_path, capsys):
cfg = _config(tmp_path)
cat = Catalog(cfg["curate"]["catalog_dir"])
cat.upsert(_pattern("sp-a", [], ["claude"])) # unrestricted repos
# write a config file
import json as _json
cfg_path = tmp_path / "c.json"
# main() loads TOML; emulate by calling run_distribute path via a tiny toml
toml = tmp_path / "config.toml"
toml.write_text(
f'[repo_domain_map]\nagentic-resources = "helix_forge"\n'
f'[curate]\ncatalog_dir = "{cfg["curate"]["catalog_dir"]}"\n'
f'[distribute]\nproposals_dir = "{cfg["distribute"]["proposals_dir"]}"\n'
f'active_registry = "{cfg["distribute"]["active_registry"]}"\n')
rc = main(["--config", str(toml), "--json"])
assert rc == 0
out = capsys.readouterr().out
assert "sp-a" in out
_json.loads(out) # valid JSON

View File

@@ -0,0 +1,79 @@
"""Scoping + proposals + active registry tests (WP-0007 T04)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.curate.schema import Resolution, Scope, SolutionPattern # noqa: E402
from session_memory.distribute.proposals import ( # noqa: E402
ActiveRegistry,
Target,
applies,
propose,
)
def _pattern(pid="sp-x", repos=None, flavors=None, status="approved", ready=True):
return SolutionPattern(
id=pid, name="Read before edit", version="1.0.0", polarity="problem",
problem="edit before read", resolutions=[Resolution(summary="read first")],
scope=Scope(repos=repos or [], flavors=flavors or []),
status=status, distribution_ready=ready,
)
def test_applies_respects_scope():
p = _pattern(repos=["agentic-resources"], flavors=["claude"])
assert applies(p, Target("agentic-resources", flavor="claude"))
assert not applies(p, Target("other-repo", flavor="claude"))
assert not applies(p, Target("agentic-resources", flavor="codex"))
def test_empty_scope_is_unrestricted():
assert applies(_pattern(), Target("any", flavor="grok"))
def test_propose_writes_scoped_proposal_files(tmp_path):
out = str(tmp_path / "proposals")
reg = ActiveRegistry(str(tmp_path / "active.json"))
p = _pattern(flavors=["claude"])
res = propose([p], [Target("agentic-resources", flavor="claude"),
Target("agentic-resources", flavor="codex")], out, reg)
# only claude target is in scope
assert len(res.proposals) == 1
path = os.path.join(out, "agentic-resources", "CLAUDE.md")
assert os.path.exists(path)
assert "BEGIN helix-forge pattern:sp-x" in open(path).read()
def test_not_distributable_skipped(tmp_path):
reg = ActiveRegistry(str(tmp_path / "active.json"))
prov = _pattern(status="provisional", ready=False)
res = propose([prov], [Target("r", flavor="claude")], str(tmp_path / "p"), reg)
assert res.proposals == []
assert "sp-x" in res.skipped_not_distributable
def test_proposals_idempotent_on_rerun(tmp_path):
out = str(tmp_path / "proposals")
reg_path = str(tmp_path / "active.json")
p = _pattern()
propose([p], [Target("r", flavor="claude")], out, ActiveRegistry(reg_path))
propose([p], [Target("r", flavor="claude")], out, ActiveRegistry(reg_path))
content = open(os.path.join(out, "r", "CLAUDE.md")).read()
assert content.count("BEGIN helix-forge pattern:sp-x") == 1 # no duplication
def test_active_registry_records_environment(tmp_path):
reg_path = str(tmp_path / "active.json")
reg = ActiveRegistry(reg_path)
propose([_pattern()], [Target("r", domain="helix_forge", flavor="claude")],
str(tmp_path / "p"), reg)
reg2 = ActiveRegistry(reg_path) # reload from disk
entries = reg2.entries()
assert len(entries) == 1
assert entries[0]["pattern_id"] == "sp-x"
assert entries[0]["repo"] == "r"
assert entries[0]["flavor"] == "claude"
assert entries[0]["status"] == "proposed"

View File

@@ -0,0 +1,49 @@
"""Before/after effectiveness tests (WP-0009 T02)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.measure.effect import effectiveness, split_by_date # noqa: E402
def _digest(ts, tools=None, errors=0, outcome="success"):
return {
"started_at": ts, "outcome": outcome,
"cost": {"input_tokens": 100, "output_tokens": 0},
"tool_histogram": tools or {"Bash": 10},
"error_snippets": [{"fingerprint": f"e{i}", "count": 1} for i in range(errors)],
}
def test_split_by_date():
digs = [_digest("2026-06-01"), _digest("2026-06-05"), _digest("2026-06-10")]
before, after = split_by_date(digs, "2026-06-05")
assert len(before) == 1 and len(after) == 2 # >= applied_at goes to after
def test_effectiveness_detects_improvement():
# before: lots of errors + hub overhead; after: clean
before = [_digest("2026-06-01", tools={"mcp__state-hub__x": 8, "Bash": 2}, errors=3)
for _ in range(3)]
after = [_digest("2026-06-10", tools={"Bash": 10}, errors=0) for _ in range(3)]
e = effectiveness(before + after, "2026-06-05", label="read-before-edit")
assert not e["insufficient_data"]
assert e["n_before"] == 3 and e["n_after"] == 3
assert e["deltas"]["error_rate"]["improved"] is True
assert e["deltas"]["infra_overhead_share_median"]["improved"] is True
assert e["deltas"]["error_rate"]["change"] < 0
def test_effectiveness_insufficient_data():
e = effectiveness([_digest("2026-06-01")], "2026-06-05")
assert e["insufficient_data"] is True
assert e["deltas"] == {}
def test_success_rate_higher_is_better():
before = [_digest("2026-06-01", outcome="fail") for _ in range(2)]
after = [_digest("2026-06-10", outcome="success") for _ in range(2)]
e = effectiveness(before + after, "2026-06-05")
assert e["deltas"]["success_rate"]["improved"] is True

View File

@@ -0,0 +1,79 @@
"""Measure entrypoint tests (WP-0009 T03)."""
import json
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.core.store import Store # noqa: E402
from session_memory.measure.__main__ import main, real_digests # noqa: E402
from session_memory.measure.metrics import load_baselines # noqa: E402
def _digest(uid, ts, tools=None):
return {
"session_uid": uid, "flavor": "claude", "repo": "agentic-resources",
"outcome": "success", "started_at": ts,
"cost": {"input_tokens": 100, "output_tokens": 10},
"event_count": 40, "first_prompt": "Implement the measure entrypoint cleanly",
"tool_histogram": tools or {"Bash": 20, "Edit": 12, "Read": 8},
"error_snippets": [],
}
def _write_config(tmp_path) -> str:
store = tmp_path / ".store"
toml = tmp_path / "config.toml"
toml.write_text(
f'[store]\ndb_path = "{store / "m.db"}"\nblob_dir = "{store / "blobs"}"\n'
f'cursor = "{store / "c.json"}"\n'
f'[measure]\nbaselines = "{tmp_path / "baselines.jsonl"}"\n')
return str(toml), str(store)
def _seed(store_dir):
st = Store(os.path.join(store_dir, "m.db"), os.path.join(store_dir, "blobs"))
st.write_digest("claude:a", _digest("claude:a", "2026-06-01"))
st.write_digest("claude:b", _digest("claude:b", "2026-06-10",
tools={"mcp__state-hub__x": 18, "Bash": 8, "Edit": 4}))
st.close()
def test_real_digests_filters_and_loads(tmp_path):
cfg_path, store_dir = _write_config(tmp_path)
_seed(store_dir)
from session_memory.ingest import load_config
digs = real_digests(load_config(cfg_path))
assert len(digs) == 2
def test_main_writes_baseline_and_reports(tmp_path, capsys):
cfg_path, store_dir = _write_config(tmp_path)
_seed(store_dir)
rc = main(["--config", cfg_path, "--label", "first"])
assert rc == 0
out = capsys.readouterr().out
assert "Fleet metrics" in out
rows = load_baselines(str(tmp_path / "baselines.jsonl"))
assert len(rows) == 1 and rows[0]["label"] == "first"
def test_main_no_save_and_json(tmp_path, capsys):
cfg_path, store_dir = _write_config(tmp_path)
_seed(store_dir)
rc = main(["--config", cfg_path, "--no-save", "--json"])
assert rc == 0
data = json.loads(capsys.readouterr().out)
assert data["current"]["n_sessions"] == 2
assert not os.path.exists(str(tmp_path / "baselines.jsonl"))
def test_main_effectiveness_since(tmp_path, capsys):
cfg_path, store_dir = _write_config(tmp_path)
_seed(store_dir)
rc = main(["--config", cfg_path, "--no-save", "--since", "2026-06-05", "--json"])
assert rc == 0
data = json.loads(capsys.readouterr().out)
assert data["effectiveness"]["n_before"] == 1
assert data["effectiveness"]["n_after"] == 1

View File

@@ -0,0 +1,63 @@
"""Fleet metrics + baseline tests (WP-0009 T01)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.measure.metrics import ( # noqa: E402
aggregate,
load_baselines,
save_baseline,
session_metrics,
snapshot,
)
def _digest(tools=None, errors=0, tokens=100, outcome="success"):
return {
"outcome": outcome,
"cost": {"input_tokens": tokens, "output_tokens": 0},
"tool_histogram": tools or {"Bash": 10, "Edit": 5},
"error_snippets": [{"fingerprint": f"e{i}", "count": 1} for i in range(errors)],
}
def test_session_metrics_overhead_and_errors():
m = session_metrics(_digest(tools={"mcp__state-hub__create_task": 6, "Bash": 4}, errors=2))
assert abs(m["infra_overhead_share"] - 0.6) < 1e-9
assert m["error_occurrences"] == 2
assert m["has_error"] is True
def test_aggregate_rates_and_percentiles():
digs = [
_digest(tools={"mcp__state-hub__x": 8, "Bash": 2}, errors=1, tokens=50), # 80% overhead
_digest(tools={"Bash": 9, "Edit": 1}, errors=0, tokens=200), # 0% overhead
_digest(tools={"ToolSearch": 6, "Bash": 4}, errors=0, tokens=100, outcome="fail"),
]
a = aggregate(digs)
assert a["n_sessions"] == 3
assert a["error_rate"] == round(1 / 3, 3)
assert a["success_rate"] == round(2 / 3, 3)
assert a["schema_thrash_sessions"] == 1 # the ToolSearch=6 session
assert 0 <= a["infra_overhead_share_median"] <= 1
def test_aggregate_empty():
assert aggregate([]) == {"n_sessions": 0}
def test_snapshot_has_timestamp_and_label():
s = snapshot([_digest()], label="baseline")
assert s["label"] == "baseline"
assert "captured_at" in s and s["n_sessions"] == 1
def test_baseline_roundtrip_appends(tmp_path):
path = str(tmp_path / "baselines.jsonl")
save_baseline(snapshot([_digest()], label="a"), path)
save_baseline(snapshot([_digest(), _digest()], label="b"), path)
rows = load_baselines(path)
assert [r["label"] for r in rows] == ["a", "b"]
assert rows[1]["n_sessions"] == 2

View File

@@ -0,0 +1,100 @@
---
id: AGENTIC-WP-0007
type: workplan
title: "Coding Session Memory — Phase 3 (Distribute: per-flavor artifacts, HITL)"
domain: helix_forge
repo: agentic-resources
status: finished
owner: codex
topic_slug: helix-forge
created: "2026-06-07"
updated: "2026-06-07"
state_hub_workstream_id: "766c9089-d5de-472a-8c0f-85529028cfb9"
---
# Coding Session Memory — Phase 3 (Distribute)
Implements **Distribute** (PRD §6.4, FR-X1FR-X4), continuing
[AGENTIC-WP-0004](AGENTIC-WP-0004-session-memory-phase2.md) (Curate). Distributor
adapters render the **approved / `distribution_ready`** SolutionPatterns from the
Pattern Catalog into per-flavor artifacts, using the `rendering_hints` produced in
Phase 2. Mirror image of the collector design: **agnostic core, thin adapters at
the edges** (FR-A2) — adding a flavor = one collector + one distributor.
Key boundary (FR-X3): output is **proposed, not auto-applied** — artifacts are
written as reviewable proposals (HITL), scoped by repo/domain (FR-X2), with an
active-pattern registry tracking which patterns are live where (FR-X4).
## Distributor Adapter Interface + Artifact Base
```task
id: AGENTIC-WP-0007-T01
status: done
priority: high
state_hub_task_id: "ff618fa6-a78b-4b80-846b-8cde7ad65451"
```
Define a `Distributor` protocol and an `Artifact` dataclass (flavor, target_path,
content, pattern_id) in `session_memory/distribute/`. `render(pattern, scope)`
reads the agnostic `SolutionPattern` plus its per-flavor `rendering_hints`; base
helpers handle idempotent snippet markers. Agnostic core; flavor logic only in
adapters. Unit-tested.
## Claude Distributor (CLAUDE.md snippet)
```task
id: AGENTIC-WP-0007-T02
status: done
priority: high
state_hub_task_id: "64f50bd4-1fdf-452e-ae14-890253ab9f33"
```
`distribute/claude.py`: render an approved pattern into a `CLAUDE.md` snippet block
(or skill stub) with stable `BEGIN/END` markers so re-distribution updates in
place rather than duplicating. Uses `rendering_hints["claude"]`. Unit-tested.
## Codex + Grok Distributors
```task
id: AGENTIC-WP-0007-T03
status: done
priority: high
state_hub_task_id: "382790f5-1fb4-4394-b039-1649cbf3b20a"
```
`distribute/codex.py` (`AGENTS.md` snippet) and `distribute/grok.py` (native
instruction format), each rendering the *same* agnostic pattern via its
`rendering_hints`. Confirms FR-A3: a pattern discovered via one flavor is
expressible for all. Unit-tested.
## Scoping + Proposed-Not-Applied Output + Active-Pattern Registry
```task
id: AGENTIC-WP-0007-T04
status: done
priority: high
state_hub_task_id: "2c690f29-2aee-460a-b9cd-3566018f6b3c"
```
Filter patterns by `Scope` (repos/domains/flavors) so a pattern only lands where it
applies (FR-X2). Write artifacts as **proposals** under a `proposals/` dir, never
auto-applied (FR-X3, HITL). Track which patterns are active in which environments
in an active-pattern registry (FR-X4). Unit-tested.
## Distribute Entrypoint + Tests + Verify
```task
id: AGENTIC-WP-0007-T05
status: done
priority: medium
state_hub_task_id: "f9e24c13-7049-4c1c-a2d6-3a4dc4e752fd"
```
`python -m session_memory.distribute`: read approved catalog patterns, render
per-flavor proposals scoped by repo/domain, emit a proposal summary + JSON.
Document in `session_memory/README.md`. Verify end-to-end against the real catalog.
After workplan updates, notify the operator to run from `~/state-hub`:
```bash
make fix-consistency REPO=agentic-resources
```

View File

@@ -0,0 +1,56 @@
---
id: AGENTIC-WP-0008
type: workplan
title: "Act on #1 friction — Read-before-Edit reflex"
domain: helix_forge
repo: agentic-resources
status: finished
owner: codex
topic_slug: helix-forge
created: "2026-06-07"
updated: "2026-06-07"
state_hub_workstream_id: "6aac5cfc-4799-4d07-9537-42a203af2d1b"
---
# Act on #1 Friction — Read-before-Edit Reflex
The error-body mining ([AGENTIC-WP-0006](AGENTIC-WP-0006-error-body-mining.md))
found that the single most common error across real coding sessions is
**`File has not been read yet. Read it first before writing to it.`** — Edit/Write
before Read, in **12 of 27 sessions across 8 repos** — followed by the stale-read
**`File has been modified since read`** (6 sessions). See
[ASSESSMENT-infra-friction.md](../docs/ASSESSMENT-infra-friction.md).
This is the cheapest high-value fix surfaced by the whole analysis: a short
behavioural reflex in the agent instructions. We also capture it as a curated
SolutionPattern so Phase 3 Distribute can propose it to other repos/flavors —
closing the assess → curate → distribute loop by hand for one real pattern.
## Add Read-before-Edit Reflex to Agent Instructions
```task
id: AGENTIC-WP-0008-T01
status: done
priority: high
state_hub_task_id: "549c84c1-5bd8-4ff6-b61d-1c72946b8b8e"
```
Add a concise, data-cited **Read-before-Edit / re-read-on-"modified since read"**
reflex to `AGENTS.md` (and note for `CLAUDE.md`), targeting the #1 and #2 recurring
errors. Keep it short to avoid context bloat (cf. PRD OQ6 — pattern bloat degrades
context budgets).
## Capture as Curated SolutionPattern for Distribute
```task
id: AGENTIC-WP-0008-T02
status: done
priority: medium
state_hub_task_id: "c007baf9-db14-40fa-b944-d1f1a71ea28b"
```
Promote the recurring "file not read" problem into a curated `SolutionPattern` in
the Pattern Catalog with per-flavor `rendering_hints`, so Phase 3 Distribute can
render and propose it across repos/flavors. Links assess → curate → distribute end
to end on a real pattern. After updates, notify the operator to run
`make fix-consistency REPO=agentic-resources`.

View File

@@ -0,0 +1,68 @@
---
id: AGENTIC-WP-0009
type: workplan
title: "Coding Session Memory — Phase 4 (Measure: effectiveness + fleet trend)"
domain: helix_forge
repo: agentic-resources
status: finished
owner: codex
topic_slug: helix-forge
created: "2026-06-07"
updated: "2026-06-07"
state_hub_workstream_id: "99f1d836-3be0-40e5-9f17-63d3ecc5fcca"
---
# Coding Session Memory — Phase 4 (Measure)
Implements **Measure** (PRD §6.5, FR-M1FR-M3) — the loop-closer. After patterns
are distributed (Phase 3) and changes land (e.g. the State Hub skill
[STATE-WP-0058] and the Read-before-Edit reflex
[AGENTIC-WP-0008](AGENTIC-WP-0008-read-before-edit-reflex.md)), Measure answers:
**did it actually help?**
Reuses what is already captured — WP-0005 tool buckets, WP-0006 error mining — so
this is computation over existing digests, not new capture.
## Baseline Metrics Module + Persisted Baseline
```task
id: AGENTIC-WP-0009-T01
status: done
priority: high
state_hub_task_id: "e5c2016a-2d51-4382-a013-7153e053e8ed"
```
`session_memory/measure/metrics.py`: compute fleet metrics over real sessions
(infra-overhead share, error rate, recurring-error count, schema-thrash, cost
percentiles) and persist a **timestamped baseline snapshot**. Reuses
`detect.signals.tool_bucket` and the digest `error_snippets`. Unit-tested.
## Before/After Per-Pattern Effectiveness
```task
id: AGENTIC-WP-0009-T02
status: done
priority: high
state_hub_task_id: "aa097a00-3462-41da-a137-67e1d61d8d33"
```
Given a change/pattern with an applied-at date, compare sessions **after** it
against the pre-change baseline (cost, error rate, infra-overhead, success) to
surface **per-pattern effectiveness** so ineffective patterns can be revised or
retired (FR-M1/FR-M2). Unit-tested.
## Fleet-Trend Report + Entrypoint + Tests
```task
id: AGENTIC-WP-0009-T03
status: done
priority: medium
state_hub_task_id: "f1147d59-2fb7-4d35-baec-b8f001bb9d62"
```
`python -m session_memory.measure`: fleet-level trend (is the median session
getting cheaper / more reliable over time, FR-M3) plus per-pattern effectiveness;
markdown + JSON. Document in `session_memory/README.md`. After updates, notify the
operator to run `make fix-consistency REPO=agentic-resources`.
[STATE-WP-0058]: handed off to the state-hub repo worker