generated from coulomb/repo-seed
detect/quality.py: is_real_coding_session drops health-checks / smoke-tests / interrupted / trivially-short sessions (event floor, repo present, substantive tool activity, non-trivial prompt). Wired into run_detect so signals only form over real sessions — fixes the abandoned false-positive. [detect.quality] knobs; existing detect/curate fixtures made realistic. 8 new tests; suite 80/80. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
73 lines
2.7 KiB
Python
73 lines
2.7 KiB
Python
"""Detect entrypoint (T07): digests -> signals -> clusters -> report.

    python -m session_memory.detect [--config PATH] [--json] [--min-frequency N]

Reads Tier 2 digests from the store, extracts signals, clusters them into
candidate patterns, persists the candidates, and prints a ranked report
(cross-flavor first) — the input to the Curate phase (Phase 2).
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
|
|
from ..core.store import Store
|
|
from ..ingest import _expand, load_config
|
|
from .cluster import cluster
|
|
from .quality import filter_real, quality_config
|
|
from .signals import extract_signals
|
|
|
|
|
|
def run_detect(config: dict, *, min_frequency: int = 2) -> list[dict]:
    """Run the detect pipeline and persist the resulting candidate patterns.

    Opens the store described by ``config["store"]``, keeps the digests that
    ``filter_real`` accepts under ``quality_config(config)``, extracts signals
    from them, clusters the signals, and saves the clustered patterns back to
    the store.

    Args:
        config: Loaded configuration mapping. Its ``store`` table must provide
            ``db_path`` and ``blob_dir``.
        min_frequency: Forwarded to ``cluster`` as its ``min_frequency``
            keyword argument.

    Returns:
        The clustered patterns, each converted with ``to_dict()``.

    Raises:
        KeyError: If ``db_path`` or ``blob_dir`` is missing from the store
            configuration.
    """
    store_cfg = config.get("store", {})
    store = Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"]))
    try:
        digests = filter_real(store.list_digests(), quality_config(config))
        signals = extract_signals(digests)
        patterns = [p.to_dict() for p in cluster(signals, min_frequency=min_frequency)]
        store.save_patterns(patterns)
    finally:
        # Always release the store, even when a pipeline stage raises.
        store.close()
    return patterns
|
|
|
|
|
|
def _format_report(patterns: list[dict], n_digests: int) -> str:
    """Render candidate patterns as a plain-text report.

    Args:
        patterns: Pattern dicts, listed in the order given. Each must carry
            the keys ``cross_flavor``, ``title``, ``score``, ``frequency``,
            ``cost_impact``, ``flavors``, ``repos`` and ``sessions``.
        n_digests: Session count shown in the report heading.

    Returns:
        The report as a single newline-joined string. When ``patterns`` is
        empty the report is the heading followed by a short notice.
    """
    heading = f"# Candidate Patterns ({len(patterns)} from {n_digests} sessions)"
    if not patterns:
        notice = "No recurring patterns above the frequency threshold yet."
        return "\n".join([heading, "", notice])

    report = [heading, ""]
    for rank, entry in enumerate(patterns, start=1):
        marker = " [CROSS-FLAVOR]" if entry["cross_flavor"] else ""
        report.extend(
            (
                f"{rank}. {entry['title']}{marker}",
                f" score={entry['score']} freq={entry['frequency']} "
                f"impact={entry['cost_impact']} flavors={','.join(entry['flavors'])}",
                # An empty repo list is shown as a dash.
                f" repos={','.join(entry['repos']) or '-'} "
                f"sessions={len(entry['sessions'])}",
                "",  # blank separator after each pattern
            )
        )
    return "\n".join(report)
|
|
|
|
|
|
def main(argv=None) -> int:
    """Command-line entry point: run detection and print the result.

    Parses ``--config``, ``--min-frequency`` and ``--json``, loads the
    configuration, runs ``run_detect`` and prints either the patterns as
    indented JSON or the text report from ``_format_report``.

    Args:
        argv: Argument list handed to ``argparse``; ``None`` lets argparse
            read the process arguments.

    Returns:
        ``0`` after the output has been printed.

    Raises:
        KeyError: If ``db_path`` or ``blob_dir`` is missing from the store
            configuration.
    """
    # Default config path: config.toml one directory above this module's directory.
    here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    ap = argparse.ArgumentParser(description="Detect candidate patterns from session digests.")
    ap.add_argument("--config", default=os.path.join(here, "config.toml"))
    ap.add_argument("--min-frequency", type=int, default=2)
    ap.add_argument("--json", action="store_true", help="emit machine-readable JSON")
    args = ap.parse_args(argv)

    config = load_config(args.config)

    if args.json:
        # JSON output never shows the session count, so skip the extra store read.
        patterns = run_detect(config, min_frequency=args.min_frequency)
        print(json.dumps(patterns, indent=2))
        return 0

    # The text report heading needs the number of digests that pass the
    # quality filter; count them with a short-lived store handle.
    store_cfg = config.get("store", {})
    store = Store(_expand(store_cfg["db_path"]), _expand(store_cfg["blob_dir"]))
    try:
        n = len(filter_real(store.list_digests(), quality_config(config)))
    finally:
        # Close the counting handle; previously it was left open.
        store.close()

    patterns = run_detect(config, min_frequency=args.min_frequency)
    print(_format_report(patterns, n))
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: hand main()'s return value to SystemExit as the exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|