diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index 9c6957f..3812904 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -102,7 +102,8 @@ async def resolve_context( Returns: {bind_key: resolved_value, ...} Source types are dispatched via CONTEXT_RESOLVER_REGISTRY. - A resolver that raises logs a warning and binds {} — it does not abort the run. + A resolver that raises logs a warning and binds {} unless the context source + is marked required, in which case the activity fails visibly. The 'static' type is handled inline without a registry entry. """ import activity_core.context_resolvers # noqa: F401 — registers all adapters @@ -113,6 +114,7 @@ async def resolve_context( source_type = source.get("type", "") query = source.get("query", "") params = source.get("params") or {} + required = bool(source.get("required") or params.get("required", False)) raw_bind = source.get("bind_to") or source.get("name") or source_type # Strip the 'context.' namespace prefix so evaluator can find the key. bind_key = raw_bind.removeprefix("context.") if raw_bind.startswith("context.") else raw_bind @@ -123,6 +125,11 @@ async def resolve_context( resolver_cls = CONTEXT_RESOLVER_REGISTRY.get(source_type) if resolver_cls is None: + if required: + raise ApplicationError( + f"Required context source type {source_type!r} is not registered", + non_retryable=True, + ) activity.logger.warning( "Unknown context source type %r — binding {}", source_type, @@ -133,6 +140,10 @@ async def resolve_context( try: snapshot[bind_key] = resolver_cls().resolve(query, None, params) except Exception as exc: + if required: + raise ApplicationError( + f"Required context resolver {source_type!r}/{query!r} failed: {exc}" + ) from exc activity.logger.warning( "Context resolver %r failed — %s; binding {}", source_type, diff --git a/src/activity_core/context_resolvers/state_hub.py b/src/activity_core/context_resolvers/state_hub.py index 31da486..31918e3 100644 --- a/src/activity_core/context_resolvers/state_hub.py +++ b/src/activity_core/context_resolvers/state_hub.py @@ -9,6 +9,7 @@ Supported queries: - workplan_index: GET {STATE_HUB_URL}/workstreams/workplan-index - hub_inbox: GET {STATE_HUB_URL}/messages/?to_agent=hub&unread_only=true - daily_triage_digest: curated scalar JSON digest for daily WSJF triage + - recently_on_scope_hourly: POST {STATE_HUB_URL}/recently-on-scope/hourly No caching — state hub data is live operational state and must not be stale within a single workflow run. @@ -45,6 +46,13 @@ def _fetch_json(path: str, params: dict[str, Any] | None = None) -> Any: return {} +def _post_json(path: str, payload: dict[str, Any]) -> Any: + url = f"{_base_url()}{path}" + resp = httpx.post(url, json=payload, timeout=_TIMEOUT_SECONDS) + resp.raise_for_status() + return resp.json() + + class StateHubContextResolver(ContextResolver): """Fetches live data from the Custodian State Hub.""" @@ -70,6 +78,13 @@ class StateHubContextResolver(ContextResolver): return _fetch_json("/messages/", query_params) if query == "daily_triage_digest": return _daily_triage_digest(params) + if query == "recently_on_scope_hourly": + payload = { + key: value + for key, value in params.items() + if key not in {"required"} + } + return _post_json("/recently-on-scope/hourly", payload) return {} diff --git a/src/activity_core/models.py b/src/activity_core/models.py index a5d2700..fad6c96 100644 --- a/src/activity_core/models.py +++ b/src/activity_core/models.py @@ -132,6 +132,10 @@ class ContextSource(BaseModel): query: str = Field(default="", description="Named query to execute against the source.") params: dict[str, Any] = Field(default_factory=dict) bind_to: str = Field(default="", description="Context key to bind the result to.") + required: bool = Field( + default=False, + description="When true, resolver failures fail the activity run instead of binding {}.", + ) # ── Task templates (legacy) ─────────────────────────────────────────────────── diff --git a/tests/test_state_hub_context_resolver.py b/tests/test_state_hub_context_resolver.py index 4702e97..2350717 100644 --- a/tests/test_state_hub_context_resolver.py +++ b/tests/test_state_hub_context_resolver.py @@ -3,6 +3,7 @@ from __future__ import annotations from typing import Any import httpx +import pytest from activity_core.context_resolvers.state_hub import StateHubContextResolver @@ -117,6 +118,47 @@ def test_unknown_query_returns_empty() -> None: assert StateHubContextResolver().resolve("unknown", None, {}) == {} +def test_recently_on_scope_hourly_posts_batch(monkeypatch) -> None: + calls: list[dict[str, Any]] = [] + + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + calls.append({"url": url, **kwargs}) + return DummyResponse({"generated": [{"domain_slug": "custodian"}]}) + + monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test/") + monkeypatch.setattr(httpx, "post", fake_post) + + result = StateHubContextResolver().resolve( + "recently_on_scope_hourly", + None, + { + "range": "1h", + "active_only": True, + "include_attention": False, + "required": True, + }, + ) + + assert result == {"generated": [{"domain_slug": "custodian"}]} + assert calls == [ + { + "url": "http://state-hub.test/recently-on-scope/hourly", + "json": {"range": "1h", "active_only": True, "include_attention": False}, + "timeout": 10.0, + } + ] + + +def test_recently_on_scope_hourly_failure_bubbles(monkeypatch) -> None: + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + raise httpx.ConnectError("offline") + + monkeypatch.setattr(httpx, "post", fake_post) + + with pytest.raises(httpx.ConnectError): + StateHubContextResolver().resolve("recently_on_scope_hourly", None, {"range": "1h"}) + + def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None: payloads = { "/state/summary": {