"""Tests for kaizen.metrics.recorded event emission.""" from __future__ import annotations import json from pathlib import Path from unittest.mock import AsyncMock, patch import pytest from click.testing import CliRunner from kaizen_agentic.cli import cli from kaizen_agentic.integrations.event_bus import ( EVENT_TYPE_METRICS_RECORDED, build_metrics_recorded_envelope, metrics_summary_for_event, nats_subject_for_event, publish_metrics_recorded_event, resolve_project_slug, ) from kaizen_agentic.metrics import MetricsStore def test_metrics_summary_for_event_maps_avg_quality_score() -> None: summary = metrics_summary_for_event( { "success_rate": 0.75, "execution_count": 12, "avg_quality_score": 0.81, } ) assert summary == { "success_rate": 0.75, "execution_count": 12, "avg_quality": 0.81, } def test_build_metrics_recorded_envelope_shape() -> None: envelope = build_metrics_recorded_envelope( agent="coach", project="kaizen-agentic", summary={ "success_rate": 0.9, "execution_count": 5, "avg_quality_score": 0.85, }, event_id="test-event-id", ) assert envelope["id"] == "test-event-id" assert envelope["type"] == EVENT_TYPE_METRICS_RECORDED assert envelope["publisher"] == "kaizen-agentic" assert envelope["attributes"] == { "agent": "coach", "project": "kaizen-agentic", "summary": { "success_rate": 0.9, "execution_count": 5, "avg_quality": 0.85, }, } def test_nats_subject_for_event() -> None: assert nats_subject_for_event("kaizen.metrics.recorded") == ( "activity.kaizen.metrics.recorded" ) def test_resolve_project_slug_prefers_env( monkeypatch: pytest.MonkeyPatch, tmp_path: Path ) -> None: monkeypatch.setenv("KAIZEN_PROJECT_SLUG", "custom-slug") assert resolve_project_slug(tmp_path / "some-dir") == "custom-slug" def test_resolve_project_slug_falls_back_to_directory_name(tmp_path: Path) -> None: project = tmp_path / "kaizen-agentic" project.mkdir() assert resolve_project_slug(project) == "kaizen-agentic" def test_publish_metrics_recorded_event_uses_activity_subject( monkeypatch: pytest.MonkeyPatch, ) -> None: published: dict[str, object] = {} async def fake_publish(subject: str, payload: bytes, *, nats_url: str) -> None: published["subject"] = subject published["payload"] = payload published["url"] = nats_url monkeypatch.setattr( "kaizen_agentic.integrations.event_bus._publish_bytes", fake_publish, ) envelope = build_metrics_recorded_envelope( agent="coach", project="activity-core", summary={"success_rate": 1.0, "execution_count": 1, "avg_quality_score": 1.0}, event_id="evt-1", ) subject = publish_metrics_recorded_event( envelope, nats_url="nats://broker.test:4222" ) assert subject == "activity.kaizen.metrics.recorded" assert published["subject"] == "activity.kaizen.metrics.recorded" body = json.loads(published["payload"].decode()) assert body["attributes"]["project"] == "activity-core" def test_metrics_record_emit_event_after_append( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: emitted: list[dict] = [] def capture(envelope, *, nats_url=None): emitted.append(dict(envelope)) return "activity.kaizen.metrics.recorded" monkeypatch.setattr( "kaizen_agentic.cli.publish_metrics_recorded_event", capture, ) runner = CliRunner() result = runner.invoke( cli, [ "metrics", "record", "coach", "--target", str(tmp_path), "--success", "--time", "120", "--quality", "0.9", "--emit-event", ], ) assert result.exit_code == 0, result.output assert "Recorded metrics for 'coach'" in result.output assert "Emitted kaizen.metrics.recorded" in result.output assert len(emitted) == 1 assert emitted[0]["attributes"]["agent"] == "coach" assert emitted[0]["attributes"]["project"] == tmp_path.name assert emitted[0]["attributes"]["summary"]["execution_count"] == 1 store = MetricsStore(tmp_path, "coach") assert store.read_summary() is not None def test_metrics_record_skips_emit_on_idempotency_duplicate( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: emitted: list[dict] = [] def capture(envelope, *, nats_url=None): emitted.append(dict(envelope)) return "activity.kaizen.metrics.recorded" monkeypatch.setattr( "kaizen_agentic.cli.publish_metrics_recorded_event", capture, ) runner = CliRunner() common = [ "metrics", "record", "coach", "--target", str(tmp_path), "--success", "--emit-event", "--idempotency-key", "session-1", ] assert runner.invoke(cli, common).exit_code == 0 assert runner.invoke(cli, common).exit_code == 0 assert len(emitted) == 1