""" Budget and usage registry for infospaces. Layer 1 of the three-layer design (see IB-WP-0019): - This module persists per-infospace plan snapshots, usage rollups, and plan-vs-actual variance under `output/budget/`. - Layer 2 (cross-application observations for adaptive routing) lives in llm-connect's QualityLedger (LLM-WP-0004). - Layer 3 (organizational rollup) is state-hub `record_token_event`. """ from __future__ import annotations import hashlib import json from datetime import datetime, timezone from pathlib import Path from typing import Any import yaml BUDGET_DIR = Path("output/budget") PLANS_FILE = BUDGET_DIR / "plans.yaml" PLAN_RETENTION_DEFAULT = 50 PLANS_SCHEMA_VERSION = 1 _SNAPSHOT_FINGERPRINT_FIELDS = ( "stage", "selected_chunk_count", "selected_chunk_ids", "selected_chapter_numbers", "total_provider_calls_estimate", "total_prompt_tokens_estimate", "estimated_cost_usd", "cost_per_1k_tokens", "max_calls", "cost_cap", ) def record_plan_snapshot( root: str | Path, summary: dict[str, Any], *, retention: int = PLAN_RETENTION_DEFAULT, ) -> str: """Persist a compact plan summary to ``output/budget/plans.yaml``. Returns the snapshot_id assigned to this entry. If a snapshot with the same fingerprint already exists at the head of the list, its ``recorded_at`` is refreshed instead of producing a duplicate entry. """ root_path = Path(root) budget_path = root_path / PLANS_FILE budget_path.parent.mkdir(parents=True, exist_ok=True) snapshot = _build_snapshot(summary) payload = _read_plans(budget_path) snapshots = payload.get("snapshots") or [] pruned_count = int(payload.get("pruned_count") or 0) if snapshots and snapshots[-1].get("snapshot_id") == snapshot["snapshot_id"]: snapshots[-1]["recorded_at"] = snapshot["recorded_at"] else: snapshots.append(snapshot) if retention > 0 and len(snapshots) > retention: overflow = len(snapshots) - retention pruned_count += overflow snapshots = snapshots[overflow:] _write_plans( budget_path, { "schema_version": PLANS_SCHEMA_VERSION, "pruned_count": pruned_count, "snapshots": snapshots, }, ) return snapshot["snapshot_id"] def read_plan_snapshots(root: str | Path) -> list[dict[str, Any]]: """Return the persisted plan snapshots in chronological order.""" payload = _read_plans(Path(root) / PLANS_FILE) return list(payload.get("snapshots") or []) def _build_snapshot(summary: dict[str, Any]) -> dict[str, Any]: filters = { "stage": summary.get("stage"), "chapter_filter": summary.get("chapter_filter"), "chunk_filter": summary.get("chunk_filter"), "from_chapter": summary.get("from_chapter"), "to_chapter": summary.get("to_chapter"), } fingerprint_source = { key: summary.get(key) for key in _SNAPSHOT_FINGERPRINT_FIELDS } fingerprint_source["filters"] = filters snapshot_id = _fingerprint(fingerprint_source) return { "snapshot_id": snapshot_id, "recorded_at": _now(), "stage": summary.get("stage"), "filters": filters, "selected_chunk_count": summary.get("selected_chunk_count"), "selected_chunk_ids": list(summary.get("selected_chunk_ids") or []), "selected_chapter_numbers": list(summary.get("selected_chapter_numbers") or []), "per_workflow": list(summary.get("per_workflow") or []), "total_provider_calls_estimate": summary.get("total_provider_calls_estimate"), "total_prompt_tokens_estimate": summary.get("total_prompt_tokens_estimate"), "total_prompt_words_estimate": summary.get("total_prompt_words_estimate"), "estimated_cost_usd": summary.get("estimated_cost_usd"), "cost_per_1k_tokens": summary.get("cost_per_1k_tokens"), "max_calls": summary.get("max_calls"), "cost_cap": summary.get("cost_cap"), "exceeds_max_calls": bool(summary.get("exceeds_max_calls")), "exceeds_cost_cap": bool(summary.get("exceeds_cost_cap")), } def _fingerprint(payload: dict[str, Any]) -> str: serialised = json.dumps(payload, sort_keys=True, default=str) return hashlib.sha256(serialised.encode("utf-8")).hexdigest()[:12] def _read_plans(path: Path) -> dict[str, Any]: if not path.is_file(): return {"schema_version": PLANS_SCHEMA_VERSION, "pruned_count": 0, "snapshots": []} try: data = yaml.safe_load(path.read_text(encoding="utf-8")) except yaml.YAMLError: return {"schema_version": PLANS_SCHEMA_VERSION, "pruned_count": 0, "snapshots": []} if not isinstance(data, dict): return {"schema_version": PLANS_SCHEMA_VERSION, "pruned_count": 0, "snapshots": []} return data def _write_plans(path: Path, payload: dict[str, Any]) -> None: path.write_text(yaml.safe_dump(payload, sort_keys=False), encoding="utf-8") def _now() -> str: return datetime.now(timezone.utc).isoformat()