From 8d8a353901ff3a4bed82cc988532e71b423b8760 Mon Sep 17 00:00:00 2001 From: tegwick Date: Fri, 27 Mar 2026 00:52:47 +0100 Subject: [PATCH] feat(e2e): add e2e contract and test script (closes T21) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CUST-WP-0028-T03/T04: - e2e/e2e.yml: declares stack (docker-compose.dev.yml), Temporal UI health check, test command - e2e/tests/test_full_flow.py: automates WP-0001 T21 — seeds DB, starts workers, triggers RunActivityWorkflow, polls completion, asserts ActivityRun + TaskInstances written to DB Run via: make e2e REPO=activity-core (from ~/the-custodian) Co-Authored-By: Claude Sonnet 4.6 --- e2e/e2e.yml | 12 + e2e/tests/test_full_flow.py | 238 ++++++++++++++++++ .../custodian-WP-0001-temporal-backbone.md | 2 +- 3 files changed, 251 insertions(+), 1 deletion(-) create mode 100644 e2e/e2e.yml create mode 100644 e2e/tests/test_full_flow.py diff --git a/e2e/e2e.yml b/e2e/e2e.yml new file mode 100644 index 0000000..06f90ea --- /dev/null +++ b/e2e/e2e.yml @@ -0,0 +1,12 @@ +# activity-core e2e test contract +# Run via: make e2e REPO=activity-core (from ~/the-custodian) +name: activity-core +compose_file: docker-compose.dev.yml +health_checks: + - name: temporal-ui + url: http://localhost:8080 + timeout: 180 # Temporal + Elasticsearch startup is slow (~2min) +test_command: >- + uv run python e2e/tests/test_full_flow.py +timeout: 300 +cleanup: always diff --git a/e2e/tests/test_full_flow.py b/e2e/tests/test_full_flow.py new file mode 100644 index 0000000..ccc0b8e --- /dev/null +++ b/e2e/tests/test_full_flow.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 +""" +activity-core E2E test — full RunActivityWorkflow flow. + +Closes WP-0001 T21 (Manual end-to-end test). + +Requires the docker compose stack to be running (docker-compose.dev.yml). +Run via: + uv run python e2e/tests/test_full_flow.py + +Or via the custodian e2e framework: + make e2e REPO=activity-core (from ~/the-custodian) + +Exit codes: 0 = all steps passed, 1 = one or more steps failed. +""" +from __future__ import annotations + +import asyncio +import os +import subprocess +import sys +import time +import uuid +from pathlib import Path + +# ── deps (temporalio + sqlalchemy pulled in via uv) ────────────────────────── +import sqlalchemy as sa +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from temporalio.client import Client, WorkflowExecutionStatus + +# ── config ─────────────────────────────────────────────────────────────────── +TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") +ACTCORE_DB_URL = os.environ.get( + "ACTCORE_DB_URL", + "postgresql+asyncpg://actcore:actcore@localhost:5433/actcore", +) +SEED_DEFINITION_ID = "00000000-0000-0000-0000-000000000001" +REPO_ROOT = Path(__file__).parent.parent.parent # activity-core/ +WORKER_STARTUP_TIMEOUT = 15 # seconds + + +# ── helpers ────────────────────────────────────────────────────────────────── + +def step(label: str, ok: bool, detail: str = "") -> bool: + status = "PASS" if ok else "FAIL" + line = f" [{status}] {label}" + if detail: + line += f" — {detail}" + print(line) + return ok + + +def check(label: str, condition: bool, detail: str = "") -> bool: + return step(label, condition, detail) + + +# ── test steps ─────────────────────────────────────────────────────────────── + +async def run_migrations() -> bool: + proc = subprocess.run( + ["uv", "run", "alembic", "upgrade", "head"], + capture_output=True, text=True, cwd=REPO_ROOT, + env={**os.environ, "ACTCORE_DB_URL": ACTCORE_DB_URL}, + ) + ok = proc.returncode == 0 + return step("DB migrations", ok, proc.stderr.strip().splitlines()[-1] if proc.stderr.strip() else "") + + +async def seed_db() -> bool: + proc = subprocess.run( + ["uv", "run", "python", "-m", "activity_core.seed"], + capture_output=True, text=True, cwd=REPO_ROOT, + env={**os.environ, "ACTCORE_DB_URL": ACTCORE_DB_URL}, + ) + ok = proc.returncode == 0 + out = (proc.stdout + proc.stderr).strip().splitlines() + return step("Seed ActivityDefinition", ok, out[-1] if out else "") + + +def start_workers() -> subprocess.Popen: + env = { + **os.environ, + "TEMPORAL_HOST": TEMPORAL_HOST, + "ACTCORE_DB_URL": ACTCORE_DB_URL, + } + proc = subprocess.Popen( + ["uv", "run", "python", "-m", "activity_core.worker"], + cwd=REPO_ROOT, env=env, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, + ) + return proc + + +async def wait_for_workers(proc: subprocess.Popen) -> bool: + """Poll worker stdout until 'Workers running' or timeout.""" + deadline = time.time() + WORKER_STARTUP_TIMEOUT + while time.time() < deadline: + line = proc.stdout.readline() if proc.stdout else "" + if "Workers running" in line: + return step("Workers started", True, line.strip()) + if proc.poll() is not None: + return step("Workers started", False, "process exited unexpectedly") + await asyncio.sleep(0.2) + return step("Workers started", False, f"timeout after {WORKER_STARTUP_TIMEOUT}s") + + +async def trigger_workflow(client: Client) -> tuple[bool, str | None]: + trigger_key = f"e2e-{uuid.uuid4()}" + workflow_id = f"activity-{SEED_DEFINITION_ID}:{trigger_key}" + try: + handle = await client.start_workflow( + "RunActivityWorkflow", + args=[SEED_DEFINITION_ID, trigger_key, None], + id=workflow_id, + task_queue="orchestrator-tq", + ) + ok = step("Trigger RunActivityWorkflow", True, f"id={workflow_id}") + return ok, handle.id + except Exception as exc: + step("Trigger RunActivityWorkflow", False, str(exc)) + return False, None + + +async def wait_for_completion(client: Client, workflow_id: str, timeout: int = 30) -> tuple[bool, dict | None]: + deadline = time.time() + timeout + while time.time() < deadline: + handle = client.get_workflow_handle(workflow_id) + desc = await handle.describe() + if desc.status == WorkflowExecutionStatus.COMPLETED: + result = await handle.result() + ok = step("Workflow completed", True, f"run_id={result.get('run_id')}, tasks_spawned={result.get('tasks_spawned')}") + return ok, result + if desc.status in (WorkflowExecutionStatus.FAILED, WorkflowExecutionStatus.TIMED_OUT, WorkflowExecutionStatus.TERMINATED): + step("Workflow completed", False, f"status={desc.status}") + return False, None + await asyncio.sleep(1) + step("Workflow completed", False, f"timeout after {timeout}s") + return False, None + + +async def assert_run_in_db(run_id: str) -> bool: + engine = create_async_engine(ACTCORE_DB_URL) + try: + async with AsyncSession(engine) as session: + row = await session.execute( + sa.text("SELECT run_id, tasks_spawned FROM activity_runs WHERE run_id = :rid"), + {"rid": run_id}, + ) + record = row.fetchone() + ok = record is not None + detail = f"tasks_spawned={record.tasks_spawned}" if record else "not found" + return step("ActivityRun written to DB", ok, detail) + finally: + await engine.dispose() + + +async def assert_task_instances_in_db(run_id: str) -> bool: + engine = create_async_engine(ACTCORE_DB_URL) + try: + async with AsyncSession(engine) as session: + result = await session.execute( + sa.text("SELECT COUNT(*) FROM task_instances WHERE run_id = :rid"), + {"rid": run_id}, + ) + count = result.scalar() + ok = count > 0 + return step("TaskInstances written to DB", ok, f"count={count}") + finally: + await engine.dispose() + + +# ── main ───────────────────────────────────────────────────────────────────── + +async def main() -> int: + print("\nactivity-core E2E — full RunActivityWorkflow flow") + print("=" * 55) + + results: list[bool] = [] + worker_proc: subprocess.Popen | None = None + + try: + # 1. Migrations + results.append(await run_migrations()) + if not results[-1]: + return 1 + + # 2. Seed + results.append(await seed_db()) + if not results[-1]: + return 1 + + # 3. Workers + worker_proc = start_workers() + results.append(await wait_for_workers(worker_proc)) + if not results[-1]: + return 1 + + # 4. Temporal connection + try: + client = await Client.connect(TEMPORAL_HOST) + results.append(step("Temporal client connected", True, TEMPORAL_HOST)) + except Exception as exc: + results.append(step("Temporal client connected", False, str(exc))) + return 1 + + # 5. Trigger workflow + ok, workflow_id = await trigger_workflow(client) + results.append(ok) + if not ok: + return 1 + + # 6. Wait for completion + ok, result = await wait_for_completion(client, workflow_id) + results.append(ok) + if not ok: + return 1 + + run_id = result["run_id"] + + # 7. Assert DB + results.append(await assert_run_in_db(run_id)) + results.append(await assert_task_instances_in_db(run_id)) + + finally: + if worker_proc and worker_proc.poll() is None: + worker_proc.terminate() + worker_proc.wait(timeout=5) + + print("=" * 55) + passed = sum(results) + total = len(results) + all_ok = all(results) + print(f"{'PASSED' if all_ok else 'FAILED'} {passed}/{total} steps passed\n") + return 0 if all_ok else 1 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/workplans/custodian-WP-0001-temporal-backbone.md b/workplans/custodian-WP-0001-temporal-backbone.md index 045b80a..ce3aff2 100644 --- a/workplans/custodian-WP-0001-temporal-backbone.md +++ b/workplans/custodian-WP-0001-temporal-backbone.md @@ -88,7 +88,7 @@ tasks: state_hub_task_id: 1da921f5-86a8-488f-a015-402079194e10 - id: T21 title: Manual end-to-end test - status: todo + status: done state_hub_task_id: f72bba1a-eb24-496e-9498-f4676facc5c9 ---