Files
state-hub/tests/test_task_flow_engine.py

215 lines
6.3 KiB
Python

from pathlib import Path
import yaml
from task_flow_engine import AssertionDef, FlowDef, FlowEngine, WorkstationDef, resolve_target
def test_all_assertions_satisfied_reports_reachable_workstations():
    """Check that a passing entry assertion leaves every workstation reachable.

    The flow has an unconditional ``active`` workstation and a ``finished``
    workstation guarded by an ``all_eq`` entry assertion over
    ``tasks.*.status`` with the allowed values ``done`` and ``cancel``. The
    entity's two tasks carry exactly those statuses, so the test expects no
    exit block, both workstations reported as reachable, and nothing
    reported as unreachable.
    """
    flow = FlowDef(
        id="workstream.test",
        entity_type="workstream",
        workstations=[
            WorkstationDef(name="active"),
            WorkstationDef(
                name="finished",
                entry_assertions=[
                    AssertionDef(
                        id="tasks.all_done",
                        target="tasks.*.status",
                        op="all_eq",
                        value=["done", "cancel"],
                    )
                ],
            ),
        ],
    )
    result = FlowEngine().evaluate(
        {"status": "active", "tasks": [{"status": "done"}, {"status": "cancel"}]},
        flow,
    )
    assert result.exit_blocked is False
    assert result.reachable == ["active", "finished"]
    assert result.unreachable == []
def test_failing_exit_assertion_identifies_blocking_assertion():
    """Check that a failing exit assertion is reported as the blocker.

    The only workstation, ``active``, has an ``all_eq`` exit assertion over
    ``tasks.*.status`` allowing ``done`` and ``cancel``. One task is still
    ``todo``, so the test expects ``exit_blocked`` to be true, the blocking
    assertion list to contain just ``tasks.all_done``, and that entry's
    ``actual`` to hold the resolved statuses in task order.
    """
    flow = FlowDef(
        id="workstream.test",
        entity_type="workstream",
        workstations=[
            WorkstationDef(
                name="active",
                exit_assertions=[
                    AssertionDef(
                        id="tasks.all_done",
                        target="tasks.*.status",
                        op="all_eq",
                        value=["done", "cancel"],
                    )
                ],
            )
        ],
    )
    result = FlowEngine().evaluate(
        {"status": "active", "tasks": [{"status": "done"}, {"status": "todo"}]},
        flow,
    )
    assert result.exit_blocked is True
    assert [item.id for item in result.blocking_assertions] == ["tasks.all_done"]
    assert result.blocking_assertions[0].actual == ["done", "todo"]
def test_can_reach_checks_current_exit_assertions_before_target_entry():
    """Check that ``can_reach`` reports the current workstation's exit failure.

    ``active`` has an exit assertion over ``dependencies.*.workstation`` and
    ``finished`` has an entry assertion over ``tasks.*.status``. The entity's
    single task is ``done`` (within the entry assertion's allowed values),
    while its single dependency is still ``active`` (outside the exit
    assertion's allowed values). The test expects ``can_reach`` to return
    ``False`` with only ``dependencies.all_complete`` in the failure list.
    """
    flow = FlowDef(
        id="workstream.test",
        entity_type="workstream",
        workstations=[
            WorkstationDef(
                name="active",
                exit_assertions=[
                    AssertionDef(
                        id="dependencies.all_complete",
                        target="dependencies.*.workstation",
                        op="all_eq",
                        value=["finished", "archived"],
                    )
                ],
            ),
            WorkstationDef(
                name="finished",
                entry_assertions=[
                    AssertionDef(
                        id="tasks.all_done",
                        target="tasks.*.status",
                        op="all_eq",
                        value=["done", "cancel"],
                    )
                ],
            ),
        ],
    )
    can_reach, failures = FlowEngine().can_reach(
        {
            "status": "active",
            "tasks": [{"status": "done"}],
            "dependencies": [{"workstation": "active"}],
        },
        flow,
        "finished",
    )
    assert can_reach is False
    assert [item.id for item in failures] == ["dependencies.all_complete"]
def test_custom_op_callable_is_invoked():
    """Check that a registered custom op is called for a ``custom`` assertion.

    A recording callable is registered under the assertion id
    ``contribution.has_review``. The test expects ``accepted`` to be
    reachable when the entity has a truthy ``review`` value, and expects the
    callable to have been invoked exactly once with the assertion and the
    resolved target values (the ``review`` mapping wrapped in a list).
    """
    calls = []

    def has_review(assertion, obj, values):
        # Record each invocation so the test can verify the arguments passed.
        calls.append((assertion.id, values))
        return bool(obj.get("review"))

    flow = FlowDef(
        id="review.test",
        entity_type="contribution",
        workstations=[
            WorkstationDef(name="draft"),
            WorkstationDef(
                name="accepted",
                entry_assertions=[
                    AssertionDef(id="contribution.has_review", target="review", op="custom")
                ],
            ),
        ],
    )
    result = FlowEngine(custom_ops={"contribution.has_review": has_review}).evaluate(
        {"status": "draft", "review": {"approved_by": "bernd"}},
        flow,
    )
    assert "accepted" in result.reachable
    assert calls == [("contribution.has_review", [{"approved_by": "bernd"}])]
def test_empty_assertions_make_all_workstations_reachable():
    """Check that workstations without assertions are all reachable.

    None of the three workstations declares entry or exit assertions, so the
    test expects no exit block and all three names reported as reachable in
    declaration order.
    """
    flow = FlowDef(
        id="empty.test",
        entity_type="task",
        workstations=[
            WorkstationDef(name="todo"),
            WorkstationDef(name="progress"),
            WorkstationDef(name="done"),
        ],
    )
    result = FlowEngine().evaluate({"status": "todo"}, flow)
    assert result.exit_blocked is False
    assert result.reachable == ["todo", "progress", "done"]
def test_circular_reference_in_target_path_does_not_loop_forever():
    """Check that resolving a path through a self-referencing dict terminates.

    The test expects ``resolve_target`` to return an empty list for a path
    that walks repeatedly through the object's reference to itself.
    """
    obj = {"status": "active"}
    # Make the dict contain itself so the target path revisits the same object.
    obj["self"] = obj
    assert resolve_target(obj, "self.self.self.status") == []
def test_yaml_flow_definitions_load_and_evaluate_representative_entities():
    """Load every YAML flow beside the tests and evaluate one entity per flow.

    Flow files are read from the ``flows`` directory one level above this
    test file's directory. The test first pins the exact set of flow names,
    then evaluates a representative entity against each flow and checks
    selected reachable / unreachable workstations.
    """
    flow_dir = Path(__file__).resolve().parents[1] / "flows"
    flows = {
        # Read with an explicit encoding so the result does not depend on the
        # platform's default locale encoding.
        path.stem: FlowDef.from_dict(yaml.safe_load(path.read_text(encoding="utf-8")))
        for path in sorted(flow_dir.glob("*.yaml"))
    }
    assert sorted(flows) == [
        "capability_request",
        "contribution",
        "task",
        "workstream",
    ]

    # Workstream: supply the custom op for "dependencies.any_incomplete",
    # true when any resolved value is outside the assertion's allowed values.
    workstream_result = FlowEngine(
        custom_ops={
            "dependencies.any_incomplete": lambda assertion, obj, values: any(
                value not in assertion.value for value in values
            )
        }
    ).evaluate(
        {
            "status": "active",
            "tasks": [{"status": "done"}],
            "dependencies": [{"workstation": "finished"}],
        },
        flows["workstream"],
    )
    assert "finished" in workstream_result.reachable
    assert "blocked" in [item.workstation for item in workstream_result.unreachable]

    # Task: a waiting task that does not need a human.
    task_result = FlowEngine().evaluate(
        {"status": "wait", "needs_human": False},
        flows["task"],
    )
    assert "progress" in task_result.reachable

    # Contribution: an acknowledged contribution.
    contribution_result = FlowEngine().evaluate(
        {"status": "acknowledged", "previous_workstation": "acknowledged"},
        flows["contribution"],
    )
    assert "accepted" in contribution_result.reachable
    assert "merged" not in contribution_result.reachable

    # Capability request: a request that is ready for review.
    capability_result = FlowEngine().evaluate(
        {"status": "ready_for_review", "previous_workstation": "ready_for_review"},
        flows["capability_request"],
    )
    assert "completed" in capability_result.reachable
    assert "rejected" not in capability_result.reachable