Add project scaffold: contracts, schemas, docker-compose, workplans

Phase 0 contracts (event envelope, ActivityDefinition, idempotency doc,
naming conventions) and Phase 1 Temporal cluster setup (docker-compose.dev.yml,
Temporal dynamic config) are complete. Includes Pydantic models, JSON schemas,
wiki architecture docs, and ADR-001 workplan files for both workstreams.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-04 22:45:40 +01:00
parent 098c13febd
commit 6f9132314f
16 changed files with 2652 additions and 0 deletions

View File

@@ -0,0 +1,203 @@
# This is a protoplan to be compared with others and then compiled into an actionalble plan.
**Comparison of my initial blueprint with your provided research**
Your research description (Activity = event-driven “task factory” with trigger + context resolver + task templates) maps **almost 1:1** to the architecture I outlined earlier.
Key alignments:
- **Core objects** (ActivityDefinition, TriggerEvent, TaskInstance) = my Activity Registry + internal TriggerEvent + spawned tasks.
- **Services** (Triggering layer → Activity Orchestrator → Task execution layer + Storage) = my Trigger Layer + Activity Processor + Task Queue/Workers + Storage.
- **Data flow, non-negotiables** (idempotency via `activity_id + scheduled_for`, misfire policy, versioning, timezone, observability) and the three implementation levels are identical.
- **Suggested reference architectures** Option A (Celery Beat etc.) vs Option B (Temporal) directly match my “lightweight” vs “workflow-engine” paths.
**Temporal self-hosted is the clear winner** for a production-grade backbone. It implements **Level 3 / Option B** natively and “for free”:
- Durable workflows = your Activity Orchestrator (state, retries, history survive restarts/crashes).
- Temporal **Schedules** = superior time-based triggering (built-in jitter, backfill/misfire handling, pause/resume, overlap policies, timezone support).
- Signals + Client.start_workflow = external event triggering with built-in deduplication.
- Workflow ID uniqueness + deterministic execution = perfect idempotency and versioning.
- Temporal UI + metrics + visibility = observability out of the box.
- No need for separate Celery/Quartz/APScheduler/Beat — one system handles everything reliably at scale.
Your researchs “practical next steps” are exactly what we will execute below, but powered by Temporal instead of lighter schedulers.
### Consolidated Architecture (research objects mapped to Temporal)
| Research Concept | Temporal Implementation | Notes |
|---------------------------|------------------------------------------------------|-------|
| ActivityDefinition | Your app DB (Postgres etc.) + versioned record | Workflow loads it via Activity (safe side-effect) |
| TriggerEvent | Temporal Schedule (time) or Client.start_workflow/signal (events) | Schedule ID = activity_id |
| Activity Orchestrator | Durable Workflow (RunActivityWorkflow) | Queries context, evaluates rules, spawns tasks |
| TaskInstance | Child Workflow or Activity call (parametrized) | Full history, retries, observability |
| Storage (registry, run log, tasks) | Your app DB + Temporal history (automatic) | Temporal stores every run/context snapshot |
| Idempotency / Misfire / Versioning | Workflow ID + Schedule policies + patching | All built-in |
| Observability | Temporal Web UI + Prometheus + your run-log table | Zero extra work |
**High-level flow**
External event or Schedule fires → Temporal starts/runs Orchestrator Workflow → Workflow loads ActivityDefinition → Activity queries “current situation” → Workflow spawns 0..N parametrized child workflows/activities → Everything durable + auditable.
### Consolidated Workplan: Set Up the Backbone Service (Self-Hosted Temporal)
**Goal**: A robust, event-driven “task factory” service that you can run today and scale forever without rewriting.
#### Phase 0: Prerequisites (12 hours)
- Docker + Docker Compose (for dev) or Kubernetes (for prod).
- PostgreSQL (or MySQL/Cassandra) for Temporal persistence + one for your app data (ActivityDefinitions, tasks, run logs).
- Choose SDK: Ill use **Python** below (mature, matches your research examples). Other SDKs (Go, TypeScript, Java, .NET) follow the same patterns — tell me if you prefer another.
#### Phase 1: Deploy Self-Hosted Temporal Cluster (3060 min)
**Dev / Quickstart** (recommended first):
```bash
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
docker compose up -d
```
- Temporal gRPC: `127.0.0.1:7233`
- Temporal Web UI: `http://127.0.0.1:8080` (inspect workflows, schedules, history)
- Default namespace: `default` (create more if needed).
**Production / Kubernetes**:
Use official Helm charts: https://github.com/temporalio/helm-charts
Follow the production checklist (security, visibility with Elasticsearch, monitoring, encryption, backups).
Connect your workers/clients to the cluster. The same code runs unchanged on Temporal Cloud later if you ever migrate.
#### Phase 2: Define Core Domain Model (your app DB)
Create these tables (Postgres example):
```sql
CREATE TABLE activity_definitions (
id UUID PRIMARY KEY,
name TEXT,
enabled BOOLEAN DEFAULT true,
trigger_type TEXT, -- 'cron' | 'event'
trigger_config JSONB, -- cron expr, interval, timezone, jitter, misfire_policy
context_sources JSONB,
task_templates JSONB[],
dedupe_key_strategy TEXT,
version INT
);
CREATE TABLE activity_runs (
run_id UUID PRIMARY KEY,
activity_id UUID,
scheduled_for TIMESTAMPTZ,
fired_at TIMESTAMPTZ,
context_snapshot JSONB,
tasks_spawned INT,
version_used INT
);
```
(Plus a tasks table if you want human-facing tasks.)
#### Phase 3: Implement the Orchestrator Workflow (the “task factory”)
This is your Activity Processor — durable by design.
**Python skeleton** (in `workflows.py`):
```python
from temporalio import workflow
from temporalio.exceptions import ApplicationError
import uuid
from datetime import timedelta
@workflow.defn
class RunActivityWorkflow:
@workflow.run
async def run(self, activity_id: str, trigger_event: dict):
# Step 1: Load definition (via Activity — safe DB call)
activity_def = await workflow.execute_activity(
load_activity_definition,
activity_id,
start_to_close_timeout=timedelta(seconds=10)
)
# Step 2: Resolve current situation (context)
context = await workflow.execute_activity(
resolve_context,
(activity_def["context_sources"], trigger_event),
start_to_close_timeout=timedelta(seconds=30)
)
# Step 3: Evaluate rules & instantiate tasks
task_instances = evaluate_templates(activity_def["task_templates"], context) # pure Python
# Step 4: Spawn parametrized tasks (child workflows or activities)
for task in task_instances:
await workflow.start_child_workflow(
task["type"], # e.g. "SendEmailWorkflow" or generic TaskExecutor
task["params"],
id=f"task-{uuid.uuid4()}",
task_queue="task-execution-queue"
)
# Step 5: Log run (via Activity)
await workflow.execute_activity(
log_run,
{"activity_id": activity_id, "context": context, "tasks_spawned": len(task_instances)},
start_to_close_timeout=timedelta(seconds=5)
)
```
Implement the Activities (`activities.py`) for DB access, context resolution, etc.
#### Phase 4: Set Up Time-Based Triggers (Temporal Schedules)
Register a Schedule per enabled ActivityDefinition (run once at startup or on definition change).
**Python example** (using Temporal Client):
```python
await client.create_schedule(
schedule_id=f"activity-schedule-{activity_id}",
schedule=Schedule(
action=ScheduleActionStartWorkflow(
RunActivityWorkflow.run,
args=[activity_id, {"scheduled_for": datetime.now()}],
id=f"wf-{activity_id}-{scheduled_for_iso}",
task_queue="orchestrator-queue"
),
spec=ScheduleSpec(
cron_expressions=[activity_def["trigger_config"]["cron"]],
timezone=activity_def["trigger_config"]["timezone"],
jitter=timedelta(minutes=1) if needed,
),
state=ScheduleState(paused=not activity_def["enabled"])
)
)
```
- Misfire/backfill: Use `handle.backfill(...)`
- Pause/resume: `handle.pause()` / `handle.unpause()`
- Manual trigger: `handle.trigger()`
#### Phase 5: Event-Driven Triggers (external events)
Build a tiny “Event Ingress” service (microservice or even a Temporal Activity):
- On incoming event → compute unique workflow ID (`f"activity-{activity_id}-{event_key}"`)
- `client.start_workflow(RunActivityWorkflow.run, ... , id=unique_id)`
→ Temporal guarantees exactly-once execution.
#### Phase 6: Task Execution Layer
- Automatable tasks → Temporal Activities or child workflows (retries, heartbeats, timeouts built-in).
- Human tasks → Persist to your DB + notify UI (or use Temporal for human-in-the-loop workflows).
#### Phase 7: Observability, Admin & Production Hardening
- Use Temporal Web UI (search runs by activity_id, view context snapshots, replay history).
- Add Prometheus metrics + your run-log table.
- Admin API: CRUD ActivityDefinitions → recreate/update Schedules.
- Idempotency, versioning, rate limits: all handled by Temporal + your dedupe strategy.
- Monitoring: Temporal production checklist (visibility, encryption, backups).
### Timeline & Effort Estimate
- Day 1: Deploy cluster + basic Workflow (Phase 13) → you can already trigger manually.
- Day 2: Schedules + event ingress (Phase 45).
- Day 3: Task spawning + observability (Phase 67).
- Week 2: Production hardening, testing misfires, scaling workers.
This gives you a **single, durable, observable backbone** that replaces every scheduler/queue you were considering and satisfies every non-negotiable in your research.
**Next step for you**: Tell me
1. Your preferred language (Python/Go/TypeScript/Java/.NET)?
2. Are you on Kubernetes already (Helm) or prefer plain Docker?
3. Do you want the full minimal repo structure + DB migration scripts + complete working example (I can provide a GitHub-ready skeleton next)?
Ill deliver the exact files/commands you can run today. This is the robust event-driven architecture you asked for — built once, reliable forever. Ready when you are!