diff --git a/k8s/railiance/20-runtime.yaml b/k8s/railiance/20-runtime.yaml index afc8b87..2d82ee5 100644 --- a/k8s/railiance/20-runtime.yaml +++ b/k8s/railiance/20-runtime.yaml @@ -164,6 +164,35 @@ data: Kubernetes projection of the Custodian-owned definition in `/home/worsch/the-custodian/activity-definitions/hourly-recently-on-scope.md`. + state-hub-consistency-sweep.md: | + --- + id: "7c4e9a12-8f3b-4d5e-9c6a-1b2d3e4f5a6b" + name: "State Hub Consistency Sweep" + type: activity-definition + version: "1.0" + enabled: false + owner: custodian + governance: custodian + status: proposed + created: "2026-06-21" + trigger: + type: cron + cron_expression: "*/15 * * * *" + timezone: UTC + misfire_policy: skip + context_sources: + - type: state-hub + query: consistency_sweep_remote_all + required: true + params: + max_seconds: 300 + bind_to: context.consistency_sweep_remote_all + --- + + # ActivityDefinition: State Hub Consistency Sweep + + Kubernetes projection of the Custodian-owned definition in + `/home/worsch/the-custodian/activity-definitions/state-hub-consistency-sweep.md`. ops-service-inventory-probes.md: | --- id: "40d15a87-7ff6-4d8e-992c-37df15f95110" diff --git a/src/activity_core/context_resolvers/state_hub.py b/src/activity_core/context_resolvers/state_hub.py index 1af86ba..4de5439 100644 --- a/src/activity_core/context_resolvers/state_hub.py +++ b/src/activity_core/context_resolvers/state_hub.py @@ -12,6 +12,7 @@ Supported queries: - coding_retro: latest /progress/ item with event_type=coding_retro - daily_triage_digest: curated scalar JSON digest for daily WSJF triage - recently_on_scope_hourly: POST {STATE_HUB_URL}/recently-on-scope/hourly + - consistency_sweep_remote_all: POST {STATE_HUB_URL}/consistency/sweep/remote-all No caching — state hub data is live operational state and must not be stale within a single workflow run. @@ -31,6 +32,7 @@ from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, Cont _DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" _TIMEOUT_SECONDS = 10.0 +_SWEEP_TIMEOUT_SECONDS = 330.0 _OPEN_WORKSTREAM_STATUSES = {"active", "ready", "blocked"} _OPEN_TASK_STATUSES = {"wait", "todo", "progress"} # Sentinel age for repos that have never had an SBOM ingested. Large enough @@ -53,13 +55,26 @@ def _fetch_json(path: str, params: dict[str, Any] | None = None) -> Any: return {} -def _post_json(path: str, payload: dict[str, Any]) -> Any: +def _post_json(path: str, payload: dict[str, Any], *, timeout: float = _TIMEOUT_SECONDS) -> Any: url = f"{_base_url()}{path}" - resp = httpx.post(url, json=payload, timeout=_TIMEOUT_SECONDS) + resp = httpx.post(url, json=payload, timeout=timeout) resp.raise_for_status() return resp.json() +def _validate_consistency_sweep_remote_all(result: Any) -> dict[str, Any]: + if not isinstance(result, dict): + raise RuntimeError("consistency_sweep_remote_all returned a non-object response") + required_keys = {"exit_code", "lock_skipped", "repos_processed"} + missing = required_keys - set(result) + if missing: + missing_list = ", ".join(sorted(missing)) + raise RuntimeError( + f"consistency_sweep_remote_all response missing required key(s): {missing_list}" + ) + return result + + def _validate_recently_on_scope_hourly(result: Any) -> dict[str, Any]: if not isinstance(result, dict): raise RuntimeError("recently_on_scope_hourly returned a non-object response") @@ -107,6 +122,18 @@ class StateHubContextResolver(ContextResolver): } result = _post_json("/recently-on-scope/hourly", payload) return _validate_recently_on_scope_hourly(result) + if query == "consistency_sweep_remote_all": + payload = { + key: value + for key, value in params.items() + if key not in {"required"} + } + result = _post_json( + "/consistency/sweep/remote-all", + payload, + timeout=_SWEEP_TIMEOUT_SECONDS, + ) + return _validate_consistency_sweep_remote_all(result) return {} diff --git a/tests/test_state_hub_context_resolver.py b/tests/test_state_hub_context_resolver.py index 70a744e..4cf968d 100644 --- a/tests/test_state_hub_context_resolver.py +++ b/tests/test_state_hub_context_resolver.py @@ -407,6 +407,70 @@ def test_recently_on_scope_hourly_failure_bubbles(monkeypatch) -> None: StateHubContextResolver().resolve("recently_on_scope_hourly", None, {"range": "1h"}) +def test_consistency_sweep_remote_all_posts_batch(monkeypatch) -> None: + calls: list[dict[str, Any]] = [] + + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + calls.append({"url": url, **kwargs}) + return DummyResponse( + { + "exit_code": 0, + "lock_skipped": False, + "repos_processed": [{"repo_slug": "state-hub", "result": "pass"}], + "skipped_clean": ["quiet-repo"], + "skipped_missing": [], + "skipped_budget": [], + } + ) + + monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test/") + monkeypatch.setattr(httpx, "post", fake_post) + + result = StateHubContextResolver().resolve( + "consistency_sweep_remote_all", + None, + {"max_seconds": 300, "required": True}, + ) + + assert result["exit_code"] == 0 + assert result["repos_processed"][0]["repo_slug"] == "state-hub" + assert calls == [ + { + "url": "http://state-hub.test/consistency/sweep/remote-all", + "json": {"max_seconds": 300}, + "timeout": 330.0, + } + ] + + +def test_consistency_sweep_remote_all_failure_bubbles(monkeypatch) -> None: + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + raise httpx.ConnectError("offline") + + monkeypatch.setattr(httpx, "post", fake_post) + + with pytest.raises(httpx.ConnectError): + StateHubContextResolver().resolve( + "consistency_sweep_remote_all", + None, + {"max_seconds": 300}, + ) + + +def test_consistency_sweep_remote_all_rejects_empty_response(monkeypatch) -> None: + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + return DummyResponse({}) + + monkeypatch.setattr(httpx, "post", fake_post) + + with pytest.raises(RuntimeError, match="missing required key"): + StateHubContextResolver().resolve( + "consistency_sweep_remote_all", + None, + {"max_seconds": 300}, + ) + + def test_recently_on_scope_hourly_rejects_empty_response(monkeypatch) -> None: def fake_post(url: str, **kwargs: Any) -> DummyResponse: return DummyResponse({})