Add admin sync hot reload path

This commit is contained in:
2026-06-19 01:54:13 +02:00
parent 6f68f8f9ec
commit 3e93567a53
9 changed files with 676 additions and 72 deletions

View File

@@ -40,6 +40,7 @@ from temporalio.client import Client
from activity_core.models import ActivityDefinition, CronTriggerConfig
from activity_core.orm import ActivityDefinition as ActivityDefinitionRow, EventType as EventTypeRow
from activity_core.schedule_manager import delete_schedule, upsert_schedule
from activity_core.sync_service import run_sync
from activity_core.webhook_receiver import router as webhook_router
TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233")
@@ -275,6 +276,24 @@ async def trigger_definition(definition_id: uuid.UUID) -> dict[str, str]:
return {"workflow_id": handle.id, "trigger_key": trigger_key}
# --- Admin sync ---------------------------------------------------------------
@app.post("/admin/sync")
async def admin_sync(
definitions: bool = True,
schedules: bool = True,
event_types: bool = False,
) -> dict[str, Any]:
"""Run operator-triggered definition/event/schedule sync without restart."""
return await run_sync(
session_factory=_get_db(),
temporal_client=_get_temporal() if schedules else None,
definitions=definitions,
schedules=schedules,
event_types=event_types,
)
# T42: Curator gate — event type approval endpoint
@app.get("/health")

View File

@@ -15,6 +15,8 @@ import asyncio
import logging
import os
import uuid
from dataclasses import dataclass
from typing import Sequence
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
@@ -30,6 +32,20 @@ TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233")
TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default")
@dataclass
class ScheduleSyncResult:
upserted: int = 0
paused: int = 0
deleted_orphans: int = 0
def to_dict(self) -> dict[str, int]:
return {
"upserted": self.upserted,
"paused": self.paused,
"deleted_orphans": self.deleted_orphans,
}
def _row_to_domain(row: ActivityDefinitionRow) -> ActivityDefinition:
"""Convert an ORM row to a domain ActivityDefinition for schedule_manager."""
return ActivityDefinition.model_validate(
@@ -46,12 +62,82 @@ def _row_to_domain(row: ActivityDefinitionRow) -> ActivityDefinition:
)
async def sync(client: Client, db_url: str) -> None:
def _valid_schedule_activity_id(defn: ActivityDefinition) -> str:
if isinstance(defn.trigger_config, ScheduledTriggerConfig):
return f"{defn.id}-once"
return str(defn.id)
async def _load_schedule_rows(
session_factory: async_sessionmaker[AsyncSession],
) -> Sequence[ActivityDefinitionRow]:
async with session_factory() as session:
return (
await session.scalars(
select(ActivityDefinitionRow).where(
ActivityDefinitionRow.trigger_type.in_(["cron", "scheduled"])
)
)
).all()
async def sync_schedule_rows(
client: Client,
rows: Sequence[ActivityDefinitionRow],
) -> ScheduleSyncResult:
"""Reconcile Temporal Schedules against already-loaded definition rows."""
valid_schedule_activity_ids: set[str] = set()
result = ScheduleSyncResult()
for row in rows:
defn = _row_to_domain(row)
if not isinstance(
defn.trigger_config,
(CronTriggerConfig, ScheduledTriggerConfig),
):
continue
valid_schedule_activity_ids.add(_valid_schedule_activity_id(defn))
await upsert_schedule(client, defn)
if defn.enabled:
result.upserted += 1
logger.info("upserted schedule for activity %s (%s)", defn.id, defn.name)
else:
result.paused += 1
logger.info("upserted paused schedule for disabled activity %s", defn.id)
# Tombstone cleanup: remove Temporal Schedules with no matching DB row.
existing_schedules = await list_schedules(client)
for entry in existing_schedules:
if entry["activity_id"] not in valid_schedule_activity_ids:
await delete_schedule(client, entry["activity_id"])
result.deleted_orphans += 1
logger.info("deleted orphaned schedule %s", entry["schedule_id"])
logger.info(
"sync_schedules complete — upserted=%d paused=%d deleted_orphans=%d",
result.upserted,
result.paused,
result.deleted_orphans,
)
return result
async def sync_with_session_factory(
client: Client,
session_factory: async_sessionmaker[AsyncSession],
) -> ScheduleSyncResult:
"""Reconcile Temporal Schedules using an existing DB session factory."""
return await sync_schedule_rows(client, await _load_schedule_rows(session_factory))
async def sync(client: Client, db_url: str) -> ScheduleSyncResult:
"""Reconcile Temporal Schedules against the ActivityDefinition table.
Steps:
1. Load all enabled cron ActivityDefinitions from Postgres.
2. Upsert a Temporal Schedule for each one.
1. Load all cron/scheduled ActivityDefinitions from Postgres.
2. Upsert a Temporal Schedule for each one, paused when disabled.
3. Delete Temporal Schedules whose activity_id has no matching DB row
(tombstone cleanup for deleted or trigger-type-changed definitions).
"""
@@ -59,55 +145,10 @@ async def sync(client: Client, db_url: str) -> None:
session_factory = async_sessionmaker(engine, expire_on_commit=False)
try:
async with session_factory() as session:
rows = (
await session.scalars(
select(ActivityDefinitionRow).where(
ActivityDefinitionRow.trigger_type.in_(["cron", "scheduled"])
)
)
).all()
return await sync_with_session_factory(client, session_factory)
finally:
await engine.dispose()
db_activity_ids: set[str] = set()
upserted = 0
skipped = 0
for row in rows:
defn = _row_to_domain(row)
if not isinstance(defn.trigger_config, (CronTriggerConfig, ScheduledTriggerConfig)):
continue
db_activity_ids.add(str(defn.id))
if defn.enabled:
await upsert_schedule(client, defn)
upserted += 1
logger.info("upserted schedule for activity %s (%s)", defn.id, defn.name)
else:
# Disabled definitions: schedule may exist (paused) — leave it;
# upsert_schedule already handles the paused state.
await upsert_schedule(client, defn)
skipped += 1
logger.info("upserted paused schedule for disabled activity %s", defn.id)
# Tombstone cleanup: remove Temporal Schedules with no matching DB row.
existing_schedules = await list_schedules(client)
deleted = 0
for entry in existing_schedules:
if entry["activity_id"] not in db_activity_ids:
await delete_schedule(client, entry["activity_id"])
deleted += 1
logger.info("deleted orphaned schedule %s", entry["schedule_id"])
logger.info(
"sync_schedules complete — upserted=%d skipped_disabled=%d deleted_orphans=%d",
upserted,
skipped,
deleted,
)
async def main() -> None:
logging.basicConfig(level=logging.INFO)
@@ -116,7 +157,13 @@ async def main() -> None:
raise RuntimeError("ACTCORE_DB_URL is required")
client = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE)
await sync(client, db_url)
result = await sync(client, db_url)
print(
"Synced schedules: "
f"upserted={result.upserted} "
f"paused={result.paused} "
f"deleted_orphans={result.deleted_orphans}"
)
if __name__ == "__main__":

View File

@@ -0,0 +1,97 @@
"""Shared ActivityDefinition/event type/schedule sync orchestration."""
from __future__ import annotations
from typing import Any
from temporalio.client import Client
from activity_core.event_type_registry import sync_event_types
from activity_core.sync_activity_definitions import sync as sync_activity_definitions
from activity_core.sync_schedules import ScheduleSyncResult, sync_with_session_factory
_MAX_ERRORS = 20
_MAX_ERROR_MESSAGE_LENGTH = 1000
def _empty_result(
*,
definitions: bool,
schedules: bool,
event_types: bool,
) -> dict[str, Any]:
return {
"ok": True,
"ran": {
"definitions": definitions,
"schedules": schedules,
"event_types": event_types,
},
"definitions": {"synced": 0},
"event_types": {"synced": 0},
"schedules": ScheduleSyncResult().to_dict(),
"errors": [],
}
def _record_error(result: dict[str, Any], stage: str, exc: Exception) -> None:
errors = result["errors"]
if len(errors) >= _MAX_ERRORS:
return
errors.append(
{
"stage": stage,
"type": type(exc).__name__,
"message": str(exc)[:_MAX_ERROR_MESSAGE_LENGTH],
}
)
result["ok"] = False
async def run_sync(
*,
session_factory: Any,
temporal_client: Client | None,
definitions: bool = True,
schedules: bool = True,
event_types: bool = False,
) -> dict[str, Any]:
"""Run the requested sync stages and return bounded operator-facing status.
The orchestration deliberately accepts its database and Temporal
dependencies as arguments so startup and the API can share the same behavior
without creating another global runtime.
"""
result = _empty_result(
definitions=definitions,
schedules=schedules,
event_types=event_types,
)
if definitions:
try:
result["definitions"]["synced"] = await sync_activity_definitions(
session_factory
)
except Exception as exc: # pragma: no cover - exercised through tests
_record_error(result, "definitions", exc)
if event_types:
try:
result["event_types"]["synced"] = await sync_event_types(session_factory)
except Exception as exc: # pragma: no cover - exercised through tests
_record_error(result, "event_types", exc)
if schedules:
try:
if temporal_client is None:
raise RuntimeError("Temporal client is required for schedule sync")
schedule_result = await sync_with_session_factory(
temporal_client,
session_factory,
)
result["schedules"] = schedule_result.to_dict()
except Exception as exc: # pragma: no cover - exercised through tests
_record_error(result, "schedules", exc)
return result

View File

@@ -46,8 +46,7 @@ from activity_core.activities import (
)
from activity_core.db import make_engine
from sqlalchemy.ext.asyncio import async_sessionmaker
from activity_core.sync_activity_definitions import sync as sync_activity_defs
from activity_core.sync_schedules import sync as sync_schedules
from activity_core.sync_service import run_sync
from activity_core.workflows import RunActivityWorkflow, TaskExecutorWorkflow
logger = logging.getLogger(__name__)
@@ -77,20 +76,26 @@ async def run() -> None:
TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE, runtime=runtime
)
# T45: Sync ActivityDefinition files into DB before schedule sync.
logger.info("Syncing ActivityDefinition files...")
logger.info("Syncing ActivityDefinitions and Temporal Schedules...")
sync_engine = make_engine(db_url)
session_factory = async_sessionmaker(sync_engine, expire_on_commit=False)
try:
session_factory = async_sessionmaker(make_engine(db_url), expire_on_commit=False)
await sync_activity_defs(session_factory)
except Exception:
logger.exception("activity definition sync failed — continuing worker startup")
# T23: Sync Temporal Schedules with the DB before workers start accepting tasks.
logger.info("Syncing Temporal Schedules with ActivityDefinition DB...")
try:
await sync_schedules(client, db_url)
except Exception:
logger.exception("schedule sync failed — continuing worker startup")
sync_result = await run_sync(
session_factory=session_factory,
temporal_client=client,
definitions=True,
schedules=True,
event_types=False,
)
for error in sync_result["errors"]:
logger.error(
"startup sync %s failed — %s: %s",
error["stage"],
error["type"],
error["message"],
)
finally:
await sync_engine.dispose()
orchestrator_worker = Worker(
client,