From 4b3cb1b0396ecb9ee078e83563cb44d00aae365e Mon Sep 17 00:00:00 2001 From: tegwick Date: Mon, 16 Mar 2026 02:55:45 +0100 Subject: [PATCH] feat(CUST-WP-0015): implement agent inbox for inter-agent coordination Adds a message-passing layer to state-hub so Claude instances can coordinate across sessions without polling shared progress events. - Migration f3a4b5c6d7e8: agent_messages table with thread support - FastAPI router: POST/GET /messages/, thread view, mark-read, archive, reply - 4 MCP tools: send_message, get_messages, mark_message_read, reply_to_message - Observable dashboard: /inbox page with unread/read/archived sections + KPI - CLAUDE.md updates: global, custodian, marki-docx, activity-core, template - TOOLS.md: Agent Inbox tools section documented Co-Authored-By: Claude Sonnet 4.6 --- api/main.py | 3 +- api/models/__init__.py | 2 + api/models/agent_message.py | 44 +++++ api/routers/messages.py | 138 +++++++++++++ api/schemas/agent_message.py | 30 +++ dashboard/observablehq.config.js | 1 + dashboard/src/data/messages.json.py | 15 ++ dashboard/src/inbox.md | 187 ++++++++++++++++++ mcp_server/TOOLS.md | 19 ++ mcp_server/server.py | 68 +++++++ .../f3a4b5c6d7e8_add_agent_messages.py | 48 +++++ 11 files changed, 554 insertions(+), 1 deletion(-) create mode 100644 api/models/agent_message.py create mode 100644 api/routers/messages.py create mode 100644 api/schemas/agent_message.py create mode 100644 dashboard/src/data/messages.json.py create mode 100644 dashboard/src/inbox.md create mode 100644 migrations/versions/f3a4b5c6d7e8_add_agent_messages.py diff --git a/api/main.py b/api/main.py index 4327b86..c3d2c89 100644 --- a/api/main.py +++ b/api/main.py @@ -5,7 +5,7 @@ from fastapi.middleware.cors import CORSMiddleware from api.database import engine from api.routers import decisions, extension_points, progress, state, tasks, technical_debt, topics, workstreams, workstream_dependencies -from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals +from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals, messages @asynccontextmanager @@ -42,6 +42,7 @@ app.include_router(domain_goals.router) app.include_router(repo_goals.router) app.include_router(contributions.router) app.include_router(sbom.router) +app.include_router(messages.router) app.include_router(state.router) app.include_router(policy.router) diff --git a/api/models/__init__.py b/api/models/__init__.py index e8c925d..4b8a7a5 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -14,6 +14,7 @@ from api.models.technical_debt import TechnicalDebt, TDStatus from api.models.contribution import Contribution, ContributionType, ContributionStatus from api.models.sbom_snapshot import SBOMSnapshot from api.models.sbom_entry import SBOMEntry, Ecosystem +from api.models.agent_message import AgentMessage __all__ = [ "Base", @@ -32,4 +33,5 @@ __all__ = [ "Contribution", "ContributionType", "ContributionStatus", "SBOMSnapshot", "SBOMEntry", "Ecosystem", + "AgentMessage", ] diff --git a/api/models/agent_message.py b/api/models/agent_message.py new file mode 100644 index 0000000..d521165 --- /dev/null +++ b/api/models/agent_message.py @@ -0,0 +1,44 @@ +import uuid +from datetime import datetime + +from sqlalchemy import DateTime, ForeignKey, String, Text, text +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from api.models.base import Base, new_uuid + + +class AgentMessage(Base): + __tablename__ = "agent_messages" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=new_uuid + ) + from_agent: Mapped[str] = mapped_column(String(100), nullable=False) + to_agent: Mapped[str] = mapped_column(String(100), nullable=False, index=True) + subject: Mapped[str] = mapped_column(String(500), nullable=False) + body: Mapped[str] = mapped_column(Text, nullable=False) + thread_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), + ForeignKey("agent_messages.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) + read_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True + ) + archived_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + server_default=text("now()"), + nullable=False, + ) + + thread_root: Mapped["AgentMessage | None"] = relationship( + "AgentMessage", + remote_side="AgentMessage.id", + foreign_keys=[thread_id], + lazy="select", + ) diff --git a/api/routers/messages.py b/api/routers/messages.py new file mode 100644 index 0000000..5b31604 --- /dev/null +++ b/api/routers/messages.py @@ -0,0 +1,138 @@ +import uuid +from datetime import datetime, timezone + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from api.database import get_session +from api.models.agent_message import AgentMessage +from api.schemas.agent_message import MessageCreate, MessageRead, MessageReply + +router = APIRouter(prefix="/messages", tags=["messages"]) + + +@router.post("/", response_model=MessageRead, status_code=status.HTTP_201_CREATED) +async def send_message( + body: MessageCreate, + session: AsyncSession = Depends(get_session), +) -> AgentMessage: + """Send a message from one agent to another (or 'broadcast').""" + if body.thread_id: + root = await session.get(AgentMessage, body.thread_id) + if root is None: + raise HTTPException(status_code=404, detail=f"Thread root {body.thread_id} not found") + + msg = AgentMessage( + from_agent=body.from_agent, + to_agent=body.to_agent, + subject=body.subject, + body=body.body, + thread_id=body.thread_id, + ) + session.add(msg) + await session.commit() + await session.refresh(msg) + return msg + + +@router.get("/", response_model=list[MessageRead]) +async def list_messages( + to_agent: str | None = None, + from_agent: str | None = None, + unread_only: bool = False, + limit: int = 50, + session: AsyncSession = Depends(get_session), +) -> list[AgentMessage]: + """List messages. Filter by recipient, sender, or unread status.""" + q = select(AgentMessage).where(AgentMessage.archived_at.is_(None)) + if to_agent: + q = q.where( + (AgentMessage.to_agent == to_agent) | (AgentMessage.to_agent == "broadcast") + ) + if from_agent: + q = q.where(AgentMessage.from_agent == from_agent) + if unread_only: + q = q.where(AgentMessage.read_at.is_(None)) + q = q.order_by(AgentMessage.created_at.desc()).limit(limit) + result = await session.execute(q) + return list(result.scalars().all()) + + +@router.get("/thread/{thread_id}", response_model=list[MessageRead]) +async def get_thread( + thread_id: uuid.UUID, + session: AsyncSession = Depends(get_session), +) -> list[AgentMessage]: + """Get all messages in a thread (root + replies), oldest first.""" + # Include the root message itself + q = select(AgentMessage).where( + (AgentMessage.id == thread_id) | (AgentMessage.thread_id == thread_id) + ).order_by(AgentMessage.created_at) + result = await session.execute(q) + return list(result.scalars().all()) + + +@router.patch("/{message_id}/read", response_model=MessageRead) +async def mark_read( + message_id: uuid.UUID, + session: AsyncSession = Depends(get_session), +) -> AgentMessage: + """Mark a message as read.""" + msg = await _get_message(message_id, session) + if msg.read_at is None: + msg.read_at = datetime.now(timezone.utc) + await session.commit() + await session.refresh(msg) + return msg + + +@router.patch("/{message_id}/archive", response_model=MessageRead) +async def archive_message( + message_id: uuid.UUID, + session: AsyncSession = Depends(get_session), +) -> AgentMessage: + """Archive a message (soft-delete).""" + msg = await _get_message(message_id, session) + msg.archived_at = datetime.now(timezone.utc) + if msg.read_at is None: + msg.read_at = msg.archived_at + await session.commit() + await session.refresh(msg) + return msg + + +@router.post("/{message_id}/reply", response_model=MessageRead, status_code=status.HTTP_201_CREATED) +async def reply_to_message( + message_id: uuid.UUID, + body: MessageReply, + session: AsyncSession = Depends(get_session), +) -> AgentMessage: + """Reply to a message. Marks the original as read and creates a reply in the same thread.""" + original = await _get_message(message_id, session) + + # Mark original as read + if original.read_at is None: + original.read_at = datetime.now(timezone.utc) + + # Thread root is either the original's thread_id or the original itself + thread_root = original.thread_id or original.id + + reply = AgentMessage( + from_agent=body.from_agent, + to_agent=original.from_agent, + subject=f"Re: {original.subject}", + body=body.body, + thread_id=thread_root, + ) + session.add(reply) + await session.commit() + await session.refresh(reply) + return reply + + +async def _get_message(message_id: uuid.UUID, session: AsyncSession) -> AgentMessage: + msg = await session.get(AgentMessage, message_id) + if msg is None: + raise HTTPException(status_code=404, detail=f"Message {message_id} not found") + return msg diff --git a/api/schemas/agent_message.py b/api/schemas/agent_message.py new file mode 100644 index 0000000..1721116 --- /dev/null +++ b/api/schemas/agent_message.py @@ -0,0 +1,30 @@ +import uuid +from datetime import datetime + +from pydantic import BaseModel, ConfigDict + + +class MessageCreate(BaseModel): + from_agent: str + to_agent: str + subject: str + body: str + thread_id: uuid.UUID | None = None + + +class MessageReply(BaseModel): + from_agent: str + body: str + + +class MessageRead(BaseModel): + model_config = ConfigDict(from_attributes=True) + id: uuid.UUID + from_agent: str + to_agent: str + subject: str + body: str + thread_id: uuid.UUID | None = None + read_at: datetime | None = None + archived_at: datetime | None = None + created_at: datetime diff --git a/dashboard/observablehq.config.js b/dashboard/observablehq.config.js index a7552e4..1074879 100644 --- a/dashboard/observablehq.config.js +++ b/dashboard/observablehq.config.js @@ -27,6 +27,7 @@ export default { { name: "Contributions", path: "/contributions" }, { name: "SBOM", path: "/sbom" }, { name: "Repo Sync", path: "/repo-sync" }, + { name: "Inbox", path: "/inbox" }, { name: "Progress", path: "/progress" }, // ── Policy ──────────────────────────────────────────────────────────────── { diff --git a/dashboard/src/data/messages.json.py b/dashboard/src/data/messages.json.py new file mode 100644 index 0000000..05fa84d --- /dev/null +++ b/dashboard/src/data/messages.json.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +"""Observable data loader: fetches /messages/ from the API.""" +import json +import os +import urllib.request +import urllib.error + +API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/") + +try: + with urllib.request.urlopen(f"{API_BASE}/messages/?limit=100", timeout=10) as resp: + data = json.loads(resp.read()) + print(json.dumps(data)) +except urllib.error.URLError as e: + print(json.dumps({"error": str(e), "messages": []})) diff --git a/dashboard/src/inbox.md b/dashboard/src/inbox.md new file mode 100644 index 0000000..2b9d511 --- /dev/null +++ b/dashboard/src/inbox.md @@ -0,0 +1,187 @@ +--- +title: Agent Inbox +--- + +```js +import {API, POLL} from "./components/config.js"; +``` + +```js +// Live poll: messages list +const inboxState = (async function*() { + while (true) { + let messages = [], ok = false; + try { + const resp = await fetch(`${API}/messages/?limit=100`); + ok = resp.ok; + if (ok) messages = await resp.json(); + } catch {} + yield {messages, ok, ts: new Date()}; + await new Promise(res => setTimeout(res, POLL)); + } +})(); +``` + +```js +const messages = inboxState.messages ?? []; +const _ok = inboxState.ok ?? false; +const _ts = inboxState.ts; + +const unread = messages.filter(m => !m.read_at && !m.archived_at); +const read = messages.filter(m => m.read_at && !m.archived_at); +const archived = messages.filter(m => m.archived_at); + +// Group unread by agent for KPI +const agentCounts = {}; +for (const m of unread) { + agentCounts[m.to_agent] = (agentCounts[m.to_agent] ?? 0) + 1; +} +const topAgents = Object.entries(agentCounts).sort((a, b) => b[1] - a[1]).slice(0, 4); +``` + +# Agent Inbox + +```js +import {injectTocTop} from "./components/toc-sidebar.js"; +import {withDocHelp} from "./components/doc-overlay.js"; + +const _kpiBox = html`
+
Inbox
+
+ unread +
+
${unread.length}
+
+
+
+ total +
+
${messages.length}
+
+
+ ${topAgents.map(([agent, count]) => html` +
+ ${agent} +
+
${count}
+
+
`)} +
`; + +const _liveEl = html`
+ + ${_ok + ? `Live · updated ${_ts?.toLocaleTimeString()}` + : html`Offline — run: make api`} +
`; + +injectTocTop("inbox-kpi-box", _kpiBox); +injectTocTop("live-indicator", _liveEl); +``` + +Inter-agent coordination messages. Agents send messages via `send_message()` MCP tool and read them via `get_messages()`. + +--- + +## Unread + +```js +function fmtDate(s) { + if (!s) return "—"; + const d = new Date(s); + return d.toLocaleDateString() + " " + d.toLocaleTimeString([], {hour: "2-digit", minute: "2-digit"}); +} + +function renderMessage(m, showMarkRead = false) { + const isBroadcast = m.to_agent === "broadcast"; + const borderColor = !m.read_at ? "#d97706" : "#6b7280"; + + async function onMarkRead() { + await fetch(`${API}/messages/${m.id}/read`, {method: "PATCH"}); + } + + async function onArchive() { + await fetch(`${API}/messages/${m.id}/archive`, {method: "PATCH"}); + } + + return html`
+
+ ${m.from_agent} + + ${m.to_agent} + ${fmtDate(m.created_at)} +
+ ${showMarkRead ? html`` : ""} + +
+
+
${m.subject}
+
+ body +
${m.body}
+
+ ${m.thread_id ? html`
thread: ${m.thread_id}
` : ""} +
`; +} + +if (unread.length === 0) { + display(html`

No unread messages.

`); +} else { + display(html`
${unread.map(m => renderMessage(m, true))}
`); +} +``` + +--- + +## Read + +```js +if (read.length === 0) { + display(html`

No read messages.

`); +} else { + display(html`
${read.map(m => renderMessage(m, false))}
`); +} +``` + +--- + +## Archived + +```js +if (archived.length === 0) { + display(html`

No archived messages.

`); +} else { + display(html`
${archived.map(m => renderMessage(m, false))}
`); +} +``` + + diff --git a/mcp_server/TOOLS.md b/mcp_server/TOOLS.md index e50cf0d..edbe121 100644 --- a/mcp_server/TOOLS.md +++ b/mcp_server/TOOLS.md @@ -107,6 +107,25 @@ Domains are now first-class DB entities. Use `list_domains()` to discover availa --- +## Agent Inbox Tools + +Inter-agent coordination via shared message board. Check inbox at session start; +send messages to coordinate across Claude instances. + +Agent names: use the repo slug (e.g. `"marki-docx"`, `"railiance"`) or `"hub"` for the custodian agent. +Use `"broadcast"` as `to_agent` to send to all agents. + +| Tool | Key Args | When to use | +|------|----------|-------------| +| `get_messages(to_agent?, from_agent?, unread_only?, limit?)` | `to_agent`: your agent name; `unread_only`: True recommended at session start | Check for pending coordination messages. | +| `send_message(from_agent, to_agent, subject, body, thread_id?)` | all except `thread_id` required | Send a coordination message to another agent (or broadcast). | +| `mark_message_read(message_id)` | `message_id`: UUID | Mark a message as read after acting on it. | +| `reply_to_message(message_id, from_agent, body)` | all required | Reply in-thread; marks original as read. | + +Dashboard: `http://localhost:3000/inbox` + +--- + ## Domain Slugs Run `list_domains()` to get the live list. Default 6: `custodian` · `railiance` · `markitect` · `coulomb_social` · `personhood` · `foerster_capabilities` diff --git a/mcp_server/server.py b/mcp_server/server.py index 836a6bb..83225c5 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -1434,6 +1434,74 @@ def get_repo_dispatch(repo_slug: str) -> str: return json.dumps(_get(f"/repos/{repo_slug}/dispatch"), indent=2) +# --------------------------------------------------------------------------- +# Agent Inbox (inter-agent message passing) +# --------------------------------------------------------------------------- + +@mcp.tool() +def send_message(from_agent: str, to_agent: str, subject: str, body: str, thread_id: str | None = None) -> str: + """Send a message from one agent to another (or 'broadcast' for all). + + Use this to coordinate with other Claude instances — e.g. a worker agent + reporting status back to the orchestrator, or the hub agent dispatching + instructions to a domain agent. + + Args: + from_agent: Sender identifier (e.g. 'hub', 'marki-docx', 'railiance') + to_agent: Recipient identifier or 'broadcast' for all agents + subject: Short subject line (max 500 chars) + body: Full message body (markdown supported) + thread_id: UUID of the root message to create a thread (optional) + """ + payload: dict = {"from_agent": from_agent, "to_agent": to_agent, "subject": subject, "body": body} + if thread_id: + payload["thread_id"] = thread_id + msg = _post("/messages/", payload) + return json.dumps(msg, indent=2) + + +@mcp.tool() +def get_messages(to_agent: str | None = None, from_agent: str | None = None, unread_only: bool = False, limit: int = 20) -> str: + """List messages in the agent inbox. + + Call this at session start to check for pending coordination messages. + + Args: + to_agent: Filter by recipient (your agent name, or omit for all) + from_agent: Filter by sender (optional) + unread_only: Return only unread messages (default: False) + limit: Maximum number of messages to return (default: 20) + """ + params: dict = {"limit": limit, "unread_only": unread_only} + if to_agent: + params["to_agent"] = to_agent + if from_agent: + params["from_agent"] = from_agent + return json.dumps(_get("/messages/", params), indent=2) + + +@mcp.tool() +def mark_message_read(message_id: str) -> str: + """Mark an inbox message as read. + + Args: + message_id: UUID of the message to mark as read + """ + return json.dumps(_patch(f"/messages/{message_id}/read", {}), indent=2) + + +@mcp.tool() +def reply_to_message(message_id: str, from_agent: str, body: str) -> str: + """Reply to a message. Marks the original as read and creates a reply in the same thread. + + Args: + message_id: UUID of the message to reply to + from_agent: Your agent identifier + body: Reply body (markdown supported) + """ + return json.dumps(_post(f"/messages/{message_id}/reply", {"from_agent": from_agent, "body": body}), indent=2) + + # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- diff --git a/migrations/versions/f3a4b5c6d7e8_add_agent_messages.py b/migrations/versions/f3a4b5c6d7e8_add_agent_messages.py new file mode 100644 index 0000000..1650679 --- /dev/null +++ b/migrations/versions/f3a4b5c6d7e8_add_agent_messages.py @@ -0,0 +1,48 @@ +"""Add agent_messages table for inter-agent coordination + +Revision ID: f3a4b5c6d7e8 +Revises: e2f3a4b5c6d7 +Create Date: 2026-03-16 00:00:00.000000 +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects.postgresql import UUID + +revision: str = "f3a4b5c6d7e8" +down_revision: Union[str, None] = "e2f3a4b5c6d7" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "agent_messages", + sa.Column("id", UUID(as_uuid=True), primary_key=True, + server_default=sa.text("gen_random_uuid()")), + sa.Column("from_agent", sa.String(100), nullable=False), + sa.Column("to_agent", sa.String(100), nullable=False), + sa.Column("subject", sa.String(500), nullable=False), + sa.Column("body", sa.Text, nullable=False), + sa.Column("thread_id", UUID(as_uuid=True), + sa.ForeignKey("agent_messages.id", ondelete="SET NULL"), + nullable=True), + sa.Column("read_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("archived_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), + server_default=sa.text("now()"), nullable=False), + ) + op.create_index("ix_agent_messages_to_agent_read_at", + "agent_messages", ["to_agent", "read_at"]) + op.create_index("ix_agent_messages_thread_id", + "agent_messages", ["thread_id"]) + op.create_index("ix_agent_messages_created_at", + "agent_messages", ["created_at"]) + + +def downgrade() -> None: + op.drop_index("ix_agent_messages_created_at", "agent_messages") + op.drop_index("ix_agent_messages_thread_id", "agent_messages") + op.drop_index("ix_agent_messages_to_agent_read_at", "agent_messages") + op.drop_table("agent_messages")