generated from coulomb/repo-seed
234 lines
8.2 KiB
Python
234 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Smoke-test the activity-core llm-connect endpoint contract."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# ROOT is the parent of the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]
# Fixture files used when --request / --schema are not supplied.
DEFAULT_REQUEST = ROOT.joinpath(
    "fixtures", "activity_core", "daily-triage-execute-request.json"
)
DEFAULT_SCHEMA = ROOT.joinpath(
    "fixtures", "activity_core", "daily-triage-report.schema.json"
)
|
|
|
|
|
|
class SmokeError(RuntimeError):
    """Signal that a step of the smoke test failed; the message describes the failure."""
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Parse command-line options, run the smoke test, and report the outcome.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv[1:]``.

    Returns:
        ``0`` after printing a ``smoke: pass`` summary to stdout, or ``1`` after
        printing ``smoke: fail: ...`` to stderr when ``run_smoke`` raises
        ``SmokeError``.

    Raises:
        SystemExit: Raised by argparse for ``--help`` and for invalid options,
            including a non-numeric or non-positive timeout.
    """
    parser = argparse.ArgumentParser(
        description="Validate /health, /execute, and daily triage JSON content.",
    )
    parser.add_argument(
        "--url",
        default=os.environ.get("LLM_CONNECT_URL", "http://127.0.0.1:8080"),
        help="Base llm-connect URL (default: env LLM_CONNECT_URL or localhost:8080)",
    )
    parser.add_argument("--request", type=Path, default=DEFAULT_REQUEST)
    parser.add_argument("--schema", type=Path, default=DEFAULT_SCHEMA)
    parser.add_argument(
        "--timeout",
        type=float,
        # Keep the environment value as a string: argparse applies ``type`` to a
        # string default only when the option is absent, so a malformed
        # LLM_CONNECT_TIMEOUT_SECONDS becomes a usage error instead of a
        # traceback, and an explicit --timeout still overrides it.
        default=os.environ.get("LLM_CONNECT_TIMEOUT_SECONDS", "300"),
        help="HTTP timeout in seconds (default: env LLM_CONNECT_TIMEOUT_SECONDS or 300)",
    )
    parser.add_argument("--skip-health", action="store_true")
    args = parser.parse_args(argv)

    # ``not (x > 0)`` also rejects NaN, which would otherwise reach the socket layer.
    if not args.timeout > 0:
        parser.error("--timeout must be a positive number of seconds")

    try:
        result = run_smoke(
            base_url=args.url,
            request_path=args.request,
            schema_path=args.schema,
            timeout=args.timeout,
            check_health=not args.skip_health,
        )
    except SmokeError as exc:
        print(f"smoke: fail: {exc}", file=sys.stderr)
        return 1

    print(
        "smoke: pass "
        f"health={result['health']} "
        f"latency_seconds={result['latency_seconds']:.3f} "
        f"recommendations={result['recommendations']}"
    )
    return 0
|
|
|
|
|
|
def run_smoke(
    *,
    base_url: str,
    request_path: Path,
    schema_path: Path,
    timeout: float,
    check_health: bool = True,
) -> dict[str, Any]:
    """Exercise ``/health`` and ``/execute`` and validate the returned content.

    Args:
        base_url: Base URL of the service; trailing slashes are stripped.
        request_path: JSON file whose contents are POSTed to ``/execute``.
        schema_path: JSON file holding the schema the decoded content must satisfy.
        timeout: Timeout in seconds passed to each HTTP call.
        check_health: When false, the ``/health`` request is skipped.

    Returns:
        A dict with ``health`` (``"ok"`` or ``"skipped"``), ``latency_seconds``
        (time spent in the ``/execute`` call only) and ``recommendations``
        (length of the content's ``recommendations`` list, ``0`` if absent).

    Raises:
        SmokeError: If the health check fails, a fixture cannot be used, the
            response has no string ``content``, the content is not JSON, or the
            content violates the schema.
    """
    base = base_url.rstrip("/")
    if check_health:
        health = _get_json(f"{base}/health", timeout=timeout)
        if health.get("status") != "ok":
            raise SmokeError("/health did not return status=ok")
        health_status = "ok"
    else:
        health_status = "skipped"

    request_body = _load_json(request_path)
    schema = _load_json(schema_path)
    # The validator reads the schema with ``.get``; reject anything else up
    # front so a bad fixture is reported as a smoke failure, not a traceback.
    if not isinstance(schema, dict):
        raise SmokeError(f"schema in {schema_path} is not a JSON object")

    start = time.monotonic()
    response = _post_json(f"{base}/execute", request_body, timeout=timeout)
    latency = time.monotonic() - start

    content = response.get("content")
    if not isinstance(content, str):
        raise SmokeError("/execute response did not include a string content field")
    try:
        content_json = json.loads(content)
    except json.JSONDecodeError as exc:
        raise SmokeError(f"content was not valid JSON: {exc}") from exc

    errors = validate_json_schema(content_json, schema)
    if errors:
        # Cap the message at the first five problems to keep it readable.
        raise SmokeError("content schema validation failed: " + "; ".join(errors[:5]))

    # The schema may not force the content to be an object or the field to be
    # a list, so count defensively instead of assuming ``.get`` and ``len`` work.
    recommendations = (
        content_json.get("recommendations") if isinstance(content_json, dict) else None
    )
    recommendation_count = len(recommendations) if isinstance(recommendations, list) else 0

    return {
        "health": health_status,
        "latency_seconds": latency,
        "recommendations": recommendation_count,
    }
|
|
|
|
|
|
def validate_json_schema(instance: Any, schema: dict[str, Any]) -> list[str]:
    """Check ``instance`` against the JSON Schema subset used by the activity-core fixture.

    Returns the list of violation messages collected by ``_validate``, starting
    from the root path ``$``; an empty list means no violation was found.
    """
    problems: list[str] = []
    _validate(instance, schema, path="$", errors=problems)
    return problems
|
|
|
|
|
|
def _validate(instance: Any, schema: dict[str, Any], path: str, errors: list[str]) -> None:
|
|
expected_type = schema.get("type")
|
|
if expected_type and not _matches_type(instance, expected_type):
|
|
errors.append(f"{path}: expected {expected_type}, got {type(instance).__name__}")
|
|
return
|
|
|
|
if "enum" in schema and instance not in schema["enum"]:
|
|
errors.append(f"{path}: value {instance!r} not in enum")
|
|
|
|
if expected_type == "object":
|
|
assert isinstance(instance, dict)
|
|
required = schema.get("required", [])
|
|
for key in required:
|
|
if key not in instance:
|
|
errors.append(f"{path}: missing required property {key!r}")
|
|
properties = schema.get("properties", {})
|
|
if schema.get("additionalProperties") is False:
|
|
for key in instance:
|
|
if key not in properties:
|
|
errors.append(f"{path}: unexpected property {key!r}")
|
|
for key, subschema in properties.items():
|
|
if key in instance and isinstance(subschema, dict):
|
|
_validate(instance[key], subschema, f"{path}.{key}", errors)
|
|
return
|
|
|
|
if expected_type == "array":
|
|
assert isinstance(instance, list)
|
|
min_items = schema.get("minItems")
|
|
max_items = schema.get("maxItems")
|
|
if isinstance(min_items, int) and len(instance) < min_items:
|
|
errors.append(f"{path}: expected at least {min_items} items")
|
|
if isinstance(max_items, int) and len(instance) > max_items:
|
|
errors.append(f"{path}: expected at most {max_items} items")
|
|
item_schema = schema.get("items")
|
|
if isinstance(item_schema, dict):
|
|
for index, item in enumerate(instance):
|
|
_validate(item, item_schema, f"{path}[{index}]", errors)
|
|
return
|
|
|
|
if expected_type in {"integer", "number"}:
|
|
minimum = schema.get("minimum")
|
|
maximum = schema.get("maximum")
|
|
if isinstance(minimum, (int, float)) and instance < minimum:
|
|
errors.append(f"{path}: expected >= {minimum}")
|
|
if isinstance(maximum, (int, float)) and instance > maximum:
|
|
errors.append(f"{path}: expected <= {maximum}")
|
|
|
|
|
|
def _matches_type(instance: Any, expected_type: str) -> bool:
|
|
if expected_type == "object":
|
|
return isinstance(instance, dict)
|
|
if expected_type == "array":
|
|
return isinstance(instance, list)
|
|
if expected_type == "string":
|
|
return isinstance(instance, str)
|
|
if expected_type == "integer":
|
|
return isinstance(instance, int) and not isinstance(instance, bool)
|
|
if expected_type == "number":
|
|
return isinstance(instance, (int, float)) and not isinstance(instance, bool)
|
|
if expected_type == "boolean":
|
|
return isinstance(instance, bool)
|
|
if expected_type == "null":
|
|
return instance is None
|
|
return True
|
|
|
|
|
|
def _load_json(path: Path) -> Any:
|
|
try:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
except (OSError, json.JSONDecodeError) as exc:
|
|
raise SmokeError(f"could not load JSON from {path}: {exc}") from exc
|
|
|
|
|
|
def _get_json(url: str, *, timeout: float) -> dict[str, Any]:
    """GET ``url`` and return the response body decoded as a JSON object.

    The failure messages always name ``/health``, whatever ``url`` is.

    Args:
        url: Absolute URL to fetch.
        timeout: Timeout in seconds passed to ``urlopen``.

    Raises:
        SmokeError: On an HTTP error status, a connection failure, a timeout or
            reset while reading, a malformed URL, or a body that is not a JSON
            object.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return _decode_json(response.read())
    # HTTPError subclasses URLError, which subclasses OSError: order matters.
    except urllib.error.HTTPError as exc:
        raise SmokeError(f"GET /health returned HTTP {exc.code}") from exc
    except urllib.error.URLError as exc:
        raise SmokeError(f"GET /health failed: {exc.reason}") from exc
    except (OSError, ValueError) as exc:
        # OSError: timeouts/resets raised while reading the body are not
        # wrapped in URLError. ValueError: urllib rejects a URL with no scheme.
        raise SmokeError(f"GET /health failed: {exc}") from exc
|
|
|
|
|
|
def _post_json(url: str, body: dict[str, Any], *, timeout: float) -> dict[str, Any]:
    """POST ``body`` as JSON to ``url`` and return the decoded JSON object.

    The failure messages always name ``/execute``, whatever ``url`` is.

    Args:
        url: Absolute URL to post to.
        body: Value serialised with ``json.dumps`` and sent as the request body.
        timeout: Timeout in seconds passed to ``urlopen``.

    Raises:
        SmokeError: On an HTTP error status (with the ``error``/``message``
            fields of a JSON error body when present), a connection failure, a
            timeout or reset while reading, a malformed URL, or a body that is
            not a JSON object.
    """
    payload = json.dumps(body).encode()
    try:
        # Request() itself raises ValueError for a URL without a scheme, so it
        # is built inside the try block.
        request = urllib.request.Request(
            url,
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return _decode_json(response.read())
    # HTTPError subclasses URLError, which subclasses OSError: order matters.
    except urllib.error.HTTPError as exc:
        try:
            error_body = _decode_json(exc.read())
            code = error_body.get("error", "unknown_error")
            message = error_body.get("message", "")
            detail = f"{code}: {message}" if message else code
        except SmokeError:
            detail = "non-JSON error body"
        raise SmokeError(f"POST /execute returned HTTP {exc.code}: {detail}") from exc
    except urllib.error.URLError as exc:
        raise SmokeError(f"POST /execute failed: {exc.reason}") from exc
    except (OSError, ValueError) as exc:
        # OSError: timeouts/resets raised while reading the body are not
        # wrapped in URLError. ValueError: urllib rejects a URL with no scheme.
        raise SmokeError(f"POST /execute failed: {exc}") from exc
|
|
|
|
|
|
def _decode_json(data: bytes) -> dict[str, Any]:
|
|
try:
|
|
decoded = json.loads(data.decode())
|
|
except (UnicodeDecodeError, json.JSONDecodeError) as exc:
|
|
raise SmokeError(f"response was not JSON: {exc}") from exc
|
|
if not isinstance(decoded, dict):
|
|
raise SmokeError("response JSON was not an object")
|
|
return decoded
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|