feat: integrate llm-connect FR-1/FR-3/FR-4 into IHF bridge
Some checks failed
Test / test (push) Has been cancelled

FR-3 (async_execute_prompt): CollectiveProposals now invokes all agents
concurrently via callAgentsBatch → single bridge subprocess with
asyncio.gather. Latency scales with slowest agent, not sum.

FR-4 (BudgetTracker): AgentDelegations passes tokenBudget to bridge;
llm-connect enforces it natively via BudgetTracker in RunConfig.
BudgetExceededError is a first-class BridgeError variant with total/
consumed/requested fields surfaced to the operator.

FR-1 (LLMServer passthrough): bridge accepts optional serverUrl field;
if present, calls POST {serverUrl}/execute instead of spawning a new
adapter. Infrastructure ready for hot-agent pre-warming (no schema
change required).

AgentBridge.hs: adds callAgentsBatch, callAgentWithBudget,
BudgetExceededError constructor, bridgeErrorMessage helper, defaultRequest,
requestToJson. All controllers updated to use bridgeErrorMessage.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-01 22:48:29 +00:00
parent a400365d50
commit 674f5da0e1
7 changed files with 350 additions and 75 deletions

View File

@@ -1,11 +1,10 @@
#!/usr/bin/env python3
"""
IHF llm-connect bridge — Phase 11 AI Federation (IHUB-WP-0012)
Updated to use llm-connect FR-1 (server passthrough), FR-3 (async batch),
FR-4 (BudgetTracker).
Usage:
echo '{"provider":"openrouter","model":"...","prompt":"..."}' | python3 scripts/llm_bridge.py
Input JSON fields:
SINGLE REQUEST — stdin JSON fields:
provider — openrouter | gemini | openai | claude-code (default: openrouter)
model — model name string (provider-specific)
prompt — the user prompt
@@ -13,54 +12,211 @@ Input JSON fields:
api_key — optional; falls back to llm-connect env-var resolution
maxTokens — max completion tokens (default: 2000)
temperature — sampling temperature (default: 0.7)
budgetTotal — optional int; if set, a BudgetTracker is created with this cap
budgetSpent — optional int; tokens already consumed (for delegation chains)
serverUrl — optional str; if set, POST to {serverUrl}/execute instead of
spawning a local adapter (FR-1 server passthrough)
Output JSON (stdout, exit 0 on success):
BATCH REQUEST — stdin JSON with "batch" key:
batch — list of single-request objects (see above)
All top-level fields (except batch) are ignored.
Output JSON — single request (stdout, exit 0 on success):
content — generated text
model — model name actually used
tokensIn — prompt token count
tokensOut — completion token count
finishReason — stop reason string
budgetSpent — cumulative tokens consumed from BudgetTracker after this call
Error JSON (stdout, exit 1 on LLMError):
Output JSON — batch request (stdout, exit 0 even on partial failure):
results — list of {content, model, tokensIn, tokensOut, finishReason, budgetSpent}
OR {error, errorType} per item (budget errors also carry budgetTotal,
budgetSpent, budgetRequested)
Error JSON (stdout, exit 1 on LLMError for single request):
error — error message
errorType — exception class name
budgetTotal — present only for LLMBudgetExceededError
budgetSpent — present only for LLMBudgetExceededError
budgetRequested — present only for LLMBudgetExceededError
"""
import sys
import json
import os
import asyncio
from typing import Any
sys.path.insert(0, os.path.expanduser("~/llm-connect"))
from llm_connect import create_adapter, RunConfig
from llm_connect.exceptions import LLMError
from llm_connect import create_adapter, RunConfig, BudgetTracker
from llm_connect.exceptions import LLMError, LLMBudgetExceededError
def main() -> None:
req = json.load(sys.stdin)
# ---------------------------------------------------------------------------
# Adapter / server call
def _make_config(req: dict, tracker: BudgetTracker | None) -> RunConfig:
    """Translate bridge request fields into an llm-connect ``RunConfig``.

    Args:
        req: Parsed request object; ``model``, ``temperature`` and
            ``maxTokens`` are read from it.
        tracker: Budget tracker to attach to the run, or ``None`` for no
            budget enforcement.

    Returns:
        A ``RunConfig`` using the request values, falling back to an empty
        model name, temperature 0.7 and 2000 max tokens when a key is absent.
    """
    settings = {
        "model_name": req.get("model", ""),
        "temperature": req.get("temperature", 0.7),
        "max_tokens": req.get("maxTokens", 2000),
        "budget_tracker": tracker,
    }
    return RunConfig(**settings)
def _call_server(server_url: str, req: dict, config: RunConfig) -> dict:
    """Forward a prompt to a running LLMServer over HTTP (FR-1).

    Sends ``POST {server_url}/execute`` with a JSON body holding the request's
    ``prompt`` and the serialised run config, and returns the decoded JSON
    response body.

    Args:
        server_url: Base URL of the server; a trailing slash is tolerated.
        req: Parsed request object; only ``prompt`` is read here.
        config: Run configuration; ``to_dict()`` supplies the payload and
            ``timeout_seconds`` bounds the HTTP call.

    Returns:
        The server's response parsed from JSON.
    """
    import urllib.request

    # Body is built before the URL so failures surface in the original order.
    body = json.dumps(
        {"prompt": req["prompt"], "config": config.to_dict()}
    ).encode()
    endpoint = f"{server_url.rstrip('/')}/execute"
    outgoing = urllib.request.Request(
        endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(
        outgoing, timeout=config.timeout_seconds
    ) as response:
        raw_bytes = response.read()
        return json.loads(raw_bytes)
def _execute_single(req: dict) -> dict:
    """Execute one request dict and return a result dict or an error dict.

    When ``serverUrl`` is set the prompt is forwarded to that server through
    ``_call_server`` (FR-1); otherwise a local adapter is created with
    ``create_adapter`` and run synchronously.  When ``budgetTotal`` is given,
    a ``BudgetTracker`` pre-charged with ``budgetSpent`` is attached to the
    run config (FR-4).

    Args:
        req: Parsed single-request JSON object (see the module docstring).

    Returns:
        On success: ``content``, ``model``, ``tokensIn``, ``tokensOut``,
        ``finishReason`` and ``budgetSpent``.  On failure: ``error`` and
        ``errorType``; budget failures additionally carry ``budgetTotal``,
        ``budgetSpent`` and ``budgetRequested``.
    """
    try:
        # Build optional BudgetTracker (FR-4).  This lives inside the try so
        # that an error raised while pre-charging the tracker (presumably
        # LLMBudgetExceededError when the chain is already over budget) is
        # reported as structured JSON instead of a traceback.
        tracker: BudgetTracker | None = None
        budget_total = req.get("budgetTotal")
        # A JSON null is treated like an absent field; int(None) would raise.
        if budget_total is not None:
            tracker = BudgetTracker(total=int(budget_total))
            already_spent = int(req.get("budgetSpent") or 0)
            if already_spent > 0:
                tracker.consume(already_spent)

        config = _make_config(req, tracker)

        server_url = req.get("serverUrl")
        if server_url:
            # FR-1: delegate to running LLMServer
            raw = _call_server(server_url, req, config)
            usage = raw.get("usage") or {}
            # NOTE(review): nothing in this function charges the server's
            # reported usage to the local tracker, so budgetSpent on this
            # path appears to reflect only the pre-charged amount — confirm.
            return {
                "content": raw.get("content", ""),
                "model": raw.get("model", ""),
                "tokensIn": usage.get("prompt_tokens", 0),
                "tokensOut": usage.get("completion_tokens", 0),
                "finishReason": raw.get("finish_reason", "stop"),
                "budgetSpent": tracker.spent if tracker else 0,
            }

        adapter = create_adapter(
            provider=req.get("provider", "openrouter"),
            model=req.get("model"),
            api_key=req.get("api_key"),
            system_prompt=req.get("systemPrompt"),
        )
        resp = adapter.execute_prompt(req["prompt"], config)
        return {
            "content": resp.content,
            "model": resp.model,
            "tokensIn": resp.usage.get("prompt_tokens", 0),
            "tokensOut": resp.usage.get("completion_tokens", 0),
            "finishReason": resp.finish_reason,
            "budgetSpent": tracker.spent if tracker else 0,
        }
    except LLMBudgetExceededError as e:
        return {
            "error": str(e),
            "errorType": "LLMBudgetExceededError",
            "budgetTotal": e.total,
            "budgetSpent": e.spent,
            "budgetRequested": e.requested,
        }
    except LLMError as e:
        return {"error": str(e), "errorType": type(e).__name__}
    except (OSError, ValueError) as e:
        # Transport failures on the server path (urllib's URLError/HTTPError
        # and socket timeouts are OSError subclasses), undecodable JSON
        # responses and non-numeric budget fields (ValueError) are reported
        # through the same error contract rather than as a traceback.
        return {"error": str(e), "errorType": type(e).__name__}
# ---------------------------------------------------------------------------
# Async batch execution (FR-3)
async def _execute_all_async(requests: list[dict]) -> list[dict]:
    """Run all requests concurrently and return one result dict per request.

    Each request is handled like a single request: forwarded to ``serverUrl``
    when present (FR-1), otherwise executed on a local adapter through
    ``async_execute_prompt`` (FR-3), with an optional ``BudgetTracker``
    (FR-4).  Failures are isolated per item so one bad request cannot abort
    the rest of the batch.

    Args:
        requests: List of single-request objects (see the module docstring).

    Returns:
        A list, in request order, of success dicts (``content``, ``model``,
        ``tokensIn``, ``tokensOut``, ``finishReason``, ``budgetSpent``) or
        error dicts (``error``, ``errorType``, plus budget fields for
        ``LLMBudgetExceededError``).
    """

    async def _one(req: dict) -> dict:
        """Execute a single batch item, converting any failure to a dict."""
        try:
            tracker: BudgetTracker | None = None
            budget_total = req.get("budgetTotal")
            # A JSON null is treated like an absent field; int(None) raises.
            if budget_total is not None:
                tracker = BudgetTracker(total=int(budget_total))
                already_spent = int(req.get("budgetSpent") or 0)
                if already_spent > 0:
                    tracker.consume(already_spent)

            config = _make_config(req, tracker)

            server_url = req.get("serverUrl")
            if server_url:
                # _call_server performs blocking urllib I/O, so it is pushed
                # onto the default executor to keep the event loop free for
                # the other requests in the batch.
                loop = asyncio.get_running_loop()
                raw = await loop.run_in_executor(
                    None, _call_server, server_url, req, config
                )
                usage = raw.get("usage") or {}
                return {
                    "content": raw.get("content", ""),
                    "model": raw.get("model", ""),
                    "tokensIn": usage.get("prompt_tokens", 0),
                    "tokensOut": usage.get("completion_tokens", 0),
                    "finishReason": raw.get("finish_reason", "stop"),
                    "budgetSpent": tracker.spent if tracker else 0,
                }

            adapter = create_adapter(
                provider=req.get("provider", "openrouter"),
                model=req.get("model"),
                api_key=req.get("api_key"),
                system_prompt=req.get("systemPrompt"),
            )
            resp = await adapter.async_execute_prompt(req["prompt"], config)
            return {
                "content": resp.content,
                "model": resp.model,
                "tokensIn": resp.usage.get("prompt_tokens", 0),
                "tokensOut": resp.usage.get("completion_tokens", 0),
                "finishReason": resp.finish_reason,
                "budgetSpent": tracker.spent if tracker else 0,
            }
        except LLMBudgetExceededError as e:
            return {
                "error": str(e),
                "errorType": "LLMBudgetExceededError",
                "budgetTotal": e.total,
                "budgetSpent": e.spent,
                "budgetRequested": e.requested,
            }
        except LLMError as e:
            return {"error": str(e), "errorType": type(e).__name__}
        except Exception as e:  # per-item boundary, reported not swallowed
            # Batch output is documented as succeeding even on partial
            # failure, so transport errors, malformed items, etc. become that
            # item's error entry instead of propagating through gather() and
            # discarding every other result.
            return {"error": str(e), "errorType": type(e).__name__}

    return list(await asyncio.gather(*(_one(r) for r in requests)))
# ---------------------------------------------------------------------------
# Entry point
def main() -> None:
    """Read one JSON request from stdin, execute it, and print JSON to stdout.

    A request containing a ``batch`` key runs every listed request
    concurrently and prints ``{"results": [...]}``.  Any other request is
    executed on its own; a successful result is printed, while an error
    result is written to stdout and the process exits with status 1.
    Input that is not valid JSON, is not a JSON object, or whose ``batch``
    is not a list is reported through the same error shape with exit 1.
    """

    def _fail(message: str, error_type: str) -> None:
        """Write an error object to stdout and exit with status 1."""
        json.dump({"error": message, "errorType": error_type}, sys.stdout)
        sys.exit(1)

    try:
        req = json.load(sys.stdin)
    except json.JSONDecodeError as exc:
        # Keep the stdout contract machine-readable even for bad input.
        _fail(f"invalid JSON on stdin: {exc}", type(exc).__name__)

    if not isinstance(req, dict):
        _fail("request must be a JSON object", "TypeError")

    if "batch" in req:
        batch = req["batch"]
        if not isinstance(batch, list):
            _fail("'batch' must be a list of request objects", "TypeError")
        # Batch mode: run all requests concurrently (FR-3)
        results = asyncio.run(_execute_all_async(batch))
        print(json.dumps({"results": results}))
        return

    # Single request
    result = _execute_single(req)
    if "error" in result:
        json.dump(result, sys.stdout)
        sys.exit(1)
    print(json.dumps(result))
if __name__ == "__main__":