feat(ACTIVITY-WP-0014): missed-fire detection & alert sink (T03)

Add activity_core/schedule_health: a pure evaluate_schedule_health() verdict
(built on Temporal's num_actions_missed_catchup_window plus a staleness check),
an async check_schedule_health() reader, and post_missed_fire_alert() that emits
a schedule_miss State Hub progress event. Makes a missed fire visible even under
misfire_policy=skip, where Temporal drops it by design. Unit tests for the
verdict logic.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-23 14:25:33 +02:00
parent 77af65afb2
commit 053d18b24a
3 changed files with 269 additions and 2 deletions

View File

@@ -0,0 +1,186 @@
"""Missed-fire detection for cron schedules (ACTIVITY-WP-0014, T03).
Even with a catchup window configured, an operator wants to *know* when a fire
was missed — especially under ``misfire_policy: skip`` where missed fires are
dropped by design and leave no run and no failure event. This module turns the
schedule's own bookkeeping into an explicit verdict and an optional State Hub
alert so a miss is never invisible again.
Temporal already counts fires that were dropped because they fell outside the
catchup window in ``ScheduleInfo.num_actions_missed_catchup_window``. We surface
that, plus a staleness check on the most recent fire, as a ``ScheduleHealth``
verdict. The verdict logic is a pure function so it is testable without a live
Temporal server; ``check_schedule_health`` is the thin async reader.
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import UUID
import httpx
from activity_core.schedule_manager import schedule_id
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
@dataclass(frozen=True)
class ScheduleHealth:
"""Verdict for a single schedule's recent firing behaviour."""
activity_id: str
healthy: bool
missed_catchup_window: int
last_fired_at: datetime | None
staleness: timedelta | None
reasons: list[str] = field(default_factory=list)
@property
def missed(self) -> bool:
return not self.healthy
def evaluate_schedule_health(
*,
activity_id: str,
missed_catchup_window: int,
last_fired_at: datetime | None,
now: datetime,
expected_interval: timedelta | None = None,
tolerance: timedelta = timedelta(minutes=10),
) -> ScheduleHealth:
"""Pure verdict: was a fire missed?
A schedule is unhealthy if Temporal dropped any fire past the catchup window,
or — when ``expected_interval`` is known — if the most recent fire is older
than one interval plus ``tolerance`` (i.e. a fire should have happened and
did not).
"""
reasons: list[str] = []
if missed_catchup_window > 0:
reasons.append(
f"{missed_catchup_window} fire(s) dropped outside the catchup window"
)
staleness: timedelta | None = None
if last_fired_at is not None:
staleness = now - last_fired_at
if expected_interval is not None and staleness > expected_interval + tolerance:
reasons.append(
f"last fire was {staleness} ago, exceeding the expected "
f"{expected_interval} interval"
)
elif expected_interval is not None:
reasons.append("no recorded fire for a schedule that should have fired")
return ScheduleHealth(
activity_id=activity_id,
healthy=not reasons,
missed_catchup_window=missed_catchup_window,
last_fired_at=last_fired_at,
staleness=staleness,
reasons=reasons,
)
def _extract_info(desc: Any) -> tuple[int, datetime | None]:
"""Pull (missed_catchup_window, last_fired_at) from a ScheduleDescription.
Accesses are defensive so a Temporal SDK field rename degrades to "unknown"
rather than raising inside an operational health check.
"""
info = getattr(desc, "info", None)
missed = int(getattr(info, "num_actions_missed_catchup_window", 0) or 0)
last_fired: datetime | None = None
recent = getattr(info, "recent_actions", None) or []
times = [
getattr(a, "scheduled_at", None) or getattr(a, "started_at", None)
for a in recent
]
times = [t for t in times if t is not None]
if times:
last_fired = max(times)
return missed, last_fired
async def check_schedule_health(
client: Any,
activity_id: str | UUID,
*,
now: datetime | None = None,
expected_interval: timedelta | None = None,
tolerance: timedelta = timedelta(minutes=10),
) -> ScheduleHealth:
"""Describe the schedule for ``activity_id`` and evaluate its health."""
now = now or datetime.now(tz=timezone.utc)
handle = client.get_schedule_handle(schedule_id(activity_id))
desc = await handle.describe()
missed, last_fired = _extract_info(desc)
return evaluate_schedule_health(
activity_id=str(activity_id),
missed_catchup_window=missed,
last_fired_at=last_fired,
now=now,
expected_interval=expected_interval,
tolerance=tolerance,
)
def post_missed_fire_alert(
health: ScheduleHealth,
*,
state_hub_url: str | None = None,
author: str = "activity-core",
topic_id: str | None = None,
workstream_id: str | None = None,
timeout_seconds: float = 10.0,
) -> dict[str, Any]:
"""Post a ``schedule_miss`` progress event to State Hub for an unhealthy schedule.
No-op (returns ``status: ok``) when the schedule is healthy, so callers can
invoke unconditionally.
"""
if health.healthy:
return {"type": "schedule-miss-alert", "status": "ok"}
base_url = state_hub_url or os.environ.get("STATE_HUB_URL", _DEFAULT_STATE_HUB_URL)
base_url = str(base_url).rstrip("/")
body: dict[str, Any] = {
"event_type": "schedule_miss",
"author": author,
"summary": (
f"Schedule {health.activity_id} missed a fire: "
+ "; ".join(health.reasons)
),
"detail": {
"activity_id": health.activity_id,
"missed_catchup_window": health.missed_catchup_window,
"last_fired_at": (
health.last_fired_at.isoformat() if health.last_fired_at else None
),
"staleness_seconds": (
health.staleness.total_seconds() if health.staleness else None
),
"reasons": health.reasons,
},
}
if topic_id:
body["topic_id"] = topic_id
if workstream_id:
body["workstream_id"] = workstream_id
resp = httpx.post(f"{base_url}/progress/", json=body, timeout=timeout_seconds)
resp.raise_for_status()
data = resp.json()
return {
"type": "schedule-miss-alert",
"status": "posted",
"progress_id": data.get("id"),
}