自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java異步非阻塞編程的幾種方式

開發(fā) 開發(fā)工具
一個很簡單的業(yè)務(wù)邏輯,其他后端服務(wù)提供了一個接口,我們需要通過接口調(diào)用,獲取到響應(yīng)的數(shù)據(jù)。

 

一 從一個同步的Http調(diào)用說起

一個很簡單的業(yè)務(wù)邏輯,其他后端服務(wù)提供了一個接口,我們需要通過接口調(diào)用,獲取到響應(yīng)的數(shù)據(jù)。

逆地理接口:通過經(jīng)緯度獲取這個經(jīng)緯度所在的省市區(qū)縣以及響應(yīng)的code:

  1. curl-i"http://xxx?latitude=31.08966221524924&channel=amap7a&near=false&longitude=105.13990312814713" 
  2.  
  3. {"adcode":"510722"

服務(wù)端執(zhí)行,最簡單的同步調(diào)用方式:

 

服務(wù)端響應(yīng)之前,IO會阻塞在:java.net.SocketInputStream#socketRead0 的native方法上:

 

通過jstack日志,可以發(fā)現(xiàn),此時這個Thread會一直在runable的狀態(tài):

  1. "main"#1 prio=5 os_prio=31 tid=0x00007fed0c810000 nid=0x1003 runnable [0x000070000ce14000]   java.lang.Thread.State: RUNNABLE 
  2.         at java.net.SocketInputStream.socketRead0(Native Method) 
  3.         at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
  4.         at java.net.SocketInputStream.read(SocketInputStream.java:171) 
  5.         at java.net.SocketInputStream.read(SocketInputStream.java:141) 
  6.         at org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84) 
  7.         at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) 
  8.         at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153) 
  9.         at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282) 
  10.         at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138) 
  11.         at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56) 
  12.         at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259) 
  13.         at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163) 
  14.         at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165) 
  15.         at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273) 
  16.         at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125) 
  17.         at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) 
  18.         at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185) 
  19.         at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) 
  20.         at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) 
  21.         at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) 
  22.         at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) 
  23.         at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) 
  24.         at com.amap.aos.async.AsyncIO.blockingIO(AsyncIO.java:207) 
  25.                 ....... 

線程模型示例:

 

同步最大的問題是在IO等待的過程中,線程資源沒有得到充分的利用,對于大量IO場景的業(yè)務(wù)吞吐量會有一定限制。

二 JDK NIO & Future

在JDK 1.5 中,JUC提供了Future抽象:

 

 

 


 

 

當然并不是所有的Future都是這樣實現(xiàn)的,如 io.netty.util.concurrent.AbstractFuture 就是通過線程輪詢?nèi)ァ?/p>

這樣做的好處是,主線程可以不用等待IO響應(yīng),可以去做點其他的,比如說再發(fā)送一個IO請求,可以等到一起返回:

  1. "main"#1 prio=5 os_prio=31 tid=0x00007fd7a500b000 nid=0xe03 waiting on condition [0x000070000a95d000]   java.lang.Thread.State: WAITING (parking) 
  2.         at sun.misc.Unsafe.park(Native Method) 
  3. - parking to wait for  <0x000000076ee2d768> (a java.util.concurrent.CountDownLatch$Sync) 
  4.         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
  5.         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) 
  6.         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) 
  7.         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) 
  8.         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 
  9.         at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:162) 
  10.         at com.amap.aos.async.AsyncIO.futureBlockingGet(AsyncIO.java:201) 
  11.         ..... 
  12. "AsyncHttpClient-2-1"#11 prio=5 os_prio=31 tid=0x00007fd7a7247800 nid=0x340b runnable [0x000070000ba94000]   java.lang.Thread.State: RUNNABLE 
  13.         at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) 
  14.         at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198) 
  15.         at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117) 
  16.         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) 
  17. - locked <0x000000076eb00ef0> (a io.netty.channel.nio.SelectedSelectionKeySet) 
  18. - locked <0x000000076eb00f10> (a java.util.Collections$UnmodifiableSet) 
  19. - locked <0x000000076eb00ea0> (a sun.nio.ch.KQueueSelectorImpl) 
  20.         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) 
  21.         at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:693) 
  22.         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353) 
  23.         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) 
  24.         at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) 
  25.         at java.lang.Thread.run(Thread.java:748) 

 

主線程在等待結(jié)果返回過程中依然需要等待,沒有根本解決此問題。

三 使用Callback回調(diào)方式

第二節(jié)中,依然需要主線程等待,獲取結(jié)果,那么可不可以在主線程完成發(fā)送請求后,再也不用關(guān)心這個邏輯,去執(zhí)行其他的邏輯?那就可以使用Callback機制。

 

如此一來,主線程再也不需要關(guān)心發(fā)起IO后的業(yè)務(wù)邏輯,發(fā)送完請求后,就可以徹底去干其他事情,或者回到線程池中再供調(diào)度。如果是HttpServer,那么需要結(jié)合Servlet 3.1的異步Servlet。

 

 

 


 

 

異步Servelt參考資料

https://www.cnblogs.com/davenkin/p/async-servlet.html

使用Callback方式,從線程模型中看,發(fā)現(xiàn)線程資源已經(jīng)得到了比較充分的利用,整個過程中已經(jīng)沒有線程阻塞。

四 Callback hell

回調(diào)地獄,當Callback的線程還需要執(zhí)行下一個IO調(diào)用的時候,這個時候進入回調(diào)地獄模式。

典型的應(yīng)用場景如,通過經(jīng)緯度獲取行政區(qū)域adcode(逆地理接口),然后再根據(jù)獲得的adcode,獲取當?shù)氐奶鞖庑畔?天氣接口)。

在同步的編程模型中,幾乎不會涉及到此類問題。

 

Callback方式的核心缺陷

五 JDK 1.8 CompletableFuture

那么有沒有辦法解決Callback Hell的問題?當然有,JDK 1.8中提供了CompletableFuture,先看看它是怎么解決這個問題的。

將逆地理的Callback邏輯,封裝成一個獨立的CompletableFuture,當異步線程回調(diào)時,調(diào)用future.complete(T) ,將結(jié)果封裝。

 

將天氣執(zhí)行的Call邏輯,也封裝成為一個獨立的CompletableFuture ,完成之后,邏輯同上。

 

compose銜接,whenComplete輸出:

 

每一個IO操作,均可以封裝為獨立的CompletableFuture,從而避免回調(diào)地獄。

CompletableFuture,只有兩個屬性:

  • result:Future的執(zhí)行結(jié)果 (Either the result or boxed AltResult)。
  • stack:操作棧,用于定義這個Future接下來操作的行為 (Top of Treiber stack of dependent actions)。

weatherFuture這個方法是如何被調(diào)用的呢?

通過堆??梢园l(fā)現(xiàn),是在 reverseCodeFuture.complete(result) 的時候,并且也將獲得的adcode作為參數(shù)執(zhí)行接下來的邏輯。

 

這樣一來,就完美解決回調(diào)地獄問題,在主的邏輯中,看起來像是在同步的進行編碼。

六 Vert.x Future

Info-Service中,大量使用的 Vert.x Future 也是類似的解決的方案,不過設(shè)計上使用Handler的概念。

 

核心執(zhí)行的邏輯差不多:

 

這當然不是Vertx的全部,當然這是題外話了。

七 Reactive Streams

異步編程對吞吐量以及資源有好處,但是有沒有統(tǒng)一的抽象去解決此類問題內(nèi),答案是 Reactive Streams。

核心抽象:Publisher Subscriber Processor Subscription ,整個包里面,只有這四個接口,沒有實現(xiàn)類。

 

在JDK 9里面,已經(jīng)被作為一種規(guī)范封裝到 java.util.concurrent.Flow :

 

參考資料

https://www.baeldung.com/java-9-reactive-streams

http://ypk1226.com/2019/07/01/reactive/reactive-streams/

https://www.reactivemanifesto.org/

https://projectreactor.io/learn

一個簡單的例子:

 

 

 

 

八 Reactor & Spring 5 & Spring WebFlux

Flux & Mono

 

 

 


 

 

參考資料

https://projectreactor.io/docs/core/3.1.0.M3/reference/index.html

https://speakerdeck.com/simonbasle/projectreactor-dot-io-reactor3-intro

責(zé)任編輯:武曉燕 來源: 51CTO專欄
相關(guān)推薦

2021-08-02 11:13:28

人工智能機器學(xué)習(xí)技術(shù)

2021-05-07 16:19:36

異步編程Java線程

2015-07-03 10:12:04

編程同步非阻塞

2011-04-25 11:05:10

javascript

2022-09-22 10:51:32

服務(wù)端開發(fā)者異步非阻塞編程

2019-07-23 11:01:57

Python同步異步

2012-02-22 21:15:41

unixIO阻塞

2022-06-22 08:16:29

異步非阻塞框架

2016-11-28 09:08:43

java系統(tǒng)異步非阻塞

2012-10-10 10:00:27

同步異步開發(fā)Java

2024-12-02 00:57:17

非阻塞異步編程

2018-03-28 08:52:53

阻塞非阻塞I

2025-02-17 13:23:34

Python同步阻塞MySQL

2023-12-06 07:28:47

阻塞IO異步IO

2024-09-23 17:15:28

Python并發(fā)并行

2021-06-04 18:14:15

阻塞非阻塞tcp

2020-05-08 10:34:30

Spring非阻塞編程

2021-02-04 10:50:11

網(wǎng)絡(luò)安全非阻塞模Winsock編程

2021-01-10 11:21:33

JavaScript語言開發(fā)

2024-04-24 10:57:54

Golang編程
點贊
收藏

51CTO技術(shù)棧公眾號