Files
the-custodian/workplans/CUST-WP-0035-task-flow-engine.md
tegwick f99d77d539 chore(consistency): sync task status from DB [auto]
Updated by fix-consistency on 2026-05-01:
  - update .custodian-brief.md for the-custodian
2026-05-01 00:55:53 +02:00

13 KiB

id, type, title, domain, repo, status, owner, topic_slug, created, updated, state_hub_workstream_id
id type title domain repo status owner topic_slug created updated state_hub_workstream_id
CUST-WP-0035 workplan Task-Flow-Engine — Declarative Workstation and Requisite Model custodian the-custodian todo custodian custodian 2026-04-30 2026-04-30 781e519b-0dd7-451b-b63c-fad50f999c9c

CUST-WP-0035 — Task-Flow-Engine

Goal

Build a lightweight, declarative workflow substrate that replaces the custodian's current hardcoded status enums and _VALID_TRANSITIONS dicts with a generalised model of workstations, information objects, and requisite assertions.

The core idea:

  • Information objects are any entities that move through a lifecycle: workstreams, tasks, contributions, capability requests, interface changes.
  • Workstations are named positions an information object can occupy. They are not fixed lifecycle stages — they are general nodes whose semantics are defined by their entry and exit assertions.
  • Requisite assertions are declarative predicates on data elements or qualities of an information object or its environment. An assertion might say "all child tasks have status done", "a human approval record exists", or "the dependency workstream X is at workstation completed". Assertions compose: a workstation is reachable when all its entry assertions are satisfied; it is exitable when all its exit assertions are satisfied.
  • Transitions are derived, not enumerated. Any workstation is reachable from any other if the entry assertions are met. There are no hardcoded valid transition tables. Blocked state is also derived — an object is blocked at its current workstation when one or more exit assertions are unsatisfied, and the engine surfaces exactly which assertions are failing and why.

This design supports the loose, flexible coupling of activities needed across the custodian ecosystem. Work items don't march through a prescribed pipeline; they move when their world matches what the target workstation requires.

The engine is designed to eventually live in its own repository (task-flow-engine) as a reusable Python package, independent of the state-hub. This workplan builds the first version inside the custodian, then scopes the extraction boundary.

T01: Design specification — workstations, assertions, flow definitions

id: CUST-WP-0035-T01
status: todo
priority: high
state_hub_task_id: "25ad9022-987f-4d30-b1a1-a96c4a83889a"

Write state-hub/docs/task-flow-engine-spec.md capturing the full data model before any code is written. The spec must cover:

Information Object: any entity with a current workstation label and a bag of observable properties. The engine is not coupled to a specific DB schema — it receives a plain dict of properties.

WorkstationDef: {name: str, entry_assertions: list[AssertionDef], exit_assertions: list[AssertionDef], description: str}. A workstation with no assertions is always reachable / always exitable (unconstrained).

AssertionDef: {id: str, target: str, op: str, value: Any, description: str}.

  • target is a dot-path into the information object's properties: "tasks.*.status", "dependencies.all.workstation", "metadata.approved_by"
  • op is a predicate: all_eq, any_eq, none_eq, exists, count_gte, custom (for assertions that call back into the engine host)
  • Assertions are pure — they do not mutate state

FlowDef: {id: str, entity_type: str, workstations: list[WorkstationDef]}. A flow definition is a named graph. Multiple flows can exist per entity type (e.g., a "lightweight" flow and a "governance" flow for workstreams).

Transition: not a first-class type. The engine derives valid next workstations by evaluating entry assertions of all workstations in the flow against the current object state. The caller sees: current workstation, satisfied exit assertions, unsatisfied exit assertions (blocking reasons), reachable workstations, unreachable workstations with the blocking assertion for each.

FlowResult: {current_workstation: str, exit_blocked: bool, blocking_assertions: list[AssertionResult], reachable: list[str], unreachable: list[{workstation: str, blocking: AssertionResult}]}.

Acceptance: spec document exists; the model can express the full lifecycle of workstreams, tasks, contributions, and capability requests without hardcoding any domain knowledge into the engine itself.

T02: Core Python library — pure engine, no FastAPI dependency

id: CUST-WP-0035-T02
status: todo
priority: high
state_hub_task_id: "df5ce1f2-a0d0-4f90-9629-c28c6021b909"

New package at state-hub/task_flow_engine/:

task_flow_engine/
  __init__.py
  models.py        # AssertionDef, WorkstationDef, FlowDef, FlowResult dataclasses
  evaluator.py     # assertion evaluation logic
  engine.py        # FlowEngine.evaluate(obj: dict, flow: FlowDef) -> FlowResult
  builtins.py      # built-in op implementations: all_eq, any_eq, exists, count_gte, …

Design constraints:

  • No SQLAlchemy, no FastAPI, no HTTP — this is a pure computation library
  • FlowEngine.evaluate() takes a plain dict (the information object's properties) and a FlowDef, returns a FlowResult
  • FlowDef instances can be loaded from YAML or constructed in code; the engine does not care
  • The custom op accepts a callable injected by the host — keeping the engine pure while allowing host-specific assertions (e.g., "has a linked approval decision in the DB")

Acceptance: unit tests in state-hub/tests/test_task_flow_engine.py cover:

  • object with all assertions satisfied → correct reachable workstations
  • object with one failing exit assertion → exit_blocked: true with the specific assertion identified
  • custom op callable invoked correctly
  • empty flow def (no assertions) → all workstations reachable
  • circular reference in target path → handled without infinite loop

T03: Flow definitions for existing custodian entities

id: CUST-WP-0035-T03
status: todo
priority: high
state_hub_task_id: "3d01fc77-0329-44ee-8a60-20a3de1c1d6e"

Write YAML flow definitions for the four entity types currently tracked in the state-hub. Store them at state-hub/flows/:

workstream.yaml — replaces WorkstreamStatus enum:

  • Workstations: todo, active, blocked, completed, archived
  • todo → active: no entry assertions (planning is unconstrained)
  • active → completed: exit assertion tasks.all_done = all tasks have status done or cancelled
  • active → blocked: exit assertion dependencies.any_incomplete (any dependency workstream not yet at completed)
  • blocked → active: entry assertion dependencies.all_complete
  • completed → archived: no entry assertions

task.yaml — replaces the informal todo | in_progress | blocked | done model:

  • Workstations: todo, in_progress, blocked, done, cancelled
  • in_progress → blocked: exit assertion needs_human == false (maps to the existing needs_human flag)
  • blocked → in_progress: entry assertion needs_human == false
  • in_progress → done: no assertions beyond curator intent

contribution.yaml — replaces _VALID_TRANSITIONS dict in routers/contributions.py:

  • Workstations: draft, submitted, acknowledged, accepted, merged, rejected, withdrawn
  • Express the same lifecycle as the current dict but as assertion-annotated workstation definitions, making the intent readable rather than just the allowed edges

capability_request.yaml — replaces _VALID_TRANSITIONS in routers/capability_requests.py

Acceptance: each YAML file loads as a valid FlowDef; running FlowEngine.evaluate() on a representative set of existing DB entities (via a test fixture) produces FlowResults consistent with the current manual status labels.

T04: State-hub integration — migrate from enums to engine

id: CUST-WP-0035-T04
status: todo
priority: high
state_hub_task_id: "db320d4e-cbcd-4787-a42c-e7cb109737a3"

4a: Migrate WorkstreamStatus from SA Enum to String(20)

Write an Alembic migration that alters the workstreams.status column from the WorkstreamStatus enum type to VARCHAR(20). Existing values (active, blocked, completed, archived) are valid workstation names and survive unchanged. Drop the WorkstreamStatus Python enum after migration; use plain strings throughout. Follow the pattern already established by tasks (String(20) with no SA Enum).

4b: Replace _VALID_TRANSITIONS guards with engine evaluation

In routers/contributions.py and routers/capability_requests.py: replace the _VALID_TRANSITIONS dict lookup with FlowEngine.evaluate(). The router loads the appropriate FlowDef (from the YAML files in T03), calls evaluate, and returns 409 with a structured error body listing the failing assertions if the target workstation is unreachable. The error body replaces the current free-text "transition not allowed" message with machine-readable assertion failures.

4c: Derive blocked automatically in state summary

In routers/state.py: instead of filtering Workstream.status == 'blocked' directly, evaluate each active workstream against its flow definition and surface it as effectively blocked when exit_blocked: true. This means the blocked status on a workstream can be set automatically by the engine rather than requiring manual update_workstream_status("blocked") calls.

Acceptance: existing API tests pass after migration; the state summary blocked_workstreams count matches what the engine derives; a workstream with all tasks done automatically surfaces as ready to move to completed.

T05: MCP tools — flow-aware session orientation

id: CUST-WP-0035-T05
status: todo
priority: medium
state_hub_task_id: "8ea7e49f-f1ad-4290-84f4-c1ee75c79786"

Three new tools in mcp_server/server.py:

  • get_flow_state(entity_type: str, entity_id: str) — returns the FlowResult for the given entity: current workstation, exit-blocking assertions with human-readable reasons, and list of reachable workstations
  • advance_workstation(entity_type: str, entity_id: str, target_workstation: str) — attempts to move the entity to the target workstation; returns the FlowResult on success, or a 409-equivalent with the specific failing assertions if blocked
  • list_flow_definitions() — returns the registered flow definitions with their workstation names and assertion counts (orientation tool)

Update get_state_summary() and get_domain_summary() to include a blocked_reasons field per blocked workstream so agents see not just that a workstream is blocked but specifically which assertion is failing.

Acceptance: get_flow_state("workstream", "<id>") returns a readable result for an existing workstream; advance_workstation refuses correctly when assertions are unmet and accepts correctly when they are met.

T06: Extraction boundary and future repo scope

id: CUST-WP-0035-T06
status: todo
priority: low
state_hub_task_id: "b9242cb4-5fb4-4e9e-9f16-9a1866cedc6a"

Before closing this workplan, write a brief design note at canon/projects/custodian/task_flow_engine_scope_v0.1.md that captures:

  • What belongs in the standalone task-flow-engine package: models.py, evaluator.py, engine.py, builtins.py — pure Python, no custodian dependency
  • What stays in the state-hub integration layer: YAML flow definitions (domain-specific), DB migration, router changes, MCP tools, custom op callables that query the DB
  • The extraction path: once the engine is stable, state-hub/task_flow_engine/ is published as a separate pip package and re-imported as a dependency
  • Register a new managed repo concept (task-flow-engine) in the capabilities domain for when extraction happens

Also register an extension point:

ep_type: architecture
title: task-flow-engine extraction as standalone package
description: >
  task_flow_engine/ is currently co-located in the state-hub. Extract to its
  own repo and pip package once the API is stable after at least one non-trivial
  flow definition has been running in production.
status: open
priority: low

Acceptance: design note file exists; extension point registered.


Closing note — reference documentation cleanup

Once this workplan is complete and the task-flow-engine model is live, the following custodian reference materials will need to be updated to reflect the refined terminology (workstations, information objects, requisite assertions) and to retire language that assumed fixed lifecycle enums:

  • state-hub/dashboard/src/docs/ — any page describing workstream or task lifecycle, status values, or contribution flows
  • state-hub/policies/repo-doi.md — references to task/workstream status checks that assume specific enum values
  • agents/agent-scope-analyst.md and other kaizen agents that reference status transitions by name
  • CLAUDE.md (global and project) — session protocol references to update_workstream_status() and update_task_status() should be updated to the advance_workstation() pattern
  • memory/MEMORY.md entries covering the state-hub data model

This cleanup is intentionally deferred — the new terminology should stabilise in practice before documentation is frozen. A dedicated workplan should be opened at the close of T05 to track this.