自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java NIO2 AIO開(kāi)發(fā)核心流程

開(kāi)發(fā) 后端
按照《Unix網(wǎng)絡(luò)編程》的劃分,IO模型可以分為:阻塞IO、非阻塞IO、IO復(fù)用、信號(hào)驅(qū)動(dòng)IO和異步IO,按照POSIX標(biāo)準(zhǔn)來(lái)劃分只分為兩類(lèi):同步IO和異步IO。如何區(qū)分呢?首先一個(gè)IO操作其實(shí)分成了兩個(gè)步驟……

按照《Unix網(wǎng)絡(luò)編程》的劃分,IO模型可以分為:阻塞IO、非阻塞IO、IO復(fù)用、信號(hào)驅(qū)動(dòng)IO和異步IO,按照POSIX標(biāo)準(zhǔn)來(lái)劃分只分為兩類(lèi):同步IO和異步IO。如何區(qū)分呢?首先一個(gè)IO操作其實(shí)分成了兩個(gè)步驟:發(fā)起IO請(qǐng)求和實(shí)際的IO操作,同步IO和異步IO的區(qū)別就在于第二個(gè)步驟是否阻塞,如果實(shí)際的IO讀寫(xiě)阻塞請(qǐng)求進(jìn)程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO服用、信號(hào)驅(qū)動(dòng)IO都是同步IO,如果不阻塞,而是操作系統(tǒng)幫你做完IO操作再將結(jié)果返回給你,那么就是異步IO。阻塞IO和非阻塞IO的區(qū)別在于第一步,發(fā)起IO請(qǐng)求是否會(huì)被阻塞,如果阻塞直到完成那么就是傳統(tǒng)的阻塞IO,如果不阻塞,那么就是非阻塞IO。

Java nio 2.0的主要改進(jìn)就是引入了異步IO(包括文件和網(wǎng)絡(luò)),這里主要介紹下異步網(wǎng)絡(luò)IO API的使用以及框架的設(shè)計(jì),以TCP服務(wù)端為例。首先看下為了支持AIO引入的新的類(lèi)和接口:

java.nio.channels.AsynchronousChannel

標(biāo)記一個(gè)channel支持異步IO操作。

java.nio.channels.AsynchronousServerSocketChannel

ServerSocket的aio版本,創(chuàng)建TCP服務(wù)端,綁定地址,監(jiān)聽(tīng)端口等。

java.nio.channels.AsynchronousSocketChannel

面向流的異步socket channel,表示一個(gè)連接。

java.nio.channels.AsynchronousChannelGroup

異步channel的分組管理,目的是為了資源共享。一個(gè)AsynchronousChannelGroup綁定一個(gè)線(xiàn)程池,這個(gè)線(xiàn)程池執(zhí)行兩個(gè)任務(wù):處理IO事件和派發(fā)CompletionHandler。AsynchronousServerSocketChannel創(chuàng)建的時(shí)候可以傳入一個(gè) AsynchronousChannelGroup,那么通過(guò)AsynchronousServerSocketChannel創(chuàng)建的 AsynchronousSocketChannel將同屬于一個(gè)組,共享資源。

java.nio.channels.CompletionHandler

異步IO操作結(jié)果的回調(diào)接口,用于定義在IO操作完成后所作的回調(diào)工作。AIO的API允許兩種方式來(lái)處理異步操作的結(jié)果:返回的Future模式或者注冊(cè)CompletionHandler,我更推薦用CompletionHandler的方式,這些handler的調(diào)用是由 AsynchronousChannelGroup的線(xiàn)程池派發(fā)的。顯然,線(xiàn)程池的大小是性能的關(guān)鍵因素。AsynchronousChannelGroup允許綁定不同的線(xiàn)程池,通過(guò)三個(gè)靜態(tài)方法來(lái)創(chuàng)建:

  1. public static AsynchronousChannelGroup withFixedThreadPool(int nThreads,  
  2.                                                               ThreadFactory threadFactory)  
  3.        throws IOException  
  4.  
  5. public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,  
  6.                                                                int initialSize)  
  7.  
  8. public static AsynchronousChannelGroup withThreadPool(ExecutorService executor)  
  9.        throws IOException 

需要根據(jù)具體應(yīng)用相應(yīng)調(diào)整,從框架角度出發(fā),需要暴露這樣的配置選項(xiàng)給用戶(hù)。

在介紹完了aio引入的TCP的主要接口和類(lèi)之后,我們來(lái)設(shè)想下一個(gè)aio框架應(yīng)該怎么設(shè)計(jì)。參考非阻塞nio框架的設(shè)計(jì),一般都是采用Reactor模式,Reacot負(fù)責(zé)事件的注冊(cè)、select、事件的派發(fā);相應(yīng)地,異步IO有個(gè)Proactor模式,Proactor負(fù)責(zé) CompletionHandler的派發(fā),查看一個(gè)典型的IO寫(xiě)操作的流程來(lái)看兩者的區(qū)別:

Reactor: send(msg) -> 消息隊(duì)列是否為空,如果為空 -> 向Reactor注冊(cè)O(shè)P_WRITE,然后返回 -> Reactor select -> 觸發(fā)Writable,通知用戶(hù)線(xiàn)程去處理 ->先注銷(xiāo)Writable(很多人遇到的cpu 100%的問(wèn)題就在于沒(méi)有注銷(xiāo)),處理Writeable,如果沒(méi)有完全寫(xiě)入,繼續(xù)注冊(cè)O(shè)P_WRITE。注意到,寫(xiě)入的工作還是用戶(hù)線(xiàn)程在處理。

Proactor: send(msg) -> 消息隊(duì)列是否為空,如果為空,發(fā)起read異步調(diào)用,并注冊(cè)CompletionHandler,然后返回。 -> 操作系統(tǒng)負(fù)責(zé)將你的消息寫(xiě)入,并返回結(jié)果(寫(xiě)入的字節(jié)數(shù))給Proactor -> Proactor派發(fā)CompletionHandler。可見(jiàn),寫(xiě)入的工作是操作系統(tǒng)在處理,無(wú)需用戶(hù)線(xiàn)程參與。事實(shí)上在aio的API 中,AsynchronousChannelGroup就扮演了Proactor的角色。

CompletionHandler有三個(gè)方法,分別對(duì)應(yīng)于處理成功、失敗、被取消(通過(guò)返回的Future)情況下的回調(diào)處理:

  1. public interface CompletionHandler<V,A> {  
  2.  
  3.      void completed(V result, A attachment);  
  4.  
  5.     void failed(Throwable exc, A attachment);  
  6.  
  7.      
  8.     void cancelled(A attachment);  

 

 

其中的泛型參數(shù)V表示IO調(diào)用的結(jié)果,而A是發(fā)起調(diào)用時(shí)傳入的attchment。

在初步介紹完aio引入的類(lèi)和接口后,我們看看一個(gè)典型的tcp服務(wù)端是怎么啟動(dòng)的,怎么接受連接并處理讀和寫(xiě),這里引用的代碼都是yanf4j 的aio分支中的代碼,可以從svn checkout,svn地址: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio

第一步,創(chuàng)建一個(gè)AsynchronousServerSocketChannel,創(chuàng)建之前先創(chuàng)建一個(gè) AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel可以綁定一個(gè) AsynchronousChannelGroup,那么通過(guò)這個(gè)AsynchronousServerSocketChannel建立的連接都將同屬于一個(gè)AsynchronousChannelGroup并共享資源:

  1. this.asynchronousChannelGroup = AsynchronousChannelGroup  
  2.                     .withCachedThreadPool(Executors.newCachedThreadPool(),  
  3.                             this.threadPoolSize); 

 

然后初始化一個(gè)AsynchronousServerSocketChannel,通過(guò)open方法:

  1. this.serverSocketChannel = AsynchronousServerSocketChannel  
  2.                 .open(this.asynchronousChannelGroup); 

通過(guò)nio 2.0引入的SocketOption類(lèi)設(shè)置一些TCP選項(xiàng):

  1. this.serverSocketChannel  
  2.                     .setOption(  
  3.                             StandardSocketOption.SO_REUSEADDR,true);  
  4. this.serverSocketChannel  
  5.                     .setOption(  
  6.                             StandardSocketOption.SO_RCVBUF,16*1024); 

綁定本地地址:

  1. this.serverSocketChannel  
  2.                     .bind(new InetSocketAddress("localhost",8080), 100); 

其中的100用于指定等待連接的隊(duì)列大小(backlog)。完了嗎?還沒(méi)有,最重要的監(jiān)聽(tīng)工作還沒(méi)開(kāi)始,監(jiān)聽(tīng)端口是為了等待連接上來(lái)以便accept產(chǎn)生一個(gè)AsynchronousSocketChannel來(lái)表示一個(gè)新建立的連接,因此需要發(fā)起一個(gè)accept調(diào)用,調(diào)用是異步的,操作系統(tǒng)將在連接建立后,將最后的結(jié)果——AsynchronousSocketChannel返回給你

  1. public void pendingAccept() {  
  2.         if (this.started && this.serverSocketChannel.isOpen()) {  
  3.             this.acceptFuture = this.serverSocketChannel.accept(null,  
  4.                     new AcceptCompletionHandler());  
  5.  
  6.         } else {  
  7.             throw new IllegalStateException("Controller has been closed");  
  8.         }  
  9.     } 

注意,重復(fù)的accept調(diào)用將會(huì)拋出PendingAcceptException,后文提到的read和write也是如此。accept方法的第一個(gè)參數(shù)是你想傳給CompletionHandler的attchment,第二個(gè)參數(shù)就是注冊(cè)的用于回調(diào)的CompletionHandler,最后返回結(jié)果Future<AsynchronousSocketChannel>。你可以對(duì)future做處理,這里采用更推薦的方式就是注冊(cè)一個(gè)CompletionHandler。那么accept的CompletionHandler中做些什么工作呢?顯然一個(gè)赤裸裸的 AsynchronousSocketChannel是不夠的,我們需要將它封裝成session,一個(gè)session表示一個(gè)連接(mina里就叫 IoSession了),里面帶了一個(gè)緩沖的消息隊(duì)列以及一些其他資源等。在連接建立后,除非你的服務(wù)器只準(zhǔn)備接受一個(gè)連接,不然你需要在后面繼續(xù)調(diào)用pendingAccept來(lái)發(fā)起另一個(gè)accept請(qǐng)求:

  1. private final class AcceptCompletionHandler implements 
  2.             CompletionHandler<AsynchronousSocketChannel, Object> {  
  3.  
  4.         @Override 
  5.         public void cancelled(Object attachment) {  
  6.             logger.warn("Accept operation was canceled");  
  7.         }  
  8.  
  9.         @Override 
  10.         public void completed(AsynchronousSocketChannel socketChannel,  
  11.                 Object attachment) {  
  12.             try {  
  13.                 logger.debug("Accept connection from " 
  14.                         + socketChannel.getRemoteAddress());  
  15.                 configureChannel(socketChannel);  
  16.                 AioSessionConfig sessionConfig = buildSessionConfig(socketChannel);  
  17.                 Session session = new AioTCPSession(sessionConfig,  
  18.                         AioTCPController.this.configuration  
  19.                                 .getSessionReadBufferSize(),  
  20.                         AioTCPController.this.sessionTimeout);  
  21.                 session.start();  
  22.                 registerSession(session);  
  23.             } catch (Exception e) {  
  24.                 e.printStackTrace();  
  25.                 logger.error("Accept error", e);  
  26.                 notifyException(e);  
  27.             } finally {  
  28.                 <strong>pendingAccept</strong>();  
  29.             }  
  30.         }  
  31.  
  32.         @Override 
  33.         public void failed(Throwable exc, Object attachment) {  
  34.             logger.error("Accept error", exc);  
  35.             try {  
  36.                 notifyException(exc);  
  37.             } finally {  
  38.                 <strong>pendingAccept</strong>();  
  39.             }  
  40.         }  
  41.     } 

注意到了吧,我們?cè)趂ailed和completed方法中在最后都調(diào)用了pendingAccept來(lái)繼續(xù)發(fā)起accept調(diào)用,等待新的連接上來(lái)。有的同學(xué)可能要說(shuō)了,這樣搞是不是遞歸調(diào)用,會(huì)不會(huì)堆棧溢出?實(shí)際上不會(huì),因?yàn)榘l(fā)起accept調(diào)用的線(xiàn)程與CompletionHandler回調(diào)的線(xiàn)程并非同一個(gè),不是一個(gè)上下文中,兩者之間沒(méi)有耦合關(guān)系。要注意到,CompletionHandler的回調(diào)共用的是 AsynchronousChannelGroup綁定的線(xiàn)程池,因此千萬(wàn)別在CompletionHandler回調(diào)方法中調(diào)用阻塞或者長(zhǎng)時(shí)間的操作,例如sleep,回調(diào)方法最好能支持超時(shí),防止線(xiàn)程池耗盡。

連接建立后,怎么讀和寫(xiě)呢?回憶下在nonblocking nio框架中,連接建立后的第一件事是干什么?注冊(cè)O(shè)P_READ事件等待socket可讀。異步IO也同樣如此,連接建立后馬上發(fā)起一個(gè)異步read調(diào)用,等待socket可讀,這個(gè)是Session.start方法中所做的事情:

  1. public class AioTCPSession {  
  2.     protected void start0() {  
  3.         pendingRead();  
  4.     }  
  5.  
  6.     protected final void pendingRead() {  
  7.         if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {  
  8.             if (!this.readBuffer.hasRemaining()) {  
  9.                 this.readBuffer = ByteBufferUtils  
  10.                         .increaseBufferCapatity(this.readBuffer);  
  11.             }  
  12.             this.readFuture = this.asynchronousSocketChannel.read(  
  13.                     this.readBuffer, thisthis.readCompletionHandler);  
  14.         } else {  
  15.             throw new IllegalStateException(  
  16.                     "Session Or Channel has been closed");  
  17.         }  
  18.     }  
  19.      

AsynchronousSocketChannel的read調(diào)用與AsynchronousServerSocketChannel的accept調(diào)用類(lèi)似,同樣是非阻塞的,返回結(jié)果也是一個(gè)Future,但是寫(xiě)的結(jié)果是整數(shù),表示寫(xiě)入了多少字節(jié),因此read調(diào)用返回的是 Future<Integer>,方法的第一個(gè)參數(shù)是讀的緩沖區(qū),操作系統(tǒng)將IO讀到數(shù)據(jù)拷貝到這個(gè)緩沖區(qū),第二個(gè)參數(shù)是傳遞給 CompletionHandler的attchment,第三個(gè)參數(shù)就是注冊(cè)的用于回調(diào)的CompletionHandler。這里保存了read的結(jié)果Future,這是為了在關(guān)閉連接的時(shí)候能夠主動(dòng)取消調(diào)用,accept也是如此。現(xiàn)在可以看看read的CompletionHandler的實(shí)現(xiàn):

  1. public final class ReadCompletionHandler implements 
  2.         CompletionHandler<Integer, AbstractAioSession> {  
  3.  
  4.     private static final Logger log = LoggerFactory  
  5.             .getLogger(ReadCompletionHandler.class);  
  6.     protected final AioTCPController controller;  
  7.  
  8.     public ReadCompletionHandler(AioTCPController controller) {  
  9.         this.controller = controller;  
  10.     }  
  11.  
  12.     @Override 
  13.     public void cancelled(AbstractAioSession session) {  
  14.         log.warn("Session(" + session.getRemoteSocketAddress()  
  15.                 + ") read operation was canceled");  
  16.     }  
  17.  
  18.     @Override 
  19.     public void completed(Integer result, AbstractAioSession session) {  
  20.         if (log.isDebugEnabled())  
  21.             log.debug("Session(" + session.getRemoteSocketAddress()  
  22.                     + ") read +" + result + " bytes");  
  23.         if (result < 0) {  
  24.             session.close();  
  25.             return;  
  26.         }  
  27.         try {  
  28.             if (result > 0) {  
  29.                 session.updateTimeStamp();  
  30.                 session.getReadBuffer().flip();  
  31.                 session.decode();  
  32.                 session.getReadBuffer().compact();  
  33.             }  
  34.         } finally {  
  35.             try {  
  36.                 session.pendingRead();  
  37.             } catch (IOException e) {  
  38.                 session.onException(e);  
  39.                 session.close();  
  40.             }  
  41.         }  
  42.         controller.checkSessionTimeout();  
  43.     }  
  44.  
  45.     @Override 
  46.     public void failed(Throwable exc, AbstractAioSession session) {  
  47.         log.error("Session read error", exc);  
  48.         session.onException(exc);  
  49.         session.close();  
  50.     }  
  51.  

如果IO讀失敗,會(huì)返回失敗產(chǎn)生的異常,這種情況下我們就主動(dòng)關(guān)閉連接,通過(guò)session.close()方法,這個(gè)方法干了兩件事情:關(guān)閉channel和取消read調(diào)用:

  1. if (null != this.readFuture) {  
  2.             this.readFuture.cancel(true);  
  3.         }  
  4. this.asynchronousSocketChannel.close(); 

在讀成功的情況下,我們還需要判斷結(jié)果result是否小于0,如果小于0就表示對(duì)端關(guān)閉了,這種情況下我們也主動(dòng)關(guān)閉連接并返回。如果讀到一定字節(jié),也就是result大于0的情況下,我們就嘗試從讀緩沖區(qū)中decode出消息,并派發(fā)給業(yè)務(wù)處理器的回調(diào)方法,最終通過(guò)pendingRead繼續(xù)發(fā)起read調(diào)用等待socket的下一次可讀??梢?jiàn),我們并不需要自己去調(diào)用channel來(lái)進(jìn)行IO讀,而是操作系統(tǒng)幫你直接讀到了緩沖區(qū),然后給你一個(gè)結(jié)果表示讀入了多少字節(jié),你處理這個(gè)結(jié)果即可。而nonblocking IO框架中,是reactor通知用戶(hù)線(xiàn)程socket可讀了,然后用戶(hù)線(xiàn)程自己去調(diào)用read進(jìn)行實(shí)際讀操作。這里還有個(gè)需要注意的地方,就是decode出來(lái)的消息的派發(fā)給業(yè)務(wù)處理器工作最好交給一個(gè)線(xiàn)程池來(lái)處理,避免阻塞group綁定的線(xiàn)程池。

 

IO寫(xiě)的操作與此類(lèi)似,不過(guò)通常寫(xiě)的話(huà)我們會(huì)在session中關(guān)聯(lián)一個(gè)緩沖隊(duì)列來(lái)處理,沒(méi)有完全寫(xiě)入或者等待寫(xiě)入的消息都存放在隊(duì)列中,隊(duì)列為空的情況下發(fā)起write調(diào)用:

  1. protected void write0(WriteMessage message) {  
  2.       boolean needWrite = false;  
  3.       synchronized (this.writeQueue) {  
  4.           needWrite = this.writeQueue.isEmpty();  
  5.           this.writeQueue.offer(message);  
  6.       }  
  7.       if (needWrite) {  
  8.           pendingWrite(message);  
  9.       }  
  10.   }  
  11.  
  12.   protected final void pendingWrite(WriteMessage message) {  
  13.       message = preprocessWriteMessage(message);  
  14.       if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {  
  15.           this.asynchronousSocketChannel.write(message.getWriteBuffer(),  
  16.                   thisthis.writeCompletionHandler);  
  17.       } else {  
  18.           throw new IllegalStateException(  
  19.                   "Session Or Channel has been closed");  
  20.       }  
  21.   } 

write調(diào)用返回的結(jié)果與read一樣是一個(gè)Future<Integer>,而write的CompletionHandler處理的核心邏輯大概是這樣:

  1. @Override 
  2.     public void completed(Integer result, AbstractAioSession session) {  
  3.         if (log.isDebugEnabled())  
  4.             log.debug("Session(" + session.getRemoteSocketAddress()  
  5.                     + ") writen " + result + " bytes");  
  6.                   
  7.         WriteMessage writeMessage;  
  8.         Queue<WriteMessage> writeQueue = session.getWriteQueue();  
  9.         synchronized (writeQueue) {  
  10.             writeMessage = writeQueue.peek();  
  11.             if (writeMessage.getWriteBuffer() == null 
  12.                     || !writeMessage.getWriteBuffer().hasRemaining()) {  
  13.                 writeQueue.remove();  
  14.                 if (writeMessage.getWriteFuture() != null) {  
  15.                     writeMessage.getWriteFuture().setResult(Boolean.TRUE);  
  16.                 }  
  17.                 try {  
  18.                     session.getHandler().onMessageSent(session,  
  19.                             writeMessage.getMessage());  
  20.                 } catch (Exception e) {  
  21.                     session.onException(e);  
  22.                 }  
  23.                 writeMessage = writeQueue.peek();  
  24.             }  
  25.         }  
  26.         if (writeMessage != null) {  
  27.             try {  
  28.                 session.pendingWrite(writeMessage);  
  29.             } catch (IOException e) {  
  30.                 session.onException(e);  
  31.                 session.close();  
  32.             }  
  33.         }  
  34.     } 

compete方法中的result就是實(shí)際寫(xiě)入的字節(jié)數(shù),然后我們判斷消息的緩沖區(qū)是否還有剩余,如果沒(méi)有就將消息從隊(duì)列中移除,如果隊(duì)列中還有消息,那么繼續(xù)發(fā)起write調(diào)用。

重復(fù)一下,這里引用的代碼都是yanf4j aio分支中的源碼,感興趣的朋友可以直接check out出來(lái)看看: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio。

在引入了aio之后,java對(duì)于網(wǎng)絡(luò)層的支持已經(jīng)非常完善,該有的都有了,java也已經(jīng)成為服務(wù)器開(kāi)發(fā)的首選語(yǔ)言之一。java的弱項(xiàng)在于對(duì)內(nèi)存的管理上,由于這一切都交給了GC,因此在高性能的網(wǎng)絡(luò)服務(wù)器上還是Cpp的天下。java這種單一堆模型比之erlang的進(jìn)程內(nèi)堆模型還是有差距,很難做到高效的垃圾回收和細(xì)粒度的內(nèi)存管理。

這里僅僅是介紹了aio開(kāi)發(fā)的核心流程,對(duì)于一個(gè)網(wǎng)絡(luò)框架來(lái)說(shuō),還需要考慮超時(shí)的處理、緩沖buffer的處理、業(yè)務(wù)層和網(wǎng)絡(luò)層的切分、可擴(kuò)展性、性能的可調(diào)性以及一定的通用性要求。

原文鏈接:http://lxy2330.iteye.com/blog/1122849

【編輯推薦】

  1. Java NIO開(kāi)發(fā)實(shí)例
  2. Java NIO 聊天室實(shí)例
  3. 多線(xiàn)程N(yùn)IO客戶(hù)端實(shí)例
  4. 用nio實(shí)現(xiàn)Echo服務(wù)
  5. Java NIO 深入研究
責(zé)任編輯:林師授 來(lái)源: lxy2330的博客
相關(guān)推薦

2009-11-30 09:40:23

Java 7 NIO2HTTP Server

2020-03-11 10:09:57

JAVA詳解classpath

2009-12-14 10:44:51

Java 7NIO2

2020-04-16 15:20:43

PHP前端BIO

2022-04-16 16:52:24

Netty網(wǎng)絡(luò)服務(wù)器客戶(hù)端程序

2021-12-27 10:20:46

JavaNetty網(wǎng)絡(luò)

2011-12-15 11:19:08

JavaNIO

2023-07-11 08:40:02

IO模型后臺(tái)

2020-10-10 19:37:27

BIO 、NIO 、A

2021-06-11 17:26:06

代碼Java網(wǎng)絡(luò)編程

2023-06-26 07:39:10

2014-04-18 09:55:49

Tomcat 8NIO 2

2019-10-18 08:22:43

BIONIOAIO

2021-08-12 18:48:31

響應(yīng)式編程Bio

2020-01-16 10:19:12

JavaAIOBIO

2021-03-04 08:34:55

同步阻塞非阻塞

2011-12-15 09:55:47

javanio

2011-12-07 14:57:44

JavaNIO

2011-12-15 09:40:06

Javanio

2009-06-22 17:09:00

J2EE項(xiàng)目開(kāi)發(fā)流程
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)