Files
inter-hub/Application/Helper/AgentBridge.hs
Bernd Worsch 674f5da0e1
Some checks failed
Test / test (push) Has been cancelled
feat: integrate llm-connect FR-1/FR-3/FR-4 into IHF bridge
FR-3 (async_execute_prompt): CollectiveProposals now invokes all agents
concurrently via callAgentsBatch → single bridge subprocess with
asyncio.gather. Latency scales with the slowest agent, not the sum of all agents.

FR-4 (BudgetTracker): AgentDelegations passes tokenBudget to bridge;
llm-connect enforces it natively via BudgetTracker in RunConfig.
BudgetExceededError is a first-class BridgeError variant with total/
consumed/requested fields surfaced to the operator.

FR-1 (LLMServer passthrough): bridge accepts optional serverUrl field;
if present, calls POST {serverUrl}/execute instead of spawning a new
adapter. Infrastructure ready for hot-agent pre-warming (no schema
change required).

AgentBridge.hs: adds callAgentsBatch, callAgentWithBudget,
BudgetExceededError constructor, bridgeErrorMessage helper, defaultRequest,
requestToJson. All controllers updated to use bridgeErrorMessage.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 22:48:29 +00:00

219 lines
8.3 KiB
Haskell

module Application.Helper.AgentBridge where
-- IHF Phase 11 — Advanced AI Federation (IHUB-WP-0012)
-- Haskell wrapper around scripts/llm_bridge.py (llm-connect subprocess bridge).
-- Updated to use FR-1 (server passthrough), FR-3 (async batch), FR-4 (BudgetTracker).
import IHP.Prelude
import IHP.ControllerPrelude
import Data.Aeson (object, (.=), encode, decode, Value, FromJSON(..), (.:), (.:?))
import qualified Data.Aeson as A
import qualified Data.ByteString.Lazy as LBS
import qualified Control.Exception as Exception
import System.Process (readProcessWithExitCode)
import System.Exit (ExitCode(..))
import Generated.Types
-- ---------------------------------------------------------------------------
-- Request / response types
-- | One prompt request for the @scripts/llm_bridge.py@ subprocess.
-- Serialised by 'requestToJson'; 'defaultRequest' supplies the defaults.
data BridgeRequest = BridgeRequest
    { provider :: !Text
      -- ^ LLM provider name sent to the bridge.
    , model :: !Text
      -- ^ Model identifier sent to the bridge.
    , systemPrompt :: !(Maybe Text)
      -- ^ Optional system prompt (serialised as null when absent).
    , prompt :: !Text
      -- ^ The user prompt.
    , maxTokens :: !Int
      -- ^ Completion token limit sent to the bridge.
    , temperature :: !Double
      -- ^ Sampling temperature sent to the bridge.
    , budgetTotal :: !(Maybe Int)
      -- ^ FR-4 BudgetTracker cap for this call; 'Nothing' disables budget
      -- enforcement.
    , budgetSpent :: !(Maybe Int)
      -- ^ FR-4: tokens already consumed in the delegation chain.
    , serverUrl :: !(Maybe Text)
      -- ^ FR-1: URL of a running LLMServer; when set the bridge does an HTTP
      -- POST there instead of starting a new adapter.
    }
-- | Baseline request: the openrouter provider, empty model and prompt,
-- 2000 max tokens, temperature 0.7.
-- Budget enforcement and server passthrough are both switched off.
-- Callers override fields with record-update syntax.
defaultRequest :: BridgeRequest
defaultRequest =
    BridgeRequest
        { provider = "openrouter"
        , model = ""
        , prompt = ""
        , systemPrompt = Nothing
        , temperature = 0.7
        , maxTokens = 2000
        -- Optional FR-1 / FR-4 features are off by default.
        , serverUrl = Nothing
        , budgetTotal = Nothing
        , budgetSpent = Nothing
        }
-- | Successful reply from the bridge (decoded by its 'FromJSON' instance).
data BridgeResponse = BridgeResponse
    { content :: !Text
      -- ^ Generated text (JSON key @"content"@).
    , modelUsed :: !Text
      -- ^ Model reported by the bridge (JSON key @"model"@).
    , tokensIn :: !Int
      -- ^ JSON key @"tokensIn"@; presumably prompt tokens, to be confirmed.
    , tokensOut :: !Int
      -- ^ JSON key @"tokensOut"@; presumably completion tokens, to be confirmed.
    , finishReason :: !Text
      -- ^ JSON key @"finishReason"@.
    , budgetSpent :: !Int
      -- ^ Cumulative tokens spent; 0 when the bridge sends no value (no tracker).
    }
    deriving (Show)
-- | Failure of a bridge call. Use 'bridgeErrorMessage' for display text.
data BridgeError
    = -- | Generic failure, either reported by the bridge or produced locally.
      BridgeError
        { errorMessage :: !Text
          -- ^ Human-readable description.
        , errorType :: !Text
          -- ^ Category: the bridge's @errorType@ value, or a local label such
          -- as @"ParseError"@ or @"BridgeError"@.
        }
    | -- | FR-4: the bridge reported @"LLMBudgetExceededError"@.
      BudgetExceededError
        { errorMessage :: !Text
          -- ^ Human-readable description.
        , budgetTotal :: !Int
          -- ^ Configured budget (0 if not reported).
        , budgetConsumed :: !Int
          -- ^ Tokens already consumed (decoded from JSON key @"budgetSpent"@).
        , budgetRequested :: !Int
          -- ^ Tokens requested by the rejected call (0 if not reported).
        }
    deriving (Show)
-- | Display text of a 'BridgeError'. Both constructors carry an
-- @errorMessage@ field, so this is total.
bridgeErrorMessage :: BridgeError -> Text
bridgeErrorMessage failure =
    case failure of
        BudgetExceededError { errorMessage = msg } -> msg
        BridgeError { errorMessage = msg } -> msg
-- | Decodes the bridge's success object.
-- @modelUsed@ comes from the @"model"@ key, and a missing @"budgetSpent"@
-- decodes as 0.
instance FromJSON BridgeResponse where
    parseJSON = A.withObject "BridgeResponse" \obj -> do
        body <- obj .: "content"
        usedModel <- obj .: "model"
        inputTokens <- obj .: "tokensIn"
        outputTokens <- obj .: "tokensOut"
        reason <- obj .: "finishReason"
        spent <- fromMaybe 0 <$> obj .:? "budgetSpent"
        pure (BridgeResponse body usedModel inputTokens outputTokens reason spent)
-- | Decodes a bridge error object, which needs both @"errorType"@ and @"error"@.
-- An @errorType@ of @"LLMBudgetExceededError"@ yields 'BudgetExceededError'.
-- Its numeric fields default to 0 when absent, and @budgetConsumed@ is read
-- from the @"budgetSpent"@ key. Any other @errorType@ yields a plain
-- 'BridgeError' that keeps that type string.
instance FromJSON BridgeError where
    parseJSON = A.withObject "BridgeError" \obj -> do
        kind <- obj .: "errorType"
        let intOrZero key = fromMaybe 0 <$> obj .:? key
        case (kind :: Text) of
            "LLMBudgetExceededError" ->
                BudgetExceededError
                    <$> obj .: "error"
                    <*> intOrZero "budgetTotal"
                    <*> intOrZero "budgetSpent"
                    <*> intOrZero "budgetRequested"
            _ ->
                (\msg -> BridgeError msg kind) <$> obj .: "error"
-- ---------------------------------------------------------------------------
-- | JSON payload for one 'BridgeRequest' as read by @scripts/llm_bridge.py@.
-- The core fields are always present, with @systemPrompt@ sent as null when
-- absent. The optional @budgetTotal@, @budgetSpent@ and @serverUrl@ keys are
-- emitted only when set.
requestToJson :: BridgeRequest -> Value
requestToJson req = A.object (coreFields <> optionalFields)
  where
    coreFields =
        [ "provider" .= req.provider
        , "model" .= req.model
        , "systemPrompt" .= req.systemPrompt
        , "prompt" .= req.prompt
        , "maxTokens" .= req.maxTokens
        , "temperature" .= req.temperature
        ]
    optionalFields =
        maybe [] (\total -> ["budgetTotal" .= total]) req.budgetTotal
            <> maybe [] (\spent -> ["budgetSpent" .= spent]) req.budgetSpent
            <> maybe [] (\url -> ["serverUrl" .= url]) req.serverUrl
-- ---------------------------------------------------------------------------
-- Core bridge call — single request
-- | Invoke the @scripts/llm_bridge.py@ subprocess with a single request.
--
-- The request is JSON-encoded on stdin.
--
-- * Exit code 0: stdout is decoded as a 'BridgeResponse'.
-- * Non-zero exit: stdout is decoded as a 'BridgeError'; if that fails, the
--   raw stderr text is returned as the error message.
-- * The subprocess cannot be run or talked to (for example @python3@ is
--   missing): the 'IOException' is returned as a 'BridgeError' instead of
--   escaping the 'Either' contract.
callBridge :: BridgeRequest -> IO (Either BridgeError BridgeResponse)
callBridge req = do
    let payload = LBS.toStrict . A.encode $ requestToJson req
    outcome <-
        Exception.try @Exception.IOException
            (readProcessWithExitCode "python3" ["scripts/llm_bridge.py"] (cs payload))
    pure $ case outcome of
        Left ex ->
            Left (BridgeError ("Failed to run bridge subprocess: " <> tshow ex) "BridgeError")
        Right (exitCode, out, errOut) ->
            let outBytes = LBS.fromStrict (cs out)
            in case exitCode of
                ExitSuccess ->
                    case A.decode outBytes of
                        Just resp -> Right resp
                        Nothing -> Left (BridgeError "Unparseable bridge output" "ParseError")
                ExitFailure _ ->
                    case A.decode outBytes of
                        Just err -> Left err
                        -- No structured error on stdout: fall back to raw stderr.
                        Nothing -> Left (BridgeError (cs errOut) "BridgeError")
-- | Send @userPrompt@ to the agent described by an 'AgentRegistration'.
-- Provider, model and system prompt come from the registration; all other
-- settings keep their 'defaultRequest' values.
callAgent :: AgentRegistration -> Text -> IO (Either BridgeError BridgeResponse)
callAgent agent userPrompt = callBridge request
  where
    request =
        defaultRequest
            { provider = agent.provider
            , model = agent.modelName
            , systemPrompt = agent.systemPrompt
            , prompt = userPrompt
            }
-- | Call the bridge with an explicit token budget (FR-4).
-- Used by AgentDelegations to enforce the configured tokenBudget at the bridge level.
--
-- @budgetCap@ is the total budget and @alreadySpent@ the tokens consumed so
-- far in the delegation chain. Both are forwarded to the bridge as
-- @budgetTotal@ and @budgetSpent@; the latter is sent only when positive.
--
-- The completion limit ('maxTokens') is the /remaining/ budget, not the whole
-- cap. Previously a call with @alreadySpent > 0@ could ask for more tokens
-- than remained.
--
-- The limit never drops below 1, so an exhausted budget is still sent to the
-- bridge rather than asking for 0 tokens. Presumably the bridge's
-- BudgetTracker then returns a 'BudgetExceededError' (confirm against
-- llm-connect).
callAgentWithBudget :: AgentRegistration -> Text -> Int -> Int -> IO (Either BridgeError BridgeResponse)
callAgentWithBudget agent userPrompt budgetCap alreadySpent =
    callBridge defaultRequest
        { provider = agent.provider
        , model = agent.modelName
        , systemPrompt = agent.systemPrompt
        , prompt = userPrompt
        , maxTokens = max 1 remaining
        , budgetTotal = Just budgetCap
        , budgetSpent = if spent > 0 then Just spent else Nothing
        }
  where
    -- A negative "spent" value is treated as nothing spent.
    spent = max 0 alreadySpent
    remaining = budgetCap - spent
-- ---------------------------------------------------------------------------
-- Batch bridge call — parallel execution via FR-3 async (single subprocess)
-- | Invoke all requests concurrently in a single bridge subprocess using
-- asyncio.gather. Returns exactly one result per input, in input order.
-- This replaces sequential forM in CollectiveProposals.
--
-- The payload is @{"batch": [request, ...]}@. The reply is expected to be
-- @{"results": [item, ...]}@, where each item is a 'BridgeResponse' or a
-- 'BridgeError' object.
--
-- If the reply is unusable, every slot gets the same 'BridgeError', so callers
-- that zip results with their inputs never silently lose entries. The
-- possible errors are:
--
-- * the subprocess failed to run: the 'IOException' text;
-- * the results count does not match the request count: a @"ParseError"@;
-- * the output is a single error object: that error;
-- * the output is unparseable with a non-zero exit: the raw stderr text;
-- * the output is unparseable with exit 0: a @"ParseError"@.
callBridgeBatch :: [BridgeRequest] -> IO [Either BridgeError BridgeResponse]
callBridgeBatch [] = pure []
callBridgeBatch reqs = do
    let payload = LBS.toStrict . A.encode $
            A.object ["batch" .= map requestToJson reqs]
    outcome <-
        Exception.try @Exception.IOException
            (readProcessWithExitCode "python3" ["scripts/llm_bridge.py"] (cs payload))
    pure $ case outcome of
        Left ex ->
            failAll (BridgeError ("Failed to run bridge subprocess: " <> tshow ex) "BridgeError")
        Right (exitCode, out, errOut) ->
            decodeBatchReply exitCode (LBS.fromStrict (cs out)) (cs errOut)
  where
    expected :: Int
    expected = length reqs

    -- The same error in every slot preserves the one-result-per-input contract.
    failAll :: BridgeError -> [Either BridgeError BridgeResponse]
    failAll err = replicate expected (Left err)

    -- Turn the subprocess output into per-request results.
    decodeBatchReply :: ExitCode -> LBS.ByteString -> Text -> [Either BridgeError BridgeResponse]
    decodeBatchReply exitCode outBytes errText =
        case A.decode outBytes of
            Just (BridgeBatchEnvelope items)
                | length items == expected -> map parseResult items
                | otherwise -> failAll (BridgeError "Batch result count mismatch" "ParseError")
            Nothing ->
                case A.decode outBytes of
                    -- Whole-batch failure reported as a single error object.
                    Just topLevelErr -> failAll topLevelErr
                    Nothing ->
                        case exitCode of
                            ExitSuccess -> failAll (BridgeError "Unparseable batch output" "ParseError")
                            ExitFailure _ -> failAll (BridgeError errText "BridgeError")

    -- An item is a success object, an error object, or unparseable.
    parseResult :: A.Value -> Either BridgeError BridgeResponse
    parseResult v = case A.fromJSON v of
        A.Success resp -> Right resp
        A.Error _ -> case A.fromJSON v of
            A.Success err -> Left err
            A.Error _ -> Left (BridgeError "Unparseable batch item" "ParseError")

-- | Reply envelope of a batch bridge call: @{"results": [item, ...]}@.
-- Decoded via 'FromJSON' because "Data.Aeson" exports no object lookup.
newtype BridgeBatchEnvelope = BridgeBatchEnvelope [A.Value]

instance FromJSON BridgeBatchEnvelope where
    parseJSON = A.withObject "BridgeBatchEnvelope" \o ->
        BridgeBatchEnvelope <$> o .: "results"
-- | Batch counterpart of 'callAgent'. Each (registration, prompt) pair becomes
-- one request built from 'defaultRequest', and all of them go through
-- 'callBridgeBatch', which returns results in input order.
callAgentsBatch :: [(AgentRegistration, Text)] -> IO [Either BridgeError BridgeResponse]
callAgentsBatch pairs = callBridgeBatch (map toRequest pairs)
  where
    toRequest :: (AgentRegistration, Text) -> BridgeRequest
    toRequest (agent, userPrompt) =
        defaultRequest
            { provider = agent.provider
            , model = agent.modelName
            , systemPrompt = agent.systemPrompt
            , prompt = userPrompt
            }
-- ---------------------------------------------------------------------------
-- AI governance policy check
-- | Whether the agent may perform the @"propose"@ action on the given
-- artifact type in this hub.
-- Looks up one active 'AiGovernancePolicy' matching hub, agent and artifact
-- type, and checks its @allowedActions@ JSON array for @"propose"@.
-- When no such policy exists the answer is permissive ('True').
checkGovernancePolicy ::
    (?modelContext :: ModelContext) =>
    Id Hub -> Id AgentRegistration -> Text -> IO Bool
checkGovernancePolicy hubId agentId artifactType = do
    activePolicy <-
        query @AiGovernancePolicy
            |> filterWhere (#hubId, hubId)
            |> filterWhere (#agentRegistrationId, agentId)
            |> filterWhere (#artifactType, artifactType)
            |> filterWhere (#isActive, True)
            |> fetchOneOrNothing
    pure (maybe True allowsPropose activePolicy)
  where
    allowsPropose :: AiGovernancePolicy -> Bool
    allowsPropose policy = "propose" `elem` jsonArrayTexts policy.allowedActions
-- | String elements of a JSON array, in order; non-string elements are
-- skipped. Any value that is not an array yields the empty list.
jsonArrayTexts :: Value -> [Text]
jsonArrayTexts value =
    case value of
        A.Array items -> toList items >>= stringElem
        _ -> []
  where
    stringElem (A.String txt) = [txt]
    stringElem _ = []