feat(STATE-WP-0064): wire consistency_sweep_remote_all state-hub query

Add POST /consistency/sweep/remote-all resolver support with a 330s
timeout and k8s projection for the consistency sweep definition.
This commit is contained in:
2026-06-21 20:19:22 +02:00
parent dbd2fbb11c
commit 3a981cc98f
3 changed files with 122 additions and 2 deletions

View File

@@ -164,6 +164,35 @@ data:
Kubernetes projection of the Custodian-owned definition in
`/home/worsch/the-custodian/activity-definitions/hourly-recently-on-scope.md`.
state-hub-consistency-sweep.md: |
---
id: "7c4e9a12-8f3b-4d5e-9c6a-1b2d3e4f5a6b"
name: "State Hub Consistency Sweep"
type: activity-definition
version: "1.0"
enabled: false
owner: custodian
governance: custodian
status: proposed
created: "2026-06-21"
trigger:
type: cron
cron_expression: "*/15 * * * *"
timezone: UTC
misfire_policy: skip
context_sources:
- type: state-hub
query: consistency_sweep_remote_all
required: true
params:
max_seconds: 300
bind_to: context.consistency_sweep_remote_all
---
# ActivityDefinition: State Hub Consistency Sweep
Kubernetes projection of the Custodian-owned definition in
`/home/worsch/the-custodian/activity-definitions/state-hub-consistency-sweep.md`.
ops-service-inventory-probes.md: |
---
id: "40d15a87-7ff6-4d8e-992c-37df15f95110"

View File

@@ -12,6 +12,7 @@ Supported queries:
- coding_retro: latest /progress/ item with event_type=coding_retro
- daily_triage_digest: curated scalar JSON digest for daily WSJF triage
- recently_on_scope_hourly: POST {STATE_HUB_URL}/recently-on-scope/hourly
- consistency_sweep_remote_all: POST {STATE_HUB_URL}/consistency/sweep/remote-all
No caching — state hub data is live operational state and must not be stale
within a single workflow run.
@@ -31,6 +32,7 @@ from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, Cont
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
_TIMEOUT_SECONDS = 10.0
_SWEEP_TIMEOUT_SECONDS = 330.0
_OPEN_WORKSTREAM_STATUSES = {"active", "ready", "blocked"}
_OPEN_TASK_STATUSES = {"wait", "todo", "progress"}
# Sentinel age for repos that have never had an SBOM ingested. Large enough
@@ -53,13 +55,26 @@ def _fetch_json(path: str, params: dict[str, Any] | None = None) -> Any:
return {}
def _post_json(path: str, payload: dict[str, Any]) -> Any:
def _post_json(path: str, payload: dict[str, Any], *, timeout: float = _TIMEOUT_SECONDS) -> Any:
url = f"{_base_url()}{path}"
resp = httpx.post(url, json=payload, timeout=_TIMEOUT_SECONDS)
resp = httpx.post(url, json=payload, timeout=timeout)
resp.raise_for_status()
return resp.json()
def _validate_consistency_sweep_remote_all(result: Any) -> dict[str, Any]:
if not isinstance(result, dict):
raise RuntimeError("consistency_sweep_remote_all returned a non-object response")
required_keys = {"exit_code", "lock_skipped", "repos_processed"}
missing = required_keys - set(result)
if missing:
missing_list = ", ".join(sorted(missing))
raise RuntimeError(
f"consistency_sweep_remote_all response missing required key(s): {missing_list}"
)
return result
def _validate_recently_on_scope_hourly(result: Any) -> dict[str, Any]:
if not isinstance(result, dict):
raise RuntimeError("recently_on_scope_hourly returned a non-object response")
@@ -107,6 +122,18 @@ class StateHubContextResolver(ContextResolver):
}
result = _post_json("/recently-on-scope/hourly", payload)
return _validate_recently_on_scope_hourly(result)
if query == "consistency_sweep_remote_all":
payload = {
key: value
for key, value in params.items()
if key not in {"required"}
}
result = _post_json(
"/consistency/sweep/remote-all",
payload,
timeout=_SWEEP_TIMEOUT_SECONDS,
)
return _validate_consistency_sweep_remote_all(result)
return {}

View File

@@ -407,6 +407,70 @@ def test_recently_on_scope_hourly_failure_bubbles(monkeypatch) -> None:
StateHubContextResolver().resolve("recently_on_scope_hourly", None, {"range": "1h"})
def test_consistency_sweep_remote_all_posts_batch(monkeypatch) -> None:
calls: list[dict[str, Any]] = []
def fake_post(url: str, **kwargs: Any) -> DummyResponse:
calls.append({"url": url, **kwargs})
return DummyResponse(
{
"exit_code": 0,
"lock_skipped": False,
"repos_processed": [{"repo_slug": "state-hub", "result": "pass"}],
"skipped_clean": ["quiet-repo"],
"skipped_missing": [],
"skipped_budget": [],
}
)
monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test/")
monkeypatch.setattr(httpx, "post", fake_post)
result = StateHubContextResolver().resolve(
"consistency_sweep_remote_all",
None,
{"max_seconds": 300, "required": True},
)
assert result["exit_code"] == 0
assert result["repos_processed"][0]["repo_slug"] == "state-hub"
assert calls == [
{
"url": "http://state-hub.test/consistency/sweep/remote-all",
"json": {"max_seconds": 300},
"timeout": 330.0,
}
]
def test_consistency_sweep_remote_all_failure_bubbles(monkeypatch) -> None:
def fake_post(url: str, **kwargs: Any) -> DummyResponse:
raise httpx.ConnectError("offline")
monkeypatch.setattr(httpx, "post", fake_post)
with pytest.raises(httpx.ConnectError):
StateHubContextResolver().resolve(
"consistency_sweep_remote_all",
None,
{"max_seconds": 300},
)
def test_consistency_sweep_remote_all_rejects_empty_response(monkeypatch) -> None:
def fake_post(url: str, **kwargs: Any) -> DummyResponse:
return DummyResponse({})
monkeypatch.setattr(httpx, "post", fake_post)
with pytest.raises(RuntimeError, match="missing required key"):
StateHubContextResolver().resolve(
"consistency_sweep_remote_all",
None,
{"max_seconds": 300},
)
def test_recently_on_scope_hourly_rejects_empty_response(monkeypatch) -> None:
def fake_post(url: str, **kwargs: Any) -> DummyResponse:
return DummyResponse({})