Add schedule smoke test routine

This commit is contained in:
2026-06-06 15:32:57 +02:00
parent e926636617
commit 418eb4ffda
8 changed files with 472 additions and 3 deletions

View File

@@ -129,6 +129,24 @@ This reconciles all Temporal Schedules with the `activity_definitions` table:
- Creates paused schedules for disabled cron definitions
- Deletes orphaned schedules with no matching DB row
After adding or changing a recurring ActivityDefinition or workflow activity
wiring, run a smoke schedule before trusting the next real fire:
```bash
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
TEMPORAL_HOST=localhost:7233 \
uv run python scripts/smoke_test_schedule.py \
--activity-id <activity-definition-uuid> \
--recreate-recurring
```
The smoke command deletes and recreates the recurring Temporal Schedule when
`--recreate-recurring` is set, creates a distinct one-shot smoke Schedule one
minute in the future, waits for the smoke workflow to complete, and exits
non-zero if the workflow fails or times out. Use this after worker deployments
that add workflow imports or new activities; it catches stale-worker and missing
activity registration issues before the next scheduled run.
---
## Temporal UI — filtering by activity

229
scripts/smoke_test_schedule.py Executable file
View File

@@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""Recreate and smoke-test a recurring ActivityDefinition schedule.
The smoke test creates a distinct one-shot Temporal Schedule that fires once
after a short delay and starts the same RunActivityWorkflow. It is meant for
operator use after adding or changing scheduled workflow actions.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import UUID
from temporalio.client import Client
from temporalio.service import RPCError
DEFAULT_TEMPORAL_HOST = "localhost:7233"
DEFAULT_TEMPORAL_NAMESPACE = "default"
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Schedule a one-shot smoke run for a recurring ActivityDefinition.",
)
parser.add_argument("--activity-id", required=True)
parser.add_argument("--db-url", default=os.environ.get("ACTCORE_DB_URL"))
parser.add_argument("--temporal-host", default=os.environ.get(
"TEMPORAL_HOST",
DEFAULT_TEMPORAL_HOST,
))
parser.add_argument("--temporal-namespace", default=os.environ.get(
"TEMPORAL_NAMESPACE",
DEFAULT_TEMPORAL_NAMESPACE,
))
parser.add_argument("--delay-seconds", type=int, default=60)
parser.add_argument("--timeout-seconds", type=int, default=600)
parser.add_argument(
"--recreate-recurring",
action="store_true",
help="Delete and recreate the recurring schedule before the smoke run.",
)
parser.add_argument(
"--keep-smoke-schedule",
action="store_true",
help="Leave the one-shot smoke schedule in Temporal after waiting.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print the planned smoke test without contacting Temporal or DB.",
)
return parser.parse_args(argv)
def build_dry_run_report(args: argparse.Namespace) -> dict[str, Any]:
return {
"mode": "dry-run",
"activity_id": args.activity_id,
"temporal_host": args.temporal_host,
"temporal_namespace": args.temporal_namespace,
"recreate_recurring": bool(args.recreate_recurring),
"delay_seconds": args.delay_seconds,
"timeout_seconds": args.timeout_seconds,
"checks": [
"load ActivityDefinition from Postgres",
"optionally delete and recreate the recurring Temporal Schedule",
"create a one-shot smoke Temporal Schedule one minute in the future",
"wait for the smoke workflow to complete",
"return non-zero if the smoke workflow fails or times out",
],
}
async def _load_activity(db_url: str, activity_id: str):
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker
from activity_core.db import make_engine
from activity_core.models import ActivityDefinition
from activity_core.orm import ActivityDefinition as ActivityDefinitionRow
engine = make_engine(db_url)
session_factory = async_sessionmaker(engine, expire_on_commit=False)
try:
async with session_factory() as session:
row = await session.scalar(
select(ActivityDefinitionRow).where(
ActivityDefinitionRow.id == UUID(activity_id)
)
)
if row is None:
raise RuntimeError(f"ActivityDefinition {activity_id!r} was not found")
return ActivityDefinition.model_validate({
"id": row.id,
"name": row.name,
"enabled": row.enabled,
"trigger_config": row.trigger_config,
"context_sources": row.context_sources,
"task_templates": row.task_templates,
"dedupe_key_strategy": row.dedupe_key_strategy,
"version": row.version,
})
finally:
await engine.dispose()
async def _wait_for_workflow(
client: Client,
workflow_id_prefix: str,
timeout_seconds: int,
) -> dict[str, Any]:
deadline = asyncio.get_running_loop().time() + timeout_seconds
last_error: str | None = None
while asyncio.get_running_loop().time() < deadline:
try:
query = f'WorkflowId STARTS_WITH "{workflow_id_prefix}"'
item = None
async for candidate in client.list_workflows(query=query):
if candidate.id.startswith(workflow_id_prefix):
item = candidate
break
if item is None:
raise RuntimeError(f"workflow not found for prefix: {workflow_id_prefix}")
handle = client.get_workflow_handle(item.id, run_id=item.run_id)
desc = await handle.describe()
except (RPCError, RuntimeError) as exc:
last_error = str(exc)
await asyncio.sleep(2)
continue
status = str(desc.status)
if status == "2":
return {
"workflow_id": item.id,
"run_id": item.run_id,
"status": "completed",
"result": await handle.result(),
}
if status != "1":
return {
"workflow_id": item.id,
"run_id": item.run_id,
"status": status,
"error": f"smoke workflow ended with status {status}",
}
await asyncio.sleep(2)
return {
"workflow_id_prefix": workflow_id_prefix,
"status": "timeout",
"error": last_error or "smoke workflow did not complete before timeout",
}
async def run_live(args: argparse.Namespace) -> dict[str, Any]:
if not args.db_url:
raise RuntimeError("ACTCORE_DB_URL or --db-url is required")
from activity_core.schedule_manager import (
delete_schedule,
delete_smoke_test_schedule,
schedule_id,
schedule_smoke_test,
smoke_schedule_id,
upsert_schedule,
)
defn = await _load_activity(args.db_url, args.activity_id)
client = await Client.connect(
args.temporal_host,
namespace=args.temporal_namespace,
)
if args.recreate_recurring:
await delete_schedule(client, args.activity_id)
await upsert_schedule(client, defn)
smoke_sid, workflow_id_prefix, fire_at = await schedule_smoke_test(
client,
defn,
delay=timedelta(seconds=args.delay_seconds),
)
wait_result: dict[str, Any]
try:
wait_result = await _wait_for_workflow(
client,
workflow_id_prefix,
args.timeout_seconds,
)
finally:
if not args.keep_smoke_schedule:
await delete_smoke_test_schedule(client, args.activity_id)
return {
"mode": "live",
"activity_id": args.activity_id,
"activity_name": defn.name,
"recurring_schedule_id": schedule_id(args.activity_id),
"smoke_schedule_id": smoke_sid or smoke_schedule_id(args.activity_id),
"smoke_workflow_id_prefix": workflow_id_prefix,
"smoke_fire_at": fire_at.isoformat(),
"recreate_recurring": bool(args.recreate_recurring),
"wait_result": wait_result,
}
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv)
try:
if args.dry_run:
report = build_dry_run_report(args)
else:
report = asyncio.run(run_live(args))
except Exception as exc:
print(f"smoke_test_schedule: {exc}", file=sys.stderr)
return 2
print(json.dumps(report, indent=2, sort_keys=True))
wait_result = report.get("wait_result") or {}
return 0 if wait_result.get("status") in (None, "completed") else 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -22,6 +22,7 @@ logger = logging.getLogger(__name__)
# Matches {field.path} placeholders in prompt templates.
_PLACEHOLDER_RE = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_.]*)\}")
_FENCED_JSON_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```\s*$", re.DOTALL)
class UntrustedFieldError(ValueError):
@@ -240,7 +241,7 @@ def _invalid_output_report(
raw_preview: str | None = None
if isinstance(raw_output, str):
try:
partial_output = json.loads(raw_output)
partial_output = _parse_json_output(raw_output)
except json.JSONDecodeError:
partial_output = None
raw_preview = raw_output[:4000]
@@ -282,7 +283,7 @@ def _validate_output(
"""
try:
if isinstance(raw_output, str):
data = json.loads(raw_output)
data = _parse_json_output(raw_output)
else:
data = raw_output
@@ -328,6 +329,29 @@ def _validate_output(
return [], None, str(exc)
def _parse_json_output(raw_output: str) -> Any:
"""Parse JSON output, accepting a single Markdown JSON fence when present."""
text = raw_output.strip()
try:
return json.loads(text)
except json.JSONDecodeError as original_error:
fence_match = _FENCED_JSON_RE.match(text)
if fence_match:
return json.loads(fence_match.group(1).strip())
decoder = json.JSONDecoder()
for marker in ("{", "["):
start = text.find(marker)
if start < 0:
continue
try:
data, _ = decoder.raw_decode(text[start:])
except json.JSONDecodeError:
continue
return data
raise original_error
def _validate_against_schema(data: Any, schema_path: str) -> str | None:
if not schema_path:
return None

View File

@@ -51,6 +51,11 @@ def schedule_id(activity_id: str | UUID) -> str:
return f"activity-schedule-{activity_id}"
def smoke_schedule_id(activity_id: str | UUID) -> str:
"""Return the one-shot smoke-test Schedule ID for an ActivityDefinition."""
return f"activity-smoke-test-{activity_id}"
def _overlap_policy(misfire_policy: str) -> ScheduleOverlapPolicy:
return _MISFIRE_TO_OVERLAP.get(misfire_policy, ScheduleOverlapPolicy.SKIP)
@@ -128,6 +133,55 @@ def _build_onetime_schedule(defn: ActivityDefinition) -> tuple[str, Schedule]:
return sid, Schedule(action=action, spec=spec, state=state)
def _build_smoke_test_schedule(
defn: ActivityDefinition,
fire_at: datetime,
) -> tuple[str, str, Schedule]:
"""Build a one-shot smoke Schedule for an enabled cron ActivityDefinition."""
if not isinstance(defn.trigger_config, CronTriggerConfig):
raise ValueError("schedule smoke tests require trigger_type='cron'")
if not defn.enabled:
raise ValueError("schedule smoke tests require an enabled ActivityDefinition")
at = fire_at.astimezone(timezone.utc)
token = at.strftime("%Y%m%dT%H%M%SZ")
workflow_id_prefix = f"activity-{defn.id}:smoke-{token}"
trigger_key = f"schedule-smoke-{token}"
action = ScheduleActionStartWorkflow(
"RunActivityWorkflow",
args=[str(defn.id), trigger_key, at.isoformat(), None],
id=workflow_id_prefix,
task_queue=_ORCHESTRATOR_TASK_QUEUE,
)
spec = ScheduleSpec(
calendars=[
ScheduleCalendarSpec(
second=[ScheduleRange(at.second)],
minute=[ScheduleRange(at.minute)],
hour=[ScheduleRange(at.hour)],
day_of_month=[ScheduleRange(at.day)],
month=[ScheduleRange(at.month)],
year=[ScheduleRange(at.year)],
)
],
time_zone_name="UTC",
)
state = ScheduleState(
limited_actions=True,
remaining_actions=1,
paused=False,
)
return (
smoke_schedule_id(defn.id),
workflow_id_prefix,
Schedule(action=action, spec=spec, state=state),
)
async def cancel_scheduled(client: Client, activity_id: str | UUID) -> None:
"""Delete the one-off Temporal Schedule for a ScheduledTriggerConfig definition.
@@ -140,6 +194,45 @@ async def cancel_scheduled(client: Client, activity_id: str | UUID) -> None:
pass
async def schedule_smoke_test(
client: Client,
defn: ActivityDefinition,
*,
delay: timedelta = timedelta(minutes=1),
now: datetime | None = None,
) -> tuple[str, str, datetime]:
"""Schedule a one-shot smoke run for a recurring ActivityDefinition.
Returns ``(schedule_id, workflow_id_prefix, fire_at)``. Temporal appends
the scheduled fire time to workflow IDs created by schedules.
"""
base = now or datetime.now(tz=timezone.utc)
if base.tzinfo is None:
base = base.replace(tzinfo=timezone.utc)
fire_at = (base + delay).astimezone(timezone.utc)
sid, workflow_id_prefix, sched = _build_smoke_test_schedule(defn, fire_at)
try:
await client.create_schedule(sid, sched)
except (RPCError, ScheduleAlreadyRunningError):
handle = client.get_schedule_handle(sid)
async def _updater_smoke(inp: ScheduleUpdateInput) -> ScheduleUpdate: # noqa: ARG001
return ScheduleUpdate(schedule=sched)
await handle.update(_updater_smoke)
await handle.unpause()
return sid, workflow_id_prefix, fire_at
async def delete_smoke_test_schedule(client: Client, activity_id: str | UUID) -> None:
"""Delete the smoke-test Schedule for the given activity_id if present."""
handle = client.get_schedule_handle(smoke_schedule_id(activity_id))
try:
await handle.delete()
except RPCError:
pass
async def upsert_schedule(client: Client, defn: ActivityDefinition) -> ScheduleHandle:
"""Create or update a Temporal Schedule for a cron or scheduled ActivityDefinition.

View File

@@ -338,6 +338,27 @@ def test_execute_instruction_with_audit_accepts_report_payload():
assert result.output_validated is True
def test_execute_instruction_with_audit_accepts_fenced_report_payload():
report_data = {
"summary": "State Hub has loose ends.",
"recommendations": [{"action": "revisit", "candidate": "CUST-WP-0045"}],
}
llm = _CountingLLM([f"```json\n{json.dumps(report_data)}\n```"])
instr = _instr(
id="daily-triage-report",
prompt="Report.",
trusted_fields=[],
output_schema="schemas/daily-triage-report.json",
)
result = execute_instruction_with_audit(instr, _Event(), {}, llm)
assert result.tasks == []
assert result.report == report_data
assert result.output_validated is True
assert llm.call_count == 1
def test_execute_instruction_with_audit_rejects_invalid_report_schema():
report_data = {"summary": "Missing recommendations."}
llm = _CountingLLM([json.dumps(report_data), json.dumps(report_data)])

View File

@@ -13,6 +13,7 @@ from __future__ import annotations
import asyncio
import uuid
from datetime import datetime, timedelta, timezone
import pytest
from temporalio.client import ScheduleOverlapPolicy
@@ -21,8 +22,11 @@ from temporalio.testing import WorkflowEnvironment
from activity_core.models import ActivityDefinition, CronTriggerConfig
from activity_core.schedule_manager import (
delete_schedule,
delete_smoke_test_schedule,
list_schedules,
schedule_id,
schedule_smoke_test,
smoke_schedule_id,
upsert_schedule,
)
@@ -180,3 +184,30 @@ async def test_misfire_policy_compress_sets_overlap_buffer_one(env: WorkflowEnvi
assert desc.schedule.policy.overlap == ScheduleOverlapPolicy.BUFFER_ONE
await delete_schedule(env.client, defn.id)
@pytest.mark.asyncio
async def test_schedule_smoke_test_creates_one_shot_schedule(
env: WorkflowEnvironment,
) -> None:
defn = _make_defn()
fire_base = datetime(2026, 6, 6, 12, 0, tzinfo=timezone.utc)
sid, workflow_id, fire_at = await schedule_smoke_test(
env.client,
defn,
delay=timedelta(minutes=1),
now=fire_base,
)
assert sid == smoke_schedule_id(defn.id)
assert workflow_id == f"activity-{defn.id}:smoke-20260606T120100Z"
assert fire_at == datetime(2026, 6, 6, 12, 1, tzinfo=timezone.utc)
handle = env.client.get_schedule_handle(sid)
desc = await handle.describe()
assert desc.schedule.state.limited_actions is True
assert desc.schedule.state.remaining_actions == 1
assert desc.schedule.spec.time_zone_name == "UTC"
await delete_smoke_test_schedule(env.client, defn.id)

View File

@@ -0,0 +1,32 @@
from __future__ import annotations
import importlib.util
from pathlib import Path
def _load_script():
path = Path(__file__).parent.parent / "scripts" / "smoke_test_schedule.py"
spec = importlib.util.spec_from_file_location("smoke_test_schedule", path)
assert spec is not None
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
def test_schedule_smoke_script_dry_run_contract() -> None:
script = _load_script()
args = script.parse_args([
"--activity-id",
"00000000-0000-0000-0000-000000000123",
"--recreate-recurring",
"--dry-run",
])
report = script.build_dry_run_report(args)
assert report["mode"] == "dry-run"
assert report["activity_id"] == "00000000-0000-0000-0000-000000000123"
assert report["recreate_recurring"] is True
assert report["delay_seconds"] == 60
assert "create a one-shot smoke Temporal Schedule one minute in the future" in report["checks"]

View File

@@ -8,7 +8,7 @@ status: active
owner: codex
topic_slug: custodian
created: "2026-06-03"
updated: "2026-06-05"
updated: "2026-06-06"
state_hub_workstream_id: "5646e13a-13af-4724-bca6-3c0d86f96733"
---
@@ -89,6 +89,17 @@ no report because the live schema required `wsjf` fields and the stale DB prompt
did not request them. The verifier default and runbook now point at the live
working-memory sink path, `/home/worsch/the-custodian/memory/working`.
2026-06-06: Added a schedule smoke-test routine for new or changed recurring
ActivityDefinitions. Operators can recreate the recurring Temporal Schedule,
schedule a one-shot smoke run one minute in the future, wait for completion,
and get a non-zero warning if workflow imports, activity registration, or
runtime wiring are broken.
2026-06-06: Exercised the routine against the daily triage definition. The
daily recurring Temporal Schedule was deleted and recreated, then a one-shot
smoke workflow completed with run id `c2db32e5-3874-522f-ae1f-9b2cdf307fd2`
and emitted a validated `daily_triage` report plus working-memory note.
## Three-Run Calibration Feedback
```task
@@ -117,6 +128,12 @@ scheduled daily triage runs, but this task still requires three consecutive
actual activity-core scheduled runs and State Hub calibration feedback. Local
tests cannot substitute for that operational evidence.
2026-06-06: The scheduled run fired at 07:20 Europe/Berlin but initially stuck
on a stale worker import error after ops-evidence wiring landed. Restarting the
worker let Temporal complete the run, and the hardened report path emitted a
validation-failure note instead of losing the evidence. This run is useful
calibration input, but it is not a clean consecutive scheduled success.
## Rule Action Contract Documentation
```task
@@ -177,6 +194,10 @@ validation-failure report containing bounded partial output instead of a silent
empty result. Report sinks include validation metadata in working-memory
frontmatter and State Hub progress detail.
2026-06-06: Hardened instruction output parsing to accept a single Markdown
JSON fence when the fenced content is valid JSON, while preserving the
validation-failure artifact path for genuinely invalid output.
## Issue-Core Emission Boundary Verification
```task