"""Signal extractors (PRD §6.2; T04). Pure functions over a session digest (Tier 2) — the compact, durable view. Each extractor emits zero or more :class:`Signal`s. A signal records its source session, a *locus* (what it's about), a *polarity* (problem vs. success), and a *magnitude*. Signals are the atoms the clusterer groups into candidate patterns. No new capture happens here; everything is derived from digests already written by the Capture layer, so detection is cheap and re-runnable. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Callable, Optional # polarity PROBLEM = "problem" SUCCESS = "success" @dataclass class Signal: session_uid: str flavor: str repo: Optional[str] type: str # e.g. "budget_overrun", "clean_pass" polarity: str # PROBLEM | SUCCESS locus: str # normalized subject key (tool, marker, ...) magnitude: float = 1.0 # strength / cost weight detail: dict[str, Any] = field(default_factory=dict) # --- individual extractors -------------------------------------------------- # Each takes (digest, ctx) and returns a list[Signal]. ctx carries corpus-level # stats (e.g. cost percentiles) so extractors can compare a session to its peers. 
def _markers(digest: dict) -> dict:
    """Return the digest's ``markers`` mapping; a missing or null entry counts as empty."""
    return digest.get("markers") or {}


def _total_tokens(digest: dict) -> int:
    """Return input + output tokens from the digest's ``cost`` mapping.

    A missing or null ``cost`` section, or a missing or null token count, counts as 0.
    """
    cost = digest.get("cost") or {}
    return (cost.get("input_tokens") or 0) + (cost.get("output_tokens") or 0)


def _base(digest, type_, polarity, locus, magnitude=1.0, **detail) -> Signal:
    """Build a :class:`Signal` for *digest*.

    ``session_uid`` and ``flavor`` are read with ``[]`` (KeyError if absent);
    ``repo`` is optional. Any extra keyword arguments become ``Signal.detail``.
    """
    return Signal(
        session_uid=digest["session_uid"],
        flavor=digest["flavor"],
        repo=digest.get("repo"),
        type=type_,
        polarity=polarity,
        locus=locus,
        magnitude=magnitude,
        detail=detail,
    )


def sig_retry_storm(digest, ctx) -> list[Signal]:
    """Problem: retries reached ``ctx["retry_storm_threshold"]`` (default 3).

    Magnitude is the retry count.
    """
    retries = _markers(digest).get("retries", 0)
    if retries >= ctx.get("retry_storm_threshold", 3):
        return [_base(digest, "retry_storm", PROBLEM, "retries", float(retries), retries=retries)]
    return []


def sig_repeated_errors(digest, ctx) -> list[Signal]:
    """Problem: errors reached ``ctx["error_threshold"]`` (default 3).

    Magnitude is the error count.
    """
    errors = _markers(digest).get("errors", 0)
    if errors >= ctx.get("error_threshold", 3):
        return [_base(digest, "repeated_errors", PROBLEM, "errors", float(errors), errors=errors)]
    return []


def sig_budget_overrun(digest, ctx) -> list[Signal]:
    """Problem: total tokens exceed the corpus ``ctx["tokens_p90"]``.

    The check is skipped when no percentile is available (missing or 0).
    Magnitude is tokens / p90.
    """
    total = _total_tokens(digest)
    p90 = ctx.get("tokens_p90", 0)
    if p90 and total > p90:
        return [_base(digest, "budget_overrun", PROBLEM, "tokens", float(total) / max(p90, 1), tokens=total, p90=p90)]
    return []


def sig_abandoned(digest, ctx) -> list[Signal]:
    """Problem: the digest's outcome is ``"abandoned"``."""
    if digest.get("outcome") == "abandoned":
        return [_base(digest, "abandoned", PROBLEM, "outcome", 1.0)]
    return []


def sig_clean_pass(digest, ctx) -> list[Signal]:
    """Success: ended in success, ran tests, no errors or retries, modest cost.

    "Modest cost" means total tokens not above ``ctx["tokens_p90"]``. This is
    the exact complement of :func:`sig_budget_overrun`'s condition, so one
    session never gets both a budget_overrun and a clean_pass. When no
    percentile is available (missing or 0), the cost check is skipped.
    """
    m = _markers(digest)
    if not (digest.get("outcome") == "success"
            and m.get("test_runs", 0) >= 1
            and m.get("errors", 0) == 0
            and m.get("retries", 0) == 0):
        return []
    p90 = ctx.get("tokens_p90", 0)
    if p90 and _total_tokens(digest) > p90:
        # Over budget: sig_budget_overrun reports this session instead.
        return []
    return [_base(digest, "clean_pass", SUCCESS, "outcome", 1.0, test_runs=m.get("test_runs"))]


def sig_error_then_recovery(digest, ctx) -> list[Signal]:
    """Success despite hitting errors -- a recovery worth learning from.

    Magnitude is the error count.
    """
    m = _markers(digest)
    if digest.get("outcome") == "success" and m.get("errors", 0) >= 1:
        return [_base(digest, "error_then_recovery", SUCCESS, "errors", float(m.get("errors", 1)), errors=m.get("errors"))]
    return []


# Applied in this order to every digest by extract_signals.
EXTRACTORS: list[Callable[[dict, dict], list[Signal]]] = [
    sig_retry_storm,
    sig_repeated_errors,
    sig_budget_overrun,
    sig_abandoned,
    sig_clean_pass,
    sig_error_then_recovery,
]


def build_context(
    digests: list[dict],
    *,
    retry_storm_threshold: int = 3,
    error_threshold: int = 3,
) -> dict[str, Any]:
    """Compute corpus-level stats so extractors can compare a session to its peers.

    ``tokens_p90`` is the value at index ``floor(0.9 * (n - 1))`` of the sorted
    per-digest token totals (no interpolation), or 0 for an empty corpus.
    The thresholds are passed through unchanged; their defaults match the
    fallbacks the extractors use.
    """
    totals = sorted(_total_tokens(d) for d in digests)
    p90 = totals[int(0.9 * (len(totals) - 1))] if totals else 0
    return {
        "tokens_p90": p90,
        "retry_storm_threshold": retry_storm_threshold,
        "error_threshold": error_threshold,
    }


def extract_signals(digests: list[dict], ctx: Optional[dict] = None) -> list[Signal]:
    """Run every extractor in ``EXTRACTORS`` over every digest.

    ``ctx`` holds precomputed corpus stats. When it is falsy (``None`` or an
    empty dict), it is built from *digests* via :func:`build_context`.
    Signals are returned grouped per digest, in ``EXTRACTORS`` order.
    """
    # Materialize first: building the context and the extraction loop both
    # iterate the digests, and a one-shot iterator would be empty the second time.
    digests = list(digests)
    ctx = ctx or build_context(digests)
    return [sig for d in digests for ex in EXTRACTORS for sig in ex(d, ctx)]