diff --git a/src/infospace_bench/budget.py b/src/infospace_bench/budget.py
new file mode 100644
index 0000000..0172f63
--- /dev/null
+++ b/src/infospace_bench/budget.py
@@ -0,0 +1,141 @@
+"""
+Budget and usage registry for infospaces.
+
+Layer 1 of the three-layer design (see IB-WP-0019):
+- This module persists per-infospace plan snapshots, usage rollups, and
+ plan-vs-actual variance under `output/budget/`.
+- Layer 2 (cross-application observations for adaptive routing) lives in
+ llm-connect's QualityLedger (LLM-WP-0004).
+- Layer 3 (organizational rollup) is state-hub `record_token_event`.
+"""
+
+from __future__ import annotations
+
+import hashlib
+import json
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+BUDGET_DIR = Path("output/budget")
+PLANS_FILE = BUDGET_DIR / "plans.yaml"
+PLAN_RETENTION_DEFAULT = 50
+PLANS_SCHEMA_VERSION = 1
+
+_SNAPSHOT_FINGERPRINT_FIELDS = (
+ "stage",
+ "selected_chunk_count",
+ "selected_chunk_ids",
+ "selected_chapter_numbers",
+ "total_provider_calls_estimate",
+ "total_prompt_tokens_estimate",
+ "estimated_cost_usd",
+ "cost_per_1k_tokens",
+ "max_calls",
+ "cost_cap",
+)
+
+
+def record_plan_snapshot(
+ root: str | Path,
+ summary: dict[str, Any],
+ *,
+ retention: int = PLAN_RETENTION_DEFAULT,
+) -> str:
+ """Persist a compact plan summary to ``output/budget/plans.yaml``.
+
+ Returns the snapshot_id assigned to this entry. If a snapshot with the
+ same fingerprint already exists at the head of the list, its
+ ``recorded_at`` is refreshed instead of producing a duplicate entry.
+ """
+ root_path = Path(root)
+ budget_path = root_path / PLANS_FILE
+ budget_path.parent.mkdir(parents=True, exist_ok=True)
+ snapshot = _build_snapshot(summary)
+ payload = _read_plans(budget_path)
+ snapshots = payload.get("snapshots") or []
+ pruned_count = int(payload.get("pruned_count") or 0)
+ if snapshots and snapshots[-1].get("snapshot_id") == snapshot["snapshot_id"]:
+ snapshots[-1]["recorded_at"] = snapshot["recorded_at"]
+ else:
+ snapshots.append(snapshot)
+ if retention > 0 and len(snapshots) > retention:
+ overflow = len(snapshots) - retention
+ pruned_count += overflow
+ snapshots = snapshots[overflow:]
+ _write_plans(
+ budget_path,
+ {
+ "schema_version": PLANS_SCHEMA_VERSION,
+ "pruned_count": pruned_count,
+ "snapshots": snapshots,
+ },
+ )
+ return snapshot["snapshot_id"]
+
+
+def read_plan_snapshots(root: str | Path) -> list[dict[str, Any]]:
+ """Return the persisted plan snapshots in chronological order."""
+ payload = _read_plans(Path(root) / PLANS_FILE)
+ return list(payload.get("snapshots") or [])
+
+
+def _build_snapshot(summary: dict[str, Any]) -> dict[str, Any]:
+ filters = {
+ "stage": summary.get("stage"),
+ "chapter_filter": summary.get("chapter_filter"),
+ "chunk_filter": summary.get("chunk_filter"),
+ "from_chapter": summary.get("from_chapter"),
+ "to_chapter": summary.get("to_chapter"),
+ }
+ fingerprint_source = {
+ key: summary.get(key) for key in _SNAPSHOT_FINGERPRINT_FIELDS
+ }
+ fingerprint_source["filters"] = filters
+ snapshot_id = _fingerprint(fingerprint_source)
+ return {
+ "snapshot_id": snapshot_id,
+ "recorded_at": _now(),
+ "stage": summary.get("stage"),
+ "filters": filters,
+ "selected_chunk_count": summary.get("selected_chunk_count"),
+ "selected_chunk_ids": list(summary.get("selected_chunk_ids") or []),
+ "selected_chapter_numbers": list(summary.get("selected_chapter_numbers") or []),
+ "per_workflow": list(summary.get("per_workflow") or []),
+ "total_provider_calls_estimate": summary.get("total_provider_calls_estimate"),
+ "total_prompt_tokens_estimate": summary.get("total_prompt_tokens_estimate"),
+ "total_prompt_words_estimate": summary.get("total_prompt_words_estimate"),
+ "estimated_cost_usd": summary.get("estimated_cost_usd"),
+ "cost_per_1k_tokens": summary.get("cost_per_1k_tokens"),
+ "max_calls": summary.get("max_calls"),
+ "cost_cap": summary.get("cost_cap"),
+ "exceeds_max_calls": bool(summary.get("exceeds_max_calls")),
+ "exceeds_cost_cap": bool(summary.get("exceeds_cost_cap")),
+ }
+
+
+def _fingerprint(payload: dict[str, Any]) -> str:
+ serialised = json.dumps(payload, sort_keys=True, default=str)
+ return hashlib.sha256(serialised.encode("utf-8")).hexdigest()[:12]
+
+
+def _read_plans(path: Path) -> dict[str, Any]:
+ if not path.is_file():
+ return {"schema_version": PLANS_SCHEMA_VERSION, "pruned_count": 0, "snapshots": []}
+ try:
+ data = yaml.safe_load(path.read_text(encoding="utf-8"))
+ except yaml.YAMLError:
+ return {"schema_version": PLANS_SCHEMA_VERSION, "pruned_count": 0, "snapshots": []}
+ if not isinstance(data, dict):
+ return {"schema_version": PLANS_SCHEMA_VERSION, "pruned_count": 0, "snapshots": []}
+ return data
+
+
+def _write_plans(path: Path, payload: dict[str, Any]) -> None:
+ path.write_text(yaml.safe_dump(payload, sort_keys=False), encoding="utf-8")
+
+
+def _now() -> str:
+ return datetime.now(timezone.utc).isoformat()
diff --git a/src/infospace_bench/generator.py b/src/infospace_bench/generator.py
index b997fe4..b921aa1 100644
--- a/src/infospace_bench/generator.py
+++ b/src/infospace_bench/generator.py
@@ -15,6 +15,7 @@ from .evaluation_io import read_entity_evaluations
from .history import get_history, read_metrics_file, record_check_results
from .lifecycle import create_infospace, load_infospace, register_artifact
from .openrouter import OpenRouterAssistedGenerationAdapter
+from .budget import record_plan_snapshot
from .source_intake import SourceChunk, normalize_source
from .workflow import (
AssistedGenerationAdapter,
@@ -113,6 +114,7 @@ def plan_generation(
words_per_token: float = WORDS_PER_TOKEN_DEFAULT,
entities_per_chunk: int = ENTITIES_PER_CHUNK_ESTIMATE,
full: bool = False,
+ persist: bool = True,
) -> dict[str, Any]:
root_path = Path(root)
status = status_generation(root_path)
@@ -129,9 +131,15 @@ def plan_generation(
words_per_token=words_per_token,
entities_per_chunk=entities_per_chunk,
)
+ summary["chapter_filter"] = list(chapter_filter) if chapter_filter else None
+ summary["chunk_filter"] = list(chunk_filter) if chunk_filter else None
+ summary["from_chapter"] = from_chapter
+ summary["to_chapter"] = to_chapter
summary["root"] = str(root_path)
summary["stale"] = status["stale"]
summary["status"] = "planned"
+ if persist:
+ summary["snapshot_id"] = record_plan_snapshot(root_path, summary)
if not full:
return summary
workflow_ids = _workflow_ids_for_stage(stage)
diff --git a/tests/test_budget_registry.py b/tests/test_budget_registry.py
new file mode 100644
index 0000000..8706d1a
--- /dev/null
+++ b/tests/test_budget_registry.py
@@ -0,0 +1,179 @@
+import json
+import os
+import subprocess
+import sys
+import zipfile
+from pathlib import Path
+
+import yaml
+
+from infospace_bench.budget import (
+ PLAN_RETENTION_DEFAULT,
+ PLANS_FILE,
+ PLANS_SCHEMA_VERSION,
+ read_plan_snapshots,
+ record_plan_snapshot,
+)
+from infospace_bench.generator import init_generation_infospace, plan_generation
+
+
+CONTAINER_XML = """
+
Body of chapter {label} with " + " ".join(f"word{n}" for n in range(40)) + ".
", + ) + + +def _build_infospace(tmp_path: Path) -> Path: + book = tmp_path / "book.epub" + _write_three_chapter_epub(book) + infospace = init_generation_infospace( + tmp_path, book, "budget-test", name="Budget Test", profile="general-knowledge" + ) + return infospace.root + + +def test_record_plan_snapshot_writes_yaml_with_stable_id(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + + summary = plan_generation(root, persist=False) + snapshot_id_1 = record_plan_snapshot(root, summary) + snapshot_id_2 = record_plan_snapshot(root, summary) + + persisted = (root / PLANS_FILE).read_text(encoding="utf-8") + data = yaml.safe_load(persisted) + + assert data["schema_version"] == PLANS_SCHEMA_VERSION + assert data["pruned_count"] == 0 + assert snapshot_id_1 == snapshot_id_2, "same summary must yield same snapshot_id" + # Duplicate writes refresh recorded_at instead of stacking + assert len(data["snapshots"]) == 1 + assert data["snapshots"][0]["snapshot_id"] == snapshot_id_1 + + +def test_different_filters_produce_distinct_snapshots(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + + full_plan = plan_generation(root, persist=False) + chapter_only = plan_generation(root, from_chapter=2, to_chapter=2, persist=False) + record_plan_snapshot(root, full_plan) + record_plan_snapshot(root, chapter_only) + + snapshots = read_plan_snapshots(root) + assert len(snapshots) == 2 + ids = {snap["snapshot_id"] for snap in snapshots} + assert len(ids) == 2 + # Filter values are echoed back into the snapshot + chapter_snapshot = next(s for s in snapshots if s["selected_chunk_count"] == 1) + assert chapter_snapshot["filters"]["from_chapter"] == 2 + assert chapter_snapshot["filters"]["to_chapter"] == 2 + + +def test_plan_generation_persists_snapshot_by_default(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + + result = plan_generation(root, from_chapter=1, to_chapter=2) + + assert "snapshot_id" in result + assert (root / PLANS_FILE).is_file() + snapshots = read_plan_snapshots(root) + assert len(snapshots) == 1 + assert snapshots[0]["snapshot_id"] == result["snapshot_id"] + + +def test_plan_generation_persist_false_skips_write(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + + plan_generation(root, persist=False) + + assert not (root / PLANS_FILE).exists() + + +def test_plan_snapshot_retention_prunes_old_entries(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + + # Produce 5 distinct snapshots and cap retention at 3. + for chapter in (1, 2, 3, None, None): + kwargs = {"from_chapter": chapter, "to_chapter": chapter} if chapter else {} + summary = plan_generation(root, persist=False, **kwargs) + if not chapter: + # vary another field to avoid duplicate refresh + summary["max_calls"] = (summary.get("max_calls") or 0) + 1 + summary["exceeds_max_calls"] = False + record_plan_snapshot(root, summary, retention=3) + + data = yaml.safe_load((root / PLANS_FILE).read_text(encoding="utf-8")) + assert len(data["snapshots"]) == 3 + assert data["pruned_count"] >= 1 + + +def test_plan_cli_writes_snapshot(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + env = os.environ.copy() + env["PYTHONPATH"] = "src:/home/worsch/markitect-tool/src" + + result = subprocess.run( + [ + sys.executable, + "-m", + "infospace_bench", + "generate", + "plan", + str(root), + "--from-chapter", + "1", + "--to-chapter", + "2", + "--cost-per-1k", + "0.5", + ], + check=False, + env=env, + text=True, + capture_output=True, + ) + + assert result.returncode == 0, result.stderr + payload = json.loads(result.stdout) + assert "snapshot_id" in payload + snapshots = read_plan_snapshots(root) + assert len(snapshots) == 1 + assert snapshots[0]["filters"]["from_chapter"] == 1 + assert snapshots[0]["filters"]["to_chapter"] == 2 diff --git a/workplans/IB-WP-0019-budget-and-usage-registry.md b/workplans/IB-WP-0019-budget-and-usage-registry.md index 600215d..75e3a5d 100644 --- a/workplans/IB-WP-0019-budget-and-usage-registry.md +++ b/workplans/IB-WP-0019-budget-and-usage-registry.md @@ -15,6 +15,7 @@ related_workplans: - IB-WP-0014 - IB-WP-0018 - LLM-WP-0004 +state_hub_workstream_id: "063c6285-a56e-476b-8666-109d6fa35858" --- # IB-WP-0019 — Budget and Usage Registry for Infospaces @@ -76,8 +77,9 @@ Three layers, each owned by a different repo: ```task id: IB-WP-0019-T01 -status: todo +status: done priority: high +state_hub_task_id: "7f1a4e0a-c1ad-49f3-aad1-6946de9b1219" ``` - Append the compact `plan_generation_summary` payload to @@ -95,6 +97,7 @@ priority: high id: IB-WP-0019-T02 status: todo priority: high +state_hub_task_id: "a612f8d4-f96d-4fae-9aa6-66a7946414f5" ``` - On `run` and `resume` completion, scan the run-record YAML written by @@ -116,6 +119,7 @@ priority: high id: IB-WP-0019-T03 status: todo priority: high +state_hub_task_id: "688c590d-8885-455e-bcf6-61409a45e001" ``` - Add `docs/model-rates.yaml` with `model -> {prompt_per_1k, @@ -135,6 +139,7 @@ priority: high id: IB-WP-0019-T04 status: todo priority: medium +state_hub_task_id: "c6adc4fb-9062-4c81-a0b2-98d3166e047d" ``` - Compute a small variance record on each run: actual_calls / @@ -154,6 +159,7 @@ priority: medium id: IB-WP-0019-T05 status: todo priority: medium +state_hub_task_id: "968bca1d-63ff-4818-83bb-ca314b1e633c" ``` - After each completed run, call state-hub `record_token_event` with @@ -173,6 +179,7 @@ priority: medium id: IB-WP-0019-T06 status: todo priority: medium +state_hub_task_id: "7cb34bfc-c562-4dda-a6d4-b44158644e19" ``` - Add `infospace-bench budget list