"""ACTIVITY-WP-0014 T05: idempotency-keyed State Hub writes.""" from __future__ import annotations import httpx import pytest from activity_core import report_sinks from activity_core.state_hub_write import ( IDEMPOTENCY_HEADER, idempotency_headers, idempotency_key, ) def test_key_is_stable_and_deterministic() -> None: a = idempotency_key("run1", "daily-triage-report", "daily_triage") b = idempotency_key("run1", "daily-triage-report", "daily_triage") assert a == b == "run1:daily-triage-report:daily_triage" def test_key_shape_stable_with_missing_parts() -> None: assert idempotency_key("run1", None, "daily_triage") == "run1::daily_triage" def test_key_sanitizes_control_and_whitespace() -> None: key = idempotency_key("run 1", "a\tb", "x\n") assert "\t" not in key and "\n" not in key and " " not in key def test_headers_carry_the_key() -> None: headers = idempotency_headers("run1", "i", "e") assert headers == {IDEMPOTENCY_HEADER: "run1:i:e"} def test_distinct_identities_get_distinct_keys() -> None: assert idempotency_key("r", "i", "daily_triage") != idempotency_key( "r", "i", "schedule_miss" ) def test_progress_exists_is_best_effort_on_connection_error(monkeypatch) -> None: """A down State Hub must not hard-fail the dedup read; it returns False so the keyed write can still proceed.""" def _boom(*args, **kwargs): raise httpx.ConnectError("Connection refused") monkeypatch.setattr(report_sinks.httpx, "get", _boom) assert ( report_sinks._progress_exists( "http://127.0.0.1:8000", "run1", "daily-triage-report", "daily_triage" ) is False ) def test_report_sink_post_sends_idempotency_header(monkeypatch) -> None: """The state-hub-progress write carries a stable Idempotency-Key header.""" captured: dict[str, object] = {} monkeypatch.setattr(report_sinks, "_progress_exists", lambda *a, **k: False) class _Resp: def raise_for_status(self) -> None: ... def json(self) -> dict[str, str]: return {"id": "pid-1"} def _capture_post(url, json, headers, timeout): # noqa: A002 captured["headers"] = headers return _Resp() monkeypatch.setattr(report_sinks.httpx, "post", _capture_post) payload = {"run_id": "run1", "activity_id": "act1", "scheduled_for": None} report_entry = {"instruction_id": "daily-triage-report", "report": {"summary": "s"}} sink = {"event_type": "daily_triage"} result = report_sinks._post_state_hub_progress(payload, report_entry, sink) assert result["status"] == "posted" assert captured["headers"][IDEMPOTENCY_HEADER] == "run1:daily-triage-report:daily_triage"