---
type: session-note
created: "2026-03-28"
status: handoff
---

# WP-0002 Handoff Note — Continue on CoulombCore

## Context

Implementing custodian-WP-0002 (Triggers & Ops). Work was interrupted on the workstation because of WSL2 Docker image pull issues. Continue on CoulombCore, where Docker runs natively.

## What was confirmed before handoff

- **All 12 tasks are unblocked** — broker decision `bc47c9c2` is resolved: **NATS + JetStream**
- Dev stack (Temporal + PostgreSQL + Elasticsearch) needs to be started:

```bash
cd ~/activity-core
docker compose -f docker-compose.dev.yml up -d
uv run alembic upgrade head
uv run python src/activity_core/seed.py
```

## Build order

### Phase 4 — Time-based triggers (start here)

1. **T22** — `src/activity_core/schedule_manager.py`
   - `upsert_schedule(client, defn)` — create/update Temporal Schedule from ActivityDefinition
   - `delete_schedule(client, activity_id)` — remove schedule
   - `list_schedules(client)` — enumerate active schedules
   - Map `CronTriggerConfig.misfire_policy` → `ScheduleOverlapPolicy`:
     - `"skip"` → `ScheduleOverlapPolicy.SKIP`
     - `"catchup"` → `ScheduleOverlapPolicy.BUFFER_ALL` + backfill logic
     - `"compress"` → `ScheduleOverlapPolicy.BUFFER_ONE`
   - Schedule ID convention: `activity-schedule-{activity_definition.id}`
   - Workflow action: start `RunActivityWorkflow` on `orchestrator-tq`
   - Only create schedules for `enabled=True` + `trigger_type="cron"` definitions

2. **T23** — `src/activity_core/sync_schedules.py` (bootstrap/startup script)
   - Query all enabled cron ActivityDefinitions from DB
   - For each: call `upsert_schedule()`
   - Delete Temporal schedules that have no matching DB row (tombstone cleanup)
   - Entry point: `uv run python -m activity_core.sync_schedules`
   - Also call at top of `worker.py` before entering the run loop

3. **T24** — Misfire policy in schedule config (part of T22, ensure all 3 policies tested)
   - `"catchup"`: after upsert, call `schedule_handle.backfill()` for any gap since last run

4. **T25** — `tests/test_schedule_lifecycle.py`
   - Uses `temporalio[testing]` — `TestWorkflowEnvironment`
   - Test: create ActivityDefinition → `upsert_schedule` → schedule exists
   - Test: set `enabled=False` → schedule paused
   - Test: delete ActivityDefinition → `delete_schedule` → schedule gone
   - Test: misfire policy round-trip (skip/catchup/compress → correct OverlapPolicy)

### Phase 5 — Event-driven triggers (after Phase 4)

Add `nats-py` (or `nats` async client) to `pyproject.toml`:

```bash
uv add nats-py
```

5. **T26** — `src/activity_core/event_router.py`
   - `EventRouter` class: connects to NATS JetStream, subscribes to `activity.>` subject
   - Durable consumer name: `activity-core-event-router`
   - Stream name: `ACTIVITY_EVENTS`
   - `async def start(nats_url, temporal_client)` — main loop
   - Deserialize message payload → `EventEnvelope`

6. **T27** — Routing rules in `event_router.py`
   - Query DB for all enabled `event` trigger ActivityDefinitions
   - Match: `defn.trigger_config.event_type == envelope.type`
   - Filter: all `defn.trigger_config.filters` key/value pairs present in `envelope.payload`
   - Returns list of matching `activity_id`s

7. **T28** — Start workflow from Event Router
   - For each matched activity_id: `client.start_workflow(RunActivityWorkflow, ...)`
   - Workflow ID: `activity-{activity_id}:{envelope.event_id}` (idempotent)
   - `id_reuse_policy=WorkflowIDReusePolicy.REJECT_DUPLICATE`
   - Ack NATS message only after successful start (at-least-once)

8. **T29** — `tests/test_event_router.py`
   - Integration test: publish event to NATS → assert workflow started in Temporal
   - Use `nats-server` in Docker (use the compose service if one already exists, otherwise add one)

### Phase 6 — Observability & Admin

9. **T30** — `src/activity_core/api.py` (FastAPI)
   - `GET /activity-definitions/` — list all
   - `GET /activity-definitions/{id}` — get one
   - `POST /activity-definitions/` — create (+ upsert_schedule if cron)
   - `PUT /activity-definitions/{id}` — update (+ upsert_schedule)
   - `DELETE /activity-definitions/{id}` — delete (+ delete_schedule)
   - `POST /activity-definitions/{id}/trigger` — manual one-shot start
   - Entry point: `uv run uvicorn activity_core.api:app --port 8010`

10. **T31** — Prometheus metrics
    - Temporal SDK emits metrics when a `Runtime` with telemetry configured is passed as the `runtime` parameter of `Client.connect()`
    - `from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig`
    - Expose on `http://0.0.0.0:9090/metrics`
    - Add to `worker.py` startup

11. **T32** — Temporal search attributes
    - Register `activity_id` as a custom search attribute in Temporal (type: Keyword)
    - In `RunActivityWorkflow`: `workflow.upsert_search_attributes({"activity_id": [str(activity_id)]})`
    - Enables filtering in Temporal Web UI: `activity_id = "uuid"`

12. **T33** — `docs/runbook.md`
    - Dev startup, prod deployment, scale-out, troubleshooting common failures

## State hub workstream

- Workstream ID: `3a4f47d9-8bc1-434e-acb4-bed5d4dacda0`
- Log progress at end of session: `add_progress_event(workstream_id=...)`
- Update task statuses as each T## completes

## Key file references

| File | Purpose |
|------|---------|
| `src/activity_core/models.py` | CronTriggerConfig, EventTriggerConfig, ActivityDefinition |
| `src/activity_core/workflows.py` | RunActivityWorkflow (what schedules will trigger) |
| `src/activity_core/worker.py` | Where to call sync_schedules at startup |
| `docs/conventions.md` | Schedule/workflow ID conventions |
| `docs/idempotency.md` | Deduplication strategy |
| `e2e/tests/test_full_flow.py` | Integration test pattern to follow |
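
The T27 routing rule and the T28 workflow ID convention described in the build order can be sketched in plain Python before any NATS or Temporal wiring exists. This is a minimal sketch under stated assumptions, not the implementation: `EventEnvelope` and `EventTrigger` here are hypothetical flattened stand-ins for the real models in `src/activity_core/models.py`, the function names `match_activities` and `workflow_id_for` are invented for illustration, and the filter is assumed to mean exact equality on top-level payload keys.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class EventEnvelope:
    """Hypothetical stand-in for the deserialized NATS message."""
    event_id: str
    type: str
    payload: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class EventTrigger:
    """Hypothetical flattened view of an event-triggered ActivityDefinition."""
    activity_id: str
    event_type: str
    filters: dict[str, Any] = field(default_factory=dict)
    enabled: bool = True


def match_activities(envelope: EventEnvelope, triggers: list[EventTrigger]) -> list[str]:
    """Return the activity IDs whose trigger matches the envelope (T27)."""
    matched: list[str] = []
    for trigger in triggers:
        if not trigger.enabled:
            continue  # only enabled definitions take part in routing
        if trigger.event_type != envelope.type:
            continue  # match rule: event_type must equal envelope.type
        # Filter rule (assumed semantics): every filter key must exist in
        # the payload with an equal value. An empty filter matches anything.
        if all(
            key in envelope.payload and envelope.payload[key] == value
            for key, value in trigger.filters.items()
        ):
            matched.append(trigger.activity_id)
    return matched


def workflow_id_for(activity_id: str, envelope: EventEnvelope) -> str:
    """Build the workflow ID from the T28 convention.

    The same (activity, event) pair always yields the same ID, so a
    redelivered message maps onto the already-started workflow.
    """
    return f"activity-{activity_id}:{envelope.event_id}"


if __name__ == "__main__":
    env = EventEnvelope(event_id="evt-1", type="order.created", payload={"region": "eu"})
    triggers = [
        EventTrigger("a1", "order.created", {"region": "eu"}),
        EventTrigger("a2", "order.created", {"region": "us"}),
    ]
    for activity_id in match_activities(env, triggers):
        print(workflow_id_for(activity_id, env))
```

Keeping the matching logic free of I/O means it can be unit tested without a broker; the router would then call it between deserializing the message and `client.start_workflow(...)`, acking only after the start succeeds.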