From ffe10f098efe2459c4c3929668720de7f9927909 Mon Sep 17 00:00:00 2001 From: tegwick Date: Wed, 1 Jul 2026 20:12:04 +0200 Subject: [PATCH] Add automation status surface --- AGENTS.md | 15 + Makefile | 12 + SCOPE.md | 9 + docs/runbook.md | 41 + k8s/railiance/20-runtime.yaml | 7 +- scripts/automation_status.py | 8 + src/activity_core/activities.py | 1 + src/activity_core/automation_status.py | 811 ++++++++++++++++++ src/activity_core/llm_client.py | 39 + src/activity_core/report_sinks.py | 11 + src/activity_core/rules/executor.py | 53 +- tests/rules/test_executor.py | 41 + tests/test_automation_status.py | 184 ++++ tests/test_llm_client.py | 11 +- tests/test_railiance_ops_inventory_wiring.py | 9 + tests/test_report_sinks.py | 10 + ...-0006-post-triage-operational-hardening.md | 27 +- ...16-llm-output-robustness-trust-boundary.md | 42 +- ...ITY-WP-0018-own-infra-automation-status.md | 248 ++++++ ...9-automation-schedule-inventory-targets.md | 164 ++++ 20 files changed, 1732 insertions(+), 11 deletions(-) create mode 100644 scripts/automation_status.py create mode 100644 src/activity_core/automation_status.py create mode 100644 tests/test_automation_status.py create mode 100644 workplans/ACTIVITY-WP-0018-own-infra-automation-status.md create mode 100644 workplans/ACTIVITY-WP-0019-automation-schedule-inventory-targets.md diff --git a/AGENTS.md b/AGENTS.md index f9e0558..369def1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -158,6 +158,21 @@ get wrong. --- +## Automation Scheduling Preference + +Durable activity-core automations must use this repo's own infrastructure: +Temporal Schedules, NATS JetStream, activity-core run records, State Hub +progress, and configured report/evidence sinks. Do not use coding +assistant-provided automation, reminder, or heartbeat tooling as the execution +or evidence source for production or operational recurrence. 
+ +Coding assistants may run repo-native inspection commands and summarize their +outputs, but the baseline answer to questions like "How did our automations go +since Friday?" must come from deterministic local tooling such as the +ACTIVITY-WP-0018 automation status surface. + +--- + ## Workplan Convention (ADR-001) Work items originate as files in this repo — not in the hub. The hub is a diff --git a/Makefile b/Makefile index 1644b83..756ae72 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ export .PHONY: sync-event-types sync-activity-definitions sync-schedules test migrate sync-all \ + automation-status automation-status-json \ dev-up dev-down railiance-up railiance-down \ start-worker start-api start-event-router help @@ -24,6 +25,17 @@ migrate: ## Apply all pending Alembic migrations sync-all: sync-event-types sync-activity-definitions ## Sync event types and activity definitions +# -- Automation status --------------------------------------------------------- + +SINCE ?= today +FORMAT ?= human + +automation-status: ## Report recent automation status from repo-owned evidence + uv run python scripts/automation_status.py --since "$(SINCE)" $(if $(UNTIL),--until "$(UNTIL)",) --format "$(FORMAT)" + +automation-status-json: ## Report recent automation status as JSON + $(MAKE) automation-status FORMAT=json + # ── Infrastructure ───────────────────────────────────────────────────────────── dev-up: ## Start full dev stack (Temporal + PG + ES + NATS) diff --git a/SCOPE.md b/SCOPE.md index 0b6660b..f1755f9 100644 --- a/SCOPE.md +++ b/SCOPE.md @@ -90,6 +90,9 @@ The two evaluation modes: - **REST admin API** (FastAPI): CRUD for ActivityDefinitions, manual trigger, event type registry queries. - **Prometheus metrics**: Temporal SDK metrics exposed for scraping. 
+- **Automation status surface**: deterministic, non-LLM status reporting via + `make automation-status` / `scripts/automation_status.py`, using repo-owned + evidence sources rather than coding assistant scheduler state. - **Operational runbook**: `docs/runbook.md`. --- @@ -116,6 +119,10 @@ The two evaluation modes: runs on Railiance infrastructure (or Docker Compose for dev). - **End-user task UI** — tasks land in issue-core; presentation is separate. - **Synchronous request-response patterns** — Temporal is async-first. +- **Coding assistant automation infrastructure** — assistant-provided reminders, + heartbeats, or scheduled jobs are not the execution or evidence authority for + activity-core automations. Assistants may run and summarize repo-native + commands only. --- @@ -132,6 +139,8 @@ The two evaluation modes: commands. - You are replacing scattered bespoke cron jobs and manual coordination with a governed, observable automation layer. +- You need to answer "how did our automations go since Friday?" from + deterministic repo-native evidence before any optional LLM summary. --- diff --git a/docs/runbook.md b/docs/runbook.md index 0136854..7ffae30 100644 --- a/docs/runbook.md +++ b/docs/runbook.md @@ -136,6 +136,47 @@ The response reports: - `schedules.deleted_orphans` - bounded `errors[]` +## Automation status + +Use the repo-native status command to answer operator questions such as "how did +our automations go since Friday?". This is the baseline evidence surface; LLMs +or coding assistants may summarize the output, but they are not the scheduler or +source of truth. + +```bash +# Human-readable status. `friday` resolves in Europe/Berlin by default. +make automation-status SINCE=friday + +# JSON for scripts or assistant summarization. 
+make automation-status-json SINCE=2026-06-26 +``` + +The command reads activity-core owned evidence only: ActivityDefinition files or +DB rows, `activity_runs`, State Hub progress, working-memory report notes, and +Temporal visibility when `TEMPORAL_HOST` is configured. Missing live sources are +reported as warnings rather than hidden. It exits non-zero for real automation +failures such as `missed`, `validation_failed`, or `sink_failed`. + +Useful knobs: + +```bash +AUTOMATION_STATUS_TIMEOUT_SECONDS=10 make automation-status SINCE=friday +make automation-status SINCE=2026-06-26 FORMAT=json +make automation-status SINCE=2026-06-26 UNTIL=2026-06-27 ACTCORE_DB_URL= +``` + +Example distinction from the June 2026 daily triage evidence: + +```text +- Activity 6fca51fa-387a-4fd0-bc4e-d62c29eb859a [validation_failed] expected=0 runs=0 evidence=2 + evidence state_hub_progress event_type=daily_triage run=ebec6e41... output_validated=false validation_error=Unterminated string... + evidence state_hub_progress event_type=daily_triage run=c7370f9c... output_validated=false validation_error=Expecting ',' delimiter... +``` + +That means the schedule/report path left evidence, but the report was not a +clean validated output. Disabled schedules, such as the gated weekly coding +retro, are reported as `disabled` and are not counted as missed runs. + `event_types` defaults to `false` for this endpoint because event-triggered definitions already reload from the DB in the event router path; opt in when the operator intentionally changed event type definition files: diff --git a/k8s/railiance/20-runtime.yaml b/k8s/railiance/20-runtime.yaml index 06157d6..37f5b64 100644 --- a/k8s/railiance/20-runtime.yaml +++ b/k8s/railiance/20-runtime.yaml @@ -95,7 +95,8 @@ data: (strategic_value + time_criticality + risk_reduction + opportunity_enablement) / job_size. 
Use integer factor values from 1 to 5, round score to one decimal place, sort recommendations by rank, and return at - most 10 recommendations. + most 7 recommendations. If uncertain, emit fewer well-formed + recommendations rather than more. Curated digest: {context.daily_triage_digest} @@ -432,7 +433,7 @@ data: "recommendations": { "type": "array", "minItems": 1, - "maxItems": 10, + "maxItems": 7, "items": { "type": "object", "required": ["rank", "candidate", "action", "why", "confidence", "wsjf"], @@ -441,7 +442,7 @@ data: "rank": { "type": "integer", "minimum": 1, - "maximum": 10 + "maximum": 7 }, "candidate": { "type": "string" diff --git a/scripts/automation_status.py b/scripts/automation_status.py new file mode 100644 index 0000000..6436e91 --- /dev/null +++ b/scripts/automation_status.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python3 +"""CLI wrapper for the repo-native automation status report.""" + +from activity_core.automation_status import main + + +if __name__ == "__main__": + raise SystemExit(main()) \ No newline at end of file diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index 4ec54e7..5585ab4 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -366,6 +366,7 @@ async def evaluate_instructions(payload: dict) -> dict: "output_validated": result.output_validated, "review_required": result.review_required, "validation_error": result.validation_error, + "llm_response_metadata": result.llm_response_metadata, }) for spec in result.tasks: task_specs.append({ diff --git a/src/activity_core/automation_status.py b/src/activity_core/automation_status.py new file mode 100644 index 0000000..8bcf05d --- /dev/null +++ b/src/activity_core/automation_status.py @@ -0,0 +1,811 @@ +"""Repo-native automation status reporting without LLM calls.""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import os +import sys +import uuid +from datetime import date, datetime, time, 
timedelta, timezone +from pathlib import Path +from typing import Any +from zoneinfo import ZoneInfo + +import httpx +import yaml +from sqlalchemy import bindparam, text + +from activity_core.db import make_engine +from activity_core.definition_parser import scan_and_parse +from activity_core.schedule_manager import schedule_id +from activity_core.sync_activity_definitions import ACTIVITY_DEFINITION_ID_NAMESPACE + +DEFAULT_TIMEZONE = "Europe/Berlin" +DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" +DEFAULT_WORKING_MEMORY_DIR = "/home/worsch/the-custodian/memory/working" +DEFAULT_TEMPORAL_NAMESPACE = "default" +FAILURE_STATUSES = {"missed", "validation_failed", "sink_failed"} +WEEKDAYS = { + "monday": 0, + "tuesday": 1, + "wednesday": 2, + "thursday": 3, + "friday": 4, + "saturday": 5, + "sunday": 6, +} + + +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Report recent activity-core automation status without LLMs." + ) + parser.add_argument( + "--since", + default=os.environ.get("SINCE", "today"), + help="Window start: YYYY-MM-DD, ISO datetime, today, yesterday, friday, or last-friday.", + ) + parser.add_argument( + "--until", + default=os.environ.get("UNTIL"), + help="Window end. 
Defaults to now; date-only values use that day's end.", + ) + parser.add_argument("--timezone", default=os.environ.get("AUTOMATION_STATUS_TIMEZONE", DEFAULT_TIMEZONE)) + parser.add_argument("--activity-id", action="append", default=[]) + parser.add_argument("--activity-name", action="append", default=[]) + parser.add_argument("--db-url", default=os.environ.get("ACTCORE_DB_URL")) + parser.add_argument("--state-hub-url", default=os.environ.get("STATE_HUB_URL", DEFAULT_STATE_HUB_URL)) + parser.add_argument("--working-memory-dir", default=os.environ.get("AUTOMATION_STATUS_WORKING_MEMORY_DIR", DEFAULT_WORKING_MEMORY_DIR)) + parser.add_argument("--temporal-host", default=os.environ.get("TEMPORAL_HOST")) + parser.add_argument("--temporal-namespace", default=os.environ.get("TEMPORAL_NAMESPACE", DEFAULT_TEMPORAL_NAMESPACE)) + parser.add_argument("--timeout-seconds", type=float, default=float(os.environ.get("AUTOMATION_STATUS_TIMEOUT_SECONDS", "5"))) + parser.add_argument("--progress-limit", type=int, default=int(os.environ.get("AUTOMATION_STATUS_PROGRESS_LIMIT", "100"))) + parser.add_argument("--progress-event-type", action="append", default=None, help="State Hub progress event type to read; repeatable. 
Use all for the unfiltered feed.") + parser.add_argument("--format", choices=("human", "json"), default=os.environ.get("FORMAT", "human")) + return parser.parse_args(argv) + + +def resolve_window(since: str, until: str | None, timezone_name: str, *, now: datetime | None = None) -> dict[str, Any]: + tz = ZoneInfo(timezone_name) + base = now or datetime.now(tz=tz) + if base.tzinfo is None: + base = base.replace(tzinfo=tz) + base = base.astimezone(tz) + since_local = _resolve_time(since, tz, base, "start") + until_local = _resolve_time(until, tz, base, "end") if until else base + if since_local > until_local: + raise ValueError(f"since {since_local.isoformat()} is after until {until_local.isoformat()}") + return { + "since": since_local, + "until": until_local, + "since_utc": since_local.astimezone(timezone.utc), + "until_utc": until_local.astimezone(timezone.utc), + "timezone": timezone_name, + } + + +def _resolve_time(raw: str, tz: ZoneInfo, now: datetime, boundary: str) -> datetime: + token = raw.strip().lower() + if token == "now": + return now + if token == "today": + return _day_boundary(now.date(), tz, boundary) + if token == "yesterday": + return _day_boundary(now.date() - timedelta(days=1), tz, boundary) + strict_last = False + if token.startswith("last-"): + strict_last = True + token = token.removeprefix("last-") + elif token.startswith("last "): + strict_last = True + token = token.removeprefix("last ") + if token in WEEKDAYS: + days_back = (now.weekday() - WEEKDAYS[token]) % 7 + if strict_last and days_back == 0: + days_back = 7 + return _day_boundary((now - timedelta(days=days_back)).date(), tz, boundary) + if "T" in raw or " " in raw: + value = datetime.fromisoformat(raw) + if value.tzinfo is None: + value = value.replace(tzinfo=tz) + return value.astimezone(tz) + try: + return _day_boundary(date.fromisoformat(raw), tz, boundary) + except ValueError as exc: + raise ValueError(f"unsupported time expression: {raw!r}") from exc + + +def _day_boundary(day: 
date, tz: ZoneInfo, boundary: str) -> datetime: + if boundary == "end": + return datetime.combine(day, time.max, tzinfo=tz) + return datetime.combine(day, time.min, tzinfo=tz) + + +def definition_uuid(raw_id: str) -> str: + try: + return str(uuid.UUID(raw_id)) + except ValueError: + return str(uuid.uuid5(ACTIVITY_DEFINITION_ID_NAMESPACE, raw_id)) + + +def file_definitions() -> list[dict[str, Any]]: + records = [] + for definition in scan_and_parse(): + trigger = dict(definition.trigger_config or {}) + trigger_type = str(trigger.get("trigger_type") or trigger.get("type") or "") + if trigger_type not in {"cron", "scheduled"}: + continue + records.append({ + "id": definition_uuid(definition.id), + "name": definition.name, + "enabled": bool(definition.enabled), + "trigger_type": trigger_type, + "trigger_config": trigger, + "instructions": list(definition.instructions or []), + "source": "files", + }) + return sorted(records, key=lambda item: item["name"]) + + +def filter_definitions(definitions: list[dict[str, Any]], ids: list[str], names: list[str]) -> list[dict[str, Any]]: + wanted_ids = {item.lower() for item in ids} + wanted_names = {item.lower() for item in names} + if not wanted_ids and not wanted_names: + return definitions + return [ + item for item in definitions + if item["id"].lower() in wanted_ids or item["name"].lower() in wanted_names + ] + + +def progress_event_types(args: argparse.Namespace) -> list[str | None]: + raw = args.progress_event_type + if raw is None: + env_value = os.environ.get("AUTOMATION_STATUS_PROGRESS_EVENT_TYPES") + raw = env_value.split(",") if env_value else ["daily_triage", "schedule_miss", "ops_inventory_probe"] + values = [item.strip() for item in raw if item and item.strip()] + return [None if item == "all" else item for item in values] + + +def expected_fires(definition: dict[str, Any], window: dict[str, Any]) -> list[str]: + cfg = definition.get("trigger_config") or {} + if definition.get("trigger_type") == "scheduled": + at = 
coerce_datetime(cfg.get("at")) + if at and in_window(at, window): + return [at.astimezone(ZoneInfo(window["timezone"])).isoformat()] + return [] + if definition.get("trigger_type") != "cron" or not cfg.get("cron_expression"): + return [] + tz = ZoneInfo(str(cfg.get("timezone") or window["timezone"])) + start = window["since_utc"].astimezone(tz).replace(second=0, microsecond=0) + end = window["until_utc"].astimezone(tz).replace(second=0, microsecond=0) + minutes = int((end - start).total_seconds() // 60) + 1 + if minutes < 0: + return [] + if minutes > 366 * 24 * 60: + raise ValueError("automation-status refuses to estimate cron windows longer than 366 days") + cron = parse_cron(cfg["cron_expression"]) + fires = [] + current = start + for _ in range(minutes): + if cron_matches(current, cron): + fires.append(current.isoformat()) + current += timedelta(minutes=1) + return fires + + +def parse_cron(expr: str) -> tuple[set[int], set[int], set[int], set[int], set[int]]: + parts = expr.split() + if len(parts) != 5: + raise ValueError(f"unsupported cron expression {expr!r}: expected 5 fields") + return ( + parse_cron_field(parts[0], 0, 59), + parse_cron_field(parts[1], 0, 23), + parse_cron_field(parts[2], 1, 31), + parse_cron_field(parts[3], 1, 12), + parse_cron_field(parts[4], 0, 7), + ) + + +def parse_cron_field(field: str, minimum: int, maximum: int) -> set[int]: + values: set[int] = set() + for chunk in field.split(","): + base, _, step_text = chunk.partition("/") + step = int(step_text) if step_text else 1 + if base == "*": + start, stop = minimum, maximum + elif "-" in base: + left, right = base.split("-", 1) + start, stop = int(left), int(right) + else: + start = stop = int(base) + if step <= 0 or start < minimum or stop > maximum or start > stop: + raise ValueError(f"cron field {field!r} outside {minimum}-{maximum}") + values.update(range(start, stop + 1, step)) + if minimum == 0 and maximum == 7 and 7 in values: + values.add(0) + values.discard(7) + return values 
+ + +def cron_matches(value: datetime, cron: tuple[set[int], set[int], set[int], set[int], set[int]]) -> bool: + minute, hour, day, month, weekday = cron + cron_weekday = (value.weekday() + 1) % 7 + return value.minute in minute and value.hour in hour and value.day in day and value.month in month and cron_weekday in weekday + + +async def db_definitions(db_url: str) -> list[dict[str, Any]]: + engine = make_engine(db_url) + try: + async with engine.connect() as conn: + result = await conn.execute(text( + "select id, name, enabled, trigger_type, trigger_config, instructions_json, version " + "from activity_definitions where trigger_type in ('cron', 'scheduled') order by name" + )) + return [{ + "id": str(row["id"]), + "name": row["name"], + "enabled": bool(row["enabled"]), + "trigger_type": row["trigger_type"], + "trigger_config": dict(row["trigger_config"] or {}), + "instructions": list(row["instructions_json"] or []), + "version": row["version"], + "source": "db", + } for row in result.mappings().all()] + finally: + await engine.dispose() + + +async def load_definitions(args: argparse.Namespace, warnings: list[str]) -> tuple[list[dict[str, Any]], dict[str, Any]]: + if args.db_url: + try: + return await db_definitions(args.db_url), {"status": "ok", "source": "db"} + except Exception as exc: # pragma: no cover - depends on local DB driver/runtime + warning = f"definition DB unavailable; using file definitions: {exc}" + warnings.append(warning) + return file_definitions(), {"status": "degraded", "source": "files", "warning": warning} + warning = "ACTCORE_DB_URL is not set; using file definitions and skipping run-history DB checks" + warnings.append(warning) + return file_definitions(), {"status": "degraded", "source": "files", "warning": warning} + + +async def load_runs(db_url: str | None, definitions: list[dict[str, Any]], window: dict[str, Any]) -> tuple[dict[str, list[dict[str, Any]]], dict[str, Any]]: + if not db_url: + return {}, {"status": "unavailable", 
"warning": "ACTCORE_DB_URL is not set"} + if not definitions: + return {}, {"status": "ok", "count": 0} + ids = [uuid.UUID(item["id"]) for item in definitions] + stmt = text( + "select run_id, activity_id, scheduled_for, fired_at, tasks_spawned, version_used from activity_runs " + "where activity_id in :ids and coalesce(scheduled_for, fired_at) >= :since " + "and coalesce(scheduled_for, fired_at) <= :until order by fired_at" + ).bindparams(bindparam("ids", expanding=True)) + engine = make_engine(db_url) + try: + async with engine.connect() as conn: + result = await conn.execute(stmt, {"ids": ids, "since": window["since_utc"], "until": window["until_utc"]}) + rows = result.mappings().all() + except Exception as exc: # pragma: no cover - depends on local DB driver/runtime + return {}, {"status": "unavailable", "warning": f"activity_runs unavailable: {exc}"} + finally: + await engine.dispose() + grouped: dict[str, list[dict[str, Any]]] = {} + for row in rows: + record = { + "run_id": str(row["run_id"]), + "activity_id": str(row["activity_id"]), + "scheduled_for": iso(coerce_datetime(row["scheduled_for"])), + "fired_at": iso(coerce_datetime(row["fired_at"])), + "tasks_spawned": row["tasks_spawned"], + "version_used": row["version_used"], + } + grouped.setdefault(record["activity_id"], []).append(record) + return grouped, {"status": "ok", "count": len(rows)} + + +async def load_spawn_validation(db_url: str | None, definitions: list[dict[str, Any]], window: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]: + if not db_url: + return [], {"status": "unavailable", "warning": "ACTCORE_DB_URL is not set"} + if not definitions: + return [], {"status": "ok", "count": 0} + ids = [uuid.UUID(item["id"]) for item in definitions] + stmt = text( + "select activity_def_id, source_id, output_validated, created_at from task_spawn_log " + "where activity_def_id in :ids and created_at >= :since and created_at <= :until " + "and output_validated is not null" + 
).bindparams(bindparam("ids", expanding=True)) + engine = make_engine(db_url) + try: + async with engine.connect() as conn: + result = await conn.execute(stmt, {"ids": ids, "since": window["since_utc"], "until": window["until_utc"]}) + rows = result.mappings().all() + except Exception as exc: # pragma: no cover - depends on local DB driver/runtime + return [], {"status": "unavailable", "warning": f"task_spawn_log unavailable: {exc}"} + finally: + await engine.dispose() + return [{ + "source": "task_spawn_log", + "activity_id": str(row["activity_def_id"]), + "created_at": iso(coerce_datetime(row["created_at"])), + "output_validated": bool(row["output_validated"]), + "summary": f"instruction {row['source_id']} task output validation", + } for row in rows], {"status": "ok", "count": len(rows)} + + +def load_state_hub_progress(state_hub_url: str | None, window: dict[str, Any], *, limit: int, timeout_seconds: float, event_types: list[str | None] | None = None) -> tuple[list[dict[str, Any]], dict[str, Any]]: + if not state_hub_url: + return [], {"status": "unavailable", "warning": "STATE_HUB_URL is not set"} + event_types = event_types or [None] + payload: list[Any] = [] + try: + for event_type in event_types: + params: dict[str, Any] = {"limit": limit} + if event_type: + params["event_type"] = event_type + response = httpx.get(f"{state_hub_url.rstrip('/')}/progress/", params=params, timeout=timeout_seconds) + response.raise_for_status() + items = response.json() + if not isinstance(items, list): + return [], {"status": "unavailable", "warning": "State Hub progress response was not a list"} + payload.extend(items) + except Exception as exc: + detail = str(exc) or exc.__class__.__name__ + return [], {"status": "unavailable", "warning": f"State Hub progress unavailable: {detail}"} + + evidence: list[dict[str, Any]] = [] + seen: set[str] = set() + for item in payload: + if not isinstance(item, dict): + continue + item_id = str(item.get("id") or id(item)) + if item_id in 
seen: + continue + seen.add(item_id) + detail = item.get("detail") if isinstance(item.get("detail"), dict) else {} + created_at = coerce_datetime(item.get("created_at")) + scheduled_for = coerce_datetime(detail.get("scheduled_for")) + event_time = scheduled_for or created_at + if event_time and not in_window(event_time, window): + continue + run_id = string_or_none(detail.get("activity_core_run_id")) + activity_id = string_or_none(detail.get("activity_id")) + if not run_id and not activity_id and item.get("event_type") != "schedule_miss": + continue + evidence.append({ + "source": "state_hub_progress", + "activity_id": activity_id, + "run_id": run_id, + "scheduled_for": iso(scheduled_for), + "created_at": iso(created_at), + "event_type": string_or_none(item.get("event_type")), + "output_validated": bool_or_none(detail.get("output_validated")), + "validation_error": shorten(string_or_none(detail.get("validation_error"))), + "status": string_or_none(detail.get("status")), + "summary": shorten(string_or_none(item.get("summary"))), + "path": string_or_none(detail.get("working_memory_path")), + }) + labels = [item or "all" for item in event_types] + return evidence, {"status": "ok", "count": len(evidence), "event_types": labels} + + +def load_working_memory_evidence(working_memory_dir: str | None, window: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]: + if not working_memory_dir: + return [], {"status": "unavailable", "warning": "working-memory directory is not configured"} + root = Path(working_memory_dir).expanduser() + if not root.exists(): + return [], {"status": "unavailable", "warning": f"working-memory directory does not exist: {root}"} + evidence: list[dict[str, Any]] = [] + for path in sorted(root.glob("*.md")): + meta = read_frontmatter(path) + if not meta or meta.get("source") != "activity-core": + continue + scheduled_for = coerce_datetime(meta.get("scheduled_for")) + created_at = coerce_datetime(meta.get("created")) + event_time = 
scheduled_for or created_at + if event_time and not in_window(event_time, window): + continue + evidence.append({ + "source": "working_memory", + "activity_id": string_or_none(meta.get("activity_id")), + "run_id": string_or_none(meta.get("activity_core_run_id")), + "scheduled_for": iso(scheduled_for), + "created_at": iso(created_at), + "output_validated": bool_or_none(meta.get("output_validated")), + "path": str(path), + "summary": path.name, + }) + return evidence, {"status": "ok", "count": len(evidence), "path": str(root)} + + +def read_frontmatter(path: Path) -> dict[str, Any]: + try: + value = path.read_text(encoding="utf-8") + except OSError: + return {} + if not value.startswith("---\n"): + return {} + parts = value.split("---\n", 2) + if len(parts) < 3: + return {} + loaded = yaml.safe_load(parts[1]) + return loaded if isinstance(loaded, dict) else {} + + +async def load_temporal_visibility(temporal_host: str | None, namespace: str, definitions: list[dict[str, Any]], *, timeout_seconds: float) -> tuple[dict[str, dict[str, Any]], dict[str, Any]]: + if not temporal_host: + return {}, {"status": "skipped", "warning": "TEMPORAL_HOST is not set"} + try: + from temporalio.client import Client + client = await asyncio.wait_for(Client.connect(temporal_host, namespace=namespace), timeout=timeout_seconds) + except Exception as exc: + return {}, {"status": "unavailable", "warning": f"Temporal unavailable: {exc}"} + records: dict[str, dict[str, Any]] = {} + for definition in definitions: + sid = automation_schedule_id(definition) + try: + desc = await asyncio.wait_for(client.get_schedule_handle(sid).describe(), timeout=timeout_seconds) + state = getattr(getattr(desc, "schedule", None), "state", None) + info = getattr(desc, "info", None) + records[definition["id"]] = { + "schedule_id": sid, + "available": True, + "paused": getattr(state, "paused", None), + "missed_catchup_window": int(getattr(info, "num_actions_missed_catchup_window", 0) or 0), + "last_fired_at": 
iso(latest_recent_action_time(getattr(info, "recent_actions", None) or [])), + "workflows": await list_workflows(client, definition["id"], timeout_seconds), + } + except Exception as exc: + records[definition["id"]] = {"schedule_id": sid, "available": False, "warning": str(exc)} + return records, {"status": "ok", "count": len(records)} + + +async def list_workflows(client: Any, activity_id: str, timeout_seconds: float) -> list[dict[str, Any]]: + async def collect() -> list[dict[str, Any]]: + workflows: list[dict[str, Any]] = [] + async for item in client.list_workflows(query=f'ActivityId="{activity_id}"'): + workflows.append({ + "id": item.id, + "run_id": item.run_id, + "status": str(item.status), + "start_time": iso(coerce_datetime(getattr(item, "start_time", None))), + "close_time": iso(coerce_datetime(getattr(item, "close_time", None))), + }) + if len(workflows) >= 5: + break + return workflows + return await asyncio.wait_for(collect(), timeout=timeout_seconds) + + +def latest_recent_action_time(actions: list[Any]) -> datetime | None: + times = [coerce_datetime(getattr(item, "scheduled_at", None) or getattr(item, "started_at", None)) for item in actions] + times = [item for item in times if item is not None] + return max(times) if times else None + + +def automation_schedule_id(definition: dict[str, Any]) -> str: + activity_id = definition["id"] + if definition.get("trigger_type") == "scheduled": + return f"activity-schedule-{activity_id}-once" + return schedule_id(activity_id) + + +async def build_report(args: argparse.Namespace) -> tuple[dict[str, Any], int]: + window = resolve_window(args.since, args.until, args.timezone) + timeout = max(float(args.timeout_seconds), 0.1) + warnings: list[str] = [] + sources: dict[str, dict[str, Any]] = {} + + try: + definitions, sources["definitions"] = await asyncio.wait_for( + load_definitions(args, warnings), + timeout=timeout, + ) + except asyncio.TimeoutError: + warning = "definition DB timed out; using file definitions" 
+ warnings.append(warning) + definitions = file_definitions() + sources["definitions"] = {"status": "degraded", "source": "files", "warning": warning} + + definitions = filter_definitions(definitions, args.activity_id, args.activity_name) + + try: + runs_by_activity, sources["activity_runs"] = await asyncio.wait_for( + load_runs(args.db_url, definitions, window), + timeout=timeout, + ) + except asyncio.TimeoutError: + runs_by_activity = {} + sources["activity_runs"] = {"status": "unavailable", "warning": "activity_runs timed out"} + + try: + spawn_evidence, sources["task_spawn_log"] = await asyncio.wait_for( + load_spawn_validation(args.db_url, definitions, window), + timeout=timeout, + ) + except asyncio.TimeoutError: + spawn_evidence = [] + sources["task_spawn_log"] = {"status": "unavailable", "warning": "task_spawn_log timed out"} + + progress_evidence, sources["state_hub_progress"] = load_state_hub_progress( + args.state_hub_url, + window, + limit=args.progress_limit, + timeout_seconds=timeout, + event_types=progress_event_types(args), + ) + wm_evidence, sources["working_memory"] = load_working_memory_evidence(args.working_memory_dir, window) + temporal_by_activity, sources["temporal"] = await load_temporal_visibility( + args.temporal_host, + args.temporal_namespace, + definitions, + timeout_seconds=timeout, + ) + for source in sources.values(): + if source.get("status") not in {"ok", "skipped"} and source.get("warning"): + warnings.append(str(source["warning"])) + + all_evidence = progress_evidence + wm_evidence + spawn_evidence + definitions = add_evidence_only_definitions(definitions, all_evidence) + activities = [] + runs_available = sources["activity_runs"].get("status") == "ok" + for definition in definitions: + runs = runs_by_activity.get(definition["id"], []) + activities.append(classify_activity( + definition, + window, + runs, + evidence_for_activity(definition, runs, all_evidence, window), + temporal_by_activity.get(definition["id"]), + 
expected_fires(definition, window), + runs_available=runs_available, + )) + + report = { + "mode": "automation-status", + "generated_at": datetime.now(tz=timezone.utc).isoformat(), + "window": { + "since": window["since"].isoformat(), + "until": window["until"].isoformat(), + "timezone": window["timezone"], + "since_utc": window["since_utc"].isoformat(), + "until_utc": window["until_utc"].isoformat(), + }, + "sources": sources, + "summary": summarize(activities), + "activities": activities, + "warnings": sorted(set(warnings)), + } + exit_code = 1 if any(item["status"] in FAILURE_STATUSES for item in activities) else 0 + return report, exit_code + + +def add_evidence_only_definitions(definitions: list[dict[str, Any]], evidence: list[dict[str, Any]]) -> list[dict[str, Any]]: + known = {item["id"] for item in definitions} + result = list(definitions) + for item in evidence: + activity_id = item.get("activity_id") + if not activity_id or activity_id in known: + continue + known.add(activity_id) + result.append({ + "id": activity_id, + "name": f"Activity {activity_id}", + "enabled": True, + "trigger_type": "evidence", + "trigger_config": {}, + "instructions": [], + "source": "evidence", + }) + return result + + +def evidence_for_activity(definition: dict[str, Any], runs: list[dict[str, Any]], all_evidence: list[dict[str, Any]], window: dict[str, Any]) -> list[dict[str, Any]]: + run_ids = {item["run_id"] for item in runs} + selected: list[dict[str, Any]] = [] + for item in all_evidence: + if item.get("activity_id") == definition["id"] or (item.get("run_id") and item["run_id"] in run_ids): + selected.append(item) + continue + event_time = coerce_datetime(item.get("scheduled_for") or item.get("created_at")) + if item.get("activity_id") is None and item.get("run_id") in run_ids and (not event_time or in_window(event_time, window)): + selected.append(item) + return selected + + +def classify_activity(definition: dict[str, Any], window: dict[str, Any], runs: list[dict[str, 
Any]], evidence: list[dict[str, Any]], temporal: dict[str, Any] | None, expected: list[str], *, runs_available: bool) -> dict[str, Any]: + warnings: list[str] = [] + if not runs_available: + warnings.append("activity_runs source unavailable; missed-run verdict is unknown") + if temporal and not temporal.get("available", False) and temporal.get("warning"): + warnings.append(f"Temporal schedule visibility unavailable: {temporal['warning']}") + if temporal and temporal.get("paused") and definition.get("enabled"): + warnings.append("Temporal schedule is paused while ActivityDefinition is enabled") + + workflows = temporal.get("workflows", []) if temporal else [] + if not definition.get("enabled"): + status = "disabled" + elif any(item.get("output_validated") is False for item in evidence): + status = "validation_failed" + elif any(workflow_status_matches(item, {"RUNNING"}) for item in workflows): + status = "running" + elif any(workflow_status_matches(item, {"FAILED", "TIMED_OUT", "TERMINATED"}) for item in workflows): + status = "retrying" + elif temporal and int(temporal.get("missed_catchup_window") or 0) > 0: + status = "missed" + elif runs_available and definition.get("enabled") and len(expected) > len(runs): + status = "missed" + elif runs or evidence: + status = "completed" + elif expected: + status = "unknown" + else: + status = "no_due" + + return { + "id": definition["id"], + "name": definition["name"], + "enabled": definition["enabled"], + "trigger_type": definition["trigger_type"], + "trigger_config": public_trigger_config(definition.get("trigger_config", {})), + "schedule_id": automation_schedule_id(definition), + "definition_source": definition.get("source"), + "status": status, + "expected_fires": expected, + "expected_fire_count": len(expected), + "observed_run_count": len(runs), + "runs": runs, + "evidence": evidence, + "temporal": temporal, + "warnings": warnings, + "window": {"since": window["since"].isoformat(), "until": window["until"].isoformat(), 
"timezone": window["timezone"]}, + } + + +def workflow_status_matches(workflow: dict[str, Any], names: set[str]) -> bool: + value = str(workflow.get("status") or "").upper() + return any(name in value for name in names) + + +def public_trigger_config(config: dict[str, Any]) -> dict[str, Any]: + allowed = {"trigger_type", "cron_expression", "timezone", "misfire_policy", "catchup_window_seconds", "jitter_seconds", "at"} + return {key: config.get(key) for key in allowed if key in config} + + +def summarize(activities: list[dict[str, Any]]) -> dict[str, Any]: + counts: dict[str, int] = {} + for item in activities: + counts[item["status"]] = counts.get(item["status"], 0) + 1 + return {"activity_count": len(activities), "status_counts": counts, "failure_count": sum(counts.get(status, 0) for status in FAILURE_STATUSES)} + + +def render_human(report: dict[str, Any]) -> str: + window = report["window"] + lines = [ + f"Automation status {window['since']} -> {window['until']} ({window['timezone']})", + render_source_line(report["sources"]), + render_summary_line(report["summary"]), + "", + ] + for activity in report["activities"]: + lines.append(f"- {activity['name']} [{activity['status']}] expected={activity['expected_fire_count']} runs={activity['observed_run_count']} evidence={len(activity['evidence'])}") + if activity["expected_fires"]: + suffix = " ..." 
if len(activity["expected_fires"]) > 3 else "" + lines.append(" expected fires: " + ", ".join(activity["expected_fires"][:3]) + suffix) + for run in activity["runs"][:3]: + lines.append(f" run {run['run_id']} scheduled_for={run['scheduled_for']} fired_at={run['fired_at']} tasks={run['tasks_spawned']}") + for evidence in activity["evidence"][:3]: + detail = f" evidence {evidence['source']}" + if evidence.get("event_type"): + detail += f" event_type={evidence['event_type']}" + if evidence.get("run_id"): + detail += f" run={evidence['run_id']}" + if evidence.get("output_validated") is not None: + detail += f" output_validated={str(evidence['output_validated']).lower()}" + if evidence.get("validation_error"): + detail += f" validation_error={evidence['validation_error']}" + lines.append(detail) + for warning in activity["warnings"]: + lines.append(f" warning: {warning}") + if report["warnings"]: + lines.extend(["", "Warnings:"]) + lines.extend(f"- {warning}" for warning in report["warnings"]) + return "\n".join(lines) + + +def render_source_line(sources: dict[str, dict[str, Any]]) -> str: + parts = [] + for name, source in sources.items(): + label = f"{name}={source.get('status', 'unknown')}" + if source.get("source"): + label += f"/{source['source']}" + parts.append(label) + return "Sources: " + ", ".join(parts) + + +def render_summary_line(summary: dict[str, Any]) -> str: + counts = summary.get("status_counts") or {} + if not counts: + return "Summary: no scheduled automations found" + return "Summary: " + ", ".join(f"{status}={count}" for status, count in sorted(counts.items())) + + +def coerce_datetime(value: Any) -> datetime | None: + if value is None: + return None + if isinstance(value, datetime): + result = value + elif isinstance(value, date): + result = datetime.combine(value, time.min) + elif isinstance(value, str): + raw = value.strip() + if not raw: + return None + if raw.endswith("Z"): + raw = raw[:-1] + "+00:00" + try: + result = 
datetime.fromisoformat(raw) + except ValueError: + return None + else: + return None + if result.tzinfo is None: + result = result.replace(tzinfo=timezone.utc) + return result.astimezone(timezone.utc) + + +def in_window(value: datetime, window: dict[str, Any]) -> bool: + instant = coerce_datetime(value) + return bool(instant and window["since_utc"] <= instant <= window["until_utc"]) + + +def iso(value: datetime | None) -> str | None: + return value.isoformat() if value else None + + +def string_or_none(value: Any) -> str | None: + if value is None: + return None + result = str(value) + return result or None + + +def bool_or_none(value: Any) -> bool | None: + if value is None or isinstance(value, bool): + return value + if isinstance(value, str): + lowered = value.strip().lower() + if lowered in {"true", "yes", "1"}: + return True + if lowered in {"false", "no", "0"}: + return False + return None + + +def shorten(value: str | None, limit: int = 240) -> str | None: + if value is None: + return None + return value if len(value) <= limit else value[: limit - 3] + "..." 
+ + +async def async_main(argv: list[str] | None = None) -> int: + args = parse_args(argv) + try: + report, exit_code = await build_report(args) + except ValueError as exc: + print(f"automation_status: {exc}", file=sys.stderr) + return 2 + if args.format == "json": + print(json.dumps(report, indent=2, sort_keys=True)) + else: + print(render_human(report)) + return exit_code + + +def main(argv: list[str] | None = None) -> int: + return asyncio.run(async_main(argv)) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/activity_core/llm_client.py b/src/activity_core/llm_client.py index cc9d8cb..7bc00ff 100644 --- a/src/activity_core/llm_client.py +++ b/src/activity_core/llm_client.py @@ -17,6 +17,8 @@ import httpx class DisabledLLMClient: """LLM client used when no llm-connect endpoint is configured.""" + last_response_metadata: dict[str, Any] | None = None + def complete( self, prompt: str, @@ -32,6 +34,7 @@ class LLMConnectClient: def __init__(self, base_url: str, timeout_seconds: float = 300.0) -> None: self.base_url = base_url.rstrip("/") self.timeout_seconds = timeout_seconds + self.last_response_metadata: dict[str, Any] | None = None def complete( self, @@ -54,12 +57,48 @@ class LLMConnectClient: ) resp.raise_for_status() data = resp.json() + self.last_response_metadata = _extract_response_metadata(data) content = data.get("content") if not isinstance(content, str): raise ValueError("llm-connect response missing string content") return content +_SAFE_RESPONSE_METADATA_KEYS = { + "finish_reason", + "usage", + "model", + "model_name", + "provider", + "request_id", + "response_id", + "trace_id", + "latency_ms", + "duration_ms", + "elapsed_ms", + "created", + "created_at", +} + + +def _extract_response_metadata(data: dict[str, Any]) -> dict[str, Any]: + """Keep non-secret llm-connect diagnostics alongside the returned content.""" + return { + key: value for key, value in data.items() + if key in _SAFE_RESPONSE_METADATA_KEYS and 
_json_safe(value) + } + + +def _json_safe(value: Any) -> bool: + try: + import json + + json.dumps(value) + except (TypeError, ValueError): + return False + return True + + def get_llm_client() -> DisabledLLMClient | LLMConnectClient: base_url = os.environ.get("LLM_CONNECT_URL", "").strip() if not base_url: diff --git a/src/activity_core/report_sinks.py b/src/activity_core/report_sinks.py index 66fa5a7..8fd0cdb 100644 --- a/src/activity_core/report_sinks.py +++ b/src/activity_core/report_sinks.py @@ -136,6 +136,7 @@ def _post_state_hub_progress( "output_validated": report_entry.get("output_validated"), "review_required": report_entry.get("review_required"), "validation_error": report_entry.get("validation_error"), + "llm_response_metadata": report_entry.get("llm_response_metadata"), "report": report, }, } @@ -224,6 +225,16 @@ def _render_markdown( lines.extend([summary, ""]) if validation_error: lines.extend(["Validation error:", "", f"`{validation_error}`", ""]) + metadata = report_entry.get("llm_response_metadata") + if metadata: + lines.extend([ + "LLM response metadata:", + "", + "```json", + json.dumps(metadata, indent=2, sort_keys=True), + "```", + "", + ]) lines.extend([ "```json", json.dumps(report, indent=2, sort_keys=True), diff --git a/src/activity_core/rules/executor.py b/src/activity_core/rules/executor.py index 8d04f0b..5a56a12 100644 --- a/src/activity_core/rules/executor.py +++ b/src/activity_core/rules/executor.py @@ -41,6 +41,7 @@ class InstructionResult: review_required: bool = False condition_matched: str | None = None validation_error: str | None = None + llm_response_metadata: dict[str, Any] | None = None def _resolve_path(obj: Any, path: str) -> Any: @@ -167,12 +168,14 @@ def _execute( # Step 3 — call LLM raw_output = llm_client.complete(rendered, model=instr.model, config=llm_config) + response_metadata = _llm_response_metadata(llm_client) # Step 4 — validate and optionally retry task_specs, report, error = _validate_output(raw_output, 
instr, allow_list) if error: retry_prompt = rendered + f"\n\nPrevious output was invalid: {error}\nPlease fix." raw_output = llm_client.complete(retry_prompt, model=instr.model, config=llm_config) + response_metadata = _llm_response_metadata(llm_client) task_specs, report, error = _validate_output(raw_output, instr, allow_list) if error: # Truncate to keep log volume bounded but long enough to see the @@ -188,10 +191,13 @@ def _execute( # loss. One bad item should cost one item, not the whole report. recovered = _resilient_report( instr, raw_output, error, prompt_hash, allow_list, + response_metadata=response_metadata, ) if recovered is not None: return recovered - failure_report = _invalid_output_report(instr, error, raw_output) + failure_report = _invalid_output_report( + instr, error, raw_output, response_metadata=response_metadata, + ) if failure_report is not None: return InstructionResult( tasks=[], @@ -202,6 +208,7 @@ def _execute( review_required=True, condition_matched=instr.condition or None, validation_error=error, + llm_response_metadata=response_metadata, ) return _empty_result(instr, prompt_hash=prompt_hash, validation_error=error) @@ -213,6 +220,7 @@ def _execute( output_validated=True, review_required=bool(getattr(instr, "review_required", False)), condition_matched=instr.condition or None, + llm_response_metadata=response_metadata, ) @@ -252,6 +260,7 @@ def _invalid_output_report( instr: Any, validation_error: str, raw_output: Any, + response_metadata: dict[str, Any] | None = None, ) -> dict[str, Any] | None: """Build a durable diagnostic report for invalid report-sink output. 
@@ -269,7 +278,7 @@ def _invalid_output_report( partial_output = _parse_json_output(raw_output) except json.JSONDecodeError: partial_output = None - raw_preview = raw_output[:4000] + raw_preview = raw_output[:_RAW_OUTPUT_PREVIEW_LIMIT] else: partial_output = raw_output @@ -281,6 +290,8 @@ def _invalid_output_report( "status": "validation_failed", "validation_error": validation_error, } + if response_metadata: + report["llm_response_metadata"] = response_metadata if isinstance(partial_output, dict): if isinstance(partial_output.get("summary"), str): report["partial_summary"] = partial_output["summary"] @@ -310,9 +321,43 @@ _SNIPPET_LIMIT = 200 # fail the whole report or flow unbounded into a downstream consumer. _MAX_STRING_LEN = 4000 _MAX_DEPTH = 8 +_RAW_OUTPUT_PREVIEW_LIMIT = 12000 _SUMMARY_RE = re.compile(r'"summary"\s*:\s*"((?:[^"\\]|\\.)*)"') +_SAFE_RESPONSE_METADATA_KEYS = { + "finish_reason", + "usage", + "model", + "model_name", + "provider", + "request_id", + "response_id", + "trace_id", + "latency_ms", + "duration_ms", + "elapsed_ms", + "created", + "created_at", +} + + +def _llm_response_metadata(llm_client: Any) -> dict[str, Any] | None: + metadata = getattr(llm_client, "last_response_metadata", None) + if not isinstance(metadata, dict) or not metadata: + return None + safe: dict[str, Any] = {} + for key, value in metadata.items(): + if key not in _SAFE_RESPONSE_METADATA_KEYS: + continue + try: + json.dumps(value) + except (TypeError, ValueError): + continue + safe[str(key)] = value + return safe or None + + def _snippet(value: Any) -> str: text = value if isinstance(value, str) else json.dumps(value, default=str) return text[:_SNIPPET_LIMIT] @@ -561,6 +606,7 @@ def _resilient_report( original_error: str, prompt_hash: str | None, allow_list: set[str] | None = None, + response_metadata: dict[str, Any] | None = None, ) -> InstructionResult | None: """Recover a partial-but-usable report from output that failed validation. 
@@ -590,6 +636,8 @@ def _resilient_report( "quarantined_items": quarantined[:_QUARANTINE_LIMIT], "recovery_note": f"original validation error: {original_error}", } + if response_metadata: + report["llm_response_metadata"] = response_metadata logger.warning( "instruction_output_recovered: instruction=%r, kept=%d, quarantined=%d", getattr(instr, "id", None), len(valid), len(quarantined), @@ -603,6 +651,7 @@ def _resilient_report( review_required=True, condition_matched=getattr(instr, "condition", "") or None, validation_error=None, + llm_response_metadata=response_metadata, ) diff --git a/tests/rules/test_executor.py b/tests/rules/test_executor.py index 23bcad6..cd9c71b 100644 --- a/tests/rules/test_executor.py +++ b/tests/rules/test_executor.py @@ -573,6 +573,47 @@ def test_resilient_recovery_against_real_2026_06_26_fixture(): assert all("rank" in rec and "candidate" in rec for rec in result.report["recommendations"]) + +class _MetadataBadLLM: + def __init__(self) -> None: + self.call_count = 0 + self.last_response_metadata: dict[str, Any] | None = None + + def complete( + self, + prompt: str, + model: str = "", + config: dict | None = None, + ) -> str: + self.call_count += 1 + self.last_response_metadata = { + "finish_reason": "length", + "usage": {"input_tokens": 1100, "output_tokens": 1200}, + } + return ("x" * 9000) + "{" + + +def test_invalid_report_preserves_response_metadata_and_long_preview(): + llm = _MetadataBadLLM() + instr = _instr( + id="daily-triage-report", + prompt="Report.", + trusted_fields=[], + report_sinks=[{"type": "working-memory", "path": "/tmp"}], + ) + + result = execute_instruction_with_audit(instr, _Event(), {}, llm) + + assert llm.call_count == 2 + assert result.output_validated is False + assert result.llm_response_metadata == { + "finish_reason": "length", + "usage": {"input_tokens": 1100, "output_tokens": 1200}, + } + assert result.report["llm_response_metadata"] == result.llm_response_metadata + assert 
len(result.report["raw_output_preview"]) > 4000 + + def test_execute_instruction_with_audit_preserves_invalid_report_with_sinks( tmp_path, monkeypatch, diff --git a/tests/test_automation_status.py b/tests/test_automation_status.py new file mode 100644 index 0000000..47da300 --- /dev/null +++ b/tests/test_automation_status.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +from datetime import datetime +from pathlib import Path +from zoneinfo import ZoneInfo + +from activity_core import automation_status as status + +ACTIVITY_ID = "00000000-0000-0000-0000-000000000123" + + +def _window(): + return status.resolve_window( + "2026-06-26", + "2026-06-29", + "Europe/Berlin", + ) + + +def _definition(enabled: bool = True): + return { + "id": ACTIVITY_ID, + "name": "Daily Check", + "enabled": enabled, + "trigger_type": "cron", + "trigger_config": { + "trigger_type": "cron", + "cron_expression": "0 9 * * *", + "timezone": "Europe/Berlin", + "misfire_policy": "skip", + }, + "source": "test", + } + + +def test_friday_shortcut_resolves_to_previous_friday_start() -> None: + now = datetime(2026, 6, 29, 12, 0, tzinfo=ZoneInfo("Europe/Berlin")) + + window = status.resolve_window("friday", None, "Europe/Berlin", now=now) + + assert window["since"].isoformat() == "2026-06-26T00:00:00+02:00" + assert window["until"].isoformat() == "2026-06-29T12:00:00+02:00" + + +def test_expected_fires_for_simple_cron_window() -> None: + fires = status.expected_fires(_definition(), _window()) + + assert fires == [ + "2026-06-26T09:00:00+02:00", + "2026-06-27T09:00:00+02:00", + "2026-06-28T09:00:00+02:00", + "2026-06-29T09:00:00+02:00", + ] + + +def test_completed_when_expected_run_exists() -> None: + run = { + "run_id": "run-1", + "activity_id": ACTIVITY_ID, + "scheduled_for": "2026-06-26T07:00:00+00:00", + "fired_at": "2026-06-26T07:00:10+00:00", + "tasks_spawned": 1, + } + + report = status.classify_activity( + _definition(), + _window(), + [run], + [{"source": "state_hub_progress", 
"run_id": "run-1", "output_validated": True}], + None, + ["2026-06-26T09:00:00+02:00"], + runs_available=True, + ) + + assert report["status"] == "completed" + + +def test_validation_failure_wins_over_completed_run() -> None: + run = {"run_id": "run-1", "activity_id": ACTIVITY_ID, "scheduled_for": None, "fired_at": "2026-06-26T07:00:10+00:00"} + + report = status.classify_activity( + _definition(), + _window(), + [run], + [{"source": "working_memory", "run_id": "run-1", "output_validated": False}], + None, + ["2026-06-26T09:00:00+02:00"], + runs_available=True, + ) + + assert report["status"] == "validation_failed" + + +def test_missed_when_expected_fire_has_no_run_and_runs_available() -> None: + report = status.classify_activity( + _definition(), + _window(), + [], + [], + None, + ["2026-06-26T09:00:00+02:00"], + runs_available=True, + ) + + assert report["status"] == "missed" + + +def test_disabled_schedule_is_not_counted_as_missed() -> None: + report = status.classify_activity( + _definition(enabled=False), + _window(), + [], + [], + None, + ["2026-06-26T09:00:00+02:00"], + runs_available=True, + ) + + assert report["status"] == "disabled" + + +def test_scheduled_definition_reports_one_shot_schedule_id() -> None: + definition = { + "id": ACTIVITY_ID, + "name": "One Shot", + "enabled": True, + "trigger_type": "scheduled", + "trigger_config": { + "trigger_type": "scheduled", + "at": "2026-06-26T09:00:00+02:00", + "timezone": "Europe/Berlin", + }, + "source": "test", + } + + report = status.classify_activity( + definition, + _window(), + [], + [], + None, + ["2026-06-26T09:00:00+02:00"], + runs_available=False, + ) + + assert status.automation_schedule_id(_definition()) == f"activity-schedule-{ACTIVITY_ID}" + assert report["schedule_id"] == f"activity-schedule-{ACTIVITY_ID}-once" + + +def test_partial_source_availability_is_unknown_not_missed() -> None: + report = status.classify_activity( + _definition(), + _window(), + [], + [], + None, + 
["2026-06-26T09:00:00+02:00"], + runs_available=False, + ) + + assert report["status"] == "unknown" + assert "missed-run verdict is unknown" in report["warnings"][0] + + +def test_working_memory_frontmatter_evidence(tmp_path: Path) -> None: + note = tmp_path / "daily-triage-2026-06-26-run.md" + note.write_text( + "---\n" + "source: activity-core\n" + f"activity_id: {ACTIVITY_ID}\n" + "activity_core_run_id: run-1\n" + "scheduled_for: 2026-06-26T07:00:00+00:00\n" + "output_validated: false\n" + "created: 2026-06-26T07:01:00+00:00\n" + "---\n" + "body\n", + encoding="utf-8", + ) + + evidence, source = status.load_working_memory_evidence(str(tmp_path), _window()) + + assert source["status"] == "ok" + assert evidence[0]["run_id"] == "run-1" + assert evidence[0]["output_validated"] is False diff --git a/tests/test_llm_client.py b/tests/test_llm_client.py index 81a40bc..655c4b4 100644 --- a/tests/test_llm_client.py +++ b/tests/test_llm_client.py @@ -13,7 +13,12 @@ def test_llm_connect_client_forwards_run_config(monkeypatch) -> None: pass def json(self) -> dict: - return {"content": '{"summary":"ok","recommendations":[]}'} + return { + "content": '{"summary":"ok","recommendations":[]}', + "finish_reason": "stop", + "usage": {"input_tokens": 10, "output_tokens": 20}, + "raw_response": {"provider_blob": "not persisted"}, + } def fake_post(url: str, json: dict, timeout: float) -> Response: captured["url"] = url @@ -50,3 +55,7 @@ def test_llm_connect_client_forwards_run_config(monkeypatch) -> None: "timeout_seconds": 42, }, } + assert client.last_response_metadata == { + "finish_reason": "stop", + "usage": {"input_tokens": 10, "output_tokens": 20}, + } diff --git a/tests/test_railiance_ops_inventory_wiring.py b/tests/test_railiance_ops_inventory_wiring.py index 4db9103..ca64244 100644 --- a/tests/test_railiance_ops_inventory_wiring.py +++ b/tests/test_railiance_ops_inventory_wiring.py @@ -93,12 +93,21 @@ def 
test_external_configmap_projects_enabled_daily_wsjf_definition(tmp_path) -> assert definition.trigger_config["cron_expression"] == "20 7 * * *" assert definition.trigger_config["timezone"] == "Europe/Berlin" assert instruction["id"] == "daily-triage-report" + assert instruction["max_tokens"] == 1800 + assert "most 7 recommendations" in instruction["prompt"] + assert "fewer well-formed" in instruction["prompt"] assert instruction["output_schema"] == ( "/etc/activity-core/schemas/daily-triage-report.json" ) assert instruction["report_sinks"][0]["type"] == "working-memory" assert instruction["report_sinks"][1]["event_type"] == "daily_triage" + schema = _by_kind_name("ConfigMap", "actcore-report-schemas") + daily_schema = yaml.safe_load(schema["data"]["daily-triage-report.json"]) + recommendations = daily_schema["properties"]["recommendations"] + assert recommendations["maxItems"] == 7 + assert recommendations["items"]["properties"]["rank"]["maximum"] == 7 + def test_ops_inventory_configmap_contains_probeable_inventory() -> None: config = _by_kind_name("ConfigMap", "actcore-ops-service-inventory") diff --git a/tests/test_report_sinks.py b/tests/test_report_sinks.py index ef86b68..f020ae3 100644 --- a/tests/test_report_sinks.py +++ b/tests/test_report_sinks.py @@ -37,6 +37,10 @@ def _payload(sinks: list[dict[str, Any]]) -> dict[str, Any]: "output_validated": True, "review_required": False, "validation_error": None, + "llm_response_metadata": { + "finish_reason": "stop", + "usage": {"output_tokens": 50}, + }, } ], } @@ -62,6 +66,8 @@ def test_working_memory_sink_writes_idempotently(tmp_path) -> None: assert "output_validated: true" in text assert "review_required: false" in text assert "model: test-model" in text + assert "LLM response metadata:" in text + assert '"finish_reason": "stop"' in text assert "State Hub has loose ends." 
in text @@ -113,6 +119,10 @@ def test_state_hub_progress_sink_posts(monkeypatch) -> None: assert posts[0]["json"]["detail"]["activity_core_run_id"] == payload_run_id() assert posts[0]["json"]["detail"]["output_validated"] is True assert posts[0]["json"]["detail"]["review_required"] is False + assert posts[0]["json"]["detail"]["llm_response_metadata"] == { + "finish_reason": "stop", + "usage": {"output_tokens": 50}, + } def test_state_hub_progress_includes_prior_working_memory_path( diff --git a/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md b/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md index dd61720..502a0e6 100644 --- a/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md +++ b/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md @@ -4,11 +4,11 @@ type: workplan title: "Post-triage operational hardening" domain: custodian repo: activity-core -status: active +status: finished owner: codex topic_slug: custodian created: "2026-06-03" -updated: "2026-06-27" +updated: "2026-06-30" state_hub_workstream_id: "5646e13a-13af-4724-bca6-3c0d86f96733" --- @@ -104,7 +104,7 @@ and emitted a validated `daily_triage` report plus working-memory note. ```task id: ACTIVITY-WP-0006-T03 -status: wait +status: done priority: medium state_hub_task_id: "7cbf0a35-71a1-47ac-afc2-f51ad2180fd0" ``` @@ -203,6 +203,27 @@ ACTIVITY-WP-0016 output-robustness bundle and runtime prompt/token changes, not a missing schedule. T03 stays wait until a post-deployment smoke passes and three new clean scheduled runs are collected. +2026-06-30 early checkpoint: two new clean scheduled runs exist after the +validation failures. State Hub daily_triage progress shows 2026-06-28 +05:20:51Z run `6a44d6dd-3f02-53f2-a5d8-d42b76b0ef98` and 2026-06-29 +05:20:49Z run `1dfb47c9-07bf-551b-b778-1d21a40bd95c`, both with +`output_validated=true` and working-memory notes written. 
The current local time +was 2026-06-30 01:37 Europe/Berlin, before the expected 07:20 Berlin scheduled +fire, so the three-clean-run gate cannot close yet. Recheck after 2026-06-30 +05:20Z; if that scheduled run validates, the clean streak is 06-28 / 06-29 / +06-30 and T03 can close with calibration feedback. + +2026-06-30 closeout: the 07:20 Berlin scheduled run fired at 05:20:50Z as run +`ac3d71a0-2f8f-50df-b3ce-7c60c2abb5c5` with `output_validated=true` and a +working-memory note written. The post-failure clean streak is now complete: +2026-06-28 (`6a44d6dd`), 2026-06-29 (`1dfb47c9`), and 2026-06-30 (`ac3d71a0`). +Calibration feedback: the scheduler, worker, llm-connect route, State Hub sink, +and working-memory sink are stable again; the recommendations were operationally +useful but too dense at 10 items, repeatedly emphasizing human-dependency and +infrastructure-unblock work. ACTIVITY-WP-0016 now owns the density/contract fix: +Railiance runtime projection was aligned to a top-7 contract so the next live +run can prove the bounded output posture. T03 is done. + ## Rule Action Contract Documentation ```task diff --git a/workplans/ACTIVITY-WP-0016-llm-output-robustness-trust-boundary.md b/workplans/ACTIVITY-WP-0016-llm-output-robustness-trust-boundary.md index c223cee..f601013 100644 --- a/workplans/ACTIVITY-WP-0016-llm-output-robustness-trust-boundary.md +++ b/workplans/ACTIVITY-WP-0016-llm-output-robustness-trust-boundary.md @@ -8,7 +8,7 @@ status: active owner: codex topic_slug: custodian created: "2026-06-26" -updated: "2026-06-27" +updated: "2026-06-30" state_hub_workstream_id: "4ef0d53b-1777-41ae-80c6-1b69fdb34726" --- @@ -144,11 +144,21 @@ Done when: `tests/fixtures/wp0016/daily_triage_2026-06-26_validation_failure.partial.json` (the 4000-char preview + validation error; full payload pending the remote pull). 
+2026-06-30 local retention hardening: activity-core now preserves future +llm-connect diagnostic metadata instead of dropping it at the client boundary. +`LLMConnectClient.complete()` still returns the content string for compatibility, +but records safe non-secret response fields such as `finish_reason` and `usage` +on `last_response_metadata`; the executor copies that into report artifacts, +State Hub progress detail, and working-memory notes. Invalid report raw previews +were raised from 4000 to 12000 chars. This does not recover the historical +06-26 full payload or producer-side `finish_reason`, so T01 remains wait on the +remote llm-connect log pull, but the retention gap is closed for future failures. + ## Schema + Prompt Redesign For Error Locality ```task id: ACTIVITY-WP-0016-T02 -status: progress +status: done priority: high state_hub_task_id: "ae67ca8c-ee01-4a8d-9e8a-a0a36c999758" ``` @@ -209,6 +219,21 @@ Apply there: 4. State the value vocabularies (`action`, `confidence`) the T04 guardrails will check. +2026-06-30 live evidence check: the 2026-06-28 and 2026-06-29 scheduled +`daily_triage` events validated successfully, which shows the runtime is no +longer failing every day. However, the preserved State Hub reports still contain +10 recommendations, not the requested bounded top-N of 7 / framed item contract. +Treat that as evidence that the runtime-projected prompt/schema/max-token bundle +has not fully absorbed the T02 handoff yet. + +2026-06-30 source projection closeout: patched `k8s/railiance/20-runtime.yaml` +so the projected `daily-statehub-wsjf-triage.md` prompt now says at most 7 +recommendations and instructs the model to emit fewer well-formed items rather +than more. The projected `daily-triage-report.json` now has `maxItems: 7` and +`rank.maximum: 7`, aligned with the repo schema. `max_tokens: 1800` remains as +headroom for the bounded report. T02 is done in source; live deployment and an +observed <=7 recommendation run remain under T05. 
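As an illustration of the top-7 contract described above, the following is a minimal sketch of checking a report against the two projected constraints (`maxItems: 7` and `rank.maximum: 7`). The names `check_report_against_schema` and `SCHEMA_FRAGMENT` are hypothetical and not part of this repo. The schema path (`properties.recommendations.items.properties.rank`) is assumed to match what the wiring test reads. A full JSON Schema validator would cover far more than this hand-rolled check.

```python
from typing import Any


def check_report_against_schema(report: dict[str, Any], schema: dict[str, Any]) -> list[str]:
    """Return human-readable violations of the bounded-recommendation contract.

    Hypothetical helper: it only looks at maxItems and rank.maximum and
    leaves every other schema keyword to a real validator.
    """
    rec_schema = schema["properties"]["recommendations"]
    max_items = rec_schema["maxItems"]
    rank_max = rec_schema["items"]["properties"]["rank"]["maximum"]

    recommendations = report.get("recommendations")
    if not isinstance(recommendations, list):
        return ["recommendations is missing or not a list"]

    problems: list[str] = []
    if len(recommendations) > max_items:
        problems.append(f"{len(recommendations)} recommendations exceed maxItems={max_items}")
    for index, item in enumerate(recommendations):
        rank = item.get("rank") if isinstance(item, dict) else None
        # Only numeric ranks are compared; missing ranks are out of scope here.
        is_number = isinstance(rank, (int, float)) and not isinstance(rank, bool)
        if is_number and rank > rank_max:
            problems.append(f"item {index}: rank {rank} exceeds maximum={rank_max}")
    return problems


# Assumed fragment carrying only the two constraints named in the closeout note.
SCHEMA_FRAGMENT = {
    "properties": {
        "recommendations": {
            "maxItems": 7,
            "items": {"properties": {"rank": {"maximum": 7}}},
        }
    }
}
```

A ten-item report like the ones preserved from 2026-06-28 and 2026-06-29 would yield one count violation plus one violation per rank above 7, assuming the ranks run from 1 to 10.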
+ ## Boundary Parser — Verify & Mitigate (Posture B) ```task @@ -368,6 +393,19 @@ Done when: is cluster/operator work outside this repo's SCOPE. T05 therefore stays `progress` until that live run exists; the in-repo deliverables are done. +2026-06-30 follow-up: added forward-looking diagnostics so future validation +failures carry llm-connect response metadata and a larger bounded raw-output +preview in activity-core-owned evidence. Focused verification passed: +`uv run pytest tests/test_llm_client.py tests/rules/test_executor.py tests/test_report_sinks.py -q` +=> 39 passed. This improves future root-cause ability but does not replace the +required live smoke proving graceful degradation on railiance01. + +2026-06-30 projection follow-up: local source projection now enforces the top-7 +prompt/schema contract. Remaining T05 proof is operational: deploy or sync the +updated `k8s/railiance/20-runtime.yaml`, run `actcore-sync`/schedule smoke or wait +for the next 07:20 Berlin fire, then confirm State Hub `daily_triage` evidence is +`output_validated=true` with no more than 7 recommendations. 
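The remaining T05 proof step could be scripted roughly as follows. This is a sketch under assumptions: the function name `t05_proof_status` is hypothetical, and the event shape (`detail.output_validated`, `detail.report.recommendations`) is assumed to mirror the payload that the State Hub progress sink posts. The shape State Hub returns on read is not confirmed here.

```python
from typing import Any

# Upper bound taken from the top-7 prompt/schema contract.
MAX_RECOMMENDATIONS = 7


def t05_proof_status(event: dict[str, Any]) -> tuple[bool, str]:
    """Decide whether one daily_triage progress event satisfies the T05 proof.

    Hypothetical helper; returns (passed, reason) so the reason can be logged.
    """
    detail = event.get("detail") or {}
    # Require an explicit True; None or a missing key does not count as validated.
    if detail.get("output_validated") is not True:
        return False, "output_validated is not true"
    report = detail.get("report") or {}
    recommendations = report.get("recommendations")
    if not isinstance(recommendations, list):
        return False, "report has no recommendations list"
    if len(recommendations) > MAX_RECOMMENDATIONS:
        return False, f"{len(recommendations)} recommendations exceed {MAX_RECOMMENDATIONS}"
    return True, "ok"
```

Returning a reason alongside the verdict keeps the check usable from a deterministic CLI without an LLM in the loop.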
+ ## Relationships - **Blocks / feeds:** `ACTIVITY-WP-0006-T03` (three clean scheduled runs) and diff --git a/workplans/ACTIVITY-WP-0018-own-infra-automation-status.md b/workplans/ACTIVITY-WP-0018-own-infra-automation-status.md new file mode 100644 index 0000000..c146a3e --- /dev/null +++ b/workplans/ACTIVITY-WP-0018-own-infra-automation-status.md @@ -0,0 +1,248 @@ +--- +id: ACTIVITY-WP-0018 +type: workplan +title: "Own-infrastructure automation status surface" +domain: infotech +repo: activity-core +status: finished +owner: codex +topic_slug: automation-observability +created: "2026-06-29" +updated: "2026-06-29" +state_hub_workstream_id: "0220b38b-7c73-4601-9601-5f2c1a5b29e8" +--- + +# Own-infrastructure automation status surface + +## Goal + +Make activity-core's own scheduling and evidence infrastructure the explicit +operating preference for durable automations, independent of any coding +assistant-provided scheduler or reminder system. + +An operator should be able to answer a question like "How did our automations go +since Friday?" with a repo-native command that does not require an LLM. Coding +assistants may inspect or summarize that command's output, but they must not be +the source of truth for scheduled execution, run history, or operational +evidence. + +## Review notes + +The repo already owns the correct infrastructure direction: + +- `SCOPE.md` defines activity-core as the org-wide event bridge for cron, + one-off scheduled datetime, and event-triggered automation. +- `Makefile` exposes sync and service targets, but no operator status target for + recent automation outcomes. +- `docs/runbook.md` documents daily-triage verification through + `scripts/verify_daily_triage.py`, but that helper is activity-specific and + still reads like a checklist rather than the baseline answer surface for all + automations. 
+- Existing workplan evidence shows the status question is operationally common: + 2026-06-24 and 2026-06-25 daily triage runs were clean, while 2026-06-26 and + 2026-06-27 fired on schedule but failed output validation. That distinction is + exactly what the baseline command must make obvious. + +## Task: Codify the own-infra scheduling preference + +```task +id: ACTIVITY-WP-0018-T01 +status: done +priority: high +state_hub_task_id: "00127678-5ce4-4cb3-b81c-f42e04407c73" +``` + +Record the repository preference that durable automation scheduling, execution +history, and run evidence belong to activity-core's own infrastructure: Temporal +Schedules, NATS JetStream, activity-core run records, State Hub progress, and +working-memory/report sinks. + +Acceptance: + +- `AGENTS.md` repo-specific instructions say not to use coding + assistant-provided automation tooling as the execution or evidence source for + activity-core automations. +- `SCOPE.md` and `docs/runbook.md` describe coding assistants as callers or + summarizers of repo-native automation commands, not as schedulers. +- The preference distinguishes durable automation from harmless local session + reminders: production/operational recurrence belongs to activity-core. +- The text names the authoritative evidence sources and avoids tying the policy + to any one assistant product. + +2026-06-29 progress: Added the immediate repo-agent instruction in AGENTS.md +that durable activity-core automations must use repo-owned infrastructure, not +coding assistant automation/reminder/heartbeat tooling, as the execution or +evidence source. Remaining T01 work is to carry the same preference into +SCOPE.md and docs/runbook.md. 
+ +## Task: Define the automation status evidence contract + +```task +id: ACTIVITY-WP-0018-T02 +status: done +priority: high +state_hub_task_id: "17e6bb87-d4bf-4ef3-b91c-4bdfe2fe3492" +``` + +Define a small, deterministic report contract for answering recent automation +status questions across all ActivityDefinitions. + +Acceptance: + +- The contract covers schedule state, expected fires in the requested window, + observed workflow runs, `activity_runs` rows, State Hub progress events, + working-memory/report sink evidence, and known validation or sink failures. +- It defines normalized statuses such as `completed`, `running`, `retrying`, + `validation_failed`, `sink_failed`, `missed`, `disabled`, and `unknown`. +- Partial data is explicit: if Temporal, Postgres, State Hub, or a sink path is + unavailable, the report includes warnings rather than silently passing or + failing the whole check. +- The contract is safe for operator logs: no secrets, prompts, raw model output, + or credential-bearing URLs. +- The contract can be emitted as JSON for scripts and rendered as concise text + for humans. + +## Task: Implement the non-LLM automation status CLI + +```task +id: ACTIVITY-WP-0018-T03 +status: done +priority: high +state_hub_task_id: "7831f2fc-8b76-48fe-aa34-9dcc11ee84db" +``` + +Add a deterministic CLI, likely under `scripts/automation_status.py` or an +`activity_core` module, that answers recent automation status questions without +calling an LLM. + +Acceptance: + +- Supports `--since`, `--until`, activity name/id filters, JSON output, and a + concise human summary. +- Accepts simple operator dates, including absolute dates and a documented + `friday`/`last-friday` style shortcut, resolving them to concrete dates in the + configured timezone. +- Inspects all enabled scheduled ActivityDefinitions by default, not just daily + triage. 
+- Uses live sources when configured: Postgres `activity_definitions` / + `activity_runs`, Temporal schedule and workflow visibility, State Hub + progress, and configured local report sink paths. +- Degrades usefully when a source is unavailable and exits non-zero only for + real status failures or invalid input, not for optional evidence gaps that are + clearly reported. +- Includes focused unit tests with fixture data for clean runs, validation + failures, missed runs, disabled schedules, and partial-source availability. + +## Task: Add the Make target baseline + +```task +id: ACTIVITY-WP-0018-T04 +status: done +priority: high +state_hub_task_id: "451bdf62-b619-4ace-9262-46d20b912781" +``` + +Expose the CLI through a Make target that is easy for an operator or any coding +assistant to run before attempting a prose summary. + +Acceptance: + +- `make automation-status SINCE=2026-06-26` prints the human-readable baseline. +- `make automation-status SINCE=friday` is supported or documented with the + exact accepted shortcut. +- A JSON form is available, either through `FORMAT=json` or a separate target + such as `make automation-status-json`. +- The target does not require LLM credentials, coding assistant automation + tooling, or interactive prompts. +- `make help` lists the target with a clear one-line description. + +## Task: Update operator docs and examples + +```task +id: ACTIVITY-WP-0018-T05 +status: done +priority: medium +state_hub_task_id: "233659aa-e14a-4b3d-b156-d04f0fa16db6" +``` + +Update the runbook so "How did automations go since Friday?" has an obvious +operator recipe. + +Acceptance: + +- `docs/runbook.md` has a short "Automation status" section near the scheduling + operations. +- The docs include example output or a compact sample for the known daily + triage distinction: fired on time versus completed successfully versus output + validation failure. 
+- The docs clarify that LLM summaries are optional convenience only; the Make + target output is the baseline evidence. +- The daily-triage-specific helper is either kept as a lower-level diagnostic or + folded into the generalized status command. + +## Task: Verify against recent scheduled-run evidence + +```task +id: ACTIVITY-WP-0018-T06 +status: done +priority: medium +state_hub_task_id: "24efbe9f-dfff-482f-9edc-456379c9a2aa" +``` + +Prove the new surface against the recent evidence that motivated this workplan. + +Acceptance: + +- Running the status command over the window starting Friday, 2026-06-26 shows + that the daily triage schedule fired on 2026-06-26 and 2026-06-27 but did not + produce clean validated reports. +- The command distinguishes scheduling health from output/schema validation + failure. +- Disabled or waiting schedules, such as the weekly coding retro gate when its + upstream read model is not available, are reported without being counted as + missed runs. +- Verification results are recorded in this workplan and as a State Hub progress + note once the implementation lands. + +## Implementation Result + +Completed 2026-06-29: implemented the own-infrastructure automation status +surface and codified the scheduling preference. + +Delivered: + +- `AGENTS.md` now states that durable activity-core automations use repo-owned + infrastructure, not coding assistant automation/reminder/heartbeat tooling, as + execution or evidence authority. +- `SCOPE.md` and `docs/runbook.md` describe the deterministic status surface and + assistant boundary. +- `src/activity_core/automation_status.py` and `scripts/automation_status.py` + provide the non-LLM CLI. +- `make automation-status SINCE=...` and `make automation-status-json` expose the + baseline operator commands. 
+- `tests/test_automation_status.py` covers date shortcuts, cron fire estimation, + completed runs, validation failures, missed runs, disabled schedules, partial + source availability, and working-memory evidence parsing. + +Verification: + +```bash +python3 -m py_compile src/activity_core/automation_status.py scripts/automation_status.py tests/test_automation_status.py +/home/worsch/.local/bin/uv run pytest tests/test_automation_status.py tests/test_daily_triage_verifier.py -q +/home/worsch/.local/bin/uv run python scripts/automation_status.py \ + --since 2026-06-26 --until 2026-06-27 --db-url '' \ + --progress-event-type daily_triage --timeout-seconds 10 \ + --working-memory-dir /tmp --format json +``` + +Results: + +- focused tests: `11 passed`; +- `make help` lists `automation-status` and `automation-status-json`; +- the 2026-06-26 through 2026-06-27 status run exited `1` as expected because + State Hub evidence classified daily triage activity + `6fca51fa-387a-4fd0-bc4e-d62c29eb859a` as `validation_failed` with two + non-secret evidence records: 2026-06-26 `Expecting ',' delimiter` and + 2026-06-27 `Unterminated string`; +- the same report classified the gated weekly coding retro as `disabled`, not + `missed`. 
\ No newline at end of file diff --git a/workplans/ACTIVITY-WP-0019-automation-schedule-inventory-targets.md b/workplans/ACTIVITY-WP-0019-automation-schedule-inventory-targets.md new file mode 100644 index 0000000..a8db05e --- /dev/null +++ b/workplans/ACTIVITY-WP-0019-automation-schedule-inventory-targets.md @@ -0,0 +1,164 @@ +--- +id: ACTIVITY-WP-0019 +type: workplan +title: "Automation schedule inventory Make targets" +domain: infotech +repo: activity-core +status: ready +owner: codex +topic_slug: automation-inventory +created: "2026-06-29" +updated: "2026-06-29" +state_hub_workstream_id: "21c73763-9adc-42f6-8fd2-1b8b33c2c770" +--- + +# Automation schedule inventory Make targets + +## Goal + +Provide a repo-native, non-LLM way to list every scheduled automation that +activity-core knows about. + +`ACTIVITY-WP-0018` added the status surface for questions like "How did our +automations go since Friday?". The next operator question is the inventory +baseline: "What automations are scheduled at all?" That should be answerable +through Make targets backed by activity-core's own ActivityDefinitions, +database, and Temporal schedule metadata when available, independent of any +coding assistant automation infrastructure. + +## Review notes + +- `Makefile` currently exposes `automation-status` and + `automation-status-json`, but no dedicated inventory/list target. +- `scripts/automation_status.py` and `src/activity_core/automation_status.py` + already load scheduled ActivityDefinitions and compute their Temporal schedule + ids. The inventory target should reuse that parsing/loading posture where it + fits rather than creating a second discovery path. +- `make sync-schedules` reconciles Temporal schedules from the + `activity_definitions` database, but it is an action target, not a read-only + operator inventory command. 
+- The inventory command should remain useful in degraded local mode: file-backed + definitions are enough to list configured scheduled automations, while live + DB and Temporal visibility can enrich the output. + +## Task: Define the automation inventory contract + +```task +id: ACTIVITY-WP-0019-T01 +status: todo +priority: high +state_hub_task_id: "8de24590-f9ee-4d0e-8692-b7ada9f232ed" +``` + +Define the fields and source precedence for a deterministic scheduled +automation inventory report. + +Acceptance: + +- The report includes every ActivityDefinition with `trigger_type` of `cron` or + `scheduled`, including disabled definitions. +- Each row includes id, name, enabled/disabled state, trigger type, schedule + expression or one-shot datetime, timezone, overlap/catchup policy when known, + and the derived Temporal schedule id. +- The report identifies its source for each row: database, repo definition file, + Temporal visibility, or a combination. +- If Temporal is reachable, the report adds paused/missing/drift hints without + mutating schedules. +- Missing optional sources produce warnings, not silent omissions. +- The JSON shape is stable enough for scripts and tests. + +## Task: Implement a non-mutating inventory CLI + +```task +id: ACTIVITY-WP-0019-T02 +status: todo +priority: high +state_hub_task_id: "538cb9a5-48f3-470c-8518-29ee66c96678" +``` + +Add a deterministic CLI path for listing scheduled automations without requiring +LLM credentials or coding assistant tooling. + +Acceptance: + +- A script or module command, likely sharing code with + `activity_core.automation_status`, supports human and JSON output. +- The command is read-only: it does not call `sync-schedules`, upsert schedules, + delete schedules, enqueue workflows, or write State Hub evidence. +- It supports filters by activity id, activity name, enabled state, and trigger + type. 
+- It loads from the database when configured and falls back to repo definition + files when the database is unavailable or explicitly disabled. +- It optionally enriches rows from Temporal when `TEMPORAL_HOST` is configured, + with bounded timeouts so an unreachable service does not hang the command. +- Unit tests cover DB rows, file fallback, disabled definitions, Temporal + enrichment unavailable, and JSON output. + +## Task: Add Make targets + +```task +id: ACTIVITY-WP-0019-T03 +status: todo +priority: high +state_hub_task_id: "f2001721-07f3-42f5-a15e-0c7d1b0ed801" +``` + +Expose the inventory command through Make targets that are easy for humans, +scripts, and coding assistants to run before asking for a prose summary. + +Acceptance: + +- `make automation-list` prints a concise human-readable inventory. +- `make automation-list-json` emits the same inventory as JSON. +- Optional Make variables pass through cleanly, for example `ENABLED=true`, + `TRIGGER=cron`, `ACTIVITY_ID=`, or `FORMAT=json`. +- `make help` lists both targets with clear one-line descriptions. +- The targets do not require LLM access, Codex automation tooling, or + interactive prompts. + +## Task: Document the inventory workflow + +```task +id: ACTIVITY-WP-0019-T04 +status: todo +priority: medium +state_hub_task_id: "f687743b-3936-413e-ae50-d35484ae9a81" +``` + +Update operator documentation so the scheduled automation inventory path is +discoverable next to the status path. + +Acceptance: + +- `docs/runbook.md` documents `make automation-list` and + `make automation-list-json`. +- The docs distinguish inventory from status: inventory answers what is + configured; status answers what happened in a time window. +- The docs state that the command is read-only and uses activity-core-owned + scheduling evidence. +- The docs include a compact example of the expected human output. 
+ +## Task: Verify against current repo and live/degraded sources + +```task +id: ACTIVITY-WP-0019-T05 +status: todo +priority: medium +state_hub_task_id: "5317b532-5cef-4eff-b6d8-3e85bbca8e8a" +``` + +Prove the target against the current scheduled automation definitions and +degraded local conditions. + +Acceptance: + +- `make automation-list` shows the current scheduled automations, including + daily triage and weekly scheduled definitions when present in the selected + source. +- JSON output is valid and includes the same rows. +- A DB-unavailable run falls back to repo definition files or reports a clear + warning if no definitions are discoverable. +- A Temporal-unavailable run exits successfully with Temporal warnings rather + than hanging. +- Focused tests pass and the result is recorded in this workplan before the + workplan is moved to `finished`.
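The DB-unavailable acceptance criterion above can be sketched as follows. This is a minimal illustration under stated assumptions, not the repository's implementation: `InventoryReport`, `load_inventory`, and the two loader callables are hypothetical names, and the row shape (`name`, `trigger_type`, `enabled`) is assumed for the example only.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Optional

# Hypothetical row shape and loader signature, chosen for illustration only.
Row = dict[str, object]
Loader = Callable[[], list[Row]]

# Trigger types the workplan says belong in the inventory.
SCHEDULED_TRIGGERS = {"cron", "scheduled"}


@dataclass
class InventoryReport:
    rows: list[Row] = field(default_factory=list)
    source: str = "none"
    warnings: list[str] = field(default_factory=list)


def load_inventory(load_db: Optional[Loader], load_files: Loader) -> InventoryReport:
    """Prefer the database; fall back to repo definition files with a warning."""
    report = InventoryReport()
    rows: Optional[list[Row]] = None

    if load_db is None:
        report.warnings.append("database: disabled, using repo definition files")
    else:
        try:
            rows = load_db()
            report.source = "database"
        except Exception as exc:
            # Log only the exception type so credential-bearing URLs in the
            # message never reach operator logs.
            report.warnings.append(
                f"database: unavailable ({type(exc).__name__}), "
                "using repo definition files"
            )

    if rows is None:
        rows = load_files()
        report.source = "repo definition file"

    # Disabled definitions stay in the inventory; only the trigger type filters.
    report.rows = [r for r in rows if r.get("trigger_type") in SCHEDULED_TRIGGERS]
    if not report.rows:
        report.warnings.append("no scheduled definitions discoverable")
    return report
```

Passing `None` for `load_db` models an explicitly disabled database. A loader that raises models an unreachable one. In both cases the report still lists file-backed definitions and carries a warning instead of failing silently.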