"""Curate entrypoint (T06): review detect candidates into the Pattern Catalog.

    python -m session_memory.curate [--config PATH] [--auto-approve] [--json]
                                    [--min-frequency N] [--workstream-id ID]

Refreshes candidate patterns (runs the detect pipeline), then drives them through
the review workflow — **interactive** by default, or **batch** with
``--auto-approve`` (promote everything clearing the evidence bar, reject the rest)
for kaizen-agent runs. Candidates are presented cross-flavor first (detect's
ranking). Emits a catalog diff summary and, with ``--json``, a machine-readable
result. Approvals land in the files-first catalog; each final decision is logged
as a hub decision (queued if the hub is down).
"""
from __future__ import annotations

import argparse
import json
import os

from ..detect.__main__ import run_detect
from ..ingest import _expand, load_config
from .catalog import Catalog
from .decisions import DecisionRecorder
from .gating import bloat_warnings, evaluate, gate_config
from .review import APPROVE, DISCUSS, REJECT, ReviewLog, review


def _curate_paths(config: dict) -> tuple:
    """Read the ``[curate]`` section of *config* and return its locations.

    Returns a 4-tuple ``(catalog_dir, review_log, decision_queue, workstream_id)``.
    The three paths fall back to defaults under ``session_memory/`` and are passed
    through ``_expand``; the workstream id is returned as configured, or ``None``
    when the key is absent.
    """
    c = config.get("curate", {})
    catalog_dir = _expand(c.get("catalog_dir", "session_memory/catalog"))
    review_log = _expand(c.get("review_log", "session_memory/.store/reviews.jsonl"))
    queue = _expand(c.get("decision_queue", "session_memory/.store/decisions.queue.jsonl"))
    ws_id = c.get("state_hub_workstream_id")
    return catalog_dir, review_log, queue, ws_id


def _render_candidate(cand: dict, gate, existing) -> str:
    """Format one candidate as the multi-line text shown to the reviewer.

    *cand* must carry ``title``, ``key`` and ``frequency``; every other field is
    optional. *gate* is handed to ``evaluate`` to obtain the promotable /
    distribution-ready verdict and its reasons. *existing* is handed to
    ``bloat_warnings``, and each warning it yields becomes a ``bloat:`` line.
    """
    g = evaluate(cand, gate)
    flag = " [CROSS-FLAVOR]" if cand.get("cross_flavor") else ""
    lines = [
        f"\n{cand['title']}{flag}",
        f" key={cand['key']} score={cand.get('score')} freq={cand['frequency']} "
        f"impact={cand.get('cost_impact')}",
        f" flavors={','.join(cand.get('flavors', []))} "
        f"repos={','.join(cand.get('repos', [])) or '-'} sessions={len(cand.get('sessions', []))}",
        f" gate: promotable={g.promotable} distribution_ready={g.distribution_ready}"
        + (f" ({'; '.join(g.reasons)})" if g.reasons else ""),
    ]
    for w in bloat_warnings(cand, existing):
        lines.append(f" bloat: {w}")
    return "\n".join(lines)


def _interactive_decider(gate, catalog):
    """Build a ``decide(cand)`` callback that asks a human on stdin.

    The callback prints the rendered candidate, then re-prompts until it gets an
    approve / reject / discuss answer, returning ``(decision, rationale)``. An
    empty rationale falls back to ``"approved"`` / ``"rejected"``.

    If stdin is closed or exhausted before a decision is complete (Ctrl-D, or a
    non-interactive run started without ``--auto-approve``), the candidate is
    deferred instead of aborting the whole run with an ``EOFError`` traceback.
    """
    def decide(cand):
        print(_render_candidate(cand, gate, catalog.list()))
        try:
            while True:
                choice = input(" [a]pprove / [r]eject / [d]iscuss ? ").strip().lower()
                if choice in ("a", "approve"):
                    return (APPROVE, input(" rationale: ").strip() or "approved")
                if choice in ("r", "reject"):
                    return (REJECT, input(" rationale: ").strip() or "rejected")
                if choice in ("d", "discuss"):
                    return (DISCUSS, "deferred for discussion")
                # Anything else: fall through and ask again.
        except EOFError:
            # No more input can arrive. Deferring is the one choice that neither
            # promotes nor rejects, so nothing is decided on the reviewer's
            # behalf and the caller still gets to print its summary.
            print()  # terminate the dangling prompt line
            return (DISCUSS, "deferred: stdin closed before a decision was entered")

    return decide


def _auto_decider(gate):
    """Batch policy: approve candidates clearing the promote floor, reject the rest.

    Returns a ``decide(cand)`` callback that never prompts. Rejections carry the
    gate's reasons, joined with ``"; "``, as their rationale.
    """
    def decide(cand):
        g = evaluate(cand, gate)
        if g.promotable:
            return (APPROVE, "auto-approved: clears evidence bar")
        return (REJECT, "auto-rejected: " + "; ".join(g.reasons))

    return decide


def _summary(result, n_candidates: int) -> str:
    """Render the human-readable summary of a review run.

    *result* is read for ``approved`` (``(key, action)`` pairs), ``rejected`` and
    ``deferred`` (strings) and ``skipped`` (only its length is used).
    "catalog writes" counts approvals whose action is ``added``, ``versioned`` or
    ``updated``.
    """
    added = [k for k, a in result.approved if a in ("added", "versioned", "updated")]
    lines = [
        f"# Curate summary ({n_candidates} candidates reviewed)",
        f" approved : {len(result.approved)} ({', '.join(f'{k}:{a}' for k, a in result.approved) or '-'})",
        f" rejected : {len(result.rejected)} ({', '.join(result.rejected) or '-'})",
        f" deferred : {len(result.deferred)} ({', '.join(result.deferred) or '-'})",
        f" skipped : {len(result.skipped)} (already decided)",
        f" catalog writes: {len(added)}",
    ]
    return "\n".join(lines)


def main(argv=None) -> int:
    """Run detect, review its candidates, and print the outcome.

    *argv* is the argument list to parse; ``None`` lets argparse read
    ``sys.argv``. Returns the process exit code, which is ``0`` whenever the run
    completes.
    """
    # Default config sits one directory above this package.
    here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    ap = argparse.ArgumentParser(description="Curate detect candidates into the Pattern Catalog.")
    ap.add_argument("--config", default=os.path.join(here, "config.toml"))
    ap.add_argument("--auto-approve", action="store_true",
                    help="batch mode: promote everything clearing the evidence bar")
    ap.add_argument("--min-frequency", type=int, default=2)
    ap.add_argument("--workstream-id", default=None, help="hub workstream for decisions")
    ap.add_argument("--json", action="store_true",
                    help="emit machine-readable JSON")
    args = ap.parse_args(argv)

    config = load_config(args.config)
    candidates = run_detect(config, min_frequency=args.min_frequency)
    catalog_dir, review_log_path, queue_path, ws_id = _curate_paths(config)
    gate = gate_config(config)
    catalog = Catalog(catalog_dir)
    log = ReviewLog(review_log_path)
    # A workstream id given on the command line wins over the configured one.
    recorder = DecisionRecorder(queue_path, workstream_id=args.workstream_id or ws_id)

    # NOTE(review): the interactive decider prints candidates and prompts on
    # stdout, so --json without --auto-approve mixes them with the JSON payload.
    decide = _auto_decider(gate) if args.auto_approve else _interactive_decider(gate, catalog)
    result = review(candidates, decide, catalog, log, gate=gate, recorder=recorder)

    if args.json:
        print(json.dumps({
            "approved": result.approved,
            "rejected": result.rejected,
            "deferred": result.deferred,
            "skipped": result.skipped,
            "decisions_queued": len(recorder.pending()),
        }, indent=2))
    else:
        print(_summary(result, len(candidates)))
        # Read the queue once; it feeds both the check and the count below.
        pending = recorder.pending()
        if pending:
            print(f" decisions queued (hub offline): {len(pending)} "
                  f"-> {queue_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())