Files
state-hub/task_flow_engine/engine.py

89 lines
3.1 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from task_flow_engine.evaluator import AssertionEvaluator, CustomOp
from task_flow_engine.models import (
AssertionResult,
FlowDef,
FlowResult,
UnreachableWorkstation,
WorkstationDef,
)
@dataclass
class FlowEngine:
    """Evaluate an object against the workstation assertions of a flow.

    The engine itself is stateless apart from ``custom_ops``; a fresh
    ``AssertionEvaluator`` is built for every public call.
    """

    # Forwarded unchanged to ``AssertionEvaluator(custom_ops=...)``.
    custom_ops: dict[str, CustomOp] | None = None

    def evaluate(self, obj: dict[str, Any], flow: FlowDef) -> FlowResult:
        """Summarise where *obj* stands in *flow*.

        The current workstation name is read from ``obj["workstation"]``,
        falling back to ``obj["status"]`` and finally to ``""``. If the flow
        knows that workstation, its exit assertions are evaluated and every
        failure is reported in ``blocking_assertions``.

        Every workstation in ``flow.workstations`` (the current one included)
        is then classified by its *entry* assertions only: no failures means
        reachable, otherwise it is recorded as unreachable together with the
        first failing assertion. Note that this classification does not take
        ``exit_blocked`` into account; callers combine the two themselves or
        use :meth:`can_reach`.
        """
        evaluator = self._new_evaluator()
        current_name = self._current_workstation_name(obj)
        current = flow.workstation(current_name)

        # An unknown current workstation has no exit assertions to block on.
        blocking_assertions: list[AssertionResult] = []
        if current is not None:
            blocking_assertions = self._failed_assertions(
                current.exit_assertions, obj, evaluator
            )

        reachable: list[str] = []
        unreachable: list[UnreachableWorkstation] = []
        for workstation in flow.workstations:
            failed = self._failed_assertions(
                workstation.entry_assertions, obj, evaluator
            )
            if failed:
                # Only the first failure is surfaced per workstation.
                unreachable.append(
                    UnreachableWorkstation(
                        workstation=workstation.name,
                        blocking=failed[0],
                    )
                )
            else:
                reachable.append(workstation.name)

        return FlowResult(
            current_workstation=current_name,
            exit_blocked=bool(blocking_assertions),
            blocking_assertions=blocking_assertions,
            reachable=reachable,
            unreachable=unreachable,
        )

    def can_reach(
        self,
        obj: dict[str, Any],
        flow: FlowDef,
        target_workstation: str,
    ) -> tuple[bool, list[AssertionResult]]:
        """Check whether *obj* may move to *target_workstation*.

        Returns ``(ok, failures)`` where ``ok`` is ``True`` exactly when
        ``failures`` is empty.

        * If the flow has no workstation with that name, the result is
          ``False`` with a single synthetic ``flow.unknown_workstation``
          failure listing the workstation names the flow does have.
        * If the target differs from the object's current workstation and the
          current workstation exists in the flow, the current workstation's
          exit assertions are evaluated first.
        * The target's entry assertions are always evaluated, even when the
          target is the current workstation.
        """
        workstation = flow.workstation(target_workstation)
        if workstation is None:
            return False, [
                AssertionResult(
                    id="flow.unknown_workstation",
                    passed=False,
                    target="workstation",
                    op="exists",
                    expected=target_workstation,
                    actual=[item.name for item in flow.workstations],
                    reason=f"Flow '{flow.id}' has no workstation '{target_workstation}'.",
                )
            ]

        evaluator = self._new_evaluator()
        current_name = self._current_workstation_name(obj)

        failed: list[AssertionResult] = []
        if target_workstation != current_name:
            # Leaving the current workstation: its exit assertions apply.
            current = flow.workstation(current_name)
            if current is not None:
                failed.extend(
                    self._failed_assertions(current.exit_assertions, obj, evaluator)
                )
        failed.extend(
            self._failed_assertions(workstation.entry_assertions, obj, evaluator)
        )
        return not failed, failed

    def _new_evaluator(self) -> AssertionEvaluator:
        """Build the evaluator used for one public call."""
        return AssertionEvaluator(custom_ops=self.custom_ops)

    @staticmethod
    def _current_workstation_name(obj: dict[str, Any]) -> str:
        """Return the object's current workstation name as a string.

        Uses ``obj["workstation"]`` when truthy, else ``obj["status"]`` when
        truthy, else the empty string.
        """
        return str(obj.get("workstation") or obj.get("status") or "")

    @staticmethod
    def _failed_assertions(
        assertions: list[Any] | None,
        obj: dict[str, Any],
        evaluator: AssertionEvaluator,
    ) -> list[AssertionResult]:
        """Evaluate *assertions* against *obj* and return only the failures.

        Results keep the order of *assertions*. ``None`` is treated the same
        as an empty list so that a workstation without assertions never blocks.
        """
        if assertions is None:
            return []
        outcomes = (evaluator.evaluate(assertion, obj) for assertion in assertions)
        return [outcome for outcome in outcomes if not outcome.passed]