feat(workflows): TaskExecutorWorkflow stub + wire worker — T19/T20

activities.py — persist_task_instance (new):
  Idempotent INSERT ... ON CONFLICT (id) DO NOTHING on task_instances.
  task_id passed in from workflow (derived from workflow_id via uuid5).
  Registered on task-execution-tq.

workflows.py — TaskExecutorWorkflow (T19):
  Derives stable task_id = uuid5(NAMESPACE_URL, workflow_id).
  Calls persist_task_instance → status=done, returns immediately.
  Real execution logic to replace stub in a later workstream.

worker.py — T20:
  Registers persist_task_instance on task-execution-tq Worker.
  Both queues fully wired: orchestrator-tq and task-execution-tq.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-26 22:30:50 +00:00
parent da7de6ea3b
commit 34aa70cbd9
4 changed files with 75 additions and 6 deletions

View File

@@ -22,7 +22,7 @@ from temporalio.exceptions import ApplicationError
from activity_core.db import make_engine from activity_core.db import make_engine
from activity_core.orm import ActivityDefinition as ActivityDefinitionRow from activity_core.orm import ActivityDefinition as ActivityDefinitionRow
from activity_core.orm import ActivityRun from activity_core.orm import ActivityRun, TaskInstance
_session_factory: async_sessionmaker[AsyncSession] | None = None _session_factory: async_sessionmaker[AsyncSession] | None = None
@@ -162,3 +162,41 @@ async def log_run(run_payload: dict) -> str:
await session.execute(stmt) await session.execute(stmt)
return str(run_id) return str(run_id)
@activity.defn
async def persist_task_instance(task_payload: dict) -> str:
"""Write a TaskInstance row and return its id.
Idempotent: uses INSERT … ON CONFLICT (id) DO NOTHING.
Expected keys in task_payload:
id (str UUID — deterministic, computed in TaskExecutorWorkflow)
run_id (str UUID)
type (str)
params (dict)
status (str, default "done" for stub)
Returns:
task instance id as a str UUID.
"""
Session = _get_session_factory()
task_id = uuid.UUID(task_payload["id"])
stmt = (
pg_insert(TaskInstance)
.values(
id=task_id,
run_id=uuid.UUID(task_payload["run_id"]),
type=task_payload["type"],
params=task_payload.get("params", {}),
status=task_payload.get("status", "done"),
)
.on_conflict_do_nothing(index_elements=["id"])
)
async with Session() as session:
async with session.begin():
await session.execute(stmt)
return str(task_id)

View File

@@ -27,6 +27,7 @@ from activity_core.activities import (
init_session_factory, init_session_factory,
load_activity_definition, load_activity_definition,
log_run, log_run,
persist_task_instance,
resolve_context, resolve_context,
) )
from activity_core.workflows import RunActivityWorkflow, TaskExecutorWorkflow from activity_core.workflows import RunActivityWorkflow, TaskExecutorWorkflow
@@ -57,7 +58,7 @@ async def run() -> None:
client, client,
task_queue=TASK_EXECUTION_TASK_QUEUE, task_queue=TASK_EXECUTION_TASK_QUEUE,
workflows=[TaskExecutorWorkflow], workflows=[TaskExecutorWorkflow],
activities=[], activities=[persist_task_instance],
) )
async with orchestrator_worker, task_worker: async with orchestrator_worker, task_worker:

View File

@@ -21,6 +21,7 @@ with workflow.unsafe.imports_passed_through():
from activity_core.activities import ( from activity_core.activities import (
load_activity_definition, load_activity_definition,
log_run, log_run,
persist_task_instance,
resolve_context, resolve_context,
) )
from activity_core.template_engine import evaluate_templates from activity_core.template_engine import evaluate_templates
@@ -123,9 +124,38 @@ class RunActivityWorkflow:
class TaskExecutorWorkflow: class TaskExecutorWorkflow:
"""Child workflow that executes one concrete task instance. """Child workflow that executes one concrete task instance.
Stub implementation — T19. Stub behaviour: persists a task_instances row with status=done and
returns immediately. Real task execution logic replaces this in a
later workstream.
task_id is derived deterministically from the workflow's own ID so
persist_task_instance retries remain idempotent.
""" """
@workflow.run @workflow.run
async def run(self, run_id: str, task_type: str, params: dict) -> dict: async def run(self, run_id: str, task_type: str, params: dict) -> dict:
raise NotImplementedError("T19") # Derive a stable task_id from this workflow's own ID.
task_id = str(
uuid.uuid5(uuid.NAMESPACE_URL, workflow.info().workflow_id)
)
workflow.logger.info(
"TaskExecutorWorkflow started",
extra={"run_id": run_id, "task_type": task_type, "task_id": task_id},
)
await workflow.execute_activity(
persist_task_instance,
{
"id": task_id,
"run_id": run_id,
"type": task_type,
"params": params,
"status": "done",
},
task_queue=_TASK_QUEUE,
start_to_close_timeout=_ACTIVITY_TIMEOUT,
retry_policy=_RETRY_POLICY,
)
return {"task_id": task_id, "status": "done"}

View File

@@ -80,11 +80,11 @@ tasks:
state_hub_task_id: fb6b3440-47d2-4b0a-97c1-6e780cc497c4 state_hub_task_id: fb6b3440-47d2-4b0a-97c1-6e780cc497c4
- id: T19 - id: T19
title: Implement TaskExecutorWorkflow (stub) title: Implement TaskExecutorWorkflow (stub)
status: todo status: done
state_hub_task_id: 70a7365f-3042-4770-b3cd-3c6724b0790d state_hub_task_id: 70a7365f-3042-4770-b3cd-3c6724b0790d
- id: T20 - id: T20
title: Wire up worker entrypoint title: Wire up worker entrypoint
status: todo status: done
state_hub_task_id: 1da921f5-86a8-488f-a015-402079194e10 state_hub_task_id: 1da921f5-86a8-488f-a015-402079194e10
- id: T21 - id: T21
title: Manual end-to-end test title: Manual end-to-end test