from __future__ import annotations import json import os import sqlite3 import stat import uuid from dataclasses import dataclass from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any from api.services.write_idempotency import route_class_for DEFAULT_OUTBOX_PATH = Path(os.environ.get("STATEHUB_OUTBOX_PATH", "~/.statehub/edge-outbox.sqlite3")).expanduser() MAX_PAYLOAD_BYTES = 64 * 1024 SECRET_FIELD_NAMES = { "authorization", "cookie", "set-cookie", "password", "passwd", "secret", "api_key", "apikey", "access_token", "refresh_token", "bearer_token", "client_secret", "private_key", "credential", "credentials", } @dataclass(frozen=True) class OutboxEnvelope: id: str idempotency_key: str method: str path: str body: dict[str, Any] | list[Any] | None route_class: str source_agent: str | None source_host: str | None repo_slug: str | None session_id: str | None observed_revision: dict[str, Any] | None status: str attempt_count: int next_retry_at: str | None last_error: str | None response_status: int | None response_body: dict[str, Any] | list[Any] | str | None created_at: str updated_at: str acked_at: str | None class PayloadRejected(ValueError): pass def utcnow() -> str: return datetime.now(tz=timezone.utc).isoformat() def default_outbox_path() -> Path: return DEFAULT_OUTBOX_PATH def scrub_payload(value: Any) -> Any: if isinstance(value, dict): scrubbed: dict[str, Any] = {} for key, item in value.items(): normalized = str(key).lower().replace("-", "_") if normalized in SECRET_FIELD_NAMES: scrubbed[key] = "[redacted]" else: scrubbed[key] = scrub_payload(item) return scrubbed if isinstance(value, list): return [scrub_payload(item) for item in value] return value def _json_loads(raw: str | None) -> Any: if raw is None: return None return json.loads(raw) def _json_dumps(value: Any) -> str | None: if value is None: return None return json.dumps(value, sort_keys=True, separators=(",", ":")) def _parse_dt(value: str | None) -> datetime | None: if not value: return None return datetime.fromisoformat(value) class OutboxStore: def __init__(self, path: str | Path | None = None) -> None: self.path = Path(path).expanduser() if path is not None else default_outbox_path() self.path.parent.mkdir(parents=True, exist_ok=True) self._init_db() self._chmod_private() def _connect(self) -> sqlite3.Connection: conn = sqlite3.connect(self.path) conn.row_factory = sqlite3.Row return conn def _init_db(self) -> None: with self._connect() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS outbox_envelopes ( id TEXT PRIMARY KEY, idempotency_key TEXT NOT NULL UNIQUE, method TEXT NOT NULL, path TEXT NOT NULL, body_json TEXT, route_class TEXT NOT NULL, source_agent TEXT, source_host TEXT, repo_slug TEXT, session_id TEXT, observed_revision_json TEXT, status TEXT NOT NULL, attempt_count INTEGER NOT NULL DEFAULT 0, next_retry_at TEXT, last_error TEXT, response_status INTEGER, response_body_json TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, acked_at TEXT ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS ix_outbox_status ON outbox_envelopes(status)") conn.execute("CREATE INDEX IF NOT EXISTS ix_outbox_next_retry ON outbox_envelopes(next_retry_at)") conn.commit() def _chmod_private(self) -> None: try: os.chmod(self.path, stat.S_IRUSR | stat.S_IWUSR) except OSError: pass def enqueue( self, *, method: str, path: str, body: Any, idempotency_key: str | None = None, source_agent: str | None = None, source_host: str | None = None, repo_slug: str | None = None, session_id: str | None = None, observed_revision: dict[str, Any] | None = None, ) -> OutboxEnvelope: route_class = route_class_for(method, path) if route_class is None: raise PayloadRejected(f"{method.upper()} {path} is not queueable") scrubbed = scrub_payload(body) encoded = _json_dumps(scrubbed) if encoded is not None and len(encoded.encode("utf-8")) > MAX_PAYLOAD_BYTES: raise PayloadRejected("payload exceeds offline outbox size limit") now = utcnow() envelope_id = str(uuid.uuid4()) key = idempotency_key or f"statehub-edge:{envelope_id}" method_upper = method.upper() with self._connect() as conn: if route_class == "replace": conn.execute( """ UPDATE outbox_envelopes SET status = 'cancelled', updated_at = ?, last_error = ? WHERE status = 'queued' AND route_class = 'replace' AND method = ? AND path = ? """, (now, f"superseded by {envelope_id}", method_upper, path), ) conn.execute( """ INSERT INTO outbox_envelopes ( id, idempotency_key, method, path, body_json, route_class, source_agent, source_host, repo_slug, session_id, observed_revision_json, status, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?) """, ( envelope_id, key, method_upper, path, encoded, route_class, source_agent, source_host, repo_slug, session_id, _json_dumps(observed_revision), now, now, ), ) conn.commit() return self.get(envelope_id) def get(self, envelope_id: str) -> OutboxEnvelope: with self._connect() as conn: row = conn.execute("SELECT * FROM outbox_envelopes WHERE id = ?", (envelope_id,)).fetchone() if row is None: raise KeyError(envelope_id) return self._row_to_envelope(row) def list(self, *, status: str | None = None, limit: int = 100) -> list[OutboxEnvelope]: with self._connect() as conn: if status: rows = conn.execute( "SELECT * FROM outbox_envelopes WHERE status = ? ORDER BY created_at LIMIT ?", (status, limit), ).fetchall() else: rows = conn.execute( "SELECT * FROM outbox_envelopes ORDER BY created_at LIMIT ?", (limit,), ).fetchall() return [self._row_to_envelope(row) for row in rows] def due(self, *, limit: int = 50) -> list[OutboxEnvelope]: now = utcnow() with self._connect() as conn: rows = conn.execute( """ SELECT * FROM outbox_envelopes WHERE status = 'queued' AND (next_retry_at IS NULL OR next_retry_at <= ?) ORDER BY created_at LIMIT ? """, (now, limit), ).fetchall() return [self._row_to_envelope(row) for row in rows] def summary(self) -> dict[str, Any]: with self._connect() as conn: rows = conn.execute( "SELECT status, COUNT(*) AS count, MIN(created_at) AS oldest FROM outbox_envelopes GROUP BY status" ).fetchall() by_status = {row["status"]: row["count"] for row in rows} oldest_pending = None for row in rows: if row["status"] in {"queued", "sending", "conflict"} and row["oldest"]: oldest_pending = min(filter(None, [oldest_pending, row["oldest"]])) if oldest_pending else row["oldest"] return { "path": str(self.path), "by_status": by_status, "pending_count": sum(by_status.get(status, 0) for status in ("queued", "sending")), "conflict_count": by_status.get("conflict", 0), "oldest_pending_at": oldest_pending, } def mark_sending(self, envelope_id: str) -> None: self._update(envelope_id, status="sending", updated_at=utcnow()) def mark_acked(self, envelope_id: str, *, response_status: int, response_body: Any) -> None: now = utcnow() self._update( envelope_id, status="acked", response_status=response_status, response_body_json=_json_dumps(response_body), updated_at=now, acked_at=now, last_error=None, next_retry_at=None, ) def mark_conflict(self, envelope_id: str, *, response_status: int, response_body: Any) -> None: self._update( envelope_id, status="conflict", response_status=response_status, response_body_json=_json_dumps(response_body), updated_at=utcnow(), last_error="conflict", ) def mark_dead(self, envelope_id: str, *, error: str, response_status: int | None = None, response_body: Any = None) -> None: self._update( envelope_id, status="dead", response_status=response_status, response_body_json=_json_dumps(response_body), updated_at=utcnow(), last_error=error, ) def mark_retry(self, envelope_id: str, *, error: str, attempt_count: int) -> None: delay_seconds = min(3600, 2 ** min(attempt_count, 10)) next_retry = datetime.now(tz=timezone.utc) + timedelta(seconds=delay_seconds) self._update( envelope_id, status="queued", attempt_count=attempt_count, next_retry_at=next_retry.isoformat(), updated_at=utcnow(), last_error=error[:500], ) def retry(self, envelope_id: str) -> None: self._update(envelope_id, status="queued", next_retry_at=None, updated_at=utcnow()) def cancel(self, envelope_id: str) -> None: self._update(envelope_id, status="cancelled", updated_at=utcnow()) def export(self, *, status: str | None = None, limit: int = 1000) -> list[dict[str, Any]]: return [envelope.__dict__ for envelope in self.list(status=status, limit=limit)] def _update(self, envelope_id: str, **values: Any) -> None: assignments = ", ".join(f"{key} = ?" for key in values) params = list(values.values()) + [envelope_id] with self._connect() as conn: conn.execute(f"UPDATE outbox_envelopes SET {assignments} WHERE id = ?", params) conn.commit() def _row_to_envelope(self, row: sqlite3.Row) -> OutboxEnvelope: return OutboxEnvelope( id=row["id"], idempotency_key=row["idempotency_key"], method=row["method"], path=row["path"], body=_json_loads(row["body_json"]), route_class=row["route_class"], source_agent=row["source_agent"], source_host=row["source_host"], repo_slug=row["repo_slug"], session_id=row["session_id"], observed_revision=_json_loads(row["observed_revision_json"]), status=row["status"], attempt_count=row["attempt_count"], next_retry_at=row["next_retry_at"], last_error=row["last_error"], response_status=row["response_status"], response_body=_json_loads(row["response_body_json"]), created_at=row["created_at"], updated_at=row["updated_at"], acked_at=row["acked_at"], )