feat(ACTIVITY-WP-0014): explicit run-miss recovery policies (T02, T04)
Set Temporal catchup_window on cron schedules so a fire missed during a
worker/Temporal outage is no longer silently dropped. Redefine misfire_policy
into three explicit modes — skip, catchup_all, catchup_latest — mapping to
(catchup_window, overlap) pairs; legacy catchup/compress aliased. Add
catchup_window_seconds override. Remove the ad-hoc upsert-time 1h backfill in
favour of native catchup. Apply catchup_latest to daily-statehub-wsjf-triage in
the Railiance runtime manifest and document run-miss policies in the runbook.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@@ -333,6 +333,31 @@ the same durable consumer name provides automatic failover.

 ---

+## Run-miss recovery policies (cron triggers)
+
+A cron fire is **missed** when the worker or Temporal is unavailable at trigger
+time. `trigger_config.misfire_policy` selects what happens when the system
+recovers. Each policy combines a Temporal **catchup window** (how far back missed
+fires are recovered) with an **overlap policy** (what to do if a recovered fire
+would start while a prior run is still executing):
+
+| `misfire_policy` | Behaviour | Default catchup window | Overlap |
+| --- | --- | --- | --- |
+| `skip` | Run on trigger or skip — a missed fire is never recovered | 60s grace | `SKIP` |
+| `catchup_all` | Recover **every** fire missed during the outage | 365 days | `BUFFER_ALL` |
+| `catchup_latest` | Recover only the **most recent** missed fire; no backlog | 24h | `BUFFER_ONE` |
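The Overlap column only comes into play when a recovered fire arrives while an earlier run is still executing. As a rough illustration, the sketch below models how many waiting fires eventually start under each overlap mode. It is a simplified model written for this note, not code from the repository: the function name `runs_started_after_current` and the plain-string policy names are assumptions made for the example.

```python
def runs_started_after_current(overlap: str, waiting_fires: int) -> int:
    """Simplified model: number of waiting fires that start once the
    currently executing run finishes, for a given overlap mode."""
    if waiting_fires < 0:
        raise ValueError("waiting_fires must be non-negative")
    if overlap == "SKIP":
        # Fires that arrive during a run are not started at all.
        return 0
    if overlap == "BUFFER_ONE":
        # At most one start is buffered; the rest are dropped.
        return min(waiting_fires, 1)
    if overlap == "BUFFER_ALL":
        # Every waiting fire is buffered and started in turn.
        return waiting_fires
    raise ValueError(f"unknown overlap policy: {overlap}")


if __name__ == "__main__":
    for mode in ("SKIP", "BUFFER_ONE", "BUFFER_ALL"):
        print(mode, runs_started_after_current(mode, waiting_fires=3))
```

The catchup window decides which missed fires are considered in the first place; the overlap mode only decides what happens to those that collide with a running execution.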
+
+Set `trigger_config.catchup_window_seconds` to override the per-policy default
+(e.g. an hourly definition using `catchup_latest` should set it to ~3600 so a
+single missed hour is recovered but older ones are not).
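To make the effect of the window size concrete, here is a minimal sketch, assuming the cron schedule can be approximated by a fixed period and that a missed fire is eligible for recovery only if its nominal time lies within the catchup window measured back from the moment the system recovers. The helper `eligible_missed_fires` and the timestamps are hypothetical and exist only for this illustration.

```python
from datetime import datetime, timedelta, timezone


def eligible_missed_fires(first_fire, period, outage_start, outage_end, catchup_window):
    """Return nominal fire times that were missed during the outage and
    still fall inside the catchup window counted back from recovery."""
    cutoff = outage_end - catchup_window
    fires = []
    t = first_fire
    while t < outage_end:
        # A fire is missed if it was due during the outage, and recoverable
        # only if it is not older than the cutoff.
        if t >= outage_start and t >= cutoff:
            fires.append(t)
        t += period
    return fires


midnight = datetime(2026, 6, 23, 0, 0, tzinfo=timezone.utc)
outage_start = midnight + timedelta(minutes=30)        # 00:30
outage_end = midnight + timedelta(hours=4, minutes=10)  # 04:10
hourly = timedelta(hours=1)

narrow = eligible_missed_fires(midnight, hourly, outage_start, outage_end,
                               timedelta(seconds=3600))
wide = eligible_missed_fires(midnight, hourly, outage_start, outage_end,
                             timedelta(hours=24))
print(len(narrow), len(wide))
```

With these inputs the one-hour window keeps only the 04:00 fire, while the 24-hour window keeps all four missed fires (01:00 through 04:00), which is why an hourly definition would usually want the smaller override.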
+
+Legacy values are still accepted: `catchup` → `catchup_all`,
+`compress` → `catchup_latest`.
+
+> **Why this exists:** before ACTIVITY-WP-0014 no catchup window was set, so a
+> brief outage at trigger time silently dropped the fire with no recovery and no
+> log line. The `daily-statehub-wsjf-triage` definition now uses `catchup_latest`.
+
 ## Troubleshooting

 ### Worker fails to start: "ACTCORE_DB_URL is required"
@@ -342,6 +367,9 @@ Set the environment variable before running the worker.
 1. Check Temporal UI → Schedules tab for the schedule status.
 2. Ensure `enabled=True` on the ActivityDefinition (paused schedules don't fire).
 3. Verify the cron expression with: `docker exec temporal-admin-tools temporal schedule describe --schedule-id activity-schedule-<uuid>`
+4. If a fire was **missed entirely** (no run, no failure event) during an outage,
+   check `misfire_policy` — under `skip` missed fires are dropped by design. Use
+   `catchup_all` or `catchup_latest` to recover them. See *Run-miss recovery policies*.

 ### Event not routing
 1. Check NATS monitoring: http://localhost:8222/jsz to verify the `ACTIVITY_EVENTS` stream exists.
@@ -47,7 +47,10 @@ data:
 type: cron
 cron_expression: "20 7 * * *"
 timezone: Europe/Berlin
-misfire_policy: skip
+# ACTIVITY-WP-0014: recover the most recent missed daily fire when the
+# worker/Temporal was unavailable at trigger time, without accumulating a
+# backlog after a multi-day outage.
+misfire_policy: catchup_latest
 context_sources:
   - type: static
     bind_to: context.prompt_path
@@ -49,7 +49,18 @@ class CronTriggerConfig(BaseModel):
     )
     timezone: str = Field(default="UTC", description="IANA timezone name.")
     jitter_seconds: int = Field(default=0, ge=0)
-    misfire_policy: Literal["skip", "catchup", "compress"] = Field(default="skip")
+    # Run-miss recovery behaviour (ACTIVITY-WP-0014). What happens when a fire is
+    # missed because the worker / Temporal was unavailable at trigger time:
+    #   skip           - run on trigger or skip; a missed fire is never recovered
+    #   catchup_all    - recover every fire missed during the outage window
+    #   catchup_latest - recover only the most recent missed fire; do not accumulate
+    # Legacy aliases are accepted: catchup → catchup_all, compress → catchup_latest.
+    misfire_policy: Literal[
+        "skip", "catchup_all", "catchup_latest", "catchup", "compress"
+    ] = Field(default="skip")
+    # Override the per-policy default catchup window (how far back Temporal will
+    # recover missed fires after an outage). None uses the policy default.
+    catchup_window_seconds: int | None = Field(default=None, ge=0)


 class EventTriggerConfig(BaseModel):
@@ -17,7 +17,6 @@ from temporalio.client import (
     Schedule,
     ScheduleActionStartWorkflow,
     ScheduleAlreadyRunningError,
-    ScheduleBackfill,
     ScheduleCalendarSpec,
     ScheduleHandle,
     ScheduleOverlapPolicy,
@@ -38,13 +37,49 @@ _ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq"
 # RunActivityWorkflow detects this value and derives run dedup key from workflow_id.
 SCHEDULED_TRIGGER_KEY = "scheduled"

-# T24: misfire_policy → ScheduleOverlapPolicy
-_MISFIRE_TO_OVERLAP: dict[str, ScheduleOverlapPolicy] = {
-    "skip": ScheduleOverlapPolicy.SKIP,
-    "catchup": ScheduleOverlapPolicy.BUFFER_ALL,
-    "compress": ScheduleOverlapPolicy.BUFFER_ONE,
+# ACTIVITY-WP-0014: misfire_policy → run-miss recovery behaviour.
+#
+# A "missed fire" happens when the worker / Temporal is unavailable at trigger
+# time. Two Temporal levers together define the behaviour:
+#   - catchup_window: how far back the server will recover missed fires once it
+#     is healthy again. The previous code never set this, so a brief outage at
+#     trigger time silently dropped the fire with no recovery and no signal.
+#   - overlap: what to do when a (recovered) fire would start while a prior run
+#     is still executing.
+#
+# Legacy values (catchup, compress) are aliased onto the explicit names.
+_MISFIRE_ALIASES: dict[str, str] = {
+    "catchup": "catchup_all",
+    "compress": "catchup_latest",
+}
+
+# overlap policy + default catchup window (seconds) per normalised policy.
+_SKIP_WINDOW_SECONDS = 60
+_CATCHUP_ALL_WINDOW_SECONDS = 365 * 24 * 3600
+_CATCHUP_LATEST_WINDOW_SECONDS = 24 * 3600
+
+_MISFIRE_TO_OVERLAP: dict[str, ScheduleOverlapPolicy] = {
+    # Run on trigger or skip — recover nothing past a tiny grace window.
+    "skip": ScheduleOverlapPolicy.SKIP,
+    # Run on trigger or recover every missed fire during the outage window.
+    "catchup_all": ScheduleOverlapPolicy.BUFFER_ALL,
+    # Run on trigger or recover the most recent missed fire only; BUFFER_ONE
+    # buffers at most one start and drops the rest, so a backlog never accumulates.
+    "catchup_latest": ScheduleOverlapPolicy.BUFFER_ONE,
+}
+
+_MISFIRE_DEFAULT_WINDOW: dict[str, int] = {
+    "skip": _SKIP_WINDOW_SECONDS,
+    "catchup_all": _CATCHUP_ALL_WINDOW_SECONDS,
+    "catchup_latest": _CATCHUP_LATEST_WINDOW_SECONDS,
 }


+def _normalize_misfire_policy(misfire_policy: str) -> str:
+    """Map legacy aliases onto the explicit run-miss policy names."""
+    canonical = _MISFIRE_ALIASES.get(misfire_policy, misfire_policy)
+    return canonical if canonical in _MISFIRE_TO_OVERLAP else "skip"
+
+
 def schedule_id(activity_id: str | UUID) -> str:
     """Return the canonical Temporal Schedule ID for an ActivityDefinition."""
@@ -57,7 +92,15 @@ def smoke_schedule_id(activity_id: str | UUID) -> str:


 def _overlap_policy(misfire_policy: str) -> ScheduleOverlapPolicy:
-    return _MISFIRE_TO_OVERLAP.get(misfire_policy, ScheduleOverlapPolicy.SKIP)
+    return _MISFIRE_TO_OVERLAP[_normalize_misfire_policy(misfire_policy)]


+def _catchup_window(cfg: CronTriggerConfig) -> timedelta:
+    """Resolve the catchup window: explicit override, else the policy default."""
+    if cfg.catchup_window_seconds is not None:
+        return timedelta(seconds=cfg.catchup_window_seconds)
+    policy = _normalize_misfire_policy(cfg.misfire_policy)
+    return timedelta(seconds=_MISFIRE_DEFAULT_WINDOW[policy])
+
+
 def _build_schedule(defn: ActivityDefinition) -> Schedule:
@@ -80,7 +123,10 @@ def _build_schedule(defn: ActivityDefinition) -> Schedule:
         jitter=timedelta(seconds=cfg.jitter_seconds) if cfg.jitter_seconds else None,
     )

-    policy = SchedulePolicy(overlap=_overlap_policy(cfg.misfire_policy))
+    policy = SchedulePolicy(
+        overlap=_overlap_policy(cfg.misfire_policy),
+        catchup_window=_catchup_window(cfg),
+    )
     state = ScheduleState(paused=not defn.enabled)

     return Schedule(action=action, spec=spec, policy=policy, state=state)
@@ -282,18 +328,10 @@ async def upsert_schedule(client: Client, defn: ActivityDefinition) -> ScheduleH
     else:
         await handle.pause(note="disabled via upsert_schedule")

-    # T24 catchup: backfill any fires missed in the last hour.
-    if isinstance(defn.trigger_config, CronTriggerConfig):
-        if defn.trigger_config.misfire_policy == "catchup":
-            now = datetime.now(tz=timezone.utc)
-            backfill_start = now - timedelta(hours=1)
-            await handle.backfill(
-                ScheduleBackfill(
-                    start_at=backfill_start,
-                    end_at=now,
-                    overlap=ScheduleOverlapPolicy.BUFFER_ALL,
-                )
-            )
+    # ACTIVITY-WP-0014: missed-fire recovery is now handled natively by the
+    # schedule's catchup_window (see _build_schedule), which the server applies
+    # continuously after any outage — not only at upsert time. The previous
+    # ad-hoc 1-hour backfill is therefore no longer needed.

     return handle

@@ -37,6 +37,7 @@ def _make_defn(
     misfire_policy: str = "skip",
     enabled: bool = True,
     jitter: int = 0,
+    catchup_window_seconds: int | None = None,
 ) -> ActivityDefinition:
     return ActivityDefinition(
         id=uuid.uuid4(),
@@ -46,6 +47,7 @@ def _make_defn(
             cron_expression=cron,
             misfire_policy=misfire_policy,
             jitter_seconds=jitter,
+            catchup_window_seconds=catchup_window_seconds,
         ),
     )

@@ -186,6 +188,76 @@ async def test_misfire_policy_compress_sets_overlap_buffer_one(env: WorkflowEnvi
     await delete_schedule(env.client, defn.id)


+# ── ACTIVITY-WP-0014: explicit run-miss policies + catchup window ────────────
+
+@pytest.mark.asyncio
+async def test_skip_sets_short_catchup_window(env: WorkflowEnvironment) -> None:
+    """skip = run on trigger or skip: tiny grace window, no real recovery."""
+    defn = _make_defn(misfire_policy="skip")
+    await upsert_schedule(env.client, defn)
+
+    desc = await env.client.get_schedule_handle(schedule_id(defn.id)).describe()
+    assert desc.schedule.policy.overlap == ScheduleOverlapPolicy.SKIP
+    assert desc.schedule.policy.catchup_window == timedelta(seconds=60)
+
+    await delete_schedule(env.client, defn.id)
+
+
+@pytest.mark.asyncio
+async def test_catchup_all_recovers_full_window(env: WorkflowEnvironment) -> None:
+    """catchup_all = recover every missed fire: long window, BUFFER_ALL."""
+    defn = _make_defn(misfire_policy="catchup_all")
+    await upsert_schedule(env.client, defn)
+
+    desc = await env.client.get_schedule_handle(schedule_id(defn.id)).describe()
+    assert desc.schedule.policy.overlap == ScheduleOverlapPolicy.BUFFER_ALL
+    assert desc.schedule.policy.catchup_window == timedelta(days=365)
+
+    await delete_schedule(env.client, defn.id)
+
+
+@pytest.mark.asyncio
+async def test_catchup_latest_does_not_accumulate(env: WorkflowEnvironment) -> None:
+    """catchup_latest = recover only the most recent missed fire: BUFFER_ONE."""
+    defn = _make_defn(misfire_policy="catchup_latest")
+    await upsert_schedule(env.client, defn)
+
+    desc = await env.client.get_schedule_handle(schedule_id(defn.id)).describe()
+    assert desc.schedule.policy.overlap == ScheduleOverlapPolicy.BUFFER_ONE
+    assert desc.schedule.policy.catchup_window == timedelta(hours=24)
+
+    await delete_schedule(env.client, defn.id)
+
+
+@pytest.mark.asyncio
+async def test_legacy_aliases_map_to_explicit_policies(env: WorkflowEnvironment) -> None:
+    """Legacy catchup/compress keep working and pick up the new catchup windows."""
+    catchup = _make_defn(misfire_policy="catchup")
+    compress = _make_defn(misfire_policy="compress")
+    await upsert_schedule(env.client, catchup)
+    await upsert_schedule(env.client, compress)
+
+    d1 = await env.client.get_schedule_handle(schedule_id(catchup.id)).describe()
+    d2 = await env.client.get_schedule_handle(schedule_id(compress.id)).describe()
+    assert d1.schedule.policy.catchup_window == timedelta(days=365)
+    assert d2.schedule.policy.catchup_window == timedelta(hours=24)
+
+    await delete_schedule(env.client, catchup.id)
+    await delete_schedule(env.client, compress.id)
+
+
+@pytest.mark.asyncio
+async def test_explicit_catchup_window_override(env: WorkflowEnvironment) -> None:
+    """An explicit catchup_window_seconds overrides the per-policy default."""
+    defn = _make_defn(misfire_policy="skip", catchup_window_seconds=7200)
+    await upsert_schedule(env.client, defn)
+
+    desc = await env.client.get_schedule_handle(schedule_id(defn.id)).describe()
+    assert desc.schedule.policy.catchup_window == timedelta(hours=2)
+
+    await delete_schedule(env.client, defn.id)
+
+
 @pytest.mark.asyncio
 async def test_schedule_smoke_test_creates_one_shot_schedule(
     env: WorkflowEnvironment,
@@ -9,7 +9,7 @@ owner: claude
 topic_slug: activity-core
 created: "2026-06-23"
 updated: "2026-06-23"
-state_hub_workstream_id: ""
+state_hub_workstream_id: "91b64686-5d17-4c86-bc9e-3d0ee6720cf5"
 ---

 # Schedule Misfire Robustness & Run-Miss Recovery Options
@@ -66,7 +66,7 @@ Proposed mapping to a new `misfire_policy` value set (names open to review):
 id: ACTIVITY-WP-0014-T01
 status: todo
 priority: high
-state_hub_task_id: ""
+state_hub_task_id: "c90ff214-9214-48c7-96b9-7d699528d5ab"
 ```

 Bring up the ops-bridge tunnel (`bridge up state-hub-coulombcore`,
@@ -81,9 +81,9 @@ calibration evidence is still wanted. Record findings in the workplan.

 ```task
 id: ACTIVITY-WP-0014-T02
-status: todo
+status: done
 priority: high
-state_hub_task_id: ""
+state_hub_task_id: "19615562-4cb2-4f25-872f-505d6e40dcc5"
 ```

 Add `catchup_window_seconds` to `CronTriggerConfig` and redefine `misfire_policy`
@@ -99,7 +99,7 @@ map). Unit tests for each mode's `(catchup_window, overlap)` mapping.
 id: ACTIVITY-WP-0014-T03
 status: todo
 priority: medium
-state_hub_task_id: ""
+state_hub_task_id: "dbedd96a-59ca-4b83-bce6-35755b076807"
 ```

 Detect when a scheduled definition has no successful run within its expected
@@ -112,9 +112,9 @@ be invisible.

 ```task
 id: ACTIVITY-WP-0014-T04
-status: wait
+status: progress
 priority: medium
-state_hub_task_id: ""
+state_hub_task_id: "04e9d1d2-1192-4402-9402-b12c5d7d44e5"
 ```

 Choose and set the appropriate `misfire_policy` for `daily-statehub-wsjf-triage`