Optimize dashboard overview loading

This commit is contained in:
2026-06-06 00:42:00 +02:00
parent a412998c96
commit b340489d96
14 changed files with 990 additions and 88 deletions

View File

@@ -1,7 +1,7 @@
import time
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, Request
from fastapi import APIRouter, Depends, Request, Response
from fastapi.responses import JSONResponse
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
@@ -17,6 +17,7 @@ from api.models.extension_point import ExtensionPoint
from api.models.managed_repo import ManagedRepo
from api.models.progress_event import ProgressEvent
from api.models.sbom_entry import SBOMEntry
from api.models.sbom_snapshot import SBOMSnapshot
from api.models.task import Task, TaskPriority, TaskStatus
from api.models.technical_debt import TechnicalDebt
from api.models.topic import Topic, TopicStatus
@@ -26,6 +27,9 @@ from api.schemas.decision import DecisionRead
from api.schemas.domain import DomainSummary
from api.schemas.progress_event import ProgressEventRead
from api.schemas.state import (
DashboardOverview,
DashboardSourceMeta,
DashboardWorkplanRow,
DecisionTotals,
NextStep,
StateSummary,
@@ -38,6 +42,7 @@ from api.schemas.task import TaskRead
from api.schemas.topic import TopicRead, TopicWithWorkstreams
from api.schemas.workstream import WorkstreamRead, WorkstreamWithTaskCounts, WorkstreamWithDeps
from api.schemas.workstream_dependency import WorkstreamDepStub
from api.routers.workstreams import _workplan_index
from api.task_status import TERMINAL_TASK_STATUSES, status_value
from api.workplan_status import (
CLOSED_WORKSTREAM_STATUSES,
@@ -51,17 +56,25 @@ router = APIRouter(prefix="/state", tags=["state"])
_SUMMARY_CACHE: StateSummary | None = None
_SUMMARY_CACHE_AT: float = 0.0
_SUMMARY_TTL = 15.0
_OVERVIEW_CACHE: DashboardOverview | None = None
_OVERVIEW_CACHE_AT: float = 0.0
_OVERVIEW_TTL = 10.0
@router.get("/summary", response_model=StateSummary)
async def get_summary(
request: Request,
response: Response,
session: AsyncSession = Depends(get_session),
) -> StateSummary:
global _SUMMARY_CACHE, _SUMMARY_CACHE_AT
no_cache = "no-cache" in request.headers.get("cache-control", "")
if not no_cache and _SUMMARY_CACHE is not None and (time.monotonic() - _SUMMARY_CACHE_AT) < _SUMMARY_TTL:
response.headers["X-StateHub-Cache"] = "hit"
response.headers["Cache-Control"] = "max-age=15, stale-while-revalidate=30"
return _SUMMARY_CACHE
response.headers["X-StateHub-Cache"] = "miss"
response.headers["Cache-Control"] = "max-age=15, stale-while-revalidate=30"
# Run all queries sequentially on one session.
# AsyncSession does not support concurrent operations (no gather on same session).
@@ -362,6 +375,309 @@ async def get_summary(
return result
@router.get("/overview", response_model=DashboardOverview)
async def get_overview(
request: Request,
response: Response,
session: AsyncSession = Depends(get_session),
) -> DashboardOverview:
"""Bounded dashboard overview read model.
This is intentionally narrower than /state/summary. The dashboard overview
needs counts, recent rows, and chart-ready workplan rows; it does not need
full task or workplan lists transferred to the browser on every poll.
"""
global _OVERVIEW_CACHE, _OVERVIEW_CACHE_AT
no_cache = "no-cache" in request.headers.get("cache-control", "")
if not no_cache and _OVERVIEW_CACHE is not None and (time.monotonic() - _OVERVIEW_CACHE_AT) < _OVERVIEW_TTL:
response.headers["X-StateHub-Cache"] = "hit"
response.headers["Cache-Control"] = "max-age=10, stale-while-revalidate=30"
return _OVERVIEW_CACHE
response.headers["X-StateHub-Cache"] = "miss"
response.headers["Cache-Control"] = "max-age=10, stale-while-revalidate=30"
result = await _build_dashboard_overview(session)
_OVERVIEW_CACHE = result
_OVERVIEW_CACHE_AT = time.monotonic()
return result
async def _build_dashboard_overview(session: AsyncSession) -> DashboardOverview:
topics_rows = await session.execute(
select(Topic)
.options(
selectinload(Topic.domain),
noload(Topic.workstreams),
noload(Topic.decisions),
noload(Topic.progress_events),
)
.where(Topic.status != TopicStatus.archived)
.order_by(Topic.created_at)
)
topics = list(topics_rows.scalars().all())
topic_map = {topic.id: topic for topic in topics}
workstream_rows = await session.execute(
select(Workstream)
.options(noload("*"))
.order_by(
Workstream.planning_priority.asc().nullslast(),
Workstream.planning_order.asc().nullslast(),
Workstream.updated_at.desc(),
)
)
workstreams_all = list(workstream_rows.scalars().all())
topic_workstreams: dict = {t.id: [] for t in topics}
for w in sorted(workstreams_all, key=lambda item: item.created_at):
if w.topic_id not in topic_workstreams:
continue
topic_workstreams[w.topic_id].append({
"id": w.id,
"slug": w.slug,
"title": w.title,
"status": w.status,
"owner": w.owner,
"due_date": w.due_date,
})
repo_rows = await session.execute(
select(ManagedRepo.id, ManagedRepo.slug, Domain.slug)
.join(Domain, Domain.id == ManagedRepo.domain_id)
.order_by(ManagedRepo.slug)
)
repo_map = {
repo_id: {"slug": repo_slug, "domain_slug": domain_slug}
for repo_id, repo_slug, domain_slug in repo_rows
}
task_counts_by_ws: dict = {}
task_statuses_per_ws: dict = {}
task_totals_by_status: dict[str, int] = {}
for ws_id, task_status, count in await session.execute(
select(Task.workstream_id, Task.status, func.count()).group_by(Task.workstream_id, Task.status)
):
status = status_value(task_status)
task_counts_by_ws.setdefault(ws_id, {"done": 0, "progress": 0, "wait": 0, "todo": 0, "total": 0})
task_counts_by_ws[ws_id]["total"] += count
if status in {"done", "progress", "wait", "todo"}:
task_counts_by_ws[ws_id][status] += count
task_statuses_per_ws.setdefault(ws_id, []).extend([status] * count)
task_totals_by_status[status] = task_totals_by_status.get(status, 0) + count
open_ws = [
w for w in workstreams_all
if normalize_workstream_status(w.status) in OPEN_WORKSTREAM_STATUSES
]
open_ws_ids = [w.id for w in open_ws]
dep_rows = []
if open_ws_ids:
dep_result = await session.execute(
select(WorkstreamDependency).where(
(WorkstreamDependency.from_workstream_id.in_(open_ws_ids))
| (WorkstreamDependency.to_workstream_id.in_(open_ws_ids))
)
)
dep_rows = list(dep_result.scalars().all())
ws_lookup = {w.id: w for w in workstreams_all}
workstream_flow = load_flow("workstream")
flow_engine = FlowEngine()
effective_status: dict = {}
for w in open_ws:
flow_obj = {
"status": w.status,
"workstation": w.status,
"tasks": [{"status": status} for status in task_statuses_per_ws.get(w.id, [])],
"dependencies": [
{"workstation": normalize_workstream_status(ws_lookup[d.to_workstream_id].status)}
for d in dep_rows
if d.from_workstream_id == w.id and d.to_workstream_id and d.to_workstream_id in ws_lookup
],
}
flow_result = flow_engine.evaluate(flow_obj, workstream_flow)
effective_status[w.id] = "blocked" if flow_result.exit_blocked else normalize_workstream_status(w.status)
topic_counts = {r[0]: r[1] for r in await session.execute(
select(Topic.status, func.count()).group_by(Topic.status)
)}
ws_counts = {r[0]: r[1] for r in await session.execute(
select(Workstream.status, func.count()).group_by(Workstream.status)
)}
dec_counts = {r[0]: r[1] for r in await session.execute(
select(Decision.status, func.count()).group_by(Decision.status)
)}
totals = Totals(
topics=TopicTotals(
active=topic_counts.get(TopicStatus.active, 0),
paused=topic_counts.get(TopicStatus.paused, 0),
archived=topic_counts.get(TopicStatus.archived, 0),
total=sum(topic_counts.values()),
),
workstreams=WorkstreamTotals(
proposed=ws_counts.get("proposed", 0),
ready=ws_counts.get("ready", 0) + ws_counts.get("todo", 0),
active=sum(1 for status in effective_status.values() if status == "active"),
blocked=sum(1 for status in effective_status.values() if status == "blocked"),
backlog=ws_counts.get("backlog", 0),
finished=(
ws_counts.get("finished", 0)
+ ws_counts.get("completed", 0)
+ ws_counts.get("accepted", 0)
),
archived=ws_counts.get("archived", 0),
total=sum(ws_counts.values()),
),
tasks=TaskTotals(
wait=task_totals_by_status.get("wait", 0),
todo=task_totals_by_status.get("todo", 0),
progress=task_totals_by_status.get("progress", 0),
done=task_totals_by_status.get("done", 0),
cancel=task_totals_by_status.get("cancel", 0),
total=sum(task_totals_by_status.values()),
),
decisions=DecisionTotals(
open=dec_counts.get(DecisionStatus.open, 0),
resolved=dec_counts.get(DecisionStatus.resolved, 0),
escalated=dec_counts.get(DecisionStatus.escalated, 0),
superseded=dec_counts.get(DecisionStatus.superseded, 0),
total=sum(dec_counts.values()),
),
)
blocking_rows = await session.execute(
select(Decision)
.where(Decision.decision_type == DecisionType.pending)
.where(Decision.status.in_([DecisionStatus.open, DecisionStatus.escalated]))
.order_by(Decision.deadline.asc().nullslast(), Decision.created_at)
)
blocking = list(blocking_rows.scalars().all())
waiting_rows = await session.execute(
select(Task).options(noload("*")).where(Task.status == TaskStatus.wait).order_by(Task.created_at)
)
waiting = list(waiting_rows.scalars().all())
recent_rows = await session.execute(
select(ProgressEvent).options(noload("*")).order_by(ProgressEvent.created_at.desc()).limit(20)
)
recent = list(recent_rows.scalars().all())
milestone_rows = await session.execute(
select(ProgressEvent)
.options(noload("*"))
.where(ProgressEvent.event_type == "milestone")
.where(ProgressEvent.summary.like("Project registered with State Hub:%"))
.order_by(ProgressEvent.created_at.desc())
.limit(500)
)
registration_milestones = list(milestone_rows.scalars().all())
contrib_type_counts = {r[0].value: r[1] for r in await session.execute(
select(Contribution.type, func.count()).group_by(Contribution.type)
)}
contrib_status_counts = {r[0].value: r[1] for r in await session.execute(
select(Contribution.status, func.count()).group_by(Contribution.status)
)}
contribution_counts = {**contrib_type_counts, **contrib_status_counts}
_COPYLEFT_PATS = ("GPL", "AGPL", "LGPL", "EUPL", "CDDL", "MPL")
all_direct_prod_rows = await session.execute(
select(SBOMEntry.license_spdx)
.where(SBOMEntry.is_direct.is_(True))
.where(SBOMEntry.is_dev.is_(False))
)
licence_risk_count = sum(
1 for (lic,) in all_direct_prod_rows.all()
if lic and any(pat in lic.upper() for pat in _COPYLEFT_PATS)
)
snapshot_count, package_total = (await session.execute(
select(
func.count(SBOMSnapshot.id),
func.coalesce(func.sum(SBOMSnapshot.entry_count), 0),
)
)).one()
open_cap_req_count = (await session.execute(
select(func.count()).select_from(CapabilityRequest).where(
CapabilityRequest.status.in_(["requested", "accepted", "in_progress", "ready_for_review"])
)
)).scalar() or 0
sources: dict[str, DashboardSourceMeta] = {}
try:
workplan_index = await _workplan_index(refresh=False, session=session)
workplan_map = workplan_index.get("workstreams", {})
index_meta = workplan_index.get("_meta", {})
sources["workplan_index"] = DashboardSourceMeta(
ok=not bool(index_meta.get("last_error")),
stale=bool(index_meta.get("stale")),
cache_age_seconds=index_meta.get("cache_age_seconds"),
refresh_in_progress=bool(index_meta.get("refresh_in_progress")),
error=index_meta.get("last_error"),
)
except Exception as exc:
workplan_map = {}
sources["workplan_index"] = DashboardSourceMeta(ok=False, error=str(exc))
workplan_rows: list[DashboardWorkplanRow] = []
for w in workstreams_all:
repo = repo_map.get(w.repo_id)
topic = topic_map.get(w.topic_id)
workplan = workplan_map.get(str(w.id), {})
counts = task_counts_by_ws.get(w.id, {"done": 0, "progress": 0, "wait": 0, "todo": 0, "total": 0})
workplan_rows.append(DashboardWorkplanRow(
id=w.id,
title=w.title,
status=normalize_workstream_status(w.status),
domain=repo["domain_slug"] if repo else (topic.domain_slug if topic else "unknown"),
repo_label=repo["slug"] if repo else workplan.get("repo_slug", "unassigned"),
workplan_filename=workplan.get("filename"),
workplan_relative_path=workplan.get("relative_path"),
workplan_archived=bool(workplan.get("archived", False)),
health_labels=workplan.get("health_labels", []),
href=f"./workstreams/{w.id}",
done=counts.get("done", 0),
progress=counts.get("progress", 0),
wait=counts.get("wait", 0),
todo=counts.get("todo", 0),
total=counts.get("total", 0),
created_at=w.created_at,
updated_at=w.updated_at,
))
return DashboardOverview(
generated_at=datetime.now(tz=timezone.utc),
totals=totals,
topics=[
TopicWithWorkstreams(
**TopicRead.model_validate(t).model_dump(),
workstreams=topic_workstreams.get(t.id, []),
)
for t in topics
],
blocking_decisions=[DecisionRead.model_validate(d) for d in blocking],
waiting_tasks=[TaskRead.model_validate(t) for t in waiting],
blocked_tasks=[TaskRead.model_validate(t) for t in waiting],
recent_progress=[ProgressEventRead.model_validate(e) for e in recent],
next_steps=await _derive_next_steps(session),
contribution_counts=contribution_counts,
licence_risk_count=licence_risk_count,
open_capability_requests=open_cap_req_count,
sbom_snapshot_count=int(snapshot_count or 0),
sbom_package_total=int(package_total or 0),
registration_milestones=[ProgressEventRead.model_validate(e) for e in registration_milestones],
workplan_rows=workplan_rows,
sources=sources,
diagnostics={
"workplan_row_count": len(workplan_rows),
"task_count_strategy": "grouped",
},
)
async def _build_domain_summaries(session: AsyncSession) -> list[DomainSummary]:
"""Compute per-domain stats for the state summary."""
domains_rows = await session.execute(