Files
activity-core/tests/test_state_hub_context_resolver.py

233 lines
7.4 KiB
Python

from __future__ import annotations

import json
from typing import Any

import httpx

from activity_core.context_resolvers.state_hub import StateHubContextResolver
class DummyResponse:
def __init__(self, payload: Any, status_error: Exception | None = None) -> None:
self.payload = payload
self.status_error = status_error
def raise_for_status(self) -> None:
if self.status_error is not None:
raise self.status_error
def json(self) -> Any:
return self.payload
def test_state_summary_query(monkeypatch) -> None:
    """``state_summary`` performs one GET on ``/state/summary`` and returns its JSON."""
    recorded: list[dict[str, Any]] = []

    def fake_get(url: str, **kwargs: Any) -> DummyResponse:
        # Keep the URL alongside every keyword argument the resolver supplied.
        entry: dict[str, Any] = {"url": url}
        entry.update(kwargs)
        recorded.append(entry)
        return DummyResponse({"tasks": {"todo": 3}})

    monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test")
    monkeypatch.setattr(httpx, "get", fake_get)

    summary = StateHubContextResolver().resolve("state_summary", None, {})
    assert summary == {"tasks": {"todo": 3}}

    expected_call = {
        "url": "http://state-hub.test/state/summary",
        "params": None,
        "timeout": 10.0,
    }
    assert recorded == [expected_call]
def test_daily_triage_queries(monkeypatch) -> None:
    """The three daily-triage queries hit the expected endpoints with their params.

    The base URL is configured with a trailing slash while the expected URLs
    contain no doubled slash, so the join between base and path is covered too.
    """
    recorded: list[dict[str, Any]] = []

    def fake_get(url: str, **kwargs: Any) -> DummyResponse:
        recorded.append({"url": url, **kwargs})
        echoed = {"url": url, "params": kwargs.get("params")}
        return DummyResponse(echoed)

    monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test/")
    monkeypatch.setattr(httpx, "get", fake_get)

    resolver = StateHubContextResolver()
    # (query name, params) pairs, issued in this exact order.
    requests = (
        ("next_steps", {}),
        ("workplan_index", {"refresh": False}),
        ("hub_inbox", {"to_agent": "hub", "unread_only": True}),
    )
    for query_name, query_params in requests:
        resolver.resolve(query_name, None, query_params)

    next_steps_call = {
        "url": "http://state-hub.test/state/next_steps",
        "params": None,
        "timeout": 10.0,
    }
    workplan_index_call = {
        "url": "http://state-hub.test/workstreams/workplan-index",
        "params": {"refresh": False},
        "timeout": 10.0,
    }
    hub_inbox_call = {
        "url": "http://state-hub.test/messages/",
        "params": {"to_agent": "hub", "unread_only": True},
        "timeout": 10.0,
    }
    assert recorded == [next_steps_call, workplan_index_call, hub_inbox_call]
def test_existing_queries_still_resolve(monkeypatch) -> None:
    """``domain_summary`` and ``repo_sbom_status`` keep their URL and param mapping."""
    recorded: list[dict[str, Any]] = []

    def fake_get(url: str, **kwargs: Any) -> DummyResponse:
        call: dict[str, Any] = {"url": url}
        call.update(kwargs)
        recorded.append(call)
        return DummyResponse({"ok": True})

    monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test")
    monkeypatch.setattr(httpx, "get", fake_get)

    resolver = StateHubContextResolver()

    domain_result = resolver.resolve("domain_summary", None, {"domain": "custodian"})
    assert domain_result == {"ok": True}

    sbom_result = resolver.resolve("repo_sbom_status", None, {"repo_slug": "activity-core"})
    assert sbom_result == {"ok": True}

    # The domain is expected in the URL path; the repo slug as a ``repo`` query param.
    domain_call = {
        "url": "http://state-hub.test/state/domain/custodian",
        "params": None,
        "timeout": 10.0,
    }
    sbom_call = {
        "url": "http://state-hub.test/sbom/status",
        "params": {"repo": "activity-core"},
        "timeout": 10.0,
    }
    assert recorded == [domain_call, sbom_call]
def test_resolver_failure_returns_empty(monkeypatch) -> None:
    """When ``httpx.get`` raises ``ConnectError`` the resolver is expected to return ``{}``."""

    def fake_get(url: str, **kwargs: Any) -> DummyResponse:
        # Simulate an unreachable hub: every request fails.
        raise httpx.ConnectError("offline")

    monkeypatch.setattr(httpx, "get", fake_get)

    resolver = StateHubContextResolver()
    outcome = resolver.resolve("state_summary", None, {})
    assert outcome == {}
def test_unknown_query_returns_empty() -> None:
    """A query name the resolver does not recognise is expected to resolve to ``{}``."""
    resolver = StateHubContextResolver()
    outcome = resolver.resolve("unknown", None, {})
    assert outcome == {}
def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None:
    """``daily_triage_digest`` returns a JSON string holding only curated fields.

    The fake hub serves canned payloads keyed by URL path. Several of them
    carry a free-text field (``message``, ``body``, ``description``) with a
    sentinel sentence; the test asserts that sentence never appears in the
    digest, then decodes the digest and checks selected fields.
    """
    payloads = {
        "/state/summary": {
            "generated_at": "2026-05-19T05:20:00Z",
            "totals": {"tasks": {"todo": 4, "blocked": 1}},
            "topics": [
                {
                    "slug": "custodian",
                    "domain_slug": "custodian",
                    "workstreams": [
                        {
                            "id": "ws-1",
                            "slug": "cust-wp-0045",
                            "title": "Activity-Core Daily Triage Runner Cutover",
                            "status": "ready",
                            "owner": "custodian",
                        },
                        # NOTE(review): presumably present to exercise filtering of
                        # finished workstreams; no assertion below checks its absence.
                        {
                            "id": "ws-closed",
                            "slug": "closed",
                            "title": "Closed",
                            "status": "finished",
                            "owner": "custodian",
                        },
                    ],
                }
            ],
        },
        "/workstreams/workplan-index": {
            "workstreams": {
                "ws-1": {
                    "repo_slug": "the-custodian",
                    "relative_path": "workplans/CUST-WP-0045.md",
                    "needs_review": True,
                    "health_labels": ["needs_review"],
                }
            }
        },
        "/state/next_steps": [
            {
                "type": "resolved_decision",
                "domain": "custodian",
                "workstream_id": "ws-1",
                "workstream_slug": "cust-wp-0045",
                "workstream_title": "Activity-Core Daily Triage Runner Cutover",
                "task_id": "task-1",
                "task_title": "T05 - Update ActivityDefinition",
                "message": "free text should not be included",
            }
        ],
        "/messages/": [
            {
                "id": "msg-1",
                "from_agent": "hub",
                "subject": "Please review",
                "body": "free text should not be included",
                "created_at": "2026-05-19T05:00:00Z",
            }
        ],
        "/workstreams/ws-1": {
            "planning_priority": "high",
            "planning_order": 45,
        },
        "/tasks/": [
            {
                "id": "task-1",
                "title": "T05 - Update ActivityDefinition",
                "status": "todo",
                "priority": "high",
                "needs_human": False,
                "description": "free text should not be included",
            },
            {
                "id": "task-2",
                "title": "T06 - Canary Cutover",
                "status": "blocked",
                "priority": "medium",
                "needs_human": True,
            },
        ],
    }

    def fake_get(url: str, **kwargs: Any) -> DummyResponse:
        # Route purely by path; query params are ignored. A path missing from
        # ``payloads`` raises KeyError inside the fake.
        path = url.removeprefix("http://state-hub.test")
        return DummyResponse(payloads[path])

    monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test")
    monkeypatch.setattr(httpx, "get", fake_get)

    raw_digest = StateHubContextResolver().resolve(
        "daily_triage_digest",
        None,
        {"max_workstreams": 4, "max_next_steps": 4},
    )

    # The digest is expected as a serialized string, checked before decoding so
    # the free-text sentinel is searched for in the exact text returned.
    assert isinstance(raw_digest, str)
    assert "free text should not be included" not in raw_digest

    digest = json.loads(raw_digest)
    assert digest["totals"] == {"tasks": {"todo": 4, "blocked": 1}}

    first_workstream = digest["open_workstreams"][0]
    assert first_workstream["slug"] == "cust-wp-0045"
    assert first_workstream["planning_priority"] == "high"
    # Matches the "/tasks/" fixture: one todo task, one blocked task that also
    # has needs_human set, two open tasks in total.
    assert first_workstream["open_task_counts"] == {
        "todo": 1,
        "in_progress": 0,
        "blocked": 1,
        "needs_human": 1,
        "open_total": 2,
    }
    assert digest["deterministic_scoring"]["future_mode"] == (
        "code_score_high_gain_high_effort_candidates"
    )