自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

TCP通信過程中遇到粘包拆包解決全過程

網(wǎng)絡(luò) 通信技術(shù)
在使用TCP協(xié)議進行通信時,聽到最多的也就是粘包和拆包問題。本文就來看看,如何解決粘包和拆包問題。

[[359421]]

 在使用TCP協(xié)議進行通信時,聽到最多的也就是粘包和拆包問題。本文就來看看,如何解決粘包和拆包問題。

01一 TCP的粘包/拆包的問題以及解決

在解決TCP粘包和拆包,我們先看看一種思想。來看看讀取一個Int數(shù)據(jù)的Demo,體會下這種思想。

1.1 ReplayingDecoder

1. 自定義解碼器,從ByteBuf讀取一個Int。(重點,一定要看懂這段代碼)

  1. public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder { 
  2.     @Override 
  3.     protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { 
  4.       if (buf.readableBytes() < 4) { 
  5.         return
  6.       } 
  7.       buf.markReaderIndex();//標記下當前讀指針。 
  8.       int length = buf.readInt();//從ByteBuf讀出一個int 
  9.       if (buf.readableBytes() < length) { 
  10.         buf.resetReaderIndex();//恢復(fù)到剛才標記的讀指針 
  11.         return;  
  12.       } 
  13.       out.add(buf.readBytes(length)); 
  14.     } 

2. 使用ReplayingDecoder進行優(yōu)化()

  1. public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> { 
  2.     protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { 
  3.        out.add(buf.readBytes(buf.readInt())); 
  4.     } 

3. ReplayingDecoder使用說明(重點,要理解的)

  • a 使⽤了特殊的ByteBuf,叫做ReplayingDecoderByteBuf,擴展了ByteBuf
  • b 重寫了ByteBuf的readXxx()等⽅法,會先檢查可讀字節(jié)⻓度,⼀旦檢測到不滿⾜要求就直接拋出REPLAY(REPLAY繼承ERROR)
  • c ReplayingDecoder重寫了ByteToMessageDecoder的callDecode()⽅法,捕獲Signal并在catch塊中重置ByteBuf的readerIndex。
  • d 繼續(xù)等待數(shù)據(jù),直到有了數(shù)據(jù)后繼續(xù)讀取,這樣就可以保證讀取到需要讀取的數(shù)據(jù)。
  • e 類定義中的泛型S是⼀個⽤于記錄解碼狀態(tài)的狀態(tài)機枚舉類,在state(S s)、checkpoint(S s)等⽅法中會⽤到。在簡單解碼時也可以⽤java.lang.Void來占位。

總結(jié):

ReplayingDecoder是ByteToMessageDecoder的子類,擴展了ByteBuf。從寫了readXxx()等⽅法,當前ByteBuf中數(shù)據(jù)小于代取數(shù)據(jù),等待數(shù)據(jù)滿足,才能取數(shù)據(jù)。就可以省略手動實現(xiàn)這段代碼。

4. 注意

  1. 1 buffer的部分操作(readBytes(ByteBuffer dst)、retain()、release()等⽅法會直接拋出異常) 
  2. 2 在某些情況下會影響性能(如多次對同⼀段消息解碼) 

繼承ReplayingDecoder,錯誤示例和修改

  1. //這是⼀個錯誤的例⼦: 
  2. //消息中包含了2個integer,代碼中decode⽅法會被調(diào)⽤兩次,此時隊列size不等于2,這段代碼達不到期望結(jié)果。 
  3. public class MyDecoder extends ReplayingDecoder<Void> { 
  4.     private final Queue<Integervalues = new LinkedList<Integer>(); 
  5.     @Override 
  6.     public void decode(ByteBuf buf, List<Object> out) throws Exception { 
  7.         // A message contains 2 integers. 
  8.         values.offer(buf.readInt()); 
  9.         values.offer(buf.readInt()); 
  10.         assert values.size() == 2; 
  11.         out.add(values.poll() + values.poll()); 
  12.     } 

  1. //正確的做法: 
  2. public class MyDecoder extends ReplayingDecoder<Void> { 
  3.     private final Queue<Integervalues = new LinkedList<Integer>(); 
  4.     @Override 
  5.     public void decode(ByteBuf buf, List<Object> out) throws Exception { 
  6.         // Revert the state of the variable that might have been changed 
  7.         // since the last partial decode. 
  8.         values.clear(); 
  9.         // A message contains 2 integers. 
  10.         values.offer(buf.readInt()); 
  11.         values.offer(buf.readInt()); 
  12.         // Now we know this assertion will never fail. 
  13.         assert values.size() == 2; 
  14.         out.add(values.poll() + values.poll()); 
  15.     } 
  16. }  

ByteToIntegerDecoder2的實現(xiàn)

  1. public class ByteToIntegerDecoder2 extends ReplayingDecoder<Void> { 
  2.     /** 
  3.     * @param ctx 上下⽂ 
  4.     * @param in 輸⼊的ByteBuf消息數(shù)據(jù) 
  5.     * @param out 轉(zhuǎn)化后輸出的容器 
  6.     * @throws Exception 
  7.     */ 
  8.     @Override 
  9.     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 
  10.      out.add(in.readInt()); //讀取到int類型數(shù)據(jù),放⼊到輸出,完成數(shù)據(jù)類型的轉(zhuǎn)化 
  11.     } 

1.2 拆包和粘包問題重現(xiàn)(客戶端向服務(wù)端發(fā)送十條數(shù)據(jù))

1. 客戶端啟動類

  1. public class NettyClient { 
  2.     public static void main(String[] args) throws Exception{ 
  3.         EventLoopGroup worker = new NioEventLoopGroup(); 
  4.         try { 
  5.             // 服務(wù)器啟動類 
  6.             Bootstrap bootstrap = new Bootstrap(); 
  7.             bootstrap.group(worker); 
  8.             bootstrap.channel(NioSocketChannel.class); 
  9.             bootstrap.handler(new ChannelInitializer<SocketChannel>() { 
  10.                 @Override 
  11.                 protected void initChannel(SocketChannel ch) throws Exception { 
  12.                      ch.pipeline().addLast(new ClientHandler()); 
  13.                 } 
  14.          }); 
  15.             ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync(); 
  16.             future.channel().closeFuture().sync(); 
  17.         } finally { 
  18.          worker.shutdownGracefully(); 
  19.         } 
  20.     } 

2. 客戶端ClientHandler

  1. public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 
  2.     private int count
  3.     @Override 
  4.     protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
  5.       System.out.println("接收到服務(wù)端的消息:" + 
  6.       msg.toString(CharsetUtil.UTF_8)); 
  7.       System.out.println("接收到服務(wù)端的消息數(shù)量:" + (++count)); 
  8.     } 
  9.     @Override 
  10.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  11.       for (int i = 0; i < 10; i++) { 
  12.       ctx.writeAndFlush(Unpooled.copiedBuffer("from client a message!"
  13.       CharsetUtil.UTF_8)); 
  14.       } 
  15.     } 
  16.     @Override 
  17.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  18.       cause.printStackTrace(); 
  19.       ctx.close(); 
  20.     } 

3. 服務(wù)端NettyServer

  1. public class NettyServer { 
  2.     public static void main(String[] args) throws Exception { 
  3.       // 主線程,不處理任何業(yè)務(wù)邏輯,只是接收客戶的連接請求 
  4.       EventLoopGroup boss = new NioEventLoopGroup(1); 
  5.       // ⼯作線程,線程數(shù)默認是:cpu*2 
  6.       EventLoopGroup worker = new NioEventLoopGroup(); 
  7.       try { 
  8.         // 服務(wù)器啟動類 
  9.         ServerBootstrap serverBootstrap = new ServerBootstrap(); 
  10.         serverBootstrap.group(boss, worker); 
  11.         //配置server通道 
  12.         serverBootstrap.channel(NioServerSocketChannel.class); 
  13.         serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { 
  14.           @Override 
  15.           protected void initChannel(SocketChannel ch) throws Exception { 
  16.             ch.pipeline() 
  17.             .addLast(new ServerHandler()); 
  18.           } 
  19.         }); //worker線程的處理器 
  20.         ChannelFuture future = serverBootstrap.bind(5566).sync(); 
  21.         System.out.println("服務(wù)器啟動完成。。。。。"); 
  22.         //等待服務(wù)端監(jiān)聽端⼝關(guān)閉 
  23.         future.channel().closeFuture().sync(); 
  24.       } finally { 
  25.         //優(yōu)雅關(guān)閉 
  26.         boss.shutdownGracefully(); 
  27.         worker.shutdownGracefully(); 
  28.       } 
  29.     } 

4. ServerHandler

  1. public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> { 
  2.       private int count
  3.       @Override 
  4.       protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
  5.           System.out.println("服務(wù)端接收到消息:" + 
  6.           msg.toString(CharsetUtil.UTF_8)); 
  7.           System.out.println("服務(wù)端接收到消息數(shù)量:" + (++count)); 
  8.           ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8)); 
  9.       } 

 

1.3 什么是TCP的粘包和拆包問題

TCP是流傳遞的,所謂流,就是沒有界限的數(shù)據(jù)。服務(wù)端接受客戶端數(shù)據(jù),并不知道是一條還是多條。服務(wù)端在讀取數(shù)據(jù)的時候會出現(xiàn)粘包問題。

因此服務(wù)端和客戶端進行數(shù)據(jù)傳遞的時候,要制定好拆包規(guī)則??蛻舳税凑赵撘?guī)則進行粘包,服務(wù)端按照該規(guī)則拆包。如果有任意違背該規(guī)則,服務(wù)端就不能拿到預(yù)期的數(shù)據(jù)。

1. 解決思路(三種)

  • 1. 在發(fā)送的數(shù)據(jù)包中添加頭,在頭⾥存儲數(shù)據(jù)的⼤⼩,服務(wù)端就可以按照此⼤⼩來讀取數(shù)據(jù),這樣就知道界限在哪⾥了。
  • 2. 以固定的⻓度發(fā)送數(shù)據(jù),超出的分多次發(fā)送,不⾜的以0填充,接收端就以固定⻓度接收即可。
  • 3. 在數(shù)據(jù)包之間設(shè)置邊界,如添加特殊符號,這樣,接收端通過這個邊界就可以將不同的數(shù)據(jù)包拆分開。

1.4 實戰(zhàn):解決TCP的粘包/拆包問題

1. 自定義協(xié)議

  1. public class MyProtocol { 
  2.     private Integer length; //數(shù)據(jù)頭:⻓度 
  3.     private byte[] body; //數(shù)據(jù)體 
  4.     public Integer getLength() { 
  5.      return length; 
  6.     } 
  7.     public void setLength(Integer length) { 
  8.      this.length = length; 
  9.     } 
  10.     public byte[] getBody() { 
  11.      return body; 
  12.     } 
  13.     public void setBody(byte[] body) { 
  14.      this.body = body; 
  15.     } 

2. 編碼器

  1. public class MyEncoder extends MessageToByteEncoder<MyProtocol> { 
  2.   @Override 
  3.   protected void encode(ChannelHandlerContext ctx, MyProtocol msg, ByteBuf out) throws Exception { 
  4.     out.writeInt(msg.getLength()); 
  5.     out.writeBytes(msg.getBody()); 
  6.   } 

3. 解碼器

  1. public class MyDecoder extends ReplayingDecoder<Void> { 
  2.     @Override 
  3.     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 
  4.         int length = in.readInt(); //獲取⻓度 
  5.         byte[] data = new byte[length]; //根據(jù)⻓度定義byte數(shù)組 
  6.         in.readBytes(data); //讀取數(shù)據(jù) 
  7.         MyProtocol myProtocol = new MyProtocol(); 
  8.         myProtocol.setLength(length); 
  9.         myProtocol.setBody(data); 
  10.         out.add(myProtocol); 
  11.     } 

4. 客戶端ClientHandler

  1. public class ClientHandler extends SimpleChannelInboundHandler<MyProtocol> { 
  2.     private int count
  3.     @Override 
  4.     protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception { 
  5.         System.out.println("接收到服務(wù)端的消息:" + new String(msg.getBody(), 
  6.         CharsetUtil.UTF_8)); 
  7.         System.out.println("接收到服務(wù)端的消息數(shù)量:" + (++count)); 
  8.     } 
  9.     @Override 
  10.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  11.         for (int i = 0; i < 10; i++) { 
  12.             byte[] data = "from client a message!".getBytes(CharsetUtil.UTF_8); 
  13.             MyProtocol myProtocol = new MyProtocol(); 
  14.             myProtocol.setLength(data.length); 
  15.             myProtocol.setBody(data); 
  16.             ctx.writeAndFlush(myProtocol); 
  17.         } 
  18.     } 
  19.     @Override 
  20.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception { 
  21.       cause.printStackTrace(); 
  22.       ctx.close(); 
  23.     } 

5. NettyClient

  1. public class NettyClient { 
  2.     public static void main(String[] args) throws Exception{ 
  3.         EventLoopGroup worker = new NioEventLoopGroup(); 
  4.         try { 
  5.             // 服務(wù)器啟動類 
  6.             Bootstrap bootstrap = new Bootstrap(); 
  7.             bootstrap.group(worker); 
  8.             bootstrap.channel(NioSocketChannel.class); 
  9.             bootstrap.handler(new ChannelInitializer<SocketChannel>() { 
  10.               @Override 
  11.               protected void initChannel(SocketChannel ch) throws Exception { 
  12.                 ch.pipeline().addLast(new MyEncoder()); 
  13.                 ch.pipeline().addLast(new MyDecoder()); 
  14.                 ch.pipeline().addLast(new ClientHandler()); 
  15.               } 
  16.             }); 
  17.             ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync(); 
  18.             future.channel().closeFuture().sync(); 
  19.         } finally { 
  20.          worker.shutdownGracefully(); 
  21.         } 
  22.     } 

6. ServerHandler

  1. public class ServerHandler extends SimpleChannelInboundHandler<MyProtocol> { 
  2.     private int count
  3.     @Override 
  4.     protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception { 
  5.         System.out.println("服務(wù)端接收到消息:" + new String(msg.getBody(), 
  6.         CharsetUtil.UTF_8)); 
  7.         System.out.println("服務(wù)端接收到消息數(shù)量:" + (++count)); 
  8.         byte[] data = "ok".getBytes(CharsetUtil.UTF_8); 
  9.         MyProtocol myProtocol = new MyProtocol(); 
  10.         myProtocol.setLength(data.length); 
  11.         myProtocol.setBody(data); 
  12.         ctx.writeAndFlush(myProtocol); 
  13.     } 

7. NettyServer

  1. public class NettyServer { 
  2.     public static void main(String[] args) throws Exception { 
  3.         // 主線程,不處理任何業(yè)務(wù)邏輯,只是接收客戶的連接請求 
  4.         EventLoopGroup boss = new NioEventLoopGroup(1); 
  5.         // ⼯作線程,線程數(shù)默認是:cpu*2 
  6.         EventLoopGroup worker = new NioEventLoopGroup(); 
  7.         try { 
  8.             // 服務(wù)器啟動類 
  9.             ServerBootstrap serverBootstrap = new ServerBootstrap(); 
  10.             serverBootstrap.group(boss, worker); 
  11.             //配置server通道 
  12.             serverBootstrap.channel(NioServerSocketChannel.class); 
  13.             serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { 
  14.               @Override 
  15.               protected void initChannel(SocketChannel ch) throws Exception { 
  16.                   ch.pipeline() 
  17.                   .addLast(new MyDecoder()) 
  18.                   .addLast(new MyEncoder()) 
  19.                   .addLast(new ServerHandler()); 
  20.               } 
  21.             }); //worker線程的處理器 
  22.             ChannelFuture future = serverBootstrap.bind(5566).sync(); 
  23.             System.out.println("服務(wù)器啟動完成。。。。。"); 
  24.             //等待服務(wù)端監(jiān)聽端⼝關(guān)閉 
  25.             future.channel().closeFuture().sync(); 
  26.         } finally { 
  27.             //優(yōu)雅關(guān)閉 
  28.             boss.shutdownGracefully(); 
  29.             worker.shutdownGracefully(); 
  30.         } 
  31.     } 

8. 測試

02二 Netty核心源碼解析

2.1 服務(wù)端啟動過程刨析

1. 創(chuàng)建服務(wù)端Channel

  1. 1 ServerBootstrap對象的bind()⽅法,也是⼊⼝⽅法 
  2. 2 AbstractBootstrap中的initAndRegister()進⾏創(chuàng)建Channel 
  3.      創(chuàng)建Channel的⼯作由ReflectiveChannelFactory反射類中的newChannel()⽅法完成。 
  4. 3 NioServerSocketChannel中的構(gòu)造⽅法中,通過jdk nio底層的SelectorProvider打開ServerSocketChannel。 
  5. 4 在AbstractNioChannel的構(gòu)造⽅法中,設(shè)置channel為⾮阻塞:ch.configureBlocking(false); 
  6. 5 通過的AbstractChannel的構(gòu)造⽅法,創(chuàng)建了id、unsafe、pipeline內(nèi)容。 
  7. 6 通過NioServerSocketChannelConfig獲取tcp底層的⼀些參數(shù) 

2. 初始化服務(wù)端Channel

  1. 1 AbstractBootstrap中的initAndRegister()進⾏初始化channel,代碼:init(channel); 
  2.  
  3. 2 在ServerBootstrap中的init()⽅法設(shè)置channelOptions以及Attributes。 
  4.  
  5. 3 緊接著,將⽤戶⾃定義參數(shù)、屬性保存到局部變量currentChildOptions、currentChildAttrs,以 
  6.   供后⾯使⽤ 
  7.  
  8. 4 如果設(shè)置了serverBootstrap.handler()的話,會加⼊到pipeline中。 
  9.  
  10. 5 添加連接器ServerBootstrapAcceptor,有新連接加⼊后,將⾃定義的childHandler加⼊到連接的 
  11.   pipeline中: 

  1. ch.eventLoop().execute(new Runnable() { 
  2.     @Override 
  3.     public void run() { 
  4.       pipeline.addLast( 
  5.         new ServerBootstrapAcceptor(ch, currentChildGroup,currentChildHandler, currentChildOptions, currentChildAttrs)); 
  6.     } 
  7. }); 

  1. @Override 
  2. @SuppressWarnings("unchecked"
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) {  
  4.     //當客戶端有連接時才會執(zhí)⾏ 
  5.     final Channel child = (Channel) msg; 
  6.     //將⾃定義的childHandler加⼊到連接的pipeline中 
  7.     child.pipeline().addLast(childHandler);  
  8.     setChannelOptions(child, childOptions, logger); 
  9.     setAttributes(child, childAttrs); 
  10.     try { 
  11.         childGroup.register(child).addListener(new ChannelFutureListener(){ 
  12.             @Override 
  13.             public void operationComplete(ChannelFuture future) throws Exception { 
  14.                 if (!future.isSuccess()) { 
  15.                  forceClose(child, future.cause()); 
  16.                 } 
  17.             } 
  18.         }); 
  19.     } catch (Throwable t) { 
  20.      forceClose(child, t); 
  21.     } 

3. 注冊selector

  1. //進⾏注冊 
  2. 1 initAndRegister()⽅法中的ChannelFuture regFuture = config().group().register(channel);  
  3.  
  4. 2 在io.netty.channel.AbstractChannel.AbstractUnsafe#register()中完成實際的注冊 
  5.     2.1 AbstractChannel.this.eventLoop = eventLoop; 進⾏eventLoop的賦值操作,后續(xù)的IO事件 
  6.         ⼯作將在由該eventLoop執(zhí)⾏。 
  7.      2.2 調(diào)⽤register0(promise)中的doRegister()進⾏實際的注冊 
  8.      
  9. 3 io.netty.channel.nio.AbstractNioChannel#doRegister進⾏了⽅法實現(xiàn) 

  1. //通過jdk底層進⾏注冊多路復(fù)⽤器 
  2. //javaChannel() --前⾯創(chuàng)建的channel 
  3. //eventLoop().unwrappedSelector() -- 獲取selector 
  4. //注冊感興趣的事件為0,表明沒有感興趣的事件,后⾯會進⾏重新注冊事件 
  5. //將this對象以attachment的形式注冊到selector,⽅便后⾯拿到當前對象的內(nèi)容 
  6. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 

4. 綁定端口

  1. 1 ⼊⼝在io.netty.bootstrap.AbstractBootstrap#doBind0(),啟動⼀個線程進⾏執(zhí)⾏綁定端⼝操作 
  2.  
  3. 2 調(diào)⽤io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, 
  4.   io.netty.channel.ChannelPromise)⽅法,再次啟動線程執(zhí)⾏ 
  5.    
  6. 3 最終調(diào)⽤io.netty.channel.socket.nio.NioServerSocketChannel#doBind()⽅法進⾏綁定操作 

  1. //通過jdk底層的channel進⾏綁定 
  2. @SuppressJava6Requirement(reason = "Usage guarded by java version check"
  3.     @Override 
  4.     protected void doBind(SocketAddress localAddress) throws Exception { 
  5.       if (PlatformDependent.javaVersion() >= 7) { 
  6.          javaChannel().bind(localAddress, config.getBacklog()); 
  7.       } else { 
  8.           javaChannel().socket().bind(localAddress, 
  9.           config.getBacklog()); 
  10.     } 

什么時候進⾏更新selector的主從事件?最終在io.netty.channel.nio.AbstractNioChannel#doBeginRead()⽅法中完成的

  1. protected void doBeginRead() throws Exception { 
  2.     // Channel.read() or ChannelHandlerContext.read() was called 
  3.     final SelectionKey selectionKey = this.selectionKey; 
  4.     if (!selectionKey.isValid()) { 
  5.      return
  6.     } 
  7.     readPending = true
  8.     final int interestOps = selectionKey.interestOps(); 
  9.     if ((interestOps & readInterestOp) == 0) { 
  10.         selectionKey.interestOps(interestOps | readInterestOp); //設(shè)置 
  11.         感興趣的事件為OP_ACCEPT 
  12.     } 
  13. //在NioServerSocketChannel的構(gòu)造⽅法中進⾏了賦值 
  14. public NioServerSocketChannel(ServerSocketChannel channel) { 
  15.       super(null, channel, SelectionKey.OP_ACCEPT); 
  16.       config = new NioServerSocketChannelConfig(this, 
  17.       javaChannel().socket()); 

2.2 連接請求過程源碼刨析

1. 新連接接入

  1. 入口在 
  2. io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, 
  3. io.netty.channel.nio.AbstractNioChannel)中 
  4.     進⼊NioMessageUnsafe的read()⽅法 
  5.      
  6.     調(diào)⽤io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages() ⽅法,創(chuàng)建 
  7.     jdk底層的channel,封裝成NioSocketChannel添加到List容器中 

  1. @Override 
  2. protected int doReadMessages(List<Object> buf) throws Exception { 
  3.     SocketChannel ch = SocketUtils.accept(javaChannel()); 
  4.     try { 
  5.         if (ch != null) { 
  6.           buf.add(new NioSocketChannel(this, ch)); 
  7.           return 1; 
  8.         } 
  9.     } catch (Throwable t) { 
  10.         logger.warn("Failed to create a new channel from an 
  11.         accepted socket.", t); 
  12.         try { 
  13.          ch.close(); 
  14.         } catch (Throwable t2) { 
  15.             logger.warn("Failed to close a socket.", t2); 
  16.         } 
  17.     } 
  18.     return 0; 

  1. 創(chuàng)建NioSocketChannel對象 
  2. new NioSocketChannel(this, ch),通過new的⽅式進⾏創(chuàng)建 
  3.     調(diào)⽤super的構(gòu)造⽅法 
  4.         傳⼊SelectionKey.OP_READ事件標識 
  5.         創(chuàng)建id、unsafe、pipeline對象 
  6.         設(shè)置⾮阻塞 ch.configureBlocking(false); 
  7.     創(chuàng)建NioSocketChannelConfig對象 

2. 注冊讀事件

  1. 在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe中的: 
  2. for (int i = 0; i < size; i ++) { 
  3.     readPending = false
  4.     pipeline.fireChannelRead(readBuf.get(i)); //傳播讀事件 

  1.  在io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)⽅ 
  2.  法中 
  3. private void invokeChannelRead(Object msg) { 
  4.     if (invokeHandler()) { 
  5.       try { 
  6.           //執(zhí)⾏channelRead,需要注意的是,第⼀次執(zhí)⾏是HeadHandler,第⼆次是 
  7.           ServerBootstrapAcceptor 
  8.           //通過ServerBootstrapAcceptor進⼊和 新連接接⼊的 注冊selector相同的 
  9.           邏輯進⾏注冊以及事件綁定 
  10.           ((ChannelInboundHandler) handler()).channelRead(this, msg); 
  11.       } catch (Throwable t) { 
  12.          invokeExceptionCaught(t); 
  13.       } 
  14.     } else { 
  15.      fireChannelRead(msg); 
  16.     } 

03三 使用Netty優(yōu)化點

在使用Netty,一些簡單的建議點。值得看看。

3.1 零拷貝

  • 1 Bytebuf使⽤⽤池化的DirectBuffer類型,不需要進⾏字節(jié)緩沖區(qū)的⼆次拷⻉。如果使⽤堆內(nèi)存,JVM會先拷⻉到堆內(nèi),再寫⼊Socket,就多了⼀次拷⻉。
  • 2 CompositeByteBuf將多個ByteBuf封裝成⼀個ByteBuf,在添加ByteBuf時不需要進程拷⻉。
  • 3 Netty的⽂件傳輸類DefaultFileRegion的transferTo⽅法將⽂件發(fā)送到⽬標channel中,不需要進⾏循環(huán)拷⻉,提升了性能。

3.2 EventLoop的任務(wù)調(diào)度

  1. channel.eventLoop().execute(new Runnable() { 
  2.     @Override 
  3.     public void run() { 
  4.      channel.writeAndFlush(data) 
  5.     } 
  6. }); 

而不是使用hannel.writeAndFlush(data);EventLoop的任務(wù)調(diào)度直接放入到channel所對應(yīng)的EventLoop的執(zhí)行隊列,后者會導(dǎo)致線程切換。備注:在writeAndFlush的底層,如果沒有通過eventLoop執(zhí)行的話,就會啟動新的線程。

3.3 減少ChannelPipline的調(diào)⽤⻓度

  1. public class YourHandler extends ChannelInboundHandlerAdapter { 
  2.     @Override 
  3.     public void channelActive(ChannelHandlerContext ctx) { 
  4.     //msg從整個ChannelPipline中⾛⼀遍,所有的handler都要經(jīng)過。 
  5.     ctx.channel().writeAndFlush(msg); 
  6.     //從當前handler⼀直到pipline的尾部,調(diào)⽤更短。 
  7.     ctx.writeAndFlush(msg); 
  8.     } 

3.4 減少ChannelHandler的創(chuàng)建(基本上不會配置)

如果channelhandler是⽆狀態(tài)的(即不需要保存任何狀態(tài)參數(shù)),那么使⽤Sharable注解,并在 bootstrap時只創(chuàng)建⼀個實例,減少GC。否則每次連接都會new出handler對象。

  1. @ChannelHandler.Shareable 
  2. public class StatelessHandler extends ChannelInboundHandlerAdapter { 
  3.     @Override 
  4.     public void channelActive(ChannelHandlerContext ctx) {} 
  5. public class MyInitializer extends ChannelInitializer<Channel> { 
  6.     private static final ChannelHandler INSTANCE = new StatelessHandler(); 
  7.     @Override 
  8.     public void initChannel(Channel ch) { 
  9.      ch.pipeline().addLast(INSTANCE); 
  10.     } 

注意:

ByteToMessageDecoder之類的編解碼器是有狀態(tài)的,不能使⽤Sharable注解。

3.5 配置參數(shù)的設(shè)置

  1. 服務(wù)端的bossGroup只需要設(shè)置為1即可,因為ServerSocketChannel在初始化階段,只會 
  2. 注冊到某⼀個eventLoop上,⽽這個eventLoop只會有⼀個線程在運⾏,所以沒有必要設(shè)置為 
  3. 多線程。⽽ IO 線程,為了充分利⽤ CPU,同時考慮減少線上下⽂切換的開銷,通常workGroup 
  4. 設(shè)置為CPU核數(shù)的兩倍,這也是Netty提供的默認值。 
  5.  
  6. 在對于響應(yīng)時間有⾼要求的場景,使⽤.childOption(ChannelOption.TCP_NODELAY, true
  7. 和.option(ChannelOption.TCP_NODELAY, true)來禁⽤nagle算法,不等待,⽴即發(fā)送。 

Netty相關(guān)的知識點也算是分享完畢了。后續(xù)我仍然分享Netty相關(guān)內(nèi)容。主要是在工作中遇到問題,和新的感悟。

 

責任編輯:姜華 來源: 花花和Java
相關(guān)推薦

2019-10-17 11:06:32

TCP粘包通信協(xié)議

2020-01-06 15:23:41

NettyTCP粘包

2024-12-19 11:00:00

TCP網(wǎng)絡(luò)通信粘包

2021-07-15 10:35:16

NettyTCPJava

2021-03-09 22:30:47

TCP拆包協(xié)議

2011-02-22 10:46:02

Samba配置

2022-04-28 08:38:09

TCP協(xié)議解碼器

2011-04-18 15:56:10

軟件測試

2009-12-08 17:56:16

WCF配置

2011-09-06 15:38:20

QT安裝

2011-01-21 17:51:52

2009-04-13 12:37:18

2009-06-10 16:55:42

cygwin netb安裝

2010-03-01 17:01:03

Python編程技巧

2010-03-10 13:24:45

Zend Debugg

2010-06-17 13:10:09

Linux Grub修

2010-11-19 10:11:49

Oracle物化視圖

2012-11-06 10:19:18

Java自定義加載Java類

2009-04-23 10:04:55

2011-03-11 10:39:02

YUM安裝LAMP
點贊
收藏

51CTO技術(shù)棧公眾號