Files
state-hub/api/routers/capability_requests.py
tegwick 09bbf62430 feat(capability-registry): CUST-WP-0031 domain capability registry
- Migration p3k4l5m6n7o8: nullable repo_id FK on capability_catalog
- PATCH /capability-catalog/{id} endpoint for back-filling repo attribution
- register_capability MCP tool accepts optional repo_slug
- get_domain_summary now includes compact capabilities list (type+title+repo_slug)
- New get_capability_profile MCP tool: domain → repos → capabilities tree
- 6 repo descriptions populated; 25 catalog entries attributed to repos
- 9 new capabilities registered for personhood, foerster_capabilities, coulomb_social
- TOOLS.md: Capability Catalog & Requests section with full tool reference

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 17:23:45 +02:00

614 lines
22 KiB
Python

import re
import uuid
from datetime import datetime, timezone

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from api.database import get_session
from api.models.agent_message import AgentMessage
from api.models.capability_catalog import CapabilityCatalog
from api.models.capability_request import CapabilityRequest
from api.models.domain import Domain
from api.models.managed_repo import ManagedRepo
from api.models.task import Task
from api.schemas.capability_request import (
    CatalogCreate,
    CatalogPatch,
    CatalogRead,
    CapabilityRequestAccept,
    CapabilityRequestCreate,
    CapabilityRequestDispute,
    CapabilityRequestPatch,
    CapabilityRequestRead,
    CapabilityRequestReroute,
    CapabilityRequestStatusPatch,
)
# Router that the capability-catalog and capability-request endpoints in this
# module register on; every route is tagged "capability-requests".
router = APIRouter(tags=["capability-requests"])
# ---------------------------------------------------------------------------
# Lifecycle guard
# ---------------------------------------------------------------------------
# Maps each request status to the set of statuses it may move to next.
# A status with an empty set is terminal: nothing may follow it.
_VALID_TRANSITIONS: dict[str, set[str]] = {
    "requested": {"accepted", "rejected", "withdrawn", "routing_disputed"},
    "routing_disputed": {"requested", "withdrawn"},
    "accepted": {"in_progress", "rejected", "withdrawn"},
    "in_progress": {"ready_for_review", "rejected", "withdrawn"},
    "ready_for_review": {"completed", "in_progress", "withdrawn"},
    # Terminal states — each gets its own empty set.
    **{terminal: set() for terminal in ("completed", "rejected", "withdrawn")},
}
# ---------------------------------------------------------------------------
# Capability Catalog endpoints
# ---------------------------------------------------------------------------
@router.post("/capability-catalog/", response_model=CatalogRead, status_code=status.HTTP_201_CREATED)
async def create_catalog_entry(
    body: CatalogCreate,
    session: AsyncSession = Depends(get_session),
) -> CapabilityCatalog:
    """Create a capability catalog entry for a domain.

    ``body.domain`` is resolved to a domain row and, when ``body.repo_slug``
    is given, the slug is resolved to a repo row whose id is stored on the
    entry. The committed entry is refreshed and returned.

    Raises:
        HTTPException: 404 when the domain or repo slug does not exist;
            409 when the commit fails with a database integrity error.
    """
    domain = await _resolve_domain(body.domain, session)

    repo_id = None
    if body.repo_slug:
        repo = await _resolve_repo(body.repo_slug, session)
        repo_id = repo.id

    entry = CapabilityCatalog(
        domain_id=domain.id,
        repo_id=repo_id,
        capability_type=body.capability_type,
        title=body.title,
        description=body.description,
        keywords=body.keywords,
    )
    session.add(entry)

    try:
        await session.commit()
    except IntegrityError as exc:
        # Only constraint violations are reported as a conflict; any other
        # failure (connection loss, programming error, ...) propagates so it
        # is not misreported to the client as a duplicate entry.
        await session.rollback()
        raise HTTPException(
            status_code=409,
            detail=f"Catalog entry '{body.title}' for type '{body.capability_type}' already exists in domain '{body.domain}'",
        ) from exc

    await session.refresh(entry)
    return entry
@router.patch("/capability-catalog/{entry_id}", response_model=CatalogRead)
async def patch_catalog_entry(
    entry_id: uuid.UUID,
    body: CatalogPatch,
    session: AsyncSession = Depends(get_session),
) -> CapabilityCatalog:
    """Update selected fields of an existing catalog entry.

    Only fields whose value in ``body`` is not ``None`` are written:
    ``repo_slug`` (resolved to a repo id), ``description``, ``keywords`` and
    ``status``.

    Raises:
        HTTPException: 404 when the entry or the repo slug does not exist.
    """
    entry = await session.get(CapabilityCatalog, entry_id)
    if entry is None:
        raise HTTPException(status_code=404, detail=f"Catalog entry '{entry_id}' not found")

    # Repo attribution goes first so an unknown slug aborts before other writes.
    if body.repo_slug is not None:
        entry.repo_id = (await _resolve_repo(body.repo_slug, session)).id

    # The remaining patchable fields map one-to-one onto the model.
    for field_name in ("description", "keywords", "status"):
        new_value = getattr(body, field_name)
        if new_value is not None:
            setattr(entry, field_name, new_value)

    await session.commit()
    await session.refresh(entry)
    return entry
@router.get("/capability-catalog/", response_model=list[CatalogRead])
async def list_catalog(
    domain: str | None = Query(None),
    capability_type: str | None = Query(None),
    status_filter: str | None = Query(None, alias="status"),
    session: AsyncSession = Depends(get_session),
) -> list[CapabilityCatalog]:
    """List catalog entries, newest first.

    Optional filters: domain slug, capability type and status. When no status
    is supplied only ``"active"`` entries are returned; the special value
    ``"all"`` disables status filtering.

    Raises:
        HTTPException: 404 when the domain slug does not exist.
    """
    stmt = select(CapabilityCatalog).order_by(CapabilityCatalog.created_at.desc())

    if domain:
        owner = await _resolve_domain(domain, session)
        stmt = stmt.where(CapabilityCatalog.domain_id == owner.id)

    if capability_type:
        stmt = stmt.where(CapabilityCatalog.capability_type == capability_type)

    if not status_filter:
        # Default view: active entries only.
        stmt = stmt.where(CapabilityCatalog.status == "active")
    elif status_filter != "all":
        stmt = stmt.where(CapabilityCatalog.status == status_filter)

    rows = await session.execute(stmt)
    return list(rows.scalars().all())
# ---------------------------------------------------------------------------
# Capability Request endpoints
# ---------------------------------------------------------------------------
@router.post("/capability-requests/", response_model=CapabilityRequestRead, status_code=status.HTTP_201_CREATED)
async def create_request(
    body: CapabilityRequestCreate,
    session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
    """Create a capability request, route it and notify the provider.

    The request is routed with ``_route_capability``. A notification is sent
    to the slug of the fulfilling domain when routing produced one, otherwise
    to ``"broadcast"``.

    Raises:
        HTTPException: 404 when the requesting domain slug does not exist.
    """
    requester_domain = await _resolve_domain(body.requesting_domain, session)

    # Pick a provider from the catalog (may yield no domain at all).
    provider_domain_id, matched_entry_id, note = await _route_capability(
        session, body.capability_type, body.title, body.description or ""
    )

    new_request = CapabilityRequest(
        title=body.title,
        description=body.description,
        capability_type=body.capability_type,
        priority=body.priority,
        requesting_domain_id=requester_domain.id,
        requesting_agent=body.requesting_agent,
        requesting_workstream_id=body.requesting_workstream_id,
        blocking_task_id=body.blocking_task_id,
        fulfilling_domain_id=provider_domain_id,
        catalog_entry_id=matched_entry_id,
        routing_note=note,
    )
    session.add(new_request)
    # Flush so the request row exists (with its id) before the notification is queued.
    await session.flush()

    # Address the notification to the provider domain, falling back to broadcast.
    recipient = "broadcast"
    if provider_domain_id:
        provider_domain = await session.get(Domain, provider_domain_id)
        if provider_domain:
            recipient = provider_domain.slug

    message = (
        f"New capability request from **{body.requesting_agent}** "
        f"({body.requesting_domain}):\n\n"
        f"**Type:** {body.capability_type}\n"
        f"**Priority:** {body.priority}\n\n"
        f"{body.description or '(no description)'}"
    )
    _add_notification(
        session,
        from_agent="system",
        to_agent=recipient,
        subject=f"[capability-request] {body.title}",
        body=message,
    )

    await session.commit()
    await session.refresh(new_request)
    return new_request
@router.get("/capability-requests/", response_model=list[CapabilityRequestRead])
async def list_requests(
    domain: str | None = Query(None, description="Filter by requesting OR fulfilling domain slug"),
    status_filter: str | None = Query(None, alias="status"),
    capability_type: str | None = Query(None),
    session: AsyncSession = Depends(get_session),
) -> list[CapabilityRequest]:
    """List capability requests, newest first.

    ``domain`` matches requests where the domain is either the requester or
    the fulfiller. ``status`` and ``capability_type`` are exact-match filters.

    Raises:
        HTTPException: 404 when the domain slug does not exist.
    """
    conditions = []

    if domain:
        resolved = await _resolve_domain(domain, session)
        conditions.append(
            (CapabilityRequest.requesting_domain_id == resolved.id)
            | (CapabilityRequest.fulfilling_domain_id == resolved.id)
        )
    if status_filter:
        conditions.append(CapabilityRequest.status == status_filter)
    if capability_type:
        conditions.append(CapabilityRequest.capability_type == capability_type)

    stmt = select(CapabilityRequest).order_by(CapabilityRequest.created_at.desc())
    for condition in conditions:
        stmt = stmt.where(condition)

    rows = await session.execute(stmt)
    return list(rows.scalars().all())
@router.get("/capability-requests/{request_id}", response_model=CapabilityRequestRead)
async def get_request(
    request_id: uuid.UUID,
    session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
    """Return a single capability request by id.

    Raises:
        HTTPException: 404 when no request with that id exists.
    """
    found = await _get_request_or_404(request_id, session)
    return found
@router.post("/capability-requests/{request_id}/accept", response_model=CapabilityRequestRead)
async def accept_request(
    request_id: uuid.UUID,
    body: CapabilityRequestAccept,
    session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
    """Accept a capability request on behalf of a fulfilling agent.

    Records the fulfilling agent and workstream, stamps ``accepted_at`` and
    notifies the requesting agent.

    Raises:
        HTTPException: 404 when the request does not exist; 422 when the
            request's current status does not allow moving to ``accepted``.
    """
    req = await _get_request_or_404(request_id, session)
    _check_transition(req.status, "accepted")

    accepting_agent = body.fulfilling_agent
    req.accepted_at = datetime.now(tz=timezone.utc)
    req.status = "accepted"
    req.fulfilling_agent = accepting_agent
    req.fulfilling_workstream_id = body.fulfilling_workstream_id

    # NOTE: the fulfilling domain is left as routing set it; nothing is
    # inferred here. The agent can PATCH it later if needed.
    _add_notification(
        session,
        from_agent=accepting_agent,
        to_agent=req.requesting_agent,
        subject=f"[capability-accepted] {req.title}",
        body=f"Your capability request **{req.title}** has been accepted by **{accepting_agent}**.",
    )

    await session.commit()
    await session.refresh(req)
    return req
@router.patch("/capability-requests/{request_id}/status", response_model=CapabilityRequestRead)
async def patch_request_status(
    request_id: uuid.UUID,
    body: CapabilityRequestStatusPatch,
    session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
    """Move a capability request to a new lifecycle status.

    A truthy ``body.note`` is stored as the resolution note. Side effects by
    target status:

    * ``completed``: stamps ``completed_at``, moves a still-``blocked``
      blocking task back to ``todo`` and notifies the requester.
    * ``ready_for_review``, ``rejected``, ``in_progress``: notifies the
      requester.
    * any other status: no notification.

    Raises:
        HTTPException: 404 when the request does not exist; 422 when the
            transition is not allowed from the current status.
    """
    req = await _get_request_or_404(request_id, session)
    target = body.status
    _check_transition(req.status, target)

    req.status = target
    if body.note:
        req.resolution_note = body.note

    # (sender, subject, message) for the requester; None means "notify nobody".
    outgoing: tuple[str, str, str] | None = None

    if target == "completed":
        req.completed_at = datetime.now(tz=timezone.utc)
        # Release the task that was waiting on this capability, if it is still blocked.
        if req.blocking_task_id:
            waiting_task = await session.get(Task, req.blocking_task_id)
            if waiting_task and waiting_task.status == "blocked":
                waiting_task.status = "todo"
                waiting_task.blocking_reason = None
        outgoing = (
            "system",
            f"[capability-completed] {req.title}",
            f"Capability request **{req.title}** has been completed.\n\n"
            f"{body.note or ''}",
        )
    elif target == "ready_for_review":
        outgoing = (
            req.fulfilling_agent or "system",
            f"[capability-ready] {req.title} -- please review",
            f"Capability **{req.title}** is ready for your review and optimization.\n\n"
            f"{body.note or ''}",
        )
    elif target == "rejected":
        outgoing = (
            req.fulfilling_agent or "system",
            f"[capability-rejected] {req.title}",
            f"Capability request **{req.title}** has been rejected.\n\n"
            f"**Reason:** {body.note or '(no reason given)'}",
        )
    elif target == "in_progress":
        outgoing = (
            req.fulfilling_agent or "system",
            f"[capability-in-progress] {req.title}",
            f"Work on capability **{req.title}** is now in progress.",
        )

    if outgoing is not None:
        sender, subject, message = outgoing
        _add_notification(
            session,
            from_agent=sender,
            to_agent=req.requesting_agent,
            subject=subject,
            body=message,
        )

    await session.commit()
    await session.refresh(req)
    return req
@router.patch("/capability-requests/{request_id}", response_model=CapabilityRequestRead)
async def patch_request(
    request_id: uuid.UUID,
    body: CapabilityRequestPatch,
    session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
    """Correct mutable metadata on a capability request.

    Patchable fields: ``catalog_entry_id`` (also re-derives the fulfilling
    domain from the catalog entry), ``priority``, ``blocking_task_id`` and
    ``fulfilling_workstream_id``. Only fields that are not ``None`` in the
    body are updated. Every applied change is appended to ``routing_note`` as
    a "hub correction" line. When nothing changes, the request is returned
    without committing.

    Raises:
        HTTPException: 404 when the request or the referenced catalog entry
            does not exist.
    """
    req = await _get_request_or_404(request_id, session)
    corrections: list[str] = []

    if body.catalog_entry_id is not None:
        entry = await session.get(CapabilityCatalog, body.catalog_entry_id)
        if entry is None:
            raise HTTPException(status_code=404, detail=f"Catalog entry '{body.catalog_entry_id}' not found")
        old_entry_id = req.catalog_entry_id
        req.catalog_entry_id = entry.id
        # The fulfilling domain always follows the catalog entry's domain.
        req.fulfilling_domain_id = entry.domain_id
        corrections.append(
            f"catalog_entry: {old_entry_id} → {entry.id} ({entry.title}); "
            f"fulfilling_domain re-derived → {entry.domain_id}"
        )

    if body.priority is not None:
        req.priority = body.priority
        corrections.append(f"priority → {body.priority}")

    if body.blocking_task_id is not None:
        req.blocking_task_id = body.blocking_task_id
        corrections.append(f"blocking_task_id → {body.blocking_task_id}")

    if body.fulfilling_workstream_id is not None:
        req.fulfilling_workstream_id = body.fulfilling_workstream_id
        corrections.append(f"fulfilling_workstream_id → {body.fulfilling_workstream_id}")

    if not corrections:
        return req  # no-op: nothing to record or commit

    # Append to the routing note so earlier routing history is preserved.
    correction_note = "hub correction: " + "; ".join(corrections)
    req.routing_note = (req.routing_note + "\n" + correction_note) if req.routing_note else correction_note

    await session.commit()
    await session.refresh(req)
    return req
# ---------------------------------------------------------------------------
# Dispute endpoints
# ---------------------------------------------------------------------------
@router.post("/capability-requests/{request_id}/dispute", response_model=CapabilityRequestRead)
async def dispute_request(
    request_id: uuid.UUID,
    body: CapabilityRequestDispute,
    session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
    """Flag a routing decision as incorrect and move the request to routing_disputed.

    Stores the dispute details on the request, appends a line to
    ``routing_note`` and notifies ``"custodian"`` plus, when the request is
    currently routed, the fulfilling domain.

    Raises:
        HTTPException: 404 when the request does not exist; 422 when the
            current status does not allow a dispute.
    """
    req = await _get_request_or_404(request_id, session)
    _check_transition(req.status, "routing_disputed")

    disputer = body.disputed_by
    suggestion = body.suggested_domain

    req.status = "routing_disputed"
    req.dispute_reason = body.reason
    req.disputed_by = disputer
    req.dispute_suggested_domain = suggestion
    req.disputed_at = datetime.now(tz=timezone.utc)

    # Keep a trace of the dispute in the routing history.
    history_line = f"disputed by {disputer}: {body.reason}"
    if suggestion:
        history_line += f" (suggested: {suggestion})"
    if req.routing_note:
        req.routing_note = req.routing_note + "\n" + history_line
    else:
        req.routing_note = history_line

    subject = f"[routing-disputed] {req.title}"
    current_slug = req.fulfilling_domain_slug

    # Notify custodian
    custodian_parts = [
        f"**{disputer}** has disputed the routing of capability request ",
        f"**{req.title}**.\n\n",
        f"**Reason:** {body.reason}\n",
    ]
    if suggestion:
        custodian_parts.append(f"**Suggested domain:** {suggestion}\n")
    custodian_parts.append(f"\nCurrently routed to: {current_slug or 'unrouted'}")
    _add_notification(
        session,
        from_agent=disputer,
        to_agent="custodian",
        subject=subject,
        body="".join(custodian_parts),
    )

    # Notify current fulfilling domain
    if current_slug:
        provider_parts = [
            f"The capability request **{req.title}** routed to your domain has been disputed ",
            f"by **{disputer}**.\n\n",
            f"**Reason:** {body.reason}\n",
        ]
        if suggestion:
            provider_parts.append(f"**Suggested domain:** {suggestion}")
        _add_notification(
            session,
            from_agent=disputer,
            to_agent=current_slug,
            subject=subject,
            body="".join(provider_parts),
        )

    await session.commit()
    await session.refresh(req)
    return req
@router.post("/capability-requests/{request_id}/reroute", response_model=CapabilityRequestRead)
async def reroute_request(
    request_id: uuid.UUID,
    body: CapabilityRequestReroute,
    session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
    """Re-route a disputed request to a new domain and reset it to ``requested``.

    The target is given either as ``catalog_entry_id`` (the fulfilling domain
    is taken from the entry) or as a ``domain`` slug; the catalog entry wins
    when both are supplied. Dispute fields are cleared, a line is appended to
    ``routing_note`` and both the requester and the new domain are notified.

    Raises:
        HTTPException: 404 when the request, catalog entry or domain does not
            exist; 422 when the request is not in ``routing_disputed`` or
            neither ``catalog_entry_id`` nor ``domain`` is provided.
    """
    req = await _get_request_or_404(request_id, session)
    if req.status != "routing_disputed":
        raise HTTPException(
            status_code=422,
            detail=f"Cannot reroute from status '{req.status}'. Only 'routing_disputed' requests can be rerouted.",
        )
    if body.catalog_entry_id is None and body.domain is None:
        raise HTTPException(status_code=422, detail="Either catalog_entry_id or domain must be provided.")

    if body.catalog_entry_id is not None:
        entry = await session.get(CapabilityCatalog, body.catalog_entry_id)
        if entry is None:
            raise HTTPException(status_code=404, detail=f"Catalog entry '{body.catalog_entry_id}' not found")
        req.catalog_entry_id = entry.id
        req.fulfilling_domain_id = entry.domain_id
        # The entry may carry no domain, or point at a row that cannot be
        # loaded; fall back to "unknown" in both cases instead of crashing.
        target_domain = await session.get(Domain, entry.domain_id) if entry.domain_id else None
        new_domain_slug = target_domain.slug if target_domain is not None else "unknown"
    else:
        new_domain = await _resolve_domain(body.domain, session)
        req.fulfilling_domain_id = new_domain.id
        new_domain_slug = new_domain.slug

    # Clear dispute fields
    req.dispute_reason = None
    req.disputed_by = None
    req.dispute_suggested_domain = None
    req.disputed_at = None
    req.status = "requested"

    # Append to the routing note so the dispute history stays readable.
    reroute_entry = f"re-routed by {body.rerouted_by} → {new_domain_slug}: {body.note}"
    req.routing_note = (req.routing_note + "\n" + reroute_entry) if req.routing_note else reroute_entry

    # Notify requester
    _add_notification(
        session,
        from_agent=body.rerouted_by,
        to_agent=req.requesting_agent,
        subject=f"[re-routed] {req.title}",
        body=(
            f"Capability request **{req.title}** has been re-routed to **{new_domain_slug}**.\n\n"
            f"**Note:** {body.note}"
        ),
    )
    # Notify new fulfilling domain
    _add_notification(
        session,
        from_agent=body.rerouted_by,
        to_agent=new_domain_slug,
        subject=f"[capability-request] {req.title}",
        body=(
            f"Capability request **{req.title}** has been re-routed to your domain.\n\n"
            f"**From:** {req.requesting_agent} ({req.requesting_domain_slug})\n"
            f"**Type:** {req.capability_type}\n"
            f"**Priority:** {req.priority}\n\n"
            f"{req.description or '(no description)'}"
        ),
    )

    await session.commit()
    await session.refresh(req)
    return req
# ---------------------------------------------------------------------------
# Routing algorithm
# ---------------------------------------------------------------------------
async def _route_capability(
    session: AsyncSession, capability_type: str, title: str, description: str
) -> tuple[uuid.UUID | None, uuid.UUID | None, str]:
    """Find the best-matching catalog entry for a capability request.

    Returns ``(domain_id, catalog_entry_id, routing_note)``. Both ids are
    ``None`` when the request should be broadcast: no active entry of the
    type, no keyword overlap, or a tie for the best score.

    A single active entry of the requested type is chosen without any keyword
    check. Otherwise entries are scored by how many of their keywords (three
    characters or longer) occur as whole terms in the lower-cased
    ``title + description``. Whole-term matching avoids false positives from
    substring matches (e.g. 'postgres' inside 'postgresql', 'ha' inside 'has').
    """
    q = select(CapabilityCatalog).where(
        CapabilityCatalog.capability_type == capability_type,
        CapabilityCatalog.status == "active",
    )
    entries = list((await session.execute(q)).scalars().all())

    if not entries:
        return None, None, f"no active catalog entries for type '{capability_type}' — broadcast"

    if len(entries) == 1:
        e = entries[0]
        return e.domain_id, e.id, f"single match: '{e.title}' (domain={e.domain_id})"

    # Score by whole-term keyword overlap against title + description combined
    combined = f"{title} {description or ''}".lower()
    scored: list[tuple[int, CapabilityCatalog]] = []
    for entry in entries:
        keywords = [kw for kw in (entry.keywords or []) if len(kw) >= 3]
        # Lookarounds instead of \b: \b needs a word character on one side, so
        # keywords that start or end with punctuation (e.g. "c++", ".net")
        # could never match. For ordinary keywords the two are equivalent.
        score = sum(
            1 for kw in keywords
            if re.search(r"(?<!\w)" + re.escape(kw.lower()) + r"(?!\w)", combined)
        )
        scored.append((score, entry))

    # Stable sort: entries with equal scores keep their query order.
    scored.sort(key=lambda pair: -pair[0])
    best_score, best = scored[0]

    if best_score == 0:
        return None, None, (
            f"no keyword overlap for type '{capability_type}' among "
            f"{len(entries)} entries — broadcast"
        )

    # A tie for first place is not resolved automatically.
    if len(scored) >= 2 and scored[1][0] == best_score:
        return None, None, (
            f"ambiguous routing: '{scored[0][1].title}' and '{scored[1][1].title}' "
            f"both scored {best_score} — broadcast"
        )

    return best.domain_id, best.id, (
        f"matched '{best.title}' (score={best_score}, "
        f"keywords matched from: {title!r})"
    )
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _add_notification(
    session: AsyncSession,
    from_agent: str,
    to_agent: str,
    subject: str,
    body: str,
) -> None:
    """Queue an AgentMessage notification on the session; the caller commits."""
    session.add(
        AgentMessage(
            from_agent=from_agent,
            to_agent=to_agent,
            subject=subject,
            body=body,
        )
    )
async def _resolve_domain(slug: str, session: AsyncSession) -> Domain:
    """Return the domain with the given slug, or raise a 404 HTTPException."""
    lookup = select(Domain).where(Domain.slug == slug)
    found = (await session.execute(lookup)).scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"Domain '{slug}' not found")
async def _resolve_repo(slug: str, session: AsyncSession) -> ManagedRepo:
    """Return the managed repo with the given slug, or raise a 404 HTTPException."""
    lookup = select(ManagedRepo).where(ManagedRepo.slug == slug)
    found = (await session.execute(lookup)).scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"Repo '{slug}' not found")
async def _get_request_or_404(request_id: uuid.UUID, session: AsyncSession) -> CapabilityRequest:
    """Load a capability request by primary key, or raise a 404 HTTPException."""
    found = await session.get(CapabilityRequest, request_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"Capability request '{request_id}' not found")
def _check_transition(current: str, target: str) -> None:
    """Raise a 422 HTTPException unless ``current`` may move to ``target``.

    Allowed moves come from ``_VALID_TRANSITIONS``; a status missing from
    that table is treated as having no allowed moves.
    """
    permitted = _VALID_TRANSITIONS.get(current, set())
    if target in permitted:
        return
    raise HTTPException(
        status_code=422,
        detail=(
            f"Cannot transition from '{current}' to '{target}'. "
            f"Allowed: {sorted(permitted) or 'none (terminal state)'}"
        ),
    )