from __future__ import annotations import asyncio import json from datetime import datetime from pathlib import Path from zoneinfo import ZoneInfo from activity_core import automation_status as status ACTIVITY_ID = "00000000-0000-0000-0000-000000000123" def _window(): return status.resolve_window( "2026-06-26", "2026-06-29", "Europe/Berlin", ) def _definition(enabled: bool = True): return { "id": ACTIVITY_ID, "name": "Daily Check", "enabled": enabled, "trigger_type": "cron", "trigger_config": { "trigger_type": "cron", "cron_expression": "0 9 * * *", "timezone": "Europe/Berlin", "misfire_policy": "skip", }, "source": "test", } def test_friday_shortcut_resolves_to_previous_friday_start() -> None: now = datetime(2026, 6, 29, 12, 0, tzinfo=ZoneInfo("Europe/Berlin")) window = status.resolve_window("friday", None, "Europe/Berlin", now=now) assert window["since"].isoformat() == "2026-06-26T00:00:00+02:00" assert window["until"].isoformat() == "2026-06-29T12:00:00+02:00" def test_expected_fires_for_simple_cron_window() -> None: fires = status.expected_fires(_definition(), _window()) assert fires == [ "2026-06-26T09:00:00+02:00", "2026-06-27T09:00:00+02:00", "2026-06-28T09:00:00+02:00", "2026-06-29T09:00:00+02:00", ] def test_completed_when_expected_run_exists() -> None: run = { "run_id": "run-1", "activity_id": ACTIVITY_ID, "scheduled_for": "2026-06-26T07:00:00+00:00", "fired_at": "2026-06-26T07:00:10+00:00", "tasks_spawned": 1, } report = status.classify_activity( _definition(), _window(), [run], [{"source": "state_hub_progress", "run_id": "run-1", "output_validated": True}], None, ["2026-06-26T09:00:00+02:00"], runs_available=True, ) assert report["status"] == "completed" def test_validation_failure_wins_over_completed_run() -> None: run = {"run_id": "run-1", "activity_id": ACTIVITY_ID, "scheduled_for": None, "fired_at": "2026-06-26T07:00:10+00:00"} report = status.classify_activity( _definition(), _window(), [run], [{"source": "working_memory", "run_id": "run-1", "output_validated": False}], None, ["2026-06-26T09:00:00+02:00"], runs_available=True, ) assert report["status"] == "validation_failed" def test_missed_when_expected_fire_has_no_run_and_runs_available() -> None: report = status.classify_activity( _definition(), _window(), [], [], None, ["2026-06-26T09:00:00+02:00"], runs_available=True, ) assert report["status"] == "missed" def test_disabled_schedule_is_not_counted_as_missed() -> None: report = status.classify_activity( _definition(enabled=False), _window(), [], [], None, ["2026-06-26T09:00:00+02:00"], runs_available=True, ) assert report["status"] == "disabled" def test_scheduled_definition_reports_one_shot_schedule_id() -> None: definition = { "id": ACTIVITY_ID, "name": "One Shot", "enabled": True, "trigger_type": "scheduled", "trigger_config": { "trigger_type": "scheduled", "at": "2026-06-26T09:00:00+02:00", "timezone": "Europe/Berlin", }, "source": "test", } report = status.classify_activity( definition, _window(), [], [], None, ["2026-06-26T09:00:00+02:00"], runs_available=False, ) assert status.automation_schedule_id(_definition()) == f"activity-schedule-{ACTIVITY_ID}" assert report["schedule_id"] == f"activity-schedule-{ACTIVITY_ID}-once" def test_partial_source_availability_is_unknown_not_missed() -> None: report = status.classify_activity( _definition(), _window(), [], [], None, ["2026-06-26T09:00:00+02:00"], runs_available=False, ) assert report["status"] == "unknown" assert "missed-run verdict is unknown" in report["warnings"][0] def test_working_memory_frontmatter_evidence(tmp_path: Path) -> None: note = tmp_path / "daily-triage-2026-06-26-run.md" note.write_text( "---\n" "source: activity-core\n" f"activity_id: {ACTIVITY_ID}\n" "activity_core_run_id: run-1\n" "scheduled_for: 2026-06-26T07:00:00+00:00\n" "output_validated: false\n" "created: 2026-06-26T07:01:00+00:00\n" "---\n" "body\n", encoding="utf-8", ) evidence, source = status.load_working_memory_evidence(str(tmp_path), _window()) assert source["status"] == "ok" assert evidence[0]["run_id"] == "run-1" assert evidence[0]["output_validated"] is False def _scheduled_definition(enabled: bool = False): return { "id": "00000000-0000-0000-0000-000000000456", "name": "One Shot", "enabled": enabled, "trigger_type": "scheduled", "trigger_config": { "trigger_type": "scheduled", "at": "2026-06-26T09:00:00+02:00", "timezone": "Europe/Berlin", }, "source": "db", } def test_inventory_report_uses_db_definition_rows(monkeypatch) -> None: async def fake_load_definitions(args, warnings): return [dict(_definition(), source="db"), _scheduled_definition()], {"status": "ok", "source": "db"} async def fake_temporal(host, namespace, definitions, *, timeout_seconds): return { ACTIVITY_ID: { "schedule_id": f"activity-schedule-{ACTIVITY_ID}", "available": True, "paused": False, "missed_catchup_window": 0, "last_fired_at": None, }, }, {"status": "ok", "count": 1} monkeypatch.setattr(status, "load_definitions", fake_load_definitions) monkeypatch.setattr(status, "load_temporal_visibility", fake_temporal) args = status.parse_inventory_args(["--format", "json"]) report, exit_code = asyncio.run(status.build_inventory_report(args)) assert exit_code == 0 assert report["sources"]["definitions"] == {"status": "ok", "source": "db"} assert report["summary"]["automation_count"] == 2 assert report["automations"][0]["definition_source"] == "db" assert report["automations"][0]["temporal"]["status"] == "active" assert report["automations"][1]["schedule_id"].endswith("-once") def test_inventory_file_fallback_when_db_url_missing(monkeypatch) -> None: monkeypatch.setattr(status, "file_definitions", lambda: [dict(_definition(), source="files")]) args = status.parse_inventory_args(["--db-url", "", "--temporal-host", ""]) report, exit_code = asyncio.run(status.build_inventory_report(args)) assert exit_code == 0 assert report["sources"]["definitions"]["status"] == "degraded" assert report["automations"][0]["definition_source"] == "files" assert "ACTCORE_DB_URL is not set" in report["warnings"][0] def test_inventory_filters_disabled_definitions() -> None: definitions = [_definition(enabled=True), _scheduled_definition(enabled=False)] filtered = status.filter_inventory_definitions( definitions, ids=[], names=[], enabled=False, trigger_types=set(), ) assert [item["name"] for item in filtered] == ["One Shot"] def test_inventory_temporal_unavailable_is_warning_not_failure(monkeypatch) -> None: async def fake_load_definitions(args, warnings): return [_definition()], {"status": "ok", "source": "db"} async def fake_temporal(host, namespace, definitions, *, timeout_seconds): return {}, {"status": "unavailable", "warning": "Temporal unavailable: nope"} monkeypatch.setattr(status, "load_definitions", fake_load_definitions) monkeypatch.setattr(status, "load_temporal_visibility", fake_temporal) args = status.parse_inventory_args([]) report, exit_code = asyncio.run(status.build_inventory_report(args)) assert exit_code == 0 assert report["automations"][0]["temporal"]["status"] == "not_checked" assert report["warnings"] == ["Temporal unavailable: nope"] def test_inventory_cli_emits_json(monkeypatch, capsys) -> None: monkeypatch.setattr(status, "file_definitions", lambda: [dict(_definition(), source="files")]) exit_code = asyncio.run(status.async_inventory_main([ "--db-url", "", "--temporal-host", "", "--format", "json", ])) payload = json.loads(capsys.readouterr().out) assert exit_code == 0 assert payload["mode"] == "automation-inventory" assert payload["automations"][0]["name"] == "Daily Check"