From 0a83e908ce822f09d259b0be1fb9d5bd5d77b6f5 Mon Sep 17 00:00:00 2001 From: tegwick Date: Mon, 18 May 2026 11:33:58 +0200 Subject: [PATCH] IB-WP-0018-T01+T02+T05: routing bridge to llm-connect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit T01 — task-type taxonomy. docs/routing-task-types.md names the five generation stages as the default identity-mapped task types (summarize-source, extract-entities, extract-relations, evaluate-entity, synthesize-report) and records the recommended quality floors per stage. The taxonomy explicitly does not decide which adapter ships per task type, where the ledger lives, or what a quality score means — those stay with the caller per the LLM-WP-0004 scope guardrail. T02 — RoutingAssistedGenerationAdapter bridge in src/infospace_bench/routing.py. Wraps any llm-connect RoutingPolicy or AdaptiveRoutingPolicy as an infospace-bench AssistedGenerationAdapter: maps stage_id -> task_type (overridable), resolves an LLMAdapter, delegates execute_prompt with a configurable RunConfig, and surfaces the resolved adapter id, task type, model, usage, and finish_reason back on AssistedGenerationResult.metadata. Provider tag stays back-compatible with the strings already used in run records and the budget rollup (openrouter / claude_code / openai / gemini / mock / routing). T05 — eight tests in tests/test_routing_adapter.py cover: static-policy per-stage resolution, stage_to_task_type overrides, default-mapping completeness, fall-through for unmapped stage ids, the adaptive path selecting the cheaper qualifying adapter when a quality_floor is set, adaptive policy falling back to static when no floor is set, response metadata round-trip with provider tagging, and estimated_cost_per_1k pass-through. Adds llm-connect as a path dependency on pyproject.toml and to the pytest pythonpath. Static OpenRouter and fixture paths are unchanged; this commit only adds the option of routing. 
139 tests pass, 1 skipped (the OpenRouter live smoke, gated as before). T03 (shadow-mode integration) and T04 (CLI + per-stage chosen-adapter in the generation report) follow next. Co-Authored-By: Claude Opus 4.7 --- docs/routing-task-types.md | 78 +++++++++++ pyproject.toml | 3 +- src/infospace_bench/routing.py | 137 +++++++++++++++++++ tests/test_routing_adapter.py | 232 +++++++++++++++++++++++++++++++++ 4 files changed, 449 insertions(+), 1 deletion(-) create mode 100644 docs/routing-task-types.md create mode 100644 src/infospace_bench/routing.py create mode 100644 tests/test_routing_adapter.py diff --git a/docs/routing-task-types.md b/docs/routing-task-types.md new file mode 100644 index 0000000..e4cb19c --- /dev/null +++ b/docs/routing-task-types.md @@ -0,0 +1,78 @@ +# Task-Type Taxonomy for Routing + +Workplan: IB-WP-0018 (T01) +Depends on: llm-connect LLM-WP-0004 (RoutingPolicy, AdaptiveRoutingPolicy) + +This file names the task types that infospace-bench emits when it routes +each generation stage through llm-connect. The names are the consumer +side of LLM-WP-0004's scope guardrail: llm-connect ships the routing +primitives, infospace-bench owns the taxonomy. + +## Default identity mapping + +`RoutingAssistedGenerationAdapter` (see `src/infospace_bench/routing.py`) +maps stage ids to task types using the identity mapping below by +default. Callers override individual entries via +`RoutingAssistedGenerationAdapter(..., stage_to_task_type={...})`. + +| Stage id | Task type | Notes | +|---|---|---| +| `summarize-source` | `summarize-source` | One call per source chunk. Cheap, high-volume; small models usually clear the bar. | +| `extract-entities` | `extract-entities` | One call per source chunk. Quality matters most here — bad extractions cascade. | +| `extract-relations` | `extract-relations` | One call per source chunk. Quality close to extraction; relations rely on entity titles being stable. 
| +| `evaluate-entity` | `evaluate-entity` | One call per generated entity. Cheap, often a different model than extraction to avoid self-grading. | +| `synthesize-report` | `synthesize-report` | One call at the end. Volume-of-one; quality matters; cost negligible. | + +## Quality expectations + +`AdaptiveRoutingPolicy.resolve(task_type, quality_floor=...)` picks the +cheapest adapter whose ledger-observed mean quality clears the floor. +The recommended starting floors: + +| Task type | Quality floor | Rationale | +|---|---|---| +| `summarize-source` | 0.70 | Summaries are intermediate. Slight quality loss is recoverable downstream. | +| `extract-entities` | 0.85 | Entities are the durable output. Be strict. | +| `extract-relations` | 0.80 | Relations depend on entities; slightly looser is OK as long as evidence is intact. | +| `evaluate-entity` | 0.80 | Judge-level reliability. Self-grading bias is more of a concern than absolute score. | +| `synthesize-report` | 0.70 | The report is a review surface; tolerate looser language for cheaper models. | + +These are starting points. Bind them at the calling site +(`RoutingAssistedGenerationAdapter(..., quality_floor=0.85)` for +extraction stages) — they are not enforced by this taxonomy. + +## Common overrides + +Callers may want to **collapse** task types to share observations across +related stages, or **re-point** a stage at a dedicated task type to pin a +specific model to that workload. Two illustrative overrides: + +```python +# Collapse extraction stages so a single ledger drives both +stage_to_task_type = { + "extract-entities": "extraction", + "extract-relations": "extraction", +} +``` + +```python +# Re-point entity evaluation at a dedicated `judge` task type. The map is +# keyed by stage id only, so a per-category split (e.g. when +# trading-literature's `evidence_bearing_claim` is harder to judge than +# `instrument`) needs a distinct stage id per category. 
+stage_to_task_type = { + "evaluate-entity": "judge", +} +``` + +Anything not in the override map falls through to the identity mapping. + +## What this taxonomy does NOT decide + +- **Which adapter ships per task type.** That belongs to the caller's + `RoutingPolicy` rule list. +- **Where the quality ledger lives.** Caller-supplied path on the + `AdaptiveRoutingPolicy`. +- **When to refresh observations.** Caller decides via the ledger's TTL + helpers in llm-connect. +- **What a quality score means.** Each judge defines its own. diff --git a/pyproject.toml b/pyproject.toml index 5139848..9bb10d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ dependencies = [ "PyYAML>=6", "markitect-tool @ file:///home/worsch/markitect-tool", "artifactstore @ file:///home/worsch/artifact-store", + "llm-connect @ file:///home/worsch/llm-connect", ] [project.scripts] @@ -16,5 +17,5 @@ infospace-bench = "infospace_bench.cli:main" infospace_bench = ["profiles/**/*", "model_rates.yaml"] [tool.pytest.ini_options] -pythonpath = ["src", "../markitect-tool/src"] +pythonpath = ["src", "../markitect-tool/src", "../llm-connect"] testpaths = ["tests"] diff --git a/src/infospace_bench/routing.py b/src/infospace_bench/routing.py new file mode 100644 index 0000000..f07e106 --- /dev/null +++ b/src/infospace_bench/routing.py @@ -0,0 +1,137 @@ +""" +Bridge between infospace-bench's ``AssistedGenerationAdapter`` protocol and +llm-connect's ``RoutingPolicy`` / ``AdaptiveRoutingPolicy`` primitives +(LLM-WP-0004). Lets a generation run delegate each stage to a task-typed +route without touching ``workflow.py``. + +The mapping from infospace-bench workflow stage ids to llm-connect task +types is the consumer side of LLM-WP-0004's scope guardrail: llm-connect +ships the routing primitives, infospace-bench names the tasks. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +from llm_connect.adapter import LLMAdapter +from llm_connect.models import RunConfig +from llm_connect.routing import AdaptiveRoutingPolicy, RoutingPolicy + +from .workflow import AssistedGenerationRequest, AssistedGenerationResult + + +# Default identity mapping: every generation stage shipped by the +# generic-source profile is its own task type. Callers can override +# individual stages via the ``stage_to_task_type`` field — for example to +# collapse ``extract-entities`` and ``extract-relations`` into a single +# ``extraction`` route, or to widen ``evaluate-entity`` to ``judge``. +STAGE_TO_TASK_TYPE_DEFAULT: dict[str, str] = { + "summarize-source": "summarize-source", + "extract-entities": "extract-entities", + "extract-relations": "extract-relations", + "evaluate-entity": "evaluate-entity", + "synthesize-report": "synthesize-report", +} + + +@dataclass(frozen=True) +class RoutingAssistedGenerationAdapter: + """Route assisted-generation requests through an llm-connect policy. + + On each ``generate(request)`` call: + + 1. Resolves ``task_type`` from ``request.stage_id`` (overridable via + ``stage_to_task_type``; default falls back to the stage id itself). + 2. Asks the policy for an adapter. When the policy is an + ``AdaptiveRoutingPolicy`` and ``quality_floor`` is set, the + adaptive path is used; otherwise the policy resolves statically. + 3. Calls the resolved llm-connect ``LLMAdapter.execute_prompt`` with a + ``RunConfig`` built from ``default_run_config``. + 4. Maps the ``LLMResponse`` back to an ``AssistedGenerationResult`` + and preserves model, usage, finish_reason, and the resolved + task_type / adapter_id in ``metadata``. 
+ """ + + policy: RoutingPolicy + stage_to_task_type: dict[str, str] = field(default_factory=dict) + default_run_config: RunConfig = field(default_factory=RunConfig) + quality_floor: float | None = None + estimated_cost_per_1k: float | None = None + + def generate( + self, request: AssistedGenerationRequest + ) -> AssistedGenerationResult: + task_type = self._task_type_for(request.stage_id) + adapter = self._resolve(task_type) + response = adapter.execute_prompt(request.prompt, self.default_run_config) + adapter_id = _identify_adapter(adapter) + metadata: dict[str, Any] = { + "task_type": task_type, + "adapter_id": adapter_id, + "model": response.model or self.default_run_config.model_name, + "usage": dict(response.usage or {}), + "finish_reason": response.finish_reason, + } + if response.metadata: + metadata.update(response.metadata) + return AssistedGenerationResult( + markdown=response.content, + provider=_provider_tag(adapter), + metadata=metadata, + ) + + def _resolve(self, task_type: str) -> LLMAdapter: + if isinstance(self.policy, AdaptiveRoutingPolicy) and self.quality_floor is not None: + return self.policy.resolve( + task_type, + estimated_cost_per_1k=self.estimated_cost_per_1k, + quality_floor=self.quality_floor, + ) + return self.policy.resolve( + task_type, + estimated_cost_per_1k=self.estimated_cost_per_1k, + ) + + def _task_type_for(self, stage_id: str) -> str: + merged = dict(STAGE_TO_TASK_TYPE_DEFAULT) + merged.update(self.stage_to_task_type) + return merged.get(stage_id, stage_id) + + +def _identify_adapter(adapter: LLMAdapter) -> str: + """Best-effort stable id for an llm-connect adapter instance. + + Prefers an explicit ``adapter_id`` attribute (some adapters set it), + falls back to ``{class_name}:{model_attr}`` when a model attribute is + present, otherwise just the class name. 
+ """ + adapter_id = getattr(adapter, "adapter_id", "") + if adapter_id: + return str(adapter_id) + model = getattr(adapter, "model", "") or getattr(adapter, "model_name", "") + name = type(adapter).__name__ + if model: + return f"{name}:{model}" + return name + + +def _provider_tag(adapter: LLMAdapter) -> str: + """Coarse provider tag matching the strings already used in run records. + + Returns ``openrouter`` / ``claude_code`` / ``openai`` / ``gemini`` / + ``mock`` / ``routing`` so existing tooling (budget rollup buckets, archive + metadata) keeps its bucket keys stable. + """ + name = type(adapter).__name__.lower() + if "openrouter" in name: + return "openrouter" + if "claudecode" in name or "claude_code" in name: + return "claude_code" + if "openai" in name: + return "openai" + if "gemini" in name: + return "gemini" + if "mock" in name or "static" in name: + return "mock" + return "routing" diff --git a/tests/test_routing_adapter.py b/tests/test_routing_adapter.py new file mode 100644 index 0000000..d13fb4e --- /dev/null +++ b/tests/test_routing_adapter.py @@ -0,0 +1,232 @@ +""" +Tests for the routing bridge that wraps an llm-connect RoutingPolicy as +an infospace-bench AssistedGenerationAdapter (IB-WP-0018 T02/T05). + +All tests use mocked llm-connect ``LLMAdapter`` instances — no network. 
+""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from llm_connect.adapter import LLMAdapter +from llm_connect.models import LLMResponse, RunConfig +from llm_connect.routing import ( + AdaptiveRoutingPolicy, + RoutingPolicy, + RoutingRule, +) +from llm_connect.quality import QualityLedger, QualityObservation + +from infospace_bench.routing import ( + STAGE_TO_TASK_TYPE_DEFAULT, + RoutingAssistedGenerationAdapter, +) +from infospace_bench.workflow import AssistedGenerationRequest + + +class _MockAdapter(LLMAdapter): + """Test double: returns a configured ``LLMResponse`` and records calls.""" + + def __init__(self, *, model: str, content: str = "ok", cost_per_call: float = 0.0) -> None: + self.model = model + self._content = content + self._cost_per_call = cost_per_call + self.calls: list[tuple[str, RunConfig]] = [] + + def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse: + self.calls.append((prompt, config)) + return LLMResponse( + content=self._content, + model=self.model, + usage={"prompt_tokens": 100, "completion_tokens": 50}, + finish_reason="stop", + metadata={"cost_per_call_usd": self._cost_per_call}, + ) + + def validate_config(self, config: RunConfig) -> bool: # pragma: no cover - trivial + return True + + +def _request(stage_id: str, prompt: str = "Hello.") -> AssistedGenerationRequest: + return AssistedGenerationRequest( + stage_id=stage_id, + workflow_id="generic-source-entities", + input_artifact_id="source/test.md", + prompt=prompt, + ) + + +def test_bridge_resolves_static_policy_per_stage() -> None: + cheap = _MockAdapter(model="cheap-1", content="# Cheap") + smart = _MockAdapter(model="smart-1", content="# Smart") + policy = RoutingPolicy( + rules=[ + RoutingRule(task_type="summarize-source", prefer=cheap), + RoutingRule(task_type="extract-entities", prefer=smart), + ], + default=cheap, + ) + bridge = RoutingAssistedGenerationAdapter(policy=policy) + + summary = 
bridge.generate(_request("summarize-source", "Source A")) + entities = bridge.generate(_request("extract-entities", "Source A")) + + assert summary.markdown == "# Cheap" + assert summary.metadata["task_type"] == "summarize-source" + assert summary.metadata["model"] == "cheap-1" + assert summary.metadata["usage"]["prompt_tokens"] == 100 + assert entities.markdown == "# Smart" + assert entities.metadata["model"] == "smart-1" + assert len(cheap.calls) == 1 + assert len(smart.calls) == 1 + + +def test_bridge_honours_stage_to_task_type_overrides() -> None: + extraction = _MockAdapter(model="extraction-1") + policy = RoutingPolicy( + rules=[RoutingRule(task_type="extraction", prefer=extraction)], + ) + bridge = RoutingAssistedGenerationAdapter( + policy=policy, + stage_to_task_type={ + "extract-entities": "extraction", + "extract-relations": "extraction", + }, + ) + + bridge.generate(_request("extract-entities")) + bridge.generate(_request("extract-relations")) + + assert len(extraction.calls) == 2 + + +def test_bridge_default_task_type_map_covers_all_known_stages() -> None: + expected = { + "summarize-source", + "extract-entities", + "extract-relations", + "evaluate-entity", + "synthesize-report", + } + assert set(STAGE_TO_TASK_TYPE_DEFAULT) == expected + # Identity mapping by default + for stage in expected: + assert STAGE_TO_TASK_TYPE_DEFAULT[stage] == stage + + +def test_bridge_falls_through_to_stage_id_when_no_known_mapping() -> None: + custom_adapter = _MockAdapter(model="custom-1") + policy = RoutingPolicy( + rules=[RoutingRule(task_type="custom-stage", prefer=custom_adapter)], + ) + bridge = RoutingAssistedGenerationAdapter(policy=policy) + + result = bridge.generate(_request("custom-stage")) + + assert result.markdown == "# ok" or result.markdown == "ok" + assert custom_adapter.calls, "custom stage_id should fall through to the same task_type" + + +def test_bridge_uses_adaptive_path_when_quality_floor_set(tmp_path) -> None: + cheap = 
_MockAdapter(model="cheap-1") + smart = _MockAdapter(model="smart-1") + ledger = QualityLedger(path=tmp_path / "quality.jsonl") + # Cheap clears the floor; smart does too but at a higher cost. + for _ in range(3): + ledger.append( + QualityObservation( + task_type="extract-entities", + adapter_id="cheap-1", + model_id="cheap-1", + cost_usd=0.001, + quality_score=0.9, + tokens_in=100, + tokens_out=50, + latency_ms=10, + ) + ) + ledger.append( + QualityObservation( + task_type="extract-entities", + adapter_id="smart-1", + model_id="smart-1", + cost_usd=0.01, + quality_score=0.95, + tokens_in=100, + tokens_out=50, + latency_ms=10, + ) + ) + + policy = AdaptiveRoutingPolicy( + rules=[RoutingRule(task_type="extract-entities", prefer=smart)], + default=cheap, + ledger=ledger, + adapters_by_id={"cheap-1": cheap, "smart-1": smart}, + ) + bridge = RoutingAssistedGenerationAdapter(policy=policy, quality_floor=0.8) + + bridge.generate(_request("extract-entities")) + + assert cheap.calls, "adaptive policy should pick the cheaper qualifying adapter" + assert not smart.calls + + +def test_bridge_falls_back_to_static_when_quality_floor_unset(tmp_path) -> None: + cheap = _MockAdapter(model="cheap-1") + smart = _MockAdapter(model="smart-1") + ledger = QualityLedger(path=tmp_path / "quality.jsonl") + + policy = AdaptiveRoutingPolicy( + rules=[RoutingRule(task_type="extract-entities", prefer=smart)], + ledger=ledger, + ) + bridge = RoutingAssistedGenerationAdapter(policy=policy) # no quality_floor + + bridge.generate(_request("extract-entities")) + + assert smart.calls, "without a quality_floor the bridge must use static routing" + assert not cheap.calls + + +def test_bridge_preserves_response_metadata_and_provider_tag() -> None: + adapter = _MockAdapter(model="cheap-1") + adapter.execute_prompt = lambda prompt, config: LLMResponse( # type: ignore[assignment] + content="# ok", + model="cheap-1", + usage={"prompt_tokens": 7, "completion_tokens": 3}, + finish_reason="stop", + 
metadata={"request_id": "req-42"}, + ) + policy = RoutingPolicy(rules=[RoutingRule(task_type="custom", prefer=adapter)]) + bridge = RoutingAssistedGenerationAdapter(policy=policy) + + result = bridge.generate(_request("custom")) + + assert result.metadata["request_id"] == "req-42" + assert result.metadata["usage"] == {"prompt_tokens": 7, "completion_tokens": 3} + assert result.metadata["task_type"] == "custom" + assert result.metadata["adapter_id"].endswith(":cheap-1") + assert result.provider == "mock" + + +def test_bridge_passes_estimated_cost_per_1k_through() -> None: + captured: dict[str, Any] = {} + + class _PolicyProbe(RoutingPolicy): + def resolve(self, task_type, estimated_cost_per_1k=None): # type: ignore[override] + captured["task_type"] = task_type + captured["estimated_cost_per_1k"] = estimated_cost_per_1k + return _MockAdapter(model="x") + + bridge = RoutingAssistedGenerationAdapter( + policy=_PolicyProbe(), + estimated_cost_per_1k=0.5, + ) + bridge.generate(_request("summarize-source")) + + assert captured["task_type"] == "summarize-source" + assert captured["estimated_cost_per_1k"] == 0.5