深入理解Netty編解碼、粘包拆包、心跳機(jī)制
前言
Netty系列文章:
- BIO 、NIO 、AIO 總結(jié)
- Unix網(wǎng)絡(luò)編程中的五種IO模型
- 深入理解IO多路復(fù)用實現(xiàn)機(jī)制
- Netty核心功能與線程模型
前面我們講了 BIO、NIO、AIO 等一些基礎(chǔ)知識和Netty核心功能與線程模型,本篇重點來理解Netty的編解碼、粘包拆包、心跳機(jī)制等實現(xiàn)原理進(jìn)行講解。
Netty編解碼
Netty 涉及到編解碼的組件有 Channel 、 ChannelHandler 、 ChannelPipe 等,我們先大概了解下這幾個組件的作用。
ChannelHandler
ChannelHandler 充當(dāng)來處理入站和出站數(shù)據(jù)的應(yīng)用程序邏輯容器。例如,實現(xiàn) ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter),你就可以接收入站事件和數(shù)據(jù),這些數(shù)據(jù)隨后會被你的應(yīng)用程序的業(yè)務(wù)邏輯處理。當(dāng)你要給連接的客戶端發(fā)送響應(yīng)時,也可以從 ChannelInboundHandler 刷數(shù)據(jù)。你的業(yè)務(wù)邏輯通常下在一個或者多個 ChannelInboundHandler 中。
ChannelOutboundHandler 原理一樣,只不過它是用來處理出站數(shù)據(jù)的。
ChannelPipeline
ChannelPipeline 提供了 ChannelHandler 鏈的容器。以客戶端應(yīng)用程序為例,如果有事件的運動方向是從客戶端到服務(wù)端,那么我們稱這些事件為出站的,即客戶端發(fā)送給服務(wù)端的數(shù)據(jù)會通過 pipeline 中的一系列 ChannelOutboundHandler (ChannelOutboundHandler 調(diào)用是從 tail 到 head 方向逐個調(diào)用每個 handler 的邏輯),并被這些 Hadnler 處理,反之稱為入站的,入站只調(diào)用 pipeline 里的 ChannelInboundHandler 邏輯(ChannelInboundHandler 調(diào)用是從 head 到 tail 方向 逐個調(diào)用每個 handler 的邏輯。)
編解碼器
當(dāng)你通過Netty發(fā)送或者接受一個消息的時候,就將會發(fā)生一次數(shù)據(jù)轉(zhuǎn)換。入站消息會被解碼:從字節(jié)轉(zhuǎn)換為另一種格式(比如java對象);如果是出站消息,它會被編碼成字節(jié)。
Netty提供了一系列實用的編碼解碼器,它們都實現(xiàn)了ChannelInboundHadnler或者ChannelOutboundHandler接口。在這些類中, channelRead方法已經(jīng)被重寫了。
以入站為例,對于每個從入站Channel讀取的消息,這個方法會被調(diào)用。隨后,它將調(diào)用由已知解碼器所提供的decode()方法進(jìn)行解碼,并將已經(jīng)解碼的字節(jié)轉(zhuǎn)發(fā)給ChannelPipeline中的下一個ChannelInboundHandler。
Netty提供了很多編解碼器,比如編解碼字符串的StringEncoder和StringDecoder,編解碼對象的ObjectEncoder和ObjectDecoder 等。
當(dāng)然也可以通過集成ByteToMessageDecoder自定義編解碼器。
示例代碼
完整代碼在 Github :
https://github.com/Niuh-Study/niuh-netty.git
對應(yīng)的包 com.niuh.netty.codec
Netty粘包拆包
TCP 粘包拆包是指發(fā)送方發(fā)送的若干包數(shù)據(jù)到接收方接收時粘成一包或某個數(shù)據(jù)包被拆開接收。如下圖所示,client 發(fā)送了兩個數(shù)據(jù)包 D1 和 D2,但是 server 端可能會收到如下幾種情況的數(shù)據(jù)。
程序演示
首先準(zhǔn)備客戶端負(fù)責(zé)發(fā)送消息,連續(xù)發(fā)送5次消息,代碼如下:
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 1; i <= 5; i++) {
- ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + " ", Charset.forName("utf-8"));
- ctx.writeAndFlush(byteBuf);
- }
- }
然后服務(wù)端作為接收方,接收并且打印結(jié)果:
- // count 變量,用于計數(shù)
- private int count;
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- System.out.println("服務(wù)器讀取線程 " + Thread.currentThread().getName());
- ByteBuf buf = (ByteBuf) msg;
- byte[] bytes = new byte[buf.readableBytes()];
- // 把ByteBuf的數(shù)據(jù)讀到bytes數(shù)組中
- buf.readBytes(bytes);
- String message = new String(bytes, Charset.forName("utf-8"));
- System.out.println("服務(wù)器接收到數(shù)據(jù):" + message);
- // 打印接收的次數(shù)
- System.out.println("接收到的數(shù)據(jù)量是:" + (++this.count));
- }
啟動服務(wù)端,再啟動兩個客戶端發(fā)送消息,服務(wù)端的控制臺可以看到這樣:
粘包的問題其實是隨機(jī)的,所以每次結(jié)果都不太一樣。
完整代碼在 Github :
https://github.com/Niuh-Study/niuh-netty.git
對應(yīng)的包 com.niuh.splitpacket0
為什么出現(xiàn)粘包現(xiàn)象?
TCP 是面向連接的,面向流的,提供高可靠性服務(wù)。收發(fā)兩端(客戶端和服務(wù)器端)都要有成對的 socket,因此,發(fā)送端為了將多個發(fā)送給接收端的包,更有效的發(fā)送給對方,使用了優(yōu)化方法(Nagle算法),將多次間隔較少且數(shù)據(jù)量小的數(shù)據(jù),合并成一個大的數(shù)據(jù)塊,然后進(jìn)行封包,這樣做雖然提供了效率,但是接收端就難以分辨出完整的數(shù)據(jù)包了,因為面向流的通信是無消息保護(hù)邊界的。
如何理解TCP是面向字節(jié)流的
- 應(yīng)用程序和 TCP 的交互是一次一個數(shù)據(jù)塊(大小不等),但 TCP 把應(yīng)用程序交下來的數(shù)據(jù)僅僅看成是一連串的無結(jié)構(gòu)的字節(jié)流。TCP 并不知道所傳送的字節(jié)流的含義;
- 因此 TCP 不保證接收方應(yīng)用程序所收到的數(shù)據(jù)塊和發(fā)送方應(yīng)用程序所發(fā)出的數(shù)據(jù)塊具有對應(yīng)大小的關(guān)系(例如,發(fā)送方應(yīng)用程序交給發(fā)送方的 TCP 共 10 個數(shù)據(jù)塊,但接收方的 TCP 可能只用了 4 個就把收到的字節(jié)流交付上層的應(yīng)用程序);
- 同時,TCP 不關(guān)心應(yīng)用進(jìn)程一次把多長的報文發(fā)送到 TCP 的緩存中,而是根據(jù)對方給出的窗口值和當(dāng)前網(wǎng)絡(luò)阻塞的程度來決定一個報文段應(yīng)包含多少個字節(jié)(UDP 發(fā)送的報文長度是應(yīng)用進(jìn)程給出的)。如果應(yīng)用進(jìn)程傳送到 TCP 緩存的數(shù)據(jù)塊太長,TCP 就可以把它劃分短一點再傳送。如果應(yīng)用程序一次只發(fā)來一個字節(jié),TCP 也可以等待積累有足夠多的字節(jié)后再構(gòu)成報文段發(fā)送出去。
TCP發(fā)送報文一般是 3 個時機(jī)
- 緩沖區(qū)數(shù)據(jù)達(dá)到,最大報文長度 MSS;
- 由發(fā)送端的應(yīng)用進(jìn)程指明要求發(fā)送報文段,即 TCP 支持的推送(push)操作;
- 當(dāng)發(fā)送方的一個計時器期限到了,即使長度不超過 MSS,也發(fā)送。
解決方案
一般解決粘包拆包問題有 4 種辦法
1.在數(shù)據(jù)的末尾添加特殊的符號標(biāo)識數(shù)據(jù)包的邊界。通常會加\n、\r、\t或者其他的符號
學(xué)習(xí) HTTP、FTP 等,使用回車換行符號;
2.在數(shù)據(jù)的頭部聲明數(shù)據(jù)的長度,按長度獲取數(shù)據(jù)
將消息分為 head 和 body,head 中包含 body 長度的字段,一般 head 的第一個字段使用 int 值來表示 body 長度;
3.規(guī)定報文的長度,不足則補空位。讀取時按規(guī)定好的長度來讀取。比如 100 字節(jié),如果不夠就補空格;
4.使用更復(fù)雜的應(yīng)用層協(xié)議。
使用LineBasedFrameDecoder
LineBasedFrameDecoder 是Netty內(nèi)置的一個解碼器,對應(yīng)的編碼器是 LineEncoder。
原理是上面講的第一種思路,在數(shù)據(jù)末尾加上特殊符號以標(biāo)識邊界。默認(rèn)是使用換行符\n。
用法很簡單,發(fā)送方加上編碼器:
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //添加編碼器,使用默認(rèn)的符號\n,字符集是UTF-8
- ch.pipeline().addLast(new LineEncoder(LineSeparator.DEFAULT, CharsetUtil.UTF_8));
- ch.pipeline().addLast(new TcpClientHandler());
- }
接收方加上解碼器:
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //解碼器需要設(shè)置數(shù)據(jù)的最大長度,我這里設(shè)置成1024
- ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
- //給pipeline管道設(shè)置業(yè)務(wù)處理器
- ch.pipeline().addLast(new TcpServerHandler());
- }
然后在發(fā)送方,發(fā)送消息時在末尾加上標(biāo)識符:
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 1; i <= 5; i++) {
- //在末尾加上默認(rèn)的標(biāo)識符\n
- ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + StringUtil.LINE_FEED, Charset.forName("utf-8"));
- ctx.writeAndFlush(byteBuf);
- }
- }
于是我們再次啟動服務(wù)端和客戶端,在服務(wù)端的控制臺可以看到:
在數(shù)據(jù)的末尾添加特殊的符號標(biāo)識數(shù)據(jù)包的邊界,粘包、拆包的問題就得到解決了。
注意:數(shù)據(jù)末尾一定是分隔符,分隔符后面不要再加上數(shù)據(jù),否則會當(dāng)做下一條數(shù)據(jù)的開始部分。下面是錯誤演示:
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 1; i <= 5; i++) {
- //在末尾加上默認(rèn)的標(biāo)識符\n
- ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + StringUtil.LINE_FEED + "[我是分隔符后面的字符串]", Charset.forName("utf-8"));
- ctx.writeAndFlush(byteBuf);
- }
- }
服務(wù)端的控制臺就會看到這樣的打印信息:
使用自定義長度幀解碼器
使用這個解碼器解決粘包問題的原理是上面講的第二種,在數(shù)據(jù)的頭部聲明數(shù)據(jù)的長度,按長度獲取數(shù)據(jù)。這個解碼器構(gòu)造器需要定義5個參數(shù),相對較為復(fù)雜一點,先看參數(shù)的解釋:
- maxFrameLength 發(fā)送數(shù)據(jù)包的最大長度
- lengthFieldOffset 長度域的偏移量。長度域位于整個數(shù)據(jù)包字節(jié)數(shù)組中的開始下標(biāo)。
- lengthFieldLength 長度域的字節(jié)數(shù)長度。長度域的字節(jié)數(shù)長度。
- lengthAdjustment 長度域的偏移量矯正。如果長度域的值,除了包含有效數(shù)據(jù)域的長度外,還包含了其他域(如長度域自身)長度,那么,就需要進(jìn)行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長。
- initialBytesToStrip 丟棄的起始字節(jié)數(shù)。丟棄處于此索引值前面的字節(jié)。
前面三個參數(shù)比較簡單,可以用下面這張圖進(jìn)行演示:
矯正偏移量是什么意思呢?
是假設(shè)你的長度域設(shè)置的值除了包括有效數(shù)據(jù)的長度還有其他域的長度包含在里面,那么就要設(shè)置這個值進(jìn)行矯正,否則解碼器拿不到有效數(shù)據(jù)。
丟棄的起始字節(jié)數(shù)。這個比較簡單,就是在這個索引值前面的數(shù)據(jù)都丟棄,只要后面的數(shù)據(jù)。一般都是丟棄長度域的數(shù)據(jù)。當(dāng)然如果你希望得到全部數(shù)據(jù),那就設(shè)置為0。
下面就在消息接收端使用自定義長度幀解碼器,解決粘包的問題:
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //數(shù)據(jù)包最大長度是1024
- //長度域的起始索引是0
- //長度域的數(shù)據(jù)長度是4
- //矯正值為0,因為長度域只有 有效數(shù)據(jù)的長度的值
- //丟棄數(shù)據(jù)起始值是4,因為長度域長度為4,我要把長度域丟棄,才能得到有效數(shù)據(jù)
- ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
- ch.pipeline().addLast(new TcpClientHandler());
- }
接著編寫發(fā)送端代碼,根據(jù)解碼器的設(shè)置,進(jìn)行發(fā)送:
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 1; i <= 5; i++) {
- String str = "msg No" + i;
- ByteBuf byteBuf = Unpooled.buffer(1024);
- byte[] bytes = str.getBytes(Charset.forName("utf-8"));
- //設(shè)置長度域的值,為有效數(shù)據(jù)的長度
- byteBuf.writeInt(bytes.length);
- //設(shè)置有效數(shù)據(jù)
- byteBuf.writeBytes(bytes);
- ctx.writeAndFlush(byteBuf);
- }
- }
然后啟動服務(wù)端,客戶端,我們可以看到控制臺打印結(jié)果:
可以看到,利用自定義長度幀解碼器解決了粘包問題。
使用Google Protobuf編解碼器
Netty官網(wǎng)上是明顯寫著支持Google Protobuf的,如下圖所示:
Google Protobuf是什么
官網(wǎng)的原話: Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.
翻譯一下:Protocol buffers是Google公司的與語言無關(guān)、平臺無關(guān)、可擴(kuò)展的序列化數(shù)據(jù)的機(jī)制,類似XML,但是更小、更快、更簡單。您只需定義一次數(shù)據(jù)的結(jié)構(gòu)化方式,然后就可以使用特殊生成的源代碼,輕松地將結(jié)構(gòu)化數(shù)據(jù)寫入和讀取到各種數(shù)據(jù)流中,并支持多種語言。
在rpc或tcp通信等很多場景都可以使用。通俗來講,如果客戶端和服務(wù)端使用的是不同的語言,那么在服務(wù)端定義一個數(shù)據(jù)結(jié)構(gòu),通過protobuf轉(zhuǎn)化為字節(jié)流,再傳送到客戶端解碼,就可以得到對應(yīng)的數(shù)據(jù)結(jié)構(gòu)。這就是protobuf神奇的地方。并且,它的通信效率極高,“一條消息數(shù)據(jù),用protobuf序列化后的大小是json的10分之一,xml格式的20分之一,是二進(jìn)制序列化的10分之一”。
Google Protobuf 官網(wǎng) :
https://developers.google.cn/protocol-buffers/
為什么使用Google Protobuf
在一些場景下,數(shù)據(jù)需要在不同的平臺,不同的程序中進(jìn)行傳輸和使用,例如某個消息是用C++程序產(chǎn)生的,而另一個程序是用java寫的,當(dāng)前者產(chǎn)生一個消息數(shù)據(jù)時,需要在不同的語言編寫的不同的程序中進(jìn)行操作,如何將消息發(fā)送并在各個程序中使用呢?這就需要設(shè)計一種消息格式,常用的就有json和xml,protobuf出現(xiàn)的則較晚。
Google Protobuf優(yōu)點
- protobuf 的主要優(yōu)點是簡單,快;
- protobuf將數(shù)據(jù)序列化為二進(jìn)制之后,占用的空間相當(dāng)小,基本僅保留了數(shù)據(jù)部分,而xml和json會附帶消息結(jié)構(gòu)在數(shù)據(jù)中;
- protobuf使用起來很方便,只需要反序列化就可以了,而不需要xml和json那樣層層解析。
Google Protobuf安裝
因為我這里是Mac系統(tǒng),Mac下面除了用dmg、pkg來安裝軟件外,比較方便的還有用brew命令進(jìn)行安裝 , 它能幫助安裝其他所需要的依賴,從而減少不必要的麻煩。
安裝最新版本的protoc
1.從github上下載 protobuf3
https://github.com/protocolbuffers/protobuf/releases/tag/v3.13.0
Mac系統(tǒng)選擇第一個,如下圖所示:
2.下載成功后,切換到root用戶
- sudo -i
3.解壓壓縮包,并進(jìn)入你自己解壓的目錄
- tar xyf protobuf-all-3.13.0.tar.gz
- cd protobuf-3.13.0
4.設(shè)置編譯目錄
- ./configure --prefix=/usr/local/protobuf
5.安裝
- make
- make install
6.配置環(huán)境變量
第一步:找到.bash_profile文件并編輯
- cd ~
- open .bash_profile
第二步:然后在打開的bash_profile文件末尾添加如下配置:
- export PROTOBUF=/usr/local/protobuf
- export PATH=$PROTOBUF/bin:$PATH
第三步:source一下使文件生效
- source .bash_profile
7.測試安裝結(jié)果
- protoc --version
使用Google Protobuf
以下步驟參考Google Protobuf的github項目的指南。
https://github.com/protocolbuffers/protobuf/tree/master/java
第一步:添加maven依賴
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>3.11.0</version>
- </dependency>
第二步:編寫proto文件Message.proto
如何編寫.proto文件的相關(guān)文檔說明,可以去官網(wǎng)查看 下面寫一個例子,請看示范:
- syntax = "proto3"; //版本
- option java_outer_classname = "MessagePojo";//生成的外部類名,同時也是文件名
- message Message {
- int32 id = 1;//Message類的一個屬性,屬性名稱是id,序號為1
- string content = 2;//Message類的一個屬性,屬性名稱是content,序號為2
- }
第三步:使用編譯器,通過.proto文件生成代碼
在執(zhí)行上面的安裝步驟后,進(jìn)入到 bin 目錄下,可以看到一個可執(zhí)行文件 protoc
- cd /usr/local/protobuf/bin/
然后復(fù)制前面寫好的Message.proto文件到此目錄下,如圖所示:
輸入命令:
- protoc --java_out=. Message.proto
然后就可以看到生成的MessagePojo.java文件。最后把文件復(fù)制到IDEA項目中。
第四步:在發(fā)送端添加編碼器,在接收端添加解碼器
客戶端添加編碼器,對消息進(jìn)行編碼。
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //在發(fā)送端添加Protobuf編碼器
- ch.pipeline().addLast(new ProtobufEncoder());
- ch.pipeline().addLast(new TcpClientHandler());
- }
服務(wù)端添加解碼器,對消息進(jìn)行解碼。
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //添加Protobuf解碼器,構(gòu)造器需要指定解碼具體的對象實例
- ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance()));
- //給pipeline管道設(shè)置處理器
- ch.pipeline().addLast(new TcpServerHandler());
- }
第五步:發(fā)送消息
客戶端發(fā)送消息,代碼如下:
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- //使用的是構(gòu)建者模式進(jìn)行創(chuàng)建對象
- MessagePojo.Message message = MessagePojo
- .Message
- .newBuilder()
- .setId(1)
- .setContent("一角錢,起飛~")
- .build();
- ctx.writeAndFlush(message);
- }
服務(wù)端接收到數(shù)據(jù),并且打印:
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, MessagePojo.Message messagePojo) throws Exception {
- System.out.println("id:" + messagePojo.getId());
- System.out.println("content:" + messagePojo.getContent());
- }
測試結(jié)果正確:
分析Protocol的粘包、拆包
實際上直接使用Protocol編解碼器還是存在粘包問題的。
證明一下,發(fā)送端循環(huán)一百次發(fā)送100條"一角錢,起飛"的消息,請看發(fā)送端代碼演示:
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 1; i <= 100; i++) {
- MessagePojo.Message message = MessagePojo
- .Message
- .newBuilder()
- .setId(i)
- .setContent(i + "號一角錢,起飛~")
- .build();
- ctx.writeAndFlush(message);
- }
- }
這時,啟動服務(wù)端,客戶端后,可能只有打印幾條消息或者在控制臺看到如下錯誤:
com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
意思是:分析protocol消息時,輸入意外地在字段中間結(jié)束。這可能意味著輸入被截斷,或者嵌入的消息誤報了自己的長度。
其實就是粘包問題,多條數(shù)據(jù)合并成一條數(shù)據(jù)了,導(dǎo)致解析出現(xiàn)異常。
解決Protocol的粘包、拆包問題
只需要在發(fā)送端加上編碼器 ProtobufVarint32LengthFieldPrepender
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
- ch.pipeline().addLast(new ProtobufEncoder());
- ch.pipeline().addLast(new TcpClientHandler());
- }
接收方加上解碼器 ProtobufVarint32FrameDecoder
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
- ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance()));
- //給pipeline管道設(shè)置處理器
- ch.pipeline().addLast(new TcpServerHandler());
- }
然后再啟動服務(wù)端和客戶端,我們可以看到正常了~
ProtobufVarint32LengthFieldPrepender 編碼器的工作如下:
- * BEFORE ENCODE (300 bytes) AFTER ENCODE (302 bytes)
- * +---------------+ +--------+---------------+
- * | Protobuf Data |-------------->| Length | Protobuf Data |
- * | (300 bytes) | | 0xAC02 | (300 bytes) |
- * +---------------+ +--------+---------------+
- @Sharable
- public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {
- @Override
- protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
- int bodyLen = msg.readableBytes();
- int headerLen = computeRawVarint32Size(bodyLen);
- //寫入請求頭,消息長度
- out.ensureWritable(headerLen + bodyLen);
- writeRawVarint32(out, bodyLen);
- //寫入數(shù)據(jù)
- out.writeBytes(msg, msg.readerIndex(), bodyLen);
- }
- }
ProtobufVarint32FrameDecoder 解碼器的工作如下:
- * BEFORE DECODE (302 bytes) AFTER DECODE (300 bytes)
- * +--------+---------------+ +---------------+
- * | Length | Protobuf Data |----->| Protobuf Data |
- * | 0xAC02 | (300 bytes) | | (300 bytes) |
- * +--------+---------------+ +---------------+
- ublic class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- //標(biāo)記讀取的下標(biāo)位置
- in.markReaderIndex();
- //獲取讀取的下標(biāo)位置
- int preIndex = in.readerIndex();
- //解碼,獲取消息的長度,并且移動讀取的下標(biāo)位置
- int length = readRawVarint32(in);
- //比較解碼前和解碼后的下標(biāo)位置,如果相等。表示字節(jié)數(shù)不夠讀取,跳到下一輪
- if (preIndex == in.readerIndex()) {
- return;
- }
- //如果消息的長度小于0,拋出異常
- if (length < 0) {
- throw new CorruptedFrameException("negative length: " + length);
- }
- //如果不夠讀取一個完整的數(shù)據(jù),reset還原下標(biāo)位置。
- if (in.readableBytes() < length) {
- in.resetReaderIndex();
- } else {
- //否則,把數(shù)據(jù)寫入到out,接收端就拿到了完整的數(shù)據(jù)了
- out.add(in.readRetainedSlice(length));
- }
- }
總結(jié):
- 發(fā)送端通過編碼器在發(fā)送的時候在消息體前面加上一個描述數(shù)據(jù)長度的數(shù)據(jù)塊。
- 接收方通過解碼器先獲取描述數(shù)據(jù)長度的數(shù)據(jù)塊,知道完整數(shù)據(jù)的長度,然后根據(jù)數(shù)據(jù)長度獲取一條完整的數(shù)據(jù)。
Netty心跳檢測機(jī)制
何為心跳
所謂心跳, 即在 TCP 長連接中, 客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包, 通知對方自己還在線, 以確保 TCP 連接的有效性.
注:心跳包還有另一個作用,經(jīng)常被忽略,即:一個連接如果長時間不用,防火墻或者路由器就會斷開該連接。
在 Netty 中, 實現(xiàn)心跳機(jī)制的關(guān)鍵是 IdleStateHandler, 看下它的構(gòu)造器:
- public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
- this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
- }
三個參數(shù)的含義如下:
- readerIdleTimeSeconds: 讀超時。即當(dāng)在指定的時間間隔內(nèi)沒有從 Channel 讀取到數(shù)據(jù)時, 會觸發(fā)一個 READER_IDLE 的 IdleStateEvent 事件。
- writerIdleTimeSeconds: 寫超時。 即當(dāng)在指定的時間間隔內(nèi)沒有數(shù)據(jù)寫入到 Channel 時, 會觸發(fā)一個 WRITER_IDLE 的 IdleStateEvent 事件。
- allIdleTimeSeconds: 讀/寫超時。 即當(dāng)在指定的時間間隔內(nèi)沒有讀或?qū)懖僮鲿r, 會觸發(fā)一個 ALL_IDLE 的 IdleStateEvent 事件。
注:這三個參數(shù)默認(rèn)的時間單位是秒。若需要指定其他時間單位,可以使用另一個構(gòu)造方法:
- public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
- this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
- }
要實現(xiàn)Netty服務(wù)端心跳檢測機(jī)制需要在服務(wù)器端的ChannelInitializer中加入如下的代碼:
- pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
Netty心跳源碼分析
初步地看下IdleStateHandler源碼,先看下IdleStateHandler中的channelRead方法:
紅框代碼其實表示該方法只是進(jìn)行了透傳,不做任何業(yè)務(wù)邏輯處理,讓channelPipe中的下一個handler處理channelRead方法;
我們再看看channelActive方法:
這里有個initialize的方法,這是IdleStateHandler的精髓,接著探究:
這邊會觸發(fā)一個Task,ReaderIdleTimeoutTask,這個task里的run方法源碼是這樣的:
第一個紅框代碼是用當(dāng)前時間減去最后一次channelRead方法調(diào)用的時間,假如這個結(jié)果是6s,說明最后一次調(diào)用channelRead已經(jīng)是6s 之前的事情了,你設(shè)置的是5s,那么nextDelay則為-1,說明超時了,那么第二個紅框代碼則會觸發(fā)下一個handler的 userEventTriggered方法:
如果沒有超時則不觸發(fā)userEventTriggered方法。
Netty心跳檢測代碼示例
服務(wù)端
- package com.niuh.netty.heartbeat;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import io.netty.handler.timeout.IdleStateHandler;
- import java.util.concurrent.TimeUnit;
- public class HeartBeatServer {
- public static void main(String[] args) throws Exception {
- EventLoopGroup boss = new NioEventLoopGroup();
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(boss, worker)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast("decoder", new StringDecoder());
- pipeline.addLast("encoder", new StringEncoder());
- //IdleStateHandler的readerIdleTime參數(shù)指定超過3秒還沒收到客戶端的連接,
- //會觸發(fā)IdleStateEvent事件并且交給下一個handler處理,下一個handler必須
- //實現(xiàn)userEventTriggered方法處理對應(yīng)事件
- pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
- pipeline.addLast(new HeartBeatServerHandler());
- }
- });
- System.out.println("netty server start。。");
- ChannelFuture future = bootstrap.bind(9000).sync();
- future.channel().closeFuture().sync();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- worker.shutdownGracefully();
- boss.shutdownGracefully();
- }
- }
- }
服務(wù)端回調(diào)處理類
- package com.niuh.netty.heartbeat;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.timeout.IdleStateEvent;
- public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
- int readIdleTimes = 0;
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
- System.out.println(" ====== > [server] message received : " + s);
- if ("Heartbeat Packet".equals(s)) {
- ctx.channel().writeAndFlush("ok");
- } else {
- System.out.println(" 其他信息處理 ... ");
- }
- }
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- IdleStateEvent event = (IdleStateEvent) evt;
- String eventType = null;
- switch (event.state()) {
- case READER_IDLE:
- eventType = "讀空閑";
- readIdleTimes++; // 讀空閑的計數(shù)加1
- break;
- case WRITER_IDLE:
- eventType = "寫空閑";
- // 不處理
- break;
- case ALL_IDLE:
- eventType = "讀寫空閑";
- // 不處理
- break;
- }
- System.out.println(ctx.channel().remoteAddress() + "超時事件:" + eventType);
- if (readIdleTimes > 3) {
- System.out.println(" [server]讀空閑超過3次,關(guān)閉連接,釋放更多資源");
- ctx.channel().writeAndFlush("idle close");
- ctx.channel().close();
- }
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
- }
- }
客戶端
- package com.niuh.netty.heartbeat;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import java.util.Random;
- public class HeartBeatClient {
- public static void main(String[] args) throws Exception {
- EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast("decoder", new StringDecoder());
- pipeline.addLast("encoder", new StringEncoder());
- pipeline.addLast(new HeartBeatClientHandler());
- }
- });
- System.out.println("netty client start。。");
- Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
- String text = "Heartbeat Packet";
- Random random = new Random();
- while (channel.isActive()) {
- int num = random.nextInt(10);
- Thread.sleep(2 * 1000);
- channel.writeAndFlush(text);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- eventLoopGroup.shutdownGracefully();
- }
- }
- static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
- System.out.println(" client received :" + msg);
- if (msg != null && msg.equals("idle close")) {
- System.out.println(" 服務(wù)端關(guān)閉連接,客戶端也關(guān)閉");
- ctx.channel().closeFuture();
- }
- }
- }
- }
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-netty.git