generated from coulomb/repo-seed
Some checks failed
Test / test (push) Has been cancelled
FR-3 (async_execute_prompt): CollectiveProposals now invokes all agents
concurrently via callAgentsBatch → single bridge subprocess with
asyncio.gather. Latency scales with slowest agent, not sum.
FR-4 (BudgetTracker): AgentDelegations passes tokenBudget to bridge;
llm-connect enforces it natively via BudgetTracker in RunConfig.
BudgetExceededError is a first-class BridgeError variant with total/
consumed/requested fields surfaced to the operator.
FR-1 (LLMServer passthrough): bridge accepts optional serverUrl field;
if present, calls POST {serverUrl}/execute instead of spawning a new
adapter. Infrastructure ready for hot-agent pre-warming (no schema
change required).
AgentBridge.hs: adds callAgentsBatch, callAgentWithBudget,
BudgetExceededError constructor, bridgeErrorMessage helper, defaultRequest,
requestToJson. All controllers updated to use bridgeErrorMessage.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
224 lines
8.5 KiB
Python
Executable File
224 lines
8.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
IHF llm-connect bridge — Phase 11 AI Federation (IHUB-WP-0012)
|
|
Updated to use llm-connect FR-1 (server passthrough), FR-3 (async batch),
|
|
FR-4 (BudgetTracker).
|
|
|
|
SINGLE REQUEST — stdin JSON fields:
|
|
provider — openrouter | gemini | openai | claude-code (default: openrouter)
|
|
model — model name string (provider-specific)
|
|
prompt — the user prompt
|
|
systemPrompt — optional system prompt
|
|
api_key — optional; falls back to llm-connect env-var resolution
|
|
maxTokens — max completion tokens (default: 2000)
|
|
temperature — sampling temperature (default: 0.7)
|
|
budgetTotal — optional int; if set, a BudgetTracker is created with this cap
|
|
budgetSpent — optional int; tokens already consumed (for delegation chains)
|
|
serverUrl — optional str; if set, POST to {serverUrl}/execute instead of
|
|
spawning a local adapter (FR-1 server passthrough)
|
|
|
|
BATCH REQUEST — stdin JSON with "batch" key:
|
|
batch — list of single-request objects (see above)
|
|
All top-level fields (except batch) are ignored.
|
|
|
|
Output JSON — single request (stdout, exit 0 on success):
|
|
content — generated text
|
|
model — model name actually used
|
|
tokensIn — prompt token count
|
|
tokensOut — completion token count
|
|
finishReason — stop reason string
|
|
budgetSpent — cumulative tokens consumed from BudgetTracker after this call
|
|
|
|
Output JSON — batch request (stdout, exit 0 even on partial failure):
|
|
results — list of {content, model, tokensIn, tokensOut, finishReason, budgetSpent}
|
|
OR {error, errorType} per item
|
|
|
|
Error JSON (stdout, exit 1 on LLMError for single request):
|
|
error — error message
|
|
errorType — exception class name
|
|
budgetTotal — present only for LLMBudgetExceededError
|
|
budgetSpent — present only for LLMBudgetExceededError
|
|
budgetRequested — present only for LLMBudgetExceededError
|
|
"""
|
|
import sys
|
|
import json
|
|
import os
|
|
import asyncio
|
|
from typing import Any
|
|
|
|
sys.path.insert(0, os.path.expanduser("~/llm-connect"))
|
|
|
|
from llm_connect import create_adapter, RunConfig, BudgetTracker
|
|
from llm_connect.exceptions import LLMError, LLMBudgetExceededError
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Adapter / server call
|
|
|
|
def _make_config(req: dict, tracker: BudgetTracker | None) -> RunConfig:
    """Build the RunConfig for one request.

    Args:
        req: Single-request dict; reads ``model``, ``temperature`` and
            ``maxTokens``.
        tracker: Optional BudgetTracker, passed through as ``budget_tracker``.

    Returns:
        A RunConfig using the documented defaults (model ``""``, temperature
        0.7, maxTokens 2000) for any field that is absent *or* JSON null.
    """

    def _field(key: str, default: Any) -> Any:
        # dict.get(key, default) only falls back when the key is missing; a
        # caller that serialises an unset optional as null would otherwise
        # push None into RunConfig.  Compare against None explicitly so that
        # legitimate falsy values (e.g. temperature 0) are preserved.
        value = req.get(key)
        return default if value is None else value

    return RunConfig(
        model_name=_field("model", ""),
        temperature=_field("temperature", 0.7),
        max_tokens=_field("maxTokens", 2000),
        budget_tracker=tracker,
    )
|
|
|
|
|
|
def _call_server(server_url: str, req: dict, config: RunConfig) -> dict:
    """POST one prompt to a running LLMServer's ``/execute`` endpoint (FR-1).

    The JSON body carries only ``req["prompt"]`` and ``config.to_dict()``;
    no other request field is forwarded.  The HTTP timeout is taken from
    ``config.timeout_seconds``.

    Returns:
        The response body decoded from JSON.

    Nothing is caught here: a missing ``prompt`` key, any error raised by
    ``urlopen`` and any JSON decoding error propagate to the caller.
    """
    from urllib.request import Request, urlopen

    body = json.dumps(
        {"prompt": req["prompt"], "config": config.to_dict()}
    ).encode()

    # Tolerate a trailing slash on the configured base URL.
    endpoint = f"{server_url.rstrip('/')}/execute"
    request = Request(
        endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

    with urlopen(request, timeout=config.timeout_seconds) as response:
        raw_bytes = response.read()
        return json.loads(raw_bytes)
|
|
|
|
|
|
def _build_tracker(req: dict) -> BudgetTracker | None:
    """Create the optional BudgetTracker for one request (FR-4).

    Returns None when ``budgetTotal`` is absent or JSON null.  A positive
    ``budgetSpent`` is consumed up front so the tracker starts from the
    tokens already used earlier in a delegation chain.
    """
    total = req.get("budgetTotal")
    if total is None:
        return None
    tracker = BudgetTracker(total=int(total))
    # `or 0` treats an explicit null the same as an absent field.
    already_spent = int(req.get("budgetSpent") or 0)
    if already_spent > 0:
        tracker.consume(already_spent)
    return tracker


def _adapter_for(req: dict) -> Any:
    """Create the local llm-connect adapter described by the request."""
    return create_adapter(
        provider=req.get("provider") or "openrouter",
        model=req.get("model"),
        api_key=req.get("api_key"),
        system_prompt=req.get("systemPrompt"),
    )


def _server_result(raw: dict, tracker: BudgetTracker | None) -> dict:
    """Map an LLMServer ``/execute`` response onto the bridge output schema."""
    # A server may send `"usage": null`; fall back to an empty mapping.
    usage = raw.get("usage") or {}
    return {
        "content": raw.get("content", ""),
        "model": raw.get("model", ""),
        "tokensIn": usage.get("prompt_tokens", 0),
        "tokensOut": usage.get("completion_tokens", 0),
        "finishReason": raw.get("finish_reason", "stop"),
        # NOTE(review): nothing in this file updates the local tracker after
        # a server call, so this presumably only reflects the incoming
        # budgetSpent — confirm how the server accounts for the budget.
        "budgetSpent": tracker.spent if tracker is not None else 0,
    }


def _adapter_result(resp: Any, tracker: BudgetTracker | None) -> dict:
    """Map an adapter response object onto the bridge output schema."""
    return {
        "content": resp.content,
        "model": resp.model,
        "tokensIn": resp.usage.get("prompt_tokens", 0),
        "tokensOut": resp.usage.get("completion_tokens", 0),
        "finishReason": resp.finish_reason,
        "budgetSpent": tracker.spent if tracker is not None else 0,
    }


def _error_result(exc: Exception) -> dict:
    """Convert an exception into the bridge's ``{error, errorType}`` dict.

    Budget errors additionally carry ``budgetTotal``, ``budgetSpent`` and
    ``budgetRequested``.  Exceptions that are not LLMError are unexpected, so
    their traceback is written to stderr; stdout stays reserved for JSON.
    """
    if isinstance(exc, LLMBudgetExceededError):
        return {
            "error": str(exc),
            "errorType": "LLMBudgetExceededError",
            "budgetTotal": exc.total,
            "budgetSpent": exc.spent,
            "budgetRequested": exc.requested,
        }
    if not isinstance(exc, LLMError):
        import traceback

        traceback.print_exception(
            type(exc), exc, exc.__traceback__, file=sys.stderr
        )
    return {"error": str(exc), "errorType": type(exc).__name__}


def _execute_single(req: dict) -> dict:
    """Execute one request dict synchronously.

    Args:
        req: Single-request dict as described in the module docstring.

    Returns:
        A result dict (``content``, ``model``, ``tokensIn``, ``tokensOut``,
        ``finishReason``, ``budgetSpent``) on success, otherwise an error dict
        containing ``error`` and ``errorType``.  Failures are reported through
        the return value rather than raised, so the caller can always emit
        JSON.
    """
    try:
        # Tracker/config construction lives inside the try so that a budget
        # error raised while replaying budgetSpent is reported like any other.
        tracker = _build_tracker(req)
        config = _make_config(req, tracker)

        server_url = req.get("serverUrl")
        if server_url:
            # FR-1: delegate to a running LLMServer.
            return _server_result(_call_server(server_url, req, config), tracker)

        resp = _adapter_for(req).execute_prompt(req["prompt"], config)
        return _adapter_result(resp, tracker)
    except Exception as exc:  # request boundary: always answer with JSON
        return _error_result(exc)


# ---------------------------------------------------------------------------
# Async batch execution (FR-3)

async def _execute_all_async(requests: list[dict]) -> list[dict]:
    """Run all requests concurrently via async_execute_prompt (FR-3).

    Args:
        requests: List of single-request dicts.

    Returns:
        One result-or-error dict per request, in request order.  Each item is
        isolated: a failure in one request (including non-LLMError failures
        such as transport errors or a malformed item) becomes that item's
        error dict and does not abort the others.
    """

    async def _one(req: dict) -> dict:
        try:
            tracker = _build_tracker(req)
            config = _make_config(req, tracker)

            server_url = req.get("serverUrl")
            if server_url:
                # _call_server uses blocking urllib I/O, so run it in a worker
                # thread to keep the event loop free for the other requests.
                raw = await asyncio.to_thread(
                    _call_server, server_url, req, config
                )
                return _server_result(raw, tracker)

            resp = await _adapter_for(req).async_execute_prompt(
                req["prompt"], config
            )
            return _adapter_result(resp, tracker)
        except Exception as exc:  # per-item boundary: keep the batch alive
            return _error_result(exc)

    return await asyncio.gather(*(_one(r) for r in requests))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
|
|
def main() -> None:
    """Read one JSON request from stdin and write one JSON document to stdout.

    * Batch mode (``batch`` key present): runs every item concurrently and
      prints ``{"results": [...]}``; exits 0 even if individual items failed.
    * Single mode: prints the result dict; exits 1 when it contains ``error``.
    * Unusable input (malformed/empty JSON, a non-object request, a non-list
      ``batch``): prints an ``{error, errorType}`` dict and exits 1, so the
      caller always receives parseable JSON instead of a bare traceback.
    """

    def _fail(message: str, error_type: str) -> None:
        # Same shape as per-request error dicts.
        print(json.dumps({"error": message, "errorType": error_type}))
        sys.exit(1)

    try:
        req = json.load(sys.stdin)
    except ValueError as exc:
        # Covers json.JSONDecodeError (incl. empty stdin) and decoding errors.
        _fail(f"invalid JSON on stdin: {exc}", type(exc).__name__)

    if not isinstance(req, dict):
        _fail("request must be a JSON object", "TypeError")

    if "batch" in req:
        # Batch mode: run all requests concurrently (FR-3)
        batch = req["batch"]
        if not isinstance(batch, list):
            _fail("'batch' must be a list of request objects", "TypeError")
        results = asyncio.run(_execute_all_async(batch))
        print(json.dumps({"results": results}))
        return

    # Single request
    result = _execute_single(req)
    # Success and error output are written the same way (newline-terminated).
    print(json.dumps(result))
    if "error" in result:
        sys.exit(1)


if __name__ == "__main__":
    main()
|