from __future__ import annotations from typing import Any import httpx import pytest from activity_core.context_resolvers.state_hub import StateHubContextResolver class DummyResponse: def __init__(self, payload: Any, status_error: Exception | None = None) -> None: self.payload = payload self.status_error = status_error def raise_for_status(self) -> None: if self.status_error is not None: raise self.status_error def json(self) -> Any: return self.payload def test_state_summary_query(monkeypatch) -> None: calls: list[dict[str, Any]] = [] def fake_get(url: str, **kwargs: Any) -> DummyResponse: calls.append({"url": url, **kwargs}) return DummyResponse({"tasks": {"todo": 3}}) monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test") monkeypatch.setattr(httpx, "get", fake_get) result = StateHubContextResolver().resolve("state_summary", None, {}) assert result == {"tasks": {"todo": 3}} assert calls == [ { "url": "http://state-hub.test/state/summary", "params": None, "timeout": 10.0, } ] def test_daily_triage_queries(monkeypatch) -> None: calls: list[dict[str, Any]] = [] def fake_get(url: str, **kwargs: Any) -> DummyResponse: calls.append({"url": url, **kwargs}) return DummyResponse({"url": url, "params": kwargs.get("params")}) monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test/") monkeypatch.setattr(httpx, "get", fake_get) resolver = StateHubContextResolver() resolver.resolve("next_steps", None, {}) resolver.resolve("workplan_index", None, {"refresh": False}) resolver.resolve("hub_inbox", None, {"to_agent": "hub", "unread_only": True}) assert calls == [ { "url": "http://state-hub.test/state/next_steps", "params": None, "timeout": 10.0, }, { "url": "http://state-hub.test/workstreams/workplan-index", "params": {"refresh": False}, "timeout": 10.0, }, { "url": "http://state-hub.test/messages/", "params": {"to_agent": "hub", "unread_only": True}, "timeout": 10.0, }, ] def test_existing_queries_still_resolve(monkeypatch) -> None: calls: list[dict[str, Any]] = [] def fake_get(url: str, **kwargs: Any) -> DummyResponse: calls.append({"url": url, **kwargs}) if url.endswith("/state/domain/custodian"): return DummyResponse({"ok": True}) if url.endswith("/sbom/activity-core"): return DummyResponse({ "repo_slug": "activity-core", "last_sbom_at": "2026-04-26T11:37:56+00:00", "entry_count": 38, "entries": [], }) raise AssertionError(f"unexpected url {url}") monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test") monkeypatch.setattr(httpx, "get", fake_get) resolver = StateHubContextResolver() assert resolver.resolve("domain_summary", None, {"domain": "custodian"}) == {"ok": True} sbom = resolver.resolve("repo_sbom_status", None, {"repo_slug": "activity-core"}) assert sbom["repo_slug"] == "activity-core" assert sbom["has_sbom"] is True assert sbom["last_sbom_at"] == "2026-04-26T11:37:56+00:00" assert isinstance(sbom["sbom_age_days"], int) and sbom["sbom_age_days"] >= 0 assert [c["url"] for c in calls] == [ "http://state-hub.test/state/domain/custodian", "http://state-hub.test/sbom/activity-core", ] def test_repo_sbom_status_bulk_returns_worst_repo(monkeypatch) -> None: calls: list[dict[str, Any]] = [] def fake_get(url: str, **kwargs: Any) -> DummyResponse: calls.append({"url": url, **kwargs}) return DummyResponse([ {"slug": "fresh-repo", "last_sbom_at": "2099-01-01T00:00:00+00:00"}, {"slug": "stale-repo", "last_sbom_at": "2024-01-01T00:00:00+00:00"}, {"slug": "never-scanned", "last_sbom_at": None}, ]) monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test") monkeypatch.setattr(httpx, "get", fake_get) result = StateHubContextResolver().resolve( "repo_sbom_status", None, {"repos": "all"} ) assert calls == [ {"url": "http://state-hub.test/repos/", "params": None, "timeout": 10.0}, ] assert result["total_count"] == 3 # both stale-repo and never-scanned exceed the 30-day staleness threshold assert result["stale_count"] == 2 assert result["worst_repo_slug"] == "never-scanned" assert result["worst_age_days"] == 99999 by_slug = {entry["repo_slug"]: entry for entry in result["repos"]} assert by_slug["fresh-repo"]["has_sbom"] is True assert by_slug["fresh-repo"]["sbom_age_days"] == 0 assert by_slug["never-scanned"]["has_sbom"] is False assert by_slug["never-scanned"]["last_sbom_at"] is None def test_repo_sbom_status_returns_empty_on_failure(monkeypatch) -> None: def fake_get(url: str, **kwargs: Any) -> DummyResponse: return DummyResponse(None, status_error=httpx.HTTPError("boom")) monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test") monkeypatch.setattr(httpx, "get", fake_get) resolver = StateHubContextResolver() assert resolver.resolve("repo_sbom_status", None, {"repo_slug": "x"}) == {} assert resolver.resolve("repo_sbom_status", None, {"repos": "all"}) == {} def test_coding_retro_returns_latest_progress_suggestions(monkeypatch) -> None: calls: list[dict[str, Any]] = [] def fake_get(url: str, **kwargs: Any) -> DummyResponse: calls.append({"url": url, **kwargs}) return DummyResponse([ { "id": "older-retro", "event_type": "coding_retro", "summary": "older", "created_at": "2026-05-31T17:00:00Z", "detail": { "generated_at": "2026-05-31T17:00:00Z", "suggestions": [ { "repo": "old-repo", "title": "Old recommendation", "recommendation": "Do the older thing.", "priority": "low", "score": 1, } ], }, }, { "id": "note-1", "event_type": "note", "summary": "ignore me", "created_at": "2026-06-07T17:05:00Z", "detail": {}, }, { "id": "newer-retro", "event_type": "coding_retro", "summary": "weekly coding retro ready", "created_at": "2026-06-07T17:10:00Z", "detail": { "generated_at": "2026-06-07T17:09:30Z", "window": { "since": "2026-05-31T00:00:00Z", "until": "2026-06-07T00:00:00Z", }, "suggestions": [ { "target_repo": "activity-core", "title": "Harden schedule smoke gates", "description": "Add a smoke proof before enablement.", "priority": "HIGH", "score": "8.5", }, { "repo_slug": "repo-without-title", "recommendation": "missing title should be skipped", "score": 9, }, ], }, }, ]) monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test/") monkeypatch.setattr(httpx, "get", fake_get) result = StateHubContextResolver().resolve( "coding_retro", None, {"limit": 20, "window_days": 7}, ) assert calls == [ { "url": "http://state-hub.test/progress/", "params": {"limit": 20}, "timeout": 10.0, } ] assert result["source_progress_id"] == "newer-retro" assert result["generated_at"] == "2026-06-07T17:09:30Z" assert result["window"] == { "since": "2026-05-31T00:00:00Z", "until": "2026-06-07T00:00:00Z", } assert result["summary"] == "weekly coding retro ready" assert result["suggestions"] == [ { "repo": "activity-core", "title": "Harden schedule smoke gates", "recommendation": "Add a smoke proof before enablement.", "priority": "high", "score": 8.5, } ] def test_coding_retro_returns_empty_shape_when_not_published(monkeypatch) -> None: def fake_get(url: str, **kwargs: Any) -> DummyResponse: return DummyResponse([ { "id": "note-1", "event_type": "note", "created_at": "2026-06-07T17:10:00Z", } ]) monkeypatch.setattr(httpx, "get", fake_get) result = StateHubContextResolver().resolve( "coding_retro", None, {"event_type": "coding_retro"}, ) assert result == { "suggestions": [], "window": None, "generated_at": None, "source_progress_id": None, "event_type": "coding_retro", "summary": "", } def test_resolver_failure_returns_empty(monkeypatch) -> None: def fake_get(url: str, **kwargs: Any) -> DummyResponse: raise httpx.ConnectError("offline") monkeypatch.setattr(httpx, "get", fake_get) assert StateHubContextResolver().resolve("state_summary", None, {}) == {} def test_unknown_query_returns_empty() -> None: assert StateHubContextResolver().resolve("unknown", None, {}) == {} def test_recently_on_scope_hourly_posts_batch(monkeypatch) -> None: calls: list[dict[str, Any]] = [] def fake_post(url: str, **kwargs: Any) -> DummyResponse: calls.append({"url": url, **kwargs}) return DummyResponse( { "generated": [{"domain_slug": "custodian"}], "skipped": [], "failed": [], } ) monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test/") monkeypatch.setattr(httpx, "post", fake_post) result = StateHubContextResolver().resolve( "recently_on_scope_hourly", None, { "range": "1h", "active_only": True, "include_attention": False, "required": True, }, ) assert result == { "generated": [{"domain_slug": "custodian"}], "skipped": [], "failed": [], } assert calls == [ { "url": "http://state-hub.test/recently-on-scope/hourly", "json": {"range": "1h", "active_only": True, "include_attention": False}, "timeout": 10.0, } ] def test_recently_on_scope_hourly_failure_bubbles(monkeypatch) -> None: def fake_post(url: str, **kwargs: Any) -> DummyResponse: raise httpx.ConnectError("offline") monkeypatch.setattr(httpx, "post", fake_post) with pytest.raises(httpx.ConnectError): StateHubContextResolver().resolve("recently_on_scope_hourly", None, {"range": "1h"}) def test_recently_on_scope_hourly_rejects_empty_response(monkeypatch) -> None: def fake_post(url: str, **kwargs: Any) -> DummyResponse: return DummyResponse({}) monkeypatch.setattr(httpx, "post", fake_post) with pytest.raises(RuntimeError, match="missing required key"): StateHubContextResolver().resolve("recently_on_scope_hourly", None, {"range": "1h"}) def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None: payloads = { "/state/summary": { "generated_at": "2026-05-19T05:20:00Z", "totals": {"tasks": {"todo": 4, "wait": 1}}, "topics": [ { "slug": "custodian", "domain_slug": "custodian", "workstreams": [ { "id": "ws-1", "slug": "cust-wp-0045", "title": "Activity-Core Daily Triage Runner Cutover", "status": "ready", "owner": "custodian", }, { "id": "ws-closed", "slug": "closed", "title": "Closed", "status": "finished", "owner": "custodian", }, ], } ], }, "/workstreams/workplan-index": { "workstreams": { "ws-1": { "repo_slug": "the-custodian", "relative_path": "workplans/CUST-WP-0045.md", "needs_review": True, "health_labels": ["needs_review"], } } }, "/state/next_steps": [ { "type": "resolved_decision", "domain": "custodian", "workstream_id": "ws-1", "workstream_slug": "cust-wp-0045", "workstream_title": "Activity-Core Daily Triage Runner Cutover", "task_id": "task-1", "task_title": "T05 - Update ActivityDefinition", "message": "free text should not be included", } ], "/messages/": [ { "id": "msg-1", "from_agent": "hub", "subject": "Please review", "body": "free text should not be included", "created_at": "2026-05-19T05:00:00Z", } ], "/workstreams/ws-1": { "planning_priority": "high", "planning_order": 45, }, "/tasks/": [ { "id": "task-1", "title": "T05 - Update ActivityDefinition", "status": "todo", "priority": "high", "needs_human": False, "description": "free text should not be included", }, { "id": "task-2", "title": "T06 - Canary Cutover", "status": "wait", "priority": "medium", "needs_human": True, }, ], } def fake_get(url: str, **kwargs: Any) -> DummyResponse: path = url.removeprefix("http://state-hub.test") return DummyResponse(payloads[path]) monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test") monkeypatch.setattr(httpx, "get", fake_get) raw_digest = StateHubContextResolver().resolve( "daily_triage_digest", None, {"max_workstreams": 4, "max_next_steps": 4}, ) assert isinstance(raw_digest, str) assert "free text should not be included" not in raw_digest import json digest = json.loads(raw_digest) assert digest["totals"] == {"tasks": {"todo": 4, "wait": 1}} assert digest["open_workstreams"][0]["slug"] == "cust-wp-0045" assert digest["open_workstreams"][0]["planning_priority"] == "high" assert digest["open_workstreams"][0]["open_task_counts"] == { "wait": 1, "todo": 1, "progress": 0, "needs_human": 1, "open_total": 2, } assert digest["deterministic_scoring"]["future_mode"] == ( "code_score_high_gain_high_effort_candidates" )