"""Temporal workflow definitions for activity-core. Two workflows are registered here: - RunActivityWorkflow → orchestrator-tq - TaskExecutorWorkflow → task-execution-tq Workflow IDs follow the conventions in docs/conventions.md: RunActivityWorkflow: activity-{activity_id}:{trigger_key} TaskExecutorWorkflow: task-{run_id}:{task_type}:{index} """ from __future__ import annotations import uuid from datetime import timedelta from temporalio import workflow from temporalio.common import RetryPolicy, SearchAttributeKey, TypedSearchAttributes, SearchAttributePair with workflow.unsafe.imports_passed_through(): from activity_core.activities import ( load_activity_definition, log_run, persist_task_instance, resolve_context, ) from activity_core.template_engine import evaluate_templates from activity_core.schedule_manager import SCHEDULED_TRIGGER_KEY # T32: Custom search attributes for Temporal visibility (must be registered in Temporal first). # Registration: temporal operator search-attribute create --name ActivityId --type Keyword _ACTIVITY_ID_KEY = SearchAttributeKey.for_keyword("ActivityId") _ACTIVITY_NAME_KEY = SearchAttributeKey.for_keyword("ActivityName") _RETRY_POLICY = RetryPolicy( initial_interval=timedelta(seconds=1), backoff_coefficient=2.0, maximum_interval=timedelta(minutes=5), maximum_attempts=10, ) _ACTIVITY_TIMEOUT = timedelta(minutes=5) _TASK_QUEUE = "task-execution-tq" @workflow.defn class RunActivityWorkflow: """Durable orchestration workflow. Sequence: 1. load_activity_definition(activity_id) → defn dict 2. resolve_context(defn.context_sources) → context snapshot 3. evaluate_templates(templates, context) → task specs (pure, no activity) 4. log_run(...) → run_id 5. start_child_workflow per task spec (fire-and-forget, detached) """ @workflow.run async def run( self, activity_id: str, trigger_key: str, scheduled_for: str | None = None, ) -> dict: """ Args: activity_id: UUID of the ActivityDefinition row. trigger_key: ISO-8601 datetime (cron) or event_id (event trigger). Used as the idempotency key component. scheduled_for: ISO-8601 string of the nominal scheduled time (cron only). Returns: {"run_id": str, "tasks_spawned": int} """ # ── 1. Load definition ──────────────────────────────────────────────── defn: dict = await workflow.execute_activity( load_activity_definition, activity_id, start_to_close_timeout=_ACTIVITY_TIMEOUT, retry_policy=_RETRY_POLICY, ) # T32: Tag this workflow execution with activity metadata so runs are # filterable in the Temporal UI (requires ActivityId + ActivityName to be # registered as custom search attributes — see docs/runbook.md). workflow.upsert_search_attributes( TypedSearchAttributes([ SearchAttributePair(_ACTIVITY_ID_KEY, activity_id), SearchAttributePair(_ACTIVITY_NAME_KEY, defn.get("name", "")), ]) ) # ── 2. Resolve context ──────────────────────────────────────────────── context_snapshot: dict = await workflow.execute_activity( resolve_context, defn["context_sources"], start_to_close_timeout=_ACTIVITY_TIMEOUT, retry_policy=_RETRY_POLICY, ) # ── 3. Evaluate templates (pure — no activity) ──────────────────────── task_specs: list[dict] = evaluate_templates( defn["task_templates"], context_snapshot ) # ── 4. Log the run ──────────────────────────────────────────────────── # run_id is derived deterministically so log_run retries are idempotent. # For schedule-fired runs the trigger_key is the sentinel "scheduled"; # each fire has a unique workflow_id (embeds ${firstScheduledTime}), so # we use the workflow_id as the dedup key instead. if trigger_key == SCHEDULED_TRIGGER_KEY: dedup_source = workflow.info().workflow_id else: dedup_source = f"{activity_id}:{trigger_key}" run_id = str(uuid.uuid5(uuid.NAMESPACE_URL, dedup_source)) await workflow.execute_activity( log_run, { "run_id": run_id, "activity_id": activity_id, "scheduled_for": scheduled_for, "context_snapshot": context_snapshot, "tasks_spawned": len(task_specs), "version_used": defn["version"], }, start_to_close_timeout=_ACTIVITY_TIMEOUT, retry_policy=_RETRY_POLICY, ) # ── 5. Spawn task executor children (fire-and-forget) ───────────────── for index, spec in enumerate(task_specs): child_id = f"task-{run_id}:{spec['task_type']}:{index}" await workflow.start_child_workflow( TaskExecutorWorkflow, args=[run_id, spec["task_type"], spec["params"]], id=child_id, task_queue=_TASK_QUEUE, parent_close_policy=workflow.ParentClosePolicy.ABANDON, ) return {"run_id": run_id, "tasks_spawned": len(task_specs)} @workflow.defn class TaskExecutorWorkflow: """Child workflow that executes one concrete task instance. Stub behaviour: persists a task_instances row with status=done and returns immediately. Real task execution logic replaces this in a later workstream. task_id is derived deterministically from the workflow's own ID so persist_task_instance retries remain idempotent. """ @workflow.run async def run(self, run_id: str, task_type: str, params: dict) -> dict: # Derive a stable task_id from this workflow's own ID. task_id = str( uuid.uuid5(uuid.NAMESPACE_URL, workflow.info().workflow_id) ) workflow.logger.info( "TaskExecutorWorkflow started", extra={"run_id": run_id, "task_type": task_type, "task_id": task_id}, ) await workflow.execute_activity( persist_task_instance, { "id": task_id, "run_id": run_id, "type": task_type, "params": params, "status": "done", }, task_queue=_TASK_QUEUE, start_to_close_timeout=_ACTIVITY_TIMEOUT, retry_policy=_RETRY_POLICY, ) return {"task_id": task_id, "status": "done"}