---
id: CUST-WP-0035
type: workplan
title: "Task-Flow-Engine — Declarative Workstation and Requisite Model"
domain: custodian
repo: the-custodian
status: todo
owner: custodian
topic_slug: custodian
created: "2026-04-30"
updated: "2026-04-30"
state_hub_workstream_id: "781e519b-0dd7-451b-b63c-fad50f999c9c"
---

# CUST-WP-0035 — Task-Flow-Engine

## Goal

Build a lightweight, declarative workflow substrate that replaces the custodian's current hardcoded status enums and `_VALID_TRANSITIONS` dicts with a generalised model of **workstations**, **information objects**, and **requisite assertions**.

The core idea:

- **Information objects** are any entities that move through a lifecycle: workstreams, tasks, contributions, capability requests, interface changes.
- **Workstations** are named positions an information object can occupy. They are not fixed lifecycle stages — they are general nodes whose semantics are defined by their entry and exit assertions.
- **Requisite assertions** are declarative predicates on data elements or qualities of an information object or its environment. An assertion might say "all child tasks have status `done`", "a human approval record exists", or "the dependency workstream `X` is at workstation `completed`". Assertions compose: a workstation is reachable when all its entry assertions are satisfied; it is exitable when all its exit assertions are satisfied.
- **Transitions** are derived, not enumerated. Any workstation is reachable from any other if the entry assertions are met. There are no hardcoded valid transition tables. Blocked state is also derived — an object is blocked at its current workstation when one or more exit assertions are unsatisfied, and the engine surfaces exactly which assertions are failing and why.

This design supports the loose, flexible coupling of activities needed across the custodian ecosystem.
Work items don't march through a prescribed pipeline; they move when their world matches what the target workstation requires.

The engine is designed to eventually live in its own repository (`task-flow-engine`) as a reusable Python package, independent of the state-hub. This workplan builds the first version inside the custodian, then scopes the extraction boundary.

## T01: Design specification — workstations, assertions, flow definitions

```task
id: CUST-WP-0035-T01
status: todo
priority: high
state_hub_task_id: "25ad9022-987f-4d30-b1a1-a96c4a83889a"
```

Write `state-hub/docs/task-flow-engine-spec.md` capturing the full data model before any code is written. The spec must cover:

**Information Object:** any entity with a current workstation label and a bag of observable properties. The engine is not coupled to a specific DB schema — it receives a plain dict of properties.

**WorkstationDef:** `{name: str, entry_assertions: list[AssertionDef], exit_assertions: list[AssertionDef], description: str}`. A workstation with no assertions is always reachable / always exitable (unconstrained).

**AssertionDef:** `{id: str, target: str, op: str, value: Any, description: str}`.

- `target` is a dot-path into the information object's properties: `"tasks.*.status"`, `"dependencies.all.workstation"`, `"metadata.approved_by"`
- `op` is a predicate: `all_eq`, `any_eq`, `none_eq`, `exists`, `count_gte`, `custom` (for assertions that call back into the engine host)
- Assertions are pure — they do not mutate state

**FlowDef:** `{id: str, entity_type: str, workstations: list[WorkstationDef]}`. A flow definition is a named graph. Multiple flows can exist per entity type (e.g., a "lightweight" flow and a "governance" flow for workstreams).

**Transition:** not a first-class type. The engine derives valid next workstations by evaluating entry assertions of all workstations in the flow against the current object state.
The caller sees: current workstation, satisfied exit assertions, unsatisfied exit assertions (blocking reasons), reachable workstations, unreachable workstations with the blocking assertion for each.

**FlowResult:** `{current_workstation: str, exit_blocked: bool, blocking_assertions: list[AssertionResult], reachable: list[str], unreachable: list[{workstation: str, blocking: AssertionResult}]}`.

Acceptance: spec document exists; the model can express the full lifecycle of workstreams, tasks, contributions, and capability requests without hardcoding any domain knowledge into the engine itself.

## T02: Core Python library — pure engine, no FastAPI dependency

```task
id: CUST-WP-0035-T02
status: todo
priority: high
state_hub_task_id: "df5ce1f2-a0d0-4f90-9629-c28c6021b909"
```

New package at `state-hub/task_flow_engine/`:

```
task_flow_engine/
  __init__.py
  models.py       # AssertionDef, WorkstationDef, FlowDef, FlowResult dataclasses
  evaluator.py    # assertion evaluation logic
  engine.py       # FlowEngine.evaluate(obj: dict, flow: FlowDef) -> FlowResult
  builtins.py     # built-in op implementations: all_eq, any_eq, exists, count_gte, …
```

Design constraints:

- No SQLAlchemy, no FastAPI, no HTTP — this is a pure computation library
- `FlowEngine.evaluate()` takes a plain `dict` (the information object's properties) and a `FlowDef`, returns a `FlowResult`
- `FlowDef` instances can be loaded from YAML or constructed in code; the engine does not care
- The `custom` op accepts a callable injected by the host — keeping the engine pure while allowing host-specific assertions (e.g., "has a linked approval decision in the DB")

Acceptance: unit tests in `state-hub/tests/test_task_flow_engine.py` cover:

- object with all assertions satisfied → correct reachable workstations
- object with one failing exit assertion → `exit_blocked: true` with the specific assertion identified
- custom op callable invoked correctly
- empty flow def (no assertions) → all workstations reachable
- circular reference in
target path → handled without infinite loop

## T03: Flow definitions for existing custodian entities

```task
id: CUST-WP-0035-T03
status: todo
priority: high
state_hub_task_id: "3d01fc77-0329-44ee-8a60-20a3de1c1d6e"
```

Write YAML flow definitions for the four entity types currently tracked in the state-hub. Store them at `state-hub/flows/`:

**`workstream.yaml`** — replaces `WorkstreamStatus` enum:

- Workstations: `todo`, `active`, `blocked`, `completed`, `archived`
- `todo → active`: no entry assertions (planning is unconstrained)
- `active → completed`: exit assertion `tasks.all_done` = all tasks have status `done` or `cancelled`
- `active → blocked`: exit assertion `dependencies.any_incomplete` (any dependency workstream not yet at `completed`)
- `blocked → active`: entry assertion `dependencies.all_complete`
- `completed → archived`: no entry assertions

**`task.yaml`** — replaces the informal `todo | in_progress | blocked | done` model:

- Workstations: `todo`, `in_progress`, `blocked`, `done`, `cancelled`
- `in_progress → blocked`: exit assertion `needs_human == false` (maps to the existing `needs_human` flag)
- `blocked → in_progress`: entry assertion `needs_human == false`
- `in_progress → done`: no assertions beyond curator intent

**`contribution.yaml`** — replaces `_VALID_TRANSITIONS` dict in `routers/contributions.py`:

- Workstations: `draft`, `submitted`, `acknowledged`, `accepted`, `merged`, `rejected`, `withdrawn`
- Express the same lifecycle as the current dict but as assertion-annotated workstation definitions, making the intent readable rather than just the allowed edges

**`capability_request.yaml`** — replaces `_VALID_TRANSITIONS` in `routers/capability_requests.py`

Acceptance: each YAML file loads as a valid `FlowDef`; running `FlowEngine.evaluate()` on a representative set of existing DB entities (via a test fixture) produces `FlowResult`s consistent with the current manual status labels.
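
To make the model from T01 through T03 concrete, here is a minimal sketch, not the planned implementation. It rests on several assumptions that are not in the spec: the current workstation is read from a `workstation` key on the object, `*` in a target path fans out over a list, a list-valued `value` means "any of these is acceptable", and the result is a plain dict that carries failing assertion ids instead of full `AssertionResult` records. The `description` fields are omitted for brevity. The governance flow, the free function `evaluate`, and the `approval.exists` assertion id are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class AssertionDef:
    id: str
    target: str        # dot-path into the properties; "*" fans out over a list (assumed)
    op: str            # key into OPS below
    value: Any = None


@dataclass(frozen=True)
class WorkstationDef:
    name: str
    entry_assertions: tuple = ()
    exit_assertions: tuple = ()


@dataclass(frozen=True)
class FlowDef:
    id: str
    entity_type: str
    workstations: tuple = ()


def resolve(obj: Any, path: str) -> list:
    """Collect every value selected by a dot-path; missing keys select nothing."""
    values = [obj]
    for part in path.split("."):
        selected = []
        for v in values:
            if part == "*" and isinstance(v, list):
                selected.extend(v)
            elif isinstance(v, dict) and part in v:
                selected.append(v[part])
        values = selected
    return values


def _accepted(value: Any) -> list:
    # Assumption: a list-valued `value` means "any of these is acceptable".
    return list(value) if isinstance(value, (list, tuple, set)) else [value]


# Assumed semantics for a subset of the ops named in the spec.
OPS = {
    "all_eq": lambda vals, value: all(v in _accepted(value) for v in vals),
    "any_eq": lambda vals, value: any(v in _accepted(value) for v in vals),
    "exists": lambda vals, value: any(v is not None for v in vals),
}


def failing(assertions: tuple, obj: dict) -> list:
    """Ids of the assertions that do not hold for obj; obj is never mutated."""
    return [a.id for a in assertions if not OPS[a.op](resolve(obj, a.target), a.value)]


def evaluate(obj: dict, flow: FlowDef) -> dict:
    """Derive blocked state and reachability without any transition table."""
    current = obj["workstation"]  # assumed key for the current workstation label
    by_name = {w.name: w for w in flow.workstations}
    blocking = failing(by_name[current].exit_assertions, obj)
    entry_failures = {
        w.name: failing(w.entry_assertions, obj)
        for w in flow.workstations
        if w.name != current
    }
    return {
        "current_workstation": current,
        "exit_blocked": bool(blocking),
        "blocking_assertions": blocking,
        "reachable": [n for n, f in entry_failures.items() if not f],
        "unreachable": {n: f for n, f in entry_failures.items() if f},
    }


# Hypothetical "governance" flow for workstreams.
GOVERNANCE = FlowDef(
    id="workstream-governance",
    entity_type="workstream",
    workstations=(
        WorkstationDef("todo"),
        WorkstationDef("active", exit_assertions=(
            AssertionDef("tasks.all_done", "tasks.*.status", "all_eq", ["done", "cancelled"]),
        )),
        WorkstationDef("completed", entry_assertions=(
            AssertionDef("approval.exists", "metadata.approved_by", "exists"),
        )),
        WorkstationDef("archived"),
    ),
)

workstream = {
    "workstation": "active",
    "tasks": [{"status": "done"}, {"status": "in_progress"}],
    "metadata": {},
}
result = evaluate(workstream, GOVERNANCE)
```

For this object, `result` reports `exit_blocked` as true with `tasks.all_done` failing, lists `todo` and `archived` as reachable, and lists `completed` as unreachable because `approval.exists` does not hold. Once both tasks are `done` or `cancelled` and `metadata.approved_by` is set, the same call reports no blocking assertions and `completed` becomes reachable.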
## T04: State-hub integration — migrate from enums to engine

```task
id: CUST-WP-0035-T04
status: todo
priority: high
state_hub_task_id: "db320d4e-cbcd-4787-a42c-e7cb109737a3"
```

**4a: Migrate `WorkstreamStatus` from SA Enum to `String(20)`**

Write an Alembic migration that alters the `workstreams.status` column from the `WorkstreamStatus` enum type to `VARCHAR(20)`. Existing values (`active`, `blocked`, `completed`, `archived`) are valid workstation names and survive unchanged. Drop the `WorkstreamStatus` Python enum after migration; use plain strings throughout. Follow the pattern already established by tasks (`String(20)` with no SA Enum).

**4b: Replace `_VALID_TRANSITIONS` guards with engine evaluation**

In `routers/contributions.py` and `routers/capability_requests.py`: replace the `_VALID_TRANSITIONS` dict lookup with `FlowEngine.evaluate()`. The router loads the appropriate `FlowDef` (from the YAML files in T03), calls evaluate, and returns 409 with a structured error body listing the failing assertions if the target workstation is unreachable. The error body replaces the current free-text `"transition not allowed"` message with machine-readable assertion failures.

**4c: Derive `blocked` automatically in state summary**

In `routers/state.py`: instead of filtering `Workstream.status == 'blocked'` directly, evaluate each active workstream against its flow definition and surface it as effectively blocked when `exit_blocked: true`. This means the `blocked` status on a workstream can be set automatically by the engine rather than requiring manual `update_workstream_status("blocked")` calls.

Acceptance: existing API tests pass after migration; the state summary `blocked_workstreams` count matches what the engine derives; a workstream with all tasks done automatically surfaces as ready to move to `completed`.
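
The derivation described in 4c can be sketched as follows. This is an illustration under stated assumptions, not the router code: `derive_blocked` and `stub_evaluate` are hypothetical names, the workstream dicts stand in for ORM rows, the stub stands in for `FlowEngine.evaluate()` with a single dependency rule, and the `id` and `reason` keys are an assumed shape for `AssertionResult`, which the spec does not yet define.

```python
from typing import Callable, Iterable


def derive_blocked(workstreams: Iterable[dict], evaluate: Callable[[dict], dict]) -> list:
    """Return one entry per active workstream whose exit assertions are failing."""
    blocked = []
    for ws in workstreams:
        if ws["status"] != "active":
            continue  # only active workstreams are evaluated, as in 4c
        result = evaluate(ws)
        if result["exit_blocked"]:
            blocked.append({
                "workstream_id": ws["id"],
                "blocking_assertions": result["blocking_assertions"],
            })
    return blocked


def stub_evaluate(ws: dict) -> dict:
    """Stand-in for the engine: exit is blocked while any dependency is incomplete."""
    pending = [d for d in ws.get("dependencies", []) if d["workstation"] != "completed"]
    blocking = [
        {
            "id": "dependencies.all_complete",
            "reason": f"dependency {d['id']} is at {d['workstation']}",
        }
        for d in pending
    ]
    return {"exit_blocked": bool(blocking), "blocking_assertions": blocking}


workstreams = [
    {"id": "ws-1", "status": "active",
     "dependencies": [{"id": "ws-9", "workstation": "active"}]},
    {"id": "ws-2", "status": "active",
     "dependencies": [{"id": "ws-9", "workstation": "completed"}]},
    {"id": "ws-3", "status": "archived", "dependencies": []},
]
summary = {"blocked_workstreams": derive_blocked(workstreams, stub_evaluate)}
```

Only `ws-1` appears in `summary["blocked_workstreams"]`, together with the reason its exit is blocked. Passing the evaluator in as a callable keeps the summary logic testable without a database or a loaded `FlowDef`.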
## T05: MCP tools — flow-aware session orientation

```task
id: CUST-WP-0035-T05
status: todo
priority: medium
state_hub_task_id: "8ea7e49f-f1ad-4290-84f4-c1ee75c79786"
```

Three new tools in `mcp_server/server.py`:

- `get_flow_state(entity_type: str, entity_id: str)` — returns the `FlowResult` for the given entity: current workstation, exit-blocking assertions with human-readable reasons, and list of reachable workstations
- `advance_workstation(entity_type: str, entity_id: str, target_workstation: str)` — attempts to move the entity to the target workstation; returns the `FlowResult` on success, or a 409-equivalent with the specific failing assertions if blocked
- `list_flow_definitions()` — returns the registered flow definitions with their workstation names and assertion counts (orientation tool)

Update `get_state_summary()` and `get_domain_summary()` to include a `blocked_reasons` field per blocked workstream so agents see not just that a workstream is blocked but specifically which assertion is failing.

Acceptance: `get_flow_state("workstream", "")` returns a readable result for an existing workstream; `advance_workstation` refuses correctly when assertions are unmet and accepts correctly when they are met.
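
The refuse-or-accept behaviour expected of `advance_workstation` can be sketched against an in-memory store. Everything beyond the tool name and its three parameters is an assumption made for illustration: the `ENTITIES` dict stands in for the database, `stub_evaluate` stands in for `FlowEngine.evaluate()` on the task flow, the `evaluate` parameter exists only so the stub can be swapped out, and the `needs_human.cleared` id and the return shape are hypothetical.

```python
from typing import Callable

# Hypothetical in-memory stand-in for the state-hub database.
ENTITIES = {
    ("task", "t-1"): {"workstation": "blocked", "needs_human": True},
}

TASK_WORKSTATIONS = ("todo", "in_progress", "blocked", "done", "cancelled")


def stub_evaluate(entity: dict) -> dict:
    """Stand-in for the engine: `in_progress` requires needs_human == false."""
    reachable = [w for w in TASK_WORKSTATIONS if w != entity["workstation"]]
    unreachable = []
    if entity["needs_human"] and "in_progress" in reachable:
        reachable.remove("in_progress")
        unreachable.append({
            "workstation": "in_progress",
            "blocking": {"id": "needs_human.cleared", "reason": "needs_human is still true"},
        })
    return {
        "current_workstation": entity["workstation"],
        "reachable": reachable,
        "unreachable": unreachable,
    }


def advance_workstation(
    entity_type: str,
    entity_id: str,
    target_workstation: str,
    evaluate: Callable[[dict], dict] = stub_evaluate,
) -> dict:
    """Move the entity if the target is reachable; otherwise report why not."""
    entity = ENTITIES[(entity_type, entity_id)]
    state = evaluate(entity)
    if target_workstation not in state["reachable"]:
        failing = [
            u["blocking"]
            for u in state["unreachable"]
            if u["workstation"] == target_workstation
        ]
        return {"ok": False, "status": 409, "failing_assertions": failing}
    entity["workstation"] = target_workstation  # stand-in for the DB update
    return {"ok": True, "flow_state": evaluate(entity)}


refused = advance_workstation("task", "t-1", "in_progress")
ENTITIES[("task", "t-1")]["needs_human"] = False
accepted = advance_workstation("task", "t-1", "in_progress")
```

The first call is refused with the failing assertion attached, and the second succeeds after the `needs_human` flag is cleared. The entity is written only after the evaluation passes, so a refused call leaves the store untouched.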
## T06: Extraction boundary and future repo scope

```task
id: CUST-WP-0035-T06
status: todo
priority: low
state_hub_task_id: "b9242cb4-5fb4-4e9e-9f16-9a1866cedc6a"
```

Before closing this workplan, write a brief design note at `canon/projects/custodian/task_flow_engine_scope_v0.1.md` that captures:

- What belongs in the standalone `task-flow-engine` package: `models.py`, `evaluator.py`, `engine.py`, `builtins.py` — pure Python, no custodian dependency
- What stays in the state-hub integration layer: YAML flow definitions (domain-specific), DB migration, router changes, MCP tools, custom op callables that query the DB
- The extraction path: once the engine is stable, `state-hub/task_flow_engine/` is published as a separate pip package and re-imported as a dependency
- Register a new managed repo concept (`task-flow-engine`) in the capabilities domain for when extraction happens

Also register an extension point:

```
ep_type: architecture
title: task-flow-engine extraction as standalone package
description: >
  task_flow_engine/ is currently co-located in the state-hub. Extract to its own
  repo and pip package once the API is stable after at least one non-trivial
  flow definition has been running in production.
status: open
priority: low
```

Acceptance: design note file exists; extension point registered.

---

## Closing note — reference documentation cleanup

Once this workplan is complete and the task-flow-engine model is live, the following custodian reference materials will need to be updated to reflect the refined terminology (workstations, information objects, requisite assertions) and to retire language that assumed fixed lifecycle enums:

- `state-hub/dashboard/src/docs/` — any page describing workstream or task lifecycle, status values, or contribution flows
- `state-hub/policies/repo-doi.md` — references to task/workstream status checks that assume specific enum values
- `agents/agent-scope-analyst.md` and other kaizen agents that reference status transitions by name
- `CLAUDE.md` (global and project) — session protocol references to `update_workstream_status()` and `update_task_status()` should be updated to the `advance_workstation()` pattern
- `memory/MEMORY.md` entries covering the state-hub data model

This cleanup is intentionally deferred — the new terminology should stabilise in practice before documentation is frozen. A dedicated workplan should be opened at the close of T05 to track this.