Normalize workplan IDs and activate parents on task start

This commit is contained in:
2026-05-23 16:31:28 +02:00
parent 9f3561a254
commit 90c9f8e7a7
10 changed files with 195 additions and 38 deletions

View File

@@ -10,6 +10,7 @@ from api.models.task import Task, TaskStatus
from api.models.token_event import TokenEvent
from api.models.workstream import Workstream
from api.schemas.task import TaskCreate, TaskRead, TaskUpdate
from api.services.lifecycle import activate_parent_for_task_start, status_value
router = APIRouter(prefix="/tasks", tags=["tasks"])
@@ -49,6 +50,13 @@ async def create_task(
) -> Task:
task = Task(**body.model_dump())
session.add(task)
if status_value(task.status) == "in_progress":
ws = await session.get(Workstream, task.workstream_id)
activate_parent_for_task_start(
previous_task_status="todo",
new_task_status=task.status,
parent_workstream=ws,
)
await session.commit()
await session.refresh(task)
return task
@@ -75,7 +83,7 @@ async def update_task(
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
previous_status = task.status.value
previous_status = status_value(task.status)
# Separate token fields from task fields
token_field_names = {
@@ -92,15 +100,22 @@ async def update_task(
update_data = body.model_dump(exclude_unset=True)
token_data = {k: update_data.pop(k) for k in list(update_data.keys()) if k in token_field_names}
suppress_token_event = bool(token_data.pop("suppress_token_event", False))
status_update = update_data.get("status")
new_status = status_value(status_update) if status_update is not None else None
for field, value in update_data.items():
setattr(task, field, value)
if new_status is not None:
ws = await session.get(Workstream, task.workstream_id)
activate_parent_for_task_start(
previous_task_status=previous_status,
new_task_status=new_status,
parent_workstream=ws,
)
await session.commit()
await session.refresh(task)
# Token event — three-tier logic, only for an intentional transition to done.
status_update = update_data.get("status")
new_status = status_update.value if hasattr(status_update, "value") else status_update
if (
new_status == "done"
and previous_status != "done"

50
api/services/lifecycle.py Normal file
View File

@@ -0,0 +1,50 @@
from __future__ import annotations
from typing import Any
from api.workplan_status import normalize_workstream_status
TASK_STARTED_STATUS = "in_progress"
TASK_NOT_STARTED_STATUS = "todo"
PARENT_ACTIVATION_STATUSES = {"proposed", "ready", "backlog"}
def status_value(status: Any) -> str:
if hasattr(status, "value"):
status = status.value
return str(status or "").strip().lower()
def should_activate_parent_for_task_start(
*,
previous_task_status: Any,
new_task_status: Any,
parent_workstream_status: Any,
) -> bool:
"""Return whether a task start should move its parent to active."""
return (
status_value(previous_task_status) == TASK_NOT_STARTED_STATUS
and status_value(new_task_status) == TASK_STARTED_STATUS
and normalize_workstream_status(parent_workstream_status)
in PARENT_ACTIVATION_STATUSES
)
def activate_parent_for_task_start(
*,
previous_task_status: Any,
new_task_status: Any,
parent_workstream: Any,
) -> bool:
"""Activate a planning-state parent workstream when real task work starts."""
if parent_workstream is None:
return False
if not should_activate_parent_for_task_start(
previous_task_status=previous_task_status,
new_task_status=new_task_status,
parent_workstream_status=getattr(parent_workstream, "status", None),
):
return False
parent_workstream.status = "active"
return True