This is a protoplan to be compared with others and then compiled into an actionable plan.
Comparison of my initial blueprint with your provided research
Your research description (Activity = event-driven “task factory” with trigger + context resolver + task templates) maps almost 1:1 to the architecture I outlined earlier.
Key alignments:
- Core objects (ActivityDefinition, TriggerEvent, TaskInstance) = my Activity Registry + internal TriggerEvent + spawned tasks.
- Services (Triggering layer → Activity Orchestrator → Task execution layer + Storage) = my Trigger Layer + Activity Processor + Task Queue/Workers + Storage.
- Data flow, non-negotiables (idempotency via activity_id + scheduled_for, misfire policy, versioning, timezone, observability) and the three implementation levels are identical.
- Suggested reference architectures Option A (Celery Beat etc.) vs Option B (Temporal) directly match my “lightweight” vs “workflow-engine” paths.
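To make the activity_id + scheduled_for idempotency rule concrete, here is a minimal sketch of how such a key could be built and checked. The names run_key and RunDeduper are hypothetical, and the in-memory set only stands in for whatever durable store (or Temporal Workflow ID) ends up enforcing uniqueness.

```python
from datetime import datetime, timezone


def run_key(activity_id: str, scheduled_for: datetime) -> str:
    """Build a deterministic key for one scheduled firing of an activity."""
    if scheduled_for.tzinfo is None:
        raise ValueError("scheduled_for must be timezone-aware")
    # Normalize to UTC and drop sub-second noise so the same firing always
    # yields the same key, whatever timezone the caller used.
    utc = scheduled_for.astimezone(timezone.utc).replace(microsecond=0)
    return f"{activity_id}:{utc.strftime('%Y-%m-%dT%H:%M:%SZ')}"


class RunDeduper:
    """In-memory stand-in for a durable 'already fired' check (assumption)."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    def should_fire(self, activity_id: str, scheduled_for: datetime) -> bool:
        """Return True the first time a firing is seen, False for repeats."""
        key = run_key(activity_id, scheduled_for)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True
```

Two calls that describe the same instant in different timezones map to the same key, so the second one is treated as a duplicate rather than a new run.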
Temporal self-hosted is the clear winner for a production-grade backbone. It implements Level 3 / Option B natively and “for free”:
- Durable workflows = your Activity Orchestrator (state, retries, history survive restarts/crashes).
- Temporal Schedules = superior time-based triggering (built-in jitter, backfill/misfire handling, pause/resume, overlap policies, timezone support).
- Signals + Client.start_workflow = external event triggering, deduplicated by Workflow ID.
- Workflow ID uniqueness + deterministic execution = idempotent starts (at most one open run per Workflow ID) and safe versioning via patching.
- Temporal UI + metrics + visibility = observability out of the box.
- No need for separate Celery/Quartz/APScheduler/Beat — one system handles everything reliably at scale.
Your research’s “practical next steps” are exactly what we will execute below, but powered by Temporal instead of lighter schedulers.
Consolidated Architecture (research objects mapped to Temporal)
| Research Concept | Temporal Implementation | Notes |
|---|---|---|
| ActivityDefinition | Your app DB (Postgres etc.) + versioned record | Workflow loads it via Activity (safe side-effect) |
| TriggerEvent | Temporal Schedule (time) or Client.start_workflow/signal (events) | Schedule ID derived from activity_id |
| Activity Orchestrator | Durable Workflow (RunActivityWorkflow) | Queries context, evaluates rules, spawns tasks |
| TaskInstance | Child Workflow or Activity call (parametrized) | Full history, retries, observability |
| Storage (registry, run log, tasks) | Your app DB + Temporal history (automatic) | Temporal stores every run/context snapshot |
| Idempotency / Misfire / Versioning | Workflow ID + Schedule policies + patching | All built-in |
| Observability | Temporal Web UI + Prometheus + your run-log table | Zero extra work |
High-level flow
External event or Schedule fires → Temporal starts/runs Orchestrator Workflow → Workflow loads ActivityDefinition → Activity queries “current situation” → Workflow spawns 0..N parametrized child workflows/activities → Everything durable + auditable.
Consolidated Workplan: Set Up the Backbone Service (Self-Hosted Temporal)
Goal: A robust, event-driven “task factory” service that you can run today and scale forever without rewriting.
Phase 0: Prerequisites (1–2 hours)
- Docker + Docker Compose (for dev) or Kubernetes (for prod).
- PostgreSQL (or MySQL/Cassandra) for Temporal persistence + one for your app data (ActivityDefinitions, tasks, run logs).
- Choose SDK: I’ll use Python below (mature, matches your research examples). Other SDKs (Go, TypeScript, Java, .NET) follow the same patterns — tell me if you prefer another.
Phase 1: Deploy Self-Hosted Temporal Cluster (30–60 min)
Dev / Quickstart (recommended first):
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
docker compose up -d
- Temporal gRPC: 127.0.0.1:7233
- Temporal Web UI: http://127.0.0.1:8080 (inspect workflows, schedules, history)
- Default namespace: default (create more if needed).
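Before wiring up workers, it can help to confirm that the two endpoints above are actually listening. The following is a minimal sketch using only the standard library; is_reachable is a hypothetical helper, and it only checks that a TCP connection can be opened, not that Temporal itself is healthy.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Ports taken from the quickstart defaults listed above.
    for name, port in [("Temporal gRPC", 7233), ("Temporal Web UI", 8080)]:
        status = "up" if is_reachable("127.0.0.1", port) else "down"
        print(f"{name} on 127.0.0.1:{port}: {status}")
```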
Production / Kubernetes:
Use official Helm charts: https://github.com/temporalio/helm-charts
Follow the production checklist (security, visibility with Elasticsearch, monitoring, encryption, backups).
Connect your workers/clients to the cluster. The same code runs unchanged on Temporal Cloud later if you ever migrate.
Phase 2: Define Core Domain Model (your app DB)
Create these tables (Postgres example):
CREATE TABLE activity_definitions (
    id UUID PRIMARY KEY,
    name TEXT,
    enabled BOOLEAN DEFAULT true,
    trigger_type TEXT, -- 'cron' | 'event'
    trigger_config JSONB, -- cron expr, interval, timezone, jitter, misfire_policy
    context_sources JSONB,
    task_templates JSONB[],
    dedupe_key_strategy TEXT,
    version INT
);
CREATE TABLE activity_runs (
    run_id UUID PRIMARY KEY,
    activity_id UUID,
    scheduled_for TIMESTAMPTZ,
    fired_at TIMESTAMPTZ,
    context_snapshot JSONB,
    tasks_spawned INT,
    version_used INT
);
(Plus a tasks table if you want human-facing tasks.)
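On the application side, a row from activity_definitions can be mirrored by a small typed object that rejects bad trigger settings early. This is a sketch only: it uses standard-library dataclasses, the default dedupe strategy string is a made-up placeholder, and from_row assumes the row arrives as a plain dict keyed by column name.

```python
from dataclasses import dataclass, field
from typing import Any
from uuid import UUID

ALLOWED_TRIGGER_TYPES = {"cron", "event"}


@dataclass(frozen=True)
class ActivityDefinition:
    """In-memory mirror of one activity_definitions row."""

    id: UUID
    name: str
    trigger_type: str
    trigger_config: dict[str, Any]
    context_sources: Any = field(default_factory=dict)
    task_templates: list[dict[str, Any]] = field(default_factory=list)
    dedupe_key_strategy: str = "activity_id+scheduled_for"  # hypothetical default
    enabled: bool = True
    version: int = 1

    def __post_init__(self) -> None:
        if self.trigger_type not in ALLOWED_TRIGGER_TYPES:
            raise ValueError(f"unknown trigger_type: {self.trigger_type!r}")
        if self.trigger_type == "cron" and "cron" not in self.trigger_config:
            raise ValueError("cron triggers need trigger_config['cron']")


def from_row(row: dict[str, Any]) -> ActivityDefinition:
    """Build a definition from a dict-like DB row (column name -> value)."""
    return ActivityDefinition(
        id=UUID(str(row["id"])),
        name=row["name"],
        trigger_type=row["trigger_type"],
        trigger_config=row["trigger_config"],
        context_sources=row.get("context_sources") or {},
        task_templates=list(row.get("task_templates") or []),
        dedupe_key_strategy=row.get("dedupe_key_strategy")
        or "activity_id+scheduled_for",
        enabled=row.get("enabled", True),
        version=row.get("version") or 1,
    )
```

Validating at load time means a malformed definition fails in the loading Activity, where Temporal can retry or surface the error, instead of halfway through the orchestrator.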
Phase 3: Implement the Orchestrator Workflow (the “task factory”)
This is your Activity Processor — durable by design.
Python skeleton (in workflows.py):
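from datetime import timedelta

from temporalio import workflow

# Non-workflow modules are imported through the sandbox pass-through.
with workflow.unsafe.imports_passed_through():
    from activities import load_activity_definition, log_run, resolve_context
    from templates import evaluate_templates  # hypothetical module; pure helper


@workflow.defn
class RunActivityWorkflow:
    @workflow.run
    async def run(self, activity_id: str, trigger_event: dict) -> None:
        # Step 1: Load definition via an Activity, so the DB call stays
        # outside deterministic workflow code.
        activity_def = await workflow.execute_activity(
            load_activity_definition,
            activity_id,
            start_to_close_timeout=timedelta(seconds=10),
        )
        # Step 2: Resolve current situation (context).
        # Multiple activity arguments go through args=[...], not a tuple.
        context = await workflow.execute_activity(
            resolve_context,
            args=[activity_def["context_sources"], trigger_event],
            start_to_close_timeout=timedelta(seconds=30),
        )
        # Step 3: Evaluate rules & instantiate tasks (pure Python, no I/O)
        task_instances = evaluate_templates(activity_def["task_templates"], context)
        # Step 4: Spawn parametrized tasks (child workflows or activities).
        # Child IDs are derived from the parent ID so they are identical on
        # replay; uuid.uuid4() would not be deterministic.
        parent_id = workflow.info().workflow_id
        for index, task in enumerate(task_instances):
            await workflow.start_child_workflow(
                task["type"],  # e.g. "SendEmailWorkflow" or generic TaskExecutor
                task["params"],
                id=f"{parent_id}-task-{index}",
                task_queue="task-execution-queue",
                # Let children outlive this workflow; the default policy
                # terminates them when the parent closes.
                parent_close_policy=workflow.ParentClosePolicy.ABANDON,
            )
        # Step 5: Log run (via Activity)
        await workflow.execute_activity(
            log_run,
            {"activity_id": activity_id, "context": context, "tasks_spawned": len(task_instances)},
            start_to_close_timeout=timedelta(seconds=5),
        )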
Implement the Activities (activities.py) for DB access, context resolution, etc.
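Step 3 of the skeleton calls evaluate_templates without defining it. A minimal sketch of what it might look like follows. The template shape used here (a "type", an optional "when" clause with "field" and "equals", and "params" whose string values may contain {placeholders}) is my assumption, not something fixed by your research.

```python
from typing import Any


def evaluate_templates(
    task_templates: list[dict[str, Any]], context: dict[str, Any]
) -> list[dict[str, Any]]:
    """Turn task templates into concrete task instances for one context."""
    instances = []
    for template in task_templates:
        condition = template.get("when")
        # A template without a condition always produces a task.
        if condition is not None:
            if context.get(condition["field"]) != condition["equals"]:
                continue
        # Fill string parameters from the context; other values pass through.
        params = {
            key: value.format_map(context) if isinstance(value, str) else value
            for key, value in template.get("params", {}).items()
        }
        instances.append({"type": template["type"], "params": params})
    return instances
```

The function touches no clock, network, or random source, which is what makes it safe to call directly inside workflow code. A placeholder that is missing from the context raises KeyError, so a bad template fails loudly instead of spawning a half-filled task.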
Phase 4: Set Up Time-Based Triggers (Temporal Schedules)
Register a Schedule per enabled ActivityDefinition (run once at startup or on definition change).
Python example (using Temporal Client):
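from datetime import timedelta

from temporalio.client import (Client, Schedule, ScheduleActionStartWorkflow,
                               ScheduleSpec, ScheduleState)
from workflows import RunActivityWorkflow


async def register_schedule(client: Client, activity_id: str, activity_def: dict) -> None:
    await client.create_schedule(
        f"activity-schedule-{activity_id}",  # schedule ID (first positional argument)
        Schedule(
            action=ScheduleActionStartWorkflow(
                RunActivityWorkflow.run,
                # Args are fixed when the schedule is created, so no datetime.now() here;
                # the workflow can read the TemporalScheduledStartTime search attribute.
                args=[activity_id, {"source": "schedule"}],
                # Base ID only: Temporal appends the scheduled time for each run.
                id=f"wf-{activity_id}",
                task_queue="orchestrator-queue",
            ),
            spec=ScheduleSpec(
                cron_expressions=[activity_def["trigger_config"]["cron"]],
                time_zone_name=activity_def["trigger_config"]["timezone"],
                jitter=timedelta(minutes=1),  # optional; omit if not needed
            ),
            state=ScheduleState(paused=not activity_def["enabled"]),
        ),
    )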
- Misfire/backfill: Use handle.backfill(...)
- Pause/resume: handle.pause() / handle.unpause()
- Manual trigger: handle.trigger()
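Since trigger_config is stored as JSON, something has to translate it into schedule settings. Here is a small sketch of that translation as a pure function. The output keys mirror the ScheduleSpec fields as I understand the Python SDK (cron_expressions, time_zone_name, jitter); the "jitter_seconds" key and the UTC fallback are assumptions of mine.

```python
from datetime import timedelta
from typing import Any


def schedule_spec_kwargs(trigger_config: dict[str, Any]) -> dict[str, Any]:
    """Translate a stored trigger_config into keyword arguments for a spec."""
    kwargs: dict[str, Any] = {
        "cron_expressions": [trigger_config["cron"]],
        # Fall back to UTC when no timezone is stored (assumption).
        "time_zone_name": trigger_config.get("timezone", "UTC"),
    }
    jitter_seconds = trigger_config.get("jitter_seconds")  # hypothetical key
    if jitter_seconds:
        kwargs["jitter"] = timedelta(seconds=jitter_seconds)
    return kwargs
```

With the SDK installed, the result would be passed as ScheduleSpec(**schedule_spec_kwargs(cfg)). Keeping the mapping separate from the client call makes it easy to unit-test without a running cluster.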
Phase 5: Event-Driven Triggers (external events)
Build a tiny “Event Ingress” service (microservice or even a Temporal Activity):
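- On incoming event → compute unique workflow ID (f"activity-{activity_id}-{event_key}")
- client.start_workflow(RunActivityWorkflow.run, ... , id=unique_id)
→ Temporal allows only one open run per workflow ID, so a redelivered event cannot start a concurrent duplicate. To also block reruns after the first run completes, set the Workflow ID reuse policy to reject duplicates. Activities themselves run at least once, so keep them idempotent.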
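The ingress logic above can be sketched without a cluster by swapping the Temporal client for an in-memory stand-in. Everything here is illustrative: FakeClient and DuplicateWorkflowError are hypothetical substitutes for the real (async) client and its duplicate-start error, and the "event_id" field is an assumed convention for producer-supplied IDs.

```python
import hashlib
import json
from typing import Any


class DuplicateWorkflowError(Exception):
    """Hypothetical stand-in for the error raised on a duplicate workflow ID."""


class FakeClient:
    """In-memory stand-in for a Temporal client; tracks started workflow IDs."""

    def __init__(self) -> None:
        self.started: dict[str, Any] = {}

    def start_workflow(self, workflow_id: str, payload: Any) -> None:
        if workflow_id in self.started:
            raise DuplicateWorkflowError(workflow_id)
        self.started[workflow_id] = payload


def event_key(event: dict[str, Any]) -> str:
    """Prefer the producer's event ID; otherwise hash the canonical payload."""
    if "event_id" in event:
        return str(event["event_id"])
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]


def ingest(client: FakeClient, activity_id: str, event: dict[str, Any]) -> bool:
    """Start one run per distinct event; return False for a redelivery."""
    workflow_id = f"activity-{activity_id}-{event_key(event)}"
    try:
        client.start_workflow(workflow_id, event)
    except DuplicateWorkflowError:
        return False
    return True
```

Hashing the sorted JSON payload gives a stable key even when the producer sends no ID and key order varies between deliveries. A producer-supplied ID is still preferable, because two genuinely separate events can carry identical payloads.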
Phase 6: Task Execution Layer
- Automatable tasks → Temporal Activities or child workflows (retries, heartbeats, timeouts built-in).
- Human tasks → Persist to your DB + notify UI (or use Temporal for human-in-the-loop workflows).
Phase 7: Observability, Admin & Production Hardening
- Use Temporal Web UI (search runs by activity_id, view context snapshots, replay history).
- Add Prometheus metrics + your run-log table.
- Admin API: CRUD ActivityDefinitions → recreate/update Schedules.
- Idempotency, versioning, rate limits: all handled by Temporal + your dedupe strategy.
- Monitoring: Temporal production checklist (visibility, encryption, backups).
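For the Admin API bullet, the "recreate/update Schedules" step boils down to a diff between the definitions in your DB and the schedules that already exist. A minimal sketch of that diff follows; plan_schedule_sync is a hypothetical helper, and it assumes the activity-schedule- ID prefix used in Phase 4 and that definitions arrive as dicts with "id" and "trigger_type".

```python
from typing import Any, Iterable

SCHEDULE_PREFIX = "activity-schedule-"


def plan_schedule_sync(
    definitions: Iterable[dict[str, Any]], existing_schedule_ids: Iterable[str]
) -> dict[str, list[str]]:
    """Decide which schedules to create, update, or delete."""
    # Only time-based definitions get a schedule. Disabled ones keep theirs,
    # since Phase 4 represents "disabled" as a paused schedule.
    wanted = {
        f"{SCHEDULE_PREFIX}{d['id']}"
        for d in definitions
        if d["trigger_type"] == "cron"
    }
    # Ignore schedules this service does not own.
    existing = {s for s in existing_schedule_ids if s.startswith(SCHEDULE_PREFIX)}
    return {
        "create": sorted(wanted - existing),
        "update": sorted(wanted & existing),
        "delete": sorted(existing - wanted),
    }
```

Running this at startup and after every definition change keeps the cluster's schedules in step with the registry, and the prefix filter keeps it from deleting schedules owned by anything else.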
Timeline & Effort Estimate
- Day 1: Deploy cluster + basic Workflow (Phases 1–3) → you can already trigger manually.
- Day 2: Schedules + event ingress (Phases 4–5).
- Day 3: Task spawning + observability (Phases 6–7).
- Week 2: Production hardening, testing misfires, scaling workers.
This gives you a single, durable, observable backbone that replaces every scheduler/queue you were considering and satisfies every non-negotiable in your research.
Next step for you: Tell me
- Your preferred language (Python/Go/TypeScript/Java/.NET)?
- Are you on Kubernetes already (Helm) or prefer plain Docker?
- Do you want the full minimal repo structure + DB migration scripts + complete working example (I can provide a GitHub-ready skeleton next)?
I’ll deliver the exact files/commands you can run today. This is the robust event-driven architecture you asked for — built once, reliable forever. Ready when you are!