基于 Netty 服務(wù)端快速了解核心組件
由于Netty優(yōu)秀的設(shè)計(jì)和封裝,開發(fā)一個(gè)高性能網(wǎng)絡(luò)程序就變得非常簡單,本文從一個(gè)簡單的服務(wù)端落地簡單介紹一下Netty中的幾個(gè)核心組件,希望對你有幫助。
快速落地一個(gè)服務(wù)端
我們希望通過Netty快速落地一個(gè)簡單的主從reactor模型,由主reactor對應(yīng)的線程組接收連接交由acceptor創(chuàng)建連接,與之建立的客戶端的讀寫事件都會(huì)交由從reactor對應(yīng)的線程池處理:
基于此設(shè)計(jì),我們通過Netty寫下如下代碼,可以看到我們做了如下幾件事:
- 聲明一個(gè)服務(wù)端創(chuàng)建引導(dǎo)類ServerBootstrap ,負(fù)責(zé)配置服務(wù)端及其啟動(dòng)工作。
- 聲明主從reactor線程組,其中boss可以看作監(jiān)聽端口接收新連接的線程組,而work則是負(fù)責(zé)處理客戶端數(shù)據(jù)讀寫的線程組。
- 基于上述線程池作為group的入?yún)⑼瓿芍鲝膔eactor模型創(chuàng)建。
- 通過channel函數(shù)指定server channe為NioServerSocketChannel即采用NIO模型,而NioServerSocketChannel我們可以直接理解為serverSocket的抽象表示。
- 通過childHandler方法給引導(dǎo)設(shè)置每一個(gè)連接數(shù)據(jù)讀寫的處理器handler。
最后調(diào)用bind啟動(dòng)服務(wù)端并通過addListener對連接結(jié)果進(jìn)行異步監(jiān)聽:
public static void main(String[] args) {
//1. 聲明一個(gè)服務(wù)端創(chuàng)建引導(dǎo)類
ServerBootstrap serverBootstrap = new ServerBootstrap();
//2. 聲明主從reactor線程組
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
serverBootstrap.group(boss, worker)//3. 基于上述線程池創(chuàng)建主從reactor模型
.channel(NioServerSocketChannel.class)//server channel采用NIO模型
.childHandler(new ChannelInitializer<NioSocketChannel>() {//添加客戶端讀寫請求處理器到subreactor中
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 對于ChannelInboundHandlerAdapter,收到消息后會(huì)按照順序執(zhí)行即 A -> B->ServerHandler
ch.pipeline().addLast(new InboundHandlerA())
.addLast(new InboundHandlerB())
.addLast(new ServerHandler());
// 處理寫數(shù)據(jù)的邏輯,順序是反著的 B -> A
ch.pipeline().addLast(new OutboundHandlerA())
.addLast(new OutboundHandlerB())
.addLast(new OutboundHandlerC());
ch.pipeline().addLast(new ExceptionHandler());
}
});
//綁定8080端口并設(shè)置回調(diào)監(jiān)聽結(jié)果
serverBootstrap.bind("127.0.0.1", 8080)
.addListener(f -> {
if (f.isSuccess()) {
System.out.println("連接成功");
}
});
}
對于客戶端的發(fā)送的數(shù)據(jù),我們都會(huì)通過ChannelInboundHandlerAdapter添加順序處理,就如代碼所示我們的執(zhí)行順序?yàn)镮nboundHandlerA->InboundHandlerB->ServerHandler,對此我們給出InboundHandlerA的代碼,InboundHandlerB內(nèi)容一樣就不展示了:
public class InboundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA : " + ((ByteBuf)msg).toString(StandardCharsets.UTF_8));
//將當(dāng)前的處理過的msg轉(zhuǎn)交給pipeline的下一個(gè)ChannelHandler
super.channelRead(ctx, msg);
}
}
而ServerHandler的則是:
- 客戶端與服務(wù)端建立連接,對應(yīng)客戶端channel被激活,觸發(fā)channelActive方法。
- ChannelHandlerContext 的 Channel 已注冊到其 EventLoop 中,執(zhí)行channelRegistered。
- 將 ChannelHandler 添加到實(shí)際上下文并準(zhǔn)備好處理事件后調(diào)用。
解析客戶端的數(shù)據(jù)然后回復(fù)Hello Netty client :
private static class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channel被激活,執(zhí)行channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("執(zhí)行channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("執(zhí)行handlerAdded");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
//打印讀取到的數(shù)據(jù)
System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù) -> " + byteBuf.toString(StandardCharsets.UTF_8));
// 回復(fù)客戶端數(shù)據(jù)
System.out.println(new Date() + ": 服務(wù)端寫出數(shù)據(jù)");
//組裝數(shù)據(jù)并發(fā)送
ByteBuf out = getByteBuf(ctx);
ctx.channel().writeAndFlush(out);
super.channelRead(ctx, msg);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
ByteBuf buffer = ctx.alloc().buffer();
byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8);
buffer.writeBytes(bytes);
return buffer;
}
//......
}
我們通過telnet 對應(yīng)ip和端口后發(fā)現(xiàn)服務(wù)端輸出如下內(nèi)容,也很我們上文說明的一致:
執(zhí)行handlerAdded
執(zhí)行channelRegistered
端口綁定成功,channel被激活,執(zhí)行channelActive
然后我們發(fā)送消息 1,可以看到觸發(fā)所有inbound的channelRead方法:
InBoundHandlerA : 1
InBoundHandlerB: 1
Wed Jul 24 00:05:18 CST 2024: 服務(wù)端讀到數(shù)據(jù) -> 1
然后我們回復(fù)hello netty client,按照添加的倒敘觸發(fā)OutBoundHandler:
Wed Jul 24 00:05:18 CST 2024: 服務(wù)端寫出數(shù)據(jù)
OutBoundHandlerC: Hello Netty client
OutBoundHandlerB: Hello Netty client
OutBoundHandlerA: Hello Netty client
詳解Netty中的核心組件
channel接口
channel是Netty對于底層class socket中的bind、connect、read、write等原語的封裝,簡化了我們網(wǎng)絡(luò)編程的復(fù)雜度,同時(shí)Netty也提供的各種現(xiàn)成的channel,我們可以根據(jù)個(gè)人需要自行使用。 下面便是筆者比較常用的Tcp或者UDP中比較常用的幾種channel。
- NioServerSocketChannel:基于NIO選擇器處理新連接。
- EpollServerSocketChannel:使用 linux EPOLL Edge 觸發(fā)模式實(shí)現(xiàn)最大性能的實(shí)現(xiàn)。
- NioDatagramChannel:發(fā)送和接收 AddressedEnvelope 的 NIO 數(shù)據(jù)報(bào)通道。
- EpollDatagramChannel:使用 linux EPOLL Edge 觸發(fā)模式實(shí)現(xiàn)最大性能的 DatagramChannel 實(shí)現(xiàn)。
EventLoop接口
在Netty中,所有channel都會(huì)注冊到某個(gè)eventLoop上, 每一個(gè)EventLoopGroup中有一個(gè)或者多個(gè)EventLoop,而每一個(gè)EventLoop都綁定一個(gè)線程,負(fù)責(zé)處理一個(gè)或者多個(gè)channel的事件:
這里我們也簡單的給出NioEventLoop中的run方法,它繼承自SingleThreadEventExecutor,我們可以大概看到NioEventLoop的核心邏輯本質(zhì)就是輪詢所有注冊到NioEventLoop上的channel(socket的抽象)是否有其就緒的事件,然后
@Override
protected void run() {
for (;;) {
try {
//基于selectStrategy輪詢查看是否有就緒事件
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
//......
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
//根據(jù)IO配比執(zhí)行網(wǎng)絡(luò)IO事件方法processSelectedKeys以及其他事件方法runAllTasks
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
//......
}
}
pipeline和channelHandler以channelHandlerContext
每一個(gè)channel的事件都會(huì)交由channelHandler處理,而負(fù)責(zé)同一個(gè)channel的channelHandler都會(huì)交由pipeline一條邏輯鏈進(jìn)行連接,這兩者的的關(guān)系都會(huì)一一封裝成channelHandlerContext,channelHandlerContext主要是負(fù)責(zé)當(dāng)前channelHandler和與其同一條channelpipeline上的其他channelHandler之間的交互。
舉個(gè)例子,當(dāng)我們收到客戶端的寫入數(shù)據(jù)時(shí),這些數(shù)據(jù)就會(huì)交由pipeline上的channelHandler處理,如下圖所示,從第一個(gè)channelHandler處理完成之后,每個(gè)channelHandlerContext就會(huì)將消息轉(zhuǎn)交到當(dāng)前pipeline的下一個(gè)channelHandler處理:
假設(shè)我們的channelHandler執(zhí)行完ChannelActive后,如希望繼續(xù)傳播則會(huì)調(diào)用fireChannelActive:
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("端口綁定成功,channel被激活,執(zhí)行channelActive");
ctx.fireChannelActive()
}
查看其內(nèi)部邏輯即可知曉,它就是通過AbstractChannelHandlerContext得到pipeline的下一個(gè)ChannelHandler并執(zhí)行其channelActive方法:
@Override
public ChannelHandlerContext fireChannelActive() {
final AbstractChannelHandlerContext next = findContextInbound();
invokeChannelActive(next);
return this;
}
回調(diào)的思想
我們可以說回調(diào)其實(shí)是一種設(shè)計(jì)思想,Netty對于連接或者讀寫操作都是異步非阻塞的,所以我們希望在連接被建立進(jìn)行一些響應(yīng)的處理,那么Netty就會(huì)在連接建立的時(shí)候方法暴露一個(gè)回調(diào)方法供用戶實(shí)現(xiàn)個(gè)性邏輯。
例如我們的channel連接被建立時(shí),其底層就會(huì)調(diào)用invokeChannelActive獲取我們綁定的ChannelInboundHandler并執(zhí)行其channelActive方法:
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
于是就會(huì)調(diào)用到我們服務(wù)端ServerHandler 的channelActive方法:
private static class ServerHandler extends ChannelInboundHandlerAdapter {
//......
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("端口綁定成功,channel被激活,執(zhí)行channelActive");
}
//......
}
Future異步監(jiān)聽
為保證網(wǎng)絡(luò)服務(wù)器執(zhí)行的效率,Netty大部分網(wǎng)絡(luò)IO操作都采用異步的,以筆者建立連接設(shè)置的監(jiān)聽器為例,當(dāng)前連接成功后,就會(huì)返回給監(jiān)聽器一個(gè)java.util.concurrent.Future,我們就可以通過這個(gè)f獲取連接的結(jié)果是否成功:
//綁定8080端口并設(shè)置回調(diào)監(jiān)聽結(jié)果
serverBootstrap.bind("127.0.0.1", 8080)
.addListener(f -> {
if (f.isSuccess()) {
System.out.println("連接成功");
}
});
我們步入DefaultPromise的addListener即可發(fā)現(xiàn)其內(nèi)部就是添加監(jiān)聽后判斷這個(gè)連接的異步任務(wù)Future是否完成,如果完成調(diào)用notifyListeners回調(diào)我們的監(jiān)聽器的邏輯:
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
//......
//添加監(jiān)聽
synchronized (this) {
addListener0(listener);
}
//連接任務(wù)完成,通知監(jiān)聽器
if (isDone()) {
notifyListeners();
}
return this;
}