#!/usr/bin/env python3
"""Recreate and smoke-test a recurring ActivityDefinition schedule.

The smoke test creates a distinct one-shot Temporal Schedule that fires once
after a short delay and starts the same RunActivityWorkflow. It is meant for
operator use after adding or changing scheduled workflow actions.
"""

from __future__ import annotations

import argparse
import asyncio
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import UUID

from temporalio.client import Client, WorkflowExecutionStatus
from temporalio.service import RPCError

DEFAULT_TEMPORAL_HOST = "localhost:7233"
DEFAULT_TEMPORAL_NAMESPACE = "default"

# Pause between two polls of the smoke workflow's state.
_POLL_INTERVAL_SECONDS = 2


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command line.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The parsed namespace. ``--db-url``, ``--temporal-host`` and
        ``--temporal-namespace`` default to the ``ACTCORE_DB_URL``,
        ``TEMPORAL_HOST`` and ``TEMPORAL_NAMESPACE`` environment variables.
    """
    parser = argparse.ArgumentParser(
        description="Schedule a one-shot smoke run for a recurring ActivityDefinition.",
    )
    parser.add_argument("--activity-id", required=True)
    parser.add_argument("--db-url", default=os.environ.get("ACTCORE_DB_URL"))
    parser.add_argument(
        "--temporal-host",
        default=os.environ.get("TEMPORAL_HOST", DEFAULT_TEMPORAL_HOST),
    )
    parser.add_argument(
        "--temporal-namespace",
        default=os.environ.get("TEMPORAL_NAMESPACE", DEFAULT_TEMPORAL_NAMESPACE),
    )
    parser.add_argument("--delay-seconds", type=int, default=60)
    parser.add_argument("--timeout-seconds", type=int, default=600)
    parser.add_argument(
        "--recreate-recurring",
        action="store_true",
        help="Delete and recreate the recurring schedule before the smoke run.",
    )
    parser.add_argument(
        "--keep-smoke-schedule",
        action="store_true",
        help="Leave the one-shot smoke schedule in Temporal after waiting.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print the planned smoke test without contacting Temporal or DB.",
    )
    return parser.parse_args(argv)


def build_dry_run_report(args: argparse.Namespace) -> dict[str, Any]:
    """Describe what a live run would do, without touching Temporal or the DB.

    Args:
        args: Namespace produced by :func:`parse_args`.

    Returns:
        A JSON-serialisable dict echoing the relevant options plus the list of
        steps a live run performs.
    """
    return {
        "mode": "dry-run",
        "activity_id": args.activity_id,
        "temporal_host": args.temporal_host,
        "temporal_namespace": args.temporal_namespace,
        "recreate_recurring": bool(args.recreate_recurring),
        "delay_seconds": args.delay_seconds,
        "timeout_seconds": args.timeout_seconds,
        "checks": [
            "load ActivityDefinition from Postgres",
            "optionally delete and recreate the recurring Temporal Schedule",
            # Report the configured delay rather than a fixed "one minute",
            # which was only accurate for the default --delay-seconds.
            "create a one-shot smoke Temporal Schedule "
            f"{args.delay_seconds} seconds in the future",
            "wait for the smoke workflow to complete",
            "return non-zero if the smoke workflow fails or times out",
        ],
    }


async def _load_activity(db_url: str, activity_id: str):
    """Load one ActivityDefinition row and convert it to the domain model.

    Args:
        db_url: Database URL handed to ``activity_core.db.make_engine``.
        activity_id: UUID string identifying the row.

    Returns:
        The result of ``ActivityDefinition.model_validate`` on the row's fields.

    Raises:
        ValueError: If ``activity_id`` is not a valid UUID string.
        RuntimeError: If no row with that id exists.
    """
    # Imported lazily so that --dry-run works without these packages.
    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import async_sessionmaker

    from activity_core.db import make_engine
    from activity_core.models import ActivityDefinition
    from activity_core.orm import ActivityDefinition as ActivityDefinitionRow

    engine = make_engine(db_url)
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with session_factory() as session:
            row = await session.scalar(
                select(ActivityDefinitionRow).where(
                    ActivityDefinitionRow.id == UUID(activity_id)
                )
            )
        if row is None:
            raise RuntimeError(f"ActivityDefinition {activity_id!r} was not found")
        return ActivityDefinition.model_validate({
            "id": row.id,
            "name": row.name,
            "enabled": row.enabled,
            "trigger_config": row.trigger_config,
            "context_sources": row.context_sources,
            "task_templates": row.task_templates,
            "dedupe_key_strategy": row.dedupe_key_strategy,
            "version": row.version,
        })
    finally:
        # Always release the engine, including on the not-found path.
        await engine.dispose()


def _status_label(status: WorkflowExecutionStatus | None) -> str:
    """Return a stable, human-readable label for a workflow status.

    ``str()`` of an ``IntEnum`` member differs between Python versions, so the
    label is derived from the member name instead.
    """
    name = getattr(status, "name", None)
    return name.lower() if name else "unknown"


async def _wait_for_workflow(
    client: Client,
    workflow_id_prefix: str,
    timeout_seconds: int,
) -> dict[str, Any]:
    """Poll Temporal until the smoke workflow finishes or the timeout expires.

    Args:
        client: Connected Temporal client.
        workflow_id_prefix: Prefix of the workflow id started by the smoke
            schedule.
        timeout_seconds: Maximum time to keep polling.

    Returns:
        A dict whose ``status`` is ``"completed"`` (with ``result``),
        ``"timeout"`` (with ``error``), or the lowercase name of another
        non-running status (with ``error``).
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    query = f'WorkflowId STARTS_WITH "{workflow_id_prefix}"'
    last_error: str | None = None

    while loop.time() < deadline:
        try:
            item = None
            async for candidate in client.list_workflows(query=query):
                if candidate.id.startswith(workflow_id_prefix):
                    item = candidate
                    break
            if item is None:
                raise RuntimeError(
                    f"workflow not found for prefix: {workflow_id_prefix}"
                )
            handle = client.get_workflow_handle(item.id, run_id=item.run_id)
            desc = await handle.describe()
        except (RPCError, RuntimeError) as exc:
            # The schedule may not have fired yet; remember why and retry.
            last_error = str(exc)
            await asyncio.sleep(_POLL_INTERVAL_SECONDS)
            continue

        # The workflow was found and described, so earlier lookup errors are
        # stale and must not be reported if we later time out.
        last_error = None

        status = desc.status
        # Compare against the enum members, not str(status): the string form
        # of an IntEnum member is only the bare digit on Python 3.11+.
        if status == WorkflowExecutionStatus.COMPLETED:
            return {
                "workflow_id": item.id,
                "run_id": item.run_id,
                "status": "completed",
                "result": await handle.result(),
            }
        if status != WorkflowExecutionStatus.RUNNING:
            label = _status_label(status)
            return {
                "workflow_id": item.id,
                "run_id": item.run_id,
                "status": label,
                "error": f"smoke workflow ended with status {label}",
            }
        await asyncio.sleep(_POLL_INTERVAL_SECONDS)

    return {
        "workflow_id_prefix": workflow_id_prefix,
        "status": "timeout",
        "error": last_error or "smoke workflow did not complete before timeout",
    }


async def run_live(args: argparse.Namespace) -> dict[str, Any]:
    """Run the smoke test against the real database and Temporal.

    Args:
        args: Namespace produced by :func:`parse_args`.

    Returns:
        A report dict with the schedule ids, the smoke workflow id prefix, the
        planned fire time and the ``wait_result`` from
        :func:`_wait_for_workflow`.

    Raises:
        RuntimeError: If no database URL was supplied.
    """
    if not args.db_url:
        raise RuntimeError("ACTCORE_DB_URL or --db-url is required")

    # Imported lazily so that --dry-run works without activity_core.
    from activity_core.schedule_manager import (
        delete_schedule,
        delete_smoke_test_schedule,
        schedule_id,
        schedule_smoke_test,
        smoke_schedule_id,
        upsert_schedule,
    )

    defn = await _load_activity(args.db_url, args.activity_id)
    client = await Client.connect(
        args.temporal_host,
        namespace=args.temporal_namespace,
    )

    if args.recreate_recurring:
        await delete_schedule(client, args.activity_id)
        await upsert_schedule(client, defn)

    smoke_sid, workflow_id_prefix, fire_at = await schedule_smoke_test(
        client,
        defn,
        delay=timedelta(seconds=args.delay_seconds),
    )

    wait_result: dict[str, Any]
    try:
        wait_result = await _wait_for_workflow(
            client,
            workflow_id_prefix,
            args.timeout_seconds,
        )
    finally:
        # Clean up the one-shot schedule even if waiting raised.
        if not args.keep_smoke_schedule:
            await delete_smoke_test_schedule(client, args.activity_id)

    return {
        "mode": "live",
        "activity_id": args.activity_id,
        "activity_name": defn.name,
        "recurring_schedule_id": schedule_id(args.activity_id),
        "smoke_schedule_id": smoke_sid or smoke_schedule_id(args.activity_id),
        "smoke_workflow_id_prefix": workflow_id_prefix,
        "smoke_fire_at": fire_at.isoformat(),
        "recreate_recurring": bool(args.recreate_recurring),
        "wait_result": wait_result,
    }


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list; ``None`` means ``sys.argv``.

    Returns:
        ``0`` when the dry run was printed or the smoke workflow completed,
        ``1`` when the smoke workflow failed or timed out, and ``2`` when
        building the report raised an exception.
    """
    args = parse_args(argv)
    try:
        if args.dry_run:
            report = build_dry_run_report(args)
        else:
            report = asyncio.run(run_live(args))
    except Exception as exc:
        # Top-level boundary: report the failure and signal it via exit code.
        print(f"smoke_test_schedule: {exc}", file=sys.stderr)
        return 2

    print(json.dumps(report, indent=2, sort_keys=True))
    # Dry-run reports carry no wait_result, which counts as success.
    wait_result = report.get("wait_result") or {}
    return 0 if wait_result.get("status") in (None, "completed") else 1


if __name__ == "__main__":
    raise SystemExit(main())