Implement State Hub v0.2: dependency graph, next-steps suggestions, design boundary

S0 — Design boundary formalised across all integration surfaces:
- TOOLS.md restructured with Design Boundary section, Sanctioned Write Tools,
  and Bootstrap-Only Tools (create_workstream, create_task) with explicit note
- project_claude_md.template and railiance CLAUDE.md updated with boundary note
  and get_next_steps() in session start protocol
- Global ~/.claude/CLAUDE.md updated accordingly

S1 — Workstream dependency graph:
- WorkstreamDependency model (directed edge, CASCADE on delete, unique pair constraint)
- Alembic migration 0b547c153153; script.py.mako added (was missing)
- REST API: POST/GET /workstreams/{id}/dependencies/, DELETE …/{dep_id} (hard delete)
- StateSummary open_workstreams enriched with depends_on/blocks lists
- MCP tools: create_dependency(), list_dependencies()
- Dashboard workstreams page: Dependencies section with relationship cards
- Seeded: custodian-agent-runtime → llm-shared-library + phase-0-operational-baseline

S2 — Suggesting Next Steps (sanctioned write use case #2):
- GET /state/next_steps derives suggestions from recently resolved decisions
  (→ first open task in same workstream) and cleared dependencies
  (→ first todo task in now-unblocked workstream)
- StateSummary.next_steps included on every summary call
- MCP tool: get_next_steps()
- Dashboard: "What's next?" card grid above Registered Projects

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-25 23:33:14 +01:00
parent 9965349135
commit f34b49ebde
15 changed files with 678 additions and 35 deletions

View File

@@ -4,7 +4,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from api.database import engine
from api.routers import decisions, progress, state, tasks, topics, workstreams
from api.routers import decisions, progress, state, tasks, topics, workstreams, workstream_dependencies
@asynccontextmanager
@@ -23,12 +23,13 @@ app = FastAPI(
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000", "http://127.0.0.1:3000"],
allow_methods=["GET", "POST", "PATCH"],
allow_methods=["GET", "POST", "PATCH", "DELETE"],
allow_headers=["Content-Type"],
)
app.include_router(topics.router)
app.include_router(workstreams.router)
app.include_router(workstream_dependencies.router)
app.include_router(tasks.router)
app.include_router(decisions.router)
app.include_router(progress.router)

View File

@@ -1,6 +1,7 @@
from api.models.base import Base
from api.models.topic import Topic, TopicStatus, Domain
from api.models.workstream import Workstream, WorkstreamStatus
from api.models.workstream_dependency import WorkstreamDependency
from api.models.task import Task, TaskStatus, TaskPriority
from api.models.decision import Decision, DecisionType, DecisionStatus
from api.models.progress_event import ProgressEvent
@@ -9,6 +10,7 @@ __all__ = [
"Base",
"Topic", "TopicStatus", "Domain",
"Workstream", "WorkstreamStatus",
"WorkstreamDependency",
"Task", "TaskStatus", "TaskPriority",
"Decision", "DecisionType", "DecisionStatus",
"ProgressEvent",

View File

@@ -0,0 +1,45 @@
import uuid
from sqlalchemy import ForeignKey, Text, UniqueConstraint
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from api.models.base import Base, TimestampMixin, new_uuid
class WorkstreamDependency(Base, TimestampMixin):
"""Directed dependency edge: `from_workstream` depends on `to_workstream`.
Semantics: `to_workstream` must reach a satisfactory state before
`from_workstream` can fully proceed. Hard deletes are intentional —
removing an edge removes a constraint, not information.
"""
__tablename__ = "workstream_dependencies"
__table_args__ = (
UniqueConstraint("from_workstream_id", "to_workstream_id", name="uq_ws_dep_pair"),
)
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=new_uuid
)
from_workstream_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("workstreams.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
to_workstream_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("workstreams.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
from_workstream: Mapped["Workstream"] = relationship( # noqa: F821
"Workstream", foreign_keys=[from_workstream_id]
)
to_workstream: Mapped["Workstream"] = relationship( # noqa: F821
"Workstream", foreign_keys=[to_workstream_id]
)

View File

@@ -1,4 +1,4 @@
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
@@ -8,13 +8,15 @@ from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session, engine
from api.models.decision import Decision, DecisionStatus, DecisionType
from api.models.progress_event import ProgressEvent
from api.models.task import Task, TaskStatus
from api.models.task import Task, TaskPriority, TaskStatus
from api.models.topic import Topic, TopicStatus
from api.models.workstream import Workstream, WorkstreamStatus
from api.models.workstream_dependency import WorkstreamDependency
from api.schemas.decision import DecisionRead
from api.schemas.progress_event import ProgressEventRead
from api.schemas.state import (
DecisionTotals,
NextStep,
StateSummary,
TaskTotals,
Totals,
@@ -23,7 +25,8 @@ from api.schemas.state import (
)
from api.schemas.task import TaskRead
from api.schemas.topic import TopicWithWorkstreams
from api.schemas.workstream import WorkstreamRead, WorkstreamWithTaskCounts
from api.schemas.workstream import WorkstreamRead, WorkstreamWithTaskCounts, WorkstreamWithDeps
from api.schemas.workstream_dependency import WorkstreamDepStub
router = APIRouter(prefix="/state", tags=["state"])
@@ -70,6 +73,53 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
):
task_per_ws.setdefault(ws_id, {})[tstat] = cnt
# Dependency graph for open workstreams
open_ws_ids = [w.id for w in open_ws]
dep_rows = []
if open_ws_ids:
dep_result = await session.execute(
select(WorkstreamDependency).where(
(WorkstreamDependency.from_workstream_id.in_(open_ws_ids))
| (WorkstreamDependency.to_workstream_id.in_(open_ws_ids))
)
)
dep_rows = list(dep_result.scalars().all())
# Build a slug+title lookup for all workstreams referenced in deps
dep_ws_ids = set()
for d in dep_rows:
dep_ws_ids.add(d.from_workstream_id)
dep_ws_ids.add(d.to_workstream_id)
ws_lookup: dict = {w.id: w for w in open_ws}
extra_ids = dep_ws_ids - set(ws_lookup.keys())
if extra_ids:
extra_rows = await session.execute(
select(Workstream).where(Workstream.id.in_(extra_ids))
)
for w in extra_rows.scalars():
ws_lookup[w.id] = w
# Index: workstream_id → (depends_on stubs, blocks stubs)
dep_index: dict = {w.id: {"depends_on": [], "blocks": []} for w in open_ws}
for d in dep_rows:
from_id, to_id = d.from_workstream_id, d.to_workstream_id
if from_id in dep_index and to_id in ws_lookup:
dep_index[from_id]["depends_on"].append(WorkstreamDepStub(
dep_id=d.id,
workstream_id=to_id,
workstream_slug=ws_lookup[to_id].slug,
workstream_title=ws_lookup[to_id].title,
description=d.description,
))
if to_id in dep_index and from_id in ws_lookup:
dep_index[to_id]["blocks"].append(WorkstreamDepStub(
dep_id=d.id,
workstream_id=from_id,
workstream_slug=ws_lookup[from_id].slug,
workstream_title=ws_lookup[from_id].title,
description=d.description,
))
# Totals — one GROUP BY per table
topic_counts = {r[0]: r[1] for r in await session.execute(
select(Topic.status, func.count()).group_by(Topic.status)
@@ -115,6 +165,8 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
),
)
next_steps = await _derive_next_steps(session)
return StateSummary(
generated_at=datetime.now(tz=timezone.utc),
totals=totals,
@@ -122,20 +174,149 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
blocking_decisions=[DecisionRead.model_validate(d) for d in blocking],
blocked_tasks=[TaskRead.model_validate(t) for t in blocked],
recent_progress=[ProgressEventRead.model_validate(e) for e in recent],
next_steps=next_steps,
open_workstreams=[
WorkstreamWithTaskCounts(
WorkstreamWithDeps(
**WorkstreamRead.model_validate(w).model_dump(),
tasks_total=sum(task_per_ws.get(w.id, {}).values()),
tasks_todo=task_per_ws.get(w.id, {}).get(TaskStatus.todo, 0),
tasks_in_progress=task_per_ws.get(w.id, {}).get(TaskStatus.in_progress, 0),
tasks_blocked=task_per_ws.get(w.id, {}).get(TaskStatus.blocked, 0),
tasks_done=task_per_ws.get(w.id, {}).get(TaskStatus.done, 0),
depends_on=dep_index.get(w.id, {}).get("depends_on", []),
blocks=dep_index.get(w.id, {}).get("blocks", []),
)
for w in open_ws
],
)
_PRIORITY_RANK = {
TaskPriority.critical: 0,
TaskPriority.high: 1,
TaskPriority.medium: 2,
TaskPriority.low: 3,
}
async def _derive_next_steps(session: AsyncSession) -> list[NextStep]:
"""Derive contextual next-action suggestions from current hub state.
Two signal sources:
1. Recently resolved decisions (last 7 days) → first open task in same workstream
2. Workstreams whose every dependency is now completed → first todo task in that workstream
"""
steps: list[NextStep] = []
seen_task_ids: set = set()
# ── Signal 1: recently resolved decisions ────────────────────────────────
cutoff = datetime.now(tz=timezone.utc) - timedelta(days=7)
resolved_rows = await session.execute(
select(Decision)
.where(Decision.status == DecisionStatus.resolved)
.where(Decision.decided_at >= cutoff)
.where(Decision.workstream_id.isnot(None))
.order_by(Decision.decided_at.desc())
)
for decision in resolved_rows.scalars().all():
open_tasks_rows = await session.execute(
select(Task)
.where(Task.workstream_id == decision.workstream_id)
.where(Task.status.in_([TaskStatus.todo, TaskStatus.in_progress]))
)
open_tasks = list(open_tasks_rows.scalars().all())
if not open_tasks:
continue
task = min(open_tasks, key=lambda t: (_PRIORITY_RANK.get(t.priority, 99), t.created_at))
if task.id in seen_task_ids:
continue
ws = await session.get(Workstream, decision.workstream_id)
topic = await session.get(Topic, ws.topic_id) if ws else None
steps.append(NextStep(
type="resolved_decision",
domain=topic.domain if topic else None,
workstream_id=ws.id if ws else None,
workstream_title=ws.title if ws else None,
workstream_slug=ws.slug if ws else None,
task_id=task.id,
task_title=task.title,
message=(
f"Decision '{decision.title}' was resolved → "
f"'{task.title}' is the next open task in '{ws.title if ws else '?'}'"
),
))
seen_task_ids.add(task.id)
# ── Signal 2: cleared dependencies ──────────────────────────────────────
all_dep_rows = await session.execute(select(WorkstreamDependency))
all_deps = list(all_dep_rows.scalars().all())
# Group from_workstream_id → set of to_workstream_ids
dep_map: dict = {}
for d in all_deps:
dep_map.setdefault(d.from_workstream_id, set()).add(d.to_workstream_id)
for from_ws_id, to_ws_ids in dep_map.items():
# All targets must be completed
all_done = True
for to_id in to_ws_ids:
to_ws = await session.get(Workstream, to_id)
if to_ws is None or to_ws.status != WorkstreamStatus.completed:
all_done = False
break
if not all_done:
continue
from_ws = await session.get(Workstream, from_ws_id)
if from_ws is None or from_ws.status not in (WorkstreamStatus.active, WorkstreamStatus.blocked):
continue
todo_rows = await session.execute(
select(Task)
.where(Task.workstream_id == from_ws_id)
.where(Task.status == TaskStatus.todo)
)
todo_tasks = list(todo_rows.scalars().all())
if not todo_tasks:
continue
task = min(todo_tasks, key=lambda t: (_PRIORITY_RANK.get(t.priority, 99), t.created_at))
if task.id in seen_task_ids:
continue
topic = await session.get(Topic, from_ws.topic_id)
blocker_slugs = ", ".join(
(await session.get(Workstream, tid)).slug
for tid in to_ws_ids
if await session.get(Workstream, tid)
)
steps.append(NextStep(
type="dependency_cleared",
domain=topic.domain if topic else None,
workstream_id=from_ws.id,
workstream_title=from_ws.title,
workstream_slug=from_ws.slug,
task_id=task.id,
task_title=task.title,
message=(
f"All dependencies of '{from_ws.title}' are completed ({blocker_slugs}) → "
f"'{task.title}' is ready to start"
),
))
seen_task_ids.add(task.id)
return steps
@router.get("/next_steps", response_model=list[NextStep])
async def get_next_steps(session: AsyncSession = Depends(get_session)) -> list[NextStep]:
"""Derive contextual next-action suggestions from current hub state.
Returns suggestions based on:
- Recently resolved decisions → first open task in the same workstream
- Workstreams whose every dependency workstream is now completed → first todo task
"""
return await _derive_next_steps(session)
@router.get("/health")
async def health_check() -> dict:
try:

View File

@@ -0,0 +1,80 @@
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session
from api.models.workstream import Workstream
from api.models.workstream_dependency import WorkstreamDependency
from api.schemas.workstream_dependency import WorkstreamDependencyCreate, WorkstreamDependencyRead
router = APIRouter(prefix="/workstreams", tags=["dependencies"])
@router.post(
"/{workstream_id}/dependencies/",
response_model=WorkstreamDependencyRead,
status_code=status.HTTP_201_CREATED,
)
async def create_dependency(
workstream_id: uuid.UUID,
body: WorkstreamDependencyCreate,
session: AsyncSession = Depends(get_session),
) -> WorkstreamDependency:
"""Record that workstream_id depends on body.to_workstream_id."""
if await session.get(Workstream, workstream_id) is None:
raise HTTPException(status_code=404, detail="from workstream not found")
if await session.get(Workstream, body.to_workstream_id) is None:
raise HTTPException(status_code=404, detail="to workstream not found")
if workstream_id == body.to_workstream_id:
raise HTTPException(status_code=422, detail="a workstream cannot depend on itself")
dep = WorkstreamDependency(
from_workstream_id=workstream_id,
to_workstream_id=body.to_workstream_id,
description=body.description,
)
session.add(dep)
await session.commit()
await session.refresh(dep)
return dep
@router.get(
"/{workstream_id}/dependencies/",
response_model=list[WorkstreamDependencyRead],
)
async def list_dependencies(
workstream_id: uuid.UUID,
session: AsyncSession = Depends(get_session),
) -> list[WorkstreamDependency]:
"""Return all dependency edges touching this workstream (both directions)."""
if await session.get(Workstream, workstream_id) is None:
raise HTTPException(status_code=404, detail="workstream not found")
rows = await session.execute(
select(WorkstreamDependency).where(
(WorkstreamDependency.from_workstream_id == workstream_id)
| (WorkstreamDependency.to_workstream_id == workstream_id)
)
)
return list(rows.scalars().all())
@router.delete(
"/{workstream_id}/dependencies/{dep_id}",
status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_dependency(
workstream_id: uuid.UUID,
dep_id: uuid.UUID,
session: AsyncSession = Depends(get_session),
) -> None:
"""Hard-delete a dependency edge. Removing a constraint is safe — no information is lost."""
dep = await session.get(WorkstreamDependency, dep_id)
if dep is None:
raise HTTPException(status_code=404, detail="dependency not found")
if dep.from_workstream_id != workstream_id:
raise HTTPException(status_code=403, detail="dependency does not belong to this workstream")
await session.delete(dep)
await session.commit()

View File

@@ -1,3 +1,4 @@
import uuid
from datetime import datetime
from pydantic import BaseModel
@@ -6,7 +7,7 @@ from api.schemas.decision import DecisionRead
from api.schemas.progress_event import ProgressEventRead
from api.schemas.task import TaskRead
from api.schemas.topic import TopicWithWorkstreams
from api.schemas.workstream import WorkstreamWithTaskCounts
from api.schemas.workstream import WorkstreamWithDeps
class TopicTotals(BaseModel):
@@ -48,6 +49,23 @@ class Totals(BaseModel):
decisions: DecisionTotals
class NextStep(BaseModel):
"""A derived suggestion pointing to where work should happen next.
Suggestions are never persisted — they are computed on demand from
current hub state: recently resolved decisions, newly unblocked tasks,
cleared dependencies.
"""
type: str # unblocked_task | resolved_decision | dependency_cleared
domain: str | None = None
workstream_id: uuid.UUID | None = None
workstream_title: str | None = None
workstream_slug: str | None = None
task_id: uuid.UUID | None = None
task_title: str | None = None
message: str # plain-language explanation
class StateSummary(BaseModel):
generated_at: datetime
totals: Totals
@@ -55,4 +73,5 @@ class StateSummary(BaseModel):
blocking_decisions: list[DecisionRead]
blocked_tasks: list[TaskRead]
recent_progress: list[ProgressEventRead]
open_workstreams: list[WorkstreamWithTaskCounts]
open_workstreams: list[WorkstreamWithDeps]
next_steps: list[NextStep] = []

View File

@@ -4,6 +4,7 @@ from datetime import date, datetime
from pydantic import BaseModel, ConfigDict
from api.models.workstream import WorkstreamStatus
from api.schemas.workstream_dependency import WorkstreamDepStub
class WorkstreamCreate(BaseModel):
@@ -44,3 +45,9 @@ class WorkstreamWithTaskCounts(WorkstreamRead):
tasks_in_progress: int = 0
tasks_blocked: int = 0
tasks_done: int = 0
class WorkstreamWithDeps(WorkstreamWithTaskCounts):
"""WorkstreamWithTaskCounts enriched with dependency graph edges."""
depends_on: list[WorkstreamDepStub] = []
blocks: list[WorkstreamDepStub] = []

View File

@@ -0,0 +1,28 @@
import uuid
from datetime import datetime
from pydantic import BaseModel, ConfigDict
class WorkstreamDependencyCreate(BaseModel):
to_workstream_id: uuid.UUID
description: str | None = None
class WorkstreamDependencyRead(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: uuid.UUID
from_workstream_id: uuid.UUID
to_workstream_id: uuid.UUID
description: str | None = None
created_at: datetime
updated_at: datetime
class WorkstreamDepStub(BaseModel):
"""Minimal projection of the other end of a dependency edge."""
dep_id: uuid.UUID
workstream_id: uuid.UUID
workstream_slug: str
workstream_title: str
description: str | None = None