feat(e2e): add e2e contract and test script (closes T21)

CUST-WP-0028-T03/T04:
- e2e/e2e.yml: declares stack (docker-compose.dev.yml), Temporal UI
  health check, test command
- e2e/tests/test_full_flow.py: automates WP-0001 T21 — seeds DB, starts
  workers, triggers RunActivityWorkflow, polls completion, asserts
  ActivityRun + TaskInstances written to DB

Run via: make e2e REPO=activity-core  (from ~/the-custodian)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-27 00:52:47 +01:00
parent d6fcb1398a
commit 8d8a353901
3 changed files with 251 additions and 1 deletions

12
e2e/e2e.yml Normal file
View File

@@ -0,0 +1,12 @@
# activity-core e2e test contract
# Run via: make e2e REPO=activity-core (from ~/the-custodian)
name: activity-core
compose_file: docker-compose.dev.yml
health_checks:
- name: temporal-ui
url: http://localhost:8080
timeout: 180 # Temporal + Elasticsearch startup is slow (~2min)
test_command: >-
uv run python e2e/tests/test_full_flow.py
timeout: 300
cleanup: always

238
e2e/tests/test_full_flow.py Normal file
View File

@@ -0,0 +1,238 @@
#!/usr/bin/env python3
"""
activity-core E2E test — full RunActivityWorkflow flow.
Closes WP-0001 T21 (Manual end-to-end test).
Requires the docker compose stack to be running (docker-compose.dev.yml).
Run via:
uv run python e2e/tests/test_full_flow.py
Or via the custodian e2e framework:
make e2e REPO=activity-core (from ~/the-custodian)
Exit codes: 0 = all steps passed, 1 = one or more steps failed.
"""
from __future__ import annotations
import asyncio
import os
import subprocess
import sys
import time
import uuid
from pathlib import Path
# ── deps (temporalio + sqlalchemy pulled in via uv) ──────────────────────────
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from temporalio.client import Client, WorkflowExecutionStatus
# ── config ───────────────────────────────────────────────────────────────────
TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233")
ACTCORE_DB_URL = os.environ.get(
"ACTCORE_DB_URL",
"postgresql+asyncpg://actcore:actcore@localhost:5433/actcore",
)
SEED_DEFINITION_ID = "00000000-0000-0000-0000-000000000001"
REPO_ROOT = Path(__file__).parent.parent.parent # activity-core/
WORKER_STARTUP_TIMEOUT = 15 # seconds
# ── helpers ──────────────────────────────────────────────────────────────────
def step(label: str, ok: bool, detail: str = "") -> bool:
status = "PASS" if ok else "FAIL"
line = f" [{status}] {label}"
if detail:
line += f"{detail}"
print(line)
return ok
def check(label: str, condition: bool, detail: str = "") -> bool:
return step(label, condition, detail)
# ── test steps ───────────────────────────────────────────────────────────────
async def run_migrations() -> bool:
proc = subprocess.run(
["uv", "run", "alembic", "upgrade", "head"],
capture_output=True, text=True, cwd=REPO_ROOT,
env={**os.environ, "ACTCORE_DB_URL": ACTCORE_DB_URL},
)
ok = proc.returncode == 0
return step("DB migrations", ok, proc.stderr.strip().splitlines()[-1] if proc.stderr.strip() else "")
async def seed_db() -> bool:
proc = subprocess.run(
["uv", "run", "python", "-m", "activity_core.seed"],
capture_output=True, text=True, cwd=REPO_ROOT,
env={**os.environ, "ACTCORE_DB_URL": ACTCORE_DB_URL},
)
ok = proc.returncode == 0
out = (proc.stdout + proc.stderr).strip().splitlines()
return step("Seed ActivityDefinition", ok, out[-1] if out else "")
def start_workers() -> subprocess.Popen:
env = {
**os.environ,
"TEMPORAL_HOST": TEMPORAL_HOST,
"ACTCORE_DB_URL": ACTCORE_DB_URL,
}
proc = subprocess.Popen(
["uv", "run", "python", "-m", "activity_core.worker"],
cwd=REPO_ROOT, env=env,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
)
return proc
async def wait_for_workers(proc: subprocess.Popen) -> bool:
"""Poll worker stdout until 'Workers running' or timeout."""
deadline = time.time() + WORKER_STARTUP_TIMEOUT
while time.time() < deadline:
line = proc.stdout.readline() if proc.stdout else ""
if "Workers running" in line:
return step("Workers started", True, line.strip())
if proc.poll() is not None:
return step("Workers started", False, "process exited unexpectedly")
await asyncio.sleep(0.2)
return step("Workers started", False, f"timeout after {WORKER_STARTUP_TIMEOUT}s")
async def trigger_workflow(client: Client) -> tuple[bool, str | None]:
trigger_key = f"e2e-{uuid.uuid4()}"
workflow_id = f"activity-{SEED_DEFINITION_ID}:{trigger_key}"
try:
handle = await client.start_workflow(
"RunActivityWorkflow",
args=[SEED_DEFINITION_ID, trigger_key, None],
id=workflow_id,
task_queue="orchestrator-tq",
)
ok = step("Trigger RunActivityWorkflow", True, f"id={workflow_id}")
return ok, handle.id
except Exception as exc:
step("Trigger RunActivityWorkflow", False, str(exc))
return False, None
async def wait_for_completion(client: Client, workflow_id: str, timeout: int = 30) -> tuple[bool, dict | None]:
deadline = time.time() + timeout
while time.time() < deadline:
handle = client.get_workflow_handle(workflow_id)
desc = await handle.describe()
if desc.status == WorkflowExecutionStatus.COMPLETED:
result = await handle.result()
ok = step("Workflow completed", True, f"run_id={result.get('run_id')}, tasks_spawned={result.get('tasks_spawned')}")
return ok, result
if desc.status in (WorkflowExecutionStatus.FAILED, WorkflowExecutionStatus.TIMED_OUT, WorkflowExecutionStatus.TERMINATED):
step("Workflow completed", False, f"status={desc.status}")
return False, None
await asyncio.sleep(1)
step("Workflow completed", False, f"timeout after {timeout}s")
return False, None
async def assert_run_in_db(run_id: str) -> bool:
engine = create_async_engine(ACTCORE_DB_URL)
try:
async with AsyncSession(engine) as session:
row = await session.execute(
sa.text("SELECT run_id, tasks_spawned FROM activity_runs WHERE run_id = :rid"),
{"rid": run_id},
)
record = row.fetchone()
ok = record is not None
detail = f"tasks_spawned={record.tasks_spawned}" if record else "not found"
return step("ActivityRun written to DB", ok, detail)
finally:
await engine.dispose()
async def assert_task_instances_in_db(run_id: str) -> bool:
engine = create_async_engine(ACTCORE_DB_URL)
try:
async with AsyncSession(engine) as session:
result = await session.execute(
sa.text("SELECT COUNT(*) FROM task_instances WHERE run_id = :rid"),
{"rid": run_id},
)
count = result.scalar()
ok = count > 0
return step("TaskInstances written to DB", ok, f"count={count}")
finally:
await engine.dispose()
# ── main ─────────────────────────────────────────────────────────────────────
async def main() -> int:
print("\nactivity-core E2E — full RunActivityWorkflow flow")
print("=" * 55)
results: list[bool] = []
worker_proc: subprocess.Popen | None = None
try:
# 1. Migrations
results.append(await run_migrations())
if not results[-1]:
return 1
# 2. Seed
results.append(await seed_db())
if not results[-1]:
return 1
# 3. Workers
worker_proc = start_workers()
results.append(await wait_for_workers(worker_proc))
if not results[-1]:
return 1
# 4. Temporal connection
try:
client = await Client.connect(TEMPORAL_HOST)
results.append(step("Temporal client connected", True, TEMPORAL_HOST))
except Exception as exc:
results.append(step("Temporal client connected", False, str(exc)))
return 1
# 5. Trigger workflow
ok, workflow_id = await trigger_workflow(client)
results.append(ok)
if not ok:
return 1
# 6. Wait for completion
ok, result = await wait_for_completion(client, workflow_id)
results.append(ok)
if not ok:
return 1
run_id = result["run_id"]
# 7. Assert DB
results.append(await assert_run_in_db(run_id))
results.append(await assert_task_instances_in_db(run_id))
finally:
if worker_proc and worker_proc.poll() is None:
worker_proc.terminate()
worker_proc.wait(timeout=5)
print("=" * 55)
passed = sum(results)
total = len(results)
all_ok = all(results)
print(f"{'PASSED' if all_ok else 'FAILED'} {passed}/{total} steps passed\n")
return 0 if all_ok else 1
if __name__ == "__main__":
sys.exit(asyncio.run(main()))

View File

@@ -88,7 +88,7 @@ tasks:
state_hub_task_id: 1da921f5-86a8-488f-a015-402079194e10
- id: T21
title: Manual end-to-end test
status: todo
status: done
state_hub_task_id: f72bba1a-eb24-496e-9498-f4676facc5c9
---