自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

從線上偶發(fā)的宕機(jī)事件看Netty流量控制

開(kāi)發(fā) 前端
目前移動(dòng)端的使用場(chǎng)景中會(huì)用到大量的消息推送,push消息可以幫助運(yùn)營(yíng)人員更高效地實(shí)現(xiàn)運(yùn)營(yíng)目標(biāo)(比如給用戶推送營(yíng)銷(xiāo)活動(dòng)或者提醒APP新功能)。

 

業(yè)務(wù)背景

目前移動(dòng)端的使用場(chǎng)景中會(huì)用到大量的消息推送,push消息可以幫助運(yùn)營(yíng)人員更高效地實(shí)現(xiàn)運(yùn)營(yíng)目標(biāo)(比如給用戶推送營(yíng)銷(xiāo)活動(dòng)或者提醒APP新功能)。

對(duì)于推送系統(tǒng)來(lái)說(shuō)需要具備以下兩個(gè)特性:

  • 消息秒級(jí)送到用戶,無(wú)延時(shí),支持每秒百萬(wàn)推送,單機(jī)百萬(wàn)長(zhǎng)連接。

  • 支持通知、文本、自定義消息透?jìng)鞯日宫F(xiàn)形式。正是由于以上原因,對(duì)于系統(tǒng)的開(kāi)發(fā)和維護(hù)帶來(lái)了挑戰(zhàn)。下圖是推送系統(tǒng)的簡(jiǎn)單描述(API->推送模塊->手機(jī))。

問(wèn)題背景

推送系統(tǒng)中長(zhǎng)連接集群在穩(wěn)定性測(cè)試、壓力測(cè)試階運(yùn)行一段時(shí)間后隨機(jī)會(huì)出現(xiàn)一個(gè)進(jìn)程掛掉的情況,概率較小(頻率為一個(gè)月左右發(fā)生一次),這會(huì)影響部分客戶端消息送到的時(shí)效。

推送系統(tǒng)中的長(zhǎng)連接節(jié)點(diǎn)(Broker系統(tǒng))是基于Netty開(kāi)發(fā),此節(jié)點(diǎn)維護(hù)了服務(wù)端和手機(jī)終端的長(zhǎng)連接,線上問(wèn)題出現(xiàn)后,添加Netty內(nèi)存泄露監(jiān)控參數(shù)進(jìn)行問(wèn)題排查,觀察多天但并未排查出問(wèn)題。

由于長(zhǎng)連接節(jié)點(diǎn)是Netty開(kāi)發(fā),為便于讀者理解,下面簡(jiǎn)單介紹一下Netty。

Netty介紹

Netty是一個(gè)高性能、異步事件驅(qū)動(dòng)的NIO框架,基于Java NIO提供的API實(shí)現(xiàn)。它提供了對(duì)TCP、UDP和文件傳輸?shù)闹С?,作為?dāng)前最流行的NIO框架,Netty在互聯(lián)網(wǎng)領(lǐng)域、大數(shù)據(jù)分布式計(jì)算領(lǐng)域、游戲行業(yè)、通信行業(yè)等獲得了廣泛的應(yīng)用,HBase,Hadoop,Bees,Dubbo等開(kāi)源組件也基于Netty的NIO框架構(gòu)建。

問(wèn)題分析

猜想

最初猜想是長(zhǎng)連接數(shù)導(dǎo)致的,但經(jīng)過(guò)排查日志、分析代碼,發(fā)現(xiàn)并不是此原因造成。

長(zhǎng)連接數(shù):39萬(wàn),如下圖:

 

 

連接數(shù) 

 

每個(gè)channel字節(jié)大小1456, 按40萬(wàn)長(zhǎng)連接計(jì)算,不致于產(chǎn)生內(nèi)存過(guò)大現(xiàn)象。

查看GC日志

查看GC日志,發(fā)現(xiàn)進(jìn)程掛掉之前頻繁full GC(頻率5分鐘一次),但內(nèi)存并未降低,懷疑堆外內(nèi)存泄露。

分析heap內(nèi)存情況

ChannelOutboundBuffer對(duì)象占將近5G內(nèi)存,泄露原因基本可以確定:ChannelOutboundBuffer的entry數(shù)過(guò)多導(dǎo)致,查看ChannelOutboundBuffer的源碼可以分析出,是ChannelOutboundBuffer中的數(shù)據(jù)。

沒(méi)有寫(xiě)出去,導(dǎo)致一直積壓;

ChannelOutboundBuffer內(nèi)部是一個(gè)鏈表結(jié)構(gòu)。

從上圖分析數(shù)據(jù)未寫(xiě)出去,為什么會(huì)出現(xiàn)這種情況?

代碼中實(shí)際有判斷連接是否可用的情況(Channel.isActive),并且會(huì)對(duì)超時(shí)的連接進(jìn)行關(guān)閉。從歷史經(jīng)驗(yàn)來(lái)看,這種情況發(fā)生在連接半打開(kāi)(客戶端異常關(guān)閉)的情況比較多---雙方不進(jìn)行數(shù)據(jù)通信無(wú)問(wèn)題。

按上述猜想,測(cè)試環(huán)境進(jìn)行重現(xiàn)和測(cè)試。

1)模擬客戶端集群,并與長(zhǎng)連接服務(wù)器建立連接,設(shè)置客戶端節(jié)點(diǎn)的防火墻,模擬服務(wù)器與客戶端網(wǎng)絡(luò)異常的場(chǎng)景(即要模擬Channel.isActive調(diào)用成功,但數(shù)據(jù)實(shí)際發(fā)送不出去的情況)。
2)調(diào)小堆外內(nèi)存,持續(xù)發(fā)送測(cè)試消息給之前的客戶端。消息大小(1K左右)。
3)按照128M內(nèi)存來(lái)計(jì)算,實(shí)際上調(diào)用9W多次就會(huì)出現(xiàn)。

問(wèn)題解決

啟用autoRead機(jī)制

當(dāng)channel不可寫(xiě)時(shí),關(guān)閉autoRead;

  1. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
  2.     if (!ctx.channel().isWritable()) { 
  3.         Channel channel = ctx.channel(); 
  4.         ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel); 
  5.         String clientId = ""
  6.         if (channelInfo != null) { 
  7.             clientId = channelInfo.getClientId(); 
  8.         } 
  9.  
  10.         LOGGER.info("channel is unwritable, turn off autoread, clientId:{}", clientId); 
  11.         channel.config().setAutoRead(false); 
  12.     } 

當(dāng)數(shù)據(jù)可寫(xiě)時(shí)開(kāi)啟autoRead;

  1. @Override 
  2. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception 
  3.     Channel channel = ctx.channel(); 
  4.     ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel); 
  5.     String clientId = ""
  6.     if (channelInfo != null) { 
  7.         clientId = channelInfo.getClientId(); 
  8.     } 
  9.     if (channel.isWritable()) { 
  10.         LOGGER.info("channel is writable again, turn on autoread, clientId:{}", clientId); 
  11.         channel.config().setAutoRead(true); 
  12.     } 

說(shuō)明:

autoRead的作用是更精確的速率控制,如果打開(kāi)的時(shí)候Netty就會(huì)幫我們注冊(cè)讀事件。當(dāng)注冊(cè)了讀事件后,如果網(wǎng)絡(luò)可讀,則Netty就會(huì)從channel讀取數(shù)據(jù)。那如果autoread關(guān)掉后,則Netty會(huì)不注冊(cè)讀事件。

這樣即使是對(duì)端發(fā)送數(shù)據(jù)過(guò)來(lái)了也不會(huì)觸發(fā)讀事件,從而也不會(huì)從channel讀取到數(shù)據(jù)。當(dāng)recv_buffer滿時(shí),也就不會(huì)再接收數(shù)據(jù)。

設(shè)置高低水位

  1. serverBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 10248 * 1024 * 1024)); 

注:高低水位配合后面的isWritable使用

增加channel.isWritable()的判斷

channel是否可用除了校驗(yàn)channel.isActive()還需要加上channel.isWrite()的判斷,isActive只是保證連接是否激活,而是否可寫(xiě)由isWrite來(lái)決定。

  1. private void writeBackMessage(ChannelHandlerContext ctx, MqttMessage message) { 
  2.     Channel channel = ctx.channel(); 
  3.     //增加channel.isWritable()的判斷 
  4.     if (channel.isActive() && channel.isWritable()) { 
  5.         ChannelFuture cf = channel.writeAndFlush(message); 
  6.         if (cf.isDone() && cf.cause() != null) { 
  7.             LOGGER.error("channelWrite error!", cf.cause()); 
  8.             ctx.close(); 
  9.         } 
  10.     } 

注:isWritable可以來(lái)控制ChannelOutboundBuffer,不讓其無(wú)限制膨脹。其機(jī)制就是利用設(shè)置好的channel高低水位來(lái)進(jìn)行判斷。

問(wèn)題驗(yàn)證

修改后再進(jìn)行測(cè)試,發(fā)送到27W次也并不報(bào)錯(cuò);

解決思路分析

一般Netty數(shù)據(jù)處理流程如下:將讀取的數(shù)據(jù)交由業(yè)務(wù)線程處理,處理完成再發(fā)送出去(整個(gè)過(guò)程是異步的),Netty為了提高網(wǎng)絡(luò)的吞吐量,在業(yè)務(wù)層與socket之間增加了一個(gè)ChannelOutboundBuffer。

在調(diào)用channel.write的時(shí)候,所有寫(xiě)出的數(shù)據(jù)其實(shí)并沒(méi)有寫(xiě)到socket,而是先寫(xiě)到ChannelOutboundBuffer。當(dāng)調(diào)用channel.flush的時(shí)候才真正的向socket寫(xiě)出。因?yàn)檫@中間有一個(gè)buffer,就存在速率匹配了,而且這個(gè)buffer還是無(wú)界的(鏈表),也就是你如果沒(méi)有控制channel.write的速度,會(huì)有大量的數(shù)據(jù)在這個(gè)buffer里堆積,如果又碰到socket寫(xiě)不出數(shù)據(jù)的時(shí)候(isActive此時(shí)判斷無(wú)效)或者寫(xiě)得慢的情況。

很有可能的結(jié)果就是資源耗盡,而且如果ChannelOutboundBuffer存放的是

DirectByteBuffer,這會(huì)讓問(wèn)題更加難排查。

流程可抽象如下:

從上面的分析可以看出,步驟一寫(xiě)太快(快到處理不過(guò)來(lái))或者下游發(fā)送不出數(shù)據(jù)都會(huì)造成問(wèn)題,這實(shí)際是一個(gè)速率匹配問(wèn)題。

Netty源碼說(shuō)明

超過(guò)高水位

當(dāng)ChannelOutboundBuffer的容量超過(guò)高水位設(shè)定閾值后,isWritable()返回false,設(shè)置channel不可寫(xiě)(setUnwritable),并且觸發(fā)fireChannelWritabilityChanged()。

  1. private void incrementPendingOutboundBytes(long size, boolean invokeLater) { 
  2.     if (size == 0) { 
  3.         return
  4.     } 
  5.  
  6.     long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); 
  7.     if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { 
  8.         setUnwritable(invokeLater); 
  9.     } 
  10. private void setUnwritable(boolean invokeLater) { 
  11.     for (;;) { 
  12.         final int oldValue = unwritable; 
  13.         final int newValue = oldValue | 1
  14.         if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { 
  15.             if (oldValue == 0 && newValue != 0) { 
  16.                 fireChannelWritabilityChanged(invokeLater); 
  17.             } 
  18.             break
  19.         } 
  20.     } 

低于低水位

當(dāng)ChannelOutboundBuffer的容量低于低水位設(shè)定閾值后,isWritable()返回true,設(shè)置channel可寫(xiě),并且觸發(fā)fireChannelWritabilityChanged()。

  1. private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { 
  2.     if (size == 0) { 
  3.         return
  4.     } 
  5.  
  6.     long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); 
  7.     if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { 
  8.         setWritable(invokeLater); 
  9.     } 
  10. private void setWritable(boolean invokeLater) { 
  11.     for (;;) { 
  12.         final int oldValue = unwritable; 
  13.         final int newValue = oldValue & ~1
  14.         if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { 
  15.             if (oldValue != 0 && newValue == 0) { 
  16.                 fireChannelWritabilityChanged(invokeLater); 
  17.             } 
  18.             break
  19.         } 
  20.     } 

總結(jié)

當(dāng)ChannelOutboundBuffer的容量超過(guò)高水位設(shè)定閾值后,isWritable()返回false,表明消息產(chǎn)生堆積,需要降低寫(xiě)入速度。

當(dāng)ChannelOutboundBuffer的容量低于低水位設(shè)定閾值后,isWritable()返回true,表明消息過(guò)少,需要提高寫(xiě)入速度。通過(guò)以上三個(gè)步驟修改后,部署線上觀察半年未發(fā)生問(wèn)題出現(xiàn)。

 

責(zé)任編輯:張燕妮 來(lái)源: Kirito的技術(shù)分享
相關(guān)推薦

2010-02-03 23:04:31

流量控制P2P華夏創(chuàng)新

2023-10-08 12:14:42

Sentinel流量控制

2011-06-23 09:09:37

流量控制

2013-07-22 14:25:29

iOS開(kāi)發(fā)ASIHTTPRequ

2010-06-04 10:49:58

Linux流量控制

2010-05-27 11:03:44

Linux流量控制

2010-08-06 10:02:07

2015-07-02 11:41:04

宕機(jī)云服務(wù)

2010-06-17 17:00:07

Linux流量控制

2021-03-09 07:38:15

Percona Xtr流量控制運(yùn)維

2010-06-04 11:21:42

Linux 流量控制

2009-02-05 10:13:00

局域網(wǎng)流量控制數(shù)據(jù)流量

2024-12-02 08:02:36

2019-07-02 10:22:15

TCP流量數(shù)據(jù)

2024-03-04 00:02:00

Redis存儲(chǔ)令牌

2021-03-22 08:06:59

SpringBootSentinel項(xiàng)目

2010-05-27 10:43:29

Linux流量控制

2024-04-30 11:00:10

數(shù)據(jù)中心

2010-06-13 13:34:47

Linux 流量控制

2010-11-30 09:40:15

流量控制設(shè)備AllotQOS策略
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)