Files
activity-core/tests/test_schedule_lifecycle.py
tegwick a83b117f60 feat(ACTIVITY-WP-0014): explicit run-miss recovery policies (T02, T04)
Set Temporal catchup_window on cron schedules so a fire missed during a
worker/Temporal outage is no longer silently dropped. Redefine misfire_policy
into three explicit modes — skip, catchup_all, catchup_latest — mapping to
(catchup_window, overlap) pairs; legacy catchup/compress aliased. Add
catchup_window_seconds override. Remove the ad-hoc upsert-time 1h backfill in
favour of native catchup. Apply catchup_latest to daily-statehub-wsjf-triage in
the Railiance runtime manifest and document run-miss policies in the runbook.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 14:15:45 +02:00

286 lines
9.9 KiB
Python

"""Schedule lifecycle and run-miss policy tests for schedule_manager.py.

Tests in this module include:
- T25: schedule create / pause (enabled=False) / resume / delete lifecycle.
- T24: misfire_policy -> Temporal overlap-policy round-trip.
- ACTIVITY-WP-0014: explicit run-miss policies (skip, catchup_all,
  catchup_latest, plus the legacy catchup/compress aliases) and the
  catchup_window_seconds override.
- One-shot smoke-test schedules created by schedule_smoke_test().

All tests share one local embedded Temporal server started through
temporalio[testing] — WorkflowEnvironment.start_local(). No Docker is
required: start_local() runs Temporal's dev-server binary directly (per the
temporalio docs it is downloaded on first use if not already cached).

Run with:
    uv run pytest tests/test_schedule_lifecycle.py -v
"""
from __future__ import annotations
import asyncio
import uuid
from datetime import datetime, timedelta, timezone
import pytest
from temporalio.client import ScheduleOverlapPolicy
from temporalio.testing import WorkflowEnvironment
from activity_core.models import ActivityDefinition, CronTriggerConfig
from activity_core.schedule_manager import (
delete_schedule,
delete_smoke_test_schedule,
list_schedules,
schedule_id,
schedule_smoke_test,
smoke_schedule_id,
upsert_schedule,
)
def _make_defn(
    *,
    cron: str = "0 9 * * 1-5",
    misfire_policy: str = "skip",
    enabled: bool = True,
    jitter: int = 0,
    catchup_window_seconds: int | None = None,
) -> ActivityDefinition:
    """Build a throwaway cron-triggered ActivityDefinition for a single test.

    Each call mints a fresh random UUID, so every test derives its own
    schedule ID and tests sharing the module-scoped server do not collide.

    Args:
        cron: Cron expression for the trigger (default: 09:00 on weekdays).
        misfire_policy: Run-miss policy name forwarded unchanged to
            CronTriggerConfig (this module uses "skip", "catchup_all",
            "catchup_latest" and the legacy "catchup" / "compress").
        enabled: Value for ``ActivityDefinition.enabled``; tests pass False
            to exercise the paused-schedule path.
        jitter: Forwarded as ``jitter_seconds``.
        catchup_window_seconds: Optional explicit catchup-window override;
            None leaves the per-policy default in effect.
    """
    trigger = CronTriggerConfig(
        cron_expression=cron,
        misfire_policy=misfire_policy,
        jitter_seconds=jitter,
        catchup_window_seconds=catchup_window_seconds,
    )
    return ActivityDefinition(
        id=uuid.uuid4(),
        name="test-activity",
        enabled=enabled,
        trigger_config=trigger,
    )
@pytest.fixture(scope="module")
async def env():
    """Provide one embedded local Temporal server to every test in this module.

    The server is started once per module and torn down when the
    ``async with`` block exits after the last test has run. Isolation between
    tests comes from fresh UUID-based schedule IDs, not from restarting it.

    NOTE(review): this is an async generator under a plain ``pytest.fixture``
    with module scope. That relies on pytest-asyncio being configured to
    drive it (e.g. asyncio_mode = "auto") with an event-loop scope that is
    compatible with the tests. Confirm against the project's pytest config.
    """
    local_server = await WorkflowEnvironment.start_local()
    async with local_server as running_env:
        yield running_env
# ── T25a: upsert creates a schedule and list_schedules finds it ──────────────
@pytest.mark.asyncio
async def test_upsert_schedule_creates_schedule(env: WorkflowEnvironment) -> None:
    """upsert_schedule creates a schedule that list_schedules reports."""
    defn = _make_defn()
    sid = schedule_id(defn.id)
    await upsert_schedule(env.client, defn)

    # Visibility on the embedded test server is eventually consistent, so
    # re-list up to 10 times (0.3 s apart) until the new schedule shows up.
    ids: list[str] = []
    polls_left = 10
    while polls_left > 0:
        polls_left -= 1
        ids = [entry["schedule_id"] for entry in await list_schedules(env.client)]
        if sid in ids:
            break
        await asyncio.sleep(0.3)
    assert sid in ids, f"Expected schedule {sid!r} in {ids}"

    # Cleanup
    await delete_schedule(env.client, defn.id)
# ── T25b: upsert with enabled=False creates a paused schedule ────────────────
@pytest.mark.asyncio
async def test_upsert_disabled_creates_paused_schedule(env: WorkflowEnvironment) -> None:
    """Upserting a disabled definition yields a schedule in the paused state."""
    paused_defn = _make_defn(enabled=False)
    await upsert_schedule(env.client, paused_defn)

    schedule_handle = env.client.get_schedule_handle(schedule_id(paused_defn.id))
    schedule_state = (await schedule_handle.describe()).schedule.state
    assert schedule_state.paused, "Schedule should be paused when enabled=False"

    await delete_schedule(env.client, paused_defn.id)
# ── T25c: second upsert (enabled=True) unpauses the schedule ────────────────
@pytest.mark.asyncio
async def test_upsert_reenables_paused_schedule(env: WorkflowEnvironment) -> None:
    """A second upsert with enabled=True unpauses an existing paused schedule."""
    original = _make_defn(enabled=False)
    await upsert_schedule(env.client, original)

    # Same id, name and trigger; only the enabled flag flips to True.
    reenabled = ActivityDefinition(
        id=original.id,
        name=original.name,
        enabled=True,
        trigger_config=original.trigger_config,
    )
    await upsert_schedule(env.client, reenabled)

    schedule_handle = env.client.get_schedule_handle(schedule_id(reenabled.id))
    schedule_state = (await schedule_handle.describe()).schedule.state
    assert not schedule_state.paused, "Schedule should be unpaused after re-enable"

    await delete_schedule(env.client, reenabled.id)
# ── T25d: delete_schedule removes the schedule ───────────────────────────────
@pytest.mark.asyncio
async def test_delete_schedule_removes_schedule(env: WorkflowEnvironment) -> None:
    """delete_schedule removes the schedule from the listing."""
    doomed = _make_defn()
    await upsert_schedule(env.client, doomed)
    await delete_schedule(env.client, doomed.id)

    # The listing is eventually consistent: re-check up to 10 times,
    # 0.3 s apart, until the deleted schedule has disappeared.
    gone_id = schedule_id(doomed.id)
    listed: list[str] = []
    for _attempt in range(10):
        listed = [row["schedule_id"] for row in await list_schedules(env.client)]
        if gone_id not in listed:
            break
        await asyncio.sleep(0.3)
    assert gone_id not in listed, "Schedule should be gone after delete"
# ── T25e: delete_schedule is idempotent (no-op for non-existent schedule) ────
@pytest.mark.asyncio
async def test_delete_schedule_nonexistent_is_noop(env: WorkflowEnvironment) -> None:
    """Deleting a schedule that was never created must not raise."""
    never_created = uuid.uuid4()
    await delete_schedule(env.client, never_created)
# ── T24: misfire_policy round-trip ───────────────────────────────────────────
@pytest.mark.asyncio
async def test_misfire_policy_skip_sets_overlap_skip(env: WorkflowEnvironment) -> None:
    """misfire_policy="skip" round-trips to the SKIP overlap policy."""
    defn = _make_defn(misfire_policy="skip")
    await upsert_schedule(env.client, defn)

    schedule_handle = env.client.get_schedule_handle(schedule_id(defn.id))
    policy = (await schedule_handle.describe()).schedule.policy
    assert policy.overlap == ScheduleOverlapPolicy.SKIP

    await delete_schedule(env.client, defn.id)
@pytest.mark.asyncio
async def test_misfire_policy_catchup_sets_overlap_buffer_all(env: WorkflowEnvironment) -> None:
    """Legacy misfire_policy="catchup" round-trips to the BUFFER_ALL overlap policy."""
    defn = _make_defn(misfire_policy="catchup")
    await upsert_schedule(env.client, defn)

    schedule_handle = env.client.get_schedule_handle(schedule_id(defn.id))
    policy = (await schedule_handle.describe()).schedule.policy
    assert policy.overlap == ScheduleOverlapPolicy.BUFFER_ALL

    await delete_schedule(env.client, defn.id)
@pytest.mark.asyncio
async def test_misfire_policy_compress_sets_overlap_buffer_one(env: WorkflowEnvironment) -> None:
    """Legacy misfire_policy="compress" round-trips to the BUFFER_ONE overlap policy."""
    defn = _make_defn(misfire_policy="compress")
    await upsert_schedule(env.client, defn)

    schedule_handle = env.client.get_schedule_handle(schedule_id(defn.id))
    policy = (await schedule_handle.describe()).schedule.policy
    assert policy.overlap == ScheduleOverlapPolicy.BUFFER_ONE

    await delete_schedule(env.client, defn.id)
# ── ACTIVITY-WP-0014: explicit run-miss policies + catchup window ────────────
@pytest.mark.asyncio
async def test_skip_sets_short_catchup_window(env: WorkflowEnvironment) -> None:
    """skip = run on trigger or skip: tiny grace window, no real recovery.

    Expected pair: SKIP overlap with a 60-second catchup window.
    """
    defn = _make_defn(misfire_policy="skip")
    await upsert_schedule(env.client, defn)

    schedule_handle = env.client.get_schedule_handle(schedule_id(defn.id))
    policy = (await schedule_handle.describe()).schedule.policy
    assert policy.overlap == ScheduleOverlapPolicy.SKIP
    assert policy.catchup_window == timedelta(seconds=60)

    await delete_schedule(env.client, defn.id)
@pytest.mark.asyncio
async def test_catchup_all_recovers_full_window(env: WorkflowEnvironment) -> None:
    """catchup_all = recover every missed fire: long window, BUFFER_ALL.

    Expected pair: BUFFER_ALL overlap with a 365-day catchup window.
    """
    defn = _make_defn(misfire_policy="catchup_all")
    await upsert_schedule(env.client, defn)

    schedule_handle = env.client.get_schedule_handle(schedule_id(defn.id))
    policy = (await schedule_handle.describe()).schedule.policy
    assert policy.overlap == ScheduleOverlapPolicy.BUFFER_ALL
    assert policy.catchup_window == timedelta(days=365)

    await delete_schedule(env.client, defn.id)
@pytest.mark.asyncio
async def test_catchup_latest_does_not_accumulate(env: WorkflowEnvironment) -> None:
    """catchup_latest = recover only the most recent missed fire: BUFFER_ONE.

    Expected pair: BUFFER_ONE overlap with a 24-hour catchup window.
    """
    defn = _make_defn(misfire_policy="catchup_latest")
    await upsert_schedule(env.client, defn)

    schedule_handle = env.client.get_schedule_handle(schedule_id(defn.id))
    policy = (await schedule_handle.describe()).schedule.policy
    assert policy.overlap == ScheduleOverlapPolicy.BUFFER_ONE
    assert policy.catchup_window == timedelta(hours=24)

    await delete_schedule(env.client, defn.id)
@pytest.mark.asyncio
async def test_legacy_aliases_map_to_explicit_policies(env: WorkflowEnvironment) -> None:
    """Legacy catchup/compress keep working and pick up the new catchup windows.

    The aliases are checked against BOTH halves of the explicit policy's
    (overlap, catchup_window) pair, not just the window:
    "catchup" must match catchup_all (BUFFER_ALL, 365 days) and
    "compress" must match catchup_latest (BUFFER_ONE, 24 hours).

    Cleanup runs in ``finally``, so a failed assertion does not leave
    schedules behind on the module-scoped server. delete_schedule is a no-op
    for schedules that were never created.
    """
    catchup = _make_defn(misfire_policy="catchup")
    compress = _make_defn(misfire_policy="compress")
    try:
        await upsert_schedule(env.client, catchup)
        await upsert_schedule(env.client, compress)
        d1 = await env.client.get_schedule_handle(schedule_id(catchup.id)).describe()
        d2 = await env.client.get_schedule_handle(schedule_id(compress.id)).describe()
        # "catchup" -> catchup_all
        assert d1.schedule.policy.overlap == ScheduleOverlapPolicy.BUFFER_ALL
        assert d1.schedule.policy.catchup_window == timedelta(days=365)
        # "compress" -> catchup_latest
        assert d2.schedule.policy.overlap == ScheduleOverlapPolicy.BUFFER_ONE
        assert d2.schedule.policy.catchup_window == timedelta(hours=24)
    finally:
        await delete_schedule(env.client, catchup.id)
        await delete_schedule(env.client, compress.id)
@pytest.mark.asyncio
async def test_explicit_catchup_window_override(env: WorkflowEnvironment) -> None:
    """An explicit catchup_window_seconds overrides the per-policy default.

    The override replaces only the window. The overlap setting of the chosen
    policy (SKIP for "skip") must stay intact, so that is asserted too.
    Cleanup runs in ``finally`` so a failure does not leak the schedule.
    """
    defn = _make_defn(misfire_policy="skip", catchup_window_seconds=7200)
    try:
        await upsert_schedule(env.client, defn)
        desc = await env.client.get_schedule_handle(schedule_id(defn.id)).describe()
        assert desc.schedule.policy.catchup_window == timedelta(hours=2)
        assert desc.schedule.policy.overlap == ScheduleOverlapPolicy.SKIP
    finally:
        await delete_schedule(env.client, defn.id)
@pytest.mark.asyncio
async def test_schedule_smoke_test_creates_one_shot_schedule(
    env: WorkflowEnvironment,
) -> None:
    """schedule_smoke_test registers a single-action UTC schedule.

    ``now`` is pinned to 2026-06-06 12:00Z so the outputs are deterministic.
    A one-minute delay must give a 12:01Z fire time and a workflow ID ending
    in ``smoke-20260606T120100Z``.
    """
    defn = _make_defn()
    pinned_now = datetime(2026, 6, 6, 12, 0, tzinfo=timezone.utc)

    smoke_sid, smoke_workflow_id, smoke_fire_at = await schedule_smoke_test(
        env.client,
        defn,
        delay=timedelta(minutes=1),
        now=pinned_now,
    )

    # Values returned by schedule_smoke_test itself.
    assert smoke_sid == smoke_schedule_id(defn.id)
    assert smoke_workflow_id == f"activity-{defn.id}:smoke-20260606T120100Z"
    assert smoke_fire_at == datetime(2026, 6, 6, 12, 1, tzinfo=timezone.utc)

    # The server-side schedule must allow exactly one action, in UTC.
    described = await env.client.get_schedule_handle(smoke_sid).describe()
    smoke_state = described.schedule.state
    assert smoke_state.limited_actions is True
    assert smoke_state.remaining_actions == 1
    assert described.schedule.spec.time_zone_name == "UTC"

    await delete_smoke_test_schedule(env.client, defn.id)