From a83b117f6003962565a09df70b18c6d683a82b14 Mon Sep 17 00:00:00 2001 From: tegwick Date: Tue, 23 Jun 2026 14:15:45 +0200 Subject: [PATCH] feat(ACTIVITY-WP-0014): explicit run-miss recovery policies (T02, T04) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Set Temporal catchup_window on cron schedules so a fire missed during a worker/Temporal outage is no longer silently dropped. Redefine misfire_policy into three explicit modes — skip, catchup_all, catchup_latest — mapping to (catchup_window, overlap) pairs; legacy catchup/compress aliased. Add catchup_window_seconds override. Remove the ad-hoc upsert-time 1h backfill in favour of native catchup. Apply catchup_latest to daily-statehub-wsjf-triage in the Railiance runtime manifest and document run-miss policies in the runbook. Co-Authored-By: Claude Opus 4.8 --- docs/runbook.md | 28 +++++++ k8s/railiance/20-runtime.yaml | 5 +- src/activity_core/models.py | 13 +++- src/activity_core/schedule_manager.py | 78 ++++++++++++++----- tests/test_schedule_lifecycle.py | 72 +++++++++++++++++ ...ITY-WP-0014-schedule-misfire-robustness.md | 14 ++-- 6 files changed, 181 insertions(+), 29 deletions(-) diff --git a/docs/runbook.md b/docs/runbook.md index 12d5e6d..fc81eb1 100644 --- a/docs/runbook.md +++ b/docs/runbook.md @@ -333,6 +333,31 @@ the same durable consumer name provides automatic failover. --- +## Run-miss recovery policies (cron triggers) + +A cron fire is **missed** when the worker or Temporal is unavailable at trigger +time. `trigger_config.misfire_policy` selects what happens when the system +recovers. 
Each policy combines a Temporal **catchup window** (how far back missed +fires are recovered) with an **overlap policy** (what to do if a recovered fire +would start while a prior run is still executing): + +| `misfire_policy` | Behaviour | Default catchup window | Overlap | +| --- | --- | --- | --- | +| `skip` | Run on trigger or skip — a missed fire is never recovered | 60s grace | `SKIP` | +| `catchup_all` | Recover **every** fire missed during the outage | 365 days | `BUFFER_ALL` | +| `catchup_latest` | Recover only the **most recent** missed fire; no backlog | 24h | `BUFFER_ONE` | + +Set `trigger_config.catchup_window_seconds` to override the per-policy default +(e.g. an hourly definition using `catchup_latest` should set it to ~3600 so a +single missed hour is recovered but older ones are not). + +Legacy values are still accepted: `catchup` → `catchup_all`, +`compress` → `catchup_latest`. + +> **Why this exists:** before ACTIVITY-WP-0014 no catchup window was set, so a +> brief outage at trigger time silently dropped the fire with no recovery and no +> log line. The `daily-statehub-wsjf-triage` definition now uses `catchup_latest`. + ## Troubleshooting ### Worker fails to start: "ACTCORE_DB_URL is required" @@ -342,6 +367,9 @@ Set the environment variable before running the worker. 1. Check Temporal UI → Schedules tab for the schedule status. 2. Ensure `enabled=True` on the ActivityDefinition (paused schedules don't fire). 3. Verify the cron expression with: `docker exec temporal-admin-tools temporal schedule describe --schedule-id activity-schedule-<activity_id>` +4. If a fire was **missed entirely** (no run, no failure event) during an outage, + check `misfire_policy` — under `skip` missed fires are dropped by design. Use + `catchup_all` or `catchup_latest` to recover them. See *Run-miss recovery policies*. ### Event not routing 1. Check NATS monitoring: http://localhost:8222/jsz to verify the `ACTIVITY_EVENTS` stream exists.
diff --git a/k8s/railiance/20-runtime.yaml b/k8s/railiance/20-runtime.yaml index ed5d2e0..06157d6 100644 --- a/k8s/railiance/20-runtime.yaml +++ b/k8s/railiance/20-runtime.yaml @@ -47,7 +47,10 @@ data: type: cron cron_expression: "20 7 * * *" timezone: Europe/Berlin - misfire_policy: skip + # ACTIVITY-WP-0014: recover the most recent missed daily fire when the + # worker/Temporal was unavailable at trigger time, without accumulating a + # backlog after a multi-day outage. + misfire_policy: catchup_latest context_sources: - type: static bind_to: context.prompt_path diff --git a/src/activity_core/models.py b/src/activity_core/models.py index 72e9b85..d31483f 100644 --- a/src/activity_core/models.py +++ b/src/activity_core/models.py @@ -49,7 +49,18 @@ class CronTriggerConfig(BaseModel): ) timezone: str = Field(default="UTC", description="IANA timezone name.") jitter_seconds: int = Field(default=0, ge=0) - misfire_policy: Literal["skip", "catchup", "compress"] = Field(default="skip") + # Run-miss recovery behaviour (ACTIVITY-WP-0014). What happens when a fire is + # missed because the worker / Temporal was unavailable at trigger time: + # skip - run on trigger or skip; a missed fire is never recovered + # catchup_all - recover every fire missed during the outage window + # catchup_latest - recover only the most recent missed fire; do not accumulate + # Legacy aliases are accepted: catchup → catchup_all, compress → catchup_latest. + misfire_policy: Literal[ + "skip", "catchup_all", "catchup_latest", "catchup", "compress" + ] = Field(default="skip") + # Override the per-policy default catchup window (how far back Temporal will + # recover missed fires after an outage). None uses the policy default. 
+ catchup_window_seconds: int | None = Field(default=None, ge=0) class EventTriggerConfig(BaseModel): diff --git a/src/activity_core/schedule_manager.py b/src/activity_core/schedule_manager.py index bcacbbe..76136f1 100644 --- a/src/activity_core/schedule_manager.py +++ b/src/activity_core/schedule_manager.py @@ -17,7 +17,6 @@ from temporalio.client import ( Schedule, ScheduleActionStartWorkflow, ScheduleAlreadyRunningError, - ScheduleBackfill, ScheduleCalendarSpec, ScheduleHandle, ScheduleOverlapPolicy, @@ -38,13 +37,49 @@ _ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq" # RunActivityWorkflow detects this value and derives run dedup key from workflow_id. SCHEDULED_TRIGGER_KEY = "scheduled" -# T24: misfire_policy → ScheduleOverlapPolicy -_MISFIRE_TO_OVERLAP: dict[str, ScheduleOverlapPolicy] = { - "skip": ScheduleOverlapPolicy.SKIP, - "catchup": ScheduleOverlapPolicy.BUFFER_ALL, - "compress": ScheduleOverlapPolicy.BUFFER_ONE, +# ACTIVITY-WP-0014: misfire_policy → run-miss recovery behaviour. +# +# A "missed fire" happens when the worker / Temporal is unavailable at trigger +# time. Two Temporal levers together define the behaviour: +# - catchup_window: how far back the server will recover missed fires once it +# is healthy again. The previous code never set this, so a brief outage at +# trigger time silently dropped the fire with no recovery and no signal. +# - overlap: what to do when a (recovered) fire would start while a prior run +# is still executing. +# +# Legacy values (catchup, compress) are aliased onto the explicit names. +_MISFIRE_ALIASES: dict[str, str] = { + "catchup": "catchup_all", + "compress": "catchup_latest", } +# overlap policy + default catchup window (seconds) per normalised policy. +_SKIP_WINDOW_SECONDS = 60 +_CATCHUP_ALL_WINDOW_SECONDS = 365 * 24 * 3600 +_CATCHUP_LATEST_WINDOW_SECONDS = 24 * 3600 + +_MISFIRE_TO_OVERLAP: dict[str, ScheduleOverlapPolicy] = { + # Run on trigger or skip — recover nothing past a tiny grace window. 
+ "skip": ScheduleOverlapPolicy.SKIP, + # Run on trigger or recover every missed fire during the outage window. + "catchup_all": ScheduleOverlapPolicy.BUFFER_ALL, + # Run on trigger or recover the most recent missed fire only; BUFFER_ONE + # buffers at most one start and drops the rest, so a backlog never accumulates. + "catchup_latest": ScheduleOverlapPolicy.BUFFER_ONE, +} + +_MISFIRE_DEFAULT_WINDOW: dict[str, int] = { + "skip": _SKIP_WINDOW_SECONDS, + "catchup_all": _CATCHUP_ALL_WINDOW_SECONDS, + "catchup_latest": _CATCHUP_LATEST_WINDOW_SECONDS, +} + + +def _normalize_misfire_policy(misfire_policy: str) -> str: + """Map legacy aliases onto the explicit run-miss policy names.""" + canonical = _MISFIRE_ALIASES.get(misfire_policy, misfire_policy) + return canonical if canonical in _MISFIRE_TO_OVERLAP else "skip" + def schedule_id(activity_id: str | UUID) -> str: """Return the canonical Temporal Schedule ID for an ActivityDefinition.""" @@ -57,7 +92,15 @@ def smoke_schedule_id(activity_id: str | UUID) -> str: def _overlap_policy(misfire_policy: str) -> ScheduleOverlapPolicy: - return _MISFIRE_TO_OVERLAP.get(misfire_policy, ScheduleOverlapPolicy.SKIP) + return _MISFIRE_TO_OVERLAP[_normalize_misfire_policy(misfire_policy)] + + +def _catchup_window(cfg: CronTriggerConfig) -> timedelta: + """Resolve the catchup window: explicit override, else the policy default.""" + if cfg.catchup_window_seconds is not None: + return timedelta(seconds=cfg.catchup_window_seconds) + policy = _normalize_misfire_policy(cfg.misfire_policy) + return timedelta(seconds=_MISFIRE_DEFAULT_WINDOW[policy]) def _build_schedule(defn: ActivityDefinition) -> Schedule: @@ -80,7 +123,10 @@ def _build_schedule(defn: ActivityDefinition) -> Schedule: jitter=timedelta(seconds=cfg.jitter_seconds) if cfg.jitter_seconds else None, ) - policy = SchedulePolicy(overlap=_overlap_policy(cfg.misfire_policy)) + policy = SchedulePolicy( + overlap=_overlap_policy(cfg.misfire_policy), + 
catchup_window=_catchup_window(cfg), + ) state = ScheduleState(paused=not defn.enabled) return Schedule(action=action, spec=spec, policy=policy, state=state) @@ -282,18 +328,10 @@ async def upsert_schedule(client: Client, defn: ActivityDefinition) -> ScheduleH else: await handle.pause(note="disabled via upsert_schedule") - # T24 catchup: backfill any fires missed in the last hour. - if isinstance(defn.trigger_config, CronTriggerConfig): - if defn.trigger_config.misfire_policy == "catchup": - now = datetime.now(tz=timezone.utc) - backfill_start = now - timedelta(hours=1) - await handle.backfill( - ScheduleBackfill( - start_at=backfill_start, - end_at=now, - overlap=ScheduleOverlapPolicy.BUFFER_ALL, - ) - ) + # ACTIVITY-WP-0014: missed-fire recovery is now handled natively by the + # schedule's catchup_window (see _build_schedule), which the server applies + # continuously after any outage — not only at upsert time. The previous + # ad-hoc 1-hour backfill is therefore no longer needed. return handle diff --git a/tests/test_schedule_lifecycle.py b/tests/test_schedule_lifecycle.py index 6a37d44..4835d96 100644 --- a/tests/test_schedule_lifecycle.py +++ b/tests/test_schedule_lifecycle.py @@ -37,6 +37,7 @@ def _make_defn( misfire_policy: str = "skip", enabled: bool = True, jitter: int = 0, + catchup_window_seconds: int | None = None, ) -> ActivityDefinition: return ActivityDefinition( id=uuid.uuid4(), @@ -46,6 +47,7 @@ def _make_defn( cron_expression=cron, misfire_policy=misfire_policy, jitter_seconds=jitter, + catchup_window_seconds=catchup_window_seconds, ), ) @@ -186,6 +188,76 @@ async def test_misfire_policy_compress_sets_overlap_buffer_one(env: WorkflowEnvi await delete_schedule(env.client, defn.id) +# ── ACTIVITY-WP-0014: explicit run-miss policies + catchup window ──────────── + +@pytest.mark.asyncio +async def test_skip_sets_short_catchup_window(env: WorkflowEnvironment) -> None: + """skip = run on trigger or skip: tiny grace window, no real recovery.""" + defn = 
_make_defn(misfire_policy="skip") + await upsert_schedule(env.client, defn) + + desc = await env.client.get_schedule_handle(schedule_id(defn.id)).describe() + assert desc.schedule.policy.overlap == ScheduleOverlapPolicy.SKIP + assert desc.schedule.policy.catchup_window == timedelta(seconds=60) + + await delete_schedule(env.client, defn.id) + + +@pytest.mark.asyncio +async def test_catchup_all_recovers_full_window(env: WorkflowEnvironment) -> None: + """catchup_all = recover every missed fire: long window, BUFFER_ALL.""" + defn = _make_defn(misfire_policy="catchup_all") + await upsert_schedule(env.client, defn) + + desc = await env.client.get_schedule_handle(schedule_id(defn.id)).describe() + assert desc.schedule.policy.overlap == ScheduleOverlapPolicy.BUFFER_ALL + assert desc.schedule.policy.catchup_window == timedelta(days=365) + + await delete_schedule(env.client, defn.id) + + +@pytest.mark.asyncio +async def test_catchup_latest_does_not_accumulate(env: WorkflowEnvironment) -> None: + """catchup_latest = recover only the most recent missed fire: BUFFER_ONE.""" + defn = _make_defn(misfire_policy="catchup_latest") + await upsert_schedule(env.client, defn) + + desc = await env.client.get_schedule_handle(schedule_id(defn.id)).describe() + assert desc.schedule.policy.overlap == ScheduleOverlapPolicy.BUFFER_ONE + assert desc.schedule.policy.catchup_window == timedelta(hours=24) + + await delete_schedule(env.client, defn.id) + + +@pytest.mark.asyncio +async def test_legacy_aliases_map_to_explicit_policies(env: WorkflowEnvironment) -> None: + """Legacy catchup/compress keep working and pick up the new catchup windows.""" + catchup = _make_defn(misfire_policy="catchup") + compress = _make_defn(misfire_policy="compress") + await upsert_schedule(env.client, catchup) + await upsert_schedule(env.client, compress) + + d1 = await env.client.get_schedule_handle(schedule_id(catchup.id)).describe() + d2 = await env.client.get_schedule_handle(schedule_id(compress.id)).describe() + 
assert d1.schedule.policy.catchup_window == timedelta(days=365) + assert d2.schedule.policy.catchup_window == timedelta(hours=24) + + await delete_schedule(env.client, catchup.id) + await delete_schedule(env.client, compress.id) + + +@pytest.mark.asyncio +async def test_explicit_catchup_window_override(env: WorkflowEnvironment) -> None: + """An explicit catchup_window_seconds overrides the per-policy default.""" + defn = _make_defn(misfire_policy="skip", catchup_window_seconds=7200) + await upsert_schedule(env.client, defn) + + desc = await env.client.get_schedule_handle(schedule_id(defn.id)).describe() + assert desc.schedule.policy.catchup_window == timedelta(hours=2) + + await delete_schedule(env.client, defn.id) + + @pytest.mark.asyncio async def test_schedule_smoke_test_creates_one_shot_schedule( env: WorkflowEnvironment, diff --git a/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md b/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md index c092ef5..6ab8ad5 100644 --- a/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md +++ b/workplans/ACTIVITY-WP-0014-schedule-misfire-robustness.md @@ -9,7 +9,7 @@ owner: claude topic_slug: activity-core created: "2026-06-23" updated: "2026-06-23" -state_hub_workstream_id: "" +state_hub_workstream_id: "91b64686-5d17-4c86-bc9e-3d0ee6720cf5" --- # Schedule Misfire Robustness & Run-Miss Recovery Options @@ -66,7 +66,7 @@ Proposed mapping to a new `misfire_policy` value set (names open to review): id: ACTIVITY-WP-0014-T01 status: todo priority: high -state_hub_task_id: "" +state_hub_task_id: "c90ff214-9214-48c7-96b9-7d699528d5ab" ``` Bring up the ops-bridge tunnel (`bridge up state-hub-coulombcore`, @@ -81,9 +81,9 @@ calibration evidence is still wanted. Record findings in the workplan. 
```task id: ACTIVITY-WP-0014-T02 -status: todo +status: done priority: high -state_hub_task_id: "" +state_hub_task_id: "19615562-4cb2-4f25-872f-505d6e40dcc5" ``` Add `catchup_window_seconds` to `CronTriggerConfig` and redefine `misfire_policy` @@ -99,7 +99,7 @@ map). Unit tests for each mode's `(catchup_window, overlap)` mapping. id: ACTIVITY-WP-0014-T03 status: todo priority: medium -state_hub_task_id: "" +state_hub_task_id: "dbedd96a-59ca-4b83-bce6-35755b076807" ``` Detect when a scheduled definition has no successful run within its expected @@ -112,9 +112,9 @@ be invisible. ```task id: ACTIVITY-WP-0014-T04 -status: wait +status: progress priority: medium -state_hub_task_id: "" +state_hub_task_id: "04e9d1d2-1192-4402-9402-b12c5d7d44e5" ``` Choose and set the appropriate `misfire_policy` for `daily-statehub-wsjf-triage`