NIO之多線程協(xié)作處理數(shù)據(jù)讀寫
本文轉(zhuǎn)載自微信公眾號(hào)「源碼學(xué)徒」,作者皇甫嗷嗷叫 。轉(zhuǎn)載本文請(qǐng)聯(lián)系源碼學(xué)徒公眾號(hào)。
經(jīng)過前面幾章的學(xué)習(xí),我們已經(jīng) 能夠掌握了JDK NIO的開發(fā)方式,我們來總結(jié)一下NIO開發(fā)的流程:
- 創(chuàng)建一個(gè)服務(wù)端通道 ServerSocketChannel
- 創(chuàng)建一個(gè)選擇器 Selector
- 將服務(wù)端通道注冊(cè)到選擇器上,并且關(guān)注我們感興趣的事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- 綁定服務(wù)管道的地址 serverSocketChannel.bind(new InetSocketAddress(8989));
- 開始進(jìn)行事件選擇,選擇我們感興趣的事件做對(duì)應(yīng)的操作!
具體的代碼信息請(qǐng)參照第一章:多路復(fù)用模型章節(jié),這里不做太多的贅述!
有關(guān)多路復(fù)用的概念,我們也在第一章進(jìn)行了分析。多路復(fù)用模型能夠最大限度的將一個(gè)線程的執(zhí)行能力榨干,一條線程執(zhí)行所有的數(shù)據(jù),包括新連接的接入、數(shù)據(jù)的讀取、計(jì)算與回寫,但是假設(shè),我們的數(shù)據(jù)計(jì)算及其緩慢,那么該任務(wù)的執(zhí)行就勢(shì)必影響下一個(gè)新鏈接的接入!
傳統(tǒng)NIO單線程模型
單線程的NIO模型
如圖,我們能了解到,單線程情況下,讀事件因?yàn)橐鲆恍I(yè)務(wù)性操作(數(shù)據(jù)庫(kù)連接、圖片、文件下載)等操作,導(dǎo)致線程阻塞再,讀事件的處理上,此時(shí)單線程程序無法進(jìn)行下一次新鏈接的處理!我們對(duì)該線程模型進(jìn)行優(yōu)化,select事件處理封裝為任務(wù),提交到線程池!
NIO多線程模型
上面的這種數(shù)據(jù)結(jié)構(gòu)能夠解決掉因?yàn)橛?jì)算任務(wù)耗時(shí)過長(zhǎng),導(dǎo)致新鏈接接入阻塞的問題,我們能否再次進(jìn)行一次優(yōu)化呢?
我們能否創(chuàng)建多個(gè)事件選擇器,每個(gè)事件選擇器,負(fù)責(zé)不同的Socket連接,就像下面這種:
NIO多線程優(yōu)化模型
這樣我們就可以每一個(gè)Select選擇器負(fù)責(zé)多個(gè)客戶端Socket連接,主線程只需要將客戶端新連接選擇一個(gè)選擇器注冊(cè)到select選擇器上就可以了!所以我們的架構(gòu)圖,就變成了下圖這樣:
我們?cè)趕elect選擇器內(nèi)部處理計(jì)算任務(wù)的時(shí)候,也可以將任務(wù)封裝為task,提交到線程池里面去,徹底將新連接接入和讀寫事件處理分離開,互不影響!事實(shí)上,這也是Netty的核心思想之一,我們可以根據(jù)上面的圖例,自己簡(jiǎn)單寫一個(gè):
代碼實(shí)現(xiàn)
構(gòu)建一個(gè)事件執(zhí)行器 對(duì)應(yīng)上圖的select選擇器
- /**
- * Nio事件處理器
- *
- * @author huangfu
- * @date
- */
- public class MyNioEventLoop implements Runnable {
- static final ByteBuffer ALLOCATE = ByteBuffer.allocate(128);
- private final Selector selector;
- private final LinkedBlockingQueue<Runnable> linkedBlockingQueue;
- public MyNioEventLoop(Selector selector) {
- this.selector = selector;
- linkedBlockingQueue = new LinkedBlockingQueue<>();
- }
- public Selector getSelector() {
- return selector;
- }
- public LinkedBlockingQueue<Runnable> getLinkedBlockingQueue() {
- return linkedBlockingQueue;
- }
- //忽略 hashCode和eques
- /**
- * 任務(wù)處理器
- */
- @Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- //進(jìn)行事件選擇 這里我們只處理讀事件
- if (selector.select() > 0) {
- Set<SelectionKey> selectionKeys = selector.selectedKeys();
- Iterator<SelectionKey> iterator = selectionKeys.iterator();
- //處理讀事件
- while (iterator.hasNext()) {
- SelectionKey next = iterator.next();
- iterator.remove();
- if (next.isReadable()) {
- SocketChannel channel = (SocketChannel) next.channel();
- int read = channel.read(ALLOCATE);
- if(read > 0) {
- System.out.printf("線程%s【%s】發(fā)來消-息:",Thread.currentThread().getName(), channel.getRemoteAddress());
- System.out.println(new String(ALLOCATE.array(), StandardCharsets.UTF_8));
- }else if(read == -1) {
- System.out.println("連接斷開");
- channel.close();
- }
- ALLOCATE.clear();
- }
- }
- selectionKeys.clear();
- }else {
- //處理異步任務(wù) 進(jìn)行注冊(cè)
- while (!linkedBlockingQueue.isEmpty()) {
- Runnable take = linkedBlockingQueue.take();
- //異步事件執(zhí)行
- take.run();
- }
- }
- } catch (IOException | InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
構(gòu)建一個(gè)選擇器組
- /**
- * 選擇器組
- *
- * @author huangfu
- * @date 2021年3月12日09:44:37
- */
- public class SelectorGroup {
- private final List<MyNioEventLoop> SELECTOR_GROUP = new ArrayList<>(8);
- private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
- private final static AtomicInteger IDX = new AtomicInteger();
- /**
- * 初始化選擇器
- * @param count 處理器數(shù)量
- * @throws IOException 異常欣喜
- */
- public SelectorGroup(int count) throws IOException {
- for (int i = 0; i < count; i++) {
- Selector open = Selector.open();
- MyNioEventLoop myNioEventLoop = new MyNioEventLoop(open);
- SELECTOR_GROUP.add(myNioEventLoop);
- }
- }
- public SelectorGroup() throws IOException {
- this(AVAILABLE_PROCESSORS << 1);
- }
- /**
- * 輪詢獲取一個(gè)選擇器
- * @return 返回一個(gè)選擇器
- */
- public MyNioEventLoop next(){
- int andIncrement = IDX.getAndIncrement();
- int length = SELECTOR_GROUP.size();
- return SELECTOR_GROUP.get(Math.abs(andIncrement % length));
- }
- }
構(gòu)建一個(gè)執(zhí)行器記錄器
- /**
- * @author huangfu
- * @date
- */
- public class ThreadContext {
- /**
- * 記錄當(dāng)前使用過的選擇器
- */
- public static final Set<MyNioEventLoop> RUN_SELECT = new HashSet<>();
- }
構(gòu)建一個(gè)新連接接入選擇器
- /**
- * 連接器
- *
- * @author huangfu
- * @date 2021年3月12日10:15:37
- */
- public class Acceptor implements Runnable {
- private final ServerSocketChannel serverSocketChannel;
- private final SelectorGroup selectorGroup;
- public Acceptor(ServerSocketChannel serverSocketChannel, SelectorGroup selectorGroup) {
- this.serverSocketChannel = serverSocketChannel;
- this.selectorGroup = selectorGroup;
- }
- @Override
- public void run() {
- try {
- SocketChannel socketChannel = serverSocketChannel.accept();
- MyNioEventLoop next = selectorGroup.next();
- //向隊(duì)列追加一個(gè)注冊(cè)任務(wù)
- next.getLinkedBlockingQueue().offer(() -> {
- try {
- //客戶端注冊(cè)為非阻塞
- socketChannel.configureBlocking(false);
- //注冊(cè)到選擇器 關(guān)注一個(gè)讀事件
- socketChannel.register(next.getSelector(), SelectionKey.OP_READ);
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- //喚醒對(duì)應(yīng)的任務(wù),讓其處理異步任務(wù)
- next.getSelector().wakeup();
- System.out.println("檢測(cè)到連接:" + socketChannel.getRemoteAddress());
- //當(dāng)當(dāng)前選擇器已經(jīng)被使用過了 就不再使用了,直接注冊(cè)就行了
- if (ThreadContext.RUN_SELECT.add(next)) {
- //啟動(dòng)任務(wù)
- new Thread(next).start();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
創(chuàng)建啟動(dòng)器
- /**
- * @author huangfu
- * @date
- */
- public class TestMain {
- public static void main(String[] args) throws IOException {
- //創(chuàng)建一個(gè)選擇器組 傳遞選擇器組的大小 決定使用多少選擇器來實(shí)現(xiàn)
- SelectorGroup selectorGroup = new SelectorGroup(2);
- //開啟一個(gè)服務(wù)端管道
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- //開啟一個(gè)服務(wù)端專用的選擇器
- Selector selector = Selector.open();
- //設(shè)置非阻塞
- serverSocketChannel.configureBlocking(false);
- //創(chuàng)建一個(gè)連接器
- Acceptor acceptor = new Acceptor(serverSocketChannel, selectorGroup);
- //將服務(wù)端通道注冊(cè)到服務(wù)端選擇器上 這里會(huì)綁定一個(gè)新連接接入器
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, acceptor);
- //綁定端口
- serverSocketChannel.bind(new InetSocketAddress(8989));
- //啟動(dòng)處理器
- new Reactor(selector).run();
- }
- }
總結(jié)
單線程下的NIO存在性能瓶頸,當(dāng)某一計(jì)算過程緩慢的時(shí)候會(huì)阻塞住整個(gè)線程,導(dǎo)致影響其他事件的處理!
為了解決這一缺陷,我們提出了使用異步線程的方式去操作任務(wù),將耗時(shí)較長(zhǎng)的業(yè)務(wù),封裝為一個(gè)異步任務(wù),提交到線程池執(zhí)行!
為了使業(yè)務(wù)操作和新連接接入完全分離開,我們做了另外一重優(yōu)化,我們封裝了一個(gè)選擇器組,輪詢的方式獲取選擇器,每一個(gè)選擇器都能夠處理多個(gè)新連接, socket連接->selector選擇器 = 多 -> 1,在每一個(gè)選擇器里面又可以使用線程池來處理任務(wù),進(jìn)一步提高吞吐量!