generated from coulomb/repo-seed
59 lines
2.1 KiB
Python
59 lines
2.1 KiB
Python
import json
|
|
from io import BytesIO
|
|
|
|
from phase_memory import LocalServiceRunner, ServiceBinding
|
|
from phase_memory.service_binding import READINESS_REPORT_SCHEMA
|
|
|
|
|
|
def test_service_binding_exposes_health_readiness_and_contracts_without_listener() -> None:
|
|
binding = ServiceBinding(LocalServiceRunner())
|
|
|
|
health = binding.route("GET", "/health")
|
|
ready = binding.route("GET", "/ready")
|
|
contracts = binding.route("GET", "/contracts")
|
|
|
|
assert health.status == 200
|
|
assert ready.status == 200
|
|
assert ready.body["schema_version"] == READINESS_REPORT_SCHEMA
|
|
assert ready.body["unsupported_operations"] == []
|
|
assert "package.compile" in contracts.body["operations"]
|
|
|
|
|
|
def test_service_binding_dispatches_operation_payloads() -> None:
|
|
binding = ServiceBinding(LocalServiceRunner())
|
|
selection = {
|
|
"schema_version": "markitect.memory.selection.v1",
|
|
"id": "binding.selection",
|
|
"nodes": [],
|
|
"events": [],
|
|
}
|
|
|
|
response = binding.route("POST", "/operations/package.compile", {"selection": selection})
|
|
|
|
assert response.status == 200
|
|
assert response.body["operation"] == "package.compile"
|
|
assert response.body["data"]["package_response"]["package_ref"] == "package:binding.selection"
|
|
|
|
|
|
def test_service_binding_wsgi_adapter_is_callable_without_starting_server() -> None:
|
|
binding = ServiceBinding(LocalServiceRunner())
|
|
app = binding.as_wsgi_app()
|
|
statuses: list[str] = []
|
|
headers: list[list[tuple[str, str]]] = []
|
|
|
|
payload = json.dumps({"operation": "health.check", "payload": {}}).encode("utf-8")
|
|
result = app(
|
|
{
|
|
"REQUEST_METHOD": "POST",
|
|
"PATH_INFO": "/operations",
|
|
"CONTENT_LENGTH": str(len(payload)),
|
|
"wsgi.input": BytesIO(payload),
|
|
},
|
|
lambda status, response_headers: (statuses.append(status), headers.append(response_headers)),
|
|
)
|
|
|
|
body = json.loads(b"".join(result).decode("utf-8"))
|
|
assert statuses == ["200 OK"]
|
|
assert body["schema_version"] == "phase_memory.health.report.v1"
|
|
assert ("content-type", "application/json") in headers[0]
|