83 lines
2.8 KiB
Python
83 lines
2.8 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
from task_flow_engine.evaluator import AssertionEvaluator, CustomOp
|
|
from task_flow_engine.models import (
|
|
AssertionResult,
|
|
FlowDef,
|
|
FlowResult,
|
|
UnreachableWorkstation,
|
|
WorkstationDef,
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class FlowEngine:
|
|
custom_ops: dict[str, CustomOp] | None = None
|
|
|
|
def evaluate(self, obj: dict[str, Any], flow: FlowDef) -> FlowResult:
|
|
evaluator = AssertionEvaluator(custom_ops=self.custom_ops)
|
|
current_name = str(obj.get("workstation") or obj.get("status") or "")
|
|
current = flow.workstation(current_name)
|
|
blocking_assertions = (
|
|
self._failed_assertions(current.exit_assertions, obj, evaluator)
|
|
if current is not None
|
|
else []
|
|
)
|
|
|
|
reachable: list[str] = []
|
|
unreachable: list[UnreachableWorkstation] = []
|
|
for workstation in flow.workstations:
|
|
failed = self._failed_assertions(workstation.entry_assertions, obj, evaluator)
|
|
if failed:
|
|
unreachable.append(
|
|
UnreachableWorkstation(
|
|
workstation=workstation.name,
|
|
blocking=failed[0],
|
|
)
|
|
)
|
|
else:
|
|
reachable.append(workstation.name)
|
|
|
|
return FlowResult(
|
|
current_workstation=current_name,
|
|
exit_blocked=bool(blocking_assertions),
|
|
blocking_assertions=blocking_assertions,
|
|
reachable=reachable,
|
|
unreachable=unreachable,
|
|
)
|
|
|
|
def can_reach(
|
|
self,
|
|
obj: dict[str, Any],
|
|
flow: FlowDef,
|
|
target_workstation: str,
|
|
) -> tuple[bool, list[AssertionResult]]:
|
|
workstation = flow.workstation(target_workstation)
|
|
if workstation is None:
|
|
return False, [
|
|
AssertionResult(
|
|
id="flow.unknown_workstation",
|
|
passed=False,
|
|
target="workstation",
|
|
op="exists",
|
|
expected=target_workstation,
|
|
actual=[item.name for item in flow.workstations],
|
|
reason=f"Flow '{flow.id}' has no workstation '{target_workstation}'.",
|
|
)
|
|
]
|
|
evaluator = AssertionEvaluator(custom_ops=self.custom_ops)
|
|
failed = self._failed_assertions(workstation.entry_assertions, obj, evaluator)
|
|
return not failed, failed
|
|
|
|
@staticmethod
|
|
def _failed_assertions(
|
|
assertions: list,
|
|
obj: dict[str, Any],
|
|
evaluator: AssertionEvaluator,
|
|
) -> list[AssertionResult]:
|
|
results = [evaluator.evaluate(assertion, obj) for assertion in assertions]
|
|
return [result for result in results if not result.passed]
|