"""Event Router — NATS JetStream consumer that routes events to RunActivityWorkflow. T26: EventRouter class — connects to NATS JetStream, subscribes to activity.> T27: Routing rules — match event.type + payload filters to enabled ActivityDefinitions T28: Start/signal workflow from Event Router with idempotent workflow ID Stream: ACTIVITY_EVENTS Subject: activity.> Consumer: activity-core-event-router (durable, push-based) Message ack happens only after the workflow has been successfully started, giving at-least-once delivery semantics. Usage: NATS_URL=nats://localhost:4222 \ ACTCORE_DB_URL=postgresql+asyncpg://... \ TEMPORAL_HOST=localhost:7233 \ python -m activity_core.event_router """ from __future__ import annotations import asyncio import logging import os import signal import uuid from datetime import datetime, timezone from typing import Any import nats import nats.js.api from nats.aio.client import Client as NATSClient from nats.js.client import JetStreamContext from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from temporalio.client import Client as TemporalClient from temporalio.common import WorkflowIDConflictPolicy from temporalio.exceptions import WorkflowAlreadyStartedError from activity_core.models import EventEnvelope, EventTriggerConfig from activity_core.orm import ActivityDefinition as ActivityDefinitionRow logger = logging.getLogger(__name__) NATS_URL = os.environ.get("NATS_URL", "nats://localhost:4222") TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default") _STREAM_NAME = "ACTIVITY_EVENTS" _SUBJECT = "activity.>" _CONSUMER_NAME = "activity-core-event-router" _ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq" class EventRouter: """Subscribes to NATS JetStream and routes incoming events to Temporal workflows.""" def __init__( self, nats_url: str, temporal_client: TemporalClient, db_url: str, ) -> None: self._nats_url = nats_url self._temporal = temporal_client self._db_url = db_url self._nc: NATSClient | None = None self._js: JetStreamContext | None = None self._session_factory: async_sessionmaker[AsyncSession] | None = None async def _ensure_stream(self, js: JetStreamContext) -> None: """Create the ACTIVITY_EVENTS stream if it does not exist.""" try: await js.find_stream(_SUBJECT) except Exception: await js.add_stream( nats.js.api.StreamConfig( name=_STREAM_NAME, subjects=[_SUBJECT], ) ) logger.info("created JetStream stream %r", _STREAM_NAME) # T27: Load all enabled event-trigger ActivityDefinitions from DB. async def _load_event_definitions( self, ) -> list[tuple[str, EventTriggerConfig]]: """Return list of (activity_id, EventTriggerConfig) for enabled event defs.""" assert self._session_factory is not None async with self._session_factory() as session: rows = ( await session.scalars( select(ActivityDefinitionRow).where( ActivityDefinitionRow.trigger_type == "event", ActivityDefinitionRow.enabled.is_(True), ) ) ).all() result = [] for row in rows: try: cfg = EventTriggerConfig.model_validate(row.trigger_config) result.append((str(row.id), cfg)) except Exception: logger.warning("skipping malformed trigger_config for activity %s", row.id) return result # T27: Match an envelope against the routing rules. def _matches(self, envelope: EventEnvelope, cfg: EventTriggerConfig) -> bool: """Return True if the envelope matches the EventTriggerConfig.""" if envelope.type != cfg.event_type: return False # All filter key/value pairs must be present in envelope.attributes. for key, value in cfg.filters.items(): if envelope.attributes.get(key) != value: return False return True # T28: Start RunActivityWorkflow for a matched activity. async def _dispatch(self, activity_id: str, envelope: EventEnvelope) -> None: """Start RunActivityWorkflow for one matched activity. Workflow ID is deterministic: activity-{activity_id}:{id} REJECT_DUPLICATE prevents double-processing if the message is redelivered before ack reaches NATS. """ workflow_id = f"activity-{activity_id}:{envelope.id}" try: await self._temporal.start_workflow( "RunActivityWorkflow", args=[ activity_id, envelope.id, envelope.timestamp.isoformat(), envelope.model_dump_json(), ], id=workflow_id, task_queue=_ORCHESTRATOR_TASK_QUEUE, id_conflict_policy=WorkflowIDConflictPolicy.FAIL, ) logger.info( "started workflow %r for event %r (activity %s)", workflow_id, envelope.id, activity_id, ) except WorkflowAlreadyStartedError: # Duplicate delivery — workflow already running or completed; safe to skip. logger.debug("duplicate event %r for activity %s — skipped", envelope.id, activity_id) async def _handle_message(self, msg: Any) -> None: """Decode a NATS message, match it against routing rules, and dispatch.""" try: envelope = EventEnvelope.from_nats_message(msg) except Exception: logger.warning("failed to parse event envelope from NATS message — nacking") await msg.nak() return # T27: Reload routing table per message so hot changes take effect. event_defs = await self._load_event_definitions() matched = [aid for aid, cfg in event_defs if self._matches(envelope, cfg)] if not matched: logger.debug("event %r type=%r matched no definitions", envelope.id, envelope.type) await msg.ack() return # T28: Start a workflow for each matched activity. for activity_id in matched: await self._dispatch(activity_id, envelope) # Ack only after all dispatches succeed (at-least-once guarantee). await msg.ack() async def start(self) -> None: """Connect to NATS, set up the stream/consumer, and begin processing. Blocks until cancelled. """ engine = create_async_engine(self._db_url) self._session_factory = async_sessionmaker(engine, expire_on_commit=False) self._nc = await nats.connect(self._nats_url) self._js = self._nc.jetstream() await self._ensure_stream(self._js) # Durable push consumer — survives restarts, replays unacked messages. sub = await self._js.subscribe( _SUBJECT, durable=_CONSUMER_NAME, cb=self._handle_message, manual_ack=True, ) logger.info( "EventRouter listening on subject %r (stream=%r, consumer=%r)", _SUBJECT, _STREAM_NAME, _CONSUMER_NAME, ) loop = asyncio.get_running_loop() stop = asyncio.Event() loop.add_signal_handler(signal.SIGTERM, stop.set) loop.add_signal_handler(signal.SIGINT, stop.set) try: await stop.wait() logger.info("Shutdown signal received — draining event router") finally: await sub.unsubscribe() await self._nc.drain() await engine.dispose() logger.info("Event router stopped cleanly") async def main() -> None: logging.basicConfig(level=logging.INFO) db_url = os.environ.get("ACTCORE_DB_URL") if not db_url: raise RuntimeError("ACTCORE_DB_URL is required") temporal_client = await TemporalClient.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE) router = EventRouter( nats_url=NATS_URL, temporal_client=temporal_client, db_url=db_url, ) await router.start() if __name__ == "__main__": asyncio.run(main())