IB-WP-0019-T05: state-hub token-event emission with failure isolation

Emit one record_token_event payload per completed generate run, derived
from the just-recorded usage rollup. tokens_in/out come from the
rollup, model defaults to the dominant model used (or "mixed" when
buckets disagree), agent="infospace-bench", ref_type="session", and
ref_id="<slug>/run-<run_index>". The note carries the infospace slug,
workspace, snapshot_id, and any known/estimated cost so the hub event
is self-describing.

Failure isolation: any exception from the HTTP poster (hub down,
timeout, 5xx) is caught, logged to stderr, and reported as
status=failed; the generate run still completes. INFOSPACE_BENCH_HUB_URL
overrides the default http://127.0.0.1:8000 base;
INFOSPACE_BENCH_DISABLE_HUB_TOKEN_EVENTS skips emission entirely.

Tests cover the happy path, the disable env var, poster failure, the
no-usage skip, multi-model coalescing to "mixed", and an end-to-end
run_generation against an unbindable hub port to prove the run survives
when the hub is unreachable. 116 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-17 20:33:29 +02:00
parent d4c9c56f5c
commit 110c78b9ad
4 changed files with 241 additions and 1 deletions

View File

@@ -13,6 +13,10 @@ from __future__ import annotations
import hashlib import hashlib
import json import json
import os
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
@@ -22,6 +26,12 @@ import yaml
RATES_FILENAME = "model-rates.yaml" RATES_FILENAME = "model-rates.yaml"
_PACKAGE_RATES_PATH = Path(__file__).parent / "model_rates.yaml" _PACKAGE_RATES_PATH = Path(__file__).parent / "model_rates.yaml"
HUB_URL_ENV = "INFOSPACE_BENCH_HUB_URL"
HUB_DISABLE_ENV = "INFOSPACE_BENCH_DISABLE_HUB_TOKEN_EVENTS"
DEFAULT_HUB_URL = "http://127.0.0.1:8000"
TOKEN_EVENTS_PATH = "/state/token-events"
HUB_TIMEOUT_SECONDS = 3.0
BUDGET_DIR = Path("output/budget") BUDGET_DIR = Path("output/budget")
PLANS_FILE = BUDGET_DIR / "plans.yaml" PLANS_FILE = BUDGET_DIR / "plans.yaml"
USAGE_FILE = BUDGET_DIR / "usage.yaml" USAGE_FILE = BUDGET_DIR / "usage.yaml"
@@ -275,6 +285,96 @@ def record_run_variance(
return summary return summary
def emit_token_event(
run_entry: dict[str, Any],
*,
infospace_slug: str,
workspace: str | None = None,
hub_url: str | None = None,
poster: Callable[[str, dict[str, Any], float], Any] | None = None,
) -> dict[str, Any]:
"""POST one record_token_event payload to state-hub.
Returns a result dict with ``status`` in {emitted, disabled, skipped,
failed} and a ``reason`` field when not emitted. Never raises: a
state-hub outage is logged to stderr and reported as ``failed``.
"""
if os.environ.get(HUB_DISABLE_ENV):
return {"status": "disabled", "reason": f"{HUB_DISABLE_ENV} set"}
rollup = run_entry.get("rollup") or {}
tokens_in = int(rollup.get("total_prompt_tokens") or 0)
tokens_out = int(rollup.get("total_completion_tokens") or 0)
if tokens_in == 0 and tokens_out == 0:
return {"status": "skipped", "reason": "no token usage to report"}
model = _dominant_model(run_entry.get("per_bucket") or [])
payload = {
"tokens_in": tokens_in,
"tokens_out": tokens_out,
"model": model,
"agent": "infospace-bench",
"ref_type": "session",
"ref_id": f"{infospace_slug}/run-{run_entry.get('run_index')}",
"note": _token_event_note(run_entry, infospace_slug, workspace),
}
url = (hub_url or os.environ.get(HUB_URL_ENV) or DEFAULT_HUB_URL).rstrip("/") + TOKEN_EVENTS_PATH
try:
if poster is not None:
poster(url, payload, HUB_TIMEOUT_SECONDS)
else:
_post_json(url, payload, HUB_TIMEOUT_SECONDS)
except Exception as exc: # never let hub problems fail the run
sys.stderr.write(
f"[budget] state-hub token event failed ({url}): {exc}\n"
)
return {"status": "failed", "reason": str(exc), "url": url}
return {"status": "emitted", "url": url, "model": model, "tokens_in": tokens_in, "tokens_out": tokens_out}
def _dominant_model(per_bucket: list[dict[str, Any]]) -> str:
totals: dict[str, int] = {}
for bucket in per_bucket:
model = str(bucket.get("model") or "")
if not model:
continue
totals[model] = totals.get(model, 0) + int(bucket.get("total_tokens") or 0)
if not totals:
return ""
if len(totals) == 1:
return next(iter(totals))
return "mixed"
def _token_event_note(
run_entry: dict[str, Any], infospace_slug: str, workspace: str | None
) -> str:
rollup = run_entry.get("rollup") or {}
parts = [f"infospace={infospace_slug}"]
if workspace:
parts.append(f"workspace={workspace}")
snapshot_id = run_entry.get("snapshot_id")
if snapshot_id:
parts.append(f"snapshot={snapshot_id}")
cost_known = rollup.get("total_cost_usd_known")
if cost_known:
parts.append(f"cost_known_usd={cost_known}")
cost_estimated = rollup.get("total_cost_usd_estimated")
if cost_estimated:
parts.append(f"cost_estimated_usd={cost_estimated}")
return " ".join(parts)
def _post_json(url: str, payload: dict[str, Any], timeout: float) -> None:
body = json.dumps(payload).encode("utf-8")
request = urllib.request.Request(
url,
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(request, timeout=timeout) as response:
response.read()
def read_run_variance(root: str | Path) -> dict[str, Any] | None: def read_run_variance(root: str | Path) -> dict[str, Any] | None:
path = Path(root) / SUMMARY_FILE path = Path(root) / SUMMARY_FILE
if not path.is_file(): if not path.is_file():

View File

@@ -19,6 +19,7 @@ from .history import get_history, read_metrics_file, record_check_results
from .lifecycle import create_infospace, load_infospace, register_artifact from .lifecycle import create_infospace, load_infospace, register_artifact
from .openrouter import OpenRouterAssistedGenerationAdapter from .openrouter import OpenRouterAssistedGenerationAdapter
from .budget import ( from .budget import (
emit_token_event,
latest_plan_snapshot_id, latest_plan_snapshot_id,
make_cost_resolver, make_cost_resolver,
read_run_variance, read_run_variance,
@@ -411,6 +412,11 @@ def run_generation(
cost_resolver=make_cost_resolver(_workspace_for(root_path)), cost_resolver=make_cost_resolver(_workspace_for(root_path)),
) )
record_run_variance(root_path, usage_entry) record_run_variance(root_path, usage_entry)
emit_token_event(
usage_entry,
infospace_slug=load_infospace(root_path).config.slug,
workspace=str(_workspace_for(root_path)),
)
metrics: dict[str, Any] = {} metrics: dict[str, Any] = {}
snapshot_id = "" snapshot_id = ""

View File

@@ -489,6 +489,140 @@ def test_generation_report_includes_variance_line(tmp_path: Path) -> None:
assert "calls" in report.lower() assert "calls" in report.lower()
def test_emit_token_event_calls_poster_with_record_token_payload(tmp_path: Path) -> None:
from infospace_bench.budget import emit_token_event
calls: list[tuple[str, dict, float]] = []
def fake_poster(url: str, payload: dict, timeout: float) -> None:
calls.append((url, payload, timeout))
run_entry = {
"run_index": 2,
"snapshot_id": "abc123",
"rollup": {
"total_prompt_tokens": 1200,
"total_completion_tokens": 400,
"total_cost_usd_known": 0.0,
"total_cost_usd_estimated": 0.05,
},
"per_bucket": [
{"model": "openai/gpt-4o-mini", "total_tokens": 1600},
],
}
result = emit_token_event(
run_entry,
infospace_slug="lefevre",
workspace="/tmp/workspaces/lefevre",
hub_url="http://hub.example",
poster=fake_poster,
)
assert result["status"] == "emitted"
assert len(calls) == 1
url, payload, timeout = calls[0]
assert url == "http://hub.example/state/token-events"
assert payload["tokens_in"] == 1200
assert payload["tokens_out"] == 400
assert payload["model"] == "openai/gpt-4o-mini"
assert payload["agent"] == "infospace-bench"
assert payload["ref_type"] == "session"
assert payload["ref_id"] == "lefevre/run-2"
assert "infospace=lefevre" in payload["note"]
assert "snapshot=abc123" in payload["note"]
assert timeout > 0
def test_emit_token_event_respects_disable_env(monkeypatch, tmp_path: Path) -> None:
from infospace_bench.budget import HUB_DISABLE_ENV, emit_token_event
monkeypatch.setenv(HUB_DISABLE_ENV, "1")
calls: list = []
result = emit_token_event(
{"run_index": 1, "rollup": {"total_prompt_tokens": 100, "total_completion_tokens": 50}, "per_bucket": []},
infospace_slug="foo",
poster=lambda *a, **k: calls.append(a),
)
assert result["status"] == "disabled"
assert calls == []
def test_emit_token_event_isolates_poster_failure(tmp_path: Path) -> None:
from infospace_bench.budget import emit_token_event
def angry_poster(url: str, payload: dict, timeout: float) -> None:
raise RuntimeError("hub down")
result = emit_token_event(
{
"run_index": 1,
"rollup": {"total_prompt_tokens": 50, "total_completion_tokens": 25},
"per_bucket": [{"model": "openai/gpt-4o-mini", "total_tokens": 75}],
},
infospace_slug="foo",
poster=angry_poster,
)
assert result["status"] == "failed"
assert "hub down" in result["reason"]
def test_emit_token_event_skips_when_no_token_usage() -> None:
from infospace_bench.budget import emit_token_event
result = emit_token_event(
{"run_index": 1, "rollup": {"total_prompt_tokens": 0, "total_completion_tokens": 0}, "per_bucket": []},
infospace_slug="foo",
poster=lambda *a, **k: None,
)
assert result["status"] == "skipped"
def test_emit_token_event_marks_multi_model_as_mixed() -> None:
from infospace_bench.budget import emit_token_event
captured: list[dict] = []
def fake_poster(url: str, payload: dict, timeout: float) -> None:
captured.append(payload)
emit_token_event(
{
"run_index": 1,
"rollup": {"total_prompt_tokens": 200, "total_completion_tokens": 100},
"per_bucket": [
{"model": "openai/gpt-4o-mini", "total_tokens": 150},
{"model": "anthropic/claude-3.5-haiku", "total_tokens": 150},
],
},
infospace_slug="foo",
poster=fake_poster,
)
assert captured[0]["model"] == "mixed"
def test_run_generation_never_fails_when_hub_is_down(tmp_path: Path, monkeypatch) -> None:
# Force the default hub URL to a known-bad port so the real poster fails fast.
from infospace_bench.budget import HUB_URL_ENV
from infospace_bench.generator import run_generation, status_generation
monkeypatch.setenv(HUB_URL_ENV, "http://127.0.0.1:1") # reserved unbindable port
root = _build_infospace(tmp_path)
fixture = tmp_path / "responses.yaml"
_write_minimal_fixture(fixture)
plan_generation(root)
result = run_generation(root, fixture_responses=fixture)
status = status_generation(root)
assert result.status == "completed"
assert status["completed"] is True
def test_plan_cli_writes_snapshot(tmp_path: Path) -> None: def test_plan_cli_writes_snapshot(tmp_path: Path) -> None:
root = _build_infospace(tmp_path) root = _build_infospace(tmp_path)
env = os.environ.copy() env = os.environ.copy()

View File

@@ -157,7 +157,7 @@ state_hub_task_id: "c6adc4fb-9062-4c81-a0b2-98d3166e047d"
```task ```task
id: IB-WP-0019-T05 id: IB-WP-0019-T05
status: todo status: done
priority: medium priority: medium
state_hub_task_id: "968bca1d-63ff-4818-83bb-ca314b1e633c" state_hub_task_id: "968bca1d-63ff-4818-83bb-ca314b1e633c"
``` ```