generated from coulomb/repo-seed
Implement live-shaped readiness workplan
This commit is contained in:
58
tests/test_service_binding.py
Normal file
58
tests/test_service_binding.py
Normal file
@@ -0,0 +1,58 @@
|
||||
import json
|
||||
from io import BytesIO
|
||||
|
||||
from phase_memory import LocalServiceRunner, ServiceBinding
|
||||
from phase_memory.service_binding import READINESS_REPORT_SCHEMA
|
||||
|
||||
|
||||
def test_service_binding_exposes_health_readiness_and_contracts_without_listener() -> None:
|
||||
binding = ServiceBinding(LocalServiceRunner())
|
||||
|
||||
health = binding.route("GET", "/health")
|
||||
ready = binding.route("GET", "/ready")
|
||||
contracts = binding.route("GET", "/contracts")
|
||||
|
||||
assert health.status == 200
|
||||
assert ready.status == 200
|
||||
assert ready.body["schema_version"] == READINESS_REPORT_SCHEMA
|
||||
assert ready.body["unsupported_operations"] == []
|
||||
assert "package.compile" in contracts.body["operations"]
|
||||
|
||||
|
||||
def test_service_binding_dispatches_operation_payloads() -> None:
|
||||
binding = ServiceBinding(LocalServiceRunner())
|
||||
selection = {
|
||||
"schema_version": "markitect.memory.selection.v1",
|
||||
"id": "binding.selection",
|
||||
"nodes": [],
|
||||
"events": [],
|
||||
}
|
||||
|
||||
response = binding.route("POST", "/operations/package.compile", {"selection": selection})
|
||||
|
||||
assert response.status == 200
|
||||
assert response.body["operation"] == "package.compile"
|
||||
assert response.body["data"]["package_response"]["package_ref"] == "package:binding.selection"
|
||||
|
||||
|
||||
def test_service_binding_wsgi_adapter_is_callable_without_starting_server() -> None:
|
||||
binding = ServiceBinding(LocalServiceRunner())
|
||||
app = binding.as_wsgi_app()
|
||||
statuses: list[str] = []
|
||||
headers: list[list[tuple[str, str]]] = []
|
||||
|
||||
payload = json.dumps({"operation": "health.check", "payload": {}}).encode("utf-8")
|
||||
result = app(
|
||||
{
|
||||
"REQUEST_METHOD": "POST",
|
||||
"PATH_INFO": "/operations",
|
||||
"CONTENT_LENGTH": str(len(payload)),
|
||||
"wsgi.input": BytesIO(payload),
|
||||
},
|
||||
lambda status, response_headers: (statuses.append(status), headers.append(response_headers)),
|
||||
)
|
||||
|
||||
body = json.loads(b"".join(result).decode("utf-8"))
|
||||
assert statuses == ["200 OK"]
|
||||
assert body["schema_version"] == "phase_memory.health.report.v1"
|
||||
assert ("content-type", "application/json") in headers[0]
|
||||
Reference in New Issue
Block a user