Files
activity-core/src/activity_core/schedule_health.py
tegwick 88fe359385 feat(ACTIVITY-WP-0014): idempotency-keyed State Hub writes (T05, in-repo part)
Add activity_core/state_hub_write: every State Hub write (report-sink,
ops-evidence, schedule-miss) now sends a stable Idempotency-Key header derived
from run_id:instruction_id:event_type. Makes writes safe to buffer/replay under
the future state-hub beachhead without duplicate progress/triage events. The
read-based _progress_exists dedup is now best-effort (returns False on connection
error instead of hard-failing), so the guarantee lives on the keyed write rather
than a live read. Tests + runbook note. Endpoint adoption / proxy retirement stays
blocked on the state-hub beachhead capability.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 21:38:46 +02:00

195 lines
6.4 KiB
Python

"""Missed-fire detection for cron schedules (ACTIVITY-WP-0014, T03).
Even with a catchup window configured, an operator wants to *know* when a fire
was missed — especially under ``misfire_policy: skip`` where missed fires are
dropped by design and leave no run and no failure event. This module turns the
schedule's own bookkeeping into an explicit verdict and an optional State Hub
alert so a miss is never invisible again.
Temporal already counts fires that were dropped because they fell outside the
catchup window in ``ScheduleInfo.num_actions_missed_catchup_window``. We surface
that, plus a staleness check on the most recent fire, as a ``ScheduleHealth``
verdict. The verdict logic is a pure function so it is testable without a live
Temporal server; ``check_schedule_health`` is the thin async reader.
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import UUID
import httpx
from activity_core.schedule_manager import schedule_id
from activity_core.state_hub_write import idempotency_headers
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
@dataclass(frozen=True)
class ScheduleHealth:
"""Verdict for a single schedule's recent firing behaviour."""
activity_id: str
healthy: bool
missed_catchup_window: int
last_fired_at: datetime | None
staleness: timedelta | None
reasons: list[str] = field(default_factory=list)
@property
def missed(self) -> bool:
return not self.healthy
def evaluate_schedule_health(
*,
activity_id: str,
missed_catchup_window: int,
last_fired_at: datetime | None,
now: datetime,
expected_interval: timedelta | None = None,
tolerance: timedelta = timedelta(minutes=10),
) -> ScheduleHealth:
"""Pure verdict: was a fire missed?
A schedule is unhealthy if Temporal dropped any fire past the catchup window,
or — when ``expected_interval`` is known — if the most recent fire is older
than one interval plus ``tolerance`` (i.e. a fire should have happened and
did not).
"""
reasons: list[str] = []
if missed_catchup_window > 0:
reasons.append(
f"{missed_catchup_window} fire(s) dropped outside the catchup window"
)
staleness: timedelta | None = None
if last_fired_at is not None:
staleness = now - last_fired_at
if expected_interval is not None and staleness > expected_interval + tolerance:
reasons.append(
f"last fire was {staleness} ago, exceeding the expected "
f"{expected_interval} interval"
)
elif expected_interval is not None:
reasons.append("no recorded fire for a schedule that should have fired")
return ScheduleHealth(
activity_id=activity_id,
healthy=not reasons,
missed_catchup_window=missed_catchup_window,
last_fired_at=last_fired_at,
staleness=staleness,
reasons=reasons,
)
def _extract_info(desc: Any) -> tuple[int, datetime | None]:
"""Pull (missed_catchup_window, last_fired_at) from a ScheduleDescription.
Accesses are defensive so a Temporal SDK field rename degrades to "unknown"
rather than raising inside an operational health check.
"""
info = getattr(desc, "info", None)
missed = int(getattr(info, "num_actions_missed_catchup_window", 0) or 0)
last_fired: datetime | None = None
recent = getattr(info, "recent_actions", None) or []
times = [
getattr(a, "scheduled_at", None) or getattr(a, "started_at", None)
for a in recent
]
times = [t for t in times if t is not None]
if times:
last_fired = max(times)
return missed, last_fired
async def check_schedule_health(
client: Any,
activity_id: str | UUID,
*,
now: datetime | None = None,
expected_interval: timedelta | None = None,
tolerance: timedelta = timedelta(minutes=10),
) -> ScheduleHealth:
"""Describe the schedule for ``activity_id`` and evaluate its health."""
now = now or datetime.now(tz=timezone.utc)
handle = client.get_schedule_handle(schedule_id(activity_id))
desc = await handle.describe()
missed, last_fired = _extract_info(desc)
return evaluate_schedule_health(
activity_id=str(activity_id),
missed_catchup_window=missed,
last_fired_at=last_fired,
now=now,
expected_interval=expected_interval,
tolerance=tolerance,
)
def post_missed_fire_alert(
health: ScheduleHealth,
*,
state_hub_url: str | None = None,
author: str = "activity-core",
topic_id: str | None = None,
workstream_id: str | None = None,
timeout_seconds: float = 10.0,
) -> dict[str, Any]:
"""Post a ``schedule_miss`` progress event to State Hub for an unhealthy schedule.
No-op (returns ``status: ok``) when the schedule is healthy, so callers can
invoke unconditionally.
"""
if health.healthy:
return {"type": "schedule-miss-alert", "status": "ok"}
base_url = state_hub_url or os.environ.get("STATE_HUB_URL", _DEFAULT_STATE_HUB_URL)
base_url = str(base_url).rstrip("/")
body: dict[str, Any] = {
"event_type": "schedule_miss",
"author": author,
"summary": (
f"Schedule {health.activity_id} missed a fire: "
+ "; ".join(health.reasons)
),
"detail": {
"activity_id": health.activity_id,
"missed_catchup_window": health.missed_catchup_window,
"last_fired_at": (
health.last_fired_at.isoformat() if health.last_fired_at else None
),
"staleness_seconds": (
health.staleness.total_seconds() if health.staleness else None
),
"reasons": health.reasons,
},
}
if topic_id:
body["topic_id"] = topic_id
if workstream_id:
body["workstream_id"] = workstream_id
# Dedup repeated alerts for the same missed window (same schedule + last fire).
last_fired = health.last_fired_at.isoformat() if health.last_fired_at else "none"
resp = httpx.post(
f"{base_url}/progress/",
json=body,
headers=idempotency_headers("schedule_miss", health.activity_id, last_fired),
timeout=timeout_seconds,
)
resp.raise_for_status()
data = resp.json()
return {
"type": "schedule-miss-alert",
"status": "posted",
"progress_id": data.get("id"),
}