Expand rule actions for per-repo tasks

Add safe action interpolation and for_each binding for rule fan-out, update the weekly SBOM definition, cover the new evaluation path, and reconcile activity-core scope/workplans for the State Hub sync.
This commit is contained in:
2026-06-03 11:58:24 +02:00
parent 4b4e162c44
commit 30598fd1ad
12 changed files with 619 additions and 81 deletions

View File

@@ -24,10 +24,10 @@ from activity_core.db import make_engine
from activity_core.issue_sink import get_issue_sink
from activity_core.orm import ActivityDefinition as ActivityDefinitionRow
from activity_core.orm import ActivityRun, TaskInstance, TaskSpawnLog
from activity_core.rules import evaluate_condition
from activity_core.llm_client import get_llm_client
from activity_core.models import InstructionDef
from activity_core.report_sinks import persist_reports
from activity_core.rules.actions import expand_rule_actions
from activity_core.rules.executor import execute_instruction_with_audit
@@ -241,9 +241,8 @@ async def persist_task_instance(task_payload: dict) -> str:
@activity.defn
async def evaluate_rules(payload: dict) -> list[dict]:
"""Evaluate each rule condition against the event and context.
"""Evaluate rules and render matching actions as task specs.
Returns the list of matching rule dicts (those whose condition is True).
Rules that raise UnsafeExpression or any other error are skipped and logged.
Expected keys in payload:
@@ -268,18 +267,16 @@ async def evaluate_rules(payload: dict) -> list[dict]:
event_obj = _Env(event_attrs)
matched: list[dict] = []
task_specs: list[dict] = []
for rule in rules:
condition = rule.get("condition", "")
try:
if evaluate_condition(condition, event_obj, context):
matched.append(rule)
task_specs.extend(expand_rule_actions([rule], event_obj, context))
except UnsafeExpression as exc:
activity.logger.warning("rule %r unsafe expression — skipping: %s", rule.get("id"), exc)
except Exception as exc:
activity.logger.warning("rule %r eval error — skipping: %s", rule.get("id"), exc)
return matched
return task_specs
@activity.defn

View File

@@ -92,6 +92,14 @@ class ActionDef(BaseModel):
class RuleDef(BaseModel):
id: str
for_each: str | None = Field(
default=None,
description="Optional event/context path to a list for per-item rule expansion.",
)
bind_as: str = Field(
default="item",
description="Context key used for each item when for_each is set.",
)
condition: str = Field(
default="",
description="Rule DSL expression; empty string means always true.",

View File

@@ -0,0 +1,153 @@
"""Rule action expansion into concrete task specs.
Boundary: no imports from temporalio, sqlalchemy, fastapi, or any
activity_core.* module outside rules/.
"""
from __future__ import annotations
import re
from dataclasses import asdict
from typing import Any
from activity_core.rules.evaluator import UnsafeExpression, evaluate_condition
from activity_core.rules.models import TaskSpec
_PLACEHOLDER_RE = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_.]*)\}")
_PATH_RE = re.compile(r"^(event|context)(?:\.[a-zA-Z_][a-zA-Z0-9_]*)+$")
def expand_rule_actions(rules: list[dict], event: Any, context: dict) -> list[dict]:
"""Evaluate rule conditions and render matching actions as TaskSpec dicts.
A rule can opt into per-item expansion with ``for_each``:
for_each: context.repos.repos
bind_as: repo
Each list item is then available as ``context.repo`` while rendering the
condition and action fields. Without ``for_each``, a rule is evaluated once
against the original context.
"""
task_specs: list[dict] = []
for rule in rules:
for bound_context in _iteration_contexts(rule, event, context):
if not _condition_matches(rule, event, bound_context):
continue
task_specs.append(_task_spec_for_rule(rule, event, bound_context))
return task_specs
def _iteration_contexts(rule: dict, event: Any, context: dict) -> list[dict]:
for_each = rule.get("for_each")
if not for_each:
return [context]
if not isinstance(for_each, str) or not _PATH_RE.fullmatch(for_each):
raise UnsafeExpression(f"invalid for_each path: {for_each!r}")
values = _resolve_field(for_each, event, context)
if values is None:
return []
if not isinstance(values, list):
raise UnsafeExpression(f"for_each path does not resolve to a list: {for_each!r}")
bind_as = rule.get("bind_as", "item")
if not isinstance(bind_as, str) or not re.fullmatch(r"[a-zA-Z_][a-zA-Z0-9_]*", bind_as):
raise UnsafeExpression(f"invalid bind_as name: {bind_as!r}")
contexts: list[dict] = []
for value in values:
bound = dict(context)
bound[bind_as] = value
contexts.append(bound)
return contexts
def _condition_matches(rule: dict, event: Any, context: dict) -> bool:
return evaluate_condition(rule.get("condition", ""), event, context)
def _task_spec_for_rule(rule: dict, event: Any, context: dict) -> dict:
action = rule.get("action", {})
spec = TaskSpec(
title=str(_render_value(action.get("task_template", rule.get("id", "")), event, context) or ""),
description=str(_render_value(action.get("description", ""), event, context) or ""),
target_repo=_string_or_none(_render_value(action.get("target_repo"), event, context)),
priority=str(_render_value(action.get("priority", "medium"), event, context) or "medium"),
labels=_render_labels(action.get("labels", []), event, context),
due_in_days=_int_or_none(_render_value(action.get("due_in_days"), event, context)),
source_type="rule",
source_id=rule.get("id", ""),
)
result = asdict(spec)
result["condition"] = rule.get("condition", "")
return result
def _render_labels(value: Any, event: Any, context: dict) -> list[str]:
if not isinstance(value, list):
return []
rendered = []
for item in value:
rendered_item = _render_value(item, event, context)
if rendered_item is not None:
rendered.append(str(rendered_item))
return rendered
def _render_value(value: Any, event: Any, context: dict) -> Any:
if isinstance(value, str):
if _PATH_RE.fullmatch(value):
return _resolve_field(value, event, context)
if "{" in value and "}" in value:
return _PLACEHOLDER_RE.sub(
lambda match: _string_or_empty(
_resolve_field(match.group(1), event, context)
),
value,
)
return value
def _resolve_field(field_path: str, event: Any, context: dict) -> Any:
if not _PATH_RE.fullmatch(field_path):
raise UnsafeExpression(f"invalid field path: {field_path!r}")
root, tail = field_path.split(".", 1)
if root == "event":
return _resolve_path(event, tail)
return _resolve_path(context, tail)
def _resolve_path(obj: Any, path: str) -> Any:
current = obj
for part in path.split("."):
if current is None:
return None
if isinstance(current, dict):
current = current.get(part)
else:
current = getattr(current, part, None)
return current
def _string_or_none(value: Any) -> str | None:
if value is None:
return None
return str(value)
def _string_or_empty(value: Any) -> str:
if value is None:
return ""
if isinstance(value, (dict, list)):
raise UnsafeExpression("template placeholder resolved to a non-scalar value")
return str(value)
def _int_or_none(value: Any) -> int | None:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError) as exc:
raise UnsafeExpression(f"field cannot be converted to int: {value!r}") from exc

View File

@@ -114,7 +114,7 @@ class RunActivityWorkflow:
except Exception:
pass
matched_rules: list[dict] = await workflow.execute_activity(
task_spec_dicts: list[dict] = await workflow.execute_activity(
evaluate_rules,
{
"rules": defn.get("rules", []),
@@ -125,22 +125,6 @@ class RunActivityWorkflow:
retry_policy=_RETRY_POLICY,
)
# Convert matched rules to TaskSpec dicts for emission.
task_spec_dicts: list[dict] = []
for rule in matched_rules:
action = rule.get("action", {})
task_spec_dicts.append({
"title": action.get("task_template", rule.get("id", "")),
"description": "",
"target_repo": action.get("target_repo"),
"priority": action.get("priority", "medium"),
"labels": action.get("labels", []),
"due_in_days": action.get("due_in_days"),
"source_type": "rule",
"source_id": rule.get("id", ""),
"condition": rule.get("condition", ""),
})
report_dicts: list[dict] = []
if defn.get("instructions"):
instruction_result: dict = await workflow.execute_activity(