"""State endpoints: aggregated hub summary, derived next-step suggestions, DB health check."""

from datetime import datetime, timedelta, timezone

from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession

from api.database import get_session, engine
from api.models.decision import Decision, DecisionStatus, DecisionType
from api.models.progress_event import ProgressEvent
from api.models.task import Task, TaskPriority, TaskStatus
from api.models.topic import Topic, TopicStatus
from api.models.workstream import Workstream, WorkstreamStatus
from api.models.workstream_dependency import WorkstreamDependency
from api.schemas.decision import DecisionRead
from api.schemas.progress_event import ProgressEventRead
from api.schemas.state import (
    DecisionTotals,
    NextStep,
    StateSummary,
    TaskTotals,
    Totals,
    TopicTotals,
    WorkstreamTotals,
)
from api.schemas.task import TaskRead
from api.schemas.topic import TopicWithWorkstreams
from api.schemas.workstream import WorkstreamRead, WorkstreamWithTaskCounts, WorkstreamWithDeps
from api.schemas.workstream_dependency import WorkstreamDepStub

router = APIRouter(prefix="/state", tags=["state"])


def _dep_stub(dep: WorkstreamDependency, other: Workstream) -> WorkstreamDepStub:
    """Build the stub describing *other* as the far end of dependency *dep*."""
    return WorkstreamDepStub(
        dep_id=dep.id,
        workstream_id=other.id,
        workstream_slug=other.slug,
        workstream_title=other.title,
        description=dep.description,
    )


async def _build_dependency_index(session: AsyncSession, open_ws: list) -> dict:
    """Map each open workstream id to its ``depends_on`` / ``blocks`` stub lists.

    A dependency row reads "``from_workstream_id`` depends on
    ``to_workstream_id``": the stub for the *to* side is appended to the
    *from* side's ``depends_on`` list, and the stub for the *from* side is
    appended to the *to* side's ``blocks`` list.

    Args:
        session: Session used for the dependency and workstream lookups.
        open_ws: Workstreams to index; every one of them gets an entry, even
            when it has no dependencies.

    Returns:
        ``{workstream_id: {"depends_on": [...], "blocks": [...]}}``.
    """
    open_ws_ids = [w.id for w in open_ws]
    dep_rows = []
    if open_ws_ids:
        dep_result = await session.execute(
            select(WorkstreamDependency).where(
                (WorkstreamDependency.from_workstream_id.in_(open_ws_ids))
                | (WorkstreamDependency.to_workstream_id.in_(open_ws_ids))
            )
        )
        dep_rows = list(dep_result.scalars().all())

    # Build a slug+title lookup for all workstreams referenced in deps.
    # The far end of a dependency may be a workstream that is not open, so it
    # has to be fetched separately.
    ws_lookup: dict = {w.id: w for w in open_ws}
    dep_ws_ids = set()
    for d in dep_rows:
        dep_ws_ids.add(d.from_workstream_id)
        dep_ws_ids.add(d.to_workstream_id)
    extra_ids = dep_ws_ids - set(ws_lookup)
    if extra_ids:
        extra_rows = await session.execute(
            select(Workstream).where(Workstream.id.in_(extra_ids))
        )
        for w in extra_rows.scalars():
            ws_lookup[w.id] = w

    # Index: workstream_id → (depends_on stubs, blocks stubs)
    dep_index: dict = {w.id: {"depends_on": [], "blocks": []} for w in open_ws}
    for d in dep_rows:
        from_id, to_id = d.from_workstream_id, d.to_workstream_id
        if from_id in dep_index and to_id in ws_lookup:
            dep_index[from_id]["depends_on"].append(_dep_stub(d, ws_lookup[to_id]))
        if to_id in dep_index and from_id in ws_lookup:
            dep_index[to_id]["blocks"].append(_dep_stub(d, ws_lookup[from_id]))
    return dep_index


async def _count_by_status(session: AsyncSession, status_column) -> dict:
    """Return ``{status: row_count}`` for the table owning *status_column*."""
    rows = await session.execute(
        select(status_column, func.count()).group_by(status_column)
    )
    return {row[0]: row[1] for row in rows}


async def _load_totals(session: AsyncSession) -> Totals:
    """Compute per-status totals for topics, workstreams, tasks and decisions.

    Issues one GROUP BY query per table; statuses with no rows count as 0.
    """
    topic_counts = await _count_by_status(session, Topic.status)
    ws_counts = await _count_by_status(session, Workstream.status)
    task_counts = await _count_by_status(session, Task.status)
    dec_counts = await _count_by_status(session, Decision.status)

    return Totals(
        topics=TopicTotals(
            active=topic_counts.get(TopicStatus.active, 0),
            paused=topic_counts.get(TopicStatus.paused, 0),
            archived=topic_counts.get(TopicStatus.archived, 0),
            total=sum(topic_counts.values()),
        ),
        workstreams=WorkstreamTotals(
            active=ws_counts.get(WorkstreamStatus.active, 0),
            blocked=ws_counts.get(WorkstreamStatus.blocked, 0),
            completed=ws_counts.get(WorkstreamStatus.completed, 0),
            archived=ws_counts.get(WorkstreamStatus.archived, 0),
            total=sum(ws_counts.values()),
        ),
        tasks=TaskTotals(
            todo=task_counts.get(TaskStatus.todo, 0),
            in_progress=task_counts.get(TaskStatus.in_progress, 0),
            blocked=task_counts.get(TaskStatus.blocked, 0),
            done=task_counts.get(TaskStatus.done, 0),
            cancelled=task_counts.get(TaskStatus.cancelled, 0),
            total=sum(task_counts.values()),
        ),
        decisions=DecisionTotals(
            open=dec_counts.get(DecisionStatus.open, 0),
            resolved=dec_counts.get(DecisionStatus.resolved, 0),
            escalated=dec_counts.get(DecisionStatus.escalated, 0),
            superseded=dec_counts.get(DecisionStatus.superseded, 0),
            total=sum(dec_counts.values()),
        ),
    )


@router.get("/summary", response_model=StateSummary)
async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSummary:
    """Assemble the full state summary.

    The summary contains: non-archived topics, pending decisions that are
    open or escalated, blocked tasks, the 20 most recent progress events,
    active/blocked workstreams enriched with per-status task counts and
    dependency stubs, per-table status totals, and derived next steps.

    Args:
        session: Database session injected by FastAPI.

    Returns:
        A ``StateSummary`` stamped with the current UTC time.
    """
    # Run all queries sequentially on one session.
    # AsyncSession does not support concurrent operations (no gather on same session).
    topics_rows = await session.execute(
        select(Topic).where(Topic.status != TopicStatus.archived).order_by(Topic.created_at)
    )
    topics = list(topics_rows.scalars().all())

    blocking_rows = await session.execute(
        select(Decision)
        .where(Decision.decision_type == DecisionType.pending)
        .where(Decision.status.in_([DecisionStatus.open, DecisionStatus.escalated]))
        .order_by(Decision.deadline.asc().nullslast(), Decision.created_at)
    )
    blocking = list(blocking_rows.scalars().all())

    blocked_rows = await session.execute(
        select(Task).where(Task.status == TaskStatus.blocked).order_by(Task.created_at)
    )
    blocked = list(blocked_rows.scalars().all())

    recent_rows = await session.execute(
        select(ProgressEvent).order_by(ProgressEvent.created_at.desc()).limit(20)
    )
    recent = list(recent_rows.scalars().all())

    open_ws_rows = await session.execute(
        select(Workstream)
        .where(Workstream.status.in_([WorkstreamStatus.active, WorkstreamStatus.blocked]))
        .order_by(Workstream.due_date.asc().nullslast(), Workstream.created_at)
    )
    open_ws = list(open_ws_rows.scalars().all())

    # Task counts per workstream (used to enrich open_workstreams)
    task_per_ws: dict = {}
    for ws_id, tstat, cnt in await session.execute(
        select(Task.workstream_id, Task.status, func.count()).group_by(Task.workstream_id, Task.status)
    ):
        task_per_ws.setdefault(ws_id, {})[tstat] = cnt

    # Dependency graph for open workstreams
    dep_index = await _build_dependency_index(session, open_ws)

    # Totals — one GROUP BY per table
    totals = await _load_totals(session)

    next_steps = await _derive_next_steps(session)

    open_workstreams = []
    for w in open_ws:
        counts = task_per_ws.get(w.id, {})
        deps = dep_index.get(w.id, {})
        open_workstreams.append(
            WorkstreamWithDeps(
                **WorkstreamRead.model_validate(w).model_dump(),
                tasks_total=sum(counts.values()),
                tasks_todo=counts.get(TaskStatus.todo, 0),
                tasks_in_progress=counts.get(TaskStatus.in_progress, 0),
                tasks_blocked=counts.get(TaskStatus.blocked, 0),
                tasks_done=counts.get(TaskStatus.done, 0),
                depends_on=deps.get("depends_on", []),
                blocks=deps.get("blocks", []),
            )
        )

    return StateSummary(
        generated_at=datetime.now(tz=timezone.utc),
        totals=totals,
        topics=[TopicWithWorkstreams.model_validate(t) for t in topics],
        blocking_decisions=[DecisionRead.model_validate(d) for d in blocking],
        blocked_tasks=[TaskRead.model_validate(t) for t in blocked],
        recent_progress=[ProgressEventRead.model_validate(e) for e in recent],
        next_steps=next_steps,
        open_workstreams=open_workstreams,
    )


# Lower rank sorts first; priorities missing from this table rank last (99).
_PRIORITY_RANK = {
    TaskPriority.critical: 0,
    TaskPriority.high: 1,
    TaskPriority.medium: 2,
    TaskPriority.low: 3,
}


def _pick_next_task(tasks: list) -> Task:
    """Return the task with the best priority rank, oldest ``created_at`` first on ties.

    *tasks* must be non-empty.
    """
    return min(tasks, key=lambda t: (_PRIORITY_RANK.get(t.priority, 99), t.created_at))


async def _derive_next_steps(session: AsyncSession) -> list[NextStep]:
    """Derive contextual next-action suggestions from current hub state.

    Two signal sources:
    1. Recently resolved decisions (last 7 days) → first open task in same workstream
    2. Workstreams whose every dependency is now completed → first todo task in that workstream

    A task is suggested at most once, even when several signals point at it.

    Args:
        session: Session used for all lookups.

    Returns:
        Suggestions from signal 1 followed by suggestions from signal 2.
    """
    steps: list[NextStep] = []
    seen_task_ids: set = set()

    # ── Signal 1: recently resolved decisions ────────────────────────────────
    cutoff = datetime.now(tz=timezone.utc) - timedelta(days=7)
    resolved_rows = await session.execute(
        select(Decision)
        .where(Decision.status == DecisionStatus.resolved)
        .where(Decision.decided_at >= cutoff)
        .where(Decision.workstream_id.isnot(None))
        .order_by(Decision.decided_at.desc())
    )
    for decision in resolved_rows.scalars().all():
        open_tasks_rows = await session.execute(
            select(Task)
            .where(Task.workstream_id == decision.workstream_id)
            .where(Task.status.in_([TaskStatus.todo, TaskStatus.in_progress]))
        )
        open_tasks = list(open_tasks_rows.scalars().all())
        if not open_tasks:
            continue
        task = _pick_next_task(open_tasks)
        if task.id in seen_task_ids:
            continue
        ws = await session.get(Workstream, decision.workstream_id)
        topic = await session.get(Topic, ws.topic_id) if ws else None
        steps.append(NextStep(
            type="resolved_decision",
            domain=topic.domain if topic else None,
            workstream_id=ws.id if ws else None,
            workstream_title=ws.title if ws else None,
            workstream_slug=ws.slug if ws else None,
            task_id=task.id,
            task_title=task.title,
            message=(
                f"Decision '{decision.title}' was resolved → "
                f"'{task.title}' is the next open task in '{ws.title if ws else '?'}'"
            ),
        ))
        seen_task_ids.add(task.id)

    # ── Signal 2: cleared dependencies ──────────────────────────────────────
    all_dep_rows = await session.execute(select(WorkstreamDependency))
    all_deps = list(all_dep_rows.scalars().all())

    # Group from_workstream_id → set of to_workstream_ids
    dep_map: dict = {}
    for d in all_deps:
        dep_map.setdefault(d.from_workstream_id, set()).add(d.to_workstream_id)

    for from_ws_id, to_ws_ids in dep_map.items():
        # All targets must be completed. Keep the loaded objects so their
        # slugs can be reused for the message below without further lookups.
        blockers = []
        all_done = True
        for to_id in to_ws_ids:
            to_ws = await session.get(Workstream, to_id)
            if to_ws is None or to_ws.status != WorkstreamStatus.completed:
                all_done = False
                break
            blockers.append(to_ws)
        if not all_done:
            continue

        from_ws = await session.get(Workstream, from_ws_id)
        if from_ws is None or from_ws.status not in (WorkstreamStatus.active, WorkstreamStatus.blocked):
            continue

        todo_rows = await session.execute(
            select(Task)
            .where(Task.workstream_id == from_ws_id)
            .where(Task.status == TaskStatus.todo)
        )
        todo_tasks = list(todo_rows.scalars().all())
        if not todo_tasks:
            continue

        task = _pick_next_task(todo_tasks)
        if task.id in seen_task_ids:
            continue

        topic = await session.get(Topic, from_ws.topic_id)
        # Plain (synchronous) generator on purpose: a generator expression
        # that contains `await` is an async generator, which str.join()
        # rejects with TypeError.
        blocker_slugs = ", ".join(blocker.slug for blocker in blockers)
        steps.append(NextStep(
            type="dependency_cleared",
            domain=topic.domain if topic else None,
            workstream_id=from_ws.id,
            workstream_title=from_ws.title,
            workstream_slug=from_ws.slug,
            task_id=task.id,
            task_title=task.title,
            message=(
                f"All dependencies of '{from_ws.title}' are completed ({blocker_slugs}) → "
                f"'{task.title}' is ready to start"
            ),
        ))
        seen_task_ids.add(task.id)

    return steps


@router.get("/next_steps", response_model=list[NextStep])
async def get_next_steps(session: AsyncSession = Depends(get_session)) -> list[NextStep]:
    """Derive contextual next-action suggestions from current hub state.

    Returns suggestions based on:
    - Recently resolved decisions → first open task in the same workstream
    - Workstreams whose every dependency workstream is now completed → first todo task
    """
    return await _derive_next_steps(session)


@router.get("/health")
async def health_check() -> dict:
    """Probe the database with ``SELECT 1``.

    Returns:
        ``{"status": "ok", "db": "connected"}`` on success; otherwise a 503
        ``JSONResponse`` whose ``db`` field carries the exception text.
    """
    # NOTE(review): the error path returns a JSONResponse although the
    # annotation says dict — confirm whether the annotation should be relaxed.
    try:
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        return {"status": "ok", "db": "connected"}
    except Exception as exc:
        # NOTE(review): str(exc) is sent to the client and may expose
        # connection details — consider logging it and returning a generic
        # message instead.
        return JSONResponse(
            status_code=503,
            content={"status": "error", "db": str(exc)},
        )