IB-WP-0016-T03: scale-aware planning

Replace generate plan's full-prompt dump with a compact summary that
reports selected-chunk counts, selected chapter numbers, per-workflow
call counts, prompt-word and token estimates, and a rough USD cost when
--cost-per-1k is supplied. Selection filters --chapter (label or number,
repeatable), --from-chapter / --to-chapter (numeric range), and --chunk
(repeatable id) shape the estimate. Budget caps --max-calls and
--cost-cap are reported as exceeds_* booleans so callers can fail fast
before run.

The old full per-workflow plan with prompts remains available behind
--full so deep inspection is opt-in instead of the default.

Whole-Lefevre estimate at default max_words=800: 146 chunks, 730 calls,
~518k prompt tokens, ~$155 at $0.30/1k. Chapters 3-5 only: 19 chunks,
95 calls, ~64k tokens. 87 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-17 18:18:09 +02:00
parent f8289699e7
commit 13f9c1895c
6 changed files with 434 additions and 9 deletions

View File

@@ -155,6 +155,33 @@ def build_parser() -> argparse.ArgumentParser:
)
generate_plan.add_argument("root")
generate_plan.add_argument("--stage", default="all")
generate_plan.add_argument(
"--chapter",
action="append",
default=[],
help="Filter to chapter label or number (repeatable: --chapter I --chapter II)",
)
generate_plan.add_argument("--from-chapter", type=int, default=None)
generate_plan.add_argument("--to-chapter", type=int, default=None)
generate_plan.add_argument(
"--chunk",
action="append",
default=[],
help="Filter to source chunk id (repeatable)",
)
generate_plan.add_argument("--max-calls", type=int, default=None)
generate_plan.add_argument("--cost-cap", type=float, default=None)
generate_plan.add_argument(
"--cost-per-1k", type=float, default=0.0, help="USD per 1k prompt tokens for rough cost estimate"
)
generate_plan.add_argument(
"--entities-per-chunk", type=int, default=2, help="Estimate of entities each chunk yields"
)
generate_plan.add_argument(
"--full",
action="store_true",
help="Include full per-stage prompts in the output (off by default for long corpora)",
)
generate_run = generate_sub.add_parser(
"run",
@@ -448,7 +475,21 @@ def main(argv: list[str] | None = None) -> int:
}
)
elif args.generate_command == "plan":
_write_json(plan_generation(Path(args.root), stage=args.stage))
_write_json(
plan_generation(
Path(args.root),
stage=args.stage,
chapter_filter=args.chapter or None,
chunk_filter=args.chunk or None,
from_chapter=args.from_chapter,
to_chapter=args.to_chapter,
max_calls=args.max_calls,
cost_cap=args.cost_cap,
cost_per_1k_tokens=args.cost_per_1k,
entities_per_chunk=args.entities_per_chunk,
full=args.full,
)
)
elif args.generate_command == "run":
_write_json(
run_generation(

View File

@@ -89,8 +89,51 @@ def init_generation_infospace(
return load_infospace(infospace.root)
def plan_generation(root: str | Path, *, stage: str = "all") -> dict[str, Any]:
WORDS_PER_TOKEN_DEFAULT = 0.75
ENTITIES_PER_CHUNK_ESTIMATE = 2
_CALLS_PER_CHUNK_BY_WORKFLOW = {
"generic-source-summary": 1,
"generic-source-entities": 1,
"generic-source-relations": 1,
}
def plan_generation(
root: str | Path,
*,
stage: str = "all",
chapter_filter: list[str] | None = None,
chunk_filter: list[str] | None = None,
from_chapter: int | None = None,
to_chapter: int | None = None,
max_calls: int | None = None,
cost_cap: float | None = None,
cost_per_1k_tokens: float = 0.0,
words_per_token: float = WORDS_PER_TOKEN_DEFAULT,
entities_per_chunk: int = ENTITIES_PER_CHUNK_ESTIMATE,
full: bool = False,
) -> dict[str, Any]:
root_path = Path(root)
status = status_generation(root_path)
summary = plan_generation_summary(
root_path,
stage=stage,
chapter_filter=chapter_filter,
chunk_filter=chunk_filter,
from_chapter=from_chapter,
to_chapter=to_chapter,
max_calls=max_calls,
cost_cap=cost_cap,
cost_per_1k_tokens=cost_per_1k_tokens,
words_per_token=words_per_token,
entities_per_chunk=entities_per_chunk,
)
summary["root"] = str(root_path)
summary["stale"] = status["stale"]
summary["status"] = "planned"
if not full:
return summary
workflow_ids = _workflow_ids_for_stage(stage)
plans: list[dict[str, Any]] = []
for workflow_id in workflow_ids:
@@ -104,17 +147,168 @@ def plan_generation(root: str | Path, *, stage: str = "all") -> dict[str, Any]:
"error": exc.to_dict(),
}
)
status = status_generation(root_path)
summary["workflows"] = plans
return summary
def plan_generation_summary(
root: str | Path,
*,
stage: str = "all",
chapter_filter: list[str] | None = None,
chunk_filter: list[str] | None = None,
from_chapter: int | None = None,
to_chapter: int | None = None,
max_calls: int | None = None,
cost_cap: float | None = None,
cost_per_1k_tokens: float = 0.0,
words_per_token: float = WORDS_PER_TOKEN_DEFAULT,
entities_per_chunk: int = ENTITIES_PER_CHUNK_ESTIMATE,
) -> dict[str, Any]:
root_path = Path(root)
infospace = load_infospace(root_path)
sources = [item for item in infospace.artifacts if item.kind == "source"]
selected = _select_source_chunks(
sources,
chapter_filter=chapter_filter,
chunk_filter=chunk_filter,
from_chapter=from_chapter,
to_chapter=to_chapter,
)
workflow_ids = _workflow_ids_for_stage(stage)
profile_name = _read_profile_name(root_path)
template_words = _profile_template_words(root_path, profile_name)
chunk_word_total = sum(_source_word_count(root_path, item) for item in selected)
per_stage: list[dict[str, Any]] = []
total_calls = 0
total_prompt_words = 0
for workflow_id in workflow_ids:
if workflow_id == "generic-source-evaluations":
calls = len(selected) * max(0, entities_per_chunk)
template_label = "evaluate-entity"
entity_words_estimate = 80
prompt_words = calls * (
template_words.get(template_label, 0) + entity_words_estimate
)
else:
calls = len(selected) * _CALLS_PER_CHUNK_BY_WORKFLOW.get(workflow_id, 0)
template_label = _template_for_workflow(workflow_id)
prompt_words = calls * template_words.get(template_label, 0) + chunk_word_total * (
1 if calls else 0
)
per_stage.append(
{
"workflow_id": workflow_id,
"calls": calls,
"prompt_words_estimate": prompt_words,
}
)
total_calls += calls
total_prompt_words += prompt_words
total_tokens = int(round(total_prompt_words / words_per_token)) if words_per_token > 0 else 0
cost: float | None = None
if cost_per_1k_tokens > 0:
cost = round((total_tokens / 1000.0) * cost_per_1k_tokens, 4)
chapter_numbers = sorted(
{
int(item.provenance.get("chapter_number"))
for item in selected
if isinstance(item.provenance.get("chapter_number"), int)
}
)
return {
"root": str(root_path),
"stage": stage,
"status": "planned",
"stale": status["stale"],
"source_chunk_count": status["source_chunk_count"],
"workflows": plans,
"source_chunk_count": len(sources),
"selected_chunk_count": len(selected),
"selected_chunk_ids": [item.id.split("/", 1)[-1].rsplit(".md", 1)[0] for item in selected],
"selected_chapter_numbers": chapter_numbers,
"per_workflow": per_stage,
"total_provider_calls_estimate": total_calls,
"total_prompt_words_estimate": total_prompt_words,
"total_prompt_tokens_estimate": total_tokens,
"estimated_cost_usd": cost,
"cost_per_1k_tokens": cost_per_1k_tokens or None,
"words_per_token": words_per_token,
"entities_per_chunk_estimate": entities_per_chunk,
"max_calls": max_calls,
"cost_cap": cost_cap,
"exceeds_max_calls": bool(max_calls is not None and total_calls > max_calls),
"exceeds_cost_cap": bool(cost_cap is not None and cost is not None and cost > cost_cap),
}
def _select_source_chunks(
sources: list[Any],
*,
chapter_filter: list[str] | None,
chunk_filter: list[str] | None,
from_chapter: int | None,
to_chapter: int | None,
) -> list[Any]:
chunk_set = {value.strip() for value in (chunk_filter or []) if value.strip()}
label_set = {value.strip().lower() for value in (chapter_filter or []) if value.strip()}
out: list[Any] = []
for item in sources:
chunk_id = item.provenance.get("chunk_id") or item.id.split("/", 1)[-1].rsplit(".md", 1)[0]
if chunk_set and chunk_id not in chunk_set:
continue
chapter_number = item.provenance.get("chapter_number")
chapter_label = (item.provenance.get("chapter_label") or "").strip().lower()
if label_set:
number_match = (
isinstance(chapter_number, int) and str(chapter_number) in label_set
)
label_match = chapter_label in label_set if chapter_label else False
if not (number_match or label_match):
continue
if from_chapter is not None or to_chapter is not None:
if not isinstance(chapter_number, int):
continue
if from_chapter is not None and chapter_number < from_chapter:
continue
if to_chapter is not None and chapter_number > to_chapter:
continue
out.append(item)
return out
def _template_for_workflow(workflow_id: str) -> str:
mapping = {
"generic-source-summary": "summarize-source",
"generic-source-entities": "extract-entities",
"generic-source-relations": "extract-relations",
"generic-source-evaluations": "evaluate-entity",
}
return mapping.get(workflow_id, "")
def _profile_template_words(root: Path, profile: str) -> dict[str, int]:
template_dir = Path(root) / "profiles" / profile / "templates"
counts: dict[str, int] = {}
if not template_dir.is_dir():
return counts
for path in template_dir.glob("*.md"):
try:
text = path.read_text(encoding="utf-8")
except OSError:
continue
counts[path.stem] = len(text.split())
return counts
def _source_word_count(root: Path, artifact: Any) -> int:
path = Path(root) / artifact.path
try:
return len(path.read_text(encoding="utf-8").split())
except OSError:
return 0
def _read_profile_name(root: Path) -> str:
state = _read_state(root)
return str(state.get("profile") or DEFAULT_PROFILE)
def run_generation(
root: str | Path,
*,