Files
activity-core/tests/test_report_sinks.py
2026-05-19 18:36:58 +02:00

139 lines
3.9 KiB
Python

from __future__ import annotations
from typing import Any
import httpx
import pytest
from activity_core.report_sinks import persist_reports
class DummyResponse:
    """Canned response object returned by the fake ``httpx`` calls in this module.

    It offers only ``raise_for_status`` and ``json``, the two methods the
    fakes in these tests rely on.
    """

    def __init__(self, payload: Any) -> None:
        # Stored as-is; json() hands back this very object.
        self.payload = payload

    def raise_for_status(self) -> None:
        """Do nothing: this fake never signals an HTTP error."""

    def json(self) -> Any:
        """Return the payload given at construction time, unchanged."""
        return self.payload
def _payload(sinks: list[dict[str, Any]]) -> dict[str, Any]:
    """Build the fixed single-report payload used by the tests in this module.

    Args:
        sinks: Sink configurations attached to the one report entry. The list
            is embedded by reference, not copied.

    Returns:
        A dict with constant ``activity_id``, ``run_id`` and ``scheduled_for``
        values and a one-element ``reports`` list carrying ``sinks``.
    """
    report_body: dict[str, Any] = {
        "summary": "State Hub has loose ends.",
        "recommendations": [{"candidate": "CUST-WP-0045"}],
    }
    report_entry: dict[str, Any] = {
        "instruction_id": "daily-triage-report",
        "report": report_body,
        "sinks": sinks,
    }
    envelope: dict[str, Any] = {
        "activity_id": "activity-1",
        "run_id": "12345678-aaaa-bbbb-cccc-123456789abc",
        "scheduled_for": "2026-05-19T05:20:00+00:00",
        "reports": [report_entry],
    }
    return envelope
def test_working_memory_sink_writes_idempotently(tmp_path) -> None:
    """Persist the same payload twice: first status ``written``, then ``exists``.

    Also checks that the note file created under ``tmp_path`` contains the
    run id line and the report summary.
    """
    sink_config = {
        "type": "working-memory",
        "path": str(tmp_path),
        "timezone": "Europe/Berlin",
    }
    payload = _payload([sink_config])

    # The identical payload is submitted twice on purpose.
    initial_results = persist_reports(payload)
    repeat_results = persist_reports(payload)

    assert initial_results[0]["status"] == "written"
    assert repeat_results[0]["status"] == "exists"

    # Expected note name for this payload; presumably derived from the
    # scheduled date and the run id prefix -- confirm against the sink code.
    note_path = tmp_path / "daily-triage-2026-05-19-12345678.md"
    contents = note_path.read_text(encoding="utf-8")
    for expected_fragment in (
        "activity_core_run_id: 12345678-aaaa-bbbb-cccc-123456789abc",
        "State Hub has loose ends.",
    ):
        assert expected_fragment in contents
def test_working_memory_sink_refuses_canonical_custodian_path() -> None:
    """A working-memory sink aimed at the custodian workplans path must raise.

    Expects ``persist_reports`` to raise ``RuntimeError`` whose message matches
    ``refusing to write report``.
    """
    forbidden_sink = {
        "type": "working-memory",
        "path": "/home/worsch/the-custodian/workplans",
    }
    payload = _payload([forbidden_sink])

    with pytest.raises(RuntimeError, match="refusing to write report"):
        persist_reports(payload)
def test_state_hub_progress_sink_posts(monkeypatch) -> None:
    """With no existing progress events, the state-hub sink POSTs one.

    ``httpx.get`` is faked to return an empty list and ``httpx.post`` is faked
    to record its arguments. The test then checks the returned result entry
    and the recorded POST's URL and JSON body.
    """
    progress_url = "http://state-hub.test/progress/"
    recorded_posts: list[dict[str, Any]] = []

    def fake_get(url: str, **kwargs: Any) -> DummyResponse:
        # The lookup must hit the progress endpoint; it reports no events.
        assert url == progress_url
        return DummyResponse([])

    def fake_post(url: str, **kwargs: Any) -> DummyResponse:
        # Record the call so the assertions below can inspect it.
        call: dict[str, Any] = {"url": url}
        call.update(kwargs)
        recorded_posts.append(call)
        return DummyResponse({"id": "progress-1"})

    for attr_name, fake in (("get", fake_get), ("post", fake_post)):
        monkeypatch.setattr(httpx, attr_name, fake)

    sink_config = {
        "type": "state-hub-progress",
        "state_hub_url": "http://state-hub.test",
        "event_type": "daily_triage",
        "workstream_id": "workstream-1",
    }
    result = persist_reports(_payload([sink_config]))

    expected_entry = {
        "type": "state-hub-progress",
        "status": "posted",
        "event_type": "daily_triage",
        "progress_id": "progress-1",
    }
    assert result == [expected_entry]

    first_post = recorded_posts[0]
    assert first_post["url"] == progress_url
    sent_body = first_post["json"]
    assert sent_body["workstream_id"] == "workstream-1"
    assert sent_body["detail"]["activity_core_run_id"] == payload_run_id()
def test_state_hub_progress_sink_is_idempotent(monkeypatch) -> None:
    """If a matching progress event already exists, the sink must not POST.

    ``httpx.get`` is faked to return one ``daily_triage`` event carrying this
    payload's run id and instruction id. ``httpx.post`` is faked to fail the
    test if it is called. The resulting status must be ``exists``.
    """

    def fake_get(url: str, **kwargs: Any) -> DummyResponse:
        # Built fresh on every call so each lookup gets its own list.
        detail = {
            "activity_core_run_id": payload_run_id(),
            "instruction_id": "daily-triage-report",
        }
        existing_event = {"event_type": "daily_triage", "detail": detail}
        return DummyResponse([existing_event])

    def fake_post(url: str, **kwargs: Any) -> DummyResponse:
        raise AssertionError("post should not be called")

    for attr_name, fake in (("get", fake_get), ("post", fake_post)):
        monkeypatch.setattr(httpx, attr_name, fake)

    sink_config = {
        "type": "state-hub-progress",
        "state_hub_url": "http://state-hub.test",
        "event_type": "daily_triage",
    }
    outcome = persist_reports(_payload([sink_config]))

    assert outcome[0]["status"] == "exists"
def payload_run_id() -> str:
    """Return the fixed run id string that the tests compare against."""
    run_id = "12345678-aaaa-bbbb-cccc-123456789abc"
    return run_id