自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

閱讀一個(gè)分布式框架,這些必備的 NIO 知識(shí)你要知道

開發(fā) 架構(gòu) 分布式
因?yàn)橐粋€(gè)分布式的開源框架,通常是集群部署的,不同的節(jié)點(diǎn)和節(jié)點(diǎn)之間需要相互通信來完成復(fù)雜的功能,而閱讀到這些源碼的時(shí)候,如果不了解它通信機(jī)制的話,就會(huì)迷失在代碼里,像走進(jìn)了一片原始森林。

[[397718]]

本文轉(zhuǎn)載自微信公眾號(hào)「KK架構(gòu)師」,作者KK架構(gòu)師。轉(zhuǎn)載本文請(qǐng)聯(lián)系KK架構(gòu)師公眾號(hào)。

 一、開篇

閱讀一個(gè)分布式開源項(xiàng)目的時(shí)候,最重要的就是了解這個(gè)項(xiàng)目的通信框架。

因?yàn)橐粋€(gè)分布式的開源框架,通常是集群部署的,不同的節(jié)點(diǎn)和節(jié)點(diǎn)之間需要相互通信來完成復(fù)雜的功能,而閱讀到這些源碼的時(shí)候,如果不了解它通信機(jī)制的話,就會(huì)迷失在代碼里,像走進(jìn)了一片原始森林。

比如 HDFS ,使用的通信框架是自己封裝的 Hadoop Rpc;Spark 底層通信就是用的 Netty;而最近閱讀的 Kafka 源碼,底層使用的是原生的 Java NIO。

所以本次,我們來聊一聊 Java NIO 的那些主要的知識(shí)點(diǎn)。

二、多圖弄懂 NIO 三大核心概念

談到 NIO,就會(huì)有三個(gè)核心的概念:通道、緩沖、選擇器。

直接開門見山,或許聽起來會(huì)有點(diǎn)迷茫,我們需要從頭開始說。

1、通道

以前在并發(fā)要求不是很高的情況下,是 CPU 來全權(quán)處理輸入輸出的(中斷),如下圖:

用戶程序向服務(wù)端發(fā)起讀寫請(qǐng)求,cpu 直接處理這些請(qǐng)求。這樣有一個(gè)弊端,當(dāng) IO 請(qǐng)求非常多的時(shí)候,會(huì)大量占用 CPU,使得整個(gè)系統(tǒng)的處理能力會(huì)下降。

隨著計(jì)算機(jī)的發(fā)展,出現(xiàn)了一種新的方式,使用 DMA 來全權(quán)處理 IO 請(qǐng)求,如下圖:

DMA 是 Direct Memory Access,直接內(nèi)存訪問控制。

為什么要增加這個(gè)設(shè)備呢?是因?yàn)?CPU 中斷方式不能滿足數(shù)據(jù)傳輸速度的要求,因?yàn)樵谥袛喾绞较?,每次中斷需要保存斷點(diǎn)和現(xiàn)場(chǎng),中斷返回時(shí),要恢復(fù)斷點(diǎn)和現(xiàn)場(chǎng)。

所有這些原因,使得中斷方式難以滿足高速外設(shè)對(duì)傳輸速度的要求。

所以,就有了 DMA 這樣的設(shè)備,在 DMA 方式的數(shù)據(jù)傳輸過程中,當(dāng) I/O 設(shè)備需要進(jìn)行數(shù)據(jù)傳送時(shí),通過 DMA 控制器向 CPU 提出 DMA 傳送請(qǐng)求,CPU 響應(yīng)之后將讓出系統(tǒng)總線,由 DMA 控制器接管總線進(jìn)行數(shù)據(jù)傳輸,而此時(shí) CPU 除了做一些初始化操作之外,可以去做自己的事情。

但是有了 DMA,仍然滿足不了業(yè)務(wù)快速發(fā)展的需要,因?yàn)楫?dāng) I/O 請(qǐng)求過多時(shí),會(huì)出現(xiàn)總線沖突的問題。

所以后面就出現(xiàn)了通道(Channel),它和 DMA 不同的地方是,通道有自己的指令系統(tǒng)和程序,是一個(gè)協(xié)處理器;而 DMA 只能實(shí)現(xiàn)固定的數(shù)據(jù)傳送控制。

而 Java NIO 中的 Channel ,就是對(duì)上圖中通道的實(shí)現(xiàn)。

2、緩沖

理解了通道的概念,緩沖區(qū)也很好理解了。

通道表示打開到 I/O 設(shè)備的(例如:文件、套接字)的連接,但是通道本身并不存儲(chǔ)數(shù)據(jù)。真正作為數(shù)據(jù)傳輸載體的是緩沖區(qū)。

當(dāng)應(yīng)用程序要寫數(shù)據(jù)時(shí),需要先把數(shù)據(jù)寫到緩沖區(qū)里,然后由通道負(fù)責(zé)把緩沖區(qū)的數(shù)據(jù)發(fā)送到目的地(文件、磁盤、網(wǎng)絡(luò)),然后再從緩沖區(qū)把數(shù)據(jù)取出來。

若需要使用 NIO 系統(tǒng),需要獲取用于連接 I/O 設(shè)備的通道以及用于容納數(shù)據(jù)的緩沖區(qū),然后操作緩沖區(qū),對(duì)數(shù)據(jù)進(jìn)行處理。

3、選擇器

選擇器也叫做多路復(fù)用器,是一種非阻塞式的 I/O 。既然談到了非阻塞式,必然要先談?wù)勛枞健W枞饺缦聢D所示:

客戶端向服務(wù)端發(fā)出一個(gè)讀寫請(qǐng)求時(shí),服務(wù)端的線程會(huì)一直看內(nèi)核地址空間是否有數(shù)據(jù)了。

客戶端沒有數(shù)據(jù)發(fā)送過來時(shí),服務(wù)端的線程會(huì)一直等待,在此期間是什么事情都做不了的。

直到客戶端有數(shù)據(jù)發(fā)送過來,會(huì)把數(shù)據(jù)從內(nèi)核地址空間拷貝到用戶地址空間,然后才讀取到了數(shù)據(jù)的。

這就導(dǎo)致如果有大量的請(qǐng)求過來,后面的請(qǐng)求要等待前面的請(qǐng)求執(zhí)行完畢,會(huì)造成大量的排隊(duì),無法充分利用 cpu 資源,性能就會(huì)急劇下降。

再看看選擇器是如何工作的。

現(xiàn)在客戶端服務(wù)端之間通信是用通道+緩沖區(qū)的,那么所有的通道都會(huì)注冊(cè)到選擇器上來。選擇器會(huì)監(jiān)控這些通道的 I/O 狀態(tài),比如連接、讀、寫的情況。

當(dāng)某一個(gè)通道上的某個(gè)事件完全就緒時(shí),選擇器才會(huì)把這個(gè)任務(wù)分配到服務(wù)端的一個(gè)或者多個(gè)線程上。

當(dāng)客戶端沒有事件準(zhǔn)備好時(shí),服務(wù)端的線程是不會(huì)阻塞的,它可以做自己的事情,直到客戶端事件就緒,才會(huì)去處理。

這種非阻塞式相比較阻塞式,可以進(jìn)一步的利用 cpu 資源。

三、理解了概念,再來學(xué) API

1、緩沖區(qū)的 API

要徹底理解緩沖區(qū),必須知道緩沖區(qū)的四個(gè)屬性,mark,position,limit,capacity,只需要跑一遍代碼就知道了。

(1)分配一定大小的緩沖區(qū)

  1. //1.分配一個(gè)指定大小的緩沖區(qū) 
  2. ByteBuffer buffer = ByteBuffer.allocate(10); 
  3. System.out.println("---------alocate"); 
  4. System.out.println("position:" + buffer.position()); 
  5. System.out.println("limit:" + buffer.limit()); 
  6. System.out.println("capacity:" + buffer.capacity()); 

運(yùn)行結(jié)果:

  1. ---------alocate----------- 
  2. position:0 
  3. limit:10 
  4. capacity:10 

這里我們分配了 10 個(gè)字節(jié)的緩沖區(qū),也就是在 ByteBuffer 的 final byte[] hb; 屬性上開辟了 10 個(gè)字節(jié)的空間。

所以容量 capacity 為 10 , limit 可讀寫數(shù)據(jù)的最大位置 也是 10 ,position 為可以操作數(shù)據(jù)的位置為 0 。

(2)往緩沖區(qū)寫數(shù)據(jù)

  1. // 2.寫入數(shù)據(jù)到緩沖區(qū) 
  2. String str = "abcde"
  3. System.out.println("------------put------------"); 
  4. buffer.put(str.getBytes(StandardCharsets.UTF_8)); 
  5. System.out.println("position:" + buffer.position()); 
  6. System.out.println("limit:" + buffer.limit()); 
  7. System.out.println("capacity:" + buffer.capacity()); 

運(yùn)行結(jié)果:

  1. ------------put------------ 
  2. position:5 
  3. limit:10 
  4. capacity:10 

這里我們往緩沖區(qū)寫了 5 個(gè)字節(jié)的數(shù)據(jù),那么 capacity 和 limit 都還是10,但是 position 為 5 了,因?yàn)榍懊嬉呀?jīng)寫入了 5 個(gè)了

(3)切換成讀數(shù)據(jù)的模式

  1. // 3.切換成讀數(shù)據(jù)的模式 
  2. buffer.flip(); 
  3. System.out.println("------------flip------------"); 
  4. System.out.println("position:" + buffer.position()); 
  5. System.out.println("limit:" + buffer.limit()); 
  6. System.out.println("capacity:" + buffer.capacity()); 

那我們現(xiàn)在想從緩沖區(qū)讀取一些數(shù)據(jù)出來,就需要切換成 flip 模式,flip 會(huì)改變一些屬性的值

運(yùn)行結(jié)果:

  1. ------------flip------------ 
  2. position:0 
  3. limit:5 
  4. capacity:10 

flip 會(huì)改變 position 的值為 0 ,并且 limit 為5,表示我要從頭開始讀,并且只能讀到 5 的位置

(4)讀取一些數(shù)據(jù)

  1. // 4. 讀取數(shù)據(jù) 
  2. System.out.println("------------get------------"); 
  3. byte[] dest = new byte[buffer.limit()]; 
  4. buffer.get(dest); 
  5. System.out.println(new String(dest,0,dest.length)); 
  6. System.out.println("position:" + buffer.position()); 
  7. System.out.println("limit:" + buffer.limit()); 
  8. System.out.println("capacity:" + buffer.capacity()); 

運(yùn)行結(jié)果:

  1. ------------get------------ 
  2. abcde 
  3. position:5 
  4. limit:5 
  5. capacity:10 

讀取了數(shù)據(jù)之后,position 就變成 5 了,表示我已經(jīng)讀取到 5 了。

(5)重復(fù)讀

  1. //5.rewind() 
  2. buffer.rewind(); 
  3. System.out.println("------------rewind------------"); 
  4. System.out.println("position:" + buffer.position()); 
  5. System.out.println("limit:" + buffer.limit()); 
  6. System.out.println("capacity:" + buffer.capacity()); 

運(yùn)行結(jié)果:

  1. ------------rewind------------ 
  2. position:0 
  3. limit:5 
  4. capacity:10 

rewind 表示重復(fù)讀取 buffer 里面的數(shù)據(jù)

(6)清除數(shù)據(jù)

  1. //6.clear() 
  2. buffer.clear(); 
  3. System.out.println("------------clear------------"); 
  4. System.out.println("position:" + buffer.position()); 
  5. System.out.println("limit:" + buffer.limit()); 
  6. System.out.println("capacity:" + buffer.capacity()); 

運(yùn)行結(jié)果:

  1. ------------clear------------ 
  2. position:0 
  3. limit:10 
  4. capacity:10 

clear() 之后,position 回到了 0 ,limit 回到了 10,又可以重頭開始寫數(shù)據(jù)了,能寫 10 個(gè)字節(jié)。

但是要注意的是,緩沖里面的數(shù)據(jù)并沒有清空掉,數(shù)據(jù)還在里面,處于被“遺忘”狀態(tài)。這幾個(gè)指針回到了最初的狀態(tài)。

(7)標(biāo)記

這是第四個(gè)屬性:mark。

mark 可以記錄 position 的位置。可以通過 reset() 方法回到 mark 的位置。

  1.  @Test 
  2.     public void test2() { 
  3.         // 分配 10 個(gè)字節(jié) 
  4.         String str = "abcde"
  5.         ByteBuffer buffer = ByteBuffer.allocate(10); 
  6.         buffer.put(str.getBytes(StandardCharsets.UTF_8)); 
  7.  
  8.         // 切換到讀模式,讀取 2 個(gè)字節(jié) 
  9.         buffer.flip(); 
  10.         byte[] dest = new byte[buffer.limit()]; 
  11.         buffer.get(dest, 0, 2); 
  12.         System.out.println(new String(dest, 0, 2)); 
  13.         System.out.println(buffer.position()); 
  14.  
  15.         // mark 一下記錄當(dāng)前位置 
  16.         buffer.mark(); 
  17.  
  18.         // 又讀取兩個(gè)字節(jié) 
  19.         buffer.get(dest, 2, 2); 
  20.         System.out.println(new String(dest, 2, 2)); 
  21.         System.out.println(buffer.position()); 
  22.  
  23.         // reset,回到 mark 的位置 
  24.         buffer.reset(); 
  25.         System.out.println(buffer.position()); 
  26.     } 
  27.  
  28. 執(zhí)行結(jié)果: 
  29.  
  30. ```tex 
  31. ab 
  32. cd 

2、使用通道、緩沖區(qū)、選擇器完成一個(gè)網(wǎng)絡(luò)程序

(1)服務(wù)端

  1. @Test 
  2.  public void testServer() throws IOException { 
  3.      ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
  4.      serverSocketChannel.configureBlocking(false); 
  5.  
  6.      serverSocketChannel.bind(new InetSocketAddress(8989)); 
  7.  
  8.      Selector selector = Selector.open(); 
  9.      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); 
  10.  
  11.      while (selector.select() > 0) { 
  12.          Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); 
  13.          while (iterator.hasNext()) { 
  14.              SelectionKey key = iterator.next(); 
  15.              if (key.isAcceptable()) { 
  16.                  SocketChannel socketChannel = serverSocketChannel.accept(); 
  17.                  socketChannel.configureBlocking(false); 
  18.                  socketChannel.register(selector, SelectionKey.OP_READ); 
  19.              } else if (key.isReadable()) { 
  20.                  SocketChannel channel = (SocketChannel) key.channel(); 
  21.                  ByteBuffer byteBuffer = ByteBuffer.allocate(1024); 
  22.                  int len = 0; 
  23.                  while ((len = channel.read(byteBuffer)) > 0) { 
  24.                      byteBuffer.flip(); 
  25.                      System.out.println(new String(byteBuffer.array(), 0, len)); 
  26.                      byteBuffer.clear(); 
  27.                  } 
  28.              } 
  29.          } 
  30.  
  31.          iterator.remove(); 
  32.      } 
  33.  } 

1、首先使用 ServerSocketChannel.open(),打開一個(gè)通道,設(shè)置成非阻塞模式;

2、綁定到 8989 端口上;

3、把通道注冊(cè)到選擇器上;

4、while 循環(huán),選擇器上是否有事件,如果事件是客戶端的連接事件,則打開一個(gè) SocketChannel,注冊(cè)成非阻塞模式,并且往選擇器上注冊(cè)一個(gè)讀數(shù)據(jù)的事件;

5、當(dāng)客戶端發(fā)送數(shù)據(jù)過來的時(shí)候,就可以打開一個(gè)通道,讀取緩沖區(qū)上的數(shù)據(jù);

6、并且此時(shí),服務(wù)端是可以同時(shí)接受多個(gè)客戶端的請(qǐng)求的。

(2)客戶端

  1. @Test 
  2.  public void testClient() throws IOException { 
  3.      SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8989)); 
  4.      socketChannel.configureBlocking(false); 
  5.  
  6.      ByteBuffer byteBuffer = ByteBuffer.allocate(1024); 
  7.      byteBuffer.put(new Date().toString().getBytes(StandardCharsets.UTF_8)); 
  8.      byteBuffer.flip(); 
  9.      socketChannel.write(byteBuffer); 
  10.      byteBuffer.clear(); 
  11.  
  12.      socketChannel.close(); 
  13.  
  14.  } 

1、客戶端打開一個(gè) SocketChannel,配置成非阻塞模式;

2、使用 ByteBuffer 發(fā)送數(shù)據(jù)(注意發(fā)送之前,要 flip);

3、關(guān)閉通道。

四、總結(jié)

本次我們初步探究了一下 Java NIO 的幾個(gè)核心概念,通道、緩沖區(qū)、選擇器。

但是你要知道,這是冰山一角,通道和選擇器如果要深究的話,會(huì)涉及到操作系統(tǒng)底層和很多計(jì)算機(jī)組成原理的知識(shí)。

比如選擇器就涉及到了 select,poll,epoll 的概念,這幾個(gè)概念如果再打開的話,還會(huì)牽涉到硬件中斷,內(nèi)核的一些知識(shí)。

所以學(xué)海無涯苦作舟,越來越對(duì)這句話感同身受。

 

責(zé)任編輯:武曉燕 來源: KK架構(gòu)師
相關(guān)推薦

2019-12-27 16:00:56

分布式事務(wù)框架Java

2018-02-06 09:06:03

主流分布式存儲(chǔ)系統(tǒng)

2019-07-19 15:51:11

框架選型分布式

2023-04-05 10:00:00

分布式算法

2023-05-12 14:49:47

CSS框架前端

2018-02-08 08:08:12

2020-09-17 16:08:29

網(wǎng)絡(luò)安全數(shù)據(jù)技術(shù)

2020-09-08 08:27:25

JavaScript模塊ECMAScript

2018-02-25 04:57:01

物聯(lián)網(wǎng)網(wǎng)絡(luò)技術(shù)v

2020-07-30 09:35:09

Redis分布式鎖數(shù)據(jù)庫

2022-11-11 08:19:03

redis分布式

2022-09-13 09:14:48

架構(gòu)系統(tǒng)

2022-06-27 08:36:27

分布式事務(wù)XA規(guī)范

2021-06-24 10:27:48

分布式架構(gòu)系統(tǒng)

2021-06-25 10:45:43

Netty 分布式框架 IO 框架

2022-04-28 12:17:26

瀏覽器連字符hyphens

2018-05-16 09:41:13

神經(jīng)網(wǎng)絡(luò)NN函數(shù)

2013-09-11 16:02:00

Spark分布式計(jì)算系統(tǒng)

2022-04-14 07:56:30

公平鎖Java線程

2022-02-09 16:25:34

區(qū)塊鏈技術(shù)加密貨幣
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)