chore: add WP-0002 handoff note for CoulombCore continuation

All 12 tasks unblocked (broker decision resolved: NATS + JetStream).
Work interrupted on workstation due to WSL2 Docker pull issues.
Note captures build order, file names, key design decisions, and
state hub IDs for seamless pickup on CoulombCore.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-28 00:10:33 +01:00
parent 3593f76361
commit 4457d6d6b9

View File

@@ -0,0 +1,128 @@
---
type: session-note
created: "2026-03-28"
status: handoff
---
# WP-0002 Handoff Note — Continue on CoulombCore
## Context
Implementing custodian-WP-0002 (Triggers & Ops). Work interrupted on workstation
due to WSL2 Docker image pull issues. Continue on CoulombCore where Docker runs natively.
## What was confirmed before handoff
- **All 12 tasks are unblocked** — broker decision `bc47c9c2` is resolved: **NATS + JetStream**
- Dev stack (Temporal + PostgreSQL + Elasticsearch) needs to be started:
```bash
cd ~/activity-core
docker compose -f docker-compose.dev.yml up -d
uv run alembic upgrade head
uv run python src/activity_core/seed.py
```
## Build order
### Phase 4 — Time-based triggers (start here)
1. **T22** — `src/activity_core/schedule_manager.py`
- `upsert_schedule(client, defn)` — create/update Temporal Schedule from ActivityDefinition
- `delete_schedule(client, activity_id)` — remove schedule
- `list_schedules(client)` — enumerate active schedules
- Map `CronTriggerConfig.misfire_policy` → `ScheduleOverlapPolicy`:
- `"skip"` → `ScheduleOverlapPolicy.SKIP`
- `"catchup"` → `ScheduleOverlapPolicy.BUFFER_ALL` + backfill logic
- `"compress"` → `ScheduleOverlapPolicy.BUFFER_ONE`
- Schedule ID convention: `activity-schedule-{activity_definition.id}`
- Workflow action: start `RunActivityWorkflow` on `orchestrator-tq`
- Only create schedules for `enabled=True` + `trigger_type="cron"` definitions
2. **T23** — `src/activity_core/sync_schedules.py` (bootstrap/startup script)
- Query all enabled cron ActivityDefinitions from DB
- For each: call `upsert_schedule()`
- Delete Temporal schedules that have no matching DB row (tombstone cleanup)
- Entry point: `uv run python -m activity_core.sync_schedules`
- Also call at top of `worker.py` before entering the run loop
3. **T24** — Misfire policy in schedule config (part of T22, ensure all 3 policies tested)
- `"catchup"`: after upsert, call `schedule_handle.backfill()` for any gap since last run
4. **T25** — `tests/test_schedule_lifecycle.py`
- Uses `temporalio[testing]` — `TestWorkflowEnvironment`
- Test: create ActivityDefinition → `upsert_schedule` → schedule exists
- Test: set `enabled=False` → schedule paused
- Test: delete ActivityDefinition → `delete_schedule` → schedule gone
- Test: misfire policy round-trip (skip/catchup/compress → correct OverlapPolicy)
### Phase 5 — Event-driven triggers (after Phase 4)
Add `nats-py` (or `nats` async client) to pyproject.toml:
```
uv add nats-py
```
5. **T26** — `src/activity_core/event_router.py`
- `EventRouter` class: connects to NATS JetStream, subscribes to `activity.>` subject
- Durable consumer name: `activity-core-event-router`
- Stream name: `ACTIVITY_EVENTS`
- `async def start(nats_url, temporal_client)` — main loop
- Deserialize message payload → `EventEnvelope`
6. **T27** — Routing rules in `event_router.py`
- Query DB for all enabled `event` trigger ActivityDefinitions
- Match: `defn.trigger_config.event_type == envelope.type`
- Filter: all `defn.trigger_config.filters` key/value pairs present in `envelope.payload`
- Returns list of matching `activity_id`s
7. **T28** — Start workflow from Event Router
- For each matched activity_id: `client.start_workflow(RunActivityWorkflow, ...)`
- Workflow ID: `activity-{activity_id}:{envelope.event_id}` (idempotent)
- `id_reuse_policy=WorkflowIDReusePolicy.REJECT_DUPLICATE`
- Ack NATS message only after successful start (at-least-once)
8. **T29** — `tests/test_event_router.py`
- Integration test: publish event to NATS → assert workflow started in Temporal
- Use `nats-server` in Docker (already in compose or add service)
### Phase 6 — Observability & Admin
9. **T30** — `src/activity_core/api.py` (FastAPI)
- `GET /activity-definitions/` — list all
- `GET /activity-definitions/{id}` — get one
- `POST /activity-definitions/` — create (+ upsert_schedule if cron)
- `PUT /activity-definitions/{id}` — update (+ upsert_schedule)
- `DELETE /activity-definitions/{id}` — delete (+ delete_schedule)
- `POST /activity-definitions/{id}/trigger` — manual one-shot start
- Entry point: `uv run uvicorn activity_core.api:app --port 8010`
10. **T31** — Prometheus metrics
- Temporal SDK emits metrics via `temporalio.service.connect()` `runtime` param
- `from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig`
- Expose on `http://0.0.0.0:9090/metrics`
- Add to `worker.py` startup
11. **T32** — Temporal search attributes
- Register `activity_id` as a custom search attribute in Temporal (type: Keyword)
- In `RunActivityWorkflow`: `workflow.upsert_search_attributes({"activity_id": [str(activity_id)]})`
- Enables filtering in Temporal Web UI: `activity_id = "uuid"`
12. **T33** — `docs/runbook.md`
- Dev startup, prod deployment, scale-out, troubleshooting common failures
## State hub workstream
- Workstream ID: `3a4f47d9-8bc1-434e-acb4-bed5d4dacda0`
- Log progress at end of session: `add_progress_event(workstream_id=...)`
- Update task statuses as each T## completes
## Key file references
| File | Purpose |
|------|---------|
| `src/activity_core/models.py` | CronTriggerConfig, EventTriggerConfig, ActivityDefinition |
| `src/activity_core/workflows.py` | RunActivityWorkflow (what schedules will trigger) |
| `src/activity_core/worker.py` | Where to call sync_schedules at startup |
| `docs/conventions.md` | Schedule/workflow ID conventions |
| `docs/idempotency.md` | Deduplication strategy |
| `e2e/tests/test_full_flow.py` | Integration test pattern to follow |