"""Rule action expansion into concrete task specs. Boundary: no imports from temporalio, sqlalchemy, fastapi, or any activity_core.* module outside rules/. """ from __future__ import annotations import re from dataclasses import asdict from typing import Any from activity_core.rules.evaluator import UnsafeExpression, evaluate_condition from activity_core.rules.models import TaskSpec _PLACEHOLDER_RE = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_.]*)\}") _PATH_RE = re.compile(r"^(event|context)(?:\.[a-zA-Z_][a-zA-Z0-9_]*)+$") def expand_rule_actions(rules: list[dict], event: Any, context: dict) -> list[dict]: """Evaluate rule conditions and render matching actions as TaskSpec dicts. A rule can opt into per-item expansion with ``for_each``: for_each: context.repos.repos bind_as: repo Each list item is then available as ``context.repo`` while rendering the condition and action fields. Without ``for_each``, a rule is evaluated once against the original context. """ task_specs: list[dict] = [] for rule in rules: for bound_context in _iteration_contexts(rule, event, context): if not _condition_matches(rule, event, bound_context): continue task_specs.append(_task_spec_for_rule(rule, event, bound_context)) return task_specs def _iteration_contexts(rule: dict, event: Any, context: dict) -> list[dict]: for_each = rule.get("for_each") if not for_each: return [context] if not isinstance(for_each, str) or not _PATH_RE.fullmatch(for_each): raise UnsafeExpression(f"invalid for_each path: {for_each!r}") values = _resolve_field(for_each, event, context) if values is None: return [] if not isinstance(values, list): raise UnsafeExpression(f"for_each path does not resolve to a list: {for_each!r}") bind_as = rule.get("bind_as", "item") if not isinstance(bind_as, str) or not re.fullmatch(r"[a-zA-Z_][a-zA-Z0-9_]*", bind_as): raise UnsafeExpression(f"invalid bind_as name: {bind_as!r}") contexts: list[dict] = [] for value in values: bound = dict(context) bound[bind_as] = value contexts.append(bound) return contexts def _condition_matches(rule: dict, event: Any, context: dict) -> bool: return evaluate_condition(rule.get("condition", ""), event, context) def _task_spec_for_rule(rule: dict, event: Any, context: dict) -> dict: action = rule.get("action", {}) spec = TaskSpec( title=str(_render_value(action.get("task_template", rule.get("id", "")), event, context) or ""), description=str(_render_value(action.get("description", ""), event, context) or ""), target_repo=_string_or_none(_render_value(action.get("target_repo"), event, context)), priority=str(_render_value(action.get("priority", "medium"), event, context) or "medium"), labels=_render_labels(action.get("labels", []), event, context), due_in_days=_int_or_none(_render_value(action.get("due_in_days"), event, context)), source_type="rule", source_id=rule.get("id", ""), ) result = asdict(spec) result["condition"] = rule.get("condition", "") return result def _render_labels(value: Any, event: Any, context: dict) -> list[str]: if not isinstance(value, list): return [] rendered = [] for item in value: rendered_item = _render_value(item, event, context) if rendered_item is not None: rendered.append(str(rendered_item)) return rendered def _render_value(value: Any, event: Any, context: dict) -> Any: if isinstance(value, str): if _PATH_RE.fullmatch(value): return _resolve_field(value, event, context) if "{" in value and "}" in value: return _PLACEHOLDER_RE.sub( lambda match: _string_or_empty( _resolve_field(match.group(1), event, context) ), value, ) return value def _resolve_field(field_path: str, event: Any, context: dict) -> Any: if not _PATH_RE.fullmatch(field_path): raise UnsafeExpression(f"invalid field path: {field_path!r}") root, tail = field_path.split(".", 1) if root == "event": return _resolve_path(event, tail) return _resolve_path(context, tail) def _resolve_path(obj: Any, path: str) -> Any: current = obj for part in path.split("."): if current is None: return None if isinstance(current, dict): current = current.get(part) else: current = getattr(current, part, None) return current def _string_or_none(value: Any) -> str | None: if value is None: return None return str(value) def _string_or_empty(value: Any) -> str: if value is None: return "" if isinstance(value, (dict, list)): raise UnsafeExpression("template placeholder resolved to a non-scalar value") return str(value) def _int_or_none(value: Any) -> int | None: if value is None: return None try: return int(value) except (TypeError, ValueError) as exc: raise UnsafeExpression(f"field cannot be converted to int: {value!r}") from exc