自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Netty源碼之Reactor模式

開發(fā) 前端
每個(gè)新連接創(chuàng)建一個(gè)線程來(lái)處理。對(duì)于長(zhǎng)連接服務(wù),如果一個(gè)client和server保持一個(gè)連接的話,有多少個(gè)client接入,server就需要?jiǎng)?chuàng)建同等的線程來(lái)處理。線程上下文切換,數(shù)據(jù)同步和內(nèi)存消耗,對(duì)server來(lái)說(shuō),將是非常大的開銷。

 [[410505]]

學(xué)習(xí)目標(biāo)

  • 什么是Reactor模式?
  • Reactor模式由什么組成的?
  • Reactor模式解決什么問(wèn)題?
  • Reactor模式線程模型有哪些?演進(jìn)過(guò)程?

web處理請(qǐng)求架構(gòu)

大多數(shù)web請(qǐng)求處理流程可以抽象成這幾個(gè)步驟:讀取(read),解碼(decode),處理(process),編碼(encode),發(fā)送(send),如下圖所示:

 

同時(shí),處理web請(qǐng)求通常有兩種架構(gòu):傳統(tǒng)基于線程架構(gòu)和事件驅(qū)動(dòng)架構(gòu)。

傳統(tǒng)基于線程架構(gòu)

概念

每個(gè)新連接創(chuàng)建一個(gè)線程來(lái)處理。對(duì)于長(zhǎng)連接服務(wù),如果一個(gè)client和server保持一個(gè)連接的話,有多少個(gè)client接入,server就需要?jiǎng)?chuàng)建同等的線程來(lái)處理。線程上下文切換,數(shù)據(jù)同步和內(nèi)存消耗,對(duì)server來(lái)說(shuō),將是非常大的開銷。

代碼實(shí)現(xiàn)

傳統(tǒng)基于線程架構(gòu)通常采用BIO的方式來(lái)實(shí)現(xiàn),代碼如下:

  1. public class Server implements Runnable { 
  2.  
  3.     int port; 
  4.  
  5.     public Server(int port) { 
  6.         this.port = port; 
  7.     } 
  8.  
  9.     @Override 
  10.     public void run() { 
  11.         try { 
  12.             ServerSocket serverSocket = new ServerSocket(port); 
  13.             while (true){ 
  14.                 System.out.println("等待新連接..."); 
  15.                 new Thread(new Handler(serverSocket.accept())).start(); 
  16.             } 
  17.         } catch (IOException e) { 
  18.             e.printStackTrace(); 
  19.         } 
  20.     } 
  21.  
  22.     static class Handler implements Runnable{ 
  23.  
  24.         private Socket socket; 
  25.  
  26.         public Handler(Socket socket){ 
  27.             this.socket = socket; 
  28.         } 
  29.  
  30.         @Override 
  31.         public void run() { 
  32.             try { 
  33.                 byte[] input = new byte[1024]; 
  34.  
  35.                 this.socket.getInputStream().read(input); 
  36.                 byte[] output = process(input); 
  37.                 this.socket.getOutputStream().write(output); 
  38.                 this.socket.getOutputStream().flush(); 
  39.                 this.socket.close(); 
  40.                 System.out.println("響應(yīng)完成!"); 
  41.             } catch (IOException e) { 
  42.                 e.printStackTrace(); 
  43.             } 
  44.         } 
  45.  
  46.         private byte[] process(byte[] input) { 
  47.             System.out.println("讀取內(nèi)容:" + new String(input)); 
  48.             return input; 
  49.         } 
  50.     } 
  51.  
  52.     public static void main(String[] args) throws InterruptedException { 
  53.         Thread thread = new Thread(new Server(2021)); 
  54.         thread.setDaemon(true); 
  55.         thread.start(); 
  56.  
  57.         synchronized (Server.class) { 
  58.             Server.class.wait(); 
  59.         } 
  60.     } 

為了避免線程創(chuàng)建銷毀的開銷,我們通常會(huì)采用線程池,但是同樣也有很大的弊端:

  • 同步阻塞IO,讀寫阻塞,線程等待時(shí)間過(guò)長(zhǎng)
  • 在制定線程策略的時(shí)候,只能根據(jù)CPU的數(shù)目來(lái)限定可用線程資源,不能根據(jù)連接并發(fā)數(shù)目來(lái)制定,也就是連接有限制。否則很難保證對(duì)客戶端請(qǐng)求的高效和公平。
  • 多線程之間的上下文切換,造成線程使用效率并不高,并且不易擴(kuò)展
  • 狀態(tài)數(shù)據(jù)以及其他需要保持一致的數(shù)據(jù),需要采用并發(fā)同步控制

應(yīng)用場(chǎng)景

既然傳統(tǒng)基于線程架構(gòu)弊端這么多,它存在還有什么價(jià)值?它的應(yīng)用場(chǎng)景是什么?

傳統(tǒng)基于線程架構(gòu)適用于連接數(shù)目比較小且一次傳輸大量數(shù)據(jù)的場(chǎng)景,比如上傳,下載。

事件驅(qū)動(dòng)架構(gòu)

事件驅(qū)動(dòng)架構(gòu):可以把線程和連接解耦,線程只用于執(zhí)行事件注冊(cè)的回調(diào)函數(shù)。事件驅(qū)動(dòng)架構(gòu)由事件生產(chǎn)者和事件消費(fèi)者組成,前者是事件的來(lái)源,它只負(fù)責(zé)監(jiān)聽哪些事件發(fā)生;后者是直接處理事件或者事件發(fā)生時(shí),響應(yīng)事件的實(shí)體。

Reactor模式

什么是Reactor模式?

Reactor模式是事件驅(qū)動(dòng)架構(gòu)的一種具體實(shí)現(xiàn)方法,簡(jiǎn)而言之,就是一個(gè)單線程進(jìn)行循環(huán)監(jiān)聽就緒IO事件,并將就緒IO事件分發(fā)給對(duì)應(yīng)的回調(diào)函數(shù)。

Reactor模式由什么組成的?

Reactor模式分為兩個(gè)重要組成部分,Reactor和Handler。 Reactor(反應(yīng)器):循環(huán)監(jiān)聽就緒IO事件,并分發(fā)給回調(diào)函數(shù)。 Handler(回調(diào)函數(shù)):執(zhí)行對(duì)應(yīng)IO事件的實(shí)際業(yè)務(wù)邏輯。

Reactor模式解決什么問(wèn)題?

反應(yīng)器模式可以實(shí)現(xiàn)同步的多路復(fù)用,同步是指按照事件到達(dá)的順序分發(fā)處理。反應(yīng)器 接收來(lái)自不同的客戶端的消息、請(qǐng)求和連接,盡管客戶端是并發(fā)的,但是反應(yīng)器可以按照事件到達(dá)的順序觸發(fā)回調(diào)函數(shù)。因此,Reactor模式將連接和線程解耦,不需要為每個(gè)連接創(chuàng)建單獨(dú)線程。這個(gè)問(wèn)題和C10K問(wèn)題相同,提供了一個(gè)解決思路。

Reactor模式下的三種模型

單線程模型:IO事件輪詢,分發(fā)(accept)和IO事件執(zhí)行(read,decode,compute,encode,send)都在一個(gè)線程中完成,如下圖所示:

在單線程模型下,不僅IO操作在Reactor線程上,而非IO操作(handlder中process()方法)也在Reactor線程上執(zhí)行了,當(dāng)非IO操作執(zhí)行慢的話,這會(huì)大大延遲IO請(qǐng)求響應(yīng),所以應(yīng)該把非IO操作拆出來(lái),來(lái)加速Reactor線程對(duì)IO請(qǐng)求響應(yīng),就出現(xiàn)多線程模型。

單線程模型實(shí)現(xiàn):

  1. // reactor 
  2. public class Reactor implements Runnable { 
  3.  
  4.     int port; 
  5.     Selector selector; 
  6.     ServerSocketChannel serverSocket; 
  7.  
  8.     public Reactor(int port) throws IOException { 
  9.         this.port = port; 
  10.         // 創(chuàng)建serverSocket對(duì)象 
  11.         serverSocket = ServerSocketChannel.open(); 
  12.         // 綁定端口 
  13.         serverSocket.socket().bind(new InetSocketAddress(port)); 
  14.         // 配置非阻塞 
  15.         serverSocket.configureBlocking(false); 
  16.         // 創(chuàng)建selector對(duì)象 
  17.         selector = Selector.open(); 
  18.         // serversocket注冊(cè)到selector上,幫忙監(jiān)聽accpet事件 
  19.         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverSocket,selector)); 
  20.         /** 還可以使用 SPI provider,來(lái)創(chuàng)建selector和serversocket對(duì)象 
  21.         SelectorProvider p = SelectorProvider.provider(); 
  22.         selector = p.openSelector(); 
  23.         serverSocket = p.openServerSocketChannel(); 
  24.         */ 
  25.     } 
  26.  
  27.     @Override 
  28.     public void run() { 
  29.         try { 
  30.             while (!Thread.interrupted()) { 
  31.                 System.out.println("start select event..."); 
  32.                 selector.select(); 
  33.                 Set selectedKeys = selector.selectedKeys(); 
  34.                 Iterator it = selectedKeys.iterator(); 
  35.                 while (it.hasNext()) { 
  36.                     dispatch((SelectionKey)it.next()); 
  37.                 } 
  38.                 selectedKeys.clear(); 
  39.             } 
  40.         } catch (IOException e) { 
  41.             e.printStackTrace(); 
  42.         } 
  43.     } 
  44.  
  45.     private void dispatch(SelectionKey key) { 
  46.         Runnable r = (Runnable) key.attachment(); 
  47.         if (r != null) { 
  48.             r.run(); 
  49.         } 
  50.     } 
  51.  
  52.  
  53.     public static void main(String[] args) throws IOException, InterruptedException { 
  54.         Thread thread = new Thread(new Reactor(2021)); 
  55.         thread.start(); 
  56.         synchronized (Reactor.class) { 
  57.             Reactor.class.wait(); 
  58.         } 
  59.     } 
  60. // acceptor調(diào)度器 
  61. public class Acceptor implements Runnable { 
  62.  
  63.     ServerSocketChannel serverSocket; 
  64.     Selector selector; 
  65.  
  66.     public Acceptor(ServerSocketChannel serverSocket,Selector selector) { 
  67.         this.serverSocket = serverSocket; 
  68.         this.selector = selector; 
  69.     } 
  70.  
  71.     @Override 
  72.     public void run() { 
  73.         try { 
  74.             SocketChannel socket = this.serverSocket.accept(); 
  75.             if (socket != null) { 
  76.                 new Handler(selector,socket); 
  77.             } 
  78.  
  79.         } catch (IOException e) { 
  80.             e.printStackTrace(); 
  81.         } 
  82.     } 
  83. // 回調(diào)函數(shù)handler 
  84. public class Handler implements Runnable { 
  85.  
  86.     Selector selector; 
  87.     SocketChannel socket; 
  88.     SelectionKey sk; 
  89.  
  90.     ByteBuffer input = ByteBuffer.allocate(1024); 
  91.     ByteBuffer output = ByteBuffer.allocate(1024); 
  92.     static final int READING = 0, SENDING = 1; 
  93.     int state = READING; 
  94.  
  95.  
  96.     public Handler(Selector selector, SocketChannel socket) throws IOException { 
  97.         this.selector = selector; 
  98.         this.socket = socket; 
  99.  
  100.         this.socket.configureBlocking(false); 
  101.         sk = this.socket.register(selector,0); 
  102.         sk.attach(this); 
  103.         sk.interestOps(SelectionKey.OP_READ); 
  104.         selector.wakeup(); 
  105.     } 
  106.  
  107.     @Override 
  108.     public void run() { 
  109.         try{ 
  110.             if (state == READING) { 
  111.                 read(); 
  112.             } else if (state == SENDING) { 
  113.                 send(); 
  114.             } 
  115.         } catch (IOException ex) { 
  116.             ex.printStackTrace(); 
  117.         } 
  118.     } 
  119.  
  120.     private void read() throws IOException { 
  121.         socket.read(input); 
  122.         if (inputIsComplete()) { 
  123.             // 執(zhí)行業(yè)務(wù)邏輯代碼 
  124.             process(); 
  125.             state = SENDING; 
  126.             // Normally also do first write now 
  127.             sk.interestOps(SelectionKey.OP_WRITE); 
  128.         } 
  129.     } 
  130.  
  131.     private void send() throws IOException { 
  132.         socket.write(output); 
  133.         socket.close(); 
  134.         if (outputIsComplete()) sk.cancel(); 
  135.     } 
  136.  
  137.     boolean inputIsComplete() { return true;} 
  138.  
  139.     boolean outputIsComplete() {return true;} 
  140.     // 處理非IO操作(業(yè)務(wù)邏輯代碼) 
  141.     void process(){ 
  142.         String msg = new String(input.array()); 
  143.         System.out.println("讀取內(nèi)容:" + msg); 
  144.         output.put(msg.getBytes()); 
  145.         output.flip(); 
  146.     } 
  • 多線程模型:與單線程模型不同的是添加一個(gè)業(yè)務(wù)線程池,將非IO操作(業(yè)務(wù)邏輯處理)交給業(yè)務(wù)線程池來(lái)處理,提高Reactor線程的IO響應(yīng),如圖所示:

 

在多線程模型下,雖然將非IO操作拆出去了,但是所有IO操作都在Reactor單線程中完成的。在高負(fù)載、高并發(fā)場(chǎng)景下,也會(huì)成為瓶頸,于是對(duì)Reactor單線程進(jìn)行了優(yōu)化,出現(xiàn)了主從線程模型。

多線程模型實(shí)現(xiàn):

  1. public class Reactor implements Runnable { 
  2.  
  3.     int port; 
  4.     Selector selector; 
  5.     ServerSocketChannel serverSocket; 
  6.  
  7.  
  8.     public Reactor(int port) throws IOException { 
  9.         this.port = port; 
  10.  
  11.         // 創(chuàng)建serverSocket對(duì)象 
  12.         serverSocket = ServerSocketChannel.open(); 
  13.         // 綁定端口 
  14.         serverSocket.socket().bind(new InetSocketAddress(port)); 
  15.         // 配置非阻塞 
  16.         serverSocket.configureBlocking(false); 
  17.  
  18.         // 創(chuàng)建selector對(duì)象 
  19.         selector = Selector.open(); 
  20.         // serversocket注冊(cè)到selector上,幫忙監(jiān)聽accpet事件 
  21.         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,selector)); 
  22.  
  23.         /** 還可以使用 SPI provider,來(lái)創(chuàng)建selector和serversocket對(duì)象 
  24.         SelectorProvider p = SelectorProvider.provider(); 
  25.         selector = p.openSelector(); 
  26.         serverSocket = p.openServerSocketChannel(); 
  27.         */ 
  28.     } 
  29.  
  30.     @Override 
  31.     public void run() { 
  32.         try { 
  33.             while (!Thread.interrupted()) { 
  34.                 System.out.println("start select event..."); 
  35.                 selector.select(); 
  36.                 Set selectedKeys = selector.selectedKeys(); 
  37.                 Iterator it = selectedKeys.iterator(); 
  38.                 while (it.hasNext()) { 
  39.                     dispatch((SelectionKey)it.next()); 
  40.                 } 
  41.                 selectedKeys.clear(); 
  42.             } 
  43.         } catch (IOException e) { 
  44.             e.printStackTrace(); 
  45.         } 
  46.     } 
  47.  
  48.     private void dispatch(SelectionKey key) { 
  49.         SelfRunable r = (SelfRunable) key.attachment(); 
  50.         if (r != null) { 
  51.             System.out.println("dispatch to " + r.getName() + "===="); 
  52.             r.run(); 
  53.         } 
  54.     } 
  55.  
  56.  
  57.     public static void main(String[] args) throws IOException, InterruptedException { 
  58.  
  59.         Thread thread = new Thread(new Reactor(2021)); 
  60.         thread.start(); 
  61.  
  62.         synchronized (Reactor.class) { 
  63.             Reactor.class.wait(); 
  64.         } 
  65.  
  66.  
  67.     } 
  68.  
  69. public class Acceptor implements SelfRunable { 
  70.     ServerSocketChannel serverSocket; 
  71.     Selector selector; 
  72.     String name
  73.     public Acceptor(String name, ServerSocketChannel serverSocket,Selector selector) { 
  74.         this.name = name
  75.         this.serverSocket = serverSocket; 
  76.         this.selector = selector; 
  77.     } 
  78.  
  79.     @Override 
  80.     public void run() { 
  81.         try { 
  82.             SocketChannel socket = this.serverSocket.accept(); 
  83.             if (socket != null) { 
  84.                 new Handler("handler_" + ((InetSocketAddress)socket.getLocalAddress()).getPort(), selector,socket); 
  85.             } 
  86.  
  87.         } catch (IOException e) { 
  88.             e.printStackTrace(); 
  89.         } 
  90.     } 
  91.  
  92.     @Override 
  93.     public String getName() { 
  94.         return this.name
  95.     } 
  96.  
  97. public class Handler implements SelfRunable { 
  98.     String name
  99.     Selector selector; 
  100.     SocketChannel socket; 
  101.     SelectionKey sk; 
  102.     ByteBuffer input = ByteBuffer.allocate(1024); 
  103.     ByteBuffer output = ByteBuffer.allocate(1024); 
  104.     static final int READING = 0, SENDING = 1,  PROCESSING = 3; 
  105.     volatile int state = READING; 
  106.     static ExecutorService poolExecutor = Executors.newFixedThreadPool(5); 
  107.  
  108.     public Handler(String name, Selector selector, SocketChannel socket) throws IOException { 
  109.         this.selector = selector; 
  110.         this.socket = socket; 
  111.         this.name = name
  112.  
  113.         this.socket.configureBlocking(false); 
  114.         sk = this.socket.register(selector,0); 
  115.         sk.attach(this); 
  116.         sk.interestOps(SelectionKey.OP_READ); 
  117.         selector.wakeup(); 
  118.     } 
  119.  
  120.     @Override 
  121.     public void run() { 
  122.         try{ 
  123.             System.out.println("state:" + state); 
  124.             if (state == READING) { 
  125.                 read(); 
  126.             } else if (state == SENDING) { 
  127.                 send(); 
  128.             } 
  129.         } catch (IOException ex) { 
  130.             ex.printStackTrace(); 
  131.         } 
  132.     } 
  133.  
  134.     synchronized void read() throws IOException { 
  135.         socket.read(input); 
  136.         if (inputIsComplete()) { 
  137.             state = PROCESSING; 
  138.            poolExecutor.execute(new Processer()); 
  139.         } 
  140.     } 
  141.  
  142.     synchronized void processAndHandOff() { 
  143.         System.out.println("processAndHandOff========="); 
  144.         process(); 
  145.         state = SENDING; // or rebind attachment 
  146.         sk.interestOps(SelectionKey.OP_WRITE); 
  147.         selector.wakeup(); 
  148.         System.out.println("processAndHandOff finish ! ========="); 
  149.     } 
  150.  
  151.     private void send() throws IOException { 
  152.         System.out.println("start send ..."); 
  153.         socket.write(output); 
  154.         socket.close(); 
  155.         System.out.println("start send finish!"); 
  156.         if (outputIsComplete()) sk.cancel(); 
  157.     } 
  158.  
  159.     boolean inputIsComplete() { return true;} 
  160.  
  161.     boolean outputIsComplete() {return true;} 
  162.  
  163.     void process(){ 
  164.         String msg = new String(input.array()); 
  165.         System.out.println("讀取內(nèi)容:" + msg); 
  166.         output.put(msg.getBytes()); 
  167.         output.flip(); 
  168.     } 
  169.  
  170.     @Override 
  171.     public String getName() { 
  172.         return this.name
  173.     } 
  174.  
  175.     class Processer implements Runnable { 
  176.         public void run() { processAndHandOff(); } 
  177.     } 
  • 主從線程模型: 相比多線程模型而言,對(duì)于多核cpu,為了充分利用資源,將Reactor拆分成了mainReactor 和 subReactor,但是,主從線程模型也有弊端,不適合大量數(shù)據(jù)傳輸。 mainReactor:負(fù)責(zé)監(jiān)聽接收(accpet)新連接,將新連接后續(xù)操作交給subReactor來(lái)處理,通常由一個(gè)線程處理。 subReactor: 負(fù)責(zé)處理IO的讀寫操作,通常由多個(gè)線程處理。 非IO操作依然由業(yè)務(wù)線程池來(lái)處理。

主從線程模型實(shí)現(xiàn):

  1. public class Reactor implements Runnable { 
  2.  
  3.     int port; 
  4.     Selector selector; 
  5.     ServerSocketChannel serverSocket; 
  6.     int SUBREACTOR_SIZE = 1; 
  7.     SubReactor[] subReactorPool = new SubReactor[SUBREACTOR_SIZE]; 
  8.  
  9.  
  10.     public Reactor(int port) throws IOException { 
  11.         this.port = port; 
  12.  
  13.         // 創(chuàng)建serverSocket對(duì)象 
  14.         serverSocket = ServerSocketChannel.open(); 
  15.         // 綁定端口 
  16.         serverSocket.socket().bind(new InetSocketAddress(port)); 
  17.         // 配置非阻塞 
  18.         serverSocket.configureBlocking(false); 
  19.  
  20.         // 創(chuàng)建selector對(duì)象 
  21.         selector = Selector.open(); 
  22.         // serversocket注冊(cè)到selector上,幫忙監(jiān)聽accpet事件 
  23.         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,subReactorPool)); 
  24.  
  25.         // 初始化subreactor pool 
  26.         initSubReactorPool(); 
  27.  
  28.  
  29.         /** 還可以使用 SPI provider,來(lái)創(chuàng)建selector和serversocket對(duì)象 
  30.         SelectorProvider p = SelectorProvider.provider(); 
  31.         selector = p.openSelector(); 
  32.         serverSocket = p.openServerSocketChannel(); 
  33.         */ 
  34.     } 
  35.  
  36.     @Override 
  37.     public void run() { 
  38.         try { 
  39.             while (!Thread.interrupted()) { 
  40.                 System.out.println("mainReactor start select event..."); 
  41.                 selector.select(); 
  42.                 Set selectedKeys = selector.selectedKeys(); 
  43.                 Iterator it = selectedKeys.iterator(); 
  44.                 while (it.hasNext()) { 
  45.                     dispatch((SelectionKey)it.next()); 
  46.                 } 
  47.                 selectedKeys.clear(); 
  48.             } 
  49.         } catch (IOException e) { 
  50.             e.printStackTrace(); 
  51.         } 
  52.     } 
  53.  
  54.     void initSubReactorPool() { 
  55.         try { 
  56.             for (int i = 0; i < SUBREACTOR_SIZE; i++) { 
  57.                 subReactorPool[i] = new SubReactor("SubReactor" + i); 
  58.             } 
  59.         } catch (IOException ex) { /* ... */ } 
  60.     } 
  61.  
  62.     private void dispatch(SelectionKey key) { 
  63.         SelfRunable r = (SelfRunable) key.attachment(); 
  64.         if (r != null) { 
  65.             System.out.println("mainReactor dispatch to " + r.getName() + "===="); 
  66.             r.run(); 
  67.         } 
  68.     } 
  69.  
  70.  
  71.     public static void main(String[] args) throws IOException, InterruptedException { 
  72.  
  73.         Thread thread = new Thread(new Reactor(2021)); 
  74.         thread.start(); 
  75.  
  76.         synchronized (Reactor.class) { 
  77.             Reactor.class.wait(); 
  78.         } 
  79.     } 
  80.  
  81. public class SubReactor implements SelfRunable { 
  82.  
  83.     private Selector selector; 
  84.     private String name
  85.     private List<SelfRunable> task = new ArrayList<SelfRunable>(); 
  86.  
  87.     public SubReactor(String name) throws IOException { 
  88.         this.name = name
  89.         selector = Selector.open(); 
  90.         new Thread(this).start(); 
  91.     } 
  92.  
  93.     @Override 
  94.     public String getName() { 
  95.         return this.name
  96.     } 
  97.  
  98.     @Override 
  99.     public void run() { 
  100.         try { 
  101.             while (!Thread.interrupted()) { 
  102.                 System.out.println("subReactor start select event..."); 
  103.                 selector.select(5000); 
  104.                 Set selectedKeys = selector.selectedKeys(); 
  105.                 Iterator it = selectedKeys.iterator(); 
  106.                 while (it.hasNext()) { 
  107.                     dispatch((SelectionKey)it.next()); 
  108.                 } 
  109.                 selectedKeys.clear(); 
  110.  
  111.             } 
  112.         } catch (IOException e) { 
  113.             e.printStackTrace(); 
  114.         } 
  115.     } 
  116.  
  117.     private void dispatch(SelectionKey key) { 
  118.         SelfRunable r = (SelfRunable) key.attachment(); 
  119.         if (r != null) { 
  120.             System.out.println("subReactor dispatch to " + r.getName() + "===="); 
  121.             r.run(); 
  122.         } 
  123.     } 
  124.  
  125.     public Selector getSelector(){ 
  126.         return this.selector; 
  127.     } 
  128.  
  129.     public void submit(SelfRunable runnable) { 
  130.         task.add(runnable); 
  131.     } 
  132.  
  133.  
  134. public class Acceptor implements SelfRunable { 
  135.  
  136.     int next = 0; 
  137.     String name
  138.     SubReactor[] subReactorPool; 
  139.     ServerSocketChannel serverSocket; 
  140.  
  141.     public Acceptor(String name, ServerSocketChannel serverSocket,SubReactor[] subReactorPool) { 
  142.         this.name = name
  143.         this.serverSocket = serverSocket; 
  144.         this.subReactorPool = subReactorPool; 
  145.     } 
  146.  
  147.     @Override 
  148.     public void run() { 
  149.         try { 
  150.             SocketChannel socket = this.serverSocket.accept(); 
  151.             if (socket != null) { 
  152.                 new Handler("handler", subReactorPool[next].getSelector(),socket); 
  153.             } 
  154.             if (++next == subReactorPool.length) {next=0;} 
  155.  
  156.         } catch (IOException e) { 
  157.             e.printStackTrace(); 
  158.         } 
  159.     } 
  160.  
  161.     @Override 
  162.     public String getName() { 
  163.         return this.name
  164.     } 
  165.  
  166. public class Handler implements SelfRunable { 
  167.  
  168.     String name
  169.     Selector selector; 
  170.     SocketChannel socket; 
  171.     SelectionKey sk; 
  172.  
  173.     ByteBuffer input = ByteBuffer.allocate(1024); 
  174.     ByteBuffer output = ByteBuffer.allocate(1024); 
  175.     static final int READING = 0, SENDING = 1,  PROCESSING = 3; 
  176.     volatile int state = READING; 
  177.  
  178.     static ExecutorService poolExecutor = Executors.newFixedThreadPool(5); 
  179.  
  180.     public Handler(String name, Selector selector, SocketChannel socket) throws IOException { 
  181.         this.selector = selector; 
  182.         this.socket = socket; 
  183.         this.name = name
  184.  
  185.         this.socket.configureBlocking(false); 
  186.         sk = this.socket.register(this.selector,0); 
  187.         sk.attach(this); 
  188.         sk.interestOps(SelectionKey.OP_READ); 
  189.         selector.wakeup(); 
  190.     } 
  191.  
  192.     @Override 
  193.     public void run() { 
  194.         try{ 
  195.             System.out.println("state:" + state); 
  196.             if (state == READING) { 
  197.                 read(); 
  198.             } else if (state == SENDING) { 
  199.                 send(); 
  200.             } 
  201.         } catch (IOException ex) { 
  202.             ex.printStackTrace(); 
  203.         } 
  204.     } 
  205.  
  206.     synchronized void read() throws IOException { 
  207.         socket.read(input); 
  208.         if (inputIsComplete()) { 
  209.             state = PROCESSING; 
  210.            poolExecutor.execute(new Processer()); 
  211.         } 
  212.     } 
  213.  
  214.     synchronized void processAndHandOff() { 
  215.         System.out.println("processAndHandOff========="); 
  216.         process(); 
  217.         state = SENDING; // or rebind attachment 
  218.         sk.interestOps(SelectionKey.OP_WRITE); 
  219.         selector.wakeup(); 
  220.         System.out.println("processAndHandOff finish ! ========="); 
  221.     } 
  222.  
  223.     private void send() throws IOException { 
  224.         System.out.println("start send ..."); 
  225.         socket.write(output); 
  226.         socket.close(); 
  227.         System.out.println("start send finish!"); 
  228.         if (outputIsComplete()) sk.cancel(); 
  229.     } 
  230.  
  231.     boolean inputIsComplete() { return true;} 
  232.  
  233.     boolean outputIsComplete() {return true;} 
  234.  
  235.     void process(){ 
  236.         String msg = new String(input.array()); 
  237.         System.out.println("讀取內(nèi)容:" + msg); 
  238.         output.put(msg.getBytes()); 
  239.         output.flip(); 
  240.     } 
  241.  
  242.     @Override 
  243.     public String getName() { 
  244.         return this.name
  245.     } 
  246.  
  247.     class Processer implements Runnable { 
  248.         public void run() { processAndHandOff(); } 
  249.     } 

Reactor線程模型演進(jìn)

模型

簡(jiǎn)介

弊端

單線程模型

IO/非IO操作都在Reactor單線程中完成

非IO操作執(zhí)行慢,影響IO操作響應(yīng)延遲

多線程模型

拆分非IO操作交給業(yè)務(wù)線程池執(zhí)行,IO操作由Reator單線程執(zhí)行

高并發(fā),高負(fù)載場(chǎng)景下,Reactor單線程會(huì)成為瓶頸

主從線程模型

Reactor單線程拆分為mainReactor和subReactor

不適合大量數(shù)據(jù)傳輸

Netty線程模型

Reactor主從線程模型-抽象模型

  • 創(chuàng)建ServerSocketChannel過(guò)程(創(chuàng)建channel,配置非阻塞)
  • ServerSocketChannel注冊(cè)到mainReactor的selector對(duì)象上,監(jiān)聽accept事件
  • mainReactor的selector監(jiān)聽到新連接SocketChannel,將SocketChannel注冊(cè)到subReactor的selector對(duì)象上,監(jiān)聽read/write事件
  • subReactor的selector監(jiān)聽到read/write事件,移交給業(yè)務(wù)線程池(對(duì)應(yīng)netty的pipeline)

Netty線程模型

我們?cè)俸煤每纯磎ainReactor和subReactor,其實(shí)這兩個(gè)類功能非常相似,所以Netty將mainReactor和subReactor統(tǒng)一成了EventLoop。對(duì)于Netty零基礎(chǔ)的,請(qǐng)參考這個(gè)Reactor主從線程模型-抽象模型和下面這張圖來(lái)理解EventLoop。

 

 

責(zé)任編輯:武曉燕 來(lái)源: 今日頭條
相關(guān)推薦

2022-03-04 08:10:35

NettyIO模型Reactor

2022-03-06 12:15:38

NettyReactor線程

2021-06-16 14:18:37

NettyReactor線程模型

2012-08-24 09:58:09

ReactorDSSC

2022-10-25 08:23:09

Reactor模式I/O

2022-03-10 07:58:12

ReactorNetty運(yùn)轉(zhuǎn)架構(gòu)

2022-09-29 15:39:10

服務(wù)器NettyReactor

2022-02-09 09:37:54

ReactorNettyI/O

2020-12-11 11:04:07

NettyIO

2024-11-22 08:00:00

Netty開發(fā)

2019-01-15 10:54:03

高性能ServerReactor

2022-05-24 15:46:51

Wi-FiSTA模式

2024-10-24 20:48:04

Netty線程Java

2020-08-21 07:23:50

工廠模式設(shè)計(jì)

2021-09-27 08:56:44

NettyChannelHand架構(gòu)

2021-04-26 17:38:40

ReactorProactor網(wǎng)絡(luò)

2015-03-31 18:26:43

陌陌社交

2021-06-09 08:53:34

設(shè)計(jì)模式策略模式工廠模式

2012-02-29 09:41:14

JavaScript

2015-09-08 13:39:10

JavaScript設(shè)計(jì)模式
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)