自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

一文快速了解高性能網(wǎng)絡(luò)通信框架 Netty

開發(fā)
本文將進(jìn)行一個(gè)Netty的快速入門,通過本文你將對(duì)Netty的核心組件和使用有著初步的認(rèn)識(shí)。

傳統(tǒng)BIO與不完美的解決方案

1.BIO編程及其問題

Java程序員早期進(jìn)行網(wǎng)絡(luò)程序開發(fā)的時(shí)候,采用的都是傳統(tǒng)BIO模式進(jìn)行開發(fā),這種模式工作流程非常簡單:

  • 阻塞監(jiān)聽。
  • 收到連接分配現(xiàn)場處理該連接。
  • 循環(huán)回到第一步。

這種做法在少量的客戶端連接下還是可以保證可靠運(yùn)行的,我們都知道每當(dāng)服務(wù)器啟動(dòng)就會(huì)其一個(gè)端口監(jiān)聽連接,筆者以自己的服務(wù)器的1234號(hào)進(jìn)程為例:

netstat -ano | findstr :1234

此時(shí)對(duì)應(yīng)的端口使用情況為只有一個(gè)8080端口監(jiān)聽:

  TCP    0.0.0.0:8080           0.0.0.0:0              LISTENING       11312
  TCP    [::]:8080              [::]:0                 LISTENING       11312

每當(dāng)我們一個(gè)客戶端接入,服務(wù)器就會(huì)為其分配一個(gè)端口端口處理和該客戶端的收發(fā),以筆者的程序?yàn)槔?,可以看到此時(shí)該進(jìn)程正使用58891與客戶端socket進(jìn)程交互:

 TCP    0.0.0.0:8080           0.0.0.0:0              LISTENING       11312
  TCP    127.0.0.1:8080         127.0.0.1:58891        ESTABLISHED     11312
  TCP    127.0.0.1:58891        127.0.0.1:8080         ESTABLISHED     4928
  TCP    [::]:8080              [::]:0                 LISTENING       11312

由此可知,一旦遇到高并發(fā)IO讀寫,由于一個(gè)客戶端綁定一個(gè)線程的模式,所以每一個(gè)端口號(hào)的收發(fā)都需要一個(gè)線程進(jìn)程處理,如果有大量連接接入勢(shì)必導(dǎo)致頻繁的線程上下文切換進(jìn)而導(dǎo)致各種資源的消耗,由此導(dǎo)致著名的C10k問題:

這里筆者也給出一段比較基礎(chǔ)的bio代碼示例供讀者參考一下這種實(shí)現(xiàn),可以看到我們的主線程阻塞監(jiān)聽,每當(dāng)收到一個(gè)新的連接就創(chuàng)建一個(gè)線程處理這個(gè)客戶端的讀寫請(qǐng)求:

public class IOServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8888);
        //創(chuàng)建一個(gè)線程等待連接進(jìn)來的客戶端
        new Thread(() -> waitConnect(serverSocket)).start();
    }

    private static void waitConnect(ServerSocket serverSocket) {
        while (true) {
            try {
                // 1. 阻塞方法獲取新連接
                Socket socket = serverSocket.accept();

                // 2. 每個(gè)客戶端來了,就專門創(chuàng)建一個(gè)新的連接處理
                new Thread(() -> {
                    int len;
                    byte[] data = new byte[1024];
                    try {
                        InputStream inputStream = socket.getInputStream();
                        // 3. 按字節(jié)流方式讀取數(shù)據(jù)
                        while ((len = inputStream.read(data)) != -1) {
                            System.out.println(Thread.currentThread().getName() + " receive msg:" + new String(data, 0, len));
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }).start();

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

2.epoll事件驅(qū)動(dòng)編程

于是就有了epoll事件驅(qū)動(dòng)編程這一方案,也就是我們常說的IO多路復(fù)用,該方案的理念是將所有socket的讀寫事件注冊(cè)到epoll上。 以我們的服務(wù)端為例,創(chuàng)建socket監(jiān)聽連接時(shí)就會(huì)將自己的感興趣的連接事件注冊(cè)到epoll上,隨后服務(wù)端就可以在循環(huán)中非阻塞的獲取是否有連接接入,每當(dāng)有連接接入就會(huì)為請(qǐng)求客戶端建立連接并將其讀寫事件注冊(cè)到處理客戶都安的epoll上,后續(xù)所有客戶端讀寫請(qǐng)求都會(huì)交給這個(gè)epoll處理,由此實(shí)現(xiàn)最少的線程做最多的事情,提升性能同時(shí)還降低消耗:

對(duì)此我們也用一段偽代碼展示一下事件驅(qū)動(dòng)編程:

//創(chuàng)建epoll
EpollFd epollFd=createEpoll();
//將文件描述符注冊(cè)到epoll上
epollCreateCtl(epollFd,socketFdList)

while(true){
 //收到epoll推送過來的事件
 List<event> eventList=epollWait(epollFd);
 //遍歷并處理事件
 eventList.foreach(e->handler(e));
}

3.JDK傳統(tǒng)事件驅(qū)動(dòng)編程

基于上述描述我們對(duì)事件驅(qū)動(dòng)編程有了初步的了解,接下來我們就來看看原生的jdk是如何實(shí)現(xiàn)NIO事件驅(qū)動(dòng)編程的。

首先我們需要?jiǎng)?chuàng)建一個(gè)serverSelector用于非阻塞查詢是否有就緒的socket事件,一旦收到客戶端的請(qǐng)求后,為其建立連接之后,將客戶端的讀寫事件注冊(cè)到clientSelector,由clientSelector的線程處理這些客戶端讀寫,而serverSelector依然負(fù)責(zé)非阻塞輪詢監(jiān)聽是否有新連接:

簡單介紹之后我們給出Selector 聲明:

 //負(fù)責(zé)輪詢是否有新連接
  Selector serverSelector = Selector.open();
  //負(fù)責(zé)處理每個(gè)客戶端是否有數(shù)據(jù)可讀
  Selector clientSelector = Selector.open();

然后我們使用這個(gè)socket非阻塞輪詢就緒的連接事件并注冊(cè)到客戶端的epoll模型上:

new Thread(() -> {
            try {
                //創(chuàng)建服務(wù)端socket監(jiān)聽通道
                ServerSocketChannel listenerChannel = ServerSocketChannel.open();
                //綁定端口
                listenerChannel.socket().bind(new InetSocketAddress(8888));
                //設(shè)置為非阻塞監(jiān)聽
                listenerChannel.configureBlocking(false);
                //注冊(cè)感興趣的事件為OP_ACCEPT事件,即可處理當(dāng)前socket的ACCEPT連接接入事件
                listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);

                //循環(huán)非阻塞獲取就緒事件
                while (true) {
                    //阻塞1毫秒查看是否有新的連接進(jìn)來
                    if (serverSelector.select(1) > 0) {
                        //查看是否有就緒的事件
                        Set<SelectionKey> set = serverSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();
                        //遍歷事件
                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();
                            //判斷是否是新的socket連接加入
                            if (key.isAcceptable()) {
                                System.out.println("有新的socket連接加入");
                                //接收此通道與socket的連接
                                SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                                clientChannel.configureBlocking(false);
                                //服務(wù)端監(jiān)測(cè)到新連接之后,不再創(chuàng)建一個(gè)新線程,而是直接將
                                //新連接綁定到clientSelector上
                                clientChannel.register(clientSelector, SelectionKey.OP_READ);
                                keyIterator.remove();
                            }
                        }
                    }
                }


            } catch (Exception e) {

            }
        }).start();

我們?cè)賮砜纯纯蛻舳颂幚砭€程邏輯,和上文差不多,都是非阻塞輪詢客戶端就緒的事件,我們以輸出的方式模擬事件處理,然后進(jìn)入下一次循環(huán):

new Thread(() -> {
            while (true) {
                try {
                    //通過clientSelector.select(1)方法可以輪詢出來,進(jìn)而批量處理
                    if (clientSelector.select(1) > 0) {
                        //獲取就緒的客戶端事件
                        Set<SelectionKey> set = clientSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();
                        //循環(huán)遍歷處理客戶端事件,完成后將該key移除,并在此注冊(cè)一個(gè)OP_READ等待下一次該socket就緒
                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();
                            if (key.isReadable()) {
                                try {
                                    //獲取事件的通道
                                    SocketChannel clientChannel = (SocketChannel) key.channel();
                                    //數(shù)據(jù)的讀寫面向Buffer
                                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                    //讀取數(shù)據(jù)到buffer中
                                    clientChannel.read(byteBuffer);
                                    byteBuffer.flip();
                                    System.out.println(Thread.currentThread().getName() + ":" + Charset.defaultCharset().newDecoder().decode(byteBuffer).toString());
                                } catch (Exception e) {

                                } finally {
                                    keyIterator.remove();
                                    key.interestOps(SelectionKey.OP_READ);
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

可以看出原生nio雖然相對(duì)bio減小了一定開銷且提高一定的性能,但是缺點(diǎn)也很明顯:

原生的JDK的NIO概念非常多,使用非常復(fù)雜對(duì)新手不友好。

  • 底層使用epoll,很容易導(dǎo)致空輪詢進(jìn)而出現(xiàn)CPU100%。
  • 沒有對(duì)建立連接和處理請(qǐng)求的兩個(gè)處理建立線程模型,無法較好的發(fā)揮它的優(yōu)勢(shì),需要自己進(jìn)行擴(kuò)展實(shí)現(xiàn)。
  • 項(xiàng)目龐大后,會(huì)出現(xiàn)各種奇奇怪怪的bug,很難排查,且維護(hù)成本較高。

高性能網(wǎng)絡(luò)通信框架Netty

相對(duì)與JDK的原生nio,Netty與之相比有著一下的優(yōu)勢(shì):

  • 統(tǒng)一的API,支持多種傳輸類型、阻塞的和非阻塞的簡單而強(qiáng)大的線程模型,真正的無連接數(shù)據(jù)報(bào)套接字,支持鏈接邏輯組件以支持復(fù)用。
  • 易于使用,各種配置只需幾個(gè)方法的調(diào)用就能完成。
  • 性能較好,擁有比 Java 的核心API更高的吞吐量以及更低的延遲得益于池化和復(fù)用,擁有更低的資源消耗最少的內(nèi)存復(fù)制。
  • 健壯,不會(huì)因?yàn)槁?、快速或者超載的連接而導(dǎo)致OutOfMemoryError消除在高速網(wǎng)絡(luò)中NIO應(yīng)用程序常見的不公平讀/寫比率。
  • 安全,完整的SSL/TLS以及 StartTLS支持可用于受限環(huán)境下,如Applet和 OSGI。
  • 社區(qū)活躍。

同樣以以上述客戶端服務(wù)端通信,Netty實(shí)現(xiàn)就比較簡單了,我們編寫服務(wù)端時(shí),只需通過NioEventLoopGroup 完成上圖所說兩個(gè)slector創(chuàng)建,再通過channel指明當(dāng)前事件輪詢采用NIO非阻塞方式,最后將事件處理器FirstServerHandler添加到當(dāng)前服務(wù)端childHandler的pipeline上即可處理所有客戶端讀寫請(qǐng)求:

public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
  //創(chuàng)建處理連接的事件輪詢eventLoop 
        NioEventLoopGroup boss = new NioEventLoopGroup();
        //創(chuàng)建處理客戶端讀寫請(qǐng)求的eventLoop 
        NioEventLoopGroup worker = new NioEventLoopGroup();
  
        serverBootstrap.group(boss, worker)
          //設(shè)置為非阻塞輪詢
          .channel(NioServerSocketChannel.class)
          //childHandler添加ServerHandler客戶端讀寫請(qǐng)求
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {                   
                        ch.pipeline().addLast(new FirstServerHandler());    
                    }
                });

        serverBootstrap.bind("127.0.0.1", 8080);
    }

最后我們給出FirstServerHandler 的代碼,可以看到我們直接繼承ChannelInboundHandlerAdapter 處理客戶端發(fā)送的數(shù)據(jù),每當(dāng)服務(wù)端收到客戶端數(shù)據(jù)時(shí)就會(huì)回調(diào)channelRead,我們的邏輯也很簡單,收到數(shù)據(jù)之后直接回復(fù)Hello Netty client:

public class FirstServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 收到客戶端數(shù)據(jù)后會(huì)回調(diào)該方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        //打印讀取到的數(shù)據(jù)
        System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù) -> " + byteBuf.toString(StandardCharsets.UTF_8));

        // 回復(fù)客戶端數(shù)據(jù)
        System.out.println(new Date() + ": 服務(wù)端寫出數(shù)據(jù)");
        //組裝數(shù)據(jù)并發(fā)送
        ByteBuf out = getByteBuf(ctx);
        ctx.channel().writeAndFlush(out);
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        ByteBuf buffer = ctx.alloc().buffer();

        byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8);

        buffer.writeBytes(bytes);

        return buffer;
    }
}

此時(shí)我們通過telnet 127.0.0.1 8080進(jìn)行數(shù)據(jù)發(fā)送即可收到服務(wù)端的響應(yīng)了:

責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2022-05-13 10:59:14

容器網(wǎng)絡(luò)通信

2023-11-20 08:18:49

Netty服務(wù)器

2023-01-05 11:54:34

2023-02-02 08:18:41

2023-06-19 07:54:37

DotNetty網(wǎng)絡(luò)通信框架

2018-10-08 15:22:36

IO模型

2022-04-12 10:34:05

Web框架方案

2023-12-29 15:30:41

內(nèi)存存儲(chǔ)

2020-08-27 07:34:50

Zookeeper數(shù)據(jù)結(jié)構(gòu)

2022-09-06 11:21:49

光網(wǎng)絡(luò)光纖

2020-12-08 20:20:15

神經(jīng)網(wǎng)絡(luò)深度學(xué)習(xí)機(jī)器學(xué)習(xí)

2025-01-26 15:44:29

2024-02-20 19:53:57

網(wǎng)絡(luò)通信協(xié)議

2019-09-25 08:25:49

RPC網(wǎng)絡(luò)通信

2023-04-26 15:43:24

容器編排容器編排工具

2022-06-08 08:11:56

威脅建模網(wǎng)絡(luò)安全網(wǎng)絡(luò)攻擊

2023-11-06 08:16:19

APM系統(tǒng)運(yùn)維

2022-02-25 07:34:36

MQTT協(xié)議RabbitMQ

2022-11-11 19:09:13

架構(gòu)

2021-01-27 11:10:49

JVM性能調(diào)優(yōu)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)