generated from coulomb/repo-seed
feat(activities): implement load_activity_definition — T14
activities.py: - init_session_factory(url): module-level async_sessionmaker init, called once from worker.py before workers start - load_activity_definition(activity_id): queries activity_definitions by UUID, returns JSON-serialisable dict; raises ApplicationError (non_retryable=True) if row not found worker.py: - reads ACTCORE_DB_URL at startup, fails fast if missing - calls init_session_factory() before connecting to Temporal Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -4,23 +4,81 @@ Activities run inside a Worker bound to 'orchestrator-tq'.
|
||||
Each function is decorated with @activity.defn and executed by
|
||||
RunActivityWorkflow via workflow.execute_activity().
|
||||
|
||||
Implementations are added in T14–T17; stubs here let the worker
|
||||
register correctly before the bodies are written.
|
||||
DB access pattern: worker.py calls init_session_factory(url) once before
|
||||
starting workers, which sets the module-level _session_factory used by
|
||||
activities that need DB access.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from temporalio import activity
|
||||
import uuid
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
from temporalio import activity
|
||||
from temporalio.exceptions import ApplicationError
|
||||
|
||||
from activity_core.db import make_engine
|
||||
from activity_core.orm import ActivityDefinition as ActivityDefinitionRow
|
||||
|
||||
|
||||
_session_factory: async_sessionmaker[AsyncSession] | None = None
|
||||
|
||||
|
||||
def init_session_factory(url: str) -> None:
|
||||
"""Initialise the shared DB session factory.
|
||||
|
||||
Must be called once from worker.py before workers are started.
|
||||
"""
|
||||
global _session_factory
|
||||
_session_factory = async_sessionmaker(make_engine(url), expire_on_commit=False)
|
||||
|
||||
|
||||
def _get_session_factory() -> async_sessionmaker[AsyncSession]:
|
||||
if _session_factory is None:
|
||||
raise RuntimeError(
|
||||
"DB session factory not initialised — call init_session_factory() first"
|
||||
)
|
||||
return _session_factory
|
||||
|
||||
|
||||
# ── Activities ─────────────────────────────────────────────────────────────────
|
||||
|
||||
@activity.defn
|
||||
async def load_activity_definition(activity_id: str) -> dict:
|
||||
"""Load an ActivityDefinition row from the DB by ID.
|
||||
"""Load an ActivityDefinition row from Postgres by ID.
|
||||
|
||||
Returns a JSON-serialisable dict (Pydantic .model_dump()).
|
||||
Implemented in T14.
|
||||
Returns a JSON-serialisable dict suitable for passing between
|
||||
Temporal workflow steps.
|
||||
|
||||
Raises:
|
||||
ApplicationError (non-retryable): if no row exists for activity_id.
|
||||
"""
|
||||
raise NotImplementedError("T14")
|
||||
Session = _get_session_factory()
|
||||
async with Session() as session:
|
||||
row = await session.scalar(
|
||||
select(ActivityDefinitionRow).where(
|
||||
ActivityDefinitionRow.id == uuid.UUID(activity_id)
|
||||
)
|
||||
)
|
||||
|
||||
if row is None:
|
||||
raise ApplicationError(
|
||||
f"ActivityDefinition {activity_id!r} not found",
|
||||
non_retryable=True,
|
||||
)
|
||||
|
||||
return {
|
||||
"id": str(row.id),
|
||||
"name": row.name,
|
||||
"enabled": row.enabled,
|
||||
"trigger_type": row.trigger_type,
|
||||
"trigger_config": row.trigger_config,
|
||||
"context_sources": row.context_sources,
|
||||
"task_templates": row.task_templates,
|
||||
"dedupe_key_strategy": row.dedupe_key_strategy,
|
||||
"version": row.version,
|
||||
}
|
||||
|
||||
|
||||
@activity.defn
|
||||
|
||||
@@ -24,6 +24,7 @@ from temporalio.client import Client
|
||||
from temporalio.worker import Worker
|
||||
|
||||
from activity_core.activities import (
|
||||
init_session_factory,
|
||||
load_activity_definition,
|
||||
log_run,
|
||||
resolve_context,
|
||||
@@ -38,6 +39,11 @@ TASK_EXECUTION_TASK_QUEUE = "task-execution-tq"
|
||||
|
||||
|
||||
async def run() -> None:
|
||||
db_url = os.environ.get("ACTCORE_DB_URL")
|
||||
if not db_url:
|
||||
raise RuntimeError("ACTCORE_DB_URL is required")
|
||||
init_session_factory(db_url)
|
||||
|
||||
client = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE)
|
||||
|
||||
orchestrator_worker = Worker(
|
||||
|
||||
@@ -60,7 +60,7 @@ tasks:
|
||||
state_hub_task_id: e0205c56-1d40-4142-952b-e27ff6a44e1d
|
||||
- id: T14
|
||||
title: Implement load_activity_definition activity
|
||||
status: todo
|
||||
status: done
|
||||
state_hub_task_id: b05f046f-a6ba-4d96-a298-a0bbea067427
|
||||
- id: T15
|
||||
title: Implement resolve_context activity (stub)
|
||||
|
||||
Reference in New Issue
Block a user