解密 Netty 高性能之謎:NioEventLoop 線程池阻塞分析與調(diào)優(yōu)策略
我們使用NioEventLoop常會出現(xiàn)一個(gè)奇怪的現(xiàn)象,在消息密集的情況下,服務(wù)端處理會斷斷續(xù)續(xù)的,偶發(fā)出現(xiàn)消息處理阻塞,經(jīng)過不斷的摸索排查發(fā)現(xiàn)是線程池使用不當(dāng)導(dǎo)致的,遂此文簡單介紹一下這個(gè)故障的現(xiàn)象和排查思路。
詳解NioEventLoop阻塞問題分析與解決過程
1. 故障復(fù)現(xiàn)
在演示代碼之前,我們不妨先來了解一下這個(gè)需求,客戶端和服務(wù)端建立連接之后,會向該通道不斷發(fā)送消息。然后服務(wù)端收到消息,會將消息提交到業(yè)務(wù)線程池中異步處理:
2. 客戶端代碼實(shí)現(xiàn)分析
先來看看客戶端的connect代碼,就是一套標(biāo)準(zhǔn)的模板代碼,設(shè)置好對應(yīng)參數(shù)以及業(yè)務(wù)處理器之后,直接向服務(wù)端的9999端口發(fā)起連接:
public class NettyClient {
public void connect() throws Exception {
EventLoopGroup group = new NioEventLoopGroup(8);
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//業(yè)務(wù)處理器
ch.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture f = b.connect("127.0.0.1", 9999).sync();
//......
}
}
對應(yīng)我們給出客戶端處理器的代碼,和服務(wù)端建立了連接之后,創(chuàng)建一個(gè)線程,無限循環(huán),每次刷一個(gè)消息就休息1ms:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
static final int MSG_SIZE = 256;
@Override
public void channelActive(ChannelHandlerContext ctx) {
new Thread(() -> {
//無限循環(huán),每隔一毫秒發(fā)送一次消息
while (true) {
ByteBuf firstMessage = Unpooled.buffer(MSG_SIZE);
for (int i = 0; i < firstMessage.capacity(); i++) {
firstMessage.writeByte((byte) i);
}
//刷一次消息后休眠1ms
ctx.writeAndFlush(firstMessage);
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (Exception e) {
//......
}
}
}).start();
}
}
3. 服務(wù)端處理邏輯分析
而服務(wù)端啟動(dòng)類也比較簡單,就是一套比較經(jīng)典的NIO模板:
public class NettyServer {
public static void main(String[] args) throws Exception {
//聲明主從reactor
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//追加業(yè)務(wù)處理器NettyServerHandler
ChannelPipeline p = ch.pipeline();
p.addLast(new NettyServerHandler());
}
});
//監(jiān)聽9999端口
ChannelFuture f = b.bind(9999).sync();
//......
}
}
NettyServerHandler 處理器的邏輯也比較簡單,簡單的將消息提交到業(yè)務(wù)線程池中執(zhí)行即可,注意筆者代碼中的一行代碼Thread.currentThread() == ctx.channel().eventLoop()這就是后續(xù)問題引發(fā)的關(guān)鍵:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static AtomicInteger sum = new AtomicInteger(0);
//設(shè)置一個(gè)最大線程數(shù)為3的線程池,當(dāng)線程處理不過來的時(shí)候采用CallerRunsPolicy策略
private static ExecutorService executorService = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//原子類記錄收到消息數(shù)以及打印消息時(shí)間
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
String date = simpleDateFormat.format(new Date());
System.out.println("--> Server receive client message : " + sum.incrementAndGet() + "time: " + date);
//將消息提交到業(yè)務(wù)線程池中處理
executorService.execute(() -> {
ByteBuf req = (ByteBuf) msg;
//如果當(dāng)前執(zhí)行線程是nio線程則休眠15s
if (Thread.currentThread() == ctx.channel().eventLoop())
try {
TimeUnit.SECONDS.sleep(15);
} catch (Exception e) {
e.printStackTrace();
}
//轉(zhuǎn)發(fā)消息,此處代碼省略,轉(zhuǎn)發(fā)成功之后返回響應(yīng)給終端
ctx.writeAndFlush(req);
});
}
//......
}
自此我們的代碼都編寫完成,我們不妨將服務(wù)端和客戶端代碼都啟動(dòng)。通過控制臺可以發(fā)現(xiàn),1毫秒發(fā)送的消息,會時(shí)不時(shí)的卡15s才能繼續(xù)處理消息。
4. 排查思路
這類問題我們用jvisualvm看看GC情況是否正常,看看是不是頻繁的Full GC導(dǎo)致整個(gè)進(jìn)程處于STW狀態(tài)導(dǎo)致消息任務(wù)阻塞。
監(jiān)控結(jié)果如下,很明顯GC沒有問題,我們只能看看CPU使用情況。
很明顯的CPU使用情況也是正常,沒有什么奇奇怪怪的任務(wù)導(dǎo)致使用率飆升。
所以我們只能看看線程使用情況了,果然,我們發(fā)現(xiàn)NioEventLoop居然長時(shí)間的處于休眠狀態(tài):
所以我們用jps定位Java進(jìn)程id后鍵入jstack查看線程使用情況:
jstack -l 17892
自此我們終于找到了線程長期休眠的原因,從下面的堆棧我們可以看出,正是任務(wù)量巨大,導(dǎo)致業(yè)務(wù)線程池?zé)o法及時(shí)處理消息,最終業(yè)務(wù)線程池走到了拒絕策略,這就使得業(yè)務(wù)線程池一直走到CallerRunsPolicy,也就是說業(yè)務(wù)線程池忙不過來的時(shí)候會將任務(wù)交由NioEventLoop執(zhí)行。而一個(gè)連接只會有一個(gè)NioEventLoop的線程執(zhí)行,使得原本非常忙碌的NioEventLoop還得分神處理一下我們業(yè)務(wù)線程池的任務(wù)。
為了驗(yàn)證這一點(diǎn),我們不妨在業(yè)務(wù)線程池中打印線程名:
//將消息提交到業(yè)務(wù)線程池中處理
executorService.execute(() -> {
System.out.println(" executorService execute thread name: "+Thread.currentThread().getName());
ByteBuf req = (ByteBuf) msg;
//其它業(yè)務(wù)邏輯處理,訪問數(shù)據(jù)庫
if ((Thread.currentThread() == ctx.channel().eventLoop()))
try {
//訪問數(shù)據(jù)庫,模擬偶現(xiàn)的數(shù)據(jù)庫慢,同步阻塞15秒
TimeUnit.SECONDS.sleep(15);
} catch (Exception e) {
e.printStackTrace();
}
//轉(zhuǎn)發(fā)消息,此處代碼省略,轉(zhuǎn)發(fā)成功之后返回響應(yīng)給終端
ctx.writeAndFlush(req);
});
最終我們可以看到,線程池中的任務(wù)都被nioEventLoopGroup這個(gè)線程執(zhí)行,所以這也是筆者為什么在模擬問題時(shí)在if中增加 (Thread.currentThread() == ctx.channel().eventLoop())的原因,就是為了模仿那些耗時(shí)的業(yè)務(wù)被nioEventLoopGroup的線程執(zhí)行的情況,例如:一個(gè)耗時(shí)需要15s的任務(wù)剛剛好因?yàn)榫芙^策略被nioEventLoopGroup執(zhí)行,那么Netty服務(wù)端的消息處理自然就會阻塞,出現(xiàn)本文所說的問題。
5. 解決方案
從上文的分析中我們可以得出下面這樣一個(gè)結(jié)果,所以解決該問題的方式又兩種:
- 調(diào)整業(yè)務(wù)線程池大小,提升線程池處理效率并適當(dāng)增加隊(duì)列長度。
- 調(diào)整拒絕策略,處理不過來時(shí)直接丟棄。
以筆者為例,結(jié)合各種耗時(shí)工具排查后發(fā)現(xiàn)夯住線程池的業(yè)務(wù)功能存在可以優(yōu)化的空間,所以將功能優(yōu)化后結(jié)合arthas等工具大體可以定位到阻塞隊(duì)列穩(wěn)定的消息數(shù),最終給的策略就是優(yōu)化功能代碼+調(diào)大阻塞隊(duì)列和最大線程數(shù):
對應(yīng)我們給出線程池優(yōu)化后的參數(shù),整體上又優(yōu)化了任務(wù)處理速度避免了線程池夯?。?/p>
//調(diào)大阻塞隊(duì)列
private static ExecutorService executorService = new ThreadPoolExecutor(1, 8, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10_0000), new ThreadPoolExecutor.CallerRunsPolicy());
自此之后我們再查看控制臺輸出和NioEventLoop線程狀態(tài),發(fā)現(xiàn)運(yùn)行都沒有阻塞,那些實(shí)在無法處理的消息都被丟棄了:
總結(jié)
自此我們對于本次的事件總結(jié)出以下幾點(diǎn)要求和建議:
- 耗時(shí)操作不要用NioEventLoop,尤其是本次這種高并發(fā)且拒絕策略配置為用執(zhí)行線程接收忙碌任務(wù)的方式。
- 服務(wù)端收不到消息時(shí),建議優(yōu)先從CPU、GC、線程等角度分析問題。
- 服務(wù)端開發(fā)時(shí)建議使用兩個(gè)NioEventLoop構(gòu)成主從Reactor模式,并結(jié)合業(yè)務(wù)場景壓測出合適的線程數(shù)。