Files
activity-core/src/activity_core/workflows.py

250 lines
9.3 KiB
Python

"""Temporal workflow definitions for activity-core.

Two workflows are registered here:
  - RunActivityWorkflow  → orchestrator-tq
  - TaskExecutorWorkflow → task-execution-tq

Workflow IDs follow the conventions in docs/conventions.md:
  RunActivityWorkflow:  activity-{activity_id}:{trigger_key}
  TaskExecutorWorkflow: task-{run_id}:{task_type}:{index}
"""
from __future__ import annotations
import os
import uuid
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy, SearchAttributeKey, TypedSearchAttributes, SearchAttributePair
with workflow.unsafe.imports_passed_through():
from activity_core.activities import (
emit_tasks,
evaluate_rules,
evaluate_instructions,
load_activity_definition,
log_run,
persist_instruction_reports,
persist_ops_evidence,
persist_task_instance,
resolve_context,
)
from activity_core.schedule_manager import SCHEDULED_TRIGGER_KEY
# T32: custom search attributes used for Temporal visibility. Both keys are
# keyword-typed and must already be registered with the Temporal cluster:
#   temporal operator search-attribute create --name ActivityId --type Keyword
#   temporal operator search-attribute create --name ActivityName --type Keyword
_ACTIVITY_ID_KEY = SearchAttributeKey.for_keyword("ActivityId")
_ACTIVITY_NAME_KEY = SearchAttributeKey.for_keyword("ActivityName")

# Retry policy passed to the execute_activity calls in this file: exponential
# backoff starting at 1 s, doubling each attempt, capped at 5 minutes between
# attempts, and giving up after 10 attempts.
_RETRY_POLICY = RetryPolicy(
    maximum_attempts=10,
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(seconds=300),
)

# Start-to-close timeout for each activity. Overridable through the
# ACTIVITY_TIMEOUT_SECONDS environment variable (whole seconds); defaults to
# 900 s. A non-integer value raises ValueError when this module is imported.
_ACTIVITY_TIMEOUT = timedelta(
    seconds=int(os.getenv("ACTIVITY_TIMEOUT_SECONDS", "900")),
)

# Task queue on which TaskExecutorWorkflow schedules its activity.
_TASK_QUEUE = "task-execution-tq"
def _extract_event_attributes(event_envelope_json: str | None) -> dict:
    """Return the ``attributes`` mapping of a JSON-serialised event envelope.

    Parsing is deliberately best-effort: a missing, malformed, or unexpectedly
    shaped envelope never fails the workflow. Instead of being swallowed
    silently, every such case is logged as a warning and an empty dict is
    returned so rule evaluation always receives a mapping.

    Args:
        event_envelope_json: JSON text of the envelope, or None/"" when the
            run was not triggered by an event.

    Returns:
        The envelope's ``attributes`` dict, or ``{}`` when it is absent, null,
        not a dict, or the envelope cannot be parsed.
    """
    if not event_envelope_json:
        return {}

    # Imported at function scope so this helper does not rely on a
    # module-level ``json`` import being present.
    import json

    try:
        envelope = json.loads(event_envelope_json)
    except (TypeError, ValueError, RecursionError) as exc:
        # json.JSONDecodeError is a ValueError; TypeError covers non-text
        # input; RecursionError covers pathologically nested documents.
        workflow.logger.warning(
            "Ignoring unparseable event envelope: %s", exc
        )
        return {}

    if not isinstance(envelope, dict):
        workflow.logger.warning(
            "Ignoring event envelope that is not a JSON object (got %s)",
            type(envelope).__name__,
        )
        return {}

    attributes = envelope.get("attributes")
    if attributes is None:
        # Key absent or explicitly null: treat as "no attributes".
        return {}
    if not isinstance(attributes, dict):
        workflow.logger.warning(
            "Ignoring event envelope attributes that are not a JSON object "
            "(got %s)",
            type(attributes).__name__,
        )
        return {}
    return attributes


@workflow.defn
class RunActivityWorkflow:
    """Durable orchestration workflow.

    Sequence:
      1. load_activity_definition(activity_id)       → defn dict
         (then tags the execution with ActivityId / ActivityName)
      2. resolve_context(context_sources, envelope)  → context snapshot
         persist_ops_evidence(...)                   → records the snapshot
      3. evaluate_rules(rules, event, context)       → TaskSpec dicts
         evaluate_instructions(...)                  → extra task specs and
         reports (only when the definition has instructions)
      4. persist_instruction_reports(...)            (only when reports exist)
         emit_tasks(task_specs)                      → TaskRef list via
         IssueSink (only when task specs exist; the return value is not
         used here)
      5. log_run(...)                                → activity_runs row
    """

    @workflow.run
    async def run(
        self,
        activity_id: str,
        trigger_key: str,
        scheduled_for: str | None = None,
        event_envelope_json: str | None = None,
    ) -> dict:
        """Execute one run of an activity definition.

        Args:
            activity_id: UUID of the ActivityDefinition row.
            trigger_key: event_id (event trigger) or "scheduled" (cron).
            scheduled_for: ISO-8601 nominal scheduled time (cron only).
            event_envelope_json: JSON-serialised EventEnvelope (event trigger
                only). Unparseable envelopes are logged and treated as having
                no attributes.

        Returns:
            {"run_id": str, "tasks_spawned": int}, where ``tasks_spawned`` is
            the number of task specs produced by rules and instructions.

        Raises:
            KeyError: if the loaded definition has no "version" key.
        """
        # ── 1. Load definition ────────────────────────────────────────────────
        defn: dict = await workflow.execute_activity(
            load_activity_definition,
            activity_id,
            start_to_close_timeout=_ACTIVITY_TIMEOUT,
            retry_policy=_RETRY_POLICY,
        )

        # T32: Tag this workflow execution with activity metadata so runs are
        # filterable in the Temporal UI (requires ActivityId + ActivityName to be
        # registered as custom search attributes — see docs/runbook.md).
        workflow.upsert_search_attributes(
            TypedSearchAttributes([
                SearchAttributePair(_ACTIVITY_ID_KEY, activity_id),
                SearchAttributePair(_ACTIVITY_NAME_KEY, defn.get("name", "")),
            ])
        )

        # ── 2. Resolve context ────────────────────────────────────────────────
        # Read context_sources once, with the same default in both places it
        # is used, so a definition without the key behaves consistently.
        context_sources = defn.get("context_sources", [])
        context_snapshot: dict = await workflow.execute_activity(
            resolve_context,
            args=[context_sources, event_envelope_json],
            start_to_close_timeout=_ACTIVITY_TIMEOUT,
            retry_policy=_RETRY_POLICY,
        )

        # Derive a deterministic run_id (uuid5) so retries and replays yield
        # the same value. Scheduled runs all share one trigger key, so they
        # are keyed on this workflow's own ID instead of the trigger key.
        if trigger_key == SCHEDULED_TRIGGER_KEY:
            dedup_source = workflow.info().workflow_id
        else:
            dedup_source = f"{activity_id}:{trigger_key}"
        run_id = str(uuid.uuid5(uuid.NAMESPACE_URL, dedup_source))

        version_used = defn["version"]

        await workflow.execute_activity(
            persist_ops_evidence,
            {
                "context_sources": context_sources,
                "context": context_snapshot,
                "activity_id": activity_id,
                "run_id": run_id,
                "scheduled_for": scheduled_for,
                "version_used": version_used,
            },
            start_to_close_timeout=_ACTIVITY_TIMEOUT,
            retry_policy=_RETRY_POLICY,
        )

        # ── 3. Evaluate rules ─────────────────────────────────────────────────
        event_attrs: dict = _extract_event_attributes(event_envelope_json)

        task_spec_dicts: list[dict] = await workflow.execute_activity(
            evaluate_rules,
            {
                "rules": defn.get("rules", []),
                "event": event_attrs,
                "context": context_snapshot,
            },
            start_to_close_timeout=_ACTIVITY_TIMEOUT,
            retry_policy=_RETRY_POLICY,
        )

        # Instructions are optional; when present they may contribute both
        # additional task specs and reports.
        report_dicts: list[dict] = []
        if defn.get("instructions"):
            instruction_result: dict = await workflow.execute_activity(
                evaluate_instructions,
                {
                    "instructions": defn.get("instructions", []),
                    "event": event_attrs,
                    "context": context_snapshot,
                },
                start_to_close_timeout=_ACTIVITY_TIMEOUT,
                retry_policy=_RETRY_POLICY,
            )
            task_spec_dicts.extend(instruction_result.get("task_specs", []))
            report_dicts.extend(instruction_result.get("reports", []))

        # ── 4. Persist reports and emit tasks ────────────────────────────────
        if report_dicts:
            await workflow.execute_activity(
                persist_instruction_reports,
                {
                    "reports": report_dicts,
                    "activity_id": activity_id,
                    "run_id": run_id,
                    "scheduled_for": scheduled_for,
                    "version_used": version_used,
                },
                start_to_close_timeout=_ACTIVITY_TIMEOUT,
                retry_policy=_RETRY_POLICY,
            )

        if task_spec_dicts:
            await workflow.execute_activity(
                emit_tasks,
                {
                    "task_specs": task_spec_dicts,
                    "activity_id": activity_id,
                    "triggering_event_id": trigger_key,
                    "run_id": run_id,
                },
                start_to_close_timeout=_ACTIVITY_TIMEOUT,
                retry_policy=_RETRY_POLICY,
            )

        # ── 5. Log the run ────────────────────────────────────────────────────
        tasks_spawned = len(task_spec_dicts)
        await workflow.execute_activity(
            log_run,
            {
                "run_id": run_id,
                "activity_id": activity_id,
                "scheduled_for": scheduled_for,
                "context_snapshot": context_snapshot,
                "tasks_spawned": tasks_spawned,
                "version_used": version_used,
            },
            start_to_close_timeout=_ACTIVITY_TIMEOUT,
            retry_policy=_RETRY_POLICY,
        )

        return {"run_id": run_id, "tasks_spawned": tasks_spawned}
@workflow.defn
class TaskExecutorWorkflow:
    """Compatibility stub kept for legacy task-instance workflows.

    activity-core does not treat this as a production execution surface. The
    workflow records a task_instances row with status=done and finishes right
    away, which lets legacy/dev flows retain their idempotency behavior.
    Actual task execution is expected to live in per-repo workers or a future
    execution-owned repo/workplan rather than in this module.

    The task_id is a uuid5 of this workflow's own ID, so it is the same on
    every retry and persist_task_instance retries remain idempotent.
    """

    @workflow.run
    async def run(self, run_id: str, task_type: str, params: dict) -> dict:
        """Persist a completed task-instance row and report its ID.

        Args:
            run_id: Identifier of the run this task belongs to.
            task_type: Type label stored on the task-instance row.
            params: Task parameters stored on the task-instance row.

        Returns:
            {"task_id": str, "status": "done"}
        """
        # Deterministic ID: keeps the stub idempotent without implying that
        # this workflow owns the task lifecycle.
        own_workflow_id = workflow.info().workflow_id
        instance_id = str(uuid.uuid5(uuid.NAMESPACE_URL, own_workflow_id))

        log_fields = {
            "run_id": run_id,
            "task_type": task_type,
            "task_id": instance_id,
        }
        workflow.logger.info("TaskExecutorWorkflow started", extra=log_fields)

        # Row payload handed to the persistence activity.
        instance_row = {
            "id": instance_id,
            "run_id": run_id,
            "type": task_type,
            "params": params,
            "status": "done",
        }
        await workflow.execute_activity(
            persist_task_instance,
            instance_row,
            task_queue=_TASK_QUEUE,
            start_to_close_timeout=_ACTIVITY_TIMEOUT,
            retry_policy=_RETRY_POLICY,
        )

        return {"task_id": instance_id, "status": "done"}