從實戰(zhàn)開始,帶你深入了解Netty各個組件和ByteBuf
上文對IO模型和Reactor模型進行講解,是不是感覺有點懵懵的。哈哈哈,反正我并沒有對其有深入見解。我是這樣安慰自己的,知識在不斷的反復學習和思考中有新的感悟。不氣餒,繼續(xù)新的征程。本篇文章想來從實戰(zhàn)開始,帶我深入了解Netty各個組件是做什么?ByteBuf執(zhí)行原理又是怎樣的?
01一 第一個Netty實例
用Netty實現(xiàn)通信。說白了就是客戶端向服務端發(fā)消息,服務端接收消息并給客戶端響應。所以我來看看服務端和客戶端是如何實現(xiàn)的?
11.1 服務端
1. 依賴
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.haopt.iot</groupId>
- <artifactId>first-netty</artifactId>
- <packaging>jar</packaging>
- <version>1.0-SNAPSHOT</version>
- <dependencies>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.50.Final</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.12</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.2</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- <encoding>UTF-8</encoding>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </project>
2. 服務端-MyRPCServer
- package com.haopt.netty.server;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.UnpooledByteBufAllocator;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- public class MyRPCServer {
- public void start(int port) throws Exception {
- // 主線程,不處理任何業(yè)務邏輯,只是接收客戶的連接請求
- EventLoopGroup boss = new NioEventLoopGroup(1);
- // 工作線程,線程數(shù)默認是:cpu核數(shù)*2
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- // 服務器啟動類
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(boss, worker) //設置線程組
- .channel(NioServerSocketChannel.class) //配置server通道
- .childHandler(new MyChannelInitializer()); //worker線程的處理器
- //ByteBuf 的分配要設置為非池化,否則不能切換到堆緩沖區(qū)模式
- serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
- ChannelFuture future = serverBootstrap.bind(port).sync();
- System.out.println("服務器啟動完成,端口為:" + port);
- //等待服務端監(jiān)聽端口關閉
- future.channel().closeFuture().sync();
- } finally {
- //優(yōu)雅關閉
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }
3. 服務端-ChannelHandler
- package com.haopt.netty.server.handler;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.util.CharsetUtil;
- public class MyChannelHandler extends ChannelInboundHandlerAdapter {
- /**
- * 獲取客戶端發(fā)來的數(shù)據(jù)
- * @param ctx
- * @param msg
- * @throws Exception
- */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf byteBuf = (ByteBuf) msg;
- String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
- System.out.println("客戶端發(fā)來數(shù)據(jù):" + msgStr);
- //向客戶端發(fā)送數(shù)據(jù)
- ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
- }
- /**
- * 異常處理
- * @param ctx
- * @param cause
- * @throws Exception
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
4. 測試用例
- package com.haopt.netty.myrpc;
- import com.haopt.netty.server.MyRPCServer;
- import org.junit.Test;
- public class TestServer {
- @Test
- public void testServer() throws Exception{
- MyRPCServer myRPCServer = new MyRPCServer();
- myRPCServer.start(5566);
- }
- }
21.2 客戶端
1. 客戶端-client
- package com.haopt.netty.client;
- import com.haopt.netty.client.handler.MyClientHandler;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioSocketChannel;
- public class MyRPCClient {
- public void start(String host, int port) throws Exception {
- //定義⼯作線程組
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- //注意:client使⽤的是Bootstrap
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(worker)
- .channel(NioSocketChannel.class) //注意:client使⽤的是NioSocketChannel
- .handler(new MyClientHandler());
- //連接到遠程服務
- ChannelFuture future = bootstrap.connect(host, port).sync();
- future.channel().closeFuture().sync();
- } finally {
- worker.shutdownGracefully();
- }
- }
- }
2. 客戶端-(ClientHandler)
- package com.haopt.netty.client.handler;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.util.CharsetUtil;
- public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- System.out.println("接收到服務端的消息:" +
- msg.toString(CharsetUtil.UTF_8));
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // 向服務端發(fā)送數(shù)據(jù)
- String msg = "hello";
- ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
相信代碼執(zhí)行起來沒有任何問題(如果有任何問題反應交流)。但是對上面代碼為何這樣實現(xiàn)有很多疑。嘿嘿,我也是奧。接下來我們對這些代碼中用到的組件進行介紹,希望能消除之前疑慮。如果還是不能,可以把疑問寫于留言處,嘿嘿,我也不一定會有個好的解答,但是大佬總會有的。
02二 Netty核心組件
我們都知道Netty是基于事件驅動。但是事件發(fā)生后,Netty的各個組件都做了什么?來看看下面內(nèi)容!
3 2.1 Channel
1. 初識Channel
- a 可以理解為socket連接,客戶端和服務端連接的時候會創(chuàng)建一個channel。
- 負責基本的IO操作,例如:bind()、connect()、read()、write()。
- b Netty的Channel接口所提供的API,大大減少了Socket類復雜性
2. 常見Channel(不同的協(xié)議和阻塞類型的連接會有不同的Channel類型與之對應)
- a NioSocketChannel,NIO的客戶端 TCP Socket 連接。
- b NioServerSocketChannel,NIO的服務器端 TCP Socket 連接。
- c NioDatagramChannel, UDP 連接。
- d NioSctpChannel,客戶端 Sctp 連接。
- e NioSctpServerChannel,Sctp 服務器端連接,這些通道涵蓋了UDP和TCP⽹絡IO以及⽂件IO。
4 2.2 EventLoopGroup、EventLoop
1. 概述
- 有了Channel連接服務,連接之間消息流動。服務器發(fā)出消息稱為出站,服務器接受消息稱為入站。
- 那么消息出站和入站就產(chǎn)生了事件例如:連接已激活;數(shù)據(jù)讀??;用戶事件;異常事件;打開連接;
- 關閉連接等等。有了事件,有了事件就需要機制來監(jiān)控和協(xié)調(diào)事件,這個機制就是EventLoop。
2. 初識EventLoopGroup、EventLoop

對上圖解釋
- a 一個EventLoopGroup包含一個或者多個EventLoop
- b 一個EventLoop在生命周期內(nèi)之和一個Thread綁定
- c EventLoop上所有的IO事件在它專有的Thread上被處理。
- d Channel在它生命周期只注冊于一個Event Loop
- e 一個Event Loop可能被分配給一個或者多個Channel
3. 代碼實現(xiàn)
- // 主線程,不處理任何業(yè)務邏輯,只是接收客戶的連接請求
- EventLoopGroup boss = new NioEventLoopGroup(1);
- // ⼯作線程,線程數(shù)默認是:cpu*2
- EventLoopGroup worker = new NioEventLoopGroup();
5 2.3 ChannelHandler
1. 初識ChannelHandler
- 對于數(shù)據(jù)的出站和入棧的業(yè)務邏輯都是在ChannelHandler中。
2. 對于出站和入站對應的ChannelHandler

- ChannelInboundHandler ⼊站事件處理器
- ChannelOutBoundHandler 出站事件處理器
3. 開發(fā)中常用的ChannelHandler(ChannelInboundHandlerAdapter、SimpleChannelInboundHandler)
a 源碼
b SimpleChannelInboundHandler的源碼(是ChannelInboundHandlerAdapter子類)

注意:
兩者的區(qū)別在于,前者不會釋放消息數(shù)據(jù)的引⽤,⽽后者會釋放消息數(shù)據(jù)的引⽤。
6 2.4 ChannelPipeline
1. 初識ChannelPipeline
- 將ChannelHandler串起來。一個Channel包含一個ChannelPipeline,而ChannelPipeline維護者一個ChannelHandler列表。
- ChannelHandler與Channel和ChannelPipeline之間的映射關系,由ChannelHandlerContext進⾏維護。
如上圖解釋
- ChannelHandler按照加⼊的順序會組成⼀個雙向鏈表,⼊站事件從鏈表的head往后傳遞到最后⼀個ChannelHandler。
- 出站事件從鏈表的tail向前傳遞,直到最后⼀個ChannelHandler,兩種類型的ChannelHandler相互不會影響。
7 2.5 Bootstrap
1. 初識Bootstrap
- 是引導作用,配置整個netty程序,將各個組件串起來,最后綁定接口,啟動服務。
2. Bootstrap兩種類型(Bootstrap、ServerBootstrap)
- 客戶端只需要一個EventLoopGroup,服務端需要兩個EventLoopGroup。
上圖解釋
- 與ServerChannel相關聯(lián)的EventLoopGroup 將分配⼀個負責為傳⼊連接請求創(chuàng)建 Channel 的EventLoop。
- ⼀旦連接被接受,第⼆個 EventLoopGroup 就會給它的 Channel 分配⼀個 EventLoop。
8 2.6 Future
1. 初識
- 操作完成時通知應用程序的方式。這個對象可以看做異步操作執(zhí)行結果占位符,它在將來某個時刻完成,并提供對其結果的訪問。
2. ChannelFuture的由來
- JDK 預置了 interface java.util.concurrent.Future,但是其所提供的實現(xiàn),
- 只允許⼿動檢查對應的操作是否已經(jīng)完成,或者⼀直阻塞直到它完成。這是⾮常
- 繁瑣的,所以 Netty 提供了它⾃⼰的實現(xiàn)--ChannelFuture,⽤于在執(zhí)⾏異步
- 操作的時候使⽤。
3. Netty為什么完全是異步?
- a ChannelFuture提供了⼏種額外的⽅法,這些⽅法使得我們能夠注冊⼀個或者多個 ChannelFutureListener實例。
- b 監(jiān)聽器的回調(diào)⽅法operationComplete(),將會在對應的操作完成時被調(diào)⽤。
- 然后監(jiān)聽器可以判斷該操作是成功地完成了還是出錯了。
- c 每個 Netty 的出站 I/O 操作都將返回⼀個 ChannelFuture,也就是說,
- 它們都不會阻塞。所以說,Netty完全是異步和事件驅動的。
9 2.7 組件小結

上圖解釋
- 將組件串起來
03三 緩存區(qū)-ByteBuf
ByteBuf是我們開發(fā)中代碼操作最多部分和出現(xiàn)問題最多的一部分。比如常見的TCP協(xié)議通信的粘包和拆包解決,和ByteBuf密切相關。后面文章會詳細分析,先不展開。我們這里先了解ByteBuf的常用API和執(zhí)行內(nèi)幕。
10 3.1 ByteBuf概述
1. 初識ByteBuf
- JavaNIO提供了緩存容器(ByteBuffer),但是使用復雜。因此netty引入緩存ButeBuf,
- 一串字節(jié)數(shù)組構成。
2. ByteBuf兩個索引(readerIndex,writerIndex)
- a readerIndex 將會根據(jù)讀取的字節(jié)數(shù)遞增
- b writerIndex 也會根據(jù)寫⼊的字節(jié)數(shù)進⾏遞增
- 注意:如果readerIndex超過了writerIndex的時候,Netty會拋出IndexOutOf-BoundsException異常。
11 3.2 ByteBuf基本使用
1. 讀取
- package com.haopt.netty.myrpc.test;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.util.CharsetUtil;
- public class TestByteBuf01 {
- public static void main(String[] args) {
- //構造
- ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",
- CharsetUtil.UTF_8);
- System.out.println("byteBuf的容量為:" + byteBuf.capacity());
- System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes());
- System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes());
- while (byteBuf.isReadable()){ //⽅法⼀:內(nèi)部通過移動readerIndex進⾏讀取
- System.out.println((char)byteBuf.readByte());
- }
- //⽅法⼆:通過下標直接讀取
- for (int i = 0; i < byteBuf.readableBytes(); i++) {
- System.out.println((char)byteBuf.getByte(i));
- }
- //⽅法三:轉化為byte[]進⾏讀取
- byte[] bytes = byteBuf.array();
- for (byte b : bytes) {
- System.out.println((char)b);
- }
- }
- }
2. 寫入
- package com.haopt.netty.myrpc.test;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.util.CharsetUtil;
- public class TestByteBuf02 {
- public static void main(String[] args) {
- //構造空的字節(jié)緩沖區(qū),初始⼤⼩為10,最⼤為20
- ByteBuf byteBuf = Unpooled.buffer(10,20);
- System.out.println("byteBuf的容量為:" + byteBuf.capacity());
- System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes());
- System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes());
- for (int i = 0; i < 5; i++) {
- byteBuf.writeInt(i); //寫⼊int類型,⼀個int占4個字節(jié)
- }
- System.out.println("ok");
- System.out.println("byteBuf的容量為:" + byteBuf.capacity());
- System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes());
- System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes());
- while (byteBuf.isReadable()){
- System.out.println(byteBuf.readInt());
- }
- }
- }
3. 丟棄已讀字節(jié)

- package com.haopt.netty.myrpc.test;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.util.CharsetUtil;
- public class TestByteBuf03 {
- public static void main(String[] args) {
- ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8);
- System.out.println("byteBuf的容量為:" + byteBuf.capacity());
- System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes());
- System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes());
- while (byteBuf.isReadable()){
- System.out.println((char)byteBuf.readByte());
- }
- byteBuf.discardReadBytes(); //丟棄已讀的字節(jié)空間
- System.out.println("byteBuf的容量為:" + byteBuf.capacity());
- System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes());
- System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes());
- }
4. clear()

- package com.haopt.netty.myrpc.test;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.util.CharsetUtil;
- public class TestByteBuf04 {
- public static void main(String[] args) {
- ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8);
- System.out.println("byteBuf的容量為:" + byteBuf.capacity());
- System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes());
- System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes());
- byteBuf.clear(); //重置readerIndex 、 writerIndex 為0
- System.out.println("byteBuf的容量為:" + byteBuf.capacity());
- System.out.println("byteBuf的可讀容量為:" + byteBuf.readableBytes());
- System.out.println("byteBuf的可寫容量為:" + byteBuf.writableBytes());
- }
- }
12 3.3 ByteBuf 使⽤模式
3.3.1 根據(jù)存放緩沖區(qū),分為三類
1. 堆緩存區(qū)(HeapByteBuf)
- 內(nèi)存的分配和回收速度⽐較快,可以被JVM⾃動回收,缺點是,如果進⾏socket的IO讀寫,需要額外做⼀次內(nèi)存復制,將堆內(nèi)存對應的緩沖區(qū)復制到內(nèi)核Channel中,性能會有⼀定程度的下降。
- 由于在堆上被 JVM 管理,在不被使⽤時可以快速釋放。可以通過 ByteBuf.array() 來獲取 byte[] 數(shù)
- 據(jù)。
2. 直接緩存區(qū)(DirectByteBuf)
- ⾮堆內(nèi)存,它在對外進⾏內(nèi)存分配,相⽐堆內(nèi)存,它的分配和回收速度會慢⼀些,但是
- 將它寫⼊或從Socket Channel中讀取時,由于減少了⼀次內(nèi)存拷⻉,速度⽐堆內(nèi)存塊。
3. 復合緩存區(qū)
- 顧名思義就是將上述兩類緩沖區(qū)聚合在⼀起。Netty 提供了⼀個 CompsiteByteBuf,
- 可以將堆緩沖區(qū)和直接緩沖區(qū)的數(shù)據(jù)放在⼀起,讓使⽤更加⽅便。
3.3.2 緩存區(qū)選擇
Netty默認使⽤的是直接緩沖區(qū)(DirectByteBuf),如果需要使⽤堆緩沖區(qū)(HeapByteBuf)模式,則需要進⾏系統(tǒng)參數(shù)的設置。
- //netty中IO操作都是基于Unsafe完成的
- System.setProperty("io.netty.noUnsafe", "true");
- //ByteBuf的分配要設置為⾮池化,否則不能切換到堆緩沖器模式
- serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT);
3.3.3 ByteBuf對象是否池化(Netty是默認池化的)
1. 池化化和非池化的實現(xiàn)
- PooledByteBufAllocator,實現(xiàn)了ByteBuf的對象的池化,提⾼性能減少并最⼤限度地減少內(nèi)存碎⽚。
- UnpooledByteBufAllocator,沒有實現(xiàn)對象的池化,每次會⽣成新的對象實例。
2. 代碼實現(xiàn)(讓Netty中ByteBuf對象不池化)
- //通過ChannelHandlerContext獲取ByteBufAllocator實例
- ctx.alloc();
- //通過channel也可以獲取
- channel.alloc();
- //Netty默認使⽤了PooledByteBufAllocator
- //可以在引導類中設置⾮池化模式
- serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT);
- //或通過系統(tǒng)參數(shù)設置
- System.setProperty("io.netty.allocator.type", "pooled");
- System.setProperty("io.netty.allocator.type", "unpooled");
我在開發(fā)項目中,我一般不進行更改。因為我覺得池化效率更高。有其他高見,歡迎留言。
13 3.5 ByteBuf的釋放
ByteBuf如果采⽤的是堆緩沖區(qū)模式的話,可以由GC回收,但是如果采⽤的是直接緩沖區(qū),就不受GC的 管理,就得⼿動釋放,否則會發(fā)⽣內(nèi)存泄露。
3.5.1 ByteBuf的手動釋放(一般不推薦使用,了解)
1. 實現(xiàn)邏輯
- ⼿動釋放,就是在使⽤完成后,調(diào)⽤ReferenceCountUtil.release(byteBuf); 進⾏釋放。
- 通過release⽅法減去 byteBuf的使⽤計數(shù),Netty 會⾃動回收 byteBuf。
2. 代碼
- /**
- * 獲取客戶端發(fā)來的數(shù)據(jù)
- *
- * @param ctx
- * @param msg
- * @throws Exception
- */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf byteBuf = (ByteBuf) msg;
- String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
- System.out.println("客戶端發(fā)來數(shù)據(jù):" + msgStr);
- //釋放資源
- ReferenceCountUtil.release(byteBuf);
- }
注意:
⼿動釋放可以達到⽬的,但是這種⽅式會⽐較繁瑣,如果⼀旦忘記釋放就可能會造成內(nèi)存泄露。
3.5.1 ByteBuf的自動釋放
⾃動釋放有三種⽅式,分別是:⼊站的TailHandler、繼承SimpleChannelInboundHandler、 HeadHandler的出站釋放。
1. TailHandler
Netty的ChannelPipleline的流⽔線的末端是TailHandler,默認情況下如果每個⼊站處理器Handler都把消息往下傳,TailHandler會釋放掉ReferenceCounted類型的消息。
- /**
- * 獲取客戶端發(fā)來的數(shù)據(jù)
- * @param ctx
- * @param msg
- * @throws Exception
- */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf byteBuf = (ByteBuf) msg;
- String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
- System.out.println("客戶端發(fā)來數(shù)據(jù):" + msgStr);
- //向客戶端發(fā)送數(shù)據(jù)
- ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
- ctx.fireChannelRead(msg); //將ByteBuf向下傳遞
- }
源碼:
在DefaultChannelPipeline中的TailContext內(nèi)部類會在最后執(zhí)⾏
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- onUnhandledInboundMessage(ctx, msg);
- }
- //最后會執(zhí)⾏
- protected void onUnhandledInboundMessage(Object msg) {
- try {
- logger.debug(
- "Discarded inbound message {} that reached at the tail of the
- pipeline. " + "Please check your pipeline configuration.", msg);
- } finally {
- ReferenceCountUtil.release(msg); //釋放資源
- }
- }
2. SimpleChannelInboundHandler
當ChannelHandler繼承了SimpleChannelInboundHandler后,在SimpleChannelInboundHandler的channelRead()⽅法中,將會進⾏資源的釋放。
SimpleChannelInboundHandler的源碼
- //SimpleChannelInboundHandler中的channelRead()
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- boolean release = true;
- try {
- if (acceptInboundMessage(msg)) {
- @SuppressWarnings("unchecked")
- I imsg = (I) msg;
- channelRead0(ctx, imsg);
- } else {
- release = false;
- ctx.fireChannelRead(msg);
- }
- } finally {
- if (autoRelease && release) {
- ReferenceCountUtil.release(msg); //在這⾥釋放
- }
- }
- }
我們handler代碼編寫:
- package com.haopt.myrpc.client.handler;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.util.CharsetUtil;
- public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- System.out.println("接收到服務端的消息:" +
- msg.toString(CharsetUtil.UTF_8));
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // 向服務端發(fā)送數(shù)據(jù)
- String msg = "hello";
- ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
3. 堆緩沖區(qū)(HeadHandler)
出站處理流程中,申請分配到的ByteBuf,通過HeadHandler完成⾃動釋放。
在出站流程開始的時候,通過調(diào)⽤ctx.writeAndFlush(msg),Bytebuf緩沖區(qū)開始進⼊出站處理的pipeline流⽔線。
在每⼀個出站Handler中的處理完成后,最后消息會來到出站的最后⼀棒HeadHandler,再經(jīng)過⼀輪復雜的調(diào)⽤,在flush完成后終將被release掉。
- package com.haopt.myrpc.client.handler;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.util.CharsetUtil;
- public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws
- Exception {
- System.out.println("接收到服務端的消息:" +
- msg.toString(CharsetUtil.UTF_8));
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // 向服務端發(fā)送數(shù)據(jù)
- String msg = "hello";
- ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
14 3.6 ByteBuf小結
a ⼊站流程中,如果對原消息不做處理,調(diào)ctx.fireChannelRead(msg) 把
原消息往下傳,由流⽔線最后⼀棒 TailHandler 完成⾃動釋放。
b 如果截斷了⼊站處理流⽔線,則繼承SimpleChannelInboundHandler ,完成⼊站ByteBuf ⾃動釋放。
c 出站處理過程中,申請分配到的 ByteBuf,通過 HeadHandler 完成⾃動釋放。
d ⼊站處理中,如果將原消息轉化為新的消息ctx.fireChannelRead(newMsg)往下傳,那必須把原消息release掉。
e ⼊站處理中,如果已經(jīng)不再調(diào)⽤ ctx.fireChannelRead(msg) 傳遞任何消息,也沒有繼承SimpleChannelInboundHandler 完成⾃動釋放,那更要把原消息release掉。