一篇文章幫你徹底搞清楚“I/O多路復(fù)用”和“異步I/O”的前世今生
曾經(jīng)的VIP服務(wù)
在網(wǎng)絡(luò)的初期,網(wǎng)民很少,服務(wù)器完全無(wú)壓力,那時(shí)的技術(shù)也沒(méi)有現(xiàn)在先進(jìn),通常用一個(gè)線程來(lái)全程跟蹤處理一個(gè)請(qǐng)求。因?yàn)檫@樣最簡(jiǎn)單。
其實(shí)代碼實(shí)現(xiàn)大家都知道,就是服務(wù)器上有個(gè)ServerSocket在某個(gè)端口監(jiān)聽(tīng),接收到客戶端的連接后,會(huì)創(chuàng)建一個(gè)Socket,并把它交給一個(gè)線程進(jìn)行后續(xù)處理。
線程主要從Socket讀取客戶端傳過(guò)來(lái)的數(shù)據(jù),然后進(jìn)行業(yè)務(wù)處理,并把結(jié)果再寫(xiě)入Socket傳回客戶端。
由于網(wǎng)絡(luò)的原因,Socket創(chuàng)建后并不一定能立刻從它上面讀取數(shù)據(jù),可能需要等一段時(shí)間,此時(shí)線程也必須一直阻塞著。在向Socket寫(xiě)入數(shù)據(jù)時(shí),也可能會(huì)使線程阻塞。
這里準(zhǔn)備了一個(gè)示例,主要邏輯如下:
客戶端:創(chuàng)建20個(gè)Socket并連接到服務(wù)器上,再創(chuàng)建20個(gè)線程,每個(gè)線程負(fù)責(zé)一個(gè)Socket。
服務(wù)器端:接收到這20個(gè)連接,創(chuàng)建20個(gè)Socket,接著創(chuàng)建20個(gè)線程,每個(gè)線程負(fù)責(zé)一個(gè)Socket。
為了模擬服務(wù)器端的Socket在創(chuàng)建后不能立馬讀取數(shù)據(jù),讓客戶端的20個(gè)線程分別休眠5-10之間的一個(gè)隨機(jī)秒數(shù)。
客戶端的20個(gè)線程會(huì)在第5秒到第10秒這段時(shí)間內(nèi)陸陸續(xù)續(xù)的向服務(wù)器端發(fā)送數(shù)據(jù),服務(wù)器端的20個(gè)線程也會(huì)陸陸續(xù)續(xù)接收到數(shù)據(jù)。
- /**
- * @author lixinjie
- * @since 2019-05-07
- */
- public class BioServer {
- static AtomicInteger counter = new AtomicInteger(0);
- static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
- public static void main(String[] args) {
- try {
- ServerSocket ss = new ServerSocket();
- ss.bind(new InetSocketAddress("localhost", 8080));
- while (true) {
- Socket s = ss.accept();
- processWithNewThread(s);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- static void processWithNewThread(Socket s) {
- Runnable run = () -> {
- InetSocketAddress rsa = (InetSocketAddress)s.getRemoteSocketAddress();
- System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + counter.incrementAndGet());
- try {
- String result = readBytes(s.getInputStream());
- System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.getAndDecrement());
- s.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- };
- new Thread(run).start();
- }
- static String readBytes(InputStream is) throws Exception {
- long start = 0;
- int total = 0;
- int count = 0;
- byte[] bytes = new byte[1024];
- //開(kāi)始讀數(shù)據(jù)的時(shí)間
- long begin = System.currentTimeMillis();
- while ((count = is.read(bytes)) > -1) {
- if (start < 1) {
- //第一次讀到數(shù)據(jù)的時(shí)間
- start = System.currentTimeMillis();
- }
- total += count;
- }
- //讀完數(shù)據(jù)的時(shí)間
- long end = System.currentTimeMillis();
- return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs";
- }
- static String time() {
- return sdf.format(new Date());
- }
- }
- /**
- * @author lixinjie
- * @since 2019-05-07
- */
- public class Client {
- public static void main(String[] args) {
- try {
- for (int i = 0; i < 20; i++) {
- Socket s = new Socket();
- s.connect(new InetSocketAddress("localhost", 8080));
- processWithNewThread(s, i);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- static void processWithNewThread(Socket s, int i) {
- Runnable run = () -> {
- try {
- //睡眠隨機(jī)的5-10秒,模擬數(shù)據(jù)尚未就緒
- Thread.sleep((new Random().nextInt(6) + 5) * 1000);
- //寫(xiě)1M數(shù)據(jù),為了拉長(zhǎng)服務(wù)器端讀數(shù)據(jù)的過(guò)程
- s.getOutputStream().write(prepareBytes());
- //睡眠1秒,讓服務(wù)器端把數(shù)據(jù)讀完
- Thread.sleep(1000);
- s.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- };
- new Thread(run).start();
- }
- static byte[] prepareBytes() {
- byte[] bytes = new byte[1024*1024*1];
- for (int i = 0; i < bytes.length; i++) {
- bytes[i] = 1;
- }
- return bytes;
- }
- }
執(zhí)行結(jié)果如下:
- 時(shí)間->IP:Port->線程Id:當(dāng)前線程數(shù)
- 15:11:52->127.0.0.1:55201->10:1
- 15:11:52->127.0.0.1:55203->12:2
- 15:11:52->127.0.0.1:55204->13:3
- 15:11:52->127.0.0.1:55207->16:4
- 15:11:52->127.0.0.1:55208->17:5
- 15:11:52->127.0.0.1:55202->11:6
- 15:11:52->127.0.0.1:55205->14:7
- 15:11:52->127.0.0.1:55206->15:8
- 15:11:52->127.0.0.1:55209->18:9
- 15:11:52->127.0.0.1:55210->19:10
- 15:11:52->127.0.0.1:55213->22:11
- 15:11:52->127.0.0.1:55214->23:12
- 15:11:52->127.0.0.1:55217->26:13
- 15:11:52->127.0.0.1:55211->20:14
- 15:11:52->127.0.0.1:55218->27:15
- 15:11:52->127.0.0.1:55212->21:16
- 15:11:52->127.0.0.1:55215->24:17
- 15:11:52->127.0.0.1:55216->25:18
- 15:11:52->127.0.0.1:55219->28:19
- 15:11:52->127.0.0.1:55220->29:20
- 時(shí)間->等待數(shù)據(jù)的時(shí)間,讀取數(shù)據(jù)的時(shí)間,總共讀取的字節(jié)數(shù)->線程Id:當(dāng)前線程數(shù)
- 15:11:58->wait=5012ms,read=1022ms,total=1048576bs->17:20
- 15:11:58->wait=5021ms,read=1022ms,total=1048576bs->13:19
- 15:11:58->wait=5034ms,read=1008ms,total=1048576bs->11:18
- 15:11:58->wait=5046ms,read=1003ms,total=1048576bs->12:17
- 15:11:58->wait=5038ms,read=1005ms,total=1048576bs->23:16
- 15:11:58->wait=5037ms,read=1010ms,total=1048576bs->22:15
- 15:11:59->wait=6001ms,read=1017ms,total=1048576bs->15:14
- 15:11:59->wait=6016ms,read=1013ms,total=1048576bs->27:13
- 15:11:59->wait=6011ms,read=1018ms,total=1048576bs->24:12
- 15:12:00->wait=7005ms,read=1008ms,total=1048576bs->20:11
- 15:12:00->wait=6999ms,read=1020ms,total=1048576bs->14:10
- 15:12:00->wait=7019ms,read=1007ms,total=1048576bs->26:9
- 15:12:00->wait=7012ms,read=1015ms,total=1048576bs->21:8
- 15:12:00->wait=7023ms,read=1008ms,total=1048576bs->25:7
- 15:12:01->wait=7999ms,read=1011ms,total=1048576bs->18:6
- 15:12:02->wait=9026ms,read=1014ms,total=1048576bs->10:5
- 15:12:02->wait=9005ms,read=1031ms,total=1048576bs->19:4
- 15:12:03->wait=10007ms,read=1011ms,total=1048576bs->16:3
- 15:12:03->wait=10006ms,read=1017ms,total=1048576bs->29:2
- 15:12:03->wait=10010ms,read=1022ms,total=1048576bs->28:1
可以看到服務(wù)器端確實(shí)為每個(gè)連接創(chuàng)建一個(gè)線程,共創(chuàng)建了20個(gè)線程。
客戶端進(jìn)入休眠約5-10秒,模擬連接上數(shù)據(jù)不就緒,服務(wù)器端線程在等待,等待時(shí)間約5-10秒。
客戶端陸續(xù)結(jié)束休眠,往連接上寫(xiě)入1M數(shù)據(jù),服務(wù)器端開(kāi)始讀取數(shù)據(jù),整個(gè)讀取過(guò)程約1秒。
可以看到,服務(wù)器端的工作線程會(huì)把時(shí)間花在“等待數(shù)據(jù)”和“讀取數(shù)據(jù)”這兩個(gè)過(guò)程上。
這有兩個(gè)不好的地方:
一是有很多客戶端同時(shí)發(fā)起請(qǐng)求的話,服務(wù)器端要?jiǎng)?chuàng)建很多的線程,可能會(huì)因?yàn)槌^(guò)了上限而造成崩潰。
二是每個(gè)線程的大部分時(shí)光中都是在阻塞著,無(wú)事可干,造成極大的資源浪費(fèi)。
開(kāi)頭已經(jīng)說(shuō)了那個(gè)年代網(wǎng)民很少,所以,不可能會(huì)有大量請(qǐng)求同時(shí)過(guò)來(lái)。至于資源浪費(fèi)就浪費(fèi)吧,反正閑著也是閑著。
來(lái)個(gè)簡(jiǎn)單的小例子:
飯店共有10張桌子,且配備了10位服務(wù)員。只要有客人來(lái)了,大堂經(jīng)理就把客人帶到一張桌子,并安排一位服務(wù)員全程陪同。
即使客人暫時(shí)不需要服務(wù),服務(wù)員也一直在旁邊站著。可能覺(jué)著是一種浪費(fèi),其實(shí)非也,這就是尊貴的VIP服務(wù)。
其實(shí),VIP映射的是一對(duì)一的模型,主要體現(xiàn)在“專用”上或“私有”上。
真正的多路復(fù)用技術(shù)
多路復(fù)用技術(shù)原本指的是,在通信方面,多種信號(hào)或數(shù)據(jù)(從宏觀上看)交織在一起,使用同一條傳輸通道進(jìn)行傳輸。
這樣做的目的,一方面可以充分利用通道的傳輸能力,另一方面自然是省時(shí)省力省錢(qián)啦。
其實(shí)這個(gè)概念非常的“生活化”,隨手就可以舉個(gè)例子:
一條小水渠里水在流,在一端往里倒入大量乒乓球,在另一端用網(wǎng)進(jìn)行過(guò)濾,把乒乓球和水流分開(kāi)。
這就是一個(gè)比較“土”的多路復(fù)用,首先在發(fā)射端把多種信號(hào)或數(shù)據(jù)進(jìn)行“混合”,接著是在通道上進(jìn)行傳輸,最后在接收端“分離”出自己需要的信號(hào)或數(shù)據(jù)。
相信大家都看出來(lái)了,這里的重點(diǎn)其實(shí)就是處理好“混合”和“分離”,對(duì)于不同的信號(hào)或數(shù)據(jù),有不同的處理方法。
比如以前的有線電視是模擬信號(hào),即電磁波。一家一般只有一根信號(hào)線,但可以同時(shí)接多個(gè)電視,每個(gè)電視任意換臺(tái),互不影響。
這是由于不同頻率的波可以混合和分離。(當(dāng)然,可能不是十分準(zhǔn)確,明白意思就行了。)
再比如城市的高鐵站一般都有數(shù)個(gè)站臺(tái)供高鐵(同時(shí))??浚鞘虚g的高鐵軌道單方向只有一條,如何保證那么多趟高鐵安全運(yùn)行呢?
很明顯是分時(shí)使用,每趟高鐵都有自己的時(shí)刻。多趟高鐵按不同的時(shí)刻出站相當(dāng)于混合,按不同的時(shí)刻進(jìn)站相當(dāng)于分離。
總結(jié)一下,多路指的是多種不同的信號(hào)或數(shù)據(jù)或其它事物,復(fù)用指的是共用同一個(gè)物理鏈路或通道或載體。
可見(jiàn),多路復(fù)用技術(shù)是一種一對(duì)多的模型,“多”的這一方復(fù)用了“一”的這一方。
其實(shí),一對(duì)多的模型主要體現(xiàn)在“公用”上或“共享”上。
您先看著,我一會(huì)再過(guò)來(lái)
一對(duì)一服務(wù)是典型的有錢(qián)任性,雖然響應(yīng)及時(shí)、服務(wù)周到,但不是每個(gè)人都能享受的,畢竟還是“屌絲”多嘛,那就來(lái)個(gè)共享服務(wù)吧。
所以實(shí)際當(dāng)中更多的情況是,客人坐下后,會(huì)給他一個(gè)菜單,讓他先看著,反正也不可能立馬點(diǎn)餐,服務(wù)員就去忙別的了。
可能不時(shí)的會(huì)有服務(wù)員從客人身旁經(jīng)過(guò),發(fā)現(xiàn)客人還沒(méi)有點(diǎn)餐,就會(huì)主動(dòng)去詢問(wèn)現(xiàn)在需要點(diǎn)餐嗎?
如果需要,服務(wù)員就給你寫(xiě)菜單,如果不需要,服務(wù)員就繼續(xù)往前走了。
這種情況飯店整體運(yùn)行的也很好,但是服務(wù)員人數(shù)少多了。現(xiàn)在服務(wù)10桌客人,4個(gè)服務(wù)員綽綽有余。(這節(jié)省的可都是純利潤(rùn)呀。)
因?yàn)?0桌客人同時(shí)需要服務(wù)的情況幾乎是不會(huì)發(fā)生的,絕大部分情況都是錯(cuò)開(kāi)的。如果真有的話,那就等會(huì)好了,又不是120/119,人命關(guān)天的。
回到代碼里,情況與之非常相似,完全可以采用相同的理論去處理。
連接建立后,找個(gè)地方把它放到那里,可以暫時(shí)先不管它,反正此時(shí)也沒(méi)有數(shù)據(jù)可讀。
但是數(shù)據(jù)早晚會(huì)到來(lái)的,所以,要不時(shí)的去詢問(wèn)每個(gè)連接有數(shù)據(jù)沒(méi)有,有的話就讀取數(shù)據(jù),沒(méi)有的話就繼續(xù)不管它。
其實(shí)這個(gè)模式在Java里早就有了,就是Java NIO,這里的大寫(xiě)字母“N”是單詞“New”,即“新”的意思,主要是為了和上面的“一對(duì)一”進(jìn)行區(qū)分。
先鋪墊一下吧
現(xiàn)在需要把Socket交互的過(guò)程再稍微細(xì)化一些。客戶端先請(qǐng)求連接,connect,服務(wù)器端然后接受連接,accept,然后客戶端再向連接寫(xiě)入數(shù)據(jù),write,接著服務(wù)器端從連接上讀出數(shù)據(jù),read。
和打電話的場(chǎng)景一樣,主叫撥號(hào),connect,被叫接聽(tīng),accept,主叫說(shuō)話,speak,被叫聆聽(tīng),listen。主叫給被叫打電話,說(shuō)明主叫找被叫有事,所以被叫關(guān)注的是接通電話,聽(tīng)對(duì)方說(shuō)。
客戶端主動(dòng)向服務(wù)器端發(fā)起請(qǐng)求,說(shuō)明客戶端找服務(wù)器端有事,所以服務(wù)器端關(guān)注的是接受請(qǐng)求,讀取對(duì)方傳來(lái)的數(shù)據(jù)。這里把接受請(qǐng)求,讀取數(shù)據(jù)稱為服務(wù)器端感興趣的操作。
在Java NIO中,接受請(qǐng)求的操作,用OP_ACCEPT表示,讀取數(shù)據(jù)的操作,用OP_READ表示。
我決定先過(guò)一遍飯店的場(chǎng)景,讓首次接觸Java NIO的同學(xué)不那么迷茫。就是把常規(guī)的場(chǎng)景進(jìn)行了定向整理,稍微有點(diǎn)刻意,明白意思就行了。
1、專門(mén)設(shè)立一個(gè)“跑腿”服務(wù)員,工作職責(zé)單一,就是問(wèn)問(wèn)客人是否需要服務(wù)。
2、站在門(mén)口接待客人,本來(lái)是大堂經(jīng)理的工作,但是他不愿意在門(mén)口盯著,于是就委托給跑腿服務(wù)員,你幫我盯著,有人來(lái)了告訴我。
于是跑腿服務(wù)員就有了一個(gè)任務(wù),替大堂經(jīng)理盯梢。終于來(lái)客人了,跑腿服務(wù)員趕緊告訴了大堂經(jīng)理。
3、大堂經(jīng)理把客人帶到座位上,對(duì)跑腿服務(wù)員說(shuō),客人接下來(lái)肯定是要點(diǎn)餐的,但是現(xiàn)在在看菜單,不知道什么時(shí)候能看好,所以你不時(shí)的過(guò)來(lái)問(wèn)問(wèn),看需不需要點(diǎn)餐,需要的話就再喊來(lái)一個(gè)“點(diǎn)餐”服務(wù)員給客人寫(xiě)菜單。
于是跑腿服務(wù)員就又多了一個(gè)任務(wù),就是盯著這桌客人,不時(shí)來(lái)問(wèn)問(wèn),如果需要服務(wù)的話,就叫點(diǎn)餐服務(wù)員過(guò)來(lái)服務(wù)。
4、跑腿服務(wù)員在某次詢問(wèn)中,客人終于決定點(diǎn)餐了,跑題服務(wù)員趕緊找來(lái)一個(gè)點(diǎn)餐服務(wù)員為客人寫(xiě)菜單。
5、就這樣,跑腿服務(wù)員既要盯著門(mén)外新過(guò)來(lái)的客人,也要盯著門(mén)內(nèi)已經(jīng)就坐的客人。新客人來(lái)了,通知大堂經(jīng)理去接待。就坐的客人決定點(diǎn)餐了,通知點(diǎn)餐服務(wù)員去寫(xiě)菜單。
事情就這樣一直循環(huán)的持續(xù)下去,一切,都挺好。角色明確,職責(zé)單一,配合很好。
大堂經(jīng)理和點(diǎn)餐服務(wù)員是需求的提供者或?qū)崿F(xiàn)者,跑腿服務(wù)員是需求的發(fā)現(xiàn)者,并識(shí)別出需求的種類,需要接待的交給大堂經(jīng)理,需要點(diǎn)餐的交給點(diǎn)餐服務(wù)員。
哈哈,Java NIO來(lái)啦
代碼的寫(xiě)法非常的固定,可以配合著后面的解說(shuō)來(lái)看,這樣就好理解了,如下:
- /**
- * @author lixinjie
- * @since 2019-05-07
- */
- public class NioServer {
- static int clientCount = 0;
- static AtomicInteger counter = new AtomicInteger(0);
- static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
- public static void main(String[] args) {
- try {
- Selector selector = Selector.open();
- ServerSocketChannel ssc = ServerSocketChannel.open();
- ssc.configureBlocking(false);
- ssc.register(selector, SelectionKey.OP_ACCEPT);
- ssc.bind(new InetSocketAddress("localhost", 8080));
- while (true) {
- selector.select();
- Set<SelectionKey> keys = selector.selectedKeys();
- Iterator<SelectionKey> iterator = keys.iterator();
- while (iterator.hasNext()) {
- SelectionKey key = iterator.next();
- iterator.remove();
- if (key.isAcceptable()) {
- ServerSocketChannel ssc1 = (ServerSocketChannel)key.channel();
- SocketChannel sc = null;
- while ((sc = ssc1.accept()) != null) {
- sc.configureBlocking(false);
- sc.register(selector, SelectionKey.OP_READ);
- InetSocketAddress rsa = (InetSocketAddress)sc.socket().getRemoteSocketAddress();
- System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount));
- }
- } else if (key.isReadable()) {
- //先將“讀”從感興趣操作移出,待把數(shù)據(jù)從通道中讀完后,再把“讀”添加到感興趣操作中
- //否則,該通道會(huì)一直被選出來(lái)
- key.interestOps(key.interestOps() & (~ SelectionKey.OP_READ));
- processWithNewThread((SocketChannel)key.channel(), key);
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- static void processWithNewThread(SocketChannel sc, SelectionKey key) {
- Runnable run = () -> {
- counter.incrementAndGet();
- try {
- String result = readBytes(sc);
- //把“讀”加進(jìn)去
- key.interestOps(key.interestOps() | SelectionKey.OP_READ);
- System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.get());
- sc.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- counter.decrementAndGet();
- };
- new Thread(run).start();
- }
- static String readBytes(SocketChannel sc) throws Exception {
- long start = 0;
- int total = 0;
- int count = 0;
- ByteBuffer bb = ByteBuffer.allocate(1024);
- //開(kāi)始讀數(shù)據(jù)的時(shí)間
- long begin = System.currentTimeMillis();
- while ((count = sc.read(bb)) > -1) {
- if (start < 1) {
- //第一次讀到數(shù)據(jù)的時(shí)間
- start = System.currentTimeMillis();
- }
- total += count;
- bb.clear();
- }
- //讀完數(shù)據(jù)的時(shí)間
- long end = System.currentTimeMillis();
- return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs";
- }
- static String time() {
- return sdf.format(new Date());
- }
- }
它的大致處理過(guò)程如下:
1、定義一個(gè)選擇器,Selector。
相當(dāng)于設(shè)立一個(gè)跑腿服務(wù)員。
2、定義一個(gè)服務(wù)器端套接字通道,ServerSocketChannel,并配置為非阻塞的。
相等于聘請(qǐng)了一位大堂經(jīng)理。
3、將套接字通道注冊(cè)到選擇器上,并把感興趣的操作設(shè)置為OP_ACCEPT。
相當(dāng)于大堂經(jīng)理給跑腿服務(wù)員說(shuō),幫我盯著門(mén)外,有客人來(lái)了告訴我。
4、進(jìn)入死循環(huán),選擇器不時(shí)的進(jìn)行選擇。
相當(dāng)于跑腿服務(wù)員一遍又一遍的去詢問(wèn)、去轉(zhuǎn)悠。
5、選擇器終于選擇出了通道,發(fā)現(xiàn)通道是需要Acceptable的。
相當(dāng)于跑腿服務(wù)員終于發(fā)現(xiàn)門(mén)外來(lái)客人了,客人是需要接待的。
6、于是服務(wù)器端套接字接受了這個(gè)通道,開(kāi)始處理。
相當(dāng)于跑腿服務(wù)員把大堂經(jīng)理叫來(lái)了,大堂經(jīng)理開(kāi)始著手接待。
7、把新接受的通道配置為非阻塞的,并把它也注冊(cè)到了選擇器上,該通道感興趣的操作為OP_READ。
相當(dāng)于大堂經(jīng)理把客人帶到座位上,給了客人菜單,并又把客人委托給跑腿服務(wù)員,說(shuō)客人接下來(lái)肯定是要點(diǎn)餐的,你不時(shí)的來(lái)問(wèn)問(wèn)。
8、選擇器繼續(xù)不時(shí)的進(jìn)行選擇著。
相當(dāng)于跑腿服務(wù)員繼續(xù)不時(shí)的詢問(wèn)著、轉(zhuǎn)悠著。
9、選擇器終于又選擇出了通道,這次發(fā)現(xiàn)通道是需要Readable的。
相當(dāng)于跑腿服務(wù)員終于發(fā)現(xiàn)了一桌客人有了需求,是需要點(diǎn)餐的。
10、把這個(gè)通道交給了一個(gè)新的工作線程去處理。
相當(dāng)于跑腿服務(wù)員叫來(lái)了點(diǎn)餐服務(wù)員,點(diǎn)餐服務(wù)員開(kāi)始為客人寫(xiě)菜單。
11、這個(gè)工作線程處理完后,就被回收了,可以再去處理其它通道。
相當(dāng)于點(diǎn)餐服務(wù)員寫(xiě)好菜單后,就走了,可以再去為其他客人寫(xiě)菜單。
12、選擇器繼續(xù)著重復(fù)的選擇工作,不知道什么時(shí)候是個(gè)頭。
相當(dāng)于跑腿服務(wù)員繼續(xù)著重復(fù)的詢問(wèn)、轉(zhuǎn)悠,不知道未來(lái)在何方。
相信你已經(jīng)看出來(lái)了,大堂經(jīng)理相當(dāng)于服務(wù)器端套接字,跑腿服務(wù)員相當(dāng)于選擇器,點(diǎn)餐服務(wù)員相當(dāng)于Worker線程。
啟動(dòng)服務(wù)器端代碼,使用同一個(gè)客戶端代碼,按相同的套路發(fā)20個(gè)請(qǐng)求,結(jié)果如下:
- 時(shí)間->IP:Port->主線程Id:當(dāng)前連接數(shù)
- 16:34:39->127.0.0.1:56105->1:1
- 16:34:39->127.0.0.1:56106->1:2
- 16:34:39->127.0.0.1:56107->1:3
- 16:34:39->127.0.0.1:56108->1:4
- 16:34:39->127.0.0.1:56109->1:5
- 16:34:39->127.0.0.1:56110->1:6
- 16:34:39->127.0.0.1:56111->1:7
- 16:34:39->127.0.0.1:56112->1:8
- 16:34:39->127.0.0.1:56113->1:9
- 16:34:39->127.0.0.1:56114->1:10
- 16:34:39->127.0.0.1:56115->1:11
- 16:34:39->127.0.0.1:56116->1:12
- 16:34:39->127.0.0.1:56117->1:13
- 16:34:39->127.0.0.1:56118->1:14
- 16:34:39->127.0.0.1:56119->1:15
- 16:34:39->127.0.0.1:56120->1:16
- 16:34:39->127.0.0.1:56121->1:17
- 16:34:39->127.0.0.1:56122->1:18
- 16:34:39->127.0.0.1:56123->1:19
- 16:34:39->127.0.0.1:56124->1:20
- 時(shí)間->等待數(shù)據(jù)的時(shí)間,讀取數(shù)據(jù)的時(shí)間,總共讀取的字節(jié)數(shù)->線程Id:當(dāng)前線程數(shù)
- 16:34:45->wait=1ms,read=1018ms,total=1048576bs->11:5
- 16:34:45->wait=0ms,read=1054ms,total=1048576bs->10:5
- 16:34:45->wait=0ms,read=1072ms,total=1048576bs->13:6
- 16:34:45->wait=0ms,read=1061ms,total=1048576bs->14:5
- 16:34:45->wait=0ms,read=1140ms,total=1048576bs->12:4
- 16:34:46->wait=0ms,read=1001ms,total=1048576bs->15:5
- 16:34:46->wait=0ms,read=1062ms,total=1048576bs->17:6
- 16:34:46->wait=0ms,read=1059ms,total=1048576bs->16:5
- 16:34:47->wait=0ms,read=1001ms,total=1048576bs->19:4
- 16:34:47->wait=0ms,read=1001ms,total=1048576bs->20:4
- 16:34:47->wait=0ms,read=1015ms,total=1048576bs->18:3
- 16:34:47->wait=0ms,read=1001ms,total=1048576bs->21:2
- 16:34:48->wait=0ms,read=1032ms,total=1048576bs->22:4
- 16:34:49->wait=0ms,read=1002ms,total=1048576bs->23:3
- 16:34:49->wait=0ms,read=1001ms,total=1048576bs->25:2
- 16:34:49->wait=0ms,read=1028ms,total=1048576bs->24:4
- 16:34:50->wait=0ms,read=1008ms,total=1048576bs->28:4
- 16:34:50->wait=0ms,read=1033ms,total=1048576bs->27:3
- 16:34:50->wait=1ms,read=1002ms,total=1048576bs->29:2
- 16:34:50->wait=0ms,read=1001ms,total=1048576bs->26:2
服務(wù)器端接受20個(gè)連接,創(chuàng)建20個(gè)通道,并把它們注冊(cè)到選擇器上,此時(shí)不需要額外線程。
當(dāng)某個(gè)通道已經(jīng)有數(shù)據(jù)時(shí),才會(huì)用一個(gè)線程來(lái)處理它,所以,線程“等待數(shù)據(jù)”的時(shí)間是0,“讀取數(shù)據(jù)”的時(shí)間還是約1秒。
因?yàn)?0個(gè)通道是陸陸續(xù)續(xù)有數(shù)據(jù)的,所以服務(wù)器端最多時(shí)是6個(gè)線程在同時(shí)運(yùn)行的,換句話說(shuō),用包含6個(gè)線程的線程池就可以了。
對(duì)比與結(jié)論:
處理同樣的20個(gè)請(qǐng)求,一個(gè)需要用20個(gè)線程,一個(gè)需要用6個(gè)線程,節(jié)省了70%線程數(shù)。
在本例中,兩種感興趣的操作共用一個(gè)選擇器,且選擇器運(yùn)行在主線程里,Worker線程是新的線程。
其實(shí)對(duì)于選擇器的個(gè)數(shù)、選擇器運(yùn)行在哪個(gè)線程里、是否使用新的線程來(lái)處理請(qǐng)求都沒(méi)有要求,要根據(jù)實(shí)際情況來(lái)定。
比如說(shuō)redis,和處理請(qǐng)求相關(guān)的就一個(gè)線程,選擇器運(yùn)行在里面,處理請(qǐng)求的程序也運(yùn)行在里面,所以這個(gè)線程既是I/O線程,也是Worker線程。
當(dāng)然,也可以使用兩個(gè)選擇器,一個(gè)處理OP_ACCEPT,一個(gè)處理OP_READ,讓它們分別運(yùn)行在兩個(gè)單獨(dú)的I/O線程里。對(duì)于能快速完成的操作可以直接在I/O線程里做了,對(duì)于非常耗時(shí)的操作一定要使用Worker線程池來(lái)處理。
這種處理模式就是被稱為的多路復(fù)用I/O,多路指的是多個(gè)Socket通道,復(fù)用指的是只用一個(gè)線程來(lái)管理它們。
再稍微分析一下
一對(duì)一的形式,一個(gè)桌子配一個(gè)服務(wù)員,一個(gè)Socket分配一個(gè)線程,響應(yīng)速度最快,畢竟是VIP嘛,但是效率很低,服務(wù)員大部分時(shí)間都是在站著,線程大部分時(shí)間都是在等待。
多路復(fù)用的形式,所有桌子共用一個(gè)跑腿服務(wù)員,所有Socket共用一個(gè)選擇器線程,響應(yīng)速度肯定變慢了,畢竟是一對(duì)多嘛。但是效率提高了,點(diǎn)餐服務(wù)員在需要點(diǎn)餐時(shí)才會(huì)過(guò)去,工作線程在數(shù)據(jù)就緒時(shí)才會(huì)開(kāi)始工作。
從VIP到多路復(fù)用,形式上確實(shí)有很大的不同,其本質(zhì)是從一對(duì)一到一對(duì)多的轉(zhuǎn)變,其實(shí)就是犧牲了響應(yīng)速度,換來(lái)了效率的提升,不過(guò)綜合性能還是得到了極大的改進(jìn)。
就飯店而言,究竟幾張桌子配一個(gè)跑腿服務(wù)員,幾張桌子配一個(gè)點(diǎn)餐服務(wù)員,經(jīng)過(guò)一段時(shí)間運(yùn)行,一定會(huì)有一個(gè)最優(yōu)解。
就程序而言,究竟需要幾個(gè)選擇器線程,幾個(gè)工作線程,經(jīng)過(guò)評(píng)估測(cè)試后,也會(huì)有一個(gè)最優(yōu)解。
一旦達(dá)到最優(yōu)解后,就不可能再提升了,這同樣是由多路復(fù)用這種一對(duì)多的形式所限制的。就像一對(duì)一的形式限制一樣。
人們的追求是無(wú)止境的,如何對(duì)多路復(fù)用繼續(xù)提升呢?答案一定是具有顛覆性的,即拋棄多路復(fù)用,采用全新的形式。
還以飯店為例,如何在最優(yōu)解的情況下,既要繼續(xù)減少服務(wù)員數(shù)量,還要使效率提升呢?可能有些朋友已經(jīng)猜到了,那就是拋棄服務(wù)員服務(wù)客人這種模式,把飯店改成自助餐廳。
在客人進(jìn)門(mén)時(shí),把餐具給他,并告訴他就餐時(shí)長(zhǎng)、不準(zhǔn)浪費(fèi)等這些規(guī)則,然后就不用管了。客人自己選餐,自己吃完,自己走人,不用再等服務(wù)員了,因此也不再需要服務(wù)員了。(收拾桌子的除外。)
這種模式對(duì)應(yīng)到程序里,其實(shí)就是AIO,在Java里也早就有了。
嘻嘻,Java AIO來(lái)啦
代碼的寫(xiě)法非常的固定,可以配合著后面的解說(shuō)來(lái)看,這樣就好理解了,如下:
- /**
- * @author lixinjie
- * @since 2019-05-13
- */
- public class AioServer {
- static int clientCount = 0;
- static AtomicInteger counter = new AtomicInteger(0);
- static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
- public static void main(String[] args) {
- try {
- AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open();
- assc.bind(new InetSocketAddress("localhost", 8080));
- //非阻塞方法,其實(shí)就是注冊(cè)了個(gè)回調(diào),而且只能接受一個(gè)連接
- assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
- @Override
- public void completed(AsynchronousSocketChannel asc, Object attachment) {
- //再次注冊(cè),接受下一個(gè)連接
- assc.accept(null, this);
- try {
- InetSocketAddress rsa = (InetSocketAddress)asc.getRemoteAddress();
- System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount));
- } catch (Exception e) {
- }
- readFromChannelAsync(asc);
- }
- @Override
- public void failed(Throwable exc, Object attachment) {
- }
- });
- //不讓主線程退出
- synchronized (AioServer.class) {
- AioServer.class.wait();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- static void readFromChannelAsync(AsynchronousSocketChannel asc) {
- //會(huì)把數(shù)據(jù)讀入到該buffer之后,再觸發(fā)工作線程來(lái)執(zhí)行回調(diào)
- ByteBuffer bb = ByteBuffer.allocate(1024*1024*1 + 1);
- long begin = System.currentTimeMillis();
- //非阻塞方法,其實(shí)就是注冊(cè)了個(gè)回調(diào),而且只能接受一次讀取
- asc.read(bb, null, new CompletionHandler<Integer, Object>() {
- //從該連接上一共讀到的字節(jié)數(shù)
- int total = 0;
- /**
- * @param count 表示本次讀取到的字節(jié)數(shù),-1表示數(shù)據(jù)已讀完
- */
- @Override
- public void completed(Integer count, Object attachment) {
- counter.incrementAndGet();
- if (count > -1) {
- total += count;
- }
- int size = bb.position();
- System.out.println(time() + "->count=" + count + ",total=" + total + "bs,buffer=" + size + "bs->" + Thread.currentThread().getId() + ":" + counter.get());
- if (count > -1) {//數(shù)據(jù)還沒(méi)有讀完
- //再次注冊(cè)回調(diào),接受下一次讀取
- asc.read(bb, null, this);
- } else {//數(shù)據(jù)已讀完
- try {
- asc.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- counter.decrementAndGet();
- }
- @Override
- public void failed(Throwable exc, Object attachment) {
- }
- });
- long end = System.currentTimeMillis();
- System.out.println(time() + "->exe read req,use=" + (end -begin) + "ms" + "->" + Thread.currentThread().getId());
- }
- static String time() {
- return sdf.format(new Date());
- }
- }
它的大致處理過(guò)程如下:
1、初始化一個(gè)AsynchronousServerSocketChannel對(duì)象,并開(kāi)始監(jiān)聽(tīng)
2、通過(guò)accept方法注冊(cè)一個(gè)“完成處理器”的接受連接回調(diào),即CompletionHandler,用于在接受到連接后的相關(guān)操作。
3、當(dāng)客戶端連接過(guò)來(lái)后,由系統(tǒng)來(lái)接受,并創(chuàng)建好AsynchronousSocketChannel對(duì)象,然后觸發(fā)該回調(diào),并把該對(duì)象傳進(jìn)該回調(diào),該回調(diào)會(huì)在Worker線程中執(zhí)行。
4、在接受連接回調(diào)里,再次使用accept方法注冊(cè)一次相同的完成處理器對(duì)象,用于讓系統(tǒng)接受下一個(gè)連接。就是這種注冊(cè)只能使用一次,所以要不停的連續(xù)注冊(cè),人家就是這樣設(shè)計(jì)的。
5、在接受連接回調(diào)里,使用AsynchronousSocketChannel對(duì)象的read方法注冊(cè)另一個(gè)接受數(shù)據(jù)回調(diào),用于在接受到數(shù)據(jù)后的相關(guān)操作。
6、當(dāng)客戶端數(shù)據(jù)過(guò)來(lái)后,由系統(tǒng)接受,并放入指定好的ByteBuffer中,然后觸發(fā)該回調(diào),并把本次接受到的數(shù)據(jù)字節(jié)數(shù)傳入該回調(diào),該回調(diào)會(huì)在Worker線程中執(zhí)行。
7、在接受數(shù)據(jù)回調(diào)里,如果數(shù)據(jù)沒(méi)有接受完,需要再次使用read方法把同一個(gè)對(duì)象注冊(cè)一次,用于讓系統(tǒng)接受下一次數(shù)據(jù)。這和上面的套路是一樣的。
8、客戶端的數(shù)據(jù)可能是分多次傳到服務(wù)器端的,所以接受數(shù)據(jù)回調(diào)會(huì)被執(zhí)行多次,直到數(shù)據(jù)接受完為止。多次接受到的數(shù)據(jù)合起來(lái)才是完整的數(shù)據(jù),這個(gè)一定要處理好。
9、關(guān)于ByteBuffer,要么足夠的大,能夠裝得下完整的客戶端數(shù)據(jù),這樣多次接受的數(shù)據(jù)直接往里追加即可。要么每次把ByteBuffer中的數(shù)據(jù)移到別的地方存儲(chǔ)起來(lái),然后清空ByteBuffer,用于讓系統(tǒng)往里裝入下一次接受的數(shù)據(jù)。
注:如果出現(xiàn)ByteBuffer空間不足,則系統(tǒng)不會(huì)裝入數(shù)據(jù),就會(huì)導(dǎo)致客戶端數(shù)據(jù)總是讀不完,極有可能進(jìn)入死循環(huán)。
啟動(dòng)服務(wù)器端代碼,使用同一個(gè)客戶端代碼,按相同的套路發(fā)20個(gè)請(qǐng)求,結(jié)果如下:
- 時(shí)間->IP:Port->回調(diào)線程Id:當(dāng)前連接數(shù)
- 17:20:47->127.0.0.1:56454->15:1
- 時(shí)間->發(fā)起一個(gè)讀請(qǐng)求,耗時(shí)->回調(diào)線程Id
- 17:20:47->exe read req,use=3ms->15
- 17:20:47->127.0.0.1:56455->15:2
- 17:20:47->exe read req,use=1ms->15
- 17:20:47->127.0.0.1:56456->15:3
- 17:20:47->exe read req,use=0ms->15
- 17:20:47->127.0.0.1:56457->16:4
- 17:20:47->127.0.0.1:56458->15:5
- 17:20:47->exe read req,use=1ms->16
- 17:20:47->exe read req,use=1ms->15
- 17:20:47->127.0.0.1:56460->15:6
- 17:20:47->127.0.0.1:56459->17:7
- 17:20:47->exe read req,use=0ms->15
- 17:20:47->127.0.0.1:56462->15:8
- 17:20:47->127.0.0.1:56461->16:9
- 17:20:47->exe read req,use=1ms->15
- 17:20:47->exe read req,use=0ms->16
- 17:20:47->exe read req,use=0ms->17
- 17:20:47->127.0.0.1:56465->16:10
- 17:20:47->127.0.0.1:56463->18:11
- 17:20:47->exe read req,use=0ms->18
- 17:20:47->127.0.0.1:56466->15:12
- 17:20:47->exe read req,use=1ms->16
- 17:20:47->127.0.0.1:56464->17:13
- 17:20:47->exe read req,use=1ms->15
- 17:20:47->127.0.0.1:56467->18:14
- 17:20:47->exe read req,use=2ms->17
- 17:20:47->exe read req,use=1ms->18
- 17:20:47->127.0.0.1:56468->15:15
- 17:20:47->exe read req,use=1ms->15
- 17:20:47->127.0.0.1:56469->16:16
- 17:20:47->127.0.0.1:56470->18:17
- 17:20:47->exe read req,use=1ms->18
- 17:20:47->exe read req,use=1ms->16
- 17:20:47->127.0.0.1:56472->15:18
- 17:20:47->127.0.0.1:56473->19:19
- 17:20:47->exe read req,use=2ms->15
- 17:20:47->127.0.0.1:56471->17:20
- 17:20:47->exe read req,use=1ms->19
- 17:20:47->exe read req,use=1ms->17
- 時(shí)間->本次接受到的字節(jié)數(shù),截至到目前接受到的字節(jié)總數(shù),buffer中的字節(jié)總數(shù)->回調(diào)線程Id:當(dāng)前線程數(shù)
- 17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
- 17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
- 17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
- 17:20:52->count=230188,total=295724bs,buffer=295724bs->12:1
- 17:20:52->count=752852,total=1048576bs,buffer=1048576bs->14:3
- 17:20:52->count=131072,total=196608bs,buffer=196608bs->17:2
- 。。。。。。。。。。。。。。。。。。。。。。
- 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
- 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
- 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
- 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
- 17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
- 17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
- 17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
系統(tǒng)接受到連接后,在工作線程中執(zhí)行了回調(diào)。并且在回調(diào)中執(zhí)行了read方法,耗時(shí)是0,因?yàn)橹皇亲?cè)了個(gè)接受數(shù)據(jù)的回調(diào)而已。
系統(tǒng)接受到數(shù)據(jù)后,把數(shù)據(jù)放入ByteBuffer,在工作線程中執(zhí)行了回調(diào)。并且回調(diào)中可以直接使用ByteBuffer中的數(shù)據(jù)。
接受數(shù)據(jù)的回調(diào)被執(zhí)行了多次,多次接受到的數(shù)據(jù)加起來(lái)正好等于客戶端傳來(lái)的數(shù)據(jù)。
因?yàn)橄到y(tǒng)是接受到數(shù)據(jù)后才觸發(fā)的回調(diào),所以服務(wù)器端最多時(shí)是3個(gè)線程在同時(shí)運(yùn)行回調(diào)的,換句話說(shuō),線程池包含3個(gè)線程就可以了。
對(duì)比與結(jié)論:
處理同樣的20個(gè)請(qǐng)求,一個(gè)需要用20個(gè)線程,一個(gè)需要用6個(gè)線程,一個(gè)需要3個(gè)線程,又節(jié)省了50%線程數(shù)。
注:不用特別較真這個(gè)比較結(jié)果,這里只是為了說(shuō)明問(wèn)題而已。哈哈。
三種處理方式的對(duì)比
第一種是阻塞IO,阻塞點(diǎn)有兩個(gè),等待數(shù)據(jù)就緒的過(guò)程和讀取數(shù)據(jù)的過(guò)程。
第二種是阻塞IO,阻塞點(diǎn)有一個(gè),讀取數(shù)據(jù)的過(guò)程。
第三種是非阻塞IO,沒(méi)有阻塞點(diǎn),當(dāng)工作線程啟動(dòng)時(shí),數(shù)據(jù)已經(jīng)(被系統(tǒng))準(zhǔn)備好可以直接用了。
可見(jiàn),這是一個(gè)逐步消除阻塞點(diǎn)的過(guò)程。
再次來(lái)談?wù)劯鞣NIO:
只有一個(gè)線程,接受一個(gè)連接,讀取數(shù)據(jù),處理業(yè)務(wù),寫(xiě)回結(jié)果,再接受下一個(gè)連接,這是同步阻塞。這種用法幾乎沒(méi)有。
一個(gè)線程和一個(gè)線程池,線程接受到連接后,把它丟給線程池中的線程,再接受下一個(gè)連接,這是異步阻塞。對(duì)應(yīng)示例一。
一個(gè)線程和一個(gè)線程池,線程運(yùn)行selector,執(zhí)行select操作,把就緒的連接拿出來(lái)丟給線程池中的線程,再執(zhí)行下一次的select操作,就是多路復(fù)用,這是異步阻塞。對(duì)應(yīng)示例二。
一個(gè)線程和一個(gè)線程池,線程注冊(cè)一個(gè)accept回調(diào),系統(tǒng)幫我們接受好連接后,才觸發(fā)回調(diào)在線程池中執(zhí)行,執(zhí)行時(shí)再注冊(cè)read回調(diào),系統(tǒng)幫我們接受好數(shù)據(jù)后,才觸發(fā)回調(diào)在線程池中執(zhí)行,就是AIO,這是異步非阻塞。對(duì)應(yīng)示例三。
redis也是多路復(fù)用,但它只有一個(gè)線程在執(zhí)行select操作,處理就緒的連接,整個(gè)是串行化的,所以天然不存在并發(fā)問(wèn)題。只能把它歸為同步阻塞了。
BIO是阻塞IO,可以是同步阻塞,也可以是異步阻塞。AIO是異步IO,只有異步非阻塞這一種。因此沒(méi)有同步非阻塞這種說(shuō)法,因?yàn)橥揭欢ㄊ亲枞摹?/p>
注:以上的說(shuō)法是站在用戶程序/線程的立場(chǎng)上來(lái)說(shuō)的。
建議把代碼下載下來(lái),自己運(yùn)行一下,體會(huì)體會(huì):
https://github.com/coding-new-talking/java-code-demo.git