聊聊Java NIO Selector 使用
之前的文章已經(jīng)把 Java 中 NIO 的 Buffer、Channel 講解完了,不太了解的可以先回過(guò)頭去看看。這篇文章我們就來(lái)聊聊 Selector —— 選擇器。
首先 Selector 是用來(lái)干嘛的呢?不熟悉這個(gè)概念的話我們其實(shí)可以這么理解:
selector
把它當(dāng)作 SQL 中的 select 語(yǔ)句,在 SQL 中無(wú)非就是篩選出符合條件的結(jié)果集合。而 NIO 中的 Selector 用途類似,只不過(guò)它選擇出來(lái)的是有就緒 IO 事件的 Channel。
IO 事件代表了 Channel 對(duì)于不同的 IO 操作所處的不同的狀態(tài),而不是對(duì) Channel 進(jìn)行 IO 操作??偣灿?4 種 IO 事件的定義:
- OP_READ 可讀
- OP_WRITE 可寫(xiě)
- OP_CONNECT 連接
- OP_ACCEPT 接收
IO 事件分類
比如 OP_READ,其就緒是指數(shù)據(jù)已經(jīng)在內(nèi)核態(tài) Ready 了并且已經(jīng)從內(nèi)核態(tài)復(fù)制到了用戶態(tài)的緩沖區(qū),然后我們的應(yīng)用程序就可以去讀取數(shù)據(jù)了,這叫可讀。
再比如 OP_CONNECT,當(dāng)某個(gè) Channel 已經(jīng)完成了握手連接,則 Channel 就會(huì)處于 OP_CONNECT 的狀態(tài)。
對(duì)用戶態(tài)和內(nèi)核態(tài)不了解的,可以去看看之前寫(xiě)的 《用戶態(tài)和內(nèi)核態(tài)的區(qū)別》
在之前講 BIO 模型的時(shí)候說(shuō)過(guò),用戶態(tài)在發(fā)起 read 系統(tǒng)調(diào)用之后會(huì)一直阻塞,直到數(shù)據(jù)在內(nèi)核態(tài) Ready 并且復(fù)制到用戶態(tài)的緩沖區(qū)內(nèi)。如果只有一個(gè)用戶還好,隨便你阻塞多久。但要是這時(shí)有其他用戶發(fā)請(qǐng)求進(jìn)來(lái)了,就會(huì)一直卡在這里等待。這樣串行的處理會(huì)導(dǎo)致系統(tǒng)的效率極其低下。
針對(duì)這個(gè)問(wèn)題,也是有解決方案的。那就是為每個(gè)用戶都分配一個(gè)線程(即 Connection Per Thread),乍一想這個(gè)思路可能沒(méi)問(wèn)題,但使用線程需要消耗系統(tǒng)的資源,例如在 JVM 中一個(gè)線程會(huì)占用較多的資源,非常昂貴。系統(tǒng)稍微并發(fā)多一些(例如上千),你的系統(tǒng)就會(huì)直接 OOM 了。而且,線程頻繁的創(chuàng)建、銷毀、切換也是一個(gè)比較耗時(shí)的操作。
而如果用 NIO,雖然不會(huì)阻塞了,但是會(huì)一直輪詢,讓 CPU 空轉(zhuǎn),也是一個(gè)不環(huán)保的方式。
而如果用 Selector,只需要一個(gè)線程來(lái)監(jiān)聽(tīng)多個(gè) Channel,而這個(gè)多個(gè)可以上千、上萬(wàn)甚至更多。那這些 Channel 是怎么跟 Selector 關(guān)聯(lián)上的呢?
答案是通過(guò)注冊(cè),因?yàn)楝F(xiàn)在變成了 Selector 決定什么時(shí)候處理 Channel 中的事件,而注冊(cè)操作則相當(dāng)于將 Channel 的控制權(quán)轉(zhuǎn)交給了 Selector。一旦注冊(cè)上了,后續(xù)當(dāng) Channel 有就緒的 IO 事件,Selector 就會(huì)將它們選擇出來(lái)執(zhí)行對(duì)應(yīng)的操作。
說(shuō)了這么多,來(lái)看個(gè)例子吧,客戶端的代碼相對(duì)簡(jiǎn)單,后續(xù)再看,我們先看服務(wù)端的:
public static void main(String[] args) throws IOException {
// 創(chuàng)建 selector, 管理多個(gè) channel
Selector selector = Selector.open();
// 創(chuàng)建 ServerSocketChannel 并且綁定端口
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8080));
// 將 channel 注冊(cè)到 selector 上
SelectionKey serverSocketChannelKey = serverSocketChannel.register(selector, 0);
// 由于總共有 4 種事件, 分別是 accept、connect、read 和 write,
// 分別代表有連接請(qǐng)求時(shí)觸發(fā)、客戶端建立連接時(shí)觸發(fā)、可讀事件、可寫(xiě)事件
// 我們可以使用 interestOps 來(lái)表明只處理有連接請(qǐng)求的事件
serverSocketChannelKey.interestOps(SelectionKey.OP_ACCEPT);
System.out.printf("serverSocketChannel %s\n", serverSocketChannelKey);
while (true) {
// 沒(méi)有事件發(fā)生, 線程會(huì)阻塞; 有事件發(fā)生, 就會(huì)讓線程繼續(xù)執(zhí)行
System.out.println("start to select...");
selector.select();
// 換句話說(shuō), 有連接過(guò)來(lái)了, 就會(huì)繼續(xù)往下走
// 通過(guò) selectedKeys 包含了所有發(fā)生的事件, 可能會(huì)包含 READ 或者 WRITE
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
System.out.printf("selected key %s\n", key);
// 這里需要進(jìn)行事件區(qū)分
if (key.isAcceptable()) {
System.out.println("get acceptable event");
// 觸發(fā)此次事件的 channel, 拿到事件一定要處理, 否則會(huì)進(jìn)入非阻塞模式, 空轉(zhuǎn)占用 CPU
// 例如你可以使用 key.cancel()
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
// 這個(gè) socketChannel 也需要注冊(cè)到 selector 上, 相當(dāng)于把控制權(quán)交給 selector
SelectionKey socketChannelKey = socketChannel.register(selector, 0);
socketChannelKey.interestOps(SelectionKey.OP_READ);
System.out.printf("get socketChannel %s\n", socketChannel);
} else if (key.isReadable()) {
System.out.println("get readable event");
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buf = ByteBuffer.allocate(16);
channel.read(buf);
buf.flip();
ByteBufferUtil.debugRead(buf);
key.cancel();
}
iterator.remove();
}
}
}
看起來(lái)有點(diǎn)多,但相應(yīng)的注釋都寫(xiě)了,可以先看看。其實(shí)這里的很多代碼跟之前的玩轉(zhuǎn) Channel 的代碼差不多的,這里抽一些我認(rèn)為值得講的解釋一下。
首先就是 Selector.open(),跟 Channel 的 open 方法類似,可以理解為創(chuàng)建一個(gè) selector。
其次就是 SelectionKey serverSocketChannelKey = serverSocketChannel.register(selector, 0); 了,我們調(diào)用了 serverSocketChannel 的注冊(cè)方法之后,返回了一個(gè) SelectionKey,這是個(gè)什么概念呢?
說(shuō)簡(jiǎn)單點(diǎn),你可以把 SelectionKey 理解為你去商場(chǎng)寄存柜存東西,那個(gè)機(jī)器吐給你的提取憑證
換句話說(shuō),這個(gè) SelectionKey 就是當(dāng)前這個(gè) serverSocketChannel 注冊(cè)到 selector 上的憑證。selector 會(huì)維護(hù)一個(gè) SelectionKey 的集合,用于統(tǒng)一管理。
selectionkey 集合
上圖中的每個(gè) Key 都代表了一個(gè)具體的 Channel。
而至于 register 的第二個(gè)參數(shù),我們傳入的是 0,代表了當(dāng)前 Selector 需要關(guān)注這個(gè) Channel 的哪些 IO 事件。0 代表不關(guān)注任何事件,我們這里是通過(guò) serverSocketChannelKey.interestOps(SelectionKey.OP_ACCEPT); 來(lái)告訴 Selector,對(duì)這個(gè) Channel 只關(guān)注 OP_ACCEPT 事件。
IO 事件有 4 個(gè),如果你想要同時(shí)監(jiān)聽(tīng)多個(gè) IO 事件怎么辦呢?答案是通過(guò)或運(yùn)算符。
serverSocketChannelKey.interestOps(SelectionKey.OP_ACCEPT | SelectionKey.OP_READ);
上面說(shuō)過(guò),NIO 雖然不阻塞,但會(huì)一直輪詢占用 CPU 的資源,而 Selector 解決了這個(gè)問(wèn)題。在調(diào)用完 selector.select(); 之后,線程會(huì)在這里阻塞,而不會(huì)像 NIO 一樣瘋狂輪詢,把 CPU 拉滿。所以 Selector 只會(huì)在有事件處理的時(shí)候才執(zhí)行,其余時(shí)間都會(huì)阻塞,極大的減少了 CPU 資源的占用。
當(dāng)客戶端調(diào)用 connect 發(fā)起連接之后,Channel 就會(huì)處于 OP_CONNECT 就緒狀態(tài),selector.select(); 就不會(huì)再阻塞,會(huì)繼續(xù)往下運(yùn)行,即:
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
其中 selectedKeys 這個(gè)名字也能看出來(lái),表示被選出來(lái)的 SelectionKey。上面我們已經(jīng)討論過(guò) Selector 維護(hù)的一種集合 —— SelectionKey 集合,接下來(lái)我們?cè)儆懻摿硗庖环N集合 —— SelectedKey 集合。
selectedkey 集合
當(dāng) Channel 有就緒 IO 事件之后,對(duì)應(yīng)的 Key 就會(huì)被加入到 SelectedKey 集合中,然后這一次 While 循環(huán)會(huì)依次處理被選擇出來(lái)的所有 Key。
但被選擇出來(lái)的 Key 可能觸發(fā)的是不同的 IO 事件,所以我們需要對(duì) Key 進(jìn)行區(qū)分。代碼里區(qū)分了 OP_ACCEPT 和 OP_READ,分別討論一下。
ServerSocketChannel 一開(kāi)始 register 的時(shí)候只設(shè)定關(guān)注 OP_ACCEPT 事件,所以第一次循環(huán)只會(huì)進(jìn)入 IsAcceptable 分支里,所以這里通過(guò) iterator.next() 迭代器拿到的 SelectionKey 就是 serverSocketChannel 注冊(cè)之后返回的 Key,同理拿到的 channel 的就是最開(kāi)始調(diào)用 ServerSocketChannel.open(); 創(chuàng)建的 channel。
拿到了 ServerSocketChannel 我們就可以調(diào)用其 accept() 方法來(lái)處理建立連接的請(qǐng)求了,這里值得注意的是,建立連接之后,這個(gè) SocketChannel 也需要注冊(cè)到 Selector 上去,因?yàn)檫@些 SocketChannel 也需要將控制權(quán)交給 Selector,這樣后續(xù)有就緒 IO 事件才能通過(guò) Selector 處理。這里我們對(duì)這個(gè) SocketChannel 只關(guān)注 OP_READ 事件。相當(dāng)于把后續(xù)進(jìn)來(lái)的所有的連接和 Selector 就關(guān)聯(lián)上了。
Accept 事件處理成功之后,服務(wù)器這邊會(huì)繼續(xù)循環(huán),然后再次在 selector.select(); 處阻塞住。
客戶端這邊會(huì)繼續(xù)調(diào)用 write 方法向 channel 寫(xiě)入數(shù)據(jù),數(shù)據(jù) Ready 之后就會(huì)觸發(fā) OP_READ 事件,然后繼續(xù)往下走,這次由于事件是 OP_READ 所以會(huì)進(jìn)入 key.isReadable() 這個(gè)分支。進(jìn)入這個(gè)分支之后會(huì)獲取到對(duì)應(yīng)的 SocketChannel,并從其中讀取客戶端發(fā)來(lái)的數(shù)據(jù)。
而另一個(gè)值得關(guān)注的是 iterator.remove();,每次迭代都需要把當(dāng)前處理的 SelectedKey 移除,這是為什么呢?
因?yàn)閷?duì)應(yīng)的 Key 進(jìn)入了 SelectedKey 集合之后,不會(huì)被 NIO 里的機(jī)制給移除。如果我們不去移除,那么下一次調(diào)用 selector.selectedKeys().iterator(); 會(huì)發(fā)現(xiàn),上次處理的有 OP_ACCEPT 事件的 SelectionKey 還在,而這會(huì)導(dǎo)致上面的服務(wù)端程序拋出空指針異常。
大家可以自行將 iterator.remove(); 注釋掉再試試
客戶端的代碼很簡(jiǎn)單,就直接給出來(lái)了:
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.put("test".getBytes(StandardCharsets.UTF_8));
buffer.flip();
socketChannel.write(buffer);
}
如果不去移除的話,服務(wù)端會(huì)在下面這行 NPE。
socketChannel.configureBlocking(false);
為啥呢?因?yàn)榇藭r(shí) SelectionKey 雖然還在,ServerSocketChannel 也能拿到,但調(diào)用 channel.accept(); 的時(shí)候,并沒(méi)有客戶端真正在發(fā)起連接(上一個(gè)循環(huán)已經(jīng)處理過(guò)真正的連接請(qǐng)求了,只是沒(méi)有將這個(gè) Key 從 SelectedKey 中移除)。所以 channel.accept(); 會(huì)返回一個(gè) null,我們?cè)賹?duì) null 調(diào)用 configureBlocking 方法,自然而然就 NPE 了。