generated from coulomb/repo-seed
- Migration t7o8p9q0r1s2: indexes on tasks.status, tasks(workstream_id,status), workstreams.status, sbom_snapshots(repo_id,snapshot_at) - workplan-index: 30 s TTL cache + ?refresh param (4171 ms → 16 ms on hit) - /state/summary: 15 s TTL cache, bypassed on Cache-Control: no-cache - /topics/: noload(workstreams, decisions, progress_events) (2382 ms → 115 ms) - /domains/: noload(topics, repos, goals) (2252 ms → 39 ms) - /repos/: noload(goals) (2222 ms → 599 ms first / fast on repeat) - conftest: reset TTL caches between tests to prevent bleed-through Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
669 lines
26 KiB
Python
669 lines
26 KiB
Python
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from fastapi import APIRouter, Depends, Request
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy import func, select, text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import noload, selectinload
|
|
|
|
from api.database import get_session, engine
|
|
from api.flow_defs import assertion_result_to_dict, load_flow
|
|
from api.models.capability_request import CapabilityRequest
|
|
from api.models.contribution import Contribution, ContributionStatus, ContributionType
|
|
from api.models.decision import Decision, DecisionStatus, DecisionType
|
|
from api.models.domain import Domain
|
|
from api.models.extension_point import ExtensionPoint
|
|
from api.models.managed_repo import ManagedRepo
|
|
from api.models.progress_event import ProgressEvent
|
|
from api.models.sbom_entry import SBOMEntry
|
|
from api.models.task import Task, TaskPriority, TaskStatus
|
|
from api.models.technical_debt import TechnicalDebt
|
|
from api.models.topic import Topic, TopicStatus
|
|
from api.models.workstream import Workstream
|
|
from api.models.workstream_dependency import WorkstreamDependency
|
|
from api.schemas.decision import DecisionRead
|
|
from api.schemas.domain import DomainSummary
|
|
from api.schemas.progress_event import ProgressEventRead
|
|
from api.schemas.state import (
|
|
DecisionTotals,
|
|
NextStep,
|
|
StateSummary,
|
|
TaskTotals,
|
|
Totals,
|
|
TopicTotals,
|
|
WorkstreamTotals,
|
|
)
|
|
from api.schemas.task import TaskRead
|
|
from api.schemas.topic import TopicRead, TopicWithWorkstreams
|
|
from api.schemas.workstream import WorkstreamRead, WorkstreamWithTaskCounts, WorkstreamWithDeps
|
|
from api.schemas.workstream_dependency import WorkstreamDepStub
|
|
from task_flow_engine import FlowEngine
|
|
|
|
# Router for every endpoint in this module; all paths are mounted under /state.
router = APIRouter(
    prefix="/state",
    tags=["state"],
)

# In-process cache for GET /state/summary.
# _SUMMARY_CACHE holds the last computed summary (None until the first miss),
# _SUMMARY_CACHE_AT is the time.monotonic() reading taken when it was stored,
# and _SUMMARY_TTL is how many seconds a stored summary may be served for.
_SUMMARY_CACHE: StateSummary | None = None
_SUMMARY_CACHE_AT: float = 0.0
_SUMMARY_TTL = 15.0
|
|
|
|
|
|
@router.get("/summary", response_model=StateSummary)
async def get_summary(
    request: Request,
    session: AsyncSession = Depends(get_session),
) -> StateSummary:
    """Build the full hub state summary, served from a short-lived in-process cache.

    A cached result younger than ``_SUMMARY_TTL`` seconds is returned as-is
    unless the request carries a ``Cache-Control`` header containing
    ``no-cache``. A freshly computed summary always replaces the cache.

    Args:
        request: Incoming request; only its ``Cache-Control`` header is read.
        session: Database session used for every query, one after another.

    Returns:
        The assembled ``StateSummary``.
    """
    global _SUMMARY_CACHE, _SUMMARY_CACHE_AT
    # Cache-Control directives are case-insensitive, so normalise before matching.
    no_cache = "no-cache" in request.headers.get("cache-control", "").lower()
    if (
        not no_cache
        and _SUMMARY_CACHE is not None
        and (time.monotonic() - _SUMMARY_CACHE_AT) < _SUMMARY_TTL
    ):
        return _SUMMARY_CACHE

    # Run all queries sequentially on one session.
    # AsyncSession does not support concurrent operations (no gather on same session).

    topics_rows = await session.execute(
        select(Topic)
        .options(
            selectinload(Topic.domain),
            noload(Topic.workstreams),
            noload(Topic.decisions),
            noload(Topic.progress_events),
        )
        .where(Topic.status != TopicStatus.archived)
        .order_by(Topic.created_at)
    )
    topics = list(topics_rows.scalars().all())
    topic_ids = [t.id for t in topics]

    # Workstreams are fetched as plain column tuples and grouped per topic,
    # instead of loading the Topic.workstreams relationship.
    topic_workstreams: dict = {t.id: [] for t in topics}
    if topic_ids:
        topic_ws_rows = await session.execute(
            select(
                Workstream.topic_id,
                Workstream.id,
                Workstream.slug,
                Workstream.title,
                Workstream.status,
                Workstream.owner,
                Workstream.due_date,
            )
            .where(Workstream.topic_id.in_(topic_ids))
            .order_by(Workstream.created_at)
        )
        for topic_id, ws_id, slug, title, status, owner, due_date in topic_ws_rows:
            topic_workstreams.setdefault(topic_id, []).append({
                "id": ws_id,
                "slug": slug,
                "title": title,
                "status": status,
                "owner": owner,
                "due_date": due_date,
            })

    blocking_rows = await session.execute(
        select(Decision)
        .where(Decision.decision_type == DecisionType.pending)
        .where(Decision.status.in_([DecisionStatus.open, DecisionStatus.escalated]))
        .order_by(Decision.deadline.asc().nullslast(), Decision.created_at)
    )
    blocking = list(blocking_rows.scalars().all())

    blocked_rows = await session.execute(
        select(Task).options(noload("*")).where(Task.status == TaskStatus.blocked).order_by(Task.created_at)
    )
    blocked = list(blocked_rows.scalars().all())

    recent_rows = await session.execute(
        select(ProgressEvent).options(noload("*")).order_by(ProgressEvent.created_at.desc()).limit(20)
    )
    recent = list(recent_rows.scalars().all())

    open_ws_rows = await session.execute(
        select(Workstream)
        .options(noload("*"))
        .where(Workstream.status.in_(["active", "blocked"]))
        .order_by(Workstream.due_date.asc().nullslast(), Workstream.created_at)
    )
    open_ws = list(open_ws_rows.scalars().all())

    # Task counts per workstream (used to enrich open_workstreams)
    task_per_ws: dict = {}
    # One status value per task, expanded from the counts, for the flow engine.
    task_statuses_per_ws: dict = {}
    for ws_id, tstat, cnt in await session.execute(
        select(Task.workstream_id, Task.status, func.count()).group_by(Task.workstream_id, Task.status)
    ):
        task_per_ws.setdefault(ws_id, {})[tstat] = cnt
        task_statuses_per_ws.setdefault(ws_id, []).extend([_value(tstat)] * cnt)

    # Dependency graph for open workstreams
    open_ws_ids = [w.id for w in open_ws]
    dep_rows = []
    if open_ws_ids:
        dep_result = await session.execute(
            select(WorkstreamDependency).where(
                (WorkstreamDependency.from_workstream_id.in_(open_ws_ids))
                | (WorkstreamDependency.to_workstream_id.in_(open_ws_ids))
            )
        )
        dep_rows = list(dep_result.scalars().all())

    # Build a slug+title lookup for all workstreams referenced in deps
    dep_ws_ids = set()
    dep_task_ids = set()
    for d in dep_rows:
        dep_ws_ids.add(d.from_workstream_id)
        if d.to_workstream_id:
            dep_ws_ids.add(d.to_workstream_id)
        if d.to_task_id:
            dep_task_ids.add(d.to_task_id)
    ws_lookup: dict = {w.id: w for w in open_ws}
    extra_ids = dep_ws_ids - set(ws_lookup.keys())
    if extra_ids:
        extra_rows = await session.execute(
            select(Workstream).options(noload("*")).where(Workstream.id.in_(extra_ids))
        )
        for w in extra_rows.scalars():
            ws_lookup[w.id] = w
    task_lookup: dict = {}
    if dep_task_ids:
        # Only the task title is read below, so skip relationship loading
        # (same query shape as the /deps endpoint).
        task_rows = await session.execute(
            select(Task).options(noload("*")).where(Task.id.in_(dep_task_ids))
        )
        task_lookup = {t.id: t for t in task_rows.scalars().all()}

    # Index: workstream_id → (depends_on stubs, blocks stubs)
    dep_index: dict = {w.id: {"depends_on": [], "blocks": []} for w in open_ws}
    for d in dep_rows:
        from_id, to_id, task_id = d.from_workstream_id, d.to_workstream_id, d.to_task_id
        if from_id in dep_index and to_id and to_id in ws_lookup:
            dep_index[from_id]["depends_on"].append(WorkstreamDepStub(
                dep_id=d.id,
                target_type="workstream",
                relationship_type=d.relationship_type,
                workstream_id=to_id,
                workstream_slug=ws_lookup[to_id].slug,
                workstream_title=ws_lookup[to_id].title,
                description=d.description,
            ))
        if from_id in dep_index and task_id and task_id in task_lookup:
            dep_index[from_id]["depends_on"].append(WorkstreamDepStub(
                dep_id=d.id,
                target_type="task",
                relationship_type=d.relationship_type,
                task_id=task_id,
                task_title=task_lookup[task_id].title,
                description=d.description,
            ))
        if to_id and to_id in dep_index and from_id in ws_lookup:
            dep_index[to_id]["blocks"].append(WorkstreamDepStub(
                dep_id=d.id,
                target_type="workstream",
                relationship_type=d.relationship_type,
                workstream_id=from_id,
                workstream_slug=ws_lookup[from_id].slug,
                workstream_title=ws_lookup[from_id].title,
                description=d.description,
            ))

    # Group the flow-engine dependency entries by source workstream in one pass,
    # so the loop below does not rescan every dependency row per workstream.
    flow_deps_by_ws: dict = {}
    for d in dep_rows:
        if d.to_workstream_id and d.to_workstream_id in ws_lookup:
            flow_deps_by_ws.setdefault(d.from_workstream_id, []).append(
                {"workstation": ws_lookup[d.to_workstream_id].status}
            )

    workstream_flow = load_flow("workstream")
    flow_engine = FlowEngine()
    effective_status: dict = {}
    blocked_reasons: dict = {}
    for w in open_ws:
        flow_obj = {
            "status": w.status,
            "workstation": w.status,
            "tasks": [{"status": status} for status in task_statuses_per_ws.get(w.id, [])],
            "dependencies": flow_deps_by_ws.get(w.id, []),
        }
        flow_result = flow_engine.evaluate(flow_obj, workstream_flow)
        # The flow engine can mark a workstream blocked regardless of its stored status.
        effective_status[w.id] = "blocked" if flow_result.exit_blocked else w.status
        blocked_reasons[w.id] = [
            assertion_result_to_dict(item) for item in flow_result.blocking_assertions
        ]

    # Totals — one GROUP BY per table
    topic_counts = {r[0]: r[1] for r in await session.execute(
        select(Topic.status, func.count()).group_by(Topic.status)
    )}
    ws_counts = {r[0]: r[1] for r in await session.execute(
        select(Workstream.status, func.count()).group_by(Workstream.status)
    )}
    task_counts = {r[0]: r[1] for r in await session.execute(
        select(Task.status, func.count()).group_by(Task.status)
    )}
    dec_counts = {r[0]: r[1] for r in await session.execute(
        select(Decision.status, func.count()).group_by(Decision.status)
    )}

    totals = Totals(
        topics=TopicTotals(
            active=topic_counts.get(TopicStatus.active, 0),
            paused=topic_counts.get(TopicStatus.paused, 0),
            archived=topic_counts.get(TopicStatus.archived, 0),
            total=sum(topic_counts.values()),
        ),
        workstreams=WorkstreamTotals(
            # active/blocked use the flow-derived status; the rest use stored status.
            active=sum(1 for status in effective_status.values() if status == "active"),
            blocked=sum(1 for status in effective_status.values() if status == "blocked"),
            completed=ws_counts.get("completed", 0),
            archived=ws_counts.get("archived", 0),
            total=sum(ws_counts.values()),
        ),
        tasks=TaskTotals(
            todo=task_counts.get(TaskStatus.todo, 0),
            in_progress=task_counts.get(TaskStatus.in_progress, 0),
            blocked=task_counts.get(TaskStatus.blocked, 0),
            done=task_counts.get(TaskStatus.done, 0),
            cancelled=task_counts.get(TaskStatus.cancelled, 0),
            total=sum(task_counts.values()),
        ),
        decisions=DecisionTotals(
            open=dec_counts.get(DecisionStatus.open, 0),
            resolved=dec_counts.get(DecisionStatus.resolved, 0),
            escalated=dec_counts.get(DecisionStatus.escalated, 0),
            superseded=dec_counts.get(DecisionStatus.superseded, 0),
            total=sum(dec_counts.values()),
        ),
    )

    next_steps = await _derive_next_steps(session)

    # Domain summary stats
    domain_summaries = await _build_domain_summaries(session)

    # Contribution counts (by type and status)
    contrib_type_counts = {r[0].value: r[1] for r in await session.execute(
        select(Contribution.type, func.count()).group_by(Contribution.type)
    )}
    contrib_status_counts = {r[0].value: r[1] for r in await session.execute(
        select(Contribution.status, func.count()).group_by(Contribution.status)
    )}
    contribution_counts = {**contrib_type_counts, **contrib_status_counts}

    # Licence risk: copyleft packages in direct prod deps
    _COPYLEFT_PATS = ("GPL", "AGPL", "LGPL", "EUPL", "CDDL", "MPL")
    # Filter in Python since ILIKE across multiple patterns is verbose in SQLAlchemy
    all_direct_prod_rows = await session.execute(
        select(SBOMEntry.license_spdx)
        .where(SBOMEntry.is_direct.is_(True))
        .where(SBOMEntry.is_dev.is_(False))
    )
    licence_risk_count = sum(
        1 for (lic,) in all_direct_prod_rows.all()
        if lic and any(pat in lic.upper() for pat in _COPYLEFT_PATS)
    )

    # Open capability requests (non-terminal statuses)
    open_cap_req_count = (await session.execute(
        select(func.count()).select_from(CapabilityRequest).where(
            CapabilityRequest.status.in_(["requested", "accepted", "in_progress", "ready_for_review"])
        )
    )).scalar() or 0

    result = StateSummary(
        generated_at=datetime.now(tz=timezone.utc),
        totals=totals,
        topics=[
            TopicWithWorkstreams(
                **TopicRead.model_validate(t).model_dump(),
                workstreams=topic_workstreams.get(t.id, []),
            )
            for t in topics
        ],
        blocking_decisions=[DecisionRead.model_validate(d) for d in blocking],
        blocked_tasks=[TaskRead.model_validate(t) for t in blocked],
        recent_progress=[ProgressEventRead.model_validate(e) for e in recent],
        next_steps=next_steps,
        domains=domain_summaries,
        contribution_counts=contribution_counts,
        licence_risk_count=licence_risk_count,
        open_capability_requests=open_cap_req_count,
        open_workstreams=[
            WorkstreamWithDeps(
                **{
                    **WorkstreamRead.model_validate(w).model_dump(),
                    "status": effective_status.get(w.id, w.status),
                },
                tasks_total=sum(task_per_ws.get(w.id, {}).values()),
                tasks_todo=task_per_ws.get(w.id, {}).get(TaskStatus.todo, 0),
                tasks_in_progress=task_per_ws.get(w.id, {}).get(TaskStatus.in_progress, 0),
                tasks_blocked=task_per_ws.get(w.id, {}).get(TaskStatus.blocked, 0),
                tasks_done=task_per_ws.get(w.id, {}).get(TaskStatus.done, 0),
                depends_on=dep_index.get(w.id, {}).get("depends_on", []),
                blocks=dep_index.get(w.id, {}).get("blocks", []),
                blocked_reasons=blocked_reasons.get(w.id, []),
            )
            for w in open_ws
        ],
    )
    # Store the fresh result even for no-cache requests so later callers benefit.
    _SUMMARY_CACHE = result
    _SUMMARY_CACHE_AT = time.monotonic()
    return result
|
|
|
|
|
|
async def _build_domain_summaries(session: AsyncSession) -> list[DomainSummary]:
    """Return one ``DomainSummary`` per active domain, ordered by domain name.

    Each summary carries the domain's active repo count, active workstream
    count, extension point count and technical debt count; a domain with no
    matching rows for a given count reports 0.
    """
    active_domains_query = (
        select(Domain)
        .options(noload("*"))
        .where(Domain.status == "active")
        .order_by(Domain.name)
    )
    active_domains = list((await session.execute(active_domains_query)).scalars().all())

    # Active repos grouped by owning domain.
    repos_by_domain = {
        domain_id: count
        for domain_id, count in await session.execute(
            select(ManagedRepo.domain_id, func.count())
            .where(ManagedRepo.status == "active")
            .group_by(ManagedRepo.domain_id)
        )
    }

    # Active workstreams grouped by domain, reached through their topic.
    workstreams_by_domain = {
        domain_id: count
        for domain_id, count in await session.execute(
            select(Topic.domain_id, func.count(Workstream.id))
            .join(Workstream, Workstream.topic_id == Topic.id)
            .where(Workstream.status == "active")
            .group_by(Topic.domain_id)
        )
    }

    # Extension points grouped by domain foreign key.
    extension_points_by_domain = {
        domain_id: count
        for domain_id, count in await session.execute(
            select(ExtensionPoint.domain_id, func.count()).group_by(ExtensionPoint.domain_id)
        )
    }

    # Technical debt items grouped by domain foreign key.
    debt_by_domain = {
        domain_id: count
        for domain_id, count in await session.execute(
            select(TechnicalDebt.domain_id, func.count()).group_by(TechnicalDebt.domain_id)
        )
    }

    summaries: list[DomainSummary] = []
    for domain in active_domains:
        summaries.append(
            DomainSummary(
                slug=domain.slug,
                name=domain.name,
                repo_count=repos_by_domain.get(domain.id, 0),
                active_workstream_count=workstreams_by_domain.get(domain.id, 0),
                ep_count=extension_points_by_domain.get(domain.id, 0),
                td_count=debt_by_domain.get(domain.id, 0),
            )
        )
    return summaries
|
|
|
|
|
|
@router.get("/deps", response_model=list[WorkstreamWithDeps])
async def get_deps(session: AsyncSession = Depends(get_session)) -> list[WorkstreamWithDeps]:
    """Return open workstreams together with their dependency edges only.

    The result has the same structure as ``open_workstreams`` in
    /state/summary, but none of the other summary queries are run and no
    task counts are computed here. Intended for consumers that only need
    the dependency edges (workstreams.md and dependencies.md).
    """
    open_query = (
        select(Workstream)
        .options(noload("*"))
        .where(Workstream.status.in_(["active", "blocked"]))
        .order_by(Workstream.due_date.asc().nullslast(), Workstream.created_at)
    )
    open_workstreams = list((await session.execute(open_query)).scalars().all())
    open_ids = [ws.id for ws in open_workstreams]

    # Every dependency edge that touches an open workstream on either end.
    edges = []
    if open_ids:
        edge_query = select(WorkstreamDependency).where(
            (WorkstreamDependency.from_workstream_id.in_(open_ids))
            | (WorkstreamDependency.to_workstream_id.in_(open_ids))
        )
        edges = list((await session.execute(edge_query)).scalars().all())

    referenced_ws_ids: set = {edge.from_workstream_id for edge in edges}
    referenced_ws_ids |= {edge.to_workstream_id for edge in edges if edge.to_workstream_id}
    referenced_task_ids: set = {edge.to_task_id for edge in edges if edge.to_task_id}

    # Open workstreams are already loaded; fetch only the other referenced ones.
    workstreams_by_id: dict = {ws.id: ws for ws in open_workstreams}
    missing_ids = referenced_ws_ids - set(workstreams_by_id.keys())
    if missing_ids:
        missing_rows = await session.execute(
            select(Workstream).options(noload("*")).where(Workstream.id.in_(missing_ids))
        )
        workstreams_by_id.update((ws.id, ws) for ws in missing_rows.scalars())

    tasks_by_id: dict = {}
    if referenced_task_ids:
        task_result = await session.execute(
            select(Task).options(noload("*")).where(Task.id.in_(referenced_task_ids))
        )
        tasks_by_id = {task.id: task for task in task_result.scalars().all()}

    def _workstream_stub(edge, ws_id):
        # Stub describing the workstream at the other end of an edge.
        other = workstreams_by_id[ws_id]
        return WorkstreamDepStub(
            dep_id=edge.id,
            target_type="workstream",
            relationship_type=edge.relationship_type,
            workstream_id=ws_id,
            workstream_slug=other.slug,
            workstream_title=other.title,
            description=edge.description,
        )

    edge_index: dict = {ws.id: {"depends_on": [], "blocks": []} for ws in open_workstreams}
    for edge in edges:
        source_id = edge.from_workstream_id
        target_id = edge.to_workstream_id
        target_task_id = edge.to_task_id
        source_is_open = source_id in edge_index

        # Outgoing edge to another workstream.
        if source_is_open and target_id and target_id in workstreams_by_id:
            edge_index[source_id]["depends_on"].append(_workstream_stub(edge, target_id))

        # Outgoing edge to a single task.
        if source_is_open and target_task_id and target_task_id in tasks_by_id:
            edge_index[source_id]["depends_on"].append(
                WorkstreamDepStub(
                    dep_id=edge.id,
                    target_type="task",
                    relationship_type=edge.relationship_type,
                    task_id=target_task_id,
                    task_title=tasks_by_id[target_task_id].title,
                    description=edge.description,
                )
            )

        # Incoming edge: the source workstream is recorded under the target's "blocks".
        if target_id and target_id in edge_index and source_id in workstreams_by_id:
            edge_index[target_id]["blocks"].append(_workstream_stub(edge, source_id))

    response: list[WorkstreamWithDeps] = []
    for ws in open_workstreams:
        ws_edges = edge_index[ws.id]
        response.append(
            WorkstreamWithDeps(
                **WorkstreamRead.model_validate(ws).model_dump(),
                depends_on=ws_edges["depends_on"],
                blocks=ws_edges["blocks"],
            )
        )
    return response
|
|
|
|
|
|
# Rank per task priority, listed from most to least urgent (critical=0 ... low=3).
# Used as a key with min(), so the lowest rank is picked first.
_PRIORITY_RANK = {
    priority: rank
    for rank, priority in enumerate(
        (
            TaskPriority.critical,
            TaskPriority.high,
            TaskPriority.medium,
            TaskPriority.low,
        )
    )
}
|
|
|
|
|
|
async def _derive_next_steps(session: AsyncSession) -> list[NextStep]:
    """Derive contextual next-action suggestions from current hub state.

    Two signal sources:
    1. Recently resolved decisions (last 7 days, newest first, at most 20)
       → best open task (todo or in progress) in the same workstream
    2. Workstreams whose every dependency workstream is now completed
       → best todo task in that workstream

    "Best" means lowest ``_PRIORITY_RANK``, then earliest ``created_at``.
    A task is suggested at most once across both signals.

    Args:
        session: Database session used for all lookups.

    Returns:
        Suggestions from signal 1 followed by suggestions from signal 2.
    """
    steps: list[NextStep] = []
    seen_task_ids: set = set()

    # ── Signal 1: recently resolved decisions ────────────────────────────────
    cutoff = datetime.now(tz=timezone.utc) - timedelta(days=7)
    resolved_rows = await session.execute(
        select(Decision)
        .options(noload("*"))
        .where(Decision.status == DecisionStatus.resolved)
        .where(Decision.decided_at >= cutoff)
        .where(Decision.workstream_id.isnot(None))
        .order_by(Decision.decided_at.desc())
        .limit(20)
    )
    # Only the first (most recent) decision of a workstream can yield a step:
    # later ones would select the same task and be dropped as duplicates, so
    # skip them before spending queries on them.
    examined_ws_ids: set = set()
    for decision in resolved_rows.scalars().all():
        if decision.workstream_id in examined_ws_ids:
            continue
        examined_ws_ids.add(decision.workstream_id)

        open_tasks_rows = await session.execute(
            select(Task)
            .options(noload("*"))
            .where(Task.workstream_id == decision.workstream_id)
            .where(Task.status.in_([TaskStatus.todo, TaskStatus.in_progress]))
        )
        open_tasks = list(open_tasks_rows.scalars().all())
        if not open_tasks:
            continue
        task = min(open_tasks, key=lambda t: (_PRIORITY_RANK.get(t.priority, 99), t.created_at))
        if task.id in seen_task_ids:
            continue
        ws = await session.get(Workstream, decision.workstream_id, options=[noload("*")])
        domain_slug = await _get_domain_slug_for_workstream(ws, session)
        steps.append(NextStep(
            type="resolved_decision",
            domain=domain_slug,
            workstream_id=ws.id if ws else None,
            workstream_title=ws.title if ws else None,
            workstream_slug=ws.slug if ws else None,
            task_id=task.id,
            task_title=task.title,
            message=(
                f"Decision '{decision.title}' was resolved → "
                f"'{task.title}' is the next open task in '{ws.title if ws else '?'}'"
            ),
        ))
        seen_task_ids.add(task.id)

    # ── Signal 2: cleared dependencies ──────────────────────────────────────
    all_dep_rows = await session.execute(
        select(
            WorkstreamDependency.from_workstream_id,
            WorkstreamDependency.to_workstream_id,
        ).where(WorkstreamDependency.to_workstream_id.isnot(None))
    )
    all_deps = all_dep_rows.all()

    # Group from_workstream_id → set of to_workstream_ids
    dep_map: dict = {}
    dep_ws_ids = set()
    for from_ws_id, to_ws_id in all_deps:
        dep_map.setdefault(from_ws_id, set()).add(to_ws_id)
        dep_ws_ids.add(from_ws_id)
        dep_ws_ids.add(to_ws_id)

    ws_info = {}
    if dep_ws_ids:
        ws_rows = await session.execute(
            select(
                Workstream.id,
                Workstream.status,
                Workstream.title,
                Workstream.slug,
                Workstream.topic_id,
            ).where(Workstream.id.in_(dep_ws_ids))
        )
        ws_info = {
            ws_id: {
                "status": status,
                "title": title,
                "slug": slug,
                "topic_id": topic_id,
            }
            for ws_id, status, title, slug, topic_id in ws_rows
        }

    # A workstream is "ready" when it is still open and every workstream it
    # depends on is completed (a dependency missing from ws_info never counts).
    ready_from_ws_ids = [
        from_ws_id
        for from_ws_id, to_ws_ids in dep_map.items()
        if ws_info.get(from_ws_id, {}).get("status") in ("active", "blocked")
        and all(ws_info.get(to_id, {}).get("status") == "completed" for to_id in to_ws_ids)
    ]

    # Fetch the todo tasks of all ready workstreams in a single query.
    todo_by_ws: dict = {}
    if ready_from_ws_ids:
        todo_rows = await session.execute(
            select(Task)
            .options(noload("*"))
            .where(Task.workstream_id.in_(ready_from_ws_ids))
            .where(Task.status == TaskStatus.todo)
        )
        for task in todo_rows.scalars().all():
            todo_by_ws.setdefault(task.workstream_id, []).append(task)

    for from_ws_id in ready_from_ws_ids:
        from_ws = ws_info.get(from_ws_id, {})
        todo_tasks = todo_by_ws.get(from_ws_id, [])
        if not todo_tasks:
            continue
        task = min(todo_tasks, key=lambda t: (_PRIORITY_RANK.get(t.priority, 99), t.created_at))
        if task.id in seen_task_ids:
            continue
        domain_slug = await _get_domain_slug_for_topic(from_ws.get("topic_id"), session)
        # dep_map values are sets, so sort the slugs to keep the message stable.
        blocker_slugs = ", ".join(
            sorted(ws_info[tid]["slug"] for tid in dep_map[from_ws_id] if tid in ws_info)
        )
        steps.append(NextStep(
            type="dependency_cleared",
            domain=domain_slug,
            workstream_id=from_ws_id,
            workstream_title=from_ws["title"],
            workstream_slug=from_ws["slug"],
            task_id=task.id,
            task_title=task.title,
            message=(
                f"All dependencies of '{from_ws['title']}' are completed ({blocker_slugs}) → "
                f"'{task.title}' is ready to start"
            ),
        ))
        seen_task_ids.add(task.id)

    return steps
|
|
|
|
|
|
async def _get_domain_slug_for_workstream(ws: Workstream | None, session: AsyncSession) -> str | None:
    """Resolve a workstream's domain slug through its topic.

    Returns None when no workstream is given or it has no topic; otherwise
    defers to ``_get_domain_slug_for_topic``.
    """
    if ws is not None and ws.topic_id is not None:
        return await _get_domain_slug_for_topic(ws.topic_id, session)
    return None
|
|
|
|
|
|
async def _get_domain_slug_for_topic(topic_id, session: AsyncSession) -> str | None:
    """Resolve the slug of the domain that owns the given topic.

    Returns None when ``topic_id`` is None, the topic does not exist, the
    topic has no domain, or the domain row cannot be found.
    """
    if topic_id is None:
        return None

    # Relationships are not needed here, only the foreign key and the slug.
    skip_relationships = [noload("*")]

    topic = await session.get(Topic, topic_id, options=skip_relationships)
    if topic is None:
        return None
    if topic.domain_id is None:
        return None

    domain = await session.get(Domain, topic.domain_id, options=[noload("*")])
    if not domain:
        return None
    return domain.slug
|
|
|
|
|
|
def _value(item):
    """Return ``item.value`` when the object has that attribute, else ``item`` itself."""
    return getattr(item, "value", item)
|
|
|
|
|
|
@router.get("/next_steps", response_model=list[NextStep])
async def get_next_steps(session: AsyncSession = Depends(get_session)) -> list[NextStep]:
    """Expose the derived next-action suggestions as their own endpoint.

    Suggestions come from:
    - Recently resolved decisions → first open task in the same workstream
    - Workstreams whose every dependency workstream is now completed → first todo task

    All of the work is delegated to ``_derive_next_steps``.
    """
    suggestions = await _derive_next_steps(session)
    return suggestions
|
|
|
|
|
|
@router.get("/health")
async def health_check() -> dict:
    """Report whether the database answers a trivial query.

    Returns ``{"status": "ok", "db": "connected"}`` when ``SELECT 1`` succeeds.
    On any exception a 503 JSON response is returned instead, with the
    exception text in its ``db`` field.
    """
    try:
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
    except Exception as exc:
        failure = {"status": "error", "db": str(exc)}
        return JSONResponse(status_code=503, content=failure)
    else:
        return {"status": "ok", "db": "connected"}
|