Makes the state hub an event publisher so activity-core can drive
maintenance automation declaratively via ActivityDefinitions, rather
than the hub creating tasks itself.
- api/events/: lazy JetStream publisher + EventEnvelope mirroring
activity-core's contract; no-op when NATS_URL unset, fire-and-forget
with logged failures so publishing never breaks an API request.
- Wired publishers on the five v1.0 lifecycle events:
org.statehub.repo.registered (POST /repos/)
org.statehub.workstream.completed (PATCH /workstreams/* on transition)
org.statehub.decision.resolved (POST /decisions/*/resolve)
org.statehub.domain.goal.activated (POST /domain-goals/*/activate)
org.statehub.task.stale (scripts/cleanup_stale_tasks.py)
- docs/nats-event-subjects.md: subject naming convention + catalog.
- docs/cron-migration.md: design stub for replacing custodian-sync
systemd timer and cleanup-stale cron with ActivityDefinitions
(depends on activity-core WP-0003).
- docs/activity-core-delegation.md: protocol, invariants, cutover plan.
- SCOPE.md: declares activity-core as downstream event consumer and
restates that the state hub stays a read model, not a task factory.
Workplan: workplans/CUST-WP-0040-state-hub-nats-activity-core-integration.md
242 tests pass.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
140 lines
5.0 KiB
Python
140 lines
5.0 KiB
Python
"""NATS JetStream publisher for state-hub lifecycle events.
|
|
|
|
Design:
|
|
- One process-wide publisher (`_Publisher` singleton).
|
|
- Connects lazily on first publish; reuses the connection thereafter.
|
|
- When ``NATS_URL`` is unset or empty, every publish is a logged no-op
|
|
so the state hub remains usable in environments without NATS.
|
|
- All publishes are fire-and-forget from the caller's perspective.
|
|
Failures are logged but never raise — losing a lifecycle event must
|
|
never break the API request that triggered it.
|
|
|
|
Stream + subject conventions live in ``docs/nats-event-subjects.md``.
|
|
Envelope schema lives in :mod:`api.events.envelope`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from typing import TYPE_CHECKING
|
|
|
|
from api.events.envelope import EventEnvelope
|
|
|
|
if TYPE_CHECKING: # pragma: no cover — import-only for typing
|
|
from nats.aio.client import Client as NATSClient
|
|
from nats.js.client import JetStreamContext
|
|
|
|
logger = logging.getLogger("state_hub.events.nats")

# Name and subject filter used when this module has to create the JetStream
# stream itself (i.e. no existing stream already covers state-hub subjects).
_STREAM_NAME = "ACTIVITY_EVENTS"
_STREAM_SUBJECT_PATTERN = "org.>"
|
|
|
|
|
|
def _nats_url() -> str | None:
    """Return the configured NATS server URL, or ``None`` when unset or blank.

    The environment is consulted on every call rather than once at import,
    so tests and deployments can change ``NATS_URL`` at runtime.
    """
    raw = os.environ.get("NATS_URL", "")
    cleaned = raw.strip()
    if not cleaned:
        return None
    return cleaned
|
|
|
|
|
|
class _Publisher:
    """Singleton holding the live NATS connection + JetStream context.

    An asyncio-based client is bound to the event loop it was created on,
    and sync callers go through ``asyncio.run``, which creates and then
    closes a fresh loop on every call. The publisher therefore records which
    loop owns its state and starts over when it is used from a different one.
    """

    # Subject used to check whether an existing stream already captures
    # state-hub events before trying to create our own stream.
    _PROBE_SUBJECT = "org.statehub.repo.registered"

    # Hard ceiling (seconds) on establishing a connection. ``connect_timeout``
    # only bounds a single attempt; with the client's default reconnect
    # settings ``nats.connect`` can keep retrying an unreachable server for
    # far longer, which would stall the request that triggered the publish.
    _CONNECT_DEADLINE_S = 5.0

    def __init__(self) -> None:
        self._nc: "NATSClient | None" = None
        self._js: "JetStreamContext | None" = None
        # Serialises connection setup; recreated whenever the owning loop changes.
        self._connect_lock = asyncio.Lock()
        # True once the stream check has run (successfully or not) so it is
        # not repeated on every connect; reset by shutdown().
        self._ensured_stream = False
        # Event loop that owns _nc / _js / _connect_lock; None until first publish.
        self._loop: "asyncio.AbstractEventLoop | None" = None

    def _bind_to_running_loop(self) -> None:
        """Drop connection state that belongs to a different event loop.

        The previous loop may already be closed (e.g. by a finished
        ``asyncio.run``), so the old client cannot be drained from here; its
        references are released and a fresh lock is created for this loop.
        """
        loop = asyncio.get_running_loop()
        if self._loop is loop:
            return
        self._nc = None
        self._js = None
        self._connect_lock = asyncio.Lock()
        self._loop = loop

    def _connection_alive(self) -> bool:
        """Return True if the current client can be used for publishing.

        A client that is reconnecting is kept rather than replaced:
        overwriting it would orphan it without ever closing it.
        """
        nc = self._nc
        return nc is not None and (nc.is_connected or nc.is_reconnecting)

    async def _discard_connection(self) -> None:
        """Forget the current client, closing it first if it is still open."""
        stale, self._nc, self._js = self._nc, None, None
        if stale is None or stale.is_closed:
            return
        try:
            await stale.close()
        except Exception as exc:  # best effort: it is being replaced anyway
            logger.debug("nats: closing stale connection failed: %s", exc)

    async def _ensure_stream(self, js: "JetStreamContext") -> None:
        """Create ``_STREAM_NAME`` unless a stream already covers our subjects.

        Best effort: errors are logged, never raised.
        """
        import nats.js.api

        try:
            await js.find_stream_name_by_subject(self._PROBE_SUBJECT)
        except Exception as exc:
            # No matching stream (or the lookup itself failed): try to create ours.
            logger.debug("nats: no stream matched %s (%s)", self._PROBE_SUBJECT, exc)
        else:
            return

        try:
            await js.add_stream(
                nats.js.api.StreamConfig(
                    name=_STREAM_NAME,
                    subjects=[_STREAM_SUBJECT_PATTERN],
                )
            )
            logger.info("nats: created JetStream stream %r", _STREAM_NAME)
        except Exception as exc:  # pragma: no cover — defensive
            logger.warning("nats: could not ensure stream %r: %s", _STREAM_NAME, exc)

    async def _connect(self, url: str) -> None:
        """Connect to ``url`` unless a usable client exists, then ensure the stream.

        Raises whatever ``nats.connect`` raises, or ``asyncio.TimeoutError``
        when ``_CONNECT_DEADLINE_S`` expires; :meth:`publish` logs these.
        """
        # Imported inside the method so module import works without the dep.
        import nats

        async with self._connect_lock:
            # Another task may have connected while we waited for the lock.
            if self._connection_alive():
                return
            await self._discard_connection()
            self._nc = await asyncio.wait_for(
                nats.connect(url, connect_timeout=2),
                timeout=self._CONNECT_DEADLINE_S,
            )
            self._js = self._nc.jetstream()
            logger.info("nats: connected to %s", url)

            if not self._ensured_stream:
                await self._ensure_stream(self._js)
                # Marked done even if creation failed, so a misconfigured
                # server is not re-probed on every reconnect.
                self._ensured_stream = True

    async def publish(self, subject: str, envelope: EventEnvelope) -> None:
        """Publish ``envelope`` on ``subject``; failures are logged, not raised.

        No-op (debug log only) when ``NATS_URL`` is unset or blank.
        """
        url = _nats_url()
        if url is None:
            logger.debug("nats: NATS_URL unset — skipping publish %s (id=%s)", subject, envelope.id)
            return

        try:
            self._bind_to_running_loop()
            if not self._connection_alive():
                await self._connect(url)
            js = self._js
            if js is None:  # _connect either sets this or raises
                raise RuntimeError("nats: no JetStream context after connect")
            payload = envelope.model_dump_json().encode()
            ack = await js.publish(subject, payload)
            logger.info(
                "nats: published %s id=%s stream=%s seq=%s",
                subject,
                envelope.id,
                getattr(ack, "stream", "?"),
                getattr(ack, "seq", "?"),
            )
        except Exception as exc:
            logger.warning("nats: publish failed %s id=%s err=%s", subject, envelope.id, exc)

    async def shutdown(self) -> None:
        """Drain the connection (if any) and reset state; errors are logged, not raised."""
        nc = self._nc
        owner = self._loop
        # A client owned by another (possibly closed) loop cannot be driven
        # from here, so it is only dropped, not drained.
        if nc is not None and (owner is None or owner is asyncio.get_running_loop()):
            try:
                await nc.drain()
            except Exception as exc:  # pragma: no cover — defensive
                logger.debug("nats: drain on shutdown failed: %s", exc)
        self._nc = None
        self._js = None
        self._ensured_stream = False
|
|
|
|
|
|
# Module-level singleton shared by publish_event() and shutdown_publisher().
_PUBLISHER = _Publisher()
|
|
|
|
|
|
async def publish_event(subject: str, envelope: EventEnvelope) -> None:
    """Send ``envelope`` to ``subject`` through the shared publisher.

    Publishing errors are logged rather than raised. When ``NATS_URL`` is
    not configured, nothing is sent.
    """
    publisher = _PUBLISHER
    await publisher.publish(subject, envelope)
|
|
|
|
|
|
# Strong references to publishes scheduled by ``publish_event_sync`` from
# inside an already-running loop. The event loop keeps only weak references
# to tasks, so an otherwise unreferenced fire-and-forget task may be
# garbage-collected before it completes.
_PENDING_SYNC_PUBLISHES: set[asyncio.Task[None]] = set()


def publish_event_sync(subject: str, envelope: EventEnvelope) -> None:
    """Fire-and-forget variant for sync callers (scripts, cron jobs).

    With no event loop running in this thread, the publish runs to
    completion in a short-lived loop via :func:`asyncio.run`, and the shared
    connection is drained before that loop closes. Keeping it open would
    leave the next call in the same process (e.g. a script publishing one
    event per item) holding a client bound to the closed loop.

    If a loop is already running in this thread, the publish is scheduled on
    it as a background task and this function returns immediately. Server
    code should prefer :func:`publish_event` with ``asyncio.create_task``.
    """
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None

    if running is None:
        asyncio.run(_publish_and_close(subject, envelope))
        return

    # Already inside a running loop — schedule and forget, but keep a
    # reference until the task finishes.
    task = running.create_task(publish_event(subject, envelope))
    _PENDING_SYNC_PUBLISHES.add(task)
    task.add_done_callback(_PENDING_SYNC_PUBLISHES.discard)


async def _publish_and_close(subject: str, envelope: EventEnvelope) -> None:
    """Publish, then drain the shared connection before the short-lived loop closes."""
    try:
        await publish_event(subject, envelope)
    finally:
        await _PUBLISHER.shutdown()
|
|
|
|
|
|
async def shutdown_publisher() -> None:
    """Drain and release the shared NATS connection; call on app shutdown."""
    publisher = _PUBLISHER
    await publisher.shutdown()
|