diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index 6acc713..a184373 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -4,23 +4,81 @@ Activities run inside a Worker bound to 'orchestrator-tq'. Each function is decorated with @activity.defn and executed by RunActivityWorkflow via workflow.execute_activity(). -Implementations are added in T14–T17; stubs here let the worker -register correctly before the bodies are written. +DB access pattern: worker.py calls init_session_factory(url) once before +starting workers, which sets the module-level _session_factory used by +activities that need DB access. """ from __future__ import annotations -from temporalio import activity +import uuid +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker +from temporalio import activity +from temporalio.exceptions import ApplicationError + +from activity_core.db import make_engine +from activity_core.orm import ActivityDefinition as ActivityDefinitionRow + + +_session_factory: async_sessionmaker[AsyncSession] | None = None + + +def init_session_factory(url: str) -> None: + """Initialise the shared DB session factory. + + Must be called once from worker.py before workers are started. + """ + global _session_factory + _session_factory = async_sessionmaker(make_engine(url), expire_on_commit=False) + + +def _get_session_factory() -> async_sessionmaker[AsyncSession]: + if _session_factory is None: + raise RuntimeError( + "DB session factory not initialised — call init_session_factory() first" + ) + return _session_factory + + +# ── Activities ───────────────────────────────────────────────────────────────── @activity.defn async def load_activity_definition(activity_id: str) -> dict: - """Load an ActivityDefinition row from the DB by ID. + """Load an ActivityDefinition row from Postgres by ID. - Returns a JSON-serialisable dict (Pydantic .model_dump()). - Implemented in T14. + Returns a JSON-serialisable dict suitable for passing between + Temporal workflow steps. + + Raises: + ApplicationError (non-retryable): if no row exists for activity_id. """ - raise NotImplementedError("T14") + Session = _get_session_factory() + async with Session() as session: + row = await session.scalar( + select(ActivityDefinitionRow).where( + ActivityDefinitionRow.id == uuid.UUID(activity_id) + ) + ) + + if row is None: + raise ApplicationError( + f"ActivityDefinition {activity_id!r} not found", + non_retryable=True, + ) + + return { + "id": str(row.id), + "name": row.name, + "enabled": row.enabled, + "trigger_type": row.trigger_type, + "trigger_config": row.trigger_config, + "context_sources": row.context_sources, + "task_templates": row.task_templates, + "dedupe_key_strategy": row.dedupe_key_strategy, + "version": row.version, + } @activity.defn diff --git a/src/activity_core/worker.py b/src/activity_core/worker.py index ae8da60..9aaef07 100644 --- a/src/activity_core/worker.py +++ b/src/activity_core/worker.py @@ -24,6 +24,7 @@ from temporalio.client import Client from temporalio.worker import Worker from activity_core.activities import ( + init_session_factory, load_activity_definition, log_run, resolve_context, @@ -38,6 +39,11 @@ TASK_EXECUTION_TASK_QUEUE = "task-execution-tq" async def run() -> None: + db_url = os.environ.get("ACTCORE_DB_URL") + if not db_url: + raise RuntimeError("ACTCORE_DB_URL is required") + init_session_factory(db_url) + client = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE) orchestrator_worker = Worker( diff --git a/workplans/custodian-WP-0001-temporal-backbone.md b/workplans/custodian-WP-0001-temporal-backbone.md index 8a8ecab..2d829dc 100644 --- a/workplans/custodian-WP-0001-temporal-backbone.md +++ b/workplans/custodian-WP-0001-temporal-backbone.md @@ -60,7 +60,7 @@ tasks: state_hub_task_id: e0205c56-1d40-4142-952b-e27ff6a44e1d - id: T14 title: Implement load_activity_definition activity - status: todo + status: done state_hub_task_id: b05f046f-a6ba-4d96-a298-a0bbea067427 - id: T15 title: Implement resolve_context activity (stub)