"""FastAPI REST API for activity-core. T30: CRUD for ActivityDefinition + manual one-shot trigger. Endpoints: GET /activity-definitions/ — list all GET /activity-definitions/{id} — get one POST /activity-definitions/ — create PUT /activity-definitions/{id} — update DELETE /activity-definitions/{id} — delete POST /activity-definitions/{id}/trigger — manual one-shot run Schedule lifecycle: - POST/PUT with trigger_type='cron' upserts a Temporal Schedule. - DELETE removes the Temporal Schedule if present. - /trigger starts RunActivityWorkflow directly (works for any trigger_type). Run with: TEMPORAL_HOST=localhost:7233 \ ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \ uv run uvicorn activity_core.api:app --port 8010 """ from __future__ import annotations import os import uuid from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import Any from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse from pydantic import BaseModel from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from temporalio.api.workflowservice.v1 import GetSystemInfoRequest from temporalio.client import Client from activity_core.models import ActivityDefinition, CronTriggerConfig from activity_core.orm import ActivityDefinition as ActivityDefinitionRow, EventType as EventTypeRow from activity_core.schedule_manager import delete_schedule, upsert_schedule from activity_core.webhook_receiver import router as webhook_router TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default") _ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq" # T42: Curator gate — controls which event type statuses are accepted by the router. # "disabled" (default): accepts "active" and "pending" types (pending logged as warning). # "required": only "active" types accepted; "pending" events discarded. ACTIVITY_CURATOR_GATE = os.environ.get("ACTIVITY_CURATOR_GATE", "disabled") # --- App state --------------------------------------------------------------- _session_factory: async_sessionmaker[AsyncSession] | None = None _temporal_client: Client | None = None @asynccontextmanager async def lifespan(app: FastAPI): # type: ignore[type-arg] global _session_factory, _temporal_client db_url = os.environ.get("ACTCORE_DB_URL") if not db_url: raise RuntimeError("ACTCORE_DB_URL is required") engine = create_async_engine(db_url) _session_factory = async_sessionmaker(engine, expire_on_commit=False) _temporal_client = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE) yield await engine.dispose() app = FastAPI(title="activity-core API", lifespan=lifespan) app.include_router(webhook_router) def _get_db() -> async_sessionmaker[AsyncSession]: assert _session_factory is not None return _session_factory def _get_temporal() -> Client: assert _temporal_client is not None return _temporal_client # --- Schemas ----------------------------------------------------------------- class ActivityDefinitionCreate(BaseModel): name: str enabled: bool = True trigger_config: dict[str, Any] context_sources: list[dict[str, Any]] = [] task_templates: list[dict[str, Any]] = [] dedupe_key_strategy: str = "skip" version: int = 1 class ActivityDefinitionUpdate(BaseModel): name: str | None = None enabled: bool | None = None trigger_config: dict[str, Any] | None = None context_sources: list[dict[str, Any]] | None = None task_templates: list[dict[str, Any]] | None = None dedupe_key_strategy: str | None = None version: int | None = None class ActivityDefinitionResponse(BaseModel): id: uuid.UUID name: str enabled: bool trigger_type: str trigger_config: dict[str, Any] context_sources: list[dict[str, Any]] task_templates: list[dict[str, Any]] dedupe_key_strategy: str version: int created_at: datetime updated_at: datetime def _row_to_response(row: ActivityDefinitionRow) -> ActivityDefinitionResponse: return ActivityDefinitionResponse( id=row.id, name=row.name, enabled=row.enabled, trigger_type=row.trigger_type, trigger_config=row.trigger_config, context_sources=row.context_sources, task_templates=row.task_templates, dedupe_key_strategy=row.dedupe_key_strategy, version=row.version, created_at=row.created_at, updated_at=row.updated_at, ) async def _upsert_schedule_if_cron(row: ActivityDefinitionRow) -> None: """Upsert a Temporal Schedule for the row if it uses a cron trigger.""" try: defn = ActivityDefinition.model_validate( { "id": row.id, "name": row.name, "enabled": row.enabled, "trigger_config": row.trigger_config, "context_sources": row.context_sources, "task_templates": row.task_templates, "dedupe_key_strategy": row.dedupe_key_strategy, "version": row.version, } ) if isinstance(defn.trigger_config, CronTriggerConfig): await upsert_schedule(_get_temporal(), defn) except Exception: pass # Schedule management is best-effort; don't fail the API call. # --- Routes ------------------------------------------------------------------ @app.get("/activity-definitions/", response_model=list[ActivityDefinitionResponse]) async def list_definitions() -> list[ActivityDefinitionResponse]: """List all ActivityDefinitions.""" Session = _get_db() async with Session() as session: rows = (await session.scalars(select(ActivityDefinitionRow))).all() return [_row_to_response(r) for r in rows] @app.get("/activity-definitions/{definition_id}", response_model=ActivityDefinitionResponse) async def get_definition(definition_id: uuid.UUID) -> ActivityDefinitionResponse: """Get one ActivityDefinition by ID.""" Session = _get_db() async with Session() as session: row = await session.get(ActivityDefinitionRow, definition_id) if row is None: raise HTTPException(status_code=404, detail="ActivityDefinition not found") return _row_to_response(row) @app.post("/activity-definitions/", response_model=ActivityDefinitionResponse, status_code=201) async def create_definition(body: ActivityDefinitionCreate) -> ActivityDefinitionResponse: """Create a new ActivityDefinition. Upserts a Temporal Schedule if trigger_type='cron'.""" trigger_type = body.trigger_config.get("trigger_type", "") row = ActivityDefinitionRow( id=uuid.uuid4(), name=body.name, enabled=body.enabled, trigger_type=trigger_type, trigger_config=body.trigger_config, context_sources=body.context_sources, task_templates=body.task_templates, dedupe_key_strategy=body.dedupe_key_strategy, version=body.version, ) Session = _get_db() async with Session() as session: async with session.begin(): session.add(row) await _upsert_schedule_if_cron(row) return _row_to_response(row) @app.put("/activity-definitions/{definition_id}", response_model=ActivityDefinitionResponse) async def update_definition( definition_id: uuid.UUID, body: ActivityDefinitionUpdate ) -> ActivityDefinitionResponse: """Update an ActivityDefinition. Re-upserts the Temporal Schedule if trigger_type='cron'.""" Session = _get_db() async with Session() as session: row = await session.get(ActivityDefinitionRow, definition_id) if row is None: raise HTTPException(status_code=404, detail="ActivityDefinition not found") if body.name is not None: row.name = body.name if body.enabled is not None: row.enabled = body.enabled if body.trigger_config is not None: row.trigger_config = body.trigger_config row.trigger_type = body.trigger_config.get("trigger_type", row.trigger_type) if body.context_sources is not None: row.context_sources = body.context_sources if body.task_templates is not None: row.task_templates = body.task_templates if body.dedupe_key_strategy is not None: row.dedupe_key_strategy = body.dedupe_key_strategy if body.version is not None: row.version = body.version async with session.begin(): session.add(row) await _upsert_schedule_if_cron(row) return _row_to_response(row) @app.delete("/activity-definitions/{definition_id}", status_code=204) async def delete_definition(definition_id: uuid.UUID) -> None: """Delete an ActivityDefinition and its Temporal Schedule if present.""" Session = _get_db() async with Session() as session: row = await session.get(ActivityDefinitionRow, definition_id) if row is None: raise HTTPException(status_code=404, detail="ActivityDefinition not found") async with session.begin(): await session.delete(row) await delete_schedule(_get_temporal(), definition_id) @app.post("/activity-definitions/{definition_id}/trigger", status_code=202) async def trigger_definition(definition_id: uuid.UUID) -> dict[str, str]: """Manually trigger a one-shot RunActivityWorkflow for any ActivityDefinition.""" Session = _get_db() async with Session() as session: row = await session.get(ActivityDefinitionRow, definition_id) if row is None: raise HTTPException(status_code=404, detail="ActivityDefinition not found") trigger_key = f"manual-{uuid.uuid4()}" workflow_id = f"activity-{definition_id}:{trigger_key}" handle = await _get_temporal().start_workflow( "RunActivityWorkflow", args=[str(definition_id), trigger_key, datetime.now(tz=timezone.utc).isoformat()], id=workflow_id, task_queue=_ORCHESTRATOR_TASK_QUEUE, ) return {"workflow_id": handle.id, "trigger_key": trigger_key} # T42: Curator gate — event type approval endpoint @app.get("/health") async def health() -> JSONResponse: db_ok = False temporal_ok = False try: async with _get_db()() as session: await session.execute(text("SELECT 1")) db_ok = True except Exception: pass try: await _get_temporal().workflow_service.get_system_info(GetSystemInfoRequest()) temporal_ok = True except Exception: pass status = "ok" if db_ok and temporal_ok else "degraded" code = 200 if status == "ok" else 503 return JSONResponse( {"status": status, "db": db_ok, "temporal": temporal_ok}, status_code=code, ) @app.post("/event-types/{type_id}/approve", status_code=200) async def approve_event_type(type_id: str) -> dict[str, str]: """Approve a pending event type, setting its status to 'active'. Only relevant when ACTIVITY_CURATOR_GATE=required. Requires admin access (same auth as the rest of the API). """ from sqlalchemy import text Session = _get_db() async with Session() as session: row = await session.get(EventTypeRow, type_id) if row is None: raise HTTPException(status_code=404, detail=f"Event type {type_id!r} not found") if row.status == "active": return {"type_id": type_id, "status": "active", "message": "already active"} async with session.begin(): await session.execute( text("UPDATE event_types SET status = 'active' WHERE type_id = :tid"), {"tid": type_id}, ) return {"type_id": type_id, "status": "active", "message": "approved"}