from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from api.task_status import ACTIVE_TASK_STATUSES, normalize_task_status, status_value
from api.workplan_status import normalize_workstream_status

# Task status values on either side of the "work has started" boundary.
TASK_STARTED_STATUS = "progress"
TASK_NOT_STARTED_STATUS = "todo"

# Alias of the imported set of task statuses treated as active work.
TASK_ACTIVE_STATUSES = ACTIVE_TASK_STATUSES

# Workstream statuses from which a parent may be switched to "active".
PARENT_ACTIVATION_STATUSES = {"proposed", "ready", "backlog"}


@dataclass(frozen=True)
class LifecycleTransitionResult:
    """Immutable summary of a single status transition."""

    # Kind of entity that was transitioned ("workstream" or "task" in this module).
    entity_type: str
    # Status observed before the transition was applied.
    previous_status: str
    # Status that was written to the entity (before any optional coercion).
    target_status: str
    # True when previous_status and target_status differ.
    changed: bool
    # True when the transition also switched the parent workstream to "active".
    parent_activated: bool = False


def should_activate_parent_for_task_start(
    *,
    previous_task_status: Any,
    new_task_status: Any,
    parent_workstream_status: Any,
) -> bool:
    """Decide whether starting a task should promote its parent workstream.

    The answer is True only when the task moves from ``TASK_NOT_STARTED_STATUS``
    to ``TASK_STARTED_STATUS`` and the parent's normalised status is one of
    ``PARENT_ACTIVATION_STATUSES``. Checks run in that order and stop at the
    first one that fails.
    """
    task_was_unstarted = status_value(previous_task_status) == TASK_NOT_STARTED_STATUS
    if not task_was_unstarted:
        return False

    task_is_started = status_value(new_task_status) == TASK_STARTED_STATUS
    if not task_is_started:
        return False

    parent_status = normalize_workstream_status(parent_workstream_status)
    return parent_status in PARENT_ACTIVATION_STATUSES


def has_active_task_status(task_statuses: list[Any] | tuple[Any, ...]) -> bool:
    """Report whether at least one of the given task statuses is an active one.

    Each entry is passed through ``status_value`` and looked up in
    ``TASK_ACTIVE_STATUSES``; scanning stops at the first match.
    """
    for raw_status in task_statuses:
        if status_value(raw_status) in TASK_ACTIVE_STATUSES:
            return True
    return False


def should_activate_parent_for_active_tasks(
    *,
    parent_workstream_status: Any,
    task_statuses: list[Any] | tuple[Any, ...],
) -> bool:
    """Decide whether existing task statuses imply the parent should be active.

    True when the parent's normalised status is in
    ``PARENT_ACTIVATION_STATUSES`` and some task status is active. The task
    statuses are only inspected when the parent check passes.
    """
    parent_status = normalize_workstream_status(parent_workstream_status)
    if parent_status not in PARENT_ACTIVATION_STATUSES:
        return False
    return has_active_task_status(task_statuses)


def activate_parent_for_task_start(
    *,
    previous_task_status: Any,
    new_task_status: Any,
    parent_workstream: Any,
) -> bool:
    """Set the parent workstream to "active" when a task start warrants it.

    Returns True if ``parent_workstream.status`` was overwritten with
    ``"active"``, and False when there is no parent or the activation rule in
    ``should_activate_parent_for_task_start`` does not apply.
    """
    if parent_workstream is None:
        return False

    # A parent object without a ``status`` attribute is evaluated as None.
    current_parent_status = getattr(parent_workstream, "status", None)
    if should_activate_parent_for_task_start(
        previous_task_status=previous_task_status,
        new_task_status=new_task_status,
        parent_workstream_status=current_parent_status,
    ):
        parent_workstream.status = "active"
        return True
    return False


def transition_workstream_status(
    workstream: Any,
    target_status: Any,
) -> LifecycleTransitionResult:
    """Write a normalised status onto a workstream and report the outcome.

    Both the workstream's current status (None if the attribute is missing)
    and ``target_status`` are passed through ``normalize_workstream_status``;
    the normalised target is then assigned to ``workstream.status``.
    """
    status_before = normalize_workstream_status(getattr(workstream, "status", None))
    status_after = normalize_workstream_status(target_status)

    workstream.status = status_after

    did_change = status_before != status_after
    return LifecycleTransitionResult(
        entity_type="workstream",
        previous_status=status_before,
        target_status=status_after,
        changed=did_change,
    )


def transition_task_status(
    task: Any,
    target_status: Any,
    *,
    parent_workstream: Any = None,
    previous_task_status: Any = None,
    status_coercer: Any = None,
) -> LifecycleTransitionResult:
    """Write a new status onto a task, activating its parent if work begins.

    Args:
        task: Object whose ``status`` attribute is overwritten.
        target_status: Requested status; passed through
            ``normalize_task_status`` before use.
        parent_workstream: Optional parent whose status may be set to
            ``"active"`` via ``activate_parent_for_task_start``.
        previous_task_status: Explicit "before" status. When None, the task's
            current ``status`` attribute is used instead (None if missing).
        status_coercer: Optional callable applied to the normalised target
            before it is stored on the task; ignored when falsy.

    Returns:
        A ``LifecycleTransitionResult`` holding the un-coerced normalised
        target and whether the parent was activated.
    """
    if previous_task_status is None:
        status_source = getattr(task, "status", None)
    else:
        status_source = previous_task_status
    status_before = status_value(status_source)

    status_after = normalize_task_status(target_status)

    # Only the value stored on the task is coerced; the result keeps the
    # normalised form.
    if status_coercer:
        task.status = status_coercer(status_after)
    else:
        task.status = status_after

    parent_was_activated = activate_parent_for_task_start(
        previous_task_status=status_before,
        new_task_status=status_after,
        parent_workstream=parent_workstream,
    )

    did_change = status_before != status_after
    return LifecycleTransitionResult(
        entity_type="task",
        previous_status=status_before,
        target_status=status_after,
        changed=did_change,
        parent_activated=parent_was_activated,
    )