---
id: CUST-WP-0035
type: workplan
title: "Task-Flow-Engine — Declarative Workstation and Requisite Model"
domain: custodian
repo: the-custodian
status: active
owner: custodian
topic_slug: custodian
created: "2026-04-30"
updated: "2026-04-30"
state_hub_workstream_id: "781e519b-0dd7-451b-b63c-fad50f999c9c"
---
# CUST-WP-0035 — Task-Flow-Engine
## Goal
Build a lightweight, declarative workflow substrate that replaces the
custodian's current hardcoded status enums and `_VALID_TRANSITIONS` dicts with
a generalised model of **workstations**, **information objects**, and
**requisite assertions**.

The core idea:
- **Information objects** are any entities that move through a lifecycle:
workstreams, tasks, contributions, capability requests, interface changes.
- **Workstations** are named positions an information object can occupy. They
are not fixed lifecycle stages — they are general nodes whose semantics are
defined by their entry and exit assertions.
- **Requisite assertions** are declarative predicates on data elements or
qualities of an information object or its environment. An assertion might
say "all child tasks have status `done`", "a human approval record exists",
or "the dependency workstream `X` is at workstation `completed`". Assertions
compose: a workstation is reachable when all its entry assertions are
satisfied; it is exitable when all its exit assertions are satisfied.
- **Transitions** are derived, not enumerated. Any workstation is reachable
from any other as long as the target's entry assertions are met. There are no
hardcoded valid transition tables. Blocked state is also derived: an object is blocked at
its current workstation when one or more exit assertions are unsatisfied, and
the engine surfaces exactly which assertions are failing and why.

This design supports the loose, flexible coupling of activities needed across
the custodian ecosystem. Work items don't march through a prescribed pipeline;
they move when their world matches what the target workstation requires.

The engine is designed to eventually live in its own repository
(`task-flow-engine`) as a reusable Python package, independent of the
state-hub. This workplan builds the first version inside the custodian, then
scopes the extraction boundary.
## T01: Design specification — workstations, assertions, flow definitions
```task
id: CUST-WP-0035-T01
status: done
priority: high
state_hub_task_id: "25ad9022-987f-4d30-b1a1-a96c4a83889a"
```
Write `state-hub/docs/task-flow-engine-spec.md` capturing the full data model
before any code is written. The spec must cover:

**Information Object:** any entity with a current workstation label and a
bag of observable properties. The engine is not coupled to a specific DB
schema; it receives a plain dict of properties.

**WorkstationDef:** `{name: str, entry_assertions: list[AssertionDef],
exit_assertions: list[AssertionDef], description: str}`. A workstation with
no assertions is always reachable / always exitable (unconstrained).

**AssertionDef:** `{id: str, target: str, op: str, value: Any,
description: str}`.
- `target` is a dot-path into the information object's properties:
`"tasks.*.status"`, `"dependencies.all.workstation"`, `"metadata.approved_by"`
- `op` is a predicate: `all_eq`, `any_eq`, `none_eq`, `exists`, `count_gte`,
`custom` (for assertions that call back into the engine host)
- Assertions are pure — they do not mutate state
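
The `target` and `op` fields above could be evaluated along the following lines. This is a minimal sketch, not the spec: the `resolve` helper, the `OPS` table, and the exact semantics of each op are assumptions made for illustration. In particular, it assumes that only `*` acts as a wildcard and that `all_eq` is vacuously true when the path matches nothing.

```python
from typing import Any


def resolve(target: str, props: Any) -> list[Any]:
    """Collect every value reached by a dot-path; `*` fans out over a list or dict.

    Missing keys yield no values instead of raising, so `exists` can be
    expressed as "at least one non-None value was found".
    """
    values = [props]
    for segment in target.split("."):
        next_values = []
        for value in values:
            if segment == "*":
                if isinstance(value, dict):
                    next_values.extend(value.values())
                elif isinstance(value, list):
                    next_values.extend(value)
            elif isinstance(value, dict) and segment in value:
                next_values.append(value[segment])
        values = next_values
    return values


# Assumed semantics for the built-in ops; each takes (found values, expected value).
OPS = {
    "all_eq": lambda found, expected: all(v == expected for v in found),
    "any_eq": lambda found, expected: any(v == expected for v in found),
    "none_eq": lambda found, expected: not any(v == expected for v in found),
    "exists": lambda found, _expected: any(v is not None for v in found),
    "count_gte": lambda found, expected: len(found) >= expected,
}

# Illustrative object properties, not taken from the real state-hub schema.
props = {
    "tasks": [{"status": "done"}, {"status": "todo"}],
    "metadata": {"approved_by": "alice"},
}

statuses = resolve("tasks.*.status", props)
all_done = OPS["all_eq"](statuses, "done")
approved = OPS["exists"](resolve("metadata.approved_by", props), None)
```

Because `resolve` walks the path one segment at a time, its work is bounded by the length of the path, so a property dict that refers back to itself cannot send this sketch into an endless loop. Nothing here writes to `props`, which keeps the evaluation pure.
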
**FlowDef:** `{id: str, entity_type: str, workstations: list[WorkstationDef]}`.
A flow definition is a named graph. Multiple flows can exist per entity type
(e.g., a "lightweight" flow and a "governance" flow for workstreams).

**Transition:** not a first-class type. The engine derives valid next
workstations by evaluating entry assertions of all workstations in the flow
against the current object state. The caller sees: current workstation,
satisfied exit assertions, unsatisfied exit assertions (blocking reasons),
reachable workstations, unreachable workstations with the blocking assertion
for each.

**FlowResult:** `{current_workstation: str, exit_blocked: bool,
blocking_assertions: list[AssertionResult], reachable: list[str],
unreachable: list[{workstation: str, blocking: AssertionResult}]}`.

Acceptance: spec document exists; the model can express the full lifecycle of
workstreams, tasks, contributions, and capability requests without hardcoding
any domain knowledge into the engine itself.
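
The shapes listed in this task map naturally onto dataclasses. The sketch below is one possible rendering, assuming Python 3.9 or later. The field names come from the spec text, but the defaults, the fields of `AssertionResult` (which the text names without describing), and the `Unreachable` helper class are hypothetical.

```python
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class AssertionDef:
    id: str
    target: str
    op: str
    value: Any = None
    description: str = ""


@dataclass
class WorkstationDef:
    name: str
    entry_assertions: list[AssertionDef] = field(default_factory=list)
    exit_assertions: list[AssertionDef] = field(default_factory=list)
    description: str = ""


@dataclass
class FlowDef:
    id: str
    entity_type: str
    workstations: list[WorkstationDef] = field(default_factory=list)


@dataclass
class AssertionResult:
    # Hypothetical fields: the spec names this type but not its shape.
    assertion_id: str
    satisfied: bool
    reason: str = ""


@dataclass
class Unreachable:
    # Hypothetical name for the {workstation, blocking} entries.
    workstation: str
    blocking: AssertionResult


@dataclass
class FlowResult:
    current_workstation: str
    exit_blocked: bool
    blocking_assertions: list[AssertionResult] = field(default_factory=list)
    reachable: list[str] = field(default_factory=list)
    unreachable: list[Unreachable] = field(default_factory=list)


# Illustrative values only; this result is written by hand, not computed.
all_done = AssertionDef("tasks.all_done", "tasks.*.status", "all_eq", "done")
flow = FlowDef(
    "workstream.default",
    "workstream",
    [WorkstationDef("active", exit_assertions=[all_done]), WorkstationDef("archived")],
)
result = FlowResult(
    current_workstation="active",
    exit_blocked=True,
    blocking_assertions=[AssertionResult(all_done.id, False, "1 task is not done")],
    reachable=["archived"],
)
payload = asdict(result)  # nested dataclasses become plain dicts
```

A workstation built with no assertion lists, such as `archived` here, is the unconstrained case: it has nothing to fail on entry or exit.
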
## T02: Core Python library — pure engine, no FastAPI dependency
```task
id: CUST-WP-0035-T02
status: done
priority: high
state_hub_task_id: "df5ce1f2-a0d0-4f90-9629-c28c6021b909"
```
New package at `state-hub/task_flow_engine/`:
```
task_flow_engine/
    __init__.py
    models.py     # AssertionDef, WorkstationDef, FlowDef, FlowResult dataclasses
    evaluator.py  # assertion evaluation logic
    engine.py     # FlowEngine.evaluate(obj: dict, flow: FlowDef) -> FlowResult
    builtins.py   # built-in op implementations: all_eq, any_eq, exists, count_gte, …
```
Design constraints:
- No SQLAlchemy, no FastAPI, no HTTP — this is a pure computation library
- `FlowEngine.evaluate()` takes a plain `dict` (the information object's
properties) and a `FlowDef`, returns a `FlowResult`
- `FlowDef` instances can be loaded from YAML or constructed in code; the
engine does not care
- The `custom` op accepts a callable injected by the host — keeping the engine
pure while allowing host-specific assertions (e.g., "has a linked approval
decision in the DB")

Acceptance: unit tests in `state-hub/tests/test_task_flow_engine.py` cover:
- object with all assertions satisfied → correct reachable workstations
- object with one failing exit assertion → `exit_blocked: true` with the
specific assertion identified
- custom op callable invoked correctly
- flow def whose workstations declare no assertions → all workstations reachable
- circular reference in target path → handled without infinite loop
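
The design constraints for this task, in particular the host-injected `custom` op, could be realised roughly as follows. This is a deliberately reduced sketch under stated assumptions. The flow is a plain dict rather than a `FlowDef`, the object's current position is read from an assumed `workstation` key, a custom assertion names its callable in `value`, and results list assertion ids rather than full `AssertionResult` objects. Only the `custom` and `exists` ops are handled.

```python
from __future__ import annotations

from typing import Callable

# Hypothetical: a custom check receives the object's properties and returns a bool.
CustomCheck = Callable[[dict], bool]


class FlowEngine:
    """Evaluates a flow (plain dicts here) against one object; never mutates it."""

    def __init__(self, custom_checks: dict[str, CustomCheck] | None = None) -> None:
        self._custom = custom_checks or {}

    def _holds(self, assertion: dict, obj: dict) -> bool:
        op = assertion["op"]
        if op == "custom":
            # Assumption: `value` names a callable registered by the host.
            return self._custom[assertion["value"]](obj)
        if op == "exists":
            # Simplified: top-level key lookup only, no dot-path traversal.
            return obj.get(assertion["target"]) is not None
        raise ValueError(f"unsupported op in this sketch: {op}")

    def _failing(self, assertions: list[dict], obj: dict) -> list[str]:
        return [a["id"] for a in assertions if not self._holds(a, obj)]

    def evaluate(self, obj: dict, flow: dict) -> dict:
        current = obj["workstation"]  # assumed property name
        reachable, unreachable, blocking = [], [], []
        for ws in flow["workstations"]:
            if ws["name"] == current:
                # Exit assertions of the current workstation decide blocked state.
                blocking = self._failing(ws.get("exit_assertions", []), obj)
                continue
            failed = self._failing(ws.get("entry_assertions", []), obj)
            if failed:
                unreachable.append({"workstation": ws["name"], "blocking": failed})
            else:
                reachable.append(ws["name"])
        return {
            "current_workstation": current,
            "exit_blocked": bool(blocking),
            "blocking_assertions": blocking,
            "reachable": reachable,
            "unreachable": unreachable,
        }


approvals = {"ws-1"}  # stands in for a DB lookup performed by the host
engine = FlowEngine({"has_approval": lambda obj: obj["id"] in approvals})
flow = {
    "workstations": [
        {
            "name": "active",
            "exit_assertions": [
                {"id": "approval.exists", "op": "custom", "value": "has_approval"}
            ],
        },
        {
            "name": "completed",
            "entry_assertions": [
                {"id": "owner.exists", "target": "owner", "op": "exists"}
            ],
        },
    ]
}
result = engine.evaluate(
    {"id": "ws-2", "workstation": "active", "owner": "custodian"}, flow
)
```

In this sketch `reachable` and `exit_blocked` are computed independently, mirroring the separate fields of `FlowResult`; whether a blocked exit should also veto a move is left to the caller. The engine itself imports nothing from the host, since the approval lookup arrives as a callable.
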
## T03: Flow definitions for existing custodian entities
```task
id: CUST-WP-0035-T03
status: done
priority: high
state_hub_task_id: "3d01fc77-0329-44ee-8a60-20a3de1c1d6e"
```
Write YAML flow definitions for the four entity types currently tracked in the
state-hub. Store them at `state-hub/flows/`:

**`workstream.yaml`** (replaces the `WorkstreamStatus` enum):
- Workstations: `todo`, `active`, `blocked`, `completed`, `archived`
- `todo → active`: no entry assertions (planning is unconstrained)
- `active → completed`: exit assertion `tasks.all_done` = all tasks have
status `done` or `cancelled`
- `active → blocked`: exit assertion `dependencies.any_incomplete` (any
dependency workstream not yet at `completed`)
- `blocked → active`: entry assertion `dependencies.all_complete`
- `completed → archived`: no entry assertions

**`task.yaml`** (replaces the informal `todo | in_progress | blocked | done`
model):
- Workstations: `todo`, `in_progress`, `blocked`, `done`, `cancelled`
- `in_progress → blocked`: exit assertion `needs_human == false` (maps to the
existing `needs_human` flag)
- `blocked → in_progress`: entry assertion `needs_human == false`
- `in_progress → done`: no assertions beyond curator intent

**`contribution.yaml`** (replaces the `_VALID_TRANSITIONS` dict in
`routers/contributions.py`):
- Workstations: `draft`, `submitted`, `acknowledged`, `accepted`, `merged`,
`rejected`, `withdrawn`
- Express the same lifecycle as the current dict but as assertion-annotated
workstation definitions, making the intent readable rather than just the
allowed edges

**`capability_request.yaml`** (replaces `_VALID_TRANSITIONS` in
`routers/capability_requests.py`)

Acceptance: each YAML file loads as a valid `FlowDef`; running
`FlowEngine.evaluate()` on a representative set of existing DB entities (via
a test fixture) produces `FlowResult`s consistent with the current manual
status labels.
## T04: State-hub integration — migrate from enums to engine
```task
id: CUST-WP-0035-T04
status: todo
priority: high
state_hub_task_id: "db320d4e-cbcd-4787-a42c-e7cb109737a3"
```
**4a: Migrate `WorkstreamStatus` from SA Enum to `String(20)`**
Write an Alembic migration that alters the `workstreams.status` column from
the `WorkstreamStatus` enum type to `VARCHAR(20)`. Existing values (`active`,
`blocked`, `completed`, `archived`) are valid workstation names and survive
unchanged. Drop the `WorkstreamStatus` Python enum after migration; use plain
strings throughout. Follow the pattern already established by tasks
(`String(20)` with no SA Enum).

**4b: Replace `_VALID_TRANSITIONS` guards with engine evaluation**
In `routers/contributions.py` and `routers/capability_requests.py`: replace
the `_VALID_TRANSITIONS` dict lookup with `FlowEngine.evaluate()`. The router
loads the appropriate `FlowDef` (from the YAML files in T03), calls evaluate,
and returns 409 with a structured error body listing the failing assertions
if the target workstation is unreachable. The error body replaces the current
free-text `"transition not allowed"` message with machine-readable assertion
failures.
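
The structured 409 body described in 4b might be assembled like this. The function name `transition_response`, the `error` code string, the body field names, and the example assertion id are all hypothetical. The sketch returns a plain `(status, body)` pair so that it stays independent of FastAPI, and it does not treat a target name missing from the flow as a separate case.

```python
def transition_response(result: dict, target: str) -> tuple[int, dict]:
    """Turn a FlowResult-like dict into (HTTP status, JSON-serialisable body)."""
    if target in result["reachable"]:
        return 200, {"workstation": target}
    # Collect the blocking assertion recorded for the requested workstation.
    failing = [
        entry["blocking"]
        for entry in result["unreachable"]
        if entry["workstation"] == target
    ]
    return 409, {
        "error": "workstation_unreachable",  # hypothetical error code
        "target_workstation": target,
        "failing_assertions": failing,
    }


# Hand-written result for a contribution that has not been acknowledged yet.
result = {
    "current_workstation": "submitted",
    "reachable": ["withdrawn"],
    "unreachable": [
        {
            "workstation": "accepted",
            "blocking": {"id": "ack.exists", "reason": "no acknowledgement recorded"},
        }
    ],
}
status, body = transition_response(result, "accepted")
```

A client receiving this body can act on `failing_assertions` directly instead of parsing a free-text message.
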
**4c: Derive `blocked` automatically in state summary**
In `routers/state.py`: instead of filtering `Workstream.status == 'blocked'`
directly, evaluate each active workstream against its flow definition and
surface it as effectively blocked when `exit_blocked: true`. This means the
`blocked` status on a workstream can be set automatically by the engine rather
than requiring manual `update_workstream_status("blocked")` calls.

Acceptance: existing API tests pass after migration; the state summary
`blocked_workstreams` count matches what the engine derives; a workstream
with all tasks done automatically surfaces as ready to move to `completed`.
## T05: MCP tools — flow-aware session orientation
```task
id: CUST-WP-0035-T05
status: todo
priority: medium
state_hub_task_id: "8ea7e49f-f1ad-4290-84f4-c1ee75c79786"
```
Three new tools in `mcp_server/server.py`:
- `get_flow_state(entity_type: str, entity_id: str)` — returns the
`FlowResult` for the given entity: current workstation, exit-blocking
assertions with human-readable reasons, and list of reachable workstations
- `advance_workstation(entity_type: str, entity_id: str,
target_workstation: str)` — attempts to move the entity to the target
workstation; returns the `FlowResult` on success, or a 409-equivalent with
the specific failing assertions if blocked
- `list_flow_definitions()` — returns the registered flow definitions with
their workstation names and assertion counts (orientation tool)

Update `get_state_summary()` and `get_domain_summary()` to include a
`blocked_reasons` field per blocked workstream so agents see not just that a
workstream is blocked but specifically which assertion is failing.
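
A `blocked_reasons` field of this kind could be derived from per-workstream engine results as sketched below. The helper name `blocked_workstreams`, the `reason` key on each blocking assertion, and the sample ids are assumptions for illustration; the real summary tools may shape their output differently.

```python
def blocked_workstreams(results: dict[str, dict]) -> list[dict]:
    """List workstreams whose FlowResult-like dict reports exit_blocked."""
    return [
        {
            "workstream_id": workstream_id,
            "workstation": result["current_workstation"],
            "blocked_reasons": [a["reason"] for a in result["blocking_assertions"]],
        }
        for workstream_id, result in results.items()
        if result["exit_blocked"]
    ]


# Hand-written engine results keyed by workstream id (illustrative values).
results = {
    "ws-a": {
        "current_workstation": "active",
        "exit_blocked": True,
        "blocking_assertions": [
            {"id": "tasks.all_done", "reason": "2 tasks are not done"}
        ],
    },
    "ws-b": {
        "current_workstation": "active",
        "exit_blocked": False,
        "blocking_assertions": [],
    },
}
summary = blocked_workstreams(results)
```
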
Acceptance: `get_flow_state("workstream", "<id>")` returns a readable result
for an existing workstream; `advance_workstation` refuses correctly when
assertions are unmet and accepts correctly when they are met.
## T06: Extraction boundary and future repo scope
```task
id: CUST-WP-0035-T06
status: todo
priority: low
state_hub_task_id: "b9242cb4-5fb4-4e9e-9f16-9a1866cedc6a"
```
Before closing this workplan, write a brief design note at
`canon/projects/custodian/task_flow_engine_scope_v0.1.md` that captures:
- What belongs in the standalone `task-flow-engine` package:
`models.py`, `evaluator.py`, `engine.py`, `builtins.py` — pure Python,
no custodian dependency
- What stays in the state-hub integration layer:
YAML flow definitions (domain-specific), DB migration, router changes, MCP
tools, custom op callables that query the DB
- The extraction path: once the engine is stable, `state-hub/task_flow_engine/`
is published as a separate pip package and re-imported as a dependency
- Register a new managed repo concept (`task-flow-engine`) in the capabilities
domain for when extraction happens

Also register an extension point:
```
ep_type: architecture
title: task-flow-engine extraction as standalone package
description: >
  task_flow_engine/ is currently co-located in the state-hub. Extract to its
  own repo and pip package once the API is stable after at least one non-trivial
  flow definition has been running in production.
status: open
priority: low
```
Acceptance: design note file exists; extension point registered.

---
## Closing note — reference documentation cleanup
Once this workplan is complete and the task-flow-engine model is live, the
following custodian reference materials will need to be updated to reflect the
refined terminology (workstations, information objects, requisite assertions)
and to retire language that assumed fixed lifecycle enums:
- `state-hub/dashboard/src/docs/` — any page describing workstream or task
lifecycle, status values, or contribution flows
- `state-hub/policies/repo-doi.md` — references to task/workstream status
checks that assume specific enum values
- `agents/agent-scope-analyst.md` and other kaizen agents that reference
status transitions by name
- `CLAUDE.md` (global and project) — session protocol references to
`update_workstream_status()` and `update_task_status()` should be updated
to the `advance_workstation()` pattern
- `memory/MEMORY.md` entries covering the state-hub data model

This cleanup is intentionally deferred: the new terminology should stabilise
in practice before documentation is frozen. A dedicated workplan should be
opened at the close of T05 to track this.