from __future__ import annotations import json from typing import Any import httpx from activity_core.ops_evidence_sinks import persist_ops_inventory_evidence class DummyResponse: def __init__(self, payload: Any) -> None: self.payload = payload def raise_for_status(self) -> None: return None def json(self) -> Any: return self.payload def _payload(sinks: list[dict[str, Any]]) -> dict[str, Any]: return { "activity_id": "activity-1", "run_id": "12345678-aaaa-bbbb-cccc-123456789abc", "scheduled_for": "2026-06-05T10:15:00+00:00", "version_used": 1, "context_sources": [ { "type": "ops-inventory", "query": "probe_services", "bind_to": "context.ops_probe", "params": {"evidence_sinks": sinks}, } ], "context": { "ops_probe": { "generated_at": "2026-06-05T10:15:01+00:00", "inventory_path": "/tmp/service-inventory.yml", "summary": {"ok": 1, "degraded": 0, "down": 0, "skipped": 1}, "services": [ { "service_id": "state-hub", "name": "State Hub", "kind": "coordination-service", "environment": "local", "lifecycle_state": "observed", "declared_health_status": "unknown", "owner_repos": ["state-hub"], "endpoint_count": 1, "access_path_count": 1, } ], "endpoints": [ { "service_id": "state-hub", "service_name": "State Hub", "endpoint_id": "state-hub-health", "endpoint_type": "http", "url": "http://user:pass@state-hub.test/health?token=secret", "expected_status": 200, "expected_signal_present": True, "widget_ref": "ops:endpoint:state-hub-health", "status": "ok", "status_code": 200, "matched_expected_status": True, "matched_expected_signal": True, "response_body": "secret response body", "headers": {"Authorization": "Bearer secret"}, } ], "access_paths": [ { "service_id": "state-hub", "service_name": "State Hub", "access_path_id": "state-hub-access-1", "access_path_type": "k8s", "declared_status": "unknown", "status": "skipped", "reason": "unsupported_access_path_type", } ], } }, } def test_state_hub_progress_sink_posts_compact_probe_summary(monkeypatch) -> None: posts: list[dict[str, Any]] = [] def fake_get(url: str, **kwargs: Any) -> DummyResponse: assert url == "http://state-hub.test/progress/" return DummyResponse([]) def fake_post(url: str, **kwargs: Any) -> DummyResponse: posts.append({"url": url, **kwargs}) return DummyResponse({"id": "progress-1"}) monkeypatch.setattr(httpx, "get", fake_get) monkeypatch.setattr(httpx, "post", fake_post) result = persist_ops_inventory_evidence( _payload([ { "type": "state-hub-progress", "state_hub_url": "http://state-hub.test", "event_type": "ops_inventory_probe", "workstream_id": "workstream-1", "task_id": "task-1", } ]) ) assert result == [ { "type": "state-hub-progress", "status": "posted", "event_type": "ops_inventory_probe", "progress_id": "progress-1", "idempotency_key": "12345678-aaaa-bbbb-cccc-123456789abc:ops_probe:ops_inventory_probe", "context_key": "ops_probe", } ] body = posts[0]["json"] assert body["summary"] == "Ops inventory probe: 1 ok, 0 degraded, 0 down, 1 skipped" assert body["workstream_id"] == "workstream-1" assert body["task_id"] == "task-1" assert body["detail"]["activity_core_run_id"] == _run_id() assert body["detail"]["idempotency_key"] == result[0]["idempotency_key"] assert body["detail"]["probe"]["endpoints"][0]["url"] == "http://state-hub.test/health" serialized = json.dumps(body, sort_keys=True) assert "secret response body" not in serialized assert "Authorization" not in serialized assert "user:pass" not in serialized assert "token=secret" not in serialized def test_state_hub_progress_sink_is_idempotent(monkeypatch) -> None: idempotency_key = f"{_run_id()}:ops_probe:ops_inventory_probe" def fake_get(url: str, **kwargs: Any) -> DummyResponse: return DummyResponse([ { "event_type": "ops_inventory_probe", "detail": {"idempotency_key": idempotency_key}, } ]) def fake_post(url: str, **kwargs: Any) -> DummyResponse: raise AssertionError("post should not be called") monkeypatch.setattr(httpx, "get", fake_get) monkeypatch.setattr(httpx, "post", fake_post) result = persist_ops_inventory_evidence( _payload([ { "type": "state-hub-progress", "state_hub_url": "http://state-hub.test", } ]) ) assert result[0]["status"] == "exists" assert result[0]["idempotency_key"] == idempotency_key def test_inter_hub_sink_skips_cleanly_when_config_missing(monkeypatch) -> None: monkeypatch.delenv("INTER_HUB_URL", raising=False) monkeypatch.delenv("OPS_HUB_KEY", raising=False) result = persist_ops_inventory_evidence( _payload([{"type": "inter-hub-interaction-event"}]) ) assert result == [ { "type": "inter-hub-interaction-event", "status": "skipped", "reason": "missing_inter_hub_config", "missing": ["INTER_HUB_URL", "OPS_HUB_KEY", "widget_mapping"], } ] def test_inter_hub_sink_accepts_widget_mapping_from_env(monkeypatch) -> None: monkeypatch.delenv("INTER_HUB_URL", raising=False) monkeypatch.delenv("OPS_HUB_KEY", raising=False) monkeypatch.setenv("OPS_HUB_WIDGET_MAPPING", "ops:endpoint:gitea-registry") result = persist_ops_inventory_evidence( _payload([{"type": "inter-hub-interaction-event"}]) ) assert result == [ { "type": "inter-hub-interaction-event", "status": "skipped", "reason": "missing_inter_hub_config", "missing": ["INTER_HUB_URL", "OPS_HUB_KEY"], } ] def test_no_evidence_sinks_returns_no_results() -> None: payload = _payload([]) payload["context_sources"][0]["params"] = {} assert persist_ops_inventory_evidence(payload) == [] def _run_id() -> str: return "12345678-aaaa-bbbb-cccc-123456789abc"