activity-core Operational Runbook
Dev environment — quick start
# 1. Start the full stack (Temporal + PostgreSQL + Elasticsearch + NATS)
docker compose -f docker-compose.dev.yml up -d
# 2. Apply DB migrations
uv run alembic upgrade head
# 3. Seed initial ActivityDefinitions
uv run python src/activity_core/seed.py
# 4. Register custom Temporal search attributes (one-time per namespace)
docker exec temporal temporal operator search-attribute create \
--name ActivityId --type Keyword \
--name ActivityName --type Keyword \
--address temporal:7233
# 5. Start the worker (syncs schedules automatically on startup)
TEMPORAL_HOST=localhost:7233 \
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
uv run python -m activity_core.worker
# 6. Start the Event Router (in a second terminal)
TEMPORAL_HOST=localhost:7233 \
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
NATS_URL=nats://localhost:4222 \
uv run python -m activity_core.event_router
# 7. Start the REST API (in a third terminal)
TEMPORAL_HOST=localhost:7233 \
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
uv run uvicorn activity_core.api:app --port 8010 --reload
Endpoints
| Service | URL |
|---|---|
| Temporal Web UI | http://localhost:8080 |
| REST API docs (Swagger) | http://localhost:8010/docs |
| NATS monitoring | http://localhost:8222 |
| Prometheus metrics (worker) | http://localhost:9090/metrics |
REST API — common operations
# List all ActivityDefinitions
curl http://localhost:8010/activity-definitions/
# Create a cron ActivityDefinition (fires every weekday at 09:00 Berlin time)
curl -s -X POST http://localhost:8010/activity-definitions/ \
-H "Content-Type: application/json" -d '{
"name": "daily-report",
"trigger_config": {
"trigger_type": "cron",
"cron_expression": "0 9 * * 1-5",
"timezone": "Europe/Berlin",
"misfire_policy": "skip"
}
}'
# Create an event-triggered ActivityDefinition
curl -s -X POST http://localhost:8010/activity-definitions/ \
-H "Content-Type: application/json" -d '{
"name": "user-onboarding",
"trigger_config": {
"trigger_type": "event",
"event_type": "user.created",
"filters": {"tier": "pro"}
}
}'
# Manually trigger a one-shot run
curl -s -X POST http://localhost:8010/activity-definitions/<id>/trigger
# Disable an activity (pauses its schedule)
curl -s -X PUT http://localhost:8010/activity-definitions/<id> \
-H "Content-Type: application/json" -d '{"enabled": false}'
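For scripted use, the same calls can be built from Python. This is a minimal sketch using only the standard library; the helper name api_request is hypothetical, and the base URL is the dev address used above. It builds the request without sending it, so it is safe to inspect first.

```python
import json
import urllib.request

API_BASE = "http://localhost:8010"  # dev API address from this runbook


def api_request(method, path, payload=None, base_url=API_BASE):
    """Build (but do not send) a JSON request against the REST API.

    Hypothetical helper: it mirrors the curl calls above and nothing more.
    """
    data = None if payload is None else json.dumps(payload).encode("utf-8")
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return urllib.request.Request(
        url=f"{base_url}{path}", data=data, headers=headers, method=method
    )


# Equivalent of the "disable an activity" curl call; <id> stays a placeholder.
disable = api_request("PUT", "/activity-definitions/<id>", {"enabled": False})
print(disable.get_method(), disable.full_url)
```

Pass the returned object to urllib.request.urlopen to actually send it.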
Publishing events to the Event Router
The Event Router subscribes to the activity.> NATS subject on the ACTIVITY_EVENTS stream.
import asyncio, json, nats
from datetime import datetime, timezone
import uuid

async def publish():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    envelope = {
        "event_id": str(uuid.uuid4()),
        "type": "user.created",
        "source": "user-service",
        "occurred_at": datetime.now(tz=timezone.utc).isoformat(),
        "subject": "user/42",
        "trace_id": str(uuid.uuid4()),
        "payload": {"tier": "pro", "region": "eu"},
    }
    # The subject must fall under activity.> so the Event Router receives it.
    await js.publish("activity.user.created", json.dumps(envelope).encode())
    await nc.drain()

asyncio.run(publish())
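When several services publish events, it can help to build and check the envelope in one place before it reaches NATS. The sketch below is an assumption-laden illustration: the helper names are hypothetical, treating every field of the example envelope as required is a guess rather than a documented contract, and deriving the subject as activity.<type> is inferred from the single example above.

```python
import json
import uuid
from datetime import datetime, timezone

# Fields that appear in the example envelope. Treating all of them as
# required is an assumption, not a documented contract.
ENVELOPE_FIELDS = (
    "event_id", "type", "source", "occurred_at", "subject", "trace_id", "payload",
)


def build_envelope(event_type, source, subject, payload):
    """Return an envelope dict shaped like the runbook example."""
    return {
        "event_id": str(uuid.uuid4()),
        "type": event_type,
        "source": source,
        "occurred_at": datetime.now(tz=timezone.utc).isoformat(),
        "subject": subject,
        "trace_id": str(uuid.uuid4()),
        "payload": payload,
    }


def missing_fields(envelope):
    """List the expected fields that are absent from an envelope."""
    return [name for name in ENVELOPE_FIELDS if name not in envelope]


def nats_subject(event_type):
    """Assumed mapping: type "user.created" publishes on "activity.user.created"."""
    return f"activity.{event_type}"


envelope = build_envelope("user.created", "user-service", "user/42", {"tier": "pro"})
print(nats_subject(envelope["type"]), missing_fields(envelope))
```

The encoded body for js.publish is then json.dumps(envelope).encode(), as in the example above.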
Syncing schedules manually
TEMPORAL_HOST=localhost:7233 \
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
uv run python -m activity_core.sync_schedules
This reconciles all Temporal Schedules with the activity_definitions table:
- Upserts schedules for every enabled cron definition
- Creates paused schedules for disabled cron definitions
- Deletes orphaned schedules with no matching DB row
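The three reconciliation rules above can be sketched as a pure planning step. This is not the actual sync_schedules code: CronDefinition and plan_sync are hypothetical names, the schedule id format activity-schedule-<uuid> is taken from elsewhere in this runbook, and the sketch assumes every existing schedule id passed in belongs to activity-core.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CronDefinition:
    """Hypothetical, trimmed view of a cron row in activity_definitions."""
    id: str
    enabled: bool


def schedule_id(definition_id):
    # Schedule ids appear in this runbook as activity-schedule-<uuid>.
    return f"activity-schedule-{definition_id}"


def plan_sync(definitions, existing_schedule_ids):
    """Return (upsert_active, upsert_paused, delete) as sorted lists of ids."""
    active = {schedule_id(d.id) for d in definitions if d.enabled}
    paused = {schedule_id(d.id) for d in definitions if not d.enabled}
    # Anything Temporal knows about that has no matching DB row is orphaned.
    orphaned = set(existing_schedule_ids) - active - paused
    return sorted(active), sorted(paused), sorted(orphaned)


definitions = [CronDefinition("a", True), CronDefinition("b", False)]
print(plan_sync(definitions, ["activity-schedule-a", "activity-schedule-old"]))
```

Separating the plan from the Temporal calls makes it possible to print the plan as a dry run before anything is deleted.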
Temporal UI — filtering by activity
With search attributes registered, you can filter in the Temporal Web UI:
ActivityId = "your-activity-uuid"
Or via the temporal CLI:
docker exec temporal-admin-tools temporal workflow list \
--query 'ActivityId="<uuid>"' \
--address temporal:7233
Daily State Hub WSJF triage verification
Use this when answering: "did today's daily triage run happen?"
Set the ActivityDefinition id when known. If it is not known, pass the definition name used in the environment and let the live helper resolve it from Postgres.
export DAILY_TRIAGE_ACTIVITY_ID=<daily-triage-activity-definition-uuid>
# Dry-run checklist; safe from any shell because it only prints checks.
uv run python scripts/verify_daily_triage.py \
--activity-id "$DAILY_TRIAGE_ACTIVITY_ID" \
--date "$(date -u +%F)"
# Live check from a shell with Temporal, DB, State Hub, and working-memory access.
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
TEMPORAL_HOST=localhost:7233 \
STATE_HUB_URL=http://127.0.0.1:8000 \
uv run python scripts/verify_daily_triage.py \
--activity-id "$DAILY_TRIAGE_ACTIVITY_ID" \
--working-memory-dir /home/worsch/the-custodian/memory/working \
--live
The verification is complete when all of these agree:
- Temporal schedule activity-schedule-$DAILY_TRIAGE_ACTIVITY_ID exists, is not paused, and uses the skip overlap policy.
- The latest workflow found with ActivityId="$DAILY_TRIAGE_ACTIVITY_ID" either completed or is visibly retrying a failed activity in history.
- activity_runs has a row for the daily triage ActivityDefinition with today's scheduled_for or fired_at date.
- State Hub /progress/ contains a daily_triage event whose detail includes the same activity_core_run_id and its output_validated flag.
- The working-memory sink wrote daily-triage-YYYY-MM-DD-<run>.md and its frontmatter contains the same activity_core_run_id and validation metadata.
- The ActivityDefinition's instruction model, token budget, and sink timeouts fit under ACTIVITY_TIMEOUT_SECONDS (default 900 seconds). Temporal retries each activity up to 10 attempts, so a slow LLM or sink failure should show as workflow retry history rather than a silent missing report.
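To judge how long a run may legitimately stay in retry before it counts as missing, the upper bound can be estimated from the numbers above. This is a rough sketch under stated assumptions: ACTIVITY_TIMEOUT_SECONDS is taken to bound each attempt, and the backoff values are Temporal's default retry policy (1 s initial interval, coefficient 2.0, 100 s cap), which the worker may override. The function name is hypothetical.

```python
def retry_window_seconds(
    timeout=900.0,         # ACTIVITY_TIMEOUT_SECONDS default from this runbook
    max_attempts=10,       # retry limit from this runbook
    initial_interval=1.0,  # assumed: Temporal default retry policy
    backoff=2.0,           # assumed: Temporal default retry policy
    max_interval=100.0,    # assumed: Temporal default retry policy
):
    """Upper bound on the time one activity can spend before retries run out."""
    # There is one wait between consecutive attempts, so max_attempts - 1 waits.
    waits = [
        min(initial_interval * backoff ** i, max_interval)
        for i in range(max_attempts - 1)
    ]
    return timeout * max_attempts + sum(waits)


print(retry_window_seconds())  # 9327.0 seconds, roughly 2.6 hours
```

With these assumptions a single failing activity can hold a run open for about 2.6 hours, which is a reasonable starting point for the retry window used in alerting.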
Expected missed-run behavior: the daily triage definition should use
misfire_policy: skip. Planned downtime does not catch up missed daily reports;
the next scheduled fire is the next authoritative run.
Scale-out
Multiple worker replicas
Temporal workers are stateless and horizontally scalable. Run additional worker
processes to increase throughput on orchestrator-tq and task-execution-tq.
Each worker registers the same workflows/activities — Temporal distributes tasks across all pollers automatically.
Important: Only one process should call sync_schedules at startup to avoid
race conditions. Consider disabling the startup sync on secondary worker replicas
via an env var:
SKIP_SCHEDULE_SYNC=true uv run python -m activity_core.worker
(Implement the SKIP_SCHEDULE_SYNC check in worker.py when needed.)
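A minimal sketch of that check, assuming worker.py decides once at startup whether to run the sync. The function name and the set of accepted truthy values are assumptions; only the SKIP_SCHEDULE_SYNC variable name comes from this runbook.

```python
import os

_TRUTHY = {"1", "true", "yes", "on"}  # assumed set of accepted values


def should_sync_schedules(environ=None):
    """Return False when SKIP_SCHEDULE_SYNC is set to a truthy value."""
    env = os.environ if environ is None else environ
    return env.get("SKIP_SCHEDULE_SYNC", "").strip().lower() not in _TRUTHY


# In worker startup (hypothetical call site):
#     if should_sync_schedules():
#         await sync_schedules(...)
print(should_sync_schedules({"SKIP_SCHEDULE_SYNC": "true"}))  # False
```

Defaulting to sync when the variable is unset keeps single-replica deployments working without any configuration change.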
Multiple Event Router replicas
The durable NATS consumer (activity-core-event-router) ensures that only one
subscriber processes each message. Running multiple event_router processes with
the same durable consumer name provides automatic failover.
Troubleshooting
Worker fails to start: "ACTCORE_DB_URL is required"
Set the environment variable before running the worker.
Schedule not firing
- Check Temporal UI → Schedules tab for the schedule status.
- Ensure enabled=True on the ActivityDefinition (paused schedules don't fire).
- Verify the cron expression with:
docker exec temporal-admin-tools temporal schedule describe --schedule-id activity-schedule-<uuid>
Event not routing
- Check NATS monitoring: http://localhost:8222/jsz to verify the ACTIVITY_EVENTS stream exists.
- Verify the consumer is active: http://localhost:8222/jsz?consumers=true
- Check Event Router logs for "matched no definitions": the event type may not match any enabled ActivityDefinition.
- Check trigger_config.filters: all key/value pairs must match the event payload exactly.
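To reproduce a filter mismatch locally, the exact-match rule from the last item can be approximated as below. This is a sketch of the described behavior, not the Event Router's code; filters_match is a hypothetical name, and treating an empty filter set as "match everything" is an assumption.

```python
_MISSING = object()  # sentinel so a filter value of None does not match an absent key


def filters_match(filters, payload):
    """True when every filter key exists in the payload with an equal value."""
    return all(
        payload.get(key, _MISSING) == expected
        for key, expected in filters.items()
    )


payload = {"tier": "pro", "region": "eu"}
print(filters_match({"tier": "pro"}, payload))   # True
print(filters_match({"tier": "free"}, payload))  # False
```

Comparison is by value, so a string "1" in the filter does not match an integer 1 in the payload; type mismatches like this are worth checking when an event unexpectedly fails to route.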
Workflow stuck / not completing
- Open Temporal UI → find the workflow by ID or ActivityId search attribute.
- Check the workflow history for failed activities.
- Common causes:
  - DB connection lost during load_activity_definition or log_run
  - Activity retry exhausted (check maximum_attempts=10)
  - ActivityDefinition row was deleted while workflow was running
Prometheus metrics not appearing
- Confirm the worker is running with PROMETHEUS_BIND_ADDR set.
- curl http://localhost:9090/metrics should return Temporal SDK metrics.
- If port 9090 conflicts with Prometheus server, set PROMETHEUS_BIND_ADDR=0.0.0.0:9091.
Production alerting and failure modes
Kubernetes health expectations:
kubectl -n activity-core get deploy actcore-worker actcore-api actcore-event-router
kubectl -n activity-core get pods -l app.kubernetes.io/part-of=activity-core
kubectl -n activity-core port-forward svc/actcore-worker-metrics 9090:9090
curl -sf http://127.0.0.1:9090/metrics
Page an operator when:
- actcore-worker has no ready pod, cannot connect to Temporal, or cannot reach Postgres.
- The daily triage schedule is missing or paused outside an approved maintenance window.
- The expected daily triage run is absent from Temporal and activity_runs after the retry window.
- Both State Hub progress and working-memory report sinks are missing for a completed run.
- Report sink or task emission failures repeat across Temporal retries.
Leave a State Hub progress note, but do not page, when:
- A planned outage caused one skipped run and the schedule is healthy again.
- A sink idempotency check reports exists for the expected run id.
- An instruction report has output_validated=false but still emitted a validation-failure note preserving partial model output for review.
- The report completed but calibration feedback says the recommendations were noisy, too long, or under-sensitive.
Handle in the next operator session:
- Prompt/schema tuning, loose-end sensitivity, and stale-but-parked work calibration.
- Non-urgent schedule jitter or timeout adjustments.
- Moving a task sink from ISSUE_SINK_TYPE=null to the real issue-core endpoint after a dry-run contract check has passed.
DB migration drift
uv run alembic current # show current revision
uv run alembic upgrade head # apply pending migrations
uv run alembic history # show full migration history
Railiance Deployment
Pre-requisites
- Docker ≥ 24 with Compose v2 (docker compose not docker-compose)
- ≥ 4 GB RAM available (Temporal server takes ~1 GB)
- Ports available: 4222 (NATS), 7233 (Temporal gRPC), 8010 (API), 8080 (Temporal UI), 9090 (Prometheus metrics)
First-time setup
# 1. Copy and edit the env file — fill in all secrets and URLs
cp .env.example .env
# 2. Build the image and start all services
make railiance-up
# 3. Wait for health (retry until 200)
curl -sf http://localhost:8010/health # → {"status":"ok","db":true,"temporal":true}
# 4. Register Temporal search attributes (one-time per namespace)
docker exec actcore-temporal temporal operator search-attribute create \
--name ActivityId --type Keyword \
--name ActivityName --type Keyword \
--address temporal:7233
# 5. Load event types and activity definitions
make sync-all
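Step 3 above says to retry until the health endpoint responds. One way to script that wait is sketched below, using only the standard library. The function names, attempt count, and delay are assumptions; the expected body shape {"status":"ok","db":true,"temporal":true} is taken from the step 3 comment.

```python
import json
import time
import urllib.request


def fetch_health(url, timeout=3.0):
    """GET the health endpoint and decode its JSON body."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def wait_for_health(url, attempts=30, delay=2.0, fetch=fetch_health, sleep=time.sleep):
    """Poll until status is "ok" and both probes are true; True on success."""
    for attempt in range(attempts):
        try:
            body = fetch(url)
        except (OSError, ValueError):
            body = None  # not reachable yet, or the body was not valid JSON
        healthy = (
            isinstance(body, dict)
            and body.get("status") == "ok"
            and bool(body.get("db"))
            and bool(body.get("temporal"))
        )
        if healthy:
            return True
        if attempt < attempts - 1:
            sleep(delay)
    return False


# Typical call (performs real HTTP requests, so it is left commented out):
#     ok = wait_for_health("http://localhost:8010/health")
```

The fetch and sleep parameters exist only so the loop can be exercised without a running stack; in normal use the defaults are enough.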
Upgrade procedure
git pull
make railiance-up # rebuilds image, restarts changed services
make migrate # apply any new migrations (safe to run when none pending)
curl -sf http://localhost:8010/health
Health verification
# API health (db + temporal probes)
curl -s http://localhost:8010/health | python3 -m json.tool
# Temporal UI
open http://localhost:8080
# Prometheus metrics
curl -s http://localhost:9090/metrics | head -20
Common ops
# Follow logs for one service
docker compose -f docker-compose.railiance.yml logs -f actcore-worker
# Restart one service without bringing down others
docker compose -f docker-compose.railiance.yml restart actcore-api
# Re-run migrations manually
docker compose -f docker-compose.railiance.yml run --rm actcore-migrate
# Wipe and reset (DESTRUCTIVE — deletes all volumes including DB data)
make railiance-down
docker volume rm activity-core_temporal-db-data activity-core_app-db-data activity-core_nats-data
make railiance-up
Wipe and restart dev stack
docker compose -f docker-compose.dev.yml down -v # removes all volumes
docker compose -f docker-compose.dev.yml up -d
uv run alembic upgrade head
uv run python src/activity_core/seed.py
# Re-register search attributes (see Dev environment step 4)