activity-core/workplans/WP-0002-next-steps.md
tegwick 4457d6d6b9 chore: add WP-0002 handoff note for CoulombCore continuation
All 12 tasks unblocked (broker decision resolved: NATS + JetStream).
Work interrupted on workstation due to WSL2 Docker pull issues.
Note captures build order, file names, key design decisions, and
state hub IDs for seamless pickup on CoulombCore.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-28 00:10:33 +01:00

5.6 KiB

type created status
session-note 2026-03-28 handoff

WP-0002 Handoff Note — Continue on CoulombCore

Context

Implementing custodian-WP-0002 (Triggers & Ops). Work interrupted on workstation due to WSL2 Docker image pull issues. Continue on CoulombCore where Docker runs natively.

What was confirmed before handoff

  • All 12 tasks are unblocked — broker decision bc47c9c2 is resolved: NATS + JetStream
  • Dev stack (Temporal + PostgreSQL + Elasticsearch) needs to be started:
    cd ~/activity-core
    docker compose -f docker-compose.dev.yml up -d
    uv run alembic upgrade head
    uv run python src/activity_core/seed.py
    

Build order

Phase 4 — Time-based triggers (start here)

  1. T22 — src/activity_core/schedule_manager.py

    • upsert_schedule(client, defn) — create/update Temporal Schedule from ActivityDefinition
    • delete_schedule(client, activity_id) — remove schedule
    • list_schedules(client) — enumerate active schedules
    • Map CronTriggerConfig.misfire_policy → ScheduleOverlapPolicy:
      • "skip" → ScheduleOverlapPolicy.SKIP
      • "catchup" → ScheduleOverlapPolicy.BUFFER_ALL + backfill logic
      • "compress" → ScheduleOverlapPolicy.BUFFER_ONE
    • Schedule ID convention: activity-schedule-{activity_definition.id}
    • Workflow action: start RunActivityWorkflow on orchestrator-tq
    • Only create schedules for enabled=True + trigger_type="cron" definitions
  2. T23 — src/activity_core/sync_schedules.py (bootstrap/startup script)

    • Query all enabled cron ActivityDefinitions from DB
    • For each: call upsert_schedule()
    • Delete Temporal schedules that have no matching DB row (tombstone cleanup)
    • Entry point: uv run python -m activity_core.sync_schedules
    • Also call at top of worker.py before entering the run loop
  3. T24 — Misfire policy in schedule config (part of T22, ensure all 3 policies tested)

    • "catchup": after upsert, call schedule_handle.backfill() for any gap since last run
  4. T25 — tests/test_schedule_lifecycle.py

    • Uses WorkflowEnvironment from temporalio.testing (the Python SDK's test environment)
    • Test: create ActivityDefinition → upsert_schedule → schedule exists
    • Test: set enabled=False → schedule paused
    • Test: delete ActivityDefinition → delete_schedule → schedule gone
    • Test: misfire policy round-trip (skip/catchup/compress → correct OverlapPolicy)
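
The T22 policy mapping, the schedule ID convention, and the T23 tombstone cleanup can be sketched in plain Python as below. This is a minimal sketch under assumptions, not the intended implementation: `OverlapPolicy` is a local stand-in for `temporalio.client.ScheduleOverlapPolicy` so the block runs without a Temporal server, and `MISFIRE_TO_OVERLAP`, `schedule_id`, `SyncPlan` and `plan_sync` are hypothetical names. Restricting deletion to IDs that carry the `activity-schedule-` prefix is an added safety assumption that the note does not state.

```python
from dataclasses import dataclass
from enum import Enum

SCHEDULE_PREFIX = "activity-schedule-"


class OverlapPolicy(Enum):
    """Local stand-in for temporalio.client.ScheduleOverlapPolicy (assumption)."""
    SKIP = "SKIP"
    BUFFER_ONE = "BUFFER_ONE"
    BUFFER_ALL = "BUFFER_ALL"


# misfire_policy -> (overlap policy, whether to backfill after the upsert)
MISFIRE_TO_OVERLAP = {
    "skip": (OverlapPolicy.SKIP, False),
    "catchup": (OverlapPolicy.BUFFER_ALL, True),
    "compress": (OverlapPolicy.BUFFER_ONE, False),
}


def schedule_id(activity_id: str) -> str:
    """Apply the schedule ID convention from the note."""
    return f"{SCHEDULE_PREFIX}{activity_id}"


@dataclass(frozen=True)
class SyncPlan:
    upsert: list[str]  # schedule IDs to create or update
    delete: list[str]  # schedule IDs with no matching DB row


def plan_sync(db_activity_ids: set[str], temporal_schedule_ids: set[str]) -> SyncPlan:
    """Diff enabled cron definitions in the DB against schedules known to Temporal."""
    wanted = {schedule_id(a) for a in db_activity_ids}
    # Assumption: only touch schedules that follow our own naming convention.
    ours = {s for s in temporal_schedule_ids if s.startswith(SCHEDULE_PREFIX)}
    return SyncPlan(upsert=sorted(wanted), delete=sorted(ours - wanted))
```

Keeping the diff as a pure function means the round-trip and cleanup cases in T25 can be checked without a running Temporal instance. The real `upsert_schedule()` and `delete_schedule()` calls would then consume the plan.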

Phase 5 — Event-driven triggers (after Phase 4)

Add nats-py (or nats async client) to pyproject.toml:

uv add nats-py
  1. T26 — src/activity_core/event_router.py

    • EventRouter class: connects to NATS JetStream, subscribes to activity.> subject
    • Durable consumer name: activity-core-event-router
    • Stream name: ACTIVITY_EVENTS
    • async def start(nats_url, temporal_client) — main loop
    • Deserialize message payload → EventEnvelope
  2. T27 — Routing rules in event_router.py

    • Query DB for all enabled event trigger ActivityDefinitions
    • Match: defn.trigger_config.event_type == envelope.type
    • Filter: all defn.trigger_config.filters key/value pairs present in envelope.payload
    • Returns list of matching activity_ids
  3. T28 — Start workflow from Event Router

    • For each matched activity_id: client.start_workflow(RunActivityWorkflow, ...)
    • Workflow ID: activity-{activity_id}:{envelope.event_id} (idempotent)
    • id_reuse_policy=WorkflowIDReusePolicy.REJECT_DUPLICATE
    • Ack NATS message only after successful start (at-least-once)
  4. T29 — tests/test_event_router.py

    • Integration test: publish event to NATS → assert workflow started in Temporal
    • Use nats-server in Docker (already in compose or add service)
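
The T27 matching rule and the T28 workflow ID convention can be sketched without NATS or Temporal as below. This is a minimal sketch under assumptions: the `EventEnvelope` fields `event_id`, `type` and `payload` follow the attribute names used above, while `EventTrigger` is a hypothetical flattened view of an ActivityDefinition and its `trigger_config`. `match_activities` and `workflow_id` are illustrative names.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class EventEnvelope:
    event_id: str
    type: str
    payload: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class EventTrigger:
    """Hypothetical flattened view of an enabled event-trigger definition."""
    activity_id: str
    event_type: str
    filters: dict[str, Any] = field(default_factory=dict)


def match_activities(triggers: list[EventTrigger], envelope: EventEnvelope) -> list[str]:
    """Return activity IDs whose event type matches and whose filters all hold."""
    matched = []
    for trigger in triggers:
        if trigger.event_type != envelope.type:
            continue
        # Every filter key must be present in the payload with an equal value.
        if all(
            key in envelope.payload and envelope.payload[key] == value
            for key, value in trigger.filters.items()
        ):
            matched.append(trigger.activity_id)
    return matched


def workflow_id(activity_id: str, envelope: EventEnvelope) -> str:
    """Deterministic ID, so a redelivered event maps to the same workflow."""
    return f"activity-{activity_id}:{envelope.event_id}"
```

Because the workflow ID is derived only from the activity ID and the event ID, a NATS redelivery after a missed ack produces the same ID. That is what lets REJECT_DUPLICATE turn at-least-once delivery into at most one workflow start per event.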

Phase 6 — Observability & Admin

  1. T30 — src/activity_core/api.py (FastAPI)

    • GET /activity-definitions/ — list all
    • GET /activity-definitions/{id} — get one
    • POST /activity-definitions/ — create (+ upsert_schedule if cron)
    • PUT /activity-definitions/{id} — update (+ upsert_schedule)
    • DELETE /activity-definitions/{id} — delete (+ delete_schedule)
    • POST /activity-definitions/{id}/trigger — manual one-shot start
    • Entry point: uv run uvicorn activity_core.api:app --port 8010
  2. T31 — Prometheus metrics

    • Temporal SDK emits metrics through a Runtime passed as the runtime parameter of Client.connect()
    • from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig
    • Expose on http://0.0.0.0:9090/metrics
    • Add to worker.py startup
  3. T32 — Temporal search attributes

    • Register activity_id as a custom search attribute in Temporal (type: Keyword)
    • In RunActivityWorkflow: workflow.upsert_search_attributes({"activity_id": [str(activity_id)]})
    • Enables filtering in Temporal Web UI: activity_id = "uuid"
  4. T33 — docs/runbook.md

    • Dev startup, prod deployment, scale-out, troubleshooting common failures

State hub workstream

  • Workstream ID: 3a4f47d9-8bc1-434e-acb4-bed5d4dacda0
  • Log progress at end of session: add_progress_event(workstream_id=...)
  • Update task statuses as each T## completes

Key file references

| File | Purpose |
| --- | --- |
| src/activity_core/models.py | CronTriggerConfig, EventTriggerConfig, ActivityDefinition |
| src/activity_core/workflows.py | RunActivityWorkflow (what schedules will trigger) |
| src/activity_core/worker.py | Where to call sync_schedules at startup |
| docs/conventions.md | Schedule/workflow ID conventions |
| docs/idempotency.md | Deduplication strategy |
| e2e/tests/test_full_flow.py | Integration test pattern to follow |