diff --git a/src/infospace_bench/budget.py b/src/infospace_bench/budget.py index 764cd59..b57a1d7 100644 --- a/src/infospace_bench/budget.py +++ b/src/infospace_bench/budget.py @@ -25,9 +25,11 @@ _PACKAGE_RATES_PATH = Path(__file__).parent / "model_rates.yaml" BUDGET_DIR = Path("output/budget") PLANS_FILE = BUDGET_DIR / "plans.yaml" USAGE_FILE = BUDGET_DIR / "usage.yaml" +SUMMARY_FILE = BUDGET_DIR / "summary.yaml" PLAN_RETENTION_DEFAULT = 50 PLANS_SCHEMA_VERSION = 1 USAGE_SCHEMA_VERSION = 1 +SUMMARY_SCHEMA_VERSION = 1 _SNAPSHOT_FINGERPRINT_FIELDS = ( "stage", @@ -213,6 +215,150 @@ def read_usage_runs(root: str | Path) -> list[dict[str, Any]]: return list(payload.get("runs") or []) +def record_run_variance( + root: str | Path, + run_entry: dict[str, Any], +) -> dict[str, Any]: + """Compute and persist plan-vs-actual variance for the just-completed run. + + Reads the plan snapshot referenced by ``run_entry['snapshot_id']`` from + ``output/budget/plans.yaml``, derives call/token/cost variance, writes the + result to ``output/budget/summary.yaml`` (overwrite), and returns it. + + When no snapshot is referenced or the snapshot cannot be located, the + variance payload is still written with null comparison fields so the + consumer always sees a current summary. + """ + root_path = Path(root) + summary_path = root_path / SUMMARY_FILE + summary_path.parent.mkdir(parents=True, exist_ok=True) + snapshot_id = run_entry.get("snapshot_id") + snapshot = _lookup_snapshot(root_path, snapshot_id) if snapshot_id else None + + rollup = run_entry.get("rollup") or {} + actual_calls = int(rollup.get("total_calls") or 0) + actual_tokens = int(rollup.get("total_tokens") or 0) + actual_prompt_tokens = int(rollup.get("total_prompt_tokens") or 0) + actual_cost_known = _coerce_float(rollup.get("total_cost_usd_known")) or 0.0 + actual_cost_estimated = _coerce_float(rollup.get("total_cost_usd_estimated")) or 0.0 + actual_cost_total = round(actual_cost_known + actual_cost_estimated, 6) + + if snapshot is not None: + estimated_calls = int(snapshot.get("total_provider_calls_estimate") or 0) + estimated_prompt_tokens = int(snapshot.get("total_prompt_tokens_estimate") or 0) + estimated_cost = _coerce_float(snapshot.get("estimated_cost_usd")) + else: + estimated_calls = None + estimated_prompt_tokens = None + estimated_cost = None + + summary = { + "schema_version": SUMMARY_SCHEMA_VERSION, + "recorded_at": _now(), + "run_index": run_entry.get("run_index"), + "snapshot_id": snapshot_id, + "snapshot_resolved": snapshot is not None, + "calls": _variance_pair(estimated_calls, actual_calls), + "prompt_tokens": _variance_pair(estimated_prompt_tokens, actual_prompt_tokens), + "total_tokens": _variance_pair(estimated_prompt_tokens, actual_tokens), + "cost_usd": { + "estimated": estimated_cost, + "actual_known": actual_cost_known, + "actual_estimated_from_rates": actual_cost_estimated, + "actual_total": actual_cost_total, + **_variance_delta_ratio(estimated_cost, actual_cost_total), + }, + "per_workflow": _per_workflow_variance(snapshot, run_entry), + "duration_seconds": run_entry.get("duration_seconds"), + } + summary_path.write_text(yaml.safe_dump(summary, sort_keys=False), encoding="utf-8") + return summary + + +def read_run_variance(root: str | Path) -> dict[str, Any] | None: + path = Path(root) / SUMMARY_FILE + if not path.is_file(): + return None + try: + data = yaml.safe_load(path.read_text(encoding="utf-8")) + except yaml.YAMLError: + return None + return data if isinstance(data, dict) else None + + +def _lookup_snapshot(root: Path, snapshot_id: str) -> dict[str, Any] | None: + for snap in reversed(read_plan_snapshots(root)): + if snap.get("snapshot_id") == snapshot_id: + return snap + return None + + +def _variance_pair(estimated: int | None, actual: int) -> dict[str, Any]: + delta = None if estimated is None else actual - estimated + ratio = _safe_ratio(actual, estimated) + return { + "estimated": estimated, + "actual": actual, + "delta": delta, + "ratio": ratio, + } + + +def _variance_delta_ratio(estimated: float | None, actual: float) -> dict[str, Any]: + delta = None if estimated is None else round(actual - estimated, 6) + ratio = _safe_ratio(actual, estimated) + return {"delta": delta, "ratio": ratio} + + +def _safe_ratio(actual: float | int, estimated: float | int | None) -> float | None: + if estimated in (None, 0, 0.0): + return None + return round(float(actual) / float(estimated), 4) + + +def _per_workflow_variance( + snapshot: dict[str, Any] | None, run_entry: dict[str, Any] +) -> list[dict[str, Any]]: + actuals: dict[str, dict[str, int]] = {} + for bucket in run_entry.get("per_bucket") or []: + workflow_id = bucket.get("workflow_id") or "" + if not workflow_id: + continue + agg = actuals.setdefault( + workflow_id, {"calls": 0, "prompt_tokens": 0, "completion_tokens": 0} + ) + agg["calls"] += int(bucket.get("calls") or 0) + agg["prompt_tokens"] += int(bucket.get("prompt_tokens") or 0) + agg["completion_tokens"] += int(bucket.get("completion_tokens") or 0) + + estimates: dict[str, dict[str, int]] = {} + if snapshot is not None: + for entry in snapshot.get("per_workflow") or []: + workflow_id = entry.get("workflow_id") or "" + if not workflow_id: + continue + estimates[workflow_id] = { + "calls": int(entry.get("calls") or 0), + "prompt_words_estimate": int(entry.get("prompt_words_estimate") or 0), + } + + workflow_ids = sorted(set(actuals) | set(estimates)) + out: list[dict[str, Any]] = [] + for workflow_id in workflow_ids: + actual = actuals.get(workflow_id, {"calls": 0, "prompt_tokens": 0}) + estimate = estimates.get(workflow_id) + estimated_calls = estimate["calls"] if estimate else None + out.append( + { + "workflow_id": workflow_id, + "calls": _variance_pair(estimated_calls, actual["calls"]), + "prompt_tokens_actual": actual["prompt_tokens"], + "prompt_words_estimate": estimate["prompt_words_estimate"] if estimate else None, + } + ) + return out + + def load_rate_table(workspace: Path | str | None = None) -> dict[str, dict[str, float]]: """Load the model rate table, with optional workspace override. diff --git a/src/infospace_bench/generator.py b/src/infospace_bench/generator.py index e73e57c..015e57b 100644 --- a/src/infospace_bench/generator.py +++ b/src/infospace_bench/generator.py @@ -21,8 +21,10 @@ from .openrouter import OpenRouterAssistedGenerationAdapter from .budget import ( latest_plan_snapshot_id, make_cost_resolver, + read_run_variance, record_plan_snapshot, record_run_usage, + record_run_variance, ) from .source_intake import SourceChunk, normalize_source from .workflow import ( @@ -325,6 +327,31 @@ def _read_profile_name(root: Path) -> str: return str(state.get("profile") or DEFAULT_PROFILE) +def _format_variance_line(summary: dict[str, Any] | None) -> str: + if not summary: + return "" + calls = summary.get("calls") or {} + cost = summary.get("cost_usd") or {} + parts: list[str] = [] + calls_actual = calls.get("actual") + calls_estimated = calls.get("estimated") + if calls_actual is not None: + if calls_estimated is not None: + parts.append(f"calls {calls_actual}/{calls_estimated}") + else: + parts.append(f"calls {calls_actual} (no plan)") + actual_cost = cost.get("actual_total") + estimated_cost = cost.get("estimated") + if actual_cost is not None: + if estimated_cost is not None: + parts.append(f"cost ${actual_cost:.4f}/${estimated_cost:.4f}") + elif actual_cost > 0: + parts.append(f"cost ${actual_cost:.4f}") + if not parts: + return "" + return "- " + " · ".join(parts) + + def _workspace_for(root: Path) -> Path: """Resolve the workspace directory that contains this infospace. @@ -373,6 +400,18 @@ def run_generation( workflow_results.append(result.to_dict()) state = _mark_workflow_completed(state, result) + if workflow_results: + duration_seconds = round(_monotonic() - monotonic_start, 3) + usage_entry = record_run_usage( + root_path, + workflow_results, + snapshot_id=latest_plan_snapshot_id(root_path), + duration_seconds=duration_seconds, + started_at=started_wall.isoformat(), + cost_resolver=make_cost_resolver(_workspace_for(root_path)), + ) + record_run_variance(root_path, usage_entry) + metrics: dict[str, Any] = {} snapshot_id = "" if stage_key in {"all", "metrics"}: @@ -398,16 +437,6 @@ def run_generation( } ) _write_state(root_path, state) - if workflow_results: - duration_seconds = round(_monotonic() - monotonic_start, 3) - record_run_usage( - root_path, - workflow_results, - snapshot_id=latest_plan_snapshot_id(root_path), - duration_seconds=duration_seconds, - started_at=started_wall.isoformat(), - cost_resolver=make_cost_resolver(_workspace_for(root_path)), - ) return GenerationRunResult( root=str(root_path), status="completed", @@ -449,6 +478,7 @@ def status_generation(root: str | Path) -> dict[str, Any]: "stale_profile": stale_profile, "completed": bool(state.get("completed", False)), "stage_status": state.get("stage_status", {}), + "budget_summary": read_run_variance(infospace.root), } @@ -636,22 +666,24 @@ def _record_metrics(root: Path) -> Any: def _write_generation_report(root: Path, metrics: dict[str, Any], snapshot_id: str) -> None: status = status_generation(root) - text = "\n".join( - [ - "# Generation Report", - "", - f"Snapshot: {snapshot_id}", - f"Sources: {status['source_chunk_count']}", - f"Entities: {status['entity_count']}", - f"Relations: {status['relation_count']}", - f"Evaluations: {status['evaluation_count']}", - "", - "## Metrics", - "", - *[f"- {name}: {value}" for name, value in sorted(metrics.items())], - "", - ] - ) + lines = [ + "# Generation Report", + "", + f"Snapshot: {snapshot_id}", + f"Sources: {status['source_chunk_count']}", + f"Entities: {status['entity_count']}", + f"Relations: {status['relation_count']}", + f"Evaluations: {status['evaluation_count']}", + "", + "## Metrics", + "", + *[f"- {name}: {value}" for name, value in sorted(metrics.items())], + "", + ] + variance_line = _format_variance_line(status.get("budget_summary")) + if variance_line: + lines.extend(["## Plan variance", "", variance_line, ""]) + text = "\n".join(lines) path = root / "reports" / "generation-summary.md" path.parent.mkdir(parents=True, exist_ok=True) path.write_text(text, encoding="utf-8") diff --git a/tests/test_budget_registry.py b/tests/test_budget_registry.py index 4526d2e..9a5d2b7 100644 --- a/tests/test_budget_registry.py +++ b/tests/test_budget_registry.py @@ -382,6 +382,113 @@ def test_record_run_usage_fills_estimated_cost_via_resolver(tmp_path: Path) -> N assert entry["rollup"]["total_cost_usd_estimated"] == round(0.0009, 6) +def test_record_run_variance_computes_plan_vs_actual(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + from infospace_bench.budget import record_run_variance + + run_entry = { + "run_index": 1, + "snapshot_id": "abc123", + "rollup": { + "total_calls": 10, + "total_prompt_tokens": 1500, + "total_completion_tokens": 500, + "total_tokens": 2000, + "total_cost_usd_known": 0.1, + "total_cost_usd_estimated": 0.05, + }, + "per_bucket": [ + {"workflow_id": "generic-source-entities", "calls": 6, "prompt_tokens": 1200, "completion_tokens": 400}, + {"workflow_id": "generic-source-summary", "calls": 4, "prompt_tokens": 300, "completion_tokens": 100}, + ], + "duration_seconds": 3.5, + } + + # No snapshot persisted yet — variance fields fall back to null + summary = record_run_variance(root, run_entry) + + assert summary["snapshot_id"] == "abc123" + assert summary["snapshot_resolved"] is False + assert summary["calls"]["estimated"] is None + assert summary["calls"]["actual"] == 10 + assert summary["cost_usd"]["actual_known"] == 0.1 + assert summary["cost_usd"]["actual_estimated_from_rates"] == 0.05 + assert summary["cost_usd"]["actual_total"] == round(0.15, 6) + + +def test_record_run_variance_resolves_snapshot_and_computes_ratios(tmp_path: Path) -> None: + from infospace_bench.budget import record_plan_snapshot, record_run_variance + + root = _build_infospace(tmp_path) + plan_summary = plan_generation(root, cost_per_1k_tokens=0.5, persist=False) + plan_summary["total_provider_calls_estimate"] = 8 + plan_summary["total_prompt_tokens_estimate"] = 1000 + plan_summary["estimated_cost_usd"] = 0.5 + snapshot_id = record_plan_snapshot(root, plan_summary) + + run_entry = { + "run_index": 1, + "snapshot_id": snapshot_id, + "rollup": { + "total_calls": 10, + "total_prompt_tokens": 1500, + "total_completion_tokens": 500, + "total_tokens": 2000, + "total_cost_usd_known": 0.0, + "total_cost_usd_estimated": 0.625, + }, + "per_bucket": [], + } + + summary = record_run_variance(root, run_entry) + + assert summary["snapshot_resolved"] is True + assert summary["calls"]["estimated"] == 8 + assert summary["calls"]["actual"] == 10 + assert summary["calls"]["delta"] == 2 + assert summary["calls"]["ratio"] == 1.25 + assert summary["prompt_tokens"]["delta"] == 500 + assert summary["cost_usd"]["estimated"] == 0.5 + assert summary["cost_usd"]["actual_total"] == 0.625 + assert summary["cost_usd"]["delta"] == 0.125 + assert summary["cost_usd"]["ratio"] == 1.25 + + +def test_run_generation_persists_variance_and_status_surfaces_it(tmp_path: Path) -> None: + from infospace_bench.budget import SUMMARY_FILE + from infospace_bench.generator import run_generation, status_generation + + root = _build_infospace(tmp_path) + fixture = tmp_path / "responses.yaml" + _write_minimal_fixture(fixture) + plan_payload = plan_generation(root) + + run_generation(root, fixture_responses=fixture) + status = status_generation(root) + + assert (root / SUMMARY_FILE).is_file() + assert status["budget_summary"] is not None + assert status["budget_summary"]["snapshot_id"] == plan_payload["snapshot_id"] + assert status["budget_summary"]["snapshot_resolved"] is True + # Fixture runs report zero known cost; per_workflow variance is keyed by workflow_id + per_workflow = {item["workflow_id"]: item for item in status["budget_summary"]["per_workflow"]} + assert "generic-source-entities" in per_workflow + + +def test_generation_report_includes_variance_line(tmp_path: Path) -> None: + from infospace_bench.generator import run_generation + + root = _build_infospace(tmp_path) + fixture = tmp_path / "responses.yaml" + _write_minimal_fixture(fixture) + plan_generation(root) + run_generation(root, fixture_responses=fixture) + + report = (root / "reports" / "generation-summary.md").read_text(encoding="utf-8") + assert "## Plan variance" in report + assert "calls" in report.lower() + + def test_plan_cli_writes_snapshot(tmp_path: Path) -> None: root = _build_infospace(tmp_path) env = os.environ.copy() diff --git a/workplans/IB-WP-0019-budget-and-usage-registry.md b/workplans/IB-WP-0019-budget-and-usage-registry.md index 160edde..6baffd2 100644 --- a/workplans/IB-WP-0019-budget-and-usage-registry.md +++ b/workplans/IB-WP-0019-budget-and-usage-registry.md @@ -137,7 +137,7 @@ state_hub_task_id: "688c590d-8885-455e-bcf6-61409a45e001" ```task id: IB-WP-0019-T04 -status: todo +status: done priority: medium state_hub_task_id: "c6adc4fb-9062-4c81-a0b2-98d3166e047d" ```