Files
activity-core/src/activity_core/api.py
tegwick 2a8e6cfe7f feat(WP-0004): railiance deployment & service ops
- Dockerfile (multi-stage, uv-based, slim runtime)
- .dockerignore
- docker-compose.railiance.yml (Temporal + NATS + PG, no Elasticsearch)
- GET /health endpoint (db + temporal probes, 200/503)
- .env.example (complete env var reference)
- Makefile: migrate, sync-all, dev-up/down, railiance-up/down,
  start-worker, start-api, start-event-router, help targets;
  extracted sync-event-types Python to scripts/sync_event_types.py
- SIGTERM graceful shutdown in worker.py and event_router.py
- docs/runbook.md: Railiance deployment section

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 00:04:39 +02:00

326 lines
12 KiB
Python

"""FastAPI REST API for activity-core.
T30: CRUD for ActivityDefinition + manual one-shot trigger.
Endpoints:
GET /activity-definitions/ — list all
GET /activity-definitions/{id} — get one
POST /activity-definitions/ — create
PUT /activity-definitions/{id} — update
DELETE /activity-definitions/{id} — delete
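POST /activity-definitions/{id}/trigger — manual one-shot run
GET /health — db + temporal probes (200 when both pass, 503 otherwise)
POST /event-types/{type_id}/approve — set a pending event type to 'active'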
Schedule lifecycle:
- POST/PUT with trigger_type='cron' upserts a Temporal Schedule.
- DELETE removes the Temporal Schedule if present.
- /trigger starts RunActivityWorkflow directly (works for any trigger_type).
Run with:
TEMPORAL_HOST=localhost:7233 \
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
uv run uvicorn activity_core.api:app --port 8010
"""
from __future__ import annotations

import logging
import os
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from temporalio.client import Client

from activity_core.models import ActivityDefinition, CronTriggerConfig
from activity_core.orm import ActivityDefinition as ActivityDefinitionRow, EventType as EventTypeRow
from activity_core.schedule_manager import delete_schedule, upsert_schedule
from activity_core.webhook_receiver import router as webhook_router
# Temporal connection settings, overridable through the environment.
TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233")
TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default")
# Task queue on which manually triggered RunActivityWorkflow runs are started.
_ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq"
# T42: Curator gate — controls which event type statuses are accepted by the router.
# "disabled" (default): accepts "active" and "pending" types (pending logged as warning).
# "required": only "active" types accepted; "pending" events discarded.
# NOTE(review): this value is not read anywhere in the visible part of this
# module — presumably consumed by the event router; confirm.
ACTIVITY_CURATOR_GATE = os.environ.get("ACTIVITY_CURATOR_GATE", "disabled")
# --- App state ---------------------------------------------------------------
# Both handles are None until lifespan() has run at application startup.
_session_factory: async_sessionmaker[AsyncSession] | None = None
_temporal_client: Client | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):  # type: ignore[type-arg]
    """Create the shared DB session factory and Temporal client for the app.

    Reads ``ACTCORE_DB_URL`` from the environment, builds an async engine and
    session factory, connects to Temporal, then yields while the application
    serves requests. On exit the engine is disposed.

    Raises:
        RuntimeError: If ``ACTCORE_DB_URL`` is unset or empty.
    """
    global _session_factory, _temporal_client
    db_url = os.environ.get("ACTCORE_DB_URL")
    if not db_url:
        raise RuntimeError("ACTCORE_DB_URL is required")
    engine = create_async_engine(db_url)
    try:
        _session_factory = async_sessionmaker(engine, expire_on_commit=False)
        _temporal_client = await Client.connect(TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE)
        yield
    finally:
        # Runs on normal shutdown, on a failed Temporal connect, and when the
        # application body raises — the engine's pool is always released.
        _session_factory = None
        _temporal_client = None
        await engine.dispose()
# ASGI application; lifespan() sets up the DB session factory and Temporal client.
app = FastAPI(title="activity-core API", lifespan=lifespan)
# Mount the routes exported by activity_core.webhook_receiver.
app.include_router(webhook_router)
def _get_db() -> async_sessionmaker[AsyncSession]:
    """Return the session factory created by ``lifespan``.

    Raises:
        RuntimeError: If the factory has not been initialised yet. An explicit
            raise is used instead of ``assert`` so the check survives
            ``python -O``.
    """
    if _session_factory is None:
        raise RuntimeError("Database session factory is not initialised (lifespan has not run)")
    return _session_factory
def _get_temporal() -> Client:
    """Return the Temporal client connected by ``lifespan``.

    Raises:
        RuntimeError: If the client has not been initialised yet. An explicit
            raise is used instead of ``assert`` so the check survives
            ``python -O``.
    """
    if _temporal_client is None:
        raise RuntimeError("Temporal client is not initialised (lifespan has not run)")
    return _temporal_client
# --- Schemas -----------------------------------------------------------------
class ActivityDefinitionCreate(BaseModel):
    """Request body accepted by ``POST /activity-definitions/``."""

    name: str
    enabled: bool = True
    # The create route reads the "trigger_type" key from this mapping.
    trigger_config: dict[str, Any]
    context_sources: list[dict[str, Any]] = []
    task_templates: list[dict[str, Any]] = []
    dedupe_key_strategy: str = "skip"
    version: int = 1
class ActivityDefinitionUpdate(BaseModel):
    """Request body accepted by ``PUT /activity-definitions/{id}``.

    Every field is optional; the update route leaves a column untouched when
    the corresponding field is ``None``.
    """

    name: str | None = None
    enabled: bool | None = None
    trigger_config: dict[str, Any] | None = None
    context_sources: list[dict[str, Any]] | None = None
    task_templates: list[dict[str, Any]] | None = None
    dedupe_key_strategy: str | None = None
    version: int | None = None
class ActivityDefinitionResponse(BaseModel):
    """Response shape returned by the ActivityDefinition CRUD routes."""

    # Identity and basic flags.
    id: uuid.UUID
    name: str
    enabled: bool

    # Trigger description and the definition payload.
    trigger_type: str
    trigger_config: dict[str, Any]
    context_sources: list[dict[str, Any]]
    task_templates: list[dict[str, Any]]
    dedupe_key_strategy: str
    version: int

    # Row timestamps, copied from the ORM row.
    created_at: datetime
    updated_at: datetime
def _row_to_response(row: ActivityDefinitionRow) -> ActivityDefinitionResponse:
    """Build the API response model by copying each field from the ORM row."""
    copied_fields = (
        "id",
        "name",
        "enabled",
        "trigger_type",
        "trigger_config",
        "context_sources",
        "task_templates",
        "dedupe_key_strategy",
        "version",
        "created_at",
        "updated_at",
    )
    payload = {field: getattr(row, field) for field in copied_fields}
    return ActivityDefinitionResponse(**payload)
async def _upsert_schedule_if_cron(row: ActivityDefinitionRow) -> None:
    """Upsert a Temporal Schedule for the row if it uses a cron trigger.

    The row is validated into an ``ActivityDefinition`` model; when its
    ``trigger_config`` is a ``CronTriggerConfig`` the schedule is upserted.
    Schedule management is best-effort: any failure (validation or Temporal)
    is logged with its traceback and never propagated to the API caller.
    """
    try:
        defn = ActivityDefinition.model_validate(
            {
                "id": row.id,
                "name": row.name,
                "enabled": row.enabled,
                "trigger_config": row.trigger_config,
                "context_sources": row.context_sources,
                "task_templates": row.task_templates,
                "dedupe_key_strategy": row.dedupe_key_strategy,
                "version": row.version,
            }
        )
        if isinstance(defn.trigger_config, CronTriggerConfig):
            await upsert_schedule(_get_temporal(), defn)
    except Exception:
        # Still best-effort, but no longer silent: a definition whose schedule
        # was never created would otherwise be indistinguishable from a healthy one.
        logging.getLogger(__name__).exception(
            "Failed to upsert Temporal schedule for ActivityDefinition %s", row.id
        )
# --- Routes ------------------------------------------------------------------
@app.get("/activity-definitions/", response_model=list[ActivityDefinitionResponse])
async def list_definitions() -> list[ActivityDefinitionResponse]:
    """Return every stored ActivityDefinition."""
    async with _get_db()() as session:
        result = await session.scalars(select(ActivityDefinitionRow))
        return list(map(_row_to_response, result.all()))
@app.get("/activity-definitions/{definition_id}", response_model=ActivityDefinitionResponse)
async def get_definition(definition_id: uuid.UUID) -> ActivityDefinitionResponse:
    """Fetch a single ActivityDefinition by primary key; 404 when it does not exist."""
    async with _get_db()() as session:
        found = await session.get(ActivityDefinitionRow, definition_id)
        if found is not None:
            return _row_to_response(found)
        raise HTTPException(status_code=404, detail="ActivityDefinition not found")
@app.post("/activity-definitions/", response_model=ActivityDefinitionResponse, status_code=201)
async def create_definition(body: ActivityDefinitionCreate) -> ActivityDefinitionResponse:
    """Persist a new ActivityDefinition, then upsert its Temporal Schedule when it is cron-triggered.

    The ``trigger_type`` column is taken from ``trigger_config["trigger_type"]``
    and falls back to an empty string when that key is absent.
    """
    trigger_type = body.trigger_config.get("trigger_type", "")
    new_row = ActivityDefinitionRow(
        id=uuid.uuid4(),
        name=body.name,
        enabled=body.enabled,
        trigger_type=trigger_type,
        trigger_config=body.trigger_config,
        context_sources=body.context_sources,
        task_templates=body.task_templates,
        dedupe_key_strategy=body.dedupe_key_strategy,
        version=body.version,
    )
    # The transaction commits when the inner context exits.
    async with _get_db()() as session, session.begin():
        session.add(new_row)
    await _upsert_schedule_if_cron(new_row)
    return _row_to_response(new_row)
@app.put("/activity-definitions/{definition_id}", response_model=ActivityDefinitionResponse)
async def update_definition(
    definition_id: uuid.UUID, body: ActivityDefinitionUpdate
) -> ActivityDefinitionResponse:
    """Update an ActivityDefinition. Re-upserts the Temporal Schedule if trigger_type='cron'.

    Only fields that are not ``None`` in the body are written. When
    ``trigger_config`` is supplied, ``trigger_type`` is refreshed from its
    ``"trigger_type"`` key (keeping the old value if the key is absent).

    Raises:
        HTTPException: 404 if no definition with ``definition_id`` exists.
    """
    # Fields copied verbatim from the body when provided.
    plain_fields = (
        "name",
        "enabled",
        "context_sources",
        "task_templates",
        "dedupe_key_strategy",
        "version",
    )
    async with _get_db()() as session:
        # The read and the write share one explicit transaction. session.get()
        # autobegins a transaction, so calling session.begin() *after* it
        # raises InvalidRequestError ("A transaction is already begun").
        async with session.begin():
            row = await session.get(ActivityDefinitionRow, definition_id)
            if row is None:
                raise HTTPException(status_code=404, detail="ActivityDefinition not found")
            for field in plain_fields:
                value = getattr(body, field)
                if value is not None:
                    setattr(row, field, value)
            if body.trigger_config is not None:
                row.trigger_config = body.trigger_config
                row.trigger_type = body.trigger_config.get("trigger_type", row.trigger_type)
        # Leaving the begin() block commits the change.
    # NOTE(review): switching a definition away from a cron trigger does not
    # remove its existing Temporal Schedule here — confirm whether that is intended.
    await _upsert_schedule_if_cron(row)
    return _row_to_response(row)
@app.delete("/activity-definitions/{definition_id}", status_code=204)
async def delete_definition(definition_id: uuid.UUID) -> None:
    """Delete an ActivityDefinition and its Temporal Schedule if present.

    The row is deleted and committed first; ``delete_schedule`` is called
    afterwards with the same id.

    Raises:
        HTTPException: 404 if no definition with ``definition_id`` exists.
    """
    async with _get_db()() as session:
        # The lookup and the delete share one explicit transaction.
        # session.get() autobegins a transaction, so calling session.begin()
        # *after* it raises InvalidRequestError ("A transaction is already begun").
        async with session.begin():
            row = await session.get(ActivityDefinitionRow, definition_id)
            if row is None:
                raise HTTPException(status_code=404, detail="ActivityDefinition not found")
            await session.delete(row)
    # NOTE(review): presumably delete_schedule tolerates a missing schedule
    # (non-cron definitions never had one) — verify in schedule_manager.
    await delete_schedule(_get_temporal(), definition_id)
@app.post("/activity-definitions/{definition_id}/trigger", status_code=202)
async def trigger_definition(definition_id: uuid.UUID) -> dict[str, str]:
    """Start a one-shot RunActivityWorkflow for the given definition, whatever its trigger type.

    Responds with the started workflow's id and the generated trigger key;
    404 when the definition does not exist.
    """
    async with _get_db()() as session:
        definition = await session.get(ActivityDefinitionRow, definition_id)
        if definition is None:
            raise HTTPException(status_code=404, detail="ActivityDefinition not found")

    # A fresh UUID per call makes every manual run a distinct workflow id.
    manual_key = f"manual-{uuid.uuid4()}"
    client = _get_temporal()
    workflow_args = [
        str(definition_id),
        manual_key,
        datetime.now(tz=timezone.utc).isoformat(),
    ]
    handle = await client.start_workflow(
        "RunActivityWorkflow",
        args=workflow_args,
        id=f"activity-{definition_id}:{manual_key}",
        task_queue=_ORCHESTRATOR_TASK_QUEUE,
    )
    return {"workflow_id": handle.id, "trigger_key": manual_key}
# --- Health probe, followed by the T42 curator gate (event type approval) endpoint ---
@app.get("/health")
async def health() -> JSONResponse:
    """Probe the database and Temporal and report the combined status.

    Returns:
        A JSON body ``{"status", "db", "temporal"}`` with HTTP 200 when both
        probes succeed, or HTTP 503 with status ``"degraded"`` otherwise.
        Probe failures never raise; they are logged and reported as ``False``.
    """
    log = logging.getLogger(__name__)
    db_ok = False
    temporal_ok = False

    try:
        async with _get_db()() as session:
            await session.execute(text("SELECT 1"))
        db_ok = True
    except Exception:
        # Boundary handler: the endpoint must answer even when the DB is down,
        # but the cause has to be visible to operators.
        log.warning("Health check: database probe failed", exc_info=True)

    try:
        # Service-level gRPC health check exposed by the client's ServiceClient.
        # NOTE(review): temporalio's Client does not appear to provide a
        # describe_namespace() method — confirm against the pinned SDK version.
        temporal_ok = bool(await _get_temporal().service_client.check_health())
    except Exception:
        log.warning("Health check: Temporal probe failed", exc_info=True)

    healthy = db_ok and temporal_ok
    return JSONResponse(
        {
            "status": "ok" if healthy else "degraded",
            "db": db_ok,
            "temporal": temporal_ok,
        },
        status_code=200 if healthy else 503,
    )
@app.post("/event-types/{type_id}/approve", status_code=200)
async def approve_event_type(type_id: str) -> dict[str, str]:
    """Approve a pending event type, setting its status to 'active'.

    Only relevant when ACTIVITY_CURATOR_GATE=required.
    NOTE(review): no authentication or admin check is enforced in this
    module — presumably handled upstream; confirm.

    Returns:
        ``{"type_id", "status", "message"}`` where ``message`` is
        ``"already active"`` if nothing had to change, else ``"approved"``.

    Raises:
        HTTPException: 404 if no event type with ``type_id`` exists.
    """
    async with _get_db()() as session:
        # The lookup and the update share one explicit transaction.
        # session.get() autobegins a transaction, so calling session.begin()
        # *after* it raises InvalidRequestError ("A transaction is already begun").
        async with session.begin():
            row = await session.get(EventTypeRow, type_id)
            if row is None:
                raise HTTPException(status_code=404, detail=f"Event type {type_id!r} not found")
            if row.status == "active":
                return {"type_id": type_id, "status": "active", "message": "already active"}
            # Uses the module-level `text` import; the value is bound as a parameter.
            await session.execute(
                text("UPDATE event_types SET status = 'active' WHERE type_id = :tid"),
                {"tid": type_id},
            )
    return {"type_id": type_id, "status": "active", "message": "approved"}