From 4457d6d6b947b923e8f270fef832f2359a28a824 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 28 Mar 2026 00:10:33 +0100 Subject: [PATCH] chore: add WP-0002 handoff note for CoulombCore continuation All 12 tasks unblocked (broker decision resolved: NATS + JetStream). Work interrupted on workstation due to WSL2 Docker pull issues. Note captures build order, file names, key design decisions, and state hub IDs for seamless pickup on CoulombCore. Co-Authored-By: Claude Sonnet 4.6 --- workplans/WP-0002-next-steps.md | 128 ++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 workplans/WP-0002-next-steps.md diff --git a/workplans/WP-0002-next-steps.md b/workplans/WP-0002-next-steps.md new file mode 100644 index 0000000..794a2f0 --- /dev/null +++ b/workplans/WP-0002-next-steps.md @@ -0,0 +1,128 @@ +--- +type: session-note +created: "2026-03-28" +status: handoff +--- + +# WP-0002 Handoff Note — Continue on CoulombCore + +## Context + +Implementing custodian-WP-0002 (Triggers & Ops). Work interrupted on workstation +due to WSL2 Docker image pull issues. Continue on CoulombCore where Docker runs natively. + +## What was confirmed before handoff + +- **All 12 tasks are unblocked** — broker decision `bc47c9c2` is resolved: **NATS + JetStream** +- Dev stack (Temporal + PostgreSQL + Elasticsearch) needs to be started: + ```bash + cd ~/activity-core + docker compose -f docker-compose.dev.yml up -d + uv run alembic upgrade head + uv run python src/activity_core/seed.py + ``` + +## Build order + +### Phase 4 — Time-based triggers (start here) + +1. **T22** — `src/activity_core/schedule_manager.py` + - `upsert_schedule(client, defn)` — create/update Temporal Schedule from ActivityDefinition + - `delete_schedule(client, activity_id)` — remove schedule + - `list_schedules(client)` — enumerate active schedules + - Map `CronTriggerConfig.misfire_policy` → `ScheduleOverlapPolicy`: + - `"skip"` → `ScheduleOverlapPolicy.SKIP` + - `"catchup"` → `ScheduleOverlapPolicy.BUFFER_ALL` + backfill logic + - `"compress"` → `ScheduleOverlapPolicy.BUFFER_ONE` + - Schedule ID convention: `activity-schedule-{activity_definition.id}` + - Workflow action: start `RunActivityWorkflow` on `orchestrator-tq` + - Only create schedules for `enabled=True` + `trigger_type="cron"` definitions + +2. **T23** — `src/activity_core/sync_schedules.py` (bootstrap/startup script) + - Query all enabled cron ActivityDefinitions from DB + - For each: call `upsert_schedule()` + - Delete Temporal schedules that have no matching DB row (tombstone cleanup) + - Entry point: `uv run python -m activity_core.sync_schedules` + - Also call at top of `worker.py` before entering the run loop + +3. **T24** — Misfire policy in schedule config (part of T22, ensure all 3 policies tested) + - `"catchup"`: after upsert, call `schedule_handle.backfill()` for any gap since last run + +4. **T25** — `tests/test_schedule_lifecycle.py` + - Uses `temporalio[testing]` — `TestWorkflowEnvironment` + - Test: create ActivityDefinition → `upsert_schedule` → schedule exists + - Test: set `enabled=False` → schedule paused + - Test: delete ActivityDefinition → `delete_schedule` → schedule gone + - Test: misfire policy round-trip (skip/catchup/compress → correct OverlapPolicy) + +### Phase 5 — Event-driven triggers (after Phase 4) + +Add `nats-py` (or `nats` async client) to pyproject.toml: +``` +uv add nats-py +``` + +5. **T26** — `src/activity_core/event_router.py` + - `EventRouter` class: connects to NATS JetStream, subscribes to `activity.>` subject + - Durable consumer name: `activity-core-event-router` + - Stream name: `ACTIVITY_EVENTS` + - `async def start(nats_url, temporal_client)` — main loop + - Deserialize message payload → `EventEnvelope` + +6. **T27** — Routing rules in `event_router.py` + - Query DB for all enabled `event` trigger ActivityDefinitions + - Match: `defn.trigger_config.event_type == envelope.type` + - Filter: all `defn.trigger_config.filters` key/value pairs present in `envelope.payload` + - Returns list of matching `activity_id`s + +7. **T28** — Start workflow from Event Router + - For each matched activity_id: `client.start_workflow(RunActivityWorkflow, ...)` + - Workflow ID: `activity-{activity_id}:{envelope.event_id}` (idempotent) + - `id_reuse_policy=WorkflowIDReusePolicy.REJECT_DUPLICATE` + - Ack NATS message only after successful start (at-least-once) + +8. **T29** — `tests/test_event_router.py` + - Integration test: publish event to NATS → assert workflow started in Temporal + - Use `nats-server` in Docker (already in compose or add service) + +### Phase 6 — Observability & Admin + +9. **T30** — `src/activity_core/api.py` (FastAPI) + - `GET /activity-definitions/` — list all + - `GET /activity-definitions/{id}` — get one + - `POST /activity-definitions/` — create (+ upsert_schedule if cron) + - `PUT /activity-definitions/{id}` — update (+ upsert_schedule) + - `DELETE /activity-definitions/{id}` — delete (+ delete_schedule) + - `POST /activity-definitions/{id}/trigger` — manual one-shot start + - Entry point: `uv run uvicorn activity_core.api:app --port 8010` + +10. **T31** — Prometheus metrics + - Temporal SDK emits metrics via `temporalio.service.connect()` `runtime` param + - `from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig` + - Expose on `http://0.0.0.0:9090/metrics` + - Add to `worker.py` startup + +11. **T32** — Temporal search attributes + - Register `activity_id` as a custom search attribute in Temporal (type: Keyword) + - In `RunActivityWorkflow`: `workflow.upsert_search_attributes({"activity_id": [str(activity_id)]})` + - Enables filtering in Temporal Web UI: `activity_id = "uuid"` + +12. **T33** — `docs/runbook.md` + - Dev startup, prod deployment, scale-out, troubleshooting common failures + +## State hub workstream + +- Workstream ID: `3a4f47d9-8bc1-434e-acb4-bed5d4dacda0` +- Log progress at end of session: `add_progress_event(workstream_id=...)` +- Update task statuses as each T## completes + +## Key file references + +| File | Purpose | +|------|---------| +| `src/activity_core/models.py` | CronTriggerConfig, EventTriggerConfig, ActivityDefinition | +| `src/activity_core/workflows.py` | RunActivityWorkflow (what schedules will trigger) | +| `src/activity_core/worker.py` | Where to call sync_schedules at startup | +| `docs/conventions.md` | Schedule/workflow ID conventions | +| `docs/idempotency.md` | Deduplication strategy | +| `e2e/tests/test_full_flow.py` | Integration test pattern to follow |