feat(capability-requests): add cross-domain capability catalog and request routing

Introduces a capability catalog (CUST-WP-0022) so domains can advertise what
they provide and agents can request capabilities from other domains with
auto-routing, lifecycle tracking, and task-unblocking on completion.

- New models: CapabilityCatalog, CapabilityRequest with full lifecycle
  (requested → accepted → in_progress → ready_for_review → completed/rejected/withdrawn)
- Migration i6d7e8f9a0b1: capability_catalog + capability_requests tables
- Router /capability-catalog and /capability-requests with accept/status endpoints
- 7 new MCP tools: register_capability, list_capabilities, request_capability,
  accept_capability_request, update_capability_request_status,
  list_capability_requests, get_capability_request
- StateSummary gains open_capability_requests count
- Dashboard: capability-requests.md page + docs/capabilities.md + docs/scope.md
- SCOPE.md: three seed capabilities documented (MCP registration, state tracking, SBOM)
- scope.template: Provided Capabilities section with example block
- scripts/ingest_capabilities.py + make ingest-capabilities[/-all] targets

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 21:07:50 +01:00
parent 7bf3cf583a
commit d45234531b
18 changed files with 2105 additions and 1 deletions

View File

@@ -6,7 +6,7 @@ from fastapi.middleware.cors import CORSMiddleware
from api.database import engine
from api.routers import decisions, extension_points, progress, state, tasks, technical_debt, topics, workstreams, workstream_dependencies
from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals, messages
from api.routers import domains, repos, contributions, sbom, policy, domain_goals, repo_goals, messages, capability_requests
@asynccontextmanager
@@ -47,6 +47,7 @@ app.include_router(repo_goals.router)
app.include_router(contributions.router)
app.include_router(sbom.router)
app.include_router(messages.router)
app.include_router(capability_requests.router)
app.include_router(state.router)
app.include_router(policy.router)

View File

@@ -15,6 +15,8 @@ from api.models.contribution import Contribution, ContributionType, Contribution
from api.models.sbom_snapshot import SBOMSnapshot
from api.models.sbom_entry import SBOMEntry, Ecosystem
from api.models.agent_message import AgentMessage
from api.models.capability_catalog import CapabilityCatalog
from api.models.capability_request import CapabilityRequest
__all__ = [
"Base",
@@ -34,4 +36,6 @@ __all__ = [
"SBOMSnapshot",
"SBOMEntry", "Ecosystem",
"AgentMessage",
"CapabilityCatalog",
"CapabilityRequest",
]

View File

@@ -0,0 +1,39 @@
import uuid
from sqlalchemy import ARRAY, ForeignKey, String, Text, UniqueConstraint
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from api.models.base import Base, TimestampMixin, new_uuid
class CapabilityCatalog(Base, TimestampMixin):
__tablename__ = "capability_catalog"
__table_args__ = (
UniqueConstraint("domain_id", "capability_type", "title", name="uq_catalog_domain_type_title"),
)
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=new_uuid
)
domain_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("domains.id", ondelete="RESTRICT"),
nullable=False,
index=True,
)
capability_type: Mapped[str] = mapped_column(String(50), nullable=False)
title: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
keywords: Mapped[list[str]] = mapped_column(
ARRAY(String), nullable=False, server_default="{}"
)
status: Mapped[str] = mapped_column(
String(20), nullable=False, default="active", server_default="active"
)
domain: Mapped["Domain"] = relationship("Domain", lazy="selectin") # noqa: F821
@property
def domain_slug(self) -> str:
return self.domain.slug if self.domain is not None else ""

View File

@@ -0,0 +1,93 @@
import uuid
from datetime import datetime
from sqlalchemy import DateTime, ForeignKey, String, Text
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from api.models.base import Base, TimestampMixin, new_uuid
class CapabilityRequest(Base, TimestampMixin):
__tablename__ = "capability_requests"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=new_uuid
)
title: Mapped[str] = mapped_column(String(500), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
capability_type: Mapped[str] = mapped_column(String(50), nullable=False)
priority: Mapped[str] = mapped_column(
String(20), nullable=False, default="medium", server_default="medium"
)
status: Mapped[str] = mapped_column(
String(20), nullable=False, default="requested", server_default="requested"
)
# Requester side
requesting_domain_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("domains.id", ondelete="RESTRICT"),
nullable=False,
index=True,
)
requesting_workstream_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
ForeignKey("workstreams.id", ondelete="SET NULL"),
nullable=True,
)
requesting_agent: Mapped[str] = mapped_column(String(100), nullable=False)
# Fulfiller side (populated on accept / auto-route)
fulfilling_domain_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
ForeignKey("domains.id", ondelete="SET NULL"),
nullable=True,
index=True,
)
fulfilling_workstream_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
ForeignKey("workstreams.id", ondelete="SET NULL"),
nullable=True,
)
fulfilling_agent: Mapped[str | None] = mapped_column(String(100), nullable=True)
# Links
blocking_task_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
ForeignKey("tasks.id", ondelete="SET NULL"),
nullable=True,
)
catalog_entry_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
ForeignKey("capability_catalog.id", ondelete="SET NULL"),
nullable=True,
)
resolution_note: Mapped[str | None] = mapped_column(Text, nullable=True)
accepted_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
completed_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
# Relationships
requesting_domain: Mapped["Domain"] = relationship( # noqa: F821
"Domain", foreign_keys=[requesting_domain_id], lazy="selectin"
)
fulfilling_domain: Mapped["Domain | None"] = relationship( # noqa: F821
"Domain", foreign_keys=[fulfilling_domain_id], lazy="selectin"
)
blocking_task: Mapped["Task | None"] = relationship("Task", lazy="selectin") # noqa: F821
catalog_entry: Mapped["CapabilityCatalog | None"] = relationship( # noqa: F821
"CapabilityCatalog", lazy="selectin"
)
@property
def requesting_domain_slug(self) -> str:
return self.requesting_domain.slug if self.requesting_domain else ""
@property
def fulfilling_domain_slug(self) -> str | None:
return self.fulfilling_domain.slug if self.fulfilling_domain else None

View File

@@ -0,0 +1,358 @@
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session
from api.models.agent_message import AgentMessage
from api.models.capability_catalog import CapabilityCatalog
from api.models.capability_request import CapabilityRequest
from api.models.domain import Domain
from api.models.task import Task
from api.schemas.capability_request import (
CatalogCreate,
CatalogRead,
CapabilityRequestAccept,
CapabilityRequestCreate,
CapabilityRequestRead,
CapabilityRequestStatusPatch,
)
router = APIRouter(tags=["capability-requests"])
# ---------------------------------------------------------------------------
# Lifecycle guard
# ---------------------------------------------------------------------------
_VALID_TRANSITIONS: dict[str, set[str]] = {
"requested": {"accepted", "rejected", "withdrawn"},
"accepted": {"in_progress", "rejected", "withdrawn"},
"in_progress": {"ready_for_review", "rejected", "withdrawn"},
"ready_for_review": {"completed", "in_progress", "withdrawn"},
"completed": set(),
"rejected": set(),
"withdrawn": set(),
}
# ---------------------------------------------------------------------------
# Capability Catalog endpoints
# ---------------------------------------------------------------------------
@router.post("/capability-catalog/", response_model=CatalogRead, status_code=status.HTTP_201_CREATED)
async def create_catalog_entry(
body: CatalogCreate,
session: AsyncSession = Depends(get_session),
) -> CapabilityCatalog:
domain = await _resolve_domain(body.domain, session)
entry = CapabilityCatalog(
domain_id=domain.id,
capability_type=body.capability_type,
title=body.title,
description=body.description,
keywords=body.keywords,
)
session.add(entry)
try:
await session.commit()
except Exception:
await session.rollback()
raise HTTPException(
status_code=409,
detail=f"Catalog entry '{body.title}' for type '{body.capability_type}' already exists in domain '{body.domain}'",
)
await session.refresh(entry)
return entry
@router.get("/capability-catalog/", response_model=list[CatalogRead])
async def list_catalog(
domain: str | None = Query(None),
capability_type: str | None = Query(None),
status_filter: str | None = Query(None, alias="status"),
session: AsyncSession = Depends(get_session),
) -> list[CapabilityCatalog]:
q = select(CapabilityCatalog).order_by(CapabilityCatalog.created_at.desc())
if domain:
d = await _resolve_domain(domain, session)
q = q.where(CapabilityCatalog.domain_id == d.id)
if capability_type:
q = q.where(CapabilityCatalog.capability_type == capability_type)
if status_filter and status_filter != "all":
q = q.where(CapabilityCatalog.status == status_filter)
elif not status_filter:
q = q.where(CapabilityCatalog.status == "active")
result = await session.execute(q)
return list(result.scalars().all())
# ---------------------------------------------------------------------------
# Capability Request endpoints
# ---------------------------------------------------------------------------
@router.post("/capability-requests/", response_model=CapabilityRequestRead, status_code=status.HTTP_201_CREATED)
async def create_request(
body: CapabilityRequestCreate,
session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
req_domain = await _resolve_domain(body.requesting_domain, session)
# Route to provider
fulfilling_domain_id, catalog_entry_id = await _route_capability(
session, body.capability_type, body.description or ""
)
req = CapabilityRequest(
title=body.title,
description=body.description,
capability_type=body.capability_type,
priority=body.priority,
requesting_domain_id=req_domain.id,
requesting_agent=body.requesting_agent,
requesting_workstream_id=body.requesting_workstream_id,
blocking_task_id=body.blocking_task_id,
fulfilling_domain_id=fulfilling_domain_id,
catalog_entry_id=catalog_entry_id,
)
session.add(req)
await session.flush() # get req.id before creating notification
# Auto-notify
if fulfilling_domain_id:
ful_domain = await session.get(Domain, fulfilling_domain_id)
to_agent = ful_domain.slug if ful_domain else "broadcast"
else:
to_agent = "broadcast"
_add_notification(
session,
from_agent="system",
to_agent=to_agent,
subject=f"[capability-request] {body.title}",
body=(
f"New capability request from **{body.requesting_agent}** "
f"({body.requesting_domain}):\n\n"
f"**Type:** {body.capability_type}\n"
f"**Priority:** {body.priority}\n\n"
f"{body.description or '(no description)'}"
),
)
await session.commit()
await session.refresh(req)
return req
@router.get("/capability-requests/", response_model=list[CapabilityRequestRead])
async def list_requests(
domain: str | None = Query(None, description="Filter by requesting OR fulfilling domain slug"),
status_filter: str | None = Query(None, alias="status"),
capability_type: str | None = Query(None),
session: AsyncSession = Depends(get_session),
) -> list[CapabilityRequest]:
q = select(CapabilityRequest).order_by(CapabilityRequest.created_at.desc())
if domain:
d = await _resolve_domain(domain, session)
q = q.where(
(CapabilityRequest.requesting_domain_id == d.id)
| (CapabilityRequest.fulfilling_domain_id == d.id)
)
if status_filter:
q = q.where(CapabilityRequest.status == status_filter)
if capability_type:
q = q.where(CapabilityRequest.capability_type == capability_type)
result = await session.execute(q)
return list(result.scalars().all())
@router.get("/capability-requests/{request_id}", response_model=CapabilityRequestRead)
async def get_request(
request_id: uuid.UUID,
session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
return await _get_request_or_404(request_id, session)
@router.post("/capability-requests/{request_id}/accept", response_model=CapabilityRequestRead)
async def accept_request(
request_id: uuid.UUID,
body: CapabilityRequestAccept,
session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
req = await _get_request_or_404(request_id, session)
_check_transition(req.status, "accepted")
now = datetime.now(tz=timezone.utc)
req.status = "accepted"
req.fulfilling_agent = body.fulfilling_agent
req.fulfilling_workstream_id = body.fulfilling_workstream_id
req.accepted_at = now
# If no fulfilling domain was set by routing, infer from the accepting agent's context
# (The agent can also PATCH it later if needed)
_add_notification(
session,
from_agent=body.fulfilling_agent,
to_agent=req.requesting_agent,
subject=f"[capability-accepted] {req.title}",
body=f"Your capability request **{req.title}** has been accepted by **{body.fulfilling_agent}**.",
)
await session.commit()
await session.refresh(req)
return req
@router.patch("/capability-requests/{request_id}/status", response_model=CapabilityRequestRead)
async def patch_request_status(
request_id: uuid.UUID,
body: CapabilityRequestStatusPatch,
session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
req = await _get_request_or_404(request_id, session)
_check_transition(req.status, body.status)
req.status = body.status
if body.note:
req.resolution_note = body.note
now = datetime.now(tz=timezone.utc)
# Status-specific side effects
if body.status == "completed":
req.completed_at = now
# Auto-unblock the blocking task
if req.blocking_task_id:
task = await session.get(Task, req.blocking_task_id)
if task and task.status == "blocked":
task.status = "todo"
task.blocking_reason = None
_add_notification(
session,
from_agent="system",
to_agent=req.requesting_agent,
subject=f"[capability-completed] {req.title}",
body=(
f"Capability request **{req.title}** has been completed.\n\n"
f"{body.note or ''}"
),
)
elif body.status == "ready_for_review":
_add_notification(
session,
from_agent=req.fulfilling_agent or "system",
to_agent=req.requesting_agent,
subject=f"[capability-ready] {req.title} -- please review",
body=(
f"Capability **{req.title}** is ready for your review and optimization.\n\n"
f"{body.note or ''}"
),
)
elif body.status == "rejected":
_add_notification(
session,
from_agent=req.fulfilling_agent or "system",
to_agent=req.requesting_agent,
subject=f"[capability-rejected] {req.title}",
body=(
f"Capability request **{req.title}** has been rejected.\n\n"
f"**Reason:** {body.note or '(no reason given)'}"
),
)
elif body.status == "in_progress":
_add_notification(
session,
from_agent=req.fulfilling_agent or "system",
to_agent=req.requesting_agent,
subject=f"[capability-in-progress] {req.title}",
body=f"Work on capability **{req.title}** is now in progress.",
)
await session.commit()
await session.refresh(req)
return req
# ---------------------------------------------------------------------------
# Routing algorithm
# ---------------------------------------------------------------------------
async def _route_capability(
session: AsyncSession, capability_type: str, description: str
) -> tuple[uuid.UUID | None, uuid.UUID | None]:
"""Find the best-matching domain for a capability request.
Returns (domain_id, catalog_entry_id) or (None, None) for broadcast.
"""
q = select(CapabilityCatalog).where(
CapabilityCatalog.capability_type == capability_type,
CapabilityCatalog.status == "active",
)
entries = list((await session.execute(q)).scalars().all())
if len(entries) == 1:
return entries[0].domain_id, entries[0].id
if len(entries) > 1 and description:
desc_lower = description.lower()
scored: list[tuple[int, CapabilityCatalog]] = []
for entry in entries:
score = sum(1 for kw in (entry.keywords or []) if kw.lower() in desc_lower)
scored.append((score, entry))
scored.sort(key=lambda x: -x[0])
if scored[0][0] > 0 and (len(scored) < 2 or scored[0][0] > scored[1][0]):
return scored[0][1].domain_id, scored[0][1].id
return None, None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _add_notification(
session: AsyncSession,
from_agent: str,
to_agent: str,
subject: str,
body: str,
) -> None:
"""Create an AgentMessage notification in the current session (no commit)."""
msg = AgentMessage(
from_agent=from_agent,
to_agent=to_agent,
subject=subject,
body=body,
)
session.add(msg)
async def _resolve_domain(slug: str, session: AsyncSession) -> Domain:
result = await session.execute(select(Domain).where(Domain.slug == slug))
domain = result.scalar_one_or_none()
if domain is None:
raise HTTPException(status_code=404, detail=f"Domain '{slug}' not found")
return domain
async def _get_request_or_404(request_id: uuid.UUID, session: AsyncSession) -> CapabilityRequest:
req = await session.get(CapabilityRequest, request_id)
if req is None:
raise HTTPException(status_code=404, detail=f"Capability request '{request_id}' not found")
return req
def _check_transition(current: str, target: str) -> None:
allowed = _VALID_TRANSITIONS.get(current, set())
if target not in allowed:
raise HTTPException(
status_code=422,
detail=(
f"Cannot transition from '{current}' to '{target}'. "
f"Allowed: {sorted(allowed) or 'none (terminal state)'}"
),
)

View File

@@ -6,6 +6,7 @@ from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session, engine
from api.models.capability_request import CapabilityRequest
from api.models.contribution import Contribution, ContributionStatus, ContributionType
from api.models.decision import Decision, DecisionStatus, DecisionType
from api.models.domain import Domain
@@ -204,6 +205,13 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
if lic and any(pat in lic.upper() for pat in _COPYLEFT_PATS)
)
# Open capability requests (non-terminal statuses)
open_cap_req_count = (await session.execute(
select(func.count()).select_from(CapabilityRequest).where(
CapabilityRequest.status.in_(["requested", "accepted", "in_progress", "ready_for_review"])
)
)).scalar() or 0
return StateSummary(
generated_at=datetime.now(tz=timezone.utc),
totals=totals,
@@ -215,6 +223,7 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
domains=domain_summaries,
contribution_counts=contribution_counts,
licence_risk_count=licence_risk_count,
open_capability_requests=open_cap_req_count,
open_workstreams=[
WorkstreamWithDeps(
**WorkstreamRead.model_validate(w).model_dump(),

View File

@@ -0,0 +1,79 @@
import uuid
from datetime import datetime
from pydantic import BaseModel, ConfigDict
# ---------------------------------------------------------------------------
# Capability Catalog schemas
# ---------------------------------------------------------------------------
class CatalogCreate(BaseModel):
domain: str # slug, resolved to domain_id in router
capability_type: str
title: str
description: str | None = None
keywords: list[str] = []
class CatalogRead(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: uuid.UUID
domain_slug: str
capability_type: str
title: str
description: str | None = None
keywords: list[str] = []
status: str
created_at: datetime
updated_at: datetime
# ---------------------------------------------------------------------------
# Capability Request schemas
# ---------------------------------------------------------------------------
class CapabilityRequestCreate(BaseModel):
title: str
description: str | None = None
capability_type: str
priority: str = "medium"
requesting_domain: str # slug, resolved to domain_id in router
requesting_agent: str
requesting_workstream_id: uuid.UUID | None = None
blocking_task_id: uuid.UUID | None = None
class CapabilityRequestAccept(BaseModel):
fulfilling_agent: str
fulfilling_workstream_id: uuid.UUID | None = None
class CapabilityRequestStatusPatch(BaseModel):
status: str # in_progress | ready_for_review | completed | rejected | withdrawn
note: str | None = None
class CapabilityRequestRead(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: uuid.UUID
title: str
description: str | None = None
capability_type: str
priority: str
status: str
requesting_domain_slug: str
requesting_agent: str
requesting_workstream_id: uuid.UUID | None = None
fulfilling_domain_slug: str | None = None
fulfilling_agent: str | None = None
fulfilling_workstream_id: uuid.UUID | None = None
blocking_task_id: uuid.UUID | None = None
catalog_entry_id: uuid.UUID | None = None
resolution_note: str | None = None
accepted_at: datetime | None = None
completed_at: datetime | None = None
created_at: datetime
updated_at: datetime

View File

@@ -79,3 +79,4 @@ class StateSummary(BaseModel):
domains: list[DomainSummary] = []
contribution_counts: dict[str, int] = {}
licence_risk_count: int = 0
open_capability_requests: int = 0