diff --git a/src/infospace_bench/budget.py b/src/infospace_bench/budget.py
index 0172f63..90b53f0 100644
--- a/src/infospace_bench/budget.py
+++ b/src/infospace_bench/budget.py
@@ -21,8 +21,10 @@ import yaml
 
 BUDGET_DIR = Path("output/budget")
 PLANS_FILE = BUDGET_DIR / "plans.yaml"
+USAGE_FILE = BUDGET_DIR / "usage.yaml"
 PLAN_RETENTION_DEFAULT = 50
 PLANS_SCHEMA_VERSION = 1
+USAGE_SCHEMA_VERSION = 1
 
 _SNAPSHOT_FINGERPRINT_FIELDS = (
     "stage",
@@ -82,6 +84,209 @@ def read_plan_snapshots(root: str | Path) -> list[dict[str, Any]]:
     return list(payload.get("snapshots") or [])
 
 
+def latest_plan_snapshot_id(root: str | Path) -> str | None:
+    """Return the ``snapshot_id`` of the last stored plan snapshot, if any."""
+    snapshots = read_plan_snapshots(root)
+    if not snapshots:
+        return None
+    return snapshots[-1].get("snapshot_id")
+
+
+def record_run_usage(
+    root: str | Path,
+    workflow_results: list[dict[str, Any]],
+    *,
+    snapshot_id: str | None = None,
+    duration_seconds: float | None = None,
+    started_at: str | None = None,
+    cost_resolver: Any | None = None,
+) -> dict[str, Any]:
+    """Aggregate per-call usage from completed workflow run records.
+
+    Stages that name a ``provider`` are grouped into buckets keyed by
+    ``(workflow_id, stage_id, provider, model)``; stages without a provider
+    are ignored. The resulting entry is appended to ``USAGE_FILE`` under
+    ``root`` and returned.
+
+    ``cost_resolver`` is a callable ``(provider, model, prompt_tokens,
+    completion_tokens) -> float | None`` used to fill ``cost_usd_estimated``
+    when the adapter did not return a cost. Left as ``None`` here; T03
+    wires the rate-table resolver in.
+
+    Non-mapping ``metadata``/``usage`` values and non-numeric token counts are
+    treated as missing so one malformed stage record cannot abort recording.
+    """
+    root_path = Path(root)
+    usage_path = root_path / USAGE_FILE
+    usage_path.parent.mkdir(parents=True, exist_ok=True)
+    buckets: dict[tuple, dict[str, Any]] = {}
+    workflow_summaries: list[dict[str, Any]] = []
+    for workflow in workflow_results or []:
+        if not isinstance(workflow, dict):
+            continue
+        workflow_id = str(workflow.get("workflow_id") or "")
+        stages = workflow.get("stages") or []
+        workflow_summaries.append(
+            {
+                "run_id": workflow.get("run_id"),
+                "workflow_id": workflow_id,
+                "status": workflow.get("status"),
+                "stage_count": len(stages),
+            }
+        )
+        for stage in stages:
+            if not isinstance(stage, dict):
+                continue
+            provider = str(stage.get("provider") or "")
+            if not provider:
+                continue
+            stage_id = str(stage.get("stage_id") or "")
+            metadata = stage.get("metadata")
+            if not isinstance(metadata, dict):
+                metadata = {}
+            usage = metadata.get("usage")
+            if not isinstance(usage, dict):
+                usage = {}
+            model = str(metadata.get("model") or "")
+            prompt_tokens = _coerce_int(usage.get("prompt_tokens"))
+            completion_tokens = _coerce_int(usage.get("completion_tokens"))
+            reported_cost = _coerce_float(usage.get("cost"))
+            bucket = buckets.setdefault(
+                (workflow_id, stage_id, provider, model),
+                {
+                    "workflow_id": workflow_id,
+                    "stage_id": stage_id,
+                    "provider": provider,
+                    "model": model,
+                    "calls": 0,
+                    "prompt_tokens": 0,
+                    "completion_tokens": 0,
+                    "total_tokens": 0,
+                    "cost_usd_known": 0.0,
+                    "cost_usd_estimated": 0.0,
+                    "cost_status": "unknown",
+                    "cost_estimated_for_calls": 0,
+                },
+            )
+            bucket["calls"] += 1
+            bucket["prompt_tokens"] += prompt_tokens
+            bucket["completion_tokens"] += completion_tokens
+            bucket["total_tokens"] += prompt_tokens + completion_tokens
+            if reported_cost is not None:
+                bucket["cost_usd_known"] = round(bucket["cost_usd_known"] + reported_cost, 6)
+                bucket["cost_status"] = "known"
+            elif cost_resolver is not None:
+                estimated = _coerce_float(
+                    cost_resolver(provider, model, prompt_tokens, completion_tokens)
+                )
+                if estimated is not None:
+                    bucket["cost_usd_estimated"] = round(
+                        bucket["cost_usd_estimated"] + estimated, 6
+                    )
+                    bucket["cost_estimated_for_calls"] += 1
+                    if bucket["cost_status"] != "known":
+                        bucket["cost_status"] = "estimated"
+    per_bucket = list(buckets.values())
+    for bucket in per_bucket:
+        # No call in this bucket was estimated: report "no estimate", not 0.0.
+        if not bucket["cost_estimated_for_calls"]:
+            bucket["cost_usd_estimated"] = None
+    estimated_calls = sum(b["cost_estimated_for_calls"] for b in per_bucket)
+    rollup = {
+        "total_calls": sum(b["calls"] for b in per_bucket),
+        "total_prompt_tokens": sum(b["prompt_tokens"] for b in per_bucket),
+        "total_completion_tokens": sum(b["completion_tokens"] for b in per_bucket),
+        "total_tokens": sum(b["total_tokens"] for b in per_bucket),
+        "total_cost_usd_known": round(sum(b["cost_usd_known"] for b in per_bucket), 6),
+        # ``None`` only when nothing was estimated, mirroring the per-bucket rule;
+        # a genuine estimate of 0.0 is kept instead of being collapsed to ``None``.
+        "total_cost_usd_estimated": (
+            round(sum(b["cost_usd_estimated"] or 0.0 for b in per_bucket), 6)
+            if estimated_calls
+            else None
+        ),
+    }
+    # Read the registry once; the same payload supplies both the next run
+    # index and the list the new entry is appended to.
+    runs = list(_read_usage(usage_path).get("runs") or [])
+    entry = {
+        "run_index": len(runs) + 1,
+        "started_at": started_at,
+        "completed_at": _now(),
+        "duration_seconds": duration_seconds,
+        "snapshot_id": snapshot_id,
+        "workflows": workflow_summaries,
+        "rollup": rollup,
+        "per_bucket": per_bucket,
+    }
+    runs.append(entry)
+    _write_usage(
+        usage_path,
+        {"schema_version": USAGE_SCHEMA_VERSION, "runs": runs},
+    )
+    return entry
+
+
+def read_usage_runs(root: str | Path) -> list[dict[str, Any]]:
+    """Return every run entry stored in the usage registry under ``root``."""
+    payload = _read_usage(Path(root) / USAGE_FILE)
+    return list(payload.get("runs") or [])
+
+
+def _coerce_float(value: Any) -> float | None:
+    """Return ``value`` as a float, or ``None`` when it is missing or not numeric."""
+    if value is None:
+        return None
+    try:
+        return float(value)
+    except (TypeError, ValueError):
+        return None
+
+
+def _coerce_int(value: Any) -> int:
+    """Return ``value`` as an int, or ``0`` when it is missing or not numeric."""
+    try:
+        return int(value or 0)
+    except (TypeError, ValueError, OverflowError):
+        return 0
+
+
+def _next_run_index(usage_path: Path) -> int:
+    """Return the 1-based index the next recorded run would receive."""
+    payload = _read_usage(usage_path)
+    return len(payload.get("runs") or []) + 1
+
+
+def _read_usage(path: Path) -> dict[str, Any]:
+    """Load the usage registry at ``path``, falling back to an empty registry.
+
+    A missing file, undecodable or unparsable content, or a non-mapping
+    document yields ``{"schema_version": ..., "runs": []}``. A ``runs`` value
+    that is not a list is replaced by an empty list.
+    """
+    empty: dict[str, Any] = {"schema_version": USAGE_SCHEMA_VERSION, "runs": []}
+    if not path.is_file():
+        return empty
+    try:
+        data = yaml.safe_load(path.read_text(encoding="utf-8"))
+    except (yaml.YAMLError, UnicodeDecodeError):
+        return empty
+    if not isinstance(data, dict):
+        return empty
+    if not isinstance(data.get("runs"), list):
+        data["runs"] = []
+    return data
+
+
+def _write_usage(path: Path, payload: dict[str, Any]) -> None:
+    """Write ``payload`` to ``path`` as YAML via a sibling temp file."""
+    # Writing the temp file first means an interrupted write cannot leave a
+    # truncated registry behind; ``Path.replace`` then swaps it into place.
+    tmp_path = path.with_name(path.name + ".tmp")
+    tmp_path.write_text(yaml.safe_dump(payload, sort_keys=False), encoding="utf-8")
+    tmp_path.replace(path)
+
+
 def _build_snapshot(summary: dict[str, Any]) -> dict[str, Any]:
     filters = {
         "stage": summary.get("stage"),
diff --git a/src/infospace_bench/generator.py b/src/infospace_bench/generator.py
index b921aa1..8d30ba4 100644
--- a/src/infospace_bench/generator.py
+++ b/src/infospace_bench/generator.py
@@ -2,11 +2,14 @@ from __future__ import annotations
 
 import hashlib
 import shutil
+import time
 from dataclasses import asdict, dataclass, field
 from datetime import datetime, timezone
 from pathlib import Path
 from typing import Any
 
+_monotonic = time.monotonic
+
 import yaml
 
 from .checks import run_collection_checks
@@ -15,7 +18,11 @@ from .evaluation_io import read_entity_evaluations
 from .history import get_history, read_metrics_file, record_check_results
 from .lifecycle import create_infospace, load_infospace, register_artifact
 from .openrouter import OpenRouterAssistedGenerationAdapter
-from .budget import record_plan_snapshot
+from .budget import (
+    latest_plan_snapshot_id,
+    record_plan_snapshot,
+    record_run_usage,
+)
 from .source_intake import SourceChunk, normalize_source
 from .workflow import (
     AssistedGenerationAdapter,
@@ -343,6 +350,8 @@ def run_generation(
             metrics=status.get("metrics", {}),
         )
 
+    started_wall = datetime.now(timezone.utc)
+    monotonic_start = _monotonic()
     adapter = (
         _adapter_for(provider, model=model, fixture_responses=fixture_responses)
         if workflow_ids
@@ -379,6 +388,15 @@ def run_generation(
         }
     )
     _write_state(root_path, state)
+    if workflow_results:
+        duration_seconds = round(_monotonic() - monotonic_start, 3)
+        record_run_usage(
+            root_path,
+            workflow_results,
+            snapshot_id=latest_plan_snapshot_id(root_path),
+            duration_seconds=duration_seconds,
+            started_at=started_wall.isoformat(),
+        )
     return GenerationRunResult(
         root=str(root_path),
         status="completed",
diff --git a/tests/test_budget_registry.py b/tests/test_budget_registry.py
index 8706d1a..8b3269b 100644
--- a/tests/test_budget_registry.py
+++ b/tests/test_budget_registry.py
@@ -61,6 +61,46 @@ def _write_three_chapter_epub(path: Path) -> None:
     )
 
 
+def _write_minimal_fixture(path: Path) -> None:
+    data = {
+        "responses": [
+            {
+                "stage_id": "summarize-source",
+                "input_artifact_id": "*",
+                "markdown": "# Source Summary\n\nA stub summary.\n",
+            },
+            {
+                "stage_id": "extract-entities",
+                "input_artifact_id": "*",
+                "markdown": (
+                    "# Stub Entity\n\n## Definition\n\nA stub.\n\n## Context\n\nFor a budget test.\n"
+                ),
+            },
+            {
+                "stage_id": "extract-relations",
+                "input_artifact_id": "*",
+                "markdown": (
+                    "# Stub Entity Practices Something\n\n## Subject\n\nStub Entity\n\n"
+                    "## Predicate\n\npractices\n\n## Object\n\nSomething\n\n## Relation Type\n\nsupport\n\n"
+                    "## Evidence\n\nA stub.\n"
+                ),
+            },
+            {
+                "stage_id": "evaluate-entity",
+                "input_artifact_id": "*",
+                "markdown": (
+                    "---\nartifact_id: entity/stub-entity.md\nevaluator: fixture\n"
+                    "evaluated_at: '2026-05-17T00:00:00'\n"
+                    "scores:\n  - name: groundedness\n    value: 4.0\n    max_value: 5.0\n"
+                    "  - name: usefulness\n    value: 4.0\n    max_value: 5.0\n---\n\n"
+                    "# Evaluation: entity/stub-entity.md\n"
+                ),
+            },
+        ]
+    }
+    path.write_text(yaml.safe_dump(data, sort_keys=False), encoding="utf-8")
+
+
 def _build_infospace(tmp_path: Path) -> Path:
     book = tmp_path / "book.epub"
     _write_three_chapter_epub(book)
@@ -144,6 +184,132 @@ def test_plan_snapshot_retention_prunes_old_entries(tmp_path: Path) -> None:
     assert data["pruned_count"] >= 1
 
 
+def test_record_run_usage_aggregates_by_workflow_stage_provider_model(tmp_path: Path) -> None:
+    root = _build_infospace(tmp_path)
+    from infospace_bench.budget import record_run_usage, read_usage_runs
+
+    workflow_results = [
+        {
+            "run_id": "run-1",
+            "workflow_id": "generic-source-entities",
+            "status": "completed",
+            "stages": [
+                {
+                    "stage_id": "extract-entities",
+                    "provider": "openrouter",
+                    "metadata": {
+                        "model": "openai/gpt-4o-mini",
+                        "usage": {"prompt_tokens": 1000, "completion_tokens": 200, "total_tokens": 1200},
+                    },
+                },
+                {
+                    "stage_id": "extract-entities",
+                    "provider": "openrouter",
+                    "metadata": {
+                        "model": "openai/gpt-4o-mini",
+                        "usage": {"prompt_tokens": 800, "completion_tokens": 150, "cost": 0.0012},
+                    },
+                },
+                {"stage_id": "split-entities", "message": "split 3 entities"},
+            ],
+        }
+    ]
+
+    entry = record_run_usage(root, workflow_results, snapshot_id="abc123", duration_seconds=4.2)
+
+    assert entry["rollup"]["total_calls"] == 2
+    assert entry["rollup"]["total_prompt_tokens"] == 1800
+    assert entry["rollup"]["total_completion_tokens"] == 350
+    assert entry["rollup"]["total_cost_usd_known"] == 0.0012
+    assert entry["snapshot_id"] == "abc123"
+    assert entry["duration_seconds"] == 4.2
+    assert len(entry["per_bucket"]) == 1
+    bucket = entry["per_bucket"][0]
+    assert bucket["workflow_id"] == "generic-source-entities"
+    assert bucket["stage_id"] == "extract-entities"
+    assert bucket["provider"] == "openrouter"
+    assert bucket["model"] == "openai/gpt-4o-mini"
+    assert bucket["calls"] == 2
+
+    runs = read_usage_runs(root)
+    assert len(runs) == 1
+    assert runs[0]["run_index"] == 1
+
+
+def test_record_run_usage_handles_fixture_runs_without_aborting(tmp_path: Path) -> None:
+    root = _build_infospace(tmp_path)
+    from infospace_bench.budget import record_run_usage
+
+    workflow_results = [
+        {
+            "run_id": "fix-1",
+            "workflow_id": "generic-source-summary",
+            "stages": [
+                {"stage_id": "summarize-source", "provider": "fixture"},
+                {"stage_id": "summarize-source", "provider": "fixture"},
+            ],
+        }
+    ]
+
+    entry = record_run_usage(root, workflow_results)
+
+    fixture_bucket = next(b for b in entry["per_bucket"] if b["provider"] == "fixture")
+    assert fixture_bucket["calls"] == 2
+    assert fixture_bucket["prompt_tokens"] == 0
+    assert fixture_bucket["cost_status"] == "unknown"
+    assert entry["rollup"]["total_cost_usd_known"] == 0.0
+
+
+def test_record_run_usage_tolerates_malformed_usage_metadata(tmp_path: Path) -> None:
+    root = _build_infospace(tmp_path)
+    from infospace_bench.budget import record_run_usage
+
+    workflow_results = [
+        {
+            "run_id": "bad-1",
+            "workflow_id": "generic-source-summary",
+            "stages": [
+                {"stage_id": "summarize-source", "provider": "openrouter", "metadata": "oops"},
+                {
+                    "stage_id": "summarize-source",
+                    "provider": "openrouter",
+                    "metadata": {"usage": {"prompt_tokens": "n/a", "completion_tokens": 5}},
+                },
+            ],
+        }
+    ]
+
+    entry = record_run_usage(root, workflow_results, cost_resolver=lambda *_: 0.0)
+
+    assert entry["rollup"]["total_calls"] == 2
+    assert entry["rollup"]["total_prompt_tokens"] == 0
+    assert entry["rollup"]["total_completion_tokens"] == 5
+    # A resolver that prices calls at zero is still an estimate, not "unknown".
+    assert entry["rollup"]["total_cost_usd_estimated"] == 0.0
+    assert entry["per_bucket"][0]["cost_status"] == "estimated"
+
+
+def test_run_generation_writes_usage_yaml_with_plan_snapshot_id(tmp_path: Path) -> None:
+    root = _build_infospace(tmp_path)
+    from infospace_bench.budget import USAGE_FILE, read_usage_runs
+    from infospace_bench.generator import run_generation
+
+    fixture = tmp_path / "responses.yaml"
+    _write_minimal_fixture(fixture)
+
+    plan_payload = plan_generation(root)
+    run_generation(root, fixture_responses=fixture)
+
+    runs = read_usage_runs(root)
+    assert (root / USAGE_FILE).is_file()
+    assert len(runs) == 1
+    assert runs[0]["snapshot_id"] == plan_payload["snapshot_id"]
+    assert runs[0]["duration_seconds"] is not None and runs[0]["duration_seconds"] >= 0
+    assert runs[0]["rollup"]["total_calls"] >= 0
+    # Fixture mode runs should not claim any known cost
+    assert runs[0]["rollup"]["total_cost_usd_known"] == 0.0
+
+
 def test_plan_cli_writes_snapshot(tmp_path: Path) -> None:
     root = _build_infospace(tmp_path)
     env = os.environ.copy()
diff --git a/workplans/IB-WP-0019-budget-and-usage-registry.md b/workplans/IB-WP-0019-budget-and-usage-registry.md
index 75e3a5d..bfba44b 100644
--- a/workplans/IB-WP-0019-budget-and-usage-registry.md
+++ b/workplans/IB-WP-0019-budget-and-usage-registry.md
@@ -95,7 +95,7 @@ state_hub_task_id: "7f1a4e0a-c1ad-49f3-aad1-6946de9b1219"
 
 ```task
 id: IB-WP-0019-T02
-status: todo
+status: done
 priority: high
 state_hub_task_id: "a612f8d4-f96d-4fae-9aa6-66a7946414f5"
 ```