IB-WP-0019-T04: plan-vs-actual variance and surfacing

After every generate run, compute variance between the executing plan
snapshot and the just-recorded usage rollup, persist it to
output/budget/summary.yaml (overwrite-on-run), and surface it both in
the generate status JSON (new budget_summary field) and as a "Plan
variance" line in reports/generation-summary.md.

Variance fields: calls / prompt_tokens / total_tokens each carry
{estimated, actual, delta, ratio}; cost_usd carries {estimated,
actual_known, actual_estimated_from_rates, actual_total, delta, ratio};
per_workflow rolls the per-bucket usage up to the same workflow_id grain
the plan reports. Runs whose snapshot_id cannot be resolved (no prior
plan, or pruned from the retention window) still record a variance row
with null comparison fields and snapshot_resolved=false, so the
consumer always sees a current summary.

Reordered run_generation so usage and variance are written before the
generation report, allowing the report to embed the variance line on
the same pass.

110 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-17 20:06:19 +02:00
parent a4dde53fc3
commit d4c9c56f5c
4 changed files with 312 additions and 27 deletions

View File

@@ -25,9 +25,11 @@ _PACKAGE_RATES_PATH = Path(__file__).parent / "model_rates.yaml"
BUDGET_DIR = Path("output/budget")
PLANS_FILE = BUDGET_DIR / "plans.yaml"
USAGE_FILE = BUDGET_DIR / "usage.yaml"
SUMMARY_FILE = BUDGET_DIR / "summary.yaml"
PLAN_RETENTION_DEFAULT = 50
PLANS_SCHEMA_VERSION = 1
USAGE_SCHEMA_VERSION = 1
SUMMARY_SCHEMA_VERSION = 1
_SNAPSHOT_FINGERPRINT_FIELDS = (
"stage",
@@ -213,6 +215,150 @@ def read_usage_runs(root: str | Path) -> list[dict[str, Any]]:
return list(payload.get("runs") or [])
def record_run_variance(
root: str | Path,
run_entry: dict[str, Any],
) -> dict[str, Any]:
"""Compute and persist plan-vs-actual variance for the just-completed run.
Reads the plan snapshot referenced by ``run_entry['snapshot_id']`` from
``output/budget/plans.yaml``, derives call/token/cost variance, writes the
result to ``output/budget/summary.yaml`` (overwrite), and returns it.
When no snapshot is referenced or the snapshot cannot be located, the
variance payload is still written with null comparison fields so the
consumer always sees a current summary.
"""
root_path = Path(root)
summary_path = root_path / SUMMARY_FILE
summary_path.parent.mkdir(parents=True, exist_ok=True)
snapshot_id = run_entry.get("snapshot_id")
snapshot = _lookup_snapshot(root_path, snapshot_id) if snapshot_id else None
rollup = run_entry.get("rollup") or {}
actual_calls = int(rollup.get("total_calls") or 0)
actual_tokens = int(rollup.get("total_tokens") or 0)
actual_prompt_tokens = int(rollup.get("total_prompt_tokens") or 0)
actual_cost_known = _coerce_float(rollup.get("total_cost_usd_known")) or 0.0
actual_cost_estimated = _coerce_float(rollup.get("total_cost_usd_estimated")) or 0.0
actual_cost_total = round(actual_cost_known + actual_cost_estimated, 6)
if snapshot is not None:
estimated_calls = int(snapshot.get("total_provider_calls_estimate") or 0)
estimated_prompt_tokens = int(snapshot.get("total_prompt_tokens_estimate") or 0)
estimated_cost = _coerce_float(snapshot.get("estimated_cost_usd"))
else:
estimated_calls = None
estimated_prompt_tokens = None
estimated_cost = None
summary = {
"schema_version": SUMMARY_SCHEMA_VERSION,
"recorded_at": _now(),
"run_index": run_entry.get("run_index"),
"snapshot_id": snapshot_id,
"snapshot_resolved": snapshot is not None,
"calls": _variance_pair(estimated_calls, actual_calls),
"prompt_tokens": _variance_pair(estimated_prompt_tokens, actual_prompt_tokens),
"total_tokens": _variance_pair(estimated_prompt_tokens, actual_tokens),
"cost_usd": {
"estimated": estimated_cost,
"actual_known": actual_cost_known,
"actual_estimated_from_rates": actual_cost_estimated,
"actual_total": actual_cost_total,
**_variance_delta_ratio(estimated_cost, actual_cost_total),
},
"per_workflow": _per_workflow_variance(snapshot, run_entry),
"duration_seconds": run_entry.get("duration_seconds"),
}
summary_path.write_text(yaml.safe_dump(summary, sort_keys=False), encoding="utf-8")
return summary
def read_run_variance(root: str | Path) -> dict[str, Any] | None:
path = Path(root) / SUMMARY_FILE
if not path.is_file():
return None
try:
data = yaml.safe_load(path.read_text(encoding="utf-8"))
except yaml.YAMLError:
return None
return data if isinstance(data, dict) else None
def _lookup_snapshot(root: Path, snapshot_id: str) -> dict[str, Any] | None:
for snap in reversed(read_plan_snapshots(root)):
if snap.get("snapshot_id") == snapshot_id:
return snap
return None
def _variance_pair(estimated: int | None, actual: int) -> dict[str, Any]:
delta = None if estimated is None else actual - estimated
ratio = _safe_ratio(actual, estimated)
return {
"estimated": estimated,
"actual": actual,
"delta": delta,
"ratio": ratio,
}
def _variance_delta_ratio(estimated: float | None, actual: float) -> dict[str, Any]:
delta = None if estimated is None else round(actual - estimated, 6)
ratio = _safe_ratio(actual, estimated)
return {"delta": delta, "ratio": ratio}
def _safe_ratio(actual: float | int, estimated: float | int | None) -> float | None:
if estimated in (None, 0, 0.0):
return None
return round(float(actual) / float(estimated), 4)
def _per_workflow_variance(
snapshot: dict[str, Any] | None, run_entry: dict[str, Any]
) -> list[dict[str, Any]]:
actuals: dict[str, dict[str, int]] = {}
for bucket in run_entry.get("per_bucket") or []:
workflow_id = bucket.get("workflow_id") or ""
if not workflow_id:
continue
agg = actuals.setdefault(
workflow_id, {"calls": 0, "prompt_tokens": 0, "completion_tokens": 0}
)
agg["calls"] += int(bucket.get("calls") or 0)
agg["prompt_tokens"] += int(bucket.get("prompt_tokens") or 0)
agg["completion_tokens"] += int(bucket.get("completion_tokens") or 0)
estimates: dict[str, dict[str, int]] = {}
if snapshot is not None:
for entry in snapshot.get("per_workflow") or []:
workflow_id = entry.get("workflow_id") or ""
if not workflow_id:
continue
estimates[workflow_id] = {
"calls": int(entry.get("calls") or 0),
"prompt_words_estimate": int(entry.get("prompt_words_estimate") or 0),
}
workflow_ids = sorted(set(actuals) | set(estimates))
out: list[dict[str, Any]] = []
for workflow_id in workflow_ids:
actual = actuals.get(workflow_id, {"calls": 0, "prompt_tokens": 0})
estimate = estimates.get(workflow_id)
estimated_calls = estimate["calls"] if estimate else None
out.append(
{
"workflow_id": workflow_id,
"calls": _variance_pair(estimated_calls, actual["calls"]),
"prompt_tokens_actual": actual["prompt_tokens"],
"prompt_words_estimate": estimate["prompt_words_estimate"] if estimate else None,
}
)
return out
def load_rate_table(workspace: Path | str | None = None) -> dict[str, dict[str, float]]:
"""Load the model rate table, with optional workspace override.

View File

@@ -21,8 +21,10 @@ from .openrouter import OpenRouterAssistedGenerationAdapter
from .budget import (
latest_plan_snapshot_id,
make_cost_resolver,
read_run_variance,
record_plan_snapshot,
record_run_usage,
record_run_variance,
)
from .source_intake import SourceChunk, normalize_source
from .workflow import (
@@ -325,6 +327,31 @@ def _read_profile_name(root: Path) -> str:
return str(state.get("profile") or DEFAULT_PROFILE)
def _format_variance_line(summary: dict[str, Any] | None) -> str:
if not summary:
return ""
calls = summary.get("calls") or {}
cost = summary.get("cost_usd") or {}
parts: list[str] = []
calls_actual = calls.get("actual")
calls_estimated = calls.get("estimated")
if calls_actual is not None:
if calls_estimated is not None:
parts.append(f"calls {calls_actual}/{calls_estimated}")
else:
parts.append(f"calls {calls_actual} (no plan)")
actual_cost = cost.get("actual_total")
estimated_cost = cost.get("estimated")
if actual_cost is not None:
if estimated_cost is not None:
parts.append(f"cost ${actual_cost:.4f}/${estimated_cost:.4f}")
elif actual_cost > 0:
parts.append(f"cost ${actual_cost:.4f}")
if not parts:
return ""
return "- " + " · ".join(parts)
def _workspace_for(root: Path) -> Path:
"""Resolve the workspace directory that contains this infospace.
@@ -373,6 +400,18 @@ def run_generation(
workflow_results.append(result.to_dict())
state = _mark_workflow_completed(state, result)
if workflow_results:
duration_seconds = round(_monotonic() - monotonic_start, 3)
usage_entry = record_run_usage(
root_path,
workflow_results,
snapshot_id=latest_plan_snapshot_id(root_path),
duration_seconds=duration_seconds,
started_at=started_wall.isoformat(),
cost_resolver=make_cost_resolver(_workspace_for(root_path)),
)
record_run_variance(root_path, usage_entry)
metrics: dict[str, Any] = {}
snapshot_id = ""
if stage_key in {"all", "metrics"}:
@@ -398,16 +437,6 @@ def run_generation(
}
)
_write_state(root_path, state)
if workflow_results:
duration_seconds = round(_monotonic() - monotonic_start, 3)
record_run_usage(
root_path,
workflow_results,
snapshot_id=latest_plan_snapshot_id(root_path),
duration_seconds=duration_seconds,
started_at=started_wall.isoformat(),
cost_resolver=make_cost_resolver(_workspace_for(root_path)),
)
return GenerationRunResult(
root=str(root_path),
status="completed",
@@ -449,6 +478,7 @@ def status_generation(root: str | Path) -> dict[str, Any]:
"stale_profile": stale_profile,
"completed": bool(state.get("completed", False)),
"stage_status": state.get("stage_status", {}),
"budget_summary": read_run_variance(infospace.root),
}
@@ -636,22 +666,24 @@ def _record_metrics(root: Path) -> Any:
def _write_generation_report(root: Path, metrics: dict[str, Any], snapshot_id: str) -> None:
status = status_generation(root)
text = "\n".join(
[
"# Generation Report",
"",
f"Snapshot: {snapshot_id}",
f"Sources: {status['source_chunk_count']}",
f"Entities: {status['entity_count']}",
f"Relations: {status['relation_count']}",
f"Evaluations: {status['evaluation_count']}",
"",
"## Metrics",
"",
*[f"- {name}: {value}" for name, value in sorted(metrics.items())],
"",
]
)
lines = [
"# Generation Report",
"",
f"Snapshot: {snapshot_id}",
f"Sources: {status['source_chunk_count']}",
f"Entities: {status['entity_count']}",
f"Relations: {status['relation_count']}",
f"Evaluations: {status['evaluation_count']}",
"",
"## Metrics",
"",
*[f"- {name}: {value}" for name, value in sorted(metrics.items())],
"",
]
variance_line = _format_variance_line(status.get("budget_summary"))
if variance_line:
lines.extend(["## Plan variance", "", variance_line, ""])
text = "\n".join(lines)
path = root / "reports" / "generation-summary.md"
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(text, encoding="utf-8")