Compare commits

...

2 Commits

Author SHA1 Message Date
e51fd8154d session-memory Phase 2: review workflow (T03)
UI-free discuss/approve/reject engine driving detect candidates into the
catalog via a decide callback. candidate_to_pattern builds a provisional
SolutionPattern with per-flavor rendering-hint stubs. ReviewLog makes
re-review idempotent: prior rejects remembered, re-surfaced only when the
evidence fingerprint changes. 6 new tests; suite 58/58 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 00:25:10 +02:00
c6164a82ba session-memory Phase 2: versioned Pattern Catalog store (T02)
Files-first catalog (one JSON per pattern, id = source-key). Single
idempotent upsert path: added / unchanged / updated (status-only, no bump) /
versioned (content change bumps semver + archives prior to <id>.history.jsonl).
Dedup is structural on pattern id. 5 new tests; suite 52/52 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 00:18:01 +02:00
5 changed files with 448 additions and 2 deletions

View File

@@ -0,0 +1,132 @@
"""Versioned Pattern Catalog — files-first source of truth (FR-U3; T02).
The catalog is a directory of one JSON file per Solution Pattern
(``<catalog_dir>/<pattern-id>.json``). Files originate the work; the State Hub
indexes them (ADR-001 / PRD §9). Identity is the pattern ``id`` (derived from the
source candidate key), so re-promoting the same detect candidate maps to the same
file — dedup is structural, not heuristic.
:meth:`Catalog.upsert` is the one write path and is **idempotent**:
* new id -> written as-is (``added``)
* same id, identical content -> no write, no version bump (``unchanged``)
* same id, only status/flags -> updated in place, no bump (``updated``)
* same id, content changed -> version bumped, prior snapshot
appended to ``<id>.history.jsonl`` (``versioned``)
History is append-only alongside the current file, so the catalog dir stays one
clean current file per pattern while every superseded version is recoverable.
"""
from __future__ import annotations
import json
import os
from datetime import datetime, timezone
from typing import Optional
from .schema import SolutionPattern
# Content fields that define a pattern's substance. Version, timestamps, status,
# and distribution_ready are metadata — changes to them never bump the version.
_CONTENT_KEYS = ("name", "polarity", "problem", "resolutions", "scope",
"provenance", "rendering_hints")
ADDED = "added"
UNCHANGED = "unchanged"
UPDATED = "updated"
VERSIONED = "versioned"
def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _content(p: SolutionPattern) -> str:
d = p.to_dict()
return json.dumps({k: d[k] for k in _CONTENT_KEYS}, sort_keys=True)
class Catalog:
"""File-backed catalog of versioned :class:`SolutionPattern` artifacts."""
def __init__(self, catalog_dir: str) -> None:
self.dir = catalog_dir
os.makedirs(self.dir, exist_ok=True)
# --- paths --------------------------------------------------------------
def _path(self, pattern_id: str) -> str:
return os.path.join(self.dir, f"{pattern_id}.json")
def _history_path(self, pattern_id: str) -> str:
return os.path.join(self.dir, f"{pattern_id}.history.jsonl")
# --- reads --------------------------------------------------------------
def load(self, pattern_id: str) -> Optional[SolutionPattern]:
path = self._path(pattern_id)
if not os.path.exists(path):
return None
with open(path, encoding="utf-8") as fh:
return SolutionPattern.from_json(fh.read())
def list(self) -> list[SolutionPattern]:
out: list[SolutionPattern] = []
for name in sorted(os.listdir(self.dir)):
if name.endswith(".json") and not name.endswith(".history.jsonl"):
with open(os.path.join(self.dir, name), encoding="utf-8") as fh:
out.append(SolutionPattern.from_json(fh.read()))
return out
def history(self, pattern_id: str) -> list[dict]:
path = self._history_path(pattern_id)
if not os.path.exists(path):
return []
with open(path, encoding="utf-8") as fh:
return [json.loads(line) for line in fh if line.strip()]
# --- the single write path ---------------------------------------------
def upsert(self, pattern: SolutionPattern) -> str:
"""Insert or version-update a pattern. Returns the action taken."""
existing = self.load(pattern.id)
now = _now()
if existing is None:
pattern.created_at = pattern.created_at or now
pattern.updated_at = now
self._write(pattern)
return ADDED
if _content(existing) == _content(pattern):
# substance unchanged — only persist a metadata (status/flag) change
if (existing.status == pattern.status
and existing.distribution_ready == pattern.distribution_ready):
return UNCHANGED
existing.status = pattern.status
existing.distribution_ready = pattern.distribution_ready
existing.updated_at = now
self._write(existing)
return UPDATED
# substance changed: archive the old version, bump, write the new one
self._append_history(existing)
pattern.version = SolutionPattern.bump_version(existing.version)
pattern.created_at = existing.created_at or now
pattern.updated_at = now
self._write(pattern)
return VERSIONED
# --- internals ----------------------------------------------------------
def _write(self, pattern: SolutionPattern) -> None:
with open(self._path(pattern.id), "w", encoding="utf-8") as fh:
fh.write(pattern.to_json())
fh.write("\n")
def _append_history(self, superseded: SolutionPattern) -> None:
superseded.status = "superseded"
with open(self._history_path(superseded.id), "a", encoding="utf-8") as fh:
fh.write(json.dumps(superseded.to_dict(), sort_keys=True))
fh.write("\n")

View File

@@ -0,0 +1,135 @@
"""Curation review workflow (FR-U1/FR-U2; T03).
Drives Phase 1 detect candidates through a **discuss / approve / reject** review
and, on approve, promotes the candidate into a :class:`SolutionPattern` written to
the :class:`Catalog`. The actual decision is supplied by a ``decide`` callback so
this engine stays UI-free — the ``__main__`` entrypoint (T06) plugs in interactive
or batch (auto-approve) logic.
Re-review is **idempotent** via a :class:`ReviewLog`: a candidate already decided
is skipped unless its *evidence fingerprint* changed (new sessions/frequency), so
a prior **reject** is remembered and not re-surfaced, and a prior **approve** is
updated in place rather than duplicated (catalog dedup does the rest).
"""
from __future__ import annotations
import hashlib
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, Optional
from .catalog import Catalog
from .schema import Provenance, Resolution, Scope, SolutionPattern
APPROVE = "approve"
REJECT = "reject"
DISCUSS = "discuss" # defer — no final decision recorded
# Default per-flavor rendering-hint stubs a reviewer can later refine (OQ4).
_DEFAULT_TARGET = {"claude": "CLAUDE.md", "codex": "AGENTS.md", "grok": "instructions"}
# A decision callback: (candidate dict) -> (action, rationale)
Decider = Callable[[dict], tuple]
def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def evidence_fingerprint(candidate: dict) -> str:
"""Stable hash of the evidence that would justify (re)reviewing a candidate."""
keys = ("frequency", "cost_impact", "flavors", "repos", "sessions", "cross_flavor")
payload = {k: candidate.get(k) for k in keys}
return hashlib.sha1(json.dumps(payload, sort_keys=True).encode("utf-8")).hexdigest()
def candidate_to_pattern(candidate: dict) -> SolutionPattern:
"""Build a (provisional) Solution Pattern from a detect candidate."""
src = candidate["key"]
flavors = list(candidate.get("flavors", []))
hints = {f: {"target": _DEFAULT_TARGET.get(f, ""), "note": "TODO: refine rendering"}
for f in flavors}
return SolutionPattern(
id=SolutionPattern.make_id(src),
name=candidate.get("title") or src,
version="1.0.0",
polarity=candidate.get("polarity", "problem"),
problem=candidate.get("title") or src,
resolutions=[Resolution(summary="TODO: capture the recommended resolution")],
scope=Scope(flavors=flavors, repos=list(candidate.get("repos", []))),
provenance=Provenance(source_key=src, evidence=dict(candidate), promoted_at=_now()),
rendering_hints=hints,
status="provisional",
)
@dataclass
class ReviewLog:
"""Append-only record of final decisions, keyed by candidate source key."""
path: str
_by_key: dict = field(default_factory=dict)
def __post_init__(self) -> None:
if os.path.exists(self.path):
with open(self.path, encoding="utf-8") as fh:
for line in fh:
if line.strip():
rec = json.loads(line)
self._by_key[rec["source_key"]] = rec # last write wins
def prior(self, source_key: str) -> Optional[dict]:
return self._by_key.get(source_key)
def already_decided(self, candidate: dict) -> bool:
rec = self._by_key.get(candidate["key"])
return bool(rec) and rec["fingerprint"] == evidence_fingerprint(candidate)
def record(self, candidate: dict, action: str, rationale: str) -> None:
rec = {
"source_key": candidate["key"],
"action": action,
"rationale": rationale,
"fingerprint": evidence_fingerprint(candidate),
"ts": _now(),
}
self._by_key[candidate["key"]] = rec
os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True)
with open(self.path, "a", encoding="utf-8") as fh:
fh.write(json.dumps(rec, sort_keys=True))
fh.write("\n")
@dataclass
class ReviewResult:
approved: list = field(default_factory=list) # (source_key, catalog_action)
rejected: list = field(default_factory=list) # source_key
deferred: list = field(default_factory=list) # source_key (discuss)
skipped: list = field(default_factory=list) # source_key (already decided)
def review(candidates: list[dict], decide: Decider, catalog: Catalog,
log: ReviewLog) -> ReviewResult:
"""Run each candidate through ``decide``; promote approvals into ``catalog``."""
result = ReviewResult()
for cand in candidates:
key = cand["key"]
if log.already_decided(cand):
result.skipped.append(key)
continue
action, rationale = decide(cand)
if action == DISCUSS:
result.deferred.append(key)
continue # not a final decision — leave for a later pass
if action == APPROVE:
cat_action = catalog.upsert(candidate_to_pattern(cand))
result.approved.append((key, cat_action))
elif action == REJECT:
result.rejected.append(key)
else:
raise ValueError(f"unknown review action {action!r}")
log.record(cand, action, rationale)
return result

View File

@@ -0,0 +1,86 @@
"""Versioned Pattern Catalog tests (T02): round-trip, dedup, idempotent upsert."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.curate.catalog import ( # noqa: E402
ADDED,
UNCHANGED,
UPDATED,
VERSIONED,
Catalog,
)
from session_memory.curate.schema import ( # noqa: E402
Provenance,
Resolution,
Scope,
SolutionPattern,
)
def _pattern(src="success:clean_pass:outcome", problem="ran tests, clean finish"):
return SolutionPattern(
id=SolutionPattern.make_id(src),
name="Run tests before declaring success",
version="1.0.0",
polarity="success",
problem=problem,
resolutions=[Resolution(summary="run the suite")],
scope=Scope(flavors=["claude", "grok"]),
provenance=Provenance(source_key=src, evidence={"frequency": 18}),
)
def test_add_then_load_round_trips(tmp_path):
cat = Catalog(str(tmp_path))
assert cat.upsert(_pattern()) == ADDED
loaded = cat.load(SolutionPattern.make_id("success:clean_pass:outcome"))
assert loaded is not None
assert loaded.problem == "ran tests, clean finish"
assert loaded.created_at and loaded.updated_at
assert [p.id for p in cat.list()] == [loaded.id]
def test_resave_identical_is_noop(tmp_path):
cat = Catalog(str(tmp_path))
cat.upsert(_pattern())
assert cat.upsert(_pattern()) == UNCHANGED
# version not bumped, no history written
assert cat.load(_pattern().id).version == "1.0.0"
assert cat.history(_pattern().id) == []
def test_dedup_on_source_key(tmp_path):
cat = Catalog(str(tmp_path))
cat.upsert(_pattern())
cat.upsert(_pattern()) # same source key -> same id -> one file
assert len(cat.list()) == 1
def test_content_change_bumps_version_and_archives(tmp_path):
cat = Catalog(str(tmp_path))
cat.upsert(_pattern())
assert cat.upsert(_pattern(problem="now with more nuance")) == VERSIONED
current = cat.load(_pattern().id)
assert current.version == "1.0.1"
assert current.problem == "now with more nuance"
hist = cat.history(_pattern().id)
assert len(hist) == 1
assert hist[0]["version"] == "1.0.0"
assert hist[0]["status"] == "superseded"
def test_status_only_change_updates_without_bump(tmp_path):
cat = Catalog(str(tmp_path))
cat.upsert(_pattern())
p = _pattern()
p.status = "approved"
p.distribution_ready = True
assert cat.upsert(p) == UPDATED
current = cat.load(p.id)
assert current.status == "approved"
assert current.distribution_ready is True
assert current.version == "1.0.0" # metadata change, no bump
assert cat.history(p.id) == []

View File

@@ -0,0 +1,93 @@
"""Review workflow tests (T03): promote/reject/discuss + idempotent re-review."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.curate.catalog import Catalog # noqa: E402
from session_memory.curate.review import ( # noqa: E402
APPROVE,
DISCUSS,
REJECT,
ReviewLog,
candidate_to_pattern,
review,
)
from session_memory.curate.schema import SolutionPattern # noqa: E402
def _candidate(key="success:clean_pass:outcome", freq=18, flavors=("claude", "grok")):
return {
"key": key,
"polarity": key.split(":")[0],
"signal_type": key.split(":")[1],
"locus": key.split(":")[2],
"title": "cross-flavor success: clean pass",
"frequency": freq,
"flavors": list(flavors),
"repos": ["agentic-resources"],
"sessions": [f"s{i}" for i in range(freq)],
"cross_flavor": len(flavors) > 1,
"cost_impact": 12.5,
}
def _decider(action, rationale="because"):
return lambda cand: (action, rationale)
def test_approve_promotes_to_catalog(tmp_path):
cat = Catalog(str(tmp_path / "catalog"))
log = ReviewLog(str(tmp_path / "reviews.jsonl"))
res = review([_candidate()], _decider(APPROVE), cat, log)
assert len(res.approved) == 1
p = cat.load(SolutionPattern.make_id("success:clean_pass:outcome"))
assert p is not None
assert p.scope.flavors == ["claude", "grok"]
assert set(p.rendering_hints) == {"claude", "grok"}
assert p.provenance.evidence["frequency"] == 18
def test_reject_records_no_catalog_write(tmp_path):
cat = Catalog(str(tmp_path / "catalog"))
log = ReviewLog(str(tmp_path / "reviews.jsonl"))
res = review([_candidate()], _decider(REJECT), cat, log)
assert res.rejected == ["success:clean_pass:outcome"]
assert cat.list() == []
def test_discuss_defers_and_is_not_final(tmp_path):
cat = Catalog(str(tmp_path / "catalog"))
log = ReviewLog(str(tmp_path / "reviews.jsonl"))
res = review([_candidate()], _decider(DISCUSS), cat, log)
assert res.deferred == ["success:clean_pass:outcome"]
# not recorded as final -> a later pass re-surfaces it
res2 = review([_candidate()], _decider(APPROVE), cat, log)
assert len(res2.approved) == 1
def test_prior_reject_remembered_same_evidence(tmp_path):
cat = Catalog(str(tmp_path / "catalog"))
log_path = str(tmp_path / "reviews.jsonl")
review([_candidate()], _decider(REJECT), cat, ReviewLog(log_path))
# fresh log instance (reloads from disk) + same evidence -> skipped
res = review([_candidate()], _decider(APPROVE), cat, ReviewLog(log_path))
assert res.skipped == ["success:clean_pass:outcome"]
assert cat.list() == []
def test_changed_evidence_resurfaces(tmp_path):
cat = Catalog(str(tmp_path / "catalog"))
log_path = str(tmp_path / "reviews.jsonl")
review([_candidate(freq=18)], _decider(REJECT), cat, ReviewLog(log_path))
# more evidence now -> not skipped, gets re-reviewed
res = review([_candidate(freq=40)], _decider(APPROVE), cat, ReviewLog(log_path))
assert len(res.approved) == 1
def test_candidate_to_pattern_defaults():
p = candidate_to_pattern(_candidate(flavors=("claude",)))
assert p.status == "provisional"
assert p.rendering_hints["claude"]["target"] == "CLAUDE.md"
assert p.polarity == "success"

View File

@@ -59,7 +59,7 @@ contract for the embedded evidence. Unit-tested for round-trip stability.
```task
id: AGENTIC-WP-0004-T02
status: todo
status: done
priority: high
state_hub_task_id: "d40c7810-fd1e-4b14-8577-b8a64ddd337b"
```
@@ -76,7 +76,7 @@ re-saving an unchanged pattern is a no-op (no spurious version bump).
```task
id: AGENTIC-WP-0004-T03
status: todo
status: done
priority: high
state_hub_task_id: "e303d01f-564e-4499-9ce5-22cf959ed84c"
```