From 21c714e2861992f831ac9291001d161b18a9de54 Mon Sep 17 00:00:00 2001
From: tegwick
Date: Sun, 7 Jun 2026 11:12:09 +0200
Subject: [PATCH] session-memory: infra-overhead + thrash signals (WP-0005 T02)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

signals.py: tool_bucket helper + three tool_histogram-based extractors for
friction that the outcome/marker signals were blind to — sig_infra_overhead
(hub+task+schema share of tool calls over threshold), sig_schema_thrash
(repeated ToolSearch), and sig_tool_thrash (one tool's call count over
threshold). Thresholds in build_context. 8 new tests; suite 88/88 green.

Co-Authored-By: Claude Opus 4.8
---
Review notes (this section is not part of the commit message):
* _bucketed classifies each tool once and has a docstring.
* sig_infra_overhead returns early when the histogram sums to zero, so the
  share division cannot fail when infra_min_calls is configured as 0.
* sig_tool_thrash docstring now describes the absolute-count check it does.
* Hunk headers and diffstat are recomputed for the revised line counts; the
  blob ids on the index lines still refer to the pre-revision contents.
* Indentation of unchanged context lines was re-derived from syntax after
  the patch text lost its line breaks; confirm against the tree before
  applying.

 session_memory/detect/signals.py              | 77 ++++++++++++++++-
 tests/test_detect_infra_signals.py            | 81 +++++++++++++++++++
 workplans/AGENTIC-WP-0005-detect-hardening.md |  2 +-
 3 files changed, 158 insertions(+), 2 deletions(-)
 create mode 100644 tests/test_detect_infra_signals.py

diff --git a/session_memory/detect/signals.py b/session_memory/detect/signals.py
index 8429cb8..f7b11be 100644
--- a/session_memory/detect/signals.py
+++ b/session_memory/detect/signals.py
@@ -91,9 +91,79 @@ def sig_error_then_recovery(digest, ctx) -> list[Signal]:
     return []
 
 
+# --- tool-mix / infrastructure-overhead signals (WP-0005 T02) ----------------
+# These read the captured ``tool_histogram`` — friction that the outcome+marker
+# signals above are blind to (sessions still "succeed", just expensively).
+
+def tool_bucket(tool: str) -> str:
+    """Group a tool name into a coarse activity bucket (flavor-agnostic)."""
+    if tool.startswith("mcp__state-hub"):
+        return "statehub_mcp"
+    if tool in ("TaskUpdate", "TaskCreate", "TaskGet", "TaskList", "TaskOutput",
+                "TaskStop", "todo_write", "update_task_status"):
+        return "task_mgmt"
+    if tool == "ToolSearch":
+        return "schema_load"
+    if tool in ("Bash", "run_terminal_command"):
+        return "shell"
+    if tool in ("Edit", "Write", "search_replace", "write", "NotebookEdit"):
+        return "edit"
+    if tool in ("Read", "read_file", "grep", "Grep", "glob", "Glob"):
+        return "read"
+    return "other"
+
+
+def _bucketed(digest) -> tuple[dict, int]:
+    """Sum ``tool_histogram`` counts per bucket; return (buckets, total calls)."""
+    buckets: dict[str, int] = {}
+    for tool, n in (digest.get("tool_histogram") or {}).items():
+        bucket = tool_bucket(tool)  # classify once per histogram entry
+        buckets[bucket] = buckets.get(bucket, 0) + n
+    return buckets, sum(buckets.values())
+
+
+def sig_infra_overhead(digest, ctx) -> list[Signal]:
+    """Problem: a large share of tool calls is hub/task/schema plumbing, not work."""
+    buckets, total = _bucketed(digest)
+    # ``total <= 0`` keeps the share division safe even if infra_min_calls is 0.
+    if total <= 0 or total < ctx.get("infra_min_calls", 20):
+        return []
+    statehub = buckets.get("statehub_mcp", 0)
+    task_mgmt = buckets.get("task_mgmt", 0)
+    schema_load = buckets.get("schema_load", 0)
+    overhead = statehub + task_mgmt + schema_load
+    share = overhead / total
+    if share >= ctx.get("infra_overhead_threshold", 0.30):
+        return [_base(digest, "infra_overhead", PROBLEM, "infra_overhead", round(share, 3),
+                      overhead_calls=overhead, total_calls=total,
+                      statehub=statehub, task_mgmt=task_mgmt, schema_load=schema_load)]
+    return []
+
+
+def sig_schema_thrash(digest, ctx) -> list[Signal]:
+    """Problem: repeated ToolSearch — deferred-tool schemas reloaded over and over."""
+    buckets, _ = _bucketed(digest)
+    n = buckets.get("schema_load", 0)
+    if n >= ctx.get("schema_thrash_threshold", 5):
+        return [_base(digest, "schema_thrash", PROBLEM, "schema_load", float(n), tool_searches=n)]
+    return []
+
+
+def sig_tool_thrash(digest, ctx) -> list[Signal]:
+    """Problem: the most-called tool alone reaches the thrash threshold — likely churn."""
+    hist = digest.get("tool_histogram") or {}
+    if not hist:
+        return []
+    tool, n = max(hist.items(), key=lambda kv: kv[1])
+    if n >= ctx.get("tool_thrash_threshold", 80):
+        return [_base(digest, "tool_thrash", PROBLEM, f"tool:{tool}", float(n), tool=tool, calls=n)]
+    return []
+
+
 EXTRACTORS: list[Callable] = [
     sig_retry_storm, sig_repeated_errors, sig_budget_overrun, sig_abandoned,
     sig_clean_pass, sig_error_then_recovery,
+    sig_infra_overhead, sig_schema_thrash, sig_tool_thrash,
 ]
 
 
@@ -104,7 +174,12 @@ def build_context(digests: list[dict]) -> dict[str, Any]:
         for d in digests
     )
     p90 = totals[int(0.9 * (len(totals) - 1))] if totals else 0
-    return {"tokens_p90": p90, "retry_storm_threshold": 3, "error_threshold": 3}
+    return {
+        "tokens_p90": p90, "retry_storm_threshold": 3, "error_threshold": 3,
+        # tool-mix / infra-overhead thresholds (WP-0005 T02)
+        "infra_min_calls": 20, "infra_overhead_threshold": 0.30,
+        "schema_thrash_threshold": 5, "tool_thrash_threshold": 80,
+    }
 
 
 def extract_signals(digests: list[dict], ctx: Optional[dict] = None) -> list[Signal]:
diff --git a/tests/test_detect_infra_signals.py b/tests/test_detect_infra_signals.py
new file mode 100644
index 0000000..39f880e
--- /dev/null
+++ b/tests/test_detect_infra_signals.py
@@ -0,0 +1,81 @@
+"""Infra-overhead + thrash signal tests (WP-0005 T02)."""
+
+import os
+import sys
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from session_memory.detect.signals import (  # noqa: E402
+    build_context,
+    extract_signals,
+    sig_infra_overhead,
+    sig_schema_thrash,
+    sig_tool_thrash,
+    tool_bucket,
+)
+
+
+def _digest(uid="claude:a", repo="r1", tools=None):
+    return {"session_uid": uid, "flavor": "claude", "repo": repo, "outcome": "success",
+            "cost": {"input_tokens": 1, "output_tokens": 1},
+            "markers": {"errors": 0, "retries": 0, "test_runs": 0},
+            "tool_histogram": tools or {}}
+
+
+CTX = {"infra_min_calls": 20, "infra_overhead_threshold": 0.30,
+       "schema_thrash_threshold": 5, "tool_thrash_threshold": 80}
+
+
+def test_tool_bucket_mapping():
+    assert tool_bucket("mcp__state-hub__update_task_status") == "statehub_mcp"
+    assert tool_bucket("ToolSearch") == "schema_load"
+    assert tool_bucket("TaskUpdate") == "task_mgmt"
+    assert tool_bucket("Bash") == "shell"
+    assert tool_bucket("Edit") == "edit"
+
+
+def test_infra_overhead_fires_above_share():
+    # 18 statehub of 30 total = 60% overhead
+    d = _digest(tools={"mcp__state-hub__create_task": 18, "Bash": 8, "Edit": 4})
+    sig = sig_infra_overhead(d, CTX)
+    assert sig and sig[0].type == "infra_overhead"
+    assert sig[0].magnitude >= 0.30
+    assert sig[0].detail["statehub"] == 18
+
+
+def test_infra_overhead_quiet_when_mostly_work():
+    d = _digest(tools={"mcp__state-hub__create_task": 3, "Bash": 40, "Edit": 30})
+    assert sig_infra_overhead(d, CTX) == []
+
+
+def test_infra_overhead_ignores_tiny_sessions():
+    d = _digest(tools={"mcp__state-hub__create_task": 5})  # below infra_min_calls
+    assert sig_infra_overhead(d, CTX) == []
+    assert sig_infra_overhead(_digest(), {"infra_min_calls": 0}) == []  # empty histogram
+
+
+def test_schema_thrash_fires():
+    d = _digest(tools={"ToolSearch": 9, "Bash": 5})
+    sig = sig_schema_thrash(d, CTX)
+    assert sig and sig[0].type == "schema_thrash"
+    assert sig[0].detail["tool_searches"] == 9
+
+
+def test_tool_thrash_fires_on_dominant_tool():
+    d = _digest(tools={"Bash": 120, "Edit": 5})
+    sig = sig_tool_thrash(d, CTX)
+    assert sig and sig[0].locus == "tool:Bash"
+
+
+def test_extract_signals_includes_infra():
+    d = _digest(tools={"mcp__state-hub__create_task": 18, "Bash": 8, "Edit": 4,
+                       "ToolSearch": 6})
+    types = {s.type for s in extract_signals([d])}
+    assert "infra_overhead" in types
+    assert "schema_thrash" in types
+
+
+def test_build_context_has_infra_defaults():
+    ctx = build_context([])
+    assert ctx["infra_overhead_threshold"] == 0.30
+    assert ctx["schema_thrash_threshold"] == 5
diff --git a/workplans/AGENTIC-WP-0005-detect-hardening.md b/workplans/AGENTIC-WP-0005-detect-hardening.md
index e57f171..7f4fd07 100644
--- a/workplans/AGENTIC-WP-0005-detect-hardening.md
+++ b/workplans/AGENTIC-WP-0005-detect-hardening.md
@@ -52,7 +52,7 @@ sessions — fixing the `abandoned` false-positive. Knobs under `[detect]` in
 
 ```task
 id: AGENTIC-WP-0005-T02
-status: todo
+status: done
 priority: high
 state_hub_task_id: "10d57b05-a731-4ece-bf45-f6a98ac77555"
 ```