"""Temporal worker entrypoint for activity-core. Starts two workers (wired up in T20): - orchestrator-tq: RunActivityWorkflow + its activities - task-execution-tq: TaskExecutorWorkflow T23: Calls sync_schedules before entering the worker run loop to ensure all cron ActivityDefinitions have live Temporal Schedules. T31: Exposes Prometheus metrics via the Temporal SDK runtime on :9090/metrics. Run with: TEMPORAL_HOST=localhost:7233 \ ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \ python -m activity_core.worker Environment variables: TEMPORAL_HOST Temporal frontend address (default: localhost:7233) TEMPORAL_NAMESPACE Temporal namespace (default: default) ACTCORE_DB_URL App DB connection string (required) PROMETHEUS_BIND_ADDR Prometheus metrics bind (default: 0.0.0.0:9090) """ from __future__ import annotations import asyncio import logging import os from temporalio.client import Client from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig from temporalio.worker import Worker from activity_core.activities import ( init_session_factory, load_activity_definition, log_run, persist_task_instance, resolve_context, ) from activity_core.sync_schedules import sync as sync_schedules from activity_core.workflows import RunActivityWorkflow, TaskExecutorWorkflow logger = logging.getLogger(__name__) TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default") PROMETHEUS_BIND_ADDR = os.environ.get("PROMETHEUS_BIND_ADDR", "0.0.0.0:9090") ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq" TASK_EXECUTION_TASK_QUEUE = "task-execution-tq" async def run() -> None: db_url = os.environ.get("ACTCORE_DB_URL") if not db_url: raise RuntimeError("ACTCORE_DB_URL is required") init_session_factory(db_url) # T31: Configure the Temporal SDK runtime to emit metrics in Prometheus format. runtime = Runtime( telemetry=TelemetryConfig( metrics=PrometheusConfig(bind_address=PROMETHEUS_BIND_ADDR) ) ) client = await Client.connect( TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE, runtime=runtime ) # T23: Sync Temporal Schedules with the DB before workers start accepting tasks. logger.info("Syncing Temporal Schedules with ActivityDefinition DB...") try: await sync_schedules(client, db_url) except Exception: logger.exception("schedule sync failed — continuing worker startup") orchestrator_worker = Worker( client, task_queue=ORCHESTRATOR_TASK_QUEUE, workflows=[RunActivityWorkflow], activities=[load_activity_definition, resolve_context, log_run], ) task_worker = Worker( client, task_queue=TASK_EXECUTION_TASK_QUEUE, workflows=[TaskExecutorWorkflow], activities=[persist_task_instance], ) async with orchestrator_worker, task_worker: print( f"Workers running — queues: {ORCHESTRATOR_TASK_QUEUE!r}, " f"{TASK_EXECUTION_TASK_QUEUE!r} (namespace={TEMPORAL_NAMESPACE!r})" ) await asyncio.Future() # run until cancelled if __name__ == "__main__": logging.basicConfig(level=logging.INFO) asyncio.run(run())