diff --git a/docs/runbook.md b/docs/runbook.md index 3617bd0..2344888 100644 --- a/docs/runbook.md +++ b/docs/runbook.md @@ -116,7 +116,40 @@ asyncio.run(publish()) --- -## Syncing schedules manually +## Syncing definitions and schedules manually + +When the API is running, prefer the admin sync endpoint for definition or +schedule changes. It refreshes file-backed ActivityDefinitions and reconciles +Temporal Schedules without restarting the worker: + +```bash +curl -s -X POST \ + 'http://localhost:8010/admin/sync?definitions=true&schedules=true' +``` + +The response reports: + +- `definitions.synced` +- `event_types.synced` +- `schedules.upserted` +- `schedules.paused` +- `schedules.deleted_orphans` +- bounded `errors[]` + +`event_types` defaults to `false` for this endpoint because event-triggered +definitions already reload from the DB in the event router path; opt in when +the operator intentionally changed event type definition files: + +```bash +curl -s -X POST \ + 'http://localhost:8010/admin/sync?definitions=true&schedules=true&event_types=true' +``` + +The v1 posture is manual/operator-triggered sync. A periodic background loop is +deferred until live use shows it is needed; this keeps customer definition +changes explicit and avoids background repo scanning from the worker. + +If the API is unavailable, the schedule-only CLI remains available: ```bash TEMPORAL_HOST=localhost:7233 \ @@ -126,7 +159,7 @@ ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \ This reconciles all Temporal Schedules with the `activity_definitions` table: - Upserts schedules for every enabled cron definition -- Creates paused schedules for disabled cron definitions +- Creates paused schedules for disabled cron or one-shot scheduled definitions - Deletes orphaned schedules with no matching DB row After adding or changing a recurring ActivityDefinition or workflow activity diff --git a/src/activity_core/api.py b/src/activity_core/api.py index 2b527ab..bc03d11 100644 --- a/src/activity_core/api.py +++ b/src/activity_core/api.py @@ -40,6 +40,7 @@ from temporalio.client import Client from activity_core.models import ActivityDefinition, CronTriggerConfig from activity_core.orm import ActivityDefinition as ActivityDefinitionRow, EventType as EventTypeRow from activity_core.schedule_manager import delete_schedule, upsert_schedule +from activity_core.sync_service import run_sync from activity_core.webhook_receiver import router as webhook_router TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") @@ -275,6 +276,24 @@ async def trigger_definition(definition_id: uuid.UUID) -> dict[str, str]: return {"workflow_id": handle.id, "trigger_key": trigger_key} +# --- Admin sync --------------------------------------------------------------- + +@app.post("/admin/sync") +async def admin_sync( + definitions: bool = True, + schedules: bool = True, + event_types: bool = False, +) -> dict[str, Any]: + """Run operator-triggered definition/event/schedule sync without restart.""" + return await run_sync( + session_factory=_get_db(), + temporal_client=_get_temporal() if schedules else None, + definitions=definitions, + schedules=schedules, + event_types=event_types, + ) + + # T42: Curator gate — event type approval endpoint @app.get("/health") diff --git a/src/activity_core/sync_schedules.py b/src/activity_core/sync_schedules.py index f97b781..ab9a2b3 100644 --- a/src/activity_core/sync_schedules.py +++ b/src/activity_core/sync_schedules.py @@ -15,6 +15,8 @@ import asyncio import logging import os import uuid +from dataclasses import dataclass +from typing import Sequence from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine @@ -30,6 +32,20 @@ TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default") +@dataclass +class ScheduleSyncResult: + upserted: int = 0 + paused: int = 0 + deleted_orphans: int = 0 + + def to_dict(self) -> dict[str, int]: + return { + "upserted": self.upserted, + "paused": self.paused, + "deleted_orphans": self.deleted_orphans, + } + + def _row_to_domain(row: ActivityDefinitionRow) -> ActivityDefinition: """Convert an ORM row to a domain ActivityDefinition for schedule_manager.""" return ActivityDefinition.model_validate( @@ -46,12 +62,82 @@ def _row_to_domain(row: ActivityDefinitionRow) -> ActivityDefinition: ) -async def sync(client: Client, db_url: str) -> None: +def _valid_schedule_activity_id(defn: ActivityDefinition) -> str: + if isinstance(defn.trigger_config, ScheduledTriggerConfig): + return f"{defn.id}-once" + return str(defn.id) + + +async def _load_schedule_rows( + session_factory: async_sessionmaker[AsyncSession], +) -> Sequence[ActivityDefinitionRow]: + async with session_factory() as session: + return ( + await session.scalars( + select(ActivityDefinitionRow).where( + ActivityDefinitionRow.trigger_type.in_(["cron", "scheduled"]) + ) + ) + ).all() + + +async def sync_schedule_rows( + client: Client, + rows: Sequence[ActivityDefinitionRow], +) -> ScheduleSyncResult: + """Reconcile Temporal Schedules against already-loaded definition rows.""" + valid_schedule_activity_ids: set[str] = set() + result = ScheduleSyncResult() + + for row in rows: + defn = _row_to_domain(row) + if not isinstance( + defn.trigger_config, + (CronTriggerConfig, ScheduledTriggerConfig), + ): + continue + + valid_schedule_activity_ids.add(_valid_schedule_activity_id(defn)) + + await upsert_schedule(client, defn) + if defn.enabled: + result.upserted += 1 + logger.info("upserted schedule for activity %s (%s)", defn.id, defn.name) + else: + result.paused += 1 + logger.info("upserted paused schedule for disabled activity %s", defn.id) + + # Tombstone cleanup: remove Temporal Schedules with no matching DB row. + existing_schedules = await list_schedules(client) + for entry in existing_schedules: + if entry["activity_id"] not in valid_schedule_activity_ids: + await delete_schedule(client, entry["activity_id"]) + result.deleted_orphans += 1 + logger.info("deleted orphaned schedule %s", entry["schedule_id"]) + + logger.info( + "sync_schedules complete — upserted=%d paused=%d deleted_orphans=%d", + result.upserted, + result.paused, + result.deleted_orphans, + ) + return result + + +async def sync_with_session_factory( + client: Client, + session_factory: async_sessionmaker[AsyncSession], +) -> ScheduleSyncResult: + """Reconcile Temporal Schedules using an existing DB session factory.""" + return await sync_schedule_rows(client, await _load_schedule_rows(session_factory)) + + +async def sync(client: Client, db_url: str) -> ScheduleSyncResult: """Reconcile Temporal Schedules against the ActivityDefinition table. Steps: - 1. Load all enabled cron ActivityDefinitions from Postgres. - 2. Upsert a Temporal Schedule for each one. + 1. Load all cron/scheduled ActivityDefinitions from Postgres. + 2. Upsert a Temporal Schedule for each one, paused when disabled. 3. Delete Temporal Schedules whose activity_id has no matching DB row (tombstone cleanup for deleted or trigger-type-changed definitions). """ @@ -59,55 +145,10 @@ async def sync(client: Client, db_url: str) -> None: session_factory = async_sessionmaker(engine, expire_on_commit=False) try: - async with session_factory() as session: - rows = ( - await session.scalars( - select(ActivityDefinitionRow).where( - ActivityDefinitionRow.trigger_type.in_(["cron", "scheduled"]) - ) - ) - ).all() + return await sync_with_session_factory(client, session_factory) finally: await engine.dispose() - db_activity_ids: set[str] = set() - upserted = 0 - skipped = 0 - - for row in rows: - defn = _row_to_domain(row) - if not isinstance(defn.trigger_config, (CronTriggerConfig, ScheduledTriggerConfig)): - continue - - db_activity_ids.add(str(defn.id)) - - if defn.enabled: - await upsert_schedule(client, defn) - upserted += 1 - logger.info("upserted schedule for activity %s (%s)", defn.id, defn.name) - else: - # Disabled definitions: schedule may exist (paused) — leave it; - # upsert_schedule already handles the paused state. - await upsert_schedule(client, defn) - skipped += 1 - logger.info("upserted paused schedule for disabled activity %s", defn.id) - - # Tombstone cleanup: remove Temporal Schedules with no matching DB row. - existing_schedules = await list_schedules(client) - deleted = 0 - for entry in existing_schedules: - if entry["activity_id"] not in db_activity_ids: - await delete_schedule(client, entry["activity_id"]) - deleted += 1 - logger.info("deleted orphaned schedule %s", entry["schedule_id"]) - - logger.info( - "sync_schedules complete — upserted=%d skipped_disabled=%d deleted_orphans=%d", - upserted, - skipped, - deleted, - ) - async def main() -> None: logging.basicConfig(level=logging.INFO) @@ -116,7 +157,13 @@ async def main() -> None: raise RuntimeError("ACTCORE_DB_URL is required") client = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE) - await sync(client, db_url) + result = await sync(client, db_url) + print( + "Synced schedules: " + f"upserted={result.upserted} " + f"paused={result.paused} " + f"deleted_orphans={result.deleted_orphans}" + ) if __name__ == "__main__": diff --git a/src/activity_core/sync_service.py b/src/activity_core/sync_service.py new file mode 100644 index 0000000..53b7e14 --- /dev/null +++ b/src/activity_core/sync_service.py @@ -0,0 +1,97 @@ +"""Shared ActivityDefinition/event type/schedule sync orchestration.""" + +from __future__ import annotations + +from typing import Any + +from temporalio.client import Client + +from activity_core.event_type_registry import sync_event_types +from activity_core.sync_activity_definitions import sync as sync_activity_definitions +from activity_core.sync_schedules import ScheduleSyncResult, sync_with_session_factory + +_MAX_ERRORS = 20 +_MAX_ERROR_MESSAGE_LENGTH = 1000 + + +def _empty_result( + *, + definitions: bool, + schedules: bool, + event_types: bool, +) -> dict[str, Any]: + return { + "ok": True, + "ran": { + "definitions": definitions, + "schedules": schedules, + "event_types": event_types, + }, + "definitions": {"synced": 0}, + "event_types": {"synced": 0}, + "schedules": ScheduleSyncResult().to_dict(), + "errors": [], + } + + +def _record_error(result: dict[str, Any], stage: str, exc: Exception) -> None: + errors = result["errors"] + if len(errors) >= _MAX_ERRORS: + return + errors.append( + { + "stage": stage, + "type": type(exc).__name__, + "message": str(exc)[:_MAX_ERROR_MESSAGE_LENGTH], + } + ) + result["ok"] = False + + +async def run_sync( + *, + session_factory: Any, + temporal_client: Client | None, + definitions: bool = True, + schedules: bool = True, + event_types: bool = False, +) -> dict[str, Any]: + """Run the requested sync stages and return bounded operator-facing status. + + The orchestration deliberately accepts its database and Temporal + dependencies as arguments so startup and the API can share the same behavior + without creating another global runtime. + """ + result = _empty_result( + definitions=definitions, + schedules=schedules, + event_types=event_types, + ) + + if definitions: + try: + result["definitions"]["synced"] = await sync_activity_definitions( + session_factory + ) + except Exception as exc: # pragma: no cover - exercised through tests + _record_error(result, "definitions", exc) + + if event_types: + try: + result["event_types"]["synced"] = await sync_event_types(session_factory) + except Exception as exc: # pragma: no cover - exercised through tests + _record_error(result, "event_types", exc) + + if schedules: + try: + if temporal_client is None: + raise RuntimeError("Temporal client is required for schedule sync") + schedule_result = await sync_with_session_factory( + temporal_client, + session_factory, + ) + result["schedules"] = schedule_result.to_dict() + except Exception as exc: # pragma: no cover - exercised through tests + _record_error(result, "schedules", exc) + + return result diff --git a/src/activity_core/worker.py b/src/activity_core/worker.py index fedb05c..dae6b46 100644 --- a/src/activity_core/worker.py +++ b/src/activity_core/worker.py @@ -46,8 +46,7 @@ from activity_core.activities import ( ) from activity_core.db import make_engine from sqlalchemy.ext.asyncio import async_sessionmaker -from activity_core.sync_activity_definitions import sync as sync_activity_defs -from activity_core.sync_schedules import sync as sync_schedules +from activity_core.sync_service import run_sync from activity_core.workflows import RunActivityWorkflow, TaskExecutorWorkflow logger = logging.getLogger(__name__) @@ -77,20 +76,26 @@ async def run() -> None: TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE, runtime=runtime ) - # T45: Sync ActivityDefinition files into DB before schedule sync. - logger.info("Syncing ActivityDefinition files...") + logger.info("Syncing ActivityDefinitions and Temporal Schedules...") + sync_engine = make_engine(db_url) + session_factory = async_sessionmaker(sync_engine, expire_on_commit=False) try: - session_factory = async_sessionmaker(make_engine(db_url), expire_on_commit=False) - await sync_activity_defs(session_factory) - except Exception: - logger.exception("activity definition sync failed — continuing worker startup") - - # T23: Sync Temporal Schedules with the DB before workers start accepting tasks. - logger.info("Syncing Temporal Schedules with ActivityDefinition DB...") - try: - await sync_schedules(client, db_url) - except Exception: - logger.exception("schedule sync failed — continuing worker startup") + sync_result = await run_sync( + session_factory=session_factory, + temporal_client=client, + definitions=True, + schedules=True, + event_types=False, + ) + for error in sync_result["errors"]: + logger.error( + "startup sync %s failed — %s: %s", + error["stage"], + error["type"], + error["message"], + ) + finally: + await sync_engine.dispose() orchestrator_worker = Worker( client, diff --git a/tests/test_admin_sync_api.py b/tests/test_admin_sync_api.py new file mode 100644 index 0000000..9e844b1 --- /dev/null +++ b/tests/test_admin_sync_api.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +from typing import Any + +import pytest + +from activity_core import api + + +@pytest.mark.asyncio +async def test_admin_sync_definitions_only_does_not_require_temporal( + monkeypatch, +) -> None: + seen: dict[str, Any] = {} + + async def fake_run_sync(**kwargs: Any) -> dict[str, Any]: + seen.update(kwargs) + return {"ok": True, "ran": {"definitions": True}} + + monkeypatch.setattr(api, "_session_factory", object()) + monkeypatch.setattr(api, "_temporal_client", None) + monkeypatch.setattr(api, "run_sync", fake_run_sync) + + result = await api.admin_sync( + definitions=True, + schedules=False, + event_types=False, + ) + + assert result == {"ok": True, "ran": {"definitions": True}} + assert seen["session_factory"] is api._session_factory + assert seen["temporal_client"] is None + assert seen["definitions"] is True + assert seen["schedules"] is False + assert seen["event_types"] is False + + +@pytest.mark.asyncio +async def test_admin_sync_schedules_only_passes_temporal(monkeypatch) -> None: + temporal = object() + seen: dict[str, Any] = {} + + async def fake_run_sync(**kwargs: Any) -> dict[str, Any]: + seen.update(kwargs) + return { + "ok": True, + "schedules": { + "upserted": 1, + "paused": 0, + "deleted_orphans": 0, + }, + } + + monkeypatch.setattr(api, "_session_factory", object()) + monkeypatch.setattr(api, "_temporal_client", temporal) + monkeypatch.setattr(api, "run_sync", fake_run_sync) + + result = await api.admin_sync( + definitions=False, + schedules=True, + event_types=False, + ) + + assert result["schedules"]["upserted"] == 1 + assert seen["temporal_client"] is temporal + assert seen["definitions"] is False + assert seen["schedules"] is True + assert seen["event_types"] is False + + +@pytest.mark.asyncio +async def test_admin_sync_all_sync_returns_failure_result(monkeypatch) -> None: + async def fake_run_sync(**kwargs: Any) -> dict[str, Any]: + return { + "ok": False, + "ran": { + "definitions": kwargs["definitions"], + "schedules": kwargs["schedules"], + "event_types": kwargs["event_types"], + }, + "errors": [ + { + "stage": "event_types", + "type": "RuntimeError", + "message": "bad event type", + } + ], + } + + monkeypatch.setattr(api, "_session_factory", object()) + monkeypatch.setattr(api, "_temporal_client", object()) + monkeypatch.setattr(api, "run_sync", fake_run_sync) + + result = await api.admin_sync( + definitions=True, + schedules=True, + event_types=True, + ) + + assert result == { + "ok": False, + "ran": { + "definitions": True, + "schedules": True, + "event_types": True, + }, + "errors": [ + { + "stage": "event_types", + "type": "RuntimeError", + "message": "bad event type", + } + ], + } diff --git a/tests/test_sync_schedules.py b/tests/test_sync_schedules.py new file mode 100644 index 0000000..4cb3958 --- /dev/null +++ b/tests/test_sync_schedules.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from types import SimpleNamespace +from typing import Any + +import pytest + +from activity_core import sync_schedules + + +def _row( + *, + activity_id: uuid.UUID, + enabled: bool, + trigger_config: dict[str, Any], +) -> SimpleNamespace: + return SimpleNamespace( + id=activity_id, + name=f"definition-{activity_id}", + enabled=enabled, + trigger_config=trigger_config, + context_sources=[], + task_templates=[], + dedupe_key_strategy="skip", + version=1, + ) + + +@pytest.mark.asyncio +async def test_sync_schedule_rows_reports_drift_counts_and_preserves_one_shots( + monkeypatch, +) -> None: + new_id = uuid.uuid4() + disabled_old_id = uuid.uuid4() + one_shot_id = uuid.uuid4() + orphan_id = uuid.uuid4() + upserted: list[tuple[uuid.UUID, bool, str]] = [] + deleted: list[str] = [] + + async def fake_upsert_schedule(client: object, defn: object) -> None: + upserted.append(( + defn.id, + defn.enabled, + defn.trigger_config.trigger_type, + )) + + async def fake_list_schedules(client: object) -> list[dict[str, str]]: + return [ + { + "schedule_id": f"activity-schedule-{disabled_old_id}", + "activity_id": str(disabled_old_id), + }, + { + "schedule_id": f"activity-schedule-{one_shot_id}-once", + "activity_id": f"{one_shot_id}-once", + }, + { + "schedule_id": f"activity-schedule-{orphan_id}", + "activity_id": str(orphan_id), + }, + ] + + async def fake_delete_schedule(client: object, activity_id: str) -> None: + deleted.append(activity_id) + + monkeypatch.setattr(sync_schedules, "upsert_schedule", fake_upsert_schedule) + monkeypatch.setattr(sync_schedules, "list_schedules", fake_list_schedules) + monkeypatch.setattr(sync_schedules, "delete_schedule", fake_delete_schedule) + + result = await sync_schedules.sync_schedule_rows( + object(), + [ + _row( + activity_id=new_id, + enabled=True, + trigger_config={ + "trigger_type": "cron", + "cron_expression": "20 7 * * *", + "timezone": "Europe/Berlin", + "misfire_policy": "skip", + }, + ), + _row( + activity_id=disabled_old_id, + enabled=False, + trigger_config={ + "trigger_type": "cron", + "cron_expression": "20 * * * *", + "timezone": "Europe/Berlin", + "misfire_policy": "skip", + }, + ), + _row( + activity_id=one_shot_id, + enabled=True, + trigger_config={ + "trigger_type": "scheduled", + "at": datetime(2026, 6, 19, 8, 0, tzinfo=timezone.utc), + "timezone": "UTC", + }, + ), + _row( + activity_id=uuid.uuid4(), + enabled=True, + trigger_config={ + "trigger_type": "event", + "event_type": "kaizen.metrics.recorded", + "filters": {}, + }, + ), + ], + ) + + assert result.to_dict() == { + "upserted": 2, + "paused": 1, + "deleted_orphans": 1, + } + assert upserted == [ + (new_id, True, "cron"), + (disabled_old_id, False, "cron"), + (one_shot_id, True, "scheduled"), + ] + assert deleted == [str(orphan_id)] diff --git a/tests/test_sync_service.py b/tests/test_sync_service.py new file mode 100644 index 0000000..4a3d252 --- /dev/null +++ b/tests/test_sync_service.py @@ -0,0 +1,134 @@ +from __future__ import annotations + +from typing import Any + +import pytest + +from activity_core import sync_service +from activity_core.sync_schedules import ScheduleSyncResult + + +@pytest.mark.asyncio +async def test_run_sync_runs_requested_sections(monkeypatch) -> None: + calls: list[str] = [] + + async def fake_definitions(session_factory: object) -> int: + calls.append("definitions") + return 2 + + async def fake_event_types(session_factory: object) -> int: + calls.append("event_types") + return 5 + + async def fake_schedules( + temporal_client: object, + session_factory: object, + ) -> ScheduleSyncResult: + calls.append("schedules") + return ScheduleSyncResult(upserted=3, paused=1, deleted_orphans=2) + + monkeypatch.setattr(sync_service, "sync_activity_definitions", fake_definitions) + monkeypatch.setattr(sync_service, "sync_event_types", fake_event_types) + monkeypatch.setattr(sync_service, "sync_with_session_factory", fake_schedules) + + result = await sync_service.run_sync( + session_factory=object(), + temporal_client=object(), + definitions=True, + schedules=True, + event_types=True, + ) + + assert calls == ["definitions", "event_types", "schedules"] + assert result["ok"] is True + assert result["ran"] == { + "definitions": True, + "schedules": True, + "event_types": True, + } + assert result["definitions"] == {"synced": 2} + assert result["event_types"] == {"synced": 5} + assert result["schedules"] == { + "upserted": 3, + "paused": 1, + "deleted_orphans": 2, + } + assert result["errors"] == [] + + +@pytest.mark.asyncio +async def test_run_sync_collects_errors_and_continues(monkeypatch) -> None: + calls: list[str] = [] + + async def failing_definitions(session_factory: object) -> int: + calls.append("definitions") + raise RuntimeError("definition parse failed") + + async def fake_schedules( + temporal_client: object, + session_factory: object, + ) -> ScheduleSyncResult: + calls.append("schedules") + return ScheduleSyncResult(upserted=1) + + monkeypatch.setattr( + sync_service, + "sync_activity_definitions", + failing_definitions, + ) + monkeypatch.setattr(sync_service, "sync_with_session_factory", fake_schedules) + + result = await sync_service.run_sync( + session_factory=object(), + temporal_client=object(), + definitions=True, + schedules=True, + event_types=False, + ) + + assert calls == ["definitions", "schedules"] + assert result["ok"] is False + assert result["definitions"] == {"synced": 0} + assert result["schedules"]["upserted"] == 1 + assert result["errors"] == [ + { + "stage": "definitions", + "type": "RuntimeError", + "message": "definition parse failed", + } + ] + + +@pytest.mark.asyncio +async def test_run_sync_reports_missing_temporal_client_for_schedules() -> None: + result = await sync_service.run_sync( + session_factory=object(), + temporal_client=None, + definitions=False, + schedules=True, + event_types=False, + ) + + assert result["ok"] is False + assert result["errors"] == [ + { + "stage": "schedules", + "type": "RuntimeError", + "message": "Temporal client is required for schedule sync", + } + ] + + +def test_record_error_bounds_error_count() -> None: + result: dict[str, Any] = { + "ok": True, + "errors": [], + } + + for i in range(25): + sync_service._record_error(result, "stage", RuntimeError(f"boom {i}")) + + assert result["ok"] is False + assert len(result["errors"]) == 20 + assert result["errors"][0]["message"] == "boom 0" + assert result["errors"][-1]["message"] == "boom 19" diff --git a/workplans/ACTIVITY-WP-0012-definition-schedule-hot-reload.md b/workplans/ACTIVITY-WP-0012-definition-schedule-hot-reload.md index 4bc218e..c0c38f8 100644 --- a/workplans/ACTIVITY-WP-0012-definition-schedule-hot-reload.md +++ b/workplans/ACTIVITY-WP-0012-definition-schedule-hot-reload.md @@ -8,7 +8,7 @@ status: active owner: codex topic_slug: custodian created: "2026-06-18" -updated: "2026-06-18" +updated: "2026-06-19" state_hub_workstream_id: "8887075e-21ec-451b-b82b-cd81035c9ca5" --- @@ -39,7 +39,7 @@ a repo checkout manager or CI system. ```task id: ACTIVITY-WP-0012-T01 -status: todo +status: done priority: high state_hub_task_id: "53a7970b-7eec-47f5-ad30-bbd7c6271952" ``` @@ -57,11 +57,17 @@ Done when: - failures are collected into a bounded `errors[]` result while preserving the current startup best-effort behavior. +2026-06-19: Completed. Added `activity_core.sync_service.run_sync`, which +orchestrates ActivityDefinition, event type, and schedule sync independently +from explicit DB session factory and Temporal client dependencies. Worker +startup now calls the shared service for definitions+schedules and logs bounded +stage errors while continuing startup. + ## Add Admin Sync Endpoint ```task id: ACTIVITY-WP-0012-T02 -status: todo +status: done priority: high state_hub_task_id: "8697c761-15d1-4da0-b66b-d838218a2495" ``` @@ -80,11 +86,17 @@ Done when: - endpoint tests cover definitions-only, schedules-only, all-sync, and failure result behavior. +2026-06-19: Completed. Added `POST /admin/sync` with defaults +`definitions=true`, `schedules=true`, and `event_types=false`. The response +reports definition/event counts, schedule upsert/pause/orphan-delete counts, and +bounded `errors[]`. Tests cover definitions-only, schedules-only, all-sync, and +failure-result behavior. + ## Preserve Schedule Drift Semantics ```task id: ACTIVITY-WP-0012-T03 -status: todo +status: done priority: high state_hub_task_id: "efeac412-632c-4c90-9428-bb575ac7a624" ``` @@ -101,11 +113,18 @@ Done when: - regression tests demonstrate the Coulomb hourly-to-daily rename shape without needing a worker restart. +2026-06-19: Completed. `sync_schedules` now returns explicit counts for enabled +schedule upserts, disabled schedule pauses, and orphan deletes. Regression tests +cover the hourly-to-daily rename shape: a new enabled cron schedule is upserted, +the old disabled cron schedule is preserved as paused, unrelated orphan +schedules are deleted, event-triggered definitions do not create schedules, and +one-shot scheduled definitions are no longer mistaken for orphans. + ## Optional Background Sync Loop ```task id: ACTIVITY-WP-0012-T04 -status: todo +status: done priority: medium state_hub_task_id: "d774087b-c51d-4444-8e90-bfef43765456" ``` @@ -121,6 +140,12 @@ Done when: last error summary; - the loop does not block worker startup or workflow task processing. +2026-06-19: Completed by decision. v1 stays manual/operator-triggered through +`POST /admin/sync`; no background loop was added. The runbook records this +posture so customer definition changes stay explicit and the worker does not +start background repo scanning. A periodic loop remains a future option if live +operator use proves it is needed. + ## Live No-Restart Smoke ```task @@ -143,3 +168,7 @@ Done when non-secret State Hub evidence shows: Current wait reason: this gate depends on the implementation tasks and a cluster-owned smoke path. + +2026-06-19: Implementation tasks T01-T04 are complete and the repository test +suite passed (`192 passed, 1 skipped`). This task is now ready for a +Railiance/operator no-restart smoke using `POST /admin/sync`.