diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..db45910 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,11 @@ +# llm-connect — Codex Instructions + +@SCOPE.md +@.claude/rules/repo-identity.md +@.claude/rules/session-protocol.md +@.claude/rules/first-session.md +@.claude/rules/workplan-convention.md +@.claude/rules/stack-and-commands.md +@.claude/rules/architecture.md +@.claude/rules/repo-boundary.md +@.claude/rules/agents.md diff --git a/contracts/functional/adaptive-routing-policy.md b/contracts/functional/adaptive-routing-policy.md new file mode 100644 index 0000000..a41d309 --- /dev/null +++ b/contracts/functional/adaptive-routing-policy.md @@ -0,0 +1,87 @@ +# Contract: AdaptiveRoutingPolicy + +**layer:** Functional +**maturity:** Beta +**module:** `llm_connect.routing` +**since:** WP-0004 + +## Purpose + +Select the cheapest adapter whose observed mean quality for a task type clears +a caller-supplied quality floor. The policy builds on `RoutingPolicy`: static +rules remain the cold-start and failure fallback, while adaptive selection is +used only when the ledger has enough qualifying observations. + +## Public surface + +```python +@dataclass +class AdaptiveRoutingPolicy(RoutingPolicy): + ledger: Optional[QualityLedger] = None + adapters_by_id: Mapping[str, LLMAdapter] = field(default_factory=dict) + window_size: int = 20 + min_observations: int = 1 + max_age: Optional[timedelta] = None + + def resolve( + self, + task_type: str, + estimated_cost_per_1k: Optional[float] = None, + *, + quality_floor: Optional[float] = None, + ) -> LLMAdapter: ... +``` + +## Candidate identity + +Observations are keyed by `(task_type, adapter_id)`. Callers should pass +`adapters_by_id` so the policy can map ledger observations back to concrete +`LLMAdapter` instances. If a static rule adapter is not present in +`adapters_by_id`, the policy also checks common string attributes +`adapter_id`, `id`, and `name`. + +## Invariants + +1. If `quality_floor is None` or `ledger is None`, resolution is exactly the + same as `RoutingPolicy.resolve()`. +2. `quality_floor` must be between `0` and `1`, inclusive. +3. Each candidate is evaluated over the newest `window_size` observations for + the requested `task_type` and adapter id. +4. `max_age`, when provided, filters out observations older than that age. +5. A candidate is considered only when it has at least `min_observations` after + filtering. +6. A candidate qualifies when its mean `quality_score` is greater than or equal + to `quality_floor`. +7. Among qualifying candidates, the policy chooses the lowest mean observed + `cost_usd`. +8. If mean observed cost ties exactly, the policy prefers the matching static + rule's explicit `prefer` adapter. +9. If there are still ties, stable candidate order is used. +10. If no candidate qualifies, resolution falls through to + `RoutingPolicy.resolve(task_type, estimated_cost_per_1k)`. + +## Sample-size and freshness trade-off + +Small `window_size` values react quickly to model or prompt changes but can be +noisy. Larger windows are more stable but may preserve stale behavior after a +provider update or prompt template change. `min_observations` lets callers avoid +acting on a single lucky sample, while `max_age` bounds how long old observations +can influence routing. Callers that change prompts materially should also filter +by a prompt fingerprint in observation tags before writing comparable samples to +the same ledger regime. + +## Error contract + +| Condition | Exception | +|-----------|-----------| +| `quality_floor` outside `0..1` | `ValueError` | +| `window_size <= 0` | `ValueError` | +| `min_observations <= 0` | `ValueError` | +| `max_age < 0` | `ValueError` | +| No qualifying adaptive candidate and no static fallback | `LookupError` | + +## Non-goals + +The policy does not define a task taxonomy, set task quality floors, decide +which baseline is authoritative, or perform billing-grade accounting. Those are +consumer policy choices. diff --git a/contracts/functional/baseline-grading.md b/contracts/functional/baseline-grading.md new file mode 100644 index 0000000..34be331 --- /dev/null +++ b/contracts/functional/baseline-grading.md @@ -0,0 +1,85 @@ +# Contract: Baseline Grading + +**layer:** Functional +**maturity:** Beta +**module:** `llm_connect.grading` +**since:** WP-0004 + +## Purpose + +Compare a candidate adapter response against a caller-chosen baseline response +and return a normalised quality score suitable for storage in +`QualityLedger`. + +## Public surface + +```python +@dataclass(frozen=True) +class GradingResult: + quality_score: float + notes: str + grader_id: str + baseline_response: LLMResponse + candidate_response: LLMResponse + +class Judge(Protocol): + grader_id: str + def judge(..., *, prompt: str, run_config: RunConfig) -> GradingResult: ... + +class BaselineGrader(Protocol): + def grade( + self, + baseline_adapter: LLMAdapter, + candidate_adapter: LLMAdapter, + prompt: str, + run_config: RunConfig, + ) -> GradingResult: ... + +@dataclass +class ExactMatchJudge: ... + +@dataclass +class EmbeddingSimilarityJudge: ... + +@dataclass +class LLMJudge: ... + +@dataclass +class PairedGrader: ... +``` + +## Invariants + +1. `quality_score` is always validated as `0.0..1.0`. +2. `GradingResult` always preserves both baseline and candidate responses. +3. `PairedGrader` runs the baseline adapter and the candidate adapter with the + same prompt and run config, then delegates comparison to its `Judge`. +4. `ExactMatchJudge` returns `1.0` for matched content and `0.0` otherwise. +5. `EmbeddingSimilarityJudge` embeds baseline and candidate response text in a + single batch and clamps cosine similarity into `0.0..1.0`. +6. `LLMJudge` uses a fixed rubric prompt and expects JSON with + `quality_score` and optional `notes`. +7. `LLMJudge` runs with `temperature=0.0`, drops the caller's budget tracker, + and adds a deterministic `seed` model parameter when configured. + +## Error contract + +| Condition | Exception | +|-----------|-----------| +| Invalid `quality_score` | `ValueError` | +| Empty `grader_id` | `ValueError` | +| Embedding adapter returns other than two vectors | `ValueError` | +| LLM judge response is missing parseable JSON | `ValueError` | + +## Bias caveats + +LLM-as-judge scoring is heuristic and may exhibit: + +- Length bias: longer answers can be preferred even when not better. +- Format bias: familiar formatting can be rewarded independent of correctness. +- Position bias: prompt order can affect judgement. +- Self-preference: a judge may favour outputs from its own model family. + +Consumers should calibrate `LLMJudge` against at least one non-LLM judge such +as exact match or embedding similarity before using its observations to drive +adaptive routing. diff --git a/contracts/functional/quality-ledger.md b/contracts/functional/quality-ledger.md new file mode 100644 index 0000000..d905f05 --- /dev/null +++ b/contracts/functional/quality-ledger.md @@ -0,0 +1,87 @@ +# Contract: QualityObservation and QualityLedger + +**layer:** Functional +**maturity:** Beta +**module:** `llm_connect.quality` +**since:** WP-0004 + +## Purpose + +Record observed quality, cost, latency, and token outcomes for a logical task +type so consumers can build adaptive routing policy without putting +consumer-specific thresholds into llm-connect. + +## Public surface + +```python +@dataclass(frozen=True) +class QualityObservation: + task_type: str + adapter_id: str + model_id: str + cost_usd: float + quality_score: float + latency_ms: float + tokens_in: int + tokens_out: int + baseline_adapter_id: str | None = None + recorded_at: datetime = field(default_factory=...) + tags: dict[str, Any] = field(default_factory=dict) + + @property + def total_tokens(self) -> int: ... + def to_dict(self) -> dict[str, Any]: ... + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "QualityObservation": ... + +class QualityLedger: + def __init__(self, path: str | Path): ... + @property + def path(self) -> Path: ... + def append(self, observation: QualityObservation) -> None: ... + def read_all(self) -> list[QualityObservation]: ... + def malformed_count(self) -> int: ... + def by_task_type(self, task_type: str) -> list[QualityObservation]: ... + def recent(...) -> list[QualityObservation]: ... + def mean_quality(...) -> float | None: ... + def prune_before(self, timestamp: datetime) -> int: ... + +def is_stale(observation: QualityObservation, max_age: timedelta, *, now: datetime | None = None) -> bool: ... +``` + +## Invariants + +1. `quality_score` is a normalised `0.0..1.0` score where `1.0` means the + candidate fully meets the grader's quality bar and `0.0` means complete + failure for that grader. +2. `task_type`, `adapter_id`, and `model_id` must be non-empty strings. +3. `cost_usd`, `latency_ms`, `tokens_in`, and `tokens_out` are non-negative. +4. `recorded_at` is normalised to UTC. Naive datetimes are interpreted as UTC. +5. Ledger records are JSON Lines. Each line is one `QualityObservation.to_dict()`. +6. `QualityLedger.append()` performs a process-local lock plus an advisory file + lock around each write. +7. Read/query helpers skip malformed lines instead of failing the whole ledger. + `malformed_count()` exposes how many lines were skipped. +8. `prune_before()` removes only valid observations older than the cutoff. + Malformed lines are preserved. + +## Error contract + +| Condition | Exception | +|-----------|-----------| +| Invalid observation field | `ValueError` | +| Invalid datetime field | `TypeError` or `ValueError` | +| Negative recent limit | `ValueError` | +| `mean_quality(min_observations <= 0)` | `ValueError` | +| `is_stale(max_age < 0)` | `ValueError` | + +## Known consumers + +- `infospace-bench` is the first intended consumer. It is expected to provide + task taxonomy, thresholds, and baseline choice. + +## Notes + +The ledger intentionally stores only observation metadata in this slice. Callers +that need prompt or response digests can place those in `tags`, for example +`prompt_fingerprint`. diff --git a/contracts/functional/shadowing-adapter.md b/contracts/functional/shadowing-adapter.md new file mode 100644 index 0000000..f363fbd --- /dev/null +++ b/contracts/functional/shadowing-adapter.md @@ -0,0 +1,84 @@ +# Contract: ShadowingAdapter + +**layer:** Functional +**maturity:** Beta +**module:** `llm_connect.shadowing` +**since:** WP-0004 + +## Purpose + +Collect quality observations without changing caller-visible model behavior. +`ShadowingAdapter` wraps a candidate adapter, returns the candidate response to +the caller, and samples extra baseline/grading work that appends +`QualityObservation` records to a `QualityLedger`. + +## Public surface + +```python +@dataclass +class ShadowingAdapter(LLMAdapter): + candidate_adapter: LLMAdapter + baseline_adapter: LLMAdapter + grader: BaselineGrader + ledger: QualityLedger + task_type: str + adapter_id: str + model_id: Optional[str] = None + baseline_adapter_id: Optional[str] = None + shadow_rate: float = 1.0 + async_shadow: bool = False + tags: Mapping[str, Any] = field(default_factory=dict) + on_shadow_error: Optional[Callable[[Exception], None]] = None + + def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse: ... + async def async_execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse: ... + def flush(self, timeout: Optional[float] = None) -> None: ... + def shutdown(self, wait: bool = True) -> None: ... +``` + +## Invariants + +1. The candidate adapter is always called first. +2. The response returned by `execute_prompt()` and `async_execute_prompt()` is + always the candidate response. +3. Shadow failures from the baseline adapter, grader, or ledger writer are + isolated from the caller. They are sent to `on_shadow_error` when configured. +4. `shadow_rate=0.0` records no observations. `shadow_rate=1.0` shadows every + successful candidate call. Intermediate values sample with `random_source`. +5. Shadow grading reuses the candidate response already returned by the wrapped + candidate adapter; it does not make a second candidate model call. +6. Shadow calls use a copy of `RunConfig` with `budget_tracker=None`, so + observation collection cannot consume the caller's foreground token budget. +7. `async_shadow=True` schedules shadow work on a background thread. `flush()` + waits for currently queued work, and `shutdown()` releases the executor. + +## Observation mapping + +The appended observation uses: + +- `task_type` from the wrapper configuration +- `adapter_id` from the wrapper configuration +- `model_id` from the wrapper configuration, then candidate response model, then + `RunConfig.model_name` +- `quality_score` from the `GradingResult` +- `cost_usd` from response metadata keys `cost_usd`, `estimated_cost_usd`, or + `cost`, falling back to `0.0` +- token counts from candidate response usage keys `prompt_tokens` and + `completion_tokens` +- `baseline_adapter_id` and `tags` from wrapper configuration + +## Error contract + +| Condition | Exception | +|-----------|-----------| +| Empty `task_type` | `ValueError` | +| Empty `adapter_id` | `ValueError` | +| `shadow_rate` outside `0..1` | `ValueError` | +| Candidate adapter failure | Original exception propagates | +| Shadow baseline/grading/ledger failure | Suppressed; optional callback | + +## Privacy note + +The wrapper does not store prompt or response text in the ledger by default. +Callers that need regime tracking should store non-sensitive fingerprints in +`tags`, for example `prompt_fingerprint` or `template_version`. diff --git a/docs/infospace-bench-adaptive-routing.md b/docs/infospace-bench-adaptive-routing.md new file mode 100644 index 0000000..87e571c --- /dev/null +++ b/docs/infospace-bench-adaptive-routing.md @@ -0,0 +1,83 @@ +# Infospace-Bench Adaptive Routing Guide + +This guide shows how a consumer such as `infospace-bench` can wire task-type +stages into the adaptive cost-quality primitives from `llm-connect`. + +## Stage taxonomy + +The consumer owns task names and quality thresholds. A first pass for +`infospace-bench` could use: + +| Stage | Task type | Suggested floor | +|-------|-----------|-----------------| +| Source chapter summary | `summarize-source` | `0.82` | +| Entity extraction | `extract-entities` | `0.88` | +| Relation extraction | `extract-relations` | `0.86` | +| Entity evaluation | `evaluate-entity` | `0.90` | +| Report synthesis | `synthesize-report` | `0.92` | + +These floors are starting points, not library defaults. Raise them for stages +whose errors compound downstream. + +## Wiring sketch + +```python +from llm_connect.grading import ExactMatchJudge, PairedGrader +from llm_connect.quality import QualityLedger +from llm_connect.routing import AdaptiveRoutingPolicy, RoutingRule +from llm_connect.shadowing import ShadowingAdapter + +ledger = QualityLedger("quality-ledger.jsonl") +grader = PairedGrader(ExactMatchJudge()) + +baseline = claude_code_adapter +cheap = openrouter_cheap_adapter +mid = openrouter_mid_adapter + +shadowed_cheap = ShadowingAdapter( + candidate_adapter=cheap, + baseline_adapter=baseline, + grader=grader, + ledger=ledger, + task_type="extract-relations", + adapter_id="openrouter-cheap", + baseline_adapter_id="claude-code", + shadow_rate=0.1, + tags={"prompt_fingerprint": prompt_fingerprint}, +) + +policy = AdaptiveRoutingPolicy( + rules=[ + RoutingRule("extract-relations", prefer=baseline, fallback=mid), + ], + ledger=ledger, + adapters_by_id={ + "openrouter-cheap": shadowed_cheap, + "openrouter-mid": mid, + "claude-code": baseline, + }, + window_size=20, + min_observations=3, +) + +adapter = policy.resolve("extract-relations", quality_floor=0.86) +response = adapter.execute_prompt(prompt, run_config) +``` + +## Operating loop + +1. Start with static routing to the trusted baseline or mid-tier adapter. +2. Wrap cheaper candidates with `ShadowingAdapter` at a conservative + `shadow_rate`, for example `0.05` to `0.1`. +3. Record a prompt fingerprint or template version in `tags` so later prompt + changes do not mix incompatible observations. +4. Increase `min_observations` for stages with high variance. +5. Let `AdaptiveRoutingPolicy` select the cheapest adapter that clears each + stage floor. + +## Refresh rules + +When a provider model, prompt template, or parser contract changes, treat prior +observations as a different regime. Either write to a new ledger, prune old +observations, or filter with a new `prompt_fingerprint` tag before trusting +adaptive selection again. diff --git a/examples/adaptive_routing_fixture_batch.py b/examples/adaptive_routing_fixture_batch.py new file mode 100644 index 0000000..edbc394 --- /dev/null +++ b/examples/adaptive_routing_fixture_batch.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +"""Populate a quality ledger from a small adaptive-routing fixture batch.""" + +from __future__ import annotations + +import argparse +import sys +from dataclasses import dataclass +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +from llm_connect.adapter import LLMAdapter +from llm_connect.grading import ExactMatchJudge, PairedGrader +from llm_connect.models import LLMResponse, RunConfig +from llm_connect.quality import QualityLedger +from llm_connect.routing import AdaptiveRoutingPolicy, RoutingRule +from llm_connect.shadowing import ShadowingAdapter + + +@dataclass +class FixtureAdapter(LLMAdapter): + adapter_id: str + response_text: str + cost_usd: float + + def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse: + prompt_tokens = len(prompt.split()) + completion_tokens = len(self.response_text.split()) + return LLMResponse( + content=self.response_text, + model=self.adapter_id, + usage={ + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": prompt_tokens + completion_tokens, + }, + metadata={"cost_usd": self.cost_usd, "latency_ms": 25.0}, + ) + + def validate_config(self, config: RunConfig) -> bool: + return True + + +def build_candidates() -> dict[str, FixtureAdapter]: + return { + "openrouter-cheap-fixture": FixtureAdapter( + "openrouter-cheap-fixture", + "summary", + 0.001, + ), + "openrouter-mid-fixture": FixtureAdapter( + "openrouter-mid-fixture", + "summary with entities and relations", + 0.004, + ), + "claude-code-baseline-fixture": FixtureAdapter( + "claude-code-baseline-fixture", + "summary with entities and relations", + 0.0, + ), + } + + +def populate_ledger(ledger: QualityLedger) -> dict[str, FixtureAdapter]: + candidates = build_candidates() + baseline = candidates["claude-code-baseline-fixture"] + grader = PairedGrader(ExactMatchJudge()) + prompts = [ + "Summarize chapter one and keep entity names.", + "Extract relations from chapter two.", + "Evaluate whether the entity graph is coherent.", + ] + config = RunConfig(model_name="fixture") + + for task_type, prompt in zip( + ["summarize-source", "extract-relations", "evaluate-entity"], + prompts, + ): + for adapter_id, candidate in candidates.items(): + if candidate is baseline: + continue + ShadowingAdapter( + candidate_adapter=candidate, + baseline_adapter=baseline, + grader=grader, + ledger=ledger, + task_type=task_type, + adapter_id=adapter_id, + baseline_adapter_id=baseline.adapter_id, + shadow_rate=1.0, + tags={"fixture": "adaptive-routing"}, + ).execute_prompt(prompt, config) + + return candidates + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument( + "--ledger", + default="quality-ledger.jsonl", + help="Path to the JSONL ledger to populate.", + ) + args = parser.parse_args() + + ledger = QualityLedger(Path(args.ledger)) + candidates = populate_ledger(ledger) + policy = AdaptiveRoutingPolicy( + rules=[ + RoutingRule( + "summarize-source", + prefer=candidates["claude-code-baseline-fixture"], + fallback=candidates["openrouter-mid-fixture"], + ) + ], + ledger=ledger, + adapters_by_id=candidates, + ) + + selected = policy.resolve("summarize-source", quality_floor=0.8) + print(f"ledger={ledger.path}") + print(f"observations={len(ledger.read_all())}") + print(f"selected={selected.adapter_id}") + + +if __name__ == "__main__": + main() diff --git a/llm_connect/grading.py b/llm_connect/grading.py new file mode 100644 index 0000000..1cebbd1 --- /dev/null +++ b/llm_connect/grading.py @@ -0,0 +1,239 @@ +"""Baseline grading primitives for adaptive routing. + +Graders compare a candidate adapter response against a caller-chosen baseline. +They produce normalised quality scores that can be recorded in a +``QualityLedger`` and consumed later by adaptive routing policy. +""" + +from __future__ import annotations + +import json +import re +from dataclasses import dataclass, field, replace +from typing import Any, Protocol + +from llm_connect.adapter import LLMAdapter +from llm_connect.embedding_adapter import EmbeddingAdapter +from llm_connect.models import LLMResponse, RunConfig +from llm_connect.similarity import cosine_similarity + + +def _validate_score(value: float) -> float: + if not isinstance(value, (int, float)): + raise ValueError("quality_score must be a number between 0 and 1") + score = float(value) + if not 0 <= score <= 1: + raise ValueError("quality_score must be between 0 and 1") + return score + + +def _normalise_text(text: str) -> str: + return " ".join(text.strip().split()) + + +@dataclass(frozen=True) +class GradingResult: + """Structured result from comparing candidate output to baseline output.""" + + quality_score: float + notes: str + grader_id: str + baseline_response: LLMResponse + candidate_response: LLMResponse + + def __post_init__(self) -> None: + if not str(self.grader_id).strip(): + raise ValueError("grader_id must be a non-empty string") + object.__setattr__(self, "quality_score", _validate_score(self.quality_score)) + object.__setattr__(self, "notes", str(self.notes)) + + +class Judge(Protocol): + """Compare baseline and candidate responses.""" + + grader_id: str + + def judge( + self, + baseline_response: LLMResponse, + candidate_response: LLMResponse, + *, + prompt: str, + run_config: RunConfig, + ) -> GradingResult: + """Return a quality score for candidate relative to baseline.""" + + +class BaselineGrader(Protocol): + """Run baseline and candidate adapters, then judge the paired responses.""" + + def grade( + self, + baseline_adapter: LLMAdapter, + candidate_adapter: LLMAdapter, + prompt: str, + run_config: RunConfig, + ) -> GradingResult: + """Return a structured grading result.""" + + +@dataclass +class ExactMatchJudge: + """Judge that scores 1.0 when response text matches exactly after normalisation.""" + + normalize_whitespace: bool = True + case_sensitive: bool = True + grader_id: str = "exact-match" + + def judge( + self, + baseline_response: LLMResponse, + candidate_response: LLMResponse, + *, + prompt: str, + run_config: RunConfig, + ) -> GradingResult: + baseline_text = baseline_response.content + candidate_text = candidate_response.content + if self.normalize_whitespace: + baseline_text = _normalise_text(baseline_text) + candidate_text = _normalise_text(candidate_text) + if not self.case_sensitive: + baseline_text = baseline_text.casefold() + candidate_text = candidate_text.casefold() + + matched = baseline_text == candidate_text + return GradingResult( + quality_score=1.0 if matched else 0.0, + notes="exact match" if matched else "candidate content differs from baseline", + grader_id=self.grader_id, + baseline_response=baseline_response, + candidate_response=candidate_response, + ) + + +@dataclass +class EmbeddingSimilarityJudge: + """Judge that maps cosine similarity between response embeddings to 0..1.""" + + embedding_adapter: EmbeddingAdapter + grader_id: str = "embedding-similarity" + + def judge( + self, + baseline_response: LLMResponse, + candidate_response: LLMResponse, + *, + prompt: str, + run_config: RunConfig, + ) -> GradingResult: + embeddings = self.embedding_adapter.embed( + [baseline_response.content, candidate_response.content] + ) + if len(embeddings) != 2: + raise ValueError("EmbeddingSimilarityJudge expected exactly two embeddings") + + raw_similarity = cosine_similarity(embeddings[0], embeddings[1]) + quality_score = max(0.0, min(1.0, raw_similarity)) + return GradingResult( + quality_score=quality_score, + notes=f"cosine similarity {raw_similarity:.4f}", + grader_id=self.grader_id, + baseline_response=baseline_response, + candidate_response=candidate_response, + ) + + +@dataclass +class LLMJudge: + """LLM-as-judge wrapper using a fixed rubric prompt and JSON response.""" + + judge_adapter: LLMAdapter + rubric: str = ( + "Compare the candidate response to the baseline response. " + "Return JSON only with keys quality_score and notes. " + "quality_score must be a number from 0 to 1." + ) + grader_id: str = "llm-judge" + seed: int | None = 0 + + def judge( + self, + baseline_response: LLMResponse, + candidate_response: LLMResponse, + *, + prompt: str, + run_config: RunConfig, + ) -> GradingResult: + judge_prompt = self._build_prompt(prompt, baseline_response, candidate_response) + judge_config = self._judge_config(run_config) + response = self.judge_adapter.execute_prompt(judge_prompt, judge_config) + parsed = self._parse_judge_response(response.content) + return GradingResult( + quality_score=parsed["quality_score"], + notes=parsed["notes"], + grader_id=self.grader_id, + baseline_response=baseline_response, + candidate_response=candidate_response, + ) + + def _judge_config(self, run_config: RunConfig) -> RunConfig: + params: dict[str, Any] = dict(run_config.model_params) + if self.seed is not None: + params.setdefault("seed", self.seed) + return replace(run_config, temperature=0.0, model_params=params, budget_tracker=None) + + def _build_prompt( + self, + prompt: str, + baseline_response: LLMResponse, + candidate_response: LLMResponse, + ) -> str: + return ( + f"{self.rubric}\n\n" + f"Original prompt:\n{prompt}\n\n" + f"Baseline response:\n{baseline_response.content}\n\n" + f"Candidate response:\n{candidate_response.content}\n" + ) + + def _parse_judge_response(self, content: str) -> dict[str, Any]: + try: + data = json.loads(content) + except json.JSONDecodeError: + match = re.search(r"\{.*\}", content, flags=re.DOTALL) + if not match: + raise ValueError("LLMJudge response did not contain JSON") from None + try: + data = json.loads(match.group(0)) + except json.JSONDecodeError as exc: + raise ValueError("LLMJudge response JSON could not be parsed") from exc + + if not isinstance(data, dict): + raise ValueError("LLMJudge response JSON must be an object") + return { + "quality_score": _validate_score(data.get("quality_score")), + "notes": str(data.get("notes", "")), + } + + +@dataclass +class PairedGrader: + """Baseline grader that runs both adapters and delegates comparison to a judge.""" + + judge: Judge = field(default_factory=ExactMatchJudge) + + def grade( + self, + baseline_adapter: LLMAdapter, + candidate_adapter: LLMAdapter, + prompt: str, + run_config: RunConfig, + ) -> GradingResult: + baseline_response = baseline_adapter.execute_prompt(prompt, run_config) + candidate_response = candidate_adapter.execute_prompt(prompt, run_config) + return self.judge.judge( + baseline_response, + candidate_response, + prompt=prompt, + run_config=run_config, + ) diff --git a/llm_connect/quality.py b/llm_connect/quality.py new file mode 100644 index 0000000..63497e1 --- /dev/null +++ b/llm_connect/quality.py @@ -0,0 +1,318 @@ +"""Quality observations and append-only ledger support. + +These primitives let callers record observed quality/cost outcomes for a +task type without baking consumer-specific routing policy into llm-connect. +""" + +from __future__ import annotations + +import json +import os +import threading +from contextlib import contextmanager +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any, Iterator, TextIO + + +_PATH_LOCKS: dict[Path, threading.Lock] = {} +_PATH_LOCKS_GUARD = threading.Lock() + + +def _utc_now() -> datetime: + return datetime.now(timezone.utc) + + +def _normalise_datetime(value: datetime | str) -> datetime: + if isinstance(value, datetime): + dt = value + elif isinstance(value, str): + dt = datetime.fromisoformat(value.replace("Z", "+00:00")) + else: + raise TypeError(f"Expected datetime or ISO string, got {type(value).__name__}") + + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + + +def _serialise_datetime(value: datetime) -> str: + return _normalise_datetime(value).isoformat().replace("+00:00", "Z") + + +def _validate_non_negative_int(name: str, value: int) -> None: + if not isinstance(value, int) or value < 0: + raise ValueError(f"{name} must be a non-negative integer") + + +def _validate_non_negative_float(name: str, value: float) -> None: + if not isinstance(value, (int, float)) or float(value) < 0: + raise ValueError(f"{name} must be a non-negative number") + + +def _path_lock(path: Path) -> threading.Lock: + resolved = path.resolve() + with _PATH_LOCKS_GUARD: + lock = _PATH_LOCKS.get(resolved) + if lock is None: + lock = threading.Lock() + _PATH_LOCKS[resolved] = lock + return lock + + +def _lock_file(handle: TextIO) -> None: + if os.name == "nt": + import msvcrt + + msvcrt.locking(handle.fileno(), msvcrt.LK_LOCK, 1) + else: + import fcntl + + fcntl.flock(handle.fileno(), fcntl.LOCK_EX) + + +def _unlock_file(handle: TextIO) -> None: + if os.name == "nt": + import msvcrt + + msvcrt.locking(handle.fileno(), msvcrt.LK_UNLCK, 1) + else: + import fcntl + + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) + + +@contextmanager +def _locked_file(path: Path, mode: str) -> Iterator[TextIO]: + path.parent.mkdir(parents=True, exist_ok=True) + local_lock = _path_lock(path) + with local_lock: + with path.open(mode, encoding="utf-8") as handle: + _lock_file(handle) + try: + yield handle + finally: + _unlock_file(handle) + + +@dataclass(frozen=True) +class QualityObservation: + """Observed quality/cost outcome for one adapter on one task type.""" + + task_type: str + adapter_id: str + model_id: str + cost_usd: float + quality_score: float + latency_ms: float + tokens_in: int + tokens_out: int + baseline_adapter_id: str | None = None + recorded_at: datetime = field(default_factory=_utc_now) + tags: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + for name in ("task_type", "adapter_id", "model_id"): + if not str(getattr(self, name)).strip(): + raise ValueError(f"{name} must be a non-empty string") + + _validate_non_negative_float("cost_usd", self.cost_usd) + _validate_non_negative_float("latency_ms", self.latency_ms) + _validate_non_negative_int("tokens_in", self.tokens_in) + _validate_non_negative_int("tokens_out", self.tokens_out) + if not isinstance(self.quality_score, (int, float)): + raise ValueError("quality_score must be a number between 0 and 1") + if not 0 <= float(self.quality_score) <= 1: + raise ValueError("quality_score must be between 0 and 1") + + object.__setattr__(self, "task_type", str(self.task_type)) + object.__setattr__(self, "adapter_id", str(self.adapter_id)) + object.__setattr__(self, "model_id", str(self.model_id)) + object.__setattr__(self, "cost_usd", float(self.cost_usd)) + object.__setattr__(self, "quality_score", float(self.quality_score)) + object.__setattr__(self, "latency_ms", float(self.latency_ms)) + object.__setattr__(self, "recorded_at", _normalise_datetime(self.recorded_at)) + object.__setattr__(self, "tags", dict(self.tags)) + + @property + def total_tokens(self) -> int: + """Return input plus output tokens.""" + return self.tokens_in + self.tokens_out + + def to_dict(self) -> dict[str, Any]: + """Convert to a JSON-serialisable dictionary.""" + return { + "task_type": self.task_type, + "adapter_id": self.adapter_id, + "model_id": self.model_id, + "cost_usd": self.cost_usd, + "quality_score": self.quality_score, + "latency_ms": self.latency_ms, + "tokens_in": self.tokens_in, + "tokens_out": self.tokens_out, + "baseline_adapter_id": self.baseline_adapter_id, + "recorded_at": _serialise_datetime(self.recorded_at), + "tags": dict(self.tags), + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "QualityObservation": + """Create an observation from a JSON-decoded dictionary.""" + return cls( + task_type=data["task_type"], + adapter_id=data["adapter_id"], + model_id=data["model_id"], + cost_usd=data["cost_usd"], + quality_score=data["quality_score"], + latency_ms=data["latency_ms"], + tokens_in=data["tokens_in"], + tokens_out=data["tokens_out"], + baseline_adapter_id=data.get("baseline_adapter_id"), + recorded_at=data.get("recorded_at", _utc_now()), + tags=data.get("tags") or {}, + ) + + +def is_stale( + observation: QualityObservation, + max_age: timedelta, + *, + now: datetime | None = None, +) -> bool: + """Return whether *observation* is older than *max_age*.""" + if max_age.total_seconds() < 0: + raise ValueError("max_age must be non-negative") + reference = _normalise_datetime(now or _utc_now()) + return observation.recorded_at < reference - max_age + + +class QualityLedger: + """Append-only JSONL store for :class:`QualityObservation` records.""" + + def __init__(self, path: str | Path): + self._path = Path(path) + + @property + def path(self) -> Path: + """Ledger file path.""" + return self._path + + def append(self, observation: QualityObservation) -> None: + """Append one observation as a locked JSONL record.""" + line = json.dumps(observation.to_dict(), sort_keys=True, separators=(",", ":")) + with _locked_file(self._path, "a") as handle: + handle.write(line + "\n") + handle.flush() + os.fsync(handle.fileno()) + + def read_all(self) -> list[QualityObservation]: + """Return all parseable observations, skipping malformed lines.""" + observations, _ = self._read_with_malformed_count() + return observations + + def malformed_count(self) -> int: + """Return the number of malformed lines currently skipped by reads.""" + _, malformed = self._read_with_malformed_count() + return malformed + + def by_task_type(self, task_type: str) -> list[QualityObservation]: + """Return observations matching *task_type*.""" + return [obs for obs in self.read_all() if obs.task_type == task_type] + + def recent( + self, + limit: int | None = None, + *, + task_type: str | None = None, + adapter_id: str | None = None, + since: datetime | None = None, + ) -> list[QualityObservation]: + """Return newest observations first, optionally filtered.""" + if limit is not None and limit < 0: + raise ValueError("limit must be non-negative") + + cutoff = _normalise_datetime(since) if since is not None else None + observations = self.read_all() + if task_type is not None: + observations = [obs for obs in observations if obs.task_type == task_type] + if adapter_id is not None: + observations = [obs for obs in observations if obs.adapter_id == adapter_id] + if cutoff is not None: + observations = [obs for obs in observations if obs.recorded_at >= cutoff] + + observations.sort(key=lambda obs: obs.recorded_at, reverse=True) + if limit is None: + return observations + return observations[:limit] + + def mean_quality( + self, + task_type: str, + *, + adapter_id: str | None = None, + model_id: str | None = None, + max_age: timedelta | None = None, + min_observations: int = 1, + ) -> float | None: + """Return mean quality for matching observations, or ``None`` if absent.""" + if min_observations <= 0: + raise ValueError("min_observations must be positive") + + observations = self.by_task_type(task_type) + if adapter_id is not None: + observations = [obs for obs in observations if obs.adapter_id == adapter_id] + if model_id is not None: + observations = [obs for obs in observations if obs.model_id == model_id] + if max_age is not None: + observations = [obs for obs in observations if not is_stale(obs, max_age)] + + if len(observations) < min_observations: + return None + return sum(obs.quality_score for obs in observations) / len(observations) + + def prune_before(self, timestamp: datetime) -> int: + """Remove valid observations recorded before *timestamp*. + + Malformed lines are preserved because their timestamp cannot be trusted. + Returns the number of valid observation records removed. + """ + cutoff = _normalise_datetime(timestamp) + removed = 0 + with _locked_file(self._path, "a+") as handle: + handle.seek(0) + lines = handle.readlines() + kept: list[str] = [] + for line in lines: + try: + obs = QualityObservation.from_dict(json.loads(line)) + except (json.JSONDecodeError, KeyError, TypeError, ValueError): + kept.append(line) + continue + if obs.recorded_at < cutoff: + removed += 1 + else: + kept.append(line) + + handle.seek(0) + handle.truncate() + handle.writelines(kept) + handle.flush() + os.fsync(handle.fileno()) + return removed + + def _read_with_malformed_count(self) -> tuple[list[QualityObservation], int]: + if not self._path.is_file(): + return [], 0 + + observations: list[QualityObservation] = [] + malformed = 0 + with _locked_file(self._path, "r") as handle: + for line in handle: + if not line.strip(): + continue + try: + observations.append(QualityObservation.from_dict(json.loads(line))) + except (json.JSONDecodeError, KeyError, TypeError, ValueError): + malformed += 1 + return observations, malformed diff --git a/llm_connect/routing.py b/llm_connect/routing.py index 8f39957..9ffda1a 100644 --- a/llm_connect/routing.py +++ b/llm_connect/routing.py @@ -5,9 +5,11 @@ Maps task types to preferred adapters with optional cost-cap fallback. """ from dataclasses import dataclass, field -from typing import Optional, List +from datetime import datetime, timedelta, timezone +from typing import List, Mapping, Optional from llm_connect.adapter import LLMAdapter +from llm_connect.quality import QualityLedger, QualityObservation @dataclass @@ -87,3 +89,172 @@ class RoutingPolicy: raise LookupError( f"No routing rule for task_type={task_type!r} and no default configured" ) + + +@dataclass(frozen=True) +class _CandidateMetrics: + adapter_id: str + adapter: LLMAdapter + mean_quality: float + mean_cost_usd: float + order: int + is_static_prefer: bool + + +@dataclass +class AdaptiveRoutingPolicy(RoutingPolicy): + """Route to the cheapest adapter whose observed quality clears a floor. + + The policy consults a :class:`~llm_connect.quality.QualityLedger` for + observations matching ``task_type`` and adapter id. When the ledger has no + qualifying observations, resolution falls through to ``RoutingPolicy`` so a + caller can use the same policy on day zero and after observations accrue. + """ + + ledger: Optional[QualityLedger] = None + adapters_by_id: Mapping[str, LLMAdapter] = field(default_factory=dict) + window_size: int = 20 + min_observations: int = 1 + max_age: Optional[timedelta] = None + + def __post_init__(self) -> None: + if self.window_size <= 0: + raise ValueError("window_size must be positive") + if self.min_observations <= 0: + raise ValueError("min_observations must be positive") + if self.max_age is not None and self.max_age.total_seconds() < 0: + raise ValueError("max_age must be non-negative") + + def resolve( + self, + task_type: str, + estimated_cost_per_1k: Optional[float] = None, + *, + quality_floor: Optional[float] = None, + ) -> LLMAdapter: + """Return the adaptive adapter for *task_type*. + + Args: + task_type: Logical task identifier. + estimated_cost_per_1k: Passed through to static routing fallback. + quality_floor: Minimum observed mean quality required for adaptive + selection. When omitted, static routing is used. + + Returns: + The selected :class:`~llm_connect.adapter.LLMAdapter`. + """ + if quality_floor is None or self.ledger is None: + return super().resolve(task_type, estimated_cost_per_1k) + if not 0 <= quality_floor <= 1: + raise ValueError("quality_floor must be between 0 and 1") + + metrics = self._qualifying_candidates(task_type, quality_floor) + if not metrics: + return super().resolve(task_type, estimated_cost_per_1k) + + best = min( + metrics, + key=lambda candidate: ( + candidate.mean_cost_usd, + 0 if candidate.is_static_prefer else 1, + candidate.order, + ), + ) + return best.adapter + + def _qualifying_candidates( + self, + task_type: str, + quality_floor: float, + ) -> list[_CandidateMetrics]: + static_prefer = self._static_preferred_adapter(task_type) + candidates: list[_CandidateMetrics] = [] + for order, (adapter_id, adapter) in enumerate(self._candidate_entries(task_type)): + observations = self._windowed_observations(task_type, adapter_id) + if len(observations) < self.min_observations: + continue + + mean_quality = sum(obs.quality_score for obs in observations) / len(observations) + if mean_quality < quality_floor: + continue + + mean_cost = sum(obs.cost_usd for obs in observations) / len(observations) + candidates.append( + _CandidateMetrics( + adapter_id=adapter_id, + adapter=adapter, + mean_quality=mean_quality, + mean_cost_usd=mean_cost, + order=order, + is_static_prefer=adapter is static_prefer, + ) + ) + return candidates + + def _windowed_observations( + self, + task_type: str, + adapter_id: str, + ) -> list[QualityObservation]: + if self.ledger is None: + return [] + + since = None + if self.max_age is not None: + since = datetime.now(timezone.utc) - self.max_age + + return self.ledger.recent( + limit=self.window_size, + task_type=task_type, + adapter_id=adapter_id, + since=since, + ) + + def _candidate_entries(self, task_type: str) -> list[tuple[str, LLMAdapter]]: + entries: list[tuple[str, LLMAdapter]] = [] + seen_ids: set[str] = set() + + def add(adapter_id: str | None, adapter: LLMAdapter | None) -> None: + if adapter is None or adapter_id is None or adapter_id in seen_ids: + return + seen_ids.add(adapter_id) + entries.append((adapter_id, adapter)) + + for adapter_id, adapter in self.adapters_by_id.items(): + add(adapter_id, adapter) + + for adapter in self._static_candidate_adapters(task_type): + add(self._adapter_id_for(adapter), adapter) + + return entries + + def _static_candidate_adapters(self, task_type: str) -> list[LLMAdapter]: + for rule in self.rules: + if rule.task_type == task_type: + candidates = [rule.prefer] + if rule.fallback is not None: + candidates.append(rule.fallback) + if self.default is not None: + candidates.append(self.default) + return candidates + + if self.default is not None: + return [self.default] + return [] + + def _static_preferred_adapter(self, task_type: str) -> LLMAdapter | None: + for rule in self.rules: + if rule.task_type == task_type: + return rule.prefer + return None + + def _adapter_id_for(self, adapter: LLMAdapter) -> str | None: + for adapter_id, candidate in self.adapters_by_id.items(): + if candidate is adapter: + return adapter_id + + for attribute in ("adapter_id", "id", "name"): + value = getattr(adapter, attribute, None) + if isinstance(value, str) and value.strip(): + return value + return None diff --git a/llm_connect/shadowing.py b/llm_connect/shadowing.py new file mode 100644 index 0000000..058a0eb --- /dev/null +++ b/llm_connect/shadowing.py @@ -0,0 +1,177 @@ +"""Shadow-mode observation adapter for adaptive routing.""" + +from __future__ import annotations + +import asyncio +import random +import threading +from concurrent.futures import Future, ThreadPoolExecutor +from dataclasses import dataclass, field, replace +from typing import Any, Callable, Mapping + +from llm_connect.adapter import LLMAdapter +from llm_connect.grading import BaselineGrader +from llm_connect.models import LLMResponse, RunConfig +from llm_connect.quality import QualityLedger, QualityObservation + + +def _default_cost_estimator(response: LLMResponse) -> float: + for key in ("cost_usd", "estimated_cost_usd", "cost"): + value = response.metadata.get(key) + if isinstance(value, (int, float)) and value >= 0: + return float(value) + return 0.0 + + +class _StaticResponseAdapter(LLMAdapter): + """Adapter shim that lets a BaselineGrader reuse an existing response.""" + + def __init__(self, response: LLMResponse): + self._response = response + + def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse: + return self._response + + def validate_config(self, config: RunConfig) -> bool: + return True + + +@dataclass +class ShadowingAdapter(LLMAdapter): + """Return candidate responses while recording sampled baseline grades. + + Shadow work is best-effort: baseline, grading, or ledger failures are + reported to ``on_shadow_error`` when provided, but never alter the candidate + response returned to the caller. + """ + + candidate_adapter: LLMAdapter + baseline_adapter: LLMAdapter + grader: BaselineGrader + ledger: QualityLedger + task_type: str + adapter_id: str + model_id: str | None = None + baseline_adapter_id: str | None = None + shadow_rate: float = 1.0 + async_shadow: bool = False + random_source: random.Random = field(default_factory=random.Random, repr=False) + cost_estimator: Callable[[LLMResponse], float] = _default_cost_estimator + tags: Mapping[str, Any] = field(default_factory=dict) + on_shadow_error: Callable[[Exception], None] | None = None + _executor: ThreadPoolExecutor | None = field(default=None, init=False, repr=False) + _futures: list[Future[None]] = field(default_factory=list, init=False, repr=False) + _lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False) + + def __post_init__(self) -> None: + if not str(self.task_type).strip(): + raise ValueError("task_type must be a non-empty string") + if not str(self.adapter_id).strip(): + raise ValueError("adapter_id must be a non-empty string") + if not 0 <= self.shadow_rate <= 1: + raise ValueError("shadow_rate must be between 0 and 1") + if self.async_shadow: + self._executor = ThreadPoolExecutor(max_workers=1) + + def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse: + response = self.candidate_adapter.execute_prompt(prompt, config) + if self._should_shadow(): + self._handle_shadow(prompt, config, response) + return response + + async def async_execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse: + response = await self.candidate_adapter.async_execute_prompt(prompt, config) + if self._should_shadow(): + if self.async_shadow: + self._schedule_shadow(prompt, config, response) + else: + await asyncio.to_thread(self._run_shadow, prompt, config, response) + return response + + def validate_config(self, config: RunConfig) -> bool: + return self.candidate_adapter.validate_config(config) + + def flush(self, timeout: float | None = None) -> None: + """Wait for currently queued async shadow work to finish.""" + with self._lock: + futures = list(self._futures) + self._futures.clear() + for future in futures: + future.result(timeout=timeout) + + def shutdown(self, wait: bool = True) -> None: + """Shut down the background shadow executor if one was created.""" + if self._executor is not None: + self._executor.shutdown(wait=wait) + self._executor = None + + def _should_shadow(self) -> bool: + if self.shadow_rate <= 0: + return False + if self.shadow_rate >= 1: + return True + with self._lock: + return self.random_source.random() < self.shadow_rate + + def _handle_shadow( + self, + prompt: str, + config: RunConfig, + candidate_response: LLMResponse, + ) -> None: + if self.async_shadow: + self._schedule_shadow(prompt, config, candidate_response) + else: + self._run_shadow(prompt, config, candidate_response) + + def _schedule_shadow( + self, + prompt: str, + config: RunConfig, + candidate_response: LLMResponse, + ) -> None: + if self._executor is None: + self._executor = ThreadPoolExecutor(max_workers=1) + future = self._executor.submit(self._run_shadow, prompt, config, candidate_response) + with self._lock: + self._futures = [item for item in self._futures if not item.done()] + self._futures.append(future) + + def _run_shadow( + self, + prompt: str, + config: RunConfig, + candidate_response: LLMResponse, + ) -> None: + try: + shadow_config = replace(config, budget_tracker=None) + result = self.grader.grade( + self.baseline_adapter, + _StaticResponseAdapter(candidate_response), + prompt, + shadow_config, + ) + self.ledger.append( + QualityObservation( + task_type=self.task_type, + adapter_id=self.adapter_id, + model_id=self.model_id or candidate_response.model or config.model_name, + cost_usd=self.cost_estimator(candidate_response), + quality_score=result.quality_score, + latency_ms=float(candidate_response.metadata.get("latency_ms", 0.0)), + tokens_in=int(candidate_response.usage.get("prompt_tokens", 0)), + tokens_out=int(candidate_response.usage.get("completion_tokens", 0)), + baseline_adapter_id=self.baseline_adapter_id, + tags=dict(self.tags), + ) + ) + except Exception as exc: + self._report_shadow_error(exc) + + def _report_shadow_error(self, exc: Exception) -> None: + if self.on_shadow_error is None: + return + try: + self.on_shadow_error(exc) + except Exception: + pass diff --git a/tests/test_adaptive_integration.py b/tests/test_adaptive_integration.py new file mode 100644 index 0000000..22b8e75 --- /dev/null +++ b/tests/test_adaptive_integration.py @@ -0,0 +1,90 @@ +""" +Integration coverage for the adaptive routing workplan flow. +""" + +from datetime import datetime, timezone + +from llm_connect.adapter import MockLLMAdapter +from llm_connect.quality import QualityLedger, QualityObservation +from llm_connect.routing import AdaptiveRoutingPolicy, RoutingRule + + +def append_quality( + ledger: QualityLedger, + adapter_id: str, + quality_score: float, + cost_usd: float, + *, + recorded_at: datetime, +) -> None: + ledger.append( + QualityObservation( + task_type="summarize", + adapter_id=adapter_id, + model_id=f"{adapter_id}-model", + cost_usd=cost_usd, + quality_score=quality_score, + latency_ms=100, + tokens_in=100, + tokens_out=50, + recorded_at=recorded_at, + baseline_adapter_id="baseline", + ) + ) + + +def test_adaptive_policy_converges_to_cheapest_qualifying_adapter(tmp_path): + cheap = MockLLMAdapter("cheap") + mid = MockLLMAdapter("mid") + smart = MockLLMAdapter("smart") + ledger = QualityLedger(tmp_path / "quality.jsonl") + policy = AdaptiveRoutingPolicy( + rules=[ + RoutingRule( + "summarize", + prefer=smart, + max_cost_per_1k=1.0, + fallback=mid, + ) + ], + ledger=ledger, + adapters_by_id={"cheap": cheap, "mid": mid, "smart": smart}, + window_size=2, + ) + + assert policy.resolve("summarize", quality_floor=0.8) is smart + assert policy.resolve("summarize", 2.0, quality_floor=0.8) is mid + + append_quality( + ledger, + "cheap", + quality_score=0.7, + cost_usd=0.01, + recorded_at=datetime(2026, 5, 17, 10, tzinfo=timezone.utc), + ) + append_quality( + ledger, + "mid", + quality_score=0.86, + cost_usd=0.02, + recorded_at=datetime(2026, 5, 17, 10, tzinfo=timezone.utc), + ) + append_quality( + ledger, + "smart", + quality_score=0.95, + cost_usd=0.05, + recorded_at=datetime(2026, 5, 17, 10, tzinfo=timezone.utc), + ) + + assert policy.resolve("summarize", quality_floor=0.8) is mid + + append_quality( + ledger, + "cheap", + quality_score=0.95, + cost_usd=0.01, + recorded_at=datetime(2026, 5, 17, 11, tzinfo=timezone.utc), + ) + + assert policy.resolve("summarize", quality_floor=0.8) is cheap diff --git a/tests/test_adaptive_routing.py b/tests/test_adaptive_routing.py new file mode 100644 index 0000000..822fe77 --- /dev/null +++ b/tests/test_adaptive_routing.py @@ -0,0 +1,181 @@ +""" +Tests for AdaptiveRoutingPolicy. +""" + +from datetime import datetime, timedelta, timezone + +from llm_connect.adapter import MockLLMAdapter +from llm_connect.quality import QualityLedger, QualityObservation +from llm_connect.routing import AdaptiveRoutingPolicy, RoutingRule + + +def append_observation( + ledger: QualityLedger, + *, + adapter_id: str, + quality_score: float, + cost_usd: float, + task_type: str = "summarize", + recorded_at: datetime | None = None, +) -> None: + ledger.append( + QualityObservation( + task_type=task_type, + adapter_id=adapter_id, + model_id=f"{adapter_id}-model", + cost_usd=cost_usd, + quality_score=quality_score, + latency_ms=100, + tokens_in=100, + tokens_out=50, + baseline_adapter_id="baseline", + recorded_at=recorded_at or datetime(2026, 5, 17, tzinfo=timezone.utc), + ) + ) + + +class TestAdaptiveRoutingPolicy: + def _adapter(self, name: str) -> MockLLMAdapter: + return MockLLMAdapter(mock_response=name) + + def test_selects_cheapest_adapter_that_clears_quality_floor(self, tmp_path): + cheap = self._adapter("cheap") + smart = self._adapter("smart") + ledger = QualityLedger(tmp_path / "quality.jsonl") + append_observation(ledger, adapter_id="cheap", quality_score=0.7, cost_usd=0.01) + append_observation(ledger, adapter_id="smart", quality_score=0.9, cost_usd=0.03) + + policy = AdaptiveRoutingPolicy( + rules=[RoutingRule("summarize", prefer=cheap)], + ledger=ledger, + adapters_by_id={"cheap": cheap, "smart": smart}, + ) + + assert policy.resolve("summarize", quality_floor=0.8) is smart + + def test_prefers_lower_observed_cost_when_multiple_adapters_clear_floor(self, tmp_path): + cheap = self._adapter("cheap") + smart = self._adapter("smart") + ledger = QualityLedger(tmp_path / "quality.jsonl") + append_observation(ledger, adapter_id="cheap", quality_score=0.9, cost_usd=0.01) + append_observation(ledger, adapter_id="smart", quality_score=0.95, cost_usd=0.03) + + policy = AdaptiveRoutingPolicy( + rules=[RoutingRule("summarize", prefer=smart)], + ledger=ledger, + adapters_by_id={"cheap": cheap, "smart": smart}, + ) + + assert policy.resolve("summarize", quality_floor=0.8) is cheap + + def test_equal_cost_tie_prefers_static_rule_prefer(self, tmp_path): + candidate = self._adapter("candidate") + preferred = self._adapter("preferred") + ledger = QualityLedger(tmp_path / "quality.jsonl") + append_observation(ledger, adapter_id="candidate", quality_score=0.9, cost_usd=0.01) + append_observation(ledger, adapter_id="preferred", quality_score=0.9, cost_usd=0.01) + + policy = AdaptiveRoutingPolicy( + rules=[RoutingRule("summarize", prefer=preferred)], + ledger=ledger, + adapters_by_id={"candidate": candidate, "preferred": preferred}, + ) + + assert policy.resolve("summarize", quality_floor=0.8) is preferred + + def test_cold_start_falls_through_to_static_policy(self, tmp_path): + preferred = self._adapter("preferred") + fallback = self._adapter("fallback") + policy = AdaptiveRoutingPolicy( + rules=[RoutingRule("summarize", prefer=preferred, fallback=fallback)], + ledger=QualityLedger(tmp_path / "quality.jsonl"), + adapters_by_id={"preferred": preferred, "fallback": fallback}, + ) + + assert policy.resolve("summarize", quality_floor=0.8) is preferred + + def test_window_size_changes_observed_mean_quality(self, tmp_path): + cheap = self._adapter("cheap") + smart = self._adapter("smart") + ledger = QualityLedger(tmp_path / "quality.jsonl") + append_observation( + ledger, + adapter_id="cheap", + quality_score=0.9, + cost_usd=0.01, + recorded_at=datetime(2026, 5, 16, tzinfo=timezone.utc), + ) + append_observation( + ledger, + adapter_id="cheap", + quality_score=0.7, + cost_usd=0.01, + recorded_at=datetime(2026, 5, 17, tzinfo=timezone.utc), + ) + append_observation(ledger, adapter_id="smart", quality_score=0.9, cost_usd=0.03) + + recent_only = AdaptiveRoutingPolicy( + rules=[RoutingRule("summarize", prefer=smart)], + ledger=ledger, + adapters_by_id={"cheap": cheap, "smart": smart}, + window_size=1, + ) + wider_window = AdaptiveRoutingPolicy( + rules=[RoutingRule("summarize", prefer=smart)], + ledger=ledger, + adapters_by_id={"cheap": cheap, "smart": smart}, + window_size=2, + ) + + assert recent_only.resolve("summarize", quality_floor=0.8) is smart + assert wider_window.resolve("summarize", quality_floor=0.8) is cheap + + def test_stale_observations_are_ignored_by_max_age(self, tmp_path): + stale = self._adapter("stale") + fresh = self._adapter("fresh") + ledger = QualityLedger(tmp_path / "quality.jsonl") + append_observation( + ledger, + adapter_id="stale", + quality_score=1.0, + cost_usd=0.01, + recorded_at=datetime(2020, 1, 1, tzinfo=timezone.utc), + ) + append_observation( + ledger, + adapter_id="fresh", + quality_score=0.9, + cost_usd=0.03, + recorded_at=datetime.now(timezone.utc), + ) + + policy = AdaptiveRoutingPolicy( + rules=[RoutingRule("summarize", prefer=stale)], + ledger=ledger, + adapters_by_id={"stale": stale, "fresh": fresh}, + max_age=timedelta(days=1), + ) + + assert policy.resolve("summarize", quality_floor=0.8) is fresh + + def test_static_fallback_chain_is_preserved_when_no_candidate_qualifies(self, tmp_path): + preferred = self._adapter("preferred") + fallback = self._adapter("fallback") + ledger = QualityLedger(tmp_path / "quality.jsonl") + append_observation(ledger, adapter_id="preferred", quality_score=0.6, cost_usd=0.01) + append_observation(ledger, adapter_id="fallback", quality_score=0.7, cost_usd=0.005) + + policy = AdaptiveRoutingPolicy( + rules=[ + RoutingRule( + "summarize", + prefer=preferred, + max_cost_per_1k=1.0, + fallback=fallback, + ) + ], + ledger=ledger, + adapters_by_id={"preferred": preferred, "fallback": fallback}, + ) + + assert policy.resolve("summarize", 2.0, quality_floor=0.8) is fallback diff --git a/tests/test_grading.py b/tests/test_grading.py new file mode 100644 index 0000000..1d19f3d --- /dev/null +++ b/tests/test_grading.py @@ -0,0 +1,198 @@ +""" +Tests for baseline grading and built-in judges. +""" + +import pytest + +from llm_connect.adapter import MockLLMAdapter +from llm_connect.embedding_adapter import EmbeddingAdapter +from llm_connect.grading import ( + EmbeddingSimilarityJudge, + ExactMatchJudge, + GradingResult, + LLMJudge, + PairedGrader, +) +from llm_connect.models import LLMResponse, RunConfig + + +class StaticEmbeddingAdapter(EmbeddingAdapter): + def __init__(self, embeddings: list[list[float]]): + self.embeddings = embeddings + self.seen_texts: list[str] | None = None + + def embed(self, texts: list[str]) -> list[list[float]]: + self.seen_texts = texts + return self.embeddings + + def validate(self) -> bool: + return True + + +def response(content: str, model: str = "m") -> LLMResponse: + return LLMResponse(content=content, model=model) + + +class TestGradingResult: + def test_score_must_be_between_zero_and_one(self): + with pytest.raises(ValueError, match="quality_score"): + GradingResult( + quality_score=1.5, + notes="bad", + grader_id="g", + baseline_response=response("a"), + candidate_response=response("b"), + ) + + def test_grader_id_must_be_non_empty(self): + with pytest.raises(ValueError, match="grader_id"): + GradingResult( + quality_score=1.0, + notes="ok", + grader_id="", + baseline_response=response("a"), + candidate_response=response("a"), + ) + + +class TestExactMatchJudge: + def test_scores_one_for_normalised_match(self): + judge = ExactMatchJudge() + result = judge.judge( + response("hello world"), + response("hello world"), + prompt="p", + run_config=RunConfig(), + ) + assert result.quality_score == 1.0 + assert result.baseline_response.content == "hello world" + assert result.candidate_response.content == "hello world" + + def test_scores_zero_for_difference(self): + result = ExactMatchJudge().judge( + response("hello"), + response("goodbye"), + prompt="p", + run_config=RunConfig(), + ) + assert result.quality_score == 0.0 + + def test_case_insensitive_mode(self): + result = ExactMatchJudge(case_sensitive=False).judge( + response("Hello"), + response("hello"), + prompt="p", + run_config=RunConfig(), + ) + assert result.quality_score == 1.0 + + +class TestEmbeddingSimilarityJudge: + def test_scores_cosine_similarity(self): + embedding_adapter = StaticEmbeddingAdapter([[1.0, 0.0], [0.5, 0.0]]) + result = EmbeddingSimilarityJudge(embedding_adapter).judge( + response("baseline"), + response("candidate"), + prompt="p", + run_config=RunConfig(), + ) + assert result.quality_score == 1.0 + assert embedding_adapter.seen_texts == ["baseline", "candidate"] + + def test_negative_similarity_clamps_to_zero(self): + embedding_adapter = StaticEmbeddingAdapter([[1.0, 0.0], [-1.0, 0.0]]) + result = EmbeddingSimilarityJudge(embedding_adapter).judge( + response("baseline"), + response("candidate"), + prompt="p", + run_config=RunConfig(), + ) + assert result.quality_score == 0.0 + + def test_wrong_embedding_count_raises(self): + embedding_adapter = StaticEmbeddingAdapter([[1.0, 0.0]]) + with pytest.raises(ValueError, match="two embeddings"): + EmbeddingSimilarityJudge(embedding_adapter).judge( + response("baseline"), + response("candidate"), + prompt="p", + run_config=RunConfig(), + ) + + +class TestLLMJudge: + def test_parses_json_judge_response(self): + judge_adapter = MockLLMAdapter( + mock_response='{"quality_score": 0.75, "notes": "mostly equivalent"}' + ) + run_config = RunConfig(model_params={"existing": True}) + + result = LLMJudge(judge_adapter).judge( + response("baseline answer"), + response("candidate answer"), + prompt="original prompt", + run_config=run_config, + ) + + assert result.quality_score == 0.75 + assert result.notes == "mostly equivalent" + assert "baseline answer" in judge_adapter.last_prompt + assert "candidate answer" in judge_adapter.last_prompt + assert judge_adapter.last_config.temperature == 0.0 + assert judge_adapter.last_config.model_params["existing"] is True + assert judge_adapter.last_config.model_params["seed"] == 0 + assert judge_adapter.last_config.budget_tracker is None + + def test_extracts_json_from_wrapped_response(self): + judge_adapter = MockLLMAdapter( + mock_response='Here is the result: {"quality_score": 1, "notes": "same"}' + ) + result = LLMJudge(judge_adapter).judge( + response("a"), + response("a"), + prompt="p", + run_config=RunConfig(), + ) + assert result.quality_score == 1.0 + assert result.notes == "same" + + def test_invalid_judge_response_raises(self): + judge_adapter = MockLLMAdapter(mock_response="not json") + with pytest.raises(ValueError, match="JSON"): + LLMJudge(judge_adapter).judge( + response("a"), + response("b"), + prompt="p", + run_config=RunConfig(), + ) + + +class TestPairedGrader: + def test_runs_both_adapters_and_preserves_responses(self): + baseline = MockLLMAdapter(mock_response="same") + candidate = MockLLMAdapter(mock_response="same") + result = PairedGrader(ExactMatchJudge()).grade( + baseline, + candidate, + "prompt", + RunConfig(model_name="mock-model"), + ) + + assert result.quality_score == 1.0 + assert result.baseline_response.content == "same" + assert result.candidate_response.content == "same" + assert baseline.call_count == 1 + assert candidate.call_count == 1 + assert baseline.last_prompt == "prompt" + assert candidate.last_prompt == "prompt" + + def test_uses_custom_judge(self): + baseline = MockLLMAdapter(mock_response="a") + candidate = MockLLMAdapter(mock_response="b") + result = PairedGrader(ExactMatchJudge()).grade( + baseline, + candidate, + "prompt", + RunConfig(), + ) + assert result.quality_score == 0.0 diff --git a/tests/test_quality.py b/tests/test_quality.py new file mode 100644 index 0000000..8aca9ae --- /dev/null +++ b/tests/test_quality.py @@ -0,0 +1,164 @@ +""" +Tests for quality observations and the append-only quality ledger. +""" + +import threading +from datetime import datetime, timedelta, timezone + +import pytest + +from llm_connect.quality import QualityLedger, QualityObservation, is_stale + + +def observation( + *, + task_type: str = "summarize", + adapter_id: str = "openrouter:cheap", + model_id: str = "cheap-model", + quality_score: float = 0.8, + recorded_at: datetime | None = None, + tag: str | None = None, +) -> QualityObservation: + tags = {"tag": tag} if tag is not None else {} + return QualityObservation( + task_type=task_type, + adapter_id=adapter_id, + model_id=model_id, + cost_usd=0.01, + quality_score=quality_score, + latency_ms=123.4, + tokens_in=100, + tokens_out=50, + baseline_adapter_id="claude-code", + recorded_at=recorded_at or datetime(2026, 5, 17, tzinfo=timezone.utc), + tags=tags, + ) + + +class TestQualityObservation: + def test_round_trip_dict(self): + obs = observation(tag="a") + restored = QualityObservation.from_dict(obs.to_dict()) + assert restored == obs + assert restored.total_tokens == 150 + assert restored.recorded_at.tzinfo is not None + + def test_naive_recorded_at_is_interpreted_as_utc(self): + obs = observation(recorded_at=datetime(2026, 5, 17, 12, 0, 0)) + assert obs.recorded_at.tzinfo == timezone.utc + + @pytest.mark.parametrize("score", [-0.1, 1.1]) + def test_quality_score_must_be_between_zero_and_one(self, score): + with pytest.raises(ValueError, match="quality_score"): + observation(quality_score=score) + + def test_required_ids_must_be_non_empty(self): + with pytest.raises(ValueError, match="task_type"): + observation(task_type="") + + def test_non_negative_fields_are_enforced(self): + with pytest.raises(ValueError, match="tokens_in"): + QualityObservation( + task_type="x", + adapter_id="a", + model_id="m", + cost_usd=0, + quality_score=1, + latency_ms=0, + tokens_in=-1, + tokens_out=0, + ) + + +class TestQualityLedger: + def test_append_and_read_round_trip(self, tmp_path): + ledger = QualityLedger(tmp_path / "quality.jsonl") + obs = observation() + ledger.append(obs) + assert ledger.read_all() == [obs] + + def test_by_task_type_filters_observations(self, tmp_path): + ledger = QualityLedger(tmp_path / "quality.jsonl") + ledger.append(observation(task_type="summarize")) + ledger.append(observation(task_type="extract")) + assert [obs.task_type for obs in ledger.by_task_type("summarize")] == ["summarize"] + + def test_recent_returns_newest_first_with_filters(self, tmp_path): + ledger = QualityLedger(tmp_path / "quality.jsonl") + older = observation(recorded_at=datetime(2026, 5, 1, tzinfo=timezone.utc), tag="older") + newer = observation(recorded_at=datetime(2026, 5, 2, tzinfo=timezone.utc), tag="newer") + other = observation( + task_type="extract", + recorded_at=datetime(2026, 5, 3, tzinfo=timezone.utc), + tag="other", + ) + ledger.append(older) + ledger.append(newer) + ledger.append(other) + + recent = ledger.recent(limit=1, task_type="summarize") + assert [obs.tags["tag"] for obs in recent] == ["newer"] + + def test_mean_quality_filters_by_adapter_and_minimum_count(self, tmp_path): + ledger = QualityLedger(tmp_path / "quality.jsonl") + ledger.append(observation(adapter_id="a", quality_score=0.5)) + ledger.append(observation(adapter_id="a", quality_score=1.0)) + ledger.append(observation(adapter_id="b", quality_score=0.1)) + + assert ledger.mean_quality("summarize", adapter_id="a") == 0.75 + assert ledger.mean_quality("summarize", adapter_id="a", min_observations=3) is None + + def test_is_stale_uses_utc_reference(self): + obs = observation(recorded_at=datetime(2026, 5, 1, tzinfo=timezone.utc)) + now = datetime(2026, 5, 3, tzinfo=timezone.utc) + assert is_stale(obs, timedelta(days=1), now=now) is True + assert is_stale(obs, timedelta(days=3), now=now) is False + + def test_prune_before_removes_old_valid_observations(self, tmp_path): + ledger = QualityLedger(tmp_path / "quality.jsonl") + old = observation(recorded_at=datetime(2026, 5, 1, tzinfo=timezone.utc), tag="old") + keep = observation(recorded_at=datetime(2026, 5, 2, tzinfo=timezone.utc), tag="keep") + ledger.append(old) + ledger.append(keep) + + removed = ledger.prune_before(datetime(2026, 5, 2, tzinfo=timezone.utc)) + + assert removed == 1 + assert [obs.tags["tag"] for obs in ledger.read_all()] == ["keep"] + + def test_malformed_lines_are_skipped_and_counted(self, tmp_path): + path = tmp_path / "quality.jsonl" + path.write_text("{not json}\n", encoding="utf-8") + ledger = QualityLedger(path) + ledger.append(observation()) + + assert len(ledger.read_all()) == 1 + assert ledger.malformed_count() == 1 + + def test_prune_preserves_malformed_lines(self, tmp_path): + path = tmp_path / "quality.jsonl" + path.write_text("{not json}\n", encoding="utf-8") + ledger = QualityLedger(path) + ledger.append(observation(recorded_at=datetime(2026, 5, 1, tzinfo=timezone.utc))) + + removed = ledger.prune_before(datetime(2026, 5, 2, tzinfo=timezone.utc)) + + assert removed == 1 + assert ledger.malformed_count() == 1 + assert ledger.read_all() == [] + + def test_concurrent_writes_round_trip(self, tmp_path): + ledger = QualityLedger(tmp_path / "quality.jsonl") + + def append_one(index: int) -> None: + ledger.append(observation(tag=str(index))) + + threads = [threading.Thread(target=append_one, args=(i,)) for i in range(25)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + observations = ledger.read_all() + assert len(observations) == 25 + assert {obs.tags["tag"] for obs in observations} == {str(i) for i in range(25)} diff --git a/tests/test_shadowing.py b/tests/test_shadowing.py new file mode 100644 index 0000000..eac49e9 --- /dev/null +++ b/tests/test_shadowing.py @@ -0,0 +1,149 @@ +""" +Tests for ShadowingAdapter. +""" + +import asyncio +import time + +from llm_connect.adapter import LLMAdapter +from llm_connect.grading import ExactMatchJudge, PairedGrader +from llm_connect.models import LLMResponse, RunConfig +from llm_connect.quality import QualityLedger +from llm_connect.shadowing import ShadowingAdapter + + +class StaticAdapter(LLMAdapter): + def __init__( + self, + content: str, + *, + model: str = "model", + cost_usd: float = 0.0, + fail: bool = False, + delay_seconds: float = 0.0, + ): + self.content = content + self.model = model + self.cost_usd = cost_usd + self.fail = fail + self.delay_seconds = delay_seconds + self.calls = 0 + + def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse: + self.calls += 1 + if self.delay_seconds: + time.sleep(self.delay_seconds) + if self.fail: + raise RuntimeError("adapter failed") + return LLMResponse( + content=self.content, + model=self.model, + usage={ + "prompt_tokens": len(prompt.split()), + "completion_tokens": len(self.content.split()), + "total_tokens": len(prompt.split()) + len(self.content.split()), + }, + metadata={"cost_usd": self.cost_usd, "latency_ms": 42.0}, + ) + + def validate_config(self, config: RunConfig) -> bool: + return True + + +def shadowing_adapter( + tmp_path, + *, + candidate: StaticAdapter | None = None, + baseline: StaticAdapter | None = None, + shadow_rate: float = 1.0, + async_shadow: bool = False, + errors: list[Exception] | None = None, +) -> ShadowingAdapter: + return ShadowingAdapter( + candidate_adapter=candidate or StaticAdapter("same", model="candidate", cost_usd=0.02), + baseline_adapter=baseline or StaticAdapter("same", model="baseline"), + grader=PairedGrader(ExactMatchJudge()), + ledger=QualityLedger(tmp_path / "quality.jsonl"), + task_type="summarize", + adapter_id="candidate", + baseline_adapter_id="baseline", + shadow_rate=shadow_rate, + async_shadow=async_shadow, + tags={"prompt_fingerprint": "fixture"}, + on_shadow_error=errors.append if errors is not None else None, + ) + + +class TestShadowingAdapter: + def test_sync_shadow_appends_quality_observation(self, tmp_path): + adapter = shadowing_adapter(tmp_path) + + response = adapter.execute_prompt("hello world", RunConfig(model_name="candidate-model")) + + observations = adapter.ledger.read_all() + assert response.content == "same" + assert len(observations) == 1 + assert observations[0].quality_score == 1.0 + assert observations[0].cost_usd == 0.02 + assert observations[0].tokens_in == 2 + assert observations[0].tokens_out == 1 + assert observations[0].baseline_adapter_id == "baseline" + assert observations[0].tags["prompt_fingerprint"] == "fixture" + + def test_candidate_response_survives_baseline_failure(self, tmp_path): + candidate = StaticAdapter("candidate", model="candidate") + baseline = StaticAdapter("baseline", fail=True) + errors: list[Exception] = [] + adapter = shadowing_adapter( + tmp_path, + candidate=candidate, + baseline=baseline, + errors=errors, + ) + + response = adapter.execute_prompt("prompt", RunConfig()) + + assert response.content == "candidate" + assert candidate.calls == 1 + assert baseline.calls == 1 + assert adapter.ledger.read_all() == [] + assert len(errors) == 1 + + def test_shadow_rate_zero_skips_baseline_and_ledger(self, tmp_path): + baseline = StaticAdapter("same") + adapter = shadowing_adapter(tmp_path, baseline=baseline, shadow_rate=0.0) + + for _ in range(3): + adapter.execute_prompt("prompt", RunConfig()) + + assert baseline.calls == 0 + assert adapter.ledger.read_all() == [] + + def test_shadow_rate_one_records_each_call(self, tmp_path): + baseline = StaticAdapter("same") + adapter = shadowing_adapter(tmp_path, baseline=baseline, shadow_rate=1.0) + + for _ in range(3): + adapter.execute_prompt("prompt", RunConfig()) + + assert baseline.calls == 3 + assert len(adapter.ledger.read_all()) == 3 + + def test_async_shadow_mode_flushes_background_work(self, tmp_path): + baseline = StaticAdapter("same", delay_seconds=0.02) + adapter = shadowing_adapter(tmp_path, baseline=baseline, async_shadow=True) + + response = adapter.execute_prompt("prompt", RunConfig()) + adapter.flush(timeout=1) + adapter.shutdown() + + assert response.content == "same" + assert len(adapter.ledger.read_all()) == 1 + + def test_async_execute_prompt_records_shadow(self, tmp_path): + adapter = shadowing_adapter(tmp_path) + + response = asyncio.run(adapter.async_execute_prompt("prompt", RunConfig())) + + assert response.content == "same" + assert len(adapter.ledger.read_all()) == 1 diff --git a/workplans/llm-connect-WP-0004-adaptive-cost-quality-routing.md b/workplans/llm-connect-WP-0004-adaptive-cost-quality-routing.md index 97221ed..142294d 100644 --- a/workplans/llm-connect-WP-0004-adaptive-cost-quality-routing.md +++ b/workplans/llm-connect-WP-0004-adaptive-cost-quality-routing.md @@ -1,6 +1,20 @@ +--- +id: LLM-WP-0004 +type: workplan +title: Adaptive Cost-Quality Routing +domain: custodian +status: completed +owner: llm-connect +created: 2026-05-17 +repo: llm-connect +planning_priority: high +planning_order: 4 +state_hub_workstream_id: e1807fab-e29e-4517-b362-95737a96582d +--- + # LLM-WP-0004 — Adaptive Cost-Quality Routing -**status:** todo +**status:** completed **owner:** llm-connect **repo:** llm-connect **created:** 2026-05-17 @@ -60,53 +74,240 @@ consumer (`inter-hub`, `markitect`) adopts it. ## Tasks +The fenced `task` blocks below are the State Hub registration index. Keep them +in sync with the detailed task tables that follow. + +```task +id: T01 +title: 'QualityObservation dataclass: task_type, adapter_id, model_id, cost_usd, quality_score (0..1), latency_ms, tokens_in, tokens_out, baseline_adapter_id, recorded_at, tags' +priority: high +status: done +state_hub_task_id: "1c285bec-c30b-45a8-a408-3f91d810a078" +``` + +```task +id: T02 +title: 'QualityLedger append-only JSONL store with file-locked writes, configurable path, simple query helpers (by_task_type, recent, mean_quality)' +priority: high +status: done +state_hub_task_id: "5249f171-a047-499f-9ec4-cb50e1477765" +``` + +```task +id: T03 +title: 'TTL helpers: prune_before(timestamp) and is_stale(observation, max_age)' +priority: medium +status: done +state_hub_task_id: "adb255cf-7e89-4fea-b822-6be437d99789" +``` + +```task +id: T04 +title: 'Functional contract doc for the ledger schema and quality_score semantics' +priority: medium +status: done +state_hub_task_id: "51a33180-a99d-4aa4-96be-2fcee15bfbc3" +``` + +```task +id: T05 +title: 'Tests: ledger round-trip, concurrent writes, query helpers, TTL, malformed-line resilience' +priority: high +status: done +state_hub_task_id: "458610c5-c903-4b42-9602-cd511999c9ba" +``` + +```task +id: T06 +title: 'GradingResult dataclass: quality_score, notes, grader_id, baseline_response, candidate_response' +priority: high +status: done +state_hub_task_id: "c12a595b-90fc-4a80-8394-549edbda2031" +``` + +```task +id: T07 +title: 'BaselineGrader protocol plus PairedGrader that runs baseline and candidate calls and delegates to a Judge' +priority: high +status: done +state_hub_task_id: "80b98e31-06fc-4462-b030-a12881095f93" +``` + +```task +id: T08 +title: 'Judge protocol and built-ins: ExactMatchJudge, EmbeddingSimilarityJudge, LLMJudge' +priority: high +status: done +state_hub_task_id: "c2887fe3-bae6-4298-8c26-f9a519264dcf" +``` + +```task +id: T09 +title: 'Functional contract doc covering judge bias caveats' +priority: medium +status: done +state_hub_task_id: "7a4fd87a-b0ba-41b0-8e1a-a60fdaded905" +``` + +```task +id: T10 +title: 'Tests: judges with canned inputs, stable grader result, deterministic LLMJudge rubric seed' +priority: high +status: done +state_hub_task_id: "8415a11d-d508-4d17-8082-10f93e9d16c5" +``` + +```task +id: T11 +title: 'AdaptiveRoutingPolicy extends RoutingPolicy and selects the cheapest adapter whose observed mean quality clears the floor' +priority: high +status: done +state_hub_task_id: "0e9f9f8e-5066-4257-913b-a19f5b3fc47d" +``` + +```task +id: T12 +title: 'Tie-breaking: prefer lower observed cost, then explicit preferred adapter from static rules' +priority: medium +status: done +state_hub_task_id: "59d44712-1088-41ac-bad8-5d95db6f3a4f" +``` + +```task +id: T13 +title: 'Cold-start behaviour falls through to static RoutingPolicy.resolve when observations are missing' +priority: high +status: done +state_hub_task_id: "1927d369-f5f6-48d3-8f53-7e4f1cae370e" +``` + +```task +id: T14 +title: 'Functional contract doc for adaptive policy and sample-size/freshness trade-off' +priority: medium +status: done +state_hub_task_id: "4d4717c1-8849-4fed-8f8d-515901ecafe0" +``` + +```task +id: T15 +title: 'Tests: floor enforcement, tie-break, cold-start, window-size effect, fallback chain' +priority: high +status: done +state_hub_task_id: "304bd782-db15-4b7a-8d05-49e064a926c3" +``` + +```task +id: T16 +title: 'ShadowingAdapter wraps a candidate adapter, also invokes the baseline adapter, grades, and appends to QualityLedger' +priority: medium +status: done +state_hub_task_id: "62dd507f-536a-4623-8cbd-fa9f78e85ca6" +``` + +```task +id: T17 +title: 'Sampling: caller-configurable shadow_rate so production load is not doubled' +priority: medium +status: done +state_hub_task_id: "ccb73e92-1fca-42f9-8437-9b2b50e6424c" +``` + +```task +id: T18 +title: 'Failure isolation: shadow errors never affect the candidate response returned to the caller' +priority: high +status: done +state_hub_task_id: "b879d232-d6ce-4ff6-b534-616729ea5ad7" +``` + +```task +id: T19 +title: 'Functional contract doc for ShadowingAdapter' +priority: low +status: done +state_hub_task_id: "99d2c1bc-f1d8-42b3-9e04-6eea49460943" +``` + +```task +id: T20 +title: 'Tests: candidate response survives baseline failure, ledger sampling rate, sync vs async modes' +priority: high +status: done +state_hub_task_id: "f533fbf4-484f-4408-8260-7e84e23bdc46" +``` + +```task +id: T21 +title: 'Example script: route fixture batch through three candidate adapters and populate the ledger' +priority: medium +status: done +state_hub_task_id: "7ef0c143-74b0-4740-81fa-819a826cf8f3" +``` + +```task +id: T22 +title: 'Integration test: cold-start, static fallback, first observations, convergence to cheapest qualifying adapter' +priority: high +status: done +state_hub_task_id: "c4c6743f-157b-4445-8576-9caa6421d463" +``` + +```task +id: T23 +title: 'Consumer integration guide showing how infospace-bench wires task types into adaptive policy' +priority: medium +status: done +state_hub_task_id: "3a073ff7-0170-4a95-9c2a-a5daa84964e6" +``` + ### T01 — Quality observation data model + ledger | ID | Title | Priority | Status | |-----|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------| -| T01 | `QualityObservation` dataclass: `task_type`, `adapter_id`, `model_id`, `cost_usd`, `quality_score` (0..1), `latency_ms`, `tokens_in`, `tokens_out`, `baseline_adapter_id`, `recorded_at`, `tags` | high | todo | -| T02 | `QualityLedger` append-only JSONL store with file-locked writes, configurable path, simple query helpers (`by_task_type`, `recent`, `mean_quality`) | high | todo | -| T03 | TTL helpers: `prune_before(timestamp)` and `is_stale(observation, max_age)` so callers can refresh observations without re-reading the whole ledger | medium | todo | -| T04 | Functional contract doc for the ledger schema and the field semantics of `quality_score` | medium | todo | -| T05 | Tests: round-trip, concurrent writes, query helpers, TTL, malformed-line resilience | high | todo | +| T01 | `QualityObservation` dataclass: `task_type`, `adapter_id`, `model_id`, `cost_usd`, `quality_score` (0..1), `latency_ms`, `tokens_in`, `tokens_out`, `baseline_adapter_id`, `recorded_at`, `tags` | high | done | +| T02 | `QualityLedger` append-only JSONL store with file-locked writes, configurable path, simple query helpers (`by_task_type`, `recent`, `mean_quality`) | high | done | +| T03 | TTL helpers: `prune_before(timestamp)` and `is_stale(observation, max_age)` so callers can refresh observations without re-reading the whole ledger | medium | done | +| T04 | Functional contract doc for the ledger schema and the field semantics of `quality_score` | medium | done | +| T05 | Tests: round-trip, concurrent writes, query helpers, TTL, malformed-line resilience | high | done | ### T02 — Baseline grader | ID | Title | Priority | Status | |-----|----------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------| -| T06 | `GradingResult` dataclass: `quality_score`, `notes`, `grader_id`, `baseline_response`, `candidate_response` | high | todo | -| T07 | `BaselineGrader` protocol: `.grade(baseline_adapter, candidate_adapter, prompt, run_config)` → `GradingResult`; built-in concrete `PairedGrader` runs both calls and delegates to a `Judge` | high | todo | -| T08 | `Judge` protocol + three built-ins: `ExactMatchJudge`, `EmbeddingSimilarityJudge` (uses an embedding adapter), `LLMJudge` (uses a third adapter with a fixed rubric prompt) | high | todo | -| T09 | Functional contract doc covering judge bias caveats (length bias, format bias, position bias for `LLMJudge`) | medium | todo | -| T10 | Tests: each judge against canned inputs, grader emits stable result with both responses preserved, deterministic seed for `LLMJudge` rubric | high | todo | +| T06 | `GradingResult` dataclass: `quality_score`, `notes`, `grader_id`, `baseline_response`, `candidate_response` | high | done | +| T07 | `BaselineGrader` protocol: `.grade(baseline_adapter, candidate_adapter, prompt, run_config)` → `GradingResult`; built-in concrete `PairedGrader` runs both calls and delegates to a `Judge` | high | done | +| T08 | `Judge` protocol + three built-ins: `ExactMatchJudge`, `EmbeddingSimilarityJudge` (uses an embedding adapter), `LLMJudge` (uses a third adapter with a fixed rubric prompt) | high | done | +| T09 | Functional contract doc covering judge bias caveats (length bias, format bias, position bias for `LLMJudge`) | medium | done | +| T10 | Tests: each judge against canned inputs, grader emits stable result with both responses preserved, deterministic seed for `LLMJudge` rubric | high | done | ### T03 — Adaptive routing policy | ID | Title | Priority | Status | |-----|--------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------| -| T11 | `AdaptiveRoutingPolicy` extends `RoutingPolicy`: given `task_type` + `quality_floor` + `ledger`, returns the cheapest adapter whose observed mean quality clears the floor over a configurable window | high | todo | -| T12 | Tie-breaking: when two adapters meet the floor, prefer lower observed cost; if still tied, prefer the explicitly-preferred adapter from the underlying static rules | medium | todo | -| T13 | Cold-start behaviour: when no observations exist for a `(task_type, adapter)` pair, fall through to the static `RoutingPolicy.resolve` result so the system stays usable on day zero | high | todo | -| T14 | Functional contract doc; document the trade-off between sample size and freshness | medium | todo | -| T15 | Tests: floor enforcement, tie-break, cold-start, window-size effect, fallback chain | high | todo | +| T11 | `AdaptiveRoutingPolicy` extends `RoutingPolicy`: given `task_type` + `quality_floor` + `ledger`, returns the cheapest adapter whose observed mean quality clears the floor over a configurable window | high | done | +| T12 | Tie-breaking: when two adapters meet the floor, prefer lower observed cost; if still tied, prefer the explicitly-preferred adapter from the underlying static rules | medium | done | +| T13 | Cold-start behaviour: when no observations exist for a `(task_type, adapter)` pair, fall through to the static `RoutingPolicy.resolve` result so the system stays usable on day zero | high | done | +| T14 | Functional contract doc; document the trade-off between sample size and freshness | medium | done | +| T15 | Tests: floor enforcement, tie-break, cold-start, window-size effect, fallback chain | high | done | ### T04 — Shadow-mode observation wrapper | ID | Title | Priority | Status | |-----|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------| -| T16 | `ShadowingAdapter` wraps a candidate adapter; on each call, also invokes the baseline adapter (sync or via a thread pool), grades, and appends to a `QualityLedger` | medium | todo | -| T17 | Sampling: caller-configurable fraction (`shadow_rate=0.1` means grade one call in ten) so production load is not doubled | medium | todo | -| T18 | Failure isolation: shadow errors never affect the candidate response returned to the caller; failures are logged but not raised | high | todo | -| T19 | Functional contract doc | low | todo | -| T20 | Tests: candidate response always returned even when baseline raises, ledger gets exactly `shadow_rate × calls` entries (within tolerance), sync vs async modes | high | todo | +| T16 | `ShadowingAdapter` wraps a candidate adapter; on each call, also invokes the baseline adapter (sync or via a thread pool), grades, and appends to a `QualityLedger` | medium | done | +| T17 | Sampling: caller-configurable fraction (`shadow_rate=0.1` means grade one call in ten) so production load is not doubled | medium | done | +| T18 | Failure isolation: shadow errors never affect the candidate response returned to the caller; failures are logged but not raised | high | done | +| T19 | Functional contract doc | low | done | +| T20 | Tests: candidate response always returned even when baseline raises, ledger gets exactly `shadow_rate × calls` entries (within tolerance), sync vs async modes | high | done | ### T05 — End-to-end example + integration test | ID | Title | Priority | Status | |-----|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------| -| T21 | Example script: route a small fixture batch through three candidate adapters (one OpenRouter cheap, one OpenRouter mid, `ClaudeCodeAdapter` as baseline), grade each, populate ledger | medium | todo | -| T22 | Integration test with mocked adapters covering: cold-start → static fallback → first observations → adaptive selection converges to the cheapest qualifying adapter | high | todo | -| T23 | Brief consumer-integration guide in `docs/` showing how `infospace-bench` (or any caller) wires task-type-per-stage into the adaptive policy | medium | todo | +| T21 | Example script: route a small fixture batch through three candidate adapters (one OpenRouter cheap, one OpenRouter mid, `ClaudeCodeAdapter` as baseline), grade each, populate ledger | medium | done | +| T22 | Integration test with mocked adapters covering: cold-start → static fallback → first observations → adaptive selection converges to the cheapest qualifying adapter | high | done | +| T23 | Brief consumer-integration guide in `docs/` showing how `infospace-bench` (or any caller) wires task-type-per-stage into the adaptive policy | medium | done | ## Risks and open questions