Files
activity-core/src/activity_core/worker.py
tegwick ea5fbe0bf3 feat(WP-0002): complete Triggers & Ops workstream
Delivers all 12 tasks (T22–T33): Temporal Schedule manager + startup
sync, NATS JetStream event router, FastAPI CRUD + manual trigger,
Prometheus metrics wiring, custom search-attribute tagging, and
operational runbook. Marks workplan status as done.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-28 01:04:43 +01:00

103 lines
3.3 KiB
Python

"""Temporal worker entrypoint for activity-core.
Starts two workers (wired up in T20):
- orchestrator-tq: RunActivityWorkflow + its activities
- task-execution-tq: TaskExecutorWorkflow
T23: Calls sync_schedules before entering the worker run loop to ensure
all cron ActivityDefinitions have live Temporal Schedules.
T31: Exposes Prometheus metrics via the Temporal SDK runtime on :9090/metrics.
Run with:
TEMPORAL_HOST=localhost:7233 \
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
python -m activity_core.worker
Environment variables:
TEMPORAL_HOST Temporal frontend address (default: localhost:7233)
TEMPORAL_NAMESPACE Temporal namespace (default: default)
ACTCORE_DB_URL App DB connection string (required)
PROMETHEUS_BIND_ADDR Prometheus metrics bind (default: 0.0.0.0:9090)
"""
from __future__ import annotations
import asyncio
import logging
import os
from temporalio.client import Client
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
from temporalio.worker import Worker
from activity_core.activities import (
init_session_factory,
load_activity_definition,
log_run,
persist_task_instance,
resolve_context,
)
from activity_core.sync_schedules import sync as sync_schedules
from activity_core.workflows import RunActivityWorkflow, TaskExecutorWorkflow
logger = logging.getLogger(__name__)
TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233")
TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default")
PROMETHEUS_BIND_ADDR = os.environ.get("PROMETHEUS_BIND_ADDR", "0.0.0.0:9090")
ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq"
TASK_EXECUTION_TASK_QUEUE = "task-execution-tq"
async def run() -> None:
db_url = os.environ.get("ACTCORE_DB_URL")
if not db_url:
raise RuntimeError("ACTCORE_DB_URL is required")
init_session_factory(db_url)
# T31: Configure the Temporal SDK runtime to emit metrics in Prometheus format.
runtime = Runtime(
telemetry=TelemetryConfig(
metrics=PrometheusConfig(bind_address=PROMETHEUS_BIND_ADDR)
)
)
client = await Client.connect(
TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE, runtime=runtime
)
# T23: Sync Temporal Schedules with the DB before workers start accepting tasks.
logger.info("Syncing Temporal Schedules with ActivityDefinition DB...")
try:
await sync_schedules(client, db_url)
except Exception:
logger.exception("schedule sync failed — continuing worker startup")
orchestrator_worker = Worker(
client,
task_queue=ORCHESTRATOR_TASK_QUEUE,
workflows=[RunActivityWorkflow],
activities=[load_activity_definition, resolve_context, log_run],
)
task_worker = Worker(
client,
task_queue=TASK_EXECUTION_TASK_QUEUE,
workflows=[TaskExecutorWorkflow],
activities=[persist_task_instance],
)
async with orchestrator_worker, task_worker:
print(
f"Workers running — queues: {ORCHESTRATOR_TASK_QUEUE!r}, "
f"{TASK_EXECUTION_TASK_QUEUE!r} (namespace={TEMPORAL_NAMESPACE!r})"
)
await asyncio.Future() # run until cancelled
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(run())