From 21edc313db1454f86de742e4679eaa5726877f75 Mon Sep 17 00:00:00 2001 From: Bernd Worsch Date: Thu, 26 Mar 2026 21:57:56 +0000 Subject: [PATCH] =?UTF-8?q?feat(worker):=20scaffold=20activities,=20workfl?= =?UTF-8?q?ows,=20worker=20entrypoint=20=E2=80=94=20T13?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit src/activity_core/activities.py: - load_activity_definition, resolve_context, log_run — @activity.defn stubs (raise NotImplementedError, bodies in T14–T17) src/activity_core/workflows.py: - RunActivityWorkflow (orchestrator-tq) — @workflow.defn stub (T18) - TaskExecutorWorkflow (task-execution-tq) — @workflow.defn stub (T19) src/activity_core/worker.py: - Connects to Temporal via TEMPORAL_HOST / TEMPORAL_NAMESPACE env vars - Spawns two Workers: orchestrator-tq and task-execution-tq - Runs until cancelled (python -m activity_core.worker) Co-Authored-By: Claude Sonnet 4.6 --- src/activity_core/activities.py | 41 ++++++++++++ src/activity_core/worker.py | 66 +++++++++++++++++++ src/activity_core/workflows.py | 57 ++++++++++++++++ .../custodian-WP-0001-temporal-backbone.md | 2 +- 4 files changed, 165 insertions(+), 1 deletion(-) create mode 100644 src/activity_core/activities.py create mode 100644 src/activity_core/worker.py create mode 100644 src/activity_core/workflows.py diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py new file mode 100644 index 0000000..6acc713 --- /dev/null +++ b/src/activity_core/activities.py @@ -0,0 +1,41 @@ +"""Temporal activity definitions for activity-core. + +Activities run inside a Worker bound to 'orchestrator-tq'. +Each function is decorated with @activity.defn and executed by +RunActivityWorkflow via workflow.execute_activity(). + +Implementations are added in T14–T17; stubs here let the worker +register correctly before the bodies are written. +""" + +from __future__ import annotations + +from temporalio import activity + + +@activity.defn +async def load_activity_definition(activity_id: str) -> dict: + """Load an ActivityDefinition row from the DB by ID. + + Returns a JSON-serialisable dict (Pydantic .model_dump()). + Implemented in T14. + """ + raise NotImplementedError("T14") + + +@activity.defn +async def resolve_context(context_sources: list[dict]) -> dict: + """Fetch and merge all context sources into a single snapshot dict. + + Implemented in T15. + """ + raise NotImplementedError("T15") + + +@activity.defn +async def log_run(run_payload: dict) -> str: + """Persist an ActivityRun record and return its run_id. + + Implemented in T17. + """ + raise NotImplementedError("T17") diff --git a/src/activity_core/worker.py b/src/activity_core/worker.py new file mode 100644 index 0000000..ae8da60 --- /dev/null +++ b/src/activity_core/worker.py @@ -0,0 +1,66 @@ +"""Temporal worker entrypoint for activity-core. + +Starts two workers (wired up in T20): + - orchestrator-tq: RunActivityWorkflow + its activities + - task-execution-tq: TaskExecutorWorkflow + +Run with: + TEMPORAL_HOST=localhost:7233 \ + ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \ + python -m activity_core.worker + +Environment variables: + TEMPORAL_HOST Temporal frontend address (default: localhost:7233) + TEMPORAL_NAMESPACE Temporal namespace (default: default) + ACTCORE_DB_URL App DB connection string (required) +""" + +from __future__ import annotations + +import asyncio +import os + +from temporalio.client import Client +from temporalio.worker import Worker + +from activity_core.activities import ( + load_activity_definition, + log_run, + resolve_context, +) +from activity_core.workflows import RunActivityWorkflow, TaskExecutorWorkflow + +TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") +TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default") + +ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq" +TASK_EXECUTION_TASK_QUEUE = "task-execution-tq" + + +async def run() -> None: + client = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE) + + orchestrator_worker = Worker( + client, + task_queue=ORCHESTRATOR_TASK_QUEUE, + workflows=[RunActivityWorkflow], + activities=[load_activity_definition, resolve_context, log_run], + ) + + task_worker = Worker( + client, + task_queue=TASK_EXECUTION_TASK_QUEUE, + workflows=[TaskExecutorWorkflow], + activities=[], + ) + + async with orchestrator_worker, task_worker: + print( + f"Workers running — queues: {ORCHESTRATOR_TASK_QUEUE!r}, " + f"{TASK_EXECUTION_TASK_QUEUE!r} (namespace={TEMPORAL_NAMESPACE!r})" + ) + await asyncio.Future() # run until cancelled + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/src/activity_core/workflows.py b/src/activity_core/workflows.py new file mode 100644 index 0000000..e3fd65e --- /dev/null +++ b/src/activity_core/workflows.py @@ -0,0 +1,57 @@ +"""Temporal workflow definitions for activity-core. + +Two workflows are registered here: + - RunActivityWorkflow → orchestrator-tq + - TaskExecutorWorkflow → task-execution-tq + +Workflow IDs follow the conventions in docs/conventions.md: + RunActivityWorkflow: activity-{activity_id}:{trigger_key} + TaskExecutorWorkflow: task-{run_id}:{task_type}:{index} + +Implementations are added in T18–T19; stubs here let the worker +register and the type system resolve references in T14–T17. +""" + +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from activity_core.activities import ( + load_activity_definition, + log_run, + resolve_context, + ) + +_DEFAULT_TIMEOUT = timedelta(minutes=5) + + +@workflow.defn +class RunActivityWorkflow: + """Durable orchestration workflow. + + Sequence (T18): + 1. load_activity_definition(activity_id) + 2. resolve_context(context_sources) + 3. evaluate_templates(task_templates, context) ← pure function, no activity + 4. spawn TaskExecutorWorkflow child per template result + 5. log_run(...) + """ + + @workflow.run + async def run(self, activity_id: str, trigger_key: str) -> dict: + raise NotImplementedError("T18") + + +@workflow.defn +class TaskExecutorWorkflow: + """Child workflow that executes one concrete task instance. + + Stub implementation in T19. + """ + + @workflow.run + async def run(self, run_id: str, task_type: str, params: dict) -> dict: + raise NotImplementedError("T19") diff --git a/workplans/custodian-WP-0001-temporal-backbone.md b/workplans/custodian-WP-0001-temporal-backbone.md index 3e893bd..8a8ecab 100644 --- a/workplans/custodian-WP-0001-temporal-backbone.md +++ b/workplans/custodian-WP-0001-temporal-backbone.md @@ -56,7 +56,7 @@ tasks: state_hub_task_id: f24662ff-4a26-48bd-b997-57e7586c7f11 - id: T13 title: Scaffold Python worker project - status: todo + status: done state_hub_task_id: e0205c56-1d40-4142-952b-e27ff6a44e1d - id: T14 title: Implement load_activity_definition activity