From f818acfc6274218342841dbb269c0ee1b432d45a Mon Sep 17 00:00:00 2001 From: tegwick Date: Mon, 18 May 2026 11:52:05 +0200 Subject: [PATCH] IB-WP-0018-T03+T04: shadow sampling + report/CLI surfacing; close IB-WP-0018 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit T03 — wrap_with_shadow_sampling() helper in routing.py: builds an llm-connect ShadowingAdapter around any candidate LLMAdapter with a caller-supplied baseline, grader, and QualityLedger. async_shadow=True by default so production load is not doubled; on_shadow_error escape hatch keeps caller logs informed when a baseline outage swallows the shadow path. The returned adapter is still an LLMAdapter so it slots into a RoutingPolicy rule without further code change. T04 — generation report enrichment plus a small CLI helper: - _collect_adapter_choices walks artifact provenance, groups by (stage_id, adapter_id), and surfaces calls + prompt/completion tokens per (stage, adapter) pair in a new ## Per-stage adapter choices section. Runs that did not go through the bridge have no provider_metadata.adapter_id and emit an empty list, so fixture-only reports stay terse. - summarise_quality_ledger() rolls an llm-connect QualityLedger up by (task_type, adapter_id) with mean quality, mean cost, observations, and cumulative tokens. - infospace-bench routing ledger CLI prints the rollup as JSON. Five new tests cover shadow happy-path, shadow failure isolation, ledger rollup, the routing CLI, and the report's adapter-choice aggregation. Closes IB-WP-0018: T01-T05 are all done and the workplan status flips from blocked to done now that LLM-WP-0004's primitives have shipped. 144 tests pass, 1 skipped (the OpenRouter live smoke, gated as before).
Co-Authored-By: Claude Opus 4.7 --- src/infospace_bench/cli.py | 19 ++ src/infospace_bench/generator.py | 49 +++++ src/infospace_bench/routing.py | 85 ++++++++ tests/test_routing_adapter.py | 194 ++++++++++++++++++ ...B-WP-0018-adaptive-llm-routing-consumer.md | 21 +- 5 files changed, 365 insertions(+), 3 deletions(-) diff --git a/src/infospace_bench/cli.py b/src/infospace_bench/cli.py index 55d2e83..87aa744 100644 --- a/src/infospace_bench/cli.py +++ b/src/infospace_bench/cli.py @@ -256,6 +256,14 @@ def build_parser() -> argparse.ArgumentParser: ) generate_from_source.add_argument("--apply", action="store_true") + routing = sub.add_parser("routing", help="Inspect llm-connect routing observations") + routing_sub = routing.add_subparsers(dest="routing_command", required=True) + routing_ledger = routing_sub.add_parser( + "ledger", + help="Summarise a llm-connect QualityLedger by (task_type, adapter_id)", + ) + routing_ledger.add_argument("ledger_path") + budget = sub.add_parser("budget", help="Inspect per-infospace budget and usage records") budget_sub = budget.add_subparsers(dest="budget_command", required=True) budget_list = budget_sub.add_parser( @@ -587,6 +595,17 @@ def main(argv: list[str] | None = None) -> int: _write_json(plan_generation(infospace.root, stage=args.stage)) else: parser.error(f"Unhandled generate command: {args.generate_command}") + elif args.command == "routing": + from .routing import summarise_quality_ledger + if args.routing_command == "ledger": + _write_json( + { + "ledger_path": str(Path(args.ledger_path)), + "rows": summarise_quality_ledger(args.ledger_path), + } + ) + else: + parser.error(f"Unhandled routing command: {args.routing_command}") elif args.command == "budget": from .budget import budget_list_workspace, budget_show if args.budget_command == "list": diff --git a/src/infospace_bench/generator.py b/src/infospace_bench/generator.py index 990a6f4..21c8053 100644 --- a/src/infospace_bench/generator.py +++ 
b/src/infospace_bench/generator.py @@ -791,6 +791,15 @@ def _write_generation_report(root: Path, metrics: dict[str, Any], snapshot_id: s "", ] ) + if review.get("adapter_choices"): + lines.extend(["## Per-stage adapter choices", ""]) + for row in review["adapter_choices"]: + lines.append( + f"- `{row['stage_id']}` ({row['task_type']}) -> " + f"`{row['adapter_id']}` · {row['calls']} call(s) · " + f"{row['prompt_tokens']} prompt + {row['completion_tokens']} completion tokens" + ) + lines.append("") text = "\n".join(lines) path = root / "reports" / "generation-summary.md" path.parent.mkdir(parents=True, exist_ok=True) @@ -872,15 +881,55 @@ def _collect_review_report(root: Path) -> dict[str, Any]: entity_titles = sorted( {item.title for item in infospace.artifacts if item.kind == "entity" and item.title} ) + adapter_choices = _collect_adapter_choices(generated) return { "chapter_coverage": chapter_coverage, "entity_titles": entity_titles, "unmapped_sources": unmapped, "page_anchor_total": len(anchors), "page_anchor_sample": anchors[:6], + "adapter_choices": adapter_choices, } +def _collect_adapter_choices(generated: list[Any]) -> list[dict[str, Any]]: + """Roll up which adapter ran each stage when the routing bridge was used. + + Returns one row per (stage_id, adapter_id) with call counts and + cumulative tokens. Entries without provider_metadata are skipped so + fixture-only runs produce an empty list rather than a noisy section. 
+ """ + buckets: dict[tuple[str, str], dict[str, Any]] = {} + for item in generated: + provenance = item.provenance or {} + metadata = provenance.get("provider_metadata") or {} + if not isinstance(metadata, dict): + continue + adapter_id = str(metadata.get("adapter_id") or metadata.get("model") or "") + if not adapter_id: + continue + stage_id = str(metadata.get("stage_id") or provenance.get("stage_id") or "") + if not stage_id: + continue + usage = metadata.get("usage") or {} + key = (stage_id, adapter_id) + bucket = buckets.setdefault( + key, + { + "stage_id": stage_id, + "adapter_id": adapter_id, + "task_type": metadata.get("task_type") or stage_id, + "calls": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + }, + ) + bucket["calls"] += 1 + bucket["prompt_tokens"] += int(usage.get("prompt_tokens") or 0) + bucket["completion_tokens"] += int(usage.get("completion_tokens") or 0) + return sorted(buckets.values(), key=lambda row: (row["stage_id"], row["adapter_id"])) + + def _workflow_ids_for_stage(stage: str) -> list[str]: normalized = stage.strip().lower() if normalized == "intake": diff --git a/src/infospace_bench/routing.py b/src/infospace_bench/routing.py index f07e106..74796fb 100644 --- a/src/infospace_bench/routing.py +++ b/src/infospace_bench/routing.py @@ -15,8 +15,11 @@ from dataclasses import dataclass, field from typing import Any from llm_connect.adapter import LLMAdapter +from llm_connect.grading import BaselineGrader from llm_connect.models import RunConfig +from llm_connect.quality import QualityLedger from llm_connect.routing import AdaptiveRoutingPolicy, RoutingPolicy +from llm_connect.shadowing import ShadowingAdapter from .workflow import AssistedGenerationRequest, AssistedGenerationResult @@ -116,6 +119,88 @@ def _identify_adapter(adapter: LLMAdapter) -> str: return name +def wrap_with_shadow_sampling( + *, + candidate: LLMAdapter, + baseline: LLMAdapter, + grader: BaselineGrader, + ledger: QualityLedger, + task_type: str, + adapter_id: str | 
None = None, + baseline_adapter_id: str | None = None, + shadow_rate: float = 0.1, + async_shadow: bool = True, + on_shadow_error: Any | None = None, +) -> ShadowingAdapter: + """Wrap ``candidate`` with llm-connect's ``ShadowingAdapter``. + + Sampled baseline grading collects QualityLedger observations without + changing the response the caller sees. Errors in the shadow path + (baseline outage, grader failure, ledger write error) never alter the + candidate response — failures land on ``on_shadow_error`` when + provided, else are silently swallowed by the underlying adapter. + + The returned ``ShadowingAdapter`` is still an ``LLMAdapter``, so it + can be slotted into a ``RoutingPolicy`` rule and used through + ``RoutingAssistedGenerationAdapter`` without further changes. + """ + return ShadowingAdapter( + candidate_adapter=candidate, + baseline_adapter=baseline, + grader=grader, + ledger=ledger, + task_type=task_type, + adapter_id=adapter_id or _identify_adapter(candidate), + baseline_adapter_id=baseline_adapter_id or _identify_adapter(baseline), + shadow_rate=shadow_rate, + async_shadow=async_shadow, + on_shadow_error=on_shadow_error, + ) + + +def summarise_quality_ledger( + ledger_path: str | Any, +) -> list[dict[str, Any]]: + """Roll up a QualityLedger into one row per (task_type, adapter_id). + + Useful as a CLI helper or a quick budget-style inspection without + loading llm-connect's full ledger API at the call site. 
+ """ + from pathlib import Path + + ledger = QualityLedger(path=Path(ledger_path)) + observations = ledger.read_all() + grouped: dict[tuple[str, str], dict[str, Any]] = {} + for obs in observations: + key = (obs.task_type, obs.adapter_id) + bucket = grouped.setdefault( + key, + { + "task_type": obs.task_type, + "adapter_id": obs.adapter_id, + "observations": 0, + "mean_quality": 0.0, + "mean_cost_usd": 0.0, + "total_tokens_in": 0, + "total_tokens_out": 0, + }, + ) + bucket["observations"] += 1 + bucket["mean_quality"] += float(obs.quality_score) + bucket["mean_cost_usd"] += float(obs.cost_usd) + bucket["total_tokens_in"] += int(getattr(obs, "tokens_in", 0) or 0) + bucket["total_tokens_out"] += int(getattr(obs, "tokens_out", 0) or 0) + rows: list[dict[str, Any]] = [] + for bucket in grouped.values(): + count = bucket["observations"] + if count: + bucket["mean_quality"] = round(bucket["mean_quality"] / count, 4) + bucket["mean_cost_usd"] = round(bucket["mean_cost_usd"] / count, 6) + rows.append(bucket) + rows.sort(key=lambda row: (row["task_type"], row["adapter_id"])) + return rows + + def _provider_tag(adapter: LLMAdapter) -> str: """Coarse provider tag matching the strings already used in run records. 
diff --git a/tests/test_routing_adapter.py b/tests/test_routing_adapter.py index d13fb4e..1b90534 100644 --- a/tests/test_routing_adapter.py +++ b/tests/test_routing_adapter.py @@ -213,6 +213,200 @@ def test_bridge_preserves_response_metadata_and_provider_tag() -> None: assert result.provider == "mock" +def test_wrap_with_shadow_sampling_passes_candidate_through(tmp_path) -> None: + from llm_connect.grading import ExactMatchJudge, PairedGrader + from infospace_bench.routing import wrap_with_shadow_sampling + + candidate = _MockAdapter(model="cheap-1", content="match") + baseline = _MockAdapter(model="baseline-1", content="match") + ledger = QualityLedger(path=tmp_path / "quality.jsonl") + grader = PairedGrader(judge=ExactMatchJudge()) + + shadow = wrap_with_shadow_sampling( + candidate=candidate, + baseline=baseline, + grader=grader, + ledger=ledger, + task_type="extract-entities", + shadow_rate=1.0, + async_shadow=False, + ) + + config = RunConfig(model_name="cheap-1") + response = shadow.execute_prompt("Hello.", config) + + assert response.content == "match" + # Baseline ran in the shadow path; ledger now has one observation. 
+ assert baseline.calls, "baseline must have been called when shadow_rate=1.0" + observations = ledger.by_task_type("extract-entities") + assert observations, "shadow path should append at least one observation" + + +def test_wrap_with_shadow_sampling_isolates_baseline_failure(tmp_path) -> None: + from llm_connect.grading import ExactMatchJudge, PairedGrader + from infospace_bench.routing import wrap_with_shadow_sampling + + candidate = _MockAdapter(model="cheap-1", content="ok") + + class _AngryBaseline(LLMAdapter): + def execute_prompt(self, prompt, config): + raise RuntimeError("baseline outage") + + def validate_config(self, config): + return True + + seen_errors: list[Exception] = [] + shadow = wrap_with_shadow_sampling( + candidate=candidate, + baseline=_AngryBaseline(), + grader=PairedGrader(judge=ExactMatchJudge()), + ledger=QualityLedger(path=tmp_path / "quality.jsonl"), + task_type="summarize-source", + shadow_rate=1.0, + async_shadow=False, + on_shadow_error=seen_errors.append, + ) + response = shadow.execute_prompt("Hello.", RunConfig(model_name="cheap-1")) + + assert response.content == "ok", "candidate response must survive baseline outage" + assert seen_errors and "baseline outage" in str(seen_errors[0]) + + +def test_summarise_quality_ledger_rolls_up_by_task_and_adapter(tmp_path) -> None: + from infospace_bench.routing import summarise_quality_ledger + + ledger_path = tmp_path / "quality.jsonl" + ledger = QualityLedger(path=ledger_path) + for quality in (0.9, 0.95, 0.85): + ledger.append( + QualityObservation( + task_type="extract-entities", + adapter_id="cheap-1", + model_id="cheap-1", + cost_usd=0.001, + quality_score=quality, + tokens_in=100, + tokens_out=50, + latency_ms=10, + ) + ) + ledger.append( + QualityObservation( + task_type="summarize-source", + adapter_id="cheaper-1", + model_id="cheaper-1", + cost_usd=0.0001, + quality_score=0.7, + tokens_in=80, + tokens_out=20, + latency_ms=5, + ) + ) + + rows = summarise_quality_ledger(ledger_path) 
+ + by_key = {(row["task_type"], row["adapter_id"]): row for row in rows} + extract = by_key[("extract-entities", "cheap-1")] + assert extract["observations"] == 3 + assert extract["mean_quality"] == round((0.9 + 0.95 + 0.85) / 3, 4) + assert extract["mean_cost_usd"] == 0.001 + summarize = by_key[("summarize-source", "cheaper-1")] + assert summarize["observations"] == 1 + + +def test_collect_adapter_choices_rolls_up_per_stage(tmp_path) -> None: + """Unit test: report helper aggregates adapter choices from artifact provenance.""" + from infospace_bench.generator import _collect_adapter_choices + + class _FakeArtifact: + def __init__(self, kind: str, provenance: dict) -> None: + self.kind = kind + self.provenance = provenance + + artifacts = [ + _FakeArtifact( + kind="entity", + provenance={ + "stage_id": "extract-entities", + "provider_metadata": { + "adapter_id": "_MockAdapter:cheap-1", + "task_type": "extract-entities", + "usage": {"prompt_tokens": 120, "completion_tokens": 40}, + }, + }, + ), + _FakeArtifact( + kind="entity", + provenance={ + "stage_id": "extract-entities", + "provider_metadata": { + "adapter_id": "_MockAdapter:cheap-1", + "task_type": "extract-entities", + "usage": {"prompt_tokens": 130, "completion_tokens": 50}, + }, + }, + ), + _FakeArtifact( + kind="relation", + provenance={ + "stage_id": "extract-relations", + "provider_metadata": { + "adapter_id": "_MockAdapter:smart-1", + "task_type": "extract-relations", + "usage": {"prompt_tokens": 200, "completion_tokens": 80}, + }, + }, + ), + # Artifact without provider_metadata should be ignored. 
+ _FakeArtifact(kind="generated", provenance={"stage_id": "summarize-source"}), + ] + + rows = _collect_adapter_choices(artifacts) + + by_key = {(row["stage_id"], row["adapter_id"]): row for row in rows} + entities_row = by_key[("extract-entities", "_MockAdapter:cheap-1")] + relations_row = by_key[("extract-relations", "_MockAdapter:smart-1")] + assert entities_row["calls"] == 2 + assert entities_row["prompt_tokens"] == 250 + assert entities_row["completion_tokens"] == 90 + assert relations_row["calls"] == 1 + assert relations_row["task_type"] == "extract-relations" + + +def test_routing_ledger_cli(tmp_path) -> None: + import json as _json + import subprocess as _sub + import sys as _sys + import os as _os + + ledger_path = tmp_path / "quality.jsonl" + ledger = QualityLedger(path=ledger_path) + ledger.append( + QualityObservation( + task_type="extract-entities", + adapter_id="cheap-1", + model_id="cheap-1", + cost_usd=0.001, + quality_score=0.9, + tokens_in=100, + tokens_out=50, + latency_ms=10, + ) + ) + + env = _os.environ.copy() + env["PYTHONPATH"] = _os.pathsep.join(p for p in _sys.path if p)  # hand the child this interpreter's import path; no machine-specific directories + result = _sub.run( + [_sys.executable, "-m", "infospace_bench", "routing", "ledger", str(ledger_path)], + check=False, env=env, text=True, capture_output=True, + ) + + assert result.returncode == 0, result.stderr + payload = _json.loads(result.stdout) + assert payload["ledger_path"] == str(ledger_path) + assert payload["rows"] and payload["rows"][0]["task_type"] == "extract-entities" + + def test_bridge_passes_estimated_cost_per_1k_through() -> None: captured: dict[str, Any] = {} diff --git a/workplans/IB-WP-0018-adaptive-llm-routing-consumer.md b/workplans/IB-WP-0018-adaptive-llm-routing-consumer.md index c4ce0e8..9bd5243 100644 --- a/workplans/IB-WP-0018-adaptive-llm-routing-consumer.md +++ b/workplans/IB-WP-0018-adaptive-llm-routing-consumer.md @@ -4,11 +4,11 @@ type: workplan title: "Adaptive LLM Routing — infospace-bench Consumer Wiring" domain:
markitect repo: infospace-bench -status: blocked +status: done owner: markitect topic_slug: markitect created: "2026-05-17" -updated: "2026-05-17" +updated: "2026-05-18" depends_on_workplans: - LLM-WP-0004 related_workplans: @@ -33,7 +33,22 @@ list will be refined once that API is stable. ## Status -Blocked on `LLM-WP-0004` T01..T03. +Done. LLM-WP-0004 landed `QualityLedger`, `QualityObservation`, +`BaselineGrader`/`PairedGrader`/`ExactMatchJudge`/`EmbeddingSimilarityJudge`/ +`LLMJudge`, `AdaptiveRoutingPolicy`, and `ShadowingAdapter` in +llm-connect; the five tasks below are all complete. + +- T01 — task-type taxonomy (`docs/routing-task-types.md`) +- T02 — `RoutingAssistedGenerationAdapter` bridge in + `src/infospace_bench/routing.py` +- T03 — opt-in `wrap_with_shadow_sampling()` helper that installs + llm-connect's `ShadowingAdapter` around any candidate adapter +- T04 — `## Per-stage adapter choices` section in + `reports/generation-summary.md` (driven from artifact + `provenance.provider_metadata`) and `infospace-bench routing ledger` + CLI subcommand +- T05 — `tests/test_routing_adapter.py` (13 tests, including a CLI + smoke and the adapter-choices unit test) ## Why this is a separate workplan