自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

深度長(zhǎng)文:從Bio到Nio到Aio,再到響應(yīng)式編程

系統(tǒng) Linux
要問計(jì)算機(jī)系統(tǒng)里,有哪些概念比較折騰人,nio絕對(duì)能算上一個(gè)。配合著多是異的網(wǎng)絡(luò)編程,nio加上多線程一般能夠完成雙殺。

[[417220]]

本文轉(zhuǎn)載自微信公眾號(hào)「小姐姐味道」,作者小姐姐養(yǎng)的狗 。轉(zhuǎn)載本文請(qǐng)聯(lián)系小姐姐味道公眾號(hào)。

要問計(jì)算機(jī)系統(tǒng)里,有哪些概念比較折騰人,nio絕對(duì)能算上一個(gè)。配合著多是異的網(wǎng)絡(luò)編程,nio加上多線程一般能夠完成雙殺。

Linux有5種常見的IO模型。其中,阻塞IO就是bio,IO復(fù)用就是nio,異步IO就是aio,我們本篇文章就聚焦于此。

  • 阻塞式IO (bio)
  • 非阻塞式IO
  • IO復(fù)用 (nio)
  • 信號(hào)驅(qū)動(dòng)式IO
  • 異步IO(aio)

在網(wǎng)絡(luò)編程中,Reactor模型是必須要了解的?,F(xiàn)在,大多數(shù)與IO相關(guān)的組件,都會(huì)使用Reactor模型,比如Tomcat、Redis、Nginx等,可見Reactor應(yīng)用的廣泛性。

Reactor是NIO的基礎(chǔ)。為什么NIO的性能就能夠比傳統(tǒng)的阻塞IO性能高呢?我們首先來(lái)看一下傳統(tǒng)阻塞式IO的一些特點(diǎn)。

1.阻塞IO模型

 

如上圖,是典型的BIO模型,每當(dāng)有一個(gè)連接到來(lái),經(jīng)過協(xié)調(diào)器的處理,就開啟一個(gè)對(duì)應(yīng)的線程進(jìn)行接管。如果連接有1000條,那就需要1000個(gè)線程。線程資源是非常昂貴的,除了占用大量的內(nèi)存,還會(huì)占用非常多的CPU調(diào)度時(shí)間,所以BIO在連接非常多的情況下,效率會(huì)變得非常低。

下面的代碼是使用ServerSocket實(shí)現(xiàn)的一個(gè)簡(jiǎn)單socket服務(wù)器,監(jiān)聽在8888端口。

  1. public class BIO { 
  2.     static boolean stop = false
  3.     public static void main(String[] args) throws Exception { 
  4.         int connectionNum = 0; 
  5.         int port = 8888; 
  6.         ExecutorService service = Executors.newCachedThreadPool(); 
  7.         ServerSocket serverSocket = new ServerSocket(port); 
  8.         while (!stop) { 
  9.             if (10 == connectionNum) { 
  10.                 stop = true
  11.             } 
  12.             Socket socket = serverSocket.accept(); 
  13.             service.execute(() -> { 
  14.                 try { 
  15.                     Scanner scanner = new Scanner(socket.getInputStream()); 
  16.                     PrintStream printStream = new PrintStream(socket.getOutputStream()); 
  17.                     while (!stop) { 
  18.                         String s = scanner.next().trim(); 
  19.                         printStream.println("PONG:" + s); 
  20.                     } 
  21.                 } catch (Exception ex) { 
  22.                     ex.printStackTrace(); 
  23.                 } 
  24.             }); 
  25.             connectionNum++; 
  26.         } 
  27.         service.shutdown(); 
  28.         serverSocket.close(); 
  29.     } 

啟動(dòng)之后,使用nc命令進(jìn)行連接測(cè)試,結(jié)果如下。

  1. $ nc -v localhost 8888 
  2. Connection to localhost port 8888 [tcp/ddi-tcp-1] succeeded! 
  3. hello 
  4. PONG:hello 
  5. nice 
  6. PONG:nice 

可以看到,BIO的讀寫操作是阻塞的,線程的整個(gè)生命周期和連接的生命周期是一樣的,而且不能夠被復(fù)用。

就單個(gè)阻塞IO來(lái)說(shuō),它的效率并不比NIO慢。但是當(dāng)服務(wù)的連接增多,考慮到整個(gè)服務(wù)器的資源調(diào)度和資源利用率等因素,NIO就有了顯著的效果,NIO非常適合高并發(fā)場(chǎng)景。

2.非阻塞IO模型

其實(shí),在處理IO動(dòng)作時(shí),有大部分時(shí)間是在等待。比如,socket連接要花費(fèi)很長(zhǎng)時(shí)間進(jìn)行連接操作,在完成連接的這段時(shí)間內(nèi),它并沒有占用額外的系統(tǒng)資源,但它只能阻塞等待在線程中。這種情況下,系統(tǒng)資源并不能被合理的利用。

Java的NIO,在Linux上底層是使用epoll實(shí)現(xiàn)的。epoll是一個(gè)高性能的多路復(fù)用I/O工具,改進(jìn)了select和poll等工具的一些功能。在網(wǎng)絡(luò)編程中,對(duì)epoll概念的一些理解,幾乎是面試中必問的問題。

epoll的數(shù)據(jù)結(jié)構(gòu)是直接在內(nèi)核上進(jìn)行支持的。通過epoll_create和epoll_ctl等函數(shù)的操作,可以構(gòu)造描述符(fd)相關(guān)的事件組合(event)。

這里有兩個(gè)比較重要的概念:

  • fd 每條連接、每個(gè)文件,都對(duì)應(yīng)著一個(gè)描述符,比如端口號(hào)。內(nèi)核在定位到這些連接的時(shí)候,就是通過fd進(jìn)行尋址的
  • event 當(dāng)fd對(duì)應(yīng)的資源,有狀態(tài)或者數(shù)據(jù)變動(dòng),就會(huì)更新epoll_item結(jié)構(gòu)。在沒有事件變更的時(shí)候,epoll就阻塞等待,也不會(huì)占用系統(tǒng)資源;一旦有新的事件到來(lái),epoll就會(huì)被激活,將事件通知到應(yīng)用方

關(guān)于epoll還會(huì)有一個(gè)面試題:相對(duì)于select,epoll有哪些改進(jìn)?這里直接給出答案:

  • epoll不再需要像select一樣對(duì)fd集合進(jìn)行輪詢,也不需要在調(diào)用時(shí)將fd集合在用戶態(tài)和內(nèi)核態(tài)進(jìn)行交換
  • 應(yīng)用程序獲得就緒fd的事件復(fù)雜度,epoll時(shí)O(1),select是O(n)
  • select最大支持約1024個(gè)fd,epoll支持65535個(gè)
  • select使用輪詢模式檢測(cè)就緒事件,epoll采用通知方式,更加高效

我們還是以Java中的NIO代碼為例,來(lái)看一下NIO的具體概念。

  1. public class NIO { 
  2.     static boolean stop = false
  3.     public static void main(String[] args) throws Exception { 
  4.         int connectionNum = 0; 
  5.         int port = 8888; 
  6.         ExecutorService service = Executors.newCachedThreadPool(); 
  7.         ServerSocketChannel ssc = ServerSocketChannel.open(); 
  8.         ssc.configureBlocking(false); 
  9.         ssc.socket().bind(new InetSocketAddress("localhost", port)); 
  10.         Selector selector = Selector.open(); 
  11.         ssc.register(selector, ssc.validOps()); 
  12.         while (!stop) { 
  13.             if (10 == connectionNum) { 
  14.                 stop = true
  15.             } 
  16.             int num = selector.select(); 
  17.             if (num == 0) { 
  18.                 continue
  19.             } 
  20.             Iterator<SelectionKey> events = selector.selectedKeys().iterator(); 
  21.             while (events.hasNext()) { 
  22.                 SelectionKey event = events.next(); 
  23.  
  24.                 if (event.isAcceptable()) { 
  25.                     SocketChannel sc = ssc.accept(); 
  26.                     sc.configureBlocking(false); 
  27.                     sc.register(selector, SelectionKey.OP_READ); 
  28.                     connectionNum++; 
  29.                 } else if (event.isReadable()) { 
  30.                     try { 
  31.                         SocketChannel sc = (SocketChannel) event.channel(); 
  32.                         ByteBuffer buf = ByteBuffer.allocate(1024); 
  33.                         int size = sc.read(buf); 
  34.                         if(-1==size){ 
  35.                             sc.close(); 
  36.                         } 
  37.                         String result = new String(buf.array()).trim(); 
  38.                         ByteBuffer wrap = ByteBuffer.wrap(("PONG:" + result).getBytes()); 
  39.                         sc.write(wrap); 
  40.                     } catch (Exception ex) { 
  41.                         ex.printStackTrace(); 
  42.                     } 
  43.                 } else if (event.isWritable()) { 
  44.                     SocketChannel sc = (SocketChannel) event.channel(); 
  45.                 } 
  46.  
  47.                 events.remove(); 
  48.             } 
  49.         } 
  50.         service.shutdown(); 
  51.         ssc.close(); 
  52.     } 

上面這段代碼比較長(zhǎng),是使用NIO實(shí)現(xiàn)的和BIO相同的功能。從它的API設(shè)計(jì)上,我們就能夠看到epoll的一些影子。

首先,我們創(chuàng)建了一個(gè)服務(wù)端ssc,并開啟一個(gè)新的事件選擇器,監(jiān)聽它的OP_ACCEPT事件。

  1. ServerSocketChannel ssc = ServerSocketChannel.open(); 
  2. Selector selector = Selector.open(); 
  3. ssc.register(selector, ssc.validOps()); 

共有4種事件類型。分別是新連接事件(OP_ACCEPT)、連接就緒事件(OP_CONNECT)、讀就緒事件(OP_READ)、寫就緒事件(OP_WRITE)。任何網(wǎng)絡(luò)和文件操作,都可以抽象成這四個(gè)事件。

接下來(lái),在while循環(huán)里,使用select函數(shù),阻塞在主線程里。所謂阻塞,就是操作系統(tǒng)不再分配CPU事件片到當(dāng)前線程中,所以select函數(shù)是幾乎不占用任何系統(tǒng)資源的。

  1. int num = selector.select(); 

一旦有新的事件到達(dá),比如有新的連接到來(lái),主線程就能夠被調(diào)度到,程序就能夠向下執(zhí)行。這時(shí)候,就能夠根據(jù)訂閱的事件通知,持續(xù)獲取訂閱的事件。

由于注冊(cè)到selector的連接和事件可能會(huì)有多個(gè),所以這些事件也會(huì)有多個(gè)。我們使用安全的迭代器循環(huán)進(jìn)行處理,在處理完畢之后,將它刪除。

如果事件不刪除的話,或者漏掉了某個(gè)事件的處理,會(huì)怎么樣呢?后果還是比較嚴(yán)重的,由于事件總是存在,我們的程序會(huì)陷入無(wú)休無(wú)止的循環(huán)之中。

  1. Iterator<SelectionKey> events = selector.selectedKeys().iterator(); 
  2.     while (events.hasNext()) { 
  3.         SelectionKey event = events.next(); 
  4.         ... 
  5.         events.remove(); 
  6.     } 

有新的連接到達(dá)時(shí),我們訂閱了更多的事件。對(duì)于我們的數(shù)據(jù)讀取來(lái)說(shuō),對(duì)應(yīng)的事件就是OP_READ。和BIO編程面向流的方式不同,NIO操作的對(duì)象是抽象的概念Channel,通過緩沖區(qū)進(jìn)行數(shù)據(jù)交換。

  1. SocketChannel sc = ssc.accept(); 
  2. sc.configureBlocking(false); 
  3. sc.register(selector, SelectionKey.OP_READ); 

值得注意的是:服務(wù)端和客戶端的實(shí)現(xiàn)方式,可以是不同的。比如,服務(wù)端是NIO,客戶端可以是BIO,它們并沒有什么強(qiáng)制要求。

另外一個(gè)面試時(shí)候經(jīng)常問到的事件就是OP_WRITE。我們上面提到過,這個(gè)事件是表示寫就緒的,當(dāng)?shù)讓拥木彌_區(qū)有空閑,這個(gè)事件就會(huì)一直發(fā)生,浪費(fèi)占用CPU資源。所以,我們一般是不注冊(cè)O(shè)P_WRITE的。

這里還有一個(gè)細(xì)節(jié),在讀取數(shù)據(jù)的時(shí)候,并沒有像BIO的方式一樣使用循環(huán)來(lái)獲取數(shù)據(jù)。如下面的代碼,我們創(chuàng)建了一個(gè)1024字節(jié)的緩沖區(qū),用于數(shù)據(jù)的讀取。如果連接中的數(shù)據(jù),大于1024字節(jié)怎么辦?

  1. SocketChannel sc = (SocketChannel) event.channel(); 
  2. ByteBuffer buf = ByteBuffer.allocate(1024); 
  3. int size = sc.read(buf); 

這涉及到兩種事件的通知機(jī)制。

  • 水平觸發(fā) (level-triggered) 稱作LT模式。只要緩沖區(qū)有數(shù)據(jù),事件就會(huì)一直發(fā)生
  • 邊緣觸發(fā) (edge-triggered) 稱作ET模式。緩沖區(qū)有數(shù)據(jù),僅會(huì)觸發(fā)一次。事件想要再次觸發(fā),必須先將fd中的數(shù)據(jù)讀完才行

可以看到,Java的NIO采用的就是水平觸發(fā)的方式。LT模式頻繁環(huán)喚醒線程,效率相比較ET模式低,所以Netty使用JNI的方式,實(shí)現(xiàn)了ET模式,效率上更高一些。

3.Reactor模式

了解了BIO和NIO的一些使用方式,Reactor模式就呼之欲出了。

NIO是基于事件機(jī)制的,有一個(gè)叫做Selector的選擇器,阻塞獲取關(guān)注的事件列表。獲取到事件列表后,可以通過分發(fā)器,進(jìn)行真正的數(shù)據(jù)操作。

上圖是Doug Lea在講解NIO時(shí)候的一張圖,指明了最簡(jiǎn)單的Reactor模型的基本元素。你可以對(duì)比這上面的NIO代碼分析一下,里面有四個(gè)主要元素:

  • Acceptor 處理client的連接,并綁定具體的事件處理器
  • Event 具體發(fā)生的事件
  • Handler 執(zhí)行具體事件的處理者。比如處理讀寫事件
  • Reactor 將具體的事件分配給Handler

我們可以對(duì)上面的模型進(jìn)行近一步細(xì)化,下面這張圖同樣是Doug Lea的ppt中的。它把Reactor部分分為mainReactor和subReactor兩部分。mainReactor負(fù)責(zé)監(jiān)聽處理新的連接,然后將后續(xù)的事件處理交給subReactor,subReactor對(duì)事件處理的方式,也由阻塞模式變成了多線程處理,引入了任務(wù)隊(duì)列的模式。

熟悉Netty的同學(xué)可以看到,這個(gè)模型就是Netty設(shè)計(jì)的基礎(chǔ)。在Netty中,Boss線程對(duì)應(yīng)著對(duì)連接的處理和分派,相當(dāng)于mainReactor;Work線程 對(duì)應(yīng)著subReactor,使用多線程負(fù)責(zé)讀寫事件的分發(fā)和處理。

這種模式將每個(gè)組件的職責(zé)分的更細(xì),耦合度也更低,能有效的解決C10k問題。

4.AIO

關(guān)于NIO的概念,誤解還是比較多的。面試官可能會(huì)問你:為什么我在使用NIO的時(shí)候,使用Channel進(jìn)行讀寫,socket的操作依然是阻塞的?NIO主要體現(xiàn)在哪里?

  1. //這行代碼是阻塞的 
  2. int size = sc.read(buf); 

答案就是,NIO只負(fù)責(zé)對(duì)發(fā)生在fd描述符上的事件進(jìn)行通知。事件的獲取和通知部分是非阻塞的,但收到通知之后的操作,卻是阻塞的。即使使用多線程去處理這些事件,它依然是阻塞的。

AIO更近一步,將這些對(duì)事件的操作也變成非阻塞的。下面是一段典型的AIO代碼,它通過注冊(cè)CompletionHandler 回調(diào)函數(shù)進(jìn)行事件處理。這里的事件是隱藏的,比如read函數(shù),它不僅僅代表Channel可讀了,而且會(huì)把數(shù)據(jù)自動(dòng)的讀取到ByteBuffer中。等完成了讀取,就會(huì)通過回調(diào)函數(shù)通知你,進(jìn)行后續(xù)的操作。

  1. public class AIO { 
  2.     public static void main(String[] args) throws Exception { 
  3.         int port = 8888; 
  4.         AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open(); 
  5.         ssc.bind(new InetSocketAddress("localhost", port)); 
  6.         ssc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { 
  7.             void job(final AsynchronousSocketChannel sc) { 
  8.                 ByteBuffer buffer = ByteBuffer.allocate(1024); 
  9.                 sc.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { 
  10.                     @Override 
  11.                     public void completed(Integer result, ByteBuffer attachment) { 
  12.                         String str = new String(attachment.array()).trim(); 
  13.                         ByteBuffer wrap = ByteBuffer.wrap(("PONG:" + str).getBytes()); 
  14.                         sc.write(wrap, null, new CompletionHandler<Integer, Object>() { 
  15.                             @Override 
  16.                             public void completed(Integer result, Object attachment) { 
  17.                                 job(sc); 
  18.                             } 
  19.                             @Override 
  20.                             public void failed(Throwable exc, Object attachment) { 
  21.                                 System.out.println("error"); 
  22.                             } 
  23.                         }); 
  24.                     } 
  25.                     @Override 
  26.                     public void failed(Throwable exc, ByteBuffer attachment) { 
  27.                         System.out.println("error"); 
  28.                     } 
  29.                 }); 
  30.             } 
  31.             @Override 
  32.             public void completed(AsynchronousSocketChannel sc, Object attachment) { 
  33.                 ssc.accept(null, this); 
  34.                 job(sc); 
  35.             } 
  36.             @Override 
  37.             public void failed(Throwable exc, Object attachment) { 
  38.                 exc.printStackTrace(); 
  39.                 System.out.println("error"); 
  40.             } 
  41.         }); 
  42.         Thread.sleep(Integer.MAX_VALUE); 
  43.     } 

AIO是Java1.7加入的,理論上性能是會(huì)提升的,但它現(xiàn)在發(fā)展的不太好。那部分對(duì)數(shù)據(jù)進(jìn)行自動(dòng)讀取的操作,總得有地方實(shí)現(xiàn),不在框架里,就得在內(nèi)核里。Netty的NIO模型加上多線程處理,在這方面已經(jīng)做的很好,編程模式也非常簡(jiǎn)單。所以,市面上對(duì)AIO的實(shí)踐并不多,在采用技術(shù)選型的時(shí)候,一定要謹(jǐn)慎。

5.響應(yīng)式編程

你可能聽說(shuō)過Spring5的webflux,webflux是可以替代spring mvc的一套解決方案,可以編寫響應(yīng)式的應(yīng)用,兩者之間的關(guān)系可以看下圖。它的底層使用的是netty,所以操作是異步非阻塞的。類似的組件還有vert.x、akka、rxjava等。

webflux是運(yùn)行在project reactor之上的一個(gè)封裝,其根本特性是由后者提供的。至于再底層的非阻塞模型,就是由Netty保證的了。

非阻塞的特性我們可以理解,響應(yīng)式又是什么概念呢?

響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程范式。這意味著可以在編程語(yǔ)言中很方便地表達(dá)靜態(tài)或動(dòng)態(tài)的數(shù)據(jù)流,而相關(guān)的計(jì)算模型會(huì)自動(dòng)將變化的值通過數(shù)據(jù)流進(jìn)行傳播。

這段話很晦澀,在編程方面,它表達(dá)的意思是:把生產(chǎn)者消費(fèi)者模式,使用簡(jiǎn)單的API表示出來(lái),并自動(dòng)處理背壓(backpressure)問題。

背壓,指的是生產(chǎn)者與消費(fèi)者之間的流量控制。通過將操作全面異步化,來(lái)減少無(wú)效的等待和資源消耗。

Java的lambda表達(dá)式可以承擔(dān)簡(jiǎn)單這個(gè)職責(zé),Java9更是引入了響應(yīng)式流(Reactive Stream),方便了我們的操作。比如,下面是Spring Cloud GateWay的Fluent API寫法,響應(yīng)式編程的API都是類似的。

  1. public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) { 
  2.         return builder.routes() 
  3.                 .route(r -> r.path("/market/**"
  4.                         .filters(f -> f.filter(new RequestTimeFilter()) 
  5.                                 .addResponseHeader("X-Response-Default-Foo""Default-Bar")) 
  6.                         .uri("http://localhost:8080/market/list"
  7.                         .order(0) 
  8.                         .id("customer_filter_router"
  9.                 ) 
  10.                 .build(); 
  11.     } 

從傳統(tǒng)的開發(fā)模式過渡到reactor的開發(fā)模式,是有一定成本的,不過它確實(shí)能夠提高我們應(yīng)用程序的性能。具體用不用,就要看在編程難度和性能之間的取舍了。

小結(jié)

從上面的描述,我們了解到,BIO的線程模型是一個(gè)連接對(duì)應(yīng)一個(gè)線程的,非常的浪費(fèi)資源;NIO通過對(duì)關(guān)鍵事件的監(jiān)聽,通過主動(dòng)通知的方式完成非阻塞操作,但它對(duì)事件本身的處理依然是非阻塞的;AIO完全是異步非阻塞的,但現(xiàn)實(shí)中使用很少。

使用Netty的多Acceptor模式和多線程模式,我們能夠方便的完成類似AIO這樣的操作。Netty的事件觸發(fā)機(jī)制使用了高效的ET模式,使得支持的連接更多,性能更高。

使用Netty,能夠構(gòu)建響應(yīng)式編程的基礎(chǔ),加上類似lambda表達(dá)式這樣的書寫風(fēng)格,能夠完成類似WebFlux這樣的響應(yīng)式框架。響應(yīng)式編程是一個(gè)趨勢(shì),現(xiàn)在有越來(lái)越多的框架和底層的數(shù)據(jù)庫(kù)支持響應(yīng)式編程,我們的應(yīng)用響應(yīng)也會(huì)更加迅速。

 

作者簡(jiǎn)介:小姐姐味道 (xjjdog),一個(gè)不允許程序員走彎路的公眾號(hào)。聚焦基礎(chǔ)架構(gòu)和Linux。十年架構(gòu),日百億流量,與你探討高并發(fā)世界,給你不一樣的味道。

 

責(zé)任編輯:武曉燕 來(lái)源: 小姐姐味道
相關(guān)推薦

2023-04-06 09:42:00

LispHTMLQwit

2022-06-16 13:08:30

Combine響應(yīng)式編程訂閱

2020-04-16 15:20:43

PHP前端BIO

2022-04-16 16:52:24

Netty網(wǎng)絡(luò)服務(wù)器客戶端程序

2016-11-28 16:23:23

戴爾

2023-12-20 14:44:33

軟件開發(fā)DevOpsNoOps

2021-06-11 17:26:06

代碼Java網(wǎng)絡(luò)編程

2011-05-25 14:59:35

if elseswitch case

2020-10-10 19:37:27

BIO 、NIO 、A

2019-10-18 08:22:43

BIONIOAIO

2020-05-17 13:59:37

物聯(lián)網(wǎng)工業(yè)物聯(lián)網(wǎng)工業(yè)4.0

2023-07-11 08:40:02

IO模型后臺(tái)

2022-09-01 08:00:00

響應(yīng)式編程集成

2013-04-08 17:13:14

2017-09-12 15:26:44

2020-08-13 17:18:20

Kubernetes邊緣容器

2019-04-11 15:45:08

ReactMixin前端

2021-01-25 05:38:04

設(shè)計(jì)原理VueSubject

2023-06-26 07:39:10

2022-04-13 07:59:23

IOBIONIO
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)