generated from coulomb/repo-seed
feat(ACTIVITY-WP-0014): idempotency-keyed State Hub writes (T05, in-repo part)
Add activity_core/state_hub_write: every State Hub write (report-sink, ops-evidence, schedule-miss) now sends a stable Idempotency-Key header derived from run_id:instruction_id:event_type. Makes writes safe to buffer/replay under the future state-hub beachhead without duplicate progress/triage events. The read-based _progress_exists dedup is now best-effort (returns False on connection error instead of hard-failing), so the guarantee lives on the keyed write rather than a live read. Tests + runbook note. Endpoint adoption / proxy retirement stays blocked on the state-hub beachhead capability. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -358,6 +358,27 @@ Legacy values are still accepted: `catchup` → `catchup_all`,
|
|||||||
> brief outage at trigger time silently dropped the fire with no recovery and no
|
> brief outage at trigger time silently dropped the fire with no recovery and no
|
||||||
> log line. The `daily-statehub-wsjf-triage` definition now uses `catchup_latest`.
|
> log line. The `daily-statehub-wsjf-triage` definition now uses `catchup_latest`.
|
||||||
|
|
||||||
|
## State Hub write idempotency (ACTIVITY-WP-0014 T05)
|
||||||
|
|
||||||
|
Every State Hub write from activity-core (report-sink progress, ops-evidence
|
||||||
|
progress, schedule-miss alerts) carries a stable **`Idempotency-Key`** header
|
||||||
|
derived deterministically from the write's identity
|
||||||
|
(`run_id:instruction_id:event_type`, or `schedule_miss:activity_id:last_fired`
|
||||||
|
for miss alerts). This makes writes safe to **buffer and replay** under the
|
||||||
|
planned State Hub *beachhead* (per-machine read cache + write outbox): a flush —
|
||||||
|
possibly retried after an outage — cannot create duplicate progress/triage
|
||||||
|
events once State Hub / the beachhead honours the header.
|
||||||
|
|
||||||
|
The guarantee lives on the write, not on a live dedup read. The read-based
|
||||||
|
`_progress_exists` check is now best-effort only: if State Hub is unreachable it
|
||||||
|
returns `False` (proceed to the keyed write) rather than hard-failing. The header
|
||||||
|
passes untouched through the `actcore-state-hub-bridge` proxy and is ignored by
|
||||||
|
State Hub versions that do not yet honour it.
|
||||||
|
|
||||||
|
> The queue/cache itself is **not** built in activity-core — it belongs to the
|
||||||
|
> state-hub beachhead. activity-core only emits the key. See the proposal sent to
|
||||||
|
> the `state-hub` agent.
|
||||||
|
|
||||||
## Troubleshooting
|
## Troubleshooting
|
||||||
|
|
||||||
### Worker fails to start: "ACTCORE_DB_URL is required"
|
### Worker fails to start: "ACTCORE_DB_URL is required"
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ from typing import Any
|
|||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
from activity_core.context_resolvers.ops_inventory import _sanitize_url
|
from activity_core.context_resolvers.ops_inventory import _sanitize_url
|
||||||
|
from activity_core.state_hub_write import idempotency_headers
|
||||||
|
|
||||||
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
|
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
|
||||||
_INTER_HUB_SINK_TYPES = {
|
_INTER_HUB_SINK_TYPES = {
|
||||||
@@ -121,6 +122,7 @@ def _post_state_hub_progress(
|
|||||||
resp = httpx.post(
|
resp = httpx.post(
|
||||||
f"{base_url}/progress/",
|
f"{base_url}/progress/",
|
||||||
json=body,
|
json=body,
|
||||||
|
headers=idempotency_headers(run_id, context_key, event_type),
|
||||||
timeout=float(sink.get("timeout_seconds", 10.0)),
|
timeout=float(sink.get("timeout_seconds", 10.0)),
|
||||||
)
|
)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
@@ -136,12 +138,17 @@ def _post_state_hub_progress(
|
|||||||
|
|
||||||
|
|
||||||
def _progress_exists(base_url: str, event_type: str, idempotency_key: str) -> bool:
|
def _progress_exists(base_url: str, event_type: str, idempotency_key: str) -> bool:
|
||||||
resp = httpx.get(
|
# Best-effort optimisation only; the Idempotency-Key header on the write is the
|
||||||
f"{base_url}/progress/",
|
# real dedup guarantee. Do not hard-fail if State Hub is unreachable here.
|
||||||
params={"limit": 100},
|
try:
|
||||||
timeout=10.0,
|
resp = httpx.get(
|
||||||
)
|
f"{base_url}/progress/",
|
||||||
resp.raise_for_status()
|
params={"limit": 100},
|
||||||
|
timeout=10.0,
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
except httpx.HTTPError:
|
||||||
|
return False
|
||||||
for item in resp.json():
|
for item in resp.json():
|
||||||
detail = item.get("detail") or {}
|
detail = item.get("detail") or {}
|
||||||
if (
|
if (
|
||||||
|
|||||||
@@ -11,6 +11,8 @@ from zoneinfo import ZoneInfo
|
|||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
|
from activity_core.state_hub_write import idempotency_headers
|
||||||
|
|
||||||
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
|
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
|
||||||
_THE_CUSTODIAN_ROOT = Path("/home/worsch/the-custodian")
|
_THE_CUSTODIAN_ROOT = Path("/home/worsch/the-custodian")
|
||||||
_FORBIDDEN_CUSTODIAN_ROOTS = (
|
_FORBIDDEN_CUSTODIAN_ROOTS = (
|
||||||
@@ -149,6 +151,7 @@ def _post_state_hub_progress(
|
|||||||
resp = httpx.post(
|
resp = httpx.post(
|
||||||
f"{base_url}/progress/",
|
f"{base_url}/progress/",
|
||||||
json=body,
|
json=body,
|
||||||
|
headers=idempotency_headers(run_id, instruction_id, event_type),
|
||||||
timeout=float(sink.get("timeout_seconds", 10.0)),
|
timeout=float(sink.get("timeout_seconds", 10.0)),
|
||||||
)
|
)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
@@ -167,12 +170,18 @@ def _progress_exists(
|
|||||||
instruction_id: str,
|
instruction_id: str,
|
||||||
event_type: str,
|
event_type: str,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
resp = httpx.get(
|
# Best-effort read-dedup optimisation only. The Idempotency-Key header on the
|
||||||
f"{base_url}/progress/",
|
# write is the real guarantee; if State Hub is unreachable here we must not
|
||||||
params={"limit": 100},
|
# hard-fail — proceed to the (keyed) write rather than raising.
|
||||||
timeout=10.0,
|
try:
|
||||||
)
|
resp = httpx.get(
|
||||||
resp.raise_for_status()
|
f"{base_url}/progress/",
|
||||||
|
params={"limit": 100},
|
||||||
|
timeout=10.0,
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
except httpx.HTTPError:
|
||||||
|
return False
|
||||||
for item in resp.json():
|
for item in resp.json():
|
||||||
detail = item.get("detail") or {}
|
detail = item.get("detail") or {}
|
||||||
if (
|
if (
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ from uuid import UUID
|
|||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
from activity_core.schedule_manager import schedule_id
|
from activity_core.schedule_manager import schedule_id
|
||||||
|
from activity_core.state_hub_write import idempotency_headers
|
||||||
|
|
||||||
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
|
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
|
||||||
|
|
||||||
@@ -176,7 +177,14 @@ def post_missed_fire_alert(
|
|||||||
if workstream_id:
|
if workstream_id:
|
||||||
body["workstream_id"] = workstream_id
|
body["workstream_id"] = workstream_id
|
||||||
|
|
||||||
resp = httpx.post(f"{base_url}/progress/", json=body, timeout=timeout_seconds)
|
# Dedup repeated alerts for the same missed window (same schedule + last fire).
|
||||||
|
last_fired = health.last_fired_at.isoformat() if health.last_fired_at else "none"
|
||||||
|
resp = httpx.post(
|
||||||
|
f"{base_url}/progress/",
|
||||||
|
json=body,
|
||||||
|
headers=idempotency_headers("schedule_miss", health.activity_id, last_fired),
|
||||||
|
timeout=timeout_seconds,
|
||||||
|
)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
return {
|
return {
|
||||||
|
|||||||
34
src/activity_core/state_hub_write.py
Normal file
34
src/activity_core/state_hub_write.py
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
"""Idempotency-keyed State Hub writes (ACTIVITY-WP-0014 T05).
|
||||||
|
|
||||||
|
Under the State Hub *beachhead* model, a write may be buffered locally while
|
||||||
|
central State Hub is unreachable and **flushed later, possibly with retries**.
|
||||||
|
To keep that flush safe — no duplicate progress / triage events — every write
|
||||||
|
carries a stable ``Idempotency-Key`` header derived deterministically from the
|
||||||
|
write's identity. The guarantee lives on the write itself and does **not** depend
|
||||||
|
on a live dedup read, so it holds even when the beachhead is serving offline.
|
||||||
|
|
||||||
|
activity-core does not implement the queue/cache (that is state-hub's beachhead);
|
||||||
|
it only emits the key so the beachhead / State Hub can dedup on flush. The header
|
||||||
|
passes untouched through the existing ``actcore-state-hub-bridge`` proxy and is
|
||||||
|
ignored by State Hub versions that do not yet honour it.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
IDEMPOTENCY_HEADER = "Idempotency-Key"
|
||||||
|
|
||||||
|
|
||||||
|
def idempotency_key(*parts: str | None) -> str:
|
||||||
|
"""Build a stable, header-safe idempotency key from identity parts.
|
||||||
|
|
||||||
|
Empty/None parts are kept as empty segments so the key shape is stable across
|
||||||
|
calls. Whitespace and control characters are collapsed to keep the value a
|
||||||
|
valid single-line HTTP header.
|
||||||
|
"""
|
||||||
|
raw = ":".join((p or "") for p in parts)
|
||||||
|
return "".join(ch if 0x20 < ord(ch) < 0x7F else "_" for ch in raw) or "_"
|
||||||
|
|
||||||
|
|
||||||
|
def idempotency_headers(*parts: str | None) -> dict[str, str]:
|
||||||
|
"""Return the header dict to attach to a State Hub write."""
|
||||||
|
return {IDEMPOTENCY_HEADER: idempotency_key(*parts)}
|
||||||
81
tests/test_state_hub_write.py
Normal file
81
tests/test_state_hub_write.py
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
"""ACTIVITY-WP-0014 T05: idempotency-keyed State Hub writes."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from activity_core import report_sinks
|
||||||
|
from activity_core.state_hub_write import (
|
||||||
|
IDEMPOTENCY_HEADER,
|
||||||
|
idempotency_headers,
|
||||||
|
idempotency_key,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_key_is_stable_and_deterministic() -> None:
|
||||||
|
a = idempotency_key("run1", "daily-triage-report", "daily_triage")
|
||||||
|
b = idempotency_key("run1", "daily-triage-report", "daily_triage")
|
||||||
|
assert a == b == "run1:daily-triage-report:daily_triage"
|
||||||
|
|
||||||
|
|
||||||
|
def test_key_shape_stable_with_missing_parts() -> None:
|
||||||
|
assert idempotency_key("run1", None, "daily_triage") == "run1::daily_triage"
|
||||||
|
|
||||||
|
|
||||||
|
def test_key_sanitizes_control_and_whitespace() -> None:
|
||||||
|
key = idempotency_key("run 1", "a\tb", "x\n")
|
||||||
|
assert "\t" not in key and "\n" not in key and " " not in key
|
||||||
|
|
||||||
|
|
||||||
|
def test_headers_carry_the_key() -> None:
|
||||||
|
headers = idempotency_headers("run1", "i", "e")
|
||||||
|
assert headers == {IDEMPOTENCY_HEADER: "run1:i:e"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_distinct_identities_get_distinct_keys() -> None:
|
||||||
|
assert idempotency_key("r", "i", "daily_triage") != idempotency_key(
|
||||||
|
"r", "i", "schedule_miss"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_progress_exists_is_best_effort_on_connection_error(monkeypatch) -> None:
|
||||||
|
"""A down State Hub must not hard-fail the dedup read; it returns False so the
|
||||||
|
keyed write can still proceed."""
|
||||||
|
|
||||||
|
def _boom(*args, **kwargs):
|
||||||
|
raise httpx.ConnectError("Connection refused")
|
||||||
|
|
||||||
|
monkeypatch.setattr(report_sinks.httpx, "get", _boom)
|
||||||
|
assert (
|
||||||
|
report_sinks._progress_exists(
|
||||||
|
"http://127.0.0.1:8000", "run1", "daily-triage-report", "daily_triage"
|
||||||
|
)
|
||||||
|
is False
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_report_sink_post_sends_idempotency_header(monkeypatch) -> None:
|
||||||
|
"""The state-hub-progress write carries a stable Idempotency-Key header."""
|
||||||
|
captured: dict[str, object] = {}
|
||||||
|
|
||||||
|
monkeypatch.setattr(report_sinks, "_progress_exists", lambda *a, **k: False)
|
||||||
|
|
||||||
|
class _Resp:
|
||||||
|
def raise_for_status(self) -> None: ...
|
||||||
|
def json(self) -> dict[str, str]:
|
||||||
|
return {"id": "pid-1"}
|
||||||
|
|
||||||
|
def _capture_post(url, json, headers, timeout): # noqa: A002
|
||||||
|
captured["headers"] = headers
|
||||||
|
return _Resp()
|
||||||
|
|
||||||
|
monkeypatch.setattr(report_sinks.httpx, "post", _capture_post)
|
||||||
|
|
||||||
|
payload = {"run_id": "run1", "activity_id": "act1", "scheduled_for": None}
|
||||||
|
report_entry = {"instruction_id": "daily-triage-report", "report": {"summary": "s"}}
|
||||||
|
sink = {"event_type": "daily_triage"}
|
||||||
|
|
||||||
|
result = report_sinks._post_state_hub_progress(payload, report_entry, sink)
|
||||||
|
assert result["status"] == "posted"
|
||||||
|
assert captured["headers"][IDEMPOTENCY_HEADER] == "run1:daily-triage-report:daily_triage"
|
||||||
@@ -171,12 +171,14 @@ down. This is handed off to state-hub (see the coordination message / proposal);
|
|||||||
|
|
||||||
activity-core's only responsibilities under this model are thin:
|
activity-core's only responsibilities under this model are thin:
|
||||||
|
|
||||||
- **Idempotent writes (do now, in-repo):** attach a stable idempotency key
|
- **Idempotent writes — DONE (2026-06-23, in-repo):** added
|
||||||
(e.g. `run_id` + `instruction_id` + `event_type`) to every State Hub write so a
|
`activity_core/state_hub_write` (`idempotency_headers`); every State Hub write
|
||||||
beachhead flush — possibly replayed after an outage — cannot create duplicate
|
(report-sink, ops-evidence, schedule-miss) now sends a stable `Idempotency-Key`
|
||||||
`daily_triage`/progress events. The report sink already does a read-based dedup
|
header derived from `run_id:instruction_id:event_type`. The read-based
|
||||||
check (`_progress_exists`); make the guarantee explicit and not dependent on a
|
`_progress_exists` dedup is now best-effort (returns `False` on connection
|
||||||
live read.
|
error instead of hard-failing), so the guarantee lives on the keyed write, not
|
||||||
|
a live read. Tests in `tests/test_state_hub_write.py`; documented in
|
||||||
|
`docs/runbook.md`.
|
||||||
- **Adopt the beachhead endpoint (blocked on state-hub):** keep `STATE_HUB_URL`
|
- **Adopt the beachhead endpoint (blocked on state-hub):** keep `STATE_HUB_URL`
|
||||||
pointed at the local beachhead, and **retire the bespoke
|
pointed at the local beachhead, and **retire the bespoke
|
||||||
`actcore-state-hub-bridge` proxy** (the inline `hostNetwork` proxy in
|
`actcore-state-hub-bridge` proxy** (the inline `hostNetwork` proxy in
|
||||||
|
|||||||
Reference in New Issue
Block a user