From 1641a3165d6f387569db30167ae5f725c60264d0 Mon Sep 17 00:00:00 2001 From: tegwick Date: Thu, 18 Jun 2026 08:53:36 +0200 Subject: [PATCH] feat: metrics record --emit-event for kaizen.metrics.recorded Publish activity-core EventEnvelope payloads to NATS subject activity.kaizen.metrics.recorded after a successful append. Optional nats-py via kaizen-agentic[events]; project slug from KAIZEN_PROJECT_SLUG or directory basename. Skips emit on idempotency duplicates. Closes KAIZEN-WP-0008 T03. --- CHANGELOG.md | 6 + docs/INTEGRATION_PATTERNS.md | 2 +- docs/TELEMETRY.md | 3 +- .../low-success-rate-review.md | 6 +- .../kaizen-metrics-recorded-event.md | 69 +++++++ pyproject.toml | 3 + src/kaizen_agentic/cli.py | 26 +++ src/kaizen_agentic/integrations/__init__.py | 10 +- src/kaizen_agentic/integrations/event_bus.py | 95 +++++++++ tests/test_metrics_emit_event.py | 188 ++++++++++++++++++ ...P-0008-coulomb-loop-supplier-engagement.md | 4 +- 11 files changed, 405 insertions(+), 7 deletions(-) create mode 100644 docs/integrations/kaizen-metrics-recorded-event.md create mode 100644 src/kaizen_agentic/integrations/event_bus.py create mode 100644 tests/test_metrics_emit_event.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 28ab856..2757c16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- **`metrics record --emit-event`** — publishes `kaizen.metrics.recorded` NATS + envelope for activity-core event-driven definitions (optional `nats-py` via + `pip install 'kaizen-agentic[events]'`) +- **Event contract** — `docs/integrations/kaizen-metrics-recorded-event.md` + ## [1.4.0] - 2026-06-18 ### Added diff --git a/docs/INTEGRATION_PATTERNS.md b/docs/INTEGRATION_PATTERNS.md index 5141d4a..9fa8a8c 100644 --- a/docs/INTEGRATION_PATTERNS.md +++ b/docs/INTEGRATION_PATTERNS.md @@ -37,7 +37,7 @@ invoke kaizen-agentic CLI commands. |------------|---------|-------------| | [weekly-metrics-optimize](integrations/activity-definitions/weekly-metrics-optimize.md) | Cron Mon 08:00 | `metrics optimize` | | [post-install-metrics-scaffold](integrations/activity-definitions/post-install-metrics-scaffold.md) | `kaizen.agent.installed` | `memory init` validation | -| [low-success-rate-review](integrations/activity-definitions/low-success-rate-review.md) | `kaizen.metrics.recorded` | `metrics show` + `optimize` | +| [low-success-rate-review](integrations/activity-definitions/low-success-rate-review.md) | `kaizen.metrics.recorded` | `metrics record --emit-event` | **Activation handoff (activity-core owners):** diff --git a/docs/TELEMETRY.md b/docs/TELEMETRY.md index 9312bb0..40406f6 100644 --- a/docs/TELEMETRY.md +++ b/docs/TELEMETRY.md @@ -17,6 +17,7 @@ Memory-enabled agents record per-session outcomes at close: ```bash kaizen-agentic metrics record --success --time --quality <0-1> +kaizen-agentic metrics record --success --time --quality <0-1> --emit-event kaizen-agentic metrics optimize [agent] kaizen-agentic memory brief # includes Performance Summary ``` @@ -45,4 +46,4 @@ fleet layers above satisfy INTENT's "measurable agents" requirement without tele ## Feedback loop User experience feedback uses [FEEDBACK.md](FEEDBACK.md) and Gitea issue templates — -separate from execution metrics. \ No newline at end of file +separate from execution metrics. diff --git a/docs/integrations/activity-definitions/low-success-rate-review.md b/docs/integrations/activity-definitions/low-success-rate-review.md index c85e7a8..d841d58 100644 --- a/docs/integrations/activity-definitions/low-success-rate-review.md +++ b/docs/integrations/activity-definitions/low-success-rate-review.md @@ -35,9 +35,11 @@ action: **Threshold:** 0.8 success rate, minimum 5 executions (avoids noise on early pilots). -**CLI mapping:** Event emitter is future work; manual check today: +**CLI mapping:** `kaizen-agentic metrics record --emit-event` after each +append (see [kaizen-metrics-recorded-event.md](../kaizen-metrics-recorded-event.md)). +Manual check today: ```bash kaizen-agentic metrics show # inspect summary.success_rate kaizen-agentic metrics optimize -``` \ No newline at end of file +``` diff --git a/docs/integrations/kaizen-metrics-recorded-event.md b/docs/integrations/kaizen-metrics-recorded-event.md new file mode 100644 index 0000000..8ce11f1 --- /dev/null +++ b/docs/integrations/kaizen-metrics-recorded-event.md @@ -0,0 +1,69 @@ +# Event Payload: `kaizen.metrics.recorded` + +**Status:** implemented — `kaizen-agentic metrics record --emit-event` + +Emitted after a successful metrics append when `--emit-event` is set. Default +off for backward compatibility. + +## Subject + +``` +activity.kaizen.metrics.recorded +``` + +Published to NATS (activity-core `EventEnvelope` format). Consumed by +activity-core event router and definitions such as `low-success-rate-review`. + +## Envelope + +```json +{ + "id": "550e8400-e29b-41d4-a716-446655440000", + "type": "kaizen.metrics.recorded", + "version": "1.0", + "timestamp": "2026-06-18T12:00:00Z", + "publisher": "kaizen-agentic", + "attributes": { + "agent": "coach", + "project": "kaizen-agentic", + "summary": { + "success_rate": 0.75, + "execution_count": 12, + "avg_quality": 0.81 + } + } +} +``` + +### Attribute fields + +| Field | Type | Notes | +|-------|------|-------| +| `agent` | string | Agent name from `metrics record ` | +| `project` | string | Repo slug — `KAIZEN_PROJECT_SLUG` env or directory basename | +| `summary.success_rate` | float | Rolling rate from `summary.json` after append | +| `summary.execution_count` | int | Total executions | +| `summary.avg_quality` | float | Maps from `avg_quality_score` in ADR-004 summary | + +## CLI + +```bash +kaizen-agentic metrics record coach --success --time 120 --quality 0.9 --emit-event +``` + +Requires `nats-py` (`pip install 'kaizen-agentic[events]'`). Configure broker via +`NATS_URL` (default `nats://localhost:4222`). + +Events are **not** emitted when append is skipped (duplicate idempotency key). + +## Consumers + +- **activity-core** — `trigger.type: event` with `event_type: kaizen.metrics.recorded` +- **coulomb-loop** — `low-success-rate-review` (LOOP-WP-0002); replaces hourly + health sweep when event path is stable + +## Related + +- [low-success-rate-review](activity-definitions/low-success-rate-review.md) +- [INTEGRATION_PATTERNS.md Pattern 2](../INTEGRATION_PATTERNS.md) +- coulomb-loop `loops/quality-escalation/event-payload.md` (customer contract) diff --git a/pyproject.toml b/pyproject.toml index 6cdb4f9..c1e1777 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,9 @@ dependencies = [ ] [project.optional-dependencies] +events = [ + "nats-py>=2.6.0", +] dev = [ "pytest>=6.0.0", "pytest-cov>=4.0.0", diff --git a/src/kaizen_agentic/cli.py b/src/kaizen_agentic/cli.py index f18f4d6..3eb2a71 100644 --- a/src/kaizen_agentic/cli.py +++ b/src/kaizen_agentic/cli.py @@ -15,6 +15,11 @@ from .integrations.artifact_store import ( default_api_url, publish_optimizer_evidence, ) +from .integrations.event_bus import ( + build_metrics_recorded_envelope, + publish_metrics_recorded_event, + resolve_project_slug, +) from .integrations.helix import HelixCorrelationAdapter, enrich_helix_correlation from .metrics import MetricsStore, OptimizerStore, performance_summary_markdown from .optimization import OptimizationLoop, MIN_SAMPLES_FOR_RECOMMENDATIONS @@ -1069,6 +1074,11 @@ def metrics(): @click.option( "--json", "json_input", is_flag=True, help="Read full record JSON from stdin" ) +@click.option( + "--emit-event", + is_flag=True, + help="Publish kaizen.metrics.recorded to NATS (requires nats-py)", +) def metrics_record( agent_name: str, target: str, @@ -1079,6 +1089,7 @@ def metrics_record( session_id: Optional[str], idempotency_key: Optional[str], json_input: bool, + emit_event: bool, ): """Append one execution record for an agent.""" store = MetricsStore(_project_root(target), agent_name) @@ -1109,6 +1120,21 @@ def metrics_record( if store.append(payload, idempotency_key=idempotency_key): click.echo(f"Recorded metrics for '{agent_name}'") + if emit_event: + summary = store.read_summary() or store.write_summary() + envelope = build_metrics_recorded_envelope( + agent=agent_name, + project=resolve_project_slug(store.project_root), + summary=summary, + ) + try: + subject = publish_metrics_recorded_event(envelope) + except RuntimeError as exc: + click.echo(f"Error: {exc}", err=True) + sys.exit(1) + click.echo( + f"Emitted kaizen.metrics.recorded for '{agent_name}' → {subject}" + ) else: click.echo( f"Skipped duplicate record for '{agent_name}' (idempotency key exists)" diff --git a/src/kaizen_agentic/integrations/__init__.py b/src/kaizen_agentic/integrations/__init__.py index b9c58aa..0146568 100644 --- a/src/kaizen_agentic/integrations/__init__.py +++ b/src/kaizen_agentic/integrations/__init__.py @@ -1,10 +1,18 @@ -"""Ecosystem integration adapters (Helix Forge, artifact-store).""" +"""Ecosystem integration adapters (Helix Forge, artifact-store, event bus).""" from .artifact_store import publish_optimizer_evidence +from .event_bus import ( + build_metrics_recorded_envelope, + publish_metrics_recorded_event, + resolve_project_slug, +) from .helix import HelixCorrelationAdapter, enrich_helix_correlation __all__ = [ "HelixCorrelationAdapter", + "build_metrics_recorded_envelope", "enrich_helix_correlation", + "publish_metrics_recorded_event", "publish_optimizer_evidence", + "resolve_project_slug", ] diff --git a/src/kaizen_agentic/integrations/event_bus.py b/src/kaizen_agentic/integrations/event_bus.py new file mode 100644 index 0000000..2be39a2 --- /dev/null +++ b/src/kaizen_agentic/integrations/event_bus.py @@ -0,0 +1,95 @@ +"""NATS event emission for activity-core integration (Pattern 2).""" + +from __future__ import annotations + +import asyncio +import json +import os +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Mapping, Optional + +EVENT_TYPE_METRICS_RECORDED = "kaizen.metrics.recorded" +DEFAULT_NATS_URL = "nats://localhost:4222" +DEFAULT_PUBLISHER = "kaizen-agentic" + + +def resolve_project_slug(project_root: Path) -> str: + """Return state-hub repo slug for a project checkout.""" + override = os.environ.get("KAIZEN_PROJECT_SLUG", "").strip() + if override: + return override + return Path(project_root).resolve().name + + +def metrics_summary_for_event(summary: Mapping[str, Any]) -> Dict[str, Any]: + """Map ADR-004 summary.json to the LOOP-WP-0002 event contract.""" + return { + "success_rate": summary.get("success_rate", 0.0), + "execution_count": summary.get("execution_count", 0), + "avg_quality": summary.get( + "avg_quality", + summary.get("avg_quality_score", 0.0), + ), + } + + +def build_metrics_recorded_envelope( + *, + agent: str, + project: str, + summary: Mapping[str, Any], + event_id: Optional[str] = None, + publisher: str = DEFAULT_PUBLISHER, +) -> Dict[str, Any]: + """Build an activity-core EventEnvelope dict for kaizen.metrics.recorded.""" + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + return { + "id": event_id or str(uuid.uuid4()), + "type": EVENT_TYPE_METRICS_RECORDED, + "version": "1.0", + "timestamp": timestamp, + "publisher": publisher, + "attributes": { + "agent": agent, + "project": project, + "summary": metrics_summary_for_event(summary), + }, + } + + +def nats_subject_for_event(event_type: str) -> str: + """Subject pattern used by activity-core webhook receiver and event router.""" + return f"activity.{event_type}" + + +async def _publish_bytes(subject: str, payload: bytes, *, nats_url: str) -> None: + try: + import nats + except ImportError as exc: + raise RuntimeError( + "nats-py is required for --emit-event. " + "Install with: pip install 'kaizen-agentic[events]'" + ) from exc + + nc = await nats.connect(nats_url) + try: + await nc.publish(subject, payload) + await nc.flush() + finally: + await nc.close() + + +def publish_metrics_recorded_event( + envelope: Mapping[str, Any], + *, + nats_url: Optional[str] = None, +) -> str: + """Publish envelope to NATS. Returns the subject used.""" + url = (nats_url or os.environ.get("NATS_URL") or DEFAULT_NATS_URL).strip() + event_type = str(envelope.get("type", EVENT_TYPE_METRICS_RECORDED)) + subject = nats_subject_for_event(event_type) + payload = json.dumps(envelope, sort_keys=True).encode("utf-8") + asyncio.run(_publish_bytes(subject, payload, nats_url=url)) + return subject diff --git a/tests/test_metrics_emit_event.py b/tests/test_metrics_emit_event.py new file mode 100644 index 0000000..141899e --- /dev/null +++ b/tests/test_metrics_emit_event.py @@ -0,0 +1,188 @@ +"""Tests for kaizen.metrics.recorded event emission.""" + +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest +from click.testing import CliRunner + +from kaizen_agentic.cli import cli +from kaizen_agentic.integrations.event_bus import ( + EVENT_TYPE_METRICS_RECORDED, + build_metrics_recorded_envelope, + metrics_summary_for_event, + nats_subject_for_event, + publish_metrics_recorded_event, + resolve_project_slug, +) +from kaizen_agentic.metrics import MetricsStore + + +def test_metrics_summary_for_event_maps_avg_quality_score() -> None: + summary = metrics_summary_for_event( + { + "success_rate": 0.75, + "execution_count": 12, + "avg_quality_score": 0.81, + } + ) + assert summary == { + "success_rate": 0.75, + "execution_count": 12, + "avg_quality": 0.81, + } + + +def test_build_metrics_recorded_envelope_shape() -> None: + envelope = build_metrics_recorded_envelope( + agent="coach", + project="kaizen-agentic", + summary={ + "success_rate": 0.9, + "execution_count": 5, + "avg_quality_score": 0.85, + }, + event_id="test-event-id", + ) + + assert envelope["id"] == "test-event-id" + assert envelope["type"] == EVENT_TYPE_METRICS_RECORDED + assert envelope["publisher"] == "kaizen-agentic" + assert envelope["attributes"] == { + "agent": "coach", + "project": "kaizen-agentic", + "summary": { + "success_rate": 0.9, + "execution_count": 5, + "avg_quality": 0.85, + }, + } + + +def test_nats_subject_for_event() -> None: + assert nats_subject_for_event("kaizen.metrics.recorded") == ( + "activity.kaizen.metrics.recorded" + ) + + +def test_resolve_project_slug_prefers_env( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("KAIZEN_PROJECT_SLUG", "custom-slug") + assert resolve_project_slug(tmp_path / "some-dir") == "custom-slug" + + +def test_resolve_project_slug_falls_back_to_directory_name(tmp_path: Path) -> None: + project = tmp_path / "kaizen-agentic" + project.mkdir() + assert resolve_project_slug(project) == "kaizen-agentic" + + +def test_publish_metrics_recorded_event_uses_activity_subject( + monkeypatch: pytest.MonkeyPatch, +) -> None: + published: dict[str, object] = {} + + async def fake_publish(subject: str, payload: bytes, *, nats_url: str) -> None: + published["subject"] = subject + published["payload"] = payload + published["url"] = nats_url + + monkeypatch.setattr( + "kaizen_agentic.integrations.event_bus._publish_bytes", + fake_publish, + ) + + envelope = build_metrics_recorded_envelope( + agent="coach", + project="activity-core", + summary={"success_rate": 1.0, "execution_count": 1, "avg_quality_score": 1.0}, + event_id="evt-1", + ) + subject = publish_metrics_recorded_event( + envelope, nats_url="nats://broker.test:4222" + ) + + assert subject == "activity.kaizen.metrics.recorded" + assert published["subject"] == "activity.kaizen.metrics.recorded" + body = json.loads(published["payload"].decode()) + assert body["attributes"]["project"] == "activity-core" + + +def test_metrics_record_emit_event_after_append( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + emitted: list[dict] = [] + + def capture(envelope, *, nats_url=None): + emitted.append(dict(envelope)) + return "activity.kaizen.metrics.recorded" + + monkeypatch.setattr( + "kaizen_agentic.cli.publish_metrics_recorded_event", + capture, + ) + + runner = CliRunner() + result = runner.invoke( + cli, + [ + "metrics", + "record", + "coach", + "--target", + str(tmp_path), + "--success", + "--time", + "120", + "--quality", + "0.9", + "--emit-event", + ], + ) + + assert result.exit_code == 0, result.output + assert "Recorded metrics for 'coach'" in result.output + assert "Emitted kaizen.metrics.recorded" in result.output + assert len(emitted) == 1 + assert emitted[0]["attributes"]["agent"] == "coach" + assert emitted[0]["attributes"]["project"] == tmp_path.name + assert emitted[0]["attributes"]["summary"]["execution_count"] == 1 + + store = MetricsStore(tmp_path, "coach") + assert store.read_summary() is not None + + +def test_metrics_record_skips_emit_on_idempotency_duplicate( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + emitted: list[dict] = [] + + def capture(envelope, *, nats_url=None): + emitted.append(dict(envelope)) + return "activity.kaizen.metrics.recorded" + + monkeypatch.setattr( + "kaizen_agentic.cli.publish_metrics_recorded_event", + capture, + ) + + runner = CliRunner() + common = [ + "metrics", + "record", + "coach", + "--target", + str(tmp_path), + "--success", + "--emit-event", + "--idempotency-key", + "session-1", + ] + assert runner.invoke(cli, common).exit_code == 0 + assert runner.invoke(cli, common).exit_code == 0 + assert len(emitted) == 1 diff --git a/workplans/kaizen-agentic-WP-0008-coulomb-loop-supplier-engagement.md b/workplans/kaizen-agentic-WP-0008-coulomb-loop-supplier-engagement.md index 9770d7c..3942244 100644 --- a/workplans/kaizen-agentic-WP-0008-coulomb-loop-supplier-engagement.md +++ b/workplans/kaizen-agentic-WP-0008-coulomb-loop-supplier-engagement.md @@ -21,7 +21,7 @@ tasks: status: todo title: Add docs/integrations/customer-engagement-playbook.md skeleton - id: T03 - status: todo + status: done title: Implement metrics record --emit-event for kaizen.metrics.recorded - id: T04 status: todo @@ -170,7 +170,7 @@ Record friction in coulomb-loop `loops/kaizen-stack/supplier-notes.md`. ```task id: KAIZEN-WP-0008-T03 -status: todo +status: done priority: medium state_hub_task_id: "26ee0f8d-2b69-4796-b276-b76238d67546" ```