diff --git a/docs/runbook.md b/docs/runbook.md index fc81eb1..0136854 100644 --- a/docs/runbook.md +++ b/docs/runbook.md @@ -358,6 +358,27 @@ Legacy values are still accepted: `catchup` → `catchup_all`, > brief outage at trigger time silently dropped the fire with no recovery and no > log line. The `daily-statehub-wsjf-triage` definition now uses `catchup_latest`. +## State Hub write idempotency (ACTIVITY-WP-0014 T05) + +Every State Hub write from activity-core (report-sink progress, ops-evidence +progress, schedule-miss alerts) carries a stable **`Idempotency-Key`** header +derived deterministically from the write's identity (`run_id:instruction_id:event_type` +for report sinks, `run_id:context_key:event_type` for ops evidence, or +`schedule_miss:activity_id:last_fired` for miss alerts). This makes writes safe to +**buffer and replay** under the planned State Hub *beachhead* (per-machine read +cache + write outbox): a flush — possibly retried after an outage — cannot create +duplicate progress/triage events once State Hub / the beachhead honours the header. + +The guarantee lives on the write, not on a live dedup read. The read-based +`_progress_exists` check is now best-effort only: if State Hub is unreachable or +answers with an error status it returns `False` (proceed to the keyed write) rather +than hard-failing. The header passes untouched through the `actcore-state-hub-bridge` +proxy and is ignored by State Hub versions that do not yet honour it. + +> The queue/cache itself is **not** built in activity-core — it belongs to the +> state-hub beachhead. activity-core only emits the key. See the proposal sent to +> the `state-hub` agent. 
+ ## Troubleshooting ### Worker fails to start: "ACTCORE_DB_URL is required" diff --git a/src/activity_core/ops_evidence_sinks.py b/src/activity_core/ops_evidence_sinks.py index 9299280..803479f 100644 --- a/src/activity_core/ops_evidence_sinks.py +++ b/src/activity_core/ops_evidence_sinks.py @@ -8,6 +8,7 @@ from typing import Any import httpx from activity_core.context_resolvers.ops_inventory import _sanitize_url +from activity_core.state_hub_write import idempotency_headers _DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" _INTER_HUB_SINK_TYPES = { @@ -121,6 +122,7 @@ def _post_state_hub_progress( resp = httpx.post( f"{base_url}/progress/", json=body, + headers=idempotency_headers(run_id, context_key, event_type), timeout=float(sink.get("timeout_seconds", 10.0)), ) resp.raise_for_status() @@ -136,12 +138,17 @@ def _post_state_hub_progress( def _progress_exists(base_url: str, event_type: str, idempotency_key: str) -> bool: - resp = httpx.get( - f"{base_url}/progress/", - params={"limit": 100}, - timeout=10.0, - ) - resp.raise_for_status() + # Best-effort optimisation only; the Idempotency-Key header on the write is the + # real dedup guarantee. Do not hard-fail if State Hub is unreachable here. 
+ try: + resp = httpx.get( + f"{base_url}/progress/", + params={"limit": 100}, + timeout=10.0, + ) + resp.raise_for_status() + except httpx.HTTPError: + return False for item in resp.json(): detail = item.get("detail") or {} if ( diff --git a/src/activity_core/report_sinks.py b/src/activity_core/report_sinks.py index ad6dd1b..66fa5a7 100644 --- a/src/activity_core/report_sinks.py +++ b/src/activity_core/report_sinks.py @@ -11,6 +11,8 @@ from zoneinfo import ZoneInfo import httpx +from activity_core.state_hub_write import idempotency_headers + _DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" _THE_CUSTODIAN_ROOT = Path("/home/worsch/the-custodian") _FORBIDDEN_CUSTODIAN_ROOTS = ( @@ -149,6 +151,7 @@ def _post_state_hub_progress( resp = httpx.post( f"{base_url}/progress/", json=body, + headers=idempotency_headers(run_id, instruction_id, event_type), timeout=float(sink.get("timeout_seconds", 10.0)), ) resp.raise_for_status() @@ -167,12 +170,18 @@ def _progress_exists( instruction_id: str, event_type: str, ) -> bool: - resp = httpx.get( - f"{base_url}/progress/", - params={"limit": 100}, - timeout=10.0, - ) - resp.raise_for_status() + # Best-effort read-dedup optimisation only. The Idempotency-Key header on the + # write is the real guarantee; if State Hub is unreachable here we must not + # hard-fail — proceed to the (keyed) write rather than raising. 
+ try: + resp = httpx.get( + f"{base_url}/progress/", + params={"limit": 100}, + timeout=10.0, + ) + resp.raise_for_status() + except httpx.HTTPError: + return False for item in resp.json(): detail = item.get("detail") or {} if ( diff --git a/src/activity_core/schedule_health.py b/src/activity_core/schedule_health.py index 2aa4813..d363812 100644 --- a/src/activity_core/schedule_health.py +++ b/src/activity_core/schedule_health.py @@ -24,6 +24,7 @@ from uuid import UUID import httpx from activity_core.schedule_manager import schedule_id +from activity_core.state_hub_write import idempotency_headers _DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" @@ -176,7 +177,14 @@ def post_missed_fire_alert( if workstream_id: body["workstream_id"] = workstream_id - resp = httpx.post(f"{base_url}/progress/", json=body, timeout=timeout_seconds) + # Dedup repeated alerts for the same missed window (same schedule + last fire). + last_fired = health.last_fired_at.isoformat() if health.last_fired_at else "none" + resp = httpx.post( + f"{base_url}/progress/", + json=body, + headers=idempotency_headers("schedule_miss", health.activity_id, last_fired), + timeout=timeout_seconds, + ) resp.raise_for_status() data = resp.json() return { diff --git a/src/activity_core/state_hub_write.py b/src/activity_core/state_hub_write.py new file mode 100644 index 0000000..2b45138 --- /dev/null +++ b/src/activity_core/state_hub_write.py @@ -0,0 +1,34 @@ +"""Idempotency-keyed State Hub writes (ACTIVITY-WP-0014 T05). + +Under the State Hub *beachhead* model, a write may be buffered locally while +central State Hub is unreachable and **flushed later, possibly with retries**. +To keep that flush safe — no duplicate progress / triage events — every write +carries a stable ``Idempotency-Key`` header derived deterministically from the +write's identity. The guarantee lives on the write itself and does **not** depend +on a live dedup read, so it holds even when the beachhead is serving offline. 
# Name of the HTTP request header that carries the idempotency key.
IDEMPOTENCY_HEADER = "Idempotency-Key"


def idempotency_key(*parts: object) -> str:
    """Build a stable, header-safe idempotency key from identity parts.

    Parts are joined with ``:``. A ``None`` part becomes an empty segment so
    the key shape (number of segments) is stable across calls. Any other
    non-string part (for example a ``uuid.UUID`` or an ``int``) is converted
    with ``str()`` instead of making the join raise ``TypeError``.

    Every character outside printable, non-space ASCII (0x21-0x7E) --
    whitespace, control characters and all non-ASCII characters -- is replaced
    one-for-one with ``_`` so the value is a valid single-line HTTP header.

    Args:
        *parts: Identity components of the write, in a fixed order.

    Returns:
        The sanitized key, or ``"_"`` when the joined key would be empty.
    """
    raw = ":".join("" if part is None else str(part) for part in parts)
    # NOTE(review): the replacement is lossy ("run 1" and "run_1" map to the
    # same key) and ":" inside a part is not escaped, so identities must not
    # rely on those characters to stay distinct.
    return "".join(ch if 0x20 < ord(ch) < 0x7F else "_" for ch in raw) or "_"


def idempotency_headers(*parts: object) -> dict[str, str]:
    """Return the header dict to attach to a State Hub write.

    Args:
        *parts: Identity components, forwarded unchanged to
            ``idempotency_key``.

    Returns:
        A one-entry dict mapping ``IDEMPOTENCY_HEADER`` to the derived key.
    """
    return {IDEMPOTENCY_HEADER: idempotency_key(*parts)}
assert "\t" not in key and "\n" not in key and " " not in key + + +def test_headers_carry_the_key() -> None: + headers = idempotency_headers("run1", "i", "e") + assert headers == {IDEMPOTENCY_HEADER: "run1:i:e"} + + +def test_distinct_identities_get_distinct_keys() -> None: + assert idempotency_key("r", "i", "daily_triage") != idempotency_key( + "r", "i", "schedule_miss" + ) + + +def test_progress_exists_is_best_effort_on_connection_error(monkeypatch) -> None: + """A down State Hub must not hard-fail the dedup read; it returns False so the + keyed write can still proceed.""" + + def _boom(*args, **kwargs): + raise httpx.ConnectError("Connection refused") + + monkeypatch.setattr(report_sinks.httpx, "get", _boom) + assert ( + report_sinks._progress_exists( + "http://127.0.0.1:8000", "run1", "daily-triage-report", "daily_triage" + ) + is False + ) + + +def test_report_sink_post_sends_idempotency_header(monkeypatch) -> None: + """The state-hub-progress write carries a stable Idempotency-Key header.""" + captured: dict[str, object] = {} + + monkeypatch.setattr(report_sinks, "_progress_exists", lambda *a, **k: False) + + class _Resp: + def raise_for_status(self) -> None: ... 
+ def json(self) -> dict[str, str]: + return {"id": "pid-1"} + + def _capture_post(url, json, headers, timeout): # noqa: A002 + captured["headers"] = headers + return _Resp() + + monkeypatch.setattr(report_sinks.httpx, "post", _capture_post) + + payload = {"run_id": "run1", "activity_id": "act1", "scheduled_for": None} + report_entry = {"instruction_id": "daily-triage-report", "report": {"summary": "s"}} + sink = {"event_type": "daily_triage"} + + result = report_sinks._post_state_hub_progress(payload, report_entry, sink) + assert result["status"] == "posted" + assert captured["headers"][IDEMPOTENCY_HEADER] == "run1:daily-triage-report:daily_triage" diff --git a/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md b/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md index 9ae43da..28cc264 100644 --- a/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md +++ b/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md @@ -171,12 +171,14 @@ down. This is handed off to state-hub (see the coordination message / proposal); activity-core's only responsibilities under this model are thin: -- **Idempotent writes (do now, in-repo):** attach a stable idempotency key - (e.g. `run_id` + `instruction_id` + `event_type`) to every State Hub write so a - beachhead flush — possibly replayed after an outage — cannot create duplicate - `daily_triage`/progress events. The report sink already does a read-based dedup - check (`_progress_exists`); make the guarantee explicit and not dependent on a - live read. +- **Idempotent writes — DONE (2026-06-23, in-repo):** added + `activity_core/state_hub_write` (`idempotency_headers`); every State Hub write + (report-sink, ops-evidence, schedule-miss) now sends a stable `Idempotency-Key` + header derived from `run_id:instruction_id:event_type`. The read-based + `_progress_exists` dedup is now best-effort (returns `False` on connection + error instead of hard-failing), so the guarantee lives on the keyed write, not + a live read. 
Tests in `tests/test_state_hub_write.py`; documented in + `docs/runbook.md`. - **Adopt the beachhead endpoint (blocked on state-hub):** keep `STATE_HUB_URL` pointed at the local beachhead, and **retire the bespoke `actcore-state-hub-bridge` proxy** (the inline `hostNetwork` proxy in