Add automation status surface

This commit is contained in:
2026-07-01 20:12:04 +02:00
parent 3f85274916
commit ffe10f098e
20 changed files with 1732 additions and 11 deletions

View File

@@ -366,6 +366,7 @@ async def evaluate_instructions(payload: dict) -> dict:
"output_validated": result.output_validated,
"review_required": result.review_required,
"validation_error": result.validation_error,
"llm_response_metadata": result.llm_response_metadata,
})
for spec in result.tasks:
task_specs.append({

View File

@@ -0,0 +1,811 @@
"""Repo-native automation status reporting without LLM calls."""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
import uuid
from datetime import date, datetime, time, timedelta, timezone
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
import httpx
import yaml
from sqlalchemy import bindparam, text
from activity_core.db import make_engine
from activity_core.definition_parser import scan_and_parse
from activity_core.schedule_manager import schedule_id
from activity_core.sync_activity_definitions import ACTIVITY_DEFINITION_ID_NAMESPACE
DEFAULT_TIMEZONE = "Europe/Berlin"
DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
DEFAULT_WORKING_MEMORY_DIR = "/home/worsch/the-custodian/memory/working"
DEFAULT_TEMPORAL_NAMESPACE = "default"
FAILURE_STATUSES = {"missed", "validation_failed", "sink_failed"}
WEEKDAYS = {
"monday": 0,
"tuesday": 1,
"wednesday": 2,
"thursday": 3,
"friday": 4,
"saturday": 5,
"sunday": 6,
}
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Report recent activity-core automation status without LLMs."
)
parser.add_argument(
"--since",
default=os.environ.get("SINCE", "today"),
help="Window start: YYYY-MM-DD, ISO datetime, today, yesterday, friday, or last-friday.",
)
parser.add_argument(
"--until",
default=os.environ.get("UNTIL"),
help="Window end. Defaults to now; date-only values use that day's end.",
)
parser.add_argument("--timezone", default=os.environ.get("AUTOMATION_STATUS_TIMEZONE", DEFAULT_TIMEZONE))
parser.add_argument("--activity-id", action="append", default=[])
parser.add_argument("--activity-name", action="append", default=[])
parser.add_argument("--db-url", default=os.environ.get("ACTCORE_DB_URL"))
parser.add_argument("--state-hub-url", default=os.environ.get("STATE_HUB_URL", DEFAULT_STATE_HUB_URL))
parser.add_argument("--working-memory-dir", default=os.environ.get("AUTOMATION_STATUS_WORKING_MEMORY_DIR", DEFAULT_WORKING_MEMORY_DIR))
parser.add_argument("--temporal-host", default=os.environ.get("TEMPORAL_HOST"))
parser.add_argument("--temporal-namespace", default=os.environ.get("TEMPORAL_NAMESPACE", DEFAULT_TEMPORAL_NAMESPACE))
parser.add_argument("--timeout-seconds", type=float, default=float(os.environ.get("AUTOMATION_STATUS_TIMEOUT_SECONDS", "5")))
parser.add_argument("--progress-limit", type=int, default=int(os.environ.get("AUTOMATION_STATUS_PROGRESS_LIMIT", "100")))
parser.add_argument("--progress-event-type", action="append", default=None, help="State Hub progress event type to read; repeatable. Use all for the unfiltered feed.")
parser.add_argument("--format", choices=("human", "json"), default=os.environ.get("FORMAT", "human"))
return parser.parse_args(argv)
def resolve_window(since: str, until: str | None, timezone_name: str, *, now: datetime | None = None) -> dict[str, Any]:
tz = ZoneInfo(timezone_name)
base = now or datetime.now(tz=tz)
if base.tzinfo is None:
base = base.replace(tzinfo=tz)
base = base.astimezone(tz)
since_local = _resolve_time(since, tz, base, "start")
until_local = _resolve_time(until, tz, base, "end") if until else base
if since_local > until_local:
raise ValueError(f"since {since_local.isoformat()} is after until {until_local.isoformat()}")
return {
"since": since_local,
"until": until_local,
"since_utc": since_local.astimezone(timezone.utc),
"until_utc": until_local.astimezone(timezone.utc),
"timezone": timezone_name,
}
def _resolve_time(raw: str, tz: ZoneInfo, now: datetime, boundary: str) -> datetime:
token = raw.strip().lower()
if token == "now":
return now
if token == "today":
return _day_boundary(now.date(), tz, boundary)
if token == "yesterday":
return _day_boundary(now.date() - timedelta(days=1), tz, boundary)
strict_last = False
if token.startswith("last-"):
strict_last = True
token = token.removeprefix("last-")
elif token.startswith("last "):
strict_last = True
token = token.removeprefix("last ")
if token in WEEKDAYS:
days_back = (now.weekday() - WEEKDAYS[token]) % 7
if strict_last and days_back == 0:
days_back = 7
return _day_boundary((now - timedelta(days=days_back)).date(), tz, boundary)
if "T" in raw or " " in raw:
value = datetime.fromisoformat(raw)
if value.tzinfo is None:
value = value.replace(tzinfo=tz)
return value.astimezone(tz)
try:
return _day_boundary(date.fromisoformat(raw), tz, boundary)
except ValueError as exc:
raise ValueError(f"unsupported time expression: {raw!r}") from exc
def _day_boundary(day: date, tz: ZoneInfo, boundary: str) -> datetime:
if boundary == "end":
return datetime.combine(day, time.max, tzinfo=tz)
return datetime.combine(day, time.min, tzinfo=tz)
def definition_uuid(raw_id: str) -> str:
try:
return str(uuid.UUID(raw_id))
except ValueError:
return str(uuid.uuid5(ACTIVITY_DEFINITION_ID_NAMESPACE, raw_id))
def file_definitions() -> list[dict[str, Any]]:
records = []
for definition in scan_and_parse():
trigger = dict(definition.trigger_config or {})
trigger_type = str(trigger.get("trigger_type") or trigger.get("type") or "")
if trigger_type not in {"cron", "scheduled"}:
continue
records.append({
"id": definition_uuid(definition.id),
"name": definition.name,
"enabled": bool(definition.enabled),
"trigger_type": trigger_type,
"trigger_config": trigger,
"instructions": list(definition.instructions or []),
"source": "files",
})
return sorted(records, key=lambda item: item["name"])
def filter_definitions(definitions: list[dict[str, Any]], ids: list[str], names: list[str]) -> list[dict[str, Any]]:
wanted_ids = {item.lower() for item in ids}
wanted_names = {item.lower() for item in names}
if not wanted_ids and not wanted_names:
return definitions
return [
item for item in definitions
if item["id"].lower() in wanted_ids or item["name"].lower() in wanted_names
]
def progress_event_types(args: argparse.Namespace) -> list[str | None]:
raw = args.progress_event_type
if raw is None:
env_value = os.environ.get("AUTOMATION_STATUS_PROGRESS_EVENT_TYPES")
raw = env_value.split(",") if env_value else ["daily_triage", "schedule_miss", "ops_inventory_probe"]
values = [item.strip() for item in raw if item and item.strip()]
return [None if item == "all" else item for item in values]
def expected_fires(definition: dict[str, Any], window: dict[str, Any]) -> list[str]:
cfg = definition.get("trigger_config") or {}
if definition.get("trigger_type") == "scheduled":
at = coerce_datetime(cfg.get("at"))
if at and in_window(at, window):
return [at.astimezone(ZoneInfo(window["timezone"])).isoformat()]
return []
if definition.get("trigger_type") != "cron" or not cfg.get("cron_expression"):
return []
tz = ZoneInfo(str(cfg.get("timezone") or window["timezone"]))
start = window["since_utc"].astimezone(tz).replace(second=0, microsecond=0)
end = window["until_utc"].astimezone(tz).replace(second=0, microsecond=0)
minutes = int((end - start).total_seconds() // 60) + 1
if minutes < 0:
return []
if minutes > 366 * 24 * 60:
raise ValueError("automation-status refuses to estimate cron windows longer than 366 days")
cron = parse_cron(cfg["cron_expression"])
fires = []
current = start
for _ in range(minutes):
if cron_matches(current, cron):
fires.append(current.isoformat())
current += timedelta(minutes=1)
return fires
def parse_cron(expr: str) -> tuple[set[int], set[int], set[int], set[int], set[int]]:
parts = expr.split()
if len(parts) != 5:
raise ValueError(f"unsupported cron expression {expr!r}: expected 5 fields")
return (
parse_cron_field(parts[0], 0, 59),
parse_cron_field(parts[1], 0, 23),
parse_cron_field(parts[2], 1, 31),
parse_cron_field(parts[3], 1, 12),
parse_cron_field(parts[4], 0, 7),
)
def parse_cron_field(field: str, minimum: int, maximum: int) -> set[int]:
values: set[int] = set()
for chunk in field.split(","):
base, _, step_text = chunk.partition("/")
step = int(step_text) if step_text else 1
if base == "*":
start, stop = minimum, maximum
elif "-" in base:
left, right = base.split("-", 1)
start, stop = int(left), int(right)
else:
start = stop = int(base)
if step <= 0 or start < minimum or stop > maximum or start > stop:
raise ValueError(f"cron field {field!r} outside {minimum}-{maximum}")
values.update(range(start, stop + 1, step))
if minimum == 0 and maximum == 7 and 7 in values:
values.add(0)
values.discard(7)
return values
def cron_matches(value: datetime, cron: tuple[set[int], set[int], set[int], set[int], set[int]]) -> bool:
minute, hour, day, month, weekday = cron
cron_weekday = (value.weekday() + 1) % 7
return value.minute in minute and value.hour in hour and value.day in day and value.month in month and cron_weekday in weekday
async def db_definitions(db_url: str) -> list[dict[str, Any]]:
engine = make_engine(db_url)
try:
async with engine.connect() as conn:
result = await conn.execute(text(
"select id, name, enabled, trigger_type, trigger_config, instructions_json, version "
"from activity_definitions where trigger_type in ('cron', 'scheduled') order by name"
))
return [{
"id": str(row["id"]),
"name": row["name"],
"enabled": bool(row["enabled"]),
"trigger_type": row["trigger_type"],
"trigger_config": dict(row["trigger_config"] or {}),
"instructions": list(row["instructions_json"] or []),
"version": row["version"],
"source": "db",
} for row in result.mappings().all()]
finally:
await engine.dispose()
async def load_definitions(args: argparse.Namespace, warnings: list[str]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
if args.db_url:
try:
return await db_definitions(args.db_url), {"status": "ok", "source": "db"}
except Exception as exc: # pragma: no cover - depends on local DB driver/runtime
warning = f"definition DB unavailable; using file definitions: {exc}"
warnings.append(warning)
return file_definitions(), {"status": "degraded", "source": "files", "warning": warning}
warning = "ACTCORE_DB_URL is not set; using file definitions and skipping run-history DB checks"
warnings.append(warning)
return file_definitions(), {"status": "degraded", "source": "files", "warning": warning}
async def load_runs(db_url: str | None, definitions: list[dict[str, Any]], window: dict[str, Any]) -> tuple[dict[str, list[dict[str, Any]]], dict[str, Any]]:
if not db_url:
return {}, {"status": "unavailable", "warning": "ACTCORE_DB_URL is not set"}
if not definitions:
return {}, {"status": "ok", "count": 0}
ids = [uuid.UUID(item["id"]) for item in definitions]
stmt = text(
"select run_id, activity_id, scheduled_for, fired_at, tasks_spawned, version_used from activity_runs "
"where activity_id in :ids and coalesce(scheduled_for, fired_at) >= :since "
"and coalesce(scheduled_for, fired_at) <= :until order by fired_at"
).bindparams(bindparam("ids", expanding=True))
engine = make_engine(db_url)
try:
async with engine.connect() as conn:
result = await conn.execute(stmt, {"ids": ids, "since": window["since_utc"], "until": window["until_utc"]})
rows = result.mappings().all()
except Exception as exc: # pragma: no cover - depends on local DB driver/runtime
return {}, {"status": "unavailable", "warning": f"activity_runs unavailable: {exc}"}
finally:
await engine.dispose()
grouped: dict[str, list[dict[str, Any]]] = {}
for row in rows:
record = {
"run_id": str(row["run_id"]),
"activity_id": str(row["activity_id"]),
"scheduled_for": iso(coerce_datetime(row["scheduled_for"])),
"fired_at": iso(coerce_datetime(row["fired_at"])),
"tasks_spawned": row["tasks_spawned"],
"version_used": row["version_used"],
}
grouped.setdefault(record["activity_id"], []).append(record)
return grouped, {"status": "ok", "count": len(rows)}
async def load_spawn_validation(db_url: str | None, definitions: list[dict[str, Any]], window: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
if not db_url:
return [], {"status": "unavailable", "warning": "ACTCORE_DB_URL is not set"}
if not definitions:
return [], {"status": "ok", "count": 0}
ids = [uuid.UUID(item["id"]) for item in definitions]
stmt = text(
"select activity_def_id, source_id, output_validated, created_at from task_spawn_log "
"where activity_def_id in :ids and created_at >= :since and created_at <= :until "
"and output_validated is not null"
).bindparams(bindparam("ids", expanding=True))
engine = make_engine(db_url)
try:
async with engine.connect() as conn:
result = await conn.execute(stmt, {"ids": ids, "since": window["since_utc"], "until": window["until_utc"]})
rows = result.mappings().all()
except Exception as exc: # pragma: no cover - depends on local DB driver/runtime
return [], {"status": "unavailable", "warning": f"task_spawn_log unavailable: {exc}"}
finally:
await engine.dispose()
return [{
"source": "task_spawn_log",
"activity_id": str(row["activity_def_id"]),
"created_at": iso(coerce_datetime(row["created_at"])),
"output_validated": bool(row["output_validated"]),
"summary": f"instruction {row['source_id']} task output validation",
} for row in rows], {"status": "ok", "count": len(rows)}
def load_state_hub_progress(state_hub_url: str | None, window: dict[str, Any], *, limit: int, timeout_seconds: float, event_types: list[str | None] | None = None) -> tuple[list[dict[str, Any]], dict[str, Any]]:
if not state_hub_url:
return [], {"status": "unavailable", "warning": "STATE_HUB_URL is not set"}
event_types = event_types or [None]
payload: list[Any] = []
try:
for event_type in event_types:
params: dict[str, Any] = {"limit": limit}
if event_type:
params["event_type"] = event_type
response = httpx.get(f"{state_hub_url.rstrip('/')}/progress/", params=params, timeout=timeout_seconds)
response.raise_for_status()
items = response.json()
if not isinstance(items, list):
return [], {"status": "unavailable", "warning": "State Hub progress response was not a list"}
payload.extend(items)
except Exception as exc:
detail = str(exc) or exc.__class__.__name__
return [], {"status": "unavailable", "warning": f"State Hub progress unavailable: {detail}"}
evidence: list[dict[str, Any]] = []
seen: set[str] = set()
for item in payload:
if not isinstance(item, dict):
continue
item_id = str(item.get("id") or id(item))
if item_id in seen:
continue
seen.add(item_id)
detail = item.get("detail") if isinstance(item.get("detail"), dict) else {}
created_at = coerce_datetime(item.get("created_at"))
scheduled_for = coerce_datetime(detail.get("scheduled_for"))
event_time = scheduled_for or created_at
if event_time and not in_window(event_time, window):
continue
run_id = string_or_none(detail.get("activity_core_run_id"))
activity_id = string_or_none(detail.get("activity_id"))
if not run_id and not activity_id and item.get("event_type") != "schedule_miss":
continue
evidence.append({
"source": "state_hub_progress",
"activity_id": activity_id,
"run_id": run_id,
"scheduled_for": iso(scheduled_for),
"created_at": iso(created_at),
"event_type": string_or_none(item.get("event_type")),
"output_validated": bool_or_none(detail.get("output_validated")),
"validation_error": shorten(string_or_none(detail.get("validation_error"))),
"status": string_or_none(detail.get("status")),
"summary": shorten(string_or_none(item.get("summary"))),
"path": string_or_none(detail.get("working_memory_path")),
})
labels = [item or "all" for item in event_types]
return evidence, {"status": "ok", "count": len(evidence), "event_types": labels}
def load_working_memory_evidence(working_memory_dir: str | None, window: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
if not working_memory_dir:
return [], {"status": "unavailable", "warning": "working-memory directory is not configured"}
root = Path(working_memory_dir).expanduser()
if not root.exists():
return [], {"status": "unavailable", "warning": f"working-memory directory does not exist: {root}"}
evidence: list[dict[str, Any]] = []
for path in sorted(root.glob("*.md")):
meta = read_frontmatter(path)
if not meta or meta.get("source") != "activity-core":
continue
scheduled_for = coerce_datetime(meta.get("scheduled_for"))
created_at = coerce_datetime(meta.get("created"))
event_time = scheduled_for or created_at
if event_time and not in_window(event_time, window):
continue
evidence.append({
"source": "working_memory",
"activity_id": string_or_none(meta.get("activity_id")),
"run_id": string_or_none(meta.get("activity_core_run_id")),
"scheduled_for": iso(scheduled_for),
"created_at": iso(created_at),
"output_validated": bool_or_none(meta.get("output_validated")),
"path": str(path),
"summary": path.name,
})
return evidence, {"status": "ok", "count": len(evidence), "path": str(root)}
def read_frontmatter(path: Path) -> dict[str, Any]:
try:
value = path.read_text(encoding="utf-8")
except OSError:
return {}
if not value.startswith("---\n"):
return {}
parts = value.split("---\n", 2)
if len(parts) < 3:
return {}
loaded = yaml.safe_load(parts[1])
return loaded if isinstance(loaded, dict) else {}
async def load_temporal_visibility(temporal_host: str | None, namespace: str, definitions: list[dict[str, Any]], *, timeout_seconds: float) -> tuple[dict[str, dict[str, Any]], dict[str, Any]]:
if not temporal_host:
return {}, {"status": "skipped", "warning": "TEMPORAL_HOST is not set"}
try:
from temporalio.client import Client
client = await asyncio.wait_for(Client.connect(temporal_host, namespace=namespace), timeout=timeout_seconds)
except Exception as exc:
return {}, {"status": "unavailable", "warning": f"Temporal unavailable: {exc}"}
records: dict[str, dict[str, Any]] = {}
for definition in definitions:
sid = automation_schedule_id(definition)
try:
desc = await asyncio.wait_for(client.get_schedule_handle(sid).describe(), timeout=timeout_seconds)
state = getattr(getattr(desc, "schedule", None), "state", None)
info = getattr(desc, "info", None)
records[definition["id"]] = {
"schedule_id": sid,
"available": True,
"paused": getattr(state, "paused", None),
"missed_catchup_window": int(getattr(info, "num_actions_missed_catchup_window", 0) or 0),
"last_fired_at": iso(latest_recent_action_time(getattr(info, "recent_actions", None) or [])),
"workflows": await list_workflows(client, definition["id"], timeout_seconds),
}
except Exception as exc:
records[definition["id"]] = {"schedule_id": sid, "available": False, "warning": str(exc)}
return records, {"status": "ok", "count": len(records)}
async def list_workflows(client: Any, activity_id: str, timeout_seconds: float) -> list[dict[str, Any]]:
async def collect() -> list[dict[str, Any]]:
workflows: list[dict[str, Any]] = []
async for item in client.list_workflows(query=f'ActivityId="{activity_id}"'):
workflows.append({
"id": item.id,
"run_id": item.run_id,
"status": str(item.status),
"start_time": iso(coerce_datetime(getattr(item, "start_time", None))),
"close_time": iso(coerce_datetime(getattr(item, "close_time", None))),
})
if len(workflows) >= 5:
break
return workflows
return await asyncio.wait_for(collect(), timeout=timeout_seconds)
def latest_recent_action_time(actions: list[Any]) -> datetime | None:
times = [coerce_datetime(getattr(item, "scheduled_at", None) or getattr(item, "started_at", None)) for item in actions]
times = [item for item in times if item is not None]
return max(times) if times else None
def automation_schedule_id(definition: dict[str, Any]) -> str:
activity_id = definition["id"]
if definition.get("trigger_type") == "scheduled":
return f"activity-schedule-{activity_id}-once"
return schedule_id(activity_id)
async def build_report(args: argparse.Namespace) -> tuple[dict[str, Any], int]:
window = resolve_window(args.since, args.until, args.timezone)
timeout = max(float(args.timeout_seconds), 0.1)
warnings: list[str] = []
sources: dict[str, dict[str, Any]] = {}
try:
definitions, sources["definitions"] = await asyncio.wait_for(
load_definitions(args, warnings),
timeout=timeout,
)
except asyncio.TimeoutError:
warning = "definition DB timed out; using file definitions"
warnings.append(warning)
definitions = file_definitions()
sources["definitions"] = {"status": "degraded", "source": "files", "warning": warning}
definitions = filter_definitions(definitions, args.activity_id, args.activity_name)
try:
runs_by_activity, sources["activity_runs"] = await asyncio.wait_for(
load_runs(args.db_url, definitions, window),
timeout=timeout,
)
except asyncio.TimeoutError:
runs_by_activity = {}
sources["activity_runs"] = {"status": "unavailable", "warning": "activity_runs timed out"}
try:
spawn_evidence, sources["task_spawn_log"] = await asyncio.wait_for(
load_spawn_validation(args.db_url, definitions, window),
timeout=timeout,
)
except asyncio.TimeoutError:
spawn_evidence = []
sources["task_spawn_log"] = {"status": "unavailable", "warning": "task_spawn_log timed out"}
progress_evidence, sources["state_hub_progress"] = load_state_hub_progress(
args.state_hub_url,
window,
limit=args.progress_limit,
timeout_seconds=timeout,
event_types=progress_event_types(args),
)
wm_evidence, sources["working_memory"] = load_working_memory_evidence(args.working_memory_dir, window)
temporal_by_activity, sources["temporal"] = await load_temporal_visibility(
args.temporal_host,
args.temporal_namespace,
definitions,
timeout_seconds=timeout,
)
for source in sources.values():
if source.get("status") not in {"ok", "skipped"} and source.get("warning"):
warnings.append(str(source["warning"]))
all_evidence = progress_evidence + wm_evidence + spawn_evidence
definitions = add_evidence_only_definitions(definitions, all_evidence)
activities = []
runs_available = sources["activity_runs"].get("status") == "ok"
for definition in definitions:
runs = runs_by_activity.get(definition["id"], [])
activities.append(classify_activity(
definition,
window,
runs,
evidence_for_activity(definition, runs, all_evidence, window),
temporal_by_activity.get(definition["id"]),
expected_fires(definition, window),
runs_available=runs_available,
))
report = {
"mode": "automation-status",
"generated_at": datetime.now(tz=timezone.utc).isoformat(),
"window": {
"since": window["since"].isoformat(),
"until": window["until"].isoformat(),
"timezone": window["timezone"],
"since_utc": window["since_utc"].isoformat(),
"until_utc": window["until_utc"].isoformat(),
},
"sources": sources,
"summary": summarize(activities),
"activities": activities,
"warnings": sorted(set(warnings)),
}
exit_code = 1 if any(item["status"] in FAILURE_STATUSES for item in activities) else 0
return report, exit_code
def add_evidence_only_definitions(definitions: list[dict[str, Any]], evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
known = {item["id"] for item in definitions}
result = list(definitions)
for item in evidence:
activity_id = item.get("activity_id")
if not activity_id or activity_id in known:
continue
known.add(activity_id)
result.append({
"id": activity_id,
"name": f"Activity {activity_id}",
"enabled": True,
"trigger_type": "evidence",
"trigger_config": {},
"instructions": [],
"source": "evidence",
})
return result
def evidence_for_activity(definition: dict[str, Any], runs: list[dict[str, Any]], all_evidence: list[dict[str, Any]], window: dict[str, Any]) -> list[dict[str, Any]]:
run_ids = {item["run_id"] for item in runs}
selected: list[dict[str, Any]] = []
for item in all_evidence:
if item.get("activity_id") == definition["id"] or (item.get("run_id") and item["run_id"] in run_ids):
selected.append(item)
continue
event_time = coerce_datetime(item.get("scheduled_for") or item.get("created_at"))
if item.get("activity_id") is None and item.get("run_id") in run_ids and (not event_time or in_window(event_time, window)):
selected.append(item)
return selected
def classify_activity(definition: dict[str, Any], window: dict[str, Any], runs: list[dict[str, Any]], evidence: list[dict[str, Any]], temporal: dict[str, Any] | None, expected: list[str], *, runs_available: bool) -> dict[str, Any]:
warnings: list[str] = []
if not runs_available:
warnings.append("activity_runs source unavailable; missed-run verdict is unknown")
if temporal and not temporal.get("available", False) and temporal.get("warning"):
warnings.append(f"Temporal schedule visibility unavailable: {temporal['warning']}")
if temporal and temporal.get("paused") and definition.get("enabled"):
warnings.append("Temporal schedule is paused while ActivityDefinition is enabled")
workflows = temporal.get("workflows", []) if temporal else []
if not definition.get("enabled"):
status = "disabled"
elif any(item.get("output_validated") is False for item in evidence):
status = "validation_failed"
elif any(workflow_status_matches(item, {"RUNNING"}) for item in workflows):
status = "running"
elif any(workflow_status_matches(item, {"FAILED", "TIMED_OUT", "TERMINATED"}) for item in workflows):
status = "retrying"
elif temporal and int(temporal.get("missed_catchup_window") or 0) > 0:
status = "missed"
elif runs_available and definition.get("enabled") and len(expected) > len(runs):
status = "missed"
elif runs or evidence:
status = "completed"
elif expected:
status = "unknown"
else:
status = "no_due"
return {
"id": definition["id"],
"name": definition["name"],
"enabled": definition["enabled"],
"trigger_type": definition["trigger_type"],
"trigger_config": public_trigger_config(definition.get("trigger_config", {})),
"schedule_id": automation_schedule_id(definition),
"definition_source": definition.get("source"),
"status": status,
"expected_fires": expected,
"expected_fire_count": len(expected),
"observed_run_count": len(runs),
"runs": runs,
"evidence": evidence,
"temporal": temporal,
"warnings": warnings,
"window": {"since": window["since"].isoformat(), "until": window["until"].isoformat(), "timezone": window["timezone"]},
}
def workflow_status_matches(workflow: dict[str, Any], names: set[str]) -> bool:
value = str(workflow.get("status") or "").upper()
return any(name in value for name in names)
def public_trigger_config(config: dict[str, Any]) -> dict[str, Any]:
allowed = {"trigger_type", "cron_expression", "timezone", "misfire_policy", "catchup_window_seconds", "jitter_seconds", "at"}
return {key: config.get(key) for key in allowed if key in config}
def summarize(activities: list[dict[str, Any]]) -> dict[str, Any]:
counts: dict[str, int] = {}
for item in activities:
counts[item["status"]] = counts.get(item["status"], 0) + 1
return {"activity_count": len(activities), "status_counts": counts, "failure_count": sum(counts.get(status, 0) for status in FAILURE_STATUSES)}
def render_human(report: dict[str, Any]) -> str:
window = report["window"]
lines = [
f"Automation status {window['since']} -> {window['until']} ({window['timezone']})",
render_source_line(report["sources"]),
render_summary_line(report["summary"]),
"",
]
for activity in report["activities"]:
lines.append(f"- {activity['name']} [{activity['status']}] expected={activity['expected_fire_count']} runs={activity['observed_run_count']} evidence={len(activity['evidence'])}")
if activity["expected_fires"]:
suffix = " ..." if len(activity["expected_fires"]) > 3 else ""
lines.append(" expected fires: " + ", ".join(activity["expected_fires"][:3]) + suffix)
for run in activity["runs"][:3]:
lines.append(f" run {run['run_id']} scheduled_for={run['scheduled_for']} fired_at={run['fired_at']} tasks={run['tasks_spawned']}")
for evidence in activity["evidence"][:3]:
detail = f" evidence {evidence['source']}"
if evidence.get("event_type"):
detail += f" event_type={evidence['event_type']}"
if evidence.get("run_id"):
detail += f" run={evidence['run_id']}"
if evidence.get("output_validated") is not None:
detail += f" output_validated={str(evidence['output_validated']).lower()}"
if evidence.get("validation_error"):
detail += f" validation_error={evidence['validation_error']}"
lines.append(detail)
for warning in activity["warnings"]:
lines.append(f" warning: {warning}")
if report["warnings"]:
lines.extend(["", "Warnings:"])
lines.extend(f"- {warning}" for warning in report["warnings"])
return "\n".join(lines)
def render_source_line(sources: dict[str, dict[str, Any]]) -> str:
parts = []
for name, source in sources.items():
label = f"{name}={source.get('status', 'unknown')}"
if source.get("source"):
label += f"/{source['source']}"
parts.append(label)
return "Sources: " + ", ".join(parts)
def render_summary_line(summary: dict[str, Any]) -> str:
counts = summary.get("status_counts") or {}
if not counts:
return "Summary: no scheduled automations found"
return "Summary: " + ", ".join(f"{status}={count}" for status, count in sorted(counts.items()))
def coerce_datetime(value: Any) -> datetime | None:
if value is None:
return None
if isinstance(value, datetime):
result = value
elif isinstance(value, date):
result = datetime.combine(value, time.min)
elif isinstance(value, str):
raw = value.strip()
if not raw:
return None
if raw.endswith("Z"):
raw = raw[:-1] + "+00:00"
try:
result = datetime.fromisoformat(raw)
except ValueError:
return None
else:
return None
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
return result.astimezone(timezone.utc)
def in_window(value: datetime, window: dict[str, Any]) -> bool:
instant = coerce_datetime(value)
return bool(instant and window["since_utc"] <= instant <= window["until_utc"])
def iso(value: datetime | None) -> str | None:
return value.isoformat() if value else None
def string_or_none(value: Any) -> str | None:
if value is None:
return None
result = str(value)
return result or None
def bool_or_none(value: Any) -> bool | None:
if value is None or isinstance(value, bool):
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in {"true", "yes", "1"}:
return True
if lowered in {"false", "no", "0"}:
return False
return None
def shorten(value: str | None, limit: int = 240) -> str | None:
if value is None:
return None
return value if len(value) <= limit else value[: limit - 3] + "..."
async def async_main(argv: list[str] | None = None) -> int:
args = parse_args(argv)
try:
report, exit_code = await build_report(args)
except ValueError as exc:
print(f"automation_status: {exc}", file=sys.stderr)
return 2
if args.format == "json":
print(json.dumps(report, indent=2, sort_keys=True))
else:
print(render_human(report))
return exit_code
def main(argv: list[str] | None = None) -> int:
return asyncio.run(async_main(argv))
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -17,6 +17,8 @@ import httpx
class DisabledLLMClient:
"""LLM client used when no llm-connect endpoint is configured."""
last_response_metadata: dict[str, Any] | None = None
def complete(
self,
prompt: str,
@@ -32,6 +34,7 @@ class LLMConnectClient:
def __init__(self, base_url: str, timeout_seconds: float = 300.0) -> None:
self.base_url = base_url.rstrip("/")
self.timeout_seconds = timeout_seconds
self.last_response_metadata: dict[str, Any] | None = None
def complete(
self,
@@ -54,12 +57,48 @@ class LLMConnectClient:
)
resp.raise_for_status()
data = resp.json()
self.last_response_metadata = _extract_response_metadata(data)
content = data.get("content")
if not isinstance(content, str):
raise ValueError("llm-connect response missing string content")
return content
_SAFE_RESPONSE_METADATA_KEYS = {
"finish_reason",
"usage",
"model",
"model_name",
"provider",
"request_id",
"response_id",
"trace_id",
"latency_ms",
"duration_ms",
"elapsed_ms",
"created",
"created_at",
}
def _extract_response_metadata(data: dict[str, Any]) -> dict[str, Any]:
"""Keep non-secret llm-connect diagnostics alongside the returned content."""
return {
key: value for key, value in data.items()
if key in _SAFE_RESPONSE_METADATA_KEYS and _json_safe(value)
}
def _json_safe(value: Any) -> bool:
try:
import json
json.dumps(value)
except (TypeError, ValueError):
return False
return True
def get_llm_client() -> DisabledLLMClient | LLMConnectClient:
base_url = os.environ.get("LLM_CONNECT_URL", "").strip()
if not base_url:

View File

@@ -136,6 +136,7 @@ def _post_state_hub_progress(
"output_validated": report_entry.get("output_validated"),
"review_required": report_entry.get("review_required"),
"validation_error": report_entry.get("validation_error"),
"llm_response_metadata": report_entry.get("llm_response_metadata"),
"report": report,
},
}
@@ -224,6 +225,16 @@ def _render_markdown(
lines.extend([summary, ""])
if validation_error:
lines.extend(["Validation error:", "", f"`{validation_error}`", ""])
metadata = report_entry.get("llm_response_metadata")
if metadata:
lines.extend([
"LLM response metadata:",
"",
"```json",
json.dumps(metadata, indent=2, sort_keys=True),
"```",
"",
])
lines.extend([
"```json",
json.dumps(report, indent=2, sort_keys=True),

View File

@@ -41,6 +41,7 @@ class InstructionResult:
review_required: bool = False
condition_matched: str | None = None
validation_error: str | None = None
llm_response_metadata: dict[str, Any] | None = None
def _resolve_path(obj: Any, path: str) -> Any:
@@ -167,12 +168,14 @@ def _execute(
# Step 3 — call LLM
raw_output = llm_client.complete(rendered, model=instr.model, config=llm_config)
response_metadata = _llm_response_metadata(llm_client)
# Step 4 — validate and optionally retry
task_specs, report, error = _validate_output(raw_output, instr, allow_list)
if error:
retry_prompt = rendered + f"\n\nPrevious output was invalid: {error}\nPlease fix."
raw_output = llm_client.complete(retry_prompt, model=instr.model, config=llm_config)
response_metadata = _llm_response_metadata(llm_client)
task_specs, report, error = _validate_output(raw_output, instr, allow_list)
if error:
# Truncate to keep log volume bounded but long enough to see the
@@ -188,10 +191,13 @@ def _execute(
# loss. One bad item should cost one item, not the whole report.
recovered = _resilient_report(
instr, raw_output, error, prompt_hash, allow_list,
response_metadata=response_metadata,
)
if recovered is not None:
return recovered
failure_report = _invalid_output_report(instr, error, raw_output)
failure_report = _invalid_output_report(
instr, error, raw_output, response_metadata=response_metadata,
)
if failure_report is not None:
return InstructionResult(
tasks=[],
@@ -202,6 +208,7 @@ def _execute(
review_required=True,
condition_matched=instr.condition or None,
validation_error=error,
llm_response_metadata=response_metadata,
)
return _empty_result(instr, prompt_hash=prompt_hash, validation_error=error)
@@ -213,6 +220,7 @@ def _execute(
output_validated=True,
review_required=bool(getattr(instr, "review_required", False)),
condition_matched=instr.condition or None,
llm_response_metadata=response_metadata,
)
@@ -252,6 +260,7 @@ def _invalid_output_report(
instr: Any,
validation_error: str,
raw_output: Any,
response_metadata: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
"""Build a durable diagnostic report for invalid report-sink output.
@@ -269,7 +278,7 @@ def _invalid_output_report(
partial_output = _parse_json_output(raw_output)
except json.JSONDecodeError:
partial_output = None
raw_preview = raw_output[:4000]
raw_preview = raw_output[:_RAW_OUTPUT_PREVIEW_LIMIT]
else:
partial_output = raw_output
@@ -281,6 +290,8 @@ def _invalid_output_report(
"status": "validation_failed",
"validation_error": validation_error,
}
if response_metadata:
report["llm_response_metadata"] = response_metadata
if isinstance(partial_output, dict):
if isinstance(partial_output.get("summary"), str):
report["partial_summary"] = partial_output["summary"]
@@ -310,9 +321,43 @@ _SNIPPET_LIMIT = 200
# fail the whole report or flow unbounded into a downstream consumer.
_MAX_STRING_LEN = 4000
_MAX_DEPTH = 8
_RAW_OUTPUT_PREVIEW_LIMIT = 12000
_SUMMARY_RE = re.compile(r'"summary"\s*:\s*"((?:[^"\\]|\\.)*)"')
_SAFE_RESPONSE_METADATA_KEYS = {
"finish_reason",
"usage",
"model",
"model_name",
"provider",
"request_id",
"response_id",
"trace_id",
"latency_ms",
"duration_ms",
"elapsed_ms",
"created",
"created_at",
}
def _llm_response_metadata(llm_client: Any) -> dict[str, Any] | None:
metadata = getattr(llm_client, "last_response_metadata", None)
if not isinstance(metadata, dict) or not metadata:
return None
safe: dict[str, Any] = {}
for key, value in metadata.items():
if key not in _SAFE_RESPONSE_METADATA_KEYS:
continue
try:
json.dumps(value)
except (TypeError, ValueError):
continue
safe[str(key)] = value
return safe or None
def _snippet(value: Any) -> str:
text = value if isinstance(value, str) else json.dumps(value, default=str)
return text[:_SNIPPET_LIMIT]
@@ -561,6 +606,7 @@ def _resilient_report(
original_error: str,
prompt_hash: str | None,
allow_list: set[str] | None = None,
response_metadata: dict[str, Any] | None = None,
) -> InstructionResult | None:
"""Recover a partial-but-usable report from output that failed validation.
@@ -590,6 +636,8 @@ def _resilient_report(
"quarantined_items": quarantined[:_QUARANTINE_LIMIT],
"recovery_note": f"original validation error: {original_error}",
}
if response_metadata:
report["llm_response_metadata"] = response_metadata
logger.warning(
"instruction_output_recovered: instruction=%r, kept=%d, quarantined=%d",
getattr(instr, "id", None), len(valid), len(quarantined),
@@ -603,6 +651,7 @@ def _resilient_report(
review_required=True,
condition_matched=getattr(instr, "condition", "") or None,
validation_error=None,
llm_response_metadata=response_metadata,
)