From 182f7011bbfd732c6a900145b908562570eb980d Mon Sep 17 00:00:00 2001 From: tegwick Date: Sun, 17 May 2026 19:19:35 +0200 Subject: [PATCH] IB-WP-0019-T01: plan snapshot persistence Every generate plan invocation now appends its compact summary to output/budget/plans.yaml with a deterministic 12-char snapshot_id hashed over the selection filters and the estimated call/token/cost totals. Identical-fingerprint plans refresh the most recent entry's recorded_at instead of stacking duplicates. Retention defaults to the last 50 snapshots; older entries are pruned and counted on a top-level pruned_count field. The summary now echoes its input filters (chapter_filter, chunk_filter, from_chapter, to_chapter) so reviewers can read the snapshot without cross-referencing the CLI invocation. New module src/infospace_bench/budget.py owns layer 1 (per-infospace recording) of the IB-WP-0019 three-layer design; layer 2 still belongs in llm-connect LLM-WP-0004 and layer 3 in state-hub. 99 tests pass. Co-Authored-By: Claude Opus 4.7 --- src/infospace_bench/budget.py | 141 ++++++++++++++ src/infospace_bench/generator.py | 8 + tests/test_budget_registry.py | 179 ++++++++++++++++++ .../IB-WP-0019-budget-and-usage-registry.md | 10 +- 4 files changed, 337 insertions(+), 1 deletion(-) create mode 100644 src/infospace_bench/budget.py create mode 100644 tests/test_budget_registry.py diff --git a/src/infospace_bench/budget.py b/src/infospace_bench/budget.py new file mode 100644 index 0000000..0172f63 --- /dev/null +++ b/src/infospace_bench/budget.py @@ -0,0 +1,141 @@ +""" +Budget and usage registry for infospaces. + +Layer 1 of the three-layer design (see IB-WP-0019): +- This module persists per-infospace plan snapshots, usage rollups, and + plan-vs-actual variance under `output/budget/`. +- Layer 2 (cross-application observations for adaptive routing) lives in + llm-connect's QualityLedger (LLM-WP-0004). +- Layer 3 (organizational rollup) is state-hub `record_token_event`. 
+"""
+
+from __future__ import annotations
+
+import hashlib
+import json
+import logging
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+logger = logging.getLogger(__name__)
+
+BUDGET_DIR = Path("output/budget")
+PLANS_FILE = BUDGET_DIR / "plans.yaml"
+PLAN_RETENTION_DEFAULT = 50
+PLANS_SCHEMA_VERSION = 1
+
+# Summary keys hashed (together with the echoed input filters) into
+# ``snapshot_id``. Editing this tuple changes every id that gets computed, so
+# treat it as part of the on-disk format.
+_SNAPSHOT_FINGERPRINT_FIELDS = (
+    "stage",
+    "selected_chunk_count",
+    "selected_chunk_ids",
+    "selected_chapter_numbers",
+    "total_provider_calls_estimate",
+    "total_prompt_tokens_estimate",
+    "estimated_cost_usd",
+    "cost_per_1k_tokens",
+    "max_calls",
+    "cost_cap",
+)
+
+
+def record_plan_snapshot(
+    root: str | Path,
+    summary: dict[str, Any],
+    *,
+    retention: int = PLAN_RETENTION_DEFAULT,
+) -> str:
+    """Persist a compact plan summary to ``output/budget/plans.yaml``.
+
+    Returns the snapshot_id assigned to this entry. If the most recent
+    (last) persisted snapshot carries the same fingerprint, its
+    ``recorded_at`` is refreshed instead of producing a duplicate entry;
+    older entries are not searched.
+
+    ``retention`` caps how many snapshots are kept. The oldest entries are
+    dropped and counted in the top-level ``pruned_count``. A value of zero
+    or less disables pruning.
+
+    An existing registry that cannot be parsed is moved aside to
+    ``plans.yaml.corrupt`` before the fresh registry is written, so earlier
+    history is not silently overwritten.
+    """
+    budget_path = Path(root) / PLANS_FILE
+    budget_path.parent.mkdir(parents=True, exist_ok=True)
+    snapshot = _build_snapshot(summary)
+    payload = _read_plans(budget_path, quarantine=True)
+    snapshots = _coerce_snapshots(payload.get("snapshots"))
+    pruned_count = _coerce_count(payload.get("pruned_count"))
+    if snapshots and snapshots[-1].get("snapshot_id") == snapshot["snapshot_id"]:
+        snapshots[-1]["recorded_at"] = snapshot["recorded_at"]
+    else:
+        snapshots.append(snapshot)
+    # Enforce the cap on every write so a lowered ``retention`` also takes
+    # effect when this call only refreshed the latest entry.
+    if retention > 0 and len(snapshots) > retention:
+        overflow = len(snapshots) - retention
+        pruned_count += overflow
+        snapshots = snapshots[overflow:]
+    _write_plans(
+        budget_path,
+        {
+            "schema_version": PLANS_SCHEMA_VERSION,
+            "pruned_count": pruned_count,
+            "snapshots": snapshots,
+        },
+    )
+    return snapshot["snapshot_id"]
+
+
+def read_plan_snapshots(root: str | Path) -> list[dict[str, Any]]:
+    """Return the persisted plan snapshots, oldest first.
+
+    Reading never modifies the registry: a missing or unparseable file
+    yields an empty list, and entries that are not mappings are skipped.
+    """
+    payload = _read_plans(Path(root) / PLANS_FILE)
+    return _coerce_snapshots(payload.get("snapshots"))
+
+
+def _build_snapshot(summary: dict[str, Any]) -> dict[str, Any]:
+    """Project ``summary`` onto the persisted snapshot schema and assign its id."""
+    filters = {
+        "stage": summary.get("stage"),
+        "chapter_filter": summary.get("chapter_filter"),
+        "chunk_filter": summary.get("chunk_filter"),
+        "from_chapter": summary.get("from_chapter"),
+        "to_chapter": summary.get("to_chapter"),
+    }
+    # ``recorded_at`` is deliberately left out of the fingerprint so that an
+    # identical plan hashes to the same id no matter when it was generated.
+    fingerprint_source = {
+        key: summary.get(key) for key in _SNAPSHOT_FINGERPRINT_FIELDS
+    }
+    fingerprint_source["filters"] = filters
+    snapshot_id = _fingerprint(fingerprint_source)
+    return {
+        "snapshot_id": snapshot_id,
+        "recorded_at": _now(),
+        "stage": summary.get("stage"),
+        "filters": filters,
+        "selected_chunk_count": summary.get("selected_chunk_count"),
+        "selected_chunk_ids": list(summary.get("selected_chunk_ids") or []),
+        "selected_chapter_numbers": list(summary.get("selected_chapter_numbers") or []),
+        "per_workflow": list(summary.get("per_workflow") or []),
+        "total_provider_calls_estimate": summary.get("total_provider_calls_estimate"),
+        "total_prompt_tokens_estimate": summary.get("total_prompt_tokens_estimate"),
+        "total_prompt_words_estimate": summary.get("total_prompt_words_estimate"),
+        "estimated_cost_usd": summary.get("estimated_cost_usd"),
+        "cost_per_1k_tokens": summary.get("cost_per_1k_tokens"),
+        "max_calls": summary.get("max_calls"),
+        "cost_cap": summary.get("cost_cap"),
+        "exceeds_max_calls": bool(summary.get("exceeds_max_calls")),
+        "exceeds_cost_cap": bool(summary.get("exceeds_cost_cap")),
+    }
+
+
+def _fingerprint(payload: dict[str, Any]) -> str:
+    """Return the first 12 hex characters of the SHA-256 of ``payload``.
+
+    Keys are sorted so dict ordering never affects the id; values JSON
+    cannot encode natively fall back to ``str()``.
+    """
+    serialised = json.dumps(payload, sort_keys=True, default=str)
+    return hashlib.sha256(serialised.encode("utf-8")).hexdigest()[:12]
+
+
+def _empty_payload() -> dict[str, Any]:
+    """Return a fresh, empty registry document."""
+    return {"schema_version": PLANS_SCHEMA_VERSION, "pruned_count": 0, "snapshots": []}
+
+
+def _coerce_snapshots(value: Any) -> list[dict[str, Any]]:
+    """Return the mapping entries of ``value`` as a new list ([] if not a list)."""
+    if not isinstance(value, list):
+        return []
+    return [entry for entry in value if isinstance(entry, dict)]
+
+
+def _coerce_count(value: Any) -> int:
+    """Return ``value`` as a non-negative int, or 0 when it is not numeric."""
+    try:
+        return max(int(value or 0), 0)
+    except (TypeError, ValueError):
+        return 0
+
+
+def _quarantine(path: Path) -> None:
+    """Best-effort rename of an unreadable registry to ``<name>.corrupt``."""
+    backup = path.with_name(path.name + ".corrupt")
+    try:
+        path.replace(backup)
+    except OSError as exc:
+        logger.warning("Could not preserve %s as %s: %s", path, backup, exc)
+    else:
+        logger.warning("Moved unreadable plan registry %s to %s", path, backup)
+
+
+def _read_plans(path: Path, *, quarantine: bool = False) -> dict[str, Any]:
+    """Load the registry document at ``path``.
+
+    A missing, unparseable, or non-mapping file yields an empty registry.
+    With ``quarantine=True`` (used by the writer) such a file is first moved
+    aside, because the caller is about to overwrite it.
+    """
+    if not path.is_file():
+        return _empty_payload()
+    try:
+        data = yaml.safe_load(path.read_text(encoding="utf-8"))
+    except (yaml.YAMLError, UnicodeDecodeError) as exc:
+        logger.warning("Ignoring unreadable plan registry %s: %s", path, exc)
+        if quarantine:
+            _quarantine(path)
+        return _empty_payload()
+    if isinstance(data, dict):
+        return data
+    # An empty file parses to None and holds nothing worth preserving; any
+    # other top-level type is unexpected content.
+    if data is not None:
+        logger.warning("Ignoring plan registry %s: top level is not a mapping", path)
+        if quarantine:
+            _quarantine(path)
+    return _empty_payload()
+
+
+def _write_plans(path: Path, payload: dict[str, Any]) -> None:
+    """Write ``payload`` to ``path`` via a sibling temp file and a rename.
+
+    The rename swaps the complete new file into place, so an interrupted
+    write cannot leave a truncated registry behind.
+    """
+    tmp_path = path.with_name(path.name + ".tmp")
+    tmp_path.write_text(yaml.safe_dump(payload, sort_keys=False), encoding="utf-8")
+    tmp_path.replace(path)
+
+
+def _now() -> str:
+    """Return the current UTC time as an ISO-8601 string."""
+    return datetime.now(timezone.utc).isoformat()
diff --git a/src/infospace_bench/generator.py b/src/infospace_bench/generator.py
index b997fe4..b921aa1 100644
--- a/src/infospace_bench/generator.py
+++ b/src/infospace_bench/generator.py
@@ -15,6 +15,7 @@ from .evaluation_io import read_entity_evaluations
 from .history import get_history, read_metrics_file, record_check_results
 from .lifecycle import create_infospace, load_infospace, register_artifact
 from .openrouter import OpenRouterAssistedGenerationAdapter
+from .budget import record_plan_snapshot
 from .source_intake import SourceChunk, normalize_source
 from .workflow import (
AssistedGenerationAdapter, @@ -113,6 +114,7 @@ def plan_generation( words_per_token: float = WORDS_PER_TOKEN_DEFAULT, entities_per_chunk: int = ENTITIES_PER_CHUNK_ESTIMATE, full: bool = False, + persist: bool = True, ) -> dict[str, Any]: root_path = Path(root) status = status_generation(root_path) @@ -129,9 +131,15 @@ def plan_generation( words_per_token=words_per_token, entities_per_chunk=entities_per_chunk, ) + summary["chapter_filter"] = list(chapter_filter) if chapter_filter else None + summary["chunk_filter"] = list(chunk_filter) if chunk_filter else None + summary["from_chapter"] = from_chapter + summary["to_chapter"] = to_chapter summary["root"] = str(root_path) summary["stale"] = status["stale"] summary["status"] = "planned" + if persist: + summary["snapshot_id"] = record_plan_snapshot(root_path, summary) if not full: return summary workflow_ids = _workflow_ids_for_stage(stage) diff --git a/tests/test_budget_registry.py b/tests/test_budget_registry.py new file mode 100644 index 0000000..8706d1a --- /dev/null +++ b/tests/test_budget_registry.py @@ -0,0 +1,179 @@ +import json +import os +import subprocess +import sys +import zipfile +from pathlib import Path + +import yaml + +from infospace_bench.budget import ( + PLAN_RETENTION_DEFAULT, + PLANS_FILE, + PLANS_SCHEMA_VERSION, + read_plan_snapshots, + record_plan_snapshot, +) +from infospace_bench.generator import init_generation_infospace, plan_generation + + +CONTAINER_XML = """ + + + + + +""" + +PACKAGE_OPF = """ + + + urn:test:budget + Budget Test Book + Author + en + + + + + + + + + + + + +""" + + +def _write_three_chapter_epub(path: Path) -> None: + with zipfile.ZipFile(path, "w") as archive: + archive.writestr("mimetype", "application/epub+zip") + archive.writestr("META-INF/container.xml", CONTAINER_XML) + archive.writestr("OEBPS/content.opf", PACKAGE_OPF) + for idx, label in enumerate(("I", "II", "III"), start=1): + archive.writestr( + f"OEBPS/ch{idx}.xhtml", + f"Book" + f"

{label}

" + f"

Body of chapter {label} with " + " ".join(f"word{n}" for n in range(40)) + ".

", + ) + + +def _build_infospace(tmp_path: Path) -> Path: + book = tmp_path / "book.epub" + _write_three_chapter_epub(book) + infospace = init_generation_infospace( + tmp_path, book, "budget-test", name="Budget Test", profile="general-knowledge" + ) + return infospace.root + + +def test_record_plan_snapshot_writes_yaml_with_stable_id(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + + summary = plan_generation(root, persist=False) + snapshot_id_1 = record_plan_snapshot(root, summary) + snapshot_id_2 = record_plan_snapshot(root, summary) + + persisted = (root / PLANS_FILE).read_text(encoding="utf-8") + data = yaml.safe_load(persisted) + + assert data["schema_version"] == PLANS_SCHEMA_VERSION + assert data["pruned_count"] == 0 + assert snapshot_id_1 == snapshot_id_2, "same summary must yield same snapshot_id" + # Duplicate writes refresh recorded_at instead of stacking + assert len(data["snapshots"]) == 1 + assert data["snapshots"][0]["snapshot_id"] == snapshot_id_1 + + +def test_different_filters_produce_distinct_snapshots(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + + full_plan = plan_generation(root, persist=False) + chapter_only = plan_generation(root, from_chapter=2, to_chapter=2, persist=False) + record_plan_snapshot(root, full_plan) + record_plan_snapshot(root, chapter_only) + + snapshots = read_plan_snapshots(root) + assert len(snapshots) == 2 + ids = {snap["snapshot_id"] for snap in snapshots} + assert len(ids) == 2 + # Filter values are echoed back into the snapshot + chapter_snapshot = next(s for s in snapshots if s["selected_chunk_count"] == 1) + assert chapter_snapshot["filters"]["from_chapter"] == 2 + assert chapter_snapshot["filters"]["to_chapter"] == 2 + + +def test_plan_generation_persists_snapshot_by_default(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + + result = plan_generation(root, from_chapter=1, to_chapter=2) + + assert "snapshot_id" in result + assert (root / PLANS_FILE).is_file() + snapshots = 
read_plan_snapshots(root) + assert len(snapshots) == 1 + assert snapshots[0]["snapshot_id"] == result["snapshot_id"] + + +def test_plan_generation_persist_false_skips_write(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + + plan_generation(root, persist=False) + + assert not (root / PLANS_FILE).exists() + + +def test_plan_snapshot_retention_prunes_old_entries(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + + # Produce 5 distinct snapshots and cap retention at 3. + for chapter in (1, 2, 3, None, None): + kwargs = {"from_chapter": chapter, "to_chapter": chapter} if chapter else {} + summary = plan_generation(root, persist=False, **kwargs) + if not chapter: + # vary another field to avoid duplicate refresh + summary["max_calls"] = (summary.get("max_calls") or 0) + 1 + summary["exceeds_max_calls"] = False + record_plan_snapshot(root, summary, retention=3) + + data = yaml.safe_load((root / PLANS_FILE).read_text(encoding="utf-8")) + assert len(data["snapshots"]) == 3 + assert data["pruned_count"] >= 1 + + +def test_plan_cli_writes_snapshot(tmp_path: Path) -> None: + root = _build_infospace(tmp_path) + env = os.environ.copy() + env["PYTHONPATH"] = "src:/home/worsch/markitect-tool/src" + + result = subprocess.run( + [ + sys.executable, + "-m", + "infospace_bench", + "generate", + "plan", + str(root), + "--from-chapter", + "1", + "--to-chapter", + "2", + "--cost-per-1k", + "0.5", + ], + check=False, + env=env, + text=True, + capture_output=True, + ) + + assert result.returncode == 0, result.stderr + payload = json.loads(result.stdout) + assert "snapshot_id" in payload + snapshots = read_plan_snapshots(root) + assert len(snapshots) == 1 + assert snapshots[0]["filters"]["from_chapter"] == 1 + assert snapshots[0]["filters"]["to_chapter"] == 2 diff --git a/workplans/IB-WP-0019-budget-and-usage-registry.md b/workplans/IB-WP-0019-budget-and-usage-registry.md index 600215d..75e3a5d 100644 --- a/workplans/IB-WP-0019-budget-and-usage-registry.md +++ 
b/workplans/IB-WP-0019-budget-and-usage-registry.md @@ -15,6 +15,7 @@ related_workplans: - IB-WP-0014 - IB-WP-0018 - LLM-WP-0004 +state_hub_workstream_id: "063c6285-a56e-476b-8666-109d6fa35858" --- # IB-WP-0019 — Budget and Usage Registry for Infospaces @@ -76,8 +77,9 @@ Three layers, each owned by a different repo: ```task id: IB-WP-0019-T01 -status: todo +status: done priority: high +state_hub_task_id: "7f1a4e0a-c1ad-49f3-aad1-6946de9b1219" ``` - Append the compact `plan_generation_summary` payload to @@ -95,6 +97,7 @@ priority: high id: IB-WP-0019-T02 status: todo priority: high +state_hub_task_id: "a612f8d4-f96d-4fae-9aa6-66a7946414f5" ``` - On `run` and `resume` completion, scan the run-record YAML written by @@ -116,6 +119,7 @@ priority: high id: IB-WP-0019-T03 status: todo priority: high +state_hub_task_id: "688c590d-8885-455e-bcf6-61409a45e001" ``` - Add `docs/model-rates.yaml` with `model -> {prompt_per_1k, @@ -135,6 +139,7 @@ priority: high id: IB-WP-0019-T04 status: todo priority: medium +state_hub_task_id: "c6adc4fb-9062-4c81-a0b2-98d3166e047d" ``` - Compute a small variance record on each run: actual_calls / @@ -154,6 +159,7 @@ priority: medium id: IB-WP-0019-T05 status: todo priority: medium +state_hub_task_id: "968bca1d-63ff-4818-83bb-ca314b1e633c" ``` - After each completed run, call state-hub `record_token_event` with @@ -173,6 +179,7 @@ priority: medium id: IB-WP-0019-T06 status: todo priority: medium +state_hub_task_id: "7cb34bfc-c562-4dda-a6d4-b44158644e19" ``` - Add `infospace-bench budget list ` that walks @@ -190,6 +197,7 @@ priority: medium id: IB-WP-0019-T07 status: todo priority: low +state_hub_task_id: "b97906e0-2835-4246-9868-840c02d64fae" ``` - Confirm `output/budget/` ends up inside the archive package built by