自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

徹底搞懂 Netty 線程模型

開(kāi)發(fā) 前端
在學(xué)習(xí)Netty 之前我們最好先掌握 BIO、NIO、AIO 基礎(chǔ)知識(shí),前面我們已經(jīng)花了三篇文章去講這些知識(shí)。我們開(kāi)始來(lái)學(xué)習(xí) Netty 的具體知識(shí)了,本文就Netty線程模型展開(kāi)分析。

 前言

  • BIO 、NIO 、AIO 總結(jié)
  • Unix網(wǎng)絡(luò)編程中的五種IO模型
  • 深入理解IO多路復(fù)用實(shí)現(xiàn)機(jī)制

在學(xué)習(xí)Netty 之前我們最好先掌握 BIO、NIO、AIO 基礎(chǔ)知識(shí),前面我們已經(jīng)花了三篇文章去講這些知識(shí)。我們開(kāi)始來(lái)學(xué)習(xí) Netty 的具體知識(shí)了,本文就Netty線程模型展開(kāi)分析。

基本概念
IO 模型

  • BIO:同步阻塞模型;
  • NIO:基于IO多路復(fù)用技術(shù)的“非阻塞同步”IO模型。簡(jiǎn)單來(lái)說(shuō),內(nèi)核將可讀可寫事件通知應(yīng)用,由應(yīng)用主動(dòng)發(fā)起讀寫事件;
  • AIO:非阻塞異步IO模型。簡(jiǎn)單來(lái)說(shuō),內(nèi)核將讀完成事件通知應(yīng)用,讀操作由內(nèi)核完成,應(yīng)用只需要操作數(shù)據(jù)即可;應(yīng)用做異步寫操作時(shí)立即返回,內(nèi)核會(huì)進(jìn)行寫操作排隊(duì)并執(zhí)行寫操作。

NIO 和 AIO 不同之處在于應(yīng)用是否進(jìn)行真正的讀寫操作。

reactor 和 proactor 模型

  • reactor:基于NIO技術(shù),可讀可寫時(shí)通知應(yīng)用;
  • proactor:基于AIO技術(shù),讀完成時(shí)通知應(yīng)用,寫操作應(yīng)用通知內(nèi)核。

Netty認(rèn)識(shí)
Netty是Java領(lǐng)域有名的開(kāi)源網(wǎng)絡(luò)庫(kù),特點(diǎn)是高性能和高擴(kuò)展性,因此很多流行的框架都是基于它來(lái)構(gòu)建的,比如我們熟知的Dubbo、Rocketmq、Hadoop等。

通過(guò)前面 NIO 的學(xué)習(xí)可以看到,NIO 的類庫(kù)和API 繁雜,例如 Selector、 ServerSocketChannel、 SocketChannel、 ByteBuffer等這些對(duì)于從事應(yīng)用層的程序員來(lái)說(shuō),使用起來(lái)開(kāi)發(fā)工作量和難度都非常大。另外客戶端面臨斷連重連、 網(wǎng)絡(luò)閃斷、心跳處理、半包讀寫、 網(wǎng)絡(luò)擁塞 和異常流的處理等等。

Netty 對(duì) JDK 自帶的 NIO 的 API 進(jìn)行了良好的封裝,解決了上述問(wèn)題。且Netty擁有高性能、 吞吐量更高,延遲更低,減少資源消耗,最小化不必要的內(nèi)存復(fù)制等優(yōu)點(diǎn)。

Netty 現(xiàn)在都在用的是4.x,5.x版本已經(jīng)廢棄,Netty 4.x 需要JDK 6以上版本支持。

在了解Netty使用場(chǎng)景后,本節(jié)將從IO模型的演進(jìn)角度來(lái)分析Netty線程模型,通過(guò)并發(fā)編程之父Doug Lea所寫《Scalable IO in Java》中涉及的一些IO處理模式,一步一步深入理解Netty線程模型的“進(jìn)化歷史”。

《Scalable IO in Java》:

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

Netty使用場(chǎng)景

  1. 互聯(lián)網(wǎng)行業(yè):在分布式系統(tǒng)中,各個(gè)節(jié)點(diǎn)之間需要遠(yuǎn)程服務(wù)調(diào)用,高性能的 RPC 框架必不可少,Netty 作為異步高性能的通信框架,往往作為基礎(chǔ)通信組件被這些 RPC 框架使用。典型的應(yīng)用有:阿里分布式服務(wù)框架 Dubbo 的 RPC 使用 Dubbo 協(xié)議進(jìn)行通信,Dubbo 協(xié)議默認(rèn)使用 Netty 作為基礎(chǔ)通信組件,用于實(shí)現(xiàn)各進(jìn)程節(jié)點(diǎn)之間的內(nèi)部通信;消息中間件Rocketmq底層也是用的Netty作為基礎(chǔ)通信組件。
  2. 游戲行業(yè):無(wú)論是手游服務(wù)端還是大型的網(wǎng)絡(luò)游戲,Java 語(yǔ)言都得到了越來(lái)越廣泛的應(yīng)用。Netty 作為高性能的基礎(chǔ)通信組件,它本身提供了 TCP/UDP 和 HTTP 協(xié)議棧。
  3. 大數(shù)據(jù)領(lǐng)域:經(jīng)典的 Hadoop 的高性能通信和序列化組件 Avro 的 RPC 框架,默認(rèn)采用 Netty 進(jìn)行跨節(jié)點(diǎn)通信,它的 Netty Service 是基于 Netty 框架二次封裝實(shí)現(xiàn)。

Netty相關(guān)開(kāi)源項(xiàng)目:

https://netty.io/wiki/related-projects.html

IO處理模式演進(jìn)
基本上所有的網(wǎng)絡(luò)處理程序都遵循以下基本的處理(handler)流程:

  1. Read request (接收二進(jìn)制數(shù)據(jù))
  2. Decode request (解碼為可讀數(shù)據(jù))
  3. Process service (對(duì)數(shù)據(jù)進(jìn)行處理產(chǎn)生結(jié)果)
  4. Encode reply (將結(jié)果編碼為二進(jìn)制數(shù)據(jù))
  5. Send reply (返回結(jié)果)

傳統(tǒng)網(wǎng)絡(luò)服務(wù)器會(huì)為每一個(gè)連接的處理開(kāi)啟一個(gè)新的線程,即我們前面所說(shuō)的BIO模型(多線程模式),我們可以看下大致的示意圖:

上圖為 BIO版本

BIO模型對(duì)于每一個(gè)請(qǐng)求都分發(fā)給一個(gè)線程(可以理解為一個(gè)handler),每個(gè)handler中都獨(dú)自處理上面1-5流程。這種模型的適用場(chǎng)景和瓶頸可以查看《BIO 、NIO 、AIO 總結(jié)》。

改進(jìn):采用基于事件驅(qū)動(dòng)的設(shè)計(jì),當(dāng)有事件觸發(fā)時(shí),才會(huì)調(diào)用處理器進(jìn)行數(shù)據(jù)處理(非阻塞)。這就是對(duì)應(yīng)的NIO線程模型。

上圖為單線程版事件驅(qū)動(dòng)模型,可以理解為NIO單線程版本

上面用到了 Reactor 模式。關(guān)于 Reactor 模式的兩個(gè)概念:

  • Reactor:負(fù)責(zé)響應(yīng) IO 事件,當(dāng)檢測(cè)到一個(gè)新的事件,將其發(fā)送給相應(yīng)的 Handler 去處理。
  • Handler:負(fù)責(zé)處理非阻塞的行為,標(biāo)識(shí)系統(tǒng)管理的資源,同時(shí)將 Handler 與事件綁定。

注意:Reactor 為單個(gè)線程,如上圖所示:不僅需要處理客戶端的 accept 連接請(qǐng)求,同時(shí)也要負(fù)責(zé)分發(fā)(dispatch)讀寫請(qǐng)求到處理器中。由于只有單個(gè)線程處理各種請(qǐng)求,所以要求處理器中的業(yè)務(wù)需要能夠快速處理完。

改進(jìn):現(xiàn)在的服務(wù)器基本上是多核 CPU,那么在多處理器場(chǎng)景下,為實(shí)現(xiàn)服務(wù)的高性能我們可以有目的的采用多線程模式處理業(yè)務(wù)。

上圖為多線程版本事件驅(qū)動(dòng)模型,可以理解為NIO多線程版本

通過(guò)與NIO單線程模型相比,增加了worker線程池,專門用于處理非IO操作(decode、compute、encode),大大提高了工作效率。

這種模型下,客戶端發(fā)送過(guò)來(lái)的連接和注冊(cè)還是由主線程 Reactor 統(tǒng)一去處理,只不過(guò)客戶端連接成功后的后續(xù)事件分發(fā)給 worker 線程池去處理。

但是,當(dāng)客戶端短時(shí)間內(nèi)幾十萬(wàn)或者上百萬(wàn)條連接請(qǐng)求的時(shí)候(雙十一、春運(yùn)搶票),單個(gè) Rector 不僅要處理注冊(cè)事件,也要同時(shí)分發(fā)任務(wù)到 worker 線程池,由于分發(fā)也是比較耗時(shí)的操作,有可能會(huì)導(dǎo)致阻塞。

繼續(xù)改進(jìn):將Reactor 拆分為兩部分

上圖為主從NIO模型

如上圖所示,在這種模型下,mainReactor 專門負(fù)責(zé)新客戶端的連接操作,建立通道(channel),然后將一定事件內(nèi)的 channel 注冊(cè)到另外一個(gè) subReactor,subReactor 負(fù)責(zé)將客戶端的讀寫請(qǐng)求交給線程池處理。這樣即使幾十萬(wàn)個(gè)請(qǐng)求同時(shí)到來(lái)也無(wú)所謂了。

通俗理解,mainReactor就是大總管,只負(fù)責(zé)接口,subReactor就是一個(gè)員工,負(fù)責(zé)給總管接待客戶提供服務(wù)。

Netty線程模型
通過(guò)對(duì) 《 Scalable IO in Java 》里的一些 IO 處理模式理解, Netty的線程模型就是由上面主從NIO模型演變來(lái)的,是基于Reactor模型的。

如下圖所示,Boos Group 就是上面提到的 mainReactor,與上面不同在于 Worker Group,它可以理解為一組 subReactor,即在大總管下面有多個(gè)員工來(lái)干活,每次接收的客戶都均勻分配給不同員工。Netty 之所以單機(jī)支持百萬(wàn)級(jí)別并發(fā)量,就是因?yàn)橐恢鞫鄰牡木€程模型。

需要說(shuō)明的是,Netty 的線程模型并不是一成不變的。它通常采用一主多從,但是也可以根據(jù)實(shí)際需要配置啟動(dòng)參數(shù),通過(guò)設(shè)置不同的啟動(dòng)參數(shù),Netty 可以同時(shí)支持 “多主多從”。

下面是對(duì)上圖“一主多從” Netty 模型的詳細(xì)解釋:

  1. Netty 抽象除兩組線程池 BossGroup 和 WorkerGroup,BossGroup 專門負(fù)責(zé)接收客戶端的連接,WorkerGroup 專門負(fù)責(zé)網(wǎng)絡(luò)的讀寫。
  2. BossGroup 和 WorkerGroup 類型都是 NioEventLoopGroup。
  3. NioEventLoopGroup 相當(dāng)于一個(gè)事件循環(huán)線程組,這個(gè)組中含有多個(gè)事件循環(huán)線程,每一個(gè)事件循環(huán)線程是 NioEventLoop。
  4. 每個(gè) NioEventLoop 都有一個(gè) selector,用于監(jiān)聽(tīng)注冊(cè)在其上的 socketChannel 的網(wǎng)絡(luò)通訊。
  5. 每個(gè) Boss NioEventLoop 線程內(nèi)部循環(huán)執(zhí)行的步驟有 3 步:處理accept事件,與 client 建立連接,生成 NioSocketChannel;將NioSocketChannel 注冊(cè)到某個(gè) worker NioEventLoop 上的 selector;處理任務(wù)隊(duì)列的任務(wù),即runAllTasks。
  6. 每個(gè) worker NIOEvent'Loop線程循環(huán)執(zhí)行的步驟:輪詢注冊(cè)到最近的 selector 上所有的 NioSocketChannel 的 read、write 事件;處理 I/O 事件,即 read、write事件,在對(duì)應(yīng)的NioScoketChannel 處理業(yè)務(wù);runAllTask 處理任務(wù)隊(duì)列 TaskQueue 的任務(wù),一些耗時(shí)的業(yè)務(wù)處理一般可以放入 TaskQueue 中,這樣不影響數(shù)據(jù)在 pipeline 中的流動(dòng)處理。
  7. 每個(gè) worker NIOEventLoop 處理 NioSocketChannel 業(yè)務(wù)時(shí),會(huì)使用 pipeline (管道),管道中維護(hù)來(lái)很多 handler 處理器用來(lái)處理 channel 中的數(shù)據(jù)。

Netty 模塊組件
Bootstrap、ServerBootstrap
Bootstrap 意思是引導(dǎo),一個(gè) Netty 應(yīng)用通常由一個(gè) Bootstrap 開(kāi)始,主要作用是配置整個(gè) Netty程序,通過(guò)鏈?zhǔn)秸{(diào)用串聯(lián)各個(gè)組件。Netty 中 Bootstrap 類是客戶端程序的啟動(dòng)引導(dǎo)類,ServerBootstrap 是服務(wù)端 啟動(dòng)引導(dǎo)類。

Future、ChannelFuture
正如前面介紹,在 Netty 中所有的 IO 操作都是異步的,不能立刻得知消息是否被正確處理。但是可以過(guò)一會(huì)等它執(zhí)行完成或者直接注冊(cè)一個(gè)監(jiān)聽(tīng),具體的實(shí)現(xiàn)就是通過(guò) Future 和 ChannelFutures,他們可以注冊(cè)一個(gè)監(jiān)聽(tīng),當(dāng)操作執(zhí)行成功或失敗時(shí)監(jiān)聽(tīng)會(huì)自動(dòng)觸發(fā)注冊(cè)的監(jiān)聽(tīng)事件。

Channel
Netty 網(wǎng)絡(luò)通信的組件,能夠用于執(zhí)行網(wǎng)絡(luò) I/O 操作。Channel 為用戶提供:

  1. 當(dāng)前網(wǎng)絡(luò)連接的通道的狀態(tài)(例如是否打開(kāi)?是否已連接?)
  2. 網(wǎng)絡(luò)連接的配置參數(shù) (例如接收緩沖區(qū)大?。?/li>
  3. 提供異步的網(wǎng)絡(luò) I/O 操作(如建立連接,讀寫,綁定端口),異步調(diào)用意味著任何 I/O 調(diào)用都將立即返回,并且不保證在調(diào)用結(jié)束時(shí)所請(qǐng)求的 I/O 操作已完成。
  4. 調(diào)用立即返回一個(gè) ChannelFuture 實(shí)例,通過(guò)注冊(cè)監(jiān)聽(tīng)器到 ChannelFuture 上,可以 I/O 操作成功、失敗或取消時(shí)回調(diào)通知調(diào)用方。
  5. 支持關(guān)聯(lián) I/O 操作與對(duì)應(yīng)的處理程序。不同協(xié)議、不同的阻塞類型的連接都有不同的Channel 類型與之對(duì)應(yīng)。下面是一些常用的 Channel 類型:
  1. NioSocketChannel,異步的客戶端 TCP Socket 連接。(最常用) 
  2. NioServerSocketChannel,異步的服務(wù)器端 TCP Socket 連接 
  3. NioDatagramChannel,異步的 UDP 連接 
  4. NioSctpChannel,異步的客戶端 Sctp 連接 
  5. NioSctpServerChannel,異步的 Sctp 服務(wù)器端連接,這些通道涵蓋了 UDP 和 TCP 網(wǎng)絡(luò) IO 以及文件 IO。 

Selector
Netty 基于 Selector 對(duì)象實(shí)現(xiàn) I/O 多路復(fù)用,通過(guò) Selector 一個(gè)線程可以監(jiān)聽(tīng)多個(gè)連接的 Channel 事件。當(dāng)向一個(gè) Selector 中注冊(cè) Channel 后,Selector 內(nèi)部的機(jī)制就可以自動(dòng)不斷地查詢(Select) 這些注冊(cè)的 Channel 是否有已就緒的 I/O 事件(例如可讀,可寫,網(wǎng)絡(luò)連接完成等),這樣程序就可以很簡(jiǎn)單地使用一個(gè)線程高效地管理多個(gè) Channel 。

NioEventLoop
NioEventLoop 中維護(hù)了一個(gè)線程和任務(wù)隊(duì)列,支持異步提交執(zhí)行任務(wù),線程啟動(dòng)時(shí)會(huì)調(diào)用 NioEventLoop 的 run 方法,執(zhí)行 I/O 任務(wù)和非 I/O 任務(wù):

  1. I/O 任務(wù)即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法觸發(fā)。
  2. 非 IO 任務(wù),添加到 taskQueue 中的任務(wù),如 register0、bind0 等任務(wù),由 runAllTasks 方法觸發(fā)。

NioEventLoopGroup
NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解為一個(gè)線程池,內(nèi)部維護(hù)了一組線程,每個(gè)線程(NioEventLoop)負(fù)責(zé)處理多個(gè) Channel 上的事件,而一個(gè) Channel 只對(duì)應(yīng)于一個(gè)線程。

ChannelHandler
ChannelHandler 是一個(gè)接口,處理 I/O 事件或攔截 I/O 操作,并將其轉(zhuǎn)發(fā)到其 ChannelPipeline(業(yè)務(wù)處理鏈)中的下一個(gè)處理程序。ChannelHandler 本身并沒(méi)有提供很多方法,因?yàn)檫@個(gè)接口有許多的方法需要實(shí)現(xiàn),方便使用期間,可以繼承它的子類:

  1. ChannelInboundHandler 用于處理入站 I/O 事件 
  2. ChannelOutboundHandler 用于處理出站 I/O 操作 

或者使用以下適配器類:

  1. ChannelInboundHandlerAdapter 用于處理入站 I/O 事件。 
  2. ChannelOutboundHandlerAdapter 用于處理出站 I/O 操作。 

ChannelHandlerContext
保存 Channel 相關(guān)的所有上下文信息,同時(shí)關(guān)聯(lián)一個(gè) ChannelHandler 對(duì)象。

ChannelPipline
保存 ChannelHandler 的 List,用于處理或攔截 Channel 的入站事件和出站操作。ChannelPipeline 實(shí)現(xiàn)了一種高級(jí)形式的攔截過(guò)濾器模式,使用戶可以完全控制事件的處理方式,以及 Channel 中各個(gè)的 ChannelHandler 如何相互交互。在 Netty 中每個(gè) Channel 都有且僅有一個(gè) ChannelPipeline 與之對(duì)應(yīng),它們的組成關(guān)系如下:

一個(gè) Channel 包含了一個(gè) ChannelPipeline,而 ChannelPipeline 中又維護(hù)了一個(gè)由 ChannelHandlerContext 組成的雙向鏈表,并且每個(gè) ChannelHandlerContext 中又關(guān)聯(lián)著一個(gè) ChannelHandler。read事件(入站事件)和write事件(出站事件)在一個(gè)雙向鏈表中,入站事件會(huì)從鏈表 head 往后傳遞到最后一個(gè)入站的 handler,出站事件會(huì)從鏈表 tail 往前傳遞到最前一個(gè)出站的 handler,兩種類型的 handler 互不干擾。

Netty通訊示例
Netty的maven依賴

  1. <dependencies> 
  2.  <dependency> 
  3.   <groupId>io.netty</groupId> 
  4.         <artifactId>netty-all</artifactId> 
  5.         <version>4.1.52.Final</version> 
  6.     </dependency> 
  7. </dependencies> 

服務(wù)端代碼

  1. package com.niuh.netty.base; 
  2.  
  3. import io.netty.bootstrap.ServerBootstrap; 
  4. import io.netty.channel.ChannelFuture; 
  5. import io.netty.channel.ChannelInitializer; 
  6. import io.netty.channel.ChannelOption; 
  7. import io.netty.channel.EventLoopGroup; 
  8. import io.netty.channel.nio.NioEventLoopGroup; 
  9. import io.netty.channel.socket.SocketChannel; 
  10. import io.netty.channel.socket.nio.NioServerSocketChannel; 
  11.  
  12. public class NettyServer { 
  13.  
  14.     public static void main(String[] args) throws Exception { 
  15.         //創(chuàng)建兩個(gè)線程組bossGroup和workerGroup, 含有的子線程N(yùn)ioEventLoop的個(gè)數(shù)默認(rèn)為cpu核數(shù)的兩倍 
  16.         // bossGroup只是處理連接請(qǐng)求 ,真正的和客戶端業(yè)務(wù)處理,會(huì)交給workerGroup完成 
  17.         EventLoopGroup bossGroup = new NioEventLoopGroup(1); 
  18.         EventLoopGroup workerGroup = new NioEventLoopGroup(); 
  19.         try { 
  20.             //創(chuàng)建服務(wù)器端的啟動(dòng)對(duì)象 
  21.             ServerBootstrap bootstrap = new ServerBootstrap(); 
  22.             //使用鏈?zhǔn)骄幊虂?lái)配置參數(shù) 
  23.             bootstrap.group(bossGroup, workerGroup) //設(shè)置兩個(gè)線程組 
  24.                     .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作為服務(wù)器的通道實(shí)現(xiàn) 
  25.                     // 初始化服務(wù)器連接隊(duì)列大小,服務(wù)端處理客戶端連接請(qǐng)求是順序處理的,所以同一時(shí)間只能處理一個(gè)客戶端連接。 
  26.                     // 多個(gè)客戶端同時(shí)來(lái)的時(shí)候,服務(wù)端將不能處理的客戶端連接請(qǐng)求放在隊(duì)列中等待處理 
  27.                     .option(ChannelOption.SO_BACKLOG, 1024) 
  28.                     .childHandler(new ChannelInitializer<SocketChannel>() {//創(chuàng)建通道初始化對(duì)象,設(shè)置初始化參數(shù) 
  29.  
  30.                         @Override 
  31.                         protected void initChannel(SocketChannel ch) throws Exception { 
  32.                             //對(duì)workerGroup的SocketChannel設(shè)置處理器 
  33.                             ch.pipeline().addLast(new NettyServerHandler()); 
  34.                         } 
  35.                     }); 
  36.             System.out.println("netty server start。。"); 
  37.             //綁定一個(gè)端口并且同步, 生成了一個(gè)ChannelFuture異步對(duì)象,通過(guò)isDone()等方法可以判斷異步事件的執(zhí)行情況 
  38.             //啟動(dòng)服務(wù)器(并綁定端口),bind是異步操作,sync方法是等待異步操作執(zhí)行完畢 
  39.             ChannelFuture cf = bootstrap.bind(9000).sync(); 
  40.             //給cf注冊(cè)監(jiān)聽(tīng)器,監(jiān)聽(tīng)我們關(guān)心的事件 
  41.             /*cf.addListener(new ChannelFutureListener() { 
  42.                 @Override 
  43.                 public void operationComplete(ChannelFuture future) throws Exception { 
  44.                     if (cf.isSuccess()) { 
  45.                         System.out.println("監(jiān)聽(tīng)端口9000成功"); 
  46.                     } else { 
  47.                         System.out.println("監(jiān)聽(tīng)端口9000失敗"); 
  48.                     } 
  49.                 } 
  50.             });*/ 
  51.             //對(duì)通道關(guān)閉進(jìn)行監(jiān)聽(tīng),closeFuture是異步操作,監(jiān)聽(tīng)通道關(guān)閉 
  52.             // 通過(guò)sync方法同步等待通道關(guān)閉處理完畢,這里會(huì)阻塞等待通道關(guān)閉完成 
  53.             cf.channel().closeFuture().sync(); 
  54.         } finally { 
  55.             bossGroup.shutdownGracefully(); 
  56.             workerGroup.shutdownGracefully(); 
  57.         } 
  58.     } 

服務(wù)端所注冊(cè)的自定義回調(diào)函數(shù) NettyServerHandler:

  1. package com.niuh.netty.base; 
  2.  
  3. import io.netty.buffer.ByteBuf; 
  4. import io.netty.buffer.Unpooled; 
  5. import io.netty.channel.ChannelHandlerContext; 
  6. import io.netty.channel.ChannelInboundHandlerAdapter; 
  7. import io.netty.util.CharsetUtil; 
  8.  
  9. /** 
  10.  * 自定義Handler需要繼承netty規(guī)定好的某個(gè)HandlerAdapter(規(guī)范) 
  11.  */ 
  12. public class NettyServerHandler extends ChannelInboundHandlerAdapter { 
  13.  
  14.     /** 
  15.      * 讀取客戶端發(fā)送的數(shù)據(jù) 
  16.      * 
  17.      * @param ctx 上下文對(duì)象, 含有通道channel,管道pipeline 
  18.      * @param msg 就是客戶端發(fā)送的數(shù)據(jù) 
  19.      * @throws Exception 
  20.      */ 
  21.     @Override 
  22.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  23.         System.out.println("服務(wù)器讀取線程 " + Thread.currentThread().getName()); 
  24.         //Channel channel = ctx.channel(); 
  25.         //ChannelPipeline pipeline = ctx.pipeline(); //本質(zhì)是一個(gè)雙向鏈接, 出站入站 
  26.         //將 msg 轉(zhuǎn)成一個(gè) ByteBuf,類似NIO 的 ByteBuffer 
  27.         ByteBuf buf = (ByteBuf) msg; 
  28.         System.out.println("客戶端發(fā)送消息是:" + buf.toString(CharsetUtil.UTF_8)); 
  29.     } 
  30.  
  31.     /** 
  32.      * 數(shù)據(jù)讀取完畢處理方法 
  33.      * 
  34.      * @param ctx 
  35.      * @throws Exception 
  36.      */ 
  37.     @Override 
  38.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
  39.         ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8)); 
  40.         ctx.writeAndFlush(buf); 
  41.     } 
  42.  
  43.     /** 
  44.      * 處理異常, 一般是需要關(guān)閉通道 
  45.      * 
  46.      * @param ctx 
  47.      * @param cause 
  48.      * @throws Exception 
  49.      */ 
  50.     @Override 
  51.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  52.         ctx.close(); 
  53.     } 

這個(gè)類繼承了ChannelInboundHandlerAdapter的幾個(gè)方法:

  • channelRead方法:當(dāng)客戶端與服務(wù)端連通好之后,客戶端發(fā)數(shù)據(jù)時(shí),服務(wù)端會(huì)主動(dòng)調(diào)用這個(gè)方法。
  • channelReadComplete方法:數(shù)據(jù)處理完畢的方法,ctx.writeAndFlush()就可以往客戶端寫回?cái)?shù)據(jù)了。

客戶端代碼

  1. package com.niuh.netty.base; 
  2.  
  3. import io.netty.bootstrap.Bootstrap; 
  4. import io.netty.channel.ChannelFuture; 
  5. import io.netty.channel.ChannelInitializer; 
  6. import io.netty.channel.EventLoopGroup; 
  7. import io.netty.channel.nio.NioEventLoopGroup; 
  8. import io.netty.channel.socket.SocketChannel; 
  9. import io.netty.channel.socket.nio.NioSocketChannel; 
  10.  
  11. public class NettyClient { 
  12.     public static void main(String[] args) throws Exception { 
  13.         //客戶端需要一個(gè)事件循環(huán)組 
  14.         EventLoopGroup group = new NioEventLoopGroup(); 
  15.         try { 
  16.             //創(chuàng)建客戶端啟動(dòng)對(duì)象 
  17.             //注意客戶端使用的不是ServerBootstrap而是Bootstrap 
  18.             Bootstrap bootstrap = new Bootstrap(); 
  19.             //設(shè)置相關(guān)參數(shù) 
  20.             bootstrap.group(group) //設(shè)置線程組 
  21.                     .channel(NioSocketChannel.class) // 使用NioSocketChannel作為客戶端的通道實(shí)現(xiàn) 
  22.                     .handler(new ChannelInitializer<SocketChannel>() { 
  23.                         @Override 
  24.                         protected void initChannel(SocketChannel ch) throws Exception { 
  25.                             //加入處理器 
  26.                             ch.pipeline().addLast(new NettyClientHandler()); 
  27.                         } 
  28.                     }); 
  29.  
  30.             System.out.println("netty client start。。"); 
  31.             //啟動(dòng)客戶端去連接服務(wù)器端 
  32.             ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync(); 
  33.             //對(duì)通道關(guān)閉進(jìn)行監(jiān)聽(tīng) 
  34.             cf.channel().closeFuture().sync(); 
  35.         } finally { 
  36.             group.shutdownGracefully(); 
  37.         } 
  38.     } 

客戶端自定義回調(diào)函數(shù):

  1. package com.niuh.netty.base; 
  2.  
  3. import io.netty.buffer.ByteBuf; 
  4. import io.netty.buffer.Unpooled; 
  5. import io.netty.channel.ChannelHandlerContext; 
  6. import io.netty.channel.ChannelInboundHandlerAdapter; 
  7. import io.netty.util.CharsetUtil; 
  8.  
  9. public class NettyClientHandler extends ChannelInboundHandlerAdapter { 
  10.  
  11.     /** 
  12.      * 當(dāng)客戶端連接服務(wù)器完成就會(huì)觸發(fā)該方法 
  13.      * 
  14.      * @param ctx 
  15.      * @throws Exception 
  16.      */ 
  17.     @Override 
  18.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  19.         ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8)); 
  20.         ctx.writeAndFlush(buf); 
  21.     } 
  22.  
  23.     //當(dāng)通道有讀取事件時(shí)會(huì)觸發(fā),即服務(wù)端發(fā)送數(shù)據(jù)給客戶端 
  24.     @Override 
  25.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  26.         ByteBuf buf = (ByteBuf) msg; 
  27.         System.out.println("收到服務(wù)端的消息:" + buf.toString(CharsetUtil.UTF_8)); 
  28.         System.out.println("服務(wù)端的地址: " + ctx.channel().remoteAddress()); 
  29.     } 
  30.  
  31.     @Override 
  32.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  33.         cause.printStackTrace(); 
  34.         ctx.close(); 
  35.     } 

與NettyServerHandler類似,其中的channelActive()方法是當(dāng)客戶端與服務(wù)器連接完成時(shí)候就會(huì)執(zhí)行的方法。

看完代碼,我們發(fā)現(xiàn)Netty架的目標(biāo)就是讓你的業(yè)務(wù)邏輯從網(wǎng)絡(luò)基礎(chǔ)應(yīng)用編碼中分離出來(lái),讓你可以專 注業(yè)務(wù)的開(kāi)發(fā),而不需寫一大堆類似NIO的網(wǎng)絡(luò)處理操作。

ByteBuf 理解
從結(jié)構(gòu)上來(lái)說(shuō),ByteBuf 由一串字節(jié)數(shù)組構(gòu)成。數(shù)組中每個(gè)字節(jié)用來(lái)存放信息。

ByteBuf 提供了兩個(gè)索引,一個(gè)用于讀取數(shù)據(jù),一個(gè)用于寫入數(shù)據(jù)。這兩個(gè)索引通過(guò)在字節(jié)數(shù)組中移動(dòng),來(lái)定位需要讀或者寫信息的位置。

  • 當(dāng)從 ByteBuf 讀取時(shí),它的 readerIndex(讀索引)將會(huì)根據(jù)讀取的字節(jié)數(shù)遞增。
  • 同樣,當(dāng)寫 ByteBuf 時(shí),它的 writerIndex 也會(huì)根據(jù)寫入的字節(jié)數(shù)進(jìn)行遞增。

ByteBuf.png

需要注意的是極限的情況是 readerIndex 剛好讀到了 writerIndex 寫入的地方。如果 readerIndex 超過(guò)了 writerIndex 的時(shí)候,Netty 會(huì)拋出 IndexOutOf-BoundsException 異常。

示例代碼

  1. package com.niuh.netty.base; 
  2.  
  3. import io.netty.buffer.ByteBuf; 
  4. import io.netty.buffer.Unpooled; 
  5. import io.netty.util.CharsetUtil; 
  6.  
  7. public class NettyByteBuf { 
  8.     public static void main(String[] args) { 
  9.         // 創(chuàng)建byteBuf對(duì)象,該對(duì)象內(nèi)部包含一個(gè)字節(jié)數(shù)組byte[10] 
  10.         // 通過(guò)readerindex和writerIndex和capacity,將buffer分成三個(gè)區(qū)域 
  11.         // 已經(jīng)讀取的區(qū)域:[0,readerindex) 
  12.         // 可讀取的區(qū)域:[readerindex,writerIndex) 
  13.         // 可寫的區(qū)域: [writerIndex,capacity) 
  14.         ByteBuf byteBuf = Unpooled.buffer(10); 
  15.         System.out.println("byteBuf=" + byteBuf); 
  16.  
  17.         for (int i = 0; i < 8; i++) { 
  18.             byteBuf.writeByte(i); 
  19.         } 
  20.         System.out.println("byteBuf=" + byteBuf); 
  21.  
  22.         for (int i = 0; i < 5; i++) { 
  23.             System.out.println(byteBuf.getByte(i)); 
  24.         } 
  25.         System.out.println("byteBuf=" + byteBuf); 
  26.  
  27.         for (int i = 0; i < 5; i++) { 
  28.             System.out.println(byteBuf.readByte()); 
  29.         } 
  30.         System.out.println("byteBuf=" + byteBuf); 
  31.  
  32.  
  33.         //用Unpooled工具類創(chuàng)建ByteBuf 
  34.         ByteBuf byteBuf2 = Unpooled.copiedBuffer("hello,zhangsan!", CharsetUtil.UTF_8); 
  35.         //使用相關(guān)的方法 
  36.         if (byteBuf2.hasArray()) { 
  37.             byte[] content = byteBuf2.array(); 
  38.             //將 content 轉(zhuǎn)成字符串 
  39.             System.out.println(new String(content, CharsetUtil.UTF_8)); 
  40.             System.out.println("byteBuf=" + byteBuf2); 
  41.  
  42.             System.out.println(byteBuf2.readerIndex()); // 0 
  43.             System.out.println(byteBuf2.writerIndex()); // 12 
  44.             System.out.println(byteBuf2.capacity()); // 36 
  45.  
  46.             System.out.println(byteBuf2.getByte(0)); // 獲取數(shù)組0這個(gè)位置的字符h的ascii碼,h=104 
  47.  
  48.             int len = byteBuf2.readableBytes(); //可讀的字節(jié)數(shù)  12 
  49.             System.out.println("len=" + len); 
  50.  
  51.             //使用for取出各個(gè)字節(jié) 
  52.             for (int i = 0; i < len; i++) { 
  53.                 System.out.println((char) byteBuf2.getByte(i)); 
  54.             } 
  55.  
  56.             //范圍讀取 
  57.             System.out.println(byteBuf2.getCharSequence(0, 6, CharsetUtil.UTF_8)); 
  58.             System.out.println(byteBuf2.getCharSequence(6, 6, CharsetUtil.UTF_8)); 
  59.         } 
  60.     } 

PS:以上代碼提交在 Github :https://github.com/Niuh-Study/niuh-netty.git

責(zé)任編輯:姜華 來(lái)源: 今日頭條
相關(guān)推薦

2020-07-02 09:15:59

Netty內(nèi)存RPC

2022-04-12 08:00:17

socket 編程網(wǎng)絡(luò)編程網(wǎng)絡(luò) IO 模型

2021-07-16 11:35:20

Java線程池代碼

2022-04-11 10:56:43

線程安全

2025-04-21 04:00:00

2024-09-04 16:19:06

語(yǔ)言模型統(tǒng)計(jì)語(yǔ)言模型

2022-09-29 15:39:10

服務(wù)器NettyReactor

2017-12-05 17:44:31

機(jī)器學(xué)習(xí)CNN卷積層

2024-01-03 13:39:00

JS,Javascrip算法

2023-10-18 10:55:55

HashMap

2025-01-13 16:00:00

服務(wù)網(wǎng)關(guān)分布式系統(tǒng)架構(gòu)

2025-04-11 05:55:00

2021-10-11 11:58:41

Channel原理recvq

2023-05-29 08:12:38

2025-03-17 00:21:00

2021-10-09 19:05:06

channelGo原理

2021-12-29 17:29:07

KubernetesEvents集群

2023-09-28 08:15:05

SpringBean加載

2021-06-16 14:18:37

NettyReactor線程模型

2024-05-31 08:10:58

Netty線程模型多路復(fù)用模型
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)