Files
infospace-bench/src/infospace_bench/routing_config.py
tegwick c11a942bb7 IB-WP-0020-T01: routing config schema and parser
Add a small YAML routing config schema (schema_version 1) and a
parser-only loader at src/infospace_bench/routing_config.py. The
loader validates the declarative shape — task_types with candidates,
optional per-task quality_floor, optional default_quality_floor,
optional ledger_path, optional stage_to_task_type override map — and
refuses bad shapes before any network or workspace work happens.

Supported provider names: openrouter, claude_code, openai, gemini.
Unknown providers, missing required candidate fields, out-of-range
quality floors, negative max_cost_per_1k, duplicate candidate ids
within a task type, and non-mapping stage_to_task_type all raise
focused InfospaceError codes that callers can pattern-match.

docs/routing-config.md documents the schema with two annotated
examples (OpenRouter-only two-tier, and adaptive with a ClaudeCode
baseline) plus the full "what fails fast" list.

16 parser tests cover happy-path round-trip, file load, missing file,
malformed YAML, and every validation surface (wrong/missing schema
version, empty task_types, empty candidates, missing required fields,
unsupported provider, negative cost, out-of-range quality_floor,
duplicate ids, non-mapping stage_map, non-string ledger_path).

T02 will turn a RoutingConfig into a live llm-connect RoutingPolicy /
AdaptiveRoutingPolicy with constructed LLMAdapter instances.

160 tests pass, 1 skipped.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-18 18:09:28 +02:00

266 lines
9.1 KiB
Python

"""
Routing config schema (IB-WP-0020-T01).
Parser-only: this module reads a YAML file into validated dataclasses.
The follow-on task T02 takes a ``RoutingConfig`` and constructs the
actual llm-connect ``RoutingPolicy`` / ``AdaptiveRoutingPolicy`` plus
LLMAdapter instances (which involves API keys and provider-specific
construction). Keeping parsing separate lets T01 stay network-free and
deterministically testable.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import yaml
from .errors import InfospaceError
# The only schema_version value that parse_routing_config accepts.
ROUTING_SCHEMA_VERSION = 1

# Provider names that the T02 loader will know how to construct.
# Validation happens here so a config typo fails before any work begins.
SUPPORTED_PROVIDERS: frozenset[str] = frozenset(
    ("openrouter", "claude_code", "openai", "gemini")
)
@dataclass(frozen=True)
class RoutingCandidateConfig:
    """Immutable description of a single candidate adapter in a task_type rule.

    ``api_key_env`` falls back to an empty string and ``max_cost_per_1k``
    to ``None`` when they are not supplied.
    """

    # Candidate identifier; the parser rejects duplicates within one task_type.
    id: str
    # Provider name; the parser lower-cases it and checks SUPPORTED_PROVIDERS.
    provider: str
    # Model name, stored by the parser with surrounding whitespace removed.
    model: str
    # Presumably the name of an environment variable holding the API key;
    # it is only stored here, never read (TODO confirm against the T02 loader).
    api_key_env: str = ""
    # Optional cost ceiling; None means the config did not declare one.
    max_cost_per_1k: float | None = None
@dataclass(frozen=True)
class RoutingTaskTypeConfig:
    """Immutable bundle of the candidate adapters declared for one task_type.

    ``quality_floor`` is optional and stays ``None`` when the task_type
    does not declare its own floor.
    """

    # Name of the task_type (the key under ``task_types`` in the config).
    task_type: str
    # Candidates in declaration order.
    candidates: tuple[RoutingCandidateConfig, ...]
    # Per-task quality floor; the parser only accepts values within [0, 1].
    quality_floor: float | None = None
@dataclass(frozen=True)
class RoutingConfig:
    """Top-level routing configuration as parsed from YAML.

    ``frozen=True`` blocks attribute rebinding only: the
    ``stage_to_task_type`` dict itself can still be mutated in place, and
    because it is a dict, instances are not hashable.
    """

    # Schema version of the parsed payload.
    schema_version: int
    # One entry per declared task_type, in declaration order.
    task_types: tuple[RoutingTaskTypeConfig, ...]
    # Optional fallback quality floor; the parser only accepts [0, 1].
    default_quality_floor: float | None = None
    # Optional ledger location, kept as the raw string from the config.
    ledger_path: str | None = None
    # Optional stage-name -> task_type override map; a fresh dict per instance.
    stage_to_task_type: dict[str, str] = field(default_factory=dict)
def load_routing_config(path: str | Path) -> RoutingConfig:
    """Read a routing config YAML file and return the validated config.

    Args:
        path: Location of the YAML file.

    Returns:
        The ``RoutingConfig`` produced by ``parse_routing_config``.

    Raises:
        InfospaceError: ``missing_routing_config`` when ``path`` is not an
            existing regular file; ``invalid_routing_config_yaml`` when the
            content cannot be decoded as UTF-8 or is not parseable YAML;
            ``invalid_routing_config`` when the top-level YAML value is not
            a mapping (an empty document loads as ``None`` and lands here);
            plus whatever ``parse_routing_config`` raises.
    """
    config_path = Path(path)
    if not config_path.is_file():
        raise InfospaceError(
            "missing_routing_config",
            f"Routing config does not exist: {config_path}",
            {"path": str(config_path)},
        )

    try:
        # Decoding happens inside the try so that undecodable bytes surface
        # as a config error instead of a raw UnicodeDecodeError.
        raw_text = config_path.read_text(encoding="utf-8")
        data = yaml.safe_load(raw_text)
    except (UnicodeDecodeError, yaml.YAMLError) as exc:
        raise InfospaceError(
            "invalid_routing_config_yaml",
            f"Routing config is not valid YAML: {exc}",
            {"path": str(config_path)},
        ) from exc

    if not isinstance(data, dict):
        raise InfospaceError(
            "invalid_routing_config",
            "Routing config must be a YAML mapping at the top level",
            {"path": str(config_path)},
        )
    return parse_routing_config(data, source=str(config_path))
def parse_routing_config(
    data: dict[str, Any], *, source: str = "<inline>"
) -> RoutingConfig:
    """Validate a parsed routing config dict and return a frozen config.

    Checks run in a fixed order (schema version, task types, default
    quality floor, ledger path, stage map); the first failure raises.

    Args:
        data: Already-parsed config mapping.
        source: Label for the origin of ``data``; copied into error details.

    Returns:
        A ``RoutingConfig`` built from the validated values.

    Raises:
        InfospaceError: ``unsupported_routing_schema``,
            ``empty_routing_task_types``, ``invalid_routing_ledger_path`` or
            ``invalid_routing_stage_map`` for top-level problems, plus any
            error raised while parsing task types or quality floors.
    """
    schema_version = data.get("schema_version")
    # bool is a subclass of int and True == 1, so `schema_version: true`
    # would otherwise slip through the version check.
    if (
        isinstance(schema_version, bool)
        or not isinstance(schema_version, int)
        or schema_version != ROUTING_SCHEMA_VERSION
    ):
        raise InfospaceError(
            "unsupported_routing_schema",
            f"Routing config schema_version must be {ROUTING_SCHEMA_VERSION}",
            {"source": source, "got": schema_version},
        )

    # A missing, empty, or non-mapping task_types all report the same error.
    task_types_raw = data.get("task_types") or {}
    if not isinstance(task_types_raw, dict) or not task_types_raw:
        raise InfospaceError(
            "empty_routing_task_types",
            "Routing config must declare at least one task_type with candidates",
            {"source": source},
        )
    task_types = tuple(
        _parse_task_type(str(task_type), entry, source=source)
        for task_type, entry in task_types_raw.items()
    )

    default_floor = _optional_quality_floor(
        data.get("default_quality_floor"), "default_quality_floor", source
    )

    ledger_path_value = data.get("ledger_path")
    if ledger_path_value is not None and not isinstance(ledger_path_value, str):
        raise InfospaceError(
            "invalid_routing_ledger_path",
            "ledger_path must be a string when present",
            {"source": source},
        )

    # Only an absent/null value defaults to an empty map. Falsy non-mappings
    # such as [] or 0 must fail the shape check rather than be swallowed.
    stage_map_raw = data.get("stage_to_task_type")
    if stage_map_raw is None:
        stage_map_raw = {}
    elif not isinstance(stage_map_raw, dict):
        raise InfospaceError(
            "invalid_routing_stage_map",
            "stage_to_task_type must be a mapping",
            {"source": source},
        )
    stage_to_task_type = {str(key): str(value) for key, value in stage_map_raw.items()}

    return RoutingConfig(
        schema_version=schema_version,
        task_types=task_types,
        default_quality_floor=default_floor,
        # Already known to be a str or None at this point.
        ledger_path=ledger_path_value,
        stage_to_task_type=stage_to_task_type,
    )
def _parse_task_type(
    task_type: str, entry: Any, *, source: str
) -> RoutingTaskTypeConfig:
    """Validate one ``task_types`` entry and build its config object.

    Candidates are parsed in declaration order and a repeated id is
    reported as soon as it is encountered; the optional ``quality_floor``
    is validated only after every candidate has been accepted.

    Raises:
        InfospaceError: ``invalid_routing_task_type`` when ``entry`` is not
            a mapping, ``empty_routing_candidates`` when ``candidates`` is
            missing, empty, or not a list,
            ``duplicate_routing_candidate_id`` on a repeated id, plus any
            error raised while parsing a candidate or the quality floor.
    """
    if not isinstance(entry, dict):
        raise InfospaceError(
            "invalid_routing_task_type",
            f"task_types.{task_type} must be a mapping",
            {"source": source, "task_type": task_type},
        )

    raw_candidates = entry.get("candidates") or []
    if not (isinstance(raw_candidates, list) and raw_candidates):
        raise InfospaceError(
            "empty_routing_candidates",
            f"task_types.{task_type} must declare at least one candidate",
            {"source": source, "task_type": task_type},
        )

    # Insertion-ordered dict keyed by id: serves as both the duplicate
    # detector and the ordered collection of accepted candidates.
    accepted: dict[str, RoutingCandidateConfig] = {}
    for position, raw_candidate in enumerate(raw_candidates):
        parsed = _parse_candidate(task_type, position, raw_candidate, source=source)
        if parsed.id in accepted:
            raise InfospaceError(
                "duplicate_routing_candidate_id",
                f"task_types.{task_type} has duplicate candidate id {parsed.id!r}",
                {"source": source, "task_type": task_type, "id": parsed.id},
            )
        accepted[parsed.id] = parsed

    floor = _optional_quality_floor(
        entry.get("quality_floor"),
        f"task_types.{task_type}.quality_floor",
        source,
    )
    return RoutingTaskTypeConfig(
        task_type=task_type,
        candidates=tuple(accepted.values()),
        quality_floor=floor,
    )
def _parse_candidate(
    task_type: str, index: int, candidate_raw: Any, *, source: str
) -> RoutingCandidateConfig:
    """Validate one candidate entry and build its config object.

    ``id``, ``provider`` and ``model`` are required. Each is stringified
    and stripped, and ``provider`` is additionally lower-cased before it is
    checked against ``SUPPORTED_PROVIDERS``. A falsy raw value (absent,
    null, empty string, 0) counts as missing.

    Args:
        task_type: Name of the enclosing task_type (used in messages).
        index: Position of the candidate within ``candidates``.
        candidate_raw: The raw parsed entry.
        source: Label for the config origin; copied into error details.

    Returns:
        The validated ``RoutingCandidateConfig``.

    Raises:
        InfospaceError: ``invalid_routing_candidate`` when the entry is not
            a mapping, ``missing_routing_candidate_field`` when a required
            field is empty, ``unsupported_routing_provider`` for an unknown
            provider, ``invalid_routing_max_cost`` when ``max_cost_per_1k``
            is negative or NaN, plus any error from numeric conversion.
    """
    if not isinstance(candidate_raw, dict):
        raise InfospaceError(
            "invalid_routing_candidate",
            f"task_types.{task_type}.candidates[{index}] must be a mapping",
            {"source": source, "task_type": task_type, "index": index},
        )

    candidate_id = str(candidate_raw.get("id") or "").strip()
    provider = str(candidate_raw.get("provider") or "").strip().lower()
    model = str(candidate_raw.get("model") or "").strip()

    # Dict order fixes the order in which missing fields are reported.
    required = {"id": candidate_id, "provider": provider, "model": model}
    missing = [field_name for field_name, value in required.items() if not value]
    if missing:
        raise InfospaceError(
            "missing_routing_candidate_field",
            f"task_types.{task_type}.candidates[{index}] is missing required fields: "
            f"{', '.join(missing)}",
            {
                "source": source,
                "task_type": task_type,
                "index": index,
                "missing": missing,
            },
        )

    if provider not in SUPPORTED_PROVIDERS:
        raise InfospaceError(
            "unsupported_routing_provider",
            f"Unsupported provider {provider!r}; allowed: {sorted(SUPPORTED_PROVIDERS)}",
            {
                "source": source,
                "task_type": task_type,
                "index": index,
                "provider": provider,
            },
        )

    max_cost = _optional_float(
        candidate_raw.get("max_cost_per_1k"),
        f"task_types.{task_type}.candidates[{index}].max_cost_per_1k",
        source,
    )
    # `not max_cost >= 0` rather than `max_cost < 0`: every comparison with
    # NaN is False, so the latter would let a YAML `.nan` pass as a valid cost.
    if max_cost is not None and not max_cost >= 0:
        raise InfospaceError(
            "invalid_routing_max_cost",
            "max_cost_per_1k must be non-negative",
            {"source": source, "task_type": task_type, "index": index, "value": max_cost},
        )

    api_key_env = str(candidate_raw.get("api_key_env") or "").strip()
    return RoutingCandidateConfig(
        id=candidate_id,
        provider=provider,
        model=model,
        api_key_env=api_key_env,
        max_cost_per_1k=max_cost,
    )
def _optional_float(value: Any, name: str, source: str) -> float | None:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError) as exc:
raise InfospaceError(
"invalid_routing_float",
f"{name} must be numeric",
{"source": source, "value": value},
) from exc
def _optional_quality_floor(value: Any, name: str, source: str) -> float | None:
    """Convert an optional quality floor and check it lies within [0, 1].

    Returns ``None`` when ``value`` is ``None``; otherwise the converted
    float. Raises ``InfospaceError`` (``invalid_routing_quality_floor``)
    for anything outside the closed interval, and propagates the
    conversion error for non-numeric input.
    """
    floor = _optional_float(value, name, source)
    # The chained comparison is False for NaN, so NaN falls through to the
    # error below just like any out-of-range number.
    if floor is None or 0 <= floor <= 1:
        return floor
    raise InfospaceError(
        "invalid_routing_quality_floor",
        f"{name} must be between 0 and 1",
        {"source": source, "name": name, "value": floor},
    )