generated from coulomb/repo-seed
215 lines
7.2 KiB
Python
215 lines
7.2 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from activity_core.ops_evidence_sinks import persist_ops_inventory_evidence
|
|
|
|
|
|
class DummyResponse:
|
|
def __init__(self, payload: Any) -> None:
|
|
self.payload = payload
|
|
|
|
def raise_for_status(self) -> None:
|
|
return None
|
|
|
|
def json(self) -> Any:
|
|
return self.payload
|
|
|
|
|
|
def _payload(sinks: list[dict[str, Any]]) -> dict[str, Any]:
|
|
return {
|
|
"activity_id": "activity-1",
|
|
"run_id": "12345678-aaaa-bbbb-cccc-123456789abc",
|
|
"scheduled_for": "2026-06-05T10:15:00+00:00",
|
|
"version_used": 1,
|
|
"context_sources": [
|
|
{
|
|
"type": "ops-inventory",
|
|
"query": "probe_services",
|
|
"bind_to": "context.ops_probe",
|
|
"params": {"evidence_sinks": sinks},
|
|
}
|
|
],
|
|
"context": {
|
|
"ops_probe": {
|
|
"generated_at": "2026-06-05T10:15:01+00:00",
|
|
"inventory_path": "/tmp/service-inventory.yml",
|
|
"summary": {"ok": 1, "degraded": 0, "down": 0, "skipped": 1},
|
|
"services": [
|
|
{
|
|
"service_id": "state-hub",
|
|
"name": "State Hub",
|
|
"kind": "coordination-service",
|
|
"environment": "local",
|
|
"lifecycle_state": "observed",
|
|
"declared_health_status": "unknown",
|
|
"owner_repos": ["state-hub"],
|
|
"endpoint_count": 1,
|
|
"access_path_count": 1,
|
|
}
|
|
],
|
|
"endpoints": [
|
|
{
|
|
"service_id": "state-hub",
|
|
"service_name": "State Hub",
|
|
"endpoint_id": "state-hub-health",
|
|
"endpoint_type": "http",
|
|
"url": "http://user:pass@state-hub.test/health?token=secret",
|
|
"expected_status": 200,
|
|
"expected_signal_present": True,
|
|
"widget_ref": "ops:endpoint:state-hub-health",
|
|
"status": "ok",
|
|
"status_code": 200,
|
|
"matched_expected_status": True,
|
|
"matched_expected_signal": True,
|
|
"response_body": "secret response body",
|
|
"headers": {"Authorization": "Bearer secret"},
|
|
}
|
|
],
|
|
"access_paths": [
|
|
{
|
|
"service_id": "state-hub",
|
|
"service_name": "State Hub",
|
|
"access_path_id": "state-hub-access-1",
|
|
"access_path_type": "k8s",
|
|
"declared_status": "unknown",
|
|
"status": "skipped",
|
|
"reason": "unsupported_access_path_type",
|
|
}
|
|
],
|
|
}
|
|
},
|
|
}
|
|
|
|
|
|
def test_state_hub_progress_sink_posts_compact_probe_summary(monkeypatch) -> None:
|
|
posts: list[dict[str, Any]] = []
|
|
|
|
def fake_get(url: str, **kwargs: Any) -> DummyResponse:
|
|
assert url == "http://state-hub.test/progress/"
|
|
return DummyResponse([])
|
|
|
|
def fake_post(url: str, **kwargs: Any) -> DummyResponse:
|
|
posts.append({"url": url, **kwargs})
|
|
return DummyResponse({"id": "progress-1"})
|
|
|
|
monkeypatch.setattr(httpx, "get", fake_get)
|
|
monkeypatch.setattr(httpx, "post", fake_post)
|
|
|
|
result = persist_ops_inventory_evidence(
|
|
_payload([
|
|
{
|
|
"type": "state-hub-progress",
|
|
"state_hub_url": "http://state-hub.test",
|
|
"event_type": "ops_inventory_probe",
|
|
"workstream_id": "workstream-1",
|
|
"task_id": "task-1",
|
|
}
|
|
])
|
|
)
|
|
|
|
assert result == [
|
|
{
|
|
"type": "state-hub-progress",
|
|
"status": "posted",
|
|
"event_type": "ops_inventory_probe",
|
|
"progress_id": "progress-1",
|
|
"idempotency_key": "12345678-aaaa-bbbb-cccc-123456789abc:ops_probe:ops_inventory_probe",
|
|
"context_key": "ops_probe",
|
|
}
|
|
]
|
|
body = posts[0]["json"]
|
|
assert body["summary"] == "Ops inventory probe: 1 ok, 0 degraded, 0 down, 1 skipped"
|
|
assert body["workstream_id"] == "workstream-1"
|
|
assert body["task_id"] == "task-1"
|
|
assert body["detail"]["activity_core_run_id"] == _run_id()
|
|
assert body["detail"]["idempotency_key"] == result[0]["idempotency_key"]
|
|
assert body["detail"]["probe"]["endpoints"][0]["url"] == "http://state-hub.test/health"
|
|
|
|
serialized = json.dumps(body, sort_keys=True)
|
|
assert "secret response body" not in serialized
|
|
assert "Authorization" not in serialized
|
|
assert "user:pass" not in serialized
|
|
assert "token=secret" not in serialized
|
|
|
|
|
|
def test_state_hub_progress_sink_is_idempotent(monkeypatch) -> None:
|
|
idempotency_key = f"{_run_id()}:ops_probe:ops_inventory_probe"
|
|
|
|
def fake_get(url: str, **kwargs: Any) -> DummyResponse:
|
|
return DummyResponse([
|
|
{
|
|
"event_type": "ops_inventory_probe",
|
|
"detail": {"idempotency_key": idempotency_key},
|
|
}
|
|
])
|
|
|
|
def fake_post(url: str, **kwargs: Any) -> DummyResponse:
|
|
raise AssertionError("post should not be called")
|
|
|
|
monkeypatch.setattr(httpx, "get", fake_get)
|
|
monkeypatch.setattr(httpx, "post", fake_post)
|
|
|
|
result = persist_ops_inventory_evidence(
|
|
_payload([
|
|
{
|
|
"type": "state-hub-progress",
|
|
"state_hub_url": "http://state-hub.test",
|
|
}
|
|
])
|
|
)
|
|
|
|
assert result[0]["status"] == "exists"
|
|
assert result[0]["idempotency_key"] == idempotency_key
|
|
|
|
|
|
def test_inter_hub_sink_skips_cleanly_when_config_missing(monkeypatch) -> None:
|
|
monkeypatch.delenv("INTER_HUB_URL", raising=False)
|
|
monkeypatch.delenv("OPS_HUB_KEY", raising=False)
|
|
|
|
result = persist_ops_inventory_evidence(
|
|
_payload([{"type": "inter-hub-interaction-event"}])
|
|
)
|
|
|
|
assert result == [
|
|
{
|
|
"type": "inter-hub-interaction-event",
|
|
"status": "skipped",
|
|
"reason": "missing_inter_hub_config",
|
|
"missing": ["INTER_HUB_URL", "OPS_HUB_KEY", "widget_mapping"],
|
|
}
|
|
]
|
|
|
|
|
|
def test_inter_hub_sink_accepts_widget_mapping_from_env(monkeypatch) -> None:
|
|
monkeypatch.delenv("INTER_HUB_URL", raising=False)
|
|
monkeypatch.delenv("OPS_HUB_KEY", raising=False)
|
|
monkeypatch.setenv("OPS_HUB_WIDGET_MAPPING", "ops:endpoint:gitea-registry")
|
|
|
|
result = persist_ops_inventory_evidence(
|
|
_payload([{"type": "inter-hub-interaction-event"}])
|
|
)
|
|
|
|
assert result == [
|
|
{
|
|
"type": "inter-hub-interaction-event",
|
|
"status": "skipped",
|
|
"reason": "missing_inter_hub_config",
|
|
"missing": ["INTER_HUB_URL", "OPS_HUB_KEY"],
|
|
}
|
|
]
|
|
|
|
|
|
def test_no_evidence_sinks_returns_no_results() -> None:
|
|
payload = _payload([])
|
|
payload["context_sources"][0]["params"] = {}
|
|
|
|
assert persist_ops_inventory_evidence(payload) == []
|
|
|
|
|
|
def _run_id() -> str:
|
|
return "12345678-aaaa-bbbb-cccc-123456789abc"
|