Files
kaizen-agentic/tests/test_metrics_emit_event.py
tegwick 1641a3165d feat: metrics record --emit-event for kaizen.metrics.recorded
Publish activity-core EventEnvelope payloads to NATS subject
activity.kaizen.metrics.recorded after a successful append.
Optional nats-py via kaizen-agentic[events]; project slug from
KAIZEN_PROJECT_SLUG or directory basename. Skips emit on
idempotency duplicates. Closes KAIZEN-WP-0008 T03.
2026-06-18 08:53:36 +02:00

189 lines
5.2 KiB
Python

"""Tests for kaizen.metrics.recorded event emission."""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import AsyncMock, patch
import pytest
from click.testing import CliRunner
from kaizen_agentic.cli import cli
from kaizen_agentic.integrations.event_bus import (
EVENT_TYPE_METRICS_RECORDED,
build_metrics_recorded_envelope,
metrics_summary_for_event,
nats_subject_for_event,
publish_metrics_recorded_event,
resolve_project_slug,
)
from kaizen_agentic.metrics import MetricsStore
def test_metrics_summary_for_event_maps_avg_quality_score() -> None:
summary = metrics_summary_for_event(
{
"success_rate": 0.75,
"execution_count": 12,
"avg_quality_score": 0.81,
}
)
assert summary == {
"success_rate": 0.75,
"execution_count": 12,
"avg_quality": 0.81,
}
def test_build_metrics_recorded_envelope_shape() -> None:
envelope = build_metrics_recorded_envelope(
agent="coach",
project="kaizen-agentic",
summary={
"success_rate": 0.9,
"execution_count": 5,
"avg_quality_score": 0.85,
},
event_id="test-event-id",
)
assert envelope["id"] == "test-event-id"
assert envelope["type"] == EVENT_TYPE_METRICS_RECORDED
assert envelope["publisher"] == "kaizen-agentic"
assert envelope["attributes"] == {
"agent": "coach",
"project": "kaizen-agentic",
"summary": {
"success_rate": 0.9,
"execution_count": 5,
"avg_quality": 0.85,
},
}
def test_nats_subject_for_event() -> None:
assert nats_subject_for_event("kaizen.metrics.recorded") == (
"activity.kaizen.metrics.recorded"
)
def test_resolve_project_slug_prefers_env(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
monkeypatch.setenv("KAIZEN_PROJECT_SLUG", "custom-slug")
assert resolve_project_slug(tmp_path / "some-dir") == "custom-slug"
def test_resolve_project_slug_falls_back_to_directory_name(tmp_path: Path) -> None:
project = tmp_path / "kaizen-agentic"
project.mkdir()
assert resolve_project_slug(project) == "kaizen-agentic"
def test_publish_metrics_recorded_event_uses_activity_subject(
monkeypatch: pytest.MonkeyPatch,
) -> None:
published: dict[str, object] = {}
async def fake_publish(subject: str, payload: bytes, *, nats_url: str) -> None:
published["subject"] = subject
published["payload"] = payload
published["url"] = nats_url
monkeypatch.setattr(
"kaizen_agentic.integrations.event_bus._publish_bytes",
fake_publish,
)
envelope = build_metrics_recorded_envelope(
agent="coach",
project="activity-core",
summary={"success_rate": 1.0, "execution_count": 1, "avg_quality_score": 1.0},
event_id="evt-1",
)
subject = publish_metrics_recorded_event(
envelope, nats_url="nats://broker.test:4222"
)
assert subject == "activity.kaizen.metrics.recorded"
assert published["subject"] == "activity.kaizen.metrics.recorded"
body = json.loads(published["payload"].decode())
assert body["attributes"]["project"] == "activity-core"
def test_metrics_record_emit_event_after_append(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
emitted: list[dict] = []
def capture(envelope, *, nats_url=None):
emitted.append(dict(envelope))
return "activity.kaizen.metrics.recorded"
monkeypatch.setattr(
"kaizen_agentic.cli.publish_metrics_recorded_event",
capture,
)
runner = CliRunner()
result = runner.invoke(
cli,
[
"metrics",
"record",
"coach",
"--target",
str(tmp_path),
"--success",
"--time",
"120",
"--quality",
"0.9",
"--emit-event",
],
)
assert result.exit_code == 0, result.output
assert "Recorded metrics for 'coach'" in result.output
assert "Emitted kaizen.metrics.recorded" in result.output
assert len(emitted) == 1
assert emitted[0]["attributes"]["agent"] == "coach"
assert emitted[0]["attributes"]["project"] == tmp_path.name
assert emitted[0]["attributes"]["summary"]["execution_count"] == 1
store = MetricsStore(tmp_path, "coach")
assert store.read_summary() is not None
def test_metrics_record_skips_emit_on_idempotency_duplicate(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
emitted: list[dict] = []
def capture(envelope, *, nats_url=None):
emitted.append(dict(envelope))
return "activity.kaizen.metrics.recorded"
monkeypatch.setattr(
"kaizen_agentic.cli.publish_metrics_recorded_event",
capture,
)
runner = CliRunner()
common = [
"metrics",
"record",
"coach",
"--target",
str(tmp_path),
"--success",
"--emit-event",
"--idempotency-key",
"session-1",
]
assert runner.invoke(cli, common).exit_code == 0
assert runner.invoke(cli, common).exit_code == 0
assert len(emitted) == 1