#!/usr/bin/env python3 """ activity-core E2E test — full RunActivityWorkflow flow. Closes WP-0001 T21 (Manual end-to-end test). Requires the docker compose stack to be running (docker-compose.dev.yml). Run via: uv run python e2e/tests/test_full_flow.py Or via the custodian e2e framework: make e2e REPO=activity-core (from ~/the-custodian) Exit codes: 0 = all steps passed, 1 = one or more steps failed. """ from __future__ import annotations import asyncio import os import subprocess import sys import time import uuid from pathlib import Path # ── deps (temporalio + sqlalchemy pulled in via uv) ────────────────────────── import sqlalchemy as sa from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from temporalio.client import Client, WorkflowExecutionStatus # ── config ─────────────────────────────────────────────────────────────────── TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") ACTCORE_DB_URL = os.environ.get( "ACTCORE_DB_URL", "postgresql+asyncpg://actcore:actcore@localhost:5433/actcore", ) SEED_DEFINITION_ID = "00000000-0000-0000-0000-000000000001" REPO_ROOT = Path(__file__).parent.parent.parent # activity-core/ WORKER_STARTUP_TIMEOUT = 15 # seconds # ── helpers ────────────────────────────────────────────────────────────────── def step(label: str, ok: bool, detail: str = "") -> bool: status = "PASS" if ok else "FAIL" line = f" [{status}] {label}" if detail: line += f" — {detail}" print(line) return ok def check(label: str, condition: bool, detail: str = "") -> bool: return step(label, condition, detail) # ── test steps ─────────────────────────────────────────────────────────────── async def run_migrations() -> bool: proc = subprocess.run( ["uv", "run", "alembic", "upgrade", "head"], capture_output=True, text=True, cwd=REPO_ROOT, env={**os.environ, "ACTCORE_DB_URL": ACTCORE_DB_URL}, ) ok = proc.returncode == 0 return step("DB migrations", ok, proc.stderr.strip().splitlines()[-1] if proc.stderr.strip() else "") async def seed_db() -> bool: proc = subprocess.run( ["uv", "run", "python", "-m", "activity_core.seed"], capture_output=True, text=True, cwd=REPO_ROOT, env={**os.environ, "ACTCORE_DB_URL": ACTCORE_DB_URL}, ) ok = proc.returncode == 0 out = (proc.stdout + proc.stderr).strip().splitlines() return step("Seed ActivityDefinition", ok, out[-1] if out else "") def start_workers() -> subprocess.Popen: env = { **os.environ, "TEMPORAL_HOST": TEMPORAL_HOST, "ACTCORE_DB_URL": ACTCORE_DB_URL, } proc = subprocess.Popen( ["uv", "run", "python", "-m", "activity_core.worker"], cwd=REPO_ROOT, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) return proc async def wait_for_workers(proc: subprocess.Popen) -> bool: """Poll worker stdout until 'Workers running' or timeout.""" deadline = time.time() + WORKER_STARTUP_TIMEOUT while time.time() < deadline: line = proc.stdout.readline() if proc.stdout else "" if "Workers running" in line: return step("Workers started", True, line.strip()) if proc.poll() is not None: return step("Workers started", False, "process exited unexpectedly") await asyncio.sleep(0.2) return step("Workers started", False, f"timeout after {WORKER_STARTUP_TIMEOUT}s") async def trigger_workflow(client: Client) -> tuple[bool, str | None]: trigger_key = f"e2e-{uuid.uuid4()}" workflow_id = f"activity-{SEED_DEFINITION_ID}:{trigger_key}" try: handle = await client.start_workflow( "RunActivityWorkflow", args=[SEED_DEFINITION_ID, trigger_key, None], id=workflow_id, task_queue="orchestrator-tq", ) ok = step("Trigger RunActivityWorkflow", True, f"id={workflow_id}") return ok, handle.id except Exception as exc: step("Trigger RunActivityWorkflow", False, str(exc)) return False, None async def wait_for_completion(client: Client, workflow_id: str, timeout: int = 30) -> tuple[bool, dict | None]: deadline = time.time() + timeout while time.time() < deadline: handle = client.get_workflow_handle(workflow_id) desc = await handle.describe() if desc.status == WorkflowExecutionStatus.COMPLETED: result = await handle.result() ok = step("Workflow completed", True, f"run_id={result.get('run_id')}, tasks_spawned={result.get('tasks_spawned')}") return ok, result if desc.status in (WorkflowExecutionStatus.FAILED, WorkflowExecutionStatus.TIMED_OUT, WorkflowExecutionStatus.TERMINATED): step("Workflow completed", False, f"status={desc.status}") return False, None await asyncio.sleep(1) step("Workflow completed", False, f"timeout after {timeout}s") return False, None async def assert_run_in_db(run_id: str) -> bool: engine = create_async_engine(ACTCORE_DB_URL) try: async with AsyncSession(engine) as session: row = await session.execute( sa.text("SELECT run_id, tasks_spawned FROM activity_runs WHERE run_id = :rid"), {"rid": run_id}, ) record = row.fetchone() ok = record is not None detail = f"tasks_spawned={record.tasks_spawned}" if record else "not found" return step("ActivityRun written to DB", ok, detail) finally: await engine.dispose() async def assert_task_instances_in_db(run_id: str) -> bool: engine = create_async_engine(ACTCORE_DB_URL) try: async with AsyncSession(engine) as session: result = await session.execute( sa.text("SELECT COUNT(*) FROM task_instances WHERE run_id = :rid"), {"rid": run_id}, ) count = result.scalar() ok = count > 0 return step("TaskInstances written to DB", ok, f"count={count}") finally: await engine.dispose() # ── main ───────────────────────────────────────────────────────────────────── async def main() -> int: print("\nactivity-core E2E — full RunActivityWorkflow flow") print("=" * 55) results: list[bool] = [] worker_proc: subprocess.Popen | None = None try: # 1. Migrations results.append(await run_migrations()) if not results[-1]: return 1 # 2. Seed results.append(await seed_db()) if not results[-1]: return 1 # 3. Workers worker_proc = start_workers() results.append(await wait_for_workers(worker_proc)) if not results[-1]: return 1 # 4. Temporal connection try: client = await Client.connect(TEMPORAL_HOST) results.append(step("Temporal client connected", True, TEMPORAL_HOST)) except Exception as exc: results.append(step("Temporal client connected", False, str(exc))) return 1 # 5. Trigger workflow ok, workflow_id = await trigger_workflow(client) results.append(ok) if not ok: return 1 # 6. Wait for completion ok, result = await wait_for_completion(client, workflow_id) results.append(ok) if not ok: return 1 run_id = result["run_id"] # 7. Assert DB results.append(await assert_run_in_db(run_id)) results.append(await assert_task_instances_in_db(run_id)) finally: if worker_proc and worker_proc.poll() is None: worker_proc.terminate() worker_proc.wait(timeout=5) print("=" * 55) passed = sum(results) total = len(results) all_ok = all(results) print(f"{'PASSED' if all_ok else 'FAILED'} {passed}/{total} steps passed\n") return 0 if all_ok else 1 if __name__ == "__main__": sys.exit(asyncio.run(main()))