activity-core/docs/runbook.md
tegwick ea5fbe0bf3 feat(WP-0002): complete Triggers & Ops workstream
Delivers all 12 tasks (T22–T33): Temporal Schedule manager + startup
sync, NATS JetStream event router, FastAPI CRUD + manual trigger,
Prometheus metrics wiring, custom search-attribute tagging, and
operational runbook. Marks workplan status as done.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-28 01:04:43 +01:00


activity-core Operational Runbook

Dev environment — quick start

# 1. Start the full stack (Temporal + PostgreSQL + Elasticsearch + NATS)
docker compose -f docker-compose.dev.yml up -d

# 2. Apply DB migrations
uv run alembic upgrade head

# 3. Seed initial ActivityDefinitions
uv run python src/activity_core/seed.py

# 4. Register custom Temporal search attributes (one-time per namespace)
docker exec temporal temporal operator search-attribute create \
  --name ActivityId   --type Keyword \
  --name ActivityName --type Keyword \
  --address temporal:7233

# 5. Start the worker (syncs schedules automatically on startup)
TEMPORAL_HOST=localhost:7233 \
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
  uv run python -m activity_core.worker

# 6. Start the Event Router (in a second terminal)
TEMPORAL_HOST=localhost:7233 \
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
NATS_URL=nats://localhost:4222 \
  uv run python -m activity_core.event_router

# 7. Start the REST API (in a third terminal)
TEMPORAL_HOST=localhost:7233 \
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
  uv run uvicorn activity_core.api:app --port 8010 --reload
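docker compose up -d returns once the containers have started. It does not wait for the services inside them to accept connections, so on a cold start the migration, seed, and worker steps can fail if you run them immediately. Below is a minimal readiness check that uses only the Python standard library. The names wait_for_port and DEV_PORTS are hypothetical, and the ports are the host ports from the commands above. An open port only shows that a process is listening, and Temporal may still be finishing its own startup. If the compose file defines healthchecks, docker compose up -d --wait is an alternative.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 0.5) -> bool:
    """Poll until a TCP connection to host:port succeeds or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True  # something is listening on the port
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)  # back off briefly before the next attempt


# Host ports used by the quick-start commands (assumed to match docker-compose.dev.yml).
DEV_PORTS = {"Temporal": 7233, "PostgreSQL": 5433, "NATS": 4222}

if __name__ == "__main__":
    for name, port in DEV_PORTS.items():
        status = "ready" if wait_for_port("localhost", port) else "NOT reachable"
        print(f"{name} on localhost:{port}: {status}")
```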

Endpoints

| Service                     | URL                           |
|-----------------------------|-------------------------------|
| Temporal Web UI             | http://localhost:8080         |
| REST API docs (Swagger)     | http://localhost:8010/docs    |
| NATS monitoring             | http://localhost:8222         |
| Prometheus metrics (worker) | http://localhost:9090/metrics |

REST API — common operations

# List all ActivityDefinitions
curl http://localhost:8010/activity-definitions/

# Create a cron ActivityDefinition (fires every weekday at 09:00 Berlin time)
curl -s -X POST http://localhost:8010/activity-definitions/ \
  -H "Content-Type: application/json" -d '{
    "name": "daily-report",
    "trigger_config": {
      "trigger_type": "cron",
      "cron_expression": "0 9 * * 1-5",
      "timezone": "Europe/Berlin",
      "misfire_policy": "skip"
    }
  }'

# Create an event-triggered ActivityDefinition
curl -s -X POST http://localhost:8010/activity-definitions/ \
  -H "Content-Type: application/json" -d '{
    "name": "user-onboarding",
    "trigger_config": {
      "trigger_type": "event",
      "event_type": "user.created",
      "filters": {"tier": "pro"}
    }
  }'

# Manually trigger a one-shot run
curl -s -X POST http://localhost:8010/activity-definitions/<id>/trigger

# Disable an activity (pauses its schedule)
curl -s -X PUT http://localhost:8010/activity-definitions/<id> \
  -H "Content-Type: application/json" -d '{"enabled": false}'
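You can also script these calls from Python with only the standard library. The sketch below is minimal. The helper names build_request, create_cron_definition, disable_definition, and send are hypothetical. The payload shapes are copied from the curl examples above and are not taken from a published schema. The builders construct requests without sending them, so they can be inspected before anything hits the API.

```python
import json
import urllib.request
from typing import Optional

BASE_URL = "http://localhost:8010"  # REST API port from the dev quick start


def build_request(method: str, path: str, body: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a JSON request against the activity-core REST API."""
    data = json.dumps(body).encode() if body is not None else None
    headers = {"Content-Type": "application/json"} if body is not None else {}
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


def create_cron_definition(name: str, cron: str, tz: str) -> urllib.request.Request:
    # Mirrors the "daily-report" curl example; misfire_policy is fixed to "skip" here.
    return build_request("POST", "/activity-definitions/", {
        "name": name,
        "trigger_config": {
            "trigger_type": "cron",
            "cron_expression": cron,
            "timezone": tz,
            "misfire_policy": "skip",
        },
    })


def disable_definition(definition_id: str) -> urllib.request.Request:
    # Mirrors the PUT {"enabled": false} example, which pauses the schedule.
    return build_request("PUT", f"/activity-definitions/{definition_id}", {"enabled": False})


def send(req: urllib.request.Request):
    """Send a request and decode the JSON response (requires the API to be running)."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)
```

For example, send(build_request("GET", "/activity-definitions/")) lists the definitions, and send(disable_definition("<id>")) pauses one.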

Publishing events to the Event Router

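The Event Router subscribes to the activity.> NATS subject on the ACTIVITY_EVENTS JetStream stream, so every event subject must start with the activity. prefix (for example activity.user.created).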

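import asyncio
import json
import uuid
from datetime import datetime, timezone

import nats


async def publish() -> None:
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    envelope = {
        "event_id": str(uuid.uuid4()),  # unique ID for this event
        "type": "user.created",  # compared with trigger_config.event_type
        "source": "user-service",
        "occurred_at": datetime.now(tz=timezone.utc).isoformat(),
        "subject": "user/42",
        "trace_id": str(uuid.uuid4()),
        "payload": {"tier": "pro", "region": "eu"},  # checked against trigger_config.filters
    }
    # The NATS subject must fall under activity.> so ACTIVITY_EVENTS stores it;
    # js.publish waits for the stream's ack and raises if no stream captures it.
    ack = await js.publish("activity.user.created", json.dumps(envelope).encode())
    print(f"stored in {ack.stream} at sequence {ack.seq}")
    await nc.drain()  # flush pending messages, then close the connection


if __name__ == "__main__":
    asyncio.run(publish())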

Syncing schedules manually

TEMPORAL_HOST=localhost:7233 \
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
  uv run python -m activity_core.sync_schedules

This reconciles all Temporal Schedules with the activity_definitions table:

  • Upserts schedules for every enabled cron definition
  • Creates paused schedules for disabled cron definitions
  • Deletes orphaned schedules with no matching DB row
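The reconciliation rules above can be written as a pure planning step that is kept separate from the Temporal client calls. This sketch only assumes the shape of the logic and is not the actual sync_schedules code. The Definition and SyncPlan classes and the plan_sync function are hypothetical. The activity-schedule-<uuid> ID format is taken from the troubleshooting section. The sketch also assumes that only IDs with that prefix belong to activity-core, so it leaves other schedules in the namespace alone.

```python
from dataclasses import dataclass, field
from typing import Iterable, List

SCHEDULE_PREFIX = "activity-schedule-"  # ID format used with `temporal schedule describe`


@dataclass(frozen=True)
class Definition:
    id: str            # ActivityDefinition UUID
    trigger_type: str  # "cron" or "event"
    enabled: bool


@dataclass
class SyncPlan:
    upsert_active: List[str] = field(default_factory=list)  # enabled cron definitions
    upsert_paused: List[str] = field(default_factory=list)  # disabled cron definitions
    delete: List[str] = field(default_factory=list)         # schedules with no DB row


def schedule_id(definition_id: str) -> str:
    return f"{SCHEDULE_PREFIX}{definition_id}"


def plan_sync(definitions: Iterable[Definition], existing_schedule_ids: Iterable[str]) -> SyncPlan:
    definitions = list(definitions)
    plan = SyncPlan()
    known_ids = {schedule_id(d.id) for d in definitions}
    for d in definitions:
        if d.trigger_type != "cron":
            continue  # event-triggered definitions own no schedule
        target = plan.upsert_active if d.enabled else plan.upsert_paused
        target.append(schedule_id(d.id))
    # Orphans: schedules we own (by prefix) whose definition row no longer exists.
    plan.delete = sorted(
        sid for sid in existing_schedule_ids
        if sid.startswith(SCHEDULE_PREFIX) and sid not in known_ids
    )
    return plan
```

Because the plan is plain data, the rules can be unit-tested without a Temporal server. Applying the plan would then be a thin loop over the SDK's schedule create, update, pause, and delete operations.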

Temporal UI — filtering by activity

With search attributes registered, you can filter in the Temporal Web UI:

ActivityId = "your-activity-uuid"

Or via the Temporal CLI:

docker exec temporal-admin-tools temporal workflow list \
  --query 'ActivityId="<uuid>"' \
  --address temporal:7233
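For scripted lookups, you can assemble the filter string from validated values instead of writing it by hand. The build_activity_query helper in this small sketch is hypothetical. The sketch joins clauses with AND. It normalises the UUID to Python's lowercase hyphenated form, which assumes the worker writes ActivityId the same way, because Keyword attributes match exactly.

```python
import uuid
from typing import Optional


def build_activity_query(activity_id: Optional[str] = None,
                         activity_name: Optional[str] = None) -> str:
    """Build a Temporal list filter over the ActivityId / ActivityName search attributes."""
    clauses = []
    if activity_id is not None:
        # uuid.UUID() raises ValueError on malformed input and normalises the text form.
        clauses.append(f'ActivityId="{uuid.UUID(activity_id)}"')
    if activity_name is not None:
        if '"' in activity_name:
            raise ValueError("activity_name must not contain double quotes")
        clauses.append(f'ActivityName="{activity_name}"')
    if not clauses:
        raise ValueError("provide activity_id and/or activity_name")
    return " AND ".join(clauses)
```

You can pass the resulting string to the --query flag of temporal workflow list or paste it into the Web UI filter.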

Scale-out

Multiple worker replicas

Temporal workers are stateless and horizontally scalable. Run additional worker processes to increase throughput on the orchestrator-tq and task-execution-tq task queues.

Each worker registers the same workflows/activities — Temporal distributes tasks across all pollers automatically.

Important: only one process should run sync_schedules at startup. Concurrent syncs can race each other while upserting and deleting the same schedules. Consider disabling the startup sync on secondary worker replicas via an environment variable:

SKIP_SCHEDULE_SYNC=true uv run python -m activity_core.worker

(worker.py does not check SKIP_SCHEDULE_SYNC yet. Implement the check there before relying on this flag.)
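The SKIP_SCHEDULE_SYNC check can be very small. Several parts of this sketch are assumptions: the helper name should_sync_schedules, the accepted truthy spellings, and the call site shown in the comment. The real startup code in worker.py may be structured differently. When the variable is unset, the sketch syncs, which keeps single-worker dev setups working unchanged.

```python
import os
from typing import Mapping, Optional

_TRUTHY = {"1", "true", "yes", "on"}


def should_sync_schedules(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return False when SKIP_SCHEDULE_SYNC is set to a truthy value, True otherwise."""
    env = os.environ if env is None else env
    return env.get("SKIP_SCHEDULE_SYNC", "").strip().lower() not in _TRUTHY


# Hypothetical call site in worker.py's startup path (real names may differ):
#
#     if should_sync_schedules():
#         await sync_schedules(...)
#     else:
#         logger.info("SKIP_SCHEDULE_SYNC set; leaving schedule sync to the primary replica")
```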

Multiple Event Router replicas

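The Event Router reads through a durable JetStream consumer (activity-core-event-router). Running multiple event_router processes bound to the same durable consumer name spreads messages across them and provides automatic failover. Each message is delivered to one replica at a time. If a replica crashes before acknowledging a message, the message is redelivered once the consumer's ack wait expires. Sharing one durable consumer this way works for pull consumers, or for push consumers with a deliver group. A plain push consumer accepts only one bound subscriber. Delivery is at-least-once, so the same event can occasionally be routed twice.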
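JetStream delivery is at-least-once, and a redelivered message can land on a different replica, so deduplicating inside one process is not enough. One common approach is to derive the Temporal workflow ID deterministically from the definition ID and the event ID. This is sketched here as an option, not as a description of what event_router does today. Temporal rejects a start request whose workflow ID belongs to a running workflow. With a reject-duplicate workflow ID reuse policy, it also rejects an ID already used by a closed workflow. The activity-event- prefix and the event_workflow_id name are hypothetical.

```python
import uuid


def event_workflow_id(definition_id: str, event_id: str) -> str:
    """Deterministic workflow ID for one (ActivityDefinition, event) pair.

    A redelivered message yields the same ID, so a second start attempt for the
    same pair is rejected by Temporal instead of launching a duplicate run.
    """
    # uuid.UUID() raises ValueError for malformed IDs and normalises the text form.
    return f"activity-event-{uuid.UUID(definition_id)}-{uuid.UUID(event_id)}"
```

With this scheme, the router would treat an "already started" error from Temporal as success and acknowledge the message.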


Troubleshooting

Worker fails to start: "ACTCORE_DB_URL is required"

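Export ACTCORE_DB_URL before starting the worker, for example ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore (the value used in the quick start).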

Schedule not firing

  1. Check Temporal UI → Schedules tab for the schedule status.
  2. Ensure enabled=True on the ActivityDefinition (paused schedules don't fire).
  3. Verify the cron expression with: docker exec temporal-admin-tools temporal schedule describe --schedule-id activity-schedule-<uuid> --address temporal:7233

Event not routing

  1. Check NATS monitoring: http://localhost:8222/jsz?streams=true to verify the ACTIVITY_EVENTS stream exists.
  2. Verify the consumer is active: http://localhost:8222/jsz?consumers=true
  3. Check the Event Router logs for "matched no definitions": the event type may not match the event_type of any enabled ActivityDefinition.
  4. Check trigger_config.filters — all key/value pairs must match the event payload exactly.
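You can check the routing rules in steps 3 and 4 offline against a captured envelope. This minimal sketch makes several assumptions about the router:

* It compares the envelope's type field with trigger_config.event_type.
* Every filters key/value pair must appear in payload with an equal value, with no wildcards or nested matching.
* A definition without an enabled field counts as enabled.

matches_definition is a hypothetical helper and is not the Event Router's code.

```python
from typing import Any, Mapping


def matches_definition(envelope: Mapping[str, Any], definition: Mapping[str, Any]) -> bool:
    """Return True if an event envelope would trigger the given ActivityDefinition."""
    if not definition.get("enabled", True):
        return False  # disabled definitions never match
    trigger = definition.get("trigger_config") or {}
    if trigger.get("trigger_type") != "event":
        return False
    if envelope.get("type") != trigger.get("event_type"):
        return False
    payload = envelope.get("payload") or {}
    # Every filter pair must be present in the payload with exactly the same value.
    return all(
        key in payload and payload[key] == value
        for key, value in (trigger.get("filters") or {}).items()
    )
```

Under exact matching, value types and case matter: a filter of {"tier": "pro"} does not match "Pro", and "1" does not match 1.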

Workflow stuck / not completing

  1. Open Temporal UI → find the workflow by ID or ActivityId search attribute.
  2. Check the workflow history for failed activities.
  3. Common causes:
    • DB connection lost during load_activity_definition or log_run
    • Activity retries exhausted (the retry policy caps attempts at maximum_attempts=10)
    • ActivityDefinition row was deleted while workflow was running
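To judge how long a failing activity keeps retrying before its retries are exhausted, you can work out the backoff. This sketch assumes Temporal's default retry intervals combined with maximum_attempts=10:

* initial interval: 1 s
* backoff coefficient: 2.0
* maximum interval: 100 × the initial interval

The project's actual retry policy may override any of these. The sketch counts only the waits between attempts and ignores activity execution time and timeouts.

```python
from typing import List, Optional


def retry_delays(maximum_attempts: int, initial: float = 1.0,
                 coefficient: float = 2.0, maximum: Optional[float] = None) -> List[float]:
    """Waits (seconds) between consecutive attempts under an exponential retry policy."""
    if maximum is None:
        maximum = 100 * initial  # Temporal's default maximum interval
    # N attempts leave N - 1 gaps; each gap grows by `coefficient`, capped at `maximum`.
    return [min(initial * coefficient ** n, maximum) for n in range(maximum_attempts - 1)]


delays = retry_delays(10)
print(delays)       # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 100.0, 100.0]
print(sum(delays))  # 327.0
```

With these defaults, about 327 s (roughly five and a half minutes) of backoff pass before the tenth and final attempt starts.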

Prometheus metrics not appearing

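  1. Confirm the worker was started with PROMETHEUS_BIND_ADDR set (for example PROMETHEUS_BIND_ADDR=0.0.0.0:9090).
  2. curl http://localhost:9090/metrics should return Temporal SDK metrics.
  3. If port 9090 conflicts with a Prometheus server (which also listens on 9090 by default), set PROMETHEUS_BIND_ADDR=0.0.0.0:9091 and use http://localhost:9091/metrics instead.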
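You can script step 2. This sketch fetches the exposition text from the worker's endpoint and lists the distinct metric names. The names metric_names and fetch_metric_names are hypothetical. The default URL assumes PROMETHEUS_BIND_ADDR uses port 9090, as in the endpoints table. Passing prefix="temporal_" should keep only the SDK's metrics, assuming the SDK's usual temporal_ name prefix.

```python
import urllib.request
from typing import List


def metric_names(exposition: str, prefix: str = "") -> List[str]:
    """Distinct metric names in Prometheus text format, optionally filtered by prefix."""
    names = set()
    for line in exposition.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and # HELP / # TYPE comments
        # The name ends at the first '{' (labels) or whitespace (sample value).
        name = line.split("{", 1)[0].split()[0]
        if name.startswith(prefix):
            names.add(name)
    return sorted(names)


def fetch_metric_names(url: str = "http://localhost:9090/metrics", prefix: str = "") -> List[str]:
    """Fetch the worker's metrics endpoint and return its metric names."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return metric_names(resp.read().decode("utf-8"), prefix)
```

If fetch_metric_names() raises a connection error, the worker is not serving metrics on that address. If it returns an empty list, the endpoint is up but nothing has been recorded yet.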

DB migration drift

uv run alembic current    # show current revision
uv run alembic upgrade head  # apply pending migrations
uv run alembic history    # show full migration history

Wipe and restart dev stack

docker compose -f docker-compose.dev.yml down -v   # removes all volumes
docker compose -f docker-compose.dev.yml up -d
uv run alembic upgrade head
uv run python src/activity_core/seed.py
# Re-register search attributes (see Dev environment step 4)