Files
llm-connect/scripts/smoke_activity_core_endpoint.py
tegwick 14ba47c129
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
Add activity-core LLM endpoint support
2026-06-07 19:24:45 +02:00

234 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""Smoke-test the activity-core llm-connect endpoint contract."""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Default fixture files used when --request / --schema are not supplied.
DEFAULT_REQUEST = ROOT.joinpath(
    "fixtures", "activity_core", "daily-triage-execute-request.json"
)
DEFAULT_SCHEMA = ROOT.joinpath(
    "fixtures", "activity_core", "daily-triage-report.schema.json"
)
class SmokeError(RuntimeError):
    """Raised when any step of the smoke test fails.

    ``main`` catches this exception, prints its message to stderr, and
    returns exit status 1.
    """
def main(argv: list[str] | None = None) -> int:
    """Parse command-line options, run the smoke test, and report the outcome.

    Args:
        argv: Arguments to parse instead of ``sys.argv[1:]``; ``None`` defers
            to argparse's default.

    Returns:
        ``0`` after printing a ``smoke: pass`` summary to stdout, or ``1``
        after printing ``smoke: fail: ...`` to stderr when ``run_smoke``
        raises ``SmokeError``.
    """
    parser = argparse.ArgumentParser(
        description="Validate /health, /execute, and daily triage JSON content.",
    )
    parser.add_argument(
        "--url",
        default=os.environ.get("LLM_CONNECT_URL", "http://127.0.0.1:8080"),
        help="Base llm-connect URL (default: env LLM_CONNECT_URL or localhost:8080)",
    )
    parser.add_argument("--request", type=Path, default=DEFAULT_REQUEST)
    parser.add_argument("--schema", type=Path, default=DEFAULT_SCHEMA)
    parser.add_argument(
        "--timeout",
        type=float,
        # Keep the environment value as a string. argparse runs string
        # defaults through ``type`` only when the flag is absent, so a
        # malformed LLM_CONNECT_TIMEOUT_SECONDS becomes a normal usage error
        # instead of a ValueError traceback while the parser is being built,
        # and it can no longer break --help or an explicit --timeout.
        default=os.environ.get("LLM_CONNECT_TIMEOUT_SECONDS", "300"),
        help="HTTP timeout in seconds (default: env LLM_CONNECT_TIMEOUT_SECONDS or 300)",
    )
    parser.add_argument("--skip-health", action="store_true")
    args = parser.parse_args(argv)

    try:
        result = run_smoke(
            base_url=args.url,
            request_path=args.request,
            schema_path=args.schema,
            timeout=args.timeout,
            check_health=not args.skip_health,
        )
    except SmokeError as exc:
        print(f"smoke: fail: {exc}", file=sys.stderr)
        return 1

    print(
        "smoke: pass "
        f"health={result['health']} "
        f"latency_seconds={result['latency_seconds']:.3f} "
        f"recommendations={result['recommendations']}"
    )
    return 0
def run_smoke(
    *,
    base_url: str,
    request_path: Path,
    schema_path: Path,
    timeout: float,
    check_health: bool = True,
) -> dict[str, Any]:
    """Exercise ``/health`` and ``/execute`` and validate the returned content.

    Args:
        base_url: Base URL of the service; trailing slashes are ignored.
        request_path: JSON file whose contents are POSTed to ``/execute``.
        schema_path: JSON Schema file the ``content`` payload must satisfy.
        timeout: Timeout in seconds passed to every HTTP call.
        check_health: When false, the ``/health`` probe is skipped.

    Returns:
        A dict with ``health`` (``"ok"`` or ``"skipped"``),
        ``latency_seconds`` (wall time of the ``/execute`` call) and
        ``recommendations`` (number of items in the content's
        ``recommendations`` list, ``0`` when there is no such list).

    Raises:
        SmokeError: If any step fails: an HTTP problem, an unreadable
            fixture, non-JSON content, or a schema violation.
    """
    base = base_url.rstrip("/")

    if check_health:
        health = _get_json(f"{base}/health", timeout=timeout)
        if health.get("status") != "ok":
            raise SmokeError("/health did not return status=ok")
        health_status = "ok"
    else:
        health_status = "skipped"

    request_body = _load_json(request_path)
    schema = _load_json(schema_path)
    # The validator calls ``schema.get``; reject a non-object schema file up
    # front (and before spending an /execute call) instead of crashing later
    # with an AttributeError.
    if not isinstance(schema, dict):
        raise SmokeError(f"schema in {schema_path} was not a JSON object")

    start = time.monotonic()
    response = _post_json(f"{base}/execute", request_body, timeout=timeout)
    latency = time.monotonic() - start

    content = response.get("content")
    if not isinstance(content, str):
        raise SmokeError("/execute response did not include a string content field")
    try:
        content_json = json.loads(content)
    except json.JSONDecodeError as exc:
        raise SmokeError(f"content was not valid JSON: {exc}") from exc

    errors = validate_json_schema(content_json, schema)
    if errors:
        # Report at most five problems to keep the failure line readable.
        raise SmokeError("content schema validation failed: " + "; ".join(errors[:5]))

    # A permissive schema may accept content that is not an object, or a
    # ``recommendations`` value that is not a list; count those as zero
    # rather than failing with AttributeError/TypeError.
    recommendations = (
        content_json.get("recommendations") if isinstance(content_json, dict) else None
    )
    return {
        "health": health_status,
        "latency_seconds": latency,
        "recommendations": len(recommendations) if isinstance(recommendations, list) else 0,
    }
def validate_json_schema(instance: Any, schema: dict[str, Any]) -> list[str]:
    """Validate the subset of JSON Schema used by the activity-core fixture.

    Supported keywords: ``type`` (one type name or a list of names), ``enum``,
    ``required``, ``properties``, ``additionalProperties: false``, ``items``,
    ``minItems``/``maxItems`` and ``minimum``/``maximum``. Object, array and
    numeric keywords are only applied when the schema declares the matching
    ``type``.

    Args:
        instance: Decoded JSON value to check.
        schema: Schema object describing the expected shape.

    Returns:
        Human-readable error strings, each prefixed with a ``$``-rooted path
        to the offending value; the list is empty when ``instance`` is valid.
    """
    errors: list[str] = []
    _validate(instance, schema, "$", errors)
    return errors


def _declared_types(schema: dict[str, Any]) -> tuple[str, ...]:
    """Return the type names listed under ``type`` (empty when absent)."""
    declared = schema.get("type")
    if isinstance(declared, str):
        return (declared,) if declared else ()
    if isinstance(declared, list):
        # JSON Schema allows a union such as ["string", "null"].
        return tuple(name for name in declared if isinstance(name, str))
    return ()


def _validate(instance: Any, schema: dict[str, Any], path: str, errors: list[str]) -> None:
    """Append to ``errors`` every violation of ``schema`` found at ``path``."""
    allowed = _declared_types(schema)
    if allowed and not any(_matches_type(instance, name) for name in allowed):
        expected = " or ".join(allowed)
        errors.append(f"{path}: expected {expected}, got {type(instance).__name__}")
        # Deeper checks are meaningless once the basic type is wrong.
        return

    if "enum" in schema and instance not in schema["enum"]:
        errors.append(f"{path}: value {instance!r} not in enum")

    # Pick the keyword group from the instance's actual type so that union
    # types (e.g. ["integer", "null"]) never run a check on the wrong kind
    # of value.
    if isinstance(instance, dict):
        if "object" in allowed:
            _validate_object(instance, schema, path, errors)
    elif isinstance(instance, list):
        if "array" in allowed:
            _validate_array(instance, schema, path, errors)
    elif _matches_type(instance, "number") and ("integer" in allowed or "number" in allowed):
        _validate_bounds(instance, schema, path, errors)


def _validate_object(
    instance: dict[str, Any], schema: dict[str, Any], path: str, errors: list[str]
) -> None:
    """Check ``required``, ``additionalProperties: false`` and ``properties``."""
    for key in schema.get("required", []):
        if key not in instance:
            errors.append(f"{path}: missing required property {key!r}")

    properties = schema.get("properties", {})
    if schema.get("additionalProperties") is False:
        for key in instance:
            if key not in properties:
                errors.append(f"{path}: unexpected property {key!r}")

    for key, subschema in properties.items():
        if key in instance and isinstance(subschema, dict):
            _validate(instance[key], subschema, f"{path}.{key}", errors)


def _validate_array(
    instance: list[Any], schema: dict[str, Any], path: str, errors: list[str]
) -> None:
    """Check ``minItems``, ``maxItems`` and the per-element ``items`` schema."""
    min_items = schema.get("minItems")
    max_items = schema.get("maxItems")
    if isinstance(min_items, int) and len(instance) < min_items:
        errors.append(f"{path}: expected at least {min_items} items")
    if isinstance(max_items, int) and len(instance) > max_items:
        errors.append(f"{path}: expected at most {max_items} items")

    item_schema = schema.get("items")
    if isinstance(item_schema, dict):
        for index, item in enumerate(instance):
            _validate(item, item_schema, f"{path}[{index}]", errors)


def _validate_bounds(
    instance: float, schema: dict[str, Any], path: str, errors: list[str]
) -> None:
    """Check the inclusive ``minimum``/``maximum`` bounds of a number."""
    minimum = schema.get("minimum")
    maximum = schema.get("maximum")
    if isinstance(minimum, (int, float)) and instance < minimum:
        errors.append(f"{path}: expected >= {minimum}")
    if isinstance(maximum, (int, float)) and instance > maximum:
        errors.append(f"{path}: expected <= {maximum}")


def _matches_type(instance: Any, expected_type: str) -> bool:
    """Return whether ``instance`` has the JSON type named ``expected_type``.

    Unrecognised type names match every value, so the validator stays
    permissive about schema features it does not implement.
    """
    if expected_type == "object":
        return isinstance(instance, dict)
    if expected_type == "array":
        return isinstance(instance, list)
    if expected_type == "string":
        return isinstance(instance, str)
    # ``bool`` is a subclass of ``int`` in Python but a distinct JSON type,
    # so it must be excluded from both numeric types.
    if expected_type == "integer":
        return isinstance(instance, int) and not isinstance(instance, bool)
    if expected_type == "number":
        return isinstance(instance, (int, float)) and not isinstance(instance, bool)
    if expected_type == "boolean":
        return isinstance(instance, bool)
    if expected_type == "null":
        return instance is None
    return True
def _load_json(path: Path) -> Any:
    """Read ``path`` as UTF-8 text and return the decoded JSON value.

    Raises:
        SmokeError: If the file cannot be read, is not valid UTF-8, or does
            not contain valid JSON.
    """
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    # UnicodeDecodeError is a ValueError, not an OSError or JSONDecodeError,
    # so it has to be listed explicitly or a mis-encoded fixture escapes as a
    # traceback instead of a smoke failure.
    except (OSError, UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise SmokeError(f"could not load JSON from {path}: {exc}") from exc
def _get_json(url: str, *, timeout: float) -> dict[str, Any]:
    """GET ``url`` and return the response body decoded as a JSON object.

    Args:
        url: Full URL to request.
        timeout: Timeout in seconds passed to ``urlopen``.

    Raises:
        SmokeError: On an HTTP error status, a connection failure, a timeout,
            or a body that is not a JSON object.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            payload = response.read()
    except urllib.error.HTTPError as exc:
        raise SmokeError(f"GET /health returned HTTP {exc.code}") from exc
    except urllib.error.URLError as exc:
        raise SmokeError(f"GET /health failed: {exc.reason}") from exc
    except OSError as exc:
        # Timeouts and connection resets that happen while waiting for or
        # reading the response are raised as plain OSError subclasses
        # (e.g. TimeoutError), not wrapped in URLError.
        raise SmokeError(f"GET /health failed: {exc}") from exc
    return _decode_json(payload)
def _post_json(url: str, body: dict[str, Any], *, timeout: float) -> dict[str, Any]:
    """POST ``body`` as JSON to ``url`` and return the decoded JSON object.

    Args:
        url: Full URL to request.
        body: Value serialised with ``json.dumps`` as the request payload.
        timeout: Timeout in seconds passed to ``urlopen``.

    Raises:
        SmokeError: On an HTTP error status (including the ``error`` and
            ``message`` fields of a JSON error body when present), a
            connection failure, a timeout, or a body that is not a JSON
            object.
    """
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            payload = response.read()
    except urllib.error.HTTPError as exc:
        # Surface the service's own error description when it sent one.
        try:
            error_body = _decode_json(exc.read())
        except SmokeError:
            detail = "non-JSON error body"
        else:
            code = error_body.get("error", "unknown_error")
            message = error_body.get("message", "")
            detail = f"{code}: {message}" if message else code
        raise SmokeError(f"POST /execute returned HTTP {exc.code}: {detail}") from exc
    except urllib.error.URLError as exc:
        raise SmokeError(f"POST /execute failed: {exc.reason}") from exc
    except OSError as exc:
        # A slow completion typically times out while waiting for the
        # response; that surfaces as a plain TimeoutError (an OSError), not
        # as URLError, and would otherwise escape as a traceback.
        raise SmokeError(f"POST /execute failed: {exc}") from exc
    return _decode_json(payload)
def _decode_json(data: bytes) -> dict[str, Any]:
    """Decode ``data`` as UTF-8 JSON and return it, requiring a JSON object.

    Raises:
        SmokeError: If the bytes are not valid UTF-8 JSON, or the top-level
            JSON value is not an object.
    """
    try:
        text = data.decode()
        parsed = json.loads(text)
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise SmokeError(f"response was not JSON: {exc}") from exc
    if isinstance(parsed, dict):
        return parsed
    raise SmokeError("response JSON was not an object")
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())