Compare commits

...

2 Commits

Author SHA1 Message Date
4b7a628b6f session-memory Phase 2: hub decision integration (T05)
decisions.py: every final promote/reject becomes a record_decision-shaped
payload (rationale + source key + evidence snapshot). DecisionRecorder degrades
gracefully under a hub outage — pluggable sink with a durable local-queue
fallback and ordered flush/replay (mirrors Phase 1's after-the-fact sync).
Wired into review() via an optional recorder. 6 new tests; suite 70/70 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 00:31:22 +02:00
ab22d22bfb session-memory Phase 2: evidence-bar + bloat guard (T04)
gating.py: two-tier evidence bar (OQ5) — promote floor (frequency/sessions/
cost_impact) plus a stricter distribution-eligibility floor that sets a
promoted pattern to approved+distribution_ready vs provisional. Wired into
review() so thin approvals land provisional. bloat_warnings flags duplicate
and near-duplicate (same signal-type+locus) candidates (OQ6). [curate]/
[curate.gate] knobs in config.toml. 6 new tests; suite 64/64 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 00:28:34 +02:00
7 changed files with 423 additions and 8 deletions

View File

@@ -31,6 +31,21 @@ enabled = true
root = "~/.grok/sessions"
glob = "*/*/chat_history.jsonl"
# Curate phase (AGENTIC-WP-0004): catalog location + promotion evidence bar.
[curate]
catalog_dir = "session_memory/catalog" # files-first Pattern Catalog (committed)
review_log = "session_memory/.store/reviews.jsonl" # remembered decisions (gitignored)
# Evidence bar (OQ5): floors to promote at all, and stricter floors to be
# distribution-eligible (status=approved, distribution_ready=true).
[curate.gate]
min_frequency = 2 # >= this many supporting signals to promote
min_sessions = 2 # >= this many distinct sessions
min_cost_impact = 0.0
dist_require_cross_flavor = false # require cross-flavor evidence to distribute
dist_min_frequency = 3
dist_min_cost_impact = 0.0
# cwd basename -> domain slug. Used to tag sessions with their Custodian domain.
[repo_domain_map]
agentic-resources = "helix_forge"

View File

@@ -0,0 +1,114 @@
"""State Hub decision integration (FR-U4; T05).
Every final promote/reject is recorded as an auditable decision so the rationale,
the source candidate key, and an evidence snapshot are traceable. The catalog
file remains the durable artifact (ADR-001); the decision is the audit trail.
The recorder is **graceful under a hub outage** — exactly the condition hit during
Phase 1, where statuses were synced after the fact. A pluggable ``sink`` does the
actual write (HTTP to the hub, or the MCP ``record_decision`` tool driven by the
operator). If the sink is absent or raises, the decision is appended to a local
queue (``decisions.queue.jsonl``) and can be replayed later with :meth:`flush`.
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, Optional
# A sink takes a hub-shaped decision payload and persists it (may raise on failure).
Sink = Callable[[dict], None]
def _now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def build_decision(candidate: dict, action: str, rationale: str,
                   *, workstream_id: Optional[str] = None,
                   decided_by: str = "curator") -> dict:
    """Shape a curate decision as a State Hub ``record_decision`` payload.

    Args:
        candidate: Detect candidate; must contain ``"key"``. The whole dict is
            embedded as the evidence snapshot, so it must be JSON-serializable.
        action: The review action. ``"approve"`` is titled "Promote"; any other
            value is titled "Reject".
        rationale: Free-text reason for the decision.
        workstream_id: Optional workstream identifier copied into the payload.
        decided_by: Name recorded as the decider.

    Returns:
        A dict with ``title``, ``decision_type``, ``workstream_id``,
        ``rationale``, ``decided_by``, ``description`` (a JSON string) and
        ``recorded_at``.

    Raises:
        KeyError: If ``candidate`` has no ``"key"``.
    """
    source_key = candidate["key"]
    if action == "approve":
        verb = "Promote"
    else:
        verb = "Reject"

    # The description carries the machine-readable audit detail as JSON text.
    detail = {
        "action": action,
        "source_key": source_key,
        "evidence": candidate,
    }
    description = json.dumps(detail, sort_keys=True)

    return {
        "title": f"{verb} pattern candidate {source_key}",
        "decision_type": "made",
        "workstream_id": workstream_id,
        "rationale": rationale,
        "decided_by": decided_by,
        "description": description,
        "recorded_at": _now(),
    }
@dataclass
class DecisionRecorder:
    """Records decisions through ``sink`` with a durable local-queue fallback.

    Payloads that cannot be delivered are appended, one JSON object per line,
    to the file at ``queue_path`` and can be replayed later with :meth:`flush`.
    """

    # Path of the JSON-lines file holding payloads not yet delivered.
    queue_path: str
    # Callable that delivers one payload; ``None`` means every decision is queued.
    sink: Optional[Sink] = None
    # Passed through to ``build_decision`` for every recorded decision.
    workstream_id: Optional[str] = None
    decided_by: str = "curator"
    # Number of payloads this instance has appended to the queue.
    _queued: int = field(default=0, init=False)

    def record(self, candidate: dict, action: str, rationale: str) -> bool:
        """Record one decision. Returns True if the sink accepted it, else queued.

        A payload delivered directly is not ordered relative to older payloads
        that may still be waiting in the queue; call :meth:`flush` first when
        strict ordering matters.
        """
        payload = build_decision(candidate, action, rationale,
                                 workstream_id=self.workstream_id,
                                 decided_by=self.decided_by)
        if self.sink is not None:
            try:
                self.sink(payload)
            except Exception:  # hub down / transient — fall through to the queue
                pass
            else:
                return True
        self._append(payload)
        return False

    def pending(self) -> list[dict]:
        """Return the queued payloads in file order (empty if no queue file)."""
        if not os.path.exists(self.queue_path):
            return []
        with open(self.queue_path, encoding="utf-8") as fh:
            return [json.loads(line) for line in fh if line.strip()]

    def flush(self, sink: Optional[Sink] = None) -> int:
        """Replay queued decisions through ``sink``. Returns count synced.

        Stops at the first failure so ordering is preserved; the unsynced tail is
        rewritten back to the queue. When no sink is available, or nothing is
        queued, the filesystem is left untouched and 0 is returned.
        """
        sink = sink or self.sink
        if sink is None:
            return 0
        items = self.pending()
        if not items:
            # Nothing to replay: do not create (or fail to create) a queue file.
            return 0
        for synced, payload in enumerate(items):
            try:
                sink(payload)
            except Exception:
                if synced:
                    # Drop only the delivered head; keep the rest in order.
                    self._rewrite(items[synced:])
                return synced
        self._rewrite([])
        return len(items)

    # --- internals ----------------------------------------------------------

    def _append(self, payload: dict) -> None:
        """Append one payload to the queue file, creating its directory if needed."""
        os.makedirs(os.path.dirname(self.queue_path) or ".", exist_ok=True)
        with open(self.queue_path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(payload, sort_keys=True))
            fh.write("\n")
        self._queued += 1

    def _rewrite(self, items: list[dict]) -> None:
        """Replace the queue file's contents with ``items``.

        The new contents are written to a sibling temp file and swapped in with
        ``os.replace`` so an interruption mid-write cannot truncate the queue.
        """
        os.makedirs(os.path.dirname(self.queue_path) or ".", exist_ok=True)
        tmp_path = self.queue_path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as fh:
            fh.writelines(json.dumps(payload, sort_keys=True) + "\n"
                          for payload in items)
        os.replace(tmp_path, self.queue_path)

View File

@@ -0,0 +1,117 @@
"""Promotion evidence-bar + bloat guard (design OQ5/OQ6; T04).
Two gates protect the catalog:
* **Evidence bar (OQ5)** — a candidate must clear configurable floors
(frequency, distinct supporting sessions, cost impact) before it may be promoted at all.
A separate, stricter bar decides whether the promoted pattern is
*distribution-eligible* (``status="approved"``, ``distribution_ready=True``)
vs. merely ``provisional`` — the minimum trustworthy evidence before a pattern
is allowed near live agent environments.
* **Bloat guard (OQ6)** — flags candidates that would add little: a duplicate of
an already-cataloged pattern, or a near-duplicate sharing the same
signal-type+locus. Keeps the catalog lean so agent context budgets aren't
degraded by low-value instructions.
Knobs live under ``[curate.gate]`` in ``config.toml``; :func:`gate_config` reads them
with safe defaults so the module also works config-free (tests).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
from .schema import SolutionPattern
@dataclass
class GateConfig:
    """Thresholds for the two-tier evidence bar (OQ5)."""

    # --- promotion floor: must be met to promote at all ---
    # Minimum value of the candidate's "frequency".
    min_frequency: int = 2
    # Minimum number of entries in the candidate's "sessions".
    min_sessions: int = 2
    # Minimum value of the candidate's "cost_impact".
    min_cost_impact: float = 0.0

    # --- distribution-eligibility floor: stricter, applied on top ---
    # When true, the candidate's "cross_flavor" flag must be set.
    dist_require_cross_flavor: bool = False
    dist_min_frequency: int = 3
    dist_min_cost_impact: float = 0.0
def gate_config(config: Optional[dict] = None) -> GateConfig:
    """Build a :class:`GateConfig` from a parsed config mapping.

    Reads the ``gate`` table nested under ``curate``. Any knob that is missing
    falls back to the :class:`GateConfig` field default, and a missing or
    non-table ``curate`` / ``gate`` value yields an all-defaults config.

    Args:
        config: Parsed configuration dict, or ``None`` for defaults only.

    Returns:
        The gate thresholds to pass to :func:`evaluate`.
    """
    curate = (config or {}).get("curate")
    gate = curate.get("gate") if isinstance(curate, dict) else None
    # Tolerate a malformed ``gate`` value the same way as a malformed ``curate``.
    knobs = gate if isinstance(gate, dict) else {}
    # Take fallbacks from the dataclass so defaults are defined in one place.
    defaults = GateConfig()
    return GateConfig(
        min_frequency=knobs.get("min_frequency", defaults.min_frequency),
        min_sessions=knobs.get("min_sessions", defaults.min_sessions),
        min_cost_impact=knobs.get("min_cost_impact", defaults.min_cost_impact),
        dist_require_cross_flavor=knobs.get("dist_require_cross_flavor",
                                            defaults.dist_require_cross_flavor),
        dist_min_frequency=knobs.get("dist_min_frequency",
                                     defaults.dist_min_frequency),
        dist_min_cost_impact=knobs.get("dist_min_cost_impact",
                                       defaults.dist_min_cost_impact),
    )
@dataclass
class GateResult:
    """Outcome of running one candidate through the evidence gate."""

    # Whether the candidate cleared the promotion floor.
    promotable: bool
    # Whether it also cleared the distribution-eligibility floor.
    distribution_ready: bool
    # "approved" if distribution-ready else "provisional"
    status: str
    # One human-readable message per failed check.
    reasons: list = field(default_factory=list)
def _n_sessions(candidate: dict) -> int:
return len(candidate.get("sessions", []) or [])
def evaluate(candidate: dict, config: Optional[GateConfig] = None) -> GateResult:
    """Decide whether a candidate may be promoted, and at what trust level.

    Args:
        candidate: Detect candidate; ``frequency``, ``sessions``,
            ``cost_impact`` and ``cross_flavor`` are read, each with a default
            when absent.
        config: Gate thresholds; defaults to ``GateConfig()``.

    Returns:
        A :class:`GateResult` whose ``reasons`` list the failed promotion
        checks followed by the failed distribution checks.
    """
    cfg = config or GateConfig()
    freq = candidate.get("frequency", 0)
    sessions = _n_sessions(candidate)
    impact = candidate.get("cost_impact", 0.0)

    # Promotion floor: each entry is (check failed?, message if it failed).
    floor_checks = (
        (freq < cfg.min_frequency,
         f"frequency {freq} < min {cfg.min_frequency}"),
        (sessions < cfg.min_sessions,
         f"sessions {sessions} < min {cfg.min_sessions}"),
        (impact < cfg.min_cost_impact,
         f"cost_impact {impact} < min {cfg.min_cost_impact}"),
    )
    floor_reasons = [message for failed, message in floor_checks if failed]
    promotable = not floor_reasons

    # Distribution floor: evaluated even when the promotion floor failed, so
    # the caller sees every reason at once.
    dist_checks = (
        (cfg.dist_require_cross_flavor and not candidate.get("cross_flavor", False),
         "not cross-flavor (required for distribution)"),
        (freq < cfg.dist_min_frequency,
         f"frequency {freq} < distribution min {cfg.dist_min_frequency}"),
        (impact < cfg.dist_min_cost_impact,
         f"cost_impact {impact} < distribution min {cfg.dist_min_cost_impact}"),
    )
    dist_reasons = [message for failed, message in dist_checks if failed]
    distribution_ready = promotable and not dist_reasons

    if distribution_ready:
        status = "approved"
    else:
        status = "provisional"
    return GateResult(
        promotable=promotable,
        distribution_ready=distribution_ready,
        status=status,
        reasons=floor_reasons + dist_reasons,
    )
def bloat_warnings(candidate: dict, existing: list[SolutionPattern]) -> list[str]:
    """Flag low-value adds against what is already catalogued (OQ6).

    Keys are read as colon-separated triples; the second and third components
    are the signal type and locus compared for near-duplicates.

    Args:
        candidate: Detect candidate; must contain ``"key"``.
        existing: Patterns already in the catalog.

    Returns:
        One warning per existing pattern that is an exact duplicate (same id)
        or a near-duplicate (same signal type and locus).
    """

    def type_and_locus(key: str) -> tuple:
        # Pad so keys with fewer than three components still unpack.
        parts = (key.split(":", 2) + ["", ""])[:3]
        return parts[1], parts[2]

    cand_key = candidate["key"]
    cand_id = SolutionPattern.make_id(cand_key)
    sig_type, locus = type_and_locus(cand_key)

    found: list[str] = []
    for pattern in existing:
        if pattern.id == cand_id:
            found.append(f"duplicate of catalogued pattern {pattern.id}")
        elif type_and_locus(pattern.provenance.source_key) == (sig_type, locus):
            found.append(f"near-duplicate of {pattern.id} (same {sig_type}/{locus})")
    return found

View File

@@ -22,6 +22,8 @@ from datetime import datetime, timezone
from typing import Callable, Optional
from .catalog import Catalog
from .decisions import DecisionRecorder
from .gating import GateConfig, evaluate
from .schema import Provenance, Resolution, Scope, SolutionPattern
APPROVE = "approve"
@@ -46,8 +48,13 @@ def evidence_fingerprint(candidate: dict) -> str:
return hashlib.sha1(json.dumps(payload, sort_keys=True).encode("utf-8")).hexdigest()
def candidate_to_pattern(candidate: dict) -> SolutionPattern:
"""Build a (provisional) Solution Pattern from a detect candidate."""
def candidate_to_pattern(candidate: dict, *, status: str = "provisional",
distribution_ready: bool = False) -> SolutionPattern:
"""Build a Solution Pattern from a detect candidate.
``status``/``distribution_ready`` come from the evidence gate (T04); they
default to a provisional, non-distribution-ready pattern when ungated.
"""
src = candidate["key"]
flavors = list(candidate.get("flavors", []))
hints = {f: {"target": _DEFAULT_TARGET.get(f, ""), "note": "TODO: refine rendering"}
@@ -62,7 +69,8 @@ def candidate_to_pattern(candidate: dict) -> SolutionPattern:
scope=Scope(flavors=flavors, repos=list(candidate.get("repos", []))),
provenance=Provenance(source_key=src, evidence=dict(candidate), promoted_at=_now()),
rendering_hints=hints,
status="provisional",
status=status,
distribution_ready=distribution_ready,
)
@@ -112,8 +120,17 @@ class ReviewResult:
def review(candidates: list[dict], decide: Decider, catalog: Catalog,
log: ReviewLog) -> ReviewResult:
"""Run each candidate through ``decide``; promote approvals into ``catalog``."""
log: ReviewLog, gate: Optional[GateConfig] = None,
recorder: Optional[DecisionRecorder] = None) -> ReviewResult:
"""Run each candidate through ``decide``; promote approvals into ``catalog``.
When a ``gate`` (T04 evidence bar) is supplied, the promoted pattern's
``status``/``distribution_ready`` are set from the gate evaluation, so an
approved-but-thin candidate lands as ``provisional`` rather than
distribution-ready. When a ``recorder`` (T05) is supplied, each final
promote/reject is logged as an auditable hub decision (queued if the hub is
down).
"""
result = ReviewResult()
for cand in candidates:
key = cand["key"]
@@ -125,11 +142,17 @@ def review(candidates: list[dict], decide: Decider, catalog: Catalog,
result.deferred.append(key)
continue # not a final decision — leave for a later pass
if action == APPROVE:
cat_action = catalog.upsert(candidate_to_pattern(cand))
g = evaluate(cand, gate) if gate is not None else None
pattern = (candidate_to_pattern(cand, status=g.status,
distribution_ready=g.distribution_ready)
if g is not None else candidate_to_pattern(cand))
cat_action = catalog.upsert(pattern)
result.approved.append((key, cat_action))
elif action == REJECT:
result.rejected.append(key)
else:
raise ValueError(f"unknown review action {action!r}")
log.record(cand, action, rationale)
if recorder is not None:
recorder.record(cand, action, rationale)
return result

View File

@@ -0,0 +1,70 @@
"""Hub decision integration tests (T05): payload shape + graceful queue/flush."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.curate.catalog import Catalog # noqa: E402
from session_memory.curate.decisions import DecisionRecorder, build_decision # noqa: E402
from session_memory.curate.review import APPROVE, REJECT, ReviewLog, review # noqa: E402
def _candidate(key="success:clean_pass:outcome"):
return {"key": key, "frequency": 18, "sessions": ["a", "b"],
"cost_impact": 9.0, "cross_flavor": True, "flavors": ["claude", "grok"]}
def test_build_decision_payload_shape():
    """An approve decision yields a hub-shaped payload naming the source key."""
    payload = build_decision(_candidate(), "approve", "looks solid",
                             workstream_id="ws-1")
    expected_fields = {
        "decision_type": "made",
        "workstream_id": "ws-1",
        "rationale": "looks solid",
    }
    for name, value in expected_fields.items():
        assert payload[name] == value
    assert "Promote" in payload["title"]
    assert "success:clean_pass:outcome" in payload["description"]
def test_sink_accepts_decision(tmp_path):
    """A working sink receives the payload and nothing is queued."""
    delivered = []
    queue_file = tmp_path / "q.jsonl"
    recorder = DecisionRecorder(str(queue_file), sink=delivered.append)
    accepted = recorder.record(_candidate(), "approve", "ok")
    assert accepted is True
    assert recorder.pending() == []
    assert len(delivered) == 1
def test_queues_when_sink_down(tmp_path):
    """A raising sink makes ``record`` return False and queue the payload."""

    def failing_sink(_payload):
        raise RuntimeError("hub down")

    recorder = DecisionRecorder(str(tmp_path / "q.jsonl"), sink=failing_sink)
    accepted = recorder.record(_candidate(), "reject", "noise")
    assert accepted is False
    assert len(recorder.pending()) == 1
def test_no_sink_defaults_to_queue(tmp_path):
    """Without a sink, every recorded decision lands in the local queue."""
    queue_file = str(tmp_path / "q.jsonl")
    recorder = DecisionRecorder(queue_file)
    recorder.record(_candidate(), "approve", "ok")
    queued = recorder.pending()
    assert len(queued) == 1
def test_flush_replays_queue(tmp_path):
    """Queued decisions are replayed through the sink in the order recorded."""
    rec = DecisionRecorder(str(tmp_path / "q.jsonl"))  # offline -> queue
    rec.record(_candidate("problem:abandoned:outcome"), "reject", "x")
    rec.record(_candidate("success:clean_pass:outcome"), "approve", "y")
    captured = []
    assert rec.flush(sink=captured.append) == 2
    assert rec.pending() == []
    assert len(captured) == 2
    # Ordering is the point of flush(): the first decision recorded must be
    # the first one delivered.
    assert "problem:abandoned:outcome" in captured[0]["title"]
    assert "success:clean_pass:outcome" in captured[1]["title"]
def test_review_records_each_final_decision(tmp_path):
    """``review`` sends one hub decision per approve/reject to the recorder."""
    catalog = Catalog(str(tmp_path / "catalog"))
    review_log = ReviewLog(str(tmp_path / "reviews.jsonl"))
    captured = []
    recorder = DecisionRecorder(str(tmp_path / "q.jsonl"),
                                sink=captured.append, workstream_id="ws")
    candidates = [
        _candidate("success:clean_pass:outcome"),
        _candidate("problem:abandoned:outcome"),
    ]

    def decide(cand):
        # Approve the "success" candidate, reject the other.
        verdict = APPROVE if "success" in cand["key"] else REJECT
        return verdict, "r"

    review(candidates, decide, catalog, review_log, recorder=recorder)

    assert len(captured) == 2
    promote_flags = sorted("Promote" in d["title"] for d in captured)
    assert promote_flags == [False, True]

View File

@@ -0,0 +1,76 @@
"""Evidence-bar + bloat-guard tests (T04)."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from session_memory.curate.catalog import Catalog # noqa: E402
from session_memory.curate.gating import ( # noqa: E402
GateConfig,
bloat_warnings,
evaluate,
gate_config,
)
from session_memory.curate.review import candidate_to_pattern # noqa: E402
def _candidate(key="success:clean_pass:outcome", freq=5, sessions=5, impact=10.0,
cross=True, flavors=("claude", "grok")):
return {
"key": key,
"frequency": freq,
"sessions": [f"s{i}" for i in range(sessions)],
"cost_impact": impact,
"cross_flavor": cross,
"flavors": list(flavors),
}
def test_clears_bar_and_distribution_ready():
    """A strong candidate clears both floors and is marked approved."""
    gate = GateConfig(dist_min_frequency=3)
    outcome = evaluate(_candidate(), gate)
    assert outcome.promotable
    assert outcome.distribution_ready
    assert outcome.status == "approved"
def test_thin_candidate_promotable_but_provisional():
    """Meeting only the promote floor yields a provisional pattern."""
    # frequency 2 meets the promote floor (>=2) but not the distribution floor (>=3)
    thin = _candidate(freq=2, sessions=2)
    outcome = evaluate(thin, GateConfig(dist_min_frequency=3))
    assert outcome.promotable
    assert not outcome.distribution_ready
    assert outcome.status == "provisional"
def test_below_promote_floor_not_promotable():
    """Below the default promote floor, the gate refuses and explains why."""
    outcome = evaluate(_candidate(freq=1, sessions=1))
    assert not outcome.promotable
    frequency_reasons = [reason for reason in outcome.reasons if "frequency" in reason]
    assert frequency_reasons
def test_cross_flavor_required_for_distribution():
    """With the cross-flavor knob on, a single-flavor candidate stays provisional."""
    gate = GateConfig(dist_require_cross_flavor=True)
    outcome = evaluate(_candidate(cross=False), gate)
    assert outcome.promotable
    assert not outcome.distribution_ready
    cross_flavor_reasons = [r for r in outcome.reasons if "cross-flavor" in r]
    assert cross_flavor_reasons
def test_gate_config_reads_toml_dict():
    """Knobs under ``curate.gate`` override defaults; the rest keep defaults."""
    gate_table = {"min_frequency": 9, "dist_require_cross_flavor": True}
    cfg = gate_config({"curate": {"gate": gate_table}})
    assert cfg.min_frequency == 9
    assert cfg.dist_require_cross_flavor is True
    # defaults preserved for unspecified keys
    assert cfg.dist_min_frequency == 3
def test_bloat_flags_duplicate_and_near_duplicate(tmp_path):
    """The bloat guard distinguishes exact duplicates from near-duplicates."""
    cat = Catalog(str(tmp_path))
    cat.upsert(candidate_to_pattern(_candidate(key="success:clean_pass:outcome")))
    existing = cat.list()
    # exact same key -> duplicate
    dup = bloat_warnings(_candidate(key="success:clean_pass:outcome"), existing)
    # Match on the message prefix: a bare "duplicate" substring would also be
    # satisfied by a "near-duplicate of ..." warning.
    assert any(w.startswith("duplicate of") for w in dup)
    # different polarity, same signal_type+locus -> near-duplicate
    near = bloat_warnings(_candidate(key="problem:clean_pass:outcome"), existing)
    assert any(w.startswith("near-duplicate of") for w in near)
    # unrelated -> no warnings
    assert bloat_warnings(_candidate(key="problem:retry_storm:retries"), existing) == []

View File

@@ -94,7 +94,7 @@ prior reject is remembered so it is not re-surfaced unless evidence changed.
```task
id: AGENTIC-WP-0004-T04
status: todo
status: done
priority: medium
state_hub_task_id: "d474425d-18af-48e4-8f5b-7716b2da0057"
```
@@ -112,7 +112,7 @@ catalog stays lean and agent context budgets are protected. Knobs live in
```task
id: AGENTIC-WP-0004-T05
status: todo
status: done
priority: medium
state_hub_task_id: "449f12d4-fae0-450d-873f-143b3a570b5a"
```