# Generated from coulomb/repo-seed (Python, 118 lines, 3.7 KiB).
import httpx
|
|
import pytest
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from api.edge.outbox import OutboxStore
|
|
from api.edge.relay import create_app, replay_pending
|
|
|
|
|
|
class FailingAsyncClient:
    """Stand-in for ``httpx.AsyncClient`` whose calls never reach upstream.

    Tests in this file patch it over ``relay.httpx.AsyncClient``; both
    ``request`` and ``get`` raise ``httpx.ConnectError("upstream down")``.
    """

    def __init__(self, *args, **kwargs):
        # Accept and ignore whatever constructor arguments the caller passes.
        pass

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        # Returning False means exceptions raised inside ``async with`` propagate.
        return False

    async def request(self, *args, **kwargs):
        raise httpx.ConnectError("upstream down")

    async def get(self, *args, **kwargs):
        raise httpx.ConnectError("upstream down")
|
|
|
|
|
|
class ConflictAsyncClient:
    """Stand-in for ``httpx.AsyncClient`` that answers every request with 409.

    ``request`` returns an ``httpx.Response`` with status 409 and JSON body
    ``{"error": "conflict"}``, attached to a request for
    ``http://upstream{path}``.
    """

    def __init__(self, *args, **kwargs):
        # Accept and ignore whatever constructor arguments the caller passes.
        pass

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        # Returning False means exceptions raised inside ``async with`` propagate.
        return False

    async def request(self, method, path, **kwargs):
        # A Request object is attached to the canned response; extra keyword
        # arguments (body, headers, ...) are ignored.
        request = httpx.Request(method, f"http://upstream{path}")
        return httpx.Response(409, json={"error": "conflict"}, request=request)
|
|
|
|
|
|
class SuccessAsyncClient:
    """Stand-in for ``httpx.AsyncClient`` that answers every request with 201.

    ``request`` returns an ``httpx.Response`` with status 201 and JSON body
    ``{"id": "central-id", "path": path}``, attached to a request for
    ``http://upstream{path}``.
    """

    def __init__(self, *args, **kwargs):
        # Accept and ignore whatever constructor arguments the caller passes.
        pass

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        # Returning False means exceptions raised inside ``async with`` propagate.
        return False

    async def request(self, method, path, **kwargs):
        # The requested path is echoed back in the canned body; extra keyword
        # arguments (body, headers, ...) are ignored.
        request = httpx.Request(method, f"http://upstream{path}")
        return httpx.Response(201, json={"id": "central-id", "path": path}, request=request)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_relay_queues_allowlisted_write_when_upstream_unreachable(tmp_path, monkeypatch):
    """A POST to ``/progress/`` with upstream unreachable is answered 202 and queued.

    Checks the response body (``queued`` is True, ``route_class`` is
    ``"append"``) and that the outbox file holds exactly one ``queued``
    entry for ``/progress/``.
    """
    from api.edge import relay

    # Swap the client class reached through relay.httpx so upstream calls raise
    # ConnectError. The AsyncClient used below was imported by name at module
    # level, so it still refers to the real class.
    # NOTE(review): presumes relay.httpx is the httpx module itself - confirm.
    monkeypatch.setattr(relay.httpx, "AsyncClient", FailingAsyncClient)
    outbox_path = tmp_path / "edge.sqlite3"
    app = create_app(upstream_url="http://upstream", outbox_path=str(outbox_path))

    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://edge") as client:
        response = await client.post("/progress/", json={"event_type": "note", "summary": "queued"})

    assert response.status_code == 202
    body = response.json()
    assert body["queued"] is True
    assert body["route_class"] == "append"

    # Re-open the same outbox file independently of the app to inspect what was stored.
    store = OutboxStore(outbox_path)
    queued = store.list(status="queued")
    assert len(queued) == 1
    assert queued[0].path == "/progress/"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_relay_replay_acks_successful_envelope(tmp_path, monkeypatch):
    """Replaying a queued POST against a 201-returning upstream acks the envelope.

    Checks that ``replay_pending`` reports ``acked == 1`` and that the stored
    envelope's status becomes ``"acked"``.
    """
    from api.edge import relay

    # Upstream calls made through relay.httpx.AsyncClient now return 201.
    monkeypatch.setattr(relay.httpx, "AsyncClient", SuccessAsyncClient)
    store = OutboxStore(tmp_path / "edge.sqlite3")
    envelope = store.enqueue(method="POST", path="/progress/", body={"event_type": "note", "summary": "queued"})

    result = await replay_pending(store, upstream_url="http://upstream")

    assert result["acked"] == 1
    assert store.get(envelope.id).status == "acked"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_relay_rejects_online_only_write_when_upstream_unreachable(tmp_path, monkeypatch):
    """A DELETE to ``/tasks/abc`` with upstream unreachable is answered 503.

    Also checks that the JSON ``error`` field contains ``"not queueable"``.
    """
    from api.edge import relay

    # Swap the client class reached through relay.httpx so upstream calls raise
    # ConnectError. The AsyncClient used below was imported by name at module
    # level, so it still refers to the real class.
    # NOTE(review): presumes relay.httpx is the httpx module itself - confirm.
    monkeypatch.setattr(relay.httpx, "AsyncClient", FailingAsyncClient)
    app = create_app(upstream_url="http://upstream", outbox_path=str(tmp_path / "edge.sqlite3"))

    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://edge") as client:
        response = await client.delete("/tasks/abc")

    assert response.status_code == 503
    assert "not queueable" in response.json()["error"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_relay_replay_marks_conflict(tmp_path, monkeypatch):
    """Replaying a queued PATCH against a 409-returning upstream marks a conflict.

    Checks that ``replay_pending`` reports ``conflict == 1`` and that the
    stored envelope's status becomes ``"conflict"``.
    """
    from api.edge import relay

    # Upstream calls made through relay.httpx.AsyncClient now return 409.
    monkeypatch.setattr(relay.httpx, "AsyncClient", ConflictAsyncClient)
    store = OutboxStore(tmp_path / "edge.sqlite3")
    envelope = store.enqueue(method="PATCH", path="/tasks/task-1", body={"status": "done"})

    result = await replay_pending(store, upstream_url="http://upstream")

    assert result["conflict"] == 1
    assert store.get(envelope.id).status == "conflict"
|