from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from task_flow_engine.evaluator import AssertionEvaluator, CustomOp
from task_flow_engine.models import (
    AssertionResult,
    FlowDef,
    FlowResult,
    UnreachableWorkstation,
    WorkstationDef,
)


@dataclass
class FlowEngine:
    """Check an object against the entry/exit assertions of a flow's workstations."""

    # Passed unchanged to every AssertionEvaluator this engine constructs.
    custom_ops: dict[str, CustomOp] | None = None

    def evaluate(self, obj: dict[str, Any], flow: FlowDef) -> FlowResult:
        """Summarise where ``obj`` stands within ``flow``.

        The current workstation name is the first truthy value among
        ``obj["workstation"]`` and ``obj["status"]``, converted with ``str``;
        it is the empty string when neither is truthy.

        Args:
            obj: The object the assertions are evaluated against.
            flow: The flow whose workstations are inspected.

        Returns:
            A ``FlowResult`` holding the current workstation name, the failed
            exit assertions of that workstation (empty when the flow does not
            know the name), and every workstation of the flow sorted into
            ``reachable`` or ``unreachable`` by its entry assertions.
        """
        evaluator = AssertionEvaluator(custom_ops=self.custom_ops)

        raw_name = obj.get("workstation") or obj.get("status") or ""
        current_name = str(raw_name)
        current = flow.workstation(current_name)

        exit_failures: list[AssertionResult] = []
        if current is not None:
            exit_failures = self._failed_assertions(
                current.exit_assertions, obj, evaluator
            )

        reachable: list[str] = []
        unreachable: list[UnreachableWorkstation] = []
        for station in flow.workstations:
            entry_failures = self._failed_assertions(
                station.entry_assertions, obj, evaluator
            )
            if not entry_failures:
                reachable.append(station.name)
                continue
            # Only the first failing entry assertion is reported as the blocker.
            unreachable.append(
                UnreachableWorkstation(
                    workstation=station.name,
                    blocking=entry_failures[0],
                )
            )

        return FlowResult(
            current_workstation=current_name,
            exit_blocked=bool(exit_failures),
            blocking_assertions=exit_failures,
            reachable=reachable,
            unreachable=unreachable,
        )

    def can_reach(
        self,
        obj: dict[str, Any],
        flow: FlowDef,
        target_workstation: str,
    ) -> tuple[bool, list[AssertionResult]]:
        """Tell whether ``obj`` passes the entry assertions of one workstation.

        Args:
            obj: The object the assertions are evaluated against.
            flow: The flow in which the workstation is looked up.
            target_workstation: Name of the workstation to test.

        Returns:
            A pair ``(ok, failures)``. ``ok`` is ``True`` exactly when
            ``failures`` is empty. When ``flow`` has no workstation with the
            requested name, ``ok`` is ``False`` and ``failures`` holds a single
            synthetic ``flow.unknown_workstation`` result whose ``actual``
            lists the workstation names the flow does have.
        """
        target = flow.workstation(target_workstation)

        if target is not None:
            evaluator = AssertionEvaluator(custom_ops=self.custom_ops)
            failures = self._failed_assertions(
                target.entry_assertions, obj, evaluator
            )
            return (not failures, failures)

        known_names = [station.name for station in flow.workstations]
        unknown = AssertionResult(
            id="flow.unknown_workstation",
            passed=False,
            target="workstation",
            op="exists",
            expected=target_workstation,
            actual=known_names,
            reason=f"Flow '{flow.id}' has no workstation '{target_workstation}'.",
        )
        return False, [unknown]

    @staticmethod
    def _failed_assertions(
        assertions: list,
        obj: dict[str, Any],
        evaluator: AssertionEvaluator,
    ) -> list[AssertionResult]:
        """Evaluate each assertion against ``obj`` and keep the ones that did not pass.

        Args:
            assertions: Assertions to run, in order.
            obj: The object handed to ``evaluator.evaluate`` with each assertion.
            evaluator: The evaluator used for every assertion.

        Returns:
            The results whose ``passed`` attribute is falsy, in input order.
        """
        failures: list[AssertionResult] = []
        for spec in assertions:
            outcome = evaluator.evaluate(spec, obj)
            if not outcome.passed:
                failures.append(outcome)
        return failures