Files
state-hub/api/routers/reconciliation.py

234 lines
8.9 KiB
Python

import uuid
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session
from api.models.agent_message import AgentMessage
from api.models.managed_repo import ManagedRepo
from api.models.task import Task
from api.models.task import TaskStatus
from api.models.workstream import Workstream
from api.schemas.reconciliation import StateChangeRequest, StateChangeResponse
from api.services.lifecycle import status_value
from api.services.reconciliation import (
ReconciliationClass,
classify_task_status_change,
classify_workstream_status_change,
)
from api.services.workplan_files import (
find_workplan_for_workstream,
patch_task_status,
patch_workplan_status,
task_block_linked,
)
from api.workplan_status import normalize_workstream_status
router = APIRouter(prefix="/reconciliation", tags=["reconciliation"])
def _bool_or_default(value: bool | None, default: bool) -> bool:
return default if value is None else value
async def _workstream_tasks_terminal(session: AsyncSession, workstream_id: uuid.UUID) -> bool:
result = await session.execute(select(Task.status).where(Task.workstream_id == workstream_id))
statuses = [status_value(row[0]) for row in result.all()]
return bool(statuses) and all(status in {"done", "cancelled"} for status in statuses)
def _deferred_message(
*,
body: StateChangeRequest,
current_status: str,
target_status: str,
classification: ReconciliationClass,
reason: str,
follow_up: str,
workplan_path: str | None,
) -> AgentMessage:
subject = f"Reconcile {body.target_type} state change: {current_status} -> {target_status}"
lines = [
"A UI-originated state change could not be written through immediately.",
"",
f"target_type: {body.target_type}",
f"target_id: {body.target_id}",
f"actor: {body.actor}",
f"intent: {body.intent or ''}",
f"current_status: {current_status}",
f"target_status: {target_status}",
f"reconciliation_class: {classification.value}",
f"reason: {reason}",
f"follow_up: {follow_up}",
f"workplan_path: {workplan_path or ''}",
]
return AgentMessage(
from_agent=body.actor,
to_agent="state-hub",
subject=subject,
body="\n".join(lines),
)
@router.post("/state-change", response_model=StateChangeResponse)
async def classify_state_change(
body: StateChangeRequest,
session: AsyncSession = Depends(get_session),
) -> StateChangeResponse:
if body.target_type == "workstream":
ws = await session.get(Workstream, body.target_id)
if ws is None:
raise HTTPException(status_code=404, detail="Workstream not found")
repo = await session.get(ManagedRepo, ws.repo_id) if ws.repo_id else None
workplan_ref = find_workplan_for_workstream(repo, ws.id)
actual_file_backed = workplan_ref is not None
actual_archived_file = bool(workplan_ref and workplan_ref.archived)
file_backed = (
actual_file_backed
if body.apply
else _bool_or_default(body.file_backed, actual_file_backed)
)
archived_file = (
actual_archived_file
if body.apply
else _bool_or_default(body.archived_file, actual_archived_file)
)
tasks_terminal = (
body.tasks_terminal
if body.tasks_terminal is not None
else await _workstream_tasks_terminal(session, ws.id)
)
current_status = normalize_workstream_status(ws.status)
target_status = normalize_workstream_status(body.target_status)
classification = classify_workstream_status_change(
current_status=current_status,
target_status=target_status,
file_backed=file_backed,
archived_file=archived_file,
tasks_terminal=tasks_terminal,
)
write_result = "not_attempted"
reconciliation_record_id = None
if body.apply:
if classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH and workplan_ref:
patch_workplan_status(workplan_ref.path, target_status)
ws.status = target_status
await session.commit()
write_result = "applied"
else:
msg = _deferred_message(
body=body,
current_status=current_status,
target_status=target_status,
classification=classification.reconciliation_class,
reason=classification.reason,
follow_up=classification.follow_up,
workplan_path=workplan_ref.relative_path if workplan_ref else None,
)
session.add(msg)
await session.commit()
await session.refresh(msg)
reconciliation_record_id = msg.id
write_result = "not_applicable"
return StateChangeResponse(
target_type=body.target_type,
target_id=body.target_id,
actor=body.actor,
intent=body.intent,
current_status=current_status,
target_status=target_status,
file_backed=file_backed,
archived_file=archived_file,
tasks_terminal=tasks_terminal,
reconciliation_class=classification.reconciliation_class,
reason=classification.reason,
follow_up=classification.follow_up,
write_through_result=write_result,
workplan_path=workplan_ref.relative_path if workplan_ref else None,
reconciliation_record_id=reconciliation_record_id,
)
task = await session.get(Task, body.target_id)
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
ws = await session.get(Workstream, task.workstream_id)
repo = await session.get(ManagedRepo, ws.repo_id) if ws and ws.repo_id else None
workplan_ref = find_workplan_for_workstream(repo, ws.id) if ws else None
actual_file_backed = workplan_ref is not None
actual_archived_file = bool(workplan_ref and workplan_ref.archived)
file_backed = (
actual_file_backed
if body.apply
else _bool_or_default(body.file_backed, actual_file_backed)
)
archived_file = (
actual_archived_file
if body.apply
else _bool_or_default(body.archived_file, actual_archived_file)
)
actual_task_linked = bool(workplan_ref and task_block_linked(workplan_ref.path, task.id))
task_linked = (
actual_task_linked
if body.apply
else _bool_or_default(body.task_linked, actual_task_linked)
)
current_status = status_value(task.status)
target_status = status_value(body.target_status)
classification = classify_task_status_change(
current_status=current_status,
target_status=target_status,
file_backed=file_backed,
archived_file=archived_file,
task_linked=task_linked,
blocking_reason=body.blocking_reason,
)
write_result = "not_attempted"
reconciliation_record_id = None
if body.apply:
if (
classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH
and workplan_ref
and actual_task_linked
):
patch_task_status(workplan_ref.path, task.id, target_status)
task.status = TaskStatus(target_status)
if body.blocking_reason is not None:
task.blocking_reason = body.blocking_reason
await session.commit()
write_result = "applied"
else:
msg = _deferred_message(
body=body,
current_status=current_status,
target_status=target_status,
classification=classification.reconciliation_class,
reason=classification.reason,
follow_up=classification.follow_up,
workplan_path=workplan_ref.relative_path if workplan_ref else None,
)
session.add(msg)
await session.commit()
await session.refresh(msg)
reconciliation_record_id = msg.id
write_result = "not_applicable"
return StateChangeResponse(
target_type=body.target_type,
target_id=body.target_id,
actor=body.actor,
intent=body.intent,
current_status=current_status,
target_status=target_status,
file_backed=file_backed,
archived_file=archived_file,
task_linked=task_linked,
reconciliation_class=classification.reconciliation_class,
reason=classification.reason,
follow_up=classification.follow_up,
write_through_result=write_result,
workplan_path=workplan_ref.relative_path if workplan_ref else None,
reconciliation_record_id=reconciliation_record_id,
)