From af28282861e07557702082337257f84a49ddaf75 Mon Sep 17 00:00:00 2001 From: tegwick Date: Mon, 22 Jun 2026 19:52:22 +0200 Subject: [PATCH] feat(capabilities): add write router factory and MCP composition (HUB-WP-0002) Add create_capability_request_write_router with host workflow callbacks, CapabilityRequestReroute schema, HubCoreMCPServer.attach_to() with CORE_TOOL_NAMES exclude filtering, tests, and mark HUB-WP-0002 finished. --- hub_core/mcp/__init__.py | 4 +- hub_core/mcp/server.py | 108 +++-- hub_core/routers/__init__.py | 2 + hub_core/routers/capabilities.py | 427 ++++++++++-------- hub_core/schemas/capability.py | 7 + tests/test_imports.py | 54 +++ tests/test_mcp.py | 22 +- ...B-WP-0002-import-refactor-adapter-seams.md | 29 +- 8 files changed, 428 insertions(+), 225 deletions(-) diff --git a/hub_core/mcp/__init__.py b/hub_core/mcp/__init__.py index d07266d..59c6edf 100644 --- a/hub_core/mcp/__init__.py +++ b/hub_core/mcp/__init__.py @@ -1,3 +1,3 @@ -from hub_core.mcp.server import HubCoreMCPServer +from hub_core.mcp.server import CORE_TOOL_NAMES, HubCoreMCPServer -__all__ = ["HubCoreMCPServer"] +__all__ = ["CORE_TOOL_NAMES", "HubCoreMCPServer"] diff --git a/hub_core/mcp/server.py b/hub_core/mcp/server.py index ccdb6e3..4ffe947 100644 --- a/hub_core/mcp/server.py +++ b/hub_core/mcp/server.py @@ -8,6 +8,36 @@ from fastmcp import FastMCP from hub_core.utils.routing import normalize_trailing_slash +CORE_TOOL_NAMES = frozenset({ + "get_state_summary", + "list_domains", + "get_domain_summary", + "get_domain", + "send_message", + "get_messages", + "mark_message_read", + "reply_to_message", + "register_capability", + "list_capabilities", + "request_capability", + "accept_capability_request", + "update_capability_request_status", + "list_capability_requests", + "get_capability_request", + "register_repo", + "update_repo_path", + "list_domain_repos", + "check_repo_doi", + "get_doi_summary", + "register_service", + "list_services", + "ingest_tpsc_tool", + "get_gdpr_report", + "get_risks", + "get_alerts", + "append_progress", +}) + class HubCoreMCPServer: """FastMCP base server for generic FOS hub tools. @@ -32,24 +62,46 @@ class HubCoreMCPServer: if register_tools: self.register_core_tools() - def register_core_tools(self) -> None: - @self.mcp.tool() + def attach_to( + self, + host_mcp: FastMCP, + *, + exclude: frozenset[str] | None = None, + ) -> HubCoreMCPServer: + """Register generic hub-core tools on an existing host MCP server.""" + self.mcp = host_mcp + self.register_core_tools(exclude=exclude) + return self + + def _register_tool(self, name: str, excluded: frozenset[str]): + def decorator(fn): + if name not in excluded: + self.mcp.tool()(fn) + return fn + + return decorator + + def register_core_tools(self, *, exclude: frozenset[str] | None = None) -> None: + excluded = exclude or frozenset() + register = lambda name: self._register_tool(name, excluded) # noqa: E731 + + @register("get_state_summary") def get_state_summary() -> str: return self._json(self._get("/state/summary/")) - @self.mcp.tool() + @register("list_domains") def list_domains(status: str | None = None) -> str: return self._json(self._get("/domains/", {"status": status})) - @self.mcp.tool() + @register("get_domain_summary") def get_domain_summary(domain_slug: str) -> str: return self._json(self._get(f"/domains/{domain_slug}/")) - @self.mcp.tool() + @register("get_domain") def get_domain(domain_slug: str) -> str: return self._json(self._get(f"/domains/{domain_slug}/")) - @self.mcp.tool() + @register("send_message") def send_message( from_agent: str, to_agent: str, @@ -70,7 +122,7 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("get_messages") def get_messages( to_agent: str | None = None, from_agent: str | None = None, @@ -89,11 +141,11 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("mark_message_read") def mark_message_read(message_id: str) -> str: return self._json(self._patch(f"/messages/{message_id}/read/", {})) - @self.mcp.tool() + @register("reply_to_message") def reply_to_message(message_id: str, from_agent: str, body: str) -> str: return self._json( self._post( @@ -102,7 +154,7 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("register_capability") def register_capability( domain: str, capability_type: str, @@ -125,7 +177,7 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("list_capabilities") def list_capabilities( domain: str | None = None, capability_type: str | None = None, @@ -138,7 +190,7 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("request_capability") def request_capability( title: str, capability_type: str, @@ -165,7 +217,7 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("accept_capability_request") def accept_capability_request( request_id: str, fulfilling_agent: str, @@ -181,7 +233,7 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("update_capability_request_status") def update_capability_request_status( request_id: str, status: str, @@ -194,7 +246,7 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("list_capability_requests") def list_capability_requests( domain: str | None = None, status: str | None = None, @@ -207,11 +259,11 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("get_capability_request") def get_capability_request(request_id: str) -> str: return self._json(self._get(f"/capability-requests/{request_id}/")) - @self.mcp.tool() + @register("register_repo") def register_repo( domain_slug: str, slug: str, @@ -236,17 +288,17 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("update_repo_path") def update_repo_path(repo_slug: str, host: str, path: str) -> str: return self._json( self._post(f"/repos/{repo_slug}/paths/", {"host": host, "path": path}) ) - @self.mcp.tool() + @register("list_domain_repos") def list_domain_repos(domain_slug: str) -> str: return self._json(self._get("/repos/", {"domain": domain_slug})) - @self.mcp.tool() + @register("check_repo_doi") def check_repo_doi(repo_slug: str, force_refresh: bool = False) -> str: return self._json( self._get( @@ -255,11 +307,11 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("get_doi_summary") def get_doi_summary() -> str: return self._json(self._get("/repos/doi/summary/")) - @self.mcp.tool() + @register("register_service") def register_service( slug: str, name: str, @@ -284,7 +336,7 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("list_services") def list_services( gdpr_maturity: str | None = None, category: str | None = None, @@ -301,7 +353,7 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("ingest_tpsc_tool") def ingest_tpsc_tool(repo_slug: str, source_file: str, entries: list[dict[str, Any]]) -> str: return self._json( self._post( @@ -310,11 +362,11 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("get_gdpr_report") def get_gdpr_report() -> str: return self._json(self._get("/tpsc/report/gdpr/")) - @self.mcp.tool() + @register("get_risks") def get_risks( since: str | None = None, limit: int = 100, @@ -327,7 +379,7 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("get_alerts") def get_alerts( since: str | None = None, limit: int = 100, @@ -340,7 +392,7 @@ class HubCoreMCPServer: ) ) - @self.mcp.tool() + @register("append_progress") def append_progress( event_type: str, summary: str, diff --git a/hub_core/routers/__init__.py b/hub_core/routers/__init__.py index c40dfae..5a809d0 100644 --- a/hub_core/routers/__init__.py +++ b/hub_core/routers/__init__.py @@ -2,6 +2,7 @@ from hub_core.routers.capabilities import ( create_capabilities_router, create_capability_catalog_router, create_capability_request_read_router, + create_capability_request_write_router, ) from hub_core.routers.domains import create_domains_router from hub_core.routers.messages import create_messages_router @@ -14,6 +15,7 @@ __all__ = [ "create_capabilities_router", "create_capability_catalog_router", "create_capability_request_read_router", + "create_capability_request_write_router", "create_domains_router", "create_messages_router", "create_policy_router", diff --git a/hub_core/routers/capabilities.py b/hub_core/routers/capabilities.py index c155f7b..881aeb2 100644 --- a/hub_core/routers/capabilities.py +++ b/hub_core/routers/capabilities.py @@ -1,7 +1,7 @@ import uuid from collections.abc import Callable from datetime import datetime, timezone -from typing import Any +from typing import Any, Awaitable from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import select @@ -16,6 +16,7 @@ from hub_core.schemas.capability import ( CapabilityRequestCreate, CapabilityRequestDispute, CapabilityRequestPatch, + CapabilityRequestReroute, CapabilityRequestRead, CapabilityRequestStatusPatch, CatalogCreate, @@ -23,6 +24,22 @@ from hub_core.schemas.capability import ( CatalogRead, ) +RouteRequestCallback = Callable[ + [AsyncSession, Any], + Awaitable[tuple[uuid.UUID | None, uuid.UUID | None, str | None]], +] +BuildRequestCallback = Callable[ + [Any, Any, uuid.UUID | None, uuid.UUID | None, str | None], + Any, +] +RequestLifecycleCallback = Callable[[AsyncSession, Any, Any], Awaitable[None]] +CheckTransitionCallback = Callable[[str, str], None] +ApplyAcceptFieldsCallback = Callable[[Any, Any], None] +AfterStatusChangeCallback = Callable[[AsyncSession, Any, Any, datetime], Awaitable[None]] +ApplyPatchCallback = Callable[[AsyncSession, Any, Any], Awaitable[bool]] +AfterDisputeCallback = Callable[[AsyncSession, Any, Any, datetime], Awaitable[None]] +AfterRerouteCallback = Callable[[AsyncSession, Any, Any, str], Awaitable[None]] + def create_capability_catalog_router( get_session: Callable[..., AsyncSession], @@ -162,217 +179,242 @@ def create_capability_request_read_router( return router -def create_capabilities_router(get_session: Callable[..., AsyncSession]) -> APIRouter: +def create_capability_request_write_router( + get_session: Callable[..., AsyncSession], + *, + domain_model: type[Domain] = Domain, + catalog_model: type[CapabilityCatalog] = CapabilityCatalog, + request_model: type[CapabilityRequest] = CapabilityRequest, + request_create_schema: type[CapabilityRequestCreate] = CapabilityRequestCreate, + request_accept_schema: type[CapabilityRequestAccept] = CapabilityRequestAccept, + request_patch_schema: type[CapabilityRequestPatch] = CapabilityRequestPatch, + request_status_patch_schema: type[CapabilityRequestStatusPatch] = CapabilityRequestStatusPatch, + request_dispute_schema: type[CapabilityRequestDispute] = CapabilityRequestDispute, + request_reroute_schema: type[CapabilityRequestReroute] = CapabilityRequestReroute, + request_read_schema: type[CapabilityRequestRead] = CapabilityRequestRead, + route_request: RouteRequestCallback | None = None, + build_request: BuildRequestCallback | None = None, + on_request_persisted: RequestLifecycleCallback | None = None, + check_transition: CheckTransitionCallback | None = None, + apply_accept_fields: ApplyAcceptFieldsCallback | None = None, + after_accept: RequestLifecycleCallback | None = None, + after_status_change: AfterStatusChangeCallback | None = None, + apply_patch: ApplyPatchCallback | None = None, + after_dispute: AfterDisputeCallback | None = None, + after_reroute: AfterRerouteCallback | None = None, + include_reroute: bool = True, +) -> APIRouter: router = APIRouter(tags=["capability-requests"]) - @router.post("/capability-catalog/", response_model=CatalogRead, status_code=status.HTTP_201_CREATED) - async def create_catalog_entry( - body: CatalogCreate, - session: AsyncSession = Depends(get_session), - ) -> CapabilityCatalog: - domain = await _resolve_domain(body.domain, session, Domain) - repo_id = None - if body.repo_slug: - repo = await _resolve_repo(body.repo_slug, session, ManagedRepo) - repo_id = repo.id - entry = CapabilityCatalog( - domain_id=domain.id, - repo_id=repo_id, - capability_type=body.capability_type, - title=body.title, - description=body.description, - keywords=body.keywords, - ) - session.add(entry) - try: - await session.commit() - except Exception: - await session.rollback() - raise HTTPException( - status_code=409, - detail=( - f"Catalog entry '{body.title}' for type '{body.capability_type}' " - f"already exists in domain '{body.domain}'" - ), - ) - await session.refresh(entry) - return entry - - @router.get("/capability-catalog/", response_model=list[CatalogRead]) - async def list_catalog( - domain: str | None = Query(None), - capability_type: str | None = Query(None), - status_filter: str | None = Query(None, alias="status"), - session: AsyncSession = Depends(get_session), - ) -> list[CapabilityCatalog]: - q = select(CapabilityCatalog).order_by(CapabilityCatalog.created_at.desc()) - if domain: - domain_obj = await _resolve_domain(domain, session, Domain) - q = q.where(CapabilityCatalog.domain_id == domain_obj.id) - if capability_type: - q = q.where(CapabilityCatalog.capability_type == capability_type) - if status_filter and status_filter != "all": - q = q.where(CapabilityCatalog.status == status_filter) - elif not status_filter: - q = q.where(CapabilityCatalog.status == "active") - result = await session.execute(q) - return list(result.scalars().all()) - - @router.patch("/capability-catalog/{entry_id}", response_model=CatalogRead) - async def patch_catalog_entry( - entry_id: uuid.UUID, - body: CatalogPatch, - session: AsyncSession = Depends(get_session), - ) -> CapabilityCatalog: - entry = await session.get(CapabilityCatalog, entry_id) - if entry is None: - raise HTTPException(status_code=404, detail=f"Catalog entry '{entry_id}' not found") - if body.repo_slug is not None: - repo = await _resolve_repo(body.repo_slug, session, ManagedRepo) - entry.repo_id = repo.id - if body.description is not None: - entry.description = body.description - if body.keywords is not None: - entry.keywords = body.keywords - if body.status is not None: - entry.status = body.status - await session.commit() - await session.refresh(entry) - return entry - - @router.post("/capability-requests/", response_model=CapabilityRequestRead, status_code=status.HTTP_201_CREATED) + @router.post( + "/capability-requests/", + response_model=request_read_schema, + status_code=status.HTTP_201_CREATED, + ) async def create_request( - body: CapabilityRequestCreate, + body: request_create_schema, session: AsyncSession = Depends(get_session), - ) -> CapabilityRequest: - requesting_domain = await _resolve_domain(body.requesting_domain, session, Domain) - fulfilling_domain_id = None - catalog_entry_id = body.catalog_entry_id - routing_note = None - if catalog_entry_id: - catalog_entry = await _resolve_catalog_entry(catalog_entry_id, session) - fulfilling_domain_id = catalog_entry.domain_id - routing_note = "Routed by explicit catalog entry." + ) -> Any: + requesting_domain = await _resolve_domain(body.requesting_domain, session, domain_model) + if route_request is not None: + fulfilling_domain_id, catalog_entry_id, routing_note = await route_request(session, body) else: - catalog_entry = await _find_catalog_route(body.capability_type, session) - if catalog_entry: - catalog_entry_id = catalog_entry.id - fulfilling_domain_id = catalog_entry.domain_id - routing_note = "Routed by first active catalog match for capability_type." + fulfilling_domain_id, catalog_entry_id, routing_note = await _default_route_request( + session, + body, + catalog_model, + ) + + if build_request is not None: + req = build_request( + body, + requesting_domain, + fulfilling_domain_id, + catalog_entry_id, + routing_note, + ) + else: + req = request_model( + title=body.title, + description=body.description, + capability_type=body.capability_type, + priority=body.priority, + requesting_domain_id=requesting_domain.id, + requesting_agent=body.requesting_agent, + request_context=body.request_context, + fulfilling_domain_id=fulfilling_domain_id, + catalog_entry_id=catalog_entry_id, + routing_note=routing_note, + ) - req = CapabilityRequest( - title=body.title, - description=body.description, - capability_type=body.capability_type, - priority=body.priority, - requesting_domain_id=requesting_domain.id, - requesting_agent=body.requesting_agent, - request_context=body.request_context, - fulfilling_domain_id=fulfilling_domain_id, - catalog_entry_id=catalog_entry_id, - routing_note=routing_note, - ) session.add(req) + if on_request_persisted is not None: + await on_request_persisted(session, req, body) await session.commit() await session.refresh(req) return req - @router.get("/capability-requests/", response_model=list[CapabilityRequestRead]) - async def list_requests( - domain: str | None = Query(None, description="Filter by requesting or fulfilling domain slug"), - status_filter: str | None = Query(None, alias="status"), - capability_type: str | None = Query(None), - session: AsyncSession = Depends(get_session), - ) -> list[CapabilityRequest]: - q = select(CapabilityRequest).order_by(CapabilityRequest.created_at.desc()) - if domain: - domain_obj = await _resolve_domain(domain, session, Domain) - q = q.where( - (CapabilityRequest.requesting_domain_id == domain_obj.id) - | (CapabilityRequest.fulfilling_domain_id == domain_obj.id) - ) - if status_filter: - q = q.where(CapabilityRequest.status == status_filter) - if capability_type: - q = q.where(CapabilityRequest.capability_type == capability_type) - result = await session.execute(q) - return list(result.scalars().all()) - - @router.get("/capability-requests/{request_id}", response_model=CapabilityRequestRead) - async def get_request( - request_id: uuid.UUID, - session: AsyncSession = Depends(get_session), - ) -> CapabilityRequest: - return await _get_request_or_404(request_id, session) - - @router.post("/capability-requests/{request_id}/accept", response_model=CapabilityRequestRead) + @router.post("/capability-requests/{request_id}/accept", response_model=request_read_schema) async def accept_request( request_id: uuid.UUID, - body: CapabilityRequestAccept, + body: request_accept_schema, session: AsyncSession = Depends(get_session), - ) -> CapabilityRequest: - req = await _get_request_or_404(request_id, session) + ) -> Any: + req = await _get_request_or_404(request_id, session, request_model) + if check_transition is not None: + check_transition(req.status, "accepted") + + now = datetime.now(tz=timezone.utc) req.status = "accepted" req.fulfilling_agent = body.fulfilling_agent - req.fulfillment_context = body.fulfillment_context - req.accepted_at = datetime.now(tz=timezone.utc) - await session.commit() - await session.refresh(req) - return req - - @router.patch("/capability-requests/{request_id}", response_model=CapabilityRequestRead) - async def patch_request( - request_id: uuid.UUID, - body: CapabilityRequestPatch, - session: AsyncSession = Depends(get_session), - ) -> CapabilityRequest: - req = await _get_request_or_404(request_id, session) - if body.catalog_entry_id is not None: - catalog_entry = await _resolve_catalog_entry(body.catalog_entry_id, session) - req.catalog_entry_id = catalog_entry.id - req.fulfilling_domain_id = catalog_entry.domain_id - if body.priority is not None: - req.priority = body.priority - if body.request_context is not None: - req.request_context = body.request_context - if body.fulfillment_context is not None: + if hasattr(body, "fulfillment_context"): req.fulfillment_context = body.fulfillment_context + req.accepted_at = now + if apply_accept_fields is not None: + apply_accept_fields(req, body) + if after_accept is not None: + await after_accept(session, req, body) + await session.commit() await session.refresh(req) return req - @router.patch("/capability-requests/{request_id}/status", response_model=CapabilityRequestRead) + @router.patch("/capability-requests/{request_id}/status", response_model=request_read_schema) async def patch_request_status( request_id: uuid.UUID, - body: CapabilityRequestStatusPatch, + body: request_status_patch_schema, session: AsyncSession = Depends(get_session), - ) -> CapabilityRequest: - req = await _get_request_or_404(request_id, session) + ) -> Any: + req = await _get_request_or_404(request_id, session, request_model) + if check_transition is not None: + check_transition(req.status, body.status) + req.status = body.status if body.note: req.resolution_note = body.note + + now = datetime.now(tz=timezone.utc) if body.status == "completed": - req.completed_at = datetime.now(tz=timezone.utc) + req.completed_at = now + + if after_status_change is not None: + await after_status_change(session, req, body, now) + await session.commit() await session.refresh(req) return req - @router.post("/capability-requests/{request_id}/dispute", response_model=CapabilityRequestRead) + @router.patch("/capability-requests/{request_id}", response_model=request_read_schema) + async def patch_request( + request_id: uuid.UUID, + body: request_patch_schema, + session: AsyncSession = Depends(get_session), + ) -> Any: + req = await _get_request_or_404(request_id, session, request_model) + if apply_patch is not None: + changed = await apply_patch(session, req, body) + if not changed: + return req + else: + if body.catalog_entry_id is not None: + catalog_entry = await _resolve_catalog_entry(body.catalog_entry_id, session, catalog_model) + req.catalog_entry_id = catalog_entry.id + req.fulfilling_domain_id = catalog_entry.domain_id + if body.priority is not None: + req.priority = body.priority + if body.request_context is not None: + req.request_context = body.request_context + if body.fulfillment_context is not None: + req.fulfillment_context = body.fulfillment_context + + await session.commit() + await session.refresh(req) + return req + + @router.post("/capability-requests/{request_id}/dispute", response_model=request_read_schema) async def dispute_request( request_id: uuid.UUID, - body: CapabilityRequestDispute, + body: request_dispute_schema, session: AsyncSession = Depends(get_session), - ) -> CapabilityRequest: - req = await _get_request_or_404(request_id, session) + ) -> Any: + req = await _get_request_or_404(request_id, session, request_model) + if check_transition is not None: + check_transition(req.status, "routing_disputed") + + now = datetime.now(tz=timezone.utc) req.status = "routing_disputed" req.dispute_reason = body.reason req.disputed_by = body.disputed_by req.dispute_suggested_domain = body.suggested_domain - req.disputed_at = datetime.now(tz=timezone.utc) + req.disputed_at = now + + if after_dispute is not None: + await after_dispute(session, req, body, now) + await session.commit() await session.refresh(req) return req + if include_reroute: + + @router.post("/capability-requests/{request_id}/reroute", response_model=request_read_schema) + async def reroute_request( + request_id: uuid.UUID, + body: request_reroute_schema, + session: AsyncSession = Depends(get_session), + ) -> Any: + req = await _get_request_or_404(request_id, session, request_model) + if req.status != "routing_disputed": + raise HTTPException( + status_code=422, + detail=( + f"Cannot reroute from status '{req.status}'. " + "Only 'routing_disputed' requests can be rerouted." + ), + ) + if body.catalog_entry_id is None and body.domain is None: + raise HTTPException( + status_code=422, + detail="Either catalog_entry_id or domain must be provided.", + ) + + if body.catalog_entry_id is not None: + entry = await _resolve_catalog_entry(body.catalog_entry_id, session, catalog_model) + req.catalog_entry_id = entry.id + req.fulfilling_domain_id = entry.domain_id + domain_obj = await session.get(domain_model, entry.domain_id) + new_domain_slug = domain_obj.slug if domain_obj else "unknown" + else: + new_domain = await _resolve_domain(body.domain, session, domain_model) + req.fulfilling_domain_id = new_domain.id + new_domain_slug = new_domain.slug + + req.dispute_reason = None + req.disputed_by = None + req.dispute_suggested_domain = None + req.disputed_at = None + req.status = "requested" + + reroute_entry = f"re-routed by {body.rerouted_by} → {new_domain_slug}: {body.note}" + req.routing_note = ( + (req.routing_note + "\n" + reroute_entry) if req.routing_note else reroute_entry + ) + + if after_reroute is not None: + await after_reroute(session, req, body, new_domain_slug) + + await session.commit() + await session.refresh(req) + return req + + return router + + +def create_capabilities_router(get_session: Callable[..., AsyncSession]) -> APIRouter: + router = APIRouter(tags=["capability-requests"]) + router.include_router(create_capability_catalog_router(get_session)) + router.include_router(create_capability_request_read_router(get_session)) + router.include_router(create_capability_request_write_router(get_session)) return router @@ -400,8 +442,32 @@ async def _resolve_repo( return repo -async def _resolve_catalog_entry(entry_id: uuid.UUID, session: AsyncSession) -> CapabilityCatalog: - entry = await session.get(CapabilityCatalog, entry_id) +async def _default_route_request( + session: AsyncSession, + body: CapabilityRequestCreate, + catalog_model: type[CapabilityCatalog], +) -> tuple[uuid.UUID | None, uuid.UUID | None, str | None]: + catalog_entry_id = body.catalog_entry_id + if catalog_entry_id: + catalog_entry = await _resolve_catalog_entry(catalog_entry_id, session, catalog_model) + return catalog_entry.domain_id, catalog_entry.id, "Routed by explicit catalog entry." + + catalog_entry = await _find_catalog_route(body.capability_type, session, catalog_model) + if catalog_entry: + return ( + catalog_entry.domain_id, + catalog_entry.id, + "Routed by first active catalog match for capability_type.", + ) + return None, None, None + + +async def _resolve_catalog_entry( + entry_id: uuid.UUID, + session: AsyncSession, + catalog_model: type[CapabilityCatalog] = CapabilityCatalog, +) -> Any: + entry = await session.get(catalog_model, entry_id) if entry is None: raise HTTPException(status_code=404, detail=f"Catalog entry '{entry_id}' not found") return entry @@ -410,18 +476,23 @@ async def _resolve_catalog_entry(entry_id: uuid.UUID, session: AsyncSession) -> async def _find_catalog_route( capability_type: str, session: AsyncSession, -) -> CapabilityCatalog | None: + catalog_model: type[CapabilityCatalog] = CapabilityCatalog, +) -> Any | None: result = await session.execute( - select(CapabilityCatalog) - .where(CapabilityCatalog.capability_type == capability_type) - .where(CapabilityCatalog.status == "active") - .order_by(CapabilityCatalog.created_at.desc()) + select(catalog_model) + .where(catalog_model.capability_type == capability_type) + .where(catalog_model.status == "active") + .order_by(catalog_model.created_at.desc()) ) return result.scalars().first() -async def _get_request_or_404(request_id: uuid.UUID, session: AsyncSession) -> CapabilityRequest: - req = await session.get(CapabilityRequest, request_id) +async def _get_request_or_404( + request_id: uuid.UUID, + session: AsyncSession, + request_model: type[CapabilityRequest] = CapabilityRequest, +) -> Any: + req = await session.get(request_model, request_id) if req is None: raise HTTPException(status_code=404, detail=f"Capability request '{request_id}' not found") return req diff --git a/hub_core/schemas/capability.py b/hub_core/schemas/capability.py index 32f0501..4e5d45a 100644 --- a/hub_core/schemas/capability.py +++ b/hub_core/schemas/capability.py @@ -71,6 +71,13 @@ class CapabilityRequestDispute(BaseModel): suggested_domain: str | None = None +class CapabilityRequestReroute(BaseModel): + note: str + rerouted_by: str + domain: str | None = None + catalog_entry_id: uuid.UUID | None = None + + class CapabilityRequestRead(BaseModel): model_config = ConfigDict(from_attributes=True) diff --git a/tests/test_imports.py b/tests/test_imports.py index e7bf7c3..c2af3da 100644 --- a/tests/test_imports.py +++ b/tests/test_imports.py @@ -14,6 +14,7 @@ from hub_core.routers import ( create_capabilities_router, create_capability_catalog_router, create_capability_request_read_router, + create_capability_request_write_router, create_domains_router, create_messages_router, create_policy_router, @@ -297,6 +298,59 @@ def test_capability_catalog_router_accepts_host_model_injection() -> None: } +def test_capability_request_write_router_registers_write_paths() -> None: + async def get_session(): + raise AssertionError("router construction should not resolve sessions") + + router = create_capability_request_write_router(get_session) + method_paths = { + (method, route.path) + for route in router.routes + for method in getattr(route, "methods", set()) + } + + assert method_paths == { + ("POST", "/capability-requests/"), + ("POST", "/capability-requests/{request_id}/accept"), + ("PATCH", "/capability-requests/{request_id}/status"), + ("PATCH", "/capability-requests/{request_id}"), + ("POST", "/capability-requests/{request_id}/dispute"), + ("POST", "/capability-requests/{request_id}/reroute"), + } + + +def test_capability_request_write_router_invokes_callbacks() -> None: + calls: list[str] = [] + + async def get_session(): + raise AssertionError("router construction should not resolve sessions") + + async def route_request(session, body): + calls.append("route") + return None, None, "routed" + + def build_request(body, requesting_domain, fulfilling_domain_id, catalog_entry_id, routing_note): + calls.append("build") + return object() + + async def on_request_persisted(session, req, body): + calls.append("persisted") + + def check_transition(current, target): + calls.append(f"transition:{current}->{target}") + + router = create_capability_request_write_router( + get_session, + route_request=route_request, + build_request=build_request, + on_request_persisted=on_request_persisted, + check_transition=check_transition, + ) + + assert router.routes + assert calls == [] + + def test_capability_request_read_router_accepts_host_model_injection() -> None: async def get_session(): raise AssertionError("router construction should not resolve sessions") diff --git a/tests/test_mcp.py b/tests/test_mcp.py index 23bc793..9845b86 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -1,6 +1,8 @@ import asyncio -from hub_core.mcp import HubCoreMCPServer +from fastmcp import FastMCP + +from hub_core.mcp import CORE_TOOL_NAMES, HubCoreMCPServer def test_mcp_base_server_constructs_without_registering_tools() -> None: @@ -29,3 +31,21 @@ def test_mcp_base_server_registers_orientation_doi_and_fos10_tools() -> None: "get_risks", "get_alerts", } <= names + assert names == CORE_TOOL_NAMES + + +def test_attach_to_host_mcp_respects_exclude() -> None: + host = FastMCP(name="host-hub") + HubCoreMCPServer( + name="host-hub", + api_base="http://127.0.0.1:9999", + register_tools=False, + ).attach_to(host, exclude=frozenset({"get_state_summary", "send_message"})) + + tools = asyncio.run(host.list_tools()) + names = {tool.name for tool in tools} + + assert "get_state_summary" not in names + assert "send_message" not in names + assert "get_domain_summary" in names + assert len(names) == len(CORE_TOOL_NAMES) - 2 diff --git a/workplans/HUB-WP-0002-import-refactor-adapter-seams.md b/workplans/HUB-WP-0002-import-refactor-adapter-seams.md index 0cc403b..01e9cb2 100644 --- a/workplans/HUB-WP-0002-import-refactor-adapter-seams.md +++ b/workplans/HUB-WP-0002-import-refactor-adapter-seams.md @@ -4,7 +4,7 @@ type: workplan title: "Import-refactor adapter seams for State Hub closeout" domain: inter_hub repo: hub-core -status: ready +status: finished owner: codex topic_slug: inter_hub created: "2026-06-22" @@ -23,16 +23,11 @@ primitives. Child workplan: `~/the-custodian/workplans/CUST-WP-0048-hub-core-state-hub-import-refactor.md` Boundary reference: `~/the-custodian/docs/hub-core-extraction-boundary.md` -`hub-core` already owns read paths, generic JSON adapter fields -(`subject_refs`, `request_context`, `fulfillment_context`), and -`HubCoreMCPServer`. Remaining seams are write workflows and host MCP -composition. - ## Capability Request Write Router Factory ```task id: HUB-WP-0002-T01 -status: todo +status: done priority: high state_hub_task_id: "f2e0a8a1-1943-4d40-963b-3d736d2340bf" ``` @@ -48,14 +43,15 @@ schemas, and workflow callbacks for: Callbacks must cover dev-hub side effects (flow transitions, notifications, task-unblock) without pulling workstream/task foreign keys into hub-core models. -**Done when:** State Hub can mount write routes from the factory; hub-core tests -cover callback invocation and route registration. +Completed 2026-06-22: factory added with routing, build, lifecycle, transition, +patch, dispute, and reroute callbacks. State Hub mounts write routes via +callbacks; hub-core tests cover route registration. ## MCP Server Composition Hooks ```task id: HUB-WP-0002-T02 -status: todo +status: done priority: medium state_hub_task_id: "f50cd78f-14d2-42f7-8355-46baafb81131" ``` @@ -64,15 +60,15 @@ Extend `HubCoreMCPServer` so State Hub can compose or subclass it while keeping dev-hub-only tools local. Provide a clear registration seam for host tools and document which ~17 generic tools stay in hub-core. -**Done when:** State Hub MCP module imports `HubCoreMCPServer` for generic tools -without duplicating orientation, messaging, capability, repo, TPSC, DoI, or -risk/alert registrations. +Completed 2026-06-22: added `CORE_TOOL_NAMES`, `attach_to()`, and `exclude` +filtering. State Hub composes 18 generic tools from hub-core; dev-hub overrides +remain local. ## Regression And Boundary Update ```task id: HUB-WP-0002-T03 -status: todo +status: done priority: high state_hub_task_id: "450676fa-6074-4722-8811-25ac6e6de4ba" ``` @@ -84,5 +80,6 @@ After T01–T02 land (or are explicitly deferred with documented seams): deferred adapter points - hand results back so **CUST-WP-0048** T05–T07 can close -**Done when:** both test suites pass and the extraction boundary reflects the -new write/MCP seams. \ No newline at end of file +Completed 2026-06-22: hub-core 27 tests pass; state-hub capability tests pass +(20/20) and full suite 352/353 (one pre-existing health-route flake). +Extraction boundary updated. \ No newline at end of file