Files
activity-core/src/activity_core/automation_status.py

1108 lines
46 KiB
Python

"""Repo-native automation status and inventory reporting without LLM calls."""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
import uuid
from datetime import date, datetime, time, timedelta, timezone
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
import httpx
import yaml
from sqlalchemy import bindparam, text
from activity_core.db import make_engine
from activity_core.definition_parser import scan_and_parse
from activity_core.schedule_manager import schedule_id
from activity_core.sync_activity_definitions import ACTIVITY_DEFINITION_ID_NAMESPACE
# Defaults used when neither a CLI flag nor an environment variable is given.
DEFAULT_TIMEZONE = "Europe/Berlin"
DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
DEFAULT_WORKING_MEMORY_DIR = "/home/worsch/the-custodian/memory/working"
DEFAULT_TEMPORAL_NAMESPACE = "default"
# Activity statuses that make build_report return a non-zero exit code.
FAILURE_STATUSES = {"missed", "validation_failed", "sink_failed"}
# Weekday names mapped to Python's datetime.weekday() numbering (Monday == 0).
WEEKDAYS = {
    day_name: day_index
    for day_index, day_name in enumerate(
        ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
    )
}
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the ``automation-status`` command line.

    Most option defaults are read from environment variables at call time, so
    CLI flags override the environment.
    """
    env = os.environ.get
    parser = argparse.ArgumentParser(
        description="Report recent activity-core automation status without LLMs."
    )
    add = parser.add_argument
    add(
        "--since",
        default=env("SINCE", "today"),
        help="Window start: YYYY-MM-DD, ISO datetime, today, yesterday, friday, or last-friday.",
    )
    add(
        "--until",
        default=env("UNTIL"),
        help="Window end. Defaults to now; date-only values use that day's end.",
    )
    add("--timezone", default=env("AUTOMATION_STATUS_TIMEZONE", DEFAULT_TIMEZONE))
    add("--activity-id", action="append", default=[])
    add("--activity-name", action="append", default=[])
    add("--db-url", default=env("ACTCORE_DB_URL"))
    add("--state-hub-url", default=env("STATE_HUB_URL", DEFAULT_STATE_HUB_URL))
    add(
        "--working-memory-dir",
        default=env("AUTOMATION_STATUS_WORKING_MEMORY_DIR", DEFAULT_WORKING_MEMORY_DIR),
    )
    add("--temporal-host", default=env("TEMPORAL_HOST"))
    add("--temporal-namespace", default=env("TEMPORAL_NAMESPACE", DEFAULT_TEMPORAL_NAMESPACE))
    add(
        "--timeout-seconds",
        type=float,
        default=float(env("AUTOMATION_STATUS_TIMEOUT_SECONDS", "5")),
    )
    add(
        "--progress-limit",
        type=int,
        default=int(env("AUTOMATION_STATUS_PROGRESS_LIMIT", "100")),
    )
    add(
        "--progress-event-type",
        action="append",
        default=None,
        help="State Hub progress event type to read; repeatable. Use all for the unfiltered feed.",
    )
    add("--format", choices=("human", "json"), default=env("FORMAT", "human"))
    return parser.parse_args(argv)
def resolve_window(since: str, until: str | None, timezone_name: str, *, now: datetime | None = None) -> dict[str, Any]:
    """Turn ``since``/``until`` expressions into an aware reporting window.

    ``now`` defaults to the current time in ``timezone_name``; a naive ``now``
    is assumed to be in that zone. A missing/empty ``until`` means ``now``.
    Returns local and UTC bounds plus the timezone name.

    Raises:
        ValueError: if ``since`` resolves to a moment after ``until``.
    """
    zone = ZoneInfo(timezone_name)
    reference = now or datetime.now(tz=zone)
    if reference.tzinfo is None:
        reference = reference.replace(tzinfo=zone)
    reference = reference.astimezone(zone)

    start = _resolve_time(since, zone, reference, "start")
    end = reference if not until else _resolve_time(until, zone, reference, "end")
    if start > end:
        raise ValueError(f"since {start.isoformat()} is after until {end.isoformat()}")

    return {
        "since": start,
        "until": end,
        "since_utc": start.astimezone(timezone.utc),
        "until_utc": end.astimezone(timezone.utc),
        "timezone": timezone_name,
    }
def _resolve_time(raw: str, tz: ZoneInfo, now: datetime, boundary: str) -> datetime:
    """Resolve one ``--since``/``--until`` expression to an aware datetime in *tz*.

    Accepted forms (case-insensitive keywords, surrounding whitespace ignored):
    ``now``; ``today``/``yesterday``; a weekday name, optionally prefixed with
    ``last-``/``last `` (which never resolves to *now*'s own day); an ISO
    datetime (a naive one is taken to be in *tz*, a trailing ``Z`` means UTC);
    or an ISO date. Day-granular forms use the day's start or end depending on
    *boundary* (``"start"``/``"end"``).

    Raises:
        ValueError: ``unsupported time expression`` for anything unparseable.
    """
    expression = raw.strip()
    token = expression.lower()
    if token == "now":
        return now
    if token == "today":
        return _day_boundary(now.date(), tz, boundary)
    if token == "yesterday":
        return _day_boundary(now.date() - timedelta(days=1), tz, boundary)

    strict_last = False
    for prefix in ("last-", "last "):
        if token.startswith(prefix):
            strict_last = True
            token = token.removeprefix(prefix).strip()
            break
    if token in WEEKDAYS:
        days_back = (now.weekday() - WEEKDAYS[token]) % 7
        if strict_last and days_back == 0:
            # "last friday" on a Friday means the previous week's Friday.
            days_back = 7
        return _day_boundary((now - timedelta(days=days_back)).date(), tz, boundary)

    if "T" in expression or " " in expression:
        # datetime.fromisoformat only accepts a "Z" suffix from Python 3.11 on.
        iso_text = expression[:-1] + "+00:00" if expression.endswith("Z") else expression
        try:
            value = datetime.fromisoformat(iso_text)
        except ValueError as exc:
            raise ValueError(f"unsupported time expression: {raw!r}") from exc
        if value.tzinfo is None:
            value = value.replace(tzinfo=tz)
        return value.astimezone(tz)

    try:
        return _day_boundary(date.fromisoformat(expression), tz, boundary)
    except ValueError as exc:
        raise ValueError(f"unsupported time expression: {raw!r}") from exc
def _day_boundary(day: date, tz: ZoneInfo, boundary: str) -> datetime:
if boundary == "end":
return datetime.combine(day, time.max, tzinfo=tz)
return datetime.combine(day, time.min, tzinfo=tz)
def definition_uuid(raw_id: str) -> str:
    """Return the canonical UUID string for an ActivityDefinition id.

    Ids that already parse as UUIDs are normalised. Anything else is mapped
    deterministically with ``uuid5`` under ``ACTIVITY_DEFINITION_ID_NAMESPACE``.
    """
    try:
        parsed = uuid.UUID(raw_id)
    except ValueError:
        parsed = uuid.uuid5(ACTIVITY_DEFINITION_ID_NAMESPACE, raw_id)
    return str(parsed)
def file_definitions() -> list[dict[str, Any]]:
    """Load cron/scheduled activity definitions from the on-disk definition files.

    The trigger type is read from ``trigger_type``, falling back to ``type``.
    Ids are normalised with ``definition_uuid``. Records are sorted by name.
    """
    scheduled: list[dict[str, Any]] = []
    for parsed in scan_and_parse():
        trigger = dict(parsed.trigger_config or {})
        kind = str(trigger.get("trigger_type") or trigger.get("type") or "")
        if kind in {"cron", "scheduled"}:
            scheduled.append({
                "id": definition_uuid(parsed.id),
                "name": parsed.name,
                "enabled": bool(parsed.enabled),
                "trigger_type": kind,
                "trigger_config": trigger,
                "instructions": list(parsed.instructions or []),
                "source": "files",
            })
    scheduled.sort(key=lambda record: record["name"])
    return scheduled
def filter_definitions(definitions: list[dict[str, Any]], ids: list[str], names: list[str]) -> list[dict[str, Any]]:
    """Keep definitions whose id or name matches any requested value (case-insensitive).

    With no ids and no names, *definitions* itself is returned unchanged.
    """
    id_filter = {value.lower() for value in ids}
    name_filter = {value.lower() for value in names}
    if not (id_filter or name_filter):
        return definitions

    def wanted(definition: dict[str, Any]) -> bool:
        return definition["id"].lower() in id_filter or definition["name"].lower() in name_filter

    return [definition for definition in definitions if wanted(definition)]
def progress_event_types(args: argparse.Namespace) -> list[str | None]:
    """Return the State Hub progress event types to query.

    Sources, in order: ``--progress-event-type``; the comma-separated
    ``AUTOMATION_STATUS_PROGRESS_EVENT_TYPES`` env var; a built-in default.
    Blank entries are dropped, and ``all`` becomes ``None`` (unfiltered feed).
    """
    requested = args.progress_event_type
    if requested is None:
        configured = os.environ.get("AUTOMATION_STATUS_PROGRESS_EVENT_TYPES")
        if configured:
            requested = configured.split(",")
        else:
            requested = ["daily_triage", "schedule_miss", "ops_inventory_probe"]
    result: list[str | None] = []
    for entry in requested:
        if not entry:
            continue
        cleaned = entry.strip()
        if not cleaned:
            continue
        result.append(None if cleaned == "all" else cleaned)
    return result
def expected_fires(definition: dict[str, Any], window: dict[str, Any]) -> list[str]:
    """Return ISO timestamps at which *definition* was due inside *window*.

    * ``scheduled`` triggers: the single ``at`` instant, if it lies in the
      window, rendered in the window's timezone.
    * ``cron`` triggers: every minute from the first whole minute at or after
      the window start up to the window end is matched against
      ``cron_expression``. Matching uses the trigger's ``timezone``, falling
      back to the window's.
    * anything else (or a cron trigger without an expression): ``[]``.

    Raises:
        ValueError: for cron windows longer than 366 days or cron expressions
            that ``parse_cron`` rejects.
    """
    cfg = definition.get("trigger_config") or {}
    trigger_type = definition.get("trigger_type")
    if trigger_type == "scheduled":
        # NOTE(review): coerce_datetime treats a naive ``at`` as UTC, not the
        # trigger's timezone. Confirm this matches how the schedule is created.
        at = coerce_datetime(cfg.get("at"))
        if at and in_window(at, window):
            return [at.astimezone(ZoneInfo(window["timezone"])).isoformat()]
        return []
    if trigger_type != "cron" or not cfg.get("cron_expression"):
        return []

    tz = ZoneInfo(str(cfg.get("timezone") or window["timezone"]))
    since_local = window["since_utc"].astimezone(tz)
    start = since_local.replace(second=0, microsecond=0)
    if start < since_local:
        # The window opened mid-minute. The fire at the start of that minute
        # happened before the window, so it is not expected here (the run query
        # would not return it either, causing a false "missed").
        start += timedelta(minutes=1)
    end = window["until_utc"].astimezone(tz).replace(second=0, microsecond=0)
    # start/end share one tzinfo, so Python subtracts (and below, adds) on
    # wall-clock time. The walk therefore covers local clock minutes.
    minutes = int((end - start).total_seconds() // 60) + 1
    if minutes <= 0:
        return []
    if minutes > 366 * 24 * 60:
        raise ValueError("automation-status refuses to estimate cron windows longer than 366 days")

    cron = parse_cron(cfg["cron_expression"])
    fires: list[str] = []
    current = start
    for _ in range(minutes):
        if cron_matches(current, cron):
            fires.append(current.isoformat())
        current += timedelta(minutes=1)
    return fires
def parse_cron(expr: str) -> tuple[set[int], set[int], set[int], set[int], set[int]]:
    """Parse a classic 5-field cron expression.

    Returns the expanded value sets for (minute, hour, day-of-month, month,
    day-of-week).

    Raises:
        ValueError: when the expression does not have exactly five fields, or a
            field is rejected by ``parse_cron_field``.
    """
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"unsupported cron expression {expr!r}: expected 5 fields")
    bounds = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 7))
    minute, hour, day, month, weekday = (
        parse_cron_field(field, low, high) for field, (low, high) in zip(fields, bounds)
    )
    return minute, hour, day, month, weekday
def parse_cron_field(field: str, minimum: int, maximum: int) -> set[int]:
    """Expand one numeric cron field into the set of values it matches.

    Supported syntax per comma-separated chunk: ``*``, ``N``, ``A-B``, each with
    an optional ``/STEP`` suffix. ``N/STEP`` means ``N-maximum/STEP``, as common
    cron parsers interpret it. For the day-of-week bounds (0-7), 7 is folded
    onto 0 so Sunday has one representation.

    Raises:
        ValueError: if a chunk is not numeric, the step is not positive, or the
            range is empty or falls outside ``minimum``-``maximum``.
    """
    values: set[int] = set()
    for chunk in field.split(","):
        base, _, step_text = chunk.partition("/")
        try:
            step = int(step_text) if step_text else 1
            if base == "*":
                start, stop = minimum, maximum
            elif "-" in base:
                left, right = base.split("-", 1)
                start, stop = int(left), int(right)
            else:
                start = int(base)
                # A stepped single value runs to the field maximum; without a
                # step it is just that value.
                stop = maximum if step_text else start
        except ValueError as exc:
            raise ValueError(f"unsupported cron field {field!r}") from exc
        if step <= 0 or start < minimum or stop > maximum or start > stop:
            raise ValueError(f"cron field {field!r} outside {minimum}-{maximum}")
        values.update(range(start, stop + 1, step))
    if minimum == 0 and maximum == 7 and 7 in values:
        values.add(0)
        values.discard(7)
    return values
def cron_matches(value: datetime, cron: tuple[set[int], set[int], set[int], set[int], set[int]]) -> bool:
    """Return True when *value* satisfies every field of a parsed cron tuple.

    Day-of-month and day-of-week must both match (fields are ANDed). The
    weekday is converted from Python's Monday=0 to cron's Sunday=0.
    """
    minutes, hours, days, months, weekdays = cron
    sunday_based_weekday = (value.weekday() + 1) % 7
    return all((
        value.minute in minutes,
        value.hour in hours,
        value.day in days,
        value.month in months,
        sunday_based_weekday in weekdays,
    ))
async def db_definitions(db_url: str) -> list[dict[str, Any]]:
    """Load cron/scheduled rows from ``activity_definitions``, ordered by name.

    The engine is disposed whether or not the query succeeds. Query errors
    propagate to the caller.
    """
    query = text(
        "select id, name, enabled, trigger_type, trigger_config, instructions_json, version "
        "from activity_definitions where trigger_type in ('cron', 'scheduled') order by name"
    )
    engine = make_engine(db_url)
    try:
        async with engine.connect() as conn:
            result = await conn.execute(query)
            records: list[dict[str, Any]] = []
            for row in result.mappings().all():
                records.append({
                    "id": str(row["id"]),
                    "name": row["name"],
                    "enabled": bool(row["enabled"]),
                    "trigger_type": row["trigger_type"],
                    "trigger_config": dict(row["trigger_config"] or {}),
                    "instructions": list(row["instructions_json"] or []),
                    "version": row["version"],
                    "source": "db",
                })
            return records
    finally:
        await engine.dispose()
async def load_definitions(args: argparse.Namespace, warnings: list[str]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Load definitions from the DB when ``args.db_url`` is set, else from files.

    Any DB failure, and a missing DB URL, degrades to ``file_definitions()``.
    In that case a warning is appended to *warnings* and also stored in the
    returned source-status dict.
    """
    if not args.db_url:
        message = "ACTCORE_DB_URL is not set; using file definitions and skipping run-history DB checks"
    else:
        try:
            rows = await db_definitions(args.db_url)
        except Exception as exc:  # pragma: no cover - depends on local DB driver/runtime
            message = f"definition DB unavailable; using file definitions: {exc}"
        else:
            return rows, {"status": "ok", "source": "db"}
    warnings.append(message)
    return file_definitions(), {"status": "degraded", "source": "files", "warning": message}
async def load_runs(db_url: str | None, definitions: list[dict[str, Any]], window: dict[str, Any]) -> tuple[dict[str, list[dict[str, Any]]], dict[str, Any]]:
    """Fetch ``activity_runs`` for *definitions* inside *window*, grouped by activity id.

    A run is in the window when ``coalesce(scheduled_for, fired_at)`` lies
    between the UTC bounds. DB errors are reported as an ``unavailable``
    source status instead of raising.
    """
    if not db_url:
        return {}, {"status": "unavailable", "warning": "ACTCORE_DB_URL is not set"}
    if not definitions:
        return {}, {"status": "ok", "count": 0}

    activity_ids = [uuid.UUID(definition["id"]) for definition in definitions]
    query = text(
        "select run_id, activity_id, scheduled_for, fired_at, tasks_spawned, version_used from activity_runs "
        "where activity_id in :ids and coalesce(scheduled_for, fired_at) >= :since "
        "and coalesce(scheduled_for, fired_at) <= :until order by fired_at"
    ).bindparams(bindparam("ids", expanding=True))
    engine = make_engine(db_url)
    try:
        async with engine.connect() as conn:
            params = {"ids": activity_ids, "since": window["since_utc"], "until": window["until_utc"]}
            rows = (await conn.execute(query, params)).mappings().all()
    except Exception as exc:  # pragma: no cover - depends on local DB driver/runtime
        return {}, {"status": "unavailable", "warning": f"activity_runs unavailable: {exc}"}
    finally:
        await engine.dispose()

    by_activity: dict[str, list[dict[str, Any]]] = {}
    for row in rows:
        activity_key = str(row["activity_id"])
        by_activity.setdefault(activity_key, []).append({
            "run_id": str(row["run_id"]),
            "activity_id": activity_key,
            "scheduled_for": iso(coerce_datetime(row["scheduled_for"])),
            "fired_at": iso(coerce_datetime(row["fired_at"])),
            "tasks_spawned": row["tasks_spawned"],
            "version_used": row["version_used"],
        })
    return by_activity, {"status": "ok", "count": len(rows)}
async def load_spawn_validation(db_url: str | None, definitions: list[dict[str, Any]], window: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Read ``task_spawn_log`` rows with a recorded ``output_validated`` verdict.

    Only rows for *definitions* created inside *window* are read. Each row
    becomes an evidence record. DB errors are reported as an ``unavailable``
    source status instead of raising.
    """
    if not db_url:
        return [], {"status": "unavailable", "warning": "ACTCORE_DB_URL is not set"}
    if not definitions:
        return [], {"status": "ok", "count": 0}

    activity_ids = [uuid.UUID(definition["id"]) for definition in definitions]
    query = text(
        "select activity_def_id, source_id, output_validated, created_at from task_spawn_log "
        "where activity_def_id in :ids and created_at >= :since and created_at <= :until "
        "and output_validated is not null"
    ).bindparams(bindparam("ids", expanding=True))
    engine = make_engine(db_url)
    try:
        async with engine.connect() as conn:
            params = {"ids": activity_ids, "since": window["since_utc"], "until": window["until_utc"]}
            rows = (await conn.execute(query, params)).mappings().all()
    except Exception as exc:  # pragma: no cover - depends on local DB driver/runtime
        return [], {"status": "unavailable", "warning": f"task_spawn_log unavailable: {exc}"}
    finally:
        await engine.dispose()

    evidence = [
        {
            "source": "task_spawn_log",
            "activity_id": str(row["activity_def_id"]),
            "created_at": iso(coerce_datetime(row["created_at"])),
            "output_validated": bool(row["output_validated"]),
            "summary": f"instruction {row['source_id']} task output validation",
        }
        for row in rows
    ]
    return evidence, {"status": "ok", "count": len(rows)}
def load_state_hub_progress(state_hub_url: str | None, window: dict[str, Any], *, limit: int, timeout_seconds: float, event_types: list[str | None] | None = None) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Fetch State Hub progress events and normalise them into evidence records.

    One ``GET {state_hub_url}/progress/`` request is issued per requested event
    type (``None``/empty list means a single unfiltered request). Any request
    error, or a non-list body, marks the whole source unavailable.

    Items are de-duplicated by ``id``. They are dropped when their
    ``scheduled_for`` (else ``created_at``) lies outside *window*. They are also
    dropped when they carry neither a run id nor an activity id, unless they are
    ``schedule_miss`` events.
    """
    if not state_hub_url:
        return [], {"status": "unavailable", "warning": "STATE_HUB_URL is not set"}
    requested = event_types or [None]
    fetched: list[Any] = []
    try:
        for event_type in requested:
            query: dict[str, Any] = {"limit": limit}
            if event_type:
                query["event_type"] = event_type
            response = httpx.get(f"{state_hub_url.rstrip('/')}/progress/", params=query, timeout=timeout_seconds)
            response.raise_for_status()
            batch = response.json()
            if not isinstance(batch, list):
                return [], {"status": "unavailable", "warning": "State Hub progress response was not a list"}
            fetched.extend(batch)
    except Exception as exc:
        reason = str(exc) or exc.__class__.__name__
        return [], {"status": "unavailable", "warning": f"State Hub progress unavailable: {reason}"}

    evidence: list[dict[str, Any]] = []
    seen_ids: set[str] = set()
    for item in fetched:
        if not isinstance(item, dict):
            continue
        dedupe_key = str(item.get("id") or id(item))
        if dedupe_key in seen_ids:
            continue
        seen_ids.add(dedupe_key)

        raw_detail = item.get("detail")
        detail = raw_detail if isinstance(raw_detail, dict) else {}
        created_at = coerce_datetime(item.get("created_at"))
        scheduled_for = coerce_datetime(detail.get("scheduled_for"))
        event_time = scheduled_for or created_at
        if event_time and not in_window(event_time, window):
            continue

        run_id = string_or_none(detail.get("activity_core_run_id"))
        activity_id = string_or_none(detail.get("activity_id"))
        if not (run_id or activity_id) and item.get("event_type") != "schedule_miss":
            continue

        evidence.append({
            "source": "state_hub_progress",
            "activity_id": activity_id,
            "run_id": run_id,
            "scheduled_for": iso(scheduled_for),
            "created_at": iso(created_at),
            "event_type": string_or_none(item.get("event_type")),
            "output_validated": bool_or_none(detail.get("output_validated")),
            "validation_error": shorten(string_or_none(detail.get("validation_error"))),
            "status": string_or_none(detail.get("status")),
            "summary": shorten(string_or_none(item.get("summary"))),
            "path": string_or_none(detail.get("working_memory_path")),
        })

    labels = [event_type or "all" for event_type in requested]
    return evidence, {"status": "ok", "count": len(evidence), "event_types": labels}
def load_working_memory_evidence(working_memory_dir: str | None, window: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Collect evidence from activity-core working-memory markdown notes.

    Scans ``*.md`` directly under the directory (sorted by path). Only notes
    whose frontmatter has ``source: activity-core`` are kept, and only when
    their ``scheduled_for`` (else ``created``) timestamp is missing or inside
    *window*.
    """
    if not working_memory_dir:
        return [], {"status": "unavailable", "warning": "working-memory directory is not configured"}
    root = Path(working_memory_dir).expanduser()
    if not root.exists():
        return [], {"status": "unavailable", "warning": f"working-memory directory does not exist: {root}"}

    collected: list[dict[str, Any]] = []
    for note in sorted(root.glob("*.md")):
        meta = read_frontmatter(note)
        if not meta or meta.get("source") != "activity-core":
            continue
        scheduled_for = coerce_datetime(meta.get("scheduled_for"))
        created_at = coerce_datetime(meta.get("created"))
        when = scheduled_for or created_at
        if when and not in_window(when, window):
            continue
        collected.append({
            "source": "working_memory",
            "activity_id": string_or_none(meta.get("activity_id")),
            "run_id": string_or_none(meta.get("activity_core_run_id")),
            "scheduled_for": iso(scheduled_for),
            "created_at": iso(created_at),
            "output_validated": bool_or_none(meta.get("output_validated")),
            "path": str(note),
            "summary": note.name,
        })
    return collected, {"status": "ok", "count": len(collected), "path": str(root)}
def read_frontmatter(path: Path) -> dict[str, Any]:
    """Return the YAML frontmatter mapping of a markdown file, or ``{}``.

    The frontmatter must start on the first line with ``---`` and end at the
    next line consisting of ``---``. LF and CRLF line endings and a UTF-8 BOM
    are accepted. Unreadable or undecodable files, malformed YAML, and
    non-mapping frontmatter all yield ``{}``, so one bad note cannot abort the
    report.
    """
    try:
        content = path.read_text(encoding="utf-8-sig")
    except (OSError, UnicodeDecodeError):
        return {}
    lines = content.replace("\r\n", "\n").split("\n")
    if not lines or lines[0].rstrip() != "---":
        return {}
    closing = next((index for index in range(1, len(lines)) if lines[index].rstrip() == "---"), None)
    if closing is None:
        return {}
    try:
        loaded = yaml.safe_load("\n".join(lines[1:closing]))
    except yaml.YAMLError:
        return {}
    return loaded if isinstance(loaded, dict) else {}
async def load_temporal_visibility(temporal_host: str | None, namespace: str, definitions: list[dict[str, Any]], *, timeout_seconds: float) -> tuple[dict[str, dict[str, Any]], dict[str, Any]]:
    """Describe each definition's Temporal schedule and list recent workflows.

    Returns ``(records_by_activity_id, source_status)``. With no host configured
    the source is ``skipped``. A failed or timed-out connection makes it
    ``unavailable``. A per-schedule failure is recorded as ``available: False``
    with a warning, and the loop continues.

    Exception messages fall back to the exception class name, because timeouts
    from ``asyncio.wait_for`` stringify to ``""``. An empty warning would be
    silently dropped downstream.
    """
    if not temporal_host:
        return {}, {"status": "skipped", "warning": "TEMPORAL_HOST is not set"}
    try:
        from temporalio.client import Client

        client = await asyncio.wait_for(Client.connect(temporal_host, namespace=namespace), timeout=timeout_seconds)
    except Exception as exc:
        detail = str(exc) or exc.__class__.__name__
        return {}, {"status": "unavailable", "warning": f"Temporal unavailable: {detail}"}

    records: dict[str, dict[str, Any]] = {}
    for definition in definitions:
        sid = automation_schedule_id(definition)
        try:
            desc = await asyncio.wait_for(client.get_schedule_handle(sid).describe(), timeout=timeout_seconds)
            state = getattr(getattr(desc, "schedule", None), "state", None)
            info = getattr(desc, "info", None)
            records[definition["id"]] = {
                "schedule_id": sid,
                "available": True,
                "paused": getattr(state, "paused", None),
                "missed_catchup_window": int(getattr(info, "num_actions_missed_catchup_window", 0) or 0),
                "last_fired_at": iso(latest_recent_action_time(getattr(info, "recent_actions", None) or [])),
                "workflows": await list_workflows(client, definition["id"], timeout_seconds),
            }
        except Exception as exc:
            records[definition["id"]] = {
                "schedule_id": sid,
                "available": False,
                "warning": str(exc) or exc.__class__.__name__,
            }
    return records, {"status": "ok", "count": len(records)}
async def list_workflows(client: Any, activity_id: str, timeout_seconds: float) -> list[dict[str, Any]]:
    """Return up to five workflows whose ``ActivityId`` search attribute equals *activity_id*.

    The whole listing is bounded by *timeout_seconds*. A timeout propagates to
    the caller.
    """
    query = f'ActivityId="{activity_id}"'
    max_items = 5

    async def gather() -> list[dict[str, Any]]:
        found: list[dict[str, Any]] = []
        async for execution in client.list_workflows(query=query):
            found.append({
                "id": execution.id,
                "run_id": execution.run_id,
                "status": str(execution.status),
                "start_time": iso(coerce_datetime(getattr(execution, "start_time", None))),
                "close_time": iso(coerce_datetime(getattr(execution, "close_time", None))),
            })
            if len(found) >= max_items:
                break
        return found

    return await asyncio.wait_for(gather(), timeout=timeout_seconds)
def latest_recent_action_time(actions: list[Any]) -> datetime | None:
    """Return the newest ``scheduled_at`` (else ``started_at``) among *actions*, or None."""
    candidates = (
        coerce_datetime(getattr(action, "scheduled_at", None) or getattr(action, "started_at", None))
        for action in actions
    )
    return max((moment for moment in candidates if moment is not None), default=None)
def automation_schedule_id(definition: dict[str, Any]) -> str:
    """Return the Temporal schedule id used for *definition*.

    One-shot ``scheduled`` triggers get an ``-once`` id. Every other trigger
    uses ``schedule_manager.schedule_id``.
    """
    activity_id = definition["id"]
    if definition.get("trigger_type") != "scheduled":
        return schedule_id(activity_id)
    return f"activity-schedule-{activity_id}-once"
async def build_report(args: argparse.Namespace) -> tuple[dict[str, Any], int]:
    """Gather every evidence source and classify each scheduled automation.

    Sources in order: definitions (DB, falling back to files), ``activity_runs``,
    ``task_spawn_log``, State Hub progress, working memory, Temporal. The DB
    loaders share one timeout of at least 0.1s and degrade on timeout.

    Returns:
        ``(report, exit_code)``. ``exit_code`` is 1 when any activity's status
        is in ``FAILURE_STATUSES``, otherwise 0.
    """
    window = resolve_window(args.since, args.until, args.timezone)
    timeout = max(float(args.timeout_seconds), 0.1)
    warnings: list[str] = []
    sources: dict[str, dict[str, Any]] = {}

    async def bounded(pending: Any, fallback: Any, timeout_warning: str) -> tuple[Any, dict[str, Any]]:
        # Await a DB loader under the shared timeout, degrading to *fallback* on timeout.
        try:
            return await asyncio.wait_for(pending, timeout=timeout)
        except asyncio.TimeoutError:
            return fallback, {"status": "unavailable", "warning": timeout_warning}

    try:
        definitions, sources["definitions"] = await asyncio.wait_for(
            load_definitions(args, warnings),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        fallback_warning = "definition DB timed out; using file definitions"
        warnings.append(fallback_warning)
        definitions = file_definitions()
        sources["definitions"] = {"status": "degraded", "source": "files", "warning": fallback_warning}
    definitions = filter_definitions(definitions, args.activity_id, args.activity_name)

    runs_by_activity, sources["activity_runs"] = await bounded(
        load_runs(args.db_url, definitions, window), {}, "activity_runs timed out"
    )
    spawn_evidence, sources["task_spawn_log"] = await bounded(
        load_spawn_validation(args.db_url, definitions, window), [], "task_spawn_log timed out"
    )
    progress_evidence, sources["state_hub_progress"] = load_state_hub_progress(
        args.state_hub_url,
        window,
        limit=args.progress_limit,
        timeout_seconds=timeout,
        event_types=progress_event_types(args),
    )
    wm_evidence, sources["working_memory"] = load_working_memory_evidence(args.working_memory_dir, window)
    temporal_by_activity, sources["temporal"] = await load_temporal_visibility(
        args.temporal_host,
        args.temporal_namespace,
        definitions,
        timeout_seconds=timeout,
    )

    for source in sources.values():
        if source.get("status") in {"ok", "skipped"}:
            continue
        if source.get("warning"):
            warnings.append(str(source["warning"]))

    all_evidence = progress_evidence + wm_evidence + spawn_evidence
    definitions = add_evidence_only_definitions(definitions, all_evidence)
    runs_available = sources["activity_runs"].get("status") == "ok"
    activities = []
    for definition in definitions:
        activity_runs = runs_by_activity.get(definition["id"], [])
        matched = evidence_for_activity(definition, activity_runs, all_evidence, window)
        activities.append(classify_activity(
            definition,
            window,
            activity_runs,
            matched,
            temporal_by_activity.get(definition["id"]),
            expected_fires(definition, window),
            runs_available=runs_available,
        ))

    report = {
        "mode": "automation-status",
        "generated_at": datetime.now(tz=timezone.utc).isoformat(),
        "window": {
            "since": window["since"].isoformat(),
            "until": window["until"].isoformat(),
            "timezone": window["timezone"],
            "since_utc": window["since_utc"].isoformat(),
            "until_utc": window["until_utc"].isoformat(),
        },
        "sources": sources,
        "summary": summarize(activities),
        "activities": activities,
        "warnings": sorted(set(warnings)),
    }
    exit_code = int(any(activity["status"] in FAILURE_STATUSES for activity in activities))
    return report, exit_code
def add_evidence_only_definitions(definitions: list[dict[str, Any]], evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Append a placeholder definition for each unknown ``activity_id`` seen in *evidence*.

    Returns a new list. The input list is not modified, and each unknown id is
    added once, in first-seen order.
    """
    seen = {definition["id"] for definition in definitions}
    merged = list(definitions)
    for item in evidence:
        activity_id = item.get("activity_id")
        if activity_id and activity_id not in seen:
            seen.add(activity_id)
            merged.append({
                "id": activity_id,
                "name": f"Activity {activity_id}",
                "enabled": True,
                "trigger_type": "evidence",
                "trigger_config": {},
                "instructions": [],
                "source": "evidence",
            })
    return merged
def evidence_for_activity(definition: dict[str, Any], runs: list[dict[str, Any]], all_evidence: list[dict[str, Any]], window: dict[str, Any]) -> list[dict[str, Any]]:
    """Select the evidence items attributable to *definition*.

    An item matches when its ``activity_id`` equals the definition id, or when
    it carries a non-empty ``run_id`` belonging to one of *runs*.

    *window* is accepted for interface compatibility only. The evidence loaders
    in this module already filter by the window. The former window-checked
    branch could never add anything: any item it tested for a run-id match had
    already been selected by the run-id rule.
    """
    activity_id = definition["id"]
    run_ids = {run["run_id"] for run in runs}
    return [
        item
        for item in all_evidence
        if item.get("activity_id") == activity_id
        or (item.get("run_id") and item["run_id"] in run_ids)
    ]
def classify_activity(definition: dict[str, Any], window: dict[str, Any], runs: list[dict[str, Any]], evidence: list[dict[str, Any]], temporal: dict[str, Any] | None, expected: list[str], *, runs_available: bool) -> dict[str, Any]:
    """Derive one status for an activity and bundle it with its supporting data.

    Status precedence (first match wins): disabled; validation_failed (any
    evidence with ``output_validated is False``); running; retrying (a
    failed, timed-out or terminated workflow); missed (Temporal reports
    missed catch-up actions, or run history shows fewer runs than expected
    fires); completed (any runs or evidence); unknown (fires were expected
    but nothing was seen); no_due.
    """
    notes: list[str] = []
    if not runs_available:
        notes.append("activity_runs source unavailable; missed-run verdict is unknown")
    if temporal:
        if not temporal.get("available", False) and temporal.get("warning"):
            notes.append(f"Temporal schedule visibility unavailable: {temporal['warning']}")
        if temporal.get("paused") and definition.get("enabled"):
            notes.append("Temporal schedule is paused while ActivityDefinition is enabled")
    workflows = temporal.get("workflows", []) if temporal else []

    # NOTE(review): num_actions_missed_catchup_window may be a lifetime counter
    # on the Temporal schedule, not scoped to this window. Confirm before
    # relying on it for per-window "missed" verdicts.
    missed = bool(temporal and int(temporal.get("missed_catchup_window") or 0) > 0) or (
        runs_available and len(expected) > len(runs)
    )

    if not definition.get("enabled"):
        status = "disabled"
    elif any(item.get("output_validated") is False for item in evidence):
        status = "validation_failed"
    elif any(workflow_status_matches(item, {"RUNNING"}) for item in workflows):
        status = "running"
    elif any(workflow_status_matches(item, {"FAILED", "TIMED_OUT", "TERMINATED"}) for item in workflows):
        status = "retrying"
    elif missed:
        status = "missed"
    elif runs or evidence:
        status = "completed"
    elif expected:
        status = "unknown"
    else:
        status = "no_due"

    return {
        "id": definition["id"],
        "name": definition["name"],
        "enabled": definition["enabled"],
        "trigger_type": definition["trigger_type"],
        "trigger_config": public_trigger_config(definition.get("trigger_config", {})),
        "schedule_id": automation_schedule_id(definition),
        "definition_source": definition.get("source"),
        "status": status,
        "expected_fires": expected,
        "expected_fire_count": len(expected),
        "observed_run_count": len(runs),
        "runs": runs,
        "evidence": evidence,
        "temporal": temporal,
        "warnings": notes,
        "window": {"since": window["since"].isoformat(), "until": window["until"].isoformat(), "timezone": window["timezone"]},
    }
def workflow_status_matches(workflow: dict[str, Any], names: set[str]) -> bool:
    """Return True if any of *names* occurs in the workflow's upper-cased status string."""
    status_text = str(workflow.get("status") or "").upper()
    for name in names:
        if name in status_text:
            return True
    return False
def public_trigger_config(config: dict[str, Any]) -> dict[str, Any]:
    """Return the allow-listed subset of a trigger config, in a fixed key order.

    The allow-list is iterated as a tuple rather than a set. String hashing is
    randomized per process, so iterating a set would make the key order (and
    thus the JSON output) vary between runs. A missing/None config yields ``{}``.
    """
    if not config:
        return {}
    allowed = (
        "trigger_type",
        "cron_expression",
        "timezone",
        "misfire_policy",
        "catchup_window_seconds",
        "jitter_seconds",
        "at",
    )
    return {key: config[key] for key in allowed if key in config}
def summarize(activities: list[dict[str, Any]]) -> dict[str, Any]:
    """Count activities per status and total those whose status is a failure status."""
    counts: dict[str, int] = {}
    for status in (activity["status"] for activity in activities):
        counts[status] = 1 + counts.get(status, 0)
    return {
        "activity_count": len(activities),
        "status_counts": counts,
        "failure_count": sum(count for status, count in counts.items() if status in FAILURE_STATUSES),
    }
def render_human(report: dict[str, Any]) -> str:
    """Render a ``build_report`` report as plain text.

    Each activity shows at most three expected fires, runs and evidence
    items, followed by its warnings. Report-level warnings are listed last.
    """
    window = report["window"]
    out: list[str] = [
        f"Automation status {window['since']} -> {window['until']} ({window['timezone']})",
        render_source_line(report["sources"]),
        render_summary_line(report["summary"]),
        "",
    ]
    for activity in report["activities"]:
        out.append(
            f"- {activity['name']} [{activity['status']}] "
            f"expected={activity['expected_fire_count']} "
            f"runs={activity['observed_run_count']} "
            f"evidence={len(activity['evidence'])}"
        )
        fires = activity["expected_fires"]
        if fires:
            more = " ..." if len(fires) > 3 else ""
            out.append(" expected fires: " + ", ".join(fires[:3]) + more)
        for run in activity["runs"][:3]:
            out.append(f" run {run['run_id']} scheduled_for={run['scheduled_for']} fired_at={run['fired_at']} tasks={run['tasks_spawned']}")
        for evidence in activity["evidence"][:3]:
            pieces = [f" evidence {evidence['source']}"]
            if evidence.get("event_type"):
                pieces.append(f" event_type={evidence['event_type']}")
            if evidence.get("run_id"):
                pieces.append(f" run={evidence['run_id']}")
            if evidence.get("output_validated") is not None:
                pieces.append(f" output_validated={str(evidence['output_validated']).lower()}")
            if evidence.get("validation_error"):
                pieces.append(f" validation_error={evidence['validation_error']}")
            out.append("".join(pieces))
        out.extend(f" warning: {warning}" for warning in activity["warnings"])
    if report["warnings"]:
        out += ["", "Warnings:"]
        out += [f"- {warning}" for warning in report["warnings"]]
    return "\n".join(out)
def render_source_line(sources: dict[str, dict[str, Any]]) -> str:
    """Format source statuses as ``Sources: name=status[/source], ...``."""
    labels = []
    for name, source in sources.items():
        origin = source.get("source")
        suffix = f"/{origin}" if origin else ""
        labels.append(f"{name}={source.get('status', 'unknown')}{suffix}")
    return "Sources: " + ", ".join(labels)
def render_summary_line(summary: dict[str, Any]) -> str:
    """Format status counts as ``Summary: status=count, ...`` sorted by status name."""
    counts = summary.get("status_counts") or {}
    if counts:
        ordered = sorted(counts.items())
        return "Summary: " + ", ".join(f"{status}={count}" for status, count in ordered)
    return "Summary: no scheduled automations found"
def coerce_datetime(value: Any) -> datetime | None:
    """Convert a datetime, date or ISO string into an aware UTC datetime.

    Naive values are assumed to be UTC, dates map to midnight, and a trailing
    ``Z`` in strings means UTC. Anything else (including unparseable or blank
    strings and None) yields None.
    """
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, date):
        parsed = datetime.combine(value, time.min)
    elif isinstance(value, str):
        cleaned = value.strip()
        if not cleaned:
            return None
        if cleaned.endswith("Z"):
            cleaned = f"{cleaned[:-1]}+00:00"
        try:
            parsed = datetime.fromisoformat(cleaned)
        except ValueError:
            return None
    else:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
def in_window(value: datetime, window: dict[str, Any]) -> bool:
    """Return True when *value* (coerced to UTC) lies within the window's inclusive UTC bounds."""
    instant = coerce_datetime(value)
    if instant is None:
        return False
    return window["since_utc"] <= instant <= window["until_utc"]
def iso(value: datetime | None) -> str | None:
    """Return ``value.isoformat()``, or None when *value* is falsy."""
    if not value:
        return None
    return value.isoformat()
def string_or_none(value: Any) -> str | None:
    """Return ``str(value)``, or None when *value* is None or stringifies to ``""``."""
    rendered = None if value is None else str(value)
    return rendered if rendered else None
def bool_or_none(value: Any) -> bool | None:
    """Interpret a loosely-typed flag as True/False, or None when it is not recognisable.

    Accepts booleans, the integers 1/0 (as JSON/YAML may encode them,
    consistent with the strings "1"/"0"), and the case-insensitive strings
    true/yes/1 and false/no/0. Everything else, including None, yields None.
    """
    if value is None or isinstance(value, bool):
        return value
    if isinstance(value, int):
        # bool was handled above, so this only sees real integers.
        return {1: True, 0: False}.get(value)
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in {"true", "yes", "1"}:
            return True
        if lowered in {"false", "no", "0"}:
            return False
    return None
def shorten(value: str | None, limit: int = 240) -> str | None:
    """Truncate *value* to *limit* characters, ending in ``...`` when cut; None passes through."""
    if value is None or len(value) <= limit:
        return value
    return f"{value[: limit - 3]}..."
def parse_inventory_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the ``automation-inventory`` command line.

    The repeatable list options (``--activity-id``, ``--activity-name``,
    ``--trigger-type``) fall back to the comma-separated ``ACTIVITY_ID``,
    ``ACTIVITY_NAME`` and ``TRIGGER`` environment variables only when the flag
    is not given at all. argparse's ``append`` action would otherwise add CLI
    values on top of a non-empty default instead of replacing it. That is
    inconsistent with the other env-defaulted options here, where the CLI
    overrides the environment. The parsed list attributes are always lists.
    """
    parser = argparse.ArgumentParser(
        description="List configured activity-core scheduled automations without LLMs."
    )
    parser.add_argument("--activity-id", action="append", default=None)
    parser.add_argument("--activity-name", action="append", default=None)
    parser.add_argument(
        "--enabled",
        default=os.environ.get("ENABLED", "all"),
        help="Filter by enabled state: all, true/enabled, or false/disabled.",
    )
    parser.add_argument(
        "--trigger-type",
        "--trigger",
        dest="trigger_type",
        action="append",
        default=None,
        help="Filter by trigger type: cron or scheduled. Repeatable or comma-separated.",
    )
    parser.add_argument("--db-url", default=os.environ.get("ACTCORE_DB_URL"))
    parser.add_argument("--temporal-host", default=os.environ.get("TEMPORAL_HOST"))
    parser.add_argument(
        "--temporal-namespace",
        default=os.environ.get("TEMPORAL_NAMESPACE", DEFAULT_TEMPORAL_NAMESPACE),
    )
    parser.add_argument(
        "--timeout-seconds",
        type=float,
        default=float(os.environ.get(
            "AUTOMATION_INVENTORY_TIMEOUT_SECONDS",
            os.environ.get("AUTOMATION_STATUS_TIMEOUT_SECONDS", "5"),
        )),
    )
    parser.add_argument("--format", choices=("human", "json"), default=os.environ.get("FORMAT", "human"))
    args = parser.parse_args(argv)
    for attribute, env_name in (
        ("activity_id", "ACTIVITY_ID"),
        ("activity_name", "ACTIVITY_NAME"),
        ("trigger_type", "TRIGGER"),
    ):
        if getattr(args, attribute) is None:
            setattr(args, attribute, _env_list(env_name))
    return args
def _env_list(name: str) -> list[str]:
raw = os.environ.get(name, "")
return [item.strip() for item in raw.split(",") if item.strip()]
def parse_enabled_filter(value: str | None) -> bool | None:
    """Map an ``--enabled`` value to True/False, or None for "no filter" (empty/``all``).

    Raises:
        ValueError: for any other value.
    """
    normalized = "" if value is None else str(value).strip().lower()
    if normalized in {"", "all"}:
        return None
    if normalized in {"true", "yes", "1", "enabled"}:
        return True
    if normalized in {"false", "no", "0", "disabled"}:
        return False
    raise ValueError("enabled filter must be all, true/enabled, or false/disabled")
def normalize_trigger_filters(values: list[str] | None) -> set[str]:
    """Flatten repeatable/comma-separated trigger filters into a lower-case set.

    Blank entries and ``all`` are ignored (an empty set means "no filter").

    Raises:
        ValueError: for anything other than ``cron`` or ``scheduled``.
    """
    tokens = (
        piece.strip().lower()
        for value in (values or [])
        for piece in str(value).split(",")
    )
    selected: set[str] = set()
    for token in tokens:
        if token in {"", "all"}:
            continue
        if token not in {"cron", "scheduled"}:
            raise ValueError("trigger filter must be cron or scheduled")
        selected.add(token)
    return selected
def filter_inventory_definitions(
    definitions: list[dict[str, Any]],
    ids: list[str],
    names: list[str],
    enabled: bool | None,
    trigger_types: set[str],
) -> list[dict[str, Any]]:
    """Narrow definitions by id/name, then by enabled state and trigger type.

    ``enabled=None`` and an empty ``trigger_types`` set each disable their
    filter. Trigger types are matched case-insensitively against the
    definition's ``trigger_type``.
    """
    chosen = filter_definitions(definitions, ids, names)
    if enabled is None and not trigger_types:
        # Nothing further to apply; hand back the id/name selection as-is.
        return chosen

    def _keep(item: dict[str, Any]) -> bool:
        if enabled is not None and bool(item.get("enabled")) is not enabled:
            return False
        if trigger_types and str(item.get("trigger_type") or "").lower() not in trigger_types:
            return False
        return True

    return [item for item in chosen if _keep(item)]
def inventory_row(definition: dict[str, Any], temporal: dict[str, Any] | None) -> dict[str, Any]:
    """Build one inventory entry for ``definition``.

    The public trigger config is embedded under ``"trigger"``. Its commonly
    rendered fields are also copied to the top level of the row. Temporal
    state and drift hints are derived from ``temporal``, which may be ``None``
    when Temporal was not queried.
    """
    trigger = public_trigger_config(definition.get("trigger_config", {}))
    row: dict[str, Any] = {
        "id": definition["id"],
        "name": definition["name"],
        "enabled": bool(definition.get("enabled")),
        "trigger_type": definition["trigger_type"],
        "schedule_id": automation_schedule_id(definition),
        "definition_source": definition.get("source"),
        "sources": inventory_sources(definition, temporal),
        "trigger": trigger,
    }
    # Flatten selected trigger fields so renderers need not dig into "trigger".
    for field in (
        "cron_expression",
        "at",
        "timezone",
        "misfire_policy",
        "catchup_window_seconds",
        "jitter_seconds",
    ):
        row[field] = trigger.get(field)
    row["temporal"] = inventory_temporal_state(definition, temporal)
    row["drift_hints"] = inventory_drift_hints(definition, temporal)
    return row
def inventory_sources(definition: dict[str, Any], temporal: dict[str, Any] | None) -> list[str]:
sources = [str(definition.get("source") or "definition")]
if temporal is None:
sources.append("temporal:not_checked")
elif temporal.get("available"):
sources.append("temporal")
else:
sources.append("temporal:unavailable")
return sources
def inventory_temporal_state(definition: dict[str, Any], temporal: dict[str, Any] | None) -> dict[str, Any]:
    """Summarise the Temporal schedule state for one definition.

    Returns one of three shapes:

    - ``not_checked`` when Temporal was not queried;
    - ``missing_or_unavailable`` (with any warning) when the schedule could
      not be read;
    - ``active``/``paused`` with last-fire time and missed-catchup count.

    An observed schedule id takes precedence over the expected one.
    """
    fallback_id = automation_schedule_id(definition)
    if temporal is None:
        return {"status": "not_checked", "schedule_id": fallback_id}
    observed_id = temporal.get("schedule_id") or fallback_id
    if not temporal.get("available"):
        return {
            "status": "missing_or_unavailable",
            "schedule_id": observed_id,
            "warning": temporal.get("warning"),
        }
    is_paused = bool(temporal.get("paused"))
    return {
        "status": "paused" if is_paused else "active",
        "schedule_id": observed_id,
        "paused": is_paused,
        "last_fired_at": temporal.get("last_fired_at"),
        "missed_catchup_window": int(temporal.get("missed_catchup_window") or 0),
    }
def inventory_drift_hints(definition: dict[str, Any], temporal: dict[str, Any] | None) -> list[str]:
    """List mismatches between a definition and its observed Temporal schedule.

    Returns no hints when Temporal was not queried. When the schedule is
    unavailable, only the id-mismatch and unavailability hints can appear.
    Otherwise the pause state is checked against the definition's enabled
    flag, and any missed catchup window is flagged.
    """
    if temporal is None:
        return []
    expected = automation_schedule_id(definition)
    observed = temporal.get("schedule_id")
    hints = ["temporal_schedule_id_mismatch"] if observed and observed != expected else []
    if not temporal.get("available"):
        return hints + ["temporal_schedule_missing_or_unavailable"]
    is_paused = bool(temporal.get("paused"))
    is_enabled = bool(definition.get("enabled"))
    # Drift exists exactly when "paused" and "enabled" agree (both True or both False).
    if is_paused == is_enabled:
        hints.append(
            "temporal_paused_but_definition_enabled"
            if is_enabled
            else "temporal_active_but_definition_disabled"
        )
    if int(temporal.get("missed_catchup_window") or 0) > 0:
        hints.append("temporal_missed_catchup_window")
    return hints
async def build_inventory_report(args: argparse.Namespace) -> tuple[dict[str, Any], int]:
    """Assemble the automation inventory report and its exit code.

    Definitions are loaded with ``load_definitions``, bounded by
    ``args.timeout_seconds`` (floored at 0.1s). On timeout the report degrades
    to ``file_definitions()`` and records a warning. The definitions are then
    filtered, Temporal schedule visibility is loaded for the survivors, and
    each one becomes a row via ``inventory_row``.

    Returns:
        ``(report, exit_code)``. The exit code is always 0 here; degraded
        sources are surfaced as warnings rather than failures.

    Raises:
        ValueError: if ``args.enabled`` or ``args.trigger_type`` is invalid.
    """
    # Validate user-supplied filters before any DB/Temporal I/O. They do not
    # depend on loaded data, so bad CLI input should fail immediately rather
    # than after waiting (up to the full timeout) on a definition load.
    enabled = parse_enabled_filter(args.enabled)
    trigger_types = normalize_trigger_filters(args.trigger_type)
    timeout = max(float(args.timeout_seconds), 0.1)
    warnings: list[str] = []
    sources: dict[str, dict[str, Any]] = {}
    try:
        definitions, sources["definitions"] = await asyncio.wait_for(
            load_definitions(args, warnings),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        warning = "definition DB timed out; using file definitions"
        warnings.append(warning)
        definitions = file_definitions()
        sources["definitions"] = {"status": "degraded", "source": "files", "warning": warning}
    definitions = filter_inventory_definitions(
        definitions,
        args.activity_id,
        args.activity_name,
        enabled,
        trigger_types,
    )
    temporal_by_activity, sources["temporal"] = await load_temporal_visibility(
        args.temporal_host,
        args.temporal_namespace,
        definitions,
        timeout_seconds=timeout,
    )
    # Surface every non-ok/non-skipped source warning. Duplicates (e.g. the
    # timeout warning recorded above) are collapsed by the set() below.
    for source in sources.values():
        if source.get("status") not in {"ok", "skipped"} and source.get("warning"):
            warnings.append(str(source["warning"]))
    automations = [
        inventory_row(definition, temporal_by_activity.get(definition["id"]))
        for definition in definitions
    ]
    report = {
        "mode": "automation-inventory",
        "generated_at": datetime.now(tz=timezone.utc).isoformat(),
        "sources": sources,
        "summary": summarize_inventory(automations),
        "automations": automations,
        "filters": {
            "activity_id": args.activity_id,
            "activity_name": args.activity_name,
            "enabled": args.enabled,
            "trigger_type": sorted(trigger_types),
        },
        "warnings": sorted(set(warnings)),
    }
    return report, 0
def summarize_inventory(automations: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate counts across inventory rows.

    Returns the following counts:

    - the total;
    - enabled and disabled rows (by truthiness of ``enabled``);
    - rows per trigger type and per Temporal status, with missing values
      counted as ``"unknown"``;
    - rows carrying at least one drift hint.
    """
    by_trigger: dict[str, int] = {}
    by_temporal: dict[str, int] = {}
    enabled_total = 0
    drifting = 0
    for row in automations:
        trigger_key = str(row.get("trigger_type") or "unknown")
        by_trigger[trigger_key] = by_trigger.get(trigger_key, 0) + 1
        status_key = str((row.get("temporal") or {}).get("status") or "unknown")
        by_temporal[status_key] = by_temporal.get(status_key, 0) + 1
        if row.get("enabled"):
            enabled_total += 1
        if row.get("drift_hints"):
            drifting += 1
    total = len(automations)
    return {
        "automation_count": total,
        "enabled_count": enabled_total,
        "disabled_count": total - enabled_total,
        "trigger_counts": by_trigger,
        "temporal_status_counts": by_temporal,
        "drift_count": drifting,
    }
def render_inventory_human(report: dict[str, Any]) -> str:
    """Render an inventory report as newline-joined plain text.

    The output has a header (timestamp, source line, summary), then one line
    per automation. Each automation line may be followed by indented policy,
    drift and Temporal-warning lines. A trailing "Warnings:" section appears
    when the report carries warnings.
    """
    lines: list[str] = [
        f"Automation inventory generated_at={report['generated_at']}",
        render_source_line(report["sources"]),
        render_inventory_summary_line(report["summary"]),
        "",
    ]
    for entry in report["automations"]:
        state = "enabled" if entry["enabled"] else "disabled"
        trigger_value = entry.get("cron_expression") or entry.get("at") or "-"
        temporal_state = entry.get("temporal") or {}
        tz_label = entry.get("timezone") or "-"
        source_label = entry.get("definition_source") or "-"
        status_label = temporal_state.get("status") or "unknown"
        lines.append(
            f"- {entry['name']} [{state} {entry['trigger_type']}] "
            f"schedule={entry['schedule_id']} trigger={trigger_value} "
            f"tz={tz_label} source={source_label} "
            f"temporal={status_label}"
        )
        misfire = entry.get("misfire_policy")
        catchup = entry.get("catchup_window_seconds")
        if misfire or catchup is not None:
            catchup_label = "-" if catchup is None else catchup
            lines.append(" policy=" f"{misfire or '-'} catchup={catchup_label}")
        hints = entry.get("drift_hints")
        if hints:
            lines.append(" drift: " + ", ".join(hints))
        warning = temporal_state.get("warning")
        if warning:
            lines.append(f" temporal warning: {shorten(str(warning))}")
    if report["warnings"]:
        lines += ["", "Warnings:"]
        lines += [f"- {warning}" for warning in report["warnings"]]
    return "\n".join(lines)
def render_inventory_summary_line(summary: dict[str, Any]) -> str:
    """Format the one-line inventory summary.

    Missing counts render as 0. Trigger counts are listed in key order, or as
    ``none`` when empty.
    """
    counts = summary.get("trigger_counts") or {}
    pairs = [f"{kind}={count}" for kind, count in sorted(counts.items())]
    trigger_text = ", ".join(pairs) if pairs else "none"
    fields = [
        f"total={summary.get('automation_count', 0)}",
        f"enabled={summary.get('enabled_count', 0)}",
        f"disabled={summary.get('disabled_count', 0)}",
        f"drift={summary.get('drift_count', 0)}",
        f"triggers=({trigger_text})",
    ]
    return "Summary: " + " ".join(fields)
async def async_inventory_main(argv: list[str] | None = None) -> int:
    """Run the inventory CLI: parse args, build the report, print it.

    Invalid filter input (``ValueError``) is reported on stderr with exit
    code 2. Otherwise the report is printed as sorted, indented JSON or as
    human text, and the report's own exit code is returned.
    """
    args = parse_inventory_args(argv)
    try:
        report, exit_code = await build_inventory_report(args)
    except ValueError as exc:
        print(f"automation_inventory: {exc}", file=sys.stderr)
        return 2
    rendered = (
        json.dumps(report, indent=2, sort_keys=True)
        if args.format == "json"
        else render_inventory_human(report)
    )
    print(rendered)
    return exit_code
def inventory_main(argv: list[str] | None = None) -> int:
    """Synchronous entry point for the inventory CLI; returns its exit code."""
    coroutine = async_inventory_main(argv)
    return asyncio.run(coroutine)
async def async_main(argv: list[str] | None = None) -> int:
    """Run the status CLI: parse args, build the report, print it.

    A ``ValueError`` while building is reported on stderr with exit code 2.
    Otherwise the report is printed as sorted, indented JSON or as human
    text, and the report's own exit code is returned.
    """
    args = parse_args(argv)
    try:
        report, exit_code = await build_report(args)
    except ValueError as exc:
        print(f"automation_status: {exc}", file=sys.stderr)
        return 2
    rendered = (
        json.dumps(report, indent=2, sort_keys=True)
        if args.format == "json"
        else render_human(report)
    )
    print(rendered)
    return exit_code
def main(argv: list[str] | None = None) -> int:
    """Synchronous entry point for the status CLI; returns its exit code."""
    coroutine = async_main(argv)
    return asyncio.run(coroutine)
if __name__ == "__main__":
    # Run the status CLI and use its return value as the process exit status.
    sys.exit(main())