From 053d18b24a047c75fdc583b0a9a1485a0ffd2eba Mon Sep 17 00:00:00 2001
From: tegwick
Date: Tue, 23 Jun 2026 14:25:33 +0200
Subject: [PATCH] feat(ACTIVITY-WP-0014): missed-fire detection & alert sink (T03)

Add activity_core/schedule_health: a pure evaluate_schedule_health()
verdict (built on Temporal's num_actions_missed_catchup_window plus a
staleness check), an async check_schedule_health() reader, and
post_missed_fire_alert() that emits a schedule_miss State Hub progress
event. Makes a missed fire visible even under misfire_policy=skip, where
Temporal drops it by design. Unit tests for the verdict logic.

Co-Authored-By: Claude Opus 4.8
---
 src/activity_core/schedule_health.py          | 186 ++++++++++++++++++
 tests/test_schedule_health.py                 |  81 ++++++++
 ...ITY-WP-0014-schedule-misfire-robustness.md |   4 +-
 3 files changed, 269 insertions(+), 2 deletions(-)
 create mode 100644 src/activity_core/schedule_health.py
 create mode 100644 tests/test_schedule_health.py

diff --git a/src/activity_core/schedule_health.py b/src/activity_core/schedule_health.py
new file mode 100644
index 0000000..2aa4813
--- /dev/null
+++ b/src/activity_core/schedule_health.py
@@ -0,0 +1,186 @@
+"""Missed-fire detection for cron schedules (ACTIVITY-WP-0014, T03).
+
+Even with a catchup window configured, an operator wants to *know* when a fire
+was missed — especially under ``misfire_policy: skip`` where missed fires are
+dropped by design and leave no run and no failure event. This module turns the
+schedule's own bookkeeping into an explicit verdict and an optional State Hub
+alert so a miss is never invisible again.
+
+Temporal already counts fires that were dropped because they fell outside the
+catchup window in ``ScheduleInfo.num_actions_missed_catchup_window``. We surface
+that, plus a staleness check on the most recent fire, as a ``ScheduleHealth``
+verdict. The verdict logic is a pure function so it is testable without a live
+Temporal server; ``check_schedule_health`` is the thin async reader.
+"""
+
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta, timezone
+from typing import Any
+from uuid import UUID
+
+import httpx
+
+from activity_core.schedule_manager import schedule_id
+
+_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
+
+
+@dataclass(frozen=True)
+class ScheduleHealth:
+    """Verdict for a single schedule's recent firing behaviour."""
+
+    activity_id: str
+    healthy: bool
+    missed_catchup_window: int
+    last_fired_at: datetime | None
+    staleness: timedelta | None
+    reasons: list[str] = field(default_factory=list)
+
+    @property
+    def missed(self) -> bool:
+        return not self.healthy
+
+
+def evaluate_schedule_health(
+    *,
+    activity_id: str,
+    missed_catchup_window: int,
+    last_fired_at: datetime | None,
+    now: datetime,
+    expected_interval: timedelta | None = None,
+    tolerance: timedelta = timedelta(minutes=10),
+) -> ScheduleHealth:
+    """Pure verdict: was a fire missed?
+
+    A schedule is unhealthy if Temporal dropped any fire past the catchup window,
+    or — when ``expected_interval`` is known — if the most recent fire is older
+    than one interval plus ``tolerance`` (i.e. a fire should have happened and
+    did not).
+    """
+    reasons: list[str] = []
+
+    if missed_catchup_window > 0:
+        reasons.append(
+            f"{missed_catchup_window} fire(s) dropped outside the catchup window"
+        )
+
+    staleness: timedelta | None = None
+    if last_fired_at is not None:
+        staleness = now - last_fired_at
+        if expected_interval is not None and staleness > expected_interval + tolerance:
+            reasons.append(
+                f"last fire was {staleness} ago, exceeding the expected "
+                f"{expected_interval} interval"
+            )
+    elif expected_interval is not None:
+        reasons.append("no recorded fire for a schedule that should have fired")
+
+    return ScheduleHealth(
+        activity_id=activity_id,
+        healthy=not reasons,
+        missed_catchup_window=missed_catchup_window,
+        last_fired_at=last_fired_at,
+        staleness=staleness,
+        reasons=reasons,
+    )
+
+
+def _extract_info(desc: Any) -> tuple[int, datetime | None]:
+    """Pull (missed_catchup_window, last_fired_at) from a ScheduleDescription.
+
+    Accesses are defensive so a Temporal SDK field rename degrades to "unknown"
+    rather than raising inside an operational health check.
+    """
+    info = getattr(desc, "info", None)
+    missed = int(getattr(info, "num_actions_missed_catchup_window", 0) or 0)
+
+    last_fired: datetime | None = None
+    recent = getattr(info, "recent_actions", None) or []
+    times = [
+        getattr(a, "scheduled_at", None) or getattr(a, "started_at", None)
+        for a in recent
+    ]
+    times = [t for t in times if t is not None]
+    if times:
+        last_fired = max(times)
+    return missed, last_fired
+
+
+async def check_schedule_health(
+    client: Any,
+    activity_id: str | UUID,
+    *,
+    now: datetime | None = None,
+    expected_interval: timedelta | None = None,
+    tolerance: timedelta = timedelta(minutes=10),
+) -> ScheduleHealth:
+    """Describe the schedule for ``activity_id`` and evaluate its health."""
+    now = now or datetime.now(tz=timezone.utc)
+    handle = client.get_schedule_handle(schedule_id(activity_id))
+    desc = await handle.describe()
+    missed, last_fired = _extract_info(desc)
+    return evaluate_schedule_health(
+        activity_id=str(activity_id),
+        missed_catchup_window=missed,
+        last_fired_at=last_fired,
+        now=now,
+        expected_interval=expected_interval,
+        tolerance=tolerance,
+    )
+
+
+def post_missed_fire_alert(
+    health: ScheduleHealth,
+    *,
+    state_hub_url: str | None = None,
+    author: str = "activity-core",
+    topic_id: str | None = None,
+    workstream_id: str | None = None,
+    timeout_seconds: float = 10.0,
+) -> dict[str, Any]:
+    """Post a ``schedule_miss`` progress event to State Hub for an unhealthy schedule.
+
+    No-op (returns ``status: ok``) when the schedule is healthy, so callers can
+    invoke unconditionally.
+    """
+    if health.healthy:
+        return {"type": "schedule-miss-alert", "status": "ok"}
+
+    base_url = state_hub_url or os.environ.get("STATE_HUB_URL", _DEFAULT_STATE_HUB_URL)
+    base_url = str(base_url).rstrip("/")
+
+    body: dict[str, Any] = {
+        "event_type": "schedule_miss",
+        "author": author,
+        "summary": (
+            f"Schedule {health.activity_id} missed a fire: "
+            + "; ".join(health.reasons)
+        ),
+        "detail": {
+            "activity_id": health.activity_id,
+            "missed_catchup_window": health.missed_catchup_window,
+            "last_fired_at": (
+                health.last_fired_at.isoformat() if health.last_fired_at else None
+            ),
+            "staleness_seconds": (
+                None if health.staleness is None else health.staleness.total_seconds()
+            ),
+            "reasons": health.reasons,
+        },
+    }
+    if topic_id:
+        body["topic_id"] = topic_id
+    if workstream_id:
+        body["workstream_id"] = workstream_id
+
+    resp = httpx.post(f"{base_url}/progress/", json=body, timeout=timeout_seconds)
+    resp.raise_for_status()
+    data = resp.json()
+    return {
+        "type": "schedule-miss-alert",
+        "status": "posted",
+        "progress_id": data.get("id"),
+    }
diff --git a/tests/test_schedule_health.py b/tests/test_schedule_health.py
new file mode 100644
index 0000000..33f3715
--- /dev/null
+++ b/tests/test_schedule_health.py
@@ -0,0 +1,81 @@
+"""ACTIVITY-WP-0014 T03: missed-fire detection verdict tests."""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta, timezone
+
+from activity_core.schedule_health import evaluate_schedule_health
+
+NOW = datetime(2026, 6, 23, 12, 0, tzinfo=timezone.utc)
+
+
+def test_healthy_when_recent_fire_and_no_drops() -> None:
+    health = evaluate_schedule_health(
+        activity_id="a1",
+        missed_catchup_window=0,
+        last_fired_at=NOW - timedelta(minutes=5),
+        now=NOW,
+        expected_interval=timedelta(hours=1),
+    )
+    assert health.healthy is True
+    assert health.missed is False
+    assert health.reasons == []
+
+
+def test_unhealthy_when_catchup_window_dropped_fires() -> None:
+    health = evaluate_schedule_health(
+        activity_id="a1",
+        missed_catchup_window=2,
+        last_fired_at=NOW - timedelta(minutes=5),
+        now=NOW,
+    )
+    assert health.missed is True
+    assert "2 fire(s) dropped" in health.reasons[0]
+
+
+def test_unhealthy_when_last_fire_too_stale() -> None:
+    health = evaluate_schedule_health(
+        activity_id="daily",
+        missed_catchup_window=0,
+        last_fired_at=NOW - timedelta(days=2),
+        now=NOW,
+        expected_interval=timedelta(days=1),
+    )
+    assert health.missed is True
+    assert any("exceeding the expected" in r for r in health.reasons)
+    assert health.staleness == timedelta(days=2)
+
+
+def test_within_tolerance_is_healthy() -> None:
+    health = evaluate_schedule_health(
+        activity_id="daily",
+        missed_catchup_window=0,
+        last_fired_at=NOW - (timedelta(days=1) + timedelta(minutes=5)),
+        now=NOW,
+        expected_interval=timedelta(days=1),
+        tolerance=timedelta(minutes=10),
+    )
+    assert health.healthy is True
+
+
+def test_no_fire_recorded_for_due_schedule_is_unhealthy() -> None:
+    health = evaluate_schedule_health(
+        activity_id="daily",
+        missed_catchup_window=0,
+        last_fired_at=None,
+        now=NOW,
+        expected_interval=timedelta(days=1),
+    )
+    assert health.missed is True
+    assert "no recorded fire" in health.reasons[0]
+
+
+def test_no_interval_and_no_fire_is_not_flagged() -> None:
+    # Without an expected interval we cannot assert a miss from absence alone.
+    health = evaluate_schedule_health(
+        activity_id="event-ish",
+        missed_catchup_window=0,
+        last_fired_at=None,
+        now=NOW,
+    )
+    assert health.healthy is True
diff --git a/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md b/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md
index 9e4b34a..197074c 100644
--- a/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md
+++ b/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md
@@ -97,7 +97,7 @@ map). Unit tests for each mode's `(catchup_window, overlap)` mapping.
 
 ```task
 id: ACTIVITY-WP-0014-T03
-status: todo
+status: done
 priority: medium
 state_hub_task_id: "dbedd96a-59ca-4b83-bce6-35755b076807"
 ```
@@ -112,7 +112,7 @@ be invisible.
 
 ```task
 id: ACTIVITY-WP-0014-T04
-status: wait
+status: progress
 priority: medium
 state_hub_task_id: "04e9d1d2-1192-4402-9402-b12c5d7d44e5"
 ```