Add State Hub RecentlyOnScope invocation

This commit is contained in:
2026-05-22 16:14:10 +02:00
parent f4c38e2d5f
commit 5055f3eaca
4 changed files with 73 additions and 1 deletions

View File

@@ -102,7 +102,8 @@ async def resolve_context(
Returns: {bind_key: resolved_value, ...}
Source types are dispatched via CONTEXT_RESOLVER_REGISTRY.
A resolver that raises logs a warning and binds {} — it does not abort the run.
A resolver that raises logs a warning and binds {} unless the context source
is marked required, in which case the activity fails visibly.
The 'static' type is handled inline without a registry entry.
"""
import activity_core.context_resolvers # noqa: F401 — registers all adapters
@@ -113,6 +114,7 @@ async def resolve_context(
source_type = source.get("type", "")
query = source.get("query", "")
params = source.get("params") or {}
required = bool(source.get("required") or params.get("required", False))
raw_bind = source.get("bind_to") or source.get("name") or source_type
# Strip the 'context.' namespace prefix so evaluator can find the key.
bind_key = raw_bind.removeprefix("context.") if raw_bind.startswith("context.") else raw_bind
@@ -123,6 +125,11 @@ async def resolve_context(
resolver_cls = CONTEXT_RESOLVER_REGISTRY.get(source_type)
if resolver_cls is None:
if required:
raise ApplicationError(
f"Required context source type {source_type!r} is not registered",
non_retryable=True,
)
activity.logger.warning(
"Unknown context source type %r — binding {}",
source_type,
@@ -133,6 +140,10 @@ async def resolve_context(
try:
snapshot[bind_key] = resolver_cls().resolve(query, None, params)
except Exception as exc:
if required:
raise ApplicationError(
f"Required context resolver {source_type!r}/{query!r} failed: {exc}"
) from exc
activity.logger.warning(
"Context resolver %r failed — %s; binding {}",
source_type,

View File

@@ -9,6 +9,7 @@ Supported queries:
- workplan_index: GET {STATE_HUB_URL}/workstreams/workplan-index
- hub_inbox: GET {STATE_HUB_URL}/messages/?to_agent=hub&unread_only=true
- daily_triage_digest: curated scalar JSON digest for daily WSJF triage
- recently_on_scope_hourly: POST {STATE_HUB_URL}/recently-on-scope/hourly
No caching — state hub data is live operational state and must not be stale
within a single workflow run.
@@ -45,6 +46,13 @@ def _fetch_json(path: str, params: dict[str, Any] | None = None) -> Any:
return {}
def _post_json(path: str, payload: dict[str, Any]) -> Any:
url = f"{_base_url()}{path}"
resp = httpx.post(url, json=payload, timeout=_TIMEOUT_SECONDS)
resp.raise_for_status()
return resp.json()
class StateHubContextResolver(ContextResolver):
"""Fetches live data from the Custodian State Hub."""
@@ -70,6 +78,13 @@ class StateHubContextResolver(ContextResolver):
return _fetch_json("/messages/", query_params)
if query == "daily_triage_digest":
return _daily_triage_digest(params)
if query == "recently_on_scope_hourly":
payload = {
key: value
for key, value in params.items()
if key not in {"required"}
}
return _post_json("/recently-on-scope/hourly", payload)
return {}

View File

@@ -132,6 +132,10 @@ class ContextSource(BaseModel):
query: str = Field(default="", description="Named query to execute against the source.")
params: dict[str, Any] = Field(default_factory=dict)
bind_to: str = Field(default="", description="Context key to bind the result to.")
required: bool = Field(
default=False,
description="When true, resolver failures fail the activity run instead of binding {}.",
)
# ── Task templates (legacy) ───────────────────────────────────────────────────

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from typing import Any
import httpx
import pytest
from activity_core.context_resolvers.state_hub import StateHubContextResolver
@@ -117,6 +118,47 @@ def test_unknown_query_returns_empty() -> None:
assert StateHubContextResolver().resolve("unknown", None, {}) == {}
def test_recently_on_scope_hourly_posts_batch(monkeypatch) -> None:
calls: list[dict[str, Any]] = []
def fake_post(url: str, **kwargs: Any) -> DummyResponse:
calls.append({"url": url, **kwargs})
return DummyResponse({"generated": [{"domain_slug": "custodian"}]})
monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test/")
monkeypatch.setattr(httpx, "post", fake_post)
result = StateHubContextResolver().resolve(
"recently_on_scope_hourly",
None,
{
"range": "1h",
"active_only": True,
"include_attention": False,
"required": True,
},
)
assert result == {"generated": [{"domain_slug": "custodian"}]}
assert calls == [
{
"url": "http://state-hub.test/recently-on-scope/hourly",
"json": {"range": "1h", "active_only": True, "include_attention": False},
"timeout": 10.0,
}
]
def test_recently_on_scope_hourly_failure_bubbles(monkeypatch) -> None:
def fake_post(url: str, **kwargs: Any) -> DummyResponse:
raise httpx.ConnectError("offline")
monkeypatch.setattr(httpx, "post", fake_post)
with pytest.raises(httpx.ConnectError):
StateHubContextResolver().resolve("recently_on_scope_hourly", None, {"range": "1h"})
def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None:
payloads = {
"/state/summary": {