diff --git a/api/edge/__init__.py b/api/edge/__init__.py new file mode 100644 index 0000000..c7b5b01 --- /dev/null +++ b/api/edge/__init__.py @@ -0,0 +1 @@ +"""State Hub edge relay and durable outbox helpers.""" diff --git a/api/edge/outbox.py b/api/edge/outbox.py new file mode 100644 index 0000000..a244f90 --- /dev/null +++ b/api/edge/outbox.py @@ -0,0 +1,358 @@ +from __future__ import annotations + +import json +import os +import sqlite3 +import stat +import uuid +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any + +from api.services.write_idempotency import route_class_for + +DEFAULT_OUTBOX_PATH = Path(os.environ.get("STATEHUB_OUTBOX_PATH", "~/.statehub/edge-outbox.sqlite3")).expanduser() +MAX_PAYLOAD_BYTES = 64 * 1024 +SECRET_FIELD_NAMES = { + "authorization", + "cookie", + "set-cookie", + "password", + "passwd", + "secret", + "api_key", + "apikey", + "access_token", + "refresh_token", + "bearer_token", + "client_secret", + "private_key", + "credential", + "credentials", +} + + +@dataclass(frozen=True) +class OutboxEnvelope: + id: str + idempotency_key: str + method: str + path: str + body: dict[str, Any] | list[Any] | None + route_class: str + source_agent: str | None + source_host: str | None + repo_slug: str | None + session_id: str | None + observed_revision: dict[str, Any] | None + status: str + attempt_count: int + next_retry_at: str | None + last_error: str | None + response_status: int | None + response_body: dict[str, Any] | list[Any] | str | None + created_at: str + updated_at: str + acked_at: str | None + + +class PayloadRejected(ValueError): + pass + + +def utcnow() -> str: + return datetime.now(tz=timezone.utc).isoformat() + + +def default_outbox_path() -> Path: + return DEFAULT_OUTBOX_PATH + + +def scrub_payload(value: Any) -> Any: + if isinstance(value, dict): + scrubbed: dict[str, Any] = {} + for key, item in value.items(): + normalized = str(key).lower().replace("-", "_") + if normalized in SECRET_FIELD_NAMES: + scrubbed[key] = "[redacted]" + else: + scrubbed[key] = scrub_payload(item) + return scrubbed + if isinstance(value, list): + return [scrub_payload(item) for item in value] + return value + + +def _json_loads(raw: str | None) -> Any: + if raw is None: + return None + return json.loads(raw) + + +def _json_dumps(value: Any) -> str | None: + if value is None: + return None + return json.dumps(value, sort_keys=True, separators=(",", ":")) + + +def _parse_dt(value: str | None) -> datetime | None: + if not value: + return None + return datetime.fromisoformat(value) + + +class OutboxStore: + def __init__(self, path: str | Path | None = None) -> None: + self.path = Path(path).expanduser() if path is not None else default_outbox_path() + self.path.parent.mkdir(parents=True, exist_ok=True) + self._init_db() + self._chmod_private() + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.path) + conn.row_factory = sqlite3.Row + return conn + + def _init_db(self) -> None: + with self._connect() as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS outbox_envelopes ( + id TEXT PRIMARY KEY, + idempotency_key TEXT NOT NULL UNIQUE, + method TEXT NOT NULL, + path TEXT NOT NULL, + body_json TEXT, + route_class TEXT NOT NULL, + source_agent TEXT, + source_host TEXT, + repo_slug TEXT, + session_id TEXT, + observed_revision_json TEXT, + status TEXT NOT NULL, + attempt_count INTEGER NOT NULL DEFAULT 0, + next_retry_at TEXT, + last_error TEXT, + response_status INTEGER, + response_body_json TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + acked_at TEXT + ) + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS ix_outbox_status ON outbox_envelopes(status)") + conn.execute("CREATE INDEX IF NOT EXISTS ix_outbox_next_retry ON outbox_envelopes(next_retry_at)") + conn.commit() + + def _chmod_private(self) -> None: + try: + os.chmod(self.path, stat.S_IRUSR | stat.S_IWUSR) + except OSError: + pass + + def enqueue( + self, + *, + method: str, + path: str, + body: Any, + idempotency_key: str | None = None, + source_agent: str | None = None, + source_host: str | None = None, + repo_slug: str | None = None, + session_id: str | None = None, + observed_revision: dict[str, Any] | None = None, + ) -> OutboxEnvelope: + route_class = route_class_for(method, path) + if route_class is None: + raise PayloadRejected(f"{method.upper()} {path} is not queueable") + scrubbed = scrub_payload(body) + encoded = _json_dumps(scrubbed) + if encoded is not None and len(encoded.encode("utf-8")) > MAX_PAYLOAD_BYTES: + raise PayloadRejected("payload exceeds offline outbox size limit") + now = utcnow() + envelope_id = str(uuid.uuid4()) + key = idempotency_key or f"statehub-edge:{envelope_id}" + method_upper = method.upper() + with self._connect() as conn: + if route_class == "replace": + conn.execute( + """ + UPDATE outbox_envelopes + SET status = 'cancelled', updated_at = ?, last_error = ? + WHERE status = 'queued' + AND route_class = 'replace' + AND method = ? + AND path = ? + """, + (now, f"superseded by {envelope_id}", method_upper, path), + ) + conn.execute( + """ + INSERT INTO outbox_envelopes ( + id, idempotency_key, method, path, body_json, route_class, + source_agent, source_host, repo_slug, session_id, + observed_revision_json, status, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?) + """, + ( + envelope_id, + key, + method_upper, + path, + encoded, + route_class, + source_agent, + source_host, + repo_slug, + session_id, + _json_dumps(observed_revision), + now, + now, + ), + ) + conn.commit() + return self.get(envelope_id) + + def get(self, envelope_id: str) -> OutboxEnvelope: + with self._connect() as conn: + row = conn.execute("SELECT * FROM outbox_envelopes WHERE id = ?", (envelope_id,)).fetchone() + if row is None: + raise KeyError(envelope_id) + return self._row_to_envelope(row) + + def list(self, *, status: str | None = None, limit: int = 100) -> list[OutboxEnvelope]: + with self._connect() as conn: + if status: + rows = conn.execute( + "SELECT * FROM outbox_envelopes WHERE status = ? ORDER BY created_at LIMIT ?", + (status, limit), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM outbox_envelopes ORDER BY created_at LIMIT ?", + (limit,), + ).fetchall() + return [self._row_to_envelope(row) for row in rows] + + def due(self, *, limit: int = 50) -> list[OutboxEnvelope]: + now = utcnow() + with self._connect() as conn: + rows = conn.execute( + """ + SELECT * FROM outbox_envelopes + WHERE status = 'queued' AND (next_retry_at IS NULL OR next_retry_at <= ?) + ORDER BY created_at + LIMIT ? + """, + (now, limit), + ).fetchall() + return [self._row_to_envelope(row) for row in rows] + + def summary(self) -> dict[str, Any]: + with self._connect() as conn: + rows = conn.execute( + "SELECT status, COUNT(*) AS count, MIN(created_at) AS oldest FROM outbox_envelopes GROUP BY status" + ).fetchall() + by_status = {row["status"]: row["count"] for row in rows} + oldest_pending = None + for row in rows: + if row["status"] in {"queued", "sending", "conflict"} and row["oldest"]: + oldest_pending = min(filter(None, [oldest_pending, row["oldest"]])) if oldest_pending else row["oldest"] + return { + "path": str(self.path), + "by_status": by_status, + "pending_count": sum(by_status.get(status, 0) for status in ("queued", "sending")), + "conflict_count": by_status.get("conflict", 0), + "oldest_pending_at": oldest_pending, + } + + def mark_sending(self, envelope_id: str) -> None: + self._update(envelope_id, status="sending", updated_at=utcnow()) + + def mark_acked(self, envelope_id: str, *, response_status: int, response_body: Any) -> None: + now = utcnow() + self._update( + envelope_id, + status="acked", + response_status=response_status, + response_body_json=_json_dumps(response_body), + updated_at=now, + acked_at=now, + last_error=None, + next_retry_at=None, + ) + + def mark_conflict(self, envelope_id: str, *, response_status: int, response_body: Any) -> None: + self._update( + envelope_id, + status="conflict", + response_status=response_status, + response_body_json=_json_dumps(response_body), + updated_at=utcnow(), + last_error="conflict", + ) + + def mark_dead(self, envelope_id: str, *, error: str, response_status: int | None = None, response_body: Any = None) -> None: + self._update( + envelope_id, + status="dead", + response_status=response_status, + response_body_json=_json_dumps(response_body), + updated_at=utcnow(), + last_error=error, + ) + + def mark_retry(self, envelope_id: str, *, error: str, attempt_count: int) -> None: + delay_seconds = min(3600, 2 ** min(attempt_count, 10)) + next_retry = datetime.now(tz=timezone.utc) + timedelta(seconds=delay_seconds) + self._update( + envelope_id, + status="queued", + attempt_count=attempt_count, + next_retry_at=next_retry.isoformat(), + updated_at=utcnow(), + last_error=error[:500], + ) + + def retry(self, envelope_id: str) -> None: + self._update(envelope_id, status="queued", next_retry_at=None, updated_at=utcnow()) + + def cancel(self, envelope_id: str) -> None: + self._update(envelope_id, status="cancelled", updated_at=utcnow()) + + def export(self, *, status: str | None = None, limit: int = 1000) -> list[dict[str, Any]]: + return [envelope.__dict__ for envelope in self.list(status=status, limit=limit)] + + def _update(self, envelope_id: str, **values: Any) -> None: + assignments = ", ".join(f"{key} = ?" for key in values) + params = list(values.values()) + [envelope_id] + with self._connect() as conn: + conn.execute(f"UPDATE outbox_envelopes SET {assignments} WHERE id = ?", params) + conn.commit() + + def _row_to_envelope(self, row: sqlite3.Row) -> OutboxEnvelope: + return OutboxEnvelope( + id=row["id"], + idempotency_key=row["idempotency_key"], + method=row["method"], + path=row["path"], + body=_json_loads(row["body_json"]), + route_class=row["route_class"], + source_agent=row["source_agent"], + source_host=row["source_host"], + repo_slug=row["repo_slug"], + session_id=row["session_id"], + observed_revision=_json_loads(row["observed_revision_json"]), + status=row["status"], + attempt_count=row["attempt_count"], + next_retry_at=row["next_retry_at"], + last_error=row["last_error"], + response_status=row["response_status"], + response_body=_json_loads(row["response_body_json"]), + created_at=row["created_at"], + updated_at=row["updated_at"], + acked_at=row["acked_at"], + ) diff --git a/api/edge/relay.py b/api/edge/relay.py new file mode 100644 index 0000000..9277652 --- /dev/null +++ b/api/edge/relay.py @@ -0,0 +1,206 @@ +from __future__ import annotations + +import os +import socket +from typing import Any + +import httpx +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse, Response + +from api.edge.outbox import OutboxEnvelope, OutboxStore, PayloadRejected, default_outbox_path +from api.services.write_idempotency import route_class_for + +HOP_BY_HOP_HEADERS = { + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailer", + "transfer-encoding", + "upgrade", + "content-encoding", + "content-length", +} + + +def _safe_response_headers(headers: httpx.Headers) -> dict[str, str]: + return {key: value for key, value in headers.items() if key.lower() not in HOP_BY_HOP_HEADERS} + + +def _body_summary(response: httpx.Response) -> Any: + try: + return response.json() + except ValueError: + return {"text": response.text[:500]} + + +def queued_receipt(envelope: OutboxEnvelope, upstream_error: str) -> dict[str, Any]: + return { + "queued": True, + "outbox_id": envelope.id, + "idempotency_key": envelope.idempotency_key, + "upstream": "unreachable", + "upstream_error": upstream_error, + "route_class": envelope.route_class, + } + + +async def replay_pending( + store: OutboxStore, + *, + upstream_url: str, + limit: int = 50, + timeout: float = 10.0, +) -> dict[str, int]: + counts = {"sent": 0, "acked": 0, "conflict": 0, "retry": 0, "dead": 0} + async with httpx.AsyncClient(base_url=upstream_url.rstrip("/"), timeout=timeout) as client: + for envelope in store.due(limit=limit): + counts["sent"] += 1 + store.mark_sending(envelope.id) + try: + response = await client.request( + envelope.method, + envelope.path, + json=envelope.body, + headers={ + "Idempotency-Key": envelope.idempotency_key, + "X-StateHub-Source-Agent": envelope.source_agent or "statehub-edge", + "X-StateHub-Source-Host": envelope.source_host or socket.gethostname(), + }, + ) + except httpx.HTTPError as exc: + counts["retry"] += 1 + store.mark_retry(envelope.id, error=str(exc), attempt_count=envelope.attempt_count + 1) + continue + + response_body = _body_summary(response) + if response.status_code == 409: + counts["conflict"] += 1 + store.mark_conflict(envelope.id, response_status=response.status_code, response_body=response_body) + elif 200 <= response.status_code < 300: + counts["acked"] += 1 + store.mark_acked(envelope.id, response_status=response.status_code, response_body=response_body) + elif response.status_code >= 500: + counts["retry"] += 1 + store.mark_retry( + envelope.id, + error=f"HTTP {response.status_code}: {response.text[:300]}", + attempt_count=envelope.attempt_count + 1, + ) + else: + counts["dead"] += 1 + store.mark_dead( + envelope.id, + error=f"HTTP {response.status_code}: not retryable", + response_status=response.status_code, + response_body=response_body, + ) + return counts + + +def create_app( + *, + upstream_url: str | None = None, + outbox_path: str | None = None, + timeout: float = 10.0, +) -> FastAPI: + upstream = (upstream_url or os.environ.get("STATEHUB_UPSTREAM_URL") or os.environ.get("API_BASE") or "http://127.0.0.1:8000").rstrip("/") + store_path = outbox_path or default_outbox_path() + store_instance: OutboxStore | None = None + + def get_store() -> OutboxStore: + nonlocal store_instance + if store_instance is None: + store_instance = OutboxStore(store_path) + return store_instance + + app = FastAPI(title="State Hub Edge Relay", version="0.1.0") + + @app.get("/edge/health") + async def edge_health() -> dict[str, Any]: + reachable = False + error = None + try: + async with httpx.AsyncClient(base_url=upstream, timeout=2.0) as client: + response = await client.get("/state/health") + reachable = response.status_code < 500 + except httpx.HTTPError as exc: + error = str(exc) + return { + "status": "ok", + "upstream": upstream, + "upstream_reachable": reachable, + "upstream_error": error, + "outbox": get_store().summary(), + } + + @app.post("/edge/replay") + async def edge_replay(limit: int = 50) -> dict[str, int]: + return await replay_pending(get_store(), upstream_url=upstream, limit=limit, timeout=timeout) + + @app.api_route("/{path:path}", methods=["GET", "POST", "PATCH", "PUT", "DELETE"]) + async def proxy(path: str, request: Request) -> Response: + api_path = "/" + path + body: Any = None + if request.method in {"POST", "PATCH", "PUT"}: + try: + body = await request.json() + except ValueError: + body = None + + headers = {} + if idempotency_key := request.headers.get("idempotency-key"): + headers["Idempotency-Key"] = idempotency_key + if request.headers.get("content-type"): + headers["Content-Type"] = request.headers["content-type"] + + try: + async with httpx.AsyncClient(base_url=upstream, timeout=timeout) as client: + response = await client.request( + request.method, + api_path, + params=request.query_params, + json=body if body is not None else None, + headers=headers, + ) + return Response( + content=response.content, + status_code=response.status_code, + headers=_safe_response_headers(response.headers), + media_type=response.headers.get("content-type"), + ) + except httpx.HTTPError as exc: + route_class = route_class_for(request.method, api_path) + if route_class is None or request.method not in {"POST", "PATCH"}: + return JSONResponse( + status_code=503, + content={ + "error": "upstream unreachable and route is not queueable", + "method": request.method, + "path": api_path, + "upstream": upstream, + "detail": str(exc), + }, + ) + try: + envelope = get_store().enqueue( + method=request.method, + path=api_path, + body=body, + idempotency_key=request.headers.get("idempotency-key"), + source_agent=request.headers.get("x-statehub-source-agent"), + source_host=request.headers.get("x-statehub-source-host") or socket.gethostname(), + repo_slug=request.headers.get("x-statehub-repo-slug"), + session_id=request.headers.get("x-statehub-session-id"), + observed_revision=None, + ) + except PayloadRejected as reject: + return JSONResponse(status_code=422, content={"error": str(reject)}) + return JSONResponse(status_code=202, content=queued_receipt(envelope, str(exc))) + + return app + + +app = create_app() diff --git a/api/main.py b/api/main.py index 0699713..bc14b74 100644 --- a/api/main.py +++ b/api/main.py @@ -11,6 +11,7 @@ from starlette.responses import Response as StarletteResponse from api.database import engine from api.events import shutdown_publisher +from api.services.write_idempotency import WriteIdempotencyMiddleware from api.routers import decisions, extension_points, progress, state, tasks, technical_debt, topics, workstreams, workstream_dependencies from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals, messages, capability_requests, tpsc, services from api.routers import token_events @@ -91,13 +92,14 @@ _default_dashboard_origins = [ _cors_env = os.getenv("CORS_ORIGINS", ",".join(_default_dashboard_origins)) _cors_origins = [o.strip() for o in _cors_env.split(",") if o.strip()] +app.add_middleware(WriteIdempotencyMiddleware) app.add_middleware(ETagMiddleware) app.add_middleware( CORSMiddleware, allow_origins=_cors_origins, allow_methods=["GET", "POST", "PATCH", "DELETE", "PUT"], - allow_headers=["Content-Type", "If-None-Match"], - expose_headers=["ETag", "X-StateHub-Elapsed-Ms", "X-StateHub-Response-Bytes", "X-StateHub-Cache"], + allow_headers=["Content-Type", "If-None-Match", "Idempotency-Key", "X-StateHub-Source-Agent", "X-StateHub-Source-Host"], + expose_headers=["ETag", "X-StateHub-Elapsed-Ms", "X-StateHub-Response-Bytes", "X-StateHub-Cache", "X-StateHub-Idempotency-Replay"], ) app.include_router(domains.router) diff --git a/api/models/__init__.py b/api/models/__init__.py index 1a1b1a6..f40b01b 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -33,6 +33,7 @@ from api.models.interface_change import InterfaceChange from api.models.workplan_launch_request import WorkplanLaunchRequest from api.models.fabric_graph import FabricGraphImport, FabricGraphNode, FabricGraphEdge from api.models.legacy_meter import LegacyInterface, LegacyInterfaceUsageBucket +from api.models.write_idempotency_key import WriteIdempotencyKey __all__ = [ "Base", @@ -65,4 +66,5 @@ __all__ = [ "WorkplanLaunchRequest", "FabricGraphImport", "FabricGraphNode", "FabricGraphEdge", "LegacyInterface", "LegacyInterfaceUsageBucket", + "WriteIdempotencyKey", ] \ No newline at end of file diff --git a/api/models/write_idempotency_key.py b/api/models/write_idempotency_key.py new file mode 100644 index 0000000..3ebc21f --- /dev/null +++ b/api/models/write_idempotency_key.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import uuid +from datetime import datetime +from typing import Any + +from sqlalchemy import DateTime, Integer, String, Text, UniqueConstraint +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.orm import Mapped, mapped_column + +from api.models.base import Base, new_uuid + + +class WriteIdempotencyKey(Base): + __tablename__ = "write_idempotency_keys" + __table_args__ = ( + UniqueConstraint("key", name="uq_write_idempotency_keys_key"), + ) + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=new_uuid) + key: Mapped[str] = mapped_column(String(200), nullable=False, index=True) + method: Mapped[str] = mapped_column(String(10), nullable=False) + path: Mapped[str] = mapped_column(Text, nullable=False) + route_class: Mapped[str] = mapped_column(String(30), nullable=False) + request_hash: Mapped[str] = mapped_column(String(64), nullable=False) + response_status: Mapped[int] = mapped_column(Integer, nullable=False) + response_body: Mapped[Any] = mapped_column(JSONB, nullable=True) + source_host: Mapped[str | None] = mapped_column(String(200), nullable=True) + source_agent: Mapped[str | None] = mapped_column(String(100), nullable=True) + first_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + last_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True) diff --git a/api/services/write_idempotency.py b/api/services/write_idempotency.py new file mode 100644 index 0000000..a7c06b5 --- /dev/null +++ b/api/services/write_idempotency.py @@ -0,0 +1,221 @@ +from __future__ import annotations + +import hashlib +import json +import re +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any + +from sqlalchemy import select +from starlette.responses import JSONResponse +from starlette.types import ASGIApp, Message, Receive, Scope, Send + +from api.database import async_session_factory +from api.models.write_idempotency_key import WriteIdempotencyKey + +IDEMPOTENCY_HEADER = b"idempotency-key" +REPLAY_HEADER = "X-StateHub-Idempotency-Replay" +CONFLICT_STATUS = 409 +DEFAULT_IDEMPOTENCY_TTL_DAYS = 14 + + +@dataclass(frozen=True) +class WriteRouteRule: + method: str + pattern: str + route_class: str + description: str + + def matches(self, method: str, path: str) -> bool: + normalized = path.rstrip("/") or "/" + return self.method == method.upper() and re.fullmatch(self.pattern, normalized) is not None + + +WRITE_ROUTE_RULES: tuple[WriteRouteRule, ...] = ( + WriteRouteRule("POST", r"/progress", "append", "append progress event"), + WriteRouteRule("POST", r"/messages", "append", "send agent message"), + WriteRouteRule("PATCH", r"/messages/[^/]+/read", "append", "mark known message read"), + WriteRouteRule("POST", r"/token-events", "append", "record token event"), + WriteRouteRule("POST", r"/token-events/upsert", "append", "upsert token event"), + WriteRouteRule("POST", r"/decisions", "append", "record decision"), + WriteRouteRule("PATCH", r"/tasks/[^/]+", "replace", "update task"), + WriteRouteRule("POST", r"/tasks/bulk-status-sync", "replace", "bulk task status sync"), + WriteRouteRule("PATCH", r"/decisions/[^/]+", "replace", "update decision"), + WriteRouteRule("POST", r"/decisions/[^/]+/resolve", "replace", "resolve decision"), + WriteRouteRule("PATCH", r"/workplans/[^/]+", "replace", "update workplan"), + WriteRouteRule("PATCH", r"/workstreams/[^/]+", "replace", "update legacy workstream alias"), +) + + +def route_rule_for(method: str, path: str) -> WriteRouteRule | None: + for rule in WRITE_ROUTE_RULES: + if rule.matches(method, path): + return rule + return None + + +def route_class_for(method: str, path: str) -> str | None: + rule = route_rule_for(method, path) + return rule.route_class if rule else None + + +def canonical_request_hash(method: str, path: str, query_string: bytes, body: bytes) -> str: + try: + parsed: Any = json.loads(body.decode("utf-8")) if body else None + body_repr = json.dumps(parsed, sort_keys=True, separators=(",", ":")) + except (UnicodeDecodeError, json.JSONDecodeError): + body_repr = body.hex() + query = query_string.decode("utf-8", errors="replace") + seed = f"{method.upper()}\n{path}\n{query}\n{body_repr}".encode("utf-8") + return hashlib.sha256(seed).hexdigest() + + +def _header_value(headers: list[tuple[bytes, bytes]], name: bytes) -> str | None: + lname = name.lower() + for key, value in headers: + if key.lower() == lname: + return value.decode("utf-8", errors="replace") + return None + + +async def _send_json_response(response: JSONResponse, scope: Scope, receive: Receive, send: Send) -> None: + await response(scope, receive, send) + + +class WriteIdempotencyMiddleware: + """Replay exact duplicate write requests carrying Idempotency-Key. + + The middleware is intentionally narrow: it only participates on the offline + relay allowlist. Non-allowlisted routes keep their normal behavior even if a + caller sends an Idempotency-Key header. + """ + + def __init__(self, app: ASGIApp, *, ttl_days: int = DEFAULT_IDEMPOTENCY_TTL_DAYS) -> None: + self.app = app + self.ttl_days = ttl_days + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + method = str(scope.get("method", "")).upper() + path = str(scope.get("path", "")) + rule = route_rule_for(method, path) + headers = list(scope.get("headers") or []) + key = _header_value(headers, IDEMPOTENCY_HEADER) + if rule is None or not key: + await self.app(scope, receive, send) + return + + body = await self._read_body(receive) + request_hash = canonical_request_hash(method, path, scope.get("query_string", b""), body) + source_host = _header_value(headers, b"x-statehub-source-host") + source_agent = _header_value(headers, b"x-statehub-source-agent") + + async with async_session_factory() as session: + existing = (await session.execute( + select(WriteIdempotencyKey).where(WriteIdempotencyKey.key == key) + )).scalar_one_or_none() + if existing is not None: + existing.last_seen_at = datetime.now(tz=timezone.utc) + await session.commit() + if existing.request_hash != request_hash: + await _send_json_response( + JSONResponse( + status_code=CONFLICT_STATUS, + content={ + "error": "Idempotency-Key was reused with a different request", + "idempotency_key": key, + }, + ), + scope, + self._receive_from_body(body), + send, + ) + return + await _send_json_response( + JSONResponse( + status_code=existing.response_status, + content=existing.response_body, + headers={REPLAY_HEADER: "true"}, + ), + scope, + self._receive_from_body(body), + send, + ) + return + + start_message: Message | None = None + body_parts: list[bytes] = [] + + async def capture_send(message: Message) -> None: + nonlocal start_message + if message["type"] == "http.response.start": + start_message = message + elif message["type"] == "http.response.body": + body_parts.append(message.get("body", b"")) + await send(message) + + await self.app(scope, self._receive_from_body(body), capture_send) + + if start_message is None: + return + status = int(start_message.get("status", 500)) + if status < 200 or status >= 300: + return + + response_body_bytes = b"".join(body_parts) + try: + response_body = json.loads(response_body_bytes.decode("utf-8")) if response_body_bytes else None + except (UnicodeDecodeError, json.JSONDecodeError): + return + + async with async_session_factory() as session: + existing = (await session.execute( + select(WriteIdempotencyKey).where(WriteIdempotencyKey.key == key) + )).scalar_one_or_none() + if existing is not None: + return + now = datetime.now(tz=timezone.utc) + session.add(WriteIdempotencyKey( + key=key, + method=method, + path=path, + route_class=rule.route_class, + request_hash=request_hash, + response_status=status, + response_body=response_body, + source_host=source_host, + source_agent=source_agent, + first_seen_at=now, + last_seen_at=now, + expires_at=now + timedelta(days=self.ttl_days), + )) + await session.commit() + + @staticmethod + async def _read_body(receive: Receive) -> bytes: + chunks: list[bytes] = [] + while True: + message = await receive() + if message["type"] != "http.request": + continue + chunks.append(message.get("body", b"")) + if not message.get("more_body", False): + break + return b"".join(chunks) + + @staticmethod + def _receive_from_body(body: bytes) -> Receive: + sent = False + + async def receive() -> Message: + nonlocal sent + if sent: + return {"type": "http.request", "body": b"", "more_body": False} + sent = True + return {"type": "http.request", "body": body, "more_body": False} + + return receive diff --git a/custodian_cli.py b/custodian_cli.py index 159ef35..c2cd650 100644 --- a/custodian_cli.py +++ b/custodian_cli.py @@ -465,6 +465,55 @@ def cmd_status(_args: argparse.Namespace) -> None: print(f" [{deadline}] {d['title']}") + +def _outbox_store(args): + from api.edge.outbox import OutboxStore, default_outbox_path + + return OutboxStore(args.outbox_path or default_outbox_path()) + + +def cmd_outbox_status(args: argparse.Namespace) -> None: + store = _outbox_store(args) + print(json.dumps(store.summary(), indent=2)) + + +def cmd_outbox_list(args: argparse.Namespace) -> None: + store = _outbox_store(args) + rows = store.export(status=args.status, limit=args.limit) + print(json.dumps(rows, indent=2)) + + +def cmd_outbox_export(args: argparse.Namespace) -> None: + store = _outbox_store(args) + payload = store.export(status=args.status, limit=args.limit) + if args.output: + Path(args.output).write_text(json.dumps(payload, indent=2) + "\n") + print(f"Exported {len(payload)} envelope(s) to {args.output}") + else: + print(json.dumps(payload, indent=2)) + + +def cmd_outbox_replay(args: argparse.Namespace) -> None: + import asyncio + from api.edge.relay import replay_pending + + store = _outbox_store(args) + upstream = args.upstream_url or os.environ.get("STATEHUB_UPSTREAM_URL") or API_BASE + result = asyncio.run(replay_pending(store, upstream_url=upstream, limit=args.limit)) + print(json.dumps(result, indent=2)) + + +def cmd_outbox_retry(args: argparse.Namespace) -> None: + store = _outbox_store(args) + store.retry(args.envelope_id) + print(f"Queued {args.envelope_id} for retry") + + +def cmd_outbox_cancel(args: argparse.Namespace) -> None: + store = _outbox_store(args) + store.cancel(args.envelope_id) + print(f"Cancelled {args.envelope_id}") + # ── Entry point ──────────────────────────────────────────────────────────────── def main() -> None: @@ -549,12 +598,47 @@ def main() -> None: ctask.add_argument("--assignee", default=None) ctask.add_argument("--description", default=None) + + # outbox + outbox = sub.add_parser("outbox", help="Inspect and replay the local State Hub edge outbox") + outbox.add_argument("--outbox-path", default=None, help="SQLite outbox path (defaults to ~/.statehub/edge-outbox.sqlite3)") + out_sub = outbox.add_subparsers(dest="outbox_command", required=True) + + out_status = out_sub.add_parser("status", help="Show pending, conflict, and ack counts") + out_status.set_defaults(func=cmd_outbox_status) + + out_list = out_sub.add_parser("list", help="List outbox envelopes as JSON") + out_list.add_argument("--status", default=None, help="Filter by status") + out_list.add_argument("--limit", type=int, default=100) + out_list.set_defaults(func=cmd_outbox_list) + + out_export = out_sub.add_parser("export", help="Export non-secret envelopes") + out_export.add_argument("--status", default=None, help="Filter by status") + out_export.add_argument("--limit", type=int, default=1000) + out_export.add_argument("--output", default=None, help="Write JSON to a file instead of stdout") + out_export.set_defaults(func=cmd_outbox_export) + + out_replay = out_sub.add_parser("replay", help="Replay due queued envelopes") + out_replay.add_argument("--upstream-url", default=None, help="Central State Hub API base URL") + out_replay.add_argument("--limit", type=int, default=50) + out_replay.set_defaults(func=cmd_outbox_replay) + + out_retry = out_sub.add_parser("retry", help="Force one envelope back to queued") + out_retry.add_argument("envelope_id") + out_retry.set_defaults(func=cmd_outbox_retry) + + out_cancel = out_sub.add_parser("cancel", help="Cancel one envelope") + out_cancel.add_argument("envelope_id") + out_cancel.set_defaults(func=cmd_outbox_cancel) + # status sub.add_parser("status", help="Show State Hub health and summary totals") args = parser.parse_args() - if args.command == "register": + if hasattr(args, "func"): + args.func(args) + elif args.command == "register": run_statehub_register(args) elif args.command == "register-project": cmd_register(args) diff --git a/docs/offline-write-buffer.md b/docs/offline-write-buffer.md new file mode 100644 index 0000000..97515d9 --- /dev/null +++ b/docs/offline-write-buffer.md @@ -0,0 +1,95 @@ +# State Hub Offline Write Buffer + +## Decision + +State Hub supports outage buffering through an edge relay with a durable local +outbox, plus central idempotency on replayed writes. + +The central service cannot buffer requests that never reach it. Agents should +therefore send writes to a local statehub-edge relay when buffering is enabled. +The relay forwards immediately while the upstream API is reachable. If the +upstream is offline, the relay persists queueable write envelopes in a local +SQLite outbox and returns an explicit queued receipt. + +Queued receipts are pending evidence, not successful central commits. Operators +must inspect and replay the outbox after recovery. + +## Defaults + +- Relay listen target: operator-selected, recommended 127.0.0.1:18080. +- Upstream API: STATEHUB_UPSTREAM_URL, then API_BASE, then + http://127.0.0.1:8000. +- Outbox path: STATEHUB_OUTBOX_PATH, default + ~/.statehub/edge-outbox.sqlite3. +- Central idempotency retention: 14 days. + +## Route Classes + +### Append-Only, Queueable + +| Method | Path | Notes | +| --- | --- | --- | +| POST | /progress/ | Session-close progress events. | +| POST | /messages/ | Agent coordination messages. | +| PATCH | /messages/{id}/read | Safe only when the message id is already known. | +| POST | /token-events/ | Token accounting events. | +| POST | /token-events/upsert | Source-id based token upsert. | +| POST | /decisions/ | Queue only when the caller does not need the generated id immediately. | + +Append-only writes replay with Idempotency-Key. Exact duplicate retries return +the original central response. Same key with a different request returns HTTP +409. + +### Replace-Style, Queueable With Conflict Checks + +| Method | Path | Notes | +| --- | --- | --- | +| PATCH | /tasks/{id} | Task status and metadata updates. | +| POST | /tasks/bulk-status-sync | Ordered batch; future coalescing may decompose by task. | +| PATCH | /decisions/{id} | Decision field update. | +| POST | /decisions/{id}/resolve | Decision resolution. | +| PATCH | /workplans/{id} | Workplan lifecycle/status updates. | +| PATCH | /workstreams/{id} | Legacy alias for workplan update. | + +In v1 the relay does not silently overwrite newer central state after a replay +conflict. A 409 response marks the envelope conflict and leaves it available for +operator review. + +### Online-Only In V1 + +The relay forwards these while the upstream is reachable and returns a clear +503 during outage: + +- DELETE endpoints. +- Repository sync/import/ingest endpoints. +- Consistency sweep mutation endpoints. +- Fabric graph exports and external pulls. +- Schema/bootstrap/admin operations. +- Requests with credentials, authorization tokens, attachments, or large opaque + payloads. + +## Non-Secret Outbox Contract + +The outbox stores method, path, scrubbed JSON body, route class, source metadata, +idempotency key, retry status, last error, and central response summaries. It +never stores authorization headers, bearer tokens, cookies, API keys, passwords, +or secret-looking JSON fields. Payloads over 64 KiB are rejected. + +## Operator Commands + + statehub outbox status + statehub outbox list --status queued + statehub outbox replay --upstream-url http://127.0.0.1:8000 + statehub outbox export --output /tmp/statehub-outbox.json + statehub outbox retry ENVELOPE_ID + statehub outbox cancel ENVELOPE_ID + +## Recovery Checklist + +1. Confirm the central State Hub API is reachable. +2. Run statehub outbox status on each host that may have queued writes. +3. Run statehub outbox replay until no due queued envelopes remain. +4. Review conflict envelopes manually. +5. Run make fix-consistency REPO=state-hub so file-backed workplan/task state + remains canonical after replay. +6. Record a progress note with non-secret replay counts. diff --git a/mcp_server/TOOLS.md b/mcp_server/TOOLS.md index 9b61f13..632c5c9 100644 --- a/mcp_server/TOOLS.md +++ b/mcp_server/TOOLS.md @@ -82,6 +82,13 @@ succeeds but its automatic progress event fails, the tool returns an error with the successful `write_result` included so the caller can avoid duplicating the entity while recording the missing progress event. +When API_BASE points at the optional State Hub edge relay and the central API is +unreachable, queueable write tools may return a queued receipt instead of the +normal REST shape. The receipt means the local outbox accepted the write; it is +not yet a central commit. Automatic progress-event side effects are skipped for +queued primary writes so replay does not duplicate records. Operators can inspect +and replay with statehub outbox status and statehub outbox replay. + --- ## Query Tools (read-only, use freely) diff --git a/mcp_server/server.py b/mcp_server/server.py index 9ed5ccf..c571fe6 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -120,12 +120,23 @@ def _mcp_error(tool_name: str, message: str, response: Any | None = None) -> dic return payload + +def _mcp_queued(tool_name: str, response: dict[str, Any]) -> dict[str, Any]: + return { + "queued": True, + "tool": tool_name, + "message": "Write queued by State Hub edge relay; central commit is pending replay.", + "receipt": response, + } + def _response_error( tool_name: str, response: Any, required_fields: tuple[str, ...] = (), ) -> dict[str, Any] | None: """Return an MCP-visible error payload for failed or malformed API results.""" + if isinstance(response, dict) and response.get("queued") is True: + return _mcp_queued(tool_name, response) if isinstance(response, dict) and isinstance(response.get("error"), str): return _mcp_error(tool_name, response["error"], response) if not isinstance(response, dict): diff --git a/migrations/versions/e9f0a1b2c3d4_write_idempotency_keys.py b/migrations/versions/e9f0a1b2c3d4_write_idempotency_keys.py new file mode 100644 index 0000000..bfd00cf --- /dev/null +++ b/migrations/versions/e9f0a1b2c3d4_write_idempotency_keys.py @@ -0,0 +1,43 @@ +"""add write idempotency keys + +Revision ID: e9f0a1b2c3d4 +Revises: d8e9f0a1b2c3 +Create Date: 2026-06-23 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB, UUID + +revision = "e9f0a1b2c3d4" +down_revision = "d8e9f0a1b2c3" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "write_idempotency_keys", + sa.Column("id", UUID(as_uuid=True), primary_key=True), + sa.Column("key", sa.String(length=200), nullable=False), + sa.Column("method", sa.String(length=10), nullable=False), + sa.Column("path", sa.Text(), nullable=False), + sa.Column("route_class", sa.String(length=30), nullable=False), + sa.Column("request_hash", sa.String(length=64), nullable=False), + sa.Column("response_status", sa.Integer(), nullable=False), + sa.Column("response_body", JSONB(), nullable=True), + sa.Column("source_host", sa.String(length=200), nullable=True), + sa.Column("source_agent", sa.String(length=100), nullable=True), + sa.Column("first_seen_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("last_seen_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("expires_at", sa.DateTime(timezone=True), nullable=True), + sa.UniqueConstraint("key", name="uq_write_idempotency_keys_key"), + ) + op.create_index("ix_write_idempotency_keys_key", "write_idempotency_keys", ["key"]) + op.create_index("ix_write_idempotency_keys_expires_at", "write_idempotency_keys", ["expires_at"]) + + +def downgrade() -> None: + op.drop_index("ix_write_idempotency_keys_expires_at", table_name="write_idempotency_keys") + op.drop_index("ix_write_idempotency_keys_key", table_name="write_idempotency_keys") + op.drop_table("write_idempotency_keys") diff --git a/scripts/consistency_check.py b/scripts/consistency_check.py index 83be0bd..278eb8a 100644 --- a/scripts/consistency_check.py +++ b/scripts/consistency_check.py @@ -572,7 +572,7 @@ def _api_patch(api_base: str, path: str, body: dict) -> Any: def _api_post(api_base: str, path: str, body: dict) -> Any: if not _HAS_HTTPX: - return None + return {"_error": "httpx is not installed"} if not path.endswith("/"): path += "/" try: @@ -580,8 +580,13 @@ def _api_post(api_base: str, path: str, body: dict) -> Any: r = c.post(path, json=body) r.raise_for_status() return r.json() - except Exception: - return None + except _httpx.HTTPStatusError as exc: + detail = exc.response.text + if len(detail) > 500: + detail = detail[:497] + "..." + return {"_error": f"{exc.response.status_code} {exc.response.reason_phrase}: {detail}"} + except Exception as exc: + return {"_error": str(exc)} # --------------------------------------------------------------------------- @@ -836,6 +841,7 @@ def check_repo(api_base: str, repo_slug: str, repo_path_override: str | None = N "repo_id": repo_id, "domain": file_domain, "repo_market_domain": repo_market_domain, + "repo_slug": repo_slug, }, ) continue @@ -1019,11 +1025,13 @@ def check_repo(api_base: str, repo_slug: str, repo_path_override: str | None = N existing_dep_keys = set() if isinstance(existing_deps, list): for dep in existing_deps: - if dep.get("from_workstream_id") != ws_id: + from_id = dep.get("from_workstream_id") or dep.get("from_workplan_id") + if from_id != ws_id: continue rel = dep.get("relationship_type") or "blocks" - if dep.get("to_workstream_id"): - existing_dep_keys.add(("workstream", dep["to_workstream_id"], rel)) + to_workplan_id = dep.get("to_workstream_id") or dep.get("to_workplan_id") + if to_workplan_id: + existing_dep_keys.add(("workstream", to_workplan_id, rel)) if dep.get("to_task_id"): existing_dep_keys.add(("task", dep["to_task_id"], rel)) @@ -1770,20 +1778,58 @@ def fix_repo( ) continue - slug = re.sub(r"[^a-z0-9-]", "-", wp_id.lower()).strip("-") - ws_data = _api_post(api_base, "/workstreams", { - "topic_id": topic_id, - "repo_id": repo_id_val, - "slug": slug, - "title": title or wp_id, - "status": status, - "owner": str(meta.get("owner", "")).strip() or None, - "planning_priority": str(meta.get("planning_priority", "")).strip() or None, - "planning_order": _as_int_or_none(meta.get("planning_order")), - }) + base_slug = re.sub(r"[^a-z0-9-]", "-", wp_id.lower()).strip("-") or "workplan" + repo_slug_part = re.sub( + r"[^a-z0-9-]", "-", str(ctx.get("repo_slug") or "").lower() + ).strip("-") + slug_candidates = [base_slug] + repo_qualified_slug = base_slug + if repo_slug_part and not base_slug.startswith(f"{repo_slug_part}-"): + repo_qualified_slug = f"{repo_slug_part}-{base_slug}" + slug_candidates.append(repo_qualified_slug) + for suffix in range(2, 21): + slug_candidates.append(f"{repo_qualified_slug}-{suffix}") + + ws_data = None + last_error = None + for slug in slug_candidates: + existing = _api_get(api_base, "/workstreams", {"slug": slug}, return_error=True) + if isinstance(existing, dict) and "_error" in existing: + last_error = existing["_error"] + continue + if isinstance(existing, list) and existing: + existing_same_repo = next( + (w for w in existing if w.get("repo_id") == repo_id_val), + None, + ) + if existing_same_repo and existing_same_repo.get("title") == (title or wp_id): + ws_data = existing_same_repo + report.fixes_applied.append( + f"C-06 reusing existing workstream {ws_data['id'][:8]}... for {wp_id}" + ) + break + last_error = f"slug {slug!r} already belongs to another workstream" + continue + + ws_data = _api_post(api_base, "/workstreams", { + "topic_id": topic_id, + "repo_id": repo_id_val, + "slug": slug, + "title": title or wp_id, + "status": status, + "owner": str(meta.get("owner", "")).strip() or None, + "planning_priority": str(meta.get("planning_priority", "")).strip() or None, + "planning_order": _as_int_or_none(meta.get("planning_order")), + }) + if ws_data is None or (isinstance(ws_data, dict) and "_error" in ws_data): + last_error = ws_data.get("_error") if isinstance(ws_data, dict) else "no response" + ws_data = None + continue + break + if ws_data is None: report.fixes_applied.append( - f"C-06 FAIL {wp_id}: could not create workstream in DB" + f"C-06 FAIL {wp_id}: could not create workstream in DB: {last_error or 'no usable slug'}" ) continue @@ -1814,7 +1860,7 @@ def fix_repo( "priority": t_priority, "assignee": task.get("assignee") or None, }) - if t_data: + if t_data and "_error" not in t_data: t_db_id = t_data["id"] injected = _inject_task_id_into_block( wp_file, "state_hub_task_id", t_db_id, t_id @@ -1822,6 +1868,10 @@ def fix_repo( if not injected: _inject_task_id_frontmatter_list(wp_file, t_db_id, t_id) report.fixes_applied.append(f" + task {t_id} → {t_db_id[:8]}…") + elif t_data: + report.fixes_applied.append( + f" ! task {t_id} not created: {t_data.get('_error', t_data)}" + ) elif issue.check_id == "C-09": ws_id = ctx["ws_id"] diff --git a/scripts/project_rules/agents-codex.template b/scripts/project_rules/agents-codex.template index 633b1d3..367f9fe 100644 --- a/scripts/project_rules/agents-codex.template +++ b/scripts/project_rules/agents-codex.template @@ -20,6 +20,12 @@ there is no MCP server for Codex agents. |---------|-----| | Local workstation | `http://127.0.0.1:8000` | | Remote via tunnel | `http://127.0.0.1:18000` | +| Optional local edge relay | http://127.0.0.1:18080 | + +When an operator has enabled the edge relay, set API_BASE to the relay URL. +Queueable writes return an explicit queued receipt if the central hub is +unreachable. Treat that as pending local evidence, then ask the operator to run +statehub outbox status/replay after connectivity returns. ### Orient at session start diff --git a/tests/conftest.py b/tests/conftest.py index 0b7a18e..5358d7a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -98,10 +98,17 @@ async def client(test_engine): async with factory() as session: yield session + from api.services import write_idempotency as _write_idempotency + + old_session_factory = _write_idempotency.async_session_factory + _write_idempotency.async_session_factory = factory app.dependency_overrides[get_session] = _override - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: - yield ac - app.dependency_overrides.clear() + try: + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + yield ac + finally: + app.dependency_overrides.clear() + _write_idempotency.async_session_factory = old_session_factory # --------------------------------------------------------------------------- diff --git a/tests/test_consistency_check.py b/tests/test_consistency_check.py index 5d91941..e693552 100644 --- a/tests/test_consistency_check.py +++ b/tests/test_consistency_check.py @@ -1015,6 +1015,154 @@ class TestLifecycleRenormalization: assert any("C-23 fixed" in fix for fix in report.fixes_applied) + +class TestC20DependencyDetection: + def test_canonical_dependency_fields_satisfy_workplan_dependency(self, tmp_path, monkeypatch): + repo = tmp_path / "repo" + workplans = repo / "workplans" + workplans.mkdir(parents=True) + (workplans / "STATE-WP-0001-base.md").write_text( + "---\n" + "id: STATE-WP-0001\n" + "title: Base\n" + "domain: financials\n" + "repo: demo-repo\n" + "status: active\n" + "state_hub_workstream_id: \"base-ws\"\n" + "---\n\n", + encoding="utf-8", + ) + (workplans / "STATE-WP-0002-dependent.md").write_text( + "---\n" + "id: STATE-WP-0002\n" + "title: Dependent\n" + "domain: financials\n" + "repo: demo-repo\n" + "status: active\n" + "state_hub_workstream_id: \"dependent-ws\"\n" + "depends_on_workplans:\n" + " - STATE-WP-0001\n" + "---\n\n", + encoding="utf-8", + ) + + def fake_get(_api_base, path, params=None, **_kwargs): + if path == "/repos/demo-repo": + import socket + + return { + "id": "repo-1", + "slug": "demo-repo", + "local_path": str(repo), + "host_paths": {socket.gethostname(): str(repo)}, + "domain_slug": "financials", + } + if path == "/workstreams/base-ws": + return {"id": "base-ws", "repo_id": "repo-1", "slug": "state-wp-0001", "title": "Base", "status": "active"} + if path == "/workstreams/dependent-ws": + return {"id": "dependent-ws", "repo_id": "repo-1", "slug": "state-wp-0002", "title": "Dependent", "status": "active"} + if path == "/tasks" and params and params.get("workstream_id") in {"base-ws", "dependent-ws"}: + return [] + if path == "/workstreams/base-ws/dependencies": + return [] + if path == "/workstreams/dependent-ws/dependencies": + return [ + { + "id": "dep-1", + "from_workplan_id": "dependent-ws", + "to_workplan_id": "base-ws", + "to_task_id": None, + "relationship_type": "blocks", + } + ] + if path == "/workstreams" and params == {"repo_id": "repo-1"}: + return [] + return [] + + monkeypatch.setattr("consistency_check._api_get", fake_get) + + report = check_repo("http://unused", "demo-repo") + + assert "C-20" not in [issue.check_id for issue in report.issues] + + +class TestC06WorkstreamCreation: + def test_fix_repo_uses_repo_qualified_slug_when_base_slug_is_taken(self, tmp_path, monkeypatch): + repo = tmp_path / "repo" + workplans = repo / "workplans" + workplans.mkdir(parents=True) + wp = workplans / "STATE-WP-0001-demo.md" + wp.write_text( + "---\n" + "id: STATE-WP-0001\n" + "type: workplan\n" + "title: Demo Workplan\n" + "domain: financials\n" + "repo: demo-repo\n" + "status: ready\n" + "owner: codex\n" + "---\n\n" + "## Implement Demo\n\n" + "```task\n" + "id: STATE-WP-0001-T01\n" + "status: todo\n" + "priority: high\n" + "```\n", + encoding="utf-8", + ) + + created_workstreams = [] + created_tasks = [] + + def fake_get(_api_base, path, params=None, **_kwargs): + if path == "/repos/demo-repo": + import socket + + return { + "id": "repo-1", + "slug": "demo-repo", + "local_path": str(repo), + "host_paths": {socket.gethostname(): str(repo)}, + "domain_slug": "financials", + } + if path == "/topics": + return [{"id": "topic-1", "domain_slug": "financials"}] + if path == "/workstreams" and params == {"slug": "state-wp-0001"}: + return [{"id": "old-ws", "repo_id": "other-repo", "title": "Old Workplan"}] + if path == "/workstreams" and params == {"slug": "demo-repo-state-wp-0001"}: + return [] + if path == "/workstreams" and params == {"repo_id": "repo-1"}: + return [] + if path == "/workstreams" and params and params.get("topic_id") == "topic-1": + return [] + return [] + + def fake_post(_api_base, path, body): + if path == "/workstreams": + created_workstreams.append(body) + return {"id": "new-ws", **body} + if path == "/tasks": + created_tasks.append(body) + return {"id": "new-task", **body} + return {"ok": True} + + monkeypatch.setattr("consistency_check._api_get", fake_get) + monkeypatch.setattr("consistency_check._api_post", fake_post) + monkeypatch.setattr("consistency_check._api_patch", lambda *args, **kwargs: {"ok": True}) + monkeypatch.setattr("consistency_check._detect_behind_remote", lambda _repo_path: False) + monkeypatch.setattr("consistency_check._detect_ahead_of_remote", lambda _repo_path: 0) + monkeypatch.setattr("consistency_check._write_custodian_brief", lambda *args, **kwargs: False) + monkeypatch.setattr("consistency_check._git_push", lambda _repo_path: (True, "pushed")) + + report = fix_repo("http://unused", "demo-repo") + + assert created_workstreams[0]["slug"] == "demo-repo-state-wp-0001" + assert created_tasks[0]["workstream_id"] == "new-ws" + patched = wp.read_text(encoding="utf-8") + assert 'state_hub_workstream_id: "new-ws"' in patched + assert 'state_hub_task_id: "new-task"' in patched + assert any("C-06 fixed" in fix for fix in report.fixes_applied) + # --------------------------------------------------------------------------- # _git_pull (T02 remote fix helper) # --------------------------------------------------------------------------- diff --git a/tests/test_edge_outbox.py b/tests/test_edge_outbox.py new file mode 100644 index 0000000..ec7d8a9 --- /dev/null +++ b/tests/test_edge_outbox.py @@ -0,0 +1,51 @@ +from api.edge.outbox import OutboxStore, PayloadRejected +from api.services.write_idempotency import route_class_for + + +def test_route_classifier_matches_safe_writes(): + assert route_class_for("POST", "/progress/") == "append" + assert route_class_for("PATCH", "/tasks/abc") == "replace" + assert route_class_for("DELETE", "/tasks/abc") is None + + +def test_outbox_scrubs_secret_fields_and_tracks_status(tmp_path): + store = OutboxStore(tmp_path / "outbox.sqlite3") + envelope = store.enqueue( + method="POST", + path="/progress/", + body={"summary": "offline", "password": "secret", "tokens_in": 12}, + source_agent="pytest", + source_host="host-a", + ) + + assert envelope.status == "queued" + assert envelope.route_class == "append" + assert envelope.body["password"] == "[redacted]" + assert envelope.body["tokens_in"] == 12 + assert store.summary()["pending_count"] == 1 + + store.mark_acked(envelope.id, response_status=201, response_body={"id": "central"}) + acked = store.get(envelope.id) + assert acked.status == "acked" + assert acked.response_body == {"id": "central"} + assert store.summary()["pending_count"] == 0 + + +def test_outbox_rejects_non_queueable_routes(tmp_path): + store = OutboxStore(tmp_path / "outbox.sqlite3") + try: + store.enqueue(method="DELETE", path="/tasks/abc", body={}) + except PayloadRejected as exc: + assert "not queueable" in str(exc) + else: + raise AssertionError("DELETE should not be queueable") + + +def test_replace_writes_coalesce_superseded_queued_envelopes(tmp_path): + store = OutboxStore(tmp_path / "outbox.sqlite3") + first = store.enqueue(method="PATCH", path="/tasks/task-1", body={"status": "progress"}) + second = store.enqueue(method="PATCH", path="/tasks/task-1", body={"status": "done"}) + + assert store.get(first.id).status == "cancelled" + assert store.get(second.id).status == "queued" + assert len(store.due()) == 1 diff --git a/tests/test_edge_relay.py b/tests/test_edge_relay.py new file mode 100644 index 0000000..204ae72 --- /dev/null +++ b/tests/test_edge_relay.py @@ -0,0 +1,117 @@ +import httpx +import pytest +from httpx import ASGITransport, AsyncClient + +from api.edge.outbox import OutboxStore +from api.edge.relay import create_app, replay_pending + + +class FailingAsyncClient: + def __init__(self, *args, **kwargs): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc_info): + return False + + async def request(self, *args, **kwargs): + raise httpx.ConnectError("upstream down") + + async def get(self, *args, **kwargs): + raise httpx.ConnectError("upstream down") + + +class ConflictAsyncClient: + def __init__(self, *args, **kwargs): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc_info): + return False + + async def request(self, method, path, **kwargs): + request = httpx.Request(method, f"http://upstream{path}") + return httpx.Response(409, json={"error": "conflict"}, request=request) + + +class SuccessAsyncClient: + def __init__(self, *args, **kwargs): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc_info): + return False + + async def request(self, method, path, **kwargs): + request = httpx.Request(method, f"http://upstream{path}") + return httpx.Response(201, json={"id": "central-id", "path": path}, request=request) + + +@pytest.mark.asyncio +async def test_relay_queues_allowlisted_write_when_upstream_unreachable(tmp_path, monkeypatch): + from api.edge import relay + + monkeypatch.setattr(relay.httpx, "AsyncClient", FailingAsyncClient) + outbox_path = tmp_path / "edge.sqlite3" + app = create_app(upstream_url="http://upstream", outbox_path=str(outbox_path)) + + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://edge") as client: + response = await client.post("/progress/", json={"event_type": "note", "summary": "queued"}) + + assert response.status_code == 202 + body = response.json() + assert body["queued"] is True + assert body["route_class"] == "append" + + store = OutboxStore(outbox_path) + queued = store.list(status="queued") + assert len(queued) == 1 + assert queued[0].path == "/progress/" + + +@pytest.mark.asyncio +async def test_relay_replay_acks_successful_envelope(tmp_path, monkeypatch): + from api.edge import relay + + monkeypatch.setattr(relay.httpx, "AsyncClient", SuccessAsyncClient) + store = OutboxStore(tmp_path / "edge.sqlite3") + envelope = store.enqueue(method="POST", path="/progress/", body={"event_type": "note", "summary": "queued"}) + + result = await replay_pending(store, upstream_url="http://upstream") + + assert result["acked"] == 1 + assert store.get(envelope.id).status == "acked" + + +@pytest.mark.asyncio +async def test_relay_rejects_online_only_write_when_upstream_unreachable(tmp_path, monkeypatch): + from api.edge import relay + + monkeypatch.setattr(relay.httpx, "AsyncClient", FailingAsyncClient) + app = create_app(upstream_url="http://upstream", outbox_path=str(tmp_path / "edge.sqlite3")) + + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://edge") as client: + response = await client.delete("/tasks/abc") + + assert response.status_code == 503 + assert "not queueable" in response.json()["error"] + + +@pytest.mark.asyncio +async def test_relay_replay_marks_conflict(tmp_path, monkeypatch): + from api.edge import relay + + monkeypatch.setattr(relay.httpx, "AsyncClient", ConflictAsyncClient) + store = OutboxStore(tmp_path / "edge.sqlite3") + envelope = store.enqueue(method="PATCH", path="/tasks/task-1", body={"status": "done"}) + + result = await replay_pending(store, upstream_url="http://upstream") + + assert result["conflict"] == 1 + assert store.get(envelope.id).status == "conflict" diff --git a/tests/test_mcp_queued_receipts.py b/tests/test_mcp_queued_receipts.py new file mode 100644 index 0000000..a75e4ca --- /dev/null +++ b/tests/test_mcp_queued_receipts.py @@ -0,0 +1,22 @@ +import json + +from mcp_server import server + + +def test_mcp_write_returns_queued_receipt_without_requiring_rest_shape(monkeypatch): + monkeypatch.setattr( + server, + "_post", + lambda path, body: { + "queued": True, + "outbox_id": "env-1", + "idempotency_key": "statehub-edge:env-1", + "upstream": "unreachable", + }, + ) + + result = json.loads(server.add_progress_event("queued progress")) + + assert result["queued"] is True + assert result["tool"] == "add_progress_event" + assert result["receipt"]["outbox_id"] == "env-1" diff --git a/tests/test_write_idempotency.py b/tests/test_write_idempotency.py new file mode 100644 index 0000000..83ab42b --- /dev/null +++ b/tests/test_write_idempotency.py @@ -0,0 +1,47 @@ +import pytest + + +@pytest.mark.asyncio +async def test_idempotent_progress_post_replays_original_response(client): + payload = {"event_type": "note", "summary": "first idempotent write", "author": "codex"} + headers = {"Idempotency-Key": "test-progress-key", "X-StateHub-Source-Agent": "pytest"} + + first = await client.post("/progress/", json=payload, headers=headers) + assert first.status_code in {200, 201} + first_body = first.json() + + second = await client.post("/progress/", json=dict(reversed(list(payload.items()))), headers=headers) + assert second.status_code == first.status_code + assert second.headers["x-statehub-idempotency-replay"] == "true" + assert second.json() == first_body + + listed = await client.get("/progress/") + assert len([row for row in listed.json() if row["summary"] == payload["summary"]]) == 1 + + +@pytest.mark.asyncio +async def test_idempotency_key_reuse_with_different_request_conflicts(client): + headers = {"Idempotency-Key": "same-key-different-body"} + first = await client.post( + "/progress/", + json={"event_type": "note", "summary": "original"}, + headers=headers, + ) + assert first.status_code in {200, 201} + + second = await client.post( + "/progress/", + json={"event_type": "note", "summary": "changed"}, + headers=headers, + ) + assert second.status_code == 409 + assert "different request" in second.json()["error"] + + +@pytest.mark.asyncio +async def test_idempotency_header_on_unsupported_route_is_ignored(client): + first = await client.get("/state/health", headers={"Idempotency-Key": "ignored-on-read"}) + second = await client.get("/state/health", headers={"Idempotency-Key": "ignored-on-read"}) + assert first.status_code == 200 + assert second.status_code == 200 + assert "x-statehub-idempotency-replay" not in second.headers diff --git a/workplans/STATE-WP-0068-offline-write-buffer-and-edge-relay.md b/workplans/STATE-WP-0068-offline-write-buffer-and-edge-relay.md new file mode 100644 index 0000000..0003c74 --- /dev/null +++ b/workplans/STATE-WP-0068-offline-write-buffer-and-edge-relay.md @@ -0,0 +1,428 @@ +--- +id: STATE-WP-0068 +type: workplan +title: "State Hub offline write buffer and edge relay" +domain: infotech +repo: state-hub +status: finished +owner: codex +topic_slug: custodian +created: "2026-06-23" +updated: "2026-06-23" +finished: "2026-06-23" +state_hub_workstream_id: "189508bd-b3cb-4caf-ac95-30bf2823201d" +--- + +# STATE-WP-0068 - State Hub offline write buffer and edge relay + +## Summary + +Build a durable client-side write buffer for State Hub so agents can keep +recording progress, decisions, messages, and safe status updates when the +central State Hub deployment or its private tunnel is offline. + +The improved design is deliberately split into two layers: + +- **Central HA** makes the primary State Hub deployment fail less often + (`CUST-WP-0011`, `CUST-WP-0038`). +- **Edge buffering** makes agent write attempts durable when the central + deployment is still unreachable. + +The central service cannot buffer requests it never receives. The buffer must +live close to the callers: operator workstation, agent host, bridge host, or +MCP wrapper. State Hub should therefore provide a small local relay/outbox that +accepts sanctioned writes, persists them locally, and replays them to the +central API when connectivity returns. + +## Critical Review of the Original Suggestion + +The suggestion is directionally right but incomplete if phrased as "the central +State Hub buffers while offline." If the central endpoint is unreachable, the +client needs somewhere else to put the write. + +The robust version is: + +1. Agents send writes to a local State Hub edge relay, not directly to the + remote central endpoint. +2. The relay forwards immediately while the central API is reachable. +3. On outage, the relay stores a durable, non-secret write envelope in a local + SQLite outbox and returns an explicit queued receipt. +4. A replay worker flushes the outbox with idempotency keys when the central + API recovers. +5. The central API deduplicates retries and rejects or flags conflicting stale + writes instead of silently overwriting newer state. + +This keeps State Hub local-first and file-canon aligned. It does not make a +multi-master database, and it does not turn queued writes into pretend success. + +## Goals + +- Preserve session-close writes during central State Hub or tunnel outages. +- Make offline write state observable to operators and agents. +- Prevent duplicate progress/events when a replay retries after partial + success. +- Detect stale/conflicting replace-style writes, especially task status and + decision resolution changes. +- Keep secrets out of the buffer. +- Reuse the existing REST contract and MCP write-layer reliability work. + +## Non-Goals + +- Replacing `CUST-WP-0038` high availability, backup, restore, or failover + work. +- Accepting arbitrary offline edits as authoritative current state. +- Queuing destructive deletes, imports, repo syncs, or bulk maintenance jobs in + v1. +- Publicly exposing State Hub. +- Adding Redis, Kafka, or NATS as a required edge dependency. The edge path + should work during local bootstrap with only Python and SQLite. + +## Target Architecture + +``` +Codex / Claude / agent process + -> MCP server or REST client + -> local statehub-edge relay + -> central State Hub API when reachable + -> local SQLite outbox when unreachable + -> replay worker + -> central State Hub API with idempotency key + -> normal DB commit and lifecycle event publication +``` + +The relay is a local process with an explicit listen port, for example +`127.0.0.1:18080`, configured with an upstream central API such as +`http://127.0.0.1:18000` or the local development API. + +## Write Classification + +### Offline-safe append-only writes + +These should be queueable in v1: + +- `POST /progress/` +- `POST /messages/` +- `PATCH /messages/{id}/read` when message id is already known +- `POST /token-events/` +- `POST /decisions/` with an idempotency key and no immediate dependency on the + generated decision id + +### Offline-safe replace-style writes with conflict checks + +These may be queueable only with an expected revision or last-observed +timestamp: + +- `PATCH /tasks/{task_id}` +- `POST /tasks/bulk-status-sync` decomposed into per-task envelopes or replayed + as an ordered batch +- `PATCH /decisions/{decision_id}` and `POST /decisions/{decision_id}/resolve` +- `PATCH /workplans/{workplan_id}` for lifecycle/status fields + +Replay must mark these as conflicted when the central row changed after the +client's observed revision and the update is not a monotonic no-op. + +### Online-only writes in v1 + +These should fail fast while offline: + +- `DELETE` endpoints +- repository sync/import/ingest endpoints +- consistency sweep mutation endpoints +- fabric graph exports +- schema/bootstrap/admin operations +- any request containing authorization tokens, credentials, attachments, or + large opaque payloads + +## Conflict Policy + +- Append-only writes use idempotency keys and replay exactly once from the + caller's point of view. +- Replace-style writes include `expected_updated_at`, `expected_status`, or a + route-specific revision field where available. +- Supersedable queued writes, such as multiple task status patches for the same + task, may be coalesced for replay while preserving local audit entries. +- If central state is newer and the replay cannot prove the queued write is + still safe, mark the envelope `conflict` and surface it in relay status. +- Workplan-file canon remains authoritative. After recovery, operators should + run `make fix-consistency REPO=state-hub` so file-backed task/workplan state + wins over stale queued task updates. + +## T01 - Write Safety ADR and Route Inventory + +```task +id: STATE-WP-0068-T01 +status: done +priority: high +state_hub_task_id: "07aa2d43-0305-45ca-8b5a-bf6f96f716a9" +``` + +Create a short ADR or design doc that classifies State Hub write routes as +append-only, replace-style, supersedable, or online-only. + +Deliverables: + +- Route inventory generated from `api/routers/*` and MCP sanctioned writes. +- V1 safe-write allowlist with request/response examples. +- Conflict policy per route class. +- Explicit statement that queued receipts are pending evidence, not successful + central commits. +- Operator decision on the local relay port, default outbox location, and + retention window. + +Done when implementation tasks can refer to a reviewed allowlist instead of +guessing route safety. + +## T02 - Central Idempotency and Replay Acceptance + +```task +id: STATE-WP-0068-T02 +status: done +priority: high +state_hub_task_id: "f0060859-e9a7-441c-91cc-1e838c5ba60f" +``` + +Add central API support for idempotent replay. + +Expected implementation: + +- Migration for a `write_idempotency_keys` table storing key, method, path, + request hash, response status/body, source host/agent, first seen, last seen, + and expiry. +- Middleware or route dependency that accepts `Idempotency-Key` on allowlisted + write endpoints. +- Same-key/same-request replay returns the original response. +- Same-key/different-request returns HTTP 409. +- Replay metadata is available for diagnostics without logging request secrets. +- Tests cover success, retry, hash mismatch, expiry, and unsupported routes. + +Done when append-only writes can be retried after a transport failure without +duplicating central records. + +## T03 - Durable Local Outbox Store + +```task +id: STATE-WP-0068-T03 +status: done +priority: high +state_hub_task_id: "6897dd71-6252-4eed-bb0c-350e8c566b3b" +``` + +Implement a local SQLite-backed outbox module used by the relay and CLI. + +Minimum schema: + +- envelope id and idempotency key +- method, path, scrubbed JSON body, route class +- source agent, source host, repo slug, session id when known +- observed revision fields for conflict checks +- status: `queued`, `sending`, `acked`, `conflict`, `dead`, `cancelled` +- attempt count, next retry time, last error, central response summary +- created, updated, acked timestamps + +Safety requirements: + +- Create the DB with owner-only permissions where the platform supports it. +- Never persist authorization headers, API keys, bearer tokens, cookies, or + secret-looking fields. +- Cap payload size and reject large opaque bodies. +- Provide export/import of non-secret envelopes for operator debugging. + +Done when unit tests prove enqueue, status transitions, coalescing metadata, +scrubbing, and corruption-safe startup behavior. + +## T04 - Edge Relay HTTP Surface + +```task +id: STATE-WP-0068-T04 +status: done +priority: high +state_hub_task_id: "deb883df-b312-4e8f-b559-718bb8a94035" +``` + +Create a local `statehub-edge` relay process that exposes a small HTTP surface. + +Behavior: + +- Online path: forward allowlisted writes to upstream and return the upstream + response. +- Offline path: enqueue allowlisted writes and return a clear queued receipt: + `{"queued": true, "outbox_id": "...", "idempotency_key": "...", + "upstream": "unreachable"}`. +- Online-only path during outage: return a deterministic error explaining that + the route is not queueable. +- Read path: proxy selected reads while online; optionally serve cached + `/state/summary` metadata with stale markers while offline. +- Health/status: expose relay health, upstream reachability, pending count, + oldest pending age, and conflict count. + +Done when agents can point `API_BASE` at the relay and receive either the +normal REST shape or an explicit queued/error shape. + +## T05 - Replay Worker and Conflict Handling + +```task +id: STATE-WP-0068-T05 +status: done +priority: high +state_hub_task_id: "6c3916c1-4a9f-4b1d-a8b1-a356a6edf3db" +``` + +Implement the replay loop. + +Requirements: + +- Exponential backoff with jitter for transport failures. +- Single-flight sending per envelope. +- Preserve per-entity order for replace-style writes. +- Coalesce superseded task/workplan status writes before replay when safe. +- Use `Idempotency-Key` for every replayed write. +- Mark conflicts without dropping the original envelope. +- Provide commands to retry, cancel, or mark-dead individual envelopes. + +Done when an integration test can simulate central outage, enqueue writes, +restore central service, replay successfully, and surface one intentionally +stale task update as a conflict. + +## T06 - MCP and Agent UX Integration + +```task +id: STATE-WP-0068-T06 +status: done +priority: high +state_hub_task_id: "8ccac4f9-f457-4f87-9195-1d8619043c0f" +``` + +Update MCP tooling and agent-facing docs so offline buffering is usable without +surprise. + +Expected changes: + +- MCP write helpers recognize relay queued receipts and return them clearly. +- Automatic progress-event side effects do not duplicate queued primary writes. +- Session-close guidance says to check relay status when writes were queued. +- `mcp_server/TOOLS.md` documents online, queued, and conflict outcomes. +- Repo `AGENTS.md` template can point agents at the relay when enabled. + +Done when an agent can complete a session during a central outage, see that the +progress write is queued, and verify later that it was replayed. + +## T07 - Operator Observability + +```task +id: STATE-WP-0068-T07 +status: done +priority: medium +state_hub_task_id: "62c0ca4f-b3e2-49f7-ba70-365016195e83" +``` + +Expose pending offline writes to humans and automations. + +Deliverables: + +- CLI commands: `statehub outbox status`, `statehub outbox list`, + `statehub outbox replay`, `statehub outbox export`. +- Optional dashboard panel or docs page showing edge relay health, if the + dashboard can reach the relay. +- Prometheus-style or JSON metrics for pending count, oldest age, replay + failures, and conflicts. +- Progress event after replay recovery summarizing non-secret results. + +Done when the operator can see whether any host still has unsent State Hub +writes before declaring an outage recovered. + +## T08 - Chaos and Regression Test Suite + +```task +id: STATE-WP-0068-T08 +status: done +priority: high +state_hub_task_id: "2a12614f-8923-45b1-b8e9-ad8c818b23d3" +``` + +Add tests that make offline behavior boring. + +Coverage: + +- Unit tests for route allowlist, payload scrubbing, idempotency hash behavior, + outbox state transitions, and coalescing decisions. +- Integration test with a fake upstream returning connection errors, 5xx, 409, + and success. +- End-to-end test for MCP write through relay during outage and replay. +- Drill script that can be run locally without touching production data. + +Done when CI can prove no duplicate append-only records are produced across +retry and no replace-style conflict is silently applied. + +## T09 - Runbooks, Cutover, and Recovery Drill + +```task +id: STATE-WP-0068-T09 +status: done +priority: medium +state_hub_task_id: "fedea85e-c720-4814-9691-affa6c944954" +``` + +Document and rehearse the operator workflow. + +Runbook content: + +- How to start the relay on an operator workstation or agent host. +- How to configure MCP/REST clients to use the relay. +- What queued receipts mean during session close. +- How to inspect, replay, export, cancel, and resolve conflicted envelopes. +- Recovery checklist after central State Hub returns. +- Interaction with `make fix-consistency REPO=state-hub`. + +Done when a controlled drill queues at least one progress event and one task +status update during a forced outage, replays the progress event, flags or +applies the task update according to the conflict policy, and records the +results without exposing secrets. + +## Dependencies and References + +- `CUST-WP-0011` - pragmatic railiance01 State Hub migration. +- `CUST-WP-0038` - long-term ThreePhoenix HA State Hub target. +- `STATE-WP-0059` - MCP write-layer reliability and explicit API failure + handling. +- `STATE-WP-0066` - summary cache and stale-while-revalidate for read paths. +- `docs/activity-core-delegation.md` - JetStream buffering covers State Hub to + activity-core events after commit; this work covers agent to State Hub writes + before commit. +- `mcp_server/TOOLS.md` - current MCP/REST parity and failure handling contract. + +After this workplan is synced, run: + +```bash +make fix-consistency REPO=state-hub +``` + +## Implementation Notes + +Completed 2026-06-23. The implementation provides the first full offline-write +buffering path: + +- Central idempotency support through WriteIdempotencyMiddleware, the + write_idempotency_keys model, and migration e9f0a1b2c3d4. Exact duplicate + writes replay the original response; same key with a different request returns + HTTP 409. +- Shared route classification for queueable append and replace-style writes. +- Local SQLite outbox with payload scrubbing, payload size limits, private file + permissions where supported, status transitions, retry/cancel/export support, + and latest replace-write coalescing. +- State Hub edge relay app with online forwarding, offline queue receipts, + health/status, replay endpoint, and replay worker. +- statehub outbox CLI commands for status, list, export, replay, retry, and + cancel. +- MCP queued receipt handling so queued primary writes do not trigger automatic + progress side effects. +- Operator documentation in docs/offline-write-buffer.md, MCP tool docs, and the + Codex agent instruction template. + +Verification: + +- Focused suite: 22 passed in 19.51s. +- Full suite: 446 passed, 1 warning in 287.20s. The warning was a SQLAlchemy + RuntimeWarning in tests/test_summary_cache.py and was not introduced by a + failing assertion. +- Syntax checks passed for the new and touched Python modules. +- git diff --check passed.