Files
activity-core/tests/test_integration_event_bridge.py

246 lines
8.6 KiB
Python

"""T57: Integration test — fixture event → rule → spawn log → IssueSink.
No Temporal, no live DB required. Tests the complete event-bridge pipeline:
1. Parse ActivityDefinition from file
2. Evaluate rules against mock context data (per-repo iteration)
3. Emit matching TaskSpecs via NullSink
4. Assert spawn log entries have correct source_id, condition_matched, and event ID
Uses the weekly-sbom-staleness definition as the concrete pipeline test case; the weekly-coding-retro definition is covered by parsing and rule-expansion tests.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from pathlib import Path
import pytest
from activity_core.definition_parser import parse_file
from activity_core.issue_sink import NullSink
from activity_core.models import EventEnvelope
from activity_core.rules.actions import expand_rule_actions
from activity_core.rules.models import TaskRef, TaskSpec
# Activity definition files live in "activity-definitions", a sibling of the
# directory that contains this test module.
_DEFINITIONS_DIR = Path(__file__).parent.parent.joinpath("activity-definitions")
# Individual definition files exercised by the tests in this module.
_SBOM_DEF_PATH = _DEFINITIONS_DIR.joinpath("weekly-sbom-staleness.md")
_CODING_RETRO_DEF_PATH = _DEFINITIONS_DIR.joinpath("weekly-coding-retro.md")
# ── Helpers ───────────────────────────────────────────────────────────────────
class _EmptyEvent:
    """Attribute-less stand-in for an event (simulates a cron tick)."""
def _build_event(event_id: str) -> EventEnvelope:
    """Build an ``org.cron.tick`` ``EventEnvelope`` carrying ``event_id``.

    The envelope is stamped with the current UTC time and attributed to the
    ``activity-core/scheduler`` publisher.
    """
    envelope_fields = {
        "id": event_id,
        "type": "org.cron.tick",
        "timestamp": datetime.now(tz=timezone.utc),
        "publisher": "activity-core/scheduler",
    }
    return EventEnvelope(**envelope_fields)
def _run_rule_pipeline(
    rule: dict,
    repos: list[dict],
    event: _EmptyEvent,
    sink: NullSink,
) -> tuple[list[TaskRef], list[dict]]:
    """Expand a single rule over mock repo data and push each result to ``sink``.

    The repos are exposed to the rule as ``context["repos"]["repos"]``. Every
    spec dict produced by ``expand_rule_actions`` is turned into a ``TaskSpec``,
    emitted through ``sink``, and recorded in a spawn log. One freshly
    generated UUID4 string is shared, as the triggering event id, by all specs
    and log entries of a single call.

    Returns:
        ``(task_refs, spawn_log_entries)``: the refs returned by the sink and
        one log dict per emitted task, both in emission order.
    """
    event_id = str(uuid.uuid4())
    rule_context = {"repos": {"repos": repos}}
    # Fields copied verbatim from each expanded action dict into the TaskSpec.
    copied_fields = (
        "title",
        "description",
        "target_repo",
        "priority",
        "labels",
        "due_in_days",
        "source_id",
    )

    emitted_refs: list[TaskRef] = []
    log_entries: list[dict] = []
    for action in expand_rule_actions([rule], event, rule_context):
        task_spec = TaskSpec(
            **{name: action[name] for name in copied_fields},
            source_type="rule",
            triggering_event_id=event_id,
        )
        task_ref = sink.emit(task_spec)
        emitted_refs.append(task_ref)
        log_entries.append(
            {
                "source_id": action["source_id"],
                "condition_matched": action["condition"],
                "triggering_event_id": event_id,
                "task_ref": task_ref.external_id,
            }
        )
    return emitted_refs, log_entries
# ── Tests ─────────────────────────────────────────────────────────────────────
def test_sbom_definition_parses_correctly():
    """The SBOM staleness definition parses with its id, cron trigger and single rule."""
    definition = parse_file(_SBOM_DEF_PATH)
    assert definition.id == "weekly-sbom-staleness"

    trigger = definition.trigger_config
    assert trigger["trigger_type"] == "cron"
    assert trigger["cron_expression"] == "0 9 * * 1"

    rules = definition.rules
    assert len(rules) == 1
    assert rules[0]["id"] == "flag-stale-sbom"
def test_coding_retro_definition_parses_disabled_until_verified():
    """The coding-retro definition parses as disabled, with its trigger, context source and rule."""
    expected_context_sources = [
        {
            "type": "state-hub",
            "query": "coding_retro",
            "params": {"window_days": 7, "limit": 100},
            "bind_to": "context.retro",
        }
    ]

    definition = parse_file(_CODING_RETRO_DEF_PATH)
    assert definition.id == "weekly-coding-retro"
    assert definition.enabled is False

    trigger = definition.trigger_config
    assert trigger["trigger_type"] == "cron"
    assert trigger["cron_expression"] == "0 19 * * 6"
    assert trigger["timezone"] == "Europe/Berlin"

    assert definition.context_sources == expected_context_sources

    rules = definition.rules
    assert len(rules) == 1
    assert rules[0]["id"] == "propose-weekly-improvements"
def test_coding_retro_rule_emits_one_task_per_positive_suggestion():
    """Only the suggestion with a positive score is expanded into a task spec."""
    only_rule = parse_file(_CODING_RETRO_DEF_PATH).rules[0]

    scored_suggestion = {
        "repo": "activity-core",
        "title": "Harden coding retro smoke gates",
        "recommendation": "Dry-run with fixture and live hub evidence.",
        "priority": "high",
        "score": 8.5,
    }
    zero_score_suggestion = {
        "repo": "quiet-repo",
        "title": "Do not emit zero-score suggestion",
        "recommendation": "This should stay quiet.",
        "priority": "low",
        "score": 0,
    }
    rule_context = {
        "retro": {"suggestions": [scored_suggestion, zero_score_suggestion]}
    }

    emitted = expand_rule_actions([only_rule], _EmptyEvent(), rule_context)

    expected_spec = {
        "title": "Harden coding retro smoke gates",
        "description": "Dry-run with fixture and live hub evidence.",
        "target_repo": "activity-core",
        "priority": "high",
        "labels": ["coding-retro", "improvement", "automated"],
        "due_in_days": None,
        "source_type": "rule",
        "source_id": "propose-weekly-improvements",
        "triggering_event_id": "",
        "activity_definition_id": "",
        "condition": "context.s.score > 0",
    }
    assert emitted == [expected_spec]
def test_pipeline_emits_one_task_for_stale_repo_only():
    """Stale repo (45 days) matches; fresh repo (10 days) does not.

    Also checks the single spawn-log entry: rule id, matched condition, a
    well-formed triggering event id, and the link back to the emitted task.
    """
    defn = parse_file(_SBOM_DEF_PATH)
    rule = defn.rules[0]
    repos = [
        {"repo_slug": "repo-a", "sbom_age_days": 45},  # stale — should match
        {"repo_slug": "repo-b", "sbom_age_days": 10},  # fresh — should not
    ]
    task_refs, spawn_log = _run_rule_pipeline(rule, repos, _EmptyEvent(), NullSink())

    # Exactly one task is emitted (repo-a only).
    assert len(task_refs) == 1

    # NullSink hands back a synthetic TaskRef.
    assert task_refs[0].backend == "null"
    assert task_refs[0].external_id.startswith("null-")

    # Exactly one spawn-log entry, carrying the expected fields.
    assert len(spawn_log) == 1
    entry = spawn_log[0]
    assert entry["source_id"] == "flag-stale-sbom"
    assert entry["condition_matched"] == "context.repo.sbom_age_days > 30"

    # The pipeline helper generates the event id itself, so its exact value is
    # not known here. Verify it is a canonical UUID string instead of comparing
    # the entry with itself (which could never fail).
    event_id = entry["triggering_event_id"]
    assert str(uuid.UUID(event_id)) == event_id

    # The log entry must point at the task the sink actually returned.
    assert entry["task_ref"] == task_refs[0].external_id
def test_pipeline_emits_no_tasks_when_all_repos_fresh():
    """Repos at 5 and 28 days produce neither task refs nor spawn-log entries."""
    sbom_rule = parse_file(_SBOM_DEF_PATH).rules[0]
    fresh_repos = [
        {"repo_slug": slug, "sbom_age_days": age}
        for slug, age in (("repo-x", 5), ("repo-y", 28))
    ]

    emitted_refs, log_entries = _run_rule_pipeline(
        sbom_rule, fresh_repos, _EmptyEvent(), NullSink()
    )

    assert emitted_refs == []
    assert log_entries == []
def test_pipeline_emits_multiple_tasks_when_multiple_stale():
    """Two of three repos are over the threshold, so two distinct tasks are emitted."""
    sbom_rule = parse_file(_SBOM_DEF_PATH).rules[0]
    mixed_repos = [
        {"repo_slug": "repo-a", "sbom_age_days": 60},
        {"repo_slug": "repo-b", "sbom_age_days": 45},
        {"repo_slug": "repo-c", "sbom_age_days": 10},
    ]

    emitted_refs, log_entries = _run_rule_pipeline(
        sbom_rule, mixed_repos, _EmptyEvent(), NullSink()
    )

    assert len(emitted_refs) == 2
    assert len(log_entries) == 2

    # Every log entry is attributed to the SBOM staleness rule.
    logged_rule_ids = [item["source_id"] for item in log_entries]
    assert logged_rule_ids == ["flag-stale-sbom"] * len(log_entries)

    # No two emitted tasks share an external id.
    distinct_ids = {ref.external_id for ref in emitted_refs}
    assert len(distinct_ids) == 2
def test_pipeline_task_ref_has_null_backend():
    """The emitted ref reports the "null" backend and the spawn log records its external id."""
    sbom_rule = parse_file(_SBOM_DEF_PATH).rules[0]
    stale_repo = {"repo_slug": "stale-repo", "sbom_age_days": 100}

    emitted_refs, log_entries = _run_rule_pipeline(
        sbom_rule, [stale_repo], _EmptyEvent(), NullSink()
    )

    first_ref = emitted_refs[0]
    assert first_ref.backend == "null"
    assert log_entries[0]["task_ref"] == first_ref.external_id
def test_rule_condition_boundary_exactly_30_days():
    """A 30-day-old SBOM is not stale (the condition is a strict ``>``); 31 days is."""
    sbom_rule = parse_file(_SBOM_DEF_PATH).rules[0]

    # Exactly on the threshold: nothing may be emitted.
    at_threshold = [{"repo_slug": "repo-edge", "sbom_age_days": 30}]
    refs_at_threshold, _ = _run_rule_pipeline(
        sbom_rule, at_threshold, _EmptyEvent(), NullSink()
    )
    assert refs_at_threshold == [], "30 days exactly should not trigger (condition is > 30)"

    # One day past the threshold: exactly one task.
    past_threshold = [{"repo_slug": "repo-edge", "sbom_age_days": 31}]
    refs_past_threshold, _ = _run_rule_pipeline(
        sbom_rule, past_threshold, _EmptyEvent(), NullSink()
    )
    assert len(refs_past_threshold) == 1, "31 days should trigger"