一文快速了解高性能網(wǎng)絡(luò)通信框架 Netty
傳統(tǒng)BIO與不完美的解決方案
1.BIO編程及其問題
Java程序員早期進(jìn)行網(wǎng)絡(luò)程序開發(fā)的時(shí)候,采用的都是傳統(tǒng)BIO模式進(jìn)行開發(fā),這種模式工作流程非常簡單:
- 阻塞監(jiān)聽。
- 收到連接分配現(xiàn)場處理該連接。
- 循環(huán)回到第一步。
這種做法在少量的客戶端連接下還是可以保證可靠運(yùn)行的,我們都知道每當(dāng)服務(wù)器啟動(dòng)就會(huì)其一個(gè)端口監(jiān)聽連接,筆者以自己的服務(wù)器的1234號(hào)進(jìn)程為例:
netstat -ano | findstr :1234
此時(shí)對(duì)應(yīng)的端口使用情況為只有一個(gè)8080端口監(jiān)聽:
TCP 0.0.0.0:8080 0.0.0.0:0 LISTENING 11312
TCP [::]:8080 [::]:0 LISTENING 11312
每當(dāng)我們一個(gè)客戶端接入,服務(wù)器就會(huì)為其分配一個(gè)端口端口處理和該客戶端的收發(fā),以筆者的程序?yàn)槔?,可以看到此時(shí)該進(jìn)程正使用58891與客戶端socket進(jìn)程交互:
TCP 0.0.0.0:8080 0.0.0.0:0 LISTENING 11312
TCP 127.0.0.1:8080 127.0.0.1:58891 ESTABLISHED 11312
TCP 127.0.0.1:58891 127.0.0.1:8080 ESTABLISHED 4928
TCP [::]:8080 [::]:0 LISTENING 11312
由此可知,一旦遇到高并發(fā)IO讀寫,由于一個(gè)客戶端綁定一個(gè)線程的模式,所以每一個(gè)端口號(hào)的收發(fā)都需要一個(gè)線程進(jìn)程處理,如果有大量連接接入勢(shì)必導(dǎo)致頻繁的線程上下文切換進(jìn)而導(dǎo)致各種資源的消耗,由此導(dǎo)致著名的C10k問題:
這里筆者也給出一段比較基礎(chǔ)的bio代碼示例供讀者參考一下這種實(shí)現(xiàn),可以看到我們的主線程阻塞監(jiān)聽,每當(dāng)收到一個(gè)新的連接就創(chuàng)建一個(gè)線程處理這個(gè)客戶端的讀寫請(qǐng)求:
public class IOServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8888);
//創(chuàng)建一個(gè)線程等待連接進(jìn)來的客戶端
new Thread(() -> waitConnect(serverSocket)).start();
}
private static void waitConnect(ServerSocket serverSocket) {
while (true) {
try {
// 1. 阻塞方法獲取新連接
Socket socket = serverSocket.accept();
// 2. 每個(gè)客戶端來了,就專門創(chuàng)建一個(gè)新的連接處理
new Thread(() -> {
int len;
byte[] data = new byte[1024];
try {
InputStream inputStream = socket.getInputStream();
// 3. 按字節(jié)流方式讀取數(shù)據(jù)
while ((len = inputStream.read(data)) != -1) {
System.out.println(Thread.currentThread().getName() + " receive msg:" + new String(data, 0, len));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.epoll事件驅(qū)動(dòng)編程
于是就有了epoll事件驅(qū)動(dòng)編程這一方案,也就是我們常說的IO多路復(fù)用,該方案的理念是將所有socket的讀寫事件注冊(cè)到epoll上。 以我們的服務(wù)端為例,創(chuàng)建socket監(jiān)聽連接時(shí)就會(huì)將自己的感興趣的連接事件注冊(cè)到epoll上,隨后服務(wù)端就可以在循環(huán)中非阻塞的獲取是否有連接接入,每當(dāng)有連接接入就會(huì)為請(qǐng)求客戶端建立連接并將其讀寫事件注冊(cè)到處理客戶都安的epoll上,后續(xù)所有客戶端讀寫請(qǐng)求都會(huì)交給這個(gè)epoll處理,由此實(shí)現(xiàn)最少的線程做最多的事情,提升性能同時(shí)還降低消耗:
對(duì)此我們也用一段偽代碼展示一下事件驅(qū)動(dòng)編程:
//創(chuàng)建epoll
EpollFd epollFd=createEpoll();
//將文件描述符注冊(cè)到epoll上
epollCreateCtl(epollFd,socketFdList)
while(true){
//收到epoll推送過來的事件
List<event> eventList=epollWait(epollFd);
//遍歷并處理事件
eventList.foreach(e->handler(e));
}
3.JDK傳統(tǒng)事件驅(qū)動(dòng)編程
基于上述描述我們對(duì)事件驅(qū)動(dòng)編程有了初步的了解,接下來我們就來看看原生的jdk是如何實(shí)現(xiàn)NIO事件驅(qū)動(dòng)編程的。
首先我們需要?jiǎng)?chuàng)建一個(gè)serverSelector用于非阻塞查詢是否有就緒的socket事件,一旦收到客戶端的請(qǐng)求后,為其建立連接之后,將客戶端的讀寫事件注冊(cè)到clientSelector,由clientSelector的線程處理這些客戶端讀寫,而serverSelector依然負(fù)責(zé)非阻塞輪詢監(jiān)聽是否有新連接:
簡單介紹之后我們給出Selector 聲明:
//負(fù)責(zé)輪詢是否有新連接
Selector serverSelector = Selector.open();
//負(fù)責(zé)處理每個(gè)客戶端是否有數(shù)據(jù)可讀
Selector clientSelector = Selector.open();
然后我們使用這個(gè)socket非阻塞輪詢就緒的連接事件并注冊(cè)到客戶端的epoll模型上:
new Thread(() -> {
try {
//創(chuàng)建服務(wù)端socket監(jiān)聽通道
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
//綁定端口
listenerChannel.socket().bind(new InetSocketAddress(8888));
//設(shè)置為非阻塞監(jiān)聽
listenerChannel.configureBlocking(false);
//注冊(cè)感興趣的事件為OP_ACCEPT事件,即可處理當(dāng)前socket的ACCEPT連接接入事件
listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
//循環(huán)非阻塞獲取就緒事件
while (true) {
//阻塞1毫秒查看是否有新的連接進(jìn)來
if (serverSelector.select(1) > 0) {
//查看是否有就緒的事件
Set<SelectionKey> set = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
//遍歷事件
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
//判斷是否是新的socket連接加入
if (key.isAcceptable()) {
System.out.println("有新的socket連接加入");
//接收此通道與socket的連接
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
//服務(wù)端監(jiān)測(cè)到新連接之后,不再創(chuàng)建一個(gè)新線程,而是直接將
//新連接綁定到clientSelector上
clientChannel.register(clientSelector, SelectionKey.OP_READ);
keyIterator.remove();
}
}
}
}
} catch (Exception e) {
}
}).start();
我們?cè)賮砜纯纯蛻舳颂幚砭€程邏輯,和上文差不多,都是非阻塞輪詢客戶端就緒的事件,我們以輸出的方式模擬事件處理,然后進(jìn)入下一次循環(huán):
new Thread(() -> {
while (true) {
try {
//通過clientSelector.select(1)方法可以輪詢出來,進(jìn)而批量處理
if (clientSelector.select(1) > 0) {
//獲取就緒的客戶端事件
Set<SelectionKey> set = clientSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
//循環(huán)遍歷處理客戶端事件,完成后將該key移除,并在此注冊(cè)一個(gè)OP_READ等待下一次該socket就緒
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) {
try {
//獲取事件的通道
SocketChannel clientChannel = (SocketChannel) key.channel();
//數(shù)據(jù)的讀寫面向Buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//讀取數(shù)據(jù)到buffer中
clientChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println(Thread.currentThread().getName() + ":" + Charset.defaultCharset().newDecoder().decode(byteBuffer).toString());
} catch (Exception e) {
} finally {
keyIterator.remove();
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
可以看出原生nio雖然相對(duì)bio減小了一定開銷且提高一定的性能,但是缺點(diǎn)也很明顯:
原生的JDK的NIO概念非常多,使用非常復(fù)雜對(duì)新手不友好。
- 底層使用epoll,很容易導(dǎo)致空輪詢進(jìn)而出現(xiàn)CPU100%。
- 沒有對(duì)建立連接和處理請(qǐng)求的兩個(gè)處理建立線程模型,無法較好的發(fā)揮它的優(yōu)勢(shì),需要自己進(jìn)行擴(kuò)展實(shí)現(xiàn)。
- 項(xiàng)目龐大后,會(huì)出現(xiàn)各種奇奇怪怪的bug,很難排查,且維護(hù)成本較高。
高性能網(wǎng)絡(luò)通信框架Netty
相對(duì)與JDK的原生nio,Netty與之相比有著一下的優(yōu)勢(shì):
- 統(tǒng)一的API,支持多種傳輸類型、阻塞的和非阻塞的簡單而強(qiáng)大的線程模型,真正的無連接數(shù)據(jù)報(bào)套接字,支持鏈接邏輯組件以支持復(fù)用。
- 易于使用,各種配置只需幾個(gè)方法的調(diào)用就能完成。
- 性能較好,擁有比 Java 的核心API更高的吞吐量以及更低的延遲得益于池化和復(fù)用,擁有更低的資源消耗最少的內(nèi)存復(fù)制。
- 健壯,不會(huì)因?yàn)槁?、快速或者超載的連接而導(dǎo)致OutOfMemoryError消除在高速網(wǎng)絡(luò)中NIO應(yīng)用程序常見的不公平讀/寫比率。
- 安全,完整的SSL/TLS以及 StartTLS支持可用于受限環(huán)境下,如Applet和 OSGI。
- 社區(qū)活躍。
同樣以以上述客戶端服務(wù)端通信,Netty實(shí)現(xiàn)就比較簡單了,我們編寫服務(wù)端時(shí),只需通過NioEventLoopGroup 完成上圖所說兩個(gè)slector創(chuàng)建,再通過channel指明當(dāng)前事件輪詢采用NIO非阻塞方式,最后將事件處理器FirstServerHandler添加到當(dāng)前服務(wù)端childHandler的pipeline上即可處理所有客戶端讀寫請(qǐng)求:
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
//創(chuàng)建處理連接的事件輪詢eventLoop
NioEventLoopGroup boss = new NioEventLoopGroup();
//創(chuàng)建處理客戶端讀寫請(qǐng)求的eventLoop
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap.group(boss, worker)
//設(shè)置為非阻塞輪詢
.channel(NioServerSocketChannel.class)
//childHandler添加ServerHandler客戶端讀寫請(qǐng)求
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new FirstServerHandler());
}
});
serverBootstrap.bind("127.0.0.1", 8080);
}
最后我們給出FirstServerHandler 的代碼,可以看到我們直接繼承ChannelInboundHandlerAdapter 處理客戶端發(fā)送的數(shù)據(jù),每當(dāng)服務(wù)端收到客戶端數(shù)據(jù)時(shí)就會(huì)回調(diào)channelRead,我們的邏輯也很簡單,收到數(shù)據(jù)之后直接回復(fù)Hello Netty client:
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
/**
* 收到客戶端數(shù)據(jù)后會(huì)回調(diào)該方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
//打印讀取到的數(shù)據(jù)
System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù) -> " + byteBuf.toString(StandardCharsets.UTF_8));
// 回復(fù)客戶端數(shù)據(jù)
System.out.println(new Date() + ": 服務(wù)端寫出數(shù)據(jù)");
//組裝數(shù)據(jù)并發(fā)送
ByteBuf out = getByteBuf(ctx);
ctx.channel().writeAndFlush(out);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
ByteBuf buffer = ctx.alloc().buffer();
byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8);
buffer.writeBytes(bytes);
return buffer;
}
}
此時(shí)我們通過telnet 127.0.0.1 8080進(jìn)行數(shù)據(jù)發(fā)送即可收到服務(wù)端的響應(yīng)了: