From da7de6ea3bb7ca6b110c722ff3da1178ef65597d Mon Sep 17 00:00:00 2001 From: Bernd Worsch Date: Thu, 26 Mar 2026 22:25:19 +0000 Subject: [PATCH] =?UTF-8?q?feat(workflows):=20implement=20RunActivityWorkf?= =?UTF-8?q?low=20=E2=80=94=20T18?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit workflows.py — RunActivityWorkflow: 1. load_activity_definition(activity_id) 2. resolve_context(context_sources) 3. evaluate_templates (pure, called in-workflow) 4. log_run({run_id, ...}) — run_id = uuid5(NAMESPACE_URL, activity_id:trigger_key) 5. start_child_workflow(TaskExecutorWorkflow, ...) per task spec ABANDON parent-close policy (fire-and-forget) Returns {"run_id": str, "tasks_spawned": int} activities.py — log_run updated: - now accepts run_id in run_payload (deterministic, passed from workflow) - uses pg INSERT ... ON CONFLICT (run_id) DO NOTHING for idempotency Co-Authored-By: Claude Sonnet 4.6 --- src/activity_core/activities.py | 30 ++++-- src/activity_core/workflows.py | 100 +++++++++++++++--- .../custodian-WP-0001-temporal-backbone.md | 2 +- 3 files changed, 109 insertions(+), 23 deletions(-) diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index 9de8d41..a961e0d 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -15,6 +15,7 @@ import uuid from datetime import datetime, timezone from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from temporalio import activity from temporalio.exceptions import ApplicationError @@ -120,7 +121,11 @@ async def resolve_context(context_sources: list[dict]) -> dict: async def log_run(run_payload: dict) -> str: """Persist an ActivityRun record to Postgres and return its run_id. + Idempotent: uses INSERT … ON CONFLICT (run_id) DO NOTHING so Temporal + activity retries do not produce duplicate rows. + Expected keys in run_payload: + run_id (str UUID — computed deterministically in workflow) activity_id (str UUID) scheduled_for (ISO-8601 str or None) context_snapshot (dict) @@ -132,21 +137,28 @@ async def log_run(run_payload: dict) -> str: """ Session = _get_session_factory() + run_id = uuid.UUID(run_payload["run_id"]) + scheduled_for: datetime | None = None if run_payload.get("scheduled_for"): scheduled_for = datetime.fromisoformat(run_payload["scheduled_for"]) - row = ActivityRun( - activity_id=uuid.UUID(run_payload["activity_id"]), - scheduled_for=scheduled_for, - fired_at=datetime.now(tz=timezone.utc), - context_snapshot=run_payload["context_snapshot"], - tasks_spawned=run_payload["tasks_spawned"], - version_used=run_payload["version_used"], + stmt = ( + pg_insert(ActivityRun) + .values( + run_id=run_id, + activity_id=uuid.UUID(run_payload["activity_id"]), + scheduled_for=scheduled_for, + fired_at=datetime.now(tz=timezone.utc), + context_snapshot=run_payload["context_snapshot"], + tasks_spawned=run_payload["tasks_spawned"], + version_used=run_payload["version_used"], + ) + .on_conflict_do_nothing(index_elements=["run_id"]) ) async with Session() as session: async with session.begin(): - session.add(row) + await session.execute(stmt) - return str(row.run_id) + return str(run_id) diff --git a/src/activity_core/workflows.py b/src/activity_core/workflows.py index e3fd65e..1205664 100644 --- a/src/activity_core/workflows.py +++ b/src/activity_core/workflows.py @@ -7,16 +7,15 @@ Two workflows are registered here: Workflow IDs follow the conventions in docs/conventions.md: RunActivityWorkflow: activity-{activity_id}:{trigger_key} TaskExecutorWorkflow: task-{run_id}:{task_type}:{index} - -Implementations are added in T18–T19; stubs here let the worker -register and the type system resolve references in T14–T17. """ from __future__ import annotations +import uuid from datetime import timedelta from temporalio import workflow +from temporalio.common import RetryPolicy with workflow.unsafe.imports_passed_through(): from activity_core.activities import ( @@ -24,32 +23,107 @@ with workflow.unsafe.imports_passed_through(): log_run, resolve_context, ) + from activity_core.template_engine import evaluate_templates -_DEFAULT_TIMEOUT = timedelta(minutes=5) +_RETRY_POLICY = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + maximum_interval=timedelta(minutes=5), + maximum_attempts=10, +) + +_ACTIVITY_TIMEOUT = timedelta(minutes=5) +_TASK_QUEUE = "task-execution-tq" @workflow.defn class RunActivityWorkflow: """Durable orchestration workflow. - Sequence (T18): - 1. load_activity_definition(activity_id) - 2. resolve_context(context_sources) - 3. evaluate_templates(task_templates, context) ← pure function, no activity - 4. spawn TaskExecutorWorkflow child per template result - 5. log_run(...) + Sequence: + 1. load_activity_definition(activity_id) → defn dict + 2. resolve_context(defn.context_sources) → context snapshot + 3. evaluate_templates(templates, context) → task specs (pure, no activity) + 4. log_run(...) → run_id + 5. start_child_workflow per task spec (fire-and-forget, detached) """ @workflow.run - async def run(self, activity_id: str, trigger_key: str) -> dict: - raise NotImplementedError("T18") + async def run( + self, + activity_id: str, + trigger_key: str, + scheduled_for: str | None = None, + ) -> dict: + """ + Args: + activity_id: UUID of the ActivityDefinition row. + trigger_key: ISO-8601 datetime (cron) or event_id (event trigger). + Used as the idempotency key component. + scheduled_for: ISO-8601 string of the nominal scheduled time (cron only). + + Returns: + {"run_id": str, "tasks_spawned": int} + """ + # ── 1. Load definition ──────────────────────────────────────────────── + defn: dict = await workflow.execute_activity( + load_activity_definition, + activity_id, + start_to_close_timeout=_ACTIVITY_TIMEOUT, + retry_policy=_RETRY_POLICY, + ) + + # ── 2. Resolve context ──────────────────────────────────────────────── + context_snapshot: dict = await workflow.execute_activity( + resolve_context, + defn["context_sources"], + start_to_close_timeout=_ACTIVITY_TIMEOUT, + retry_policy=_RETRY_POLICY, + ) + + # ── 3. Evaluate templates (pure — no activity) ──────────────────────── + task_specs: list[dict] = evaluate_templates( + defn["task_templates"], context_snapshot + ) + + # ── 4. Log the run ──────────────────────────────────────────────────── + # run_id is derived deterministically so log_run retries are idempotent. + run_id = str( + uuid.uuid5(uuid.NAMESPACE_URL, f"{activity_id}:{trigger_key}") + ) + await workflow.execute_activity( + log_run, + { + "run_id": run_id, + "activity_id": activity_id, + "scheduled_for": scheduled_for, + "context_snapshot": context_snapshot, + "tasks_spawned": len(task_specs), + "version_used": defn["version"], + }, + start_to_close_timeout=_ACTIVITY_TIMEOUT, + retry_policy=_RETRY_POLICY, + ) + + # ── 5. Spawn task executor children (fire-and-forget) ───────────────── + for index, spec in enumerate(task_specs): + child_id = f"task-{run_id}:{spec['task_type']}:{index}" + await workflow.start_child_workflow( + TaskExecutorWorkflow, + args=[run_id, spec["task_type"], spec["params"]], + id=child_id, + task_queue=_TASK_QUEUE, + parent_close_policy=workflow.ParentClosePolicy.ABANDON, + ) + + return {"run_id": run_id, "tasks_spawned": len(task_specs)} @workflow.defn class TaskExecutorWorkflow: """Child workflow that executes one concrete task instance. - Stub implementation in T19. + Stub implementation — T19. """ @workflow.run diff --git a/workplans/custodian-WP-0001-temporal-backbone.md b/workplans/custodian-WP-0001-temporal-backbone.md index c3ee57f..e353ff1 100644 --- a/workplans/custodian-WP-0001-temporal-backbone.md +++ b/workplans/custodian-WP-0001-temporal-backbone.md @@ -76,7 +76,7 @@ tasks: state_hub_task_id: e019cb5a-adf0-4a5d-9410-c41810128190 - id: T18 title: Implement RunActivityWorkflow - status: todo + status: done state_hub_task_id: fb6b3440-47d2-4b0a-97c1-6e780cc497c4 - id: T19 title: Implement TaskExecutorWorkflow (stub)