generated from coulomb/repo-seed
feat(workflows): implement RunActivityWorkflow — T18
workflows.py — RunActivityWorkflow:
1. load_activity_definition(activity_id)
2. resolve_context(context_sources)
3. evaluate_templates (pure, called in-workflow)
4. log_run({run_id, ...}) — run_id = uuid5(NAMESPACE_URL, activity_id:trigger_key)
5. start_child_workflow(TaskExecutorWorkflow, ...) per task spec
ABANDON parent-close policy (fire-and-forget)
Returns {"run_id": str, "tasks_spawned": int}
activities.py — log_run updated:
- now accepts run_id in run_payload (deterministic, passed from workflow)
- uses pg INSERT ... ON CONFLICT (run_id) DO NOTHING for idempotency
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -15,6 +15,7 @@ import uuid
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
from temporalio import activity
|
||||
from temporalio.exceptions import ApplicationError
|
||||
@@ -120,7 +121,11 @@ async def resolve_context(context_sources: list[dict]) -> dict:
|
||||
async def log_run(run_payload: dict) -> str:
|
||||
"""Persist an ActivityRun record to Postgres and return its run_id.
|
||||
|
||||
Idempotent: uses INSERT … ON CONFLICT (run_id) DO NOTHING so Temporal
|
||||
activity retries do not produce duplicate rows.
|
||||
|
||||
Expected keys in run_payload:
|
||||
run_id (str UUID — computed deterministically in workflow)
|
||||
activity_id (str UUID)
|
||||
scheduled_for (ISO-8601 str or None)
|
||||
context_snapshot (dict)
|
||||
@@ -132,21 +137,28 @@ async def log_run(run_payload: dict) -> str:
|
||||
"""
|
||||
Session = _get_session_factory()
|
||||
|
||||
run_id = uuid.UUID(run_payload["run_id"])
|
||||
|
||||
scheduled_for: datetime | None = None
|
||||
if run_payload.get("scheduled_for"):
|
||||
scheduled_for = datetime.fromisoformat(run_payload["scheduled_for"])
|
||||
|
||||
row = ActivityRun(
|
||||
activity_id=uuid.UUID(run_payload["activity_id"]),
|
||||
scheduled_for=scheduled_for,
|
||||
fired_at=datetime.now(tz=timezone.utc),
|
||||
context_snapshot=run_payload["context_snapshot"],
|
||||
tasks_spawned=run_payload["tasks_spawned"],
|
||||
version_used=run_payload["version_used"],
|
||||
stmt = (
|
||||
pg_insert(ActivityRun)
|
||||
.values(
|
||||
run_id=run_id,
|
||||
activity_id=uuid.UUID(run_payload["activity_id"]),
|
||||
scheduled_for=scheduled_for,
|
||||
fired_at=datetime.now(tz=timezone.utc),
|
||||
context_snapshot=run_payload["context_snapshot"],
|
||||
tasks_spawned=run_payload["tasks_spawned"],
|
||||
version_used=run_payload["version_used"],
|
||||
)
|
||||
.on_conflict_do_nothing(index_elements=["run_id"])
|
||||
)
|
||||
|
||||
async with Session() as session:
|
||||
async with session.begin():
|
||||
session.add(row)
|
||||
await session.execute(stmt)
|
||||
|
||||
return str(row.run_id)
|
||||
return str(run_id)
|
||||
|
||||
@@ -7,16 +7,15 @@ Two workflows are registered here:
|
||||
Workflow IDs follow the conventions in docs/conventions.md:
|
||||
RunActivityWorkflow: activity-{activity_id}:{trigger_key}
|
||||
TaskExecutorWorkflow: task-{run_id}:{task_type}:{index}
|
||||
|
||||
Implementations are added in T18–T19; stubs here let the worker
|
||||
register and the type system resolve references in T14–T17.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from datetime import timedelta
|
||||
|
||||
from temporalio import workflow
|
||||
from temporalio.common import RetryPolicy
|
||||
|
||||
with workflow.unsafe.imports_passed_through():
|
||||
from activity_core.activities import (
|
||||
@@ -24,32 +23,107 @@ with workflow.unsafe.imports_passed_through():
|
||||
log_run,
|
||||
resolve_context,
|
||||
)
|
||||
from activity_core.template_engine import evaluate_templates
|
||||
|
||||
_DEFAULT_TIMEOUT = timedelta(minutes=5)
|
||||
_RETRY_POLICY = RetryPolicy(
|
||||
initial_interval=timedelta(seconds=1),
|
||||
backoff_coefficient=2.0,
|
||||
maximum_interval=timedelta(minutes=5),
|
||||
maximum_attempts=10,
|
||||
)
|
||||
|
||||
_ACTIVITY_TIMEOUT = timedelta(minutes=5)
|
||||
_TASK_QUEUE = "task-execution-tq"
|
||||
|
||||
|
||||
@workflow.defn
|
||||
class RunActivityWorkflow:
|
||||
"""Durable orchestration workflow.
|
||||
|
||||
Sequence (T18):
|
||||
1. load_activity_definition(activity_id)
|
||||
2. resolve_context(context_sources)
|
||||
3. evaluate_templates(task_templates, context) ← pure function, no activity
|
||||
4. spawn TaskExecutorWorkflow child per template result
|
||||
5. log_run(...)
|
||||
Sequence:
|
||||
1. load_activity_definition(activity_id) → defn dict
|
||||
2. resolve_context(defn.context_sources) → context snapshot
|
||||
3. evaluate_templates(templates, context) → task specs (pure, no activity)
|
||||
4. log_run(...) → run_id
|
||||
5. start_child_workflow per task spec (fire-and-forget, detached)
|
||||
"""
|
||||
|
||||
@workflow.run
|
||||
async def run(self, activity_id: str, trigger_key: str) -> dict:
|
||||
raise NotImplementedError("T18")
|
||||
async def run(
|
||||
self,
|
||||
activity_id: str,
|
||||
trigger_key: str,
|
||||
scheduled_for: str | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Args:
|
||||
activity_id: UUID of the ActivityDefinition row.
|
||||
trigger_key: ISO-8601 datetime (cron) or event_id (event trigger).
|
||||
Used as the idempotency key component.
|
||||
scheduled_for: ISO-8601 string of the nominal scheduled time (cron only).
|
||||
|
||||
Returns:
|
||||
{"run_id": str, "tasks_spawned": int}
|
||||
"""
|
||||
# ── 1. Load definition ────────────────────────────────────────────────
|
||||
defn: dict = await workflow.execute_activity(
|
||||
load_activity_definition,
|
||||
activity_id,
|
||||
start_to_close_timeout=_ACTIVITY_TIMEOUT,
|
||||
retry_policy=_RETRY_POLICY,
|
||||
)
|
||||
|
||||
# ── 2. Resolve context ────────────────────────────────────────────────
|
||||
context_snapshot: dict = await workflow.execute_activity(
|
||||
resolve_context,
|
||||
defn["context_sources"],
|
||||
start_to_close_timeout=_ACTIVITY_TIMEOUT,
|
||||
retry_policy=_RETRY_POLICY,
|
||||
)
|
||||
|
||||
# ── 3. Evaluate templates (pure — no activity) ────────────────────────
|
||||
task_specs: list[dict] = evaluate_templates(
|
||||
defn["task_templates"], context_snapshot
|
||||
)
|
||||
|
||||
# ── 4. Log the run ────────────────────────────────────────────────────
|
||||
# run_id is derived deterministically so log_run retries are idempotent.
|
||||
run_id = str(
|
||||
uuid.uuid5(uuid.NAMESPACE_URL, f"{activity_id}:{trigger_key}")
|
||||
)
|
||||
await workflow.execute_activity(
|
||||
log_run,
|
||||
{
|
||||
"run_id": run_id,
|
||||
"activity_id": activity_id,
|
||||
"scheduled_for": scheduled_for,
|
||||
"context_snapshot": context_snapshot,
|
||||
"tasks_spawned": len(task_specs),
|
||||
"version_used": defn["version"],
|
||||
},
|
||||
start_to_close_timeout=_ACTIVITY_TIMEOUT,
|
||||
retry_policy=_RETRY_POLICY,
|
||||
)
|
||||
|
||||
# ── 5. Spawn task executor children (fire-and-forget) ─────────────────
|
||||
for index, spec in enumerate(task_specs):
|
||||
child_id = f"task-{run_id}:{spec['task_type']}:{index}"
|
||||
await workflow.start_child_workflow(
|
||||
TaskExecutorWorkflow,
|
||||
args=[run_id, spec["task_type"], spec["params"]],
|
||||
id=child_id,
|
||||
task_queue=_TASK_QUEUE,
|
||||
parent_close_policy=workflow.ParentClosePolicy.ABANDON,
|
||||
)
|
||||
|
||||
return {"run_id": run_id, "tasks_spawned": len(task_specs)}
|
||||
|
||||
|
||||
@workflow.defn
|
||||
class TaskExecutorWorkflow:
|
||||
"""Child workflow that executes one concrete task instance.
|
||||
|
||||
Stub implementation in T19.
|
||||
Stub implementation — T19.
|
||||
"""
|
||||
|
||||
@workflow.run
|
||||
|
||||
Reference in New Issue
Block a user