"""Bootstrap script: sync Temporal Schedules with the ActivityDefinition DB. T23: On startup, ensures every enabled cron ActivityDefinition has a live Temporal Schedule, and removes orphaned schedules that have no matching DB row. Run directly: ACTCORE_DB_URL=... uv run python -m activity_core.sync_schedules Also called from worker.py before the worker enters its run loop. """ from __future__ import annotations import asyncio import logging import os import uuid from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from temporalio.client import Client from activity_core.models import ActivityDefinition, CronTriggerConfig from activity_core.orm import ActivityDefinition as ActivityDefinitionRow from activity_core.schedule_manager import delete_schedule, list_schedules, upsert_schedule logger = logging.getLogger(__name__) TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default") def _row_to_domain(row: ActivityDefinitionRow) -> ActivityDefinition: """Convert an ORM row to a domain ActivityDefinition for schedule_manager.""" return ActivityDefinition.model_validate( { "id": row.id, "name": row.name, "enabled": row.enabled, "trigger_config": row.trigger_config, "context_sources": row.context_sources, "task_templates": row.task_templates, "dedupe_key_strategy": row.dedupe_key_strategy, "version": row.version, } ) async def sync(client: Client, db_url: str) -> None: """Reconcile Temporal Schedules against the ActivityDefinition table. Steps: 1. Load all enabled cron ActivityDefinitions from Postgres. 2. Upsert a Temporal Schedule for each one. 3. Delete Temporal Schedules whose activity_id has no matching DB row (tombstone cleanup for deleted or trigger-type-changed definitions). """ engine = create_async_engine(db_url) session_factory = async_sessionmaker(engine, expire_on_commit=False) try: async with session_factory() as session: rows = ( await session.scalars( select(ActivityDefinitionRow).where( ActivityDefinitionRow.trigger_type == "cron" ) ) ).all() finally: await engine.dispose() db_activity_ids: set[str] = set() upserted = 0 skipped = 0 for row in rows: defn = _row_to_domain(row) if not isinstance(defn.trigger_config, CronTriggerConfig): continue # should not happen given the WHERE clause, but guard anyway db_activity_ids.add(str(defn.id)) if defn.enabled: await upsert_schedule(client, defn) upserted += 1 logger.info("upserted schedule for activity %s (%s)", defn.id, defn.name) else: # Disabled definitions: schedule may exist (paused) — leave it; # upsert_schedule already handles the paused state. await upsert_schedule(client, defn) skipped += 1 logger.info("upserted paused schedule for disabled activity %s", defn.id) # Tombstone cleanup: remove Temporal Schedules with no matching DB row. existing_schedules = await list_schedules(client) deleted = 0 for entry in existing_schedules: if entry["activity_id"] not in db_activity_ids: await delete_schedule(client, entry["activity_id"]) deleted += 1 logger.info("deleted orphaned schedule %s", entry["schedule_id"]) logger.info( "sync_schedules complete — upserted=%d skipped_disabled=%d deleted_orphans=%d", upserted, skipped, deleted, ) async def main() -> None: logging.basicConfig(level=logging.INFO) db_url = os.environ.get("ACTCORE_DB_URL") if not db_url: raise RuntimeError("ACTCORE_DB_URL is required") client = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE) await sync(client, db_url) if __name__ == "__main__": asyncio.run(main())