Publish activity-core EventEnvelope payloads to NATS subject activity.kaizen.metrics.recorded after a successful append. Optional nats-py via kaizen-agentic[events]; project slug from KAIZEN_PROJECT_SLUG or directory basename. Skips emit on idempotency duplicates. Closes KAIZEN-WP-0008 T03.
96 lines
2.9 KiB
Python
96 lines
2.9 KiB
Python
"""NATS event emission for activity-core integration (Pattern 2)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Mapping, Optional
|
|
|
|
EVENT_TYPE_METRICS_RECORDED = "kaizen.metrics.recorded"
|
|
DEFAULT_NATS_URL = "nats://localhost:4222"
|
|
DEFAULT_PUBLISHER = "kaizen-agentic"
|
|
|
|
|
|
def resolve_project_slug(project_root: Path) -> str:
    """Determine the project slug for a checkout.

    The ``KAIZEN_PROJECT_SLUG`` environment variable wins when it holds a
    non-blank value (surrounding whitespace is stripped). Otherwise the
    slug is the final path component of the resolved ``project_root``.

    Args:
        project_root: Directory of the project checkout.

    Returns:
        The slug string.
    """
    env_slug = os.environ.get("KAIZEN_PROJECT_SLUG", "").strip()
    directory_name = Path(project_root).resolve().name
    # A blank or unset override falls through to the directory basename.
    return env_slug if env_slug else directory_name
|
|
|
|
|
|
def metrics_summary_for_event(summary: Mapping[str, Any]) -> Dict[str, Any]:
    """Project an ADR-004 summary.json mapping onto the LOOP-WP-0002 event contract.

    Only three fields are carried over; anything else in ``summary`` is
    dropped. Missing fields default to ``0.0`` (rates/quality) or ``0``
    (count).

    Args:
        summary: Mapping read with ``.get``; may lack any of the keys.

    Returns:
        A new dict with keys ``success_rate``, ``execution_count`` and
        ``avg_quality``, in that order.
    """
    # ``avg_quality_score`` is consulted only as a fallback for a missing
    # ``avg_quality`` key; a present key wins even if its value is falsy.
    legacy_quality = summary.get("avg_quality_score", 0.0)

    event_summary: Dict[str, Any] = {}
    event_summary["success_rate"] = summary.get("success_rate", 0.0)
    event_summary["execution_count"] = summary.get("execution_count", 0)
    event_summary["avg_quality"] = summary.get("avg_quality", legacy_quality)
    return event_summary
|
|
|
|
|
|
def build_metrics_recorded_envelope(
    *,
    agent: str,
    project: str,
    summary: Mapping[str, Any],
    event_id: Optional[str] = None,
    publisher: str = DEFAULT_PUBLISHER,
) -> Dict[str, Any]:
    """Assemble the activity-core EventEnvelope dict for kaizen.metrics.recorded.

    Args:
        agent: Stored under ``attributes.agent``.
        project: Stored under ``attributes.project``.
        summary: Source metrics; reduced via ``metrics_summary_for_event``
            before being stored under ``attributes.summary``.
        event_id: Envelope id. A falsy value (``None`` or ``""``) is
            replaced by a freshly generated UUID4 string.
        publisher: Value for the envelope's ``publisher`` field.

    Returns:
        A dict with keys ``id``, ``type``, ``version``, ``timestamp``,
        ``publisher`` and ``attributes``.
    """
    # Current UTC time at second precision with a literal "Z" suffix.
    emitted_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    envelope_id = event_id if event_id else str(uuid.uuid4())

    attributes: Dict[str, Any] = {
        "agent": agent,
        "project": project,
        "summary": metrics_summary_for_event(summary),
    }

    envelope: Dict[str, Any] = {
        "id": envelope_id,
        "type": EVENT_TYPE_METRICS_RECORDED,
        "version": "1.0",
        "timestamp": emitted_at,
        "publisher": publisher,
        "attributes": attributes,
    }
    return envelope
|
|
|
|
|
|
def nats_subject_for_event(event_type: str) -> str:
    """Return the NATS subject for an event type: ``activity.<event_type>``.

    This is the subject pattern used by the activity-core webhook receiver
    and event router.
    """
    subject = "activity.{}".format(event_type)
    return subject
|
|
|
|
|
|
async def _publish_bytes(subject: str, payload: bytes, *, nats_url: str) -> None:
    """Publish one payload to NATS over a connection opened just for this call.

    ``nats`` is imported lazily so the module can be imported without the
    optional dependency installed.

    Args:
        subject: NATS subject to publish on.
        payload: Already-encoded message body.
        nats_url: Server URL handed to ``nats.connect``.

    Raises:
        RuntimeError: If the ``nats`` package cannot be imported.
    """
    try:
        import nats
    except ImportError as import_error:
        install_hint = (
            "nats-py is required for --emit-event. "
            "Install with: pip install 'kaizen-agentic[events]'"
        )
        raise RuntimeError(install_hint) from import_error

    client = await nats.connect(nats_url)
    try:
        await client.publish(subject, payload)
        # Flush explicitly before the connection is closed below.
        await client.flush()
    finally:
        # Always close, even when publish/flush raised.
        await client.close()
|
|
|
|
|
|
def publish_metrics_recorded_event(
    envelope: Mapping[str, Any],
    *,
    nats_url: Optional[str] = None,
) -> str:
    """Publish an envelope to NATS and return the subject that was used.

    The server URL is the first non-blank value among the ``nats_url``
    argument, the ``NATS_URL`` environment variable, and
    ``DEFAULT_NATS_URL``. The subject is derived from the envelope's
    ``type`` field, defaulting to ``EVENT_TYPE_METRICS_RECORDED`` when the
    key is absent. The body is the envelope as UTF-8 JSON with sorted keys.

    Args:
        envelope: Event envelope mapping. Any ``Mapping`` is accepted at the
            top level; nested values must be JSON-serialisable as-is.
        nats_url: Optional server URL overriding the environment/default.

    Returns:
        The NATS subject the payload was published on.

    Raises:
        RuntimeError: From ``_publish_bytes`` when nats-py is missing, or
            from ``asyncio.run`` when called while an event loop is already
            running in this thread.
        TypeError: If the envelope contains values ``json.dumps`` cannot
            serialise.
    """
    # Strip each candidate BEFORE testing it, so a whitespace-only argument
    # or NATS_URL falls through to the next source instead of collapsing to
    # an empty URL.
    url = DEFAULT_NATS_URL
    for candidate in (nats_url, os.environ.get("NATS_URL")):
        stripped = (candidate or "").strip()
        if stripped:
            url = stripped
            break

    event_type = str(envelope.get("type", EVENT_TYPE_METRICS_RECORDED))
    subject = nats_subject_for_event(event_type)

    # json.dumps only serialises dict instances; copy so non-dict Mapping
    # implementations (as the annotation allows) work at the top level.
    payload = json.dumps(dict(envelope), sort_keys=True).encode("utf-8")

    # Synchronous entry point: drive the async publish to completion here.
    asyncio.run(_publish_bytes(subject, payload, nats_url=url))
    return subject
|