Files
activity-core/src/activity_core/api.py
tegwick ea5fbe0bf3 feat(WP-0002): complete Triggers & Ops workstream
Delivers all 12 tasks (T22–T33): Temporal Schedule manager + startup
sync, NATS JetStream event router, FastAPI CRUD + manual trigger,
Prometheus metrics wiring, custom search-attribute tagging, and
operational runbook. Marks workplan status as done.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-28 01:04:43 +01:00

267 lines
9.5 KiB
Python

"""FastAPI REST API for activity-core.
T30: CRUD for ActivityDefinition + manual one-shot trigger.
Endpoints:
GET /activity-definitions/ — list all
GET /activity-definitions/{id} — get one
POST /activity-definitions/ — create
PUT /activity-definitions/{id} — update
DELETE /activity-definitions/{id} — delete
POST /activity-definitions/{id}/trigger — manual one-shot run
Schedule lifecycle:
- POST/PUT with trigger_type='cron' upserts a Temporal Schedule.
- DELETE removes the Temporal Schedule if present.
- /trigger starts RunActivityWorkflow directly (works for any trigger_type).
Run with:
TEMPORAL_HOST=localhost:7233 \
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
uv run uvicorn activity_core.api:app --port 8010
"""
from __future__ import annotations
import os
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from temporalio.client import Client
from activity_core.models import ActivityDefinition, CronTriggerConfig
from activity_core.orm import ActivityDefinition as ActivityDefinitionRow
from activity_core.schedule_manager import delete_schedule, upsert_schedule
TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233")
TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default")
_ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq"
# --- App state ---------------------------------------------------------------
_session_factory: async_sessionmaker[AsyncSession] | None = None
_temporal_client: Client | None = None
@asynccontextmanager
async def lifespan(app: FastAPI): # type: ignore[type-arg]
global _session_factory, _temporal_client
db_url = os.environ.get("ACTCORE_DB_URL")
if not db_url:
raise RuntimeError("ACTCORE_DB_URL is required")
engine = create_async_engine(db_url)
_session_factory = async_sessionmaker(engine, expire_on_commit=False)
_temporal_client = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE)
yield
await engine.dispose()
app = FastAPI(title="activity-core API", lifespan=lifespan)
def _get_db() -> async_sessionmaker[AsyncSession]:
assert _session_factory is not None
return _session_factory
def _get_temporal() -> Client:
assert _temporal_client is not None
return _temporal_client
# --- Schemas -----------------------------------------------------------------
class ActivityDefinitionCreate(BaseModel):
name: str
enabled: bool = True
trigger_config: dict[str, Any]
context_sources: list[dict[str, Any]] = []
task_templates: list[dict[str, Any]] = []
dedupe_key_strategy: str = "skip"
version: int = 1
class ActivityDefinitionUpdate(BaseModel):
name: str | None = None
enabled: bool | None = None
trigger_config: dict[str, Any] | None = None
context_sources: list[dict[str, Any]] | None = None
task_templates: list[dict[str, Any]] | None = None
dedupe_key_strategy: str | None = None
version: int | None = None
class ActivityDefinitionResponse(BaseModel):
id: uuid.UUID
name: str
enabled: bool
trigger_type: str
trigger_config: dict[str, Any]
context_sources: list[dict[str, Any]]
task_templates: list[dict[str, Any]]
dedupe_key_strategy: str
version: int
created_at: datetime
updated_at: datetime
def _row_to_response(row: ActivityDefinitionRow) -> ActivityDefinitionResponse:
return ActivityDefinitionResponse(
id=row.id,
name=row.name,
enabled=row.enabled,
trigger_type=row.trigger_type,
trigger_config=row.trigger_config,
context_sources=row.context_sources,
task_templates=row.task_templates,
dedupe_key_strategy=row.dedupe_key_strategy,
version=row.version,
created_at=row.created_at,
updated_at=row.updated_at,
)
async def _upsert_schedule_if_cron(row: ActivityDefinitionRow) -> None:
"""Upsert a Temporal Schedule for the row if it uses a cron trigger."""
try:
defn = ActivityDefinition.model_validate(
{
"id": row.id,
"name": row.name,
"enabled": row.enabled,
"trigger_config": row.trigger_config,
"context_sources": row.context_sources,
"task_templates": row.task_templates,
"dedupe_key_strategy": row.dedupe_key_strategy,
"version": row.version,
}
)
if isinstance(defn.trigger_config, CronTriggerConfig):
await upsert_schedule(_get_temporal(), defn)
except Exception:
pass # Schedule management is best-effort; don't fail the API call.
# --- Routes ------------------------------------------------------------------
@app.get("/activity-definitions/", response_model=list[ActivityDefinitionResponse])
async def list_definitions() -> list[ActivityDefinitionResponse]:
"""List all ActivityDefinitions."""
Session = _get_db()
async with Session() as session:
rows = (await session.scalars(select(ActivityDefinitionRow))).all()
return [_row_to_response(r) for r in rows]
@app.get("/activity-definitions/{definition_id}", response_model=ActivityDefinitionResponse)
async def get_definition(definition_id: uuid.UUID) -> ActivityDefinitionResponse:
"""Get one ActivityDefinition by ID."""
Session = _get_db()
async with Session() as session:
row = await session.get(ActivityDefinitionRow, definition_id)
if row is None:
raise HTTPException(status_code=404, detail="ActivityDefinition not found")
return _row_to_response(row)
@app.post("/activity-definitions/", response_model=ActivityDefinitionResponse, status_code=201)
async def create_definition(body: ActivityDefinitionCreate) -> ActivityDefinitionResponse:
"""Create a new ActivityDefinition. Upserts a Temporal Schedule if trigger_type='cron'."""
trigger_type = body.trigger_config.get("trigger_type", "")
row = ActivityDefinitionRow(
id=uuid.uuid4(),
name=body.name,
enabled=body.enabled,
trigger_type=trigger_type,
trigger_config=body.trigger_config,
context_sources=body.context_sources,
task_templates=body.task_templates,
dedupe_key_strategy=body.dedupe_key_strategy,
version=body.version,
)
Session = _get_db()
async with Session() as session:
async with session.begin():
session.add(row)
await _upsert_schedule_if_cron(row)
return _row_to_response(row)
@app.put("/activity-definitions/{definition_id}", response_model=ActivityDefinitionResponse)
async def update_definition(
definition_id: uuid.UUID, body: ActivityDefinitionUpdate
) -> ActivityDefinitionResponse:
"""Update an ActivityDefinition. Re-upserts the Temporal Schedule if trigger_type='cron'."""
Session = _get_db()
async with Session() as session:
row = await session.get(ActivityDefinitionRow, definition_id)
if row is None:
raise HTTPException(status_code=404, detail="ActivityDefinition not found")
if body.name is not None:
row.name = body.name
if body.enabled is not None:
row.enabled = body.enabled
if body.trigger_config is not None:
row.trigger_config = body.trigger_config
row.trigger_type = body.trigger_config.get("trigger_type", row.trigger_type)
if body.context_sources is not None:
row.context_sources = body.context_sources
if body.task_templates is not None:
row.task_templates = body.task_templates
if body.dedupe_key_strategy is not None:
row.dedupe_key_strategy = body.dedupe_key_strategy
if body.version is not None:
row.version = body.version
async with session.begin():
session.add(row)
await _upsert_schedule_if_cron(row)
return _row_to_response(row)
@app.delete("/activity-definitions/{definition_id}", status_code=204)
async def delete_definition(definition_id: uuid.UUID) -> None:
"""Delete an ActivityDefinition and its Temporal Schedule if present."""
Session = _get_db()
async with Session() as session:
row = await session.get(ActivityDefinitionRow, definition_id)
if row is None:
raise HTTPException(status_code=404, detail="ActivityDefinition not found")
async with session.begin():
await session.delete(row)
await delete_schedule(_get_temporal(), definition_id)
@app.post("/activity-definitions/{definition_id}/trigger", status_code=202)
async def trigger_definition(definition_id: uuid.UUID) -> dict[str, str]:
"""Manually trigger a one-shot RunActivityWorkflow for any ActivityDefinition."""
Session = _get_db()
async with Session() as session:
row = await session.get(ActivityDefinitionRow, definition_id)
if row is None:
raise HTTPException(status_code=404, detail="ActivityDefinition not found")
trigger_key = f"manual-{uuid.uuid4()}"
workflow_id = f"activity-{definition_id}:{trigger_key}"
handle = await _get_temporal().start_workflow(
"RunActivityWorkflow",
args=[str(definition_id), trigger_key, datetime.now(tz=timezone.utc).isoformat()],
id=workflow_id,
task_queue=_ORCHESTRATOR_TASK_QUEUE,
)
return {"workflow_id": handle.id, "trigger_key": trigger_key}