All 12 tasks unblocked (broker decision resolved: NATS + JetStream). Work interrupted on workstation due to WSL2 Docker pull issues. Note captures build order, file names, key design decisions, and state hub IDs for seamless pickup on CoulombCore. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| type | created | status |
|---|---|---|
| session-note | 2026-03-28 | handoff |
WP-0002 Handoff Note — Continue on CoulombCore
Context
Implementing custodian-WP-0002 (Triggers & Ops). Work interrupted on workstation due to WSL2 Docker image pull issues. Continue on CoulombCore where Docker runs natively.
What was confirmed before handoff
- All 12 tasks are unblocked: broker decision `bc47c9c2` is resolved (NATS + JetStream)
- Dev stack (Temporal + PostgreSQL + Elasticsearch) needs to be started:
  - `cd ~/activity-core`
  - `docker compose -f docker-compose.dev.yml up -d`
  - `uv run alembic upgrade head`
  - `uv run python src/activity_core/seed.py`
Build order
Phase 4 — Time-based triggers (start here)
- T22: `src/activity_core/schedule_manager.py`
  - `upsert_schedule(client, defn)`: create/update a Temporal Schedule from an `ActivityDefinition`
  - `delete_schedule(client, activity_id)`: remove the schedule
  - `list_schedules(client)`: enumerate active schedules
  - Map `CronTriggerConfig.misfire_policy` → `ScheduleOverlapPolicy`:
    - `"skip"` → `ScheduleOverlapPolicy.SKIP`
    - `"catchup"` → `ScheduleOverlapPolicy.BUFFER_ALL` + backfill logic
    - `"compress"` → `ScheduleOverlapPolicy.BUFFER_ONE`
  - Schedule ID convention: `activity-schedule-{activity_definition.id}`
  - Workflow action: start `RunActivityWorkflow` on `orchestrator-tq`
  - Only create schedules for `enabled=True` + `trigger_type="cron"` definitions
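This is a minimal, dependency-free sketch of the pure parts of `schedule_manager.py`. It assumes definitions expose `id`, `enabled`, `trigger_type` and `trigger_config.misfire_policy` as described above. The helper names (`schedule_id_for`, `overlap_policy_name`, `wants_schedule`) are hypothetical.

The table maps to enum member *names* so it can be unit-tested without a Temporal server. Inside `upsert_schedule`, `ScheduleOverlapPolicy[overlap_policy_name(...)]` would resolve the real member. That member would then go into `SchedulePolicy(overlap=...)` on a `Schedule` whose action is a `ScheduleActionStartWorkflow` targeting `orchestrator-tq`.

```python
from typing import Mapping

SCHEDULE_ID_PREFIX = "activity-schedule-"

# misfire_policy -> name of the temporalio.client.ScheduleOverlapPolicy member.
# Names rather than enum members keep this table testable without temporalio.
MISFIRE_TO_OVERLAP: Mapping[str, str] = {
    "skip": "SKIP",
    "catchup": "BUFFER_ALL",  # plus an explicit backfill after upsert (T24)
    "compress": "BUFFER_ONE",
}


def schedule_id_for(activity_definition_id: object) -> str:
    """Schedule ID convention: activity-schedule-{activity_definition.id}."""
    return f"{SCHEDULE_ID_PREFIX}{activity_definition_id}"


def overlap_policy_name(misfire_policy: str) -> str:
    """Resolve a misfire policy, failing loudly on unknown values."""
    try:
        return MISFIRE_TO_OVERLAP[misfire_policy]
    except KeyError:
        raise ValueError(f"unknown misfire_policy: {misfire_policy!r}") from None


def wants_schedule(enabled: bool, trigger_type: str) -> bool:
    """Only enabled cron definitions get a Temporal Schedule."""
    return enabled and trigger_type == "cron"
```

Raising on an unknown policy, rather than silently defaulting to `SKIP`, surfaces bad rows at sync time.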
- T23: `src/activity_core/sync_schedules.py` (bootstrap/startup script)
  - Query all enabled cron ActivityDefinitions from DB
  - For each: call `upsert_schedule()`
  - Delete Temporal schedules that have no matching DB row (tombstone cleanup)
  - Entry point: `uv run python -m activity_core.sync_schedules`
  - Also call at top of `worker.py` before entering the run loop
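The reconciliation in `sync_schedules.py` reduces to a set difference. The sketch below uses hypothetical names. `existing_schedule_ids` would be the IDs returned by `client.list_schedules()`, and the two definition-ID lists would come from the DB.

Read literally, the note treats a schedule as a tombstone only when *no* DB row matches it. A schedule whose definition is merely disabled is therefore kept, since T25 expects it to be paused rather than deleted. Restricting deletion to the `activity-schedule-` prefix is an added assumption that keeps schedules owned by other services in the namespace safe.

```python
from typing import Iterable

SCHEDULE_ID_PREFIX = "activity-schedule-"  # convention from T22


def plan_sync(
    enabled_cron_ids: Iterable[object],
    all_definition_ids: Iterable[object],
    existing_schedule_ids: Iterable[str],
) -> tuple[list[str], list[str]]:
    """Return (definition IDs to upsert, schedule IDs to delete)."""
    to_upsert = sorted({str(i) for i in enabled_cron_ids})
    # Any DB row, enabled or not, keeps its schedule alive.
    known = {SCHEDULE_ID_PREFIX + str(i) for i in all_definition_ids}
    to_delete = sorted(
        sid
        for sid in set(existing_schedule_ids)
        # Only IDs following our naming convention are ours to delete.
        if sid.startswith(SCHEDULE_ID_PREFIX) and sid not in known
    )
    return to_upsert, to_delete
```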
- T24: Misfire policy in schedule config (part of T22; ensure all 3 policies are tested)
  - `"catchup"`: after upsert, call `schedule_handle.backfill()` for any gap since the last run
- T25: `tests/test_schedule_lifecycle.py`
  - Uses `temporalio.testing.WorkflowEnvironment` (the Python SDK's counterpart to `TestWorkflowEnvironment`), e.g. `WorkflowEnvironment.start_local()`
  - Test: create ActivityDefinition → `upsert_schedule` → schedule exists
  - Test: set `enabled=False` → schedule paused
  - Test: delete ActivityDefinition → `delete_schedule` → schedule gone
  - Test: misfire policy round-trip (skip/catchup/compress → correct `ScheduleOverlapPolicy`)
Phase 5 — Event-driven triggers (after Phase 4)
Add `nats-py` (the asyncio NATS client, imported as `nats`) to `pyproject.toml`: `uv add nats-py`
- T26: `src/activity_core/event_router.py`
  - `EventRouter` class: connects to NATS JetStream and subscribes to the `activity.>` subject
  - Durable consumer name: `activity-core-event-router`
  - Stream name: `ACTIVITY_EVENTS`
  - `async def start(nats_url, temporal_client)`: main loop
  - Deserialize message payload → `EventEnvelope`
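Here is a minimal sketch of the deserialization step. It assumes JSON-encoded messages whose envelope carries the three fields the routing notes reference: `event_id`, `type` and `payload`. This `EventEnvelope` dataclass is a stand-in, not the project's model.

With nats-py, the bytes would be `msg.data` from a durable pull consumer such as `js.pull_subscribe("activity.>", durable="activity-core-event-router", stream="ACTIVITY_EVENTS")`. If the real `EventEnvelope` is a Pydantic v2 model, `EventEnvelope.model_validate_json(msg.data)` does the same job.

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class EventEnvelope:
    """Stand-in envelope with only the fields the router uses (assumed shape)."""

    event_id: str
    type: str
    payload: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_bytes(cls, data: bytes) -> "EventEnvelope":
        """Parse a JSON-encoded NATS message body (e.g. msg.data)."""
        raw = json.loads(data)
        if not isinstance(raw, dict):
            raise ValueError("event envelope must be a JSON object")
        payload = raw.get("payload", {})
        if not isinstance(payload, dict):
            raise ValueError("event payload must be a JSON object")
        # A missing event_id/type raises KeyError: such messages cannot be routed.
        return cls(event_id=str(raw["event_id"]), type=str(raw["type"]), payload=payload)
```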
- T27: Routing rules in `event_router.py`
  - Query DB for all enabled `event` trigger ActivityDefinitions
  - Match: `defn.trigger_config.event_type == envelope.type`
  - Filter: all `defn.trigger_config.filters` key/value pairs present in `envelope.payload`
  - Returns list of matching `activity_ids`
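The matching rules are plain predicates, so they can be tested without NATS or Temporal. This sketch assumes exact equality on top-level payload keys, with no nesting or wildcards. `EventRule` is a hypothetical flattened view of an enabled `event`-trigger `ActivityDefinition` (its `activity_id`, `trigger_config.event_type` and `trigger_config.filters`).

```python
from dataclasses import dataclass, field
from typing import Any, Iterable, Mapping


@dataclass(frozen=True)
class EventRule:
    """Hypothetical flattened view of one enabled event-trigger ActivityDefinition."""

    activity_id: str
    event_type: str  # trigger_config.event_type
    filters: Mapping[str, Any] = field(default_factory=dict)  # trigger_config.filters


def matches(rule: EventRule, event_type: str, payload: Mapping[str, Any]) -> bool:
    """Type must match exactly; every filter pair must appear in the payload."""
    if rule.event_type != event_type:
        return False
    return all(key in payload and payload[key] == value for key, value in rule.filters.items())


def route(rules: Iterable[EventRule], event_type: str, payload: Mapping[str, Any]) -> list[str]:
    """Return the activity_ids of every rule matching the event, in rule order."""
    return [rule.activity_id for rule in rules if matches(rule, event_type, payload)]
```

A definition with no filters matches every event of its type.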
- T28: Start workflow from Event Router
  - For each matched activity_id: `client.start_workflow(RunActivityWorkflow, ...)`
  - Workflow ID: `activity-{activity_id}:{envelope.event_id}` (idempotent)
  - `id_reuse_policy=WorkflowIDReusePolicy.REJECT_DUPLICATE`
  - Ack the NATS message only after a successful start (at-least-once)
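The ack-after-start rule can be sketched with the Temporal and NATS calls injected, which keeps the ordering testable. `dispatch` and its parameters are hypothetical. In the router, the injected pieces would be:
- `start`: wraps `client.start_workflow(..., id=workflow_id, task_queue=..., id_reuse_policy=WorkflowIDReusePolicy.REJECT_DUPLICATE)`
- `duplicate_error`: `temporalio.exceptions.WorkflowAlreadyStartedError`
- `ack`/`nak`: the JetStream message's `msg.ack`/`msg.nak`

```python
from typing import Awaitable, Callable, Iterable


def workflow_id_for(activity_id: str, event_id: str) -> str:
    """Deterministic per (activity, event), so redeliveries reuse the same ID."""
    return f"activity-{activity_id}:{event_id}"


async def dispatch(
    event_id: str,
    activity_ids: Iterable[str],
    start: Callable[[str, str], Awaitable[None]],
    ack: Callable[[], Awaitable[None]],
    nak: Callable[[], Awaitable[None]],
    *,
    duplicate_error: type[BaseException],
) -> bool:
    """Start one workflow per matched activity; ack only if every start succeeded.

    Returns True if the message was acked, False if it was nak'd for redelivery.
    """
    try:
        for activity_id in activity_ids:
            try:
                await start(activity_id, workflow_id_for(activity_id, event_id))
            except duplicate_error:
                # Started by an earlier delivery of this event: counts as success.
                continue
    except Exception:
        # Any other failure: leave the event for JetStream to redeliver.
        await nak()
        return False
    await ack()
    return True
```

Treating a duplicate as success is what makes redelivery safe. Suppose one start fails and the message is nak'd. The retry re-derives the same workflow IDs, so starts that already succeeded are rejected as duplicates rather than run twice.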
- T29: `tests/test_event_router.py`
  - Integration test: publish event to NATS → assert workflow started in Temporal
  - Use `nats-server` in Docker (already in compose, or add the service)
Phase 6 — Observability & Admin
- T30: `src/activity_core/api.py` (FastAPI)
  - `GET /activity-definitions/`: list all
  - `GET /activity-definitions/{id}`: get one
  - `POST /activity-definitions/`: create (+ `upsert_schedule` if cron)
  - `PUT /activity-definitions/{id}`: update (+ `upsert_schedule`)
  - `DELETE /activity-definitions/{id}`: delete (+ `delete_schedule`)
  - `POST /activity-definitions/{id}/trigger`: manual one-shot start
  - Entry point: `uv run uvicorn activity_core.api:app --port 8010`
- T31: Prometheus metrics
  - The Temporal SDK emits metrics through the `Runtime` passed as the `runtime` param of `Client.connect()`: `from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig`
  - Expose on `http://0.0.0.0:9090/metrics` (`PrometheusConfig(bind_address="0.0.0.0:9090")`)
  - Add to `worker.py` startup
- T32: Temporal search attributes
  - Register `activity_id` as a custom search attribute in Temporal (type: `Keyword`), e.g. `temporal operator search-attribute create --name activity_id --type Keyword`
  - In `RunActivityWorkflow`: `workflow.upsert_search_attributes({"activity_id": [str(activity_id)]})`
  - Enables filtering in the Temporal Web UI: `activity_id = "uuid"`
- T33: `docs/runbook.md`
  - Dev startup, prod deployment, scale-out, troubleshooting common failures
State hub workstream
- Workstream ID: `3a4f47d9-8bc1-434e-acb4-bed5d4dacda0`
- Log progress at end of session: `add_progress_event(workstream_id=...)`
- Update task statuses as each T## completes
Key file references
| File | Purpose |
|---|---|
| `src/activity_core/models.py` | `CronTriggerConfig`, `EventTriggerConfig`, `ActivityDefinition` |
| `src/activity_core/workflows.py` | `RunActivityWorkflow` (what schedules will trigger) |
| `src/activity_core/worker.py` | Where to call `sync_schedules` at startup |
| `docs/conventions.md` | Schedule/workflow ID conventions |
| `docs/idempotency.md` | Deduplication strategy |
| `e2e/tests/test_full_flow.py` | Integration test pattern to follow |