TCP通信過程中遇到粘包拆包解決全過程
在使用TCP協(xié)議進行通信時,聽到最多的也就是粘包和拆包問題。本文就來看看,如何解決粘包和拆包問題。
01一 TCP的粘包/拆包的問題以及解決
在解決TCP粘包和拆包,我們先看看一種思想。來看看讀取一個Int數(shù)據(jù)的Demo,體會下這種思想。
1.1 ReplayingDecoder
1. 自定義解碼器,從ByteBuf讀取一個Int。(重點,一定要看懂這段代碼)
- public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
- if (buf.readableBytes() < 4) {
- return;
- }
- buf.markReaderIndex();//標記下當前讀指針。
- int length = buf.readInt();//從ByteBuf讀出一個int
- if (buf.readableBytes() < length) {
- buf.resetReaderIndex();//恢復(fù)到剛才標記的讀指針
- return;
- }
- out.add(buf.readBytes(length));
- }
- }
2. 使用ReplayingDecoder進行優(yōu)化()
- public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {
- protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
- out.add(buf.readBytes(buf.readInt()));
- }
- }
3. ReplayingDecoder使用說明(重點,要理解的)
- a 使⽤了特殊的ByteBuf,叫做ReplayingDecoderByteBuf,擴展了ByteBuf
- b 重寫了ByteBuf的readXxx()等⽅法,會先檢查可讀字節(jié)⻓度,⼀旦檢測到不滿⾜要求就直接拋出REPLAY(REPLAY繼承ERROR)
- c ReplayingDecoder重寫了ByteToMessageDecoder的callDecode()⽅法,捕獲Signal并在catch塊中重置ByteBuf的readerIndex。
- d 繼續(xù)等待數(shù)據(jù),直到有了數(shù)據(jù)后繼續(xù)讀取,這樣就可以保證讀取到需要讀取的數(shù)據(jù)。
- e 類定義中的泛型S是⼀個⽤于記錄解碼狀態(tài)的狀態(tài)機枚舉類,在state(S s)、checkpoint(S s)等⽅法中會⽤到。在簡單解碼時也可以⽤java.lang.Void來占位。
總結(jié):
ReplayingDecoder是ByteToMessageDecoder的子類,擴展了ByteBuf。從寫了readXxx()等⽅法,當前ByteBuf中數(shù)據(jù)小于代取數(shù)據(jù),等待數(shù)據(jù)滿足,才能取數(shù)據(jù)。就可以省略手動實現(xiàn)這段代碼。
4. 注意
- 1 buffer的部分操作(readBytes(ByteBuffer dst)、retain()、release()等⽅法會直接拋出異常)
- 2 在某些情況下會影響性能(如多次對同⼀段消息解碼)
繼承ReplayingDecoder,錯誤示例和修改
- //這是⼀個錯誤的例⼦:
- //消息中包含了2個integer,代碼中decode⽅法會被調(diào)⽤兩次,此時隊列size不等于2,這段代碼達不到期望結(jié)果。
- public class MyDecoder extends ReplayingDecoder<Void> {
- private final Queue<Integer> values = new LinkedList<Integer>();
- @Override
- public void decode(ByteBuf buf, List<Object> out) throws Exception {
- // A message contains 2 integers.
- values.offer(buf.readInt());
- values.offer(buf.readInt());
- assert values.size() == 2;
- out.add(values.poll() + values.poll());
- }
- }
- //正確的做法:
- public class MyDecoder extends ReplayingDecoder<Void> {
- private final Queue<Integer> values = new LinkedList<Integer>();
- @Override
- public void decode(ByteBuf buf, List<Object> out) throws Exception {
- // Revert the state of the variable that might have been changed
- // since the last partial decode.
- values.clear();
- // A message contains 2 integers.
- values.offer(buf.readInt());
- values.offer(buf.readInt());
- // Now we know this assertion will never fail.
- assert values.size() == 2;
- out.add(values.poll() + values.poll());
- }
- }
ByteToIntegerDecoder2的實現(xiàn)
- public class ByteToIntegerDecoder2 extends ReplayingDecoder<Void> {
- /**
- * @param ctx 上下⽂
- * @param in 輸⼊的ByteBuf消息數(shù)據(jù)
- * @param out 轉(zhuǎn)化后輸出的容器
- * @throws Exception
- */
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- out.add(in.readInt()); //讀取到int類型數(shù)據(jù),放⼊到輸出,完成數(shù)據(jù)類型的轉(zhuǎn)化
- }
- }
1.2 拆包和粘包問題重現(xiàn)(客戶端向服務(wù)端發(fā)送十條數(shù)據(jù))
1. 客戶端啟動類
- public class NettyClient {
- public static void main(String[] args) throws Exception{
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- // 服務(wù)器啟動類
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(worker);
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new ClientHandler());
- }
- });
- ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync();
- future.channel().closeFuture().sync();
- } finally {
- worker.shutdownGracefully();
- }
- }
- }
2. 客戶端ClientHandler
- public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
- private int count;
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- System.out.println("接收到服務(wù)端的消息:" +
- msg.toString(CharsetUtil.UTF_8));
- System.out.println("接收到服務(wù)端的消息數(shù)量:" + (++count));
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 0; i < 10; i++) {
- ctx.writeAndFlush(Unpooled.copiedBuffer("from client a message!",
- CharsetUtil.UTF_8));
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
3. 服務(wù)端NettyServer
- public class NettyServer {
- public static void main(String[] args) throws Exception {
- // 主線程,不處理任何業(yè)務(wù)邏輯,只是接收客戶的連接請求
- EventLoopGroup boss = new NioEventLoopGroup(1);
- // ⼯作線程,線程數(shù)默認是:cpu*2
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- // 服務(wù)器啟動類
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(boss, worker);
- //配置server通道
- serverBootstrap.channel(NioServerSocketChannel.class);
- serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new ServerHandler());
- }
- }); //worker線程的處理器
- ChannelFuture future = serverBootstrap.bind(5566).sync();
- System.out.println("服務(wù)器啟動完成。。。。。");
- //等待服務(wù)端監(jiān)聽端⼝關(guān)閉
- future.channel().closeFuture().sync();
- } finally {
- //優(yōu)雅關(guān)閉
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }
4. ServerHandler
- public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
- private int count;
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- System.out.println("服務(wù)端接收到消息:" +
- msg.toString(CharsetUtil.UTF_8));
- System.out.println("服務(wù)端接收到消息數(shù)量:" + (++count));
- ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
- }
- }
1.3 什么是TCP的粘包和拆包問題
TCP是流傳遞的,所謂流,就是沒有界限的數(shù)據(jù)。服務(wù)端接受客戶端數(shù)據(jù),并不知道是一條還是多條。服務(wù)端在讀取數(shù)據(jù)的時候會出現(xiàn)粘包問題。
因此服務(wù)端和客戶端進行數(shù)據(jù)傳遞的時候,要制定好拆包規(guī)則??蛻舳税凑赵撘?guī)則進行粘包,服務(wù)端按照該規(guī)則拆包。如果有任意違背該規(guī)則,服務(wù)端就不能拿到預(yù)期的數(shù)據(jù)。
1. 解決思路(三種)
- 1. 在發(fā)送的數(shù)據(jù)包中添加頭,在頭⾥存儲數(shù)據(jù)的⼤⼩,服務(wù)端就可以按照此⼤⼩來讀取數(shù)據(jù),這樣就知道界限在哪⾥了。
- 2. 以固定的⻓度發(fā)送數(shù)據(jù),超出的分多次發(fā)送,不⾜的以0填充,接收端就以固定⻓度接收即可。
- 3. 在數(shù)據(jù)包之間設(shè)置邊界,如添加特殊符號,這樣,接收端通過這個邊界就可以將不同的數(shù)據(jù)包拆分開。
1.4 實戰(zhàn):解決TCP的粘包/拆包問題
1. 自定義協(xié)議
- public class MyProtocol {
- private Integer length; //數(shù)據(jù)頭:⻓度
- private byte[] body; //數(shù)據(jù)體
- public Integer getLength() {
- return length;
- }
- public void setLength(Integer length) {
- this.length = length;
- }
- public byte[] getBody() {
- return body;
- }
- public void setBody(byte[] body) {
- this.body = body;
- }
2. 編碼器
- public class MyEncoder extends MessageToByteEncoder<MyProtocol> {
- @Override
- protected void encode(ChannelHandlerContext ctx, MyProtocol msg, ByteBuf out) throws Exception {
- out.writeInt(msg.getLength());
- out.writeBytes(msg.getBody());
- }
- }
3. 解碼器
- public class MyDecoder extends ReplayingDecoder<Void> {
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- int length = in.readInt(); //獲取⻓度
- byte[] data = new byte[length]; //根據(jù)⻓度定義byte數(shù)組
- in.readBytes(data); //讀取數(shù)據(jù)
- MyProtocol myProtocol = new MyProtocol();
- myProtocol.setLength(length);
- myProtocol.setBody(data);
- out.add(myProtocol);
- }
- }
4. 客戶端ClientHandler
- public class ClientHandler extends SimpleChannelInboundHandler<MyProtocol> {
- private int count;
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
- System.out.println("接收到服務(wù)端的消息:" + new String(msg.getBody(),
- CharsetUtil.UTF_8));
- System.out.println("接收到服務(wù)端的消息數(shù)量:" + (++count));
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 0; i < 10; i++) {
- byte[] data = "from client a message!".getBytes(CharsetUtil.UTF_8);
- MyProtocol myProtocol = new MyProtocol();
- myProtocol.setLength(data.length);
- myProtocol.setBody(data);
- ctx.writeAndFlush(myProtocol);
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
5. NettyClient
- public class NettyClient {
- public static void main(String[] args) throws Exception{
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- // 服務(wù)器啟動類
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(worker);
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new MyEncoder());
- ch.pipeline().addLast(new MyDecoder());
- ch.pipeline().addLast(new ClientHandler());
- }
- });
- ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync();
- future.channel().closeFuture().sync();
- } finally {
- worker.shutdownGracefully();
- }
- }
- }
6. ServerHandler
- public class ServerHandler extends SimpleChannelInboundHandler<MyProtocol> {
- private int count;
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
- System.out.println("服務(wù)端接收到消息:" + new String(msg.getBody(),
- CharsetUtil.UTF_8));
- System.out.println("服務(wù)端接收到消息數(shù)量:" + (++count));
- byte[] data = "ok".getBytes(CharsetUtil.UTF_8);
- MyProtocol myProtocol = new MyProtocol();
- myProtocol.setLength(data.length);
- myProtocol.setBody(data);
- ctx.writeAndFlush(myProtocol);
- }
- }
7. NettyServer
- public class NettyServer {
- public static void main(String[] args) throws Exception {
- // 主線程,不處理任何業(yè)務(wù)邏輯,只是接收客戶的連接請求
- EventLoopGroup boss = new NioEventLoopGroup(1);
- // ⼯作線程,線程數(shù)默認是:cpu*2
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- // 服務(wù)器啟動類
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(boss, worker);
- //配置server通道
- serverBootstrap.channel(NioServerSocketChannel.class);
- serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new MyDecoder())
- .addLast(new MyEncoder())
- .addLast(new ServerHandler());
- }
- }); //worker線程的處理器
- ChannelFuture future = serverBootstrap.bind(5566).sync();
- System.out.println("服務(wù)器啟動完成。。。。。");
- //等待服務(wù)端監(jiān)聽端⼝關(guān)閉
- future.channel().closeFuture().sync();
- } finally {
- //優(yōu)雅關(guān)閉
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }
8. 測試
02二 Netty核心源碼解析
2.1 服務(wù)端啟動過程刨析
1. 創(chuàng)建服務(wù)端Channel
- 1 ServerBootstrap對象的bind()⽅法,也是⼊⼝⽅法
- 2 AbstractBootstrap中的initAndRegister()進⾏創(chuàng)建Channel
- 創(chuàng)建Channel的⼯作由ReflectiveChannelFactory反射類中的newChannel()⽅法完成。
- 3 NioServerSocketChannel中的構(gòu)造⽅法中,通過jdk nio底層的SelectorProvider打開ServerSocketChannel。
- 4 在AbstractNioChannel的構(gòu)造⽅法中,設(shè)置channel為⾮阻塞:ch.configureBlocking(false);
- 5 通過的AbstractChannel的構(gòu)造⽅法,創(chuàng)建了id、unsafe、pipeline內(nèi)容。
- 6 通過NioServerSocketChannelConfig獲取tcp底層的⼀些參數(shù)
2. 初始化服務(wù)端Channel
- 1 AbstractBootstrap中的initAndRegister()進⾏初始化channel,代碼:init(channel);
- 2 在ServerBootstrap中的init()⽅法設(shè)置channelOptions以及Attributes。
- 3 緊接著,將⽤戶⾃定義參數(shù)、屬性保存到局部變量currentChildOptions、currentChildAttrs,以
- 供后⾯使⽤
- 4 如果設(shè)置了serverBootstrap.handler()的話,會加⼊到pipeline中。
- 5 添加連接器ServerBootstrapAcceptor,有新連接加⼊后,將⾃定義的childHandler加⼊到連接的
- pipeline中:
- ch.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- pipeline.addLast(
- new ServerBootstrapAcceptor(ch, currentChildGroup,currentChildHandler, currentChildOptions, currentChildAttrs));
- }
- });
- @Override
- @SuppressWarnings("unchecked")
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- //當客戶端有連接時才會執(zhí)⾏
- final Channel child = (Channel) msg;
- //將⾃定義的childHandler加⼊到連接的pipeline中
- child.pipeline().addLast(childHandler);
- setChannelOptions(child, childOptions, logger);
- setAttributes(child, childAttrs);
- try {
- childGroup.register(child).addListener(new ChannelFutureListener(){
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- forceClose(child, future.cause());
- }
- }
- });
- } catch (Throwable t) {
- forceClose(child, t);
- }
- }
3. 注冊selector
- //進⾏注冊
- 1 initAndRegister()⽅法中的ChannelFuture regFuture = config().group().register(channel);
- 2 在io.netty.channel.AbstractChannel.AbstractUnsafe#register()中完成實際的注冊
- 2.1 AbstractChannel.this.eventLoop = eventLoop; 進⾏eventLoop的賦值操作,后續(xù)的IO事件
- ⼯作將在由該eventLoop執(zhí)⾏。
- 2.2 調(diào)⽤register0(promise)中的doRegister()進⾏實際的注冊
- 3 io.netty.channel.nio.AbstractNioChannel#doRegister進⾏了⽅法實現(xiàn)
- //通過jdk底層進⾏注冊多路復(fù)⽤器
- //javaChannel() --前⾯創(chuàng)建的channel
- //eventLoop().unwrappedSelector() -- 獲取selector
- //注冊感興趣的事件為0,表明沒有感興趣的事件,后⾯會進⾏重新注冊事件
- //將this對象以attachment的形式注冊到selector,⽅便后⾯拿到當前對象的內(nèi)容
- selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
4. 綁定端口
- 1 ⼊⼝在io.netty.bootstrap.AbstractBootstrap#doBind0(),啟動⼀個線程進⾏執(zhí)⾏綁定端⼝操作
- 2 調(diào)⽤io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress,
- io.netty.channel.ChannelPromise)⽅法,再次啟動線程執(zhí)⾏
- 3 最終調(diào)⽤io.netty.channel.socket.nio.NioServerSocketChannel#doBind()⽅法進⾏綁定操作
- //通過jdk底層的channel進⾏綁定
- @SuppressJava6Requirement(reason = "Usage guarded by java version check")
- @Override
- protected void doBind(SocketAddress localAddress) throws Exception {
- if (PlatformDependent.javaVersion() >= 7) {
- javaChannel().bind(localAddress, config.getBacklog());
- } else {
- javaChannel().socket().bind(localAddress,
- config.getBacklog());
- }
- }
什么時候進⾏更新selector的主從事件?最終在io.netty.channel.nio.AbstractNioChannel#doBeginRead()⽅法中完成的
- protected void doBeginRead() throws Exception {
- // Channel.read() or ChannelHandlerContext.read() was called
- final SelectionKey selectionKey = this.selectionKey;
- if (!selectionKey.isValid()) {
- return;
- }
- readPending = true;
- final int interestOps = selectionKey.interestOps();
- if ((interestOps & readInterestOp) == 0) {
- selectionKey.interestOps(interestOps | readInterestOp); //設(shè)置
- 感興趣的事件為OP_ACCEPT
- }
- }
- //在NioServerSocketChannel的構(gòu)造⽅法中進⾏了賦值
- public NioServerSocketChannel(ServerSocketChannel channel) {
- super(null, channel, SelectionKey.OP_ACCEPT);
- config = new NioServerSocketChannelConfig(this,
- javaChannel().socket());
- }
2.2 連接請求過程源碼刨析
1. 新連接接入
- 入口在
- io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey,
- io.netty.channel.nio.AbstractNioChannel)中
- 進⼊NioMessageUnsafe的read()⽅法
- 調(diào)⽤io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages() ⽅法,創(chuàng)建
- jdk底層的channel,封裝成NioSocketChannel添加到List容器中
- @Override
- protected int doReadMessages(List<Object> buf) throws Exception {
- SocketChannel ch = SocketUtils.accept(javaChannel());
- try {
- if (ch != null) {
- buf.add(new NioSocketChannel(this, ch));
- return 1;
- }
- } catch (Throwable t) {
- logger.warn("Failed to create a new channel from an
- accepted socket.", t);
- try {
- ch.close();
- } catch (Throwable t2) {
- logger.warn("Failed to close a socket.", t2);
- }
- }
- return 0;
- }
- 創(chuàng)建NioSocketChannel對象
- new NioSocketChannel(this, ch),通過new的⽅式進⾏創(chuàng)建
- 調(diào)⽤super的構(gòu)造⽅法
- 傳⼊SelectionKey.OP_READ事件標識
- 創(chuàng)建id、unsafe、pipeline對象
- 設(shè)置⾮阻塞 ch.configureBlocking(false);
- 創(chuàng)建NioSocketChannelConfig對象
2. 注冊讀事件
- 在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe中的:
- for (int i = 0; i < size; i ++) {
- readPending = false;
- pipeline.fireChannelRead(readBuf.get(i)); //傳播讀事件
- }
- 在io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)⽅
- 法中
- private void invokeChannelRead(Object msg) {
- if (invokeHandler()) {
- try {
- //執(zhí)⾏channelRead,需要注意的是,第⼀次執(zhí)⾏是HeadHandler,第⼆次是
- ServerBootstrapAcceptor
- //通過ServerBootstrapAcceptor進⼊和 新連接接⼊的 注冊selector相同的
- 邏輯進⾏注冊以及事件綁定
- ((ChannelInboundHandler) handler()).channelRead(this, msg);
- } catch (Throwable t) {
- invokeExceptionCaught(t);
- }
- } else {
- fireChannelRead(msg);
- }
- }
03三 使用Netty優(yōu)化點
在使用Netty,一些簡單的建議點。值得看看。
3.1 零拷貝
- 1 Bytebuf使⽤⽤池化的DirectBuffer類型,不需要進⾏字節(jié)緩沖區(qū)的⼆次拷⻉。如果使⽤堆內(nèi)存,JVM會先拷⻉到堆內(nèi),再寫⼊Socket,就多了⼀次拷⻉。
- 2 CompositeByteBuf將多個ByteBuf封裝成⼀個ByteBuf,在添加ByteBuf時不需要進程拷⻉。
- 3 Netty的⽂件傳輸類DefaultFileRegion的transferTo⽅法將⽂件發(fā)送到⽬標channel中,不需要進⾏循環(huán)拷⻉,提升了性能。
3.2 EventLoop的任務(wù)調(diào)度
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- channel.writeAndFlush(data)
- }
- });
而不是使用hannel.writeAndFlush(data);EventLoop的任務(wù)調(diào)度直接放入到channel所對應(yīng)的EventLoop的執(zhí)行隊列,后者會導(dǎo)致線程切換。備注:在writeAndFlush的底層,如果沒有通過eventLoop執(zhí)行的話,就會啟動新的線程。
3.3 減少ChannelPipline的調(diào)⽤⻓度
- public class YourHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelActive(ChannelHandlerContext ctx) {
- //msg從整個ChannelPipline中⾛⼀遍,所有的handler都要經(jīng)過。
- ctx.channel().writeAndFlush(msg);
- //從當前handler⼀直到pipline的尾部,調(diào)⽤更短。
- ctx.writeAndFlush(msg);
- }
- }
3.4 減少ChannelHandler的創(chuàng)建(基本上不會配置)
如果channelhandler是⽆狀態(tài)的(即不需要保存任何狀態(tài)參數(shù)),那么使⽤Sharable注解,并在 bootstrap時只創(chuàng)建⼀個實例,減少GC。否則每次連接都會new出handler對象。
- @ChannelHandler.Shareable
- public class StatelessHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelActive(ChannelHandlerContext ctx) {}
- }
- public class MyInitializer extends ChannelInitializer<Channel> {
- private static final ChannelHandler INSTANCE = new StatelessHandler();
- @Override
- public void initChannel(Channel ch) {
- ch.pipeline().addLast(INSTANCE);
- }
- }
注意:
ByteToMessageDecoder之類的編解碼器是有狀態(tài)的,不能使⽤Sharable注解。
3.5 配置參數(shù)的設(shè)置
- 服務(wù)端的bossGroup只需要設(shè)置為1即可,因為ServerSocketChannel在初始化階段,只會
- 注冊到某⼀個eventLoop上,⽽這個eventLoop只會有⼀個線程在運⾏,所以沒有必要設(shè)置為
- 多線程。⽽ IO 線程,為了充分利⽤ CPU,同時考慮減少線上下⽂切換的開銷,通常workGroup
- 設(shè)置為CPU核數(shù)的兩倍,這也是Netty提供的默認值。
- 在對于響應(yīng)時間有⾼要求的場景,使⽤.childOption(ChannelOption.TCP_NODELAY, true)
- 和.option(ChannelOption.TCP_NODELAY, true)來禁⽤nagle算法,不等待,⽴即發(fā)送。
Netty相關(guān)的知識點也算是分享完畢了。后續(xù)我仍然分享Netty相關(guān)內(nèi)容。主要是在工作中遇到問題,和新的感悟。