""" Routing config schema (IB-WP-0020-T01). Parser-only: this module reads a YAML file into validated dataclasses. The follow-on task T02 takes a ``RoutingConfig`` and constructs the actual llm-connect ``RoutingPolicy`` / ``AdaptiveRoutingPolicy`` plus LLMAdapter instances (which involves API keys and provider-specific construction). Keeping parsing separate lets T01 stay network-free and deterministically testable. """ from __future__ import annotations import os from dataclasses import dataclass, field from pathlib import Path from typing import Any, Callable, Mapping import yaml from .errors import InfospaceError ROUTING_SCHEMA_VERSION = 1 # Provider names that the T02 loader will know how to construct. # Validation happens here so a config typo fails before any work begins. SUPPORTED_PROVIDERS: frozenset[str] = frozenset( {"openrouter", "claude_code", "openai", "gemini"} ) # Default env var per provider when the config does not name one explicitly. DEFAULT_API_KEY_ENV: dict[str, str] = { "openrouter": "OPENROUTER_API_KEY", "openai": "OPENAI_API_KEY", "gemini": "GOOGLE_API_KEY", # claude_code shells out to the local CLI and needs no API key } AdapterFactory = Callable[["RoutingCandidateConfig", Mapping[str, str]], Any] @dataclass(frozen=True) class RoutingCandidateConfig: """One candidate adapter inside a task_type rule.""" id: str provider: str model: str api_key_env: str = "" max_cost_per_1k: float | None = None @dataclass(frozen=True) class RoutingTaskTypeConfig: """All candidate adapters for one task_type, with an optional quality floor.""" task_type: str candidates: tuple[RoutingCandidateConfig, ...] quality_floor: float | None = None @dataclass(frozen=True) class RoutingConfig: """Top-level routing config payload, parsed from YAML.""" schema_version: int task_types: tuple[RoutingTaskTypeConfig, ...] default_quality_floor: float | None = None ledger_path: str | None = None stage_to_task_type: dict[str, str] = field(default_factory=dict) def load_routing_config(path: str | Path) -> RoutingConfig: """Read and validate a routing config YAML file.""" config_path = Path(path) if not config_path.is_file(): raise InfospaceError( "missing_routing_config", f"Routing config does not exist: {config_path}", {"path": str(config_path)}, ) raw_text = config_path.read_text(encoding="utf-8") try: data = yaml.safe_load(raw_text) except yaml.YAMLError as exc: raise InfospaceError( "invalid_routing_config_yaml", f"Routing config is not valid YAML: {exc}", {"path": str(config_path)}, ) from exc if not isinstance(data, dict): raise InfospaceError( "invalid_routing_config", "Routing config must be a YAML mapping at the top level", {"path": str(config_path)}, ) return parse_routing_config(data, source=str(config_path)) def parse_routing_config( data: dict[str, Any], *, source: str = "" ) -> RoutingConfig: """Validate a parsed routing config dict and return a frozen config.""" schema_version = data.get("schema_version") if not isinstance(schema_version, int) or schema_version != ROUTING_SCHEMA_VERSION: raise InfospaceError( "unsupported_routing_schema", f"Routing config schema_version must be {ROUTING_SCHEMA_VERSION}", {"source": source, "got": schema_version}, ) task_types_raw = data.get("task_types") or {} if not isinstance(task_types_raw, dict) or not task_types_raw: raise InfospaceError( "empty_routing_task_types", "Routing config must declare at least one task_type with candidates", {"source": source}, ) task_types: list[RoutingTaskTypeConfig] = [] for task_type, entry in task_types_raw.items(): task_types.append(_parse_task_type(str(task_type), entry, source=source)) default_floor = _optional_quality_floor( data.get("default_quality_floor"), "default_quality_floor", source ) ledger_path_value = data.get("ledger_path") if ledger_path_value is not None and not isinstance(ledger_path_value, str): raise InfospaceError( "invalid_routing_ledger_path", "ledger_path must be a string when present", {"source": source}, ) stage_map_raw = data.get("stage_to_task_type") or {} if not isinstance(stage_map_raw, dict): raise InfospaceError( "invalid_routing_stage_map", "stage_to_task_type must be a mapping", {"source": source}, ) stage_to_task_type = {str(key): str(value) for key, value in stage_map_raw.items()} return RoutingConfig( schema_version=schema_version, task_types=tuple(task_types), default_quality_floor=default_floor, ledger_path=ledger_path_value if isinstance(ledger_path_value, str) else None, stage_to_task_type=stage_to_task_type, ) def _parse_task_type( task_type: str, entry: Any, *, source: str ) -> RoutingTaskTypeConfig: if not isinstance(entry, dict): raise InfospaceError( "invalid_routing_task_type", f"task_types.{task_type} must be a mapping", {"source": source, "task_type": task_type}, ) candidates_raw = entry.get("candidates") or [] if not isinstance(candidates_raw, list) or not candidates_raw: raise InfospaceError( "empty_routing_candidates", f"task_types.{task_type} must declare at least one candidate", {"source": source, "task_type": task_type}, ) candidates: list[RoutingCandidateConfig] = [] seen_ids: set[str] = set() for index, candidate_raw in enumerate(candidates_raw): candidate = _parse_candidate(task_type, index, candidate_raw, source=source) if candidate.id in seen_ids: raise InfospaceError( "duplicate_routing_candidate_id", f"task_types.{task_type} has duplicate candidate id {candidate.id!r}", {"source": source, "task_type": task_type, "id": candidate.id}, ) seen_ids.add(candidate.id) candidates.append(candidate) quality_floor = _optional_quality_floor( entry.get("quality_floor"), f"task_types.{task_type}.quality_floor", source, ) return RoutingTaskTypeConfig( task_type=task_type, candidates=tuple(candidates), quality_floor=quality_floor, ) def _parse_candidate( task_type: str, index: int, candidate_raw: Any, *, source: str ) -> RoutingCandidateConfig: if not isinstance(candidate_raw, dict): raise InfospaceError( "invalid_routing_candidate", f"task_types.{task_type}.candidates[{index}] must be a mapping", {"source": source, "task_type": task_type, "index": index}, ) candidate_id = str(candidate_raw.get("id") or "").strip() provider = str(candidate_raw.get("provider") or "").strip().lower() model = str(candidate_raw.get("model") or "").strip() missing = [ field_name for field_name, value in (("id", candidate_id), ("provider", provider), ("model", model)) if not value ] if missing: raise InfospaceError( "missing_routing_candidate_field", f"task_types.{task_type}.candidates[{index}] is missing required fields: " f"{', '.join(missing)}", { "source": source, "task_type": task_type, "index": index, "missing": missing, }, ) if provider not in SUPPORTED_PROVIDERS: raise InfospaceError( "unsupported_routing_provider", f"Unsupported provider {provider!r}; allowed: {sorted(SUPPORTED_PROVIDERS)}", { "source": source, "task_type": task_type, "index": index, "provider": provider, }, ) max_cost = _optional_float( candidate_raw.get("max_cost_per_1k"), f"task_types.{task_type}.candidates[{index}].max_cost_per_1k", source, ) if max_cost is not None and max_cost < 0: raise InfospaceError( "invalid_routing_max_cost", "max_cost_per_1k must be non-negative", {"source": source, "task_type": task_type, "index": index, "value": max_cost}, ) api_key_env = str(candidate_raw.get("api_key_env") or "").strip() return RoutingCandidateConfig( id=candidate_id, provider=provider, model=model, api_key_env=api_key_env, max_cost_per_1k=max_cost, ) def _optional_float(value: Any, name: str, source: str) -> float | None: if value is None: return None try: return float(value) except (TypeError, ValueError) as exc: raise InfospaceError( "invalid_routing_float", f"{name} must be numeric", {"source": source, "value": value}, ) from exc def _optional_quality_floor(value: Any, name: str, source: str) -> float | None: floor = _optional_float(value, name, source) if floor is None: return None if not 0 <= floor <= 1: raise InfospaceError( "invalid_routing_quality_floor", f"{name} must be between 0 and 1", {"source": source, "name": name, "value": floor}, ) return floor # --------------------------------------------------------------------------- # T02 — build live policies and adapters from a parsed config # --------------------------------------------------------------------------- def build_routing_policy_from_config( config: RoutingConfig, *, workspace: str | Path | None = None, env: Mapping[str, str] | None = None, adapter_factory: AdapterFactory | None = None, shadow_baseline_id: str | None = None, shadow_rate: float | None = None, ) -> Any: """Materialise a parsed config into a live llm-connect routing policy. Returns an ``AdaptiveRoutingPolicy`` when the config sets a ``default_quality_floor``, any per-task ``quality_floor``, or a ``ledger_path``; otherwise returns a static ``RoutingPolicy``. ``adapter_factory`` is an opt-in override that builds an ``LLMAdapter`` from a ``RoutingCandidateConfig`` + env mapping. Tests inject a factory to avoid hitting real provider constructors; the default factory resolves API keys from ``env`` and instantiates the matching llm-connect adapter. Fails fast (before any network call) when a candidate's required API key env var is missing from ``env``. When ``shadow_baseline_id`` is set, every non-baseline candidate is wrapped in an llm-connect ``ShadowingAdapter`` using the named baseline candidate plus a PairedGrader(ExactMatchJudge()) and the QualityLedger from ``config.ledger_path``. ``shadow_rate`` controls the sampling fraction (defaults to 0.1). The baseline candidate itself is never wrapped — that would shadow it against itself. """ from llm_connect.routing import AdaptiveRoutingPolicy, RoutingPolicy, RoutingRule environment: Mapping[str, str] = env if env is not None else os.environ factory: AdapterFactory = adapter_factory or _default_adapter_factory if shadow_rate is not None and shadow_baseline_id is None: raise InfospaceError( "shadow_rate_without_baseline", "shadow_rate requires shadow_baseline_id; pass --shadow-baseline with --shadow-rate", {"shadow_rate": shadow_rate}, ) use_adaptive = ( config.default_quality_floor is not None or any(task.quality_floor is not None for task in config.task_types) or config.ledger_path is not None or shadow_baseline_id is not None ) ledger = _resolve_ledger(config, workspace, required=shadow_baseline_id is not None) raw_adapters: dict[str, Any] = {} for task in config.task_types: for candidate in task.candidates: if candidate.id not in raw_adapters: raw_adapters[candidate.id] = factory(candidate, environment) baseline_adapter = None if shadow_baseline_id is not None: if shadow_baseline_id not in raw_adapters: raise InfospaceError( "missing_shadow_baseline", f"shadow_baseline_id {shadow_baseline_id!r} not declared as a candidate in the routing config", {"shadow_baseline_id": shadow_baseline_id}, ) baseline_adapter = raw_adapters[shadow_baseline_id] adapters_by_id: dict[str, Any] = {} if shadow_baseline_id is None: adapters_by_id = dict(raw_adapters) else: # Wrap each candidate (per task) in a ShadowingAdapter unless it *is* the baseline. from .routing import wrap_with_shadow_sampling from llm_connect.grading import ExactMatchJudge, PairedGrader assert ledger is not None # _resolve_ledger raised if required and missing grader = PairedGrader(judge=ExactMatchJudge()) effective_rate = shadow_rate if shadow_rate is not None else 0.1 for task in config.task_types: for candidate in task.candidates: key = (candidate.id, task.task_type) if candidate.id == shadow_baseline_id: adapters_by_id[candidate.id] = raw_adapters[candidate.id] continue # One ShadowingAdapter per (candidate, task_type) pair so the # task_type tagged on observations matches the rule it serves. shadow_id = f"shadow:{candidate.id}@{task.task_type}" adapters_by_id[shadow_id] = wrap_with_shadow_sampling( candidate=raw_adapters[candidate.id], baseline=baseline_adapter, grader=grader, ledger=ledger, task_type=task.task_type, adapter_id=candidate.id, baseline_adapter_id=shadow_baseline_id, shadow_rate=effective_rate, async_shadow=True, ) adapters_by_id[key] = adapters_by_id[shadow_id] # task-keyed reverse lookup rules: list[RoutingRule] = [] for task in config.task_types: candidates = [] for candidate in task.candidates: if shadow_baseline_id is not None and candidate.id != shadow_baseline_id: candidates.append(adapters_by_id[(candidate.id, task.task_type)]) else: candidates.append(adapters_by_id[candidate.id]) prefer = candidates[0] prefer_candidate = task.candidates[0] fallback = candidates[1] if len(candidates) > 1 else None rules.append( RoutingRule( task_type=task.task_type, prefer=prefer, max_cost_per_1k=prefer_candidate.max_cost_per_1k, fallback=fallback, ) ) if not use_adaptive: return RoutingPolicy(rules=rules) # Clean adapters_by_id for AdaptiveRoutingPolicy: keep stable string keys only. string_keyed = {key: value for key, value in adapters_by_id.items() if isinstance(key, str)} return AdaptiveRoutingPolicy( rules=rules, ledger=ledger, adapters_by_id=string_keyed, ) def _resolve_ledger( config: RoutingConfig, workspace: str | Path | None, *, required: bool ) -> Any: from llm_connect.quality import QualityLedger if not config.ledger_path: if required: raise InfospaceError( "missing_routing_ledger_for_shadow", "Shadow sampling requires a ledger_path in the routing config", {"config_ledger_path": config.ledger_path}, ) return None ledger_path = Path(config.ledger_path) if not ledger_path.is_absolute() and workspace is not None: ledger_path = Path(workspace) / ledger_path ledger_path.parent.mkdir(parents=True, exist_ok=True) return QualityLedger(path=ledger_path) def _default_adapter_factory( candidate: RoutingCandidateConfig, env: Mapping[str, str] ) -> Any: """Build a real llm-connect adapter for one config candidate. API keys are resolved from ``env`` before construction; a missing key raises ``missing_routing_api_key`` rather than letting the adapter blow up later mid-run. """ provider = candidate.provider if provider == "claude_code": from llm_connect.claude_code import ClaudeCodeAdapter return ClaudeCodeAdapter(model=candidate.model) env_var = candidate.api_key_env or DEFAULT_API_KEY_ENV.get(provider, "") api_key = env.get(env_var, "") if env_var else "" if not api_key: raise InfospaceError( "missing_routing_api_key", f"Candidate {candidate.id!r} ({provider}) needs API key from " f"env var {env_var!r}, but it is unset", { "candidate_id": candidate.id, "provider": provider, "api_key_env": env_var, }, ) if provider == "openrouter": from llm_connect.openrouter import OpenRouterAdapter return OpenRouterAdapter(model=candidate.model, api_key=api_key) if provider == "openai": from llm_connect.openai import OpenAIAdapter return OpenAIAdapter(model=candidate.model, api_key=api_key) if provider == "gemini": from llm_connect.gemini import GeminiAdapter return GeminiAdapter(model=candidate.model, api_key=api_key) # Should have been rejected by the parser; defensive guard for callers # that build a RoutingConfig programmatically without going through # parse_routing_config. raise InfospaceError( "unsupported_routing_provider", f"Cannot build adapter for unsupported provider {provider!r}", {"candidate_id": candidate.id, "provider": provider}, )