Files
state-hub/api/routers/state.py
tegwick fc87e26b4b feat(gems): three-pass schema migration aligning state-hub with GEMS
Implements CUST-WP-0007. Resolves inconsistencies I-1, I-2, I-5, I-6
identified in the GEMS audit (GenericEntityModellingSystem.md).

Pass 1 (e1f2a3b4c5d6): domain_id FK on extension_points and
technical_debt (replaces raw string column); repo_id FK on contributions.
Fixes domain-filtering bugs in EP/TD dashboard pages.

Pass 2 (f2a3b4c5d6e7): repo_id nullable FK on workstreams, aligning
the GEMS primary attachment with ADR-001 (repo > topic). Dashboard
pages updated to prefer repo->domain over topic->domain.

Pass 3 (a3b4c5d6e7f8): SBOMSnapshot container entity (GEMS Complex
between Repository and SBOMEntry). Ingest is now additive — each call
creates a new snapshot; history is retained. List/report endpoints
filter to latest snapshot per repo via _latest_snapshot_ids_subquery().
New endpoints: GET /sbom/snapshots/, GET /sbom/snapshots/{id}/.
Dashboard gains a Snapshot History section.

Also adds GEMS analysis artefacts: wiki/GEMS-StateHub-TypeRegistry.md,
wiki/GEMS-StateHub-SWOT.md, workplans/CUST-WP-0006 (analysis),
workplans/CUST-WP-0007 (migration, now completed).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 23:39:17 +01:00

430 lines
17 KiB
Python

from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session, engine
from api.models.contribution import Contribution, ContributionStatus, ContributionType
from api.models.decision import Decision, DecisionStatus, DecisionType
from api.models.domain import Domain
from api.models.extension_point import ExtensionPoint
from api.models.managed_repo import ManagedRepo
from api.models.progress_event import ProgressEvent
from api.models.sbom_entry import SBOMEntry
from api.models.task import Task, TaskPriority, TaskStatus
from api.models.technical_debt import TechnicalDebt
from api.models.topic import Topic, TopicStatus
from api.models.workstream import Workstream, WorkstreamStatus
from api.models.workstream_dependency import WorkstreamDependency
from api.schemas.decision import DecisionRead
from api.schemas.domain import DomainSummary
from api.schemas.progress_event import ProgressEventRead
from api.schemas.state import (
DecisionTotals,
NextStep,
StateSummary,
TaskTotals,
Totals,
TopicTotals,
WorkstreamTotals,
)
from api.schemas.task import TaskRead
from api.schemas.topic import TopicWithWorkstreams
from api.schemas.workstream import WorkstreamRead, WorkstreamWithTaskCounts, WorkstreamWithDeps
from api.schemas.workstream_dependency import WorkstreamDepStub
# Router for the hub state endpoints; every route below is mounted under /state.
router = APIRouter(prefix="/state", tags=["state"])
@router.get("/summary", response_model=StateSummary)
async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSummary:
    """Assemble the hub-wide state summary.

    The response combines: non-archived topics, open/escalated pending
    decisions, blocked tasks, the 20 most recent progress events, active or
    blocked workstreams (enriched with task counts and dependency stubs),
    per-table status totals, derived next steps, per-domain summaries,
    contribution counts and the copyleft licence-risk count.

    Args:
        session: Async database session injected by FastAPI.

    Returns:
        A ``StateSummary`` stamped with the current UTC time.
    """
    # Run all queries sequentially on one session.
    # AsyncSession does not support concurrent operations (no gather on same session).
    topics_rows = await session.execute(
        select(Topic).where(Topic.status != TopicStatus.archived).order_by(Topic.created_at)
    )
    topics = list(topics_rows.scalars().all())

    blocking_rows = await session.execute(
        select(Decision)
        .where(Decision.decision_type == DecisionType.pending)
        .where(Decision.status.in_([DecisionStatus.open, DecisionStatus.escalated]))
        .order_by(Decision.deadline.asc().nullslast(), Decision.created_at)
    )
    blocking = list(blocking_rows.scalars().all())

    blocked_rows = await session.execute(
        select(Task).where(Task.status == TaskStatus.blocked).order_by(Task.created_at)
    )
    blocked = list(blocked_rows.scalars().all())

    recent_rows = await session.execute(
        select(ProgressEvent).order_by(ProgressEvent.created_at.desc()).limit(20)
    )
    recent = list(recent_rows.scalars().all())

    open_ws_rows = await session.execute(
        select(Workstream)
        .where(Workstream.status.in_([WorkstreamStatus.active, WorkstreamStatus.blocked]))
        .order_by(Workstream.due_date.asc().nullslast(), Workstream.created_at)
    )
    open_ws = list(open_ws_rows.scalars().all())

    # Task counts per workstream (used to enrich open_workstreams):
    # workstream_id -> {task status -> count}
    task_per_ws: dict = {}
    for ws_id, tstat, cnt in await session.execute(
        select(Task.workstream_id, Task.status, func.count()).group_by(Task.workstream_id, Task.status)
    ):
        task_per_ws.setdefault(ws_id, {})[tstat] = cnt

    # Dependency graph for open workstreams: any edge touching an open
    # workstream on either end. Skipped entirely when nothing is open.
    open_ws_ids = [w.id for w in open_ws]
    dep_rows = []
    if open_ws_ids:
        dep_result = await session.execute(
            select(WorkstreamDependency).where(
                (WorkstreamDependency.from_workstream_id.in_(open_ws_ids))
                | (WorkstreamDependency.to_workstream_id.in_(open_ws_ids))
            )
        )
        dep_rows = list(dep_result.scalars().all())

    # Build a slug+title lookup for all workstreams referenced in deps
    dep_ws_ids = set()
    for d in dep_rows:
        dep_ws_ids.add(d.from_workstream_id)
        dep_ws_ids.add(d.to_workstream_id)
    ws_lookup: dict = {w.id: w for w in open_ws}
    # The other end of an edge may be a workstream that is not open
    # (and therefore not loaded yet); fetch those in one query.
    extra_ids = dep_ws_ids - set(ws_lookup.keys())
    if extra_ids:
        extra_rows = await session.execute(
            select(Workstream).where(Workstream.id.in_(extra_ids))
        )
        for w in extra_rows.scalars():
            ws_lookup[w.id] = w

    # Index: workstream_id → (depends_on stubs, blocks stubs)
    dep_index: dict = {w.id: {"depends_on": [], "blocks": []} for w in open_ws}
    for d in dep_rows:
        from_id, to_id = d.from_workstream_id, d.to_workstream_id
        if from_id in dep_index and to_id in ws_lookup:
            dep_index[from_id]["depends_on"].append(WorkstreamDepStub(
                dep_id=d.id,
                workstream_id=to_id,
                workstream_slug=ws_lookup[to_id].slug,
                workstream_title=ws_lookup[to_id].title,
                description=d.description,
            ))
        if to_id in dep_index and from_id in ws_lookup:
            dep_index[to_id]["blocks"].append(WorkstreamDepStub(
                dep_id=d.id,
                workstream_id=from_id,
                workstream_slug=ws_lookup[from_id].slug,
                workstream_title=ws_lookup[from_id].title,
                description=d.description,
            ))

    # Totals — one GROUP BY per table
    topic_counts = {r[0]: r[1] for r in await session.execute(
        select(Topic.status, func.count()).group_by(Topic.status)
    )}
    ws_counts = {r[0]: r[1] for r in await session.execute(
        select(Workstream.status, func.count()).group_by(Workstream.status)
    )}
    task_counts = {r[0]: r[1] for r in await session.execute(
        select(Task.status, func.count()).group_by(Task.status)
    )}
    dec_counts = {r[0]: r[1] for r in await session.execute(
        select(Decision.status, func.count()).group_by(Decision.status)
    )}
    totals = Totals(
        topics=TopicTotals(
            active=topic_counts.get(TopicStatus.active, 0),
            paused=topic_counts.get(TopicStatus.paused, 0),
            archived=topic_counts.get(TopicStatus.archived, 0),
            total=sum(topic_counts.values()),
        ),
        workstreams=WorkstreamTotals(
            active=ws_counts.get(WorkstreamStatus.active, 0),
            blocked=ws_counts.get(WorkstreamStatus.blocked, 0),
            completed=ws_counts.get(WorkstreamStatus.completed, 0),
            archived=ws_counts.get(WorkstreamStatus.archived, 0),
            total=sum(ws_counts.values()),
        ),
        tasks=TaskTotals(
            todo=task_counts.get(TaskStatus.todo, 0),
            in_progress=task_counts.get(TaskStatus.in_progress, 0),
            blocked=task_counts.get(TaskStatus.blocked, 0),
            done=task_counts.get(TaskStatus.done, 0),
            cancelled=task_counts.get(TaskStatus.cancelled, 0),
            total=sum(task_counts.values()),
        ),
        decisions=DecisionTotals(
            open=dec_counts.get(DecisionStatus.open, 0),
            resolved=dec_counts.get(DecisionStatus.resolved, 0),
            escalated=dec_counts.get(DecisionStatus.escalated, 0),
            superseded=dec_counts.get(DecisionStatus.superseded, 0),
            total=sum(dec_counts.values()),
        ),
    )

    next_steps = await _derive_next_steps(session)

    # Domain summary stats
    domain_summaries = await _build_domain_summaries(session)

    # Contribution counts (by type and status), keyed by the enum's .value.
    contrib_type_counts = {r[0].value: r[1] for r in await session.execute(
        select(Contribution.type, func.count()).group_by(Contribution.type)
    )}
    contrib_status_counts = {r[0].value: r[1] for r in await session.execute(
        select(Contribution.status, func.count()).group_by(Contribution.status)
    )}
    # Both groupings share one flat dict; on a key collision the status
    # count (merged last) overwrites the type count.
    contribution_counts = {**contrib_type_counts, **contrib_status_counts}

    # Licence risk: copyleft packages in direct prod deps.
    # Matching is a case-insensitive substring test, so "GPL" alone already
    # covers "AGPL" and "LGPL"; the longer entries are kept for readability.
    _COPYLEFT_PATS = ("GPL", "AGPL", "LGPL", "EUPL", "CDDL", "MPL")
    # Filter in Python since ILIKE across multiple patterns is verbose in SQLAlchemy
    # NOTE(review): this query has no snapshot filter, so it presumably counts
    # entries from every retained SBOM snapshot rather than only the latest
    # one per repo — confirm whether that is intended.
    all_direct_prod_rows = await session.execute(
        select(SBOMEntry.license_spdx)
        .where(SBOMEntry.is_direct.is_(True))
        .where(SBOMEntry.is_dev.is_(False))
    )
    licence_risk_count = sum(
        1 for (lic,) in all_direct_prod_rows.all()
        if lic and any(pat in lic.upper() for pat in _COPYLEFT_PATS)
    )

    return StateSummary(
        generated_at=datetime.now(tz=timezone.utc),
        totals=totals,
        topics=[TopicWithWorkstreams.model_validate(t) for t in topics],
        blocking_decisions=[DecisionRead.model_validate(d) for d in blocking],
        blocked_tasks=[TaskRead.model_validate(t) for t in blocked],
        recent_progress=[ProgressEventRead.model_validate(e) for e in recent],
        next_steps=next_steps,
        domains=domain_summaries,
        contribution_counts=contribution_counts,
        licence_risk_count=licence_risk_count,
        open_workstreams=[
            WorkstreamWithDeps(
                **WorkstreamRead.model_validate(w).model_dump(),
                tasks_total=sum(task_per_ws.get(w.id, {}).values()),
                tasks_todo=task_per_ws.get(w.id, {}).get(TaskStatus.todo, 0),
                tasks_in_progress=task_per_ws.get(w.id, {}).get(TaskStatus.in_progress, 0),
                tasks_blocked=task_per_ws.get(w.id, {}).get(TaskStatus.blocked, 0),
                tasks_done=task_per_ws.get(w.id, {}).get(TaskStatus.done, 0),
                depends_on=dep_index.get(w.id, {}).get("depends_on", []),
                blocks=dep_index.get(w.id, {}).get("blocks", []),
            )
            for w in open_ws
        ],
    )
async def _build_domain_summaries(session: AsyncSession) -> list[DomainSummary]:
    """Build one ``DomainSummary`` per active domain, ordered by domain name.

    Each summary carries the domain's active-repo count, active-workstream
    count (resolved through the workstream's topic), extension-point count
    and technical-debt count. Missing counts default to 0.
    """
    domain_result = await session.execute(
        select(Domain).where(Domain.status == "active").order_by(Domain.name)
    )
    active_domains = list(domain_result.scalars().all())

    # domain_id -> number of active managed repos
    repos_by_domain = {}
    for domain_id, n in await session.execute(
        select(ManagedRepo.domain_id, func.count())
        .where(ManagedRepo.status == "active")
        .group_by(ManagedRepo.domain_id)
    ):
        repos_by_domain[domain_id] = n

    # domain_id -> number of active workstreams (joined through topics)
    workstreams_by_domain = {
        domain_id: n
        for domain_id, n in await session.execute(
            select(Topic.domain_id, func.count(Workstream.id))
            .join(Workstream, Workstream.topic_id == Topic.id)
            .where(Workstream.status == WorkstreamStatus.active)
            .group_by(Topic.domain_id)
        )
    }

    # domain_id -> number of extension points (via FK)
    eps_by_domain = {}
    for domain_id, n in await session.execute(
        select(ExtensionPoint.domain_id, func.count()).group_by(ExtensionPoint.domain_id)
    ):
        eps_by_domain[domain_id] = n

    # domain_id -> number of technical-debt items (via FK)
    tds_by_domain = {}
    for domain_id, n in await session.execute(
        select(TechnicalDebt.domain_id, func.count()).group_by(TechnicalDebt.domain_id)
    ):
        tds_by_domain[domain_id] = n

    summaries = []
    for domain in active_domains:
        key = domain.id
        summaries.append(
            DomainSummary(
                slug=domain.slug,
                name=domain.name,
                repo_count=repos_by_domain.get(key, 0),
                active_workstream_count=workstreams_by_domain.get(key, 0),
                ep_count=eps_by_domain.get(key, 0),
                td_count=tds_by_domain.get(key, 0),
            )
        )
    return summaries
# Numeric rank per task priority, most urgent first (critical=0 ... low=3).
# Used as a sort key, so a lower rank wins.
_PRIORITY_RANK = {
    priority: rank
    for rank, priority in enumerate(
        (
            TaskPriority.critical,
            TaskPriority.high,
            TaskPriority.medium,
            TaskPriority.low,
        )
    )
}
async def _derive_next_steps(session: AsyncSession) -> list[NextStep]:
    """Derive contextual next-action suggestions from current hub state.

    Two signal sources:
    1. Recently resolved decisions (last 7 days) → first open task in same workstream
    2. Workstreams whose every dependency is now completed → first todo task in that workstream

    "First" means lowest ``_PRIORITY_RANK`` (unknown priorities rank last),
    ties broken by the earliest ``created_at``. A task is suggested at most
    once across both signals.

    Args:
        session: Async database session; all queries run sequentially on it.

    Returns:
        Suggestions from signal 1 (newest decision first) followed by
        suggestions from signal 2.
    """
    steps: list[NextStep] = []
    seen_task_ids: set = set()

    # ── Signal 1: recently resolved decisions ────────────────────────────────
    cutoff = datetime.now(tz=timezone.utc) - timedelta(days=7)
    resolved_rows = await session.execute(
        select(Decision)
        .where(Decision.status == DecisionStatus.resolved)
        .where(Decision.decided_at >= cutoff)
        .where(Decision.workstream_id.isnot(None))
        .order_by(Decision.decided_at.desc())
    )
    for decision in resolved_rows.scalars().all():
        open_tasks_rows = await session.execute(
            select(Task)
            .where(Task.workstream_id == decision.workstream_id)
            .where(Task.status.in_([TaskStatus.todo, TaskStatus.in_progress]))
        )
        open_tasks = list(open_tasks_rows.scalars().all())
        if not open_tasks:
            continue
        task = min(open_tasks, key=lambda t: (_PRIORITY_RANK.get(t.priority, 99), t.created_at))
        if task.id in seen_task_ids:
            continue
        # The workstream row may be missing; the step is still emitted with
        # its workstream fields set to None and '?' in the message.
        ws = await session.get(Workstream, decision.workstream_id)
        domain_slug = await _get_domain_slug_for_workstream(ws, session)
        steps.append(NextStep(
            type="resolved_decision",
            domain=domain_slug,
            workstream_id=ws.id if ws else None,
            workstream_title=ws.title if ws else None,
            workstream_slug=ws.slug if ws else None,
            task_id=task.id,
            task_title=task.title,
            message=(
                f"Decision '{decision.title}' was resolved → "
                f"'{task.title}' is the next open task in '{ws.title if ws else '?'}'"
            ),
        ))
        seen_task_ids.add(task.id)

    # ── Signal 2: cleared dependencies ──────────────────────────────────────
    all_dep_rows = await session.execute(select(WorkstreamDependency))
    # Group from_workstream_id → set of to_workstream_ids
    dep_map: dict = {}
    for d in all_dep_rows.scalars().all():
        dep_map.setdefault(d.from_workstream_id, set()).add(d.to_workstream_id)

    for from_ws_id, to_ws_ids in dep_map.items():
        # All targets must exist and be completed. Keep the fetched blockers
        # so their slugs can be reported without looking them up a second time.
        blockers = []
        all_done = True
        for to_id in to_ws_ids:
            to_ws = await session.get(Workstream, to_id)
            if to_ws is None or to_ws.status != WorkstreamStatus.completed:
                all_done = False
                break
            blockers.append(to_ws)
        if not all_done:
            continue

        from_ws = await session.get(Workstream, from_ws_id)
        if from_ws is None or from_ws.status not in (WorkstreamStatus.active, WorkstreamStatus.blocked):
            continue

        todo_rows = await session.execute(
            select(Task)
            .where(Task.workstream_id == from_ws_id)
            .where(Task.status == TaskStatus.todo)
        )
        todo_tasks = list(todo_rows.scalars().all())
        if not todo_tasks:
            continue
        task = min(todo_tasks, key=lambda t: (_PRIORITY_RANK.get(t.priority, 99), t.created_at))
        if task.id in seen_task_ids:
            continue

        domain_slug = await _get_domain_slug_for_workstream(from_ws, session)
        blocker_slugs = ", ".join(blocker.slug for blocker in blockers)
        steps.append(NextStep(
            type="dependency_cleared",
            domain=domain_slug,
            workstream_id=from_ws.id,
            workstream_title=from_ws.title,
            workstream_slug=from_ws.slug,
            task_id=task.id,
            task_title=task.title,
            message=(
                f"All dependencies of '{from_ws.title}' are completed ({blocker_slugs}) → "
                f"'{task.title}' is ready to start"
            ),
        ))
        seen_task_ids.add(task.id)

    return steps
async def _get_domain_slug_for_workstream(ws: Workstream | None, session: AsyncSession) -> str | None:
    """Resolve a workstream's domain slug by walking workstream → topic → domain.

    Returns None as soon as any link in that chain is missing: no workstream,
    no topic id, no topic row, no domain id, or no domain row.
    """
    if ws is None:
        return None
    if ws.topic_id is None:
        return None

    parent_topic = await session.get(Topic, ws.topic_id)
    if parent_topic is None:
        return None
    if parent_topic.domain_id is None:
        return None

    owning_domain = await session.get(Domain, parent_topic.domain_id)
    if owning_domain:
        return owning_domain.slug
    return None
@router.get("/next_steps", response_model=list[NextStep])
async def get_next_steps(session: AsyncSession = Depends(get_session)) -> list[NextStep]:
    """Return next-action suggestions computed from the current hub state.

    Thin HTTP wrapper around ``_derive_next_steps``. Suggestions come from:
    - decisions resolved recently → the first open task of that workstream
    - workstreams whose dependency workstreams are all completed → their first todo task
    """
    suggestions = await _derive_next_steps(session)
    return suggestions
@router.get("/health")
async def health_check() -> dict:
    """Report service health by probing the database with ``SELECT 1``.

    Returns a plain ``{"status": "ok", ...}`` dict when the probe succeeds;
    on any exception responds with HTTP 503 and the error text in ``db``.
    """
    try:
        async with engine.connect() as connection:
            await connection.execute(text("SELECT 1"))
    except Exception as exc:
        # Any failure while connecting or querying means the DB is unusable.
        failure = {"status": "error", "db": str(exc)}
        return JSONResponse(status_code=503, content=failure)
    return {"status": "ok", "db": "connected"}