Files
activity-core/tests/test_integration_event_bridge.py
tegwick 827ef9c1a0 feat(WP-0003c): context adapters, first ActivityDefinition, full test suite
T51: ContextResolver ABC + CONTEXT_RESOLVER_REGISTRY; resolve_context activity
updated to dispatch via registry (warns + binds {} on failure, never aborts run).
T52: RepoScopingContextResolver with 5-min in-process cache.
T53: StateHubContextResolver (no cache) for domain_summary and repo_sbom_status.
T54: activity-definitions/weekly-sbom-staleness.md (Monday 09:00 Berlin, cron
trigger, flag-stale-sbom rule at >30 days) + tasks/sbom-rescan.md template.
T55: 51 parametrized evaluator tests — all whitelisted operators, unsafe
expression rejection, empty condition, missing attribute, nested context access.
T56: 15 executor safety tests — UntrustedFieldError, object-type rejection,
injection fixture, LLM retry on bad JSON, review_required field.
T57: 6 integration tests — parses real definition, evaluates rule per-repo
(stale/fresh boundary), emits via NullSink, verifies spawn log entries.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 23:24:48 +02:00

185 lines
6.4 KiB
Python

"""T57: Integration test — fixture event → rule → spawn log → IssueSink.
No Temporal, no live DB required. Tests the complete event-bridge pipeline:
1. Parse ActivityDefinition from file
2. Evaluate rules against mock context data (per-repo iteration)
3. Emit matching TaskSpecs via NullSink
4. Assert spawn log entries have correct source_id, condition_matched, and event ID
Uses the weekly-sbom-staleness definition as a concrete test case.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from pathlib import Path
import pytest
from activity_core.definition_parser import parse_file
from activity_core.issue_sink import NullSink
from activity_core.models import EventEnvelope
from activity_core.rules.evaluator import evaluate_condition
from activity_core.rules.models import TaskRef, TaskSpec
_DEFINITIONS_DIR = Path(__file__).parent.parent / "activity-definitions"
_SBOM_DEF_PATH = _DEFINITIONS_DIR / "weekly-sbom-staleness.md"
# ── Helpers ───────────────────────────────────────────────────────────────────
class _EmptyEvent:
"""Stub event with no attributes (simulates a cron tick)."""
pass
def _build_event(event_id: str) -> EventEnvelope:
return EventEnvelope(
id=event_id,
type="org.cron.tick",
timestamp=datetime.now(tz=timezone.utc),
publisher="activity-core/scheduler",
)
def _run_rule_pipeline(
rule: dict,
repos: list[dict],
event: _EmptyEvent,
sink: NullSink,
) -> tuple[list[TaskRef], list[dict]]:
"""Evaluate one rule against each repo in mock data, emit via sink.
Returns (task_refs, spawn_log_entries).
Simulates the per-repo iteration that a real context resolver + workflow
would perform for multi-repo SBOM checks.
"""
task_refs: list[TaskRef] = []
spawn_log: list[dict] = []
triggering_event_id = str(uuid.uuid4())
for repo in repos:
context = {"repos": repo}
if not evaluate_condition(rule["condition"], event, context):
continue
action = rule.get("action", {})
spec = TaskSpec(
title=f"Run SBOM rescan — {repo['repo_slug']}",
description="SBOM rescan needed — age threshold exceeded.",
target_repo=repo["repo_slug"],
priority=action.get("priority", "medium"),
labels=action.get("labels", []),
source_type="rule",
source_id=rule["id"],
triggering_event_id=triggering_event_id,
)
ref = sink.emit(spec)
task_refs.append(ref)
spawn_log.append({
"source_id": rule["id"],
"condition_matched": rule["condition"],
"triggering_event_id": triggering_event_id,
"task_ref": ref.external_id,
})
return task_refs, spawn_log
# ── Tests ─────────────────────────────────────────────────────────────────────
def test_sbom_definition_parses_correctly():
defn = parse_file(_SBOM_DEF_PATH)
assert defn.id == "weekly-sbom-staleness"
assert defn.trigger_config["trigger_type"] == "cron"
assert defn.trigger_config["cron_expression"] == "0 9 * * 1"
assert len(defn.rules) == 1
assert defn.rules[0]["id"] == "flag-stale-sbom"
def test_pipeline_emits_one_task_for_stale_repo_only():
"""Stale repo (45 days) matches; fresh repo (10 days) does not."""
defn = parse_file(_SBOM_DEF_PATH)
rule = defn.rules[0]
repos = [
{"repo_slug": "repo-a", "sbom_age_days": 45}, # stale — should match
{"repo_slug": "repo-b", "sbom_age_days": 10}, # fresh — should not
]
task_refs, spawn_log = _run_rule_pipeline(rule, repos, _EmptyEvent(), NullSink())
# Step 5: one TaskSpec returned (repo-a only)
assert len(task_refs) == 1
# Step 6: NullSink returned a synthetic TaskRef
assert task_refs[0].backend == "null"
assert task_refs[0].external_id.startswith("null-")
# Step 7: one spawn_log entry with correct fields
assert len(spawn_log) == 1
entry = spawn_log[0]
assert entry["source_id"] == "flag-stale-sbom"
assert entry["condition_matched"] == "context.repos.sbom_age_days > 30"
assert entry["triggering_event_id"] == spawn_log[0]["triggering_event_id"]
def test_pipeline_emits_no_tasks_when_all_repos_fresh():
defn = parse_file(_SBOM_DEF_PATH)
rule = defn.rules[0]
repos = [
{"repo_slug": "repo-x", "sbom_age_days": 5},
{"repo_slug": "repo-y", "sbom_age_days": 28},
]
task_refs, spawn_log = _run_rule_pipeline(rule, repos, _EmptyEvent(), NullSink())
assert task_refs == []
assert spawn_log == []
def test_pipeline_emits_multiple_tasks_when_multiple_stale():
defn = parse_file(_SBOM_DEF_PATH)
rule = defn.rules[0]
repos = [
{"repo_slug": "repo-a", "sbom_age_days": 60},
{"repo_slug": "repo-b", "sbom_age_days": 45},
{"repo_slug": "repo-c", "sbom_age_days": 10},
]
task_refs, spawn_log = _run_rule_pipeline(rule, repos, _EmptyEvent(), NullSink())
assert len(task_refs) == 2
assert len(spawn_log) == 2
assert all(e["source_id"] == "flag-stale-sbom" for e in spawn_log)
# Verify all task_refs are unique
assert len({r.external_id for r in task_refs}) == 2
def test_pipeline_task_ref_has_null_backend():
defn = parse_file(_SBOM_DEF_PATH)
rule = defn.rules[0]
repos = [{"repo_slug": "stale-repo", "sbom_age_days": 100}]
task_refs, spawn_log = _run_rule_pipeline(rule, repos, _EmptyEvent(), NullSink())
assert task_refs[0].backend == "null"
assert spawn_log[0]["task_ref"] == task_refs[0].external_id
def test_rule_condition_boundary_exactly_30_days():
"""30 days exactly is NOT stale (condition is strict >)."""
defn = parse_file(_SBOM_DEF_PATH)
rule = defn.rules[0]
repos_at_boundary = [{"repo_slug": "repo-edge", "sbom_age_days": 30}]
task_refs, _ = _run_rule_pipeline(rule, repos_at_boundary, _EmptyEvent(), NullSink())
assert task_refs == [], "30 days exactly should not trigger (condition is > 30)"
repos_over_boundary = [{"repo_slug": "repo-edge", "sbom_age_days": 31}]
task_refs, _ = _run_rule_pipeline(rule, repos_over_boundary, _EmptyEvent(), NullSink())
assert len(task_refs) == 1, "31 days should trigger"