IB-WP-0018-T01+T02+T05: routing bridge to llm-connect

T01 — task-type taxonomy. docs/routing-task-types.md names the five
generation stages as the default identity-mapped task types
(summarize-source, extract-entities, extract-relations,
evaluate-entity, synthesize-report) and records the recommended quality
floors per stage. The taxonomy explicitly does not decide which adapter
ships per task type, where the ledger lives, or what a quality score
means — those stay with the caller per the LLM-WP-0004 scope guardrail.

T02 — RoutingAssistedGenerationAdapter bridge in
src/infospace_bench/routing.py. Wraps any llm-connect RoutingPolicy or
AdaptiveRoutingPolicy as an infospace-bench AssistedGenerationAdapter:
maps stage_id -> task_type (overridable), resolves an LLMAdapter,
delegates execute_prompt with a configurable RunConfig, and surfaces
the resolved adapter id, task type, model, usage, and finish_reason
back on AssistedGenerationResult.metadata. Provider tag stays
back-compatible with the strings already used in run records and the
budget rollup (openrouter / claude_code / openai / gemini / mock /
routing).

T05 — eight tests in tests/test_routing_adapter.py cover: static-policy
per-stage resolution, stage_to_task_type overrides, default-mapping
completeness, fall-through for unmapped stage ids, the adaptive path
selecting the cheaper qualifying adapter when a quality_floor is set,
adaptive policy falling back to static when no floor is set, response
metadata round-trip with provider tagging, and estimated_cost_per_1k
pass-through.

Adds llm-connect as a path dependency on pyproject.toml and to the
pytest pythonpath. Static OpenRouter and fixture paths are unchanged;
this commit only adds the option of routing.

139 tests pass, 1 skipped (the OpenRouter live smoke, gated as before).

T03 (shadow-mode integration) and T04 (CLI + per-stage chosen-adapter
in the generation report) follow next.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-18 11:33:58 +02:00
parent 1d62dffae9
commit 0a83e908ce
4 changed files with 449 additions and 1 deletions

View File

@@ -0,0 +1,78 @@
# Task-Type Taxonomy for Routing
Workplan: IB-WP-0018 (T01)
Depends on: llm-connect LLM-WP-0004 (RoutingPolicy, AdaptiveRoutingPolicy)
This file names the task types that infospace-bench emits when it routes
each generation stage through llm-connect. The names are the consumer
side of LLM-WP-0004's scope guardrail: llm-connect ships the routing
primitives, infospace-bench owns the taxonomy.
## Default identity mapping
`RoutingAssistedGenerationAdapter` (see `src/infospace_bench/routing.py`)
maps stage ids to task types using the identity mapping below by
default. Callers override individual entries via
`RoutingAssistedGenerationAdapter(..., stage_to_task_type={...})`.
| Stage id | Task type | Notes |
|---|---|---|
| `summarize-source` | `summarize-source` | One call per source chunk. Cheap, high-volume; small models usually clear the bar. |
| `extract-entities` | `extract-entities` | One call per source chunk. Quality matters most here — bad extractions cascade. |
| `extract-relations` | `extract-relations` | One call per source chunk. Quality close to extraction; relations rely on entity titles being stable. |
| `evaluate-entity` | `evaluate-entity` | One call per generated entity. Cheap, often a different model than extraction to avoid self-grading. |
| `synthesize-report` | `synthesize-report` | One call at the end. Volume-of-one; quality matters; cost negligible. |
## Quality expectations
`AdaptiveRoutingPolicy.resolve(task_type, quality_floor=...)` picks the
cheapest adapter whose ledger-observed mean quality clears the floor.
The recommended starting floors:
| Task type | Quality floor | Rationale |
|---|---|---|
| `summarize-source` | 0.70 | Summaries are intermediate. Slight quality loss is recoverable downstream. |
| `extract-entities` | 0.85 | Entities are the durable output. Be strict. |
| `extract-relations` | 0.80 | Relations depend on entities; slightly looser is OK as long as evidence is intact. |
| `evaluate-entity` | 0.80 | Judge-level reliability. Self-grading bias is more of a concern than absolute score. |
| `synthesize-report` | 0.70 | The report is a review surface; tolerate looser language for cheaper models. |
These are starting points. Bind them at the calling site
(`RoutingAssistedGenerationAdapter(..., quality_floor=0.85)` for
extraction stages) — they are not enforced by this taxonomy.
## Common overrides
Callers may want to **collapse** task types to share observations across
related stages, or **split** a task type to pin a specific model to a
narrow workload. Two illustrative overrides:
```python
# Collapse extraction stages so a single ledger drives both
stage_to_task_type = {
"extract-entities": "extraction",
"extract-relations": "extraction",
}
```
```python
# Split entity evaluation by category — useful when a profile has very
# different quality bars for different entity categories (e.g.
# trading-literature's `evidence_bearing_claim` is harder to judge than
# `instrument`).
stage_to_task_type = {
"evaluate-entity": "judge",
}
```
Anything not in the override map falls through to the identity mapping.
## What this taxonomy does NOT decide
- **Which adapter ships per task type.** That belongs to the caller's
`RoutingPolicy` rule list.
- **Where the quality ledger lives.** Caller-supplied path on the
`AdaptiveRoutingPolicy`.
- **When to refresh observations.** Caller decides via the ledger's TTL
helpers in llm-connect.
- **What a quality score means.** Each judge defines its own.

View File

@@ -7,6 +7,7 @@ dependencies = [
"PyYAML>=6",
"markitect-tool @ file:///home/worsch/markitect-tool",
"artifactstore @ file:///home/worsch/artifact-store",
"llm-connect @ file:///home/worsch/llm-connect",
]
[project.scripts]
@@ -16,5 +17,5 @@ infospace-bench = "infospace_bench.cli:main"
infospace_bench = ["profiles/**/*", "model_rates.yaml"]
[tool.pytest.ini_options]
pythonpath = ["src", "../markitect-tool/src"]
pythonpath = ["src", "../markitect-tool/src", "../llm-connect"]
testpaths = ["tests"]

View File

@@ -0,0 +1,137 @@
"""
Bridge between infospace-bench's ``AssistedGenerationAdapter`` protocol and
llm-connect's ``RoutingPolicy`` / ``AdaptiveRoutingPolicy`` primitives
(LLM-WP-0004). Lets a generation run delegate each stage to a task-typed
route without touching ``workflow.py``.
The mapping from infospace-bench workflow stage ids to llm-connect task
types is the consumer side of LLM-WP-0004's scope guardrail: llm-connect
ships the routing primitives, infospace-bench names the tasks.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from llm_connect.adapter import LLMAdapter
from llm_connect.models import RunConfig
from llm_connect.routing import AdaptiveRoutingPolicy, RoutingPolicy
from .workflow import AssistedGenerationRequest, AssistedGenerationResult
# Default identity mapping: every generation stage shipped by the
# generic-source profile is its own task type. Callers can override
# individual stages via the ``stage_to_task_type`` field — for example to
# collapse ``extract-entities`` and ``extract-relations`` into a single
# ``extraction`` route, or to widen ``evaluate-entity`` to ``judge``.
STAGE_TO_TASK_TYPE_DEFAULT: dict[str, str] = {
"summarize-source": "summarize-source",
"extract-entities": "extract-entities",
"extract-relations": "extract-relations",
"evaluate-entity": "evaluate-entity",
"synthesize-report": "synthesize-report",
}
@dataclass(frozen=True)
class RoutingAssistedGenerationAdapter:
"""Route assisted-generation requests through an llm-connect policy.
On each ``generate(request)`` call:
1. Resolves ``task_type`` from ``request.stage_id`` (overridable via
``stage_to_task_type``; default falls back to the stage id itself).
2. Asks the policy for an adapter. When the policy is an
``AdaptiveRoutingPolicy`` and ``quality_floor`` is set, the
adaptive path is used; otherwise the policy resolves statically.
3. Calls the resolved llm-connect ``LLMAdapter.execute_prompt`` with a
``RunConfig`` built from ``default_run_config``.
4. Maps the ``LLMResponse`` back to an ``AssistedGenerationResult``
and preserves model, usage, finish_reason, and the resolved
task_type / adapter_id in ``metadata``.
"""
policy: RoutingPolicy
stage_to_task_type: dict[str, str] = field(default_factory=dict)
default_run_config: RunConfig = field(default_factory=RunConfig)
quality_floor: float | None = None
estimated_cost_per_1k: float | None = None
def generate(
self, request: AssistedGenerationRequest
) -> AssistedGenerationResult:
task_type = self._task_type_for(request.stage_id)
adapter = self._resolve(task_type)
response = adapter.execute_prompt(request.prompt, self.default_run_config)
adapter_id = _identify_adapter(adapter)
metadata: dict[str, Any] = {
"task_type": task_type,
"adapter_id": adapter_id,
"model": response.model or self.default_run_config.model_name,
"usage": dict(response.usage or {}),
"finish_reason": response.finish_reason,
}
if response.metadata:
metadata.update(response.metadata)
return AssistedGenerationResult(
markdown=response.content,
provider=_provider_tag(adapter),
metadata=metadata,
)
def _resolve(self, task_type: str) -> LLMAdapter:
if isinstance(self.policy, AdaptiveRoutingPolicy) and self.quality_floor is not None:
return self.policy.resolve(
task_type,
estimated_cost_per_1k=self.estimated_cost_per_1k,
quality_floor=self.quality_floor,
)
return self.policy.resolve(
task_type,
estimated_cost_per_1k=self.estimated_cost_per_1k,
)
def _task_type_for(self, stage_id: str) -> str:
merged = dict(STAGE_TO_TASK_TYPE_DEFAULT)
merged.update(self.stage_to_task_type)
return merged.get(stage_id, stage_id)
def _identify_adapter(adapter: LLMAdapter) -> str:
"""Best-effort stable id for an llm-connect adapter instance.
Prefers an explicit ``adapter_id`` attribute (some adapters set it),
falls back to ``{class_name}:{model_attr}`` when a model attribute is
present, otherwise just the class name.
"""
adapter_id = getattr(adapter, "adapter_id", "")
if adapter_id:
return str(adapter_id)
model = getattr(adapter, "model", "") or getattr(adapter, "model_name", "")
name = type(adapter).__name__
if model:
return f"{name}:{model}"
return name
def _provider_tag(adapter: LLMAdapter) -> str:
"""Coarse provider tag matching the strings already used in run records.
Returns ``openrouter`` / ``claude_code`` / ``openai`` / ``gemini`` /
``routing`` so existing tooling (budget rollup buckets, archive
metadata) keeps its bucket keys stable.
"""
name = type(adapter).__name__.lower()
if "openrouter" in name:
return "openrouter"
if "claudecode" in name or "claude_code" in name:
return "claude_code"
if "openai" in name:
return "openai"
if "gemini" in name:
return "gemini"
if "mock" in name or "static" in name:
return "mock"
return "routing"

View File

@@ -0,0 +1,232 @@
"""
Tests for the routing bridge that wraps an llm-connect RoutingPolicy as
an infospace-bench AssistedGenerationAdapter (IB-WP-0018 T02/T05).
All tests use mocked llm-connect ``LLMAdapter`` instances — no network.
"""
from __future__ import annotations
from typing import Any
import pytest
from llm_connect.adapter import LLMAdapter
from llm_connect.models import LLMResponse, RunConfig
from llm_connect.routing import (
AdaptiveRoutingPolicy,
RoutingPolicy,
RoutingRule,
)
from llm_connect.quality import QualityLedger, QualityObservation
from infospace_bench.routing import (
STAGE_TO_TASK_TYPE_DEFAULT,
RoutingAssistedGenerationAdapter,
)
from infospace_bench.workflow import AssistedGenerationRequest
class _MockAdapter(LLMAdapter):
"""Test double: returns a configured ``LLMResponse`` and records calls."""
def __init__(self, *, model: str, content: str = "ok", cost_per_call: float = 0.0) -> None:
self.model = model
self._content = content
self._cost_per_call = cost_per_call
self.calls: list[tuple[str, RunConfig]] = []
def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
self.calls.append((prompt, config))
return LLMResponse(
content=self._content,
model=self.model,
usage={"prompt_tokens": 100, "completion_tokens": 50},
finish_reason="stop",
metadata={"cost_per_call_usd": self._cost_per_call},
)
def validate_config(self, config: RunConfig) -> bool: # pragma: no cover - trivial
return True
def _request(stage_id: str, prompt: str = "Hello.") -> AssistedGenerationRequest:
return AssistedGenerationRequest(
stage_id=stage_id,
workflow_id="generic-source-entities",
input_artifact_id="source/test.md",
prompt=prompt,
)
def test_bridge_resolves_static_policy_per_stage() -> None:
cheap = _MockAdapter(model="cheap-1", content="# Cheap")
smart = _MockAdapter(model="smart-1", content="# Smart")
policy = RoutingPolicy(
rules=[
RoutingRule(task_type="summarize-source", prefer=cheap),
RoutingRule(task_type="extract-entities", prefer=smart),
],
default=cheap,
)
bridge = RoutingAssistedGenerationAdapter(policy=policy)
summary = bridge.generate(_request("summarize-source", "Source A"))
entities = bridge.generate(_request("extract-entities", "Source A"))
assert summary.markdown == "# Cheap"
assert summary.metadata["task_type"] == "summarize-source"
assert summary.metadata["model"] == "cheap-1"
assert summary.metadata["usage"]["prompt_tokens"] == 100
assert entities.markdown == "# Smart"
assert entities.metadata["model"] == "smart-1"
assert len(cheap.calls) == 1
assert len(smart.calls) == 1
def test_bridge_honours_stage_to_task_type_overrides() -> None:
extraction = _MockAdapter(model="extraction-1")
policy = RoutingPolicy(
rules=[RoutingRule(task_type="extraction", prefer=extraction)],
)
bridge = RoutingAssistedGenerationAdapter(
policy=policy,
stage_to_task_type={
"extract-entities": "extraction",
"extract-relations": "extraction",
},
)
bridge.generate(_request("extract-entities"))
bridge.generate(_request("extract-relations"))
assert len(extraction.calls) == 2
def test_bridge_default_task_type_map_covers_all_known_stages() -> None:
expected = {
"summarize-source",
"extract-entities",
"extract-relations",
"evaluate-entity",
"synthesize-report",
}
assert set(STAGE_TO_TASK_TYPE_DEFAULT) == expected
# Identity mapping by default
for stage in expected:
assert STAGE_TO_TASK_TYPE_DEFAULT[stage] == stage
def test_bridge_falls_through_to_stage_id_when_no_known_mapping() -> None:
custom_adapter = _MockAdapter(model="custom-1")
policy = RoutingPolicy(
rules=[RoutingRule(task_type="custom-stage", prefer=custom_adapter)],
)
bridge = RoutingAssistedGenerationAdapter(policy=policy)
result = bridge.generate(_request("custom-stage"))
assert result.markdown == "# ok" or result.markdown == "ok"
assert custom_adapter.calls, "custom stage_id should fall through to the same task_type"
def test_bridge_uses_adaptive_path_when_quality_floor_set(tmp_path) -> None:
cheap = _MockAdapter(model="cheap-1")
smart = _MockAdapter(model="smart-1")
ledger = QualityLedger(path=tmp_path / "quality.jsonl")
# Cheap clears the floor; smart does too but at a higher cost.
for _ in range(3):
ledger.append(
QualityObservation(
task_type="extract-entities",
adapter_id="cheap-1",
model_id="cheap-1",
cost_usd=0.001,
quality_score=0.9,
tokens_in=100,
tokens_out=50,
latency_ms=10,
)
)
ledger.append(
QualityObservation(
task_type="extract-entities",
adapter_id="smart-1",
model_id="smart-1",
cost_usd=0.01,
quality_score=0.95,
tokens_in=100,
tokens_out=50,
latency_ms=10,
)
)
policy = AdaptiveRoutingPolicy(
rules=[RoutingRule(task_type="extract-entities", prefer=smart)],
default=cheap,
ledger=ledger,
adapters_by_id={"cheap-1": cheap, "smart-1": smart},
)
bridge = RoutingAssistedGenerationAdapter(policy=policy, quality_floor=0.8)
bridge.generate(_request("extract-entities"))
assert cheap.calls, "adaptive policy should pick the cheaper qualifying adapter"
assert not smart.calls
def test_bridge_falls_back_to_static_when_quality_floor_unset(tmp_path) -> None:
cheap = _MockAdapter(model="cheap-1")
smart = _MockAdapter(model="smart-1")
ledger = QualityLedger(path=tmp_path / "quality.jsonl")
policy = AdaptiveRoutingPolicy(
rules=[RoutingRule(task_type="extract-entities", prefer=smart)],
ledger=ledger,
)
bridge = RoutingAssistedGenerationAdapter(policy=policy) # no quality_floor
bridge.generate(_request("extract-entities"))
assert smart.calls, "without a quality_floor the bridge must use static routing"
assert not cheap.calls
def test_bridge_preserves_response_metadata_and_provider_tag() -> None:
adapter = _MockAdapter(model="cheap-1")
adapter.execute_prompt = lambda prompt, config: LLMResponse( # type: ignore[assignment]
content="# ok",
model="cheap-1",
usage={"prompt_tokens": 7, "completion_tokens": 3},
finish_reason="stop",
metadata={"request_id": "req-42"},
)
policy = RoutingPolicy(rules=[RoutingRule(task_type="custom", prefer=adapter)])
bridge = RoutingAssistedGenerationAdapter(policy=policy)
result = bridge.generate(_request("custom"))
assert result.metadata["request_id"] == "req-42"
assert result.metadata["usage"] == {"prompt_tokens": 7, "completion_tokens": 3}
assert result.metadata["task_type"] == "custom"
assert result.metadata["adapter_id"].endswith(":cheap-1")
assert result.provider == "mock"
def test_bridge_passes_estimated_cost_per_1k_through() -> None:
captured: dict[str, Any] = {}
class _PolicyProbe(RoutingPolicy):
def resolve(self, task_type, estimated_cost_per_1k=None): # type: ignore[override]
captured["task_type"] = task_type
captured["estimated_cost_per_1k"] = estimated_cost_per_1k
return _MockAdapter(model="x")
bridge = RoutingAssistedGenerationAdapter(
policy=_PolicyProbe(),
estimated_cost_per_1k=0.5,
)
bridge.generate(_request("summarize-source"))
assert captured["task_type"] == "summarize-source"
assert captured["estimated_cost_per_1k"] == 0.5