IB-WP-0019-T02: usage rollup from run records

Every completed generate run now aggregates per-call adapter usage from
the workflow-engine run records into output/budget/usage.yaml. Per-call
data is bucketed by (workflow_id, stage_id, provider, model) with
running totals for calls, prompt_tokens, completion_tokens,
total_tokens, and cost_usd_known (sum of adapter-reported cost when the
provider returns it; usually zero today). A run-level entry captures
run_index, started_at, completed_at, duration_seconds, the executing
plan snapshot_id (resolved from the latest plans.yaml entry), and the
workflow-level run_id / stage_count summaries.

cost_usd_estimated is left as None for this task; T03 wires the
rate-table resolver so the same bucket gets a model-priced fallback
when the adapter does not return cost directly.

Fixture-mode runs are recorded with provider='fixture', zero tokens,
and cost_status='unknown' rather than silently skipped, so the rollup
honestly reflects which stages actually ran.

102 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-17 19:46:40 +02:00
parent 37bbaf9fab
commit 678508226a
4 changed files with 315 additions and 2 deletions

View File

@@ -21,8 +21,10 @@ import yaml
BUDGET_DIR = Path("output/budget")
PLANS_FILE = BUDGET_DIR / "plans.yaml"
USAGE_FILE = BUDGET_DIR / "usage.yaml"
PLAN_RETENTION_DEFAULT = 50
PLANS_SCHEMA_VERSION = 1
USAGE_SCHEMA_VERSION = 1
_SNAPSHOT_FINGERPRINT_FIELDS = (
"stage",
@@ -82,6 +84,162 @@ def read_plan_snapshots(root: str | Path) -> list[dict[str, Any]]:
return list(payload.get("snapshots") or [])
def latest_plan_snapshot_id(root: str | Path) -> str | None:
snapshots = read_plan_snapshots(root)
if not snapshots:
return None
return snapshots[-1].get("snapshot_id")
def record_run_usage(
root: str | Path,
workflow_results: list[dict[str, Any]],
*,
snapshot_id: str | None = None,
duration_seconds: float | None = None,
started_at: str | None = None,
cost_resolver: Any | None = None,
) -> dict[str, Any]:
"""Aggregate per-call usage from completed workflow run records.
``cost_resolver`` is a callable ``(provider, model, prompt_tokens,
completion_tokens) -> float | None`` used to fill ``cost_usd_estimated``
when the adapter did not return a cost. Left as ``None`` here; T03
wires the rate-table resolver in.
"""
root_path = Path(root)
usage_path = root_path / USAGE_FILE
usage_path.parent.mkdir(parents=True, exist_ok=True)
buckets: dict[tuple, dict[str, Any]] = {}
workflow_summaries: list[dict[str, Any]] = []
for workflow in workflow_results or []:
if not isinstance(workflow, dict):
continue
workflow_id = str(workflow.get("workflow_id") or "")
workflow_summary = {
"run_id": workflow.get("run_id"),
"workflow_id": workflow_id,
"status": workflow.get("status"),
"stage_count": len(workflow.get("stages") or []),
}
workflow_summaries.append(workflow_summary)
for stage in workflow.get("stages") or []:
if not isinstance(stage, dict):
continue
provider = str(stage.get("provider") or "")
if not provider:
continue
metadata = stage.get("metadata") or {}
model = str(metadata.get("model") or "")
usage = metadata.get("usage") or {}
prompt_tokens = int(usage.get("prompt_tokens") or 0)
completion_tokens = int(usage.get("completion_tokens") or 0)
reported_cost = _coerce_float(usage.get("cost"))
bucket_key = (workflow_id, str(stage.get("stage_id") or ""), provider, model)
bucket = buckets.setdefault(
bucket_key,
{
"workflow_id": workflow_id,
"stage_id": str(stage.get("stage_id") or ""),
"provider": provider,
"model": model,
"calls": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
"cost_usd_known": 0.0,
"cost_usd_estimated": 0.0,
"cost_status": "known" if reported_cost is not None else "unknown",
"cost_estimated_for_calls": 0,
},
)
bucket["calls"] += 1
bucket["prompt_tokens"] += prompt_tokens
bucket["completion_tokens"] += completion_tokens
bucket["total_tokens"] += prompt_tokens + completion_tokens
if reported_cost is not None:
bucket["cost_usd_known"] = round(bucket["cost_usd_known"] + reported_cost, 6)
bucket["cost_status"] = "known"
elif cost_resolver is not None:
estimated = cost_resolver(provider, model, prompt_tokens, completion_tokens)
if estimated is not None:
bucket["cost_usd_estimated"] = round(
bucket["cost_usd_estimated"] + float(estimated), 6
)
bucket["cost_estimated_for_calls"] += 1
if bucket["cost_status"] != "known":
bucket["cost_status"] = "estimated"
per_bucket = list(buckets.values())
for bucket in per_bucket:
if bucket["cost_usd_estimated"] == 0.0 and bucket["cost_estimated_for_calls"] == 0:
bucket["cost_usd_estimated"] = None
rollup = {
"total_calls": sum(b["calls"] for b in per_bucket),
"total_prompt_tokens": sum(b["prompt_tokens"] for b in per_bucket),
"total_completion_tokens": sum(b["completion_tokens"] for b in per_bucket),
"total_tokens": sum(b["total_tokens"] for b in per_bucket),
"total_cost_usd_known": round(sum(b["cost_usd_known"] for b in per_bucket), 6),
"total_cost_usd_estimated": round(
sum(b["cost_usd_estimated"] or 0.0 for b in per_bucket), 6
)
or None,
}
completed_at = _now()
entry = {
"run_index": _next_run_index(usage_path),
"started_at": started_at,
"completed_at": completed_at,
"duration_seconds": duration_seconds,
"snapshot_id": snapshot_id,
"workflows": workflow_summaries,
"rollup": rollup,
"per_bucket": per_bucket,
}
payload = _read_usage(usage_path)
runs = list(payload.get("runs") or [])
runs.append(entry)
_write_usage(
usage_path,
{"schema_version": USAGE_SCHEMA_VERSION, "runs": runs},
)
return entry
def read_usage_runs(root: str | Path) -> list[dict[str, Any]]:
payload = _read_usage(Path(root) / USAGE_FILE)
return list(payload.get("runs") or [])
def _coerce_float(value: Any) -> float | None:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _next_run_index(usage_path: Path) -> int:
payload = _read_usage(usage_path)
return len(payload.get("runs") or []) + 1
def _read_usage(path: Path) -> dict[str, Any]:
if not path.is_file():
return {"schema_version": USAGE_SCHEMA_VERSION, "runs": []}
try:
data = yaml.safe_load(path.read_text(encoding="utf-8"))
except yaml.YAMLError:
return {"schema_version": USAGE_SCHEMA_VERSION, "runs": []}
if not isinstance(data, dict):
return {"schema_version": USAGE_SCHEMA_VERSION, "runs": []}
return data
def _write_usage(path: Path, payload: dict[str, Any]) -> None:
path.write_text(yaml.safe_dump(payload, sort_keys=False), encoding="utf-8")
def _build_snapshot(summary: dict[str, Any]) -> dict[str, Any]:
filters = {
"stage": summary.get("stage"),

View File

@@ -2,11 +2,14 @@ from __future__ import annotations
import hashlib
import shutil
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
_monotonic = time.monotonic
import yaml
from .checks import run_collection_checks
@@ -15,7 +18,11 @@ from .evaluation_io import read_entity_evaluations
from .history import get_history, read_metrics_file, record_check_results
from .lifecycle import create_infospace, load_infospace, register_artifact
from .openrouter import OpenRouterAssistedGenerationAdapter
from .budget import record_plan_snapshot
from .budget import (
latest_plan_snapshot_id,
record_plan_snapshot,
record_run_usage,
)
from .source_intake import SourceChunk, normalize_source
from .workflow import (
AssistedGenerationAdapter,
@@ -343,6 +350,8 @@ def run_generation(
metrics=status.get("metrics", {}),
)
started_wall = datetime.now(timezone.utc)
monotonic_start = _monotonic()
adapter = (
_adapter_for(provider, model=model, fixture_responses=fixture_responses)
if workflow_ids
@@ -379,6 +388,15 @@ def run_generation(
}
)
_write_state(root_path, state)
if workflow_results:
duration_seconds = round(_monotonic() - monotonic_start, 3)
record_run_usage(
root_path,
workflow_results,
snapshot_id=latest_plan_snapshot_id(root_path),
duration_seconds=duration_seconds,
started_at=started_wall.isoformat(),
)
return GenerationRunResult(
root=str(root_path),
status="completed",