From 34aa70cbd9799a43774cf72e0d975b2fd917a8a9 Mon Sep 17 00:00:00 2001 From: Bernd Worsch Date: Thu, 26 Mar 2026 22:30:50 +0000 Subject: [PATCH] =?UTF-8?q?feat(workflows):=20TaskExecutorWorkflow=20stub?= =?UTF-8?q?=20+=20wire=20worker=20=E2=80=94=20T19/T20?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit activities.py — persist_task_instance (new): Idempotent INSERT ... ON CONFLICT (id) DO NOTHING on task_instances. task_id passed in from workflow (derived from workflow_id via uuid5). Registered on task-execution-tq. workflows.py — TaskExecutorWorkflow (T19): Derives stable task_id = uuid5(NAMESPACE_URL, workflow_id). Calls persist_task_instance → status=done, returns immediately. Real execution logic to replace stub in a later workstream. worker.py — T20: Registers persist_task_instance on task-execution-tq Worker. Both queues fully wired: orchestrator-tq and task-execution-tq. Co-Authored-By: Claude Sonnet 4.6 --- src/activity_core/activities.py | 40 ++++++++++++++++++- src/activity_core/worker.py | 3 +- src/activity_core/workflows.py | 34 +++++++++++++++- .../custodian-WP-0001-temporal-backbone.md | 4 +- 4 files changed, 75 insertions(+), 6 deletions(-) diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index a961e0d..de9dfb5 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -22,7 +22,7 @@ from temporalio.exceptions import ApplicationError from activity_core.db import make_engine from activity_core.orm import ActivityDefinition as ActivityDefinitionRow -from activity_core.orm import ActivityRun +from activity_core.orm import ActivityRun, TaskInstance _session_factory: async_sessionmaker[AsyncSession] | None = None @@ -162,3 +162,41 @@ async def log_run(run_payload: dict) -> str: await session.execute(stmt) return str(run_id) + + +@activity.defn +async def persist_task_instance(task_payload: dict) -> str: + """Write a TaskInstance row and return its id. + + Idempotent: uses INSERT … ON CONFLICT (id) DO NOTHING. + + Expected keys in task_payload: + id (str UUID — deterministic, computed in TaskExecutorWorkflow) + run_id (str UUID) + type (str) + params (dict) + status (str, default "done" for stub) + + Returns: + task instance id as a str UUID. + """ + Session = _get_session_factory() + task_id = uuid.UUID(task_payload["id"]) + + stmt = ( + pg_insert(TaskInstance) + .values( + id=task_id, + run_id=uuid.UUID(task_payload["run_id"]), + type=task_payload["type"], + params=task_payload.get("params", {}), + status=task_payload.get("status", "done"), + ) + .on_conflict_do_nothing(index_elements=["id"]) + ) + + async with Session() as session: + async with session.begin(): + await session.execute(stmt) + + return str(task_id) diff --git a/src/activity_core/worker.py b/src/activity_core/worker.py index 9aaef07..5c1f15e 100644 --- a/src/activity_core/worker.py +++ b/src/activity_core/worker.py @@ -27,6 +27,7 @@ from activity_core.activities import ( init_session_factory, load_activity_definition, log_run, + persist_task_instance, resolve_context, ) from activity_core.workflows import RunActivityWorkflow, TaskExecutorWorkflow @@ -57,7 +58,7 @@ async def run() -> None: client, task_queue=TASK_EXECUTION_TASK_QUEUE, workflows=[TaskExecutorWorkflow], - activities=[], + activities=[persist_task_instance], ) async with orchestrator_worker, task_worker: diff --git a/src/activity_core/workflows.py b/src/activity_core/workflows.py index 1205664..b79b6a9 100644 --- a/src/activity_core/workflows.py +++ b/src/activity_core/workflows.py @@ -21,6 +21,7 @@ with workflow.unsafe.imports_passed_through(): from activity_core.activities import ( load_activity_definition, log_run, + persist_task_instance, resolve_context, ) from activity_core.template_engine import evaluate_templates @@ -123,9 +124,38 @@ class RunActivityWorkflow: class TaskExecutorWorkflow: """Child workflow that executes one concrete task instance. - Stub implementation — T19. + Stub behaviour: persists a task_instances row with status=done and + returns immediately. Real task execution logic replaces this in a + later workstream. + + task_id is derived deterministically from the workflow's own ID so + persist_task_instance retries remain idempotent. """ @workflow.run async def run(self, run_id: str, task_type: str, params: dict) -> dict: - raise NotImplementedError("T19") + # Derive a stable task_id from this workflow's own ID. + task_id = str( + uuid.uuid5(uuid.NAMESPACE_URL, workflow.info().workflow_id) + ) + + workflow.logger.info( + "TaskExecutorWorkflow started", + extra={"run_id": run_id, "task_type": task_type, "task_id": task_id}, + ) + + await workflow.execute_activity( + persist_task_instance, + { + "id": task_id, + "run_id": run_id, + "type": task_type, + "params": params, + "status": "done", + }, + task_queue=_TASK_QUEUE, + start_to_close_timeout=_ACTIVITY_TIMEOUT, + retry_policy=_RETRY_POLICY, + ) + + return {"task_id": task_id, "status": "done"} diff --git a/workplans/custodian-WP-0001-temporal-backbone.md b/workplans/custodian-WP-0001-temporal-backbone.md index e353ff1..045b80a 100644 --- a/workplans/custodian-WP-0001-temporal-backbone.md +++ b/workplans/custodian-WP-0001-temporal-backbone.md @@ -80,11 +80,11 @@ tasks: state_hub_task_id: fb6b3440-47d2-4b0a-97c1-6e780cc497c4 - id: T19 title: Implement TaskExecutorWorkflow (stub) - status: todo + status: done state_hub_task_id: 70a7365f-3042-4770-b3cd-3c6724b0790d - id: T20 title: Wire up worker entrypoint - status: todo + status: done state_hub_task_id: 1da921f5-86a8-488f-a015-402079194e10 - id: T21 title: Manual end-to-end test