Enable hourly RecentlyOnScope rollout

This commit is contained in:
2026-05-23 02:51:54 +02:00
parent 5055f3eaca
commit ca6d80ec07
3 changed files with 214 additions and 5 deletions

View File

@@ -53,6 +53,19 @@ def _post_json(path: str, payload: dict[str, Any]) -> Any:
return resp.json()
def _validate_recently_on_scope_hourly(result: Any) -> dict[str, Any]:
if not isinstance(result, dict):
raise RuntimeError("recently_on_scope_hourly returned a non-object response")
required_keys = {"generated", "skipped", "failed"}
missing = required_keys - set(result)
if missing:
missing_list = ", ".join(sorted(missing))
raise RuntimeError(
f"recently_on_scope_hourly response missing required key(s): {missing_list}"
)
return result
class StateHubContextResolver(ContextResolver):
"""Fetches live data from the Custodian State Hub."""
@@ -84,7 +97,8 @@ class StateHubContextResolver(ContextResolver):
for key, value in params.items()
if key not in {"required"}
}
return _post_json("/recently-on-scope/hourly", payload)
result = _post_json("/recently-on-scope/hourly", payload)
return _validate_recently_on_scope_hourly(result)
return {}