Files
activity-core/src/activity_core/event_router.py
tegwick ea5fbe0bf3 feat(WP-0002): complete Triggers & Ops workstream
Delivers all 12 tasks (T22–T33): Temporal Schedule manager + startup
sync, NATS JetStream event router, FastAPI CRUD + manual trigger,
Prometheus metrics wiring, custom search-attribute tagging, and
operational runbook. Marks workplan status as done.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-28 01:04:43 +01:00

227 lines
8.0 KiB
Python

"""Event Router — NATS JetStream consumer that routes events to RunActivityWorkflow.

T26: EventRouter class — connects to NATS JetStream and subscribes to ``activity.>``.
T27: Routing rules — match event.type plus payload filters against enabled
     event-triggered ActivityDefinitions.
T28: Start RunActivityWorkflow from the Event Router with an idempotent workflow ID.

Stream:   ACTIVITY_EVENTS
Subject:  activity.>
Consumer: activity-core-event-router (durable, push-based)

A message is acked only after a workflow has been started (or found to exist
already) for every matching ActivityDefinition, giving at-least-once delivery
semantics. A message that matches no definition is acked immediately.

Usage (the trailing backslashes are shell line continuations)::

    NATS_URL=nats://localhost:4222 \\
    ACTCORE_DB_URL=postgresql+asyncpg://... \\
    TEMPORAL_HOST=localhost:7233 \\
    python -m activity_core.event_router

TEMPORAL_NAMESPACE (default: ``default``) selects the Temporal namespace.
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import uuid
from datetime import datetime, timezone
from typing import Any

import nats
import nats.js.api
from nats.aio.client import Client as NATSClient
from nats.js.client import JetStreamContext
from nats.js.errors import NotFoundError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from temporalio.client import Client as TemporalClient
from temporalio.common import WorkflowIDConflictPolicy, WorkflowIDReusePolicy
from temporalio.exceptions import WorkflowAlreadyStartedError

from activity_core.models import EventEnvelope, EventTriggerConfig
from activity_core.orm import ActivityDefinition as ActivityDefinitionRow
logger = logging.getLogger(__name__)

# Connection settings, read once at import time. Each falls back to a
# local-development default when its environment variable is unset.
NATS_URL = os.getenv("NATS_URL", "nats://localhost:4222")
TEMPORAL_HOST = os.getenv("TEMPORAL_HOST", "localhost:7233")
TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default")

# JetStream topology: stream name, subject filter, and durable consumer name.
_STREAM_NAME = "ACTIVITY_EVENTS"
_SUBJECT = "activity.>"
_CONSUMER_NAME = "activity-core-event-router"

# Temporal task queue on which RunActivityWorkflow is started.
_ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq"
class EventRouter:
    """Subscribe to NATS JetStream and route incoming events to Temporal workflows.

    Each message on ``activity.>`` is decoded as an ``EventEnvelope``, matched
    against every enabled event-triggered ActivityDefinition, and one
    ``RunActivityWorkflow`` is started per match. The message is acked only
    once every matched workflow has been started (or already exists).
    """

    def __init__(
        self,
        nats_url: str,
        temporal_client: TemporalClient,
        db_url: str,
    ) -> None:
        """Store connection settings; no I/O happens until ``start()``.

        Args:
            nats_url: NATS server URL passed to ``nats.connect``.
            temporal_client: Connected Temporal client used to start workflows.
            db_url: SQLAlchemy async database URL used to load ActivityDefinitions.
        """
        self._nats_url = nats_url
        self._temporal = temporal_client
        self._db_url = db_url
        self._nc: NATSClient | None = None
        self._js: JetStreamContext | None = None
        self._session_factory: async_sessionmaker[AsyncSession] | None = None

    async def _ensure_stream(self, js: JetStreamContext) -> None:
        """Create the ACTIVITY_EVENTS stream unless a stream already captures ``activity.>``.

        Only a "not found" answer triggers creation. Connection and timeout
        errors propagate instead of being mistaken for a missing stream.
        """
        try:
            await js.find_stream_name_by_subject(_SUBJECT)
        except NotFoundError:
            await js.add_stream(
                nats.js.api.StreamConfig(
                    name=_STREAM_NAME,
                    subjects=[_SUBJECT],
                )
            )
            logger.info("created JetStream stream %r", _STREAM_NAME)

    # T27: Load all enabled event-trigger ActivityDefinitions from DB.
    async def _load_event_definitions(
        self,
    ) -> list[tuple[str, EventTriggerConfig]]:
        """Return ``(activity_id, EventTriggerConfig)`` for every enabled event definition.

        A row whose ``trigger_config`` fails validation is logged and skipped,
        so one bad definition does not block routing for the others.

        Raises:
            RuntimeError: if called before ``start()`` created the session factory.
        """
        if self._session_factory is None:
            # Explicit check instead of ``assert``: asserts vanish under ``python -O``.
            raise RuntimeError("EventRouter.start() must run before definitions are loaded")
        result: list[tuple[str, EventTriggerConfig]] = []
        async with self._session_factory() as session:
            rows = (
                await session.scalars(
                    select(ActivityDefinitionRow).where(
                        ActivityDefinitionRow.trigger_type == "event",
                        ActivityDefinitionRow.enabled.is_(True),
                    )
                )
            ).all()
            for row in rows:
                try:
                    cfg = EventTriggerConfig.model_validate(row.trigger_config)
                except Exception:
                    logger.warning(
                        "skipping malformed trigger_config for activity %s",
                        row.id,
                        exc_info=True,
                    )
                    continue
                result.append((str(row.id), cfg))
        return result

    # T27: Match an envelope against the routing rules.
    def _matches(self, envelope: EventEnvelope, cfg: EventTriggerConfig) -> bool:
        """Return True if *envelope* satisfies *cfg*.

        The event type must equal ``cfg.event_type``. Every ``cfg.filters`` key
        must also be present in ``envelope.payload`` with an equal value. Presence
        is checked explicitly so a filter such as ``{"k": None}`` does not match a
        payload that simply lacks ``"k"``.
        """
        if envelope.type != cfg.event_type:
            return False
        payload = envelope.payload
        return all(
            key in payload and payload[key] == value
            for key, value in cfg.filters.items()
        )

    # T28: Start RunActivityWorkflow for a matched activity.
    async def _dispatch(self, activity_id: str, envelope: EventEnvelope) -> None:
        """Start RunActivityWorkflow for one matched activity, at most once per event.

        The workflow ID is deterministic (``activity-{activity_id}:{event_id}``),
        so a redelivered message maps to the same ID. Two policies make the start
        idempotent:

        * ``id_conflict_policy=FAIL`` rejects the start while a workflow with this
          ID is still running.
        * ``id_reuse_policy=REJECT_DUPLICATE`` rejects it after that workflow has
          closed, for as long as Temporal retains it. Temporal's default,
          ALLOW_DUPLICATE, would start a second run.

        Both cases raise ``WorkflowAlreadyStartedError``, which is treated as an
        already-handled duplicate. Any other error propagates to the caller.
        """
        workflow_id = f"activity-{activity_id}:{envelope.event_id}"
        try:
            await self._temporal.start_workflow(
                "RunActivityWorkflow",
                args=[activity_id, envelope.event_id, envelope.occurred_at.isoformat()],
                id=workflow_id,
                task_queue=_ORCHESTRATOR_TASK_QUEUE,
                id_conflict_policy=WorkflowIDConflictPolicy.FAIL,
                id_reuse_policy=WorkflowIDReusePolicy.REJECT_DUPLICATE,
            )
        except WorkflowAlreadyStartedError:
            # Duplicate delivery: workflow already running or completed; safe to skip.
            logger.debug(
                "duplicate event %r for activity %s — skipped", envelope.event_id, activity_id
            )
            return
        logger.info(
            "started workflow %r for event %r (activity %s)",
            workflow_id,
            envelope.event_id,
            activity_id,
        )

    async def _handle_message(self, msg: Any) -> None:
        """Decode a NATS message, match it against routing rules, and dispatch.

        - Undecodable messages are terminated, because redelivery can never
          make them parse.
        - On a database or Temporal failure the message is left unacked, so
          JetStream redelivers it later.
        - Otherwise the message is acked once every match has been dispatched,
          or immediately when nothing matched.
        """
        try:
            raw = json.loads(msg.data.decode())
            envelope = EventEnvelope.model_validate(raw)
        except Exception:
            logger.warning(
                "failed to parse event envelope from NATS message — terminating",
                exc_info=True,
            )
            # term() rather than nak(): a nak'd poison message is redelivered forever.
            await msg.term()
            return
        try:
            # T27: Reload routing table per message so hot changes take effect.
            event_defs = await self._load_event_definitions()
            matched = [aid for aid, cfg in event_defs if self._matches(envelope, cfg)]
            # T28: Start a workflow for each matched activity. Re-running this loop
            # on redelivery is safe because _dispatch is idempotent per event.
            for activity_id in matched:
                await self._dispatch(activity_id, envelope)
        except Exception:
            logger.exception(
                "routing failed for event %r — leaving unacked for redelivery",
                envelope.event_id,
            )
            return
        if not matched:
            logger.debug(
                "event %r type=%r matched no definitions", envelope.event_id, envelope.type
            )
        # Ack only after all dispatches succeed (at-least-once guarantee).
        await msg.ack()

    async def start(self) -> None:
        """Connect to NATS, set up the stream/consumer, and begin processing.

        Blocks until cancelled. The database engine and NATS connection are
        released on every exit path, including a failure while connecting,
        creating the stream, or subscribing.
        """
        engine = create_async_engine(self._db_url)
        self._session_factory = async_sessionmaker(engine, expire_on_commit=False)
        try:
            self._nc = await nats.connect(self._nats_url)
            self._js = self._nc.jetstream()
            await self._ensure_stream(self._js)
            # Durable push consumer — survives restarts, replays unacked messages.
            await self._js.subscribe(
                _SUBJECT,
                durable=_CONSUMER_NAME,
                cb=self._handle_message,
                manual_ack=True,
            )
            logger.info(
                "EventRouter listening on subject %r (stream=%r, consumer=%r)",
                _SUBJECT,
                _STREAM_NAME,
                _CONSUMER_NAME,
            )
            await asyncio.Future()  # run until cancelled
        finally:
            try:
                # drain() lets in-flight callbacks finish, unsubscribes every
                # subscription, then closes the connection.
                if self._nc is not None and not self._nc.is_closed:
                    await self._nc.drain()
            finally:
                await engine.dispose()
async def main() -> None:
    """Run the Event Router as a standalone process.

    Configures INFO-level logging, connects to Temporal using the module-level
    TEMPORAL_HOST / TEMPORAL_NAMESPACE settings, and then blocks in
    ``EventRouter.start()``.

    Raises:
        RuntimeError: if ACTCORE_DB_URL is unset or empty.
    """
    logging.basicConfig(level=logging.INFO)
    database_url = os.environ.get("ACTCORE_DB_URL")
    if not database_url:
        raise RuntimeError("ACTCORE_DB_URL is required")
    client = await TemporalClient.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE)
    await EventRouter(
        nats_url=NATS_URL,
        temporal_client=client,
        db_url=database_url,
    ).start()


if __name__ == "__main__":
    asyncio.run(main())