generated from coulomb/repo-seed
feat(WP-0002): complete Triggers & Ops workstream
Delivers all 12 tasks (T22–T33): Temporal Schedule manager + startup sync, NATS JetStream event router, FastAPI CRUD + manual trigger, Prometheus metrics wiring, custom search-attribute tagging, and operational runbook. Marks workplan status as done. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
226
src/activity_core/event_router.py
Normal file
226
src/activity_core/event_router.py
Normal file
@@ -0,0 +1,226 @@
|
||||
"""Event Router — NATS JetStream consumer that routes events to RunActivityWorkflow.
|
||||
|
||||
T26: EventRouter class — connects to NATS JetStream, subscribes to activity.>
|
||||
T27: Routing rules — match event.type + payload filters to enabled ActivityDefinitions
|
||||
T28: Start/signal workflow from Event Router with idempotent workflow ID
|
||||
|
||||
Stream: ACTIVITY_EVENTS
|
||||
Subject: activity.>
|
||||
Consumer: activity-core-event-router (durable, push-based)
|
||||
|
||||
Message ack happens only after the workflow has been successfully started,
|
||||
giving at-least-once delivery semantics.
|
||||
|
||||
Usage:
|
||||
NATS_URL=nats://localhost:4222 \
|
||||
ACTCORE_DB_URL=postgresql+asyncpg://... \
|
||||
TEMPORAL_HOST=localhost:7233 \
|
||||
python -m activity_core.event_router
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
import nats
|
||||
import nats.js.api
|
||||
from nats.aio.client import Client as NATSClient
|
||||
from nats.js.client import JetStreamContext
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
from temporalio.client import Client as TemporalClient
|
||||
from temporalio.common import WorkflowIDConflictPolicy
|
||||
from temporalio.exceptions import WorkflowAlreadyStartedError
|
||||
|
||||
from activity_core.models import EventEnvelope, EventTriggerConfig
|
||||
from activity_core.orm import ActivityDefinition as ActivityDefinitionRow
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
NATS_URL = os.environ.get("NATS_URL", "nats://localhost:4222")
|
||||
TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233")
|
||||
TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default")
|
||||
|
||||
_STREAM_NAME = "ACTIVITY_EVENTS"
|
||||
_SUBJECT = "activity.>"
|
||||
_CONSUMER_NAME = "activity-core-event-router"
|
||||
_ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq"
|
||||
|
||||
|
||||
class EventRouter:
|
||||
"""Subscribes to NATS JetStream and routes incoming events to Temporal workflows."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
nats_url: str,
|
||||
temporal_client: TemporalClient,
|
||||
db_url: str,
|
||||
) -> None:
|
||||
self._nats_url = nats_url
|
||||
self._temporal = temporal_client
|
||||
self._db_url = db_url
|
||||
self._nc: NATSClient | None = None
|
||||
self._js: JetStreamContext | None = None
|
||||
self._session_factory: async_sessionmaker[AsyncSession] | None = None
|
||||
|
||||
async def _ensure_stream(self, js: JetStreamContext) -> None:
|
||||
"""Create the ACTIVITY_EVENTS stream if it does not exist."""
|
||||
try:
|
||||
await js.find_stream(_SUBJECT)
|
||||
except Exception:
|
||||
await js.add_stream(
|
||||
nats.js.api.StreamConfig(
|
||||
name=_STREAM_NAME,
|
||||
subjects=[_SUBJECT],
|
||||
)
|
||||
)
|
||||
logger.info("created JetStream stream %r", _STREAM_NAME)
|
||||
|
||||
# T27: Load all enabled event-trigger ActivityDefinitions from DB.
|
||||
async def _load_event_definitions(
|
||||
self,
|
||||
) -> list[tuple[str, EventTriggerConfig]]:
|
||||
"""Return list of (activity_id, EventTriggerConfig) for enabled event defs."""
|
||||
assert self._session_factory is not None
|
||||
async with self._session_factory() as session:
|
||||
rows = (
|
||||
await session.scalars(
|
||||
select(ActivityDefinitionRow).where(
|
||||
ActivityDefinitionRow.trigger_type == "event",
|
||||
ActivityDefinitionRow.enabled.is_(True),
|
||||
)
|
||||
)
|
||||
).all()
|
||||
|
||||
result = []
|
||||
for row in rows:
|
||||
try:
|
||||
cfg = EventTriggerConfig.model_validate(row.trigger_config)
|
||||
result.append((str(row.id), cfg))
|
||||
except Exception:
|
||||
logger.warning("skipping malformed trigger_config for activity %s", row.id)
|
||||
return result
|
||||
|
||||
# T27: Match an envelope against the routing rules.
|
||||
def _matches(self, envelope: EventEnvelope, cfg: EventTriggerConfig) -> bool:
|
||||
"""Return True if the envelope matches the EventTriggerConfig."""
|
||||
if envelope.type != cfg.event_type:
|
||||
return False
|
||||
# All filter key/value pairs must be present in envelope.payload.
|
||||
for key, value in cfg.filters.items():
|
||||
if envelope.payload.get(key) != value:
|
||||
return False
|
||||
return True
|
||||
|
||||
# T28: Start RunActivityWorkflow for a matched activity.
|
||||
async def _dispatch(self, activity_id: str, envelope: EventEnvelope) -> None:
|
||||
"""Start RunActivityWorkflow for one matched activity.
|
||||
|
||||
Workflow ID is deterministic: activity-{activity_id}:{event_id}
|
||||
REJECT_DUPLICATE prevents double-processing if the message is redelivered
|
||||
before ack reaches NATS.
|
||||
"""
|
||||
workflow_id = f"activity-{activity_id}:{envelope.event_id}"
|
||||
try:
|
||||
await self._temporal.start_workflow(
|
||||
"RunActivityWorkflow",
|
||||
args=[activity_id, envelope.event_id, envelope.occurred_at.isoformat()],
|
||||
id=workflow_id,
|
||||
task_queue=_ORCHESTRATOR_TASK_QUEUE,
|
||||
id_conflict_policy=WorkflowIDConflictPolicy.FAIL,
|
||||
)
|
||||
logger.info(
|
||||
"started workflow %r for event %r (activity %s)",
|
||||
workflow_id,
|
||||
envelope.event_id,
|
||||
activity_id,
|
||||
)
|
||||
except WorkflowAlreadyStartedError:
|
||||
# Duplicate delivery — workflow already running or completed; safe to skip.
|
||||
logger.debug("duplicate event %r for activity %s — skipped", envelope.event_id, activity_id)
|
||||
|
||||
async def _handle_message(self, msg: Any) -> None:
|
||||
"""Decode a NATS message, match it against routing rules, and dispatch."""
|
||||
try:
|
||||
raw = json.loads(msg.data.decode())
|
||||
envelope = EventEnvelope.model_validate(raw)
|
||||
except Exception:
|
||||
logger.warning("failed to parse event envelope from NATS message — nacking")
|
||||
await msg.nak()
|
||||
return
|
||||
|
||||
# T27: Reload routing table per message so hot changes take effect.
|
||||
event_defs = await self._load_event_definitions()
|
||||
matched = [aid for aid, cfg in event_defs if self._matches(envelope, cfg)]
|
||||
|
||||
if not matched:
|
||||
logger.debug("event %r type=%r matched no definitions", envelope.event_id, envelope.type)
|
||||
await msg.ack()
|
||||
return
|
||||
|
||||
# T28: Start a workflow for each matched activity.
|
||||
for activity_id in matched:
|
||||
await self._dispatch(activity_id, envelope)
|
||||
|
||||
# Ack only after all dispatches succeed (at-least-once guarantee).
|
||||
await msg.ack()
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Connect to NATS, set up the stream/consumer, and begin processing.
|
||||
|
||||
Blocks until cancelled.
|
||||
"""
|
||||
engine = create_async_engine(self._db_url)
|
||||
self._session_factory = async_sessionmaker(engine, expire_on_commit=False)
|
||||
|
||||
self._nc = await nats.connect(self._nats_url)
|
||||
self._js = self._nc.jetstream()
|
||||
|
||||
await self._ensure_stream(self._js)
|
||||
|
||||
# Durable push consumer — survives restarts, replays unacked messages.
|
||||
sub = await self._js.subscribe(
|
||||
_SUBJECT,
|
||||
durable=_CONSUMER_NAME,
|
||||
cb=self._handle_message,
|
||||
manual_ack=True,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"EventRouter listening on subject %r (stream=%r, consumer=%r)",
|
||||
_SUBJECT,
|
||||
_STREAM_NAME,
|
||||
_CONSUMER_NAME,
|
||||
)
|
||||
|
||||
try:
|
||||
await asyncio.Future() # run until cancelled
|
||||
finally:
|
||||
await sub.unsubscribe()
|
||||
await self._nc.drain()
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
db_url = os.environ.get("ACTCORE_DB_URL")
|
||||
if not db_url:
|
||||
raise RuntimeError("ACTCORE_DB_URL is required")
|
||||
|
||||
temporal_client = await TemporalClient.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE)
|
||||
router = EventRouter(
|
||||
nats_url=NATS_URL,
|
||||
temporal_client=temporal_client,
|
||||
db_url=db_url,
|
||||
)
|
||||
await router.start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user