自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

NIO之多線程協(xié)作處理數(shù)據(jù)讀寫

開發(fā) 前端
單線程下的NIO存在性能瓶頸,當(dāng)某一計(jì)算過程緩慢的時(shí)候會(huì)阻塞住整個(gè)線程,導(dǎo)致影響其他事件的處理!

[[407963]]

本文轉(zhuǎn)載自微信公眾號(hào)「源碼學(xué)徒」,作者皇甫嗷嗷叫 。轉(zhuǎn)載本文請(qǐng)聯(lián)系源碼學(xué)徒公眾號(hào)。

經(jīng)過前面幾章的學(xué)習(xí),我們已經(jīng) 能夠掌握了JDK NIO的開發(fā)方式,我們來總結(jié)一下NIO開發(fā)的流程:

  1. 創(chuàng)建一個(gè)服務(wù)端通道 ServerSocketChannel
  2. 創(chuàng)建一個(gè)選擇器 Selector
  3. 將服務(wù)端通道注冊(cè)到選擇器上,并且關(guān)注我們感興趣的事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  4. 綁定服務(wù)管道的地址 serverSocketChannel.bind(new InetSocketAddress(8989));
  5. 開始進(jìn)行事件選擇,選擇我們感興趣的事件做對(duì)應(yīng)的操作!

具體的代碼信息請(qǐng)參照第一章:多路復(fù)用模型章節(jié),這里不做太多的贅述!

有關(guān)多路復(fù)用的概念,我們也在第一章進(jìn)行了分析。多路復(fù)用模型能夠最大限度的將一個(gè)線程的執(zhí)行能力榨干,一條線程執(zhí)行所有的數(shù)據(jù),包括新連接的接入、數(shù)據(jù)的讀取、計(jì)算與回寫,但是假設(shè),我們的數(shù)據(jù)計(jì)算及其緩慢,那么該任務(wù)的執(zhí)行就勢(shì)必影響下一個(gè)新鏈接的接入!

傳統(tǒng)NIO單線程模型

單線程的NIO模型

如圖,我們能了解到,單線程情況下,讀事件因?yàn)橐鲆恍I(yè)務(wù)性操作(數(shù)據(jù)庫(kù)連接、圖片、文件下載)等操作,導(dǎo)致線程阻塞再,讀事件的處理上,此時(shí)單線程程序無法進(jìn)行下一次新鏈接的處理!我們對(duì)該線程模型進(jìn)行優(yōu)化,select事件處理封裝為任務(wù),提交到線程池!

NIO多線程模型

上面的這種數(shù)據(jù)結(jié)構(gòu)能夠解決掉因?yàn)橛?jì)算任務(wù)耗時(shí)過長(zhǎng),導(dǎo)致新鏈接接入阻塞的問題,我們能否再次進(jìn)行一次優(yōu)化呢?

我們能否創(chuàng)建多個(gè)事件選擇器,每個(gè)事件選擇器,負(fù)責(zé)不同的Socket連接,就像下面這種:

NIO多線程優(yōu)化模型

這樣我們就可以每一個(gè)Select選擇器負(fù)責(zé)多個(gè)客戶端Socket連接,主線程只需要將客戶端新連接選擇一個(gè)選擇器注冊(cè)到select選擇器上就可以了!所以我們的架構(gòu)圖,就變成了下圖這樣:

我們?cè)趕elect選擇器內(nèi)部處理計(jì)算任務(wù)的時(shí)候,也可以將任務(wù)封裝為task,提交到線程池里面去,徹底將新連接接入和讀寫事件處理分離開,互不影響!事實(shí)上,這也是Netty的核心思想之一,我們可以根據(jù)上面的圖例,自己簡(jiǎn)單寫一個(gè):

代碼實(shí)現(xiàn)

構(gòu)建一個(gè)事件執(zhí)行器 對(duì)應(yīng)上圖的select選擇器

  1. /** 
  2.  * Nio事件處理器 
  3.  * 
  4.  * @author huangfu 
  5.  * @date 
  6.  */ 
  7. public class MyNioEventLoop implements Runnable { 
  8.     static final ByteBuffer ALLOCATE = ByteBuffer.allocate(128); 
  9.     private final Selector selector; 
  10.     private final LinkedBlockingQueue<Runnable> linkedBlockingQueue; 
  11.     public MyNioEventLoop(Selector selector) { 
  12.         this.selector = selector; 
  13.         linkedBlockingQueue = new LinkedBlockingQueue<>(); 
  14.     } 
  15.  
  16.     public Selector getSelector() { 
  17.         return selector; 
  18.     } 
  19.  
  20.     public LinkedBlockingQueue<Runnable> getLinkedBlockingQueue() { 
  21.         return linkedBlockingQueue; 
  22.     } 
  23.  
  24.     //忽略  hashCode和eques 
  25.  
  26.     /** 
  27.      * 任務(wù)處理器 
  28.      */ 
  29.     @Override 
  30.     public void run() { 
  31.         while (!Thread.currentThread().isInterrupted()) { 
  32.             try { 
  33.                 //進(jìn)行事件選擇  這里我們只處理讀事件 
  34.                 if (selector.select() > 0) { 
  35.                     Set<SelectionKey> selectionKeys = selector.selectedKeys(); 
  36.                     Iterator<SelectionKey> iterator = selectionKeys.iterator(); 
  37.                     //處理讀事件 
  38.                     while (iterator.hasNext()) { 
  39.                         SelectionKey next = iterator.next(); 
  40.                         iterator.remove(); 
  41.                         if (next.isReadable()) { 
  42.                             SocketChannel channel = (SocketChannel) next.channel(); 
  43.                             int read = channel.read(ALLOCATE); 
  44.                             if(read > 0) { 
  45.                                 System.out.printf("線程%s【%s】發(fā)來消-息:",Thread.currentThread().getName(), channel.getRemoteAddress()); 
  46.                                 System.out.println(new String(ALLOCATE.array(), StandardCharsets.UTF_8)); 
  47.                             }else if(read == -1) { 
  48.                                 System.out.println("連接斷開"); 
  49.                                 channel.close(); 
  50.                             } 
  51.                             ALLOCATE.clear(); 
  52.                         } 
  53.                     } 
  54.                     selectionKeys.clear(); 
  55.                 }else { 
  56.                     //處理異步任務(wù)  進(jìn)行注冊(cè) 
  57.                     while (!linkedBlockingQueue.isEmpty()) { 
  58.                         Runnable take = linkedBlockingQueue.take(); 
  59.                         //異步事件執(zhí)行 
  60.                         take.run(); 
  61.                     } 
  62.                 } 
  63.             } catch (IOException | InterruptedException e) { 
  64.                 e.printStackTrace(); 
  65.             } 
  66.         } 
  67.     } 

構(gòu)建一個(gè)選擇器組

  1. /** 
  2.  * 選擇器組 
  3.  * 
  4.  * @author huangfu 
  5.  * @date 2021年3月12日09:44:37 
  6.  */ 
  7. public class SelectorGroup { 
  8.     private final List<MyNioEventLoop> SELECTOR_GROUP = new ArrayList<>(8); 
  9.     private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); 
  10.     private final static AtomicInteger IDX = new AtomicInteger(); 
  11.  
  12.     /** 
  13.      * 初始化選擇器 
  14.      * @param count 處理器數(shù)量 
  15.      * @throws IOException 異常欣喜 
  16.      */ 
  17.     public SelectorGroup(int count) throws IOException { 
  18.  
  19.         for (int i = 0; i < count; i++) { 
  20.             Selector open = Selector.open(); 
  21.             MyNioEventLoop myNioEventLoop = new MyNioEventLoop(open); 
  22.             SELECTOR_GROUP.add(myNioEventLoop); 
  23.         } 
  24.     } 
  25.  
  26.     public SelectorGroup() throws IOException { 
  27.         this(AVAILABLE_PROCESSORS << 1); 
  28.     } 
  29.  
  30.     /** 
  31.      * 輪詢獲取一個(gè)選擇器 
  32.      * @return 返回一個(gè)選擇器 
  33.      */ 
  34.     public MyNioEventLoop next(){ 
  35.         int andIncrement = IDX.getAndIncrement(); 
  36.         int length = SELECTOR_GROUP.size(); 
  37.  
  38.         return SELECTOR_GROUP.get(Math.abs(andIncrement % length)); 
  39.     } 

構(gòu)建一個(gè)執(zhí)行器記錄器

  1. /** 
  2.  * @author huangfu 
  3.  * @date 
  4.  */ 
  5. public class ThreadContext { 
  6.     /** 
  7.      * 記錄當(dāng)前使用過的選擇器 
  8.      */ 
  9.     public static final Set<MyNioEventLoop> RUN_SELECT = new HashSet<>(); 

構(gòu)建一個(gè)新連接接入選擇器

  1. /** 
  2.  * 連接器 
  3.  * 
  4.  * @author huangfu 
  5.  * @date 2021年3月12日10:15:37 
  6.  */ 
  7. public class Acceptor implements Runnable { 
  8.     private final ServerSocketChannel serverSocketChannel; 
  9.     private final SelectorGroup selectorGroup; 
  10.  
  11.     public Acceptor(ServerSocketChannel serverSocketChannel, SelectorGroup selectorGroup) { 
  12.         this.serverSocketChannel = serverSocketChannel; 
  13.         this.selectorGroup = selectorGroup; 
  14.     } 
  15.  
  16.  
  17.     @Override 
  18.     public void run() { 
  19.         try { 
  20.             SocketChannel socketChannel = serverSocketChannel.accept(); 
  21.             MyNioEventLoop next = selectorGroup.next(); 
  22.  
  23.             //向隊(duì)列追加一個(gè)注冊(cè)任務(wù) 
  24.             next.getLinkedBlockingQueue().offer(() -> { 
  25.                 try { 
  26.                     //客戶端注冊(cè)為非阻塞 
  27.                     socketChannel.configureBlocking(false); 
  28.                     //注冊(cè)到選擇器 關(guān)注一個(gè)讀事件 
  29.                     socketChannel.register(next.getSelector(), SelectionKey.OP_READ); 
  30.                 } catch (Exception e) { 
  31.                     e.printStackTrace(); 
  32.                 } 
  33.             }); 
  34.             //喚醒對(duì)應(yīng)的任務(wù),讓其處理異步任務(wù) 
  35.             next.getSelector().wakeup(); 
  36.  
  37.  
  38.             System.out.println("檢測(cè)到連接:" + socketChannel.getRemoteAddress()); 
  39.             //當(dāng)當(dāng)前選擇器已經(jīng)被使用過了  就不再使用了,直接注冊(cè)就行了 
  40.             if (ThreadContext.RUN_SELECT.add(next)) { 
  41.                 //啟動(dòng)任務(wù) 
  42.                 new Thread(next).start(); 
  43.             } 
  44.  
  45.  
  46.         } catch (IOException e) { 
  47.             e.printStackTrace(); 
  48.         } 
  49.     } 

創(chuàng)建啟動(dòng)器

  1. /** 
  2.  * @author huangfu 
  3.  * @date 
  4.  */ 
  5. public class TestMain { 
  6.  
  7.     public static void main(String[] args) throws IOException { 
  8.         //創(chuàng)建一個(gè)選擇器組   傳遞選擇器組的大小 決定使用多少選擇器來實(shí)現(xiàn) 
  9.         SelectorGroup selectorGroup = new SelectorGroup(2); 
  10.         //開啟一個(gè)服務(wù)端管道 
  11.         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
  12.         //開啟一個(gè)服務(wù)端專用的選擇器 
  13.         Selector selector = Selector.open(); 
  14.         //設(shè)置非阻塞 
  15.         serverSocketChannel.configureBlocking(false); 
  16.         //創(chuàng)建一個(gè)連接器 
  17.         Acceptor acceptor = new Acceptor(serverSocketChannel, selectorGroup); 
  18.         //將服務(wù)端通道注冊(cè)到服務(wù)端選擇器上  這里會(huì)綁定一個(gè)新連接接入器 
  19.         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, acceptor); 
  20.         //綁定端口 
  21.         serverSocketChannel.bind(new InetSocketAddress(8989)); 
  22.         //啟動(dòng)處理器 
  23.         new Reactor(selector).run(); 
  24.     } 

總結(jié)

單線程下的NIO存在性能瓶頸,當(dāng)某一計(jì)算過程緩慢的時(shí)候會(huì)阻塞住整個(gè)線程,導(dǎo)致影響其他事件的處理!

為了解決這一缺陷,我們提出了使用異步線程的方式去操作任務(wù),將耗時(shí)較長(zhǎng)的業(yè)務(wù),封裝為一個(gè)異步任務(wù),提交到線程池執(zhí)行!

 

為了使業(yè)務(wù)操作和新連接接入完全分離開,我們做了另外一重優(yōu)化,我們封裝了一個(gè)選擇器組,輪詢的方式獲取選擇器,每一個(gè)選擇器都能夠處理多個(gè)新連接, socket連接->selector選擇器 = 多 -> 1,在每一個(gè)選擇器里面又可以使用線程池來處理任務(wù),進(jìn)一步提高吞吐量!

 

責(zé)任編輯:武曉燕 來源: 源碼學(xué)徒
相關(guān)推薦

2021-03-05 07:38:52

C++線程編程開發(fā)技術(shù)

2023-06-05 07:56:10

線程分配處理器

2023-06-06 08:17:52

多線程編程Thread類

2023-06-13 13:39:00

多線程異步編程

2011-08-18 17:07:23

IOS開發(fā)多線程NSInvocatio

2009-08-17 16:56:51

C#多線程控制進(jìn)度條

2013-08-21 16:17:09

iPhone多線程

2011-12-15 11:03:21

JavaNIO

2023-06-08 08:21:08

多線程編程線程間通信

2018-04-20 14:11:27

多線程死鎖樂觀鎖

2016-10-09 20:15:30

多線程多進(jìn)程

2023-11-03 07:50:01

2011-12-08 13:04:06

JavaNIO

2010-01-28 09:55:05

性能優(yōu)化

2009-08-17 14:08:33

C#進(jìn)度條使用

2021-03-26 05:54:00

C#數(shù)據(jù)方法

2021-06-10 00:13:43

C#隊(duì)列數(shù)據(jù)

2021-02-26 20:55:56

JavaNIO隨機(jī)

2024-12-27 08:11:44

Python編程模式IO

2022-02-14 15:07:48

進(jìn)程FileChanne線程
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)