"""FastAPI router factories for the capability catalog and capability requests."""

import uuid
from collections.abc import Callable
from datetime import datetime, timezone
from typing import Any

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from hub_core.models.capability_catalog import CapabilityCatalog
from hub_core.models.capability_request import CapabilityRequest
from hub_core.models.domain import Domain
from hub_core.models.managed_repo import ManagedRepo
from hub_core.schemas.capability import (
    CapabilityRequestAccept,
    CapabilityRequestCreate,
    CapabilityRequestDispute,
    CapabilityRequestPatch,
    CapabilityRequestRead,
    CapabilityRequestStatusPatch,
    CatalogCreate,
    CatalogPatch,
    CatalogRead,
)


def create_capability_catalog_router(
    get_session: Callable[..., AsyncSession],
    *,
    domain_model: type[Domain] = Domain,
    repo_model: type[ManagedRepo] = ManagedRepo,
    catalog_model: type[CapabilityCatalog] = CapabilityCatalog,
    catalog_create_schema: type[CatalogCreate] = CatalogCreate,
    catalog_patch_schema: type[CatalogPatch] = CatalogPatch,
    catalog_read_schema: type[CatalogRead] = CatalogRead,
) -> APIRouter:
    """Build a router exposing the capability-catalog endpoints.

    Args:
        get_session: FastAPI dependency that provides the ``AsyncSession``.
        domain_model: ORM class used to look domains up by slug.
        repo_model: ORM class used to look repos up by slug.
        catalog_model: ORM class used for catalog rows.
        catalog_create_schema: Request body schema for ``POST``.
        catalog_patch_schema: Request body schema for ``PATCH``.
        catalog_read_schema: Response schema for all three endpoints.

    Returns:
        A router with ``POST``/``GET`` ``/capability-catalog/`` and
        ``PATCH /capability-catalog/{entry_id}``.
    """
    router = APIRouter(tags=["capability-requests"])
    list_response_model = list[catalog_read_schema]

    @router.post(
        "/capability-catalog/",
        response_model=catalog_read_schema,
        status_code=status.HTTP_201_CREATED,
    )
    async def create_catalog_entry(
        body: catalog_create_schema,
        session: AsyncSession = Depends(get_session),
    ) -> Any:
        """Create a catalog entry; 404 on unknown domain/repo, 409 on conflict."""
        domain = await _resolve_domain(body.domain, session, domain_model)
        repo_id = None
        if body.repo_slug:
            repo = await _resolve_repo(body.repo_slug, session, repo_model)
            repo_id = repo.id
        entry = catalog_model(
            domain_id=domain.id,
            repo_id=repo_id,
            capability_type=body.capability_type,
            title=body.title,
            description=body.description,
            keywords=body.keywords,
        )
        session.add(entry)
        await _commit_or_conflict(
            session,
            f"Catalog entry '{body.title}' for type '{body.capability_type}' "
            f"already exists in domain '{body.domain}'",
        )
        await session.refresh(entry)
        return entry

    @router.get("/capability-catalog/", response_model=list_response_model)
    async def list_catalog(
        domain: str | None = Query(None),
        capability_type: str | None = Query(None),
        status_filter: str | None = Query(None, alias="status"),
        session: AsyncSession = Depends(get_session),
    ) -> list[Any]:
        """List catalog entries, newest first, with optional filters."""
        q = select(catalog_model).order_by(catalog_model.created_at.desc())
        if domain:
            domain_obj = await _resolve_domain(domain, session, domain_model)
            q = q.where(catalog_model.domain_id == domain_obj.id)
        if capability_type:
            q = q.where(catalog_model.capability_type == capability_type)
        # status="all" disables the status filter; no status means "active" only.
        if status_filter and status_filter != "all":
            q = q.where(catalog_model.status == status_filter)
        elif not status_filter:
            q = q.where(catalog_model.status == "active")
        result = await session.execute(q)
        return list(result.scalars().all())

    @router.patch("/capability-catalog/{entry_id}", response_model=catalog_read_schema)
    async def patch_catalog_entry(
        entry_id: uuid.UUID,
        body: catalog_patch_schema,
        session: AsyncSession = Depends(get_session),
    ) -> Any:
        """Update the fields of a catalog entry that are present in the body."""
        entry = await session.get(catalog_model, entry_id)
        if entry is None:
            raise HTTPException(status_code=404, detail=f"Catalog entry '{entry_id}' not found")
        if body.repo_slug is not None:
            repo = await _resolve_repo(body.repo_slug, session, repo_model)
            entry.repo_id = repo.id
        if body.description is not None:
            entry.description = body.description
        if body.keywords is not None:
            entry.keywords = body.keywords
        if body.status is not None:
            entry.status = body.status
        await session.commit()
        await session.refresh(entry)
        return entry

    return router


def create_capability_request_read_router(
    get_session: Callable[..., AsyncSession],
    *,
    domain_model: type[Domain] = Domain,
    request_model: type[CapabilityRequest] = CapabilityRequest,
    request_read_schema: type[CapabilityRequestRead] = CapabilityRequestRead,
) -> APIRouter:
    """Build a read-only router for capability requests.

    Args:
        get_session: FastAPI dependency that provides the ``AsyncSession``.
        domain_model: ORM class used to look domains up by slug.
        request_model: ORM class used for capability-request rows.
        request_read_schema: Response schema for both endpoints.

    Returns:
        A router with ``GET /capability-requests/`` and
        ``GET /capability-requests/{request_id}``.
    """
    router = APIRouter(tags=["capability-requests"])
    list_response_model = list[request_read_schema]

    @router.get("/capability-requests/", response_model=list_response_model)
    async def list_requests(
        domain: str | None = Query(
            None,
            description="Filter by requesting or fulfilling domain slug",
        ),
        status_filter: str | None = Query(None, alias="status"),
        capability_type: str | None = Query(None),
        session: AsyncSession = Depends(get_session),
    ) -> list[Any]:
        """List capability requests, newest first, with optional filters."""
        q = select(request_model).order_by(request_model.created_at.desc())
        if domain:
            domain_obj = await _resolve_domain(domain, session, domain_model)
            # A domain matches when it is on either side of the request.
            q = q.where(
                (request_model.requesting_domain_id == domain_obj.id)
                | (request_model.fulfilling_domain_id == domain_obj.id)
            )
        if status_filter:
            q = q.where(request_model.status == status_filter)
        if capability_type:
            q = q.where(request_model.capability_type == capability_type)
        result = await session.execute(q)
        return list(result.scalars().all())

    @router.get("/capability-requests/{request_id}", response_model=request_read_schema)
    async def get_request(
        request_id: uuid.UUID,
        session: AsyncSession = Depends(get_session),
    ) -> Any:
        """Return one capability request by primary key, or 404."""
        req = await session.get(request_model, request_id)
        if req is None:
            raise HTTPException(
                status_code=404,
                detail=f"Capability request '{request_id}' not found",
            )
        return req

    return router


def create_capabilities_router(get_session: Callable[..., AsyncSession]) -> APIRouter:
    """Build the combined catalog + capability-request router.

    Uses the concrete ``hub_core`` models and schemas directly and registers
    the catalog endpoints followed by the full set of capability-request
    endpoints (create, list, get, accept, patch, status patch, dispute).

    Args:
        get_session: FastAPI dependency that provides the ``AsyncSession``.

    Returns:
        The configured router.
    """
    router = APIRouter(tags=["capability-requests"])

    @router.post(
        "/capability-catalog/",
        response_model=CatalogRead,
        status_code=status.HTTP_201_CREATED,
    )
    async def create_catalog_entry(
        body: CatalogCreate,
        session: AsyncSession = Depends(get_session),
    ) -> CapabilityCatalog:
        """Create a catalog entry; 404 on unknown domain/repo, 409 on conflict."""
        domain = await _resolve_domain(body.domain, session, Domain)
        repo_id = None
        if body.repo_slug:
            repo = await _resolve_repo(body.repo_slug, session, ManagedRepo)
            repo_id = repo.id
        entry = CapabilityCatalog(
            domain_id=domain.id,
            repo_id=repo_id,
            capability_type=body.capability_type,
            title=body.title,
            description=body.description,
            keywords=body.keywords,
        )
        session.add(entry)
        await _commit_or_conflict(
            session,
            f"Catalog entry '{body.title}' for type '{body.capability_type}' "
            f"already exists in domain '{body.domain}'",
        )
        await session.refresh(entry)
        return entry

    @router.get("/capability-catalog/", response_model=list[CatalogRead])
    async def list_catalog(
        domain: str | None = Query(None),
        capability_type: str | None = Query(None),
        status_filter: str | None = Query(None, alias="status"),
        session: AsyncSession = Depends(get_session),
    ) -> list[CapabilityCatalog]:
        """List catalog entries, newest first, with optional filters."""
        q = select(CapabilityCatalog).order_by(CapabilityCatalog.created_at.desc())
        if domain:
            domain_obj = await _resolve_domain(domain, session, Domain)
            q = q.where(CapabilityCatalog.domain_id == domain_obj.id)
        if capability_type:
            q = q.where(CapabilityCatalog.capability_type == capability_type)
        # status="all" disables the status filter; no status means "active" only.
        if status_filter and status_filter != "all":
            q = q.where(CapabilityCatalog.status == status_filter)
        elif not status_filter:
            q = q.where(CapabilityCatalog.status == "active")
        result = await session.execute(q)
        return list(result.scalars().all())

    @router.patch("/capability-catalog/{entry_id}", response_model=CatalogRead)
    async def patch_catalog_entry(
        entry_id: uuid.UUID,
        body: CatalogPatch,
        session: AsyncSession = Depends(get_session),
    ) -> CapabilityCatalog:
        """Update the fields of a catalog entry that are present in the body."""
        entry = await session.get(CapabilityCatalog, entry_id)
        if entry is None:
            raise HTTPException(status_code=404, detail=f"Catalog entry '{entry_id}' not found")
        if body.repo_slug is not None:
            repo = await _resolve_repo(body.repo_slug, session, ManagedRepo)
            entry.repo_id = repo.id
        if body.description is not None:
            entry.description = body.description
        if body.keywords is not None:
            entry.keywords = body.keywords
        if body.status is not None:
            entry.status = body.status
        await session.commit()
        await session.refresh(entry)
        return entry

    @router.post(
        "/capability-requests/",
        response_model=CapabilityRequestRead,
        status_code=status.HTTP_201_CREATED,
    )
    async def create_request(
        body: CapabilityRequestCreate,
        session: AsyncSession = Depends(get_session),
    ) -> CapabilityRequest:
        """Create a capability request and route it via the catalog if possible.

        An explicit ``catalog_entry_id`` wins; otherwise the request is routed
        to whatever ``_find_catalog_route`` returns for the capability type.
        When neither yields an entry the request is stored unrouted.
        """
        requesting_domain = await _resolve_domain(body.requesting_domain, session, Domain)

        fulfilling_domain_id = None
        catalog_entry_id = body.catalog_entry_id
        routing_note = None

        if catalog_entry_id:
            catalog_entry = await _resolve_catalog_entry(catalog_entry_id, session)
            fulfilling_domain_id = catalog_entry.domain_id
            routing_note = "Routed by explicit catalog entry."
        else:
            catalog_entry = await _find_catalog_route(body.capability_type, session)
            if catalog_entry:
                catalog_entry_id = catalog_entry.id
                fulfilling_domain_id = catalog_entry.domain_id
                routing_note = "Routed by first active catalog match for capability_type."

        req = CapabilityRequest(
            title=body.title,
            description=body.description,
            capability_type=body.capability_type,
            priority=body.priority,
            requesting_domain_id=requesting_domain.id,
            requesting_agent=body.requesting_agent,
            request_context=body.request_context,
            fulfilling_domain_id=fulfilling_domain_id,
            catalog_entry_id=catalog_entry_id,
            routing_note=routing_note,
        )
        session.add(req)
        await session.commit()
        await session.refresh(req)
        return req

    @router.get("/capability-requests/", response_model=list[CapabilityRequestRead])
    async def list_requests(
        domain: str | None = Query(
            None,
            description="Filter by requesting or fulfilling domain slug",
        ),
        status_filter: str | None = Query(None, alias="status"),
        capability_type: str | None = Query(None),
        session: AsyncSession = Depends(get_session),
    ) -> list[CapabilityRequest]:
        """List capability requests, newest first, with optional filters."""
        q = select(CapabilityRequest).order_by(CapabilityRequest.created_at.desc())
        if domain:
            domain_obj = await _resolve_domain(domain, session, Domain)
            # A domain matches when it is on either side of the request.
            q = q.where(
                (CapabilityRequest.requesting_domain_id == domain_obj.id)
                | (CapabilityRequest.fulfilling_domain_id == domain_obj.id)
            )
        if status_filter:
            q = q.where(CapabilityRequest.status == status_filter)
        if capability_type:
            q = q.where(CapabilityRequest.capability_type == capability_type)
        result = await session.execute(q)
        return list(result.scalars().all())

    @router.get("/capability-requests/{request_id}", response_model=CapabilityRequestRead)
    async def get_request(
        request_id: uuid.UUID,
        session: AsyncSession = Depends(get_session),
    ) -> CapabilityRequest:
        """Return one capability request by primary key, or 404."""
        return await _get_request_or_404(request_id, session)

    @router.post(
        "/capability-requests/{request_id}/accept",
        response_model=CapabilityRequestRead,
    )
    async def accept_request(
        request_id: uuid.UUID,
        body: CapabilityRequestAccept,
        session: AsyncSession = Depends(get_session),
    ) -> CapabilityRequest:
        """Mark a request as accepted and record who accepted it and when (UTC)."""
        req = await _get_request_or_404(request_id, session)
        req.status = "accepted"
        req.fulfilling_agent = body.fulfilling_agent
        req.fulfillment_context = body.fulfillment_context
        req.accepted_at = datetime.now(tz=timezone.utc)
        await session.commit()
        await session.refresh(req)
        return req

    @router.patch("/capability-requests/{request_id}", response_model=CapabilityRequestRead)
    async def patch_request(
        request_id: uuid.UUID,
        body: CapabilityRequestPatch,
        session: AsyncSession = Depends(get_session),
    ) -> CapabilityRequest:
        """Update the fields of a request that are present in the body.

        Changing ``catalog_entry_id`` also re-points ``fulfilling_domain_id``
        at the new entry's domain.
        """
        req = await _get_request_or_404(request_id, session)
        if body.catalog_entry_id is not None:
            catalog_entry = await _resolve_catalog_entry(body.catalog_entry_id, session)
            req.catalog_entry_id = catalog_entry.id
            req.fulfilling_domain_id = catalog_entry.domain_id
        if body.priority is not None:
            req.priority = body.priority
        if body.request_context is not None:
            req.request_context = body.request_context
        if body.fulfillment_context is not None:
            req.fulfillment_context = body.fulfillment_context
        await session.commit()
        await session.refresh(req)
        return req

    @router.patch(
        "/capability-requests/{request_id}/status",
        response_model=CapabilityRequestRead,
    )
    async def patch_request_status(
        request_id: uuid.UUID,
        body: CapabilityRequestStatusPatch,
        session: AsyncSession = Depends(get_session),
    ) -> CapabilityRequest:
        """Set a request's status, with an optional resolution note.

        ``completed_at`` is stamped (UTC) only when the new status is
        ``"completed"``.
        """
        req = await _get_request_or_404(request_id, session)
        req.status = body.status
        if body.note:
            req.resolution_note = body.note
        if body.status == "completed":
            req.completed_at = datetime.now(tz=timezone.utc)
        await session.commit()
        await session.refresh(req)
        return req

    @router.post(
        "/capability-requests/{request_id}/dispute",
        response_model=CapabilityRequestRead,
    )
    async def dispute_request(
        request_id: uuid.UUID,
        body: CapabilityRequestDispute,
        session: AsyncSession = Depends(get_session),
    ) -> CapabilityRequest:
        """Flag a request's routing as disputed and record the dispute details."""
        req = await _get_request_or_404(request_id, session)
        req.status = "routing_disputed"
        req.dispute_reason = body.reason
        req.disputed_by = body.disputed_by
        req.dispute_suggested_domain = body.suggested_domain
        req.disputed_at = datetime.now(tz=timezone.utc)
        await session.commit()
        await session.refresh(req)
        return req

    return router


async def _commit_or_conflict(session: AsyncSession, conflict_detail: str) -> None:
    """Commit the session, translating integrity violations into HTTP 409.

    Only ``IntegrityError`` is reported as a conflict. Any other failure is
    rolled back and re-raised unchanged so that infrastructure problems are
    not misreported to the client as "already exists".

    Args:
        session: Session with pending changes to commit.
        conflict_detail: ``detail`` text for the 409 response.

    Raises:
        HTTPException: 409 when the commit violates a database constraint.
    """
    try:
        await session.commit()
    except IntegrityError as exc:
        await session.rollback()
        raise HTTPException(status_code=409, detail=conflict_detail) from exc
    except Exception:
        await session.rollback()
        raise


async def _resolve_domain(
    slug: str,
    session: AsyncSession,
    domain_model: type[Domain],
) -> Any:
    """Return the ``domain_model`` row whose ``slug`` matches, or raise 404."""
    result = await session.execute(select(domain_model).where(domain_model.slug == slug))
    domain = result.scalar_one_or_none()
    if domain is None:
        raise HTTPException(status_code=404, detail=f"Domain '{slug}' not found")
    return domain


async def _resolve_repo(
    slug: str,
    session: AsyncSession,
    repo_model: type[ManagedRepo],
) -> Any:
    """Return the ``repo_model`` row whose ``slug`` matches, or raise 404."""
    result = await session.execute(select(repo_model).where(repo_model.slug == slug))
    repo = result.scalar_one_or_none()
    if repo is None:
        raise HTTPException(status_code=404, detail=f"Repo '{slug}' not found")
    return repo


async def _resolve_catalog_entry(entry_id: uuid.UUID, session: AsyncSession) -> CapabilityCatalog:
    """Return the catalog entry with primary key ``entry_id``, or raise 404."""
    entry = await session.get(CapabilityCatalog, entry_id)
    if entry is None:
        raise HTTPException(status_code=404, detail=f"Catalog entry '{entry_id}' not found")
    return entry


async def _find_catalog_route(
    capability_type: str,
    session: AsyncSession,
) -> CapabilityCatalog | None:
    """Return the most recently created active entry for ``capability_type``.

    Returns ``None`` when no active entry has that capability type.
    """
    result = await session.execute(
        select(CapabilityCatalog)
        .where(CapabilityCatalog.capability_type == capability_type)
        .where(CapabilityCatalog.status == "active")
        .order_by(CapabilityCatalog.created_at.desc())
    )
    return result.scalars().first()


async def _get_request_or_404(request_id: uuid.UUID, session: AsyncSession) -> CapabilityRequest:
    """Return the capability request with primary key ``request_id``, or raise 404."""
    req = await session.get(CapabilityRequest, request_id)
    if req is None:
        raise HTTPException(
            status_code=404,
            detail=f"Capability request '{request_id}' not found",
        )
    return req