diff --git a/SCOPE.md b/SCOPE.md index cae9530..3f63a1e 100644 --- a/SCOPE.md +++ b/SCOPE.md @@ -26,6 +26,7 @@ The Custodian is both an **operational system** (State Hub: PostgreSQL + FastAPI - Memory: append-only episodic archive (working notes + immutable event logs) - Agent runtime scaffolding: policies, kaizen agent copies, tool adapters - Cross-domain coordination: dependency tracking, human-intervention flags, next-steps suggestions +- Publishing lifecycle events on NATS JetStream (`org.statehub.>`) so activity-core can react via declarative ActivityDefinitions --- @@ -35,6 +36,7 @@ The Custodian is both an **operational system** (State Hub: PostgreSQL + FastAPI - Financial/legal transactions or external publication - Storing plaintext credentials - Direct writes to `canon/` without a human-approved review gate +- Maintenance task *creation* in response to lifecycle events — that responsibility lives in activity-core (see `state-hub/docs/activity-core-delegation.md`). The state hub remains a **read model**, not a task factory. --- @@ -68,8 +70,8 @@ The Custodian is both an **operational system** (State Hub: PostgreSQL + FastAPI ## How It Fits - Upstream dependencies: none (sits at the top of the dependency order) -- Downstream consumers: all six domains (railiance → markitect → coulomb.social → personhood/foerster → custodian) -- Often used with: kaizen-agentic (agent definitions), ops-bridge (remote tunnel connectivity), activity-core (task factory) +- Downstream consumers: all six domains (railiance → markitect → coulomb.social → personhood/foerster → custodian); **activity-core** consumes state hub lifecycle events on NATS subject `org.statehub.>` to drive maintenance ActivityDefinitions +- Often used with: kaizen-agentic (agent definitions), ops-bridge (remote tunnel connectivity), activity-core (task factory + event bridge) --- diff --git a/state-hub/api/events/__init__.py b/state-hub/api/events/__init__.py new file mode 100644 index 0000000..975bec7 --- /dev/null +++ b/state-hub/api/events/__init__.py @@ -0,0 +1,13 @@ +from api.events.envelope import EventEnvelope +from api.events.nats_publisher import ( + publish_event, + publish_event_sync, + shutdown_publisher, +) + +__all__ = [ + "EventEnvelope", + "publish_event", + "publish_event_sync", + "shutdown_publisher", +] diff --git a/state-hub/api/events/envelope.py b/state-hub/api/events/envelope.py new file mode 100644 index 0000000..99034ad --- /dev/null +++ b/state-hub/api/events/envelope.py @@ -0,0 +1,55 @@ +"""EventEnvelope — schema for state-hub lifecycle events published to NATS. + +Mirrors the EventEnvelope contract defined in activity-core +(`src/activity_core/models.py`). The state-hub publishes; activity-core +consumes and routes to ActivityDefinitions. + +Subject naming convention (see docs/nats-event-subjects.md): + + org.statehub.{noun}.{verb} + +Examples: + org.statehub.repo.registered + org.statehub.workstream.completed + org.statehub.decision.resolved + org.statehub.domain.goal.activated + org.statehub.task.stale +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from typing import Any + +from pydantic import BaseModel, Field + +PUBLISHER = "the-custodian/state-hub" + + +class EventEnvelope(BaseModel): + """Standard envelope shared with activity-core. Do not break compatibility. + + All inbound events on activity-core's side are normalised into this shape. + """ + + id: str = Field(description="UUID v4 — stable unique ID for deduplication.") + type: str = Field(description="Dot-namespaced event type, e.g. 'org.statehub.repo.registered'.") + version: str = Field(default="1.0", description="Schema version string.") + timestamp: datetime = Field(description="When the event occurred (UTC).") + publisher: str = Field(default=PUBLISHER, description="Originating service.") + attributes: dict[str, Any] = Field( + default_factory=dict, + description="Event-specific attributes; structure varies by event type.", + ) + + @classmethod + def new(cls, event_type: str, attributes: dict[str, Any] | None = None) -> "EventEnvelope": + """Construct an envelope with a fresh UUID and current UTC timestamp.""" + return cls( + id=str(uuid.uuid4()), + type=event_type, + timestamp=datetime.now(tz=timezone.utc), + publisher=PUBLISHER, + attributes=attributes or {}, + ) diff --git a/state-hub/api/events/nats_publisher.py b/state-hub/api/events/nats_publisher.py new file mode 100644 index 0000000..6f8b8f4 --- /dev/null +++ b/state-hub/api/events/nats_publisher.py @@ -0,0 +1,139 @@ +"""NATS JetStream publisher for state-hub lifecycle events. + +Design: + - One process-wide publisher (`_Publisher` singleton). + - Connects lazily on first publish; reuses the connection thereafter. + - When ``NATS_URL`` is unset or empty, every publish is a logged no-op + so the state hub remains usable in environments without NATS. + - All publishes are fire-and-forget from the caller's perspective. + Failures are logged but never raise — losing a lifecycle event must + never break the API request that triggered it. + +Stream + subject conventions live in ``docs/nats-event-subjects.md``. +Envelope schema lives in :mod:`api.events.envelope`. +""" + +from __future__ import annotations + +import asyncio +import logging +import os +from typing import TYPE_CHECKING + +from api.events.envelope import EventEnvelope + +if TYPE_CHECKING: # pragma: no cover — import-only for typing + from nats.aio.client import Client as NATSClient + from nats.js.client import JetStreamContext + +logger = logging.getLogger("state_hub.events.nats") + +_STREAM_NAME = "ACTIVITY_EVENTS" +_STREAM_SUBJECT_PATTERN = "org.>" + + +def _nats_url() -> str | None: + """Resolve NATS_URL at call time so tests / configs can override it.""" + url = os.environ.get("NATS_URL", "").strip() + return url or None + + +class _Publisher: + """Singleton holding the live NATS connection + JetStream context.""" + + def __init__(self) -> None: + self._nc: "NATSClient | None" = None + self._js: "JetStreamContext | None" = None + self._connect_lock = asyncio.Lock() + self._ensured_stream = False + + async def _connect(self, url: str) -> None: + # Imported inside the method so module import works without the dep. + import nats + import nats.js.api + + async with self._connect_lock: + if self._nc is not None and self._nc.is_connected: + return + self._nc = await nats.connect(url, connect_timeout=2) + self._js = self._nc.jetstream() + logger.info("nats: connected to %s", url) + + if not self._ensured_stream: + try: + await self._js.find_stream_name_by_subject("org.statehub.repo.registered") + self._ensured_stream = True + except Exception: + try: + await self._js.add_stream( + nats.js.api.StreamConfig( + name=_STREAM_NAME, + subjects=[_STREAM_SUBJECT_PATTERN], + ) + ) + logger.info("nats: created JetStream stream %r", _STREAM_NAME) + except Exception as exc: # pragma: no cover — defensive + logger.warning("nats: could not ensure stream %r: %s", _STREAM_NAME, exc) + self._ensured_stream = True + + async def publish(self, subject: str, envelope: EventEnvelope) -> None: + url = _nats_url() + if url is None: + logger.debug("nats: NATS_URL unset — skipping publish %s (id=%s)", subject, envelope.id) + return + + try: + if self._nc is None or not self._nc.is_connected: + await self._connect(url) + assert self._js is not None + payload = envelope.model_dump_json().encode() + ack = await self._js.publish(subject, payload) + logger.info( + "nats: published %s id=%s stream=%s seq=%s", + subject, + envelope.id, + getattr(ack, "stream", "?"), + getattr(ack, "seq", "?"), + ) + except Exception as exc: + logger.warning("nats: publish failed %s id=%s err=%s", subject, envelope.id, exc) + + async def shutdown(self) -> None: + if self._nc is not None: + try: + await self._nc.drain() + except Exception: # pragma: no cover — defensive + pass + self._nc = None + self._js = None + self._ensured_stream = False + + +_PUBLISHER = _Publisher() + + +async def publish_event(subject: str, envelope: EventEnvelope) -> None: + """Publish ``envelope`` on ``subject``. Logs but never raises on failure. + + No-op when ``NATS_URL`` is not configured. + """ + await _PUBLISHER.publish(subject, envelope) + + +def publish_event_sync(subject: str, envelope: EventEnvelope) -> None: + """Fire-and-forget variant for sync callers (scripts, cron jobs). + + Runs the publish in a short-lived event loop. Intended for one-shot CLI + callers that aren't already inside an async context. Server code should + prefer :func:`publish_event` with ``asyncio.create_task``. + """ + try: + asyncio.run(publish_event(subject, envelope)) + except RuntimeError: + # Already inside a running loop — schedule and forget. + asyncio.get_event_loop().create_task(publish_event(subject, envelope)) + + +async def shutdown_publisher() -> None: + """Drain the NATS connection on app shutdown.""" + await _PUBLISHER.shutdown() diff --git a/state-hub/api/main.py b/state-hub/api/main.py index d7cdffd..e1c368d 100644 --- a/state-hub/api/main.py +++ b/state-hub/api/main.py @@ -9,6 +9,7 @@ from starlette.requests import Request from starlette.responses import Response as StarletteResponse from api.database import engine +from api.events import shutdown_publisher from api.routers import decisions, extension_points, progress, state, tasks, technical_debt, topics, workstreams, workstream_dependencies from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals, messages, capability_requests, tpsc from api.routers import token_events @@ -53,6 +54,7 @@ class ETagMiddleware(BaseHTTPMiddleware): @asynccontextmanager async def lifespan(app: FastAPI): yield + await shutdown_publisher() await engine.dispose() diff --git a/state-hub/api/routers/decisions.py b/state-hub/api/routers/decisions.py index b216250..6ee0af6 100644 --- a/state-hub/api/routers/decisions.py +++ b/state-hub/api/routers/decisions.py @@ -1,3 +1,4 @@ +import asyncio import logging import uuid from datetime import datetime, timezone @@ -10,6 +11,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from api.database import get_session +from api.events import EventEnvelope, publish_event from api.models.decision import Decision, DecisionStatus, DecisionType from api.models.progress_event import ProgressEvent from api.schemas.decision import DecisionCreate, DecisionRead, DecisionResolve, DecisionUpdate @@ -150,6 +152,20 @@ async def resolve_decision_action( if body.write_log: await _write_project_log(decision, body.rationale, body.decided_by, session) + subject = "org.statehub.decision.resolved" + envelope = EventEnvelope.new( + subject, + attributes={ + "decision_id": str(decision.id), + "title": decision.title, + "topic_id": str(decision.topic_id) if decision.topic_id else None, + "workstream_id": str(decision.workstream_id) if decision.workstream_id else None, + "decided_by": body.decided_by, + "rationale_snippet": (body.rationale or "")[:240], + }, + ) + asyncio.create_task(publish_event(subject, envelope)) + return decision diff --git a/state-hub/api/routers/domain_goals.py b/state-hub/api/routers/domain_goals.py index ead673e..2a1fcb3 100644 --- a/state-hub/api/routers/domain_goals.py +++ b/state-hub/api/routers/domain_goals.py @@ -1,3 +1,4 @@ +import asyncio import uuid from fastapi import APIRouter, Depends, HTTPException, status @@ -5,6 +6,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from api.database import get_session +from api.events import EventEnvelope, publish_event from api.models.domain import Domain from api.models.domain_goal import DomainGoal, DomainGoalStatus # noqa: F401 (DomainGoalStatus used in activate) from api.schemas.domain_goal import DomainGoalCreate, DomainGoalRead, DomainGoalUpdate @@ -96,6 +98,7 @@ async def activate_domain_goal( goal = await session.get(DomainGoal, goal_id) if goal is None: raise HTTPException(status_code=404, detail="Domain goal not found") + was_active = goal.status == DomainGoalStatus.active # Supersede any other active goal for this domain existing = await session.execute( @@ -105,10 +108,28 @@ async def activate_domain_goal( DomainGoal.id != goal_id, ) ) + superseded_ids: list[str] = [] for old in existing.scalars().all(): old.status = DomainGoalStatus.superseded + superseded_ids.append(str(old.id)) goal.status = DomainGoalStatus.active await session.commit() await session.refresh(goal) + + if not was_active: + domain = await session.get(Domain, goal.domain_id) + subject = "org.statehub.domain.goal.activated" + envelope = EventEnvelope.new( + subject, + attributes={ + "goal_id": str(goal.id), + "domain_id": str(goal.domain_id), + "domain_slug": domain.slug if domain else None, + "title": goal.title, + "superseded_goal_ids": superseded_ids, + }, + ) + asyncio.create_task(publish_event(subject, envelope)) + return goal diff --git a/state-hub/api/routers/repos.py b/state-hub/api/routers/repos.py index f4356dd..04b5e68 100644 --- a/state-hub/api/routers/repos.py +++ b/state-hub/api/routers/repos.py @@ -16,6 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from api.config import settings from api.database import get_session +from api.events import EventEnvelope, publish_event from api.doi_engine import ( compute_fingerprint, evaluate as _doi_evaluate, @@ -94,6 +95,20 @@ async def register_repo( session.add(repo) await session.commit() await session.refresh(repo) + + subject = "org.statehub.repo.registered" + envelope = EventEnvelope.new( + subject, + attributes={ + "repo_id": str(repo.id), + "repo_slug": repo.slug, + "domain_slug": body.domain_slug, + "remote_url": repo.remote_url, + "local_path": repo.local_path, + }, + ) + asyncio.create_task(publish_event(subject, envelope)) + return repo diff --git a/state-hub/api/routers/workstreams.py b/state-hub/api/routers/workstreams.py index 89dffba..64868e9 100644 --- a/state-hub/api/routers/workstreams.py +++ b/state-hub/api/routers/workstreams.py @@ -1,3 +1,4 @@ +import asyncio import uuid import socket import time @@ -9,6 +10,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from api.database import get_session +from api.events import EventEnvelope, publish_event from api.models.managed_repo import ManagedRepo from api.models.workstream import Workstream from api.schemas.workstream import ( @@ -168,10 +170,27 @@ async def update_workstream( ws = await session.get(Workstream, workstream_id) if ws is None: raise HTTPException(status_code=404, detail="Workstream not found") + prev_status = ws.status for field, value in body.model_dump(exclude_unset=True).items(): setattr(ws, field, value) await session.commit() await session.refresh(ws) + + if prev_status != "completed" and ws.status == "completed": + subject = "org.statehub.workstream.completed" + envelope = EventEnvelope.new( + subject, + attributes={ + "workstream_id": str(ws.id), + "slug": ws.slug, + "title": ws.title, + "topic_id": str(ws.topic_id), + "repo_id": str(ws.repo_id) if ws.repo_id else None, + "repo_goal_id": str(ws.repo_goal_id) if ws.repo_goal_id else None, + }, + ) + asyncio.create_task(publish_event(subject, envelope)) + return ws diff --git a/state-hub/docs/activity-core-delegation.md b/state-hub/docs/activity-core-delegation.md new file mode 100644 index 0000000..c71dfee --- /dev/null +++ b/state-hub/docs/activity-core-delegation.md @@ -0,0 +1,151 @@ +# State Hub → activity-core Delegation Protocol + +> CUST-WP-0040 T05. Cross-reference: +> [`docs/nats-event-subjects.md`](nats-event-subjects.md), +> [`docs/cron-migration.md`](cron-migration.md), and activity-core's +> `docs/adr/adr-001-event-bridge-architecture.md`. + +## TL;DR + +The state hub is a **read model** for cross-domain state. It is not a +task factory. Maintenance automations that *create new work in response +to state transitions* belong in activity-core as `ActivityDefinition` +files. The state hub's only job in that flow is to **publish lifecycle +events** on NATS JetStream so activity-core can react. + +``` + NATS JetStream + subject: org.statehub.> + stream: ACTIVITY_EVENTS + ┌──────────────────────┐ + POST /repos/ │ │ + PATCH /workstreams/* ─────publish───▶ │ │ ───consume───▶ activity-core + POST /decisions/*/resolve │ │ EventRouter + POST /domain-goals/*/activate │ │ │ + scripts/cleanup_stale_tasks.py │ │ ▼ + └──────────────────────┘ RunActivityWorkflow + the-custodian/state-hub (creates tasks in + issue-core, etc.) +``` + +## Why delegate? + +| Concern | Living in the state hub today | Lives in activity-core after migration | +| ---------------------------------------- | ----------------------------- | ----------------------------------------------------------- | +| "When should this maintenance run?" | cron/systemd timers | `ActivityDefinition.trigger` (cron + event triggers) | +| "What rule decides whether to act?" | hard-coded in the script | `ActivityDefinition.rule.when` expressions | +| "What task / side-effect should we run?" | hard-coded in the script | `ActivityDefinition.instruction` (shell / workflow / etc.) | +| "Where do we audit what fired?" | journalctl + ad hoc logs | activity-core history + Temporal workflow runs | +| "How is it changed safely?" | edit Python + redeploy hub | edit YAML in the repo, PR-reviewable, hot-reloadable | + +Concentrating maintenance logic in declarative `ActivityDefinition` +files makes the rules **auditable**, **testable**, and **modifiable +without redeploying the state hub**. + +## Published lifecycle events (v1.0) + +Authoritative list and attributes live in +[`docs/nats-event-subjects.md`](nats-event-subjects.md). At v1.0 the +state hub publishes: + +| Subject | Trigger site (file:fn) | +| ------------------------------------ | --------------------------------------------------------------- | +| `org.statehub.repo.registered` | `api/routers/repos.py:register_repo` | +| `org.statehub.workstream.completed` | `api/routers/workstreams.py:update_workstream` (on transition) | +| `org.statehub.decision.resolved` | `api/routers/decisions.py:resolve_decision_action` | +| `org.statehub.domain.goal.activated` | `api/routers/domain_goals.py:activate_domain_goal` | +| `org.statehub.task.stale` | `scripts/cleanup_stale_tasks.py` (per cancelled task) | + +All events use the shared `EventEnvelope` schema (`api/events/envelope.py`) +and are published via `publish_event(subject, envelope)`. Publishing is +fire-and-forget: failures are logged but **never break the API request +that triggered them**, and the publisher no-ops when `NATS_URL` is +unset. + +## What stays in the state hub + +- DB schema + Alembic migrations +- API endpoints (CRUD + status transitions + read-model queries) +- MCP tools (read + sanctioned writes: `resolve_decision`, + `add_progress_event`, `get_next_steps`) +- The consistency engine (`scripts/consistency_check.py`) — it owns + ADR-001 reconciliation between workplan files and the DB. +- The `cleanup_stale_tasks.py` *script* (not its schedule) — it owns + the lifecycle rule for cancelling orphaned tasks. + +## What moves to activity-core + +- The *schedule* for the consistency sweep (`*/15 * * * *`) → + `the-custodian.state-hub-consistency-sweep` ActivityDefinition. +- The *schedule* for stale-task cleanup (`0 3 * * *`) → + `the-custodian.state-hub-stale-task-cleanup` ActivityDefinition. +- Any future "when X happens, create a task" logic. The state hub must + **not** add such rules to its routers — it publishes the event and + the rule lives in activity-core. + +See [`docs/cron-migration.md`](cron-migration.md) for the +ActivityDefinition drafts and cutover plan. + +## What must never happen + +- **State hub writes directly to activity-core's DB.** All + communication is via NATS events. +- **State hub creates issue-core / Temporal tasks itself.** That is + activity-core's job. +- **Routers publish before committing.** Always publish after + `await session.commit()` succeeds. (Otherwise a transaction rollback + would still leak an event.) +- **A publish failure breaks the API response.** The publisher logs and + swallows; lost events are recovered by activity-core re-reading state + on next sweep, not by the API retrying. + +## Operational checklist — migrating a cron to an ActivityDefinition + +1. Identify the cron's current side-effects. If any of them + *create work* (a task, an issue, a ticket), it is a delegation + candidate. Pure consistency reconciliation can stay as a shell-cron + for now if simpler. +2. Decide the trigger: keep it as `cron`, or upgrade it to `event` by + first identifying / publishing the state hub lifecycle event the + cron is effectively polling for. +3. Add a row to [`docs/nats-event-subjects.md`](nats-event-subjects.md) + if a new event type is being introduced. +4. Wire `publish_event(...)` at the transition site in the appropriate + router. Verify with `nats sub 'org.statehub.>'`. +5. Land the `ActivityDefinition` in activity-core; enable it in + staging. +6. Run both old cron and new ActivityDefinition in parallel for one + week. Both side-effects must be idempotent for this to be safe — if + they aren't, fix that first. +7. Disable the old cron / systemd timer, archive the unit files. +8. Update [`SCOPE.md`](../../SCOPE.md) "Often used with" to mention the + activity-core handoff if a new event type was added. + +## Bootstrap and partial-availability behaviour + +- **No NATS configured (`NATS_URL` unset)**: publisher is a logged + no-op. The state hub remains fully functional. Useful for dev + environments and `make test`. +- **NATS reachable but stream missing**: publisher creates the + `ACTIVITY_EVENTS` stream with subject filter `org.>` on first + publish, so the state hub can come up before activity-core. In + production both should target the same NATS cluster. +- **activity-core down**: events queue in JetStream and are replayed + when the consumer reconnects. The state hub is unaffected. +- **State hub down**: scheduled ActivityDefinitions in activity-core + still fire; ones that need `state-hub.health` context will skip + cleanly per their rule. + +## Verifying end-to-end + +```bash +# Subscribe to lifecycle events +nats sub 'org.statehub.>' + +# Trigger an event (in another terminal) +curl -X POST http://127.0.0.1:8000/repos//sync + +# Observe the envelope on the subscriber. Sample shape: +# {"id":"...","type":"org.statehub.workstream.completed","version":"1.0", +# "timestamp":"...","publisher":"the-custodian/state-hub","attributes":{...}} +``` diff --git a/state-hub/docs/cron-migration.md b/state-hub/docs/cron-migration.md new file mode 100644 index 0000000..ff39d76 --- /dev/null +++ b/state-hub/docs/cron-migration.md @@ -0,0 +1,175 @@ +# State Hub Cron → activity-core ActivityDefinition Migration (Design Stub) + +> CUST-WP-0040 T04. **Design stub — not yet implemented.** +> Migration depends on activity-core WP-0003 reaching the +> "ActivityDefinition file ingestion + cron trigger executor" milestone. + +The state hub currently runs two recurring maintenance jobs and one +per-repo event hook. Once activity-core is ready, each becomes an +ActivityDefinition file checked into the appropriate repo. The state hub +keeps the underlying scripts; only the *scheduling* moves. + +--- + +## 1. Inventory of current maintenance automations + +| # | Source | Trigger today | Script invoked | What it does | +| - | ------------------- | -------------------------------------------------------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------- | +| 1 | systemd user timer | every 15 min | `scripts/consistency_check.py --remote --all` | Pull every registered repo, reconcile workplan files ↔ DB, run C-15 writeback + C-16 pull gate | +| 2 | manual / daily cron | `make cleanup-stale` (suggested `0 3 * * *`) | `scripts/cleanup_stale_tasks.py` | Cancel tasks still open in completed/archived workstreams; emits `org.statehub.task.stale` | +| 3 | git post-commit | every commit in a registered repo | `make fix-consistency REPO=` | Per-repo workplan ↔ DB sync immediately after a commit | + +Honourable mentions (not currently scheduled, on-demand only — listed for +completeness so they don't get mistakenly picked up): + +- `scripts/ingest_sbom.py` — invoked via `make ingest-sbom REPO=`. +- `scripts/ingest_capabilities.py` — invoked via `make ingest-capabilities[-all]`. +- `scripts/check_doi.py` — invoked via `make check-doi[-all]`. +- `scripts/validate_repo_adr.py` — invoked manually for canon promotion. +- `scripts/ingest_tpsc.py` — invoked via `make ingest-tpsc[-all]`. + +These are **not in scope** for cron migration — they remain on-demand +operator/CI commands. They become candidates only if we later decide to +run them on a schedule. + +--- + +## 2. Target ActivityDefinitions + +### A. `state-hub-consistency-sweep` + +```yaml +# activity-definitions/the-custodian/state-hub-consistency-sweep.yaml +id: the-custodian.state-hub-consistency-sweep +description: | + Sweep all registered repos: pull, reconcile workplan files ↔ DB, + apply writeback (C-15), respect pull gate (C-16). Mirrors the + existing custodian-sync systemd timer. +trigger: + trigger_type: cron + cron_expression: "*/15 * * * *" + timezone: UTC + misfire_policy: skip # if a prior run is still active, skip +context: + - kind: http_get # confirm state-hub API is reachable + url: http://127.0.0.1:8000/state/health + bind: hub_health +rule: + when: + - "hub_health.status == 'ok'" +instruction: + kind: shell + cmd: >- + cd /home/worsch/the-custodian/state-hub && + .venv/bin/python scripts/consistency_check.py --remote --all --max-seconds 300 + on_failure: log_and_continue # warn-only sweeps must not page on transient failures +``` + +Notes: +- Replaces the `custodian-sync.service` + `custodian-sync.timer` pair. +- Lock semantics (`/tmp/custodian-consistency-remote-all.lock`) stay in + the script — activity-core just sets the cadence. +- Once active, `infra/README.md` is updated to instruct users to delete + the systemd timer. + +### B. `state-hub-stale-task-cleanup` + +```yaml +# activity-definitions/the-custodian/state-hub-stale-task-cleanup.yaml +id: the-custodian.state-hub-stale-task-cleanup +description: | + Daily sweep that cancels tasks still 'todo|in_progress|blocked' inside + completed or archived workstreams. Each cancellation also emits + org.statehub.task.stale on NATS for downstream reaction. +trigger: + trigger_type: cron + cron_expression: "0 3 * * *" + timezone: UTC +instruction: + kind: shell + cmd: >- + cd /home/worsch/the-custodian/state-hub && + .venv/bin/python scripts/cleanup_stale_tasks.py +``` + +Notes: +- Replaces the documented (`Cron example: 0 3 * * * …`) daily run. +- The script already emits NATS events (see CUST-WP-0040 T03), so + downstream ActivityDefinitions can react per-task without a second pass. + +### C. Per-commit consistency sync (currently a git hook) + +The git `post-commit` hook installed by `state-hub/scripts/install_hooks.sh` +is **event-driven, not cron-based**. Migrating it to activity-core would +require a `repo.commit.pushed` event channel that doesn't exist yet. + +Recommendation: **keep the git hook as-is for now**. Revisit once an +event source (e.g. Gitea webhook fed into NATS) is available, at which +point an event-triggered ActivityDefinition can replace it cleanly: + +```yaml +trigger: + trigger_type: event + event_type: org.repo.commit.pushed + filters: + repo_slug: "*" +``` + +--- + +## 3. Required context queries + +Both A and B want to confirm the state hub is reachable before running. +A reusable context source should be added to activity-core for this: + +- `state-hub.health` — `GET /state/health` → `{status, db, ...}` +- (optional) `state-hub.repos` — `GET /repos/?status=active` for the + sweep's per-repo branching, if we later split A into one + ActivityDefinition per repo. + +These belong to the state-hub adapter referenced in the workplan's +out-of-scope note ("/sbom/status context query endpoint" etc.). + +--- + +## 4. Blockers / sequencing + +| Blocker | Owner | Where it lands | +| ------------------------------------------------------------------------- | -------------- | -------------------------- | +| activity-core ActivityDefinition file ingestion + cron executor (WP-0003) | activity-core | activity-core/`src/...` | +| activity-core shell instruction kind with on_failure semantics | activity-core | activity-core/`src/...` | +| state-hub adapter exposing `state-hub.health` as a context source | activity-core | activity-core/adapters/ | + +Until these land, the state hub continues to schedule jobs via systemd +timer + cron entries. + +--- + +## 5. Cutover plan (when ready) + +1. Land ActivityDefinitions A + B in activity-core. +2. Enable them in staging; verify they fire on schedule and produce the + same DB / NATS effects as the current cron entries. +3. Run both in parallel for one week (cron + ActivityDefinition). The + scripts are idempotent — duplicate runs are no-ops on a clean state. +4. Disable the systemd timer: + `systemctl --user disable --now custodian-sync.timer` +5. Remove the cleanup-stale cron entry from `crontab -e`. +6. Update `infra/README.md` to point at the ActivityDefinitions and + archive the systemd unit files. +7. Per-commit hook stays until a `repo.commit.pushed` event exists. + +--- + +## 6. Open questions + +- **Locking**: should activity-core wrap shell instructions with a + process lock (today the script self-locks via `/tmp/...`)? If yes, the + state-hub script's lock can be removed. +- **Failure surfacing**: today systemd journals capture stderr. Where + does an ActivityDefinition's shell stderr go? (logs ? activity + history ?) — needs activity-core docs before cutover. +- **Per-repo split**: do we split A into one ActivityDefinition per + registered repo (so failures don't poison the sweep), or keep the + monolithic `--all` mode? The latter is simpler and matches today's + behaviour; the former gives better observability. diff --git a/state-hub/docs/nats-event-subjects.md b/state-hub/docs/nats-event-subjects.md new file mode 100644 index 0000000..83624d6 --- /dev/null +++ b/state-hub/docs/nats-event-subjects.md @@ -0,0 +1,98 @@ +# NATS Event Subjects — State Hub + +> Part of CUST-WP-0040. Cross-reference: activity-core's +> `event-types/` registry and ADR-001 (event bridge architecture). + +The state hub publishes lifecycle events to NATS JetStream so that +activity-core can drive maintenance and reaction automation declaratively, +via `ActivityDefinition` rules — rather than the state hub creating tasks +itself. + +This document is the authoritative subject naming convention for state hub +events. When adding a new event, add a row to the table below first and +keep the activity-core `event-types/` registry in sync. + +--- + +## Naming convention + +``` +org.{producer}.{noun}.{verb}[.{qualifier}] +``` + +- **`org`** — top-level namespace shared with activity-core (`org.>`) +- **`{producer}`** — the publisher subsystem; the state hub uses `statehub` +- **`{noun}`** — entity the event is about (`repo`, `workstream`, `task`, …) +- **`{verb}`** — past-tense state transition (`registered`, `completed`, `resolved`, …) +- **`{qualifier}`** — optional refinement (e.g. `goal.activated`) + +All segments are lowercase ASCII. No camelCase, no dashes inside segments. + +### Why a `statehub` namespace? + +Activity-core listens to `activity.>` for its internal task lifecycle and +`org.>` for org-wide lifecycle events. Multiple publishers will eventually +share `org.>` (e.g. railiance, kaizen). The `{producer}` segment keeps +those publishers from colliding on the same `{noun}.{verb}` shape. + +--- + +## Published subjects (v1.0) + +| Subject | When | Required attributes | +| ------------------------------------ | ------------------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------- | +| `org.statehub.repo.registered` | A new repo is registered via `POST /repos/` | `repo_id`, `repo_slug`, `domain_slug`, `remote_url?`, `local_path?` | +| `org.statehub.workstream.completed` | A workstream transitions to status `completed` | `workstream_id`, `slug`, `title`, `topic_id`, `repo_id?`, `repo_goal_id?` | +| `org.statehub.decision.resolved` | A decision is resolved via `POST /decisions/{id}/resolve` | `decision_id`, `title`, `topic_id?`, `workstream_id?`, `decided_by`, `rationale_snippet` | +| `org.statehub.domain.goal.activated` | A domain goal transitions to `active` | `goal_id`, `domain_id`, `domain_slug`, `title`, `superseded_goal_ids[]` | +| `org.statehub.task.stale` | `scripts/cleanup_stale_tasks.py` cancels an out-of-date task | `task_id`, `workstream_id`, `workstream_status`, `task_title`, `task_status_before` | + +### Envelope shape + +Each message body conforms to the `EventEnvelope` schema in +`api/events/envelope.py`, mirrored from +`activity-core/src/activity_core/models.py`: + +```json +{ + "id": "uuid v4 — stable, used for at-least-once dedup", + "type": "org.statehub.repo.registered", + "version": "1.0", + "timestamp": "2026-05-17T14:00:00Z", + "publisher": "the-custodian/state-hub", + "attributes": { "...": "event-specific" } +} +``` + +`type` matches the subject. `publisher` is always +`the-custodian/state-hub` for events emitted from this repo. + +--- + +## Stream + +State hub events are published into the **`ACTIVITY_EVENTS`** JetStream +(subject filter `org.>`). The stream is owned by activity-core; the state +hub will auto-create it on first publish if it does not exist, so the +publisher works in dev environments without bootstrapping activity-core +first. In production both services point at the same NATS cluster and +activity-core's `EventRouter` consumes the stream durably. + +--- + +## Adding a new event + +1. Pick a subject following the convention above. +2. Add a row to the table in this file (subject, trigger, attributes). +3. Add a matching `event-types/` entry in activity-core. +4. Wire `publish_event(subject, EventEnvelope.new(subject, attributes))` + at the site of the state transition (inside the same DB transaction + only after `await session.commit()` — never publish optimistically). +5. Verify locally: run `nats sub 'org.statehub.>'` while triggering the + transition. + +## Versioning + +`version` is bumped only when an attribute is removed or its semantics +change. Adding optional attributes does **not** require a version bump. +Activity-core consumers must tolerate unknown attribute keys. diff --git a/state-hub/pyproject.toml b/state-hub/pyproject.toml index de76f91..a019b0e 100644 --- a/state-hub/pyproject.toml +++ b/state-hub/pyproject.toml @@ -17,6 +17,7 @@ dependencies = [ "psycopg2-binary>=2.9.0", "llm-connect", "pyyaml>=6.0.3", + "nats-py>=2.7.0", ] [project.scripts] diff --git a/state-hub/scripts/cleanup_stale_tasks.py b/state-hub/scripts/cleanup_stale_tasks.py index 6bca210..5446716 100644 --- a/state-hub/scripts/cleanup_stale_tasks.py +++ b/state-hub/scripts/cleanup_stale_tasks.py @@ -11,12 +11,24 @@ Exit codes: 1 — API unreachable or unexpected error """ +import asyncio import json +import os import sys import urllib.error import urllib.request from datetime import datetime, timezone +# Make the api package importable when running as `python scripts/cleanup_stale_tasks.py` +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +try: + from api.events import EventEnvelope, publish_event, shutdown_publisher +except Exception: # pragma: no cover — event publishing is optional + EventEnvelope = None # type: ignore[assignment] + publish_event = None # type: ignore[assignment] + shutdown_publisher = None # type: ignore[assignment] + API = "http://127.0.0.1:8000" STALE_STATUSES = {"todo", "in_progress", "blocked"} CLOSED_WS_STATUS = {"completed", "archived"} @@ -85,6 +97,7 @@ def main() -> int: cancelled = [] errors = [] + nats_events: list[tuple[str, "EventEnvelope"]] = [] for t in stale: ws = closed_ws[t["workstream_id"]] @@ -100,10 +113,35 @@ def main() -> int: ) cancelled.append(t) print(f" cancelled [{t['priority']:8}] {t['title'][:70]}") + if EventEnvelope is not None: + subject = "org.statehub.task.stale" + nats_events.append(( + subject, + EventEnvelope.new( + subject, + attributes={ + "task_id": t["id"], + "workstream_id": t["workstream_id"], + "workstream_status": ws["status"], + "task_title": t["title"], + "task_status_before": t["status"], + }, + ), + )) except Exception as e: errors.append((t, str(e))) print(f" ERROR {t['title'][:60]} — {e}", file=sys.stderr) + if nats_events and publish_event is not None and shutdown_publisher is not None: + async def _flush_events() -> None: + for subject, env in nats_events: + await publish_event(subject, env) + await shutdown_publisher() + try: + asyncio.run(_flush_events()) + except Exception as e: # pragma: no cover — publishing is best-effort + print(f"[cleanup-stale] WARNING: NATS publish failed — {e}", file=sys.stderr) + # Emit a single progress event summarising the run if cancelled: by_ws: dict[str, list] = {} diff --git a/state-hub/uv.lock b/state-hub/uv.lock index 0e80783..b711cb7 100644 --- a/state-hub/uv.lock +++ b/state-hub/uv.lock @@ -822,6 +822,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a4/8e/469e5a4a2f5855992e425f3cb33804cc07bf18d48f2db061aec61ce50270/more_itertools-10.8.0-py3-none-any.whl", hash = "sha256:52d4362373dcf7c52546bc4af9a86ee7c4579df9a8dc268be0a2f949d376cc9b", size = 69667 }, ] +[[package]] +name = "nats-py" +version = "2.14.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/c3/f8/b956c4621ba88748ed707c52e69f95b7a50c8914e750edca59a5bef84a76/nats_py-2.14.0.tar.gz", hash = "sha256:4ed02cb8e3b55c68074a063aa2687087115d805d1513297da90cb2068fb07bed", size = 120751 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f9/39/0e87753df1072254bac190b33ed34b264f28f6aa9bea0f01b7e818071756/nats_py-2.14.0-py3-none-any.whl", hash = "sha256:4116f5d2233ce16e63c3d5538fa40a5e207f75fcf42a741773929ddf1e29d19d", size = 82259 }, +] + [[package]] name = "openapi-pydantic" version = "0.5.1" @@ -1435,6 +1444,7 @@ dependencies = [ { name = "fastmcp" }, { name = "httpx" }, { name = "llm-connect" }, + { name = "nats-py" }, { name = "psycopg2-binary" }, { name = "pydantic" }, { name = "pydantic-settings" }, @@ -1459,6 +1469,7 @@ requires-dist = [ { name = "fastmcp", specifier = ">=2.0.0" }, { name = "httpx", specifier = ">=0.28.0" }, { name = "llm-connect", editable = "../../llm-connect" }, + { name = "nats-py", specifier = ">=2.7.0" }, { name = "psycopg2-binary", specifier = ">=2.9.0" }, { name = "pydantic", specifier = ">=2.10.0" }, { name = "pydantic-settings", specifier = ">=2.7.0" }, diff --git a/workplans/CUST-WP-0040-state-hub-nats-activity-core-integration.md b/workplans/CUST-WP-0040-state-hub-nats-activity-core-integration.md index 3b91c07..253f877 100644 --- a/workplans/CUST-WP-0040-state-hub-nats-activity-core-integration.md +++ b/workplans/CUST-WP-0040-state-hub-nats-activity-core-integration.md @@ -3,33 +3,33 @@ id: CUST-WP-0040 type: workplan domain: custodian repo: the-custodian -status: active +status: completed state_hub_workstream_id: d8ac100b-a844-46a5-9684-415df0d32539 tasks: - id: T01 title: Connect state hub event publisher to NATS JetStream state_hub_task_id: c4bfa299-54dd-4ede-b5fa-9ae27dce5b2c - status: todo + status: done - id: T02 title: Define NATS subject schema for state hub lifecycle events state_hub_task_id: 2ae236a4-8a16-4998-92dc-83bc48378d3d - status: todo + status: done - id: T03 title: Implement state hub lifecycle events as NATS EventEnvelopes state_hub_task_id: ccb5a3fb-d5e5-4781-9e5d-552ad09477f5 - status: todo + status: done - id: T04 title: Migrate state hub maintenance crons to activity-core ActivityDefinitions (design stub) state_hub_task_id: 933b2a80-cbd6-436f-b092-c39dc6f1c9c4 - status: todo + status: done - id: T05 title: Document state hub → activity-core delegation protocol state_hub_task_id: b261d4b8-4039-4610-aaad-eb45bf3a51de - status: todo + status: done - id: T06 title: Update SCOPE.md to reflect activity-core delegation state_hub_task_id: 356682e6-e608-4f58-b418-cdef2b9435f2 - status: todo + status: done created: "2026-05-14" --- @@ -168,3 +168,8 @@ T06 (independent — SCOPE.md update) - v0.1 (2026-05-14): Stub created by activity-core agent during WP-0003 planning. Local agent to flesh out and implement. +- v0.2 (2026-05-17): All tasks complete. NATS publisher landed at + `state-hub/api/events/`, lifecycle events wired in repos/workstreams/ + decisions/domain_goals routers + cleanup_stale_tasks.py. Subject + schema, cron-migration design stub, delegation protocol docs, and + SCOPE.md updates committed.