generated from coulomb/repo-seed
Delivers all 12 tasks (T22–T33): Temporal Schedule manager + startup sync, NATS JetStream event router, FastAPI CRUD + manual trigger, Prometheus metrics wiring, custom search-attribute tagging, and operational runbook. Marks workplan status as done. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
304 lines
9.6 KiB
Python
304 lines
9.6 KiB
Python
"""T29: Integration test — publish event → observe workflow run.
|
|
|
|
Requires the docker compose stack to be running including NATS:
|
|
docker compose -f docker-compose.dev.yml up -d
|
|
|
|
Run with:
|
|
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
|
|
NATS_URL=nats://localhost:4222 \
|
|
TEMPORAL_HOST=localhost:7233 \
|
|
uv run pytest tests/test_event_router.py -v -s
|
|
|
|
These tests are skipped automatically if NATS or Temporal is unreachable.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from activity_core.event_router import EventRouter
|
|
from activity_core.models import EventEnvelope, EventTriggerConfig
|
|
|
|
|
|
# ── Unit tests (no external deps) ────────────────────────────────────────────
|
|
|
|
def _make_envelope(
|
|
event_type: str = "user.created",
|
|
payload: dict | None = None,
|
|
) -> EventEnvelope:
|
|
return EventEnvelope(
|
|
event_id=str(uuid.uuid4()),
|
|
type=event_type,
|
|
source="test-service",
|
|
occurred_at=datetime.now(tz=timezone.utc),
|
|
subject="user/123",
|
|
trace_id=str(uuid.uuid4()),
|
|
payload=payload or {},
|
|
)
|
|
|
|
|
|
def _make_router() -> EventRouter:
|
|
"""Return an EventRouter wired with mock clients (no real connections)."""
|
|
temporal_mock = MagicMock()
|
|
return EventRouter(
|
|
nats_url="nats://localhost:4222",
|
|
temporal_client=temporal_mock,
|
|
db_url="postgresql+asyncpg://actcore:actcore@localhost:5433/actcore",
|
|
)
|
|
|
|
|
|
# T27: _matches unit tests
|
|
|
|
def test_matches_exact_event_type() -> None:
|
|
router = _make_router()
|
|
cfg = EventTriggerConfig(event_type="user.created")
|
|
envelope = _make_envelope(event_type="user.created")
|
|
assert router._matches(envelope, cfg)
|
|
|
|
|
|
def test_matches_wrong_event_type() -> None:
|
|
router = _make_router()
|
|
cfg = EventTriggerConfig(event_type="user.updated")
|
|
envelope = _make_envelope(event_type="user.created")
|
|
assert not router._matches(envelope, cfg)
|
|
|
|
|
|
def test_matches_with_filters_all_present() -> None:
|
|
router = _make_router()
|
|
cfg = EventTriggerConfig(
|
|
event_type="user.created",
|
|
filters={"region": "eu", "tier": "pro"},
|
|
)
|
|
envelope = _make_envelope(
|
|
event_type="user.created",
|
|
payload={"region": "eu", "tier": "pro", "extra": "ignored"},
|
|
)
|
|
assert router._matches(envelope, cfg)
|
|
|
|
|
|
def test_matches_with_filters_partial_missing() -> None:
|
|
router = _make_router()
|
|
cfg = EventTriggerConfig(
|
|
event_type="user.created",
|
|
filters={"region": "eu", "tier": "pro"},
|
|
)
|
|
envelope = _make_envelope(
|
|
event_type="user.created",
|
|
payload={"region": "eu"}, # "tier" missing
|
|
)
|
|
assert not router._matches(envelope, cfg)
|
|
|
|
|
|
def test_matches_with_filters_wrong_value() -> None:
|
|
router = _make_router()
|
|
cfg = EventTriggerConfig(event_type="order.placed", filters={"status": "paid"})
|
|
envelope = _make_envelope(event_type="order.placed", payload={"status": "pending"})
|
|
assert not router._matches(envelope, cfg)
|
|
|
|
|
|
# T28: _dispatch unit test (mocked Temporal client)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dispatch_starts_workflow_with_correct_id() -> None:
|
|
temporal_mock = AsyncMock()
|
|
handle_mock = AsyncMock()
|
|
temporal_mock.start_workflow.return_value = handle_mock
|
|
|
|
router = EventRouter(
|
|
nats_url="nats://localhost:4222",
|
|
temporal_client=temporal_mock,
|
|
db_url="postgresql+asyncpg://actcore:actcore@localhost:5433/actcore",
|
|
)
|
|
|
|
activity_id = str(uuid.uuid4())
|
|
envelope = _make_envelope()
|
|
|
|
await router._dispatch(activity_id, envelope)
|
|
|
|
expected_id = f"activity-{activity_id}:{envelope.event_id}"
|
|
temporal_mock.start_workflow.assert_called_once()
|
|
call_args = temporal_mock.start_workflow.call_args
|
|
assert call_args.kwargs["id"] == expected_id
|
|
assert call_args.args[0] == "RunActivityWorkflow"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dispatch_duplicate_event_is_silently_skipped() -> None:
|
|
from temporalio.exceptions import WorkflowAlreadyStartedError
|
|
|
|
temporal_mock = AsyncMock()
|
|
temporal_mock.start_workflow.side_effect = WorkflowAlreadyStartedError(
|
|
workflow_id="activity-x:y", run_id="z", workflow_type="RunActivityWorkflow"
|
|
)
|
|
|
|
router = EventRouter(
|
|
nats_url="nats://localhost:4222",
|
|
temporal_client=temporal_mock,
|
|
db_url="postgresql+asyncpg://actcore:actcore@localhost:5433/actcore",
|
|
)
|
|
|
|
# Should not raise
|
|
await router._dispatch(str(uuid.uuid4()), _make_envelope())
|
|
|
|
|
|
# T28: _handle_message unit test (mocked NATS message)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_message_invalid_json_nacks() -> None:
|
|
router = _make_router()
|
|
router._session_factory = None # not needed for this test path
|
|
|
|
msg = MagicMock()
|
|
msg.data = b"not-json"
|
|
msg.nak = AsyncMock()
|
|
msg.ack = AsyncMock()
|
|
|
|
await router._handle_message(msg)
|
|
|
|
msg.nak.assert_called_once()
|
|
msg.ack.assert_not_called()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_message_no_match_acks_without_dispatch() -> None:
|
|
temporal_mock = AsyncMock()
|
|
router = EventRouter(
|
|
nats_url="nats://localhost:4222",
|
|
temporal_client=temporal_mock,
|
|
db_url="postgresql+asyncpg://actcore:actcore@localhost:5433/actcore",
|
|
)
|
|
# Patch _load_event_definitions to return empty (no definitions match)
|
|
router._load_event_definitions = AsyncMock(return_value=[])
|
|
|
|
envelope = _make_envelope()
|
|
msg = MagicMock()
|
|
msg.data = envelope.model_dump_json().encode()
|
|
msg.ack = AsyncMock()
|
|
msg.nak = AsyncMock()
|
|
|
|
await router._handle_message(msg)
|
|
|
|
msg.ack.assert_called_once()
|
|
temporal_mock.start_workflow.assert_not_called()
|
|
|
|
|
|
# ── Integration tests (require docker-compose stack) ─────────────────────────
|
|
|
|
NATS_URL = os.environ.get("NATS_URL", "nats://localhost:4222")
|
|
TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233")
|
|
ACTCORE_DB_URL = os.environ.get(
|
|
"ACTCORE_DB_URL",
|
|
"postgresql+asyncpg://actcore:actcore@localhost:5433/actcore",
|
|
)
|
|
|
|
|
|
async def _nats_reachable() -> bool:
|
|
try:
|
|
import nats
|
|
nc = await nats.connect(NATS_URL, connect_timeout=2)
|
|
await nc.close()
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
async def _temporal_reachable() -> bool:
|
|
try:
|
|
from temporalio.client import Client
|
|
client = await Client.connect(TEMPORAL_HOST)
|
|
await client.service_client.health_check()
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
async def integration_skip():
|
|
"""Skip the integration block if NATS or Temporal is unreachable."""
|
|
if not (await _nats_reachable() and await _temporal_reachable()):
|
|
pytest.skip("NATS and/or Temporal not reachable — skipping integration tests")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_publish_event_starts_workflow(integration_skip: None) -> None:
|
|
"""Publish a NATS event and verify RunActivityWorkflow is started in Temporal."""
|
|
import nats as nats_lib
|
|
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
|
|
from sqlalchemy import select
|
|
from temporalio.client import Client, WorkflowExecutionStatus
|
|
|
|
from activity_core.orm import ActivityDefinition as ActivityDefinitionRow
|
|
|
|
# Create an event-triggered ActivityDefinition in the DB.
|
|
engine = create_async_engine(ACTCORE_DB_URL)
|
|
session_factory = async_sessionmaker(engine, expire_on_commit=False)
|
|
activity_id = uuid.uuid4()
|
|
event_type = f"test.event.{uuid.uuid4().hex[:8]}"
|
|
|
|
async with session_factory() as session:
|
|
async with session.begin():
|
|
row = ActivityDefinitionRow(
|
|
id=activity_id,
|
|
name=f"integration-test-{activity_id}",
|
|
enabled=True,
|
|
trigger_type="event",
|
|
trigger_config={"trigger_type": "event", "event_type": event_type, "filters": {}},
|
|
context_sources=[],
|
|
task_templates=[],
|
|
dedupe_key_strategy="skip",
|
|
version=1,
|
|
)
|
|
session.add(row)
|
|
|
|
temporal_client = await Client.connect(TEMPORAL_HOST)
|
|
router = EventRouter(
|
|
nats_url=NATS_URL,
|
|
temporal_client=temporal_client,
|
|
db_url=ACTCORE_DB_URL,
|
|
)
|
|
|
|
# Start the router in the background.
|
|
router_task = asyncio.create_task(router.start())
|
|
await asyncio.sleep(1) # allow subscription to establish
|
|
|
|
# Publish a matching event.
|
|
event_id = str(uuid.uuid4())
|
|
envelope = EventEnvelope(
|
|
event_id=event_id,
|
|
type=event_type,
|
|
source="integration-test",
|
|
occurred_at=datetime.now(tz=timezone.utc),
|
|
subject="test/1",
|
|
trace_id=str(uuid.uuid4()),
|
|
)
|
|
|
|
nc = await nats_lib.connect(NATS_URL)
|
|
await nc.publish(f"activity.{event_type}", envelope.model_dump_json().encode())
|
|
await nc.flush()
|
|
await nc.close()
|
|
|
|
# Give the router time to process and Temporal time to receive the start.
|
|
await asyncio.sleep(3)
|
|
|
|
# Assert the workflow was started.
|
|
expected_wf_id = f"activity-{activity_id}:{event_id}"
|
|
try:
|
|
desc = await temporal_client.get_workflow_handle(expected_wf_id).describe()
|
|
assert desc is not None, "Workflow handle should exist"
|
|
except Exception as e:
|
|
pytest.fail(f"Workflow {expected_wf_id!r} was not started: {e}")
|
|
finally:
|
|
router_task.cancel()
|
|
try:
|
|
await router_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
await engine.dispose()
|