自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

基于 Netty 服務(wù)端快速了解核心組件

開發(fā) 網(wǎng)絡(luò)
由于Netty優(yōu)秀的設(shè)計(jì)和封裝,開發(fā)一個(gè)高性能網(wǎng)絡(luò)程序就變得非常簡單,本文從一個(gè)簡單的服務(wù)端落地簡單介紹一下Netty中的幾個(gè)核心組件,希望對你有幫助。

由于Netty優(yōu)秀的設(shè)計(jì)和封裝,開發(fā)一個(gè)高性能網(wǎng)絡(luò)程序就變得非常簡單,本文從一個(gè)簡單的服務(wù)端落地簡單介紹一下Netty中的幾個(gè)核心組件,希望對你有幫助。

快速落地一個(gè)服務(wù)端

我們希望通過Netty快速落地一個(gè)簡單的主從reactor模型,由主reactor對應(yīng)的線程組接收連接交由acceptor創(chuàng)建連接,與之建立的客戶端的讀寫事件都會(huì)交由從reactor對應(yīng)的線程池處理:

基于此設(shè)計(jì),我們通過Netty寫下如下代碼,可以看到我們做了如下幾件事:

  • 聲明一個(gè)服務(wù)端創(chuàng)建引導(dǎo)類ServerBootstrap ,負(fù)責(zé)配置服務(wù)端及其啟動(dòng)工作。
  • 聲明主從reactor線程組,其中boss可以看作監(jiān)聽端口接收新連接的線程組,而work則是負(fù)責(zé)處理客戶端數(shù)據(jù)讀寫的線程組。
  • 基于上述線程池作為group的入?yún)⑼瓿芍鲝膔eactor模型創(chuàng)建。
  • 通過channel函數(shù)指定server channe為NioServerSocketChannel即采用NIO模型,而NioServerSocketChannel我們可以直接理解為serverSocket的抽象表示。
  • 通過childHandler方法給引導(dǎo)設(shè)置每一個(gè)連接數(shù)據(jù)讀寫的處理器handler。

最后調(diào)用bind啟動(dòng)服務(wù)端并通過addListener對連接結(jié)果進(jìn)行異步監(jiān)聽:

public static void main(String[] args) {
        //1. 聲明一個(gè)服務(wù)端創(chuàng)建引導(dǎo)類
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //2. 聲明主從reactor線程組
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());


        serverBootstrap.group(boss, worker)//3. 基于上述線程池創(chuàng)建主從reactor模型
                .channel(NioServerSocketChannel.class)//server channel采用NIO模型
                .childHandler(new ChannelInitializer<NioSocketChannel>() {//添加客戶端讀寫請求處理器到subreactor中
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // 對于ChannelInboundHandlerAdapter,收到消息后會(huì)按照順序執(zhí)行即 A -> B->ServerHandler
                        ch.pipeline().addLast(new InboundHandlerA())
                                .addLast(new InboundHandlerB())
                                .addLast(new ServerHandler());

                        // 處理寫數(shù)據(jù)的邏輯,順序是反著的 B -> A
                        ch.pipeline().addLast(new OutboundHandlerA())
                                .addLast(new OutboundHandlerB())
                                .addLast(new OutboundHandlerC());

                        ch.pipeline().addLast(new ExceptionHandler());
                    }
                });
        //綁定8080端口并設(shè)置回調(diào)監(jiān)聽結(jié)果
        serverBootstrap.bind("127.0.0.1", 8080)
                .addListener(f -> {
                    if (f.isSuccess()) {
                        System.out.println("連接成功");
                    }
                });
    }

對于客戶端的發(fā)送的數(shù)據(jù),我們都會(huì)通過ChannelInboundHandlerAdapter添加順序處理,就如代碼所示我們的執(zhí)行順序?yàn)镮nboundHandlerA->InboundHandlerB->ServerHandler,對此我們給出InboundHandlerA的代碼,InboundHandlerB內(nèi)容一樣就不展示了:

public class InboundHandlerA extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerA : " + ((ByteBuf)msg).toString(StandardCharsets.UTF_8));
        //將當(dāng)前的處理過的msg轉(zhuǎn)交給pipeline的下一個(gè)ChannelHandler
        super.channelRead(ctx, msg);
    }
}

而ServerHandler的則是:

  • 客戶端與服務(wù)端建立連接,對應(yīng)客戶端channel被激活,觸發(fā)channelActive方法。
  • ChannelHandlerContext 的 Channel 已注冊到其 EventLoop 中,執(zhí)行channelRegistered。
  • 將 ChannelHandler 添加到實(shí)際上下文并準(zhǔn)備好處理事件后調(diào)用。

解析客戶端的數(shù)據(jù)然后回復(fù)Hello Netty client :

private static class ServerHandler extends ChannelInboundHandlerAdapter {

  @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("channel被激活,執(zhí)行channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            System.out.println("執(zhí)行channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            System.out.println("執(zhí)行handlerAdded");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            //打印讀取到的數(shù)據(jù)
            System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù) -> " + byteBuf.toString(StandardCharsets.UTF_8));

            // 回復(fù)客戶端數(shù)據(jù)
            System.out.println(new Date() + ": 服務(wù)端寫出數(shù)據(jù)");
            //組裝數(shù)據(jù)并發(fā)送
            ByteBuf out = getByteBuf(ctx);
            ctx.channel().writeAndFlush(out);
            super.channelRead(ctx, msg);
        }

        private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
            ByteBuf buffer = ctx.alloc().buffer();

            byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8);

            buffer.writeBytes(bytes);

            return buffer;
        }

      //......
    }

我們通過telnet 對應(yīng)ip和端口后發(fā)現(xiàn)服務(wù)端輸出如下內(nèi)容,也很我們上文說明的一致:

執(zhí)行handlerAdded
執(zhí)行channelRegistered
端口綁定成功,channel被激活,執(zhí)行channelActive

然后我們發(fā)送消息 1,可以看到觸發(fā)所有inbound的channelRead方法:

InBoundHandlerA : 1
InBoundHandlerB: 1
Wed Jul 24 00:05:18 CST 2024: 服務(wù)端讀到數(shù)據(jù) -> 1

然后我們回復(fù)hello netty client,按照添加的倒敘觸發(fā)OutBoundHandler:

Wed Jul 24 00:05:18 CST 2024: 服務(wù)端寫出數(shù)據(jù)
OutBoundHandlerC: Hello Netty client 
OutBoundHandlerB: Hello Netty client 
OutBoundHandlerA: Hello Netty client 

詳解Netty中的核心組件

channel接口

channel是Netty對于底層class socket中的bind、connect、read、write等原語的封裝,簡化了我們網(wǎng)絡(luò)編程的復(fù)雜度,同時(shí)Netty也提供的各種現(xiàn)成的channel,我們可以根據(jù)個(gè)人需要自行使用。 下面便是筆者比較常用的Tcp或者UDP中比較常用的幾種channel。

  • NioServerSocketChannel:基于NIO選擇器處理新連接。
  • EpollServerSocketChannel:使用 linux EPOLL Edge 觸發(fā)模式實(shí)現(xiàn)最大性能的實(shí)現(xiàn)。
  • NioDatagramChannel:發(fā)送和接收 AddressedEnvelope 的 NIO 數(shù)據(jù)報(bào)通道。
  • EpollDatagramChannel:使用 linux EPOLL Edge 觸發(fā)模式實(shí)現(xiàn)最大性能的 DatagramChannel 實(shí)現(xiàn)。

EventLoop接口

在Netty中,所有channel都會(huì)注冊到某個(gè)eventLoop上, 每一個(gè)EventLoopGroup中有一個(gè)或者多個(gè)EventLoop,而每一個(gè)EventLoop都綁定一個(gè)線程,負(fù)責(zé)處理一個(gè)或者多個(gè)channel的事件:

這里我們也簡單的給出NioEventLoop中的run方法,它繼承自SingleThreadEventExecutor,我們可以大概看到NioEventLoop的核心邏輯本質(zhì)就是輪詢所有注冊到NioEventLoop上的channel(socket的抽象)是否有其就緒的事件,然后

  @Override
    protected void run() {
        for (;;) {
            try {
             //基于selectStrategy輪詢查看是否有就緒事件
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
      //......

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                //根據(jù)IO配比執(zhí)行網(wǎng)絡(luò)IO事件方法processSelectedKeys以及其他事件方法runAllTasks
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
           //......
        }
    }

pipeline和channelHandler以channelHandlerContext

每一個(gè)channel的事件都會(huì)交由channelHandler處理,而負(fù)責(zé)同一個(gè)channel的channelHandler都會(huì)交由pipeline一條邏輯鏈進(jìn)行連接,這兩者的的關(guān)系都會(huì)一一封裝成channelHandlerContext,channelHandlerContext主要是負(fù)責(zé)當(dāng)前channelHandler和與其同一條channelpipeline上的其他channelHandler之間的交互。

舉個(gè)例子,當(dāng)我們收到客戶端的寫入數(shù)據(jù)時(shí),這些數(shù)據(jù)就會(huì)交由pipeline上的channelHandler處理,如下圖所示,從第一個(gè)channelHandler處理完成之后,每個(gè)channelHandlerContext就會(huì)將消息轉(zhuǎn)交到當(dāng)前pipeline的下一個(gè)channelHandler處理:

假設(shè)我們的channelHandler執(zhí)行完ChannelActive后,如希望繼續(xù)傳播則會(huì)調(diào)用fireChannelActive:

 @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("端口綁定成功,channel被激活,執(zhí)行channelActive");
            ctx.fireChannelActive()
        }

查看其內(nèi)部邏輯即可知曉,它就是通過AbstractChannelHandlerContext得到pipeline的下一個(gè)ChannelHandler并執(zhí)行其channelActive方法:

 @Override
    public ChannelHandlerContext fireChannelActive() {
        final AbstractChannelHandlerContext next = findContextInbound();
        invokeChannelActive(next);
        return this;
    }

回調(diào)的思想

我們可以說回調(diào)其實(shí)是一種設(shè)計(jì)思想,Netty對于連接或者讀寫操作都是異步非阻塞的,所以我們希望在連接被建立進(jìn)行一些響應(yīng)的處理,那么Netty就會(huì)在連接建立的時(shí)候方法暴露一個(gè)回調(diào)方法供用戶實(shí)現(xiàn)個(gè)性邏輯。

例如我們的channel連接被建立時(shí),其底層就會(huì)調(diào)用invokeChannelActive獲取我們綁定的ChannelInboundHandler并執(zhí)行其channelActive方法:

private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }

于是就會(huì)調(diào)用到我們服務(wù)端ServerHandler 的channelActive方法:

private static class ServerHandler extends ChannelInboundHandlerAdapter {

        //......

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("端口綁定成功,channel被激活,執(zhí)行channelActive");
        }
  //......
}

Future異步監(jiān)聽

為保證網(wǎng)絡(luò)服務(wù)器執(zhí)行的效率,Netty大部分網(wǎng)絡(luò)IO操作都采用異步的,以筆者建立連接設(shè)置的監(jiān)聽器為例,當(dāng)前連接成功后,就會(huì)返回給監(jiān)聽器一個(gè)java.util.concurrent.Future,我們就可以通過這個(gè)f獲取連接的結(jié)果是否成功:

//綁定8080端口并設(shè)置回調(diào)監(jiān)聽結(jié)果
        serverBootstrap.bind("127.0.0.1", 8080)
                .addListener(f -> {
                    if (f.isSuccess()) {
                        System.out.println("連接成功");
                    }
                });

我們步入DefaultPromise的addListener即可發(fā)現(xiàn)其內(nèi)部就是添加監(jiān)聽后判斷這個(gè)連接的異步任務(wù)Future是否完成,如果完成調(diào)用notifyListeners回調(diào)我們的監(jiān)聽器的邏輯:

@Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        //......
  //添加監(jiān)聽
        synchronized (this) {
            addListener0(listener);
        }
  //連接任務(wù)完成,通知監(jiān)聽器
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }
責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2022-09-30 10:44:47

Netty組件數(shù)據(jù)

2023-09-11 10:53:32

2024-04-10 10:09:07

2024-05-30 08:04:20

Netty核心組件架構(gòu)

2023-07-26 10:21:26

服務(wù)端組件客戶端

2016-03-18 09:04:42

swift服務(wù)端

2021-10-14 08:39:17

Java Netty Java 基礎(chǔ)

2011-04-22 10:13:35

SimpleFrame

2013-03-25 10:08:44

PHPWeb

2012-03-02 10:38:33

MySQL

2023-06-30 09:46:00

服務(wù)物理機(jī)自動(dòng)化

2024-01-16 08:05:53

2022-10-08 00:01:00

ssrvuereact

2010-08-03 09:59:30

NFS服務(wù)

2016-11-03 09:59:38

kotlinjavaspring

2021-05-25 08:20:37

編程技能開發(fā)

2023-08-08 08:17:23

VasDolly服務(wù)端參數(shù)

2010-03-18 18:09:36

Java Socket

2021-04-26 13:20:06

Vue服務(wù)端渲染前端

2010-03-19 18:17:17

Java Server
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號