From 110c78b9adab87d509df241cf17fdb074852ce9a Mon Sep 17 00:00:00 2001 From: tegwick Date: Sun, 17 May 2026 20:33:29 +0200 Subject: [PATCH] IB-WP-0019-T05: state-hub token-event emission with failure isolation Emit one record_token_event payload per completed generate run, derived from the just-recorded usage rollup. tokens_in/out come from the rollup, model defaults to the dominant model used (or "mixed" when buckets disagree), agent="infospace-bench", ref_type="session", and ref_id="{infospace_slug}/run-{run_index}". The note carries the infospace slug, workspace, snapshot_id, and any known/estimated cost so the hub event is self-describing. Failure isolation: any exception from the HTTP poster (hub down, timeout, 5xx) is caught, logged to stderr, and reported as status=failed; the generate run still completes. INFOSPACE_BENCH_HUB_URL overrides the default http://127.0.0.1:8000 base; INFOSPACE_BENCH_DISABLE_HUB_TOKEN_EVENTS skips emission entirely. Tests cover the happy path, the disable env var, poster failure, the no-usage skip, multi-model coalescing to "mixed", and an end-to-end run_generation against an unbindable hub port to prove the run survives when the hub is unreachable. 116 tests pass. 
Co-Authored-By: Claude Opus 4.7 --- src/infospace_bench/budget.py | 100 +++++++++++++ src/infospace_bench/generator.py | 6 + tests/test_budget_registry.py | 134 ++++++++++++++++++ .../IB-WP-0019-budget-and-usage-registry.md | 2 +- 4 files changed, 241 insertions(+), 1 deletion(-) diff --git a/src/infospace_bench/budget.py b/src/infospace_bench/budget.py index b57a1d7..db0cb29 100644 --- a/src/infospace_bench/budget.py +++ b/src/infospace_bench/budget.py @@ -13,6 +13,10 @@ from __future__ import annotations import hashlib import json +import os +import sys +import urllib.error +import urllib.request from datetime import datetime, timezone from pathlib import Path from typing import Any, Callable @@ -22,6 +26,12 @@ import yaml RATES_FILENAME = "model-rates.yaml" _PACKAGE_RATES_PATH = Path(__file__).parent / "model_rates.yaml" +HUB_URL_ENV = "INFOSPACE_BENCH_HUB_URL" +HUB_DISABLE_ENV = "INFOSPACE_BENCH_DISABLE_HUB_TOKEN_EVENTS" +DEFAULT_HUB_URL = "http://127.0.0.1:8000" +TOKEN_EVENTS_PATH = "/state/token-events" +HUB_TIMEOUT_SECONDS = 3.0 + BUDGET_DIR = Path("output/budget") PLANS_FILE = BUDGET_DIR / "plans.yaml" USAGE_FILE = BUDGET_DIR / "usage.yaml" @@ -275,6 +285,96 @@ def record_run_variance( return summary +def emit_token_event( + run_entry: dict[str, Any], + *, + infospace_slug: str, + workspace: str | None = None, + hub_url: str | None = None, + poster: Callable[[str, dict[str, Any], float], Any] | None = None, +) -> dict[str, Any]: + """POST one record_token_event payload to state-hub. + + Returns a result dict with ``status`` in {emitted, disabled, skipped, + failed} and a ``reason`` field when not emitted. Never raises: a + state-hub outage is logged to stderr and reported as ``failed``. 
+ """ + if os.environ.get(HUB_DISABLE_ENV): + return {"status": "disabled", "reason": f"{HUB_DISABLE_ENV} set"} + rollup = run_entry.get("rollup") or {} + tokens_in = int(rollup.get("total_prompt_tokens") or 0) + tokens_out = int(rollup.get("total_completion_tokens") or 0) + if tokens_in == 0 and tokens_out == 0: + return {"status": "skipped", "reason": "no token usage to report"} + model = _dominant_model(run_entry.get("per_bucket") or []) + payload = { + "tokens_in": tokens_in, + "tokens_out": tokens_out, + "model": model, + "agent": "infospace-bench", + "ref_type": "session", + "ref_id": f"{infospace_slug}/run-{run_entry.get('run_index')}", + "note": _token_event_note(run_entry, infospace_slug, workspace), + } + url = (hub_url or os.environ.get(HUB_URL_ENV) or DEFAULT_HUB_URL).rstrip("/") + TOKEN_EVENTS_PATH + try: + if poster is not None: + poster(url, payload, HUB_TIMEOUT_SECONDS) + else: + _post_json(url, payload, HUB_TIMEOUT_SECONDS) + except Exception as exc: # never let hub problems fail the run + sys.stderr.write( + f"[budget] state-hub token event failed ({url}): {exc}\n" + ) + return {"status": "failed", "reason": str(exc), "url": url} + return {"status": "emitted", "url": url, "model": model, "tokens_in": tokens_in, "tokens_out": tokens_out} + + +def _dominant_model(per_bucket: list[dict[str, Any]]) -> str: + totals: dict[str, int] = {} + for bucket in per_bucket: + model = str(bucket.get("model") or "") + if not model: + continue + totals[model] = totals.get(model, 0) + int(bucket.get("total_tokens") or 0) + if not totals: + return "" + if len(totals) == 1: + return next(iter(totals)) + return "mixed" + + +def _token_event_note( + run_entry: dict[str, Any], infospace_slug: str, workspace: str | None +) -> str: + rollup = run_entry.get("rollup") or {} + parts = [f"infospace={infospace_slug}"] + if workspace: + parts.append(f"workspace={workspace}") + snapshot_id = run_entry.get("snapshot_id") + if snapshot_id: + parts.append(f"snapshot={snapshot_id}") 
+ cost_known = rollup.get("total_cost_usd_known") + if cost_known: + parts.append(f"cost_known_usd={cost_known}") + cost_estimated = rollup.get("total_cost_usd_estimated") + if cost_estimated: + parts.append(f"cost_estimated_usd={cost_estimated}") + return " ".join(parts) + + +def _post_json(url: str, payload: dict[str, Any], timeout: float) -> None: + body = json.dumps(payload).encode("utf-8") + request = urllib.request.Request( + url, + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(request, timeout=timeout) as response: + response.read() + + def read_run_variance(root: str | Path) -> dict[str, Any] | None: path = Path(root) / SUMMARY_FILE if not path.is_file(): diff --git a/src/infospace_bench/generator.py b/src/infospace_bench/generator.py index 015e57b..14b6aec 100644 --- a/src/infospace_bench/generator.py +++ b/src/infospace_bench/generator.py @@ -19,6 +19,7 @@ from .history import get_history, read_metrics_file, record_check_results from .lifecycle import create_infospace, load_infospace, register_artifact from .openrouter import OpenRouterAssistedGenerationAdapter from .budget import ( + emit_token_event, latest_plan_snapshot_id, make_cost_resolver, read_run_variance, @@ -411,6 +412,11 @@ def run_generation( cost_resolver=make_cost_resolver(_workspace_for(root_path)), ) record_run_variance(root_path, usage_entry) + emit_token_event( + usage_entry, + infospace_slug=load_infospace(root_path).config.slug, + workspace=str(_workspace_for(root_path)), + ) metrics: dict[str, Any] = {} snapshot_id = "" diff --git a/tests/test_budget_registry.py b/tests/test_budget_registry.py index 9a5d2b7..d2cb190 100644 --- a/tests/test_budget_registry.py +++ b/tests/test_budget_registry.py @@ -489,6 +489,140 @@ def test_generation_report_includes_variance_line(tmp_path: Path) -> None: assert "calls" in report.lower() +def test_emit_token_event_calls_poster_with_record_token_payload(tmp_path: Path) -> None: + from 
infospace_bench.budget import emit_token_event + + calls: list[tuple[str, dict, float]] = [] + + def fake_poster(url: str, payload: dict, timeout: float) -> None: + calls.append((url, payload, timeout)) + + run_entry = { + "run_index": 2, + "snapshot_id": "abc123", + "rollup": { + "total_prompt_tokens": 1200, + "total_completion_tokens": 400, + "total_cost_usd_known": 0.0, + "total_cost_usd_estimated": 0.05, + }, + "per_bucket": [ + {"model": "openai/gpt-4o-mini", "total_tokens": 1600}, + ], + } + + result = emit_token_event( + run_entry, + infospace_slug="lefevre", + workspace="/tmp/workspaces/lefevre", + hub_url="http://hub.example", + poster=fake_poster, + ) + + assert result["status"] == "emitted" + assert len(calls) == 1 + url, payload, timeout = calls[0] + assert url == "http://hub.example/state/token-events" + assert payload["tokens_in"] == 1200 + assert payload["tokens_out"] == 400 + assert payload["model"] == "openai/gpt-4o-mini" + assert payload["agent"] == "infospace-bench" + assert payload["ref_type"] == "session" + assert payload["ref_id"] == "lefevre/run-2" + assert "infospace=lefevre" in payload["note"] + assert "snapshot=abc123" in payload["note"] + assert timeout > 0 + + +def test_emit_token_event_respects_disable_env(monkeypatch, tmp_path: Path) -> None: + from infospace_bench.budget import HUB_DISABLE_ENV, emit_token_event + + monkeypatch.setenv(HUB_DISABLE_ENV, "1") + calls: list = [] + result = emit_token_event( + {"run_index": 1, "rollup": {"total_prompt_tokens": 100, "total_completion_tokens": 50}, "per_bucket": []}, + infospace_slug="foo", + poster=lambda *a, **k: calls.append(a), + ) + + assert result["status"] == "disabled" + assert calls == [] + + +def test_emit_token_event_isolates_poster_failure(tmp_path: Path) -> None: + from infospace_bench.budget import emit_token_event + + def angry_poster(url: str, payload: dict, timeout: float) -> None: + raise RuntimeError("hub down") + + result = emit_token_event( + { + "run_index": 1, + 
"rollup": {"total_prompt_tokens": 50, "total_completion_tokens": 25}, + "per_bucket": [{"model": "openai/gpt-4o-mini", "total_tokens": 75}], + }, + infospace_slug="foo", + poster=angry_poster, + ) + + assert result["status"] == "failed" + assert "hub down" in result["reason"] + + +def test_emit_token_event_skips_when_no_token_usage() -> None: + from infospace_bench.budget import emit_token_event + + result = emit_token_event( + {"run_index": 1, "rollup": {"total_prompt_tokens": 0, "total_completion_tokens": 0}, "per_bucket": []}, + infospace_slug="foo", + poster=lambda *a, **k: None, + ) + + assert result["status"] == "skipped" + + +def test_emit_token_event_marks_multi_model_as_mixed() -> None: + from infospace_bench.budget import emit_token_event + + captured: list[dict] = [] + + def fake_poster(url: str, payload: dict, timeout: float) -> None: + captured.append(payload) + + emit_token_event( + { + "run_index": 1, + "rollup": {"total_prompt_tokens": 200, "total_completion_tokens": 100}, + "per_bucket": [ + {"model": "openai/gpt-4o-mini", "total_tokens": 150}, + {"model": "anthropic/claude-3.5-haiku", "total_tokens": 150}, + ], + }, + infospace_slug="foo", + poster=fake_poster, + ) + + assert captured[0]["model"] == "mixed" + + +def test_run_generation_never_fails_when_hub_is_down(tmp_path: Path, monkeypatch) -> None: + # Force the default hub URL to a known-bad port so the real poster fails fast. 
+ from infospace_bench.budget import HUB_URL_ENV + from infospace_bench.generator import run_generation, status_generation + + monkeypatch.setenv(HUB_URL_ENV, "http://127.0.0.1:1") # reserved unbindable port + root = _build_infospace(tmp_path) + fixture = tmp_path / "responses.yaml" + _write_minimal_fixture(fixture) + plan_generation(root) + + result = run_generation(root, fixture_responses=fixture) + status = status_generation(root) + + assert result.status == "completed" + assert status["completed"] is True + + def test_plan_cli_writes_snapshot(tmp_path: Path) -> None: root = _build_infospace(tmp_path) env = os.environ.copy() diff --git a/workplans/IB-WP-0019-budget-and-usage-registry.md b/workplans/IB-WP-0019-budget-and-usage-registry.md index 6baffd2..dd19db8 100644 --- a/workplans/IB-WP-0019-budget-and-usage-registry.md +++ b/workplans/IB-WP-0019-budget-and-usage-registry.md @@ -157,7 +157,7 @@ state_hub_task_id: "c6adc4fb-9062-4c81-a0b2-98d3166e047d" ```task id: IB-WP-0019-T05 -status: todo +status: done priority: medium state_hub_task_id: "968bca1d-63ff-4818-83bb-ca314b1e633c" ```