自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

NioServerSocketChannel的注冊源碼解析

開發(fā) 后端
Netty會回調再初始化NioServerSocketChannel的時候注冊的Channelinitialization, 添加一個新連接接入器ServerBootstrapAcceptor,并刪除本身!

[[410319]]

有道無術,術尚可求也!有術無道,止于術!

我們上一章分析了Netty中NioServerSocketChaennl的創(chuàng)建于初始化,本章節(jié)將繼續(xù)分析NioServerSocketChannel的分析,NioServerSocketChannel是Netty官方封裝的一個通道對象,旨用來代替或者包裝JDK原生的SocketChannel對象,那么他是如何講NioServerSocketChannel于JDK的NIO相關代碼關聯(lián)起來的呢?

一、源碼入口尋找

我們上一節(jié)課主要分析的源碼方法是initAndRegister方法,其實從名字可以看出來,這里是做通道的初始化于注冊的,我們繼續(xù)回到這個方法,該方法的尋找,參照上一章節(jié):

AbstractBootstrap#initAndRegister

我們跳過上節(jié)課已經分析的代碼,直接來到注冊相關的邏輯:

  1. ChannelFuture regFuture = config().group().register(channel); 

我們逐個方法進行分析:

  1. config() 

現(xiàn)在我們創(chuàng)建的ServerBootstrap,所以為什么選這個我就不多說了:

  1. private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); 

我們可以看到,他返回的是這個對象,該對象是再創(chuàng)建ServerBootstrap的時候自動創(chuàng)建的,我們看,他構造方法里面穿了一個this,證明他持有一個ServerBootstrap的引用,這代表著他可以通過這個對象,獲取ServerBootstrap內所有的屬性和方法!獲取到這個類之后干嘛了呢?

  1. config().group() 

估計大家很多都已經猜出來了,我們直接點進group里面去驗證一下:

  1. @SuppressWarnings("deprecation"
  2. public final EventLoopGroup group() { 
  3.     return bootstrap.group(); 

該代碼是獲取到了我們再構建ServerBootstrap的時候設置的bossGroup對象,有興趣的可以追一下,這里比較簡單就不做太多的闡述了,我們繼續(xù)回到主線,

  1. config().group().register(channel); 

我們通過上述代碼的分析,知道了group方法返回的是NioEventLoopGroup,我們進入到register方法:

我們發(fā)現(xiàn)這里并沒有NioEventLoopGroup,但是通過前幾章我們的學習,我們知道NioEventLoopGroup是MultithreadEventLoopGroup的子類,所以我們子類沒有往父類找,我們進入到MultithreadEventLoopGroup源碼里面:

  1. @Override 
  2. public ChannelFuture register(Channel channel) { 
  3.     //一般來說這里獲取的NioEventLoop 他有繼承與  SingleThreadEventLoop 
  4.     return next().register(channel); 

在這里,我們看到了一個我們前面分析過得代碼,next(),他調用的是chooser.next();, chooser是我們在構建NioEventLoopGroup的時候創(chuàng)建的一個執(zhí)行器的選擇器,next方法的功能是輪訓的返回一個線程執(zhí)行器:NioEventLoop!記不太清的同學可以回頭看NioEventLoopGroup初始化源碼解析的那一章代碼!

現(xiàn)在我們根據前幾章的基礎,我們知道了next()方法返回的是一個NioEventLoop類,我們進入到register()方法查看:

但是,我們發(fā)現(xiàn)NioEventLoop相關的實現(xiàn),但是我們根據前面所學,我們可以知道,NioEventLoop的父類是SingleThreadEventLoop,所以我們進入到 SingleThreadEventLoop#register(io.netty.channel.Channel):

  1. @Override 
  2. public ChannelFuture register(Channel channel) { 
  3.     //調用本身的注冊方法 
  4.     return register(new DefaultChannelPromise(channel, this)); 
  5.  
  6. //沒什么可說的繼續(xù)往下追 
  7. @Override 
  8. public ChannelFuture register(final ChannelPromise promise) { 
  9.     ObjectUtil.checkNotNull(promise, "promise"); 
  10.     promise.channel().unsafe().register(this, promise); 
  11.     return promise; 

我們一定能夠猜到,這里的主要代碼是:promise.channel().unsafe().register(this, promise);

我們上一章分析過 unsafe是 NioMessageUnsafe, 但是register卻沒有他的實現(xiàn):

我們還是需要往父類追,進入到io.netty.channel.AbstractChannel.AbstractUnsafe#register(this, promise):

我們這里先關注一下參數(shù) :

this: 傳入的是他本身,他本身是個什么 NioEventLoop,也就是說,他傳入了一個執(zhí)行器

promise:NioServerSocketChannel的包裝對象

我們進入到 register方法中,分析主要代碼:

  1. @Override 
  2. public final void register(EventLoop eventLoop, final ChannelPromise promise) { 
  3.     ......................暫時忽略不必要代碼............................. 
  4.     AbstractChannel.this.eventLoop = eventLoop; 
  5.     //注意此時的thread = null 所以返回false 
  6.     if (eventLoop.inEventLoop()) { 
  7.         //實際的注冊 注冊selector 觸發(fā) handlerAdded事件和 channelRegistered事件 
  8.         register0(promise); 
  9.     } else { 
  10.         .......................暫時忽略不必要代碼...................... 
  11.     } 
  1. AbstractChannel.this.eventLoop = eventLoop; 

首先我們將上一步獲取的執(zhí)行器保存在NioServerSocketChannel中! 這行代碼有力的證明了,每一個Channel綁定一個NioEventLoop對象!

  1. if (eventLoop.inEventLoop()) { 
  2.     //實際的注冊 注冊selector 觸發(fā) handlerAdded事件和 channelRegistered事件 
  3.     register0(promise); 

注意:這里我需要澄清一點,真實的調試過程中,并不會走這個分支,而是會走else分支異步進行注冊,這里為了更方便大家理解,我就依照if分支進行源碼分析,其實沒有太大變化,都是調用register0方法進行注冊,只不過一個同步一個異步,關于異步,是Netty中及其重要的一個知識點,我將放到后面單獨開一章進行講解!

我們進入到register0源碼里面:

  1. private void register0(ChannelPromise promise) { 
  2.     try { 
  3.         ..............忽略代碼.................. 
  4.         //實際的注冊  調用jdk底層的數(shù)據注冊selector 
  5.         // 調用 JDK 底層的 register() 進行注冊 
  6.         //io.netty.channel.nio.AbstractNioChannel.doRegister 
  7.         doRegister(); 
  8.         neverRegistered = false
  9.         registered = true
  10.  
  11.         //通知管道  傳播handlerAdded事件 
  12.         //觸發(fā) handlerAdded 事件 觸發(fā)任務 add事件 
  13.         pipeline.invokeHandlerAddedIfNeeded(); 
  14.  
  15.         safeSetSuccess(promise); 
  16.         //通知管道  傳播channelRegistered事件 
  17.         // 觸發(fā) channelRegistered 事件 
  18.         pipeline.fireChannelRegistered(); 
  19.         // 如果從未注冊過頻道,則僅觸發(fā)channelActive。 
  20.         // 如果取消注冊并重新注冊通道,則多個通道處于活動狀態(tài)。 
  21.         //isActive() 返回false 
  22.         // 此時 Channel 還未注冊綁定地址,所以處于非活躍狀態(tài) 
  23.         if (isActive()) { 
  24.             ....................忽略不必要代碼.................. 
  25.         } 
  26.     } catch (Throwable t) { 
  27.         // 直接關閉通道以避免FD泄漏。 
  28.         closeForcibly(); 
  29.         closeFuture.setClosed(); 
  30.         safeSetFailure(promise, t); 
  31.     } 

二、源碼解析

doRegister();

  1. doRegister(); 

真正的注冊方法,該方法是將Netty本身的NioServerSocket與JDK連接起來的最重要的一個類!

  1. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 

javaChannel()方法是返回JDK原生的SocketChannel,他是再NioServerSocketChannel初始化的時候被保存的,還記得我們再講述NIO開發(fā)Socket的時候的流程嗎

我們重點關注一下javaChannel().register的參數(shù):

eventLoop().unwrappedSelector():NioEventLoop再創(chuàng)建的時候,會保存兩個選擇器,一個是JDK的原始的選擇器,一個是經過Netty包裝的選擇器,這里返回的是原生的選擇器!

0:不關注任何事件

this:this代表著當前類,他是NioServerSocketChannel類型的,他將一個NioServerSocketChannel的對象,綁定到了JDK原生的選擇器,后續(xù)只需要通過SelectionKey.attachment(),就能獲取到NioServerSocketChannel,而一個NioServerSocketChannel里面又包含一個JDK原生的Channel對象,就可以基于該jdk原生的Channel來進行各種讀寫操作!

到現(xiàn)在為止,我們就完成JDK中的NIO的將通道綁定到選擇器上,我們回到上一步:

pipeline.invokeHandlerAddedIfNeeded

  1. pipeline.invokeHandlerAddedIfNeeded(); 

開始回調pipeline通道里面添加自定義事件:

  1. final void invokeHandlerAddedIfNeeded() { 
  2.     assert channel.eventLoop().inEventLoop(); 
  3.     if (firstRegistration) { 
  4.         firstRegistration = false
  5.         // 現(xiàn)在,我們已注冊到EventLoop?,F(xiàn)在該調用ChannelHandler的回調了, 
  6.         // 在完成注冊之前添加的內容。 
  7.         callHandlerAddedForAllHandlers(); 
  8.     } 
  9.  
  10. //callHandlerAddedForAllHandlers 
  11. private void callHandlerAddedForAllHandlers() { 
  12.     //task = PendingHandlerAddedTask 
  13.     PendingHandlerCallback task = pendingHandlerCallbackHead; 
  14.     while (task != null) { 
  15.         task.execute(); 
  16.         task = task.next
  17.     } 

需要注意的是 PendingHandlerCallback task 是PendingHandlerAddedTask類型的,他是什么時候加載的呢?實在我們初始化NioServerSocketChannel的時候調用addLast方法的時候被賦的值,有興趣的小伙伴可以自己去跟一下源碼,這里直接進入到:

  1. if (executor.inEventLoop()) { 
  2.     callHandlerAdded0(ctx); 
  3.  
  4. //進入到 callHandlerAdded0源碼邏輯 
  5. private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { 
  6.     try{ 
  7.         ctx.callHandlerAdded(); 
  8.     } 
  9.     ....................... 
  10.  
  11. //進入到ctx.callHandlerAdded(); 
  12. final void callHandlerAdded() throws Exception { 
  13.     if (setAddComplete()) { 
  14.         handler().handlerAdded(this); 
  15.     } 

還接記得handler()嗎,我再NioServerSocketChannel初始化的時候說過,當時程序向pipeline中添加了一個ChannelInitializer,這里返回的就是那個ChannelInitializer! 我們進入到ChannelInitializer#handlerAdded方法里面:

  1. @Override 
  2. public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 
  3.     if (ctx.channel().isRegistered()) { 
  4.         if (initChannel(ctx)) { 
  5.             removeState(ctx); 
  6.         } 
  7.     } 

首先我們重點關注一個 initChannel(ctx),

  1. private boolean initChannel(ChannelHandlerContext ctx) throws Exception { 
  2.     // 防止再次進入。 
  3.     if (initMap.add(ctx)) { 
  4.         try { 
  5.             // 調用 ChannelInitializer 實現(xiàn)的 initChannel() 方法 
  6.             initChannel((C) ctx.channel()); 
  7.         } catch (Throwable cause) { 
  8.             ................................ 
  9.         } finally { 
  10.             ChannelPipeline pipeline = ctx.pipeline(); 
  11.             if (pipeline.context(this) != null) { 
  12.                 // 將 ChannelInitializer 自身從 Pipeline 中移出 
  13.                 pipeline.remove(this); 
  14.             } 
  15.         } 
  16.         return true
  17.     } 
  18.     return false

該方法會回調ChannelInitializer的抽象方法initChannel,該抽象方法在我們初始化的時候完成,我們就要找到實現(xiàn)這個抽象方法的地方,我們回到上一節(jié)課的代碼: io.netty.bootstrap.ServerBootstrap#init

  1. void init(Channel channel) { 
  2.     ..........................; 
  3.     p.addLast(new ChannelInitializer<Channel>() { 
  4.         @Override 
  5.         public void initChannel(final Channel ch) { 
  6.             final ChannelPipeline pipeline = ch.pipeline(); 
  7.             //將用戶自定義的handler添加進管道  handler 是在構建ServerBootStr的時候傳入的  handler 
  8.             ChannelHandler handler = config.handler(); 
  9.             if (handler != null) { 
  10.                 pipeline.addLast(handler); 
  11.             } 
  12.  
  13.             ch.eventLoop().execute(() -> { 
  14.                 pipeline.addLast(new ServerBootstrapAcceptor( 
  15.                     ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); 
  16.             }); 
  17.     }     

上一節(jié)課講的時候,我們將這一段邏輯略過了,只說是會向通道中添加一個ChannelInitializer實現(xiàn),現(xiàn)在開始回調他的initChannel方法了:

  1. ChannelHandler handler = config.handler(); 
  2. if (handler != null) { 
  3.     pipeline.addLast(handler); 

這段代碼會將客戶再構建ServerBootstrap的時候傳入的handler添加進通道,我們?yōu)榱朔奖憷斫猓僭O用戶沒有設置handler,所以這個handler判斷不通過,跳過,我們繼續(xù)往下:

  1. ch.eventLoop().execute(() -> { 
  2.     pipeline.addLast(new ServerBootstrapAcceptor( 
  3.         ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); 
  4. }); 

這里異步的向管道流注冊一個默認的Handler, 為什么說是異步的,我們暫且不說,我們暫且認為是同步的進行add,此時我們的通道如下:

  1. ServerBootstrapAcceptor( 
  2.                 final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, 
  3.                 Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) { 
  4.     this.childGroup = childGroup; 
  5.     this.childHandler = childHandler; 
  6.     this.childOptions = childOptions; 
  7.     this.childAttrs = childAttrs; 
  8.  
  9.     enableAutoReadTask = new Runnable() { 
  10.         @Override 
  11.         public void run() { 
  12.             channel.config().setAutoRead(true); 
  13.         } 
  14.     }; 

我們可以看到,他會保存一系列的參數(shù),包括WorkGroup、childHandler、childOptions、childAttrs這些參數(shù)都是我們再創(chuàng)建serverBootstrap的時候傳入的參數(shù),這也證明了,這些參數(shù)是作用于客戶端Socket連接的!

有關ServerBootstrapAcceptor,后續(xù)會進行一個詳細的分析,我們接著說,這里需要重點講一下addLast方法,

  1. @Override 
  2.     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { 
  3.         ........................忽略......................... 
  4.         //通知添加方法回調 
  5.         callHandlerAdded0(newCtx); 
  6.         return this; 
  7.     } 

在進行添加的時候,他會回調內部產生的handlerAdded方法,還記得,我們在介紹Netty的基本架構的業(yè)務通道章節(jié)嗎?

再調用addLast之后,該方法會被回調!

這樣就講所有的方法注冊完畢了,我們繼續(xù)回到ChannelInitializer#handlerAdded方法,當**initChannel(ctx)**調用完了之后:

  1. private boolean initChannel(ChannelHandlerContext ctx) throws Exception { 
  2.     // 防止再次進入。 
  3.     if (initMap.add(ctx)) { 
  4.         try { 
  5.             // 調用 ChannelInitializer 實現(xiàn)的 initChannel() 方法 
  6.             initChannel((C) ctx.channel()); 
  7.         } catch (Throwable cause) { 
  8.             ................................ 
  9.         } finally { 
  10.             ChannelPipeline pipeline = ctx.pipeline(); 
  11.             if (pipeline.context(this) != null) { 
  12.                 // 將 ChannelInitializer 自身從 Pipeline 中移出 
  13.                 pipeline.remove(this); 
  14.             } 
  15.         } 
  16.         return true
  17.     } 
  18.     return false

我們會進入到finally里面,我們會看到,此時會做一個操作,刪除當前的類,當前的類是誰,是ChannelInitializer,所以刪除完畢后,此時管道對象的結構如圖所示:

至此 invokeHandlerAddedIfNeeded 分析完畢

pipeline.fireChannelRegistered();

  1. @Override 
  2. public final ChannelPipeline fireChannelRegistered() { 
  3.     AbstractChannelHandlerContext.invokeChannelRegistered(head); 
  4.     return this; 

這行代碼其實沒什么可說的,大家可以自己跟一下調試一下代碼,這個代碼的意義是從HeadContext節(jié)點開始傳播channelRegistered方法:

至此,NioServerSocketChannel的注冊基本就分析完了,有的同學可能覺得少分析了一段:

  1. if (isActive()) { 
  2.     if (firstRegistration) { 
  3.         //Channel 當前狀態(tài)為活躍時,觸發(fā) channelActive 事件 
  4.         pipeline.fireChannelActive(); 
  5.     } else if (config().isAutoRead()) { 
  6.         // 該通道已注冊,并已設置autoRead()。這意味著我們需要開始閱讀 
  7.         // 再次,以便我們處理入站數(shù)據。 
  8.         // 
  9.         // See https://github.com/netty/netty/issues/4805 
  10.         //開始讀事件 
  11.         beginRead(); 
  12.     } 

這段代碼再第一次啟動的時候并不會被調用,因為此時通道還沒有綁定端口正式啟動起了,所以這里isActive會返回false,有關邏輯,會在新連接接入講解的時候進行分析!

三、總結

 

  1. Netty會調用JDK底層的注冊方法,同時將本身的NioServerSocketChannel作為att綁定到選擇事件上!
  2. 當注冊完成后會回調 handlerAdded方法
  3. Netty會回調再初始化NioServerSocketChannel的時候注冊的Channelinitialization, 添加一個新連接接入器ServerBootstrapAcceptor,并刪除本身!
  4. 當注冊完成后會回調Channelregistered方法

 

責任編輯:武曉燕 來源: 源碼學徒
相關推薦

2021-07-07 05:00:17

初始化源碼

2022-04-11 08:25:37

XMLSQL語句Mybatis

2021-07-12 08:00:21

Nacos 服務注冊源碼分析

2021-07-03 08:51:30

源碼Netty選擇器

2023-03-17 17:51:30

APIServer路由注冊

2022-05-06 07:52:06

Nacos服務注冊

2010-06-21 10:40:06

Linux APM

2022-05-20 10:32:49

事件循環(huán)器事件隊列鴻蒙

2015-09-16 09:10:27

Java源碼解析

2018-07-19 15:57:46

ViewStub源碼方法

2012-11-06 11:07:59

jQueryJSjQuery框架

2024-11-18 16:15:00

2024-01-18 08:31:22

go實現(xiàn)gorm框架

2021-02-20 06:09:46

libtask協(xié)程鎖機制

2020-12-01 15:00:20

Java 基礎

2016-08-31 13:48:00

AndroidRetrofit源碼解析

2021-07-16 06:56:50

Nacos注冊源碼

2020-10-14 11:11:56

Redux源碼解析

2022-03-18 15:55:15

鴻蒙操作系統(tǒng)架構

2022-12-07 08:02:43

Spring流程IOC
點贊
收藏

51CTO技術棧公眾號