#!/usr/bin/env python3 """Verify the daily State Hub triage activity run. The default mode is ``--dry-run`` so operators can see the exact checks without needing live Temporal, Postgres, or State Hub access from the current shell. Pass ``--live`` to run the cheap checks directly. """ from __future__ import annotations import argparse import asyncio import json import os import sys from datetime import datetime, timezone from pathlib import Path from typing import Any from uuid import UUID DEFAULT_ACTIVITY_NAME = "Daily State Hub WSJF Triage" DEFAULT_PROGRESS_EVENT_TYPE = "daily_triage" DEFAULT_TEMPORAL_HOST = "localhost:7233" DEFAULT_TEMPORAL_NAMESPACE = "default" DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" DEFAULT_WORKING_MEMORY_DIR = "/home/worsch/the-custodian/memory/working" def parse_args(argv: list[str] | None = None) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Verify whether today's daily State Hub triage run happened.", ) parser.add_argument("--activity-id", default=os.environ.get("DAILY_TRIAGE_ACTIVITY_ID")) parser.add_argument("--activity-name", default=os.environ.get( "DAILY_TRIAGE_ACTIVITY_NAME", DEFAULT_ACTIVITY_NAME, )) parser.add_argument("--db-url", default=os.environ.get("ACTCORE_DB_URL")) parser.add_argument("--temporal-host", default=os.environ.get( "TEMPORAL_HOST", DEFAULT_TEMPORAL_HOST, )) parser.add_argument("--temporal-namespace", default=os.environ.get( "TEMPORAL_NAMESPACE", DEFAULT_TEMPORAL_NAMESPACE, )) parser.add_argument("--state-hub-url", default=os.environ.get( "STATE_HUB_URL", DEFAULT_STATE_HUB_URL, )) parser.add_argument("--progress-event-type", default=DEFAULT_PROGRESS_EVENT_TYPE) parser.add_argument("--working-memory-dir", default=os.environ.get( "DAILY_TRIAGE_WORKING_MEMORY_DIR", DEFAULT_WORKING_MEMORY_DIR, )) parser.add_argument( "--date", default=datetime.now(timezone.utc).date().isoformat(), help="Local report date to check, formatted YYYY-MM-DD.", ) parser.add_argument( "--live", action="store_true", help="Run live checks. Without this flag the script prints a dry-run checklist.", ) return parser.parse_args(argv) def build_dry_run_report(args: argparse.Namespace) -> dict[str, Any]: activity_ref = args.activity_id or ( f'' ) schedule_id = f"activity-schedule-{activity_ref}" db_filter = ( f"activity_runs.activity_id = '{args.activity_id}'" if args.activity_id else f"activity_definitions.name = '{args.activity_name}'" ) activity_def_filter = ( f"id = '{args.activity_id}'" if args.activity_id else f"name = '{args.activity_name}'" ) return { "mode": "dry-run", "generated_at": datetime.now(timezone.utc).isoformat(), "activity": { "id": args.activity_id, "name": args.activity_name, "schedule_id": schedule_id, }, "checks": [ { "name": "temporal_schedule", "expect": "Schedule exists, is not paused, and uses SKIP overlap for misfire_policy=skip.", "command": ( "temporal schedule describe " f"--schedule-id {schedule_id} " f"--address {args.temporal_host} " f"--namespace {args.temporal_namespace}" ), }, { "name": "latest_workflow_history", "expect": "Latest workflow has ActivityId search attribute and completed or is retrying visibly.", "command": ( "temporal workflow list " f"--query 'ActivityId=\"{activity_ref}\"' " f"--address {args.temporal_host} " f"--namespace {args.temporal_namespace}" ), }, { "name": "activity_runs_row", "expect": "Latest activity_runs row exists for today's scheduled_for or fired_at date.", "sql": ( "select run_id, scheduled_for, fired_at, tasks_spawned, version_used " "from activity_runs join activity_definitions on " "activity_runs.activity_id = activity_definitions.id " f"where {db_filter} " "order by fired_at desc limit 5;" ), }, { "name": "state_hub_progress", "expect": f"State Hub progress contains event_type={args.progress_event_type!r} with this run id.", "command": ( f"curl -s {args.state_hub_url.rstrip('/')}/progress/?limit=100" ), }, { "name": "working_memory_note", "expect": "A daily-triage note exists and its frontmatter carries activity_core_run_id.", "path_glob": str( Path(args.working_memory_dir) / f"daily-triage-{args.date}-*.md" ), }, { "name": "llm_timeout_budget", "expect": "Instruction model/max_tokens fit within ACTIVITY_TIMEOUT_SECONDS and Temporal retries.", "sql": ( "select name, instructions_json, version from activity_definitions " f"where {activity_def_filter};" ), "activity_timeout_seconds": int(os.environ.get( "ACTIVITY_TIMEOUT_SECONDS", "900", )), "retry_attempts": 10, }, ], } async def build_live_report(args: argparse.Namespace) -> dict[str, Any]: if not args.db_url: raise RuntimeError("ACTCORE_DB_URL or --db-url is required for --live") activity = await _resolve_activity(args) activity_id = str(activity["id"]) args.activity_id = activity_id dry = build_dry_run_report(args) dry["mode"] = "live" dry["results"] = { "activity_definition": _json_ready(activity), "temporal": await _check_temporal(args, activity_id), "activity_runs": await _latest_activity_runs(args, activity_id), "state_hub_progress": await _state_hub_progress(args), "working_memory_notes": _working_memory_notes(args), } return dry async def _resolve_activity(args: argparse.Namespace) -> dict[str, Any]: from sqlalchemy import text from activity_core.db import make_engine engine = make_engine(args.db_url) try: async with engine.connect() as conn: if args.activity_id: result = await conn.execute( text( "select id, name, enabled, trigger_config, instructions_json, version " "from activity_definitions where id = :activity_id" ), {"activity_id": args.activity_id}, ) else: result = await conn.execute( text( "select id, name, enabled, trigger_config, instructions_json, version " "from activity_definitions where name = :activity_name " "order by updated_at desc limit 1" ), {"activity_name": args.activity_name}, ) row = result.mappings().first() if row is None: raise RuntimeError("daily triage ActivityDefinition was not found") return dict(row) finally: await engine.dispose() async def _latest_activity_runs( args: argparse.Namespace, activity_id: str, ) -> list[dict[str, Any]]: from sqlalchemy import text from activity_core.db import make_engine engine = make_engine(args.db_url) try: async with engine.connect() as conn: result = await conn.execute( text( "select run_id, scheduled_for, fired_at, tasks_spawned, version_used " "from activity_runs where activity_id = :activity_id " "order by fired_at desc limit 5" ), {"activity_id": activity_id}, ) return [_json_ready(dict(row)) for row in result.mappings().all()] finally: await engine.dispose() async def _check_temporal(args: argparse.Namespace, activity_id: str) -> dict[str, Any]: from temporalio.client import Client schedule_id = f"activity-schedule-{activity_id}" client = await Client.connect( args.temporal_host, namespace=args.temporal_namespace, ) handle = client.get_schedule_handle(schedule_id) schedule = await handle.describe() workflows = [] query = f'ActivityId="{activity_id}"' async for item in client.list_workflows(query=query): workflows.append({ "id": item.id, "run_id": item.run_id, "status": str(item.status), "start_time": _iso(getattr(item, "start_time", None)), "close_time": _iso(getattr(item, "close_time", None)), }) if len(workflows) >= 5: break state = getattr(schedule.schedule, "state", None) policy = getattr(schedule.schedule, "policy", None) return { "schedule_id": schedule_id, "paused": getattr(state, "paused", None), "overlap_policy": str(getattr(policy, "overlap", "")), "latest_workflows": workflows, } async def _state_hub_progress(args: argparse.Namespace) -> list[dict[str, Any]]: import httpx base = args.state_hub_url.rstrip("/") async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(f"{base}/progress/", params={"limit": 100}) response.raise_for_status() items = response.json() if not isinstance(items, list): return [] return [ _json_ready(item) for item in items if item.get("event_type") == args.progress_event_type ][:5] def _working_memory_notes(args: argparse.Namespace) -> list[str]: directory = Path(args.working_memory_dir) pattern = f"daily-triage-{args.date}-*.md" if not directory.exists(): return [] return [str(path) for path in sorted(directory.glob(pattern))] def _json_ready(value: Any) -> Any: if isinstance(value, dict): return {key: _json_ready(item) for key, item in value.items()} if isinstance(value, list): return [_json_ready(item) for item in value] if isinstance(value, datetime): return value.isoformat() if isinstance(value, UUID): return str(value) return value def _iso(value: Any) -> str | None: return value.isoformat() if hasattr(value, "isoformat") else None def main(argv: list[str] | None = None) -> int: args = parse_args(argv) try: if args.live: report = asyncio.run(build_live_report(args)) else: report = build_dry_run_report(args) except Exception as exc: print(f"verify_daily_triage: {exc}", file=sys.stderr) return 2 print(json.dumps(report, indent=2, sort_keys=True)) return 0 if __name__ == "__main__": raise SystemExit(main())