Files
state-hub/api/edge/outbox.py

359 lines
12 KiB
Python

from __future__ import annotations
import json
import os
import sqlite3
import stat
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from api.services.write_idempotency import route_class_for
DEFAULT_OUTBOX_PATH = Path(os.environ.get("STATEHUB_OUTBOX_PATH", "~/.statehub/edge-outbox.sqlite3")).expanduser()
MAX_PAYLOAD_BYTES = 64 * 1024
SECRET_FIELD_NAMES = {
"authorization",
"cookie",
"set-cookie",
"password",
"passwd",
"secret",
"api_key",
"apikey",
"access_token",
"refresh_token",
"bearer_token",
"client_secret",
"private_key",
"credential",
"credentials",
}
@dataclass(frozen=True)
class OutboxEnvelope:
id: str
idempotency_key: str
method: str
path: str
body: dict[str, Any] | list[Any] | None
route_class: str
source_agent: str | None
source_host: str | None
repo_slug: str | None
session_id: str | None
observed_revision: dict[str, Any] | None
status: str
attempt_count: int
next_retry_at: str | None
last_error: str | None
response_status: int | None
response_body: dict[str, Any] | list[Any] | str | None
created_at: str
updated_at: str
acked_at: str | None
class PayloadRejected(ValueError):
pass
def utcnow() -> str:
return datetime.now(tz=timezone.utc).isoformat()
def default_outbox_path() -> Path:
return DEFAULT_OUTBOX_PATH
def scrub_payload(value: Any) -> Any:
if isinstance(value, dict):
scrubbed: dict[str, Any] = {}
for key, item in value.items():
normalized = str(key).lower().replace("-", "_")
if normalized in SECRET_FIELD_NAMES:
scrubbed[key] = "[redacted]"
else:
scrubbed[key] = scrub_payload(item)
return scrubbed
if isinstance(value, list):
return [scrub_payload(item) for item in value]
return value
def _json_loads(raw: str | None) -> Any:
if raw is None:
return None
return json.loads(raw)
def _json_dumps(value: Any) -> str | None:
if value is None:
return None
return json.dumps(value, sort_keys=True, separators=(",", ":"))
def _parse_dt(value: str | None) -> datetime | None:
if not value:
return None
return datetime.fromisoformat(value)
class OutboxStore:
def __init__(self, path: str | Path | None = None) -> None:
self.path = Path(path).expanduser() if path is not None else default_outbox_path()
self.path.parent.mkdir(parents=True, exist_ok=True)
self._init_db()
self._chmod_private()
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.path)
conn.row_factory = sqlite3.Row
return conn
def _init_db(self) -> None:
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS outbox_envelopes (
id TEXT PRIMARY KEY,
idempotency_key TEXT NOT NULL UNIQUE,
method TEXT NOT NULL,
path TEXT NOT NULL,
body_json TEXT,
route_class TEXT NOT NULL,
source_agent TEXT,
source_host TEXT,
repo_slug TEXT,
session_id TEXT,
observed_revision_json TEXT,
status TEXT NOT NULL,
attempt_count INTEGER NOT NULL DEFAULT 0,
next_retry_at TEXT,
last_error TEXT,
response_status INTEGER,
response_body_json TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
acked_at TEXT
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS ix_outbox_status ON outbox_envelopes(status)")
conn.execute("CREATE INDEX IF NOT EXISTS ix_outbox_next_retry ON outbox_envelopes(next_retry_at)")
conn.commit()
def _chmod_private(self) -> None:
try:
os.chmod(self.path, stat.S_IRUSR | stat.S_IWUSR)
except OSError:
pass
def enqueue(
self,
*,
method: str,
path: str,
body: Any,
idempotency_key: str | None = None,
source_agent: str | None = None,
source_host: str | None = None,
repo_slug: str | None = None,
session_id: str | None = None,
observed_revision: dict[str, Any] | None = None,
) -> OutboxEnvelope:
route_class = route_class_for(method, path)
if route_class is None:
raise PayloadRejected(f"{method.upper()} {path} is not queueable")
scrubbed = scrub_payload(body)
encoded = _json_dumps(scrubbed)
if encoded is not None and len(encoded.encode("utf-8")) > MAX_PAYLOAD_BYTES:
raise PayloadRejected("payload exceeds offline outbox size limit")
now = utcnow()
envelope_id = str(uuid.uuid4())
key = idempotency_key or f"statehub-edge:{envelope_id}"
method_upper = method.upper()
with self._connect() as conn:
if route_class == "replace":
conn.execute(
"""
UPDATE outbox_envelopes
SET status = 'cancelled', updated_at = ?, last_error = ?
WHERE status = 'queued'
AND route_class = 'replace'
AND method = ?
AND path = ?
""",
(now, f"superseded by {envelope_id}", method_upper, path),
)
conn.execute(
"""
INSERT INTO outbox_envelopes (
id, idempotency_key, method, path, body_json, route_class,
source_agent, source_host, repo_slug, session_id,
observed_revision_json, status, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?)
""",
(
envelope_id,
key,
method_upper,
path,
encoded,
route_class,
source_agent,
source_host,
repo_slug,
session_id,
_json_dumps(observed_revision),
now,
now,
),
)
conn.commit()
return self.get(envelope_id)
def get(self, envelope_id: str) -> OutboxEnvelope:
with self._connect() as conn:
row = conn.execute("SELECT * FROM outbox_envelopes WHERE id = ?", (envelope_id,)).fetchone()
if row is None:
raise KeyError(envelope_id)
return self._row_to_envelope(row)
def list(self, *, status: str | None = None, limit: int = 100) -> list[OutboxEnvelope]:
with self._connect() as conn:
if status:
rows = conn.execute(
"SELECT * FROM outbox_envelopes WHERE status = ? ORDER BY created_at LIMIT ?",
(status, limit),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM outbox_envelopes ORDER BY created_at LIMIT ?",
(limit,),
).fetchall()
return [self._row_to_envelope(row) for row in rows]
def due(self, *, limit: int = 50) -> list[OutboxEnvelope]:
now = utcnow()
with self._connect() as conn:
rows = conn.execute(
"""
SELECT * FROM outbox_envelopes
WHERE status = 'queued' AND (next_retry_at IS NULL OR next_retry_at <= ?)
ORDER BY created_at
LIMIT ?
""",
(now, limit),
).fetchall()
return [self._row_to_envelope(row) for row in rows]
def summary(self) -> dict[str, Any]:
with self._connect() as conn:
rows = conn.execute(
"SELECT status, COUNT(*) AS count, MIN(created_at) AS oldest FROM outbox_envelopes GROUP BY status"
).fetchall()
by_status = {row["status"]: row["count"] for row in rows}
oldest_pending = None
for row in rows:
if row["status"] in {"queued", "sending", "conflict"} and row["oldest"]:
oldest_pending = min(filter(None, [oldest_pending, row["oldest"]])) if oldest_pending else row["oldest"]
return {
"path": str(self.path),
"by_status": by_status,
"pending_count": sum(by_status.get(status, 0) for status in ("queued", "sending")),
"conflict_count": by_status.get("conflict", 0),
"oldest_pending_at": oldest_pending,
}
def mark_sending(self, envelope_id: str) -> None:
self._update(envelope_id, status="sending", updated_at=utcnow())
def mark_acked(self, envelope_id: str, *, response_status: int, response_body: Any) -> None:
now = utcnow()
self._update(
envelope_id,
status="acked",
response_status=response_status,
response_body_json=_json_dumps(response_body),
updated_at=now,
acked_at=now,
last_error=None,
next_retry_at=None,
)
def mark_conflict(self, envelope_id: str, *, response_status: int, response_body: Any) -> None:
self._update(
envelope_id,
status="conflict",
response_status=response_status,
response_body_json=_json_dumps(response_body),
updated_at=utcnow(),
last_error="conflict",
)
def mark_dead(self, envelope_id: str, *, error: str, response_status: int | None = None, response_body: Any = None) -> None:
self._update(
envelope_id,
status="dead",
response_status=response_status,
response_body_json=_json_dumps(response_body),
updated_at=utcnow(),
last_error=error,
)
def mark_retry(self, envelope_id: str, *, error: str, attempt_count: int) -> None:
delay_seconds = min(3600, 2 ** min(attempt_count, 10))
next_retry = datetime.now(tz=timezone.utc) + timedelta(seconds=delay_seconds)
self._update(
envelope_id,
status="queued",
attempt_count=attempt_count,
next_retry_at=next_retry.isoformat(),
updated_at=utcnow(),
last_error=error[:500],
)
def retry(self, envelope_id: str) -> None:
self._update(envelope_id, status="queued", next_retry_at=None, updated_at=utcnow())
def cancel(self, envelope_id: str) -> None:
self._update(envelope_id, status="cancelled", updated_at=utcnow())
def export(self, *, status: str | None = None, limit: int = 1000) -> list[dict[str, Any]]:
return [envelope.__dict__ for envelope in self.list(status=status, limit=limit)]
def _update(self, envelope_id: str, **values: Any) -> None:
assignments = ", ".join(f"{key} = ?" for key in values)
params = list(values.values()) + [envelope_id]
with self._connect() as conn:
conn.execute(f"UPDATE outbox_envelopes SET {assignments} WHERE id = ?", params)
conn.commit()
def _row_to_envelope(self, row: sqlite3.Row) -> OutboxEnvelope:
return OutboxEnvelope(
id=row["id"],
idempotency_key=row["idempotency_key"],
method=row["method"],
path=row["path"],
body=_json_loads(row["body_json"]),
route_class=row["route_class"],
source_agent=row["source_agent"],
source_host=row["source_host"],
repo_slug=row["repo_slug"],
session_id=row["session_id"],
observed_revision=_json_loads(row["observed_revision_json"]),
status=row["status"],
attempt_count=row["attempt_count"],
next_retry_at=row["next_retry_at"],
last_error=row["last_error"],
response_status=row["response_status"],
response_body=_json_loads(row["response_body_json"]),
created_at=row["created_at"],
updated_at=row["updated_at"],
acked_at=row["acked_at"],
)