feat(state-hub): CUST-WP-0040 — NATS lifecycle event publishing for activity-core

Makes the state hub an event publisher so activity-core can drive
maintenance automation declaratively via ActivityDefinitions, rather
than the hub creating tasks itself.

- api/events/: lazy JetStream publisher + EventEnvelope mirroring
  activity-core's contract; no-op when NATS_URL unset, fire-and-forget
  with logged failures so publishing never breaks an API request.
- Wired publishers on the five v1.0 lifecycle events:
    org.statehub.repo.registered        (POST /repos/)
    org.statehub.workstream.completed   (PATCH /workstreams/* on transition)
    org.statehub.decision.resolved      (POST /decisions/*/resolve)
    org.statehub.domain.goal.activated  (POST /domain-goals/*/activate)
    org.statehub.task.stale             (scripts/cleanup_stale_tasks.py)
- docs/nats-event-subjects.md: subject naming convention + catalog.
- docs/cron-migration.md: design stub for replacing custodian-sync
  systemd timer and cleanup-stale cron with ActivityDefinitions
  (depends on activity-core WP-0003).
- docs/activity-core-delegation.md: protocol, invariants, cutover plan.
- SCOPE.md: declares activity-core as downstream event consumer and
  restates that the state hub stays a read model, not a task factory.

Workplan: workplans/CUST-WP-0040-state-hub-nats-activity-core-integration.md
242 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-17 05:49:29 +02:00
parent 2bc7fd8ce7
commit ca8a09ed04
16 changed files with 770 additions and 9 deletions

View File

@@ -26,6 +26,7 @@ The Custodian is both an **operational system** (State Hub: PostgreSQL + FastAPI
- Memory: append-only episodic archive (working notes + immutable event logs) - Memory: append-only episodic archive (working notes + immutable event logs)
- Agent runtime scaffolding: policies, kaizen agent copies, tool adapters - Agent runtime scaffolding: policies, kaizen agent copies, tool adapters
- Cross-domain coordination: dependency tracking, human-intervention flags, next-steps suggestions - Cross-domain coordination: dependency tracking, human-intervention flags, next-steps suggestions
- Publishing lifecycle events on NATS JetStream (`org.statehub.>`) so activity-core can react via declarative ActivityDefinitions
--- ---
@@ -35,6 +36,7 @@ The Custodian is both an **operational system** (State Hub: PostgreSQL + FastAPI
- Financial/legal transactions or external publication - Financial/legal transactions or external publication
- Storing plaintext credentials - Storing plaintext credentials
- Direct writes to `canon/` without a human-approved review gate - Direct writes to `canon/` without a human-approved review gate
- Maintenance task *creation* in response to lifecycle events — that responsibility lives in activity-core (see `state-hub/docs/activity-core-delegation.md`). The state hub remains a **read model**, not a task factory.
--- ---
@@ -68,8 +70,8 @@ The Custodian is both an **operational system** (State Hub: PostgreSQL + FastAPI
## How It Fits ## How It Fits
- Upstream dependencies: none (sits at the top of the dependency order) - Upstream dependencies: none (sits at the top of the dependency order)
- Downstream consumers: all six domains (railiance → markitect → coulomb.social → personhood/foerster → custodian) - Downstream consumers: all six domains (railiance → markitect → coulomb.social → personhood/foerster → custodian); **activity-core** consumes state hub lifecycle events on NATS subject `org.statehub.>` to drive maintenance ActivityDefinitions
- Often used with: kaizen-agentic (agent definitions), ops-bridge (remote tunnel connectivity), activity-core (task factory) - Often used with: kaizen-agentic (agent definitions), ops-bridge (remote tunnel connectivity), activity-core (task factory + event bridge)
--- ---

View File

@@ -0,0 +1,13 @@
from api.events.envelope import EventEnvelope
from api.events.nats_publisher import (
publish_event,
publish_event_sync,
shutdown_publisher,
)
__all__ = [
"EventEnvelope",
"publish_event",
"publish_event_sync",
"shutdown_publisher",
]

View File

@@ -0,0 +1,55 @@
"""EventEnvelope — schema for state-hub lifecycle events published to NATS.
Mirrors the EventEnvelope contract defined in activity-core
(`src/activity_core/models.py`). The state-hub publishes; activity-core
consumes and routes to ActivityDefinitions.
Subject naming convention (see docs/nats-event-subjects.md):
org.statehub.{noun}.{verb}
Examples:
org.statehub.repo.registered
org.statehub.workstream.completed
org.statehub.decision.resolved
org.statehub.domain.goal.activated
org.statehub.task.stale
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from typing import Any
from pydantic import BaseModel, Field
PUBLISHER = "the-custodian/state-hub"
class EventEnvelope(BaseModel):
"""Standard envelope shared with activity-core. Do not break compatibility.
All inbound events on activity-core's side are normalised into this shape.
"""
id: str = Field(description="UUID v4 — stable unique ID for deduplication.")
type: str = Field(description="Dot-namespaced event type, e.g. 'org.statehub.repo.registered'.")
version: str = Field(default="1.0", description="Schema version string.")
timestamp: datetime = Field(description="When the event occurred (UTC).")
publisher: str = Field(default=PUBLISHER, description="Originating service.")
attributes: dict[str, Any] = Field(
default_factory=dict,
description="Event-specific attributes; structure varies by event type.",
)
@classmethod
def new(cls, event_type: str, attributes: dict[str, Any] | None = None) -> "EventEnvelope":
"""Construct an envelope with a fresh UUID and current UTC timestamp."""
return cls(
id=str(uuid.uuid4()),
type=event_type,
timestamp=datetime.now(tz=timezone.utc),
publisher=PUBLISHER,
attributes=attributes or {},
)

View File

@@ -0,0 +1,139 @@
"""NATS JetStream publisher for state-hub lifecycle events.
Design:
- One process-wide publisher (`_Publisher` singleton).
- Connects lazily on first publish; reuses the connection thereafter.
- When ``NATS_URL`` is unset or empty, every publish is a logged no-op
so the state hub remains usable in environments without NATS.
- All publishes are fire-and-forget from the caller's perspective.
Failures are logged but never raise — losing a lifecycle event must
never break the API request that triggered it.
Stream + subject conventions live in ``docs/nats-event-subjects.md``.
Envelope schema lives in :mod:`api.events.envelope`.
"""
from __future__ import annotations
import asyncio
import logging
import os
from typing import TYPE_CHECKING
from api.events.envelope import EventEnvelope
if TYPE_CHECKING: # pragma: no cover — import-only for typing
from nats.aio.client import Client as NATSClient
from nats.js.client import JetStreamContext
logger = logging.getLogger("state_hub.events.nats")
_STREAM_NAME = "ACTIVITY_EVENTS"
_STREAM_SUBJECT_PATTERN = "org.>"
def _nats_url() -> str | None:
"""Resolve NATS_URL at call time so tests / configs can override it."""
url = os.environ.get("NATS_URL", "").strip()
return url or None
class _Publisher:
"""Singleton holding the live NATS connection + JetStream context."""
def __init__(self) -> None:
self._nc: "NATSClient | None" = None
self._js: "JetStreamContext | None" = None
self._connect_lock = asyncio.Lock()
self._ensured_stream = False
async def _connect(self, url: str) -> None:
# Imported inside the method so module import works without the dep.
import nats
import nats.js.api
async with self._connect_lock:
if self._nc is not None and self._nc.is_connected:
return
self._nc = await nats.connect(url, connect_timeout=2)
self._js = self._nc.jetstream()
logger.info("nats: connected to %s", url)
if not self._ensured_stream:
try:
await self._js.find_stream_name_by_subject("org.statehub.repo.registered")
self._ensured_stream = True
except Exception:
try:
await self._js.add_stream(
nats.js.api.StreamConfig(
name=_STREAM_NAME,
subjects=[_STREAM_SUBJECT_PATTERN],
)
)
logger.info("nats: created JetStream stream %r", _STREAM_NAME)
except Exception as exc: # pragma: no cover — defensive
logger.warning("nats: could not ensure stream %r: %s", _STREAM_NAME, exc)
self._ensured_stream = True
async def publish(self, subject: str, envelope: EventEnvelope) -> None:
url = _nats_url()
if url is None:
logger.debug("nats: NATS_URL unset — skipping publish %s (id=%s)", subject, envelope.id)
return
try:
if self._nc is None or not self._nc.is_connected:
await self._connect(url)
assert self._js is not None
payload = envelope.model_dump_json().encode()
ack = await self._js.publish(subject, payload)
logger.info(
"nats: published %s id=%s stream=%s seq=%s",
subject,
envelope.id,
getattr(ack, "stream", "?"),
getattr(ack, "seq", "?"),
)
except Exception as exc:
logger.warning("nats: publish failed %s id=%s err=%s", subject, envelope.id, exc)
async def shutdown(self) -> None:
if self._nc is not None:
try:
await self._nc.drain()
except Exception: # pragma: no cover — defensive
pass
self._nc = None
self._js = None
self._ensured_stream = False
_PUBLISHER = _Publisher()
async def publish_event(subject: str, envelope: EventEnvelope) -> None:
"""Publish ``envelope`` on ``subject``. Logs but never raises on failure.
No-op when ``NATS_URL`` is not configured.
"""
await _PUBLISHER.publish(subject, envelope)
def publish_event_sync(subject: str, envelope: EventEnvelope) -> None:
"""Fire-and-forget variant for sync callers (scripts, cron jobs).
Runs the publish in a short-lived event loop. Intended for one-shot CLI
callers that aren't already inside an async context. Server code should
prefer :func:`publish_event` with ``asyncio.create_task``.
"""
try:
asyncio.run(publish_event(subject, envelope))
except RuntimeError:
# Already inside a running loop — schedule and forget.
asyncio.get_event_loop().create_task(publish_event(subject, envelope))
async def shutdown_publisher() -> None:
"""Drain the NATS connection on app shutdown."""
await _PUBLISHER.shutdown()

View File

@@ -9,6 +9,7 @@ from starlette.requests import Request
from starlette.responses import Response as StarletteResponse from starlette.responses import Response as StarletteResponse
from api.database import engine from api.database import engine
from api.events import shutdown_publisher
from api.routers import decisions, extension_points, progress, state, tasks, technical_debt, topics, workstreams, workstream_dependencies from api.routers import decisions, extension_points, progress, state, tasks, technical_debt, topics, workstreams, workstream_dependencies
from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals, messages, capability_requests, tpsc from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals, messages, capability_requests, tpsc
from api.routers import token_events from api.routers import token_events
@@ -53,6 +54,7 @@ class ETagMiddleware(BaseHTTPMiddleware):
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
yield yield
await shutdown_publisher()
await engine.dispose() await engine.dispose()

View File

@@ -1,3 +1,4 @@
import asyncio
import logging import logging
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -10,6 +11,7 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session from api.database import get_session
from api.events import EventEnvelope, publish_event
from api.models.decision import Decision, DecisionStatus, DecisionType from api.models.decision import Decision, DecisionStatus, DecisionType
from api.models.progress_event import ProgressEvent from api.models.progress_event import ProgressEvent
from api.schemas.decision import DecisionCreate, DecisionRead, DecisionResolve, DecisionUpdate from api.schemas.decision import DecisionCreate, DecisionRead, DecisionResolve, DecisionUpdate
@@ -150,6 +152,20 @@ async def resolve_decision_action(
if body.write_log: if body.write_log:
await _write_project_log(decision, body.rationale, body.decided_by, session) await _write_project_log(decision, body.rationale, body.decided_by, session)
subject = "org.statehub.decision.resolved"
envelope = EventEnvelope.new(
subject,
attributes={
"decision_id": str(decision.id),
"title": decision.title,
"topic_id": str(decision.topic_id) if decision.topic_id else None,
"workstream_id": str(decision.workstream_id) if decision.workstream_id else None,
"decided_by": body.decided_by,
"rationale_snippet": (body.rationale or "")[:240],
},
)
asyncio.create_task(publish_event(subject, envelope))
return decision return decision

View File

@@ -1,3 +1,4 @@
import asyncio
import uuid import uuid
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
@@ -5,6 +6,7 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session from api.database import get_session
from api.events import EventEnvelope, publish_event
from api.models.domain import Domain from api.models.domain import Domain
from api.models.domain_goal import DomainGoal, DomainGoalStatus # noqa: F401 (DomainGoalStatus used in activate) from api.models.domain_goal import DomainGoal, DomainGoalStatus # noqa: F401 (DomainGoalStatus used in activate)
from api.schemas.domain_goal import DomainGoalCreate, DomainGoalRead, DomainGoalUpdate from api.schemas.domain_goal import DomainGoalCreate, DomainGoalRead, DomainGoalUpdate
@@ -96,6 +98,7 @@ async def activate_domain_goal(
goal = await session.get(DomainGoal, goal_id) goal = await session.get(DomainGoal, goal_id)
if goal is None: if goal is None:
raise HTTPException(status_code=404, detail="Domain goal not found") raise HTTPException(status_code=404, detail="Domain goal not found")
was_active = goal.status == DomainGoalStatus.active
# Supersede any other active goal for this domain # Supersede any other active goal for this domain
existing = await session.execute( existing = await session.execute(
@@ -105,10 +108,28 @@ async def activate_domain_goal(
DomainGoal.id != goal_id, DomainGoal.id != goal_id,
) )
) )
superseded_ids: list[str] = []
for old in existing.scalars().all(): for old in existing.scalars().all():
old.status = DomainGoalStatus.superseded old.status = DomainGoalStatus.superseded
superseded_ids.append(str(old.id))
goal.status = DomainGoalStatus.active goal.status = DomainGoalStatus.active
await session.commit() await session.commit()
await session.refresh(goal) await session.refresh(goal)
if not was_active:
domain = await session.get(Domain, goal.domain_id)
subject = "org.statehub.domain.goal.activated"
envelope = EventEnvelope.new(
subject,
attributes={
"goal_id": str(goal.id),
"domain_id": str(goal.domain_id),
"domain_slug": domain.slug if domain else None,
"title": goal.title,
"superseded_goal_ids": superseded_ids,
},
)
asyncio.create_task(publish_event(subject, envelope))
return goal return goal

View File

@@ -16,6 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from api.config import settings from api.config import settings
from api.database import get_session from api.database import get_session
from api.events import EventEnvelope, publish_event
from api.doi_engine import ( from api.doi_engine import (
compute_fingerprint, compute_fingerprint,
evaluate as _doi_evaluate, evaluate as _doi_evaluate,
@@ -94,6 +95,20 @@ async def register_repo(
session.add(repo) session.add(repo)
await session.commit() await session.commit()
await session.refresh(repo) await session.refresh(repo)
subject = "org.statehub.repo.registered"
envelope = EventEnvelope.new(
subject,
attributes={
"repo_id": str(repo.id),
"repo_slug": repo.slug,
"domain_slug": body.domain_slug,
"remote_url": repo.remote_url,
"local_path": repo.local_path,
},
)
asyncio.create_task(publish_event(subject, envelope))
return repo return repo

View File

@@ -1,3 +1,4 @@
import asyncio
import uuid import uuid
import socket import socket
import time import time
@@ -9,6 +10,7 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session from api.database import get_session
from api.events import EventEnvelope, publish_event
from api.models.managed_repo import ManagedRepo from api.models.managed_repo import ManagedRepo
from api.models.workstream import Workstream from api.models.workstream import Workstream
from api.schemas.workstream import ( from api.schemas.workstream import (
@@ -168,10 +170,27 @@ async def update_workstream(
ws = await session.get(Workstream, workstream_id) ws = await session.get(Workstream, workstream_id)
if ws is None: if ws is None:
raise HTTPException(status_code=404, detail="Workstream not found") raise HTTPException(status_code=404, detail="Workstream not found")
prev_status = ws.status
for field, value in body.model_dump(exclude_unset=True).items(): for field, value in body.model_dump(exclude_unset=True).items():
setattr(ws, field, value) setattr(ws, field, value)
await session.commit() await session.commit()
await session.refresh(ws) await session.refresh(ws)
if prev_status != "completed" and ws.status == "completed":
subject = "org.statehub.workstream.completed"
envelope = EventEnvelope.new(
subject,
attributes={
"workstream_id": str(ws.id),
"slug": ws.slug,
"title": ws.title,
"topic_id": str(ws.topic_id),
"repo_id": str(ws.repo_id) if ws.repo_id else None,
"repo_goal_id": str(ws.repo_goal_id) if ws.repo_goal_id else None,
},
)
asyncio.create_task(publish_event(subject, envelope))
return ws return ws

View File

@@ -0,0 +1,151 @@
# State Hub → activity-core Delegation Protocol
> CUST-WP-0040 T05. Cross-reference:
> [`docs/nats-event-subjects.md`](nats-event-subjects.md),
> [`docs/cron-migration.md`](cron-migration.md), and activity-core's
> `docs/adr/adr-001-event-bridge-architecture.md`.
## TL;DR
The state hub is a **read model** for cross-domain state. It is not a
task factory. Maintenance automations that *create new work in response
to state transitions* belong in activity-core as `ActivityDefinition`
files. The state hub's only job in that flow is to **publish lifecycle
events** on NATS JetStream so activity-core can react.
```
NATS JetStream
subject: org.statehub.>
stream: ACTIVITY_EVENTS
┌──────────────────────┐
POST /repos/ │ │
PATCH /workstreams/* ─────publish───▶ │ │ ───consume───▶ activity-core
POST /decisions/*/resolve │ │ EventRouter
POST /domain-goals/*/activate │ │ │
scripts/cleanup_stale_tasks.py │ │ ▼
└──────────────────────┘ RunActivityWorkflow
the-custodian/state-hub (creates tasks in
issue-core, etc.)
```
## Why delegate?
| Concern | Living in the state hub today | Lives in activity-core after migration |
| ---------------------------------------- | ----------------------------- | ----------------------------------------------------------- |
| "When should this maintenance run?" | cron/systemd timers | `ActivityDefinition.trigger` (cron + event triggers) |
| "What rule decides whether to act?" | hard-coded in the script | `ActivityDefinition.rule.when` expressions |
| "What task / side-effect should we run?" | hard-coded in the script | `ActivityDefinition.instruction` (shell / workflow / etc.) |
| "Where do we audit what fired?" | journalctl + ad hoc logs | activity-core history + Temporal workflow runs |
| "How is it changed safely?" | edit Python + redeploy hub | edit YAML in the repo, PR-reviewable, hot-reloadable |
Concentrating maintenance logic in declarative `ActivityDefinition`
files makes the rules **auditable**, **testable**, and **modifiable
without redeploying the state hub**.
## Published lifecycle events (v1.0)
Authoritative list and attributes live in
[`docs/nats-event-subjects.md`](nats-event-subjects.md). At v1.0 the
state hub publishes:
| Subject | Trigger site (file:fn) |
| ------------------------------------ | --------------------------------------------------------------- |
| `org.statehub.repo.registered` | `api/routers/repos.py:register_repo` |
| `org.statehub.workstream.completed` | `api/routers/workstreams.py:update_workstream` (on transition) |
| `org.statehub.decision.resolved` | `api/routers/decisions.py:resolve_decision_action` |
| `org.statehub.domain.goal.activated` | `api/routers/domain_goals.py:activate_domain_goal` |
| `org.statehub.task.stale` | `scripts/cleanup_stale_tasks.py` (per cancelled task) |
All events use the shared `EventEnvelope` schema (`api/events/envelope.py`)
and are published via `publish_event(subject, envelope)`. Publishing is
fire-and-forget: failures are logged but **never break the API request
that triggered them**, and the publisher no-ops when `NATS_URL` is
unset.
## What stays in the state hub
- DB schema + Alembic migrations
- API endpoints (CRUD + status transitions + read-model queries)
- MCP tools (read + sanctioned writes: `resolve_decision`,
`add_progress_event`, `get_next_steps`)
- The consistency engine (`scripts/consistency_check.py`) — it owns
ADR-001 reconciliation between workplan files and the DB.
- The `cleanup_stale_tasks.py` *script* (not its schedule) — it owns
the lifecycle rule for cancelling orphaned tasks.
## What moves to activity-core
- The *schedule* for the consistency sweep (`*/15 * * * *`) →
`the-custodian.state-hub-consistency-sweep` ActivityDefinition.
- The *schedule* for stale-task cleanup (`0 3 * * *`) →
`the-custodian.state-hub-stale-task-cleanup` ActivityDefinition.
- Any future "when X happens, create a task" logic. The state hub must
**not** add such rules to its routers — it publishes the event and
the rule lives in activity-core.
See [`docs/cron-migration.md`](cron-migration.md) for the
ActivityDefinition drafts and cutover plan.
## What must never happen
- **State hub writes directly to activity-core's DB.** All
communication is via NATS events.
- **State hub creates issue-core / Temporal tasks itself.** That is
activity-core's job.
- **Routers publish before committing.** Always publish after
`await session.commit()` succeeds. (Otherwise a transaction rollback
would still leak an event.)
- **A publish failure breaks the API response.** The publisher logs and
swallows; lost events are recovered by activity-core re-reading state
on next sweep, not by the API retrying.
## Operational checklist — migrating a cron to an ActivityDefinition
1. Identify the cron's current side-effects. If any of them
*create work* (a task, an issue, a ticket), it is a delegation
candidate. Pure consistency reconciliation can stay as a shell-cron
for now if simpler.
2. Decide the trigger: keep it as `cron`, or upgrade it to `event` by
first identifying / publishing the state hub lifecycle event the
cron is effectively polling for.
3. Add a row to [`docs/nats-event-subjects.md`](nats-event-subjects.md)
if a new event type is being introduced.
4. Wire `publish_event(...)` at the transition site in the appropriate
router. Verify with `nats sub 'org.statehub.>'`.
5. Land the `ActivityDefinition` in activity-core; enable it in
staging.
6. Run both old cron and new ActivityDefinition in parallel for one
week. Both side-effects must be idempotent for this to be safe — if
they aren't, fix that first.
7. Disable the old cron / systemd timer, archive the unit files.
8. Update [`SCOPE.md`](../../SCOPE.md) "Often used with" to mention the
activity-core handoff if a new event type was added.
## Bootstrap and partial-availability behaviour
- **No NATS configured (`NATS_URL` unset)**: publisher is a logged
no-op. The state hub remains fully functional. Useful for dev
environments and `make test`.
- **NATS reachable but stream missing**: publisher creates the
`ACTIVITY_EVENTS` stream with subject filter `org.>` on first
publish, so the state hub can come up before activity-core. In
production both should target the same NATS cluster.
- **activity-core down**: events queue in JetStream and are replayed
when the consumer reconnects. The state hub is unaffected.
- **State hub down**: scheduled ActivityDefinitions in activity-core
still fire; ones that need `state-hub.health` context will skip
cleanly per their rule.
## Verifying end-to-end
```bash
# Subscribe to lifecycle events
nats sub 'org.statehub.>'
# Trigger an event (in another terminal)
curl -X POST http://127.0.0.1:8000/repos/<slug>/sync
# Observe the envelope on the subscriber. Sample shape:
# {"id":"...","type":"org.statehub.workstream.completed","version":"1.0",
# "timestamp":"...","publisher":"the-custodian/state-hub","attributes":{...}}
```

View File

@@ -0,0 +1,175 @@
# State Hub Cron → activity-core ActivityDefinition Migration (Design Stub)
> CUST-WP-0040 T04. **Design stub — not yet implemented.**
> Migration depends on activity-core WP-0003 reaching the
> "ActivityDefinition file ingestion + cron trigger executor" milestone.
The state hub currently runs two recurring maintenance jobs and one
per-repo event hook. Once activity-core is ready, each becomes an
ActivityDefinition file checked into the appropriate repo. The state hub
keeps the underlying scripts; only the *scheduling* moves.
---
## 1. Inventory of current maintenance automations
| # | Source | Trigger today | Script invoked | What it does |
| - | ------------------- | -------------------------------------------------------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------- |
| 1 | systemd user timer | every 15 min | `scripts/consistency_check.py --remote --all` | Pull every registered repo, reconcile workplan files ↔ DB, run C-15 writeback + C-16 pull gate |
| 2 | manual / daily cron | `make cleanup-stale` (suggested `0 3 * * *`) | `scripts/cleanup_stale_tasks.py` | Cancel tasks still open in completed/archived workstreams; emits `org.statehub.task.stale` |
| 3 | git post-commit | every commit in a registered repo | `make fix-consistency REPO=<slug>` | Per-repo workplan ↔ DB sync immediately after a commit |
Honourable mentions (not currently scheduled, on-demand only — listed for
completeness so they don't get mistakenly picked up):
- `scripts/ingest_sbom.py` — invoked via `make ingest-sbom REPO=<slug>`.
- `scripts/ingest_capabilities.py` — invoked via `make ingest-capabilities[-all]`.
- `scripts/check_doi.py` — invoked via `make check-doi[-all]`.
- `scripts/validate_repo_adr.py` — invoked manually for canon promotion.
- `scripts/ingest_tpsc.py` — invoked via `make ingest-tpsc[-all]`.
These are **not in scope** for cron migration — they remain on-demand
operator/CI commands. They become candidates only if we later decide to
run them on a schedule.
---
## 2. Target ActivityDefinitions
### A. `state-hub-consistency-sweep`
```yaml
# activity-definitions/the-custodian/state-hub-consistency-sweep.yaml
id: the-custodian.state-hub-consistency-sweep
description: |
Sweep all registered repos: pull, reconcile workplan files ↔ DB,
apply writeback (C-15), respect pull gate (C-16). Mirrors the
existing custodian-sync systemd timer.
trigger:
trigger_type: cron
cron_expression: "*/15 * * * *"
timezone: UTC
misfire_policy: skip # if a prior run is still active, skip
context:
- kind: http_get # confirm state-hub API is reachable
url: http://127.0.0.1:8000/state/health
bind: hub_health
rule:
when:
- "hub_health.status == 'ok'"
instruction:
kind: shell
cmd: >-
cd /home/worsch/the-custodian/state-hub &&
.venv/bin/python scripts/consistency_check.py --remote --all --max-seconds 300
on_failure: log_and_continue # warn-only sweeps must not page on transient failures
```
Notes:
- Replaces the `custodian-sync.service` + `custodian-sync.timer` pair.
- Lock semantics (`/tmp/custodian-consistency-remote-all.lock`) stay in
the script — activity-core just sets the cadence.
- Once active, `infra/README.md` is updated to instruct users to delete
the systemd timer.
### B. `state-hub-stale-task-cleanup`
```yaml
# activity-definitions/the-custodian/state-hub-stale-task-cleanup.yaml
id: the-custodian.state-hub-stale-task-cleanup
description: |
Daily sweep that cancels tasks still 'todo|in_progress|blocked' inside
completed or archived workstreams. Each cancellation also emits
org.statehub.task.stale on NATS for downstream reaction.
trigger:
trigger_type: cron
cron_expression: "0 3 * * *"
timezone: UTC
instruction:
kind: shell
cmd: >-
cd /home/worsch/the-custodian/state-hub &&
.venv/bin/python scripts/cleanup_stale_tasks.py
```
Notes:
- Replaces the documented (`Cron example: 0 3 * * * …`) daily run.
- The script already emits NATS events (see CUST-WP-0040 T03), so
downstream ActivityDefinitions can react per-task without a second pass.
### C. Per-commit consistency sync (currently a git hook)
The git `post-commit` hook installed by `state-hub/scripts/install_hooks.sh`
is **event-driven, not cron-based**. Migrating it to activity-core would
require a `repo.commit.pushed` event channel that doesn't exist yet.
Recommendation: **keep the git hook as-is for now**. Revisit once an
event source (e.g. Gitea webhook fed into NATS) is available, at which
point an event-triggered ActivityDefinition can replace it cleanly:
```yaml
trigger:
trigger_type: event
event_type: org.repo.commit.pushed
filters:
repo_slug: "*"
```
---
## 3. Required context queries
Both A and B want to confirm the state hub is reachable before running.
A reusable context source should be added to activity-core for this:
- `state-hub.health``GET /state/health``{status, db, ...}`
- (optional) `state-hub.repos``GET /repos/?status=active` for the
sweep's per-repo branching, if we later split A into one
ActivityDefinition per repo.
These belong to the state-hub adapter referenced in the workplan's
out-of-scope note ("/sbom/status context query endpoint" etc.).
---
## 4. Blockers / sequencing
| Blocker | Owner | Where it lands |
| ------------------------------------------------------------------------- | -------------- | -------------------------- |
| activity-core ActivityDefinition file ingestion + cron executor (WP-0003) | activity-core | activity-core/`src/...` |
| activity-core shell instruction kind with on_failure semantics | activity-core | activity-core/`src/...` |
| state-hub adapter exposing `state-hub.health` as a context source | activity-core | activity-core/adapters/ |
Until these land, the state hub continues to schedule jobs via systemd
timer + cron entries.
---
## 5. Cutover plan (when ready)
1. Land ActivityDefinitions A + B in activity-core.
2. Enable them in staging; verify they fire on schedule and produce the
same DB / NATS effects as the current cron entries.
3. Run both in parallel for one week (cron + ActivityDefinition). The
scripts are idempotent — duplicate runs are no-ops on a clean state.
4. Disable the systemd timer:
`systemctl --user disable --now custodian-sync.timer`
5. Remove the cleanup-stale cron entry from `crontab -e`.
6. Update `infra/README.md` to point at the ActivityDefinitions and
archive the systemd unit files.
7. Per-commit hook stays until a `repo.commit.pushed` event exists.
---
## 6. Open questions
- **Locking**: should activity-core wrap shell instructions with a
process lock (today the script self-locks via `/tmp/...`)? If yes, the
state-hub script's lock can be removed.
- **Failure surfacing**: today systemd journals capture stderr. Where
does an ActivityDefinition's shell stderr go? (logs ? activity
history ?) — needs activity-core docs before cutover.
- **Per-repo split**: do we split A into one ActivityDefinition per
registered repo (so failures don't poison the sweep), or keep the
monolithic `--all` mode? The latter is simpler and matches today's
behaviour; the former gives better observability.

View File

@@ -0,0 +1,98 @@
# NATS Event Subjects — State Hub
> Part of CUST-WP-0040. Cross-reference: activity-core's
> `event-types/` registry and ADR-001 (event bridge architecture).
The state hub publishes lifecycle events to NATS JetStream so that
activity-core can drive maintenance and reaction automation declaratively,
via `ActivityDefinition` rules — rather than the state hub creating tasks
itself.
This document is the authoritative subject naming convention for state hub
events. When adding a new event, add a row to the table below first and
keep the activity-core `event-types/` registry in sync.
---
## Naming convention
```
org.{producer}.{noun}.{verb}[.{qualifier}]
```
- **`org`** — top-level namespace shared with activity-core (`org.>`)
- **`{producer}`** — the publisher subsystem; the state hub uses `statehub`
- **`{noun}`** — entity the event is about (`repo`, `workstream`, `task`, …)
- **`{verb}`** — past-tense state transition (`registered`, `completed`, `resolved`, …)
- **`{qualifier}`** — optional refinement (e.g. `goal.activated`)
All segments are lowercase ASCII. No camelCase, no dashes inside segments.
### Why a `statehub` namespace?
Activity-core listens to `activity.>` for its internal task lifecycle and
`org.>` for org-wide lifecycle events. Multiple publishers will eventually
share `org.>` (e.g. railiance, kaizen). The `{producer}` segment keeps
those publishers from colliding on the same `{noun}.{verb}` shape.
---
## Published subjects (v1.0)
| Subject | When | Required attributes |
| ------------------------------------ | ------------------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------- |
| `org.statehub.repo.registered` | A new repo is registered via `POST /repos/` | `repo_id`, `repo_slug`, `domain_slug`, `remote_url?`, `local_path?` |
| `org.statehub.workstream.completed` | A workstream transitions to status `completed` | `workstream_id`, `slug`, `title`, `topic_id`, `repo_id?`, `repo_goal_id?` |
| `org.statehub.decision.resolved` | A decision is resolved via `POST /decisions/{id}/resolve` | `decision_id`, `title`, `topic_id?`, `workstream_id?`, `decided_by`, `rationale_snippet` |
| `org.statehub.domain.goal.activated` | A domain goal transitions to `active` | `goal_id`, `domain_id`, `domain_slug`, `title`, `superseded_goal_ids[]` |
| `org.statehub.task.stale` | `scripts/cleanup_stale_tasks.py` cancels an out-of-date task | `task_id`, `workstream_id`, `workstream_status`, `task_title`, `task_status_before` |
### Envelope shape
Each message body conforms to the `EventEnvelope` schema in
`api/events/envelope.py`, mirrored from
`activity-core/src/activity_core/models.py`:
```json
{
"id": "uuid v4 — stable, used for at-least-once dedup",
"type": "org.statehub.repo.registered",
"version": "1.0",
"timestamp": "2026-05-17T14:00:00Z",
"publisher": "the-custodian/state-hub",
"attributes": { "...": "event-specific" }
}
```
`type` matches the subject. `publisher` is always
`the-custodian/state-hub` for events emitted from this repo.
---
## Stream
State hub events are published into the **`ACTIVITY_EVENTS`** JetStream
(subject filter `org.>`). The stream is owned by activity-core; the state
hub will auto-create it on first publish if it does not exist, so the
publisher works in dev environments without bootstrapping activity-core
first. In production both services point at the same NATS cluster and
activity-core's `EventRouter` consumes the stream durably.
---
## Adding a new event
1. Pick a subject following the convention above.
2. Add a row to the table in this file (subject, trigger, attributes).
3. Add a matching `event-types/` entry in activity-core.
4. Wire `publish_event(subject, EventEnvelope.new(subject, attributes))`
at the site of the state transition (inside the same DB transaction
only after `await session.commit()` — never publish optimistically).
5. Verify locally: run `nats sub 'org.statehub.>'` while triggering the
transition.
## Versioning
`version` is bumped only when an attribute is removed or its semantics
change. Adding optional attributes does **not** require a version bump.
Activity-core consumers must tolerate unknown attribute keys.

View File

@@ -17,6 +17,7 @@ dependencies = [
"psycopg2-binary>=2.9.0", "psycopg2-binary>=2.9.0",
"llm-connect", "llm-connect",
"pyyaml>=6.0.3", "pyyaml>=6.0.3",
"nats-py>=2.7.0",
] ]
[project.scripts] [project.scripts]

View File

@@ -11,12 +11,24 @@ Exit codes:
1 — API unreachable or unexpected error 1 — API unreachable or unexpected error
""" """
import asyncio
import json import json
import os
import sys import sys
import urllib.error import urllib.error
import urllib.request import urllib.request
from datetime import datetime, timezone from datetime import datetime, timezone
# Make the api package importable when running as `python scripts/cleanup_stale_tasks.py`
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
try:
from api.events import EventEnvelope, publish_event, shutdown_publisher
except Exception: # pragma: no cover — event publishing is optional
EventEnvelope = None # type: ignore[assignment]
publish_event = None # type: ignore[assignment]
shutdown_publisher = None # type: ignore[assignment]
API = "http://127.0.0.1:8000" API = "http://127.0.0.1:8000"
STALE_STATUSES = {"todo", "in_progress", "blocked"} STALE_STATUSES = {"todo", "in_progress", "blocked"}
CLOSED_WS_STATUS = {"completed", "archived"} CLOSED_WS_STATUS = {"completed", "archived"}
@@ -85,6 +97,7 @@ def main() -> int:
cancelled = [] cancelled = []
errors = [] errors = []
nats_events: list[tuple[str, "EventEnvelope"]] = []
for t in stale: for t in stale:
ws = closed_ws[t["workstream_id"]] ws = closed_ws[t["workstream_id"]]
@@ -100,10 +113,35 @@ def main() -> int:
) )
cancelled.append(t) cancelled.append(t)
print(f" cancelled [{t['priority']:8}] {t['title'][:70]}") print(f" cancelled [{t['priority']:8}] {t['title'][:70]}")
if EventEnvelope is not None:
subject = "org.statehub.task.stale"
nats_events.append((
subject,
EventEnvelope.new(
subject,
attributes={
"task_id": t["id"],
"workstream_id": t["workstream_id"],
"workstream_status": ws["status"],
"task_title": t["title"],
"task_status_before": t["status"],
},
),
))
except Exception as e: except Exception as e:
errors.append((t, str(e))) errors.append((t, str(e)))
print(f" ERROR {t['title'][:60]}{e}", file=sys.stderr) print(f" ERROR {t['title'][:60]}{e}", file=sys.stderr)
if nats_events and publish_event is not None and shutdown_publisher is not None:
async def _flush_events() -> None:
for subject, env in nats_events:
await publish_event(subject, env)
await shutdown_publisher()
try:
asyncio.run(_flush_events())
except Exception as e: # pragma: no cover — publishing is best-effort
print(f"[cleanup-stale] WARNING: NATS publish failed — {e}", file=sys.stderr)
# Emit a single progress event summarising the run # Emit a single progress event summarising the run
if cancelled: if cancelled:
by_ws: dict[str, list] = {} by_ws: dict[str, list] = {}

11
state-hub/uv.lock generated
View File

@@ -822,6 +822,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a4/8e/469e5a4a2f5855992e425f3cb33804cc07bf18d48f2db061aec61ce50270/more_itertools-10.8.0-py3-none-any.whl", hash = "sha256:52d4362373dcf7c52546bc4af9a86ee7c4579df9a8dc268be0a2f949d376cc9b", size = 69667 }, { url = "https://files.pythonhosted.org/packages/a4/8e/469e5a4a2f5855992e425f3cb33804cc07bf18d48f2db061aec61ce50270/more_itertools-10.8.0-py3-none-any.whl", hash = "sha256:52d4362373dcf7c52546bc4af9a86ee7c4579df9a8dc268be0a2f949d376cc9b", size = 69667 },
] ]
[[package]]
name = "nats-py"
version = "2.14.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c3/f8/b956c4621ba88748ed707c52e69f95b7a50c8914e750edca59a5bef84a76/nats_py-2.14.0.tar.gz", hash = "sha256:4ed02cb8e3b55c68074a063aa2687087115d805d1513297da90cb2068fb07bed", size = 120751 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f9/39/0e87753df1072254bac190b33ed34b264f28f6aa9bea0f01b7e818071756/nats_py-2.14.0-py3-none-any.whl", hash = "sha256:4116f5d2233ce16e63c3d5538fa40a5e207f75fcf42a741773929ddf1e29d19d", size = 82259 },
]
[[package]] [[package]]
name = "openapi-pydantic" name = "openapi-pydantic"
version = "0.5.1" version = "0.5.1"
@@ -1435,6 +1444,7 @@ dependencies = [
{ name = "fastmcp" }, { name = "fastmcp" },
{ name = "httpx" }, { name = "httpx" },
{ name = "llm-connect" }, { name = "llm-connect" },
{ name = "nats-py" },
{ name = "psycopg2-binary" }, { name = "psycopg2-binary" },
{ name = "pydantic" }, { name = "pydantic" },
{ name = "pydantic-settings" }, { name = "pydantic-settings" },
@@ -1459,6 +1469,7 @@ requires-dist = [
{ name = "fastmcp", specifier = ">=2.0.0" }, { name = "fastmcp", specifier = ">=2.0.0" },
{ name = "httpx", specifier = ">=0.28.0" }, { name = "httpx", specifier = ">=0.28.0" },
{ name = "llm-connect", editable = "../../llm-connect" }, { name = "llm-connect", editable = "../../llm-connect" },
{ name = "nats-py", specifier = ">=2.7.0" },
{ name = "psycopg2-binary", specifier = ">=2.9.0" }, { name = "psycopg2-binary", specifier = ">=2.9.0" },
{ name = "pydantic", specifier = ">=2.10.0" }, { name = "pydantic", specifier = ">=2.10.0" },
{ name = "pydantic-settings", specifier = ">=2.7.0" }, { name = "pydantic-settings", specifier = ">=2.7.0" },

View File

@@ -3,33 +3,33 @@ id: CUST-WP-0040
type: workplan type: workplan
domain: custodian domain: custodian
repo: the-custodian repo: the-custodian
status: active status: completed
state_hub_workstream_id: d8ac100b-a844-46a5-9684-415df0d32539 state_hub_workstream_id: d8ac100b-a844-46a5-9684-415df0d32539
tasks: tasks:
- id: T01 - id: T01
title: Connect state hub event publisher to NATS JetStream title: Connect state hub event publisher to NATS JetStream
state_hub_task_id: c4bfa299-54dd-4ede-b5fa-9ae27dce5b2c state_hub_task_id: c4bfa299-54dd-4ede-b5fa-9ae27dce5b2c
status: todo status: done
- id: T02 - id: T02
title: Define NATS subject schema for state hub lifecycle events title: Define NATS subject schema for state hub lifecycle events
state_hub_task_id: 2ae236a4-8a16-4998-92dc-83bc48378d3d state_hub_task_id: 2ae236a4-8a16-4998-92dc-83bc48378d3d
status: todo status: done
- id: T03 - id: T03
title: Implement state hub lifecycle events as NATS EventEnvelopes title: Implement state hub lifecycle events as NATS EventEnvelopes
state_hub_task_id: ccb5a3fb-d5e5-4781-9e5d-552ad09477f5 state_hub_task_id: ccb5a3fb-d5e5-4781-9e5d-552ad09477f5
status: todo status: done
- id: T04 - id: T04
title: Migrate state hub maintenance crons to activity-core ActivityDefinitions (design stub) title: Migrate state hub maintenance crons to activity-core ActivityDefinitions (design stub)
state_hub_task_id: 933b2a80-cbd6-436f-b092-c39dc6f1c9c4 state_hub_task_id: 933b2a80-cbd6-436f-b092-c39dc6f1c9c4
status: todo status: done
- id: T05 - id: T05
title: Document state hub → activity-core delegation protocol title: Document state hub → activity-core delegation protocol
state_hub_task_id: b261d4b8-4039-4610-aaad-eb45bf3a51de state_hub_task_id: b261d4b8-4039-4610-aaad-eb45bf3a51de
status: todo status: done
- id: T06 - id: T06
title: Update SCOPE.md to reflect activity-core delegation title: Update SCOPE.md to reflect activity-core delegation
state_hub_task_id: 356682e6-e608-4f58-b418-cdef2b9435f2 state_hub_task_id: 356682e6-e608-4f58-b418-cdef2b9435f2
status: todo status: done
created: "2026-05-14" created: "2026-05-14"
--- ---
@@ -168,3 +168,8 @@ T06 (independent — SCOPE.md update)
- v0.1 (2026-05-14): Stub created by activity-core agent during WP-0003 planning. - v0.1 (2026-05-14): Stub created by activity-core agent during WP-0003 planning.
Local agent to flesh out and implement. Local agent to flesh out and implement.
- v0.2 (2026-05-17): All tasks complete. NATS publisher landed at
`state-hub/api/events/`, lifecycle events wired in repos/workstreams/
decisions/domain_goals routers + cleanup_stale_tasks.py. Subject
schema, cron-migration design stub, delegation protocol docs, and
SCOPE.md updates committed.