Files
state-hub/api/routers/tasks.py
tegwick 2d0ce8f943 feat(api): CUST-WP-0018 — API hardening & code quality
T01: Fix datetime.utcnow() → datetime.now(tz=timezone.utc) in MCP server
T02: Wrap _get/_post/_patch/_delete with try/except; return error dicts
T03: Log warnings when write_log skips missing project path
T04: Add priority + due_date_before filters to GET /tasks/
T05: Add owner + slug filters to GET /workstreams/
T06: Add offset param to GET /progress/ for proper pagination
T07: Low-severity bundle:
  - CORS origins from CORS_ORIGINS env var (TD-017)
  - seed.py upsert domains+topics on re-run (TD-011)
  - normalise filter bar CSS → filter-text-input everywhere (TD-016)
  - add 30.5 avg-days-per-month comment in decisions.md (TD-019)
  - TD-009, TD-018 already resolved by existing code

Closes CUST-WP-0018.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 02:17:04 +01:00

94 lines
2.8 KiB
Python

import uuid
from datetime import date
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session
from api.models.task import Task, TaskStatus
from api.schemas.task import TaskCreate, TaskRead, TaskUpdate
# Router collecting the task endpoints defined in this module; every route
# registered on it is served under the "/tasks" prefix and tagged "tasks".
router = APIRouter(prefix="/tasks", tags=["tasks"])
@router.get("/", response_model=list[TaskRead])
async def list_tasks(
    workstream_id: uuid.UUID | None = None,
    status: TaskStatus | None = None,
    assignee: str | None = None,
    needs_human: bool | None = Query(None),
    priority: str | None = None,
    due_date_before: date | None = None,
    session: AsyncSession = Depends(get_session),
) -> list[Task]:
    """Return tasks matching the supplied filters, ordered by creation time.

    Every filter is optional and the active ones are combined with AND.

    Args:
        workstream_id: Keep only tasks whose ``workstream_id`` equals this.
        status: Keep only tasks with this status. Inside this function the
            name shadows the ``fastapi.status`` module imported by the file.
        assignee: Keep only tasks with this assignee; an empty string is
            treated the same as "not supplied".
        needs_human: Keep only tasks whose ``needs_human`` flag equals this;
            ``False`` is a real filter value, only ``None`` disables it.
        priority: Keep only tasks with this priority; an empty string is
            treated the same as "not supplied".
        due_date_before: Keep only tasks whose ``due_date`` is on or before
            this date (the comparison is inclusive).
        session: Database session provided by the ``get_session`` dependency.

    Returns:
        The matching tasks sorted by ``created_at``.
    """
    criteria = []

    # Truthiness checks: falsy values (None, "") leave the filter inactive.
    if workstream_id:
        criteria.append(Task.workstream_id == workstream_id)
    if status:
        criteria.append(Task.status == status)
    if assignee:
        criteria.append(Task.assignee == assignee)

    # Explicit None check so that needs_human=False still filters.
    if needs_human is not None:
        criteria.append(Task.needs_human == needs_human)

    if priority:
        criteria.append(Task.priority == priority)
    if due_date_before is not None:
        criteria.append(Task.due_date <= due_date_before)

    stmt = select(Task)
    for criterion in criteria:
        stmt = stmt.where(criterion)
    stmt = stmt.order_by(Task.created_at)

    rows = (await session.execute(stmt)).scalars().all()
    return list(rows)
@router.post("/", response_model=TaskRead, status_code=status.HTTP_201_CREATED)
async def create_task(
    body: TaskCreate,
    session: AsyncSession = Depends(get_session),
) -> Task:
    """Create a task from the request body and return the stored row.

    Args:
        body: Validated creation payload; its dumped fields are passed
            directly to the ``Task`` constructor as keyword arguments.
        session: Database session provided by the ``get_session`` dependency.

    Returns:
        The newly created task, refreshed from the database after commit.
    """
    payload = body.model_dump()
    new_task = Task(**payload)

    session.add(new_task)
    await session.commit()
    # Reload the instance after the commit before handing it back.
    await session.refresh(new_task)
    return new_task
@router.get("/{task_id}", response_model=TaskRead)
async def get_task(
    task_id: uuid.UUID,
    session: AsyncSession = Depends(get_session),
) -> Task:
    """Fetch a single task by its primary key.

    Args:
        task_id: Identifier of the task to look up.
        session: Database session provided by the ``get_session`` dependency.

    Returns:
        The task with the given id.

    Raises:
        HTTPException: 404 with detail "Task not found" when no such task
            exists.
    """
    found = await session.get(Task, task_id)
    if found is not None:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Task not found",
    )
@router.patch("/{task_id}", response_model=TaskRead)
async def update_task(
    task_id: uuid.UUID,
    body: TaskUpdate,
    session: AsyncSession = Depends(get_session),
) -> Task:
    """Apply a partial update to an existing task.

    Only fields the client explicitly set in the request body are written;
    fields left out of the payload are not touched.

    Args:
        task_id: Identifier of the task to modify.
        body: Update payload; dumped with ``exclude_unset=True``.
        session: Database session provided by the ``get_session`` dependency.

    Returns:
        The updated task, refreshed from the database after commit.

    Raises:
        HTTPException: 404 with detail "Task not found" when no such task
            exists.
    """
    target = await session.get(Task, task_id)
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )

    # exclude_unset keeps the PATCH partial: omitted fields stay as they are.
    changes = body.model_dump(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    await session.commit()
    await session.refresh(target)
    return target
@router.delete("/{task_id}", response_model=TaskRead)
async def cancel_task(
    task_id: uuid.UUID,
    session: AsyncSession = Depends(get_session),
) -> Task:
    """Cancel a task in response to a DELETE request.

    The row is not removed: its status is set to ``TaskStatus.cancelled``
    and the updated task is returned.

    Args:
        task_id: Identifier of the task to cancel.
        session: Database session provided by the ``get_session`` dependency.

    Returns:
        The task with its status set to cancelled, refreshed after commit.

    Raises:
        HTTPException: 404 with detail "Task not found" when no such task
            exists.
    """
    target = await session.get(Task, task_id)
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )

    # Soft delete: only the status changes, the record itself is kept.
    target.status = TaskStatus.cancelled
    await session.commit()
    await session.refresh(target)
    return target