"""Temporal Schedule management for activity-core. T22: upsert_schedule, delete_schedule, list_schedules T24: misfire_policy → ScheduleOverlapPolicy mapping (all three policies) Schedule ID convention: activity-schedule-{activity_definition.id} Workflow triggered: RunActivityWorkflow on orchestrator-tq """ from __future__ import annotations from datetime import datetime, timedelta, timezone from uuid import UUID from temporalio.client import ( Client, Schedule, ScheduleActionStartWorkflow, ScheduleBackfill, ScheduleHandle, ScheduleOverlapPolicy, SchedulePolicy, ScheduleSpec, ScheduleState, ScheduleUpdate, ScheduleUpdateInput, ) from temporalio.service import RPCError from activity_core.models import ActivityDefinition, CronTriggerConfig _ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq" # Trigger_key sentinel used when a workflow is started by a Temporal Schedule. # RunActivityWorkflow detects this value and derives run dedup key from workflow_id. SCHEDULED_TRIGGER_KEY = "scheduled" # T24: misfire_policy → ScheduleOverlapPolicy _MISFIRE_TO_OVERLAP: dict[str, ScheduleOverlapPolicy] = { "skip": ScheduleOverlapPolicy.SKIP, "catchup": ScheduleOverlapPolicy.BUFFER_ALL, "compress": ScheduleOverlapPolicy.BUFFER_ONE, } def schedule_id(activity_id: str | UUID) -> str: """Return the canonical Temporal Schedule ID for an ActivityDefinition.""" return f"activity-schedule-{activity_id}" def _overlap_policy(misfire_policy: str) -> ScheduleOverlapPolicy: return _MISFIRE_TO_OVERLAP.get(misfire_policy, ScheduleOverlapPolicy.SKIP) def _build_schedule(defn: ActivityDefinition) -> Schedule: """Construct a Temporal Schedule object from a cron ActivityDefinition.""" assert isinstance(defn.trigger_config, CronTriggerConfig) cfg: CronTriggerConfig = defn.trigger_config # Workflow ID uses ${firstScheduledTime} so each schedule fire gets a # unique workflow ID, enabling replay/audit without ID conflicts. action = ScheduleActionStartWorkflow( "RunActivityWorkflow", args=[str(defn.id), SCHEDULED_TRIGGER_KEY, None], id=f"activity-{defn.id}:${{firstScheduledTime}}", task_queue=_ORCHESTRATOR_TASK_QUEUE, ) spec = ScheduleSpec( cron_expressions=[cfg.cron_expression], timezone_name=cfg.timezone, jitter=timedelta(seconds=cfg.jitter_seconds) if cfg.jitter_seconds else None, ) policy = SchedulePolicy(overlap=_overlap_policy(cfg.misfire_policy)) state = ScheduleState(paused=not defn.enabled) return Schedule(action=action, spec=spec, policy=policy, state=state) async def upsert_schedule(client: Client, defn: ActivityDefinition) -> ScheduleHandle: """Create or update a Temporal Schedule for a cron ActivityDefinition. - Only operates on definitions with trigger_type='cron'. - If enabled=False the schedule is created paused. - For misfire_policy='catchup', triggers a backfill covering the last hour after each upsert to replay any recently missed fires. Returns the ScheduleHandle for the created/updated schedule. """ if not isinstance(defn.trigger_config, CronTriggerConfig): raise ValueError( f"upsert_schedule requires trigger_type='cron', " f"got {defn.trigger_config.trigger_type!r}" ) sid = schedule_id(defn.id) sched = _build_schedule(defn) try: handle = await client.create_schedule(sid, sched) except RPCError: # Schedule already exists — update it in place. handle = client.get_schedule_handle(sid) async def _updater(input: ScheduleUpdateInput) -> ScheduleUpdate: # noqa: ARG001 return ScheduleUpdate(schedule=sched) await handle.update(_updater) # Sync pause state explicitly (update replaces the schedule object # but pause state is part of ScheduleState, already embedded above). if defn.enabled: await handle.unpause() else: await handle.pause(note="disabled via upsert_schedule") # T24 catchup: backfill any fires missed in the last hour. if isinstance(defn.trigger_config, CronTriggerConfig): if defn.trigger_config.misfire_policy == "catchup": now = datetime.now(tz=timezone.utc) backfill_start = now - timedelta(hours=1) await handle.backfill( [ ScheduleBackfill( start_at=backfill_start, end_at=now, overlap=ScheduleOverlapPolicy.BUFFER_ALL, ) ] ) return handle async def delete_schedule(client: Client, activity_id: str | UUID) -> None: """Delete the Temporal Schedule for the given activity_id. No-op if the schedule does not exist. """ handle = client.get_schedule_handle(schedule_id(activity_id)) try: await handle.delete() except RPCError: pass # Not found — treat as success. async def list_schedules(client: Client) -> list[dict]: """Enumerate all activity-core Temporal Schedules. Returns a list of dicts: [{"schedule_id": str, "activity_id": str}, ...] """ prefix = "activity-schedule-" results: list[dict] = [] async for entry in await client.list_schedules(): if entry.id.startswith(prefix): results.append( { "schedule_id": entry.id, "activity_id": entry.id[len(prefix) :], } ) return results