feat: metrics record --emit-event for kaizen.metrics.recorded
Publish activity-core EventEnvelope payloads to NATS subject activity.kaizen.metrics.recorded after a successful append. Optional nats-py via kaizen-agentic[events]; project slug from KAIZEN_PROJECT_SLUG or directory basename. Skips emit on idempotency duplicates. Closes KAIZEN-WP-0008 T03.
This commit is contained in:
@@ -15,6 +15,11 @@ from .integrations.artifact_store import (
|
||||
default_api_url,
|
||||
publish_optimizer_evidence,
|
||||
)
|
||||
from .integrations.event_bus import (
|
||||
build_metrics_recorded_envelope,
|
||||
publish_metrics_recorded_event,
|
||||
resolve_project_slug,
|
||||
)
|
||||
from .integrations.helix import HelixCorrelationAdapter, enrich_helix_correlation
|
||||
from .metrics import MetricsStore, OptimizerStore, performance_summary_markdown
|
||||
from .optimization import OptimizationLoop, MIN_SAMPLES_FOR_RECOMMENDATIONS
|
||||
@@ -1069,6 +1074,11 @@ def metrics():
|
||||
@click.option(
|
||||
"--json", "json_input", is_flag=True, help="Read full record JSON from stdin"
|
||||
)
|
||||
@click.option(
|
||||
"--emit-event",
|
||||
is_flag=True,
|
||||
help="Publish kaizen.metrics.recorded to NATS (requires nats-py)",
|
||||
)
|
||||
def metrics_record(
|
||||
agent_name: str,
|
||||
target: str,
|
||||
@@ -1079,6 +1089,7 @@ def metrics_record(
|
||||
session_id: Optional[str],
|
||||
idempotency_key: Optional[str],
|
||||
json_input: bool,
|
||||
emit_event: bool,
|
||||
):
|
||||
"""Append one execution record for an agent."""
|
||||
store = MetricsStore(_project_root(target), agent_name)
|
||||
@@ -1109,6 +1120,21 @@ def metrics_record(
|
||||
|
||||
if store.append(payload, idempotency_key=idempotency_key):
|
||||
click.echo(f"Recorded metrics for '{agent_name}'")
|
||||
if emit_event:
|
||||
summary = store.read_summary() or store.write_summary()
|
||||
envelope = build_metrics_recorded_envelope(
|
||||
agent=agent_name,
|
||||
project=resolve_project_slug(store.project_root),
|
||||
summary=summary,
|
||||
)
|
||||
try:
|
||||
subject = publish_metrics_recorded_event(envelope)
|
||||
except RuntimeError as exc:
|
||||
click.echo(f"Error: {exc}", err=True)
|
||||
sys.exit(1)
|
||||
click.echo(
|
||||
f"Emitted kaizen.metrics.recorded for '{agent_name}' → {subject}"
|
||||
)
|
||||
else:
|
||||
click.echo(
|
||||
f"Skipped duplicate record for '{agent_name}' (idempotency key exists)"
|
||||
|
||||
@@ -1,10 +1,18 @@
|
||||
"""Ecosystem integration adapters (Helix Forge, artifact-store)."""
|
||||
"""Ecosystem integration adapters (Helix Forge, artifact-store, event bus)."""
|
||||
|
||||
from .artifact_store import publish_optimizer_evidence
|
||||
from .event_bus import (
|
||||
build_metrics_recorded_envelope,
|
||||
publish_metrics_recorded_event,
|
||||
resolve_project_slug,
|
||||
)
|
||||
from .helix import HelixCorrelationAdapter, enrich_helix_correlation
|
||||
|
||||
__all__ = [
|
||||
"HelixCorrelationAdapter",
|
||||
"build_metrics_recorded_envelope",
|
||||
"enrich_helix_correlation",
|
||||
"publish_metrics_recorded_event",
|
||||
"publish_optimizer_evidence",
|
||||
"resolve_project_slug",
|
||||
]
|
||||
|
||||
95
src/kaizen_agentic/integrations/event_bus.py
Normal file
95
src/kaizen_agentic/integrations/event_bus.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""NATS event emission for activity-core integration (Pattern 2)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Mapping, Optional
|
||||
|
||||
EVENT_TYPE_METRICS_RECORDED = "kaizen.metrics.recorded"
|
||||
DEFAULT_NATS_URL = "nats://localhost:4222"
|
||||
DEFAULT_PUBLISHER = "kaizen-agentic"
|
||||
|
||||
|
||||
def resolve_project_slug(project_root: Path) -> str:
|
||||
"""Return state-hub repo slug for a project checkout."""
|
||||
override = os.environ.get("KAIZEN_PROJECT_SLUG", "").strip()
|
||||
if override:
|
||||
return override
|
||||
return Path(project_root).resolve().name
|
||||
|
||||
|
||||
def metrics_summary_for_event(summary: Mapping[str, Any]) -> Dict[str, Any]:
|
||||
"""Map ADR-004 summary.json to the LOOP-WP-0002 event contract."""
|
||||
return {
|
||||
"success_rate": summary.get("success_rate", 0.0),
|
||||
"execution_count": summary.get("execution_count", 0),
|
||||
"avg_quality": summary.get(
|
||||
"avg_quality",
|
||||
summary.get("avg_quality_score", 0.0),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def build_metrics_recorded_envelope(
|
||||
*,
|
||||
agent: str,
|
||||
project: str,
|
||||
summary: Mapping[str, Any],
|
||||
event_id: Optional[str] = None,
|
||||
publisher: str = DEFAULT_PUBLISHER,
|
||||
) -> Dict[str, Any]:
|
||||
"""Build an activity-core EventEnvelope dict for kaizen.metrics.recorded."""
|
||||
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
return {
|
||||
"id": event_id or str(uuid.uuid4()),
|
||||
"type": EVENT_TYPE_METRICS_RECORDED,
|
||||
"version": "1.0",
|
||||
"timestamp": timestamp,
|
||||
"publisher": publisher,
|
||||
"attributes": {
|
||||
"agent": agent,
|
||||
"project": project,
|
||||
"summary": metrics_summary_for_event(summary),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def nats_subject_for_event(event_type: str) -> str:
|
||||
"""Subject pattern used by activity-core webhook receiver and event router."""
|
||||
return f"activity.{event_type}"
|
||||
|
||||
|
||||
async def _publish_bytes(subject: str, payload: bytes, *, nats_url: str) -> None:
|
||||
try:
|
||||
import nats
|
||||
except ImportError as exc:
|
||||
raise RuntimeError(
|
||||
"nats-py is required for --emit-event. "
|
||||
"Install with: pip install 'kaizen-agentic[events]'"
|
||||
) from exc
|
||||
|
||||
nc = await nats.connect(nats_url)
|
||||
try:
|
||||
await nc.publish(subject, payload)
|
||||
await nc.flush()
|
||||
finally:
|
||||
await nc.close()
|
||||
|
||||
|
||||
def publish_metrics_recorded_event(
|
||||
envelope: Mapping[str, Any],
|
||||
*,
|
||||
nats_url: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Publish envelope to NATS. Returns the subject used."""
|
||||
url = (nats_url or os.environ.get("NATS_URL") or DEFAULT_NATS_URL).strip()
|
||||
event_type = str(envelope.get("type", EVENT_TYPE_METRICS_RECORDED))
|
||||
subject = nats_subject_for_event(event_type)
|
||||
payload = json.dumps(envelope, sort_keys=True).encode("utf-8")
|
||||
asyncio.run(_publish_bytes(subject, payload, nats_url=url))
|
||||
return subject
|
||||
Reference in New Issue
Block a user