generated from coulomb/repo-seed
T51: ContextResolver ABC + CONTEXT_RESOLVER_REGISTRY; resolve_context activity
updated to dispatch via registry (warns + binds {} on failure, never aborts run).
T52: RepoScopingContextResolver with 5-min in-process cache.
T53: StateHubContextResolver (no cache) for domain_summary and repo_sbom_status.
T54: activity-definitions/weekly-sbom-staleness.md (Monday 09:00 Berlin, cron
trigger, flag-stale-sbom rule at >30 days) + tasks/sbom-rescan.md template.
T55: 51 parametrized evaluator tests — all whitelisted operators, unsafe
expression rejection, empty condition, missing attribute, nested context access.
T56: 15 executor safety tests — UntrustedFieldError, object-type rejection,
injection fixture, LLM retry on bad JSON, review_required field.
T57: 6 integration tests — parses real definition, evaluates rule per-repo
(stale/fresh boundary), emits via NullSink, verifies spawn log entries.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
179 lines
7.0 KiB
Python
179 lines
7.0 KiB
Python
"""T55: Rule evaluator unit tests.
|
|
|
|
Covers:
|
|
- All whitelisted comparison operators
|
|
- Boolean operators (and, or, not)
|
|
- len() function
|
|
- Empty condition → True
|
|
- Missing attribute → None (no raise)
|
|
- Context dict attribute access (nested)
|
|
- Unsafe expression rejection for forbidden AST constructs
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from activity_core.rules.evaluator import UnsafeExpression, evaluate_condition
|
|
|
|
|
|
# ── Event fixture helpers ──────────────────────────────────────────────────────
|
|
|
|
class _Attrs:
|
|
def __init__(self, **kw):
|
|
for k, v in kw.items():
|
|
setattr(self, k, v)
|
|
|
|
|
|
class _Event:
|
|
def __init__(self, **attrs):
|
|
self.attributes = _Attrs(**attrs)
|
|
|
|
|
|
def _event(**attrs) -> _Event:
|
|
return _Event(**attrs)
|
|
|
|
|
|
# ── Operator coverage (parametrized) ──────────────────────────────────────────
|
|
|
|
@pytest.mark.parametrize("expr,event_attrs,context,expected", [
|
|
# Equality
|
|
("event.attributes.x == 5", {"x": 5}, {}, True),
|
|
("event.attributes.x == 5", {"x": 3}, {}, False),
|
|
# Inequality
|
|
("event.attributes.x != 5", {"x": 3}, {}, True),
|
|
("event.attributes.x != 5", {"x": 5}, {}, False),
|
|
# Less than
|
|
("event.attributes.x < 10", {"x": 5}, {}, True),
|
|
("event.attributes.x < 10", {"x": 10}, {}, False),
|
|
# Less than or equal
|
|
("event.attributes.x <= 5", {"x": 5}, {}, True),
|
|
("event.attributes.x <= 5", {"x": 6}, {}, False),
|
|
# Greater than
|
|
("event.attributes.x > 3", {"x": 5}, {}, True),
|
|
("event.attributes.x > 3", {"x": 3}, {}, False),
|
|
# Greater than or equal
|
|
("event.attributes.x >= 5", {"x": 5}, {}, True),
|
|
("event.attributes.x >= 5", {"x": 4}, {}, False),
|
|
# in
|
|
("event.attributes.x in [1, 2, 3]", {"x": 2}, {}, True),
|
|
("event.attributes.x in [1, 2, 3]", {"x": 5}, {}, False),
|
|
# not in
|
|
("event.attributes.x not in [1, 2, 3]", {"x": 5}, {}, True),
|
|
("event.attributes.x not in [1, 2, 3]", {"x": 2}, {}, False),
|
|
# and
|
|
("event.attributes.x > 3 and event.attributes.x < 10", {"x": 5}, {}, True),
|
|
("event.attributes.x > 3 and event.attributes.x < 10", {"x": 2}, {}, False),
|
|
# or
|
|
("event.attributes.x < 3 or event.attributes.x > 8", {"x": 9}, {}, True),
|
|
("event.attributes.x < 3 or event.attributes.x > 8", {"x": 5}, {}, False),
|
|
# not
|
|
("not event.attributes.x == 5", {"x": 3}, {}, True),
|
|
("not event.attributes.x == 5", {"x": 5}, {}, False),
|
|
# len()
|
|
("len(event.attributes.items) > 2", {"items": [1, 2, 3]}, {}, True),
|
|
("len(event.attributes.items) > 2", {"items": [1]}, {}, False),
|
|
# None comparison (using == None since 'is' is not whitelisted)
|
|
("event.attributes.x == None", {"x": None}, {}, True),
|
|
("event.attributes.x == None", {"x": 5}, {}, False),
|
|
("event.attributes.x != None", {"x": 5}, {}, True),
|
|
("event.attributes.x != None", {"x": None}, {}, False),
|
|
])
|
|
def test_operator_coverage(expr, event_attrs, context, expected):
|
|
ev = _event(**event_attrs)
|
|
assert evaluate_condition(expr, ev, context) == expected
|
|
|
|
|
|
# ── Empty condition ───────────────────────────────────────────────────────────
|
|
|
|
def test_empty_condition_is_true():
|
|
assert evaluate_condition("", _event(), {}) is True
|
|
|
|
|
|
def test_whitespace_only_condition_is_true():
|
|
assert evaluate_condition(" ", _event(), {}) is True
|
|
|
|
|
|
# ── Missing attribute → None, no raise ───────────────────────────────────────
|
|
|
|
def test_missing_event_attribute_returns_none_in_comparison():
|
|
ev = _event() # no 'score' attribute
|
|
assert evaluate_condition("event.attributes.score == None", ev, {}) is True
|
|
|
|
|
|
def test_missing_event_attribute_in_comparison_is_false():
|
|
ev = _event()
|
|
# None > 5 → TypeError caught internally → False
|
|
assert evaluate_condition("event.attributes.score > 5", ev, {}) is False
|
|
|
|
|
|
# ── Context dict attribute access (nested) ────────────────────────────────────
|
|
|
|
def test_context_flat_key():
|
|
assert evaluate_condition("context.count > 5", None, {"count": 10}) is True
|
|
|
|
|
|
def test_context_nested_key():
|
|
context = {"repos": {"sbom_age_days": 45}}
|
|
assert evaluate_condition("context.repos.sbom_age_days > 30", None, context) is True
|
|
assert evaluate_condition("context.repos.sbom_age_days > 60", None, context) is False
|
|
|
|
|
|
def test_context_nested_missing_key_is_none():
|
|
context = {"repos": {}}
|
|
assert evaluate_condition("context.repos.sbom_age_days == None", None, context) is True
|
|
|
|
|
|
# ── Unsafe expression rejection ───────────────────────────────────────────────
|
|
|
|
@pytest.mark.parametrize("unsafe_expr", [
|
|
"__import__('os')",
|
|
"exec('pass')",
|
|
"eval('1+1')",
|
|
"open('/etc/passwd')", # arbitrary function call (not len)
|
|
"print('hello')", # arbitrary function call
|
|
"[x for x in [1,2,3]]", # list comprehension → ListComp
|
|
"{k: k for k in [1]}", # dict comprehension → DictComp
|
|
"{x for x in [1]}", # set comprehension → SetComp
|
|
"lambda: 5", # Lambda
|
|
"event.attributes.x if True else 0", # IfExp
|
|
])
|
|
def test_unsafe_expressions_are_rejected(unsafe_expr):
|
|
with pytest.raises(UnsafeExpression):
|
|
evaluate_condition(unsafe_expr, _event(), {})
|
|
|
|
|
|
def test_len_with_keyword_args_rejected():
|
|
with pytest.raises(UnsafeExpression):
|
|
evaluate_condition("len([1,2], extra=3)", _event(), {})
|
|
|
|
|
|
def test_is_none_rejected_as_unsafe():
|
|
# 'is' operator (ast.Is) is not whitelisted — use '== None' instead.
|
|
with pytest.raises(UnsafeExpression):
|
|
evaluate_condition("event.attributes.x is None", _event(x=None), {})
|
|
|
|
|
|
def test_walrus_operator_rejected():
|
|
# Walrus := is a SyntaxError in eval mode, so it raises UnsafeExpression.
|
|
with pytest.raises(UnsafeExpression):
|
|
evaluate_condition("(x := 5) > 3", _event(), {})
|
|
|
|
|
|
def test_assignment_rejected():
|
|
# Assignments are statements, not expressions — SyntaxError → UnsafeExpression.
|
|
with pytest.raises(UnsafeExpression):
|
|
evaluate_condition("x = 5", _event(), {})
|
|
|
|
|
|
# ── Weekly SBOM staleness rule smoke test ─────────────────────────────────────
|
|
|
|
def test_sbom_staleness_rule_matches_stale_repo():
|
|
context = {"repos": {"repo_slug": "repo-a", "sbom_age_days": 45}}
|
|
assert evaluate_condition("context.repos.sbom_age_days > 30", None, context) is True
|
|
|
|
|
|
def test_sbom_staleness_rule_skips_fresh_repo():
|
|
context = {"repos": {"repo_slug": "repo-b", "sbom_age_days": 10}}
|
|
assert evaluate_condition("context.repos.sbom_age_days > 30", None, context) is False
|