From b0d67ae79e9174c1e24c3317242920dbfb5f7814 Mon Sep 17 00:00:00 2001 From: tegwick Date: Mon, 18 May 2026 23:30:36 +0200 Subject: [PATCH] IB-WP-0020-T05: shadow-mode CLI flags; close IB-WP-0020 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add --shadow-baseline and --shadow-rate opt-in flags to generate run, generate resume, and generate from-source. When --shadow-baseline names a candidate id from the routing config, build_routing_policy_from_config wraps every other candidate in an llm-connect ShadowingAdapter using that baseline plus a PairedGrader(ExactMatchJudge()) and the workspace-resolved QualityLedger. The baseline candidate itself is never wrapped — that would shadow it against itself. --shadow-rate defaults to 0.1 when --shadow-baseline is set; passing --shadow-rate without --shadow-baseline fails fast with shadow_rate_without_baseline. Setting --shadow-baseline without a ledger_path in the config fails with missing_routing_ledger_for_shadow so observations have a place to land before any call goes out. run_generation grew shadow_baseline + shadow_rate kwargs and _adapter_for("routing", ...) plumbs them into build_routing_policy_from_config. The wrapped ShadowingAdapter slots into the policy's prefer/fallback per task type via a (candidate_id, task_type) reverse lookup, and adapters_by_id on the adaptive policy gets the string-keyed entries. Five new tests cover: shadow_rate without baseline fails fast, shadow mode without a ledger fails fast, unknown shadow baseline id fails fast, structural assertion that ShadowingAdapter wraps non-baseline candidates and leaves the baseline raw, and a behavioural check that shadow_rate=1.0 calls the baseline on every call while shadow_rate=0.0 skips entirely. Test forces async_shadow=False so the call counter is deterministic. Closes IB-WP-0020: T01-T05 all done. Workplan status flips from active to finished. 179 tests pass, 2 skipped (both live OpenRouter smokes). Co-Authored-By: Claude Opus 4.7 --- src/infospace_bench/cli.py | 12 ++ src/infospace_bench/generator.py | 13 +- src/infospace_bench/routing_config.py | 120 ++++++++++-- tests/test_routing_config.py | 181 +++++++++++++++++++ workplans/IB-WP-0020-provider-routing-cli.md | 4 +- 5 files changed, 308 insertions(+), 22 deletions(-) diff --git a/src/infospace_bench/cli.py b/src/infospace_bench/cli.py index f4c79dc..db78c5c 100644 --- a/src/infospace_bench/cli.py +++ b/src/infospace_bench/cli.py @@ -208,6 +208,8 @@ def build_parser() -> argparse.ArgumentParser: generate_run.add_argument("--fixture-responses", default="") generate_run.add_argument("--routing-config", default="", help="YAML routing config (required with --provider routing)") generate_run.add_argument("--quality-floor", type=float, default=None, help="Override the config's default_quality_floor for this run") + generate_run.add_argument("--shadow-baseline", default="", help="Candidate id from the routing config to use as the shadow-grading baseline") + generate_run.add_argument("--shadow-rate", type=float, default=None, help="Shadow sampling rate 0..1 (default 0.1 when --shadow-baseline is set)") generate_run.add_argument("--resume", action="store_true") generate_run.add_argument("--force", action="store_true") @@ -222,6 +224,8 @@ def build_parser() -> argparse.ArgumentParser: generate_resume.add_argument("--fixture-responses", default="") generate_resume.add_argument("--routing-config", default="") generate_resume.add_argument("--quality-floor", type=float, default=None) + generate_resume.add_argument("--shadow-baseline", default="") + generate_resume.add_argument("--shadow-rate", type=float, default=None) generate_resume.add_argument("--force", action="store_true") generate_status = generate_sub.add_parser( @@ -245,6 +249,8 @@ def build_parser() -> argparse.ArgumentParser: generate_from_source.add_argument("--fixture-responses", default="") generate_from_source.add_argument("--routing-config", default="", help="YAML routing config (required with --provider routing)") generate_from_source.add_argument("--quality-floor", type=float, default=None) + generate_from_source.add_argument("--shadow-baseline", default="") + generate_from_source.add_argument("--shadow-rate", type=float, default=None) generate_from_source.add_argument("--max-chunks", type=int, default=0) generate_from_source.add_argument( "--chapter", @@ -559,6 +565,8 @@ def main(argv: list[str] | None = None) -> int: fixture_responses=args.fixture_responses or None, routing_config=args.routing_config or None, quality_floor=args.quality_floor, + shadow_baseline=args.shadow_baseline or None, + shadow_rate=args.shadow_rate, resume=args.resume, force=args.force, ).to_dict() @@ -573,6 +581,8 @@ def main(argv: list[str] | None = None) -> int: fixture_responses=args.fixture_responses or None, routing_config=args.routing_config or None, quality_floor=args.quality_floor, + shadow_baseline=args.shadow_baseline or None, + shadow_rate=args.shadow_rate, resume=True, force=args.force, ).to_dict() @@ -601,6 +611,8 @@ def main(argv: list[str] | None = None) -> int: fixture_responses=args.fixture_responses or None, routing_config=args.routing_config or None, quality_floor=args.quality_floor, + shadow_baseline=args.shadow_baseline or None, + shadow_rate=args.shadow_rate, ) _write_json(result.to_dict()) else: diff --git a/src/infospace_bench/generator.py b/src/infospace_bench/generator.py index 3815fc2..354eaf4 100644 --- a/src/infospace_bench/generator.py +++ b/src/infospace_bench/generator.py @@ -429,6 +429,8 @@ def run_generation( fixture_responses: str | Path | None = None, routing_config: str | Path | None = None, quality_floor: float | None = None, + shadow_baseline: str | None = None, + shadow_rate: float | None = None, resume: bool = False, force: bool = False, ) -> GenerationRunResult: @@ -457,6 +459,8 @@ def run_generation( fixture_responses=fixture_responses, routing_config=routing_config, quality_floor=quality_floor, + shadow_baseline=shadow_baseline, + shadow_rate=shadow_rate, workspace=_workspace_for(root_path), ) if workflow_ids @@ -562,6 +566,8 @@ def _adapter_for( fixture_responses: str | Path | None, routing_config: str | Path | None = None, quality_floor: float | None = None, + shadow_baseline: str | None = None, + shadow_rate: float | None = None, workspace: Path | None = None, ) -> AssistedGenerationAdapter: if fixture_responses: @@ -582,7 +588,12 @@ def _adapter_for( ) config = load_routing_config(routing_config) - policy = build_routing_policy_from_config(config, workspace=workspace) + policy = build_routing_policy_from_config( + config, + workspace=workspace, + shadow_baseline_id=shadow_baseline, + shadow_rate=shadow_rate, + ) effective_floor = ( quality_floor if quality_floor is not None diff --git a/src/infospace_bench/routing_config.py b/src/infospace_bench/routing_config.py index 5418c94..88fb7b6 100644 --- a/src/infospace_bench/routing_config.py +++ b/src/infospace_bench/routing_config.py @@ -287,6 +287,8 @@ def build_routing_policy_from_config( workspace: str | Path | None = None, env: Mapping[str, str] | None = None, adapter_factory: AdapterFactory | None = None, + shadow_baseline_id: str | None = None, + shadow_rate: float | None = None, ) -> Any: """Materialise a parsed config into a live llm-connect routing policy. @@ -302,20 +304,92 @@ def build_routing_policy_from_config( Fails fast (before any network call) when a candidate's required API key env var is missing from ``env``. + + When ``shadow_baseline_id`` is set, every non-baseline candidate is + wrapped in an llm-connect ``ShadowingAdapter`` using the named + baseline candidate plus a PairedGrader(ExactMatchJudge()) and the + QualityLedger from ``config.ledger_path``. ``shadow_rate`` controls + the sampling fraction (defaults to 0.1). The baseline candidate + itself is never wrapped — that would shadow it against itself. """ from llm_connect.routing import AdaptiveRoutingPolicy, RoutingPolicy, RoutingRule environment: Mapping[str, str] = env if env is not None else os.environ factory: AdapterFactory = adapter_factory or _default_adapter_factory + if shadow_rate is not None and shadow_baseline_id is None: + raise InfospaceError( + "shadow_rate_without_baseline", + "shadow_rate requires shadow_baseline_id; pass --shadow-baseline with --shadow-rate", + {"shadow_rate": shadow_rate}, + ) + + use_adaptive = ( + config.default_quality_floor is not None + or any(task.quality_floor is not None for task in config.task_types) + or config.ledger_path is not None + or shadow_baseline_id is not None + ) + + ledger = _resolve_ledger(config, workspace, required=shadow_baseline_id is not None) + + raw_adapters: dict[str, Any] = {} + for task in config.task_types: + for candidate in task.candidates: + if candidate.id not in raw_adapters: + raw_adapters[candidate.id] = factory(candidate, environment) + + baseline_adapter = None + if shadow_baseline_id is not None: + if shadow_baseline_id not in raw_adapters: + raise InfospaceError( + "missing_shadow_baseline", + f"shadow_baseline_id {shadow_baseline_id!r} not declared as a candidate in the routing config", + {"shadow_baseline_id": shadow_baseline_id}, + ) + baseline_adapter = raw_adapters[shadow_baseline_id] + adapters_by_id: dict[str, Any] = {} + if shadow_baseline_id is None: + adapters_by_id = dict(raw_adapters) + else: + # Wrap each candidate (per task) in a ShadowingAdapter unless it *is* the baseline. + from .routing import wrap_with_shadow_sampling + from llm_connect.grading import ExactMatchJudge, PairedGrader + + assert ledger is not None # _resolve_ledger raised if required and missing + grader = PairedGrader(judge=ExactMatchJudge()) + effective_rate = shadow_rate if shadow_rate is not None else 0.1 + for task in config.task_types: + for candidate in task.candidates: + key = (candidate.id, task.task_type) + if candidate.id == shadow_baseline_id: + adapters_by_id[candidate.id] = raw_adapters[candidate.id] + continue + # One ShadowingAdapter per (candidate, task_type) pair so the + # task_type tagged on observations matches the rule it serves. + shadow_id = f"shadow:{candidate.id}@{task.task_type}" + adapters_by_id[shadow_id] = wrap_with_shadow_sampling( + candidate=raw_adapters[candidate.id], + baseline=baseline_adapter, + grader=grader, + ledger=ledger, + task_type=task.task_type, + adapter_id=candidate.id, + baseline_adapter_id=shadow_baseline_id, + shadow_rate=effective_rate, + async_shadow=True, + ) + adapters_by_id[key] = adapters_by_id[shadow_id] # task-keyed reverse lookup + rules: list[RoutingRule] = [] for task in config.task_types: - candidates: list[Any] = [] + candidates = [] for candidate in task.candidates: - if candidate.id not in adapters_by_id: - adapters_by_id[candidate.id] = factory(candidate, environment) - candidates.append(adapters_by_id[candidate.id]) + if shadow_baseline_id is not None and candidate.id != shadow_baseline_id: + candidates.append(adapters_by_id[(candidate.id, task.task_type)]) + else: + candidates.append(adapters_by_id[candidate.id]) prefer = candidates[0] prefer_candidate = task.candidates[0] fallback = candidates[1] if len(candidates) > 1 else None @@ -328,30 +402,38 @@ def build_routing_policy_from_config( ) ) - use_adaptive = ( - config.default_quality_floor is not None - or any(task.quality_floor is not None for task in config.task_types) - or config.ledger_path is not None - ) if not use_adaptive: return RoutingPolicy(rules=rules) - from llm_connect.quality import QualityLedger - - ledger: QualityLedger | None = None - if config.ledger_path: - ledger_path = Path(config.ledger_path) - if not ledger_path.is_absolute() and workspace is not None: - ledger_path = Path(workspace) / ledger_path - ledger_path.parent.mkdir(parents=True, exist_ok=True) - ledger = QualityLedger(path=ledger_path) + # Clean adapters_by_id for AdaptiveRoutingPolicy: keep stable string keys only. + string_keyed = {key: value for key, value in adapters_by_id.items() if isinstance(key, str)} return AdaptiveRoutingPolicy( rules=rules, ledger=ledger, - adapters_by_id=dict(adapters_by_id), + adapters_by_id=string_keyed, ) +def _resolve_ledger( + config: RoutingConfig, workspace: str | Path | None, *, required: bool +) -> Any: + from llm_connect.quality import QualityLedger + + if not config.ledger_path: + if required: + raise InfospaceError( + "missing_routing_ledger_for_shadow", + "Shadow sampling requires a ledger_path in the routing config", + {"config_ledger_path": config.ledger_path}, + ) + return None + ledger_path = Path(config.ledger_path) + if not ledger_path.is_absolute() and workspace is not None: + ledger_path = Path(workspace) / ledger_path + ledger_path.parent.mkdir(parents=True, exist_ok=True) + return QualityLedger(path=ledger_path) + + def _default_adapter_factory( candidate: RoutingCandidateConfig, env: Mapping[str, str] ) -> Any: diff --git a/tests/test_routing_config.py b/tests/test_routing_config.py index 62e505a..4daa4c3 100644 --- a/tests/test_routing_config.py +++ b/tests/test_routing_config.py @@ -463,6 +463,187 @@ def test_build_routing_policy_honours_custom_api_key_env() -> None: assert isinstance(policy.rules[0].prefer, OpenRouterAdapter) +def test_shadow_rate_without_baseline_fails_fast() -> None: + from infospace_bench.routing_config import build_routing_policy_from_config + + config = parse_routing_config(MINIMAL) + with pytest.raises(InfospaceError) as exc_info: + build_routing_policy_from_config( + config, + shadow_rate=0.5, + adapter_factory=_fake_adapter_factory_record([]), + ) + assert exc_info.value.code == "shadow_rate_without_baseline" + + +def test_shadow_baseline_without_ledger_path_fails_fast() -> None: + """ShadowingAdapter needs a place to write observations; require ledger_path.""" + from infospace_bench.routing_config import build_routing_policy_from_config + + config = parse_routing_config(MINIMAL) + with pytest.raises(InfospaceError) as exc_info: + build_routing_policy_from_config( + config, + shadow_baseline_id="openrouter:gpt-4o-mini", + adapter_factory=_fake_adapter_factory_record([]), + ) + assert exc_info.value.code == "missing_routing_ledger_for_shadow" + + +def test_shadow_baseline_not_in_config_fails_fast(tmp_path: Path) -> None: + from infospace_bench.routing_config import build_routing_policy_from_config + + data = {**MINIMAL, "ledger_path": "quality.jsonl"} + config = parse_routing_config(data) + with pytest.raises(InfospaceError) as exc_info: + build_routing_policy_from_config( + config, + workspace=tmp_path, + shadow_baseline_id="not-in-config", + adapter_factory=_fake_adapter_factory_record([]), + ) + assert exc_info.value.code == "missing_shadow_baseline" + + +def test_shadow_wraps_candidates_excluding_baseline(tmp_path: Path) -> None: + from llm_connect.adapter import LLMAdapter + from llm_connect.models import LLMResponse, RunConfig + from llm_connect.shadowing import ShadowingAdapter + from infospace_bench.routing_config import build_routing_policy_from_config + + data = { + "schema_version": 1, + "ledger_path": "quality.jsonl", + "task_types": { + "extract-entities": { + "candidates": [ + {"id": "candidate-a", "provider": "openrouter", "model": "openai/gpt-4o-mini"}, + {"id": "baseline-x", "provider": "claude_code", "model": "claude-opus-4-7"}, + ], + }, + }, + } + config = parse_routing_config(data) + + class _Stub(LLMAdapter): + def __init__(self, name): + self.name = name + self.calls = 0 + + def execute_prompt(self, prompt, config): + self.calls += 1 + return LLMResponse(content="match", model=self.name, usage={"prompt_tokens": 1, "completion_tokens": 1}) + + def validate_config(self, config): + return True + + stubs: dict[str, _Stub] = {} + + def factory(candidate, env): + stubs[candidate.id] = _Stub(candidate.id) + return stubs[candidate.id] + + policy = build_routing_policy_from_config( + config, + workspace=tmp_path, + adapter_factory=factory, + shadow_baseline_id="baseline-x", + shadow_rate=1.0, + ) + + rule = policy.rules[0] + # The prefer slot is now a ShadowingAdapter wrapping candidate-a. + assert isinstance(rule.prefer, ShadowingAdapter) + assert rule.prefer.candidate_adapter is stubs["candidate-a"] + assert rule.prefer.baseline_adapter is stubs["baseline-x"] + assert rule.prefer.task_type == "extract-entities" + # The baseline candidate (fallback) is NOT wrapped. + assert rule.fallback is stubs["baseline-x"] + + +def test_shadow_rate_one_fires_per_call_and_zero_skips(tmp_path: Path) -> None: + """ShadowingAdapter is best-effort and supplied by llm-connect. + + Spot-check the wiring: at rate=1.0 the baseline.execute_prompt runs on + every call; at rate=0.0 it never runs. + """ + from llm_connect.adapter import LLMAdapter + from llm_connect.models import LLMResponse, RunConfig + from infospace_bench.routing_config import build_routing_policy_from_config + + data = { + "schema_version": 1, + "ledger_path": "quality.jsonl", + "task_types": { + "extract-entities": { + "candidates": [ + {"id": "candidate-a", "provider": "openrouter", "model": "openai/gpt-4o-mini"}, + {"id": "baseline-x", "provider": "claude_code", "model": "claude-opus-4-7"}, + ], + }, + }, + } + config = parse_routing_config(data) + + class _Counter(LLMAdapter): + def __init__(self, name): + self.name = name + self.calls = 0 + + def execute_prompt(self, prompt, config): + self.calls += 1 + return LLMResponse(content="match", model=self.name, usage={"prompt_tokens": 1, "completion_tokens": 1}) + + def validate_config(self, config): + return True + + def make_factory(): + stubs: dict[str, _Counter] = {} + + def factory(candidate, env): + stubs[candidate.id] = _Counter(candidate.id) + return stubs[candidate.id] + + return factory, stubs + + factory, stubs = make_factory() + policy_full = build_routing_policy_from_config( + config, + workspace=tmp_path, + adapter_factory=factory, + shadow_baseline_id="baseline-x", + shadow_rate=1.0, + ) + # Drive the prefer adapter (synchronous shadow) and force any + # background shadow work to drain before we count calls. + shadow_adapter = policy_full.rules[0].prefer + shadow_adapter.async_shadow = False # force sync grading for a deterministic count + for _ in range(3): + shadow_adapter.execute_prompt("hello", RunConfig(model_name="x")) + assert stubs["candidate-a"].calls == 3 + assert stubs["baseline-x"].calls == 3, "rate=1.0 should call baseline on every call" + + # Fresh factory + stubs for the zero-rate run so counters reset. + factory2, stubs2 = make_factory() + # Use a unique ledger path so the two policies do not share state. + (tmp_path / "subdir").mkdir(exist_ok=True) + data2 = {**data, "ledger_path": "subdir/quality.jsonl"} + config2 = parse_routing_config(data2) + policy_zero = build_routing_policy_from_config( + config2, + workspace=tmp_path, + adapter_factory=factory2, + shadow_baseline_id="baseline-x", + shadow_rate=0.0, + ) + shadow_adapter2 = policy_zero.rules[0].prefer + shadow_adapter2.async_shadow = False + for _ in range(3): + shadow_adapter2.execute_prompt("hello", RunConfig(model_name="x")) + assert stubs2["candidate-a"].calls == 3 + assert stubs2["baseline-x"].calls == 0, "rate=0.0 should skip baseline entirely" + + def test_rejects_non_string_ledger_path() -> None: payload = { "schema_version": 1, diff --git a/workplans/IB-WP-0020-provider-routing-cli.md b/workplans/IB-WP-0020-provider-routing-cli.md index 670fc01..0d8c654 100644 --- a/workplans/IB-WP-0020-provider-routing-cli.md +++ b/workplans/IB-WP-0020-provider-routing-cli.md @@ -4,7 +4,7 @@ type: workplan title: "Provider Routing CLI Integration" domain: markitect repo: infospace-bench -status: active +status: finished owner: markitect topic_slug: markitect created: "2026-05-18" @@ -158,7 +158,7 @@ state_hub_task_id: "69288131-f265-4db5-a4b0-b0c8a6f55dd8" ```task id: IB-WP-0020-T05 -status: todo +status: done priority: medium state_hub_task_id: "02658420-056c-4d73-8055-e6a7ab51876b" ```