generated from coulomb/repo-seed
Add create_capability_request_write_router with host workflow callbacks, CapabilityRequestReroute schema, HubCoreMCPServer.attach_to() with CORE_TOOL_NAMES exclude filtering, tests, and mark HUB-WP-0002 finished.
52 lines
1.5 KiB
Python
52 lines
1.5 KiB
Python
import asyncio
|
|
|
|
from fastmcp import FastMCP
|
|
|
|
from hub_core.mcp import CORE_TOOL_NAMES, HubCoreMCPServer
|
|
|
|
|
|
def test_mcp_base_server_constructs_without_registering_tools() -> None:
|
|
server = HubCoreMCPServer(
|
|
name="test-hub",
|
|
api_base="http://127.0.0.1:9999/",
|
|
register_tools=False,
|
|
)
|
|
|
|
assert server.api_base == "http://127.0.0.1:9999"
|
|
assert server.mcp.name == "test-hub"
|
|
assert server._clean({"a": None, "b": 1}) == {"b": 1}
|
|
|
|
|
|
def test_mcp_base_server_registers_orientation_doi_and_fos10_tools() -> None:
|
|
server = HubCoreMCPServer(name="test-hub", api_base="http://127.0.0.1:9999")
|
|
|
|
tools = asyncio.run(server.mcp.list_tools())
|
|
names = {tool.name for tool in tools}
|
|
|
|
assert {
|
|
"get_state_summary",
|
|
"get_domain_summary",
|
|
"check_repo_doi",
|
|
"get_doi_summary",
|
|
"get_risks",
|
|
"get_alerts",
|
|
} <= names
|
|
assert names == CORE_TOOL_NAMES
|
|
|
|
|
|
def test_attach_to_host_mcp_respects_exclude() -> None:
|
|
host = FastMCP(name="host-hub")
|
|
HubCoreMCPServer(
|
|
name="host-hub",
|
|
api_base="http://127.0.0.1:9999",
|
|
register_tools=False,
|
|
).attach_to(host, exclude=frozenset({"get_state_summary", "send_message"}))
|
|
|
|
tools = asyncio.run(host.list_tools())
|
|
names = {tool.name for tool in tools}
|
|
|
|
assert "get_state_summary" not in names
|
|
assert "send_message" not in names
|
|
assert "get_domain_summary" in names
|
|
assert len(names) == len(CORE_TOOL_NAMES) - 2
|