generated from coulomb/repo-seed
feat(worker): scaffold activities, workflows, worker entrypoint — T13
src/activity_core/activities.py:
- load_activity_definition, resolve_context, log_run — @activity.defn
stubs (raise NotImplementedError, bodies in T14–T17)
src/activity_core/workflows.py:
- RunActivityWorkflow (orchestrator-tq) — @workflow.defn stub (T18)
- TaskExecutorWorkflow (task-execution-tq) — @workflow.defn stub (T19)
src/activity_core/worker.py:
- Connects to Temporal via TEMPORAL_HOST / TEMPORAL_NAMESPACE env vars
- Spawns two Workers: orchestrator-tq and task-execution-tq
- Runs until cancelled (python -m activity_core.worker)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
41
src/activity_core/activities.py
Normal file
41
src/activity_core/activities.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""Temporal activity definitions for activity-core.
|
||||
|
||||
Activities run inside a Worker bound to 'orchestrator-tq'.
|
||||
Each function is decorated with @activity.defn and executed by
|
||||
RunActivityWorkflow via workflow.execute_activity().
|
||||
|
||||
Implementations are added in T14–T17; stubs here let the worker
|
||||
register correctly before the bodies are written.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from temporalio import activity
|
||||
|
||||
|
||||
@activity.defn
|
||||
async def load_activity_definition(activity_id: str) -> dict:
|
||||
"""Load an ActivityDefinition row from the DB by ID.
|
||||
|
||||
Returns a JSON-serialisable dict (Pydantic .model_dump()).
|
||||
Implemented in T14.
|
||||
"""
|
||||
raise NotImplementedError("T14")
|
||||
|
||||
|
||||
@activity.defn
|
||||
async def resolve_context(context_sources: list[dict]) -> dict:
|
||||
"""Fetch and merge all context sources into a single snapshot dict.
|
||||
|
||||
Implemented in T15.
|
||||
"""
|
||||
raise NotImplementedError("T15")
|
||||
|
||||
|
||||
@activity.defn
|
||||
async def log_run(run_payload: dict) -> str:
|
||||
"""Persist an ActivityRun record and return its run_id.
|
||||
|
||||
Implemented in T17.
|
||||
"""
|
||||
raise NotImplementedError("T17")
|
||||
66
src/activity_core/worker.py
Normal file
66
src/activity_core/worker.py
Normal file
@@ -0,0 +1,66 @@
|
||||
"""Temporal worker entrypoint for activity-core.
|
||||
|
||||
Starts two workers (wired up in T20):
|
||||
- orchestrator-tq: RunActivityWorkflow + its activities
|
||||
- task-execution-tq: TaskExecutorWorkflow
|
||||
|
||||
Run with:
|
||||
TEMPORAL_HOST=localhost:7233 \
|
||||
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
|
||||
python -m activity_core.worker
|
||||
|
||||
Environment variables:
|
||||
TEMPORAL_HOST Temporal frontend address (default: localhost:7233)
|
||||
TEMPORAL_NAMESPACE Temporal namespace (default: default)
|
||||
ACTCORE_DB_URL App DB connection string (required)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from temporalio.client import Client
|
||||
from temporalio.worker import Worker
|
||||
|
||||
from activity_core.activities import (
|
||||
load_activity_definition,
|
||||
log_run,
|
||||
resolve_context,
|
||||
)
|
||||
from activity_core.workflows import RunActivityWorkflow, TaskExecutorWorkflow
|
||||
|
||||
TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233")
|
||||
TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default")
|
||||
|
||||
ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq"
|
||||
TASK_EXECUTION_TASK_QUEUE = "task-execution-tq"
|
||||
|
||||
|
||||
async def run() -> None:
|
||||
client = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE)
|
||||
|
||||
orchestrator_worker = Worker(
|
||||
client,
|
||||
task_queue=ORCHESTRATOR_TASK_QUEUE,
|
||||
workflows=[RunActivityWorkflow],
|
||||
activities=[load_activity_definition, resolve_context, log_run],
|
||||
)
|
||||
|
||||
task_worker = Worker(
|
||||
client,
|
||||
task_queue=TASK_EXECUTION_TASK_QUEUE,
|
||||
workflows=[TaskExecutorWorkflow],
|
||||
activities=[],
|
||||
)
|
||||
|
||||
async with orchestrator_worker, task_worker:
|
||||
print(
|
||||
f"Workers running — queues: {ORCHESTRATOR_TASK_QUEUE!r}, "
|
||||
f"{TASK_EXECUTION_TASK_QUEUE!r} (namespace={TEMPORAL_NAMESPACE!r})"
|
||||
)
|
||||
await asyncio.Future() # run until cancelled
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(run())
|
||||
57
src/activity_core/workflows.py
Normal file
57
src/activity_core/workflows.py
Normal file
@@ -0,0 +1,57 @@
|
||||
"""Temporal workflow definitions for activity-core.
|
||||
|
||||
Two workflows are registered here:
|
||||
- RunActivityWorkflow → orchestrator-tq
|
||||
- TaskExecutorWorkflow → task-execution-tq
|
||||
|
||||
Workflow IDs follow the conventions in docs/conventions.md:
|
||||
RunActivityWorkflow: activity-{activity_id}:{trigger_key}
|
||||
TaskExecutorWorkflow: task-{run_id}:{task_type}:{index}
|
||||
|
||||
Implementations are added in T18–T19; stubs here let the worker
|
||||
register and the type system resolve references in T14–T17.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
from temporalio import workflow
|
||||
|
||||
with workflow.unsafe.imports_passed_through():
|
||||
from activity_core.activities import (
|
||||
load_activity_definition,
|
||||
log_run,
|
||||
resolve_context,
|
||||
)
|
||||
|
||||
_DEFAULT_TIMEOUT = timedelta(minutes=5)
|
||||
|
||||
|
||||
@workflow.defn
|
||||
class RunActivityWorkflow:
|
||||
"""Durable orchestration workflow.
|
||||
|
||||
Sequence (T18):
|
||||
1. load_activity_definition(activity_id)
|
||||
2. resolve_context(context_sources)
|
||||
3. evaluate_templates(task_templates, context) ← pure function, no activity
|
||||
4. spawn TaskExecutorWorkflow child per template result
|
||||
5. log_run(...)
|
||||
"""
|
||||
|
||||
@workflow.run
|
||||
async def run(self, activity_id: str, trigger_key: str) -> dict:
|
||||
raise NotImplementedError("T18")
|
||||
|
||||
|
||||
@workflow.defn
|
||||
class TaskExecutorWorkflow:
|
||||
"""Child workflow that executes one concrete task instance.
|
||||
|
||||
Stub implementation in T19.
|
||||
"""
|
||||
|
||||
@workflow.run
|
||||
async def run(self, run_id: str, task_type: str, params: dict) -> dict:
|
||||
raise NotImplementedError("T19")
|
||||
Reference in New Issue
Block a user