Files
activity-core/src/activity_core/sync_schedules.py

171 lines
5.5 KiB
Python

"""Bootstrap script: sync Temporal Schedules with the ActivityDefinition DB.
T23: On startup, ensures every cron or scheduled ActivityDefinition has a
Temporal Schedule (upserted as paused when the definition is disabled), and
removes orphaned schedules that have no matching DB row.
Run directly:
ACTCORE_DB_URL=... uv run python -m activity_core.sync_schedules
Also called from worker.py before the worker enters its run loop.
"""
from __future__ import annotations
import asyncio
import logging
import os
import uuid
from dataclasses import dataclass
from typing import Sequence
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from temporalio.client import Client
from activity_core.models import ActivityDefinition, CronTriggerConfig, ScheduledTriggerConfig
from activity_core.orm import ActivityDefinition as ActivityDefinitionRow
from activity_core.schedule_manager import delete_schedule, list_schedules, upsert_schedule
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Temporal connection settings, read from the environment once at import time.
# main() passes them to Client.connect(); the second argument of each lookup is
# the fallback used when the variable is unset.
TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233")
TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default")
@dataclass
class ScheduleSyncResult:
    """Counters describing what one schedule reconciliation pass did."""

    # Schedules upserted for definitions that are enabled.
    upserted: int = 0
    # Schedules upserted for definitions that are disabled.
    paused: int = 0
    # Existing schedules deleted because no loaded definition claimed them.
    deleted_orphans: int = 0

    def to_dict(self) -> dict[str, int]:
        """Return the three counters as a plain dict keyed by field name."""
        return dict(
            upserted=self.upserted,
            paused=self.paused,
            deleted_orphans=self.deleted_orphans,
        )
def _row_to_domain(row: ActivityDefinitionRow) -> ActivityDefinition:
    """Build the domain ``ActivityDefinition`` corresponding to an ORM row.

    The listed attributes are copied from ``row`` into a plain mapping, which
    is then handed to ``ActivityDefinition.model_validate``. The resulting
    domain object is what ``upsert_schedule`` receives.
    """
    copied_fields = (
        "id",
        "name",
        "enabled",
        "trigger_config",
        "context_sources",
        "task_templates",
        "dedupe_key_strategy",
        "version",
    )
    payload = {field: getattr(row, field) for field in copied_fields}
    return ActivityDefinition.model_validate(payload)
def _valid_schedule_activity_id(defn: ActivityDefinition) -> str:
    """Return the activity id under which ``defn``'s schedule is expected.

    Definitions with a ``ScheduledTriggerConfig`` get the id followed by
    ``-once``; every other definition gets ``str(defn.id)``. The value is
    compared against the ``activity_id`` entries returned by
    ``list_schedules`` during orphan cleanup.
    """
    # NOTE(review): the "-once" suffix presumably mirrors how schedule_manager
    # names one-shot schedules — confirm against that module.
    one_shot = isinstance(defn.trigger_config, ScheduledTriggerConfig)
    return f"{defn.id}-once" if one_shot else str(defn.id)
async def _load_schedule_rows(
    session_factory: async_sessionmaker[AsyncSession],
) -> Sequence[ActivityDefinitionRow]:
    """Fetch every definition row whose trigger type is cron or scheduled.

    A session is opened from ``session_factory`` for the duration of the
    query only. Rows are returned regardless of their ``enabled`` flag.
    """
    async with session_factory() as session:
        schedulable_types = ["cron", "scheduled"]
        query = select(ActivityDefinitionRow).where(
            ActivityDefinitionRow.trigger_type.in_(schedulable_types)
        )
        found = await session.scalars(query)
        return found.all()
async def sync_schedule_rows(
    client: Client,
    rows: Sequence[ActivityDefinitionRow],
) -> ScheduleSyncResult:
    """Bring Temporal Schedules in line with a pre-loaded set of definition rows.

    Each row is converted to its domain form. Rows whose trigger config is a
    cron or scheduled config are upserted via ``upsert_schedule`` and counted
    as ``upserted`` (enabled) or ``paused`` (disabled); other rows are skipped.
    Afterwards every schedule reported by ``list_schedules`` whose
    ``activity_id`` was not claimed by an upserted row is deleted.

    Exceptions raised by the schedule helpers are not caught here.

    Returns:
        The counters accumulated during this pass.
    """
    schedulable_configs = (CronTriggerConfig, ScheduledTriggerConfig)
    tally = ScheduleSyncResult()
    claimed_ids: set[str] = set()

    for db_row in rows:
        definition = _row_to_domain(db_row)
        if isinstance(definition.trigger_config, schedulable_configs):
            # Record the id before upserting so the cleanup pass below never
            # treats this definition's schedule as an orphan.
            claimed_ids.add(_valid_schedule_activity_id(definition))
            await upsert_schedule(client, definition)
            if not definition.enabled:
                tally.paused += 1
                logger.info(
                    "upserted paused schedule for disabled activity %s",
                    definition.id,
                )
            else:
                tally.upserted += 1
                logger.info(
                    "upserted schedule for activity %s (%s)",
                    definition.id,
                    definition.name,
                )

    # Tombstone cleanup: drop Temporal Schedules that no loaded row claimed.
    for schedule in await list_schedules(client):
        activity_id = schedule["activity_id"]
        if activity_id in claimed_ids:
            continue
        await delete_schedule(client, activity_id)
        tally.deleted_orphans += 1
        logger.info("deleted orphaned schedule %s", schedule["schedule_id"])

    logger.info(
        "sync_schedules complete — upserted=%d paused=%d deleted_orphans=%d",
        tally.upserted,
        tally.paused,
        tally.deleted_orphans,
    )
    return tally
async def sync_with_session_factory(
    client: Client,
    session_factory: async_sessionmaker[AsyncSession],
) -> ScheduleSyncResult:
    """Load the schedulable rows via ``session_factory`` and reconcile them.

    For callers that already own a session factory and do not want a new
    engine to be created.
    """
    rows = await _load_schedule_rows(session_factory)
    return await sync_schedule_rows(client, rows)
async def sync(client: Client, db_url: str) -> ScheduleSyncResult:
    """Reconcile Temporal Schedules against the ActivityDefinition table.

    A dedicated async engine is created for ``db_url`` and disposed again
    before returning, whether or not the reconciliation succeeds.

    Steps:
    1. Load all cron/scheduled ActivityDefinitions from the database.
    2. Upsert a Temporal Schedule for each one, paused when disabled.
    3. Delete Temporal Schedules whose activity_id has no matching DB row
       (tombstone cleanup for deleted or trigger-type-changed definitions).

    Returns:
        The counters produced by the reconciliation pass.
    """
    db_engine = create_async_engine(db_url)
    make_session = async_sessionmaker(db_engine, expire_on_commit=False)
    try:
        outcome = await sync_with_session_factory(client, make_session)
    finally:
        # Always release the engine's connections, even on failure.
        await db_engine.dispose()
    return outcome
async def main() -> None:
    """Command-line entry point: run one sync and print the counters.

    Reads the database URL from ``ACTCORE_DB_URL`` and connects to Temporal
    using ``TEMPORAL_HOST`` / ``TEMPORAL_NAMESPACE``.

    Raises:
        RuntimeError: if ``ACTCORE_DB_URL`` is unset or empty.
    """
    logging.basicConfig(level=logging.INFO)

    db_url = os.getenv("ACTCORE_DB_URL")
    if not db_url:
        raise RuntimeError("ACTCORE_DB_URL is required")

    temporal = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE)
    outcome = await sync(temporal, db_url)

    summary = (
        "Synced schedules: "
        f"upserted={outcome.upserted} "
        f"paused={outcome.paused} "
        f"deleted_orphans={outcome.deleted_orphans}"
    )
    print(summary)
# Script entry point: run the async main() to completion when this module is
# executed directly (e.g. `python -m activity_core.sync_schedules`).
if __name__ == "__main__":
    asyncio.run(main())