generated from coulomb/repo-seed
42 lines
1.5 KiB
Python
42 lines
1.5 KiB
Python
import json
|
|
from io import BytesIO
|
|
|
|
from phase_memory.service_app import ServiceAppConfig, create_wsgi_app, main, service_app_metadata
|
|
|
|
|
|
def test_service_app_metadata_exposes_deployable_routes_without_listener(tmp_path) -> None:
    """Check the metadata document produced by ``service_app_metadata``.

    A config bound to 127.0.0.1:8123 with ``tmp_path`` as its local store is
    passed straight to ``service_app_metadata``; the test then inspects the
    schema version, the echoed port, the readiness flag, and the operations
    route template in the returned mapping.
    """
    described = service_app_metadata(
        ServiceAppConfig(host="127.0.0.1", port=8123, local_store_path=str(tmp_path))
    )

    assert described["schema_version"] == "phase_memory.service.app.v1"
    assert described["config"]["port"] == 8123
    assert described["readiness"]["ok"] is True
    assert described["routes"]["operations"] == "/operations/{operation}"
|
|
|
|
|
|
def test_service_wsgi_app_can_dispatch_without_opening_listener(tmp_path) -> None:
    """Call the WSGI app in-process with a POST to ``/operations/package.compile``.

    The app is built by ``create_wsgi_app`` with ``tmp_path`` as its local
    store and is invoked directly as a callable with a hand-built environ.
    The test checks that exactly one ``200 OK`` status was reported and that
    the JSON response body names the ``package.compile`` operation.
    """
    app = create_wsgi_app(ServiceAppConfig(local_store_path=str(tmp_path)))
    statuses: list[str] = []

    def start_response(status, _headers, _exc_info=None):
        # PEP 3333 allows the application to pass an optional third
        # ``exc_info`` argument. Accepting it means an error response shows
        # up as a failed status assertion below rather than as a TypeError
        # raised from this stand-in.
        statuses.append(status)

    payload = json.dumps(
        {
            "selection": {
                "schema_version": "markitect.memory.selection.v1",
                "id": "svc",
                "nodes": [],
                "events": [],
            }
        }
    ).encode("utf-8")

    # Only the environ keys this request needs are supplied.
    environ = {
        "REQUEST_METHOD": "POST",
        "PATH_INFO": "/operations/package.compile",
        "CONTENT_LENGTH": str(len(payload)),
        "wsgi.input": BytesIO(payload),
    }

    # The app returns an iterable of byte chunks; join them into one body.
    body = b"".join(app(environ, start_response))

    response = json.loads(body.decode("utf-8"))
    assert statuses == ["200 OK"]
    assert response["operation"] == "package.compile"
|
|
|
|
|
|
def test_service_main_check_builds_app_without_serving(tmp_path) -> None:
    """``main`` called with ``--check``, a temporary store, and port 8124 returns 0."""
    argv = ["--check", "--store", str(tmp_path), "--port", "8124"]
    result = main(argv)
    assert result == 0
|