generated from coulomb/repo-seed
session-memory: infra-overhead + thrash signals (WP-0005 T02)
signals.py: tool_bucket helper + three tool_histogram-based extractors that the outcome/marker signals were blind to — sig_infra_overhead (hub+task+schema share of tool calls over threshold), sig_schema_thrash (repeated ToolSearch), and sig_tool_thrash (one tool dominating). Thresholds in build_context. 8 new tests; suite 88/88 green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -91,9 +91,75 @@ def sig_error_then_recovery(digest, ctx) -> list[Signal]:
|
||||
return []
|
||||
|
||||
|
||||
# --- tool-mix / infrastructure-overhead signals (WP-0005 T02) ----------------
|
||||
# These read the captured ``tool_histogram`` — friction that the outcome+marker
|
||||
# signals above are blind to (sessions still "succeed", just expensively).
|
||||
|
||||
# Exact tool name -> coarse activity bucket.  Names used by different agent
# flavors are listed side by side so the bucketing stays flavor-agnostic.
_TOOL_BUCKETS = {
    name: bucket
    for bucket, names in (
        ("task_mgmt", ("TaskUpdate", "TaskCreate", "TaskGet", "TaskList",
                       "TaskOutput", "TaskStop", "todo_write",
                       "update_task_status")),
        ("schema_load", ("ToolSearch",)),
        ("shell", ("Bash", "run_terminal_command")),
        ("edit", ("Edit", "Write", "search_replace", "write", "NotebookEdit")),
        ("read", ("Read", "read_file", "grep", "Grep", "glob", "Glob")),
    )
    for name in names
}


def tool_bucket(tool: str) -> str:
    """Group a tool name into a coarse activity bucket (flavor-agnostic).

    Args:
        tool: Tool name as it appears as a key of ``tool_histogram``.

    Returns:
        ``"statehub_mcp"`` for any name starting with ``mcp__state-hub``,
        otherwise the bucket registered for the exact name in
        ``_TOOL_BUCKETS`` (``"task_mgmt"``, ``"schema_load"``, ``"shell"``,
        ``"edit"`` or ``"read"``), and ``"other"`` for everything else,
        including values that are not strings.
    """
    # A malformed histogram key (e.g. None) is simply unclassified rather
    # than an AttributeError on ``.startswith``.
    if not isinstance(tool, str):
        return "other"
    # Prefix match first: state-hub MCP tools share a prefix, not exact names.
    if tool.startswith("mcp__state-hub"):
        return "statehub_mcp"
    return _TOOL_BUCKETS.get(tool, "other")
|
||||
|
||||
|
||||
def _bucketed(digest) -> tuple[dict[str, int], int]:
    """Collapse a digest's ``tool_histogram`` into per-bucket call counts.

    Args:
        digest: Session digest mapping; its ``"tool_histogram"`` entry
            (tool name -> call count) may be missing or ``None``.

    Returns:
        ``(buckets, total)`` where ``buckets`` maps each ``tool_bucket`` name
        to the summed call count of the tools in it and ``total`` is the sum
        over all buckets.  A missing or empty histogram gives ``({}, 0)``.
    """
    buckets: dict[str, int] = {}
    for tool, n in (digest.get("tool_histogram") or {}).items():
        # Classify once per tool instead of once for the read and once for
        # the write of the same bucket entry.
        bucket = tool_bucket(tool)
        buckets[bucket] = buckets.get(bucket, 0) + n
    return buckets, sum(buckets.values())
|
||||
|
||||
|
||||
def sig_infra_overhead(digest, ctx) -> list[Signal]:
    """Problem: a large share of tool calls is hub/task/schema plumbing, not work.

    Args:
        digest: Session digest; its ``tool_histogram`` is read via
            ``_bucketed``.
        ctx: Threshold mapping.  ``infra_min_calls`` (default 20) is the
            minimum number of tool calls a session needs before it is judged;
            ``infra_overhead_threshold`` (default 0.30) is the overhead share
            at or above which the signal fires.

    Returns:
        A one-element list holding the ``infra_overhead`` signal built by
        ``_base`` (share rounded to 3 decimals, plus the per-bucket counts as
        keyword details), or ``[]`` when the session is too small or the
        share stays below the threshold.
    """
    buckets, total = _bucketed(digest)
    # ``total <= 0`` keeps the share division safe even when infra_min_calls
    # is configured as 0 (or negative) and the session made no tool calls.
    if total <= 0 or total < ctx.get("infra_min_calls", 20):
        return []

    statehub = buckets.get("statehub_mcp", 0)
    task_mgmt = buckets.get("task_mgmt", 0)
    schema_load = buckets.get("schema_load", 0)
    overhead = statehub + task_mgmt + schema_load
    share = overhead / total

    if share < ctx.get("infra_overhead_threshold", 0.30):
        return []
    return [_base(digest, "infra_overhead", PROBLEM, "infra_overhead", round(share, 3),
                  overhead_calls=overhead, total_calls=total,
                  statehub=statehub,
                  task_mgmt=task_mgmt,
                  schema_load=schema_load)]
|
||||
|
||||
|
||||
def sig_schema_thrash(digest, ctx) -> list[Signal]:
    """Problem: ToolSearch was called many times within one session.

    Fires when the ``schema_load`` bucket count reaches
    ``ctx["schema_thrash_threshold"]`` (default 5); the count is reported both
    as the float magnitude and as the ``tool_searches`` detail.  Returns ``[]``
    otherwise.
    """
    counts, _total = _bucketed(digest)
    searches = counts.get("schema_load", 0)
    if searches < ctx.get("schema_thrash_threshold", 5):
        return []
    return [
        _base(digest, "schema_thrash", PROBLEM, "schema_load", float(searches),
              tool_searches=searches)
    ]
|
||||
|
||||
|
||||
def sig_tool_thrash(digest, ctx) -> list[Signal]:
    """Problem: the most-used tool was called a very large number of times.

    Picks the tool with the highest count in ``tool_histogram`` (the first
    one on ties) and fires when that count reaches
    ``ctx["tool_thrash_threshold"]`` (default 80).  The check is on the
    absolute count only; it is not compared against the other tools.
    Returns ``[]`` for a missing/empty histogram or a count below threshold.
    """
    histogram = digest.get("tool_histogram")
    if not histogram:
        return []

    top_tool = max(histogram, key=histogram.get)
    top_calls = histogram[top_tool]
    if top_calls < ctx.get("tool_thrash_threshold", 80):
        return []
    return [
        _base(digest, "tool_thrash", PROBLEM, f"tool:{top_tool}", float(top_calls),
              tool=top_tool, calls=top_calls)
    ]
|
||||
|
||||
|
||||
# Registry of signal extractors; each takes ``(digest, ctx)`` and returns a
# list of signals.
EXTRACTORS: list[Callable] = [
    # outcome / marker based
    sig_retry_storm,
    sig_repeated_errors,
    sig_budget_overrun,
    sig_abandoned,
    sig_clean_pass,
    sig_error_then_recovery,
    # tool-mix / infrastructure overhead (WP-0005 T02)
    sig_infra_overhead,
    sig_schema_thrash,
    sig_tool_thrash,
]
|
||||
|
||||
|
||||
@@ -104,7 +170,12 @@ def build_context(digests: list[dict]) -> dict[str, Any]:
|
||||
for d in digests
|
||||
)
|
||||
p90 = totals[int(0.9 * (len(totals) - 1))] if totals else 0
|
||||
return {"tokens_p90": p90, "retry_storm_threshold": 3, "error_threshold": 3}
|
||||
return {
|
||||
"tokens_p90": p90, "retry_storm_threshold": 3, "error_threshold": 3,
|
||||
# tool-mix / infra-overhead thresholds (WP-0005 T02)
|
||||
"infra_min_calls": 20, "infra_overhead_threshold": 0.30,
|
||||
"schema_thrash_threshold": 5, "tool_thrash_threshold": 80,
|
||||
}
|
||||
|
||||
|
||||
def extract_signals(digests: list[dict], ctx: Optional[dict] = None) -> list[Signal]:
|
||||
|
||||
80
tests/test_detect_infra_signals.py
Normal file
80
tests/test_detect_infra_signals.py
Normal file
@@ -0,0 +1,80 @@
|
||||
"""Infra-overhead + thrash signal tests (WP-0005 T02)."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from session_memory.detect.signals import ( # noqa: E402
|
||||
build_context,
|
||||
extract_signals,
|
||||
sig_infra_overhead,
|
||||
sig_schema_thrash,
|
||||
sig_tool_thrash,
|
||||
tool_bucket,
|
||||
)
|
||||
|
||||
|
||||
def _digest(uid="claude:a", repo="r1", tools=None):
|
||||
return {"session_uid": uid, "flavor": "claude", "repo": repo, "outcome": "success",
|
||||
"cost": {"input_tokens": 1, "output_tokens": 1},
|
||||
"markers": {"errors": 0, "retries": 0, "test_runs": 0},
|
||||
"tool_histogram": tools or {}}
|
||||
|
||||
|
||||
# Explicit thresholds handed to the extractors under test, so the tests do
# not depend on build_context().
CTX = dict(
    infra_min_calls=20,
    infra_overhead_threshold=0.30,
    schema_thrash_threshold=5,
    tool_thrash_threshold=80,
)
|
||||
|
||||
|
||||
def test_tool_bucket_mapping():
    """Representative tool names land in their expected buckets."""
    expected = {
        "mcp__state-hub__update_task_status": "statehub_mcp",
        "ToolSearch": "schema_load",
        "TaskUpdate": "task_mgmt",
        "Bash": "shell",
        "Edit": "edit",
    }
    for tool, bucket in expected.items():
        assert tool_bucket(tool) == bucket
|
||||
|
||||
|
||||
def test_infra_overhead_fires_above_share():
    """A session dominated by state-hub calls raises infra_overhead."""
    # 18 state-hub calls out of 30 total -> 60% overhead share
    histogram = {"mcp__state-hub__create_task": 18, "Bash": 8, "Edit": 4}
    signals = sig_infra_overhead(_digest(tools=histogram), CTX)
    assert signals and signals[0].type == "infra_overhead"
    first = signals[0]
    assert first.magnitude >= 0.30
    assert first.detail["statehub"] == 18
|
||||
|
||||
|
||||
def test_infra_overhead_quiet_when_mostly_work():
    """A few hub calls among many shell/edit calls produce no signal."""
    histogram = {"mcp__state-hub__create_task": 3, "Bash": 40, "Edit": 30}
    signals = sig_infra_overhead(_digest(tools=histogram), CTX)
    assert signals == []
|
||||
|
||||
|
||||
def test_infra_overhead_ignores_tiny_sessions():
    """Sessions with fewer calls than infra_min_calls are not judged."""
    # 5 calls in total, below infra_min_calls (20), even though all are overhead
    tiny = _digest(tools={"mcp__state-hub__create_task": 5})
    signals = sig_infra_overhead(tiny, CTX)
    assert signals == []
|
||||
|
||||
|
||||
def test_schema_thrash_fires():
    """Nine ToolSearch calls exceed the threshold of five."""
    histogram = {"ToolSearch": 9, "Bash": 5}
    signals = sig_schema_thrash(_digest(tools=histogram), CTX)
    assert signals and signals[0].type == "schema_thrash"
    assert signals[0].detail["tool_searches"] == 9
|
||||
|
||||
|
||||
def test_tool_thrash_fires_on_dominant_tool():
    """The most-called tool is reported as the locus once past the threshold."""
    histogram = {"Bash": 120, "Edit": 5}
    signals = sig_tool_thrash(_digest(tools=histogram), CTX)
    assert signals and signals[0].locus == "tool:Bash"
|
||||
|
||||
|
||||
def test_extract_signals_includes_infra():
    """The new extractors are reachable through extract_signals()."""
    histogram = {
        "mcp__state-hub__create_task": 18,
        "Bash": 8,
        "Edit": 4,
        "ToolSearch": 6,
    }
    seen_types = {signal.type for signal in extract_signals([_digest(tools=histogram)])}
    assert "infra_overhead" in seen_types
    assert "schema_thrash" in seen_types
|
||||
|
||||
|
||||
def test_build_context_has_infra_defaults():
    """build_context() supplies the infra thresholds even with no digests."""
    defaults = build_context([])
    assert defaults["infra_overhead_threshold"] == 0.30
    assert defaults["schema_thrash_threshold"] == 5
|
||||
@@ -52,7 +52,7 @@ sessions — fixing the `abandoned` false-positive. Knobs under `[detect]` in
|
||||
|
||||
```task
|
||||
id: AGENTIC-WP-0005-T02
|
||||
status: todo
|
||||
status: done
|
||||
priority: high
|
||||
state_hub_task_id: "10d57b05-a731-4ece-bf45-f6a98ac77555"
|
||||
```
|
||||
|
||||
Reference in New Issue
Block a user