from __future__ import annotations import uuid from datetime import datetime, timezone from types import SimpleNamespace from typing import Any import pytest from activity_core import sync_schedules def _row( *, activity_id: uuid.UUID, enabled: bool, trigger_config: dict[str, Any], ) -> SimpleNamespace: return SimpleNamespace( id=activity_id, name=f"definition-{activity_id}", enabled=enabled, trigger_config=trigger_config, context_sources=[], task_templates=[], dedupe_key_strategy="skip", version=1, ) @pytest.mark.asyncio async def test_sync_schedule_rows_reports_drift_counts_and_preserves_one_shots( monkeypatch, ) -> None: new_id = uuid.uuid4() disabled_old_id = uuid.uuid4() one_shot_id = uuid.uuid4() orphan_id = uuid.uuid4() upserted: list[tuple[uuid.UUID, bool, str]] = [] deleted: list[str] = [] async def fake_upsert_schedule(client: object, defn: object) -> None: upserted.append(( defn.id, defn.enabled, defn.trigger_config.trigger_type, )) async def fake_list_schedules(client: object) -> list[dict[str, str]]: return [ { "schedule_id": f"activity-schedule-{disabled_old_id}", "activity_id": str(disabled_old_id), }, { "schedule_id": f"activity-schedule-{one_shot_id}-once", "activity_id": f"{one_shot_id}-once", }, { "schedule_id": f"activity-schedule-{orphan_id}", "activity_id": str(orphan_id), }, ] async def fake_delete_schedule(client: object, activity_id: str) -> None: deleted.append(activity_id) monkeypatch.setattr(sync_schedules, "upsert_schedule", fake_upsert_schedule) monkeypatch.setattr(sync_schedules, "list_schedules", fake_list_schedules) monkeypatch.setattr(sync_schedules, "delete_schedule", fake_delete_schedule) result = await sync_schedules.sync_schedule_rows( object(), [ _row( activity_id=new_id, enabled=True, trigger_config={ "trigger_type": "cron", "cron_expression": "20 7 * * *", "timezone": "Europe/Berlin", "misfire_policy": "skip", }, ), _row( activity_id=disabled_old_id, enabled=False, trigger_config={ "trigger_type": "cron", "cron_expression": "20 * * * *", "timezone": "Europe/Berlin", "misfire_policy": "skip", }, ), _row( activity_id=one_shot_id, enabled=True, trigger_config={ "trigger_type": "scheduled", "at": datetime(2026, 6, 19, 8, 0, tzinfo=timezone.utc), "timezone": "UTC", }, ), _row( activity_id=uuid.uuid4(), enabled=True, trigger_config={ "trigger_type": "event", "event_type": "kaizen.metrics.recorded", "filters": {}, }, ), ], ) assert result.to_dict() == { "upserted": 2, "paused": 1, "deleted_orphans": 1, } assert upserted == [ (new_id, True, "cron"), (disabled_old_id, False, "cron"), (one_shot_id, True, "scheduled"), ] assert deleted == [str(orphan_id)]