NioEventLoop源碼解析
本文轉載自微信公眾號「源碼學徒」,作者皇甫嗷嗷叫 。轉載本文請聯系源碼學徒公眾號。
源碼分析
上一節(jié)課,我們就 new NioEventLoopGroup();的初始化過程做了一個深度的解析,后來我們發(fā)現,NioEventLoopGroup在初始化過程中會構建一個執(zhí)行器數組,數組內部存儲的元素是NioEventLoop類型的,但是NioEventLoop是什么呢?為什么說他是Netty的精髓呢?
我們直接進入到NioEventLoop看他的構造方法:
上一節(jié)課我們是在循環(huán)填充執(zhí)行器數組的過程中創(chuàng)建的,具體參見上一節(jié)課的for循環(huán)中的 newChild方法,這里直接分析源碼
- NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
- SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
- EventLoopTaskQueueFactory queueFactory) {
- //保存外部線程任務newTaskQueue(queueFactory)
- super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
- rejectedExecutionHandler);
- this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
- this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
- final SelectorTuple selectorTuple = openSelector();
- this.selector = selectorTuple.selector;
- this.unwrappedSelector = selectorTuple.unwrappedSelector;
- }
關于super我們一會再往后追
一、保存選擇器生產者
- this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
是綁定了一個類似于生產者的東西,使我們再初始化NioEventLoopGroup的時候初始化的,使用該生產者,后續(xù)可以獲取選擇器或者Socket通道等!
二、保存選擇器
- this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
保存一個默認的選擇策略到NioEventLoop對象里面
三、開啟一個選擇器
- final SelectorTuple selectorTuple = openSelector();
開啟一個選擇器包裝對象,內含一個選擇器!Netty官方為了Netty性能的進一步優(yōu)化,喪心病狂的對這個選擇器也進行了優(yōu)化,我們跟進一下openSelector方法,看看他是如何優(yōu)化的,內部代碼比較復雜,我們逐行分析:
1. 獲取原始的選擇器
- unwrappedSelector = provider.openSelector();
使用原始的生產者對象,獲取一個原始的選擇器,后續(xù)使用!
2. 判斷是否啟動用選擇器優(yōu)化
- //禁用優(yōu)化選項 默認false
- if (DISABLE_KEY_SET_OPTIMIZATION) {
- //如果不優(yōu)化那么就直接包裝原始的選擇器
- return new SelectorTuple(unwrappedSelector);
- }
DISABLE_KEY_SET_OPTIMIZATION默認為false, 當禁用優(yōu)化的時候,會將selector選擇器直接進行包裝返回! 默認會進行優(yōu)化,所以一般不會進這個邏輯分支!
3. 獲取一個選擇器的類的對象
- //如果需要優(yōu)化
- //反射獲取對應的類的對象 SelectorImpl
- Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
- return Class.forName(
- "sun.nio.ch.SelectorImpl",
- false,
- PlatformDependent.getSystemClassLoader());
- } catch (Throwable cause) {
- return cause;
- }
- }
- });
這個代碼是返回一個 SelectorImpl的Class對象,這里是返回SelectorImpl的Class對象!我們由上述代碼可以看出來,如果獲取失敗,會返回一個異常,異常的話肯定不行,所以就要對可能會發(fā)生的異常做出操作:
- //如果沒有獲取成功
- f (!(maybeSelectorImplClass instanceof Class) ||
- // 確保當前的選擇器實現是我們可以檢測到的。 判斷是 unwrappedSelector的子類或者同類
- !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
- //發(fā)生異常
- if (maybeSelectorImplClass instanceof Throwable) {
- Throwable t = (Throwable) maybeSelectorImplClass;
- logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
- }
- //還是包裝為未優(yōu)化的選擇器
- return new SelectorTuple(unwrappedSelector);
如果發(fā)生了異常,或者獲取的和原始選擇器不是一個對象,就還使用原始選擇器包裝返回!
4. 創(chuàng)建一個優(yōu)化后的selectKeys
- final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
使用過NIO的都應該知道,使用選擇器是能夠獲取一個事件的Set集合的,這里Netty官方自己實現了一個Set集合,內部使用數組來進行優(yōu)化!因為Hashset集合是使用HashMap方法來實現的,(this ->Object) 再添加元素的時候如果發(fā)生了hash碰撞的話會遍歷hash槽上的鏈表 算法復雜度為O(n),但是數組不一樣 數組是O(1) 所以Netty官方使用數組來優(yōu)化選擇器事件集合 默認是1024 滿了之后2倍擴容!
大家可以簡單的把它看做一個Set集合,只不過他是使用數組的形式來實現的!內部重寫了add、size、iterator的方法,其余方法全部廢棄!
這個是Netty對選擇器優(yōu)化的一個重要對象,使得再追加事件的時候,算法復雜度由O(N)直接變?yōu)榱薕(1)!
5. 開始進行反射替換selectedKeys
- //開始進行反射替換
- Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
- //獲取選擇器事件中的 事件對象 selectedKeys的屬性對象
- Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
- //獲取公開選擇的密鑰
- Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
- //java9且存在 Unsafe的情況下 直接替換內存空間的數據
- if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
- // 讓我們嘗試使用sun.misc.Unsafe替換SelectionKeySet。
- // 這使我們也可以在Java9 +中執(zhí)行此操作,而無需任何額外的標志。
- long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
- long publicSelectedKeysFieldOffset =
- PlatformDependent.objectFieldOffset(publicSelectedKeysField);
- if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
- PlatformDependent.putObject(
- unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
- PlatformDependent.putObject(
- unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
- return null;
- }
- // 如果沒法直接替換內存空間的數據 就想辦法用反射
- }
- //java8或者java9+上述未操作完成的 使用反射來替換
- Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
- if (cause != null) {
- return cause;
- }
- cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
- if (cause != null) {
- return cause;
- }
- //開始進行替換 將我們創(chuàng)建的優(yōu)化后的事件數組來反射的替換進選擇器中
- selectedKeysField.set(unwrappedSelector, selectedKeySet);
- publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
- return null;
- } catch (NoSuchFieldException e) {
- return e;
- } catch (IllegalAccessException e) {
- return e;
- }
- }
- });
代碼雖然多,但是,邏輯比較簡單!
- 首先獲取SelectorImpl類對象的 selectedKeys屬性和publicSelectedKeys屬性!
- 判斷使用的JDK版本是不是9以上,如果使用的9的話,直接操作JAVA的Unsafe對象操作系統(tǒng)的內存空間!有關Unsafe的介紹,再零拷貝章節(jié)介紹的很詳細,可以復習零拷貝章節(jié)! 我們這里使用的JDK8
- 如果使用的是JDK8,使用反射,將我們創(chuàng)建出來的SelectedKeys的優(yōu)化對象SelectedSelectionKeySet反射的替換進unwrappedSelector這個原始的選擇器!
6. 包裝選擇器
- return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
首先把unwrappedSelector選擇器包裝為 SelectedSelectionKeySetSelector包裝類!
再把unwrappedSelector和SelectedSelectionKeySetSelector對應起來,包裝Wie元組返回!
四、保存優(yōu)化后的選擇器和原始選擇器
- this.selector = selectorTuple.selector;
- this.unwrappedSelector = selectorTuple.unwrappedSelector;
五、調用父類,創(chuàng)建隊列
- super(parent, executor, false, newTaskQueue(queueFactory),
- newTaskQueue(queueFactory),rejectedExecutionHandler);
首先,他會通過newTaskQueue構建兩個隊列 ,這兩個隊列是什么類型的呢?
上一節(jié)課我們分析過,queueFactory == null,所以會走如圖分支代碼,DEFAULT_MAX_PENDING_TASKS如果沒有指定的話,默認是Integer.MAX,最小為16!我們進入到 該分支代碼看一下,他創(chuàng)建的是一個什么隊列:
- public static <T> Queue<T> newMpscQueue() {
- return Mpsc.newMpscQueue();
- }
可以看出他創(chuàng)建的是一個Mpsc隊列,他是一個多生產者,單消費者隊列,是由jctools框架提供的,后續(xù)如果可以,我會具體對該隊列進行一個講解,我們到這里就知道,再創(chuàng)建NIOEventLoop的時候,向父類內部傳遞了兩個Mpsc隊列,我們繼續(xù)回到主線:
進入到super(xxx)的源碼中:
- protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
- RejectedExecutionHandler rejectedExecutionHandler) {
- super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
- //保存一個 tailTasks 尾部隊列
- tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
- }
這里會保存一個隊列,尾部隊列,這個尾部隊列,官方的意思是想對Netty 的運行狀態(tài)做一些統(tǒng)計數據,例如任務循環(huán)的耗時、占用物理內存的大小等等,但是實際上應用tailTasks的場景極少,這里不做太多講解!
我們繼續(xù)跟進到super方法源碼里面:
- //parent 線程執(zhí)行器 false mpsc隊列 拒絕策略
- protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
- boolean addTaskWakesUp, Queue<Runnable> taskQueue,
- RejectedExecutionHandler rejectedHandler) {
- super(parent);
- this.addTaskWakesUp = addTaskWakesUp;
- this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
- //保存線程執(zhí)行器
- this.executor = ThreadExecutorMap.apply(executor, this);
- //創(chuàng)建一個隊列 Mpscq,外部線程執(zhí)行的時候使用這個隊列(不是在EventLoop的線程內執(zhí)行的時候) newTaskQueue(queueFactory)
- this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
- //保存拒絕策略
- this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
- }
這里是進一步保存,將該NioEventLoop對應的線程執(zhí)行器 、MpscQuerey任務隊列、對應的拒絕策略保存起來! 大家再后續(xù)看到使用對應變量的代碼千萬不要覺得陌生哦!
總結
創(chuàng)建和保存了兩個多生產者單消費者隊列tailTasks和taskQueue
保存一個線程執(zhí)行器executor
保存一個拒絕策略,該拒絕策略主要用于隊列滿了之后如何處理!
保存一個選擇器生產者!
創(chuàng)建一個優(yōu)化后的選擇器,并進行保存!
將原始選擇器和優(yōu)化后的選擇器進行保存!