"""Temporal workflow definitions for activity-core. Two workflows are registered here: - RunActivityWorkflow → orchestrator-tq - TaskExecutorWorkflow → task-execution-tq Workflow IDs follow the conventions in docs/conventions.md: RunActivityWorkflow: activity-{activity_id}:{trigger_key} TaskExecutorWorkflow: task-{run_id}:{task_type}:{index} """ from __future__ import annotations import uuid from datetime import timedelta from temporalio import workflow from temporalio.common import RetryPolicy, SearchAttributeKey, TypedSearchAttributes, SearchAttributePair with workflow.unsafe.imports_passed_through(): from activity_core.activities import ( emit_tasks, evaluate_rules, load_activity_definition, log_run, persist_task_instance, resolve_context, ) from activity_core.schedule_manager import SCHEDULED_TRIGGER_KEY # T32: Custom search attributes for Temporal visibility (must be registered in Temporal first). # Registration: temporal operator search-attribute create --name ActivityId --type Keyword _ACTIVITY_ID_KEY = SearchAttributeKey.for_keyword("ActivityId") _ACTIVITY_NAME_KEY = SearchAttributeKey.for_keyword("ActivityName") _RETRY_POLICY = RetryPolicy( initial_interval=timedelta(seconds=1), backoff_coefficient=2.0, maximum_interval=timedelta(minutes=5), maximum_attempts=10, ) _ACTIVITY_TIMEOUT = timedelta(minutes=5) _TASK_QUEUE = "task-execution-tq" @workflow.defn class RunActivityWorkflow: """Durable orchestration workflow. Sequence: 1. load_activity_definition(activity_id) → defn dict 2. resolve_context(defn.context_sources) → context snapshot 3. evaluate_rules(rules, event, context) → matching rules → TaskSpec dicts 4. emit_tasks(task_specs) → TaskRef list via IssueSink 5. log_run(...) → activity_runs row """ @workflow.run async def run( self, activity_id: str, trigger_key: str, scheduled_for: str | None = None, event_envelope_json: str | None = None, ) -> dict: """ Args: activity_id: UUID of the ActivityDefinition row. trigger_key: event_id (event trigger) or "scheduled" (cron). scheduled_for: ISO-8601 nominal scheduled time (cron only). event_envelope_json: JSON-serialised EventEnvelope (event trigger only). Returns: {"run_id": str, "tasks_spawned": int} """ # ── 1. Load definition ──────────────────────────────────────────────── defn: dict = await workflow.execute_activity( load_activity_definition, activity_id, start_to_close_timeout=_ACTIVITY_TIMEOUT, retry_policy=_RETRY_POLICY, ) # T32: Tag this workflow execution with activity metadata so runs are # filterable in the Temporal UI (requires ActivityId + ActivityName to be # registered as custom search attributes — see docs/runbook.md). workflow.upsert_search_attributes( TypedSearchAttributes([ SearchAttributePair(_ACTIVITY_ID_KEY, activity_id), SearchAttributePair(_ACTIVITY_NAME_KEY, defn.get("name", "")), ]) ) # ── 2. Resolve context ──────────────────────────────────────────────── context_snapshot: dict = await workflow.execute_activity( resolve_context, defn["context_sources"], start_to_close_timeout=_ACTIVITY_TIMEOUT, retry_policy=_RETRY_POLICY, ) # ── 3. Evaluate rules ───────────────────────────────────────────────── import json as _json event_attrs: dict = {} if event_envelope_json: try: event_attrs = _json.loads(event_envelope_json).get("attributes", {}) except Exception: pass matched_rules: list[dict] = await workflow.execute_activity( evaluate_rules, { "rules": defn.get("rules", []), "event": event_attrs, "context": context_snapshot, }, start_to_close_timeout=_ACTIVITY_TIMEOUT, retry_policy=_RETRY_POLICY, ) # Convert matched rules to TaskSpec dicts for emission. task_spec_dicts: list[dict] = [] for rule in matched_rules: action = rule.get("action", {}) task_spec_dicts.append({ "title": action.get("task_template", rule.get("id", "")), "description": "", "target_repo": action.get("target_repo"), "priority": action.get("priority", "medium"), "labels": action.get("labels", []), "due_in_days": action.get("due_in_days"), "source_type": "rule", "source_id": rule.get("id", ""), "condition": rule.get("condition", ""), }) # ── 4. Emit tasks via IssueSink ─────────────────────────────────────── if trigger_key == SCHEDULED_TRIGGER_KEY: dedup_source = workflow.info().workflow_id else: dedup_source = f"{activity_id}:{trigger_key}" run_id = str(uuid.uuid5(uuid.NAMESPACE_URL, dedup_source)) if task_spec_dicts: await workflow.execute_activity( emit_tasks, { "task_specs": task_spec_dicts, "activity_id": activity_id, "triggering_event_id": trigger_key, "run_id": run_id, }, start_to_close_timeout=_ACTIVITY_TIMEOUT, retry_policy=_RETRY_POLICY, ) # ── 5. Log the run ──────────────────────────────────────────────────── await workflow.execute_activity( log_run, { "run_id": run_id, "activity_id": activity_id, "scheduled_for": scheduled_for, "context_snapshot": context_snapshot, "tasks_spawned": len(task_spec_dicts), "version_used": defn["version"], }, start_to_close_timeout=_ACTIVITY_TIMEOUT, retry_policy=_RETRY_POLICY, ) return {"run_id": run_id, "tasks_spawned": len(task_spec_dicts)} @workflow.defn class TaskExecutorWorkflow: """Child workflow that executes one concrete task instance. Stub behaviour: persists a task_instances row with status=done and returns immediately. Real task execution logic replaces this in a later workstream. task_id is derived deterministically from the workflow's own ID so persist_task_instance retries remain idempotent. """ @workflow.run async def run(self, run_id: str, task_type: str, params: dict) -> dict: # Derive a stable task_id from this workflow's own ID. task_id = str( uuid.uuid5(uuid.NAMESPACE_URL, workflow.info().workflow_id) ) workflow.logger.info( "TaskExecutorWorkflow started", extra={"run_id": run_id, "task_type": task_type, "task_id": task_id}, ) await workflow.execute_activity( persist_task_instance, { "id": task_id, "run_id": run_id, "type": task_type, "params": params, "status": "done", }, task_queue=_TASK_QUEUE, start_to_close_timeout=_ACTIVITY_TIMEOUT, retry_policy=_RETRY_POLICY, ) return {"task_id": task_id, "status": "done"}