diff --git a/api/main.py b/api/main.py index 4327b86..c3d2c89 100644 --- a/api/main.py +++ b/api/main.py @@ -5,7 +5,7 @@ from fastapi.middleware.cors import CORSMiddleware from api.database import engine from api.routers import decisions, extension_points, progress, state, tasks, technical_debt, topics, workstreams, workstream_dependencies -from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals +from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals, messages @asynccontextmanager @@ -42,6 +42,7 @@ app.include_router(domain_goals.router) app.include_router(repo_goals.router) app.include_router(contributions.router) app.include_router(sbom.router) +app.include_router(messages.router) app.include_router(state.router) app.include_router(policy.router) diff --git a/api/models/__init__.py b/api/models/__init__.py index e8c925d..4b8a7a5 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -14,6 +14,7 @@ from api.models.technical_debt import TechnicalDebt, TDStatus from api.models.contribution import Contribution, ContributionType, ContributionStatus from api.models.sbom_snapshot import SBOMSnapshot from api.models.sbom_entry import SBOMEntry, Ecosystem +from api.models.agent_message import AgentMessage __all__ = [ "Base", @@ -32,4 +33,5 @@ __all__ = [ "Contribution", "ContributionType", "ContributionStatus", "SBOMSnapshot", "SBOMEntry", "Ecosystem", + "AgentMessage", ] diff --git a/api/models/agent_message.py b/api/models/agent_message.py new file mode 100644 index 0000000..d521165 --- /dev/null +++ b/api/models/agent_message.py @@ -0,0 +1,44 @@ +import uuid +from datetime import datetime + +from sqlalchemy import DateTime, ForeignKey, String, Text, text +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from api.models.base import Base, new_uuid + + +class AgentMessage(Base): + __tablename__ = "agent_messages" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=new_uuid + ) + from_agent: Mapped[str] = mapped_column(String(100), nullable=False) + to_agent: Mapped[str] = mapped_column(String(100), nullable=False, index=True) + subject: Mapped[str] = mapped_column(String(500), nullable=False) + body: Mapped[str] = mapped_column(Text, nullable=False) + thread_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), + ForeignKey("agent_messages.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) + read_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True + ) + archived_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + server_default=text("now()"), + nullable=False, + ) + + thread_root: Mapped["AgentMessage | None"] = relationship( + "AgentMessage", + remote_side="AgentMessage.id", + foreign_keys=[thread_id], + lazy="select", + ) diff --git a/api/routers/messages.py b/api/routers/messages.py new file mode 100644 index 0000000..5b31604 --- /dev/null +++ b/api/routers/messages.py @@ -0,0 +1,138 @@ +import uuid +from datetime import datetime, timezone + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from api.database import get_session +from api.models.agent_message import AgentMessage +from api.schemas.agent_message import MessageCreate, MessageRead, MessageReply + +router = APIRouter(prefix="/messages", tags=["messages"]) + + +@router.post("/", response_model=MessageRead, status_code=status.HTTP_201_CREATED) +async def send_message( + body: MessageCreate, + session: AsyncSession = Depends(get_session), +) -> AgentMessage: + """Send a message from one agent to another (or 'broadcast').""" + if body.thread_id: + root = await session.get(AgentMessage, body.thread_id) + if root is None: + raise HTTPException(status_code=404, detail=f"Thread root {body.thread_id} not found") + + msg = AgentMessage( + from_agent=body.from_agent, + to_agent=body.to_agent, + subject=body.subject, + body=body.body, + thread_id=body.thread_id, + ) + session.add(msg) + await session.commit() + await session.refresh(msg) + return msg + + +@router.get("/", response_model=list[MessageRead]) +async def list_messages( + to_agent: str | None = None, + from_agent: str | None = None, + unread_only: bool = False, + limit: int = 50, + session: AsyncSession = Depends(get_session), +) -> list[AgentMessage]: + """List messages. Filter by recipient, sender, or unread status.""" + q = select(AgentMessage).where(AgentMessage.archived_at.is_(None)) + if to_agent: + q = q.where( + (AgentMessage.to_agent == to_agent) | (AgentMessage.to_agent == "broadcast") + ) + if from_agent: + q = q.where(AgentMessage.from_agent == from_agent) + if unread_only: + q = q.where(AgentMessage.read_at.is_(None)) + q = q.order_by(AgentMessage.created_at.desc()).limit(limit) + result = await session.execute(q) + return list(result.scalars().all()) + + +@router.get("/thread/{thread_id}", response_model=list[MessageRead]) +async def get_thread( + thread_id: uuid.UUID, + session: AsyncSession = Depends(get_session), +) -> list[AgentMessage]: + """Get all messages in a thread (root + replies), oldest first.""" + # Include the root message itself + q = select(AgentMessage).where( + (AgentMessage.id == thread_id) | (AgentMessage.thread_id == thread_id) + ).order_by(AgentMessage.created_at) + result = await session.execute(q) + return list(result.scalars().all()) + + +@router.patch("/{message_id}/read", response_model=MessageRead) +async def mark_read( + message_id: uuid.UUID, + session: AsyncSession = Depends(get_session), +) -> AgentMessage: + """Mark a message as read.""" + msg = await _get_message(message_id, session) + if msg.read_at is None: + msg.read_at = datetime.now(timezone.utc) + await session.commit() + await session.refresh(msg) + return msg + + +@router.patch("/{message_id}/archive", response_model=MessageRead) +async def archive_message( + message_id: uuid.UUID, + session: AsyncSession = Depends(get_session), +) -> AgentMessage: + """Archive a message (soft-delete).""" + msg = await _get_message(message_id, session) + msg.archived_at = datetime.now(timezone.utc) + if msg.read_at is None: + msg.read_at = msg.archived_at + await session.commit() + await session.refresh(msg) + return msg + + +@router.post("/{message_id}/reply", response_model=MessageRead, status_code=status.HTTP_201_CREATED) +async def reply_to_message( + message_id: uuid.UUID, + body: MessageReply, + session: AsyncSession = Depends(get_session), +) -> AgentMessage: + """Reply to a message. Marks the original as read and creates a reply in the same thread.""" + original = await _get_message(message_id, session) + + # Mark original as read + if original.read_at is None: + original.read_at = datetime.now(timezone.utc) + + # Thread root is either the original's thread_id or the original itself + thread_root = original.thread_id or original.id + + reply = AgentMessage( + from_agent=body.from_agent, + to_agent=original.from_agent, + subject=f"Re: {original.subject}", + body=body.body, + thread_id=thread_root, + ) + session.add(reply) + await session.commit() + await session.refresh(reply) + return reply + + +async def _get_message(message_id: uuid.UUID, session: AsyncSession) -> AgentMessage: + msg = await session.get(AgentMessage, message_id) + if msg is None: + raise HTTPException(status_code=404, detail=f"Message {message_id} not found") + return msg diff --git a/api/schemas/agent_message.py b/api/schemas/agent_message.py new file mode 100644 index 0000000..1721116 --- /dev/null +++ b/api/schemas/agent_message.py @@ -0,0 +1,30 @@ +import uuid +from datetime import datetime + +from pydantic import BaseModel, ConfigDict + + +class MessageCreate(BaseModel): + from_agent: str + to_agent: str + subject: str + body: str + thread_id: uuid.UUID | None = None + + +class MessageReply(BaseModel): + from_agent: str + body: str + + +class MessageRead(BaseModel): + model_config = ConfigDict(from_attributes=True) + id: uuid.UUID + from_agent: str + to_agent: str + subject: str + body: str + thread_id: uuid.UUID | None = None + read_at: datetime | None = None + archived_at: datetime | None = None + created_at: datetime diff --git a/dashboard/observablehq.config.js b/dashboard/observablehq.config.js index a7552e4..1074879 100644 --- a/dashboard/observablehq.config.js +++ b/dashboard/observablehq.config.js @@ -27,6 +27,7 @@ export default { { name: "Contributions", path: "/contributions" }, { name: "SBOM", path: "/sbom" }, { name: "Repo Sync", path: "/repo-sync" }, + { name: "Inbox", path: "/inbox" }, { name: "Progress", path: "/progress" }, // ── Policy ──────────────────────────────────────────────────────────────── { diff --git a/dashboard/src/data/messages.json.py b/dashboard/src/data/messages.json.py new file mode 100644 index 0000000..05fa84d --- /dev/null +++ b/dashboard/src/data/messages.json.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +"""Observable data loader: fetches /messages/ from the API.""" +import json +import os +import urllib.request +import urllib.error + +API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/") + +try: + with urllib.request.urlopen(f"{API_BASE}/messages/?limit=100", timeout=10) as resp: + data = json.loads(resp.read()) + print(json.dumps(data)) +except urllib.error.URLError as e: + print(json.dumps({"error": str(e), "messages": []})) diff --git a/dashboard/src/inbox.md b/dashboard/src/inbox.md new file mode 100644 index 0000000..2b9d511 --- /dev/null +++ b/dashboard/src/inbox.md @@ -0,0 +1,187 @@ +--- +title: Agent Inbox +--- + +```js +import {API, POLL} from "./components/config.js"; +``` + +```js +// Live poll: messages list +const inboxState = (async function*() { + while (true) { + let messages = [], ok = false; + try { + const resp = await fetch(`${API}/messages/?limit=100`); + ok = resp.ok; + if (ok) messages = await resp.json(); + } catch {} + yield {messages, ok, ts: new Date()}; + await new Promise(res => setTimeout(res, POLL)); + } +})(); +``` + +```js +const messages = inboxState.messages ?? []; +const _ok = inboxState.ok ?? false; +const _ts = inboxState.ts; + +const unread = messages.filter(m => !m.read_at && !m.archived_at); +const read = messages.filter(m => m.read_at && !m.archived_at); +const archived = messages.filter(m => m.archived_at); + +// Group unread by agent for KPI +const agentCounts = {}; +for (const m of unread) { + agentCounts[m.to_agent] = (agentCounts[m.to_agent] ?? 0) + 1; +} +const topAgents = Object.entries(agentCounts).sort((a, b) => b[1] - a[1]).slice(0, 4); +``` + +# Agent Inbox + +```js +import {injectTocTop} from "./components/toc-sidebar.js"; +import {withDocHelp} from "./components/doc-overlay.js"; + +const _kpiBox = html`
make api`}
+${m.body}
+ No unread messages.
`); +} else { + display(html`No read messages.
`); +} else { + display(html`No archived messages.
`); +} else { + display(html`