"""FastAPI routes for the capability catalog and capability requests."""

import uuid
from datetime import datetime, timezone

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from api.database import get_session
from api.models.agent_message import AgentMessage
from api.models.capability_catalog import CapabilityCatalog
from api.models.capability_request import CapabilityRequest
from api.models.domain import Domain
from api.models.task import Task
from api.schemas.capability_request import (
    CatalogCreate,
    CatalogRead,
    CapabilityRequestAccept,
    CapabilityRequestCreate,
    CapabilityRequestRead,
    CapabilityRequestStatusPatch,
)

router = APIRouter(tags=["capability-requests"])

# ---------------------------------------------------------------------------
# Lifecycle guard
# ---------------------------------------------------------------------------

# Maps each request status to the set of statuses it may move to next.
# Statuses mapped to an empty set are terminal.
_VALID_TRANSITIONS: dict[str, set[str]] = {
    "requested": {"accepted", "rejected", "withdrawn"},
    "accepted": {"in_progress", "rejected", "withdrawn"},
    "in_progress": {"ready_for_review", "rejected", "withdrawn"},
    "ready_for_review": {"completed", "in_progress", "withdrawn"},
    "completed": set(),
    "rejected": set(),
    "withdrawn": set(),
}


# ---------------------------------------------------------------------------
# Capability Catalog endpoints
# ---------------------------------------------------------------------------


@router.post("/capability-catalog/", response_model=CatalogRead, status_code=status.HTTP_201_CREATED)
async def create_catalog_entry(
    body: CatalogCreate,
    session: AsyncSession = Depends(get_session),
) -> CapabilityCatalog:
    """Create a catalog entry under the domain named by ``body.domain``.

    Raises:
        HTTPException: 404 if the domain slug does not resolve; 409 if the
            commit fails with an integrity error (reported as a duplicate
            entry).
    """
    domain = await _resolve_domain(body.domain, session)
    entry = CapabilityCatalog(
        domain_id=domain.id,
        capability_type=body.capability_type,
        title=body.title,
        description=body.description,
        keywords=body.keywords,
    )
    session.add(entry)
    try:
        await session.commit()
    except IntegrityError as exc:
        # Only constraint violations are reported as a conflict; any other
        # failure propagates instead of being mislabelled as a duplicate.
        await session.rollback()
        raise HTTPException(
            status_code=409,
            detail=f"Catalog entry '{body.title}' for type '{body.capability_type}' already exists in domain '{body.domain}'",
        ) from exc
    await session.refresh(entry)
    return entry


@router.get("/capability-catalog/", response_model=list[CatalogRead])
async def list_catalog(
    domain: str | None = Query(None),
    capability_type: str | None = Query(None),
    status_filter: str | None = Query(None, alias="status"),
    session: AsyncSession = Depends(get_session),
) -> list[CapabilityCatalog]:
    """List catalog entries, newest first, with optional filters.

    When no ``status`` query parameter is given, only entries whose status is
    "active" are returned; ``status=all`` disables the status filter.

    Raises:
        HTTPException: 404 if ``domain`` is given and does not resolve.
    """
    q = select(CapabilityCatalog).order_by(CapabilityCatalog.created_at.desc())
    if domain:
        d = await _resolve_domain(domain, session)
        q = q.where(CapabilityCatalog.domain_id == d.id)
    if capability_type:
        q = q.where(CapabilityCatalog.capability_type == capability_type)
    if status_filter and status_filter != "all":
        q = q.where(CapabilityCatalog.status == status_filter)
    elif not status_filter:
        q = q.where(CapabilityCatalog.status == "active")
    result = await session.execute(q)
    return list(result.scalars().all())


# ---------------------------------------------------------------------------
# Capability Request endpoints
# ---------------------------------------------------------------------------


@router.post("/capability-requests/", response_model=CapabilityRequestRead, status_code=status.HTTP_201_CREATED)
async def create_request(
    body: CapabilityRequestCreate,
    session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
    """Create a capability request, route it, and queue a notification.

    The notification is addressed to the slug of the routed fulfilling domain,
    or to "broadcast" when routing finds no single provider.

    Raises:
        HTTPException: 404 if the requesting domain slug does not resolve.
    """
    req_domain = await _resolve_domain(body.requesting_domain, session)

    # Route to provider
    fulfilling_domain_id, catalog_entry_id = await _route_capability(
        session, body.capability_type, body.description or ""
    )

    req = CapabilityRequest(
        title=body.title,
        description=body.description,
        capability_type=body.capability_type,
        priority=body.priority,
        requesting_domain_id=req_domain.id,
        requesting_agent=body.requesting_agent,
        requesting_workstream_id=body.requesting_workstream_id,
        blocking_task_id=body.blocking_task_id,
        fulfilling_domain_id=fulfilling_domain_id,
        catalog_entry_id=catalog_entry_id,
    )
    session.add(req)
    await session.flush()  # get req.id before creating notification

    # Auto-notify
    if fulfilling_domain_id:
        ful_domain = await session.get(Domain, fulfilling_domain_id)
        to_agent = ful_domain.slug if ful_domain else "broadcast"
    else:
        to_agent = "broadcast"

    _add_notification(
        session,
        from_agent="system",
        to_agent=to_agent,
        subject=f"[capability-request] {body.title}",
        body=(
            f"New capability request from **{body.requesting_agent}** "
            f"({body.requesting_domain}):\n\n"
            f"**Type:** {body.capability_type}\n"
            f"**Priority:** {body.priority}\n\n"
            f"{body.description or '(no description)'}"
        ),
    )

    await session.commit()
    await session.refresh(req)
    return req


@router.get("/capability-requests/", response_model=list[CapabilityRequestRead])
async def list_requests(
    domain: str | None = Query(None, description="Filter by requesting OR fulfilling domain slug"),
    status_filter: str | None = Query(None, alias="status"),
    capability_type: str | None = Query(None),
    session: AsyncSession = Depends(get_session),
) -> list[CapabilityRequest]:
    """List capability requests, newest first, with optional filters.

    The ``domain`` filter matches requests where the domain is either the
    requesting or the fulfilling side.

    Raises:
        HTTPException: 404 if ``domain`` is given and does not resolve.
    """
    q = select(CapabilityRequest).order_by(CapabilityRequest.created_at.desc())
    if domain:
        d = await _resolve_domain(domain, session)
        q = q.where(
            (CapabilityRequest.requesting_domain_id == d.id)
            | (CapabilityRequest.fulfilling_domain_id == d.id)
        )
    if status_filter:
        q = q.where(CapabilityRequest.status == status_filter)
    if capability_type:
        q = q.where(CapabilityRequest.capability_type == capability_type)
    result = await session.execute(q)
    return list(result.scalars().all())


@router.get("/capability-requests/{request_id}", response_model=CapabilityRequestRead)
async def get_request(
    request_id: uuid.UUID,
    session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
    """Return one capability request by id, or respond 404 if it is missing."""
    return await _get_request_or_404(request_id, session)


@router.post("/capability-requests/{request_id}/accept", response_model=CapabilityRequestRead)
async def accept_request(
    request_id: uuid.UUID,
    body: CapabilityRequestAccept,
    session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
    """Mark a request as accepted and notify the requesting agent.

    Records the fulfilling agent, fulfilling workstream and acceptance time.

    Raises:
        HTTPException: 404 if the request does not exist; 422 if its current
            status may not transition to "accepted".
    """
    req = await _get_request_or_404(request_id, session)
    _check_transition(req.status, "accepted")

    now = datetime.now(tz=timezone.utc)
    req.status = "accepted"
    req.fulfilling_agent = body.fulfilling_agent
    req.fulfilling_workstream_id = body.fulfilling_workstream_id
    req.accepted_at = now

    # NOTE: fulfilling_domain_id is left exactly as routing set it; this
    # endpoint does not infer a domain from the accepting agent.

    _add_notification(
        session,
        from_agent=body.fulfilling_agent,
        to_agent=req.requesting_agent,
        subject=f"[capability-accepted] {req.title}",
        body=f"Your capability request **{req.title}** has been accepted by **{body.fulfilling_agent}**.",
    )

    await session.commit()
    await session.refresh(req)
    return req


@router.patch("/capability-requests/{request_id}/status", response_model=CapabilityRequestRead)
async def patch_request_status(
    request_id: uuid.UUID,
    body: CapabilityRequestStatusPatch,
    session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
    """Move a request to ``body.status`` and apply status-specific side effects.

    Side effects by target status:
      * "completed": stamps ``completed_at``, moves a blocked blocking task
        back to "todo", and notifies the requesting agent.
      * "ready_for_review", "rejected", "in_progress": notifies the
        requesting agent.
      * any other allowed status: no notification is queued.

    Raises:
        HTTPException: 404 if the request does not exist; 422 if the
            transition is not allowed from the current status.
    """
    req = await _get_request_or_404(request_id, session)
    _check_transition(req.status, body.status)

    # NOTE(review): the transition table also permits "accepted" from
    # "requested"; reaching it through this endpoint sets neither accepted_at
    # nor fulfilling_agent -- confirm the request schema excludes it.
    req.status = body.status
    if body.note:
        req.resolution_note = body.note

    now = datetime.now(tz=timezone.utc)

    # Status-specific side effects
    if body.status == "completed":
        req.completed_at = now
        # Auto-unblock the blocking task
        if req.blocking_task_id:
            task = await session.get(Task, req.blocking_task_id)
            if task and task.status == "blocked":
                task.status = "todo"
                task.blocking_reason = None

        _add_notification(
            session,
            from_agent="system",
            to_agent=req.requesting_agent,
            subject=f"[capability-completed] {req.title}",
            body=(
                f"Capability request **{req.title}** has been completed.\n\n"
                f"{body.note or ''}"
            ),
        )

    elif body.status == "ready_for_review":
        _add_notification(
            session,
            from_agent=req.fulfilling_agent or "system",
            to_agent=req.requesting_agent,
            subject=f"[capability-ready] {req.title} -- please review",
            body=(
                f"Capability **{req.title}** is ready for your review and optimization.\n\n"
                f"{body.note or ''}"
            ),
        )

    elif body.status == "rejected":
        _add_notification(
            session,
            from_agent=req.fulfilling_agent or "system",
            to_agent=req.requesting_agent,
            subject=f"[capability-rejected] {req.title}",
            body=(
                f"Capability request **{req.title}** has been rejected.\n\n"
                f"**Reason:** {body.note or '(no reason given)'}"
            ),
        )

    elif body.status == "in_progress":
        _add_notification(
            session,
            from_agent=req.fulfilling_agent or "system",
            to_agent=req.requesting_agent,
            subject=f"[capability-in-progress] {req.title}",
            body=f"Work on capability **{req.title}** is now in progress.",
        )

    await session.commit()
    await session.refresh(req)
    return req


# ---------------------------------------------------------------------------
# Routing algorithm
# ---------------------------------------------------------------------------


async def _route_capability(
    session: AsyncSession, capability_type: str, description: str
) -> tuple[uuid.UUID | None, uuid.UUID | None]:
    """Find the best-matching domain for a capability request.

    Considers active catalog entries of the given ``capability_type``:
      * exactly one entry: that entry wins outright;
      * several entries and a non-empty description: each entry scores one
        point per keyword found (case-insensitively) in the description, and
        the top entry wins only if its score is positive and strictly greater
        than the runner-up's.

    Returns:
        ``(domain_id, catalog_entry_id)`` of the winner, or ``(None, None)``
        when no single provider can be chosen (broadcast).
    """
    q = select(CapabilityCatalog).where(
        CapabilityCatalog.capability_type == capability_type,
        CapabilityCatalog.status == "active",
    )
    entries = list((await session.execute(q)).scalars().all())

    if len(entries) == 1:
        return entries[0].domain_id, entries[0].id

    if len(entries) > 1 and description:
        desc_lower = description.lower()
        scored: list[tuple[int, CapabilityCatalog]] = []
        for entry in entries:
            # Skip empty keywords: "" is a substring of every description and
            # would otherwise count as a match for any request.
            score = sum(
                1 for kw in (entry.keywords or []) if kw and kw.lower() in desc_lower
            )
            scored.append((score, entry))

        scored.sort(key=lambda x: -x[0])
        # At least two candidates exist in this branch, so the runner-up is
        # always present; a tie at the top means no clear winner.
        best_score, best_entry = scored[0]
        if best_score > 0 and best_score > scored[1][0]:
            return best_entry.domain_id, best_entry.id

    return None, None


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _add_notification(
    session: AsyncSession,
    from_agent: str,
    to_agent: str,
    subject: str,
    body: str,
) -> None:
    """Create an AgentMessage notification in the current session (no commit)."""
    msg = AgentMessage(
        from_agent=from_agent,
        to_agent=to_agent,
        subject=subject,
        body=body,
    )
    session.add(msg)


async def _resolve_domain(slug: str, session: AsyncSession) -> Domain:
    """Return the Domain whose slug equals ``slug``.

    Raises:
        HTTPException: 404 if no such domain exists.
    """
    result = await session.execute(select(Domain).where(Domain.slug == slug))
    domain = result.scalar_one_or_none()
    if domain is None:
        raise HTTPException(status_code=404, detail=f"Domain '{slug}' not found")
    return domain


async def _get_request_or_404(request_id: uuid.UUID, session: AsyncSession) -> CapabilityRequest:
    """Load a CapabilityRequest by primary key.

    Raises:
        HTTPException: 404 if no request with ``request_id`` exists.
    """
    req = await session.get(CapabilityRequest, request_id)
    if req is None:
        raise HTTPException(status_code=404, detail=f"Capability request '{request_id}' not found")
    return req


def _check_transition(current: str, target: str) -> None:
    """Validate a status change against ``_VALID_TRANSITIONS``.

    A ``current`` status missing from the table is treated as having no
    allowed targets.

    Raises:
        HTTPException: 422 if ``target`` is not reachable from ``current``.
    """
    allowed = _VALID_TRANSITIONS.get(current, set())
    if target not in allowed:
        raise HTTPException(
            status_code=422,
            detail=(
                f"Cannot transition from '{current}' to '{target}'. "
                f"Allowed: {sorted(allowed) or 'none (terminal state)'}"
            ),
        )