diff --git a/docs/runbook.md b/docs/runbook.md index 49b2894..daa733f 100644 --- a/docs/runbook.md +++ b/docs/runbook.md @@ -129,6 +129,24 @@ This reconciles all Temporal Schedules with the `activity_definitions` table: - Creates paused schedules for disabled cron definitions - Deletes orphaned schedules with no matching DB row +After adding or changing a recurring ActivityDefinition or workflow activity +wiring, run a smoke schedule before trusting the next real fire: + +```bash +ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \ +TEMPORAL_HOST=localhost:7233 \ + uv run python scripts/smoke_test_schedule.py \ + --activity-id \ + --recreate-recurring +``` + +The smoke command deletes and recreates the recurring Temporal Schedule when +`--recreate-recurring` is set, creates a distinct one-shot smoke Schedule one +minute in the future, waits for the smoke workflow to complete, and exits +non-zero if the workflow fails or times out. Use this after worker deployments +that add workflow imports or new activities; it catches stale-worker and missing +activity registration issues before the next scheduled run. + --- ## Temporal UI — filtering by activity diff --git a/scripts/smoke_test_schedule.py b/scripts/smoke_test_schedule.py new file mode 100755 index 0000000..db0fd00 --- /dev/null +++ b/scripts/smoke_test_schedule.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +"""Recreate and smoke-test a recurring ActivityDefinition schedule. + +The smoke test creates a distinct one-shot Temporal Schedule that fires once +after a short delay and starts the same RunActivityWorkflow. It is meant for +operator use after adding or changing scheduled workflow actions. +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import os +import sys +from datetime import datetime, timedelta, timezone +from typing import Any +from uuid import UUID + +from temporalio.client import Client +from temporalio.service import RPCError + +DEFAULT_TEMPORAL_HOST = "localhost:7233" +DEFAULT_TEMPORAL_NAMESPACE = "default" + + +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Schedule a one-shot smoke run for a recurring ActivityDefinition.", + ) + parser.add_argument("--activity-id", required=True) + parser.add_argument("--db-url", default=os.environ.get("ACTCORE_DB_URL")) + parser.add_argument("--temporal-host", default=os.environ.get( + "TEMPORAL_HOST", + DEFAULT_TEMPORAL_HOST, + )) + parser.add_argument("--temporal-namespace", default=os.environ.get( + "TEMPORAL_NAMESPACE", + DEFAULT_TEMPORAL_NAMESPACE, + )) + parser.add_argument("--delay-seconds", type=int, default=60) + parser.add_argument("--timeout-seconds", type=int, default=600) + parser.add_argument( + "--recreate-recurring", + action="store_true", + help="Delete and recreate the recurring schedule before the smoke run.", + ) + parser.add_argument( + "--keep-smoke-schedule", + action="store_true", + help="Leave the one-shot smoke schedule in Temporal after waiting.", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Print the planned smoke test without contacting Temporal or DB.", + ) + return parser.parse_args(argv) + + +def build_dry_run_report(args: argparse.Namespace) -> dict[str, Any]: + return { + "mode": "dry-run", + "activity_id": args.activity_id, + "temporal_host": args.temporal_host, + "temporal_namespace": args.temporal_namespace, + "recreate_recurring": bool(args.recreate_recurring), + "delay_seconds": args.delay_seconds, + "timeout_seconds": args.timeout_seconds, + "checks": [ + "load ActivityDefinition from Postgres", + "optionally delete and recreate the recurring Temporal Schedule", + "create a one-shot smoke Temporal Schedule one minute in the future", + "wait for the smoke workflow to complete", + "return non-zero if the smoke workflow fails or times out", + ], + } + + +async def _load_activity(db_url: str, activity_id: str): + from sqlalchemy import select + from sqlalchemy.ext.asyncio import async_sessionmaker + + from activity_core.db import make_engine + from activity_core.models import ActivityDefinition + from activity_core.orm import ActivityDefinition as ActivityDefinitionRow + + engine = make_engine(db_url) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + try: + async with session_factory() as session: + row = await session.scalar( + select(ActivityDefinitionRow).where( + ActivityDefinitionRow.id == UUID(activity_id) + ) + ) + if row is None: + raise RuntimeError(f"ActivityDefinition {activity_id!r} was not found") + return ActivityDefinition.model_validate({ + "id": row.id, + "name": row.name, + "enabled": row.enabled, + "trigger_config": row.trigger_config, + "context_sources": row.context_sources, + "task_templates": row.task_templates, + "dedupe_key_strategy": row.dedupe_key_strategy, + "version": row.version, + }) + finally: + await engine.dispose() + + +async def _wait_for_workflow( + client: Client, + workflow_id_prefix: str, + timeout_seconds: int, +) -> dict[str, Any]: + deadline = asyncio.get_running_loop().time() + timeout_seconds + last_error: str | None = None + while asyncio.get_running_loop().time() < deadline: + try: + query = f'WorkflowId STARTS_WITH "{workflow_id_prefix}"' + item = None + async for candidate in client.list_workflows(query=query): + if candidate.id.startswith(workflow_id_prefix): + item = candidate + break + if item is None: + raise RuntimeError(f"workflow not found for prefix: {workflow_id_prefix}") + handle = client.get_workflow_handle(item.id, run_id=item.run_id) + desc = await handle.describe() + except (RPCError, RuntimeError) as exc: + last_error = str(exc) + await asyncio.sleep(2) + continue + + status = str(desc.status) + if status == "2": + return { + "workflow_id": item.id, + "run_id": item.run_id, + "status": "completed", + "result": await handle.result(), + } + if status != "1": + return { + "workflow_id": item.id, + "run_id": item.run_id, + "status": status, + "error": f"smoke workflow ended with status {status}", + } + await asyncio.sleep(2) + + return { + "workflow_id_prefix": workflow_id_prefix, + "status": "timeout", + "error": last_error or "smoke workflow did not complete before timeout", + } + + +async def run_live(args: argparse.Namespace) -> dict[str, Any]: + if not args.db_url: + raise RuntimeError("ACTCORE_DB_URL or --db-url is required") + + from activity_core.schedule_manager import ( + delete_schedule, + delete_smoke_test_schedule, + schedule_id, + schedule_smoke_test, + smoke_schedule_id, + upsert_schedule, + ) + + defn = await _load_activity(args.db_url, args.activity_id) + client = await Client.connect( + args.temporal_host, + namespace=args.temporal_namespace, + ) + + if args.recreate_recurring: + await delete_schedule(client, args.activity_id) + await upsert_schedule(client, defn) + + smoke_sid, workflow_id_prefix, fire_at = await schedule_smoke_test( + client, + defn, + delay=timedelta(seconds=args.delay_seconds), + ) + wait_result: dict[str, Any] + try: + wait_result = await _wait_for_workflow( + client, + workflow_id_prefix, + args.timeout_seconds, + ) + finally: + if not args.keep_smoke_schedule: + await delete_smoke_test_schedule(client, args.activity_id) + + return { + "mode": "live", + "activity_id": args.activity_id, + "activity_name": defn.name, + "recurring_schedule_id": schedule_id(args.activity_id), + "smoke_schedule_id": smoke_sid or smoke_schedule_id(args.activity_id), + "smoke_workflow_id_prefix": workflow_id_prefix, + "smoke_fire_at": fire_at.isoformat(), + "recreate_recurring": bool(args.recreate_recurring), + "wait_result": wait_result, + } + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(argv) + try: + if args.dry_run: + report = build_dry_run_report(args) + else: + report = asyncio.run(run_live(args)) + except Exception as exc: + print(f"smoke_test_schedule: {exc}", file=sys.stderr) + return 2 + print(json.dumps(report, indent=2, sort_keys=True)) + wait_result = report.get("wait_result") or {} + return 0 if wait_result.get("status") in (None, "completed") else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/activity_core/rules/executor.py b/src/activity_core/rules/executor.py index aa2c9ff..8f115ac 100644 --- a/src/activity_core/rules/executor.py +++ b/src/activity_core/rules/executor.py @@ -22,6 +22,7 @@ logger = logging.getLogger(__name__) # Matches {field.path} placeholders in prompt templates. _PLACEHOLDER_RE = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_.]*)\}") +_FENCED_JSON_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```\s*$", re.DOTALL) class UntrustedFieldError(ValueError): @@ -240,7 +241,7 @@ def _invalid_output_report( raw_preview: str | None = None if isinstance(raw_output, str): try: - partial_output = json.loads(raw_output) + partial_output = _parse_json_output(raw_output) except json.JSONDecodeError: partial_output = None raw_preview = raw_output[:4000] @@ -282,7 +283,7 @@ def _validate_output( """ try: if isinstance(raw_output, str): - data = json.loads(raw_output) + data = _parse_json_output(raw_output) else: data = raw_output @@ -328,6 +329,29 @@ def _validate_output( return [], None, str(exc) +def _parse_json_output(raw_output: str) -> Any: + """Parse JSON output, accepting a single Markdown JSON fence when present.""" + text = raw_output.strip() + try: + return json.loads(text) + except json.JSONDecodeError as original_error: + fence_match = _FENCED_JSON_RE.match(text) + if fence_match: + return json.loads(fence_match.group(1).strip()) + + decoder = json.JSONDecoder() + for marker in ("{", "["): + start = text.find(marker) + if start < 0: + continue + try: + data, _ = decoder.raw_decode(text[start:]) + except json.JSONDecodeError: + continue + return data + raise original_error + + def _validate_against_schema(data: Any, schema_path: str) -> str | None: if not schema_path: return None diff --git a/src/activity_core/schedule_manager.py b/src/activity_core/schedule_manager.py index 4692680..bcacbbe 100644 --- a/src/activity_core/schedule_manager.py +++ b/src/activity_core/schedule_manager.py @@ -51,6 +51,11 @@ def schedule_id(activity_id: str | UUID) -> str: return f"activity-schedule-{activity_id}" +def smoke_schedule_id(activity_id: str | UUID) -> str: + """Return the one-shot smoke-test Schedule ID for an ActivityDefinition.""" + return f"activity-smoke-test-{activity_id}" + + def _overlap_policy(misfire_policy: str) -> ScheduleOverlapPolicy: return _MISFIRE_TO_OVERLAP.get(misfire_policy, ScheduleOverlapPolicy.SKIP) @@ -128,6 +133,55 @@ def _build_onetime_schedule(defn: ActivityDefinition) -> tuple[str, Schedule]: return sid, Schedule(action=action, spec=spec, state=state) +def _build_smoke_test_schedule( + defn: ActivityDefinition, + fire_at: datetime, +) -> tuple[str, str, Schedule]: + """Build a one-shot smoke Schedule for an enabled cron ActivityDefinition.""" + if not isinstance(defn.trigger_config, CronTriggerConfig): + raise ValueError("schedule smoke tests require trigger_type='cron'") + if not defn.enabled: + raise ValueError("schedule smoke tests require an enabled ActivityDefinition") + + at = fire_at.astimezone(timezone.utc) + token = at.strftime("%Y%m%dT%H%M%SZ") + workflow_id_prefix = f"activity-{defn.id}:smoke-{token}" + trigger_key = f"schedule-smoke-{token}" + + action = ScheduleActionStartWorkflow( + "RunActivityWorkflow", + args=[str(defn.id), trigger_key, at.isoformat(), None], + id=workflow_id_prefix, + task_queue=_ORCHESTRATOR_TASK_QUEUE, + ) + + spec = ScheduleSpec( + calendars=[ + ScheduleCalendarSpec( + second=[ScheduleRange(at.second)], + minute=[ScheduleRange(at.minute)], + hour=[ScheduleRange(at.hour)], + day_of_month=[ScheduleRange(at.day)], + month=[ScheduleRange(at.month)], + year=[ScheduleRange(at.year)], + ) + ], + time_zone_name="UTC", + ) + + state = ScheduleState( + limited_actions=True, + remaining_actions=1, + paused=False, + ) + + return ( + smoke_schedule_id(defn.id), + workflow_id_prefix, + Schedule(action=action, spec=spec, state=state), + ) + + async def cancel_scheduled(client: Client, activity_id: str | UUID) -> None: """Delete the one-off Temporal Schedule for a ScheduledTriggerConfig definition. @@ -140,6 +194,45 @@ async def cancel_scheduled(client: Client, activity_id: str | UUID) -> None: pass +async def schedule_smoke_test( + client: Client, + defn: ActivityDefinition, + *, + delay: timedelta = timedelta(minutes=1), + now: datetime | None = None, +) -> tuple[str, str, datetime]: + """Schedule a one-shot smoke run for a recurring ActivityDefinition. + + Returns ``(schedule_id, workflow_id_prefix, fire_at)``. Temporal appends + the scheduled fire time to workflow IDs created by schedules. + """ + base = now or datetime.now(tz=timezone.utc) + if base.tzinfo is None: + base = base.replace(tzinfo=timezone.utc) + fire_at = (base + delay).astimezone(timezone.utc) + sid, workflow_id_prefix, sched = _build_smoke_test_schedule(defn, fire_at) + try: + await client.create_schedule(sid, sched) + except (RPCError, ScheduleAlreadyRunningError): + handle = client.get_schedule_handle(sid) + + async def _updater_smoke(inp: ScheduleUpdateInput) -> ScheduleUpdate: # noqa: ARG001 + return ScheduleUpdate(schedule=sched) + + await handle.update(_updater_smoke) + await handle.unpause() + return sid, workflow_id_prefix, fire_at + + +async def delete_smoke_test_schedule(client: Client, activity_id: str | UUID) -> None: + """Delete the smoke-test Schedule for the given activity_id if present.""" + handle = client.get_schedule_handle(smoke_schedule_id(activity_id)) + try: + await handle.delete() + except RPCError: + pass + + async def upsert_schedule(client: Client, defn: ActivityDefinition) -> ScheduleHandle: """Create or update a Temporal Schedule for a cron or scheduled ActivityDefinition. diff --git a/tests/rules/test_executor.py b/tests/rules/test_executor.py index 28fc499..04c5d47 100644 --- a/tests/rules/test_executor.py +++ b/tests/rules/test_executor.py @@ -338,6 +338,27 @@ def test_execute_instruction_with_audit_accepts_report_payload(): assert result.output_validated is True +def test_execute_instruction_with_audit_accepts_fenced_report_payload(): + report_data = { + "summary": "State Hub has loose ends.", + "recommendations": [{"action": "revisit", "candidate": "CUST-WP-0045"}], + } + llm = _CountingLLM([f"```json\n{json.dumps(report_data)}\n```"]) + instr = _instr( + id="daily-triage-report", + prompt="Report.", + trusted_fields=[], + output_schema="schemas/daily-triage-report.json", + ) + + result = execute_instruction_with_audit(instr, _Event(), {}, llm) + + assert result.tasks == [] + assert result.report == report_data + assert result.output_validated is True + assert llm.call_count == 1 + + def test_execute_instruction_with_audit_rejects_invalid_report_schema(): report_data = {"summary": "Missing recommendations."} llm = _CountingLLM([json.dumps(report_data), json.dumps(report_data)]) diff --git a/tests/test_schedule_lifecycle.py b/tests/test_schedule_lifecycle.py index 3af0d37..6a37d44 100644 --- a/tests/test_schedule_lifecycle.py +++ b/tests/test_schedule_lifecycle.py @@ -13,6 +13,7 @@ from __future__ import annotations import asyncio import uuid +from datetime import datetime, timedelta, timezone import pytest from temporalio.client import ScheduleOverlapPolicy @@ -21,8 +22,11 @@ from temporalio.testing import WorkflowEnvironment from activity_core.models import ActivityDefinition, CronTriggerConfig from activity_core.schedule_manager import ( delete_schedule, + delete_smoke_test_schedule, list_schedules, schedule_id, + schedule_smoke_test, + smoke_schedule_id, upsert_schedule, ) @@ -180,3 +184,30 @@ async def test_misfire_policy_compress_sets_overlap_buffer_one(env: WorkflowEnvi assert desc.schedule.policy.overlap == ScheduleOverlapPolicy.BUFFER_ONE await delete_schedule(env.client, defn.id) + + +@pytest.mark.asyncio +async def test_schedule_smoke_test_creates_one_shot_schedule( + env: WorkflowEnvironment, +) -> None: + defn = _make_defn() + fire_base = datetime(2026, 6, 6, 12, 0, tzinfo=timezone.utc) + + sid, workflow_id, fire_at = await schedule_smoke_test( + env.client, + defn, + delay=timedelta(minutes=1), + now=fire_base, + ) + + assert sid == smoke_schedule_id(defn.id) + assert workflow_id == f"activity-{defn.id}:smoke-20260606T120100Z" + assert fire_at == datetime(2026, 6, 6, 12, 1, tzinfo=timezone.utc) + + handle = env.client.get_schedule_handle(sid) + desc = await handle.describe() + assert desc.schedule.state.limited_actions is True + assert desc.schedule.state.remaining_actions == 1 + assert desc.schedule.spec.time_zone_name == "UTC" + + await delete_smoke_test_schedule(env.client, defn.id) diff --git a/tests/test_schedule_smoke_script.py b/tests/test_schedule_smoke_script.py new file mode 100644 index 0000000..5399ca1 --- /dev/null +++ b/tests/test_schedule_smoke_script.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import importlib.util +from pathlib import Path + + +def _load_script(): + path = Path(__file__).parent.parent / "scripts" / "smoke_test_schedule.py" + spec = importlib.util.spec_from_file_location("smoke_test_schedule", path) + assert spec is not None + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + return module + + +def test_schedule_smoke_script_dry_run_contract() -> None: + script = _load_script() + args = script.parse_args([ + "--activity-id", + "00000000-0000-0000-0000-000000000123", + "--recreate-recurring", + "--dry-run", + ]) + + report = script.build_dry_run_report(args) + + assert report["mode"] == "dry-run" + assert report["activity_id"] == "00000000-0000-0000-0000-000000000123" + assert report["recreate_recurring"] is True + assert report["delay_seconds"] == 60 + assert "create a one-shot smoke Temporal Schedule one minute in the future" in report["checks"] diff --git a/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md b/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md index 18a08ba..f8ade1b 100644 --- a/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md +++ b/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md @@ -8,7 +8,7 @@ status: active owner: codex topic_slug: custodian created: "2026-06-03" -updated: "2026-06-05" +updated: "2026-06-06" state_hub_workstream_id: "5646e13a-13af-4724-bca6-3c0d86f96733" --- @@ -89,6 +89,17 @@ no report because the live schema required `wsjf` fields and the stale DB prompt did not request them. The verifier default and runbook now point at the live working-memory sink path, `/home/worsch/the-custodian/memory/working`. +2026-06-06: Added a schedule smoke-test routine for new or changed recurring +ActivityDefinitions. Operators can recreate the recurring Temporal Schedule, +schedule a one-shot smoke run one minute in the future, wait for completion, +and get a non-zero warning if workflow imports, activity registration, or +runtime wiring are broken. + +2026-06-06: Exercised the routine against the daily triage definition. The +daily recurring Temporal Schedule was deleted and recreated, then a one-shot +smoke workflow completed with run id `c2db32e5-3874-522f-ae1f-9b2cdf307fd2` +and emitted a validated `daily_triage` report plus working-memory note. + ## Three-Run Calibration Feedback ```task @@ -117,6 +128,12 @@ scheduled daily triage runs, but this task still requires three consecutive actual activity-core scheduled runs and State Hub calibration feedback. Local tests cannot substitute for that operational evidence. +2026-06-06: The scheduled run fired at 07:20 Europe/Berlin but initially stuck +on a stale worker import error after ops-evidence wiring landed. Restarting the +worker let Temporal complete the run, and the hardened report path emitted a +validation-failure note instead of losing the evidence. This run is useful +calibration input, but it is not a clean consecutive scheduled success. + ## Rule Action Contract Documentation ```task @@ -177,6 +194,10 @@ validation-failure report containing bounded partial output instead of a silent empty result. Report sinks include validation metadata in working-memory frontmatter and State Hub progress detail. +2026-06-06: Hardened instruction output parsing to accept a single Markdown +JSON fence when the fenced content is valid JSON, while preserving the +validation-failure artifact path for genuinely invalid output. + ## Issue-Core Emission Boundary Verification ```task