diff --git a/docs/task-flow-engine-spec.md b/docs/task-flow-engine-spec.md new file mode 100644 index 0000000..f44e831 --- /dev/null +++ b/docs/task-flow-engine-spec.md @@ -0,0 +1,200 @@ +--- +id: task-flow-engine-spec +type: design-spec +title: "Task Flow Engine Specification" +status: draft +created: "2026-05-01" +updated: "2026-05-01" +--- + +# Task Flow Engine Specification + +## Purpose + +The task flow engine is a lightweight, declarative workflow substrate for +information objects that move through named workstations. It replaces local +status enums and hardcoded transition tables with pure assertions over an +object's observable properties. + +The engine is intentionally small: it receives a plain dictionary plus a flow +definition, evaluates assertions, and returns a machine-readable result. It +does not know about SQLAlchemy, FastAPI, State Hub routers, or Custodian domain +rules. + +## Core Terms + +### Information Object + +An information object is any entity with: + +- a current workstation label, usually exposed as `workstation` or `status` +- a bag of observable properties +- optional nested collections of related entities + +Examples include workstreams, tasks, contributions, capability requests, and +future interface changes. The engine treats all of them as plain dictionaries. + +### WorkstationDef + +A workstation is a named position an information object can occupy. + +```yaml +name: active +description: Work is underway. +entry_assertions: [] +exit_assertions: + - id: tasks.all_done + target: tasks.*.status + op: all_eq + value: [done, cancelled] + description: All child tasks are done or cancelled. +``` + +Schema: + +- `name: str` +- `entry_assertions: list[AssertionDef]` +- `exit_assertions: list[AssertionDef]` +- `description: str` + +A workstation with no entry assertions is always reachable. A workstation with +no exit assertions is always exitable. + +### AssertionDef + +An assertion is a pure predicate over object data. + +Schema: + +- `id: str` +- `target: str` +- `op: str` +- `value: Any` +- `description: str` + +The `target` is a dot path into the information object. It supports normal dict +and attribute traversal plus `*` for collection expansion: + +- `tasks.*.status` +- `dependencies.all.workstation` +- `metadata.approved_by` + +The built-in operations are: + +- `all_eq`: every resolved value equals the expected value, or is included in + the expected list +- `any_eq`: at least one resolved value equals the expected value, or is + included in the expected list +- `none_eq`: no resolved values equal the expected value, or are included in + the expected list +- `exists`: at least one non-empty value resolves +- `count_gte`: the number of resolved values is greater than or equal to the + expected integer +- `custom`: delegates evaluation to a host-injected callable + +Assertions never mutate state. + +### FlowDef + +A flow definition is a named workstation graph for one entity type. + +Schema: + +- `id: str` +- `entity_type: str` +- `workstations: list[WorkstationDef]` + +Multiple flows may exist for the same entity type, for example a lightweight +workstream flow and a governance-heavy workstream flow. + +### Transition + +Transition is not a first-class model. The engine derives reachable +workstations by evaluating every workstation's entry assertions against the +current object state. If the assertions for a target workstation are satisfied, +that workstation is reachable from the current workstation. + +The current workstation's exit assertions determine whether the object is +blocked where it is. Unsatisfied exit assertions become blocking reasons. + +### FlowResult + +Evaluation returns: + +```yaml +current_workstation: active +exit_blocked: true +blocking_assertions: + - id: tasks.all_done + passed: false + reason: "Expected all values at tasks.*.status to be in ['done', 'cancelled']; got ['done', 'todo']." +reachable: + - todo + - active +unreachable: + - workstation: completed + blocking: + id: tasks.all_done + passed: false + reason: "Expected all values at tasks.*.status to be in ['done', 'cancelled']; got ['done', 'todo']." +``` + +Schema: + +- `current_workstation: str` +- `exit_blocked: bool` +- `blocking_assertions: list[AssertionResult]` +- `reachable: list[str]` +- `unreachable: list[UnreachableWorkstation]` + +## Expressiveness Across Existing Entities + +### Workstreams + +Workstreams can express readiness for completion by asserting that child tasks +are `done` or `cancelled`. They can express dependency blocking by checking that +all dependency workstreams have reached `completed`. + +### Tasks + +Tasks can express human intervention with the existing `needs_human` flag. +Returning from `blocked` to `in_progress` is an entry assertion over that same +flag. Lightweight completion remains unconstrained because curator intent is +the deciding signal. + +### Contributions + +Contributions can reproduce the current draft, submitted, acknowledged, +accepted, merged, rejected, and withdrawn lifecycle by giving each workstation +entry assertions that describe which previous statuses may enter it. This keeps +the current lifecycle readable without baking domain transitions into engine +code. + +### Capability Requests + +Capability requests can reproduce the existing requested, routing disputed, +accepted, in progress, ready for review, completed, rejected, and withdrawn +lifecycle the same way. Host-specific effects such as notifications remain in +the State Hub router; the flow engine only answers whether the target +workstation is reachable. + +## Host Boundary + +The engine owns: + +- dataclasses for flow definitions and results +- target path resolution +- built-in predicate evaluation +- host-injected custom predicate dispatch +- reachable and blocked derivation + +State Hub owns: + +- loading domain-specific YAML flow definitions +- converting ORM entities into plain dictionaries +- migrations from enum-backed status fields to strings +- router side effects such as timestamps and notifications +- MCP tools and user-facing explanations + +This boundary keeps the first implementation extractable into a standalone +`task-flow-engine` package once the API stabilizes. diff --git a/flows/capability_request.yaml b/flows/capability_request.yaml new file mode 100644 index 0000000..503b9d1 --- /dev/null +++ b/flows/capability_request.yaml @@ -0,0 +1,84 @@ +id: custodian.capability_request.v1 +entity_type: capability_request +workstations: + - name: requested + description: Capability has been requested and awaits routing or acceptance. + entry_assertions: [] + exit_assertions: [] + - name: routing_disputed + description: Routing decision has been disputed. + entry_assertions: + - id: capability_request.disputable_source + target: previous_workstation + op: any_eq + value: + - requested + description: Only requested items can enter routing dispute. + exit_assertions: [] + - name: accepted + description: A provider has accepted the capability request. + entry_assertions: + - id: capability_request.acceptable_source + target: previous_workstation + op: any_eq + value: + - requested + description: Acceptance follows the requested workstation. + exit_assertions: [] + - name: in_progress + description: Provider work is underway. + entry_assertions: + - id: capability_request.progress_source + target: previous_workstation + op: any_eq + value: + - accepted + - ready_for_review + description: Work can start after acceptance or return from review. + exit_assertions: [] + - name: ready_for_review + description: Capability is ready for requester review. + entry_assertions: + - id: capability_request.review_source + target: previous_workstation + op: any_eq + value: + - in_progress + description: Review follows in-progress work. + exit_assertions: [] + - name: completed + description: Capability request has been completed. + entry_assertions: + - id: capability_request.completion_source + target: previous_workstation + op: any_eq + value: + - ready_for_review + description: Completion follows review readiness. + exit_assertions: [] + - name: rejected + description: Capability request was rejected. + entry_assertions: + - id: capability_request.rejectable_source + target: previous_workstation + op: any_eq + value: + - requested + - accepted + - in_progress + description: Requested through in-progress items can be rejected. + exit_assertions: [] + - name: withdrawn + description: Capability request was withdrawn. + entry_assertions: + - id: capability_request.withdrawable_source + target: previous_workstation + op: any_eq + value: + - requested + - routing_disputed + - accepted + - in_progress + - ready_for_review + description: Non-terminal active items can be withdrawn. + exit_assertions: [] diff --git a/flows/contribution.yaml b/flows/contribution.yaml new file mode 100644 index 0000000..96825b8 --- /dev/null +++ b/flows/contribution.yaml @@ -0,0 +1,71 @@ +id: custodian.contribution.v1 +entity_type: contribution +workstations: + - name: draft + description: Local draft, not yet submitted. + entry_assertions: [] + exit_assertions: [] + - name: submitted + description: Submitted for acknowledgement. + entry_assertions: + - id: contribution.from_draft + target: previous_workstation + op: any_eq + value: + - draft + description: Submitted contributions originate from draft. + exit_assertions: [] + - name: acknowledged + description: Submission has been acknowledged. + entry_assertions: + - id: contribution.from_submitted + target: previous_workstation + op: any_eq + value: + - submitted + description: Acknowledgement follows submission. + exit_assertions: [] + - name: accepted + description: Contribution has been accepted for merge. + entry_assertions: + - id: contribution.from_acknowledged + target: previous_workstation + op: any_eq + value: + - acknowledged + description: Acceptance follows acknowledgement. + exit_assertions: [] + - name: merged + description: Contribution has been merged. + entry_assertions: + - id: contribution.from_accepted + target: previous_workstation + op: any_eq + value: + - accepted + description: Merge follows acceptance. + exit_assertions: [] + - name: rejected + description: Contribution was rejected. + entry_assertions: + - id: contribution.rejectable_source + target: previous_workstation + op: any_eq + value: + - submitted + - acknowledged + description: Only submitted or acknowledged contributions can be rejected. + exit_assertions: [] + - name: withdrawn + description: Contribution was withdrawn by its owner. + entry_assertions: + - id: contribution.withdrawable_source + target: previous_workstation + op: any_eq + value: + - draft + - submitted + - acknowledged + - accepted + description: Draft through accepted contributions can be withdrawn. + exit_assertions: [] diff --git a/flows/task.yaml b/flows/task.yaml new file mode 100644 index 0000000..600ff98 --- /dev/null +++ b/flows/task.yaml @@ -0,0 +1,43 @@ +id: custodian.task.v1 +entity_type: task +workstations: + - name: todo + description: Task is known but not currently underway. + entry_assertions: [] + exit_assertions: [] + - name: in_progress + description: Task is being actively worked. + entry_assertions: + - id: task.needs_human_false + target: needs_human + op: all_eq + value: false + description: Human intervention is not currently required. + exit_assertions: + - id: task.needs_human_false + target: needs_human + op: all_eq + value: false + description: Human intervention is not currently required. + - name: blocked + description: Task is blocked by a human decision or unavailable input. + entry_assertions: + - id: task.needs_human_true + target: needs_human + op: all_eq + value: true + description: The task requires human intervention. + exit_assertions: + - id: task.needs_human_false + target: needs_human + op: all_eq + value: false + description: Human intervention has been cleared. + - name: done + description: Task is complete. + entry_assertions: [] + exit_assertions: [] + - name: cancelled + description: Task is intentionally not being completed. + entry_assertions: [] + exit_assertions: [] diff --git a/flows/workstream.yaml b/flows/workstream.yaml new file mode 100644 index 0000000..3e4a6be --- /dev/null +++ b/flows/workstream.yaml @@ -0,0 +1,45 @@ +id: custodian.workstream.v1 +entity_type: workstream +workstations: + - name: todo + description: Planned but not yet active. + entry_assertions: [] + exit_assertions: [] + - name: active + description: Work is underway. + entry_assertions: [] + exit_assertions: + - id: dependencies.all_complete + target: dependencies.*.workstation + op: all_eq + value: completed + description: Dependency workstreams have reached completed. + - name: blocked + description: Work is blocked by incomplete dependencies or missing input. + entry_assertions: + - id: dependencies.any_incomplete + target: dependencies.*.workstation + op: custom + value: completed + description: At least one dependency is not completed. + exit_assertions: + - id: dependencies.all_complete + target: dependencies.*.workstation + op: all_eq + value: completed + description: All dependency workstreams have reached completed. + - name: completed + description: Work is complete. + entry_assertions: + - id: tasks.all_done + target: tasks.*.status + op: all_eq + value: + - done + - cancelled + description: All child tasks are done or cancelled. + exit_assertions: [] + - name: archived + description: Completed work has been moved out of the active set. + entry_assertions: [] + exit_assertions: [] diff --git a/pyproject.toml b/pyproject.toml index 3f351c0..de76f91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] -packages = ["api", "mcp_server"] +packages = ["api", "mcp_server", "task_flow_engine"] artifacts = ["custodian_cli.py"] [tool.uv.sources] diff --git a/task_flow_engine/__init__.py b/task_flow_engine/__init__.py new file mode 100644 index 0000000..c7e0562 --- /dev/null +++ b/task_flow_engine/__init__.py @@ -0,0 +1,22 @@ +from task_flow_engine.engine import FlowEngine +from task_flow_engine.evaluator import AssertionEvaluator, resolve_target +from task_flow_engine.models import ( + AssertionDef, + AssertionResult, + FlowDef, + FlowResult, + UnreachableWorkstation, + WorkstationDef, +) + +__all__ = [ + "AssertionDef", + "AssertionEvaluator", + "AssertionResult", + "FlowDef", + "FlowEngine", + "FlowResult", + "UnreachableWorkstation", + "WorkstationDef", + "resolve_target", +] diff --git a/task_flow_engine/builtins.py b/task_flow_engine/builtins.py new file mode 100644 index 0000000..dfe5100 --- /dev/null +++ b/task_flow_engine/builtins.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + + +EMPTY_VALUES = (None, "", [], {}, ()) + + +def all_eq(values: list[Any], expected: Any) -> bool: + return all(_matches(value, expected) for value in values) + + +def any_eq(values: list[Any], expected: Any) -> bool: + return any(_matches(value, expected) for value in values) + + +def none_eq(values: list[Any], expected: Any) -> bool: + return all(not _matches(value, expected) for value in values) + + +def exists(values: list[Any], expected: Any = None) -> bool: + return any(value not in EMPTY_VALUES for value in values) + + +def count_gte(values: list[Any], expected: Any) -> bool: + try: + threshold = int(expected) + except (TypeError, ValueError): + return False + return len(values) >= threshold + + +def _matches(value: Any, expected: Any) -> bool: + if isinstance(expected, Sequence) and not isinstance(expected, (str, bytes, bytearray)): + return value in expected + return value == expected diff --git a/task_flow_engine/engine.py b/task_flow_engine/engine.py new file mode 100644 index 0000000..d005578 --- /dev/null +++ b/task_flow_engine/engine.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from task_flow_engine.evaluator import AssertionEvaluator, CustomOp +from task_flow_engine.models import ( + AssertionResult, + FlowDef, + FlowResult, + UnreachableWorkstation, + WorkstationDef, +) + + +@dataclass +class FlowEngine: + custom_ops: dict[str, CustomOp] | None = None + + def evaluate(self, obj: dict[str, Any], flow: FlowDef) -> FlowResult: + evaluator = AssertionEvaluator(custom_ops=self.custom_ops) + current_name = str(obj.get("workstation") or obj.get("status") or "") + current = flow.workstation(current_name) + blocking_assertions = ( + self._failed_assertions(current.exit_assertions, obj, evaluator) + if current is not None + else [] + ) + + reachable: list[str] = [] + unreachable: list[UnreachableWorkstation] = [] + for workstation in flow.workstations: + failed = self._failed_assertions(workstation.entry_assertions, obj, evaluator) + if failed: + unreachable.append( + UnreachableWorkstation( + workstation=workstation.name, + blocking=failed[0], + ) + ) + else: + reachable.append(workstation.name) + + return FlowResult( + current_workstation=current_name, + exit_blocked=bool(blocking_assertions), + blocking_assertions=blocking_assertions, + reachable=reachable, + unreachable=unreachable, + ) + + def can_reach( + self, + obj: dict[str, Any], + flow: FlowDef, + target_workstation: str, + ) -> tuple[bool, list[AssertionResult]]: + workstation = flow.workstation(target_workstation) + if workstation is None: + return False, [ + AssertionResult( + id="flow.unknown_workstation", + passed=False, + target="workstation", + op="exists", + expected=target_workstation, + actual=[item.name for item in flow.workstations], + reason=f"Flow '{flow.id}' has no workstation '{target_workstation}'.", + ) + ] + evaluator = AssertionEvaluator(custom_ops=self.custom_ops) + failed = self._failed_assertions(workstation.entry_assertions, obj, evaluator) + return not failed, failed + + @staticmethod + def _failed_assertions( + assertions: list, + obj: dict[str, Any], + evaluator: AssertionEvaluator, + ) -> list[AssertionResult]: + results = [evaluator.evaluate(assertion, obj) for assertion in assertions] + return [result for result in results if not result.passed] diff --git a/task_flow_engine/evaluator.py b/task_flow_engine/evaluator.py new file mode 100644 index 0000000..279a6e9 --- /dev/null +++ b/task_flow_engine/evaluator.py @@ -0,0 +1,140 @@ +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from typing import Any + +from task_flow_engine import builtins +from task_flow_engine.models import AssertionDef, AssertionResult + + +CustomOp = Callable[[AssertionDef, dict[str, Any], list[Any]], bool | tuple[bool, str]] + + +@dataclass +class AssertionEvaluator: + custom_ops: dict[str, CustomOp] | None = None + max_nodes: int = 1_000 + + def evaluate(self, assertion: AssertionDef, obj: dict[str, Any]) -> AssertionResult: + values = resolve_target(obj, assertion.target, max_nodes=self.max_nodes) + passed, reason = self._evaluate(assertion, obj, values) + if not reason: + reason = _default_reason(assertion, values, passed) + return AssertionResult( + id=assertion.id, + passed=passed, + target=assertion.target, + op=assertion.op, + expected=assertion.value, + actual=values, + description=assertion.description, + reason=reason, + ) + + def _evaluate( + self, + assertion: AssertionDef, + obj: dict[str, Any], + values: list[Any], + ) -> tuple[bool, str]: + if assertion.op == "all_eq": + return builtins.all_eq(values, assertion.value), "" + if assertion.op == "any_eq": + return builtins.any_eq(values, assertion.value), "" + if assertion.op == "none_eq": + return builtins.none_eq(values, assertion.value), "" + if assertion.op == "exists": + return builtins.exists(values, assertion.value), "" + if assertion.op == "count_gte": + return builtins.count_gte(values, assertion.value), "" + if assertion.op == "custom": + return self._evaluate_custom(assertion, obj, values) + return False, f"Unknown assertion op '{assertion.op}'." + + def _evaluate_custom( + self, + assertion: AssertionDef, + obj: dict[str, Any], + values: list[Any], + ) -> tuple[bool, str]: + if not self.custom_ops or assertion.id not in self.custom_ops: + return False, f"No custom op registered for assertion '{assertion.id}'." + result = self.custom_ops[assertion.id](assertion, obj, values) + if isinstance(result, tuple): + passed, reason = result + return bool(passed), reason + return bool(result), "" + + +def resolve_target(obj: Any, target: str, max_nodes: int = 1_000) -> list[Any]: + if not target: + return [obj] + parts = target.split(".") + seen: set[int] = set() + values = _resolve(obj, parts, seen, max_nodes) + return [_scalarize(value) for value in values] + + +def _resolve(current: Any, parts: list[str], seen: set[int], max_nodes: int) -> list[Any]: + if len(seen) > max_nodes: + return [] + current_id = id(current) + if isinstance(current, (dict, list, tuple, set)) or hasattr(current, "__dict__"): + if current_id in seen: + return [] + seen.add(current_id) + + if not parts: + return [current] + + part, rest = parts[0], parts[1:] + if part == "*": + values: list[Any] = [] + for item in _iter_items(current): + values.extend(_resolve(item, rest, seen.copy(), max_nodes)) + return values + + next_value = _get_child(current, part) + if next_value is _MISSING: + return [] + return _resolve(next_value, rest, seen, max_nodes) + + +def _iter_items(current: Any) -> list[Any]: + if isinstance(current, dict): + return list(current.values()) + if isinstance(current, (list, tuple, set)): + return list(current) + return [] + + +_MISSING = object() + + +def _get_child(current: Any, part: str) -> Any: + if isinstance(current, dict): + return current.get(part, _MISSING) + if isinstance(current, (list, tuple)) and part.isdigit(): + index = int(part) + return current[index] if index < len(current) else _MISSING + return getattr(current, part, _MISSING) + + +def _scalarize(value: Any) -> Any: + if hasattr(value, "value") and not isinstance(value, (str, bytes, bytearray)): + return value.value + return value + + +def _default_reason(assertion: AssertionDef, values: list[Any], passed: bool) -> str: + if passed: + return f"Assertion '{assertion.id}' passed." + if assertion.op == "exists": + return f"Expected at least one non-empty value at {assertion.target}; got {values}." + if assertion.op == "count_gte": + return f"Expected at least {assertion.value} values at {assertion.target}; got {len(values)}." + return ( + f"Expected {assertion.op} at {assertion.target} with {assertion.value!r}; " + f"got {values!r}." + ) diff --git a/task_flow_engine/models.py b/task_flow_engine/models.py new file mode 100644 index 0000000..9f9de9c --- /dev/null +++ b/task_flow_engine/models.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(frozen=True) +class AssertionDef: + id: str + target: str + op: str + value: Any = None + description: str = "" + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "AssertionDef": + return cls( + id=str(data["id"]), + target=str(data.get("target", "")), + op=str(data["op"]), + value=data.get("value"), + description=str(data.get("description", "")), + ) + + +@dataclass(frozen=True) +class WorkstationDef: + name: str + entry_assertions: list[AssertionDef] = field(default_factory=list) + exit_assertions: list[AssertionDef] = field(default_factory=list) + description: str = "" + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "WorkstationDef": + return cls( + name=str(data["name"]), + description=str(data.get("description", "")), + entry_assertions=[ + AssertionDef.from_dict(item) + for item in data.get("entry_assertions", []) or [] + ], + exit_assertions=[ + AssertionDef.from_dict(item) + for item in data.get("exit_assertions", []) or [] + ], + ) + + +@dataclass(frozen=True) +class FlowDef: + id: str + entity_type: str + workstations: list[WorkstationDef] + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "FlowDef": + return cls( + id=str(data["id"]), + entity_type=str(data["entity_type"]), + workstations=[ + WorkstationDef.from_dict(item) + for item in data.get("workstations", []) or [] + ], + ) + + def workstation(self, name: str) -> WorkstationDef | None: + return next((item for item in self.workstations if item.name == name), None) + + +@dataclass(frozen=True) +class AssertionResult: + id: str + passed: bool + target: str + op: str + expected: Any = None + actual: list[Any] = field(default_factory=list) + description: str = "" + reason: str = "" + + +@dataclass(frozen=True) +class UnreachableWorkstation: + workstation: str + blocking: AssertionResult + + +@dataclass(frozen=True) +class FlowResult: + current_workstation: str + exit_blocked: bool + blocking_assertions: list[AssertionResult] + reachable: list[str] + unreachable: list[UnreachableWorkstation] diff --git a/tests/test_task_flow_engine.py b/tests/test_task_flow_engine.py new file mode 100644 index 0000000..cd58990 --- /dev/null +++ b/tests/test_task_flow_engine.py @@ -0,0 +1,170 @@ +from pathlib import Path + +import yaml + +from task_flow_engine import AssertionDef, FlowDef, FlowEngine, WorkstationDef, resolve_target + + +def test_all_assertions_satisfied_reports_reachable_workstations(): + flow = FlowDef( + id="workstream.test", + entity_type="workstream", + workstations=[ + WorkstationDef(name="active"), + WorkstationDef( + name="completed", + entry_assertions=[ + AssertionDef( + id="tasks.all_done", + target="tasks.*.status", + op="all_eq", + value=["done", "cancelled"], + ) + ], + ), + ], + ) + + result = FlowEngine().evaluate( + {"status": "active", "tasks": [{"status": "done"}, {"status": "cancelled"}]}, + flow, + ) + + assert result.exit_blocked is False + assert result.reachable == ["active", "completed"] + assert result.unreachable == [] + + +def test_failing_exit_assertion_identifies_blocking_assertion(): + flow = FlowDef( + id="workstream.test", + entity_type="workstream", + workstations=[ + WorkstationDef( + name="active", + exit_assertions=[ + AssertionDef( + id="tasks.all_done", + target="tasks.*.status", + op="all_eq", + value=["done", "cancelled"], + ) + ], + ) + ], + ) + + result = FlowEngine().evaluate( + {"status": "active", "tasks": [{"status": "done"}, {"status": "todo"}]}, + flow, + ) + + assert result.exit_blocked is True + assert [item.id for item in result.blocking_assertions] == ["tasks.all_done"] + assert result.blocking_assertions[0].actual == ["done", "todo"] + + +def test_custom_op_callable_is_invoked(): + calls = [] + + def has_review(assertion, obj, values): + calls.append((assertion.id, values)) + return bool(obj.get("review")) + + flow = FlowDef( + id="review.test", + entity_type="contribution", + workstations=[ + WorkstationDef(name="draft"), + WorkstationDef( + name="accepted", + entry_assertions=[ + AssertionDef(id="contribution.has_review", target="review", op="custom") + ], + ), + ], + ) + + result = FlowEngine(custom_ops={"contribution.has_review": has_review}).evaluate( + {"status": "draft", "review": {"approved_by": "bernd"}}, + flow, + ) + + assert "accepted" in result.reachable + assert calls == [("contribution.has_review", [{"approved_by": "bernd"}])] + + +def test_empty_assertions_make_all_workstations_reachable(): + flow = FlowDef( + id="empty.test", + entity_type="task", + workstations=[ + WorkstationDef(name="todo"), + WorkstationDef(name="in_progress"), + WorkstationDef(name="done"), + ], + ) + + result = FlowEngine().evaluate({"status": "todo"}, flow) + + assert result.exit_blocked is False + assert result.reachable == ["todo", "in_progress", "done"] + + +def test_circular_reference_in_target_path_does_not_loop_forever(): + obj = {"status": "active"} + obj["self"] = obj + + assert resolve_target(obj, "self.self.self.status") == [] + + +def test_yaml_flow_definitions_load_and_evaluate_representative_entities(): + flow_dir = Path(__file__).resolve().parents[1] / "flows" + flows = { + path.stem: FlowDef.from_dict(yaml.safe_load(path.read_text())) + for path in sorted(flow_dir.glob("*.yaml")) + } + + assert sorted(flows) == [ + "capability_request", + "contribution", + "task", + "workstream", + ] + + workstream_result = FlowEngine( + custom_ops={ + "dependencies.any_incomplete": lambda assertion, obj, values: any( + value != assertion.value for value in values + ) + } + ).evaluate( + { + "status": "active", + "tasks": [{"status": "done"}], + "dependencies": [{"workstation": "completed"}], + }, + flows["workstream"], + ) + assert "completed" in workstream_result.reachable + assert "blocked" in [item.workstation for item in workstream_result.unreachable] + + task_result = FlowEngine().evaluate( + {"status": "blocked", "needs_human": False}, + flows["task"], + ) + assert "in_progress" in task_result.reachable + + contribution_result = FlowEngine().evaluate( + {"status": "acknowledged", "previous_workstation": "acknowledged"}, + flows["contribution"], + ) + assert "accepted" in contribution_result.reachable + assert "merged" not in contribution_result.reachable + + capability_result = FlowEngine().evaluate( + {"status": "ready_for_review", "previous_workstation": "ready_for_review"}, + flows["capability_request"], + ) + assert "completed" in capability_result.reachable + assert "rejected" not in capability_result.reachable