# Source: activity-core/tests/test_resolve_context_binding.py
# (Listing metadata from the file viewer: 161 lines, 4.6 KiB, Python.)

from __future__ import annotations
import json
import pytest
from temporalio.exceptions import ApplicationError
from activity_core import activities
from activity_core.activities import _bind_resolver_result, resolve_context
def test_bind_resolver_result_unwraps_single_key_wrapper() -> None:
    """Binding ``projects`` to ``{"projects": [...]}`` must yield the inner list."""
    inner = [{"repo": "kaizen-agentic", "has_metrics": True}]
    wrapped = {"projects": inner}

    bound = _bind_resolver_result("projects", wrapped)

    assert bound == inner
def test_bind_resolver_result_keeps_multi_key_summary() -> None:
    """A result dict with several keys must be returned as-is, not unwrapped."""
    payload = {
        "repos": [{"repo_slug": "a"}],
        "stale_count": 1,
        "total_count": 2,
    }

    bound = _bind_resolver_result("repos", payload)

    # The bind name matches one key, but sibling keys are present too, so the
    # whole mapping is expected back.
    assert bound == payload
@pytest.mark.asyncio
async def test_resolve_context_unwraps_kaizen_projects(monkeypatch) -> None:
    """``resolve_context`` binds the unwrapped project list from a kaizen source.

    A fake resolver is patched into the registry under ``"kaizen"``; the
    snapshot is expected to expose its ``projects`` list directly under the
    ``projects`` key.
    """
    # Imported only for its side effects (presumably resolver registration —
    # TODO confirm) before the registry entry is patched.
    import activity_core.context_resolvers  # noqa: F401
    from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY

    class _FakeResolver:
        """Stand-in resolver returning a fixed single-key payload."""

        def resolve(self, query: str, event: object, params: dict) -> dict:
            assert query == "discover_kaizen_projects"
            return {"projects": [{"repo": "pilot", "has_metrics": True}]}

    def _make_fake_resolver() -> _FakeResolver:
        # Zero-argument factory, matching how the registry entry is called.
        return _FakeResolver()

    monkeypatch.setitem(CONTEXT_RESOLVER_REGISTRY, "kaizen", _make_fake_resolver)

    sources = [
        {
            "type": "kaizen",
            "query": "discover_kaizen_projects",
            "params": {},
            "bind_to": "context.projects",
        }
    ]
    snapshot = await resolve_context(sources)

    expected = {"projects": [{"repo": "pilot", "has_metrics": True}]}
    assert snapshot == expected
@pytest.mark.asyncio
async def test_resolve_context_binds_event_payload_attributes() -> None:
    """An ``event-payload`` source binds the envelope's ``attributes`` mapping."""
    attributes = {
        "agent": "coach",
        "project": "kaizen-agentic",
        "summary": {
            "success_rate": 0.75,
            "execution_count": 12,
            "avg_quality": 0.81,
        },
    }
    envelope = {"type": "kaizen.metrics.recorded", "attributes": attributes}
    sources = [{"type": "event-payload", "bind_to": "context.metrics"}]

    # The envelope is passed as a JSON string, so ``attributes`` cannot be
    # mutated by the call and is safe to reuse as the expected value.
    snapshot = await resolve_context(sources, json.dumps(envelope))

    assert snapshot == {"metrics": attributes}
@pytest.mark.asyncio
async def test_event_payload_context_supports_low_success_rate_rule() -> None:
    """A snapshot from an ``event-payload`` source can drive rule evaluation.

    The resolved context is passed to ``activities.evaluate_rules`` with one
    rule conditioned on ``context.metrics.summary.success_rate``; exactly one
    result is expected, with its title, target repo and labels filled from
    the bound metrics.
    """
    envelope = {
        "type": "kaizen.metrics.recorded",
        "attributes": {
            "agent": "coach",
            "project": "kaizen-agentic",
            "summary": {"success_rate": 0.75},
        },
    }
    sources = [{"type": "event-payload", "bind_to": "context.metrics"}]
    snapshot = await resolve_context(sources, json.dumps(envelope))

    low_success_rule = {
        "id": "flag-low-success-rate",
        "condition": "context.metrics.summary.success_rate < 0.8",
        "action": {
            # Plain strings, not f-strings: the braces are placeholders for
            # the rule evaluator.
            "task_template": "Review low success rate for {context.metrics.agent}",
            "target_repo": "context.metrics.project",
            "priority": "high",
            "labels": ["kaizen", "{context.metrics.agent}"],
        },
    }
    result = await activities.evaluate_rules(
        {
            "rules": [low_success_rule],
            "event": {},
            "context": snapshot,
        }
    )

    assert len(result) == 1
    proposal = result[0]
    assert proposal["source_id"] == "flag-low-success-rate"
    assert proposal["title"] == "Review low success rate for coach"
    assert proposal["target_repo"] == "kaizen-agentic"
    assert proposal["labels"] == ["kaizen", "coach"]
@pytest.mark.asyncio
async def test_event_payload_context_binds_empty_when_optional_envelope_missing() -> None:
    """With no envelope and no ``required`` flag, the binding is an empty dict."""
    sources = [{"type": "event-payload", "bind_to": "context.metrics"}]

    # No second argument: the event envelope is deliberately omitted.
    snapshot = await resolve_context(sources)

    assert snapshot == {"metrics": {}}
@pytest.mark.asyncio
async def test_event_payload_context_fails_when_required_envelope_missing() -> None:
    """A ``required`` event-payload source with no envelope raises ApplicationError."""
    required_source = {
        "type": "event-payload",
        "bind_to": "context.metrics",
        "required": True,
    }

    # No envelope is supplied, so the required source cannot be satisfied.
    with pytest.raises(ApplicationError, match="Required context resolver"):
        await resolve_context([required_source])