"""FastAPI REST API for activity-core. T30: CRUD for ActivityDefinition + manual one-shot trigger. Endpoints: GET /activity-definitions/ — list all GET /activity-definitions/{id} — get one POST /activity-definitions/ — create PUT /activity-definitions/{id} — update DELETE /activity-definitions/{id} — delete POST /activity-definitions/{id}/trigger — manual one-shot run Schedule lifecycle: - POST/PUT with trigger_type='cron' upserts a Temporal Schedule. - DELETE removes the Temporal Schedule if present. - /trigger starts RunActivityWorkflow directly (works for any trigger_type). Run with: TEMPORAL_HOST=localhost:7233 \ ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \ uv run uvicorn activity_core.api:app --port 8010 """ from __future__ import annotations import os import uuid from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import Any from fastapi import FastAPI, HTTPException from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from temporalio.client import Client from activity_core.models import ActivityDefinition, CronTriggerConfig from activity_core.orm import ActivityDefinition as ActivityDefinitionRow from activity_core.schedule_manager import delete_schedule, upsert_schedule TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default") _ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq" # --- App state --------------------------------------------------------------- _session_factory: async_sessionmaker[AsyncSession] | None = None _temporal_client: Client | None = None @asynccontextmanager async def lifespan(app: FastAPI): # type: ignore[type-arg] global _session_factory, _temporal_client db_url = os.environ.get("ACTCORE_DB_URL") if not db_url: raise RuntimeError("ACTCORE_DB_URL is required") engine = create_async_engine(db_url) _session_factory = async_sessionmaker(engine, expire_on_commit=False) _temporal_client = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE) yield await engine.dispose() app = FastAPI(title="activity-core API", lifespan=lifespan) def _get_db() -> async_sessionmaker[AsyncSession]: assert _session_factory is not None return _session_factory def _get_temporal() -> Client: assert _temporal_client is not None return _temporal_client # --- Schemas ----------------------------------------------------------------- class ActivityDefinitionCreate(BaseModel): name: str enabled: bool = True trigger_config: dict[str, Any] context_sources: list[dict[str, Any]] = [] task_templates: list[dict[str, Any]] = [] dedupe_key_strategy: str = "skip" version: int = 1 class ActivityDefinitionUpdate(BaseModel): name: str | None = None enabled: bool | None = None trigger_config: dict[str, Any] | None = None context_sources: list[dict[str, Any]] | None = None task_templates: list[dict[str, Any]] | None = None dedupe_key_strategy: str | None = None version: int | None = None class ActivityDefinitionResponse(BaseModel): id: uuid.UUID name: str enabled: bool trigger_type: str trigger_config: dict[str, Any] context_sources: list[dict[str, Any]] task_templates: list[dict[str, Any]] dedupe_key_strategy: str version: int created_at: datetime updated_at: datetime def _row_to_response(row: ActivityDefinitionRow) -> ActivityDefinitionResponse: return ActivityDefinitionResponse( id=row.id, name=row.name, enabled=row.enabled, trigger_type=row.trigger_type, trigger_config=row.trigger_config, context_sources=row.context_sources, task_templates=row.task_templates, dedupe_key_strategy=row.dedupe_key_strategy, version=row.version, created_at=row.created_at, updated_at=row.updated_at, ) async def _upsert_schedule_if_cron(row: ActivityDefinitionRow) -> None: """Upsert a Temporal Schedule for the row if it uses a cron trigger.""" try: defn = ActivityDefinition.model_validate( { "id": row.id, "name": row.name, "enabled": row.enabled, "trigger_config": row.trigger_config, "context_sources": row.context_sources, "task_templates": row.task_templates, "dedupe_key_strategy": row.dedupe_key_strategy, "version": row.version, } ) if isinstance(defn.trigger_config, CronTriggerConfig): await upsert_schedule(_get_temporal(), defn) except Exception: pass # Schedule management is best-effort; don't fail the API call. # --- Routes ------------------------------------------------------------------ @app.get("/activity-definitions/", response_model=list[ActivityDefinitionResponse]) async def list_definitions() -> list[ActivityDefinitionResponse]: """List all ActivityDefinitions.""" Session = _get_db() async with Session() as session: rows = (await session.scalars(select(ActivityDefinitionRow))).all() return [_row_to_response(r) for r in rows] @app.get("/activity-definitions/{definition_id}", response_model=ActivityDefinitionResponse) async def get_definition(definition_id: uuid.UUID) -> ActivityDefinitionResponse: """Get one ActivityDefinition by ID.""" Session = _get_db() async with Session() as session: row = await session.get(ActivityDefinitionRow, definition_id) if row is None: raise HTTPException(status_code=404, detail="ActivityDefinition not found") return _row_to_response(row) @app.post("/activity-definitions/", response_model=ActivityDefinitionResponse, status_code=201) async def create_definition(body: ActivityDefinitionCreate) -> ActivityDefinitionResponse: """Create a new ActivityDefinition. Upserts a Temporal Schedule if trigger_type='cron'.""" trigger_type = body.trigger_config.get("trigger_type", "") row = ActivityDefinitionRow( id=uuid.uuid4(), name=body.name, enabled=body.enabled, trigger_type=trigger_type, trigger_config=body.trigger_config, context_sources=body.context_sources, task_templates=body.task_templates, dedupe_key_strategy=body.dedupe_key_strategy, version=body.version, ) Session = _get_db() async with Session() as session: async with session.begin(): session.add(row) await _upsert_schedule_if_cron(row) return _row_to_response(row) @app.put("/activity-definitions/{definition_id}", response_model=ActivityDefinitionResponse) async def update_definition( definition_id: uuid.UUID, body: ActivityDefinitionUpdate ) -> ActivityDefinitionResponse: """Update an ActivityDefinition. Re-upserts the Temporal Schedule if trigger_type='cron'.""" Session = _get_db() async with Session() as session: row = await session.get(ActivityDefinitionRow, definition_id) if row is None: raise HTTPException(status_code=404, detail="ActivityDefinition not found") if body.name is not None: row.name = body.name if body.enabled is not None: row.enabled = body.enabled if body.trigger_config is not None: row.trigger_config = body.trigger_config row.trigger_type = body.trigger_config.get("trigger_type", row.trigger_type) if body.context_sources is not None: row.context_sources = body.context_sources if body.task_templates is not None: row.task_templates = body.task_templates if body.dedupe_key_strategy is not None: row.dedupe_key_strategy = body.dedupe_key_strategy if body.version is not None: row.version = body.version async with session.begin(): session.add(row) await _upsert_schedule_if_cron(row) return _row_to_response(row) @app.delete("/activity-definitions/{definition_id}", status_code=204) async def delete_definition(definition_id: uuid.UUID) -> None: """Delete an ActivityDefinition and its Temporal Schedule if present.""" Session = _get_db() async with Session() as session: row = await session.get(ActivityDefinitionRow, definition_id) if row is None: raise HTTPException(status_code=404, detail="ActivityDefinition not found") async with session.begin(): await session.delete(row) await delete_schedule(_get_temporal(), definition_id) @app.post("/activity-definitions/{definition_id}/trigger", status_code=202) async def trigger_definition(definition_id: uuid.UUID) -> dict[str, str]: """Manually trigger a one-shot RunActivityWorkflow for any ActivityDefinition.""" Session = _get_db() async with Session() as session: row = await session.get(ActivityDefinitionRow, definition_id) if row is None: raise HTTPException(status_code=404, detail="ActivityDefinition not found") trigger_key = f"manual-{uuid.uuid4()}" workflow_id = f"activity-{definition_id}:{trigger_key}" handle = await _get_temporal().start_workflow( "RunActivityWorkflow", args=[str(definition_id), trigger_key, datetime.now(tz=timezone.utc).isoformat()], id=workflow_id, task_queue=_ORCHESTRATOR_TASK_QUEUE, ) return {"workflow_id": handle.id, "trigger_key": trigger_key}