Add event-payload context resolver

This commit is contained in:
2026-06-18 14:01:11 +02:00
parent b84e474ac5
commit 0554014083
5 changed files with 351 additions and 4 deletions

View File

@@ -11,6 +11,7 @@ activities that need DB access.
from __future__ import annotations
import json
import uuid
from datetime import datetime, timezone
from typing import Any
@@ -65,6 +66,24 @@ def _bind_resolver_result(bind_key: str, result: Any) -> Any:
return result
def _parse_event_envelope(event_envelope_json: str | None) -> dict[str, Any] | None:
"""Parse an event envelope JSON string for context resolvers."""
if not event_envelope_json:
return None
try:
payload = json.loads(event_envelope_json)
except (TypeError, json.JSONDecodeError) as exc:
activity.logger.warning("Invalid event envelope JSON - %s", exc)
return None
if not isinstance(payload, dict):
activity.logger.warning(
"Invalid event envelope JSON - expected object, got %s",
type(payload).__name__,
)
return None
return payload
# ── Activities ─────────────────────────────────────────────────────────────────
@activity.defn
@@ -124,6 +143,7 @@ async def resolve_context(
from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY
snapshot: dict = {}
event_envelope = _parse_event_envelope(event_envelope_json)
for source in context_sources:
source_type = source.get("type", "")
query = source.get("query", "")
@@ -152,7 +172,7 @@ async def resolve_context(
continue
try:
resolved = resolver_cls().resolve(query, None, params)
resolved = resolver_cls().resolve(query, event_envelope, params)
snapshot[bind_key] = _bind_resolver_result(bind_key, resolved)
except Exception as exc:
if required:

View File

@@ -1 +1,7 @@
from activity_core.context_resolvers import kaizen, ops_inventory, repo_scoping, state_hub # noqa: F401
from activity_core.context_resolvers import ( # noqa: F401
event_payload,
kaizen,
ops_inventory,
repo_scoping,
state_hub,
)

View File

@@ -0,0 +1,51 @@
"""Event payload context adapter.
Registered as source type ``event-payload``. It exposes the triggering
EventEnvelope attributes to event-triggered ActivityDefinitions without
requiring an external context service call.
"""
from __future__ import annotations
from typing import Any
from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, ContextResolver
class EventPayloadContextResolver(ContextResolver):
"""Resolve context from the triggering event envelope attributes."""
def resolve(self, query: str, event: Any, params: dict[str, Any]) -> Any:
attributes = _event_attributes(event)
if query in {"", "attributes"}:
return attributes
if query.startswith("attributes."):
return _resolve_path(attributes, query.removeprefix("attributes."))
return _resolve_path(attributes, query)
def _event_attributes(event: Any) -> dict[str, Any]:
if not isinstance(event, dict):
raise RuntimeError("event-payload source requires an event envelope")
attributes = event.get("attributes")
if not isinstance(attributes, dict):
raise RuntimeError("event-payload source requires envelope attributes")
return attributes
def _resolve_path(root: dict[str, Any], path: str) -> Any:
if not path:
return root
current: Any = root
for part in path.split("."):
if not part:
return {}
if not isinstance(current, dict):
return {}
current = current.get(part)
if current is None:
return {}
return current
CONTEXT_RESOLVER_REGISTRY["event-payload"] = EventPayloadContextResolver

View File

@@ -1,7 +1,11 @@
from __future__ import annotations
import pytest
import json
import pytest
from temporalio.exceptions import ApplicationError
from activity_core import activities
from activity_core.activities import _bind_resolver_result, resolve_context
@@ -42,4 +46,115 @@ async def test_resolve_context_unwraps_kaizen_projects(monkeypatch) -> None:
]
)
assert snapshot == {"projects": [{"repo": "pilot", "has_metrics": True}]}
assert snapshot == {"projects": [{"repo": "pilot", "has_metrics": True}]}
@pytest.mark.asyncio
async def test_resolve_context_binds_event_payload_attributes() -> None:
envelope = {
"type": "kaizen.metrics.recorded",
"attributes": {
"agent": "coach",
"project": "kaizen-agentic",
"summary": {
"success_rate": 0.75,
"execution_count": 12,
"avg_quality": 0.81,
},
},
}
snapshot = await resolve_context(
[
{
"type": "event-payload",
"bind_to": "context.metrics",
}
],
json.dumps(envelope),
)
assert snapshot == {
"metrics": {
"agent": "coach",
"project": "kaizen-agentic",
"summary": {
"success_rate": 0.75,
"execution_count": 12,
"avg_quality": 0.81,
},
}
}
@pytest.mark.asyncio
async def test_event_payload_context_supports_low_success_rate_rule() -> None:
snapshot = await resolve_context(
[
{
"type": "event-payload",
"bind_to": "context.metrics",
}
],
json.dumps({
"type": "kaizen.metrics.recorded",
"attributes": {
"agent": "coach",
"project": "kaizen-agentic",
"summary": {"success_rate": 0.75},
},
}),
)
result = await activities.evaluate_rules({
"rules": [
{
"id": "flag-low-success-rate",
"condition": "context.metrics.summary.success_rate < 0.8",
"action": {
"task_template": (
"Review low success rate for {context.metrics.agent}"
),
"target_repo": "context.metrics.project",
"priority": "high",
"labels": ["kaizen", "{context.metrics.agent}"],
},
}
],
"event": {},
"context": snapshot,
})
assert len(result) == 1
assert result[0]["source_id"] == "flag-low-success-rate"
assert result[0]["title"] == "Review low success rate for coach"
assert result[0]["target_repo"] == "kaizen-agentic"
assert result[0]["labels"] == ["kaizen", "coach"]
@pytest.mark.asyncio
async def test_event_payload_context_binds_empty_when_optional_envelope_missing() -> None:
snapshot = await resolve_context(
[
{
"type": "event-payload",
"bind_to": "context.metrics",
}
],
)
assert snapshot == {"metrics": {}}
@pytest.mark.asyncio
async def test_event_payload_context_fails_when_required_envelope_missing() -> None:
with pytest.raises(ApplicationError, match="Required context resolver"):
await resolve_context(
[
{
"type": "event-payload",
"bind_to": "context.metrics",
"required": True,
}
],
)

View File

@@ -0,0 +1,155 @@
---
id: ACTIVITY-WP-0011
type: workplan
title: "Event Payload Context Resolver"
domain: custodian
repo: activity-core
status: blocked
owner: codex
topic_slug: custodian
created: "2026-06-18"
updated: "2026-06-18"
state_hub_workstream_id: "4efe4bcf-2148-4489-b57c-87f6039d4ed5"
---
# ACTIVITY-WP-0011 - Event Payload Context Resolver
## Context
State Hub message `d561ebd7-ba01-4dc6-8ffc-fe87d45304ee` from
`kaizen-agentic` handed off an urgent blocker for LOOP-WP-0002:
event-triggered definitions can receive the triggering EventEnvelope JSON, but
activity-core did not bind `source.type: event-payload` into the context
snapshot. The immediate customer is the disabled
`coulomb-low-success-rate-review` ActivityDefinition, whose
`flag-low-success-rate` rule needs to evaluate
`context.metrics.summary.success_rate`.
This is in activity-core scope because the repo owns ActivityDefinition context
resolution and the Event Bridge workflow boundary. The remaining event type
registry and live NATS smoke evidence are cross-repo/operator gates and should
wait in State Hub rather than depending on local kubectl or ad hoc live cluster
access from this repo.
## Implement Event Payload Resolver
```task
id: ACTIVITY-WP-0011-T01
status: done
priority: high
state_hub_task_id: "5c87ce0b-3bd0-4a44-aae5-10d7586c939e"
```
Register resolver type `event-payload` so event-triggered definitions can bind
the triggering EventEnvelope attributes into `context.*`.
Done when:
- `activity_core.context_resolvers` imports and registers an `event-payload`
resolver.
- `resolve_context` parses `event_envelope_json` once and passes the parsed
envelope to registered resolvers.
- `source.type: event-payload` extracts envelope `attributes`.
- `bind_to: context.metrics` strips the `context.` prefix and unwraps a
single-key `{"metrics": ...}` attributes payload into `snapshot["metrics"]`.
- Missing or malformed envelopes fail required sources visibly and bind `{}` for
optional sources.
2026-06-18: Completed in `src/activity_core/activities.py` and
`src/activity_core/context_resolvers/event_payload.py`.
## Cover Binding And Rule Evaluation
```task
id: ACTIVITY-WP-0011-T02
status: done
priority: high
state_hub_task_id: "c6f7dea6-9adc-4997-a22e-4bf2e94dc05a"
```
Add focused tests for the handoff acceptance contract.
Done when:
- sample `kaizen.metrics.recorded` envelope attributes resolve to:
`{"metrics": {"agent": "coach", "project": "kaizen-agentic", "summary": ...}}`;
- `flag-low-success-rate` evaluates
`context.metrics.summary.success_rate < 0.8`;
- optional missing envelopes bind `{}`;
- required missing envelopes raise a visible activity failure.
2026-06-18: Completed in `tests/test_resolve_context_binding.py`. Focused
tests passed:
`.venv/bin/python -m pytest tests/test_resolve_context_binding.py tests/test_rule_evaluation_activity.py`
reported 8 passed, and adjacent rule tests
`.venv/bin/python -m pytest tests/rules/test_evaluator.py tests/rules/test_actions.py`
reported 55 passed.
## Wait For Event Type Registry
```task
id: ACTIVITY-WP-0011-T03
status: wait
priority: high
state_hub_task_id: "a4f277de-eb83-41bc-860e-b26586c72495"
```
Confirm that `kaizen.metrics.recorded` is registered in the shared event type
catalog through the owning State Hub / producer workflow.
Done when:
- State Hub or the producer-owned event catalog exposes
`kaizen.metrics.recorded` with an attributes schema covering
`metrics.agent`, `metrics.project`, and `metrics.summary.success_rate`;
- the registry decision names the owning repo for future schema changes;
- activity-core has no local-only event type drift from the producer contract.
Current wait reason: the event type is producer/catalog owned. Activity-core has
implemented the resolver side and should wait for State Hub-backed registry
confirmation before treating the live customer definition as fully unblocked.
## Wait For Live Event Smoke
```task
id: ACTIVITY-WP-0011-T04
status: wait
priority: high
state_hub_task_id: "3b636d5e-8f93-49b4-ae53-3da4f736a4d9"
```
After T03, run the live event-triggered path without relying on local kubectl
from activity-core.
Done when State Hub records non-secret evidence that:
- a sample `kaizen.metrics.recorded` envelope was published on the expected NATS
subject;
- activity-core triggered `coulomb-low-success-rate-review`;
- the resolved context snapshot contained `context.metrics.summary.success_rate`;
- `flag-low-success-rate` matched and produced the expected task/report output;
- any disabled-definition or operator-controlled enablement state was recorded.
Current wait reason: this is a cross-repo/live-runtime smoke owned by the event
producer, customer definition owner, and cluster/operator path. Activity-core
should consume the evidence from State Hub.
## Close Handoff
```task
id: ACTIVITY-WP-0011-T05
status: wait
priority: medium
state_hub_task_id: "5169d8c5-769f-4272-97cf-c25b31087601"
```
Close the urgent handoff once State Hub has the registry and live smoke
evidence.
Done when:
- State Hub message `d561ebd7-ba01-4dc6-8ffc-fe87d45304ee` is answered or
linked to this workplan;
- `kaizen-agentic` / LOOP-WP-0002 can proceed without an activity-core code
blocker;
- this workplan is marked `finished`.