From 95bcc5c83c860bb8e9b26e605da3f06532addf31 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 2 May 2026 00:21:14 +0200 Subject: [PATCH] Task flow engine implementation --- .../custodian/task_flow_engine_scope_v0.1.md | 92 ++++++++++ state-hub/api/flow_defs.py | 85 +++++++++ state-hub/api/main.py | 2 + state-hub/api/models/__init__.py | 4 +- state-hub/api/models/workstream.py | 14 +- state-hub/api/routers/capability_requests.py | 38 ++-- state-hub/api/routers/contributions.py | 54 +++--- state-hub/api/routers/domains.py | 4 +- state-hub/api/routers/flows.py | 167 ++++++++++++++++++ state-hub/api/routers/state.py | 51 ++++-- state-hub/api/routers/workstreams.py | 11 +- state-hub/api/schemas/workstream.py | 7 +- state-hub/flows/capability_request.yaml | 8 +- state-hub/mcp_server/server.py | 60 ++++++- .../r5m6n7o8p9q0_workstream_status_string.py | 48 +++++ state-hub/tests/test_routers_core.py | 50 ++++++ ...P-0037-task-flow-reference-docs-cleanup.md | 102 +++++++++++ ...501-CUST-WP-0028-e2e-sandbox-framework.md} | 2 +- .../260501-CUST-WP-0035-task-flow-engine.md} | 8 +- 19 files changed, 716 insertions(+), 91 deletions(-) create mode 100644 canon/projects/custodian/task_flow_engine_scope_v0.1.md create mode 100644 state-hub/api/flow_defs.py create mode 100644 state-hub/api/routers/flows.py create mode 100644 state-hub/migrations/versions/r5m6n7o8p9q0_workstream_status_string.py create mode 100644 workplans/CUST-WP-0037-task-flow-reference-docs-cleanup.md rename workplans/{CUST-WP-0028-e2e-sandbox-framework.md => archived/260501-CUST-WP-0028-e2e-sandbox-framework.md} (99%) rename workplans/{CUST-WP-0035-task-flow-engine.md => archived/260501-CUST-WP-0035-task-flow-engine.md} (99%) diff --git a/canon/projects/custodian/task_flow_engine_scope_v0.1.md b/canon/projects/custodian/task_flow_engine_scope_v0.1.md new file mode 100644 index 0000000..d5052b6 --- /dev/null +++ b/canon/projects/custodian/task_flow_engine_scope_v0.1.md @@ -0,0 +1,92 @@ +--- +id: CUST-TFE-SCOPE-2026-000001 +type: architecture-note +title: "Task Flow Engine Extraction Scope v0.1" +status: draft +owners: ["Bernd", "Custodian"] +created: "2026-05-01" +updated: "2026-05-01" +scope: + domains: ["Custodian"] + sensitivity: internal +tags: ["task-flow-engine", "architecture", "state-hub", "workflow"] +domain: custodian +provenance: + workplan: CUST-WP-0035 + task: CUST-WP-0035-T06 +--- + +# Task Flow Engine Extraction Scope v0.1 + +## Purpose + +The task flow engine is currently co-located in `state-hub/` so it can replace +hardcoded lifecycle logic where the need is immediate. Its core model is more +general than State Hub, so it should become a standalone Python package once +the API has stabilized through real use. + +## Standalone Package Boundary + +The future `task-flow-engine` package should contain only pure computation: + +- `models.py`: dataclasses for assertions, workstations, flows, and results +- `evaluator.py`: target path resolution and assertion evaluation +- `engine.py`: reachable workstation and exit-blocking derivation +- `builtins.py`: built-in operations such as `all_eq`, `any_eq`, `none_eq`, + `exists`, and `count_gte` + +The package must not depend on State Hub, SQLAlchemy, FastAPI, MCP, Custodian +canon files, or any specific database schema. + +## State Hub Integration Boundary + +State Hub should retain the domain-specific integration layer: + +- YAML flow definitions in `state-hub/flows/` +- conversion from ORM entities into plain information-object dictionaries +- Alembic migrations and status-column storage choices +- API routers and MCP tools +- custom assertion callables that query State Hub data +- progress events, timestamps, notifications, and other side effects + +This keeps the reusable engine small while allowing State Hub to remain the +place where Custodian-specific lifecycle semantics are declared and exposed. + +## Extraction Path + +1. Keep `state-hub/task_flow_engine/` in-tree until at least one non-trivial + flow definition runs in normal State Hub use. +2. Stabilize the dataclass and result shapes around real consumers: + State Hub API, MCP tools, and repo-facing workflows. +3. Extract the pure package into a new `task-flow-engine` repository. +4. Publish it as an internal pip package. +5. Replace the in-tree package with a dependency import in State Hub. + +The extraction should preserve the current import surface where practical: +`FlowDef`, `WorkstationDef`, `AssertionDef`, `FlowResult`, `AssertionResult`, +`FlowEngine`, and `resolve_target`. + +## Managed Repo Concept + +When extraction starts, register a managed repository concept: + +- slug: `task-flow-engine` +- domain: `custodian` +- purpose: reusable declarative workstation/assertion engine +- primary capability: `workflow.evaluate` +- secondary capabilities: `workflow.define`, `workflow.explain` + +## Extension Point + +An extension point is registered in State Hub to keep this extraction visible: + +- type: `architecture` +- title: `task-flow-engine extraction as standalone package` +- status: `open` +- priority: `low` + +Description: + +`task_flow_engine/` is currently co-located in the State Hub. Extract it to its +own repository and pip package once the API is stable after at least one +non-trivial flow definition has been running in production. diff --git a/state-hub/api/flow_defs.py b/state-hub/api/flow_defs.py new file mode 100644 index 0000000..9c7c5a2 --- /dev/null +++ b/state-hub/api/flow_defs.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from functools import lru_cache +from pathlib import Path +from typing import Any + +import yaml + +from task_flow_engine import AssertionDef, AssertionResult, FlowDef, FlowEngine, FlowResult + + +FLOW_DIR = Path(__file__).resolve().parents[1] / "flows" + + +@lru_cache +def load_flow(entity_type: str) -> FlowDef: + path = FLOW_DIR / f"{entity_type}.yaml" + data = yaml.safe_load(path.read_text(encoding="utf-8")) + return FlowDef.from_dict(data) + + +def evaluate_transition( + entity_type: str, + current_workstation: str, + target_workstation: str, + extra: dict[str, Any] | None = None, +) -> tuple[bool, list[AssertionResult], FlowResult]: + flow = load_flow(entity_type) + obj = { + "status": current_workstation, + "workstation": current_workstation, + "previous_workstation": current_workstation, + **(extra or {}), + } + engine = create_flow_engine() + result = engine.evaluate(obj, flow) + can_reach, failures = engine.can_reach(obj, flow, target_workstation) + return can_reach, failures, result + + +def create_flow_engine() -> FlowEngine: + return FlowEngine( + custom_ops={ + "dependencies.any_incomplete": _dependencies_any_incomplete, + } + ) + + +def _dependencies_any_incomplete( + assertion: AssertionDef, + obj: dict[str, Any], + values: list[Any], +) -> bool: + return bool(values) and any(value != assertion.value for value in values) + + +def assertion_result_to_dict(result: AssertionResult) -> dict[str, Any]: + return { + "id": result.id, + "passed": result.passed, + "target": result.target, + "op": result.op, + "expected": result.expected, + "actual": result.actual, + "description": result.description, + "reason": result.reason, + } + + +def flow_result_to_dict(result: FlowResult) -> dict[str, Any]: + return { + "current_workstation": result.current_workstation, + "exit_blocked": result.exit_blocked, + "blocking_assertions": [ + assertion_result_to_dict(item) for item in result.blocking_assertions + ], + "reachable": result.reachable, + "unreachable": [ + { + "workstation": item.workstation, + "blocking": assertion_result_to_dict(item.blocking), + } + for item in result.unreachable + ], + } diff --git a/state-hub/api/main.py b/state-hub/api/main.py index 8d9a2fd..9d35136 100644 --- a/state-hub/api/main.py +++ b/state-hub/api/main.py @@ -9,6 +9,7 @@ from api.routers import decisions, extension_points, progress, state, tasks, tec from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals, messages, capability_requests, tpsc from api.routers import token_events from api.routers import interface_changes +from api.routers import flows @asynccontextmanager @@ -53,6 +54,7 @@ app.include_router(capability_requests.router) app.include_router(tpsc.router) app.include_router(token_events.router) app.include_router(interface_changes.router) +app.include_router(flows.router) app.include_router(state.router) app.include_router(policy.router) diff --git a/state-hub/api/models/__init__.py b/state-hub/api/models/__init__.py index 75c424e..c3377d0 100644 --- a/state-hub/api/models/__init__.py +++ b/state-hub/api/models/__init__.py @@ -4,7 +4,7 @@ from api.models.domain_goal import DomainGoal, DomainGoalStatus from api.models.topic import Topic, TopicStatus from api.models.managed_repo import ManagedRepo from api.models.repo_goal import RepoGoal, RepoGoalStatus -from api.models.workstream import Workstream, WorkstreamStatus +from api.models.workstream import Workstream from api.models.workstream_dependency import WorkstreamDependency from api.models.task import Task, TaskStatus, TaskPriority from api.models.decision import Decision, DecisionType, DecisionStatus @@ -29,7 +29,7 @@ __all__ = [ "Topic", "TopicStatus", "ManagedRepo", "RepoGoal", "RepoGoalStatus", - "Workstream", "WorkstreamStatus", + "Workstream", "WorkstreamDependency", "Task", "TaskStatus", "TaskPriority", "Decision", "DecisionType", "DecisionStatus", diff --git a/state-hub/api/models/workstream.py b/state-hub/api/models/workstream.py index ebe669d..db45d16 100644 --- a/state-hub/api/models/workstream.py +++ b/state-hub/api/models/workstream.py @@ -1,21 +1,13 @@ -import enum import uuid from datetime import date -from sqlalchemy import Date, Enum, ForeignKey, String, Text +from sqlalchemy import Date, ForeignKey, String, Text from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import Mapped, mapped_column, relationship from api.models.base import Base, TimestampMixin, new_uuid -class WorkstreamStatus(str, enum.Enum): - active = "active" - blocked = "blocked" - completed = "completed" - archived = "archived" - - class Workstream(Base, TimestampMixin): __tablename__ = "workstreams" @@ -28,8 +20,8 @@ class Workstream(Base, TimestampMixin): slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True) title: Mapped[str] = mapped_column(String(255), nullable=False) description: Mapped[str | None] = mapped_column(Text, nullable=True) - status: Mapped[WorkstreamStatus] = mapped_column( - Enum(WorkstreamStatus), nullable=False, default=WorkstreamStatus.active + status: Mapped[str] = mapped_column( + String(20), nullable=False, default="active", server_default="active" ) owner: Mapped[str | None] = mapped_column(String(100), nullable=True) due_date: Mapped[date | None] = mapped_column(Date, nullable=True) diff --git a/state-hub/api/routers/capability_requests.py b/state-hub/api/routers/capability_requests.py index 8130d59..6b5fda9 100644 --- a/state-hub/api/routers/capability_requests.py +++ b/state-hub/api/routers/capability_requests.py @@ -7,6 +7,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from api.database import get_session +from api.flow_defs import assertion_result_to_dict, evaluate_transition, flow_result_to_dict from api.models.agent_message import AgentMessage from api.models.capability_catalog import CapabilityCatalog from api.models.capability_request import CapabilityRequest @@ -28,22 +29,6 @@ from api.schemas.capability_request import ( router = APIRouter(tags=["capability-requests"]) -# --------------------------------------------------------------------------- -# Lifecycle guard -# --------------------------------------------------------------------------- - -_VALID_TRANSITIONS: dict[str, set[str]] = { - "requested": {"accepted", "rejected", "withdrawn", "routing_disputed"}, - "routing_disputed": {"requested", "withdrawn"}, - "accepted": {"in_progress", "rejected", "withdrawn"}, - "in_progress": {"ready_for_review", "rejected", "withdrawn"}, - "ready_for_review": {"completed", "in_progress", "withdrawn"}, - "completed": set(), - "rejected": set(), - "withdrawn": set(), -} - - # --------------------------------------------------------------------------- # Capability Catalog endpoints # --------------------------------------------------------------------------- @@ -602,12 +587,21 @@ async def _get_request_or_404(request_id: uuid.UUID, session: AsyncSession) -> C def _check_transition(current: str, target: str) -> None: - allowed = _VALID_TRANSITIONS.get(current, set()) - if target not in allowed: + can_reach, failures, flow_result = evaluate_transition( + "capability_request", + current, + target, + ) + if not can_reach: raise HTTPException( status_code=422, - detail=( - f"Cannot transition from '{current}' to '{target}'. " - f"Allowed: {sorted(allowed) or 'none (terminal state)'}" - ), + detail={ + "message": f"Cannot transition from '{current}' to '{target}'.", + "current_workstation": current, + "target_workstation": target, + "blocking_assertions": [ + assertion_result_to_dict(item) for item in failures + ], + "flow_result": flow_result_to_dict(flow_result), + }, ) diff --git a/state-hub/api/routers/contributions.py b/state-hub/api/routers/contributions.py index 3616abe..7427ee2 100644 --- a/state-hub/api/routers/contributions.py +++ b/state-hub/api/routers/contributions.py @@ -6,37 +6,12 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from api.database import get_session +from api.flow_defs import assertion_result_to_dict, evaluate_transition, flow_result_to_dict from api.models.contribution import Contribution, ContributionStatus, ContributionType from api.schemas.contribution import ContributionCreate, ContributionRead, ContributionStatusPatch router = APIRouter(prefix="/contributions", tags=["contributions"]) -# Valid forward transitions in the lifecycle -_VALID_TRANSITIONS: dict[ContributionStatus, set[ContributionStatus]] = { - ContributionStatus.draft: { - ContributionStatus.submitted, - ContributionStatus.withdrawn, - }, - ContributionStatus.submitted: { - ContributionStatus.acknowledged, - ContributionStatus.rejected, - ContributionStatus.withdrawn, - }, - ContributionStatus.acknowledged: { - ContributionStatus.accepted, - ContributionStatus.rejected, - ContributionStatus.withdrawn, - }, - ContributionStatus.accepted: { - ContributionStatus.merged, - ContributionStatus.withdrawn, - }, - ContributionStatus.rejected: set(), - ContributionStatus.merged: set(), - ContributionStatus.withdrawn: set(), -} - - @router.get("/", response_model=list[ContributionRead]) async def list_contributions( type: ContributionType | None = Query(None), @@ -93,14 +68,25 @@ async def patch_contribution_status( session: AsyncSession = Depends(get_session), ) -> Contribution: contrib = await _get_or_404(contribution_id, session) - allowed = _VALID_TRANSITIONS.get(contrib.status, set()) - if body.status not in allowed: + current = _status_value(contrib.status) + target = _status_value(body.status) + can_reach, failures, flow_result = evaluate_transition( + "contribution", + current, + target, + ) + if not can_reach: raise HTTPException( status_code=422, - detail=( - f"Cannot transition from '{contrib.status}' to '{body.status}'. " - f"Allowed: {[s.value for s in allowed] or 'none (terminal state)'}" - ), + detail={ + "message": f"Cannot transition from '{current}' to '{target}'.", + "current_workstation": current, + "target_workstation": target, + "blocking_assertions": [ + assertion_result_to_dict(item) for item in failures + ], + "flow_result": flow_result_to_dict(flow_result), + }, ) contrib.status = body.status if body.notes: @@ -145,3 +131,7 @@ async def _get_or_404(contribution_id: uuid.UUID, session: AsyncSession) -> Cont if contrib is None: raise HTTPException(status_code=404, detail=f"Contribution '{contribution_id}' not found") return contrib + + +def _status_value(status: ContributionStatus | str) -> str: + return status.value if isinstance(status, ContributionStatus) else str(status) diff --git a/state-hub/api/routers/domains.py b/state-hub/api/routers/domains.py index 2b1ed94..274b989 100644 --- a/state-hub/api/routers/domains.py +++ b/state-hub/api/routers/domains.py @@ -10,7 +10,7 @@ from api.models.extension_point import ExtensionPoint from api.models.managed_repo import ManagedRepo from api.models.technical_debt import TechnicalDebt from api.models.topic import Topic -from api.models.workstream import Workstream, WorkstreamStatus +from api.models.workstream import Workstream from api.schemas.domain import DomainCreate, DomainDetail, DomainRead, DomainRename, DomainUpdate, RepoStub router = APIRouter(prefix="/domains", tags=["domains"]) @@ -69,7 +69,7 @@ async def get_domain( ws_count_row = await session.execute( select(func.count()).select_from(Workstream) .where(Workstream.topic_id.in_(topic_ids)) - .where(Workstream.status == WorkstreamStatus.active) + .where(Workstream.status == "active") ) ws_count = ws_count_row.scalar_one() diff --git a/state-hub/api/routers/flows.py b/state-hub/api/routers/flows.py new file mode 100644 index 0000000..c3919d3 --- /dev/null +++ b/state-hub/api/routers/flows.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +import uuid +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from api.database import get_session +from api.flow_defs import ( + assertion_result_to_dict, + create_flow_engine, + flow_result_to_dict, + load_flow, +) +from api.models.capability_request import CapabilityRequest +from api.models.contribution import Contribution +from api.models.task import Task +from api.models.workstream import Workstream +from api.models.workstream_dependency import WorkstreamDependency + +router = APIRouter(prefix="/flows", tags=["flows"]) + + +@router.get("/definitions") +async def list_flow_definitions() -> list[dict[str, Any]]: + flows = [ + load_flow(entity_type) + for entity_type in ( + "workstream", + "task", + "contribution", + "capability_request", + ) + ] + return [ + { + "id": flow.id, + "entity_type": flow.entity_type, + "workstations": [ + { + "name": workstation.name, + "description": workstation.description, + "entry_assertion_count": len(workstation.entry_assertions), + "exit_assertion_count": len(workstation.exit_assertions), + } + for workstation in flow.workstations + ], + } + for flow in flows + ] + + +@router.get("/{entity_type}/{entity_id}") +async def get_flow_state( + entity_type: str, + entity_id: uuid.UUID, + session: AsyncSession = Depends(get_session), +) -> dict[str, Any]: + obj = await _flow_object(entity_type, entity_id, session) + flow = load_flow(entity_type) + result = create_flow_engine().evaluate(obj, flow) + return flow_result_to_dict(result) + + +@router.post("/{entity_type}/{entity_id}/advance/{target_workstation}") +async def advance_workstation( + entity_type: str, + entity_id: uuid.UUID, + target_workstation: str, + session: AsyncSession = Depends(get_session), +) -> dict[str, Any]: + obj = await _flow_object(entity_type, entity_id, session) + flow = load_flow(entity_type) + engine = create_flow_engine() + can_reach, failures = engine.can_reach(obj, flow, target_workstation) + if not can_reach: + raise HTTPException( + status_code=409, + detail={ + "message": ( + f"Cannot advance {entity_type} '{entity_id}' " + f"to '{target_workstation}'." + ), + "blocking_assertions": [ + assertion_result_to_dict(item) for item in failures + ], + "flow_result": flow_result_to_dict(engine.evaluate(obj, flow)), + }, + ) + + entity = await _entity(entity_type, entity_id, session) + entity.status = target_workstation + await session.commit() + await session.refresh(entity) + return await get_flow_state(entity_type, entity_id, session) + + +async def _flow_object( + entity_type: str, + entity_id: uuid.UUID, + session: AsyncSession, +) -> dict[str, Any]: + entity = await _entity(entity_type, entity_id, session) + status = _value(entity.status) + obj: dict[str, Any] = { + "id": str(entity.id), + "status": status, + "workstation": status, + "previous_workstation": status, + } + + if entity_type == "workstream": + tasks = list((await session.execute( + select(Task).where(Task.workstream_id == entity_id) + )).scalars().all()) + deps = list((await session.execute( + select(WorkstreamDependency).where( + WorkstreamDependency.from_workstream_id == entity_id + ) + )).scalars().all()) + dependency_ids = [dep.to_workstream_id for dep in deps] + dependency_workstations: list[dict[str, Any]] = [] + if dependency_ids: + dep_ws = list((await session.execute( + select(Workstream).where(Workstream.id.in_(dependency_ids)) + )).scalars().all()) + dependency_workstations = [ + {"id": str(ws.id), "workstation": ws.status} + for ws in dep_ws + ] + obj.update({ + "tasks": [{"id": str(task.id), "status": _value(task.status)} for task in tasks], + "dependencies": dependency_workstations, + }) + elif entity_type == "task": + obj.update({ + "needs_human": entity.needs_human, + "blocking_reason": entity.blocking_reason, + }) + + return obj + + +async def _entity( + entity_type: str, + entity_id: uuid.UUID, + session: AsyncSession, +): + model_by_type = { + "workstream": Workstream, + "task": Task, + "contribution": Contribution, + "capability_request": CapabilityRequest, + } + model = model_by_type.get(entity_type) + if model is None: + raise HTTPException(status_code=404, detail=f"Unknown flow entity type '{entity_type}'") + entity = await session.get(model, entity_id) + if entity is None: + raise HTTPException(status_code=404, detail=f"{entity_type} '{entity_id}' not found") + return entity + + +def _value(item): + return item.value if hasattr(item, "value") else item diff --git a/state-hub/api/routers/state.py b/state-hub/api/routers/state.py index f890450..17b3988 100644 --- a/state-hub/api/routers/state.py +++ b/state-hub/api/routers/state.py @@ -6,6 +6,7 @@ from sqlalchemy import func, select, text from sqlalchemy.ext.asyncio import AsyncSession from api.database import get_session, engine +from api.flow_defs import assertion_result_to_dict, load_flow from api.models.capability_request import CapabilityRequest from api.models.contribution import Contribution, ContributionStatus, ContributionType from api.models.decision import Decision, DecisionStatus, DecisionType @@ -17,7 +18,7 @@ from api.models.sbom_entry import SBOMEntry from api.models.task import Task, TaskPriority, TaskStatus from api.models.technical_debt import TechnicalDebt from api.models.topic import Topic, TopicStatus -from api.models.workstream import Workstream, WorkstreamStatus +from api.models.workstream import Workstream from api.models.workstream_dependency import WorkstreamDependency from api.schemas.decision import DecisionRead from api.schemas.domain import DomainSummary @@ -35,6 +36,7 @@ from api.schemas.task import TaskRead from api.schemas.topic import TopicWithWorkstreams from api.schemas.workstream import WorkstreamRead, WorkstreamWithTaskCounts, WorkstreamWithDeps from api.schemas.workstream_dependency import WorkstreamDepStub +from task_flow_engine import FlowEngine router = APIRouter(prefix="/state", tags=["state"]) @@ -69,7 +71,7 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm open_ws_rows = await session.execute( select(Workstream) - .where(Workstream.status.in_([WorkstreamStatus.active, WorkstreamStatus.blocked])) + .where(Workstream.status.in_(["active", "blocked"])) .order_by(Workstream.due_date.asc().nullslast(), Workstream.created_at) ) open_ws = list(open_ws_rows.scalars().all()) @@ -128,6 +130,27 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm description=d.description, )) + workstream_flow = load_flow("workstream") + flow_engine = FlowEngine() + effective_status: dict = {} + blocked_reasons: dict = {} + for w in open_ws: + flow_obj = { + "status": w.status, + "workstation": w.status, + "tasks": [{"status": _value(t.status)} for t in w.tasks], + "dependencies": [ + {"workstation": ws_lookup[d.to_workstream_id].status} + for d in dep_rows + if d.from_workstream_id == w.id and d.to_workstream_id in ws_lookup + ], + } + flow_result = flow_engine.evaluate(flow_obj, workstream_flow) + effective_status[w.id] = "blocked" if flow_result.exit_blocked else w.status + blocked_reasons[w.id] = [ + assertion_result_to_dict(item) for item in flow_result.blocking_assertions + ] + # Totals — one GROUP BY per table topic_counts = {r[0]: r[1] for r in await session.execute( select(Topic.status, func.count()).group_by(Topic.status) @@ -150,10 +173,10 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm total=sum(topic_counts.values()), ), workstreams=WorkstreamTotals( - active=ws_counts.get(WorkstreamStatus.active, 0), - blocked=ws_counts.get(WorkstreamStatus.blocked, 0), - completed=ws_counts.get(WorkstreamStatus.completed, 0), - archived=ws_counts.get(WorkstreamStatus.archived, 0), + active=sum(1 for status in effective_status.values() if status == "active"), + blocked=sum(1 for status in effective_status.values() if status == "blocked"), + completed=ws_counts.get("completed", 0), + archived=ws_counts.get("archived", 0), total=sum(ws_counts.values()), ), tasks=TaskTotals( @@ -226,7 +249,10 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm open_capability_requests=open_cap_req_count, open_workstreams=[ WorkstreamWithDeps( - **WorkstreamRead.model_validate(w).model_dump(), + **{ + **WorkstreamRead.model_validate(w).model_dump(), + "status": effective_status.get(w.id, w.status), + }, tasks_total=sum(task_per_ws.get(w.id, {}).values()), tasks_todo=task_per_ws.get(w.id, {}).get(TaskStatus.todo, 0), tasks_in_progress=task_per_ws.get(w.id, {}).get(TaskStatus.in_progress, 0), @@ -234,6 +260,7 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm tasks_done=task_per_ws.get(w.id, {}).get(TaskStatus.done, 0), depends_on=dep_index.get(w.id, {}).get("depends_on", []), blocks=dep_index.get(w.id, {}).get("blocks", []), + blocked_reasons=blocked_reasons.get(w.id, []), ) for w in open_ws ], @@ -259,7 +286,7 @@ async def _build_domain_summaries(session: AsyncSession) -> list[DomainSummary]: for domain_id, cnt in await session.execute( select(Topic.domain_id, func.count(Workstream.id)) .join(Workstream, Workstream.topic_id == Topic.id) - .where(Workstream.status == WorkstreamStatus.active) + .where(Workstream.status == "active") .group_by(Topic.domain_id) ): ws_per_domain[domain_id] = cnt @@ -357,14 +384,14 @@ async def _derive_next_steps(session: AsyncSession) -> list[NextStep]: all_done = True for to_id in to_ws_ids: to_ws = await session.get(Workstream, to_id) - if to_ws is None or to_ws.status != WorkstreamStatus.completed: + if to_ws is None or to_ws.status != "completed": all_done = False break if not all_done: continue from_ws = await session.get(Workstream, from_ws_id) - if from_ws is None or from_ws.status not in (WorkstreamStatus.active, WorkstreamStatus.blocked): + if from_ws is None or from_ws.status not in ("active", "blocked"): continue todo_rows = await session.execute( @@ -414,6 +441,10 @@ async def _get_domain_slug_for_workstream(ws: Workstream | None, session: AsyncS return domain.slug if domain else None +def _value(item): + return item.value if hasattr(item, "value") else item + + @router.get("/next_steps", response_model=list[NextStep]) async def get_next_steps(session: AsyncSession = Depends(get_session)) -> list[NextStep]: """Derive contextual next-action suggestions from current hub state. diff --git a/state-hub/api/routers/workstreams.py b/state-hub/api/routers/workstreams.py index 4166a40..efd8705 100644 --- a/state-hub/api/routers/workstreams.py +++ b/state-hub/api/routers/workstreams.py @@ -5,8 +5,13 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from api.database import get_session -from api.models.workstream import Workstream, WorkstreamStatus -from api.schemas.workstream import WorkstreamCreate, WorkstreamRead, WorkstreamUpdate +from api.models.workstream import Workstream +from api.schemas.workstream import ( + WorkstreamCreate, + WorkstreamRead, + WorkstreamStatus, + WorkstreamUpdate, +) router = APIRouter(prefix="/workstreams", tags=["workstreams"]) @@ -86,7 +91,7 @@ async def archive_workstream( ws = await session.get(Workstream, workstream_id) if ws is None: raise HTTPException(status_code=404, detail="Workstream not found") - ws.status = WorkstreamStatus.archived + ws.status = "archived" await session.commit() await session.refresh(ws) return ws diff --git a/state-hub/api/schemas/workstream.py b/state-hub/api/schemas/workstream.py index 70dbb11..77e4e2e 100644 --- a/state-hub/api/schemas/workstream.py +++ b/state-hub/api/schemas/workstream.py @@ -1,18 +1,20 @@ import uuid from datetime import date, datetime +from typing import Literal from pydantic import BaseModel, ConfigDict -from api.models.workstream import WorkstreamStatus from api.schemas.workstream_dependency import WorkstreamDepStub +WorkstreamStatus = Literal["todo", "active", "blocked", "completed", "archived"] + class WorkstreamCreate(BaseModel): topic_id: uuid.UUID slug: str title: str description: str | None = None - status: WorkstreamStatus = WorkstreamStatus.active + status: WorkstreamStatus = "active" owner: str | None = None due_date: date | None = None repo_id: uuid.UUID | None = None # GEMS primary: the owning repository @@ -57,3 +59,4 @@ class WorkstreamWithDeps(WorkstreamWithTaskCounts): """WorkstreamWithTaskCounts enriched with dependency graph edges.""" depends_on: list[WorkstreamDepStub] = [] blocks: list[WorkstreamDepStub] = [] + blocked_reasons: list[dict] = [] diff --git a/state-hub/flows/capability_request.yaml b/state-hub/flows/capability_request.yaml index 503b9d1..d32fd12 100644 --- a/state-hub/flows/capability_request.yaml +++ b/state-hub/flows/capability_request.yaml @@ -3,7 +3,13 @@ entity_type: capability_request workstations: - name: requested description: Capability has been requested and awaits routing or acceptance. - entry_assertions: [] + entry_assertions: + - id: capability_request.from_routing_disputed + target: previous_workstation + op: any_eq + value: + - routing_disputed + description: Rerouting returns a disputed request to requested. exit_assertions: [] - name: routing_disputed description: Routing decision has been disputed. diff --git a/state-hub/mcp_server/server.py b/state-hub/mcp_server/server.py index ac4341e..098aa3c 100644 --- a/state-hub/mcp_server/server.py +++ b/state-hub/mcp_server/server.py @@ -173,7 +173,11 @@ def get_domain_summary(domain_slug: str) -> str: topic_id = topic["id"] - workstreams = _get("/workstreams", {"topic_id": topic_id, "status": "active"}) + state_summary = _get("/state/summary") + workstreams = [ + ws for ws in state_summary.get("open_workstreams", []) + if ws.get("topic_id") == topic_id + ] blocking = _get("/decisions", {"decision_type": "pending", "topic_id": topic_id}) recent = _get("/progress", {"topic_id": topic_id, "limit": 5}) repos = _get("/repos", {"domain": domain_slug}) @@ -348,6 +352,60 @@ def get_recent_progress(limit: int = 20, since: str | None = None) -> str: return json.dumps(_get("/progress", {"limit": limit, "since": since}), indent=2) +@mcp.tool() +def list_flow_definitions() -> str: + """List registered declarative flow definitions. + + Returns each entity type, its workstations, and entry/exit assertion counts. + Use this for orientation before calling get_flow_state or advance_workstation. + """ + return json.dumps(_get("/flows/definitions"), indent=2) + + +@mcp.tool() +def get_flow_state(entity_type: str, entity_id: str) -> str: + """Return the declarative flow state for one entity. + + Args: + entity_type: workstream | task | contribution | capability_request + entity_id: UUID of the entity + + Returns current workstation, exit-blocking assertions, reachable + workstations, and unreachable workstations with the first blocking + assertion for each. + """ + return json.dumps(_get(f"/flows/{entity_type}/{entity_id}"), indent=2) + + +@mcp.tool() +def advance_workstation(entity_type: str, entity_id: str, target_workstation: str) -> str: + """Attempt to move an entity to a target workstation. + + Args: + entity_type: workstream | task | contribution | capability_request + entity_id: UUID of the entity + target_workstation: desired workstation/status name + + Returns the new FlowResult on success. If the target is unreachable, the + response contains a 409-equivalent error with machine-readable failing + assertions. + """ + result = _post(f"/flows/{entity_type}/{entity_id}/advance/{target_workstation}", {}) + if not isinstance(result, dict) or "error" not in result: + _post("/progress", { + "event_type": "workstation_advanced", + "summary": f"{entity_type} {entity_id} advanced to {target_workstation}", + "author": "custodian", + "detail": { + "entity_type": entity_type, + "entity_id": entity_id, + "target_workstation": target_workstation, + "flow_result": result, + }, + }) + return json.dumps(result, indent=2) + + # --------------------------------------------------------------------------- # Mutate tools # --------------------------------------------------------------------------- diff --git a/state-hub/migrations/versions/r5m6n7o8p9q0_workstream_status_string.py b/state-hub/migrations/versions/r5m6n7o8p9q0_workstream_status_string.py new file mode 100644 index 0000000..e9601f9 --- /dev/null +++ b/state-hub/migrations/versions/r5m6n7o8p9q0_workstream_status_string.py @@ -0,0 +1,48 @@ +"""convert workstream status enum to string + +Revision ID: r5m6n7o8p9q0 +Revises: q4l5m6n7o8p9 +Create Date: 2026-05-01 + +""" +from alembic import op +import sqlalchemy as sa + +revision = "r5m6n7o8p9q0" +down_revision = "q4l5m6n7o8p9" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.alter_column("workstreams", "status", server_default=None) + op.alter_column( + "workstreams", + "status", + existing_type=sa.Enum( + "active", "blocked", "completed", "archived", name="workstreamstatus" + ), + type_=sa.String(length=20), + existing_nullable=False, + postgresql_using="status::text", + server_default="active", + ) + op.execute("DROP TYPE IF EXISTS workstreamstatus") + + +def downgrade() -> None: + op.execute("UPDATE workstreams SET status = 'active' WHERE status = 'todo'") + op.alter_column("workstreams", "status", server_default=None) + workstream_status = sa.Enum( + "active", "blocked", "completed", "archived", name="workstreamstatus" + ) + workstream_status.create(op.get_bind(), checkfirst=True) + op.alter_column( + "workstreams", + "status", + existing_type=sa.String(length=20), + type_=workstream_status, + existing_nullable=False, + postgresql_using="status::workstreamstatus", + server_default="active", + ) diff --git a/state-hub/tests/test_routers_core.py b/state-hub/tests/test_routers_core.py index 5002316..fb1ed50 100644 --- a/state-hub/tests/test_routers_core.py +++ b/state-hub/tests/test_routers_core.py @@ -274,3 +274,53 @@ class TestStateSummary: r = await client.get("/state/summary") body = r.json() assert len(body["blocked_tasks"]) >= 1 + + async def test_summary_derives_blocked_workstream_from_flow_engine(self, client): + await _create_domain(client) + topic = await _create_topic(client) + blocked_ws = await _create_workstream(client, topic["id"], slug="blocked-ws") + dependency_ws = await _create_workstream(client, topic["id"], slug="dependency-ws") + + r = await client.post( + f"/workstreams/{blocked_ws['id']}/dependencies/", + json={ + "to_workstream_id": dependency_ws["id"], + "description": "Blocked until dependency completes", + }, + ) + assert r.status_code == 201 + + r = await client.get("/state/summary") + assert r.status_code == 200 + body = r.json() + summaries = {item["id"]: item for item in body["open_workstreams"]} + + assert summaries[blocked_ws["id"]]["status"] == "blocked" + assert summaries[blocked_ws["id"]]["blocked_reasons"][0]["id"] == "dependencies.all_complete" + assert body["totals"]["workstreams"]["blocked"] == 1 + + +class TestFlowEndpoints: + async def test_list_flow_definitions(self, client): + r = await client.get("/flows/definitions") + assert r.status_code == 200 + entity_types = {item["entity_type"] for item in r.json()} + assert {"workstream", "task", "contribution", "capability_request"} <= entity_types + + async def test_get_flow_state_and_advance_workstream(self, client): + await _create_domain(client) + topic = await _create_topic(client) + ws = await _create_workstream(client, topic["id"]) + task = await _create_task(client, ws["id"]) + await client.patch(f"/tasks/{task['id']}", json={"status": "done"}) + + r = await client.get(f"/flows/workstream/{ws['id']}") + assert r.status_code == 200 + assert "completed" in r.json()["reachable"] + + r = await client.post(f"/flows/workstream/{ws['id']}/advance/completed") + assert r.status_code == 200 + assert r.json()["current_workstation"] == "completed" + + r = await client.get(f"/workstreams/{ws['id']}") + assert r.json()["status"] == "completed" diff --git a/workplans/CUST-WP-0037-task-flow-reference-docs-cleanup.md b/workplans/CUST-WP-0037-task-flow-reference-docs-cleanup.md new file mode 100644 index 0000000..fc7d5f1 --- /dev/null +++ b/workplans/CUST-WP-0037-task-flow-reference-docs-cleanup.md @@ -0,0 +1,102 @@ +--- +id: CUST-WP-0037 +type: workplan +title: "Task-Flow Reference Documentation Cleanup" +domain: custodian +repo: the-custodian +status: todo +owner: custodian +topic_slug: custodian +created: "2026-05-01" +updated: "2026-05-01" +state_hub_workstream_id: "599d3c40-9f99-466f-b30d-f9d64317345c" +--- + +# CUST-WP-0037 — Task-Flow Reference Documentation Cleanup + +## Goal + +Update Custodian reference materials after the task-flow-engine terminology has +landed in State Hub. Retire wording that assumes fixed lifecycle enums and +replace it with the new model of information objects, workstations, and +requisite assertions. + +This work is intentionally separate from CUST-WP-0035 so the engine can +stabilize before broad documentation churn. + +## T01: Dashboard lifecycle documentation + +```task +id: CUST-WP-0037-T01 +status: todo +priority: medium +state_hub_task_id: "919d21cb-1cdc-448e-a722-38f1b6374ffb" +``` + +Review `state-hub/dashboard/src/docs/` and update any page that describes +workstream or task lifecycle, status values, contribution flows, or capability +request transitions. + +Acceptance: dashboard docs describe workstations and derived blocked state +without implying that lifecycle movement is controlled by fixed enum tables. + +## T02: Repo DOI policy wording + +```task +id: CUST-WP-0037-T02 +status: todo +priority: medium +state_hub_task_id: "d6485f6c-815f-4f50-a35e-3fd42046691f" +``` + +Review `state-hub/policies/repo-doi.md` for task/workstream status checks that +assume specific enum values. Update wording to distinguish stored workstation +labels from engine-derived health and blocking assertions. + +Acceptance: policy language remains operationally precise while matching the +task-flow-engine model. + +## T03: Agent guidance refresh + +```task +id: CUST-WP-0037-T03 +status: todo +priority: medium +state_hub_task_id: "79251442-12a0-4a00-9e3a-fc484933259d" +``` + +Review `agents/agent-scope-analyst.md` and other active kaizen/custodian agent +instructions for status-transition assumptions. + +Acceptance: agents prefer `get_flow_state()` and `advance_workstation()` where +flow-aware lifecycle movement is relevant. + +## T04: Session protocol references + +```task +id: CUST-WP-0037-T04 +status: todo +priority: medium +state_hub_task_id: "e2a9d096-b66a-43f2-af87-ec802ba9e795" +``` + +Review project/global `CLAUDE.md` references that instruct agents to call +`update_workstream_status()` or `update_task_status()` for lifecycle movement. + +Acceptance: references point to the flow-aware pattern where appropriate while +keeping direct status tools documented for bootstrap or compatibility cases. + +## T05: Memory/data-model references + +```task +id: CUST-WP-0037-T05 +status: todo +priority: low +state_hub_task_id: "2320cd6b-8dc6-4160-9623-4e6425b637e5" +``` + +Review `memory/MEMORY.md` and nearby state-hub data-model notes for stale enum +language. + +Acceptance: memory/reference notes explain the status-to-workstation shift and +link back to CUST-WP-0035 or the task-flow-engine spec. diff --git a/workplans/CUST-WP-0028-e2e-sandbox-framework.md b/workplans/archived/260501-CUST-WP-0028-e2e-sandbox-framework.md similarity index 99% rename from workplans/CUST-WP-0028-e2e-sandbox-framework.md rename to workplans/archived/260501-CUST-WP-0028-e2e-sandbox-framework.md index 838f137..976e91c 100644 --- a/workplans/CUST-WP-0028-e2e-sandbox-framework.md +++ b/workplans/archived/260501-CUST-WP-0028-e2e-sandbox-framework.md @@ -4,7 +4,7 @@ type: workplan title: "Cross-Repo E2E Sandbox Framework" domain: railiance repo: the-custodian -status: active +status: completed owner: custodian topic_slug: railiance created: "2026-03-27" diff --git a/workplans/CUST-WP-0035-task-flow-engine.md b/workplans/archived/260501-CUST-WP-0035-task-flow-engine.md similarity index 99% rename from workplans/CUST-WP-0035-task-flow-engine.md rename to workplans/archived/260501-CUST-WP-0035-task-flow-engine.md index 80c358c..0bdf002 100644 --- a/workplans/CUST-WP-0035-task-flow-engine.md +++ b/workplans/archived/260501-CUST-WP-0035-task-flow-engine.md @@ -4,7 +4,7 @@ type: workplan title: "Task-Flow-Engine — Declarative Workstation and Requisite Model" domain: custodian repo: the-custodian -status: active +status: completed owner: custodian topic_slug: custodian created: "2026-04-30" @@ -184,7 +184,7 @@ status labels. ```task id: CUST-WP-0035-T04 -status: todo +status: done priority: high state_hub_task_id: "db320d4e-cbcd-4787-a42c-e7cb109737a3" ``` @@ -224,7 +224,7 @@ with all tasks done automatically surfaces as ready to move to `completed`. ```task id: CUST-WP-0035-T05 -status: todo +status: done priority: medium state_hub_task_id: "8ea7e49f-f1ad-4290-84f4-c1ee75c79786" ``` @@ -253,7 +253,7 @@ assertions are unmet and accepts correctly when they are met. ```task id: CUST-WP-0035-T06 -status: todo +status: done priority: low state_hub_task_id: "b9242cb4-5fb4-4e9e-9f16-9a1866cedc6a" ```