"""Tests for the edge relay: queueing, rejecting, and replaying outbox writes."""

import httpx
import pytest
from httpx import ASGITransport, AsyncClient

from api.edge.outbox import OutboxStore
from api.edge.relay import create_app, replay_pending


class FailingAsyncClient:
    """Stand-in for ``httpx.AsyncClient`` whose calls raise ``ConnectError``."""

    def __init__(self, *args, **kwargs):
        # Constructor arguments are accepted and ignored.
        pass

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        # Returning False lets any exception from the ``async with`` body propagate.
        return False

    async def request(self, *args, **kwargs):
        raise httpx.ConnectError("upstream down")

    async def get(self, *args, **kwargs):
        raise httpx.ConnectError("upstream down")


class ConflictAsyncClient:
    """Stand-in for ``httpx.AsyncClient`` that answers every request with 409."""

    def __init__(self, *args, **kwargs):
        # Constructor arguments are accepted and ignored.
        pass

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        # Returning False lets any exception from the ``async with`` body propagate.
        return False

    async def request(self, method, path, **kwargs):
        upstream_request = httpx.Request(method, f"http://upstream{path}")
        conflict_body = {"error": "conflict"}
        return httpx.Response(409, json=conflict_body, request=upstream_request)


class SuccessAsyncClient:
    """Stand-in for ``httpx.AsyncClient`` that answers every request with 201."""

    def __init__(self, *args, **kwargs):
        # Constructor arguments are accepted and ignored.
        pass

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        # Returning False lets any exception from the ``async with`` body propagate.
        return False

    async def request(self, method, path, **kwargs):
        upstream_request = httpx.Request(method, f"http://upstream{path}")
        created_body = {"id": "central-id", "path": path}
        return httpx.Response(201, json=created_body, request=upstream_request)


@pytest.mark.asyncio
async def test_relay_queues_allowlisted_write_when_upstream_unreachable(tmp_path, monkeypatch):
    """POST /progress/ with a failing upstream client yields 202 and one queued envelope."""
    from api.edge import relay

    monkeypatch.setattr(relay.httpx, "AsyncClient", FailingAsyncClient)
    db_file = tmp_path / "edge.sqlite3"
    edge_app = create_app(upstream_url="http://upstream", outbox_path=str(db_file))

    transport = ASGITransport(app=edge_app)
    async with AsyncClient(transport=transport, base_url="http://edge") as edge_client:
        reply = await edge_client.post(
            "/progress/",
            json={"event_type": "note", "summary": "queued"},
        )

    assert reply.status_code == 202
    payload = reply.json()
    assert payload["queued"] is True
    assert payload["route_class"] == "append"

    # Re-open the same outbox file to inspect what the relay stored.
    outbox = OutboxStore(db_file)
    pending = outbox.list(status="queued")
    assert len(pending) == 1
    assert pending[0].path == "/progress/"


@pytest.mark.asyncio
async def test_relay_replay_acks_successful_envelope(tmp_path, monkeypatch):
    """Replaying an enqueued POST against a 201-returning client marks it acked."""
    from api.edge import relay

    monkeypatch.setattr(relay.httpx, "AsyncClient", SuccessAsyncClient)
    outbox = OutboxStore(tmp_path / "edge.sqlite3")
    queued_envelope = outbox.enqueue(
        method="POST",
        path="/progress/",
        body={"event_type": "note", "summary": "queued"},
    )

    summary = await replay_pending(outbox, upstream_url="http://upstream")

    assert summary["acked"] == 1
    assert outbox.get(queued_envelope.id).status == "acked"


@pytest.mark.asyncio
async def test_relay_rejects_online_only_write_when_upstream_unreachable(tmp_path, monkeypatch):
    """DELETE /tasks/abc with a failing upstream client yields 503 'not queueable'."""
    from api.edge import relay

    monkeypatch.setattr(relay.httpx, "AsyncClient", FailingAsyncClient)
    edge_app = create_app(
        upstream_url="http://upstream",
        outbox_path=str(tmp_path / "edge.sqlite3"),
    )

    transport = ASGITransport(app=edge_app)
    async with AsyncClient(transport=transport, base_url="http://edge") as edge_client:
        reply = await edge_client.delete("/tasks/abc")

    assert reply.status_code == 503
    assert "not queueable" in reply.json()["error"]


@pytest.mark.asyncio
async def test_relay_replay_marks_conflict(tmp_path, monkeypatch):
    """Replaying an enqueued PATCH against a 409-returning client marks it conflict."""
    from api.edge import relay

    monkeypatch.setattr(relay.httpx, "AsyncClient", ConflictAsyncClient)
    outbox = OutboxStore(tmp_path / "edge.sqlite3")
    queued_envelope = outbox.enqueue(
        method="PATCH",
        path="/tasks/task-1",
        body={"status": "done"},
    )

    summary = await replay_pending(outbox, upstream_url="http://upstream")

    assert summary["conflict"] == 1
    assert outbox.get(queued_envelope.id).status == "conflict"