Files
markitect-main/markitect/prompts/execution/batch.py
tegwick 144a88c0c2 feat(prompts): add batch LLM evaluation orchestrator (S1.6)
BatchEvaluator runs evaluation prompts across item batches with
incremental evaluation (skip unchanged via content digest), per-item
error isolation, progress callbacks, and aggregate token usage tracking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 01:40:13 +01:00

169 lines
5.5 KiB
Python

"""
Batch LLM evaluation orchestrator.

Runs an evaluation prompt against a batch of items (entities, pairs,
etc.), collecting structured results. Handles:

- Incremental evaluation (skip items whose content hasn't changed)
- Progress reporting via callback
- Graceful error handling per item (one failure doesn't stop the batch)
- Aggregate token usage tracking

This is the mechanism by which infospace tooling delegates LLM work
to the platform. The adapter's own retry logic handles transient
API errors (rate limits, 5xx).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from markitect.prompts.execution.llm_adapter import LLMAdapter
from markitect.prompts.execution.models import LLMResponse, RunConfig
@dataclass
class BatchItem:
    """One unit of work submitted to a batch evaluation.

    Attributes:
        key: Identifier that distinguishes this item from the others in
            the batch (for example an entity slug).
        prompt: Fully compiled prompt text that is sent to the LLM.
        content_digest: Hash of the source content the prompt was built
            from. It drives incremental evaluation: an item whose digest
            equals the one recorded for its key in a previous run is
            skipped. Defaults to the empty string.
        metadata: Free-form data carried through to the result untouched.
    """

    key: str
    prompt: str
    content_digest: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class BatchResult:
    """Outcome of processing one batch item.

    Attributes:
        key: The :attr:`BatchItem.key` of the item this result belongs to.
        status: ``"success"``, ``"error"`` or ``"skipped"``.
        response: LLM response for a successful item; ``None`` when the
            item was skipped or failed.
        error: Error message for a failed item; ``None`` when the item
            succeeded or was skipped.
        metadata: Metadata passed through from the input item.
    """

    key: str
    status: str
    response: Optional[LLMResponse] = None
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class BatchSummary:
    """Totals and per-item results collected over one batch run.

    Attributes:
        total: Number of items in the batch.
        succeeded: Count of items with status ``"success"``.
        failed: Count of items that ended in an error.
        skipped: Count of items that were skipped.
        results: Per-item results.
        total_prompt_tokens: Sum of prompt tokens over successful items.
        total_completion_tokens: Sum of completion tokens over
            successful items.
    """

    total: int = 0
    succeeded: int = 0
    failed: int = 0
    skipped: int = 0
    results: List[BatchResult] = field(default_factory=list)
    total_prompt_tokens: int = 0
    total_completion_tokens: int = 0

    @property
    def total_tokens(self) -> int:
        """Prompt and completion tokens combined."""
        prompt_count = self.total_prompt_tokens
        completion_count = self.total_completion_tokens
        return prompt_count + completion_count

    def success_rate(self) -> float:
        """Return the share of attempted (non-skipped) items that succeeded.

        When nothing was attempted (empty batch, or every item skipped)
        the rate is reported as ``1.0``.
        """
        attempted_count = self.total - self.skipped
        return self.succeeded / attempted_count if attempted_count else 1.0
class BatchEvaluator:
    """Orchestrates LLM evaluation across a batch of items.

    Args:
        adapter: The LLM adapter to use for evaluation.
        config: Run configuration (model, temperature, etc.). A default
            ``RunConfig()`` is created when ``None`` is given.
        progress_callback: Optional ``fn(completed, total, result)``
            called after each item is processed. Exceptions raised by
            the callback are not caught and abort the batch.
        previous_digests: Optional ``{key: digest}`` mapping from a
            previous run. Items whose digest matches are skipped.
    """

    def __init__(
        self,
        adapter: LLMAdapter,
        config: Optional[RunConfig] = None,
        progress_callback: Optional[Callable[[int, int, BatchResult], None]] = None,
        previous_digests: Optional[Dict[str, str]] = None,
    ):
        self._adapter = adapter
        # Explicit ``is None`` checks so that a caller-supplied object is
        # never replaced just because it happens to be falsy.
        self._config = config if config is not None else RunConfig()
        self._progress_callback = progress_callback
        self._previous_digests = (
            previous_digests if previous_digests is not None else {}
        )

    def evaluate(self, items: List[BatchItem]) -> BatchSummary:
        """Run evaluation for all items and return aggregate results.

        Items whose :attr:`~BatchItem.content_digest` matches an entry
        in *previous_digests* are skipped. All other items are sent to
        the LLM adapter. Errors on individual items are captured
        without aborting the batch.

        Args:
            items: The items to evaluate, processed in order.

        Returns:
            A :class:`BatchSummary` holding one result per item (in input
            order), the success/failed/skipped counters and the token
            totals of the successful items.
        """
        total = len(items)
        summary = BatchSummary(total=total)
        for completed, item in enumerate(items, start=1):
            result = self._evaluate_one(item)
            summary.results.append(result)

            if result.status == "success":
                summary.succeeded += 1
                # This runs outside the per-item try/except, so it must not
                # raise: tolerate a missing/None usage mapping and None
                # token counts instead of aborting the whole batch.
                usage = (result.response.usage if result.response else None) or {}
                summary.total_prompt_tokens += usage.get("prompt_tokens") or 0
                summary.total_completion_tokens += (
                    usage.get("completion_tokens") or 0
                )
            elif result.status == "skipped":
                summary.skipped += 1
            else:
                summary.failed += 1

            if self._progress_callback is not None:
                self._progress_callback(completed, total, result)
        return summary

    def _evaluate_one(self, item: BatchItem) -> BatchResult:
        """Evaluate a single item, handling skip logic and errors.

        Returns:
            A result with status ``"skipped"`` when the item's non-empty
            digest equals the previously recorded one, ``"success"`` when
            the adapter returned a response, or ``"error"`` (with a
            non-empty message) when the adapter raised.
        """
        # Incremental: skip if digest unchanged. An empty digest never
        # matches, so items without a digest are always evaluated.
        if (
            item.content_digest
            and self._previous_digests.get(item.key) == item.content_digest
        ):
            return BatchResult(
                key=item.key,
                status="skipped",
                metadata=item.metadata,
            )

        try:
            response = self._adapter.execute_prompt(item.prompt, self._config)
        except Exception as exc:
            # Deliberately broad: one failing item must not stop the batch.
            # str(exc) is "" for exceptions raised without a message; fall
            # back to the class name so the error field is never empty.
            return BatchResult(
                key=item.key,
                status="error",
                error=str(exc) or type(exc).__name__,
                metadata=item.metadata,
            )
        return BatchResult(
            key=item.key,
            status="success",
            response=response,
            metadata=item.metadata,
        )