Files
llm-connect/tests/test_server.py
tegwick 14ba47c129
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
Add activity-core LLM endpoint support
2026-06-07 19:24:45 +02:00

413 lines
14 KiB
Python

"""
Tests for LLMServer HTTP serve mode (FR-1).
"""
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import json
import urllib.error
import urllib.request
import pytest
from llm_connect._diagnostics import (
record_adapter_transformation,
record_provider_request,
record_provider_response,
)
from llm_connect.adapter import MockLLMAdapter, ErrorLLMAdapter
from llm_connect.exceptions import LLMAPIError, LLMConfigurationError, LLMTimeoutError
from llm_connect.models import LLMResponse, RunConfig
from llm_connect.profiles import CUSTODIAN_TRIAGE_BALANCED, ProfiledLLMAdapter, RuntimeProfile
from llm_connect.server import LLMServer
@pytest.fixture()
def server():
"""Start a server on a free port; stop after each test."""
s = LLMServer(adapter=MockLLMAdapter(mock_response="hello world"), port=0)
s.start()
yield s
s.stop()
def _get(url: str) -> tuple[int, dict]:
try:
with urllib.request.urlopen(url) as resp:
return resp.status, json.loads(resp.read())
except urllib.error.HTTPError as exc:
return exc.code, json.loads(exc.read())
def _post(url: str, body: dict) -> tuple[int, dict]:
payload = json.dumps(body).encode()
req = urllib.request.Request(
url,
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req) as resp:
return resp.status, json.loads(resp.read())
except urllib.error.HTTPError as exc:
return exc.code, json.loads(exc.read())
class DiagnosticLLMAdapter(MockLLMAdapter):
def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
record_provider_request(
url="https://provider.example/v1/chat",
payload={"prompt": prompt, "model": config.model_name},
headers={"Authorization": "Bearer secret-token"},
)
response = super().execute_prompt(prompt, config)
response.metadata["provider"] = "diagnostic"
response.metadata["response_id"] = "diag-response"
record_provider_response(status=200, body={"id": "diag-response", "content": response.content})
record_adapter_transformation(
"diagnostic_transform",
{"before": prompt},
{"after": response.content},
)
return response
class BarrierLLMAdapter(MockLLMAdapter):
def __init__(self):
super().__init__(mock_response="parallel")
self._barrier = threading.Barrier(2)
def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
self._barrier.wait(timeout=2.0)
return super().execute_prompt(prompt, config)
class TestHealth:
def test_health_returns_200(self, server):
status, body = _get(f"http://127.0.0.1:{server.port}/health")
assert status == 200
assert body["status"] == "ok"
def test_unknown_get_returns_404(self, server):
status, body = _get(f"http://127.0.0.1:{server.port}/nope")
assert status == 404
class TestExecute:
def test_post_execute_round_trip(self, server):
status, body = _post(
f"http://127.0.0.1:{server.port}/execute",
{"prompt": "say hello"},
)
assert status == 200
assert body["content"] == "hello world"
assert body["finish_reason"] == "stop"
assert "debug" not in body
def test_response_includes_usage(self, server):
status, body = _post(
f"http://127.0.0.1:{server.port}/execute",
{"prompt": "count tokens"},
)
assert status == 200
assert "usage" in body
assert body["usage"]["total_tokens"] > 0
def test_missing_prompt_returns_400(self, server):
status, body = _post(
f"http://127.0.0.1:{server.port}/execute",
{"config": {}},
)
assert status == 400
assert "prompt" in body["error"]
def test_invalid_json_returns_400(self, server):
req = urllib.request.Request(
f"http://127.0.0.1:{server.port}/execute",
data=b"not json",
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req) as resp:
status, body = resp.status, json.loads(resp.read())
except urllib.error.HTTPError as exc:
status, body = exc.code, json.loads(exc.read())
assert status == 400
def test_unknown_post_path_returns_404(self, server):
status, body = _post(
f"http://127.0.0.1:{server.port}/wrong",
{"prompt": "hi"},
)
assert status == 404
def test_adapter_error_returns_500(self):
s = LLMServer(adapter=ErrorLLMAdapter("boom"), port=0)
s.start()
try:
status, body = _post(
f"http://127.0.0.1:{s.port}/execute",
{"prompt": "hello"},
)
assert status == 500
assert body["error"] == "internal_error"
assert "boom" in body["message"]
finally:
s.stop()
def test_config_fields_forwarded(self):
"""Config fields in request body reach the adapter via RunConfig."""
adapter = MockLLMAdapter(mock_response="x")
s = LLMServer(adapter=adapter, port=0)
s.start()
try:
status, body = _post(
f"http://127.0.0.1:{s.port}/execute",
{
"prompt": "hi",
"config": {
"model_name": "gpt-3.5-turbo",
"max_tokens": 100,
"max_depth": 2,
"model_params": {"reasoning_effort": "medium"},
},
},
)
assert status == 200
assert adapter.last_config.model_name == "gpt-3.5-turbo"
assert adapter.last_config.max_tokens == 100
assert adapter.last_config.max_depth == 2
assert adapter.last_config.model_params == {"reasoning_effort": "medium"}
finally:
s.stop()
def test_config_must_be_object(self, server):
status, body = _post(
f"http://127.0.0.1:{server.port}/execute",
{"prompt": "hi", "config": "not an object"},
)
assert status == 400
assert "config" in body["error"]
def test_profile_execute_resolves_model_and_metadata(self):
created: list[MockLLMAdapter] = []
def factory(provider: str, model: str) -> MockLLMAdapter:
created.append(MockLLMAdapter(mock_response="profile response"))
return created[-1]
adapter = ProfiledLLMAdapter(
MockLLMAdapter(mock_response="default"),
{
CUSTODIAN_TRIAGE_BALANCED: RuntimeProfile(
name=CUSTODIAN_TRIAGE_BALANCED,
provider="mock",
model="triage-model",
config=RunConfig(
model_name="triage-model",
temperature=0.2,
max_tokens=1800,
max_depth=2,
model_params={"reasoning_effort": "medium"},
),
)
},
adapter_factory=factory,
)
s = LLMServer(adapter=adapter, port=0)
s.start()
try:
status, body = _post(
f"http://127.0.0.1:{s.port}/execute",
{
"prompt": "Return JSON.",
"config": {
"model_name": CUSTODIAN_TRIAGE_BALANCED,
"model_params": {"json_schema": {"type": "object"}},
},
},
)
finally:
s.stop()
assert status == 200
assert body["model"] == "triage-model"
assert body["metadata"]["profile"] == CUSTODIAN_TRIAGE_BALANCED
assert body["metadata"]["profile_provider"] == "mock"
assert len(created) == 1
assert created[0].last_config.model_name == "triage-model"
assert created[0].last_config.temperature == 0.2
assert created[0].last_config.max_tokens == 1800
assert created[0].last_config.max_depth == 2
assert created[0].last_config.model_params == {
"reasoning_effort": "medium",
"json_schema": {"type": "object"},
}
def test_unknown_profile_returns_400(self):
s = LLMServer(adapter=ProfiledLLMAdapter(MockLLMAdapter(), {}), port=0)
s.start()
try:
status, body = _post(
f"http://127.0.0.1:{s.port}/execute",
{"prompt": "hello", "config": {"model_name": "custodian-missing"}},
)
finally:
s.stop()
assert status == 400
assert body["error"] == "unknown_profile"
assert body["context"]["profile"] == "custodian-missing"
def test_configuration_error_is_sanitized(self):
class SecretConfigAdapter(MockLLMAdapter):
def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
raise LLMConfigurationError(
"Bad api_key=sk-supersecret with Bearer secret-token",
context={"api_key": "sk-supersecret", "provider": "openai"},
)
s = LLMServer(adapter=SecretConfigAdapter(), port=0)
s.start()
try:
status, body = _post(
f"http://127.0.0.1:{s.port}/execute",
{"prompt": "hello"},
)
finally:
s.stop()
assert status == 500
assert body["error"] == "configuration_error"
assert "sk-supersecret" not in json.dumps(body)
assert "secret-token" not in json.dumps(body)
assert body["context"]["api_key"] == "<redacted>"
assert body["context"]["provider"] == "openai"
def test_provider_errors_are_categorized_and_sanitized(self):
class ProviderErrorAdapter(MockLLMAdapter):
def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
raise LLMAPIError(
"HTTP 500 from https://provider.example/v1?key=gemini-secret",
status_code=500,
)
s = LLMServer(adapter=ProviderErrorAdapter(), port=0)
s.start()
try:
status, body = _post(
f"http://127.0.0.1:{s.port}/execute",
{"prompt": "hello"},
)
finally:
s.stop()
assert status == 502
assert body["error"] == "provider_api_error"
assert body["provider_status"] == 500
assert "gemini-secret" not in body["message"]
def test_timeout_error_returns_504(self):
class TimeoutAdapter(MockLLMAdapter):
def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
raise LLMTimeoutError("Request timed out after 300s")
s = LLMServer(adapter=TimeoutAdapter(), port=0)
s.start()
try:
status, body = _post(
f"http://127.0.0.1:{s.port}/execute",
{"prompt": "hello"},
)
finally:
s.stop()
assert status == 504
assert body["error"] == "provider_timeout"
def test_debug_query_returns_diagnostics(self):
s = LLMServer(adapter=DiagnosticLLMAdapter(mock_response="debug body"), port=0)
s.start()
try:
status, body = _post(
f"http://127.0.0.1:{s.port}/execute?debug=1",
{"prompt": "inspect", "config": {"model_name": "diagnostic-model"}},
)
finally:
s.stop()
assert status == 200
assert body["content"] == "debug body"
debug = body["debug"]
assert debug["provider_request"]["payload"] == {
"prompt": "inspect",
"model": "diagnostic-model",
}
assert debug["provider_request"]["headers_redacted"]["Authorization"] == "Bearer <redacted>"
assert debug["provider_response"]["status"] == 200
assert debug["adapter_transformations"][0]["step"] == "diagnostic_transform"
def test_debug_env_returns_diagnostics(self, monkeypatch):
monkeypatch.setenv("LLM_CONNECT_DEBUG", "1")
s = LLMServer(adapter=DiagnosticLLMAdapter(mock_response="debug body"), port=0)
s.start()
try:
status, body = _post(
f"http://127.0.0.1:{s.port}/execute",
{"prompt": "inspect"},
)
finally:
s.stop()
assert status == 200
assert "debug" in body
def test_audit_dir_records_replayable_call(self, monkeypatch, tmp_path):
monkeypatch.setenv("LLM_CONNECT_AUDIT_DIR", str(tmp_path))
s = LLMServer(adapter=DiagnosticLLMAdapter(mock_response="audit body"), port=0)
s.start()
try:
status, body = _post(
f"http://127.0.0.1:{s.port}/execute",
{"prompt": "audit me", "config": {"model_name": "audit-model"}},
)
finally:
s.stop()
assert status == 200
assert "debug" not in body
files = list(tmp_path.glob("*.json"))
assert len(files) == 1
record = json.loads(files[0].read_text(encoding="utf-8"))
assert record["prompt"] == "audit me"
assert record["config"]["model_name"] == "audit-model"
assert record["parsed_content"] == "audit body"
assert record["provider_request"]["headers_redacted"]["Authorization"] == "Bearer <redacted>"
assert record["provider_response"]["body"]["id"] == "diag-response"
assert record["latency_seconds"] >= 0
def test_execute_requests_run_concurrently(self):
s = LLMServer(adapter=BarrierLLMAdapter(), port=0)
s.start()
try:
start = time.monotonic()
with ThreadPoolExecutor(max_workers=2) as pool:
futures = [
pool.submit(
_post,
f"http://127.0.0.1:{s.port}/execute",
{"prompt": f"request {idx}"},
)
for idx in range(2)
]
results = [future.result(timeout=3.0) for future in futures]
elapsed = time.monotonic() - start
finally:
s.stop()
assert [status for status, _body in results] == [200, 200]
assert elapsed < 1.5