diff --git a/src/activity_core/context_resolvers/state_hub.py b/src/activity_core/context_resolvers/state_hub.py index 5973ac5..9372d0e 100644 --- a/src/activity_core/context_resolvers/state_hub.py +++ b/src/activity_core/context_resolvers/state_hub.py @@ -4,6 +4,10 @@ Registered as source type 'state-hub'. Supported queries: - domain_summary: GET {STATE_HUB_URL}/state/domain/{domain} - repo_sbom_status: GET {STATE_HUB_URL}/sbom/status?repo={repo_slug} + - state_summary: GET {STATE_HUB_URL}/state/summary + - next_steps: GET {STATE_HUB_URL}/state/next_steps + - workplan_index: GET {STATE_HUB_URL}/workstreams/workplan-index + - hub_inbox: GET {STATE_HUB_URL}/messages/?to_agent=hub&unread_only=true No caching — state hub data is live operational state and must not be stale within a single workflow run. @@ -19,24 +23,47 @@ import httpx from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, ContextResolver -_STATE_HUB_URL = os.environ.get("STATE_HUB_URL", "http://127.0.0.1:8000") +_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" +_TIMEOUT_SECONDS = 10.0 + + +def _base_url() -> str: + return os.environ.get("STATE_HUB_URL", _DEFAULT_STATE_HUB_URL).rstrip("/") + + +def _fetch_json(path: str, params: dict[str, Any] | None = None) -> Any: + url = f"{_base_url()}{path}" + try: + resp = httpx.get(url, params=params, timeout=_TIMEOUT_SECONDS) + resp.raise_for_status() + return resp.json() + except (httpx.HTTPError, ValueError): + return {} class StateHubContextResolver(ContextResolver): """Fetches live data from the Custodian State Hub.""" - def resolve(self, query: str, event: Any, params: dict[str, Any]) -> dict[str, Any]: - base = _STATE_HUB_URL.rstrip("/") + def resolve(self, query: str, event: Any, params: dict[str, Any]) -> Any: if query == "domain_summary": domain = params.get("domain", "") - resp = httpx.get(f"{base}/state/domain/{domain}", timeout=10.0) - resp.raise_for_status() - return resp.json() + return _fetch_json(f"/state/domain/{domain}") if query == "repo_sbom_status": repo_slug = params.get("repo_slug", "") - resp = httpx.get(f"{base}/sbom/status", params={"repo": repo_slug}, timeout=10.0) - resp.raise_for_status() - return resp.json() + return _fetch_json("/sbom/status", {"repo": repo_slug}) + if query == "state_summary": + return _fetch_json("/state/summary") + if query == "next_steps": + return _fetch_json("/state/next_steps") + if query == "workplan_index": + query_params = dict(params) + return _fetch_json("/workstreams/workplan-index", query_params) + if query == "hub_inbox": + query_params = { + "to_agent": params.get("to_agent", "hub"), + "unread_only": params.get("unread_only", True), + } + return _fetch_json("/messages/", query_params) return {} diff --git a/tests/test_state_hub_context_resolver.py b/tests/test_state_hub_context_resolver.py new file mode 100644 index 0000000..b5e2d13 --- /dev/null +++ b/tests/test_state_hub_context_resolver.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +from typing import Any + +import httpx + +from activity_core.context_resolvers.state_hub import StateHubContextResolver + + +class DummyResponse: + def __init__(self, payload: Any, status_error: Exception | None = None) -> None: + self.payload = payload + self.status_error = status_error + + def raise_for_status(self) -> None: + if self.status_error is not None: + raise self.status_error + + def json(self) -> Any: + return self.payload + + +def test_state_summary_query(monkeypatch) -> None: + calls: list[dict[str, Any]] = [] + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + calls.append({"url": url, **kwargs}) + return DummyResponse({"tasks": {"todo": 3}}) + + monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test") + monkeypatch.setattr(httpx, "get", fake_get) + + result = StateHubContextResolver().resolve("state_summary", None, {}) + + assert result == {"tasks": {"todo": 3}} + assert calls == [ + { + "url": "http://state-hub.test/state/summary", + "params": None, + "timeout": 10.0, + } + ] + + +def test_daily_triage_queries(monkeypatch) -> None: + calls: list[dict[str, Any]] = [] + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + calls.append({"url": url, **kwargs}) + return DummyResponse({"url": url, "params": kwargs.get("params")}) + + monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test/") + monkeypatch.setattr(httpx, "get", fake_get) + resolver = StateHubContextResolver() + + resolver.resolve("next_steps", None, {}) + resolver.resolve("workplan_index", None, {"refresh": False}) + resolver.resolve("hub_inbox", None, {"to_agent": "hub", "unread_only": True}) + + assert calls == [ + { + "url": "http://state-hub.test/state/next_steps", + "params": None, + "timeout": 10.0, + }, + { + "url": "http://state-hub.test/workstreams/workplan-index", + "params": {"refresh": False}, + "timeout": 10.0, + }, + { + "url": "http://state-hub.test/messages/", + "params": {"to_agent": "hub", "unread_only": True}, + "timeout": 10.0, + }, + ] + + +def test_existing_queries_still_resolve(monkeypatch) -> None: + calls: list[dict[str, Any]] = [] + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + calls.append({"url": url, **kwargs}) + return DummyResponse({"ok": True}) + + monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test") + monkeypatch.setattr(httpx, "get", fake_get) + resolver = StateHubContextResolver() + + assert resolver.resolve("domain_summary", None, {"domain": "custodian"}) == {"ok": True} + assert resolver.resolve("repo_sbom_status", None, {"repo_slug": "activity-core"}) == {"ok": True} + + assert calls == [ + { + "url": "http://state-hub.test/state/domain/custodian", + "params": None, + "timeout": 10.0, + }, + { + "url": "http://state-hub.test/sbom/status", + "params": {"repo": "activity-core"}, + "timeout": 10.0, + }, + ] + + +def test_resolver_failure_returns_empty(monkeypatch) -> None: + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + raise httpx.ConnectError("offline") + + monkeypatch.setattr(httpx, "get", fake_get) + + assert StateHubContextResolver().resolve("state_summary", None, {}) == {} + + +def test_unknown_query_returns_empty() -> None: + assert StateHubContextResolver().resolve("unknown", None, {}) == {}