Task flow engine implementation

This commit is contained in:
2026-05-02 00:21:14 +02:00
parent e12a26109f
commit 95bcc5c83c
19 changed files with 716 additions and 91 deletions

View File

@@ -0,0 +1,92 @@
---
id: CUST-TFE-SCOPE-2026-000001
type: architecture-note
title: "Task Flow Engine Extraction Scope v0.1"
status: draft
owners: ["Bernd", "Custodian"]
created: "2026-05-01"
updated: "2026-05-01"
scope:
domains: ["Custodian"]
sensitivity: internal
tags: ["task-flow-engine", "architecture", "state-hub", "workflow"]
domain: custodian
provenance:
workplan: CUST-WP-0035
task: CUST-WP-0035-T06
---
# Task Flow Engine Extraction Scope v0.1
## Purpose
The task flow engine is currently co-located in `state-hub/` so it can replace
hardcoded lifecycle logic where the need is immediate. Its core model is more
general than State Hub, so it should become a standalone Python package once
the API has stabilized through real use.
## Standalone Package Boundary
The future `task-flow-engine` package should contain only pure computation:
- `models.py`: dataclasses for assertions, workstations, flows, and results
- `evaluator.py`: target path resolution and assertion evaluation
- `engine.py`: reachable workstation and exit-blocking derivation
- `builtins.py`: built-in operations such as `all_eq`, `any_eq`, `none_eq`,
`exists`, and `count_gte`
The package must not depend on State Hub, SQLAlchemy, FastAPI, MCP, Custodian
canon files, or any specific database schema.
## State Hub Integration Boundary
State Hub should retain the domain-specific integration layer:
- YAML flow definitions in `state-hub/flows/`
- conversion from ORM entities into plain information-object dictionaries
- Alembic migrations and status-column storage choices
- API routers and MCP tools
- custom assertion callables that query State Hub data
- progress events, timestamps, notifications, and other side effects
This keeps the reusable engine small while allowing State Hub to remain the
place where Custodian-specific lifecycle semantics are declared and exposed.
## Extraction Path
1. Keep `state-hub/task_flow_engine/` in-tree until at least one non-trivial
flow definition runs in normal State Hub use.
2. Stabilize the dataclass and result shapes around real consumers:
State Hub API, MCP tools, and repo-facing workflows.
3. Extract the pure package into a new `task-flow-engine` repository.
4. Publish it as an internal pip package.
5. Replace the in-tree package with a dependency import in State Hub.
The extraction should preserve the current import surface where practical:
`FlowDef`, `WorkstationDef`, `AssertionDef`, `FlowResult`, `AssertionResult`,
`FlowEngine`, and `resolve_target`.
## Managed Repo Concept
When extraction starts, register a managed repository concept:
- slug: `task-flow-engine`
- domain: `custodian`
- purpose: reusable declarative workstation/assertion engine
- primary capability: `workflow.evaluate`
- secondary capabilities: `workflow.define`, `workflow.explain`
## Extension Point
An extension point is registered in State Hub to keep this extraction visible:
- type: `architecture`
- title: `task-flow-engine extraction as standalone package`
- status: `open`
- priority: `low`
Description:
`task_flow_engine/` is currently co-located in the State Hub. Extract it to its
own repository and pip package once the API is stable after at least one
non-trivial flow definition has been running in production.

View File

@@ -0,0 +1,85 @@
from __future__ import annotations
from functools import lru_cache
from pathlib import Path
from typing import Any
import yaml
from task_flow_engine import AssertionDef, AssertionResult, FlowDef, FlowEngine, FlowResult
FLOW_DIR = Path(__file__).resolve().parents[1] / "flows"
@lru_cache
def load_flow(entity_type: str) -> FlowDef:
path = FLOW_DIR / f"{entity_type}.yaml"
data = yaml.safe_load(path.read_text(encoding="utf-8"))
return FlowDef.from_dict(data)
def evaluate_transition(
entity_type: str,
current_workstation: str,
target_workstation: str,
extra: dict[str, Any] | None = None,
) -> tuple[bool, list[AssertionResult], FlowResult]:
flow = load_flow(entity_type)
obj = {
"status": current_workstation,
"workstation": current_workstation,
"previous_workstation": current_workstation,
**(extra or {}),
}
engine = create_flow_engine()
result = engine.evaluate(obj, flow)
can_reach, failures = engine.can_reach(obj, flow, target_workstation)
return can_reach, failures, result
def create_flow_engine() -> FlowEngine:
return FlowEngine(
custom_ops={
"dependencies.any_incomplete": _dependencies_any_incomplete,
}
)
def _dependencies_any_incomplete(
assertion: AssertionDef,
obj: dict[str, Any],
values: list[Any],
) -> bool:
return bool(values) and any(value != assertion.value for value in values)
def assertion_result_to_dict(result: AssertionResult) -> dict[str, Any]:
return {
"id": result.id,
"passed": result.passed,
"target": result.target,
"op": result.op,
"expected": result.expected,
"actual": result.actual,
"description": result.description,
"reason": result.reason,
}
def flow_result_to_dict(result: FlowResult) -> dict[str, Any]:
return {
"current_workstation": result.current_workstation,
"exit_blocked": result.exit_blocked,
"blocking_assertions": [
assertion_result_to_dict(item) for item in result.blocking_assertions
],
"reachable": result.reachable,
"unreachable": [
{
"workstation": item.workstation,
"blocking": assertion_result_to_dict(item.blocking),
}
for item in result.unreachable
],
}

View File

@@ -9,6 +9,7 @@ from api.routers import decisions, extension_points, progress, state, tasks, tec
from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals, messages, capability_requests, tpsc from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals, messages, capability_requests, tpsc
from api.routers import token_events from api.routers import token_events
from api.routers import interface_changes from api.routers import interface_changes
from api.routers import flows
@asynccontextmanager @asynccontextmanager
@@ -53,6 +54,7 @@ app.include_router(capability_requests.router)
app.include_router(tpsc.router) app.include_router(tpsc.router)
app.include_router(token_events.router) app.include_router(token_events.router)
app.include_router(interface_changes.router) app.include_router(interface_changes.router)
app.include_router(flows.router)
app.include_router(state.router) app.include_router(state.router)
app.include_router(policy.router) app.include_router(policy.router)

View File

@@ -4,7 +4,7 @@ from api.models.domain_goal import DomainGoal, DomainGoalStatus
from api.models.topic import Topic, TopicStatus from api.models.topic import Topic, TopicStatus
from api.models.managed_repo import ManagedRepo from api.models.managed_repo import ManagedRepo
from api.models.repo_goal import RepoGoal, RepoGoalStatus from api.models.repo_goal import RepoGoal, RepoGoalStatus
from api.models.workstream import Workstream, WorkstreamStatus from api.models.workstream import Workstream
from api.models.workstream_dependency import WorkstreamDependency from api.models.workstream_dependency import WorkstreamDependency
from api.models.task import Task, TaskStatus, TaskPriority from api.models.task import Task, TaskStatus, TaskPriority
from api.models.decision import Decision, DecisionType, DecisionStatus from api.models.decision import Decision, DecisionType, DecisionStatus
@@ -29,7 +29,7 @@ __all__ = [
"Topic", "TopicStatus", "Topic", "TopicStatus",
"ManagedRepo", "ManagedRepo",
"RepoGoal", "RepoGoalStatus", "RepoGoal", "RepoGoalStatus",
"Workstream", "WorkstreamStatus", "Workstream",
"WorkstreamDependency", "WorkstreamDependency",
"Task", "TaskStatus", "TaskPriority", "Task", "TaskStatus", "TaskPriority",
"Decision", "DecisionType", "DecisionStatus", "Decision", "DecisionType", "DecisionStatus",

View File

@@ -1,21 +1,13 @@
import enum
import uuid import uuid
from datetime import date from datetime import date
from sqlalchemy import Date, Enum, ForeignKey, String, Text from sqlalchemy import Date, ForeignKey, String, Text
from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.orm import Mapped, mapped_column, relationship
from api.models.base import Base, TimestampMixin, new_uuid from api.models.base import Base, TimestampMixin, new_uuid
class WorkstreamStatus(str, enum.Enum):
active = "active"
blocked = "blocked"
completed = "completed"
archived = "archived"
class Workstream(Base, TimestampMixin): class Workstream(Base, TimestampMixin):
__tablename__ = "workstreams" __tablename__ = "workstreams"
@@ -28,8 +20,8 @@ class Workstream(Base, TimestampMixin):
slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True) slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True)
title: Mapped[str] = mapped_column(String(255), nullable=False) title: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True) description: Mapped[str | None] = mapped_column(Text, nullable=True)
status: Mapped[WorkstreamStatus] = mapped_column( status: Mapped[str] = mapped_column(
Enum(WorkstreamStatus), nullable=False, default=WorkstreamStatus.active String(20), nullable=False, default="active", server_default="active"
) )
owner: Mapped[str | None] = mapped_column(String(100), nullable=True) owner: Mapped[str | None] = mapped_column(String(100), nullable=True)
due_date: Mapped[date | None] = mapped_column(Date, nullable=True) due_date: Mapped[date | None] = mapped_column(Date, nullable=True)

View File

@@ -7,6 +7,7 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session from api.database import get_session
from api.flow_defs import assertion_result_to_dict, evaluate_transition, flow_result_to_dict
from api.models.agent_message import AgentMessage from api.models.agent_message import AgentMessage
from api.models.capability_catalog import CapabilityCatalog from api.models.capability_catalog import CapabilityCatalog
from api.models.capability_request import CapabilityRequest from api.models.capability_request import CapabilityRequest
@@ -28,22 +29,6 @@ from api.schemas.capability_request import (
router = APIRouter(tags=["capability-requests"]) router = APIRouter(tags=["capability-requests"])
# ---------------------------------------------------------------------------
# Lifecycle guard
# ---------------------------------------------------------------------------
_VALID_TRANSITIONS: dict[str, set[str]] = {
"requested": {"accepted", "rejected", "withdrawn", "routing_disputed"},
"routing_disputed": {"requested", "withdrawn"},
"accepted": {"in_progress", "rejected", "withdrawn"},
"in_progress": {"ready_for_review", "rejected", "withdrawn"},
"ready_for_review": {"completed", "in_progress", "withdrawn"},
"completed": set(),
"rejected": set(),
"withdrawn": set(),
}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Capability Catalog endpoints # Capability Catalog endpoints
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -602,12 +587,21 @@ async def _get_request_or_404(request_id: uuid.UUID, session: AsyncSession) -> C
def _check_transition(current: str, target: str) -> None: def _check_transition(current: str, target: str) -> None:
allowed = _VALID_TRANSITIONS.get(current, set()) can_reach, failures, flow_result = evaluate_transition(
if target not in allowed: "capability_request",
current,
target,
)
if not can_reach:
raise HTTPException( raise HTTPException(
status_code=422, status_code=422,
detail=( detail={
f"Cannot transition from '{current}' to '{target}'. " "message": f"Cannot transition from '{current}' to '{target}'.",
f"Allowed: {sorted(allowed) or 'none (terminal state)'}" "current_workstation": current,
), "target_workstation": target,
"blocking_assertions": [
assertion_result_to_dict(item) for item in failures
],
"flow_result": flow_result_to_dict(flow_result),
},
) )

View File

@@ -6,37 +6,12 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session from api.database import get_session
from api.flow_defs import assertion_result_to_dict, evaluate_transition, flow_result_to_dict
from api.models.contribution import Contribution, ContributionStatus, ContributionType from api.models.contribution import Contribution, ContributionStatus, ContributionType
from api.schemas.contribution import ContributionCreate, ContributionRead, ContributionStatusPatch from api.schemas.contribution import ContributionCreate, ContributionRead, ContributionStatusPatch
router = APIRouter(prefix="/contributions", tags=["contributions"]) router = APIRouter(prefix="/contributions", tags=["contributions"])
# Valid forward transitions in the lifecycle
_VALID_TRANSITIONS: dict[ContributionStatus, set[ContributionStatus]] = {
ContributionStatus.draft: {
ContributionStatus.submitted,
ContributionStatus.withdrawn,
},
ContributionStatus.submitted: {
ContributionStatus.acknowledged,
ContributionStatus.rejected,
ContributionStatus.withdrawn,
},
ContributionStatus.acknowledged: {
ContributionStatus.accepted,
ContributionStatus.rejected,
ContributionStatus.withdrawn,
},
ContributionStatus.accepted: {
ContributionStatus.merged,
ContributionStatus.withdrawn,
},
ContributionStatus.rejected: set(),
ContributionStatus.merged: set(),
ContributionStatus.withdrawn: set(),
}
@router.get("/", response_model=list[ContributionRead]) @router.get("/", response_model=list[ContributionRead])
async def list_contributions( async def list_contributions(
type: ContributionType | None = Query(None), type: ContributionType | None = Query(None),
@@ -93,14 +68,25 @@ async def patch_contribution_status(
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
) -> Contribution: ) -> Contribution:
contrib = await _get_or_404(contribution_id, session) contrib = await _get_or_404(contribution_id, session)
allowed = _VALID_TRANSITIONS.get(contrib.status, set()) current = _status_value(contrib.status)
if body.status not in allowed: target = _status_value(body.status)
can_reach, failures, flow_result = evaluate_transition(
"contribution",
current,
target,
)
if not can_reach:
raise HTTPException( raise HTTPException(
status_code=422, status_code=422,
detail=( detail={
f"Cannot transition from '{contrib.status}' to '{body.status}'. " "message": f"Cannot transition from '{current}' to '{target}'.",
f"Allowed: {[s.value for s in allowed] or 'none (terminal state)'}" "current_workstation": current,
), "target_workstation": target,
"blocking_assertions": [
assertion_result_to_dict(item) for item in failures
],
"flow_result": flow_result_to_dict(flow_result),
},
) )
contrib.status = body.status contrib.status = body.status
if body.notes: if body.notes:
@@ -145,3 +131,7 @@ async def _get_or_404(contribution_id: uuid.UUID, session: AsyncSession) -> Cont
if contrib is None: if contrib is None:
raise HTTPException(status_code=404, detail=f"Contribution '{contribution_id}' not found") raise HTTPException(status_code=404, detail=f"Contribution '{contribution_id}' not found")
return contrib return contrib
def _status_value(status: ContributionStatus | str) -> str:
return status.value if isinstance(status, ContributionStatus) else str(status)

View File

@@ -10,7 +10,7 @@ from api.models.extension_point import ExtensionPoint
from api.models.managed_repo import ManagedRepo from api.models.managed_repo import ManagedRepo
from api.models.technical_debt import TechnicalDebt from api.models.technical_debt import TechnicalDebt
from api.models.topic import Topic from api.models.topic import Topic
from api.models.workstream import Workstream, WorkstreamStatus from api.models.workstream import Workstream
from api.schemas.domain import DomainCreate, DomainDetail, DomainRead, DomainRename, DomainUpdate, RepoStub from api.schemas.domain import DomainCreate, DomainDetail, DomainRead, DomainRename, DomainUpdate, RepoStub
router = APIRouter(prefix="/domains", tags=["domains"]) router = APIRouter(prefix="/domains", tags=["domains"])
@@ -69,7 +69,7 @@ async def get_domain(
ws_count_row = await session.execute( ws_count_row = await session.execute(
select(func.count()).select_from(Workstream) select(func.count()).select_from(Workstream)
.where(Workstream.topic_id.in_(topic_ids)) .where(Workstream.topic_id.in_(topic_ids))
.where(Workstream.status == WorkstreamStatus.active) .where(Workstream.status == "active")
) )
ws_count = ws_count_row.scalar_one() ws_count = ws_count_row.scalar_one()

View File

@@ -0,0 +1,167 @@
from __future__ import annotations
import uuid
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session
from api.flow_defs import (
assertion_result_to_dict,
create_flow_engine,
flow_result_to_dict,
load_flow,
)
from api.models.capability_request import CapabilityRequest
from api.models.contribution import Contribution
from api.models.task import Task
from api.models.workstream import Workstream
from api.models.workstream_dependency import WorkstreamDependency
router = APIRouter(prefix="/flows", tags=["flows"])
@router.get("/definitions")
async def list_flow_definitions() -> list[dict[str, Any]]:
flows = [
load_flow(entity_type)
for entity_type in (
"workstream",
"task",
"contribution",
"capability_request",
)
]
return [
{
"id": flow.id,
"entity_type": flow.entity_type,
"workstations": [
{
"name": workstation.name,
"description": workstation.description,
"entry_assertion_count": len(workstation.entry_assertions),
"exit_assertion_count": len(workstation.exit_assertions),
}
for workstation in flow.workstations
],
}
for flow in flows
]
@router.get("/{entity_type}/{entity_id}")
async def get_flow_state(
entity_type: str,
entity_id: uuid.UUID,
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
obj = await _flow_object(entity_type, entity_id, session)
flow = load_flow(entity_type)
result = create_flow_engine().evaluate(obj, flow)
return flow_result_to_dict(result)
@router.post("/{entity_type}/{entity_id}/advance/{target_workstation}")
async def advance_workstation(
entity_type: str,
entity_id: uuid.UUID,
target_workstation: str,
session: AsyncSession = Depends(get_session),
) -> dict[str, Any]:
obj = await _flow_object(entity_type, entity_id, session)
flow = load_flow(entity_type)
engine = create_flow_engine()
can_reach, failures = engine.can_reach(obj, flow, target_workstation)
if not can_reach:
raise HTTPException(
status_code=409,
detail={
"message": (
f"Cannot advance {entity_type} '{entity_id}' "
f"to '{target_workstation}'."
),
"blocking_assertions": [
assertion_result_to_dict(item) for item in failures
],
"flow_result": flow_result_to_dict(engine.evaluate(obj, flow)),
},
)
entity = await _entity(entity_type, entity_id, session)
entity.status = target_workstation
await session.commit()
await session.refresh(entity)
return await get_flow_state(entity_type, entity_id, session)
async def _flow_object(
entity_type: str,
entity_id: uuid.UUID,
session: AsyncSession,
) -> dict[str, Any]:
entity = await _entity(entity_type, entity_id, session)
status = _value(entity.status)
obj: dict[str, Any] = {
"id": str(entity.id),
"status": status,
"workstation": status,
"previous_workstation": status,
}
if entity_type == "workstream":
tasks = list((await session.execute(
select(Task).where(Task.workstream_id == entity_id)
)).scalars().all())
deps = list((await session.execute(
select(WorkstreamDependency).where(
WorkstreamDependency.from_workstream_id == entity_id
)
)).scalars().all())
dependency_ids = [dep.to_workstream_id for dep in deps]
dependency_workstations: list[dict[str, Any]] = []
if dependency_ids:
dep_ws = list((await session.execute(
select(Workstream).where(Workstream.id.in_(dependency_ids))
)).scalars().all())
dependency_workstations = [
{"id": str(ws.id), "workstation": ws.status}
for ws in dep_ws
]
obj.update({
"tasks": [{"id": str(task.id), "status": _value(task.status)} for task in tasks],
"dependencies": dependency_workstations,
})
elif entity_type == "task":
obj.update({
"needs_human": entity.needs_human,
"blocking_reason": entity.blocking_reason,
})
return obj
async def _entity(
entity_type: str,
entity_id: uuid.UUID,
session: AsyncSession,
):
model_by_type = {
"workstream": Workstream,
"task": Task,
"contribution": Contribution,
"capability_request": CapabilityRequest,
}
model = model_by_type.get(entity_type)
if model is None:
raise HTTPException(status_code=404, detail=f"Unknown flow entity type '{entity_type}'")
entity = await session.get(model, entity_id)
if entity is None:
raise HTTPException(status_code=404, detail=f"{entity_type} '{entity_id}' not found")
return entity
def _value(item):
return item.value if hasattr(item, "value") else item

View File

@@ -6,6 +6,7 @@ from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session, engine from api.database import get_session, engine
from api.flow_defs import assertion_result_to_dict, load_flow
from api.models.capability_request import CapabilityRequest from api.models.capability_request import CapabilityRequest
from api.models.contribution import Contribution, ContributionStatus, ContributionType from api.models.contribution import Contribution, ContributionStatus, ContributionType
from api.models.decision import Decision, DecisionStatus, DecisionType from api.models.decision import Decision, DecisionStatus, DecisionType
@@ -17,7 +18,7 @@ from api.models.sbom_entry import SBOMEntry
from api.models.task import Task, TaskPriority, TaskStatus from api.models.task import Task, TaskPriority, TaskStatus
from api.models.technical_debt import TechnicalDebt from api.models.technical_debt import TechnicalDebt
from api.models.topic import Topic, TopicStatus from api.models.topic import Topic, TopicStatus
from api.models.workstream import Workstream, WorkstreamStatus from api.models.workstream import Workstream
from api.models.workstream_dependency import WorkstreamDependency from api.models.workstream_dependency import WorkstreamDependency
from api.schemas.decision import DecisionRead from api.schemas.decision import DecisionRead
from api.schemas.domain import DomainSummary from api.schemas.domain import DomainSummary
@@ -35,6 +36,7 @@ from api.schemas.task import TaskRead
from api.schemas.topic import TopicWithWorkstreams from api.schemas.topic import TopicWithWorkstreams
from api.schemas.workstream import WorkstreamRead, WorkstreamWithTaskCounts, WorkstreamWithDeps from api.schemas.workstream import WorkstreamRead, WorkstreamWithTaskCounts, WorkstreamWithDeps
from api.schemas.workstream_dependency import WorkstreamDepStub from api.schemas.workstream_dependency import WorkstreamDepStub
from task_flow_engine import FlowEngine
router = APIRouter(prefix="/state", tags=["state"]) router = APIRouter(prefix="/state", tags=["state"])
@@ -69,7 +71,7 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
open_ws_rows = await session.execute( open_ws_rows = await session.execute(
select(Workstream) select(Workstream)
.where(Workstream.status.in_([WorkstreamStatus.active, WorkstreamStatus.blocked])) .where(Workstream.status.in_(["active", "blocked"]))
.order_by(Workstream.due_date.asc().nullslast(), Workstream.created_at) .order_by(Workstream.due_date.asc().nullslast(), Workstream.created_at)
) )
open_ws = list(open_ws_rows.scalars().all()) open_ws = list(open_ws_rows.scalars().all())
@@ -128,6 +130,27 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
description=d.description, description=d.description,
)) ))
workstream_flow = load_flow("workstream")
flow_engine = FlowEngine()
effective_status: dict = {}
blocked_reasons: dict = {}
for w in open_ws:
flow_obj = {
"status": w.status,
"workstation": w.status,
"tasks": [{"status": _value(t.status)} for t in w.tasks],
"dependencies": [
{"workstation": ws_lookup[d.to_workstream_id].status}
for d in dep_rows
if d.from_workstream_id == w.id and d.to_workstream_id in ws_lookup
],
}
flow_result = flow_engine.evaluate(flow_obj, workstream_flow)
effective_status[w.id] = "blocked" if flow_result.exit_blocked else w.status
blocked_reasons[w.id] = [
assertion_result_to_dict(item) for item in flow_result.blocking_assertions
]
# Totals — one GROUP BY per table # Totals — one GROUP BY per table
topic_counts = {r[0]: r[1] for r in await session.execute( topic_counts = {r[0]: r[1] for r in await session.execute(
select(Topic.status, func.count()).group_by(Topic.status) select(Topic.status, func.count()).group_by(Topic.status)
@@ -150,10 +173,10 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
total=sum(topic_counts.values()), total=sum(topic_counts.values()),
), ),
workstreams=WorkstreamTotals( workstreams=WorkstreamTotals(
active=ws_counts.get(WorkstreamStatus.active, 0), active=sum(1 for status in effective_status.values() if status == "active"),
blocked=ws_counts.get(WorkstreamStatus.blocked, 0), blocked=sum(1 for status in effective_status.values() if status == "blocked"),
completed=ws_counts.get(WorkstreamStatus.completed, 0), completed=ws_counts.get("completed", 0),
archived=ws_counts.get(WorkstreamStatus.archived, 0), archived=ws_counts.get("archived", 0),
total=sum(ws_counts.values()), total=sum(ws_counts.values()),
), ),
tasks=TaskTotals( tasks=TaskTotals(
@@ -226,7 +249,10 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
open_capability_requests=open_cap_req_count, open_capability_requests=open_cap_req_count,
open_workstreams=[ open_workstreams=[
WorkstreamWithDeps( WorkstreamWithDeps(
**WorkstreamRead.model_validate(w).model_dump(), **{
**WorkstreamRead.model_validate(w).model_dump(),
"status": effective_status.get(w.id, w.status),
},
tasks_total=sum(task_per_ws.get(w.id, {}).values()), tasks_total=sum(task_per_ws.get(w.id, {}).values()),
tasks_todo=task_per_ws.get(w.id, {}).get(TaskStatus.todo, 0), tasks_todo=task_per_ws.get(w.id, {}).get(TaskStatus.todo, 0),
tasks_in_progress=task_per_ws.get(w.id, {}).get(TaskStatus.in_progress, 0), tasks_in_progress=task_per_ws.get(w.id, {}).get(TaskStatus.in_progress, 0),
@@ -234,6 +260,7 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
tasks_done=task_per_ws.get(w.id, {}).get(TaskStatus.done, 0), tasks_done=task_per_ws.get(w.id, {}).get(TaskStatus.done, 0),
depends_on=dep_index.get(w.id, {}).get("depends_on", []), depends_on=dep_index.get(w.id, {}).get("depends_on", []),
blocks=dep_index.get(w.id, {}).get("blocks", []), blocks=dep_index.get(w.id, {}).get("blocks", []),
blocked_reasons=blocked_reasons.get(w.id, []),
) )
for w in open_ws for w in open_ws
], ],
@@ -259,7 +286,7 @@ async def _build_domain_summaries(session: AsyncSession) -> list[DomainSummary]:
for domain_id, cnt in await session.execute( for domain_id, cnt in await session.execute(
select(Topic.domain_id, func.count(Workstream.id)) select(Topic.domain_id, func.count(Workstream.id))
.join(Workstream, Workstream.topic_id == Topic.id) .join(Workstream, Workstream.topic_id == Topic.id)
.where(Workstream.status == WorkstreamStatus.active) .where(Workstream.status == "active")
.group_by(Topic.domain_id) .group_by(Topic.domain_id)
): ):
ws_per_domain[domain_id] = cnt ws_per_domain[domain_id] = cnt
@@ -357,14 +384,14 @@ async def _derive_next_steps(session: AsyncSession) -> list[NextStep]:
all_done = True all_done = True
for to_id in to_ws_ids: for to_id in to_ws_ids:
to_ws = await session.get(Workstream, to_id) to_ws = await session.get(Workstream, to_id)
if to_ws is None or to_ws.status != WorkstreamStatus.completed: if to_ws is None or to_ws.status != "completed":
all_done = False all_done = False
break break
if not all_done: if not all_done:
continue continue
from_ws = await session.get(Workstream, from_ws_id) from_ws = await session.get(Workstream, from_ws_id)
if from_ws is None or from_ws.status not in (WorkstreamStatus.active, WorkstreamStatus.blocked): if from_ws is None or from_ws.status not in ("active", "blocked"):
continue continue
todo_rows = await session.execute( todo_rows = await session.execute(
@@ -414,6 +441,10 @@ async def _get_domain_slug_for_workstream(ws: Workstream | None, session: AsyncS
return domain.slug if domain else None return domain.slug if domain else None
def _value(item):
return item.value if hasattr(item, "value") else item
@router.get("/next_steps", response_model=list[NextStep]) @router.get("/next_steps", response_model=list[NextStep])
async def get_next_steps(session: AsyncSession = Depends(get_session)) -> list[NextStep]: async def get_next_steps(session: AsyncSession = Depends(get_session)) -> list[NextStep]:
"""Derive contextual next-action suggestions from current hub state. """Derive contextual next-action suggestions from current hub state.

View File

@@ -5,8 +5,13 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session from api.database import get_session
from api.models.workstream import Workstream, WorkstreamStatus from api.models.workstream import Workstream
from api.schemas.workstream import WorkstreamCreate, WorkstreamRead, WorkstreamUpdate from api.schemas.workstream import (
WorkstreamCreate,
WorkstreamRead,
WorkstreamStatus,
WorkstreamUpdate,
)
router = APIRouter(prefix="/workstreams", tags=["workstreams"]) router = APIRouter(prefix="/workstreams", tags=["workstreams"])
@@ -86,7 +91,7 @@ async def archive_workstream(
ws = await session.get(Workstream, workstream_id) ws = await session.get(Workstream, workstream_id)
if ws is None: if ws is None:
raise HTTPException(status_code=404, detail="Workstream not found") raise HTTPException(status_code=404, detail="Workstream not found")
ws.status = WorkstreamStatus.archived ws.status = "archived"
await session.commit() await session.commit()
await session.refresh(ws) await session.refresh(ws)
return ws return ws

View File

@@ -1,18 +1,20 @@
import uuid import uuid
from datetime import date, datetime from datetime import date, datetime
from typing import Literal
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel, ConfigDict
from api.models.workstream import WorkstreamStatus
from api.schemas.workstream_dependency import WorkstreamDepStub from api.schemas.workstream_dependency import WorkstreamDepStub
WorkstreamStatus = Literal["todo", "active", "blocked", "completed", "archived"]
class WorkstreamCreate(BaseModel): class WorkstreamCreate(BaseModel):
topic_id: uuid.UUID topic_id: uuid.UUID
slug: str slug: str
title: str title: str
description: str | None = None description: str | None = None
status: WorkstreamStatus = WorkstreamStatus.active status: WorkstreamStatus = "active"
owner: str | None = None owner: str | None = None
due_date: date | None = None due_date: date | None = None
repo_id: uuid.UUID | None = None # GEMS primary: the owning repository repo_id: uuid.UUID | None = None # GEMS primary: the owning repository
@@ -57,3 +59,4 @@ class WorkstreamWithDeps(WorkstreamWithTaskCounts):
"""WorkstreamWithTaskCounts enriched with dependency graph edges.""" """WorkstreamWithTaskCounts enriched with dependency graph edges."""
depends_on: list[WorkstreamDepStub] = [] depends_on: list[WorkstreamDepStub] = []
blocks: list[WorkstreamDepStub] = [] blocks: list[WorkstreamDepStub] = []
blocked_reasons: list[dict] = []

View File

@@ -3,7 +3,13 @@ entity_type: capability_request
workstations: workstations:
- name: requested - name: requested
description: Capability has been requested and awaits routing or acceptance. description: Capability has been requested and awaits routing or acceptance.
entry_assertions: [] entry_assertions:
- id: capability_request.from_routing_disputed
target: previous_workstation
op: any_eq
value:
- routing_disputed
description: Rerouting returns a disputed request to requested.
exit_assertions: [] exit_assertions: []
- name: routing_disputed - name: routing_disputed
description: Routing decision has been disputed. description: Routing decision has been disputed.

View File

@@ -173,7 +173,11 @@ def get_domain_summary(domain_slug: str) -> str:
topic_id = topic["id"] topic_id = topic["id"]
workstreams = _get("/workstreams", {"topic_id": topic_id, "status": "active"}) state_summary = _get("/state/summary")
workstreams = [
ws for ws in state_summary.get("open_workstreams", [])
if ws.get("topic_id") == topic_id
]
blocking = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id}) blocking = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id})
recent = _get("/progress", {"topic_id": topic_id, "limit": 5}) recent = _get("/progress", {"topic_id": topic_id, "limit": 5})
repos = _get("/repos", {"domain": domain_slug}) repos = _get("/repos", {"domain": domain_slug})
@@ -348,6 +352,60 @@ def get_recent_progress(limit: int = 20, since: str | None = None) -> str:
return json.dumps(_get("/progress", {"limit": limit, "since": since}), indent=2) return json.dumps(_get("/progress", {"limit": limit, "since": since}), indent=2)
@mcp.tool()
def list_flow_definitions() -> str:
"""List registered declarative flow definitions.
Returns each entity type, its workstations, and entry/exit assertion counts.
Use this for orientation before calling get_flow_state or advance_workstation.
"""
return json.dumps(_get("/flows/definitions"), indent=2)
@mcp.tool()
def get_flow_state(entity_type: str, entity_id: str) -> str:
"""Return the declarative flow state for one entity.
Args:
entity_type: workstream | task | contribution | capability_request
entity_id: UUID of the entity
Returns current workstation, exit-blocking assertions, reachable
workstations, and unreachable workstations with the first blocking
assertion for each.
"""
return json.dumps(_get(f"/flows/{entity_type}/{entity_id}"), indent=2)
@mcp.tool()
def advance_workstation(entity_type: str, entity_id: str, target_workstation: str) -> str:
"""Attempt to move an entity to a target workstation.
Args:
entity_type: workstream | task | contribution | capability_request
entity_id: UUID of the entity
target_workstation: desired workstation/status name
Returns the new FlowResult on success. If the target is unreachable, the
response contains a 409-equivalent error with machine-readable failing
assertions.
"""
result = _post(f"/flows/{entity_type}/{entity_id}/advance/{target_workstation}", {})
if not isinstance(result, dict) or "error" not in result:
_post("/progress", {
"event_type": "workstation_advanced",
"summary": f"{entity_type} {entity_id} advanced to {target_workstation}",
"author": "custodian",
"detail": {
"entity_type": entity_type,
"entity_id": entity_id,
"target_workstation": target_workstation,
"flow_result": result,
},
})
return json.dumps(result, indent=2)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Mutate tools # Mutate tools
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@@ -0,0 +1,48 @@
"""convert workstream status enum to string
Revision ID: r5m6n7o8p9q0
Revises: q4l5m6n7o8p9
Create Date: 2026-05-01
"""
from alembic import op
import sqlalchemy as sa
revision = "r5m6n7o8p9q0"
down_revision = "q4l5m6n7o8p9"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.alter_column("workstreams", "status", server_default=None)
op.alter_column(
"workstreams",
"status",
existing_type=sa.Enum(
"active", "blocked", "completed", "archived", name="workstreamstatus"
),
type_=sa.String(length=20),
existing_nullable=False,
postgresql_using="status::text",
server_default="active",
)
op.execute("DROP TYPE IF EXISTS workstreamstatus")
def downgrade() -> None:
op.execute("UPDATE workstreams SET status = 'active' WHERE status = 'todo'")
op.alter_column("workstreams", "status", server_default=None)
workstream_status = sa.Enum(
"active", "blocked", "completed", "archived", name="workstreamstatus"
)
workstream_status.create(op.get_bind(), checkfirst=True)
op.alter_column(
"workstreams",
"status",
existing_type=sa.String(length=20),
type_=workstream_status,
existing_nullable=False,
postgresql_using="status::workstreamstatus",
server_default="active",
)

View File

@@ -274,3 +274,53 @@ class TestStateSummary:
r = await client.get("/state/summary") r = await client.get("/state/summary")
body = r.json() body = r.json()
assert len(body["blocked_tasks"]) >= 1 assert len(body["blocked_tasks"]) >= 1
async def test_summary_derives_blocked_workstream_from_flow_engine(self, client):
await _create_domain(client)
topic = await _create_topic(client)
blocked_ws = await _create_workstream(client, topic["id"], slug="blocked-ws")
dependency_ws = await _create_workstream(client, topic["id"], slug="dependency-ws")
r = await client.post(
f"/workstreams/{blocked_ws['id']}/dependencies/",
json={
"to_workstream_id": dependency_ws["id"],
"description": "Blocked until dependency completes",
},
)
assert r.status_code == 201
r = await client.get("/state/summary")
assert r.status_code == 200
body = r.json()
summaries = {item["id"]: item for item in body["open_workstreams"]}
assert summaries[blocked_ws["id"]]["status"] == "blocked"
assert summaries[blocked_ws["id"]]["blocked_reasons"][0]["id"] == "dependencies.all_complete"
assert body["totals"]["workstreams"]["blocked"] == 1
class TestFlowEndpoints:
async def test_list_flow_definitions(self, client):
r = await client.get("/flows/definitions")
assert r.status_code == 200
entity_types = {item["entity_type"] for item in r.json()}
assert {"workstream", "task", "contribution", "capability_request"} <= entity_types
async def test_get_flow_state_and_advance_workstream(self, client):
await _create_domain(client)
topic = await _create_topic(client)
ws = await _create_workstream(client, topic["id"])
task = await _create_task(client, ws["id"])
await client.patch(f"/tasks/{task['id']}", json={"status": "done"})
r = await client.get(f"/flows/workstream/{ws['id']}")
assert r.status_code == 200
assert "completed" in r.json()["reachable"]
r = await client.post(f"/flows/workstream/{ws['id']}/advance/completed")
assert r.status_code == 200
assert r.json()["current_workstation"] == "completed"
r = await client.get(f"/workstreams/{ws['id']}")
assert r.json()["status"] == "completed"

View File

@@ -0,0 +1,102 @@
---
id: CUST-WP-0037
type: workplan
title: "Task-Flow Reference Documentation Cleanup"
domain: custodian
repo: the-custodian
status: todo
owner: custodian
topic_slug: custodian
created: "2026-05-01"
updated: "2026-05-01"
state_hub_workstream_id: "599d3c40-9f99-466f-b30d-f9d64317345c"
---
# CUST-WP-0037 — Task-Flow Reference Documentation Cleanup
## Goal
Update Custodian reference materials after the task-flow-engine terminology has
landed in State Hub. Retire wording that assumes fixed lifecycle enums and
replace it with the new model of information objects, workstations, and
requisite assertions.
This work is intentionally separate from CUST-WP-0035 so the engine can
stabilize before broad documentation churn.
## T01: Dashboard lifecycle documentation
```task
id: CUST-WP-0037-T01
status: todo
priority: medium
state_hub_task_id: "919d21cb-1cdc-448e-a722-38f1b6374ffb"
```
Review `state-hub/dashboard/src/docs/` and update any page that describes
workstream or task lifecycle, status values, contribution flows, or capability
request transitions.
Acceptance: dashboard docs describe workstations and derived blocked state
without implying that lifecycle movement is controlled by fixed enum tables.
## T02: Repo DOI policy wording
```task
id: CUST-WP-0037-T02
status: todo
priority: medium
state_hub_task_id: "d6485f6c-815f-4f50-a35e-3fd42046691f"
```
Review `state-hub/policies/repo-doi.md` for task/workstream status checks that
assume specific enum values. Update wording to distinguish stored workstation
labels from engine-derived health and blocking assertions.
Acceptance: policy language remains operationally precise while matching the
task-flow-engine model.
## T03: Agent guidance refresh
```task
id: CUST-WP-0037-T03
status: todo
priority: medium
state_hub_task_id: "79251442-12a0-4a00-9e3a-fc484933259d"
```
Review `agents/agent-scope-analyst.md` and other active kaizen/custodian agent
instructions for status-transition assumptions.
Acceptance: agents prefer `get_flow_state()` and `advance_workstation()` where
flow-aware lifecycle movement is relevant.
## T04: Session protocol references
```task
id: CUST-WP-0037-T04
status: todo
priority: medium
state_hub_task_id: "e2a9d096-b66a-43f2-af87-ec802ba9e795"
```
Review project/global `CLAUDE.md` references that instruct agents to call
`update_workstream_status()` or `update_task_status()` for lifecycle movement.
Acceptance: references point to the flow-aware pattern where appropriate while
keeping direct status tools documented for bootstrap or compatibility cases.
## T05: Memory/data-model references
```task
id: CUST-WP-0037-T05
status: todo
priority: low
state_hub_task_id: "2320cd6b-8dc6-4160-9623-4e6425b637e5"
```
Review `memory/MEMORY.md` and nearby state-hub data-model notes for stale enum
language.
Acceptance: memory/reference notes explain the status-to-workstation shift and
link back to CUST-WP-0035 or the task-flow-engine spec.

View File

@@ -4,7 +4,7 @@ type: workplan
title: "Cross-Repo E2E Sandbox Framework" title: "Cross-Repo E2E Sandbox Framework"
domain: railiance domain: railiance
repo: the-custodian repo: the-custodian
status: active status: completed
owner: custodian owner: custodian
topic_slug: railiance topic_slug: railiance
created: "2026-03-27" created: "2026-03-27"

View File

@@ -4,7 +4,7 @@ type: workplan
title: "Task-Flow-Engine — Declarative Workstation and Requisite Model" title: "Task-Flow-Engine — Declarative Workstation and Requisite Model"
domain: custodian domain: custodian
repo: the-custodian repo: the-custodian
status: active status: completed
owner: custodian owner: custodian
topic_slug: custodian topic_slug: custodian
created: "2026-04-30" created: "2026-04-30"
@@ -184,7 +184,7 @@ status labels.
```task ```task
id: CUST-WP-0035-T04 id: CUST-WP-0035-T04
status: todo status: done
priority: high priority: high
state_hub_task_id: "db320d4e-cbcd-4787-a42c-e7cb109737a3" state_hub_task_id: "db320d4e-cbcd-4787-a42c-e7cb109737a3"
``` ```
@@ -224,7 +224,7 @@ with all tasks done automatically surfaces as ready to move to `completed`.
```task ```task
id: CUST-WP-0035-T05 id: CUST-WP-0035-T05
status: todo status: done
priority: medium priority: medium
state_hub_task_id: "8ea7e49f-f1ad-4290-84f4-c1ee75c79786" state_hub_task_id: "8ea7e49f-f1ad-4290-84f4-c1ee75c79786"
``` ```
@@ -253,7 +253,7 @@ assertions are unmet and accepts correctly when they are met.
```task ```task
id: CUST-WP-0035-T06 id: CUST-WP-0035-T06
status: todo status: done
priority: low priority: low
state_hub_task_id: "b9242cb4-5fb4-4e9e-9f16-9a1866cedc6a" state_hub_task_id: "b9242cb4-5fb4-4e9e-9f16-9a1866cedc6a"
``` ```