"""NATS JetStream publisher for state-hub lifecycle events. Design: - One process-wide publisher (`_Publisher` singleton). - Connects lazily on first publish; reuses the connection thereafter. - When ``NATS_URL`` is unset or empty, every publish is a logged no-op so the state hub remains usable in environments without NATS. - All publishes are fire-and-forget from the caller's perspective. Failures are logged but never raise — losing a lifecycle event must never break the API request that triggered it. Stream + subject conventions live in ``docs/nats-event-subjects.md``. Envelope schema lives in :mod:`api.events.envelope`. """ from __future__ import annotations import asyncio import logging import os from typing import TYPE_CHECKING from api.events.envelope import EventEnvelope if TYPE_CHECKING: # pragma: no cover — import-only for typing from nats.aio.client import Client as NATSClient from nats.js.client import JetStreamContext logger = logging.getLogger("state_hub.events.nats") _STREAM_NAME = "ACTIVITY_EVENTS" _STREAM_SUBJECT_PATTERN = "org.>" def _nats_url() -> str | None: """Resolve NATS_URL at call time so tests / configs can override it.""" url = os.environ.get("NATS_URL", "").strip() return url or None class _Publisher: """Singleton holding the live NATS connection + JetStream context.""" def __init__(self) -> None: self._nc: "NATSClient | None" = None self._js: "JetStreamContext | None" = None self._connect_lock = asyncio.Lock() self._ensured_stream = False async def _connect(self, url: str) -> None: # Imported inside the method so module import works without the dep. import nats import nats.js.api async with self._connect_lock: if self._nc is not None and self._nc.is_connected: return self._nc = await nats.connect(url, connect_timeout=2) self._js = self._nc.jetstream() logger.info("nats: connected to %s", url) if not self._ensured_stream: try: await self._js.find_stream_name_by_subject("org.statehub.repo.registered") self._ensured_stream = True except Exception: try: await self._js.add_stream( nats.js.api.StreamConfig( name=_STREAM_NAME, subjects=[_STREAM_SUBJECT_PATTERN], ) ) logger.info("nats: created JetStream stream %r", _STREAM_NAME) except Exception as exc: # pragma: no cover — defensive logger.warning("nats: could not ensure stream %r: %s", _STREAM_NAME, exc) self._ensured_stream = True async def publish(self, subject: str, envelope: EventEnvelope) -> None: url = _nats_url() if url is None: logger.debug("nats: NATS_URL unset — skipping publish %s (id=%s)", subject, envelope.id) return try: if self._nc is None or not self._nc.is_connected: await self._connect(url) assert self._js is not None payload = envelope.model_dump_json().encode() ack = await self._js.publish(subject, payload) logger.info( "nats: published %s id=%s stream=%s seq=%s", subject, envelope.id, getattr(ack, "stream", "?"), getattr(ack, "seq", "?"), ) except Exception as exc: logger.warning("nats: publish failed %s id=%s err=%s", subject, envelope.id, exc) async def shutdown(self) -> None: if self._nc is not None: try: await self._nc.drain() except Exception: # pragma: no cover — defensive pass self._nc = None self._js = None self._ensured_stream = False _PUBLISHER = _Publisher() async def publish_event(subject: str, envelope: EventEnvelope) -> None: """Publish ``envelope`` on ``subject``. Logs but never raises on failure. No-op when ``NATS_URL`` is not configured. """ await _PUBLISHER.publish(subject, envelope) def publish_event_sync(subject: str, envelope: EventEnvelope) -> None: """Fire-and-forget variant for sync callers (scripts, cron jobs). Runs the publish in a short-lived event loop. Intended for one-shot CLI callers that aren't already inside an async context. Server code should prefer :func:`publish_event` with ``asyncio.create_task``. """ try: asyncio.run(publish_event(subject, envelope)) except RuntimeError: # Already inside a running loop — schedule and forget. asyncio.get_event_loop().create_task(publish_event(subject, envelope)) async def shutdown_publisher() -> None: """Drain the NATS connection on app shutdown.""" await _PUBLISHER.shutdown()