# File: state-hub/api/services/lifecycle.py
#
# Repository-viewer metadata: 126 lines, 3.9 KiB, Python.

from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from api.workplan_status import normalize_workstream_status
# Task status that counts as "work has started" when a task transitions into it.
TASK_STARTED_STATUS = "in_progress"
# Task status a task must be leaving for the transition to count as a start.
TASK_NOT_STARTED_STATUS = "todo"
# Task statuses treated as currently active work by has_active_task_status().
TASK_ACTIVE_STATUSES = {"in_progress", "blocked"}
# Normalised parent workstream statuses from which the parent may be moved
# to "active".
PARENT_ACTIVATION_STATUSES = {"proposed", "ready", "backlog"}
@dataclass(frozen=True)
class LifecycleTransitionResult:
    """Immutable summary of a single status transition.

    Instances are built by the transition helpers in this module and
    report what the status was, what it was set to, and whether anything
    actually changed.
    """

    # Kind of entity transitioned; this module uses "workstream" and "task".
    entity_type: str
    # Normalised status observed before the transition was applied.
    previous_status: str
    # Normalised status the transition was asked to apply.
    target_status: str
    # Whether the previous and target statuses differ.
    changed: bool
    # Whether the transition also moved the parent workstream to "active".
    parent_activated: bool = False
def status_value(status: Any) -> str:
    """Return *status* as a trimmed, lower-case string.

    Objects exposing a ``value`` attribute (enum members, for example) are
    unwrapped to that attribute first. Any falsy input, including ``None``
    and the empty string, yields ``""``.
    """
    raw = getattr(status, "value", status)
    if not raw:
        return ""
    return str(raw).strip().lower()
def should_activate_parent_for_task_start(
    *,
    previous_task_status: Any,
    new_task_status: Any,
    parent_workstream_status: Any,
) -> bool:
    """Decide whether starting a task should move its parent to active.

    The answer is ``True`` only when all three conditions hold:

    * the task was previously in the not-started status,
    * the task is now in the started status, and
    * the parent's status, after ``normalize_workstream_status``, is one of
      ``PARENT_ACTIVATION_STATUSES``.

    The checks run in that order, and the parent status is only normalised
    once both task checks have passed.
    """
    if status_value(previous_task_status) != TASK_NOT_STARTED_STATUS:
        return False
    if status_value(new_task_status) != TASK_STARTED_STATUS:
        return False
    parent_status = normalize_workstream_status(parent_workstream_status)
    return parent_status in PARENT_ACTIVATION_STATUSES
def has_active_task_status(task_statuses: list[Any] | tuple[Any, ...]) -> bool:
    """Report whether at least one status denotes currently active work.

    Each entry is passed through ``status_value`` and checked against
    ``TASK_ACTIVE_STATUSES``. Scanning stops at the first match, and an
    empty collection yields ``False``.
    """
    for candidate in task_statuses:
        if status_value(candidate) in TASK_ACTIVE_STATUSES:
            return True
    return False
def should_activate_parent_for_active_tasks(
    *,
    parent_workstream_status: Any,
    task_statuses: list[Any] | tuple[Any, ...],
) -> bool:
    """Decide whether existing task state implies an active parent.

    Returns ``True`` when the parent's status, after
    ``normalize_workstream_status``, is one of
    ``PARENT_ACTIVATION_STATUSES`` and at least one task status is active.
    The task statuses are only inspected when the parent check passes.
    """
    parent_status = normalize_workstream_status(parent_workstream_status)
    if parent_status not in PARENT_ACTIVATION_STATUSES:
        return False
    return has_active_task_status(task_statuses)
def activate_parent_for_task_start(
    *,
    previous_task_status: Any,
    new_task_status: Any,
    parent_workstream: Any,
) -> bool:
    """Set the parent workstream to ``"active"`` when task work begins.

    Nothing happens when ``parent_workstream`` is ``None``. Otherwise the
    parent's ``status`` attribute is read (a missing attribute is treated as
    ``None``) and ``should_activate_parent_for_task_start`` makes the
    decision. On a positive decision, ``parent_workstream.status`` is
    overwritten in place.

    Returns:
        ``True`` if the parent's status was set to ``"active"``, else
        ``False``.
    """
    if parent_workstream is None:
        return False

    should_activate = should_activate_parent_for_task_start(
        previous_task_status=previous_task_status,
        new_task_status=new_task_status,
        parent_workstream_status=getattr(parent_workstream, "status", None),
    )
    if should_activate:
        parent_workstream.status = "active"
    return should_activate
def transition_workstream_status(
    workstream: Any,
    target_status: Any,
) -> LifecycleTransitionResult:
    """Set a workstream's status to the normalised form of *target_status*.

    Both the current status (``None`` when the attribute is missing) and the
    target are passed through ``normalize_workstream_status``. The normalised
    target is always written to ``workstream.status``, even when it equals
    the previous value.

    Returns:
        A ``LifecycleTransitionResult`` with ``entity_type="workstream"``,
        whose ``changed`` flag says whether the normalised statuses differ.
    """
    before = normalize_workstream_status(getattr(workstream, "status", None))
    after = normalize_workstream_status(target_status)
    workstream.status = after

    status_changed = before != after
    return LifecycleTransitionResult(
        entity_type="workstream",
        previous_status=before,
        target_status=after,
        changed=status_changed,
    )
def transition_task_status(
    task: Any,
    target_status: Any,
    *,
    parent_workstream: Any = None,
    previous_task_status: Any = None,
    status_coercer: Any = None,
) -> LifecycleTransitionResult:
    """Set a task's status and activate its parent workstream on a start.

    Args:
        task: Object whose ``status`` attribute is overwritten.
        target_status: Status to apply; it is compared in ``status_value``
            form.
        parent_workstream: Optional parent, handed to
            ``activate_parent_for_task_start``.
        previous_task_status: Status to treat as the "before" state. When
            ``None``, the task's current ``status`` attribute is used.
        status_coercer: Optional callable applied to the normalised target
            to produce the value stored on the task. When falsy, the
            original ``target_status`` object is stored unchanged.

    Returns:
        A ``LifecycleTransitionResult`` with ``entity_type="task"``,
        including whether the parent workstream was activated.
    """
    if previous_task_status is None:
        prior_raw = getattr(task, "status", None)
    else:
        prior_raw = previous_task_status
    before = status_value(prior_raw)
    after = status_value(target_status)

    # Store the coerced value when a coercer is supplied; otherwise keep the
    # caller's original object rather than the normalised string.
    if status_coercer:
        task.status = status_coercer(after)
    else:
        task.status = target_status

    activated = activate_parent_for_task_start(
        previous_task_status=before,
        new_task_status=after,
        parent_workstream=parent_workstream,
    )
    status_changed = before != after
    return LifecycleTransitionResult(
        entity_type="task",
        previous_status=before,
        target_status=after,
        changed=status_changed,
        parent_activated=activated,
    )