From 0554014083864b7ed286f16b1fb033a8739ef28e Mon Sep 17 00:00:00 2001 From: tegwick Date: Thu, 18 Jun 2026 14:01:11 +0200 Subject: [PATCH] Add event-payload context resolver --- src/activity_core/activities.py | 22 ++- .../context_resolvers/__init__.py | 8 +- .../context_resolvers/event_payload.py | 51 ++++++ tests/test_resolve_context_binding.py | 119 +++++++++++++- ...-WP-0011-event-payload-context-resolver.md | 155 ++++++++++++++++++ 5 files changed, 351 insertions(+), 4 deletions(-) create mode 100644 src/activity_core/context_resolvers/event_payload.py create mode 100644 workplans/ACTIVITY-WP-0011-event-payload-context-resolver.md diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index 19e0bfb..6fe6357 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -11,6 +11,7 @@ activities that need DB access. from __future__ import annotations +import json import uuid from datetime import datetime, timezone from typing import Any @@ -65,6 +66,24 @@ def _bind_resolver_result(bind_key: str, result: Any) -> Any: return result +def _parse_event_envelope(event_envelope_json: str | None) -> dict[str, Any] | None: + """Parse an event envelope JSON string for context resolvers.""" + if not event_envelope_json: + return None + try: + payload = json.loads(event_envelope_json) + except (TypeError, json.JSONDecodeError) as exc: + activity.logger.warning("Invalid event envelope JSON - %s", exc) + return None + if not isinstance(payload, dict): + activity.logger.warning( + "Invalid event envelope JSON - expected object, got %s", + type(payload).__name__, + ) + return None + return payload + + # ── Activities ───────────────────────────────────────────────────────────────── @activity.defn @@ -124,6 +143,7 @@ async def resolve_context( from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY snapshot: dict = {} + event_envelope = _parse_event_envelope(event_envelope_json) for source in context_sources: source_type = source.get("type", "") query = source.get("query", "") @@ -152,7 +172,7 @@ async def resolve_context( continue try: - resolved = resolver_cls().resolve(query, None, params) + resolved = resolver_cls().resolve(query, event_envelope, params) snapshot[bind_key] = _bind_resolver_result(bind_key, resolved) except Exception as exc: if required: diff --git a/src/activity_core/context_resolvers/__init__.py b/src/activity_core/context_resolvers/__init__.py index 002d059..9391ffe 100644 --- a/src/activity_core/context_resolvers/__init__.py +++ b/src/activity_core/context_resolvers/__init__.py @@ -1 +1,7 @@ -from activity_core.context_resolvers import kaizen, ops_inventory, repo_scoping, state_hub # noqa: F401 +from activity_core.context_resolvers import ( # noqa: F401 + event_payload, + kaizen, + ops_inventory, + repo_scoping, + state_hub, +) diff --git a/src/activity_core/context_resolvers/event_payload.py b/src/activity_core/context_resolvers/event_payload.py new file mode 100644 index 0000000..fbb271d --- /dev/null +++ b/src/activity_core/context_resolvers/event_payload.py @@ -0,0 +1,51 @@ +"""Event payload context adapter. + +Registered as source type ``event-payload``. It exposes the triggering +EventEnvelope attributes to event-triggered ActivityDefinitions without +requiring an external context service call. +""" + +from __future__ import annotations + +from typing import Any + +from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, ContextResolver + + +class EventPayloadContextResolver(ContextResolver): + """Resolve context from the triggering event envelope attributes.""" + + def resolve(self, query: str, event: Any, params: dict[str, Any]) -> Any: + attributes = _event_attributes(event) + if query in {"", "attributes"}: + return attributes + if query.startswith("attributes."): + return _resolve_path(attributes, query.removeprefix("attributes.")) + return _resolve_path(attributes, query) + + +def _event_attributes(event: Any) -> dict[str, Any]: + if not isinstance(event, dict): + raise RuntimeError("event-payload source requires an event envelope") + attributes = event.get("attributes") + if not isinstance(attributes, dict): + raise RuntimeError("event-payload source requires envelope attributes") + return attributes + + +def _resolve_path(root: dict[str, Any], path: str) -> Any: + if not path: + return root + current: Any = root + for part in path.split("."): + if not part: + return {} + if not isinstance(current, dict): + return {} + current = current.get(part) + if current is None: + return {} + return current + + +CONTEXT_RESOLVER_REGISTRY["event-payload"] = EventPayloadContextResolver diff --git a/tests/test_resolve_context_binding.py b/tests/test_resolve_context_binding.py index 74790f3..124050e 100644 --- a/tests/test_resolve_context_binding.py +++ b/tests/test_resolve_context_binding.py @@ -1,7 +1,11 @@ from __future__ import annotations -import pytest +import json +import pytest +from temporalio.exceptions import ApplicationError + +from activity_core import activities from activity_core.activities import _bind_resolver_result, resolve_context @@ -42,4 +46,115 @@ async def test_resolve_context_unwraps_kaizen_projects(monkeypatch) -> None: ] ) - assert snapshot == {"projects": [{"repo": "pilot", "has_metrics": True}]} \ No newline at end of file + assert snapshot == {"projects": [{"repo": "pilot", "has_metrics": True}]} + + +@pytest.mark.asyncio +async def test_resolve_context_binds_event_payload_attributes() -> None: + envelope = { + "type": "kaizen.metrics.recorded", + "attributes": { + "agent": "coach", + "project": "kaizen-agentic", + "summary": { + "success_rate": 0.75, + "execution_count": 12, + "avg_quality": 0.81, + }, + }, + } + + snapshot = await resolve_context( + [ + { + "type": "event-payload", + "bind_to": "context.metrics", + } + ], + json.dumps(envelope), + ) + + assert snapshot == { + "metrics": { + "agent": "coach", + "project": "kaizen-agentic", + "summary": { + "success_rate": 0.75, + "execution_count": 12, + "avg_quality": 0.81, + }, + } + } + + +@pytest.mark.asyncio +async def test_event_payload_context_supports_low_success_rate_rule() -> None: + snapshot = await resolve_context( + [ + { + "type": "event-payload", + "bind_to": "context.metrics", + } + ], + json.dumps({ + "type": "kaizen.metrics.recorded", + "attributes": { + "agent": "coach", + "project": "kaizen-agentic", + "summary": {"success_rate": 0.75}, + }, + }), + ) + + result = await activities.evaluate_rules({ + "rules": [ + { + "id": "flag-low-success-rate", + "condition": "context.metrics.summary.success_rate < 0.8", + "action": { + "task_template": ( + "Review low success rate for {context.metrics.agent}" + ), + "target_repo": "context.metrics.project", + "priority": "high", + "labels": ["kaizen", "{context.metrics.agent}"], + }, + } + ], + "event": {}, + "context": snapshot, + }) + + assert len(result) == 1 + assert result[0]["source_id"] == "flag-low-success-rate" + assert result[0]["title"] == "Review low success rate for coach" + assert result[0]["target_repo"] == "kaizen-agentic" + assert result[0]["labels"] == ["kaizen", "coach"] + + +@pytest.mark.asyncio +async def test_event_payload_context_binds_empty_when_optional_envelope_missing() -> None: + snapshot = await resolve_context( + [ + { + "type": "event-payload", + "bind_to": "context.metrics", + } + ], + ) + + assert snapshot == {"metrics": {}} + + +@pytest.mark.asyncio +async def test_event_payload_context_fails_when_required_envelope_missing() -> None: + with pytest.raises(ApplicationError, match="Required context resolver"): + await resolve_context( + [ + { + "type": "event-payload", + "bind_to": "context.metrics", + "required": True, + } + ], + ) diff --git a/workplans/ACTIVITY-WP-0011-event-payload-context-resolver.md b/workplans/ACTIVITY-WP-0011-event-payload-context-resolver.md new file mode 100644 index 0000000..85b202c --- /dev/null +++ b/workplans/ACTIVITY-WP-0011-event-payload-context-resolver.md @@ -0,0 +1,155 @@ +--- +id: ACTIVITY-WP-0011 +type: workplan +title: "Event Payload Context Resolver" +domain: custodian +repo: activity-core +status: blocked +owner: codex +topic_slug: custodian +created: "2026-06-18" +updated: "2026-06-18" +state_hub_workstream_id: "4efe4bcf-2148-4489-b57c-87f6039d4ed5" +--- + +# ACTIVITY-WP-0011 - Event Payload Context Resolver + +## Context + +State Hub message `d561ebd7-ba01-4dc6-8ffc-fe87d45304ee` from +`kaizen-agentic` handed off an urgent blocker for LOOP-WP-0002: +event-triggered definitions can receive the triggering EventEnvelope JSON, but +activity-core did not bind `source.type: event-payload` into the context +snapshot. The immediate customer is the disabled +`coulomb-low-success-rate-review` ActivityDefinition, whose +`flag-low-success-rate` rule needs to evaluate +`context.metrics.summary.success_rate`. + +This is in activity-core scope because the repo owns ActivityDefinition context +resolution and the Event Bridge workflow boundary. The remaining event type +registry and live NATS smoke evidence are cross-repo/operator gates and should +wait in State Hub rather than depending on local kubectl or ad hoc live cluster +access from this repo. + +## Implement Event Payload Resolver + +```task +id: ACTIVITY-WP-0011-T01 +status: done +priority: high +state_hub_task_id: "5c87ce0b-3bd0-4a44-aae5-10d7586c939e" +``` + +Register resolver type `event-payload` so event-triggered definitions can bind +the triggering EventEnvelope attributes into `context.*`. + +Done when: + +- `activity_core.context_resolvers` imports and registers an `event-payload` + resolver. +- `resolve_context` parses `event_envelope_json` once and passes the parsed + envelope to registered resolvers. +- `source.type: event-payload` extracts envelope `attributes`. +- `bind_to: context.metrics` strips the `context.` prefix and unwraps a + single-key `{"metrics": ...}` attributes payload into `snapshot["metrics"]`. +- Missing or malformed envelopes fail required sources visibly and bind `{}` for + optional sources. + +2026-06-18: Completed in `src/activity_core/activities.py` and +`src/activity_core/context_resolvers/event_payload.py`. + +## Cover Binding And Rule Evaluation + +```task +id: ACTIVITY-WP-0011-T02 +status: done +priority: high +state_hub_task_id: "c6f7dea6-9adc-4997-a22e-4bf2e94dc05a" +``` + +Add focused tests for the handoff acceptance contract. + +Done when: + +- sample `kaizen.metrics.recorded` envelope attributes resolve to: + `{"metrics": {"agent": "coach", "project": "kaizen-agentic", "summary": ...}}`; +- `flag-low-success-rate` evaluates + `context.metrics.summary.success_rate < 0.8`; +- optional missing envelopes bind `{}`; +- required missing envelopes raise a visible activity failure. + +2026-06-18: Completed in `tests/test_resolve_context_binding.py`. Focused +tests passed: +`.venv/bin/python -m pytest tests/test_resolve_context_binding.py tests/test_rule_evaluation_activity.py` +reported 8 passed, and adjacent rule tests +`.venv/bin/python -m pytest tests/rules/test_evaluator.py tests/rules/test_actions.py` +reported 55 passed. + +## Wait For Event Type Registry + +```task +id: ACTIVITY-WP-0011-T03 +status: wait +priority: high +state_hub_task_id: "a4f277de-eb83-41bc-860e-b26586c72495" +``` + +Confirm that `kaizen.metrics.recorded` is registered in the shared event type +catalog through the owning State Hub / producer workflow. + +Done when: + +- State Hub or the producer-owned event catalog exposes + `kaizen.metrics.recorded` with an attributes schema covering + `metrics.agent`, `metrics.project`, and `metrics.summary.success_rate`; +- the registry decision names the owning repo for future schema changes; +- activity-core has no local-only event type drift from the producer contract. + +Current wait reason: the event type is producer/catalog owned. Activity-core has +implemented the resolver side and should wait for State Hub-backed registry +confirmation before treating the live customer definition as fully unblocked. + +## Wait For Live Event Smoke + +```task +id: ACTIVITY-WP-0011-T04 +status: wait +priority: high +state_hub_task_id: "3b636d5e-8f93-49b4-ae53-3da4f736a4d9" +``` + +After T03, run the live event-triggered path without relying on local kubectl +from activity-core. + +Done when State Hub records non-secret evidence that: + +- a sample `kaizen.metrics.recorded` envelope was published on the expected NATS + subject; +- activity-core triggered `coulomb-low-success-rate-review`; +- the resolved context snapshot contained `context.metrics.summary.success_rate`; +- `flag-low-success-rate` matched and produced the expected task/report output; +- any disabled-definition or operator-controlled enablement state was recorded. + +Current wait reason: this is a cross-repo/live-runtime smoke owned by the event +producer, customer definition owner, and cluster/operator path. Activity-core +should consume the evidence from State Hub. + +## Close Handoff + +```task +id: ACTIVITY-WP-0011-T05 +status: wait +priority: medium +state_hub_task_id: "5169d8c5-769f-4272-97cf-c25b31087601" +``` + +Close the urgent handoff once State Hub has the registry and live smoke +evidence. + +Done when: + +- State Hub message `d561ebd7-ba01-4dc6-8ffc-fe87d45304ee` is answered or + linked to this workplan; +- `kaizen-agentic` / LOOP-WP-0002 can proceed without an activity-core code + blocker; +- this workplan is marked `finished`.