"""Reconciliation routes: classify, and optionally apply, workstream/task status changes."""

import uuid

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from api.database import get_session
from api.models.agent_message import AgentMessage
from api.models.managed_repo import ManagedRepo
from api.models.task import Task
from api.models.task import TaskStatus
from api.models.workstream import Workstream
from api.schemas.reconciliation import StateChangeRequest, StateChangeResponse
from api.services.lifecycle import (
    should_activate_parent_for_task_start,
    status_value,
    transition_task_status,
    transition_workstream_status,
)
from api.task_status import TERMINAL_TASK_STATUSES
from api.services.reconciliation import (
    ReconciliationClass,
    StateChangeClassification,
    classify_task_status_change,
    classify_workstream_status_change,
)
from api.services.workplan_files import (
    find_workplan_for_workstream,
    patch_task_status,
    patch_workplan_status,
    resolve_repo_path,
    task_block_status,
    task_block_linked,
    workplan_status,
)
from api.workplan_status import normalize_workstream_status

router = APIRouter(prefix="/reconciliation", tags=["reconciliation"])


def _bool_or_default(value: bool | None, default: bool) -> bool:
    """Return ``value`` unless it is ``None``, in which case return ``default``."""
    return default if value is None else value


def _conflict(reason: str, follow_up: str) -> StateChangeClassification:
    """Build a ``DEFERRED`` classification from ``reason`` and ``follow_up``.

    The three values are passed positionally; presumably they map to the
    ``reconciliation_class``, ``reason`` and ``follow_up`` attributes read
    elsewhere in this module -- verify against ``StateChangeClassification``.
    """
    return StateChangeClassification(
        ReconciliationClass.DEFERRED,
        reason,
        follow_up,
    )


async def _workstream_tasks_terminal(session: AsyncSession, workstream_id: uuid.UUID) -> bool:
    """Return True when the workstream has at least one task and all are terminal.

    A workstream with no task rows yields False.
    """
    result = await session.execute(select(Task.status).where(Task.workstream_id == workstream_id))
    statuses = [status_value(row[0]) for row in result.all()]
    return bool(statuses) and all(status in TERMINAL_TASK_STATUSES for status in statuses)


def _deferred_message(
    *,
    body: StateChangeRequest,
    current_status: str,
    target_status: str,
    classification: ReconciliationClass,
    reason: str,
    follow_up: str,
    workplan_path: str | None,
    conflict: bool = False,
) -> AgentMessage:
    """Build (without persisting) the message describing a change that was not applied.

    The message is sent from ``body.actor`` to ``"state-hub"``. Its body is a
    ``key: value`` listing, one entry per line; optional values that are
    missing are rendered as empty strings.
    """
    subject = f"Reconcile {body.target_type} state change: {current_status} -> {target_status}"
    lines = [
        "A UI-originated state change could not be written through immediately.",
        "",
        f"target_type: {body.target_type}",
        f"target_id: {body.target_id}",
        f"actor: {body.actor}",
        f"intent: {body.intent or ''}",
        f"expected_current_status: {body.expected_current_status or ''}",
        f"current_status: {current_status}",
        f"target_status: {target_status}",
        f"reconciliation_class: {classification.value}",
        f"conflict: {str(conflict).lower()}",
        f"reason: {reason}",
        f"follow_up: {follow_up}",
        f"workplan_path: {workplan_path or ''}",
    ]
    return AgentMessage(
        from_agent=body.actor,
        to_agent="state-hub",
        subject=subject,
        body="\n".join(lines),
    )


async def _record_deferred_message(
    session: AsyncSession,
    *,
    body: StateChangeRequest,
    current_status: str,
    target_status: str,
    classification: StateChangeClassification,
    workplan_path: str | None,
    conflict: bool,
):
    """Persist the deferred-change message and return the id of the stored row.

    Adds the message to ``session``, commits, and refreshes it so that the
    generated ``id`` is populated before it is returned.
    """
    msg = _deferred_message(
        body=body,
        current_status=current_status,
        target_status=target_status,
        classification=classification.reconciliation_class,
        reason=classification.reason,
        follow_up=classification.follow_up,
        workplan_path=workplan_path,
        conflict=conflict,
    )
    session.add(msg)
    await session.commit()
    await session.refresh(msg)
    return msg.id


@router.post("/state-change", response_model=StateChangeResponse)
async def classify_state_change(
    body: StateChangeRequest,
    session: AsyncSession = Depends(get_session),
) -> StateChangeResponse:
    """Classify a requested status change and, when ``body.apply`` is set, try to apply it.

    ``body.target_type == "workstream"`` selects the workstream path; any
    other value is handled as a task.

    Without ``body.apply`` nothing is written: the caller-supplied hints
    (``file_backed``, ``archived_file``, ``task_linked``) override the detected
    values when they are not ``None``, and ``write_through_result`` stays
    ``"not_attempted"``.

    With ``body.apply`` the detected values are used for those three flags and
    the change is written through only if all of the following hold:

    * ``expected_current_status`` (when given) matches the stored status;
    * the repo, if any, resolves to a path on this host;
    * the classification is ``WRITE_THROUGH`` and a workplan file was found
      (for tasks, the task block must also be linked in that file);
    * the status in the file does not disagree with the stored status;
    * the file can be patched and reads back with the target status.

    On success the DB row is transitioned and committed and
    ``write_through_result`` is ``"applied"``. In every other apply case a
    deferred ``AgentMessage`` is stored, its id is returned as
    ``reconciliation_record_id`` and ``write_through_result`` is
    ``"not_applicable"``.

    Raises:
        HTTPException: 404 when the targeted workstream or task does not exist.
    """
    if body.target_type == "workstream":
        ws = await session.get(Workstream, body.target_id)
        if ws is None:
            raise HTTPException(status_code=404, detail="Workstream not found")
        repo = await session.get(ManagedRepo, ws.repo_id) if ws.repo_id else None
        repo_path = resolve_repo_path(repo)
        workplan_ref = find_workplan_for_workstream(repo, ws.id) if repo_path else None
        actual_file_backed = workplan_ref is not None
        actual_archived_file = bool(workplan_ref and workplan_ref.archived)
        # Caller hints are only honoured for dry-run classification.
        file_backed = (
            actual_file_backed
            if body.apply
            else _bool_or_default(body.file_backed, actual_file_backed)
        )
        archived_file = (
            actual_archived_file
            if body.apply
            else _bool_or_default(body.archived_file, actual_archived_file)
        )
        # NOTE(review): unlike file_backed/archived_file, the tasks_terminal hint
        # is honoured even when body.apply is set -- confirm this is intended.
        tasks_terminal = (
            body.tasks_terminal
            if body.tasks_terminal is not None
            else await _workstream_tasks_terminal(session, ws.id)
        )
        current_status = normalize_workstream_status(ws.status)
        target_status = normalize_workstream_status(body.target_status)
        classification = classify_workstream_status_change(
            current_status=current_status,
            target_status=target_status,
            file_backed=file_backed,
            archived_file=archived_file,
            tasks_terminal=tasks_terminal,
        )
        write_result = "not_attempted"
        reconciliation_record_id = None
        conflict = False
        if body.apply:
            expected_status = (
                normalize_workstream_status(body.expected_current_status)
                if body.expected_current_status is not None
                else None
            )
            if expected_status is not None and expected_status != current_status:
                classification = _conflict(
                    f"cached workstream status changed from expected {expected_status!r} to {current_status!r}",
                    "refresh the dashboard and retry the state change if it is still intended",
                )
                conflict = True
            elif repo is not None and repo_path is None:
                classification = _conflict(
                    "repo host path is unavailable for this State Hub host",
                    "register a host path for this machine or retry from a host with the repo checkout",
                )
                conflict = True
            elif classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH and workplan_ref:
                file_status = normalize_workstream_status(workplan_status(workplan_ref.path))
                if file_status and file_status != current_status:
                    classification = _conflict(
                        f"workplan file status {file_status!r} differs from cached DB status {current_status!r}",
                        "run consistency repair or refresh State Hub from files before retrying",
                    )
                    conflict = True
                else:
                    try:
                        patch_workplan_status(workplan_ref.path, target_status)
                        # Read the status back to verify the patch took effect.
                        patched_status = normalize_workstream_status(workplan_status(workplan_ref.path))
                    except OSError as exc:
                        classification = _conflict(
                            f"workplan file write failed: {exc}",
                            "fix repo file access and retry the reconciliation",
                        )
                        conflict = True
                    else:
                        if patched_status != target_status:
                            # NOTE(review): the task path restores the original file
                            # text in the equivalent situation; this path does not.
                            classification = _conflict(
                                f"workplan file status could not be patched to {target_status!r}",
                                "inspect the workplan frontmatter format before retrying",
                            )
                            conflict = True
                        else:
                            # File first, DB second: the DB row only moves once the
                            # file has been verified.
                            transition_workstream_status(ws, target_status)
                            await session.commit()
                            write_result = "applied"
            if write_result != "applied":
                reconciliation_record_id = await _record_deferred_message(
                    session,
                    body=body,
                    current_status=current_status,
                    target_status=target_status,
                    classification=classification,
                    workplan_path=workplan_ref.relative_path if workplan_ref else None,
                    conflict=conflict,
                )
                write_result = "not_applicable"
        return StateChangeResponse(
            target_type=body.target_type,
            target_id=body.target_id,
            actor=body.actor,
            intent=body.intent,
            current_status=current_status,
            target_status=target_status,
            file_backed=file_backed,
            archived_file=archived_file,
            tasks_terminal=tasks_terminal,
            reconciliation_class=classification.reconciliation_class,
            reason=classification.reason,
            follow_up=classification.follow_up,
            write_through_result=write_result,
            workplan_path=workplan_ref.relative_path if workplan_ref else None,
            reconciliation_record_id=reconciliation_record_id,
            conflict=conflict,
        )

    task = await session.get(Task, body.target_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")
    # The parent workstream may be missing; every later use of ``ws`` guards for None.
    ws = await session.get(Workstream, task.workstream_id)
    repo = await session.get(ManagedRepo, ws.repo_id) if ws and ws.repo_id else None
    repo_path = resolve_repo_path(repo)
    workplan_ref = find_workplan_for_workstream(repo, ws.id) if ws and repo_path else None
    actual_file_backed = workplan_ref is not None
    actual_archived_file = bool(workplan_ref and workplan_ref.archived)
    # Caller hints are only honoured for dry-run classification.
    file_backed = (
        actual_file_backed
        if body.apply
        else _bool_or_default(body.file_backed, actual_file_backed)
    )
    archived_file = (
        actual_archived_file
        if body.apply
        else _bool_or_default(body.archived_file, actual_archived_file)
    )
    actual_task_linked = bool(workplan_ref and task_block_linked(workplan_ref.path, task.id))
    task_linked = (
        actual_task_linked
        if body.apply
        else _bool_or_default(body.task_linked, actual_task_linked)
    )
    current_status = status_value(task.status)
    target_status = status_value(body.target_status)
    classification = classify_task_status_change(
        current_status=current_status,
        target_status=target_status,
        file_backed=file_backed,
        archived_file=archived_file,
        task_linked=task_linked,
        blocking_reason=body.blocking_reason,
    )
    write_result = "not_attempted"
    reconciliation_record_id = None
    conflict = False
    if body.apply:
        expected_status = (
            status_value(body.expected_current_status)
            if body.expected_current_status is not None
            else None
        )
        if expected_status is not None and expected_status != current_status:
            classification = _conflict(
                f"cached task status changed from expected {expected_status!r} to {current_status!r}",
                "refresh the dashboard and retry the state change if it is still intended",
            )
            conflict = True
        elif repo is not None and repo_path is None:
            classification = _conflict(
                "repo host path is unavailable for this State Hub host",
                "register a host path for this machine or retry from a host with the repo checkout",
            )
            conflict = True
        elif (
            classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH
            and workplan_ref
            and actual_task_linked
        ):
            file_status = status_value(task_block_status(workplan_ref.path, task.id))
            if file_status and file_status != current_status:
                classification = _conflict(
                    f"workplan task status {file_status!r} differs from cached DB status {current_status!r}",
                    "run consistency repair or refresh State Hub from files before retrying",
                )
                conflict = True
            else:
                # Snapshot of the file, used to undo a partial or unverifiable patch.
                original_text = None
                parent_will_activate = should_activate_parent_for_task_start(
                    previous_task_status=current_status,
                    new_task_status=target_status,
                    parent_workstream_status=ws.status if ws else None,
                )
                try:
                    original_text = workplan_ref.path.read_text(encoding="utf-8")
                    patch_task_status(workplan_ref.path, task.id, target_status)
                    patched_status = status_value(task_block_status(workplan_ref.path, task.id))
                    if parent_will_activate:
                        # The workplan's own status is patched in the same file
                        # and verified by reading it back.
                        patch_workplan_status(workplan_ref.path, "active")
                        parent_status = normalize_workstream_status(workplan_status(workplan_ref.path))
                        if parent_status != "active":
                            if original_text is not None:
                                workplan_ref.path.write_text(original_text, encoding="utf-8")
                            classification = _conflict(
                                "parent workplan status could not be patched to 'active'",
                                "inspect the workplan frontmatter format before retrying",
                            )
                            conflict = True
                except OSError as exc:
                    # original_text is still None when the initial read itself failed.
                    if original_text is not None:
                        workplan_ref.path.write_text(original_text, encoding="utf-8")
                    classification = _conflict(
                        f"workplan task write failed: {exc}",
                        "fix repo file access and retry the reconciliation",
                    )
                    conflict = True
                else:
                    if conflict:
                        # Parent activation already failed and was rolled back above.
                        pass
                    elif patched_status != target_status:
                        if original_text is not None:
                            workplan_ref.path.write_text(original_text, encoding="utf-8")
                        classification = _conflict(
                            f"workplan task block could not be patched to {target_status!r}",
                            "inspect the task block format before retrying",
                        )
                        conflict = True
                    else:
                        # File first, DB second. The call's return value is not used.
                        transition_task_status(
                            task,
                            target_status,
                            parent_workstream=ws,
                            previous_task_status=current_status,
                            status_coercer=TaskStatus,
                        )
                        if body.blocking_reason is not None:
                            task.blocking_reason = body.blocking_reason
                        await session.commit()
                        write_result = "applied"
        if write_result != "applied":
            reconciliation_record_id = await _record_deferred_message(
                session,
                body=body,
                current_status=current_status,
                target_status=target_status,
                classification=classification,
                workplan_path=workplan_ref.relative_path if workplan_ref else None,
                conflict=conflict,
            )
            write_result = "not_applicable"
    return StateChangeResponse(
        target_type=body.target_type,
        target_id=body.target_id,
        actor=body.actor,
        intent=body.intent,
        current_status=current_status,
        target_status=target_status,
        file_backed=file_backed,
        archived_file=archived_file,
        task_linked=task_linked,
        reconciliation_class=classification.reconciliation_class,
        reason=classification.reason,
        follow_up=classification.follow_up,
        write_through_result=write_result,
        workplan_path=workplan_ref.relative_path if workplan_ref else None,
        reconciliation_record_id=reconciliation_record_id,
        conflict=conflict,
    )