from __future__ import annotations import os import socket from typing import Any import httpx from fastapi import FastAPI, Request from fastapi.responses import JSONResponse, Response from api.edge.outbox import OutboxEnvelope, OutboxStore, PayloadRejected, default_outbox_path from api.services.write_idempotency import route_class_for HOP_BY_HOP_HEADERS = { "connection", "keep-alive", "proxy-authenticate", "proxy-authorization", "te", "trailer", "transfer-encoding", "upgrade", "content-encoding", "content-length", } def _safe_response_headers(headers: httpx.Headers) -> dict[str, str]: return {key: value for key, value in headers.items() if key.lower() not in HOP_BY_HOP_HEADERS} def _body_summary(response: httpx.Response) -> Any: try: return response.json() except ValueError: return {"text": response.text[:500]} def queued_receipt(envelope: OutboxEnvelope, upstream_error: str) -> dict[str, Any]: return { "queued": True, "outbox_id": envelope.id, "idempotency_key": envelope.idempotency_key, "upstream": "unreachable", "upstream_error": upstream_error, "route_class": envelope.route_class, } async def replay_pending( store: OutboxStore, *, upstream_url: str, limit: int = 50, timeout: float = 10.0, ) -> dict[str, int]: counts = {"sent": 0, "acked": 0, "conflict": 0, "retry": 0, "dead": 0} async with httpx.AsyncClient(base_url=upstream_url.rstrip("/"), timeout=timeout) as client: for envelope in store.due(limit=limit): counts["sent"] += 1 store.mark_sending(envelope.id) try: response = await client.request( envelope.method, envelope.path, json=envelope.body, headers={ "Idempotency-Key": envelope.idempotency_key, "X-StateHub-Source-Agent": envelope.source_agent or "statehub-edge", "X-StateHub-Source-Host": envelope.source_host or socket.gethostname(), }, ) except httpx.HTTPError as exc: counts["retry"] += 1 store.mark_retry(envelope.id, error=str(exc), attempt_count=envelope.attempt_count + 1) continue response_body = _body_summary(response) if response.status_code == 409: counts["conflict"] += 1 store.mark_conflict(envelope.id, response_status=response.status_code, response_body=response_body) elif 200 <= response.status_code < 300: counts["acked"] += 1 store.mark_acked(envelope.id, response_status=response.status_code, response_body=response_body) elif response.status_code >= 500: counts["retry"] += 1 store.mark_retry( envelope.id, error=f"HTTP {response.status_code}: {response.text[:300]}", attempt_count=envelope.attempt_count + 1, ) else: counts["dead"] += 1 store.mark_dead( envelope.id, error=f"HTTP {response.status_code}: not retryable", response_status=response.status_code, response_body=response_body, ) return counts def create_app( *, upstream_url: str | None = None, outbox_path: str | None = None, timeout: float = 10.0, ) -> FastAPI: upstream = (upstream_url or os.environ.get("STATEHUB_UPSTREAM_URL") or os.environ.get("API_BASE") or "http://127.0.0.1:8000").rstrip("/") store_path = outbox_path or default_outbox_path() store_instance: OutboxStore | None = None def get_store() -> OutboxStore: nonlocal store_instance if store_instance is None: store_instance = OutboxStore(store_path) return store_instance app = FastAPI(title="State Hub Edge Relay", version="0.1.0") @app.get("/edge/health") async def edge_health() -> dict[str, Any]: reachable = False error = None try: async with httpx.AsyncClient(base_url=upstream, timeout=2.0) as client: response = await client.get("/state/health") reachable = response.status_code < 500 except httpx.HTTPError as exc: error = str(exc) return { "status": "ok", "upstream": upstream, "upstream_reachable": reachable, "upstream_error": error, "outbox": get_store().summary(), } @app.post("/edge/replay") async def edge_replay(limit: int = 50) -> dict[str, int]: return await replay_pending(get_store(), upstream_url=upstream, limit=limit, timeout=timeout) @app.api_route("/{path:path}", methods=["GET", "POST", "PATCH", "PUT", "DELETE"]) async def proxy(path: str, request: Request) -> Response: api_path = "/" + path body: Any = None if request.method in {"POST", "PATCH", "PUT"}: try: body = await request.json() except ValueError: body = None headers = {} if idempotency_key := request.headers.get("idempotency-key"): headers["Idempotency-Key"] = idempotency_key if request.headers.get("content-type"): headers["Content-Type"] = request.headers["content-type"] try: async with httpx.AsyncClient(base_url=upstream, timeout=timeout) as client: response = await client.request( request.method, api_path, params=request.query_params, json=body if body is not None else None, headers=headers, ) return Response( content=response.content, status_code=response.status_code, headers=_safe_response_headers(response.headers), media_type=response.headers.get("content-type"), ) except httpx.HTTPError as exc: route_class = route_class_for(request.method, api_path) if route_class is None or request.method not in {"POST", "PATCH"}: return JSONResponse( status_code=503, content={ "error": "upstream unreachable and route is not queueable", "method": request.method, "path": api_path, "upstream": upstream, "detail": str(exc), }, ) try: envelope = get_store().enqueue( method=request.method, path=api_path, body=body, idempotency_key=request.headers.get("idempotency-key"), source_agent=request.headers.get("x-statehub-source-agent"), source_host=request.headers.get("x-statehub-source-host") or socket.gethostname(), repo_slug=request.headers.get("x-statehub-repo-slug"), session_id=request.headers.get("x-statehub-session-id"), observed_revision=None, ) except PayloadRejected as reject: return JSONResponse(status_code=422, content={"error": str(reject)}) return JSONResponse(status_code=202, content=queued_receipt(envelope, str(exc))) return app app = create_app()