#!/usr/bin/env python3
"""
IHF llm-connect bridge — Phase 11 AI Federation (IHUB-WP-0012)

Updated to use llm-connect FR-1 (server passthrough), FR-3 (async batch),
FR-4 (BudgetTracker).

SINGLE REQUEST — stdin JSON fields:
  provider      — openrouter | gemini | openai | claude-code (default: openrouter)
  model         — model name string (provider-specific)
  prompt        — the user prompt (required)
  systemPrompt  — optional system prompt
  api_key       — optional; falls back to llm-connect env-var resolution
  maxTokens     — max completion tokens (default: 2000)
  temperature   — sampling temperature (default: 0.7)
  budgetTotal   — optional int; if set, a BudgetTracker is created with this cap
  budgetSpent   — optional int; tokens already consumed (for delegation chains)
  serverUrl     — optional str; if set, POST to {serverUrl}/execute instead of
                  spawning a local adapter (FR-1 server passthrough)

BATCH REQUEST — stdin JSON with "batch" key:
  batch — list of single-request objects (see above)
  All top-level fields (except batch) are ignored.

Output JSON — single request (stdout, exit 0 on success):
  content       — generated text
  model         — model name actually used
  tokensIn      — prompt token count
  tokensOut     — completion token count
  finishReason  — stop reason string
  budgetSpent   — cumulative tokens consumed from BudgetTracker after this call

Output JSON — batch request (stdout, exit 0 even on partial failure):
  results — list, in request order, with one entry per item: either
            {content, model, tokensIn, tokensOut, finishReason, budgetSpent}
            or an error object as described below

Error JSON (stdout; exit 1 for a single request or malformed stdin):
  error            — error message
  errorType        — exception class name: an LLMError subclass, or e.g.
                     URLError / HTTPError / TimeoutError / ValueError for
                     transport failures and malformed input or responses
  budgetTotal      — present only for LLMBudgetExceededError
  budgetSpent      — present only for LLMBudgetExceededError
  budgetRequested  — present only for LLMBudgetExceededError
"""
import sys
import json
import os
import asyncio
from typing import Any

sys.path.insert(0, os.path.expanduser("~/llm-connect"))

from llm_connect import create_adapter, RunConfig, BudgetTracker
from llm_connect.exceptions import LLMError, LLMBudgetExceededError

# Failures reported as error JSON instead of crashing the bridge: llm-connect
# errors, transport errors (urllib's URLError/HTTPError and socket timeouts
# are OSError subclasses) and malformed input/responses (json.JSONDecodeError
# and bad int() conversions are ValueError).
_REPORTED_ERRORS: tuple[type[Exception], ...] = (
    LLMBudgetExceededError,
    LLMError,
    OSError,
    ValueError,
)


# ---------------------------------------------------------------------------
# Request helpers


def _make_tracker(req: dict[str, Any]) -> BudgetTracker | None:
    """Build the optional per-request BudgetTracker (FR-4).

    Returns None when ``budgetTotal`` is absent or null. A positive
    ``budgetSpent`` carried over from a delegation chain is consumed up front.
    Callers invoke this inside their ``try`` so that a carried-over spend the
    tracker rejects (presumably via LLMBudgetExceededError — confirm) or a
    non-numeric value is reported as error JSON.
    """
    total = req.get("budgetTotal")
    if total is None:
        return None
    tracker = BudgetTracker(total=int(total))
    already_spent = int(req.get("budgetSpent") or 0)
    if already_spent > 0:
        tracker.consume(already_spent)
    return tracker


def _make_config(req: dict[str, Any], tracker: BudgetTracker | None) -> RunConfig:
    """Translate the bridge's camelCase request fields into a RunConfig."""
    return RunConfig(
        model_name=req.get("model", ""),
        temperature=req.get("temperature", 0.7),
        max_tokens=req.get("maxTokens", 2000),
        budget_tracker=tracker,
    )


def _require_prompt(req: dict[str, Any]) -> Any:
    """Return ``req["prompt"]``; raise ValueError if the field is missing."""
    if "prompt" not in req:
        raise ValueError("request is missing required field 'prompt'")
    return req["prompt"]


def _make_adapter(req: dict[str, Any]):
    """Create the local provider adapter described by *req*."""
    return create_adapter(
        provider=req.get("provider", "openrouter"),
        model=req.get("model"),
        api_key=req.get("api_key"),
        system_prompt=req.get("systemPrompt"),
    )


# ---------------------------------------------------------------------------
# Adapter / server call


def _call_server(server_url: str, req: dict, config: RunConfig) -> dict:
    """Call a running LLMServer via HTTP POST /execute (FR-1).

    This call is blocking. It raises OSError subclasses (URLError, HTTPError,
    timeouts) on transport failure and json.JSONDecodeError on a non-JSON body.
    """
    import urllib.request

    # NOTE(review): only the prompt and RunConfig are forwarded; systemPrompt,
    # provider and api_key are not sent to the server — confirm the server is
    # expected to supply its own.
    payload = json.dumps({
        "prompt": req["prompt"],
        "config": config.to_dict(),
    }).encode()
    http_req = urllib.request.Request(
        f"{server_url.rstrip('/')}/execute",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(http_req, timeout=config.timeout_seconds) as resp:
        return json.loads(resp.read())


# ---------------------------------------------------------------------------
# Result formatting (shared by the single and batch paths)


def _spent(tracker: BudgetTracker | None) -> int:
    """Tokens consumed so far according to *tracker*, or 0 without one."""
    return tracker.spent if tracker is not None else 0


def _server_result(raw: Any, tracker: BudgetTracker | None) -> dict[str, Any]:
    """Map an LLMServer /execute response body onto the bridge output schema."""
    if not isinstance(raw, dict):
        raise ValueError(
            "unexpected server response: expected a JSON object, "
            f"got {type(raw).__name__}"
        )
    usage = raw.get("usage") or {}  # tolerate a missing or null "usage"
    return {
        "content": raw.get("content", ""),
        "model": raw.get("model", ""),
        "tokensIn": usage.get("prompt_tokens", 0),
        "tokensOut": usage.get("completion_tokens", 0),
        "finishReason": raw.get("finish_reason", "stop"),
        # NOTE(review): the local tracker is never charged in server mode, so
        # this only reflects the carried-over budgetSpent — confirm whether
        # the server's usage should be consumed here.
        "budgetSpent": _spent(tracker),
    }


def _adapter_result(resp: Any, tracker: BudgetTracker | None) -> dict[str, Any]:
    """Map an adapter response object onto the bridge output schema."""
    usage = resp.usage or {}
    return {
        "content": resp.content,
        "model": resp.model,
        "tokensIn": usage.get("prompt_tokens", 0),
        "tokensOut": usage.get("completion_tokens", 0),
        "finishReason": resp.finish_reason,
        "budgetSpent": _spent(tracker),
    }


def _error_result(exc: BaseException) -> dict[str, Any]:
    """Build the error JSON object for *exc* (budget details when available)."""
    if isinstance(exc, LLMBudgetExceededError):
        return {
            "error": str(exc),
            "errorType": "LLMBudgetExceededError",
            "budgetTotal": exc.total,
            "budgetSpent": exc.spent,
            "budgetRequested": exc.requested,
        }
    return {"error": str(exc), "errorType": type(exc).__name__}


def _execute_single(req: dict) -> dict:
    """Execute one request dict synchronously; return a result or error dict."""
    try:
        tracker = _make_tracker(req)
        config = _make_config(req, tracker)
        prompt = _require_prompt(req)
        server_url = req.get("serverUrl")
        if server_url:
            # FR-1: delegate to a running LLMServer
            return _server_result(_call_server(server_url, req, config), tracker)
        resp = _make_adapter(req).execute_prompt(prompt, config)
        return _adapter_result(resp, tracker)
    except _REPORTED_ERRORS as exc:
        return _error_result(exc)


# ---------------------------------------------------------------------------
# Async batch execution (FR-3)


async def _execute_one_async(req: dict) -> dict:
    """Async counterpart of _execute_single for one batch item."""
    try:
        tracker = _make_tracker(req)
        config = _make_config(req, tracker)
        prompt = _require_prompt(req)
        server_url = req.get("serverUrl")
        if server_url:
            # urllib is blocking, so run the HTTP call in a worker thread to
            # keep the other batch items progressing concurrently.
            raw = await asyncio.to_thread(_call_server, server_url, req, config)
            return _server_result(raw, tracker)
        resp = await _make_adapter(req).async_execute_prompt(prompt, config)
        return _adapter_result(resp, tracker)
    except _REPORTED_ERRORS as exc:
        return _error_result(exc)


async def _execute_all_async(requests: list[dict]) -> list[dict]:
    """Run all requests concurrently (FR-3); results keep request order.

    Any unexpected exception from a single item (e.g. a non-object batch
    entry) becomes that item's error object instead of aborting the batch.
    """
    outcomes = await asyncio.gather(
        *(_execute_one_async(r) for r in requests),
        return_exceptions=True,
    )
    results: list[dict] = []
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            results.append(_error_result(outcome))
        elif isinstance(outcome, BaseException):
            # Cancellation / interpreter-exit signals must still propagate.
            raise outcome
        else:
            results.append(outcome)
    return results


# ---------------------------------------------------------------------------
# Entry point


def _emit(payload: dict[str, Any]) -> None:
    """Write one JSON document (newline-terminated) to stdout."""
    print(json.dumps(payload))


def main() -> None:
    """Read one request (or batch) from stdin and write the JSON result."""
    try:
        req = json.load(sys.stdin)
    except json.JSONDecodeError as exc:
        _emit({"error": f"invalid JSON on stdin: {exc}",
               "errorType": type(exc).__name__})
        sys.exit(1)
    if not isinstance(req, dict):
        _emit({"error": "stdin JSON must be an object", "errorType": "ValueError"})
        sys.exit(1)

    if "batch" in req:
        batch = req["batch"]
        if not isinstance(batch, list):
            _emit({"error": "'batch' must be a list of request objects",
                   "errorType": "ValueError"})
            sys.exit(1)
        # Batch mode: run all requests concurrently (FR-3); exit 0 even when
        # individual items fail.
        _emit({"results": asyncio.run(_execute_all_async(batch))})
        return

    # Single request
    result = _execute_single(req)
    _emit(result)
    if "error" in result:
        sys.exit(1)


if __name__ == "__main__":
    main()