自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Netty入門實踐:模擬IM聊天

開發(fā) 網(wǎng)絡(luò)
本文以入門實踐為主,通過原理+代碼的方式,實現(xiàn)一個簡易IM聊天功能。

我們使用的框架幾乎都有網(wǎng)絡(luò)通信的模塊,比如常見的Dubbo、RocketMQ、ElasticSearch等。它們的網(wǎng)絡(luò)通信模塊使用Netty實現(xiàn),之所以選擇Netty,有兩個主要原因:

  • Netty封裝了復(fù)雜的JDK 的 NIO操作,還封裝了各種復(fù)雜的異常場景,豐富的API使得在使用上也非常方便,幾行代碼就可以實現(xiàn)高性能的網(wǎng)絡(luò)通信功能。
  • Netty已經(jīng)經(jīng)歷各種大型中間件的生產(chǎn)環(huán)境的驗證,高可用性和健壯性都得到了全方位驗證,用起來更放心。

本文以入門實踐為主,通過原理+代碼的方式,實現(xiàn)一個簡易IM聊天功能。分為兩個部分:Netty的核心概念、IM聊天簡易實現(xiàn)。

一、Netty核心概念

1、通信流程

既然是網(wǎng)絡(luò)通信,那肯定有服務(wù)端和客戶端。在客戶端-A和客戶端-B通信的過程中,實際上是利用服務(wù)端作為消息中轉(zhuǎn)站,來實現(xiàn)A-B通信的。

不管是點-點通信,還是群通信,都可以認為是客戶端-服務(wù)端之間的通信,有了這一點,許多設(shè)計方案都可以輕松理解。

2、服務(wù)端核心概念

(1) Boss線程:Boss線程負責(zé)監(jiān)聽端口,接受新的連接,監(jiān)聽連接的數(shù)據(jù)讀寫變化。

(2) Worker線程:Worker線程負責(zé)處理具體的業(yè)務(wù)邏輯,Boss線程接收到連接的讀寫變化后,然后交給Worker處理具體業(yè)務(wù)邏輯。

(3) 服務(wù)端的IO模型:Netty支持使用NIO和BIO進行通信,可以自行設(shè)置。一般使用NioServerSocketChannel來指定NIO模型。

(4) 服務(wù)端引導(dǎo)類:服務(wù)端通過引導(dǎo)類 ServerBootstrap來啟動一系列的工作。

3、客戶端核心概念

(1) Worker線程:客戶端只有工作線程的概念,負責(zé)連接到服務(wù)端,監(jiān)聽數(shù)據(jù)讀寫變化。

(2) 客戶端的IO模型:一般使用NioSocketChannel指定客戶端的NIO模型

(3) 客戶端引導(dǎo)類:客戶端通過引導(dǎo)類Bootstrap來啟動一些列工作。

4、通用核心概念

(1) Handler:負責(zé)處理接受到的消息,大部分的業(yè)務(wù)邏輯都是放在Handler里處理。自定義的Handler一般繼承于SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter。

(2) ByteBuf和編碼、解碼:數(shù)據(jù)的載體,Java對象編碼成字節(jié)碼,存放于ByteBuf,然后發(fā)送出去。服務(wù)端接收到消息后,從ByteBuf中取出數(shù)據(jù),解碼成Java對象。

(3) 通訊協(xié)議:許多框架都會自定義一套自己的協(xié)議,這樣比較符合業(yè)務(wù)。比如dubbo協(xié)議、hessian協(xié)議。

一般的協(xié)議包括如下部分:魔數(shù)、版本號、序列化算法、指令、數(shù)據(jù)長度、數(shù)據(jù)內(nèi)容,其余的都是為了適配自身業(yè)務(wù)而定的。

  • 魔數(shù):一般是固定數(shù)字,用來快速判斷是否符合本協(xié)議,如果不符合本協(xié)議,則快速失敗。
  • 版本號:一般無需改動,如果早期設(shè)置的協(xié)議到了后續(xù)不適用了,在升級版本號。
  • 序列化算法:Java對象轉(zhuǎn)序列化的方式,比如JSON。
  • 指令:操作大類。比如說登錄指令、單點發(fā)送消息指令、建群指令等。這樣服務(wù)端接收到對應(yīng)指令就用對應(yīng)的Handler去處理業(yè)務(wù)邏輯。指令占用的字節(jié)數(shù)可以根據(jù)自身業(yè)務(wù)適當(dāng)調(diào)大。
  • 數(shù)據(jù)長度:用來記錄本次數(shù)據(jù)的長度。
  • 數(shù)據(jù)內(nèi)容:具體消息內(nèi)容,比如聊天時的消息、登錄時的用戶名密碼等。

(4) 粘包拆包

Netty屬于上層應(yīng)用,在發(fā)送消息時,還是通過底層操作系統(tǒng)將數(shù)據(jù)發(fā)送出去,操作系統(tǒng)在發(fā)送數(shù)據(jù)時,不會按照我們設(shè)想的消息長度去發(fā)送內(nèi)容。這就需要我們在接收到內(nèi)容時,自行做好內(nèi)容的分割和等待。

比如有一條消息1024字節(jié),如果接受的內(nèi)容沒這么長就需要繼續(xù)等待,等這條消息的內(nèi)容完整后,在處理。如果接受的內(nèi)容包含了1條完整消息和1條不完整的消息,那么就需要拆分內(nèi)容,將完整的消息先傳遞到后面處理,剩下不完整的消息則繼續(xù)等待下一個內(nèi)容。

Netty自帶了幾種拆包器:固定長度的拆包器 FixedLengthFrameDecoder、行拆包器 LineBasedFrameDecoder、分隔符拆包器 DelimiterBasedFrameDecoder、長度域拆包器LengthFieldBasedFrameDecoder。

一般在使用自定義協(xié)議時,會使用:長度域拆包器 LengthFieldBasedFrameDecoder。

(5) 空閑檢測和定時心跳

在服務(wù)端和客戶端的通信過程中,有時候會出現(xiàn)假死連接,或者長時間沒有消息傳遞需要釋放連接。對于這些連接,我們需要及時釋放,畢竟每條連接都占用著CPU和內(nèi)存資源。大量這種連接如果不及時釋放,服務(wù)器資源遲早會耗盡,最終崩潰。

應(yīng)對這種問題的解決方式是:Netty提供了IdleStateHandler做空閑檢測,用來檢測連接是否活躍,如果再指定的時間內(nèi),沒有活躍,那么就關(guān)閉連接。然后就是客戶端定時發(fā)送心跳請求,服務(wù)器響應(yīng)心跳請求。

二、IM聊天簡易實現(xiàn)

介紹完Netty的核心概念,接下來以一個簡易的點對點IM聊天,將核心概念融入到案例中。IM聊天的核心模塊大致是如下幾個:

1、通信主體流程

通信主體流程就是搭建好:服務(wù)端、客戶端、兩端正常建立連接進行通信。

服務(wù)端代碼:

public static void main(String[] args) {
    ServerBootstrap serverBootstrap = new ServerBootstrap();

    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    serverBootstrap
            .group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                protected void initChannel(NioSocketChannel ch) {
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                            System.out.println("server accept: " + msg);
                        }
                    });
                }
            });
    serverBootstrap.bind(9000)
            .addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("端口9000綁定成功");
                } else {
                    System.err.println("端口9000綁定失敗");
                }
            });
}

客戶端代碼:

public static void main(String[] args) throws InterruptedException {
    Bootstrap bootstrap = new Bootstrap();
    NioEventLoopGroup group = new NioEventLoopGroup();

    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) {
                    ch.pipeline().addLast(new StringEncoder());
                }
            });

    bootstrap.connect("127.0.0.1", 9000)
            .addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("鏈接服務(wù)端成功");
                    Channel channel = ((ChannelFuture) future).channel();
                    channel.writeAndFlush("我是客戶端A");
                } else {
                    System.err.println("連接服務(wù)端失敗");
                }
            });
}

2、數(shù)據(jù)包—包含通訊協(xié)議

定義數(shù)據(jù)包的抽象類,后續(xù)的各種類型的數(shù)據(jù)包都繼承此類。數(shù)據(jù)包中定義通訊協(xié)議的各種字段。

@Data
public abstract class Packet {
    /**
     * 協(xié)議版本
     */
    private Byte version = 1;

    /**
     * 指令,此處有多種實現(xiàn):比如登錄、登出、單聊、建群等等
     *
     * @return
     */
    public abstract Byte getCommand();

    /**
     * 獲取算法,默認使用JSON,如果使用其余算法,子類重寫此方法
     *
     * @return
     */
    public Byte getSerializeAlgorithm() {
        return SerializerAlgorithm.JSON;
    }
}

public class LoginRequestPacket extends Packet {
    private String userName;

    private String password;

    @Override
    public Byte getCommand() {
        return Command.LOGIN_REQUEST;
    }
}

3、序列化器

定義序列化器,功能包括:序列化、反序列化??梢远x多種序列化算法,文中以JSON為例。

public interface Serializer {
    /**
     * 序列化算法
     *
     * @return
     */
    byte getSerializerAlgorithm();

    /**
     * java 對象轉(zhuǎn)換成二進制
     */
    byte[] serialize(Object object);

    /**
     * 二進制轉(zhuǎn)換成 java 對象
     */
    <T> T deserialize(Class<T> clazz, byte[] bytes);
}

public class JSONSerializer implements Serializer {

    @Override
    public byte getSerializerAlgorithm() {
        return SerializerAlgorithm.JSON;
    }

    @Override
    public byte[] serialize(Object object) {
        return JSON.toJSONBytes(object);
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        return JSON.parseObject(bytes, clazz);
    }
}

4、編解碼器

有了通訊協(xié)議、有了序列化協(xié)議,接下來就是對數(shù)據(jù)的編碼和解碼了。

public void encode(ByteBuf byteBuf, Packet packet) {
    Serializer serializer = getSerializer(packet.getSerializeAlgorithm());

    // 1. 序列化 java 對象
    byte[] bytes = serializer.serialize(packet);

    // 2. 實際編碼過程
    byteBuf.writeInt(MAGIC_NUMBER);
    byteBuf.writeByte(packet.getVersion());
    byteBuf.writeByte(packet.getSerializeAlgorithm());
    byteBuf.writeByte(packet.getCommand());
    byteBuf.writeInt(bytes.length);
    byteBuf.writeBytes(bytes);
}


public Packet decode(ByteBuf byteBuf) {
    // 跳過 magic number
    byteBuf.skipBytes(4);
    // 跳過版本號
    byteBuf.skipBytes(1);
    // 讀取序列化算法
    byte serializeAlgorithm = byteBuf.readByte();
    // 讀取指令
    byte command = byteBuf.readByte();
    // 讀取數(shù)據(jù)包長度
    int length = byteBuf.readInt();
    // 讀取數(shù)據(jù)
    byte[] bytes = new byte[length];
    byteBuf.readBytes(bytes);

    Class<? extends Packet> requestType = getRequestType(command);
    Serializer serializer = getSerializer(serializeAlgorithm);

    if (requestType != null && serializer != null) {
        return serializer.deserialize(requestType, bytes);
    }

    return null;
}

5、消息處理器Handler

以上把通訊的基本架子和收發(fā)消息的數(shù)據(jù)包、協(xié)議、編解碼器等基礎(chǔ)工具已經(jīng)做完,接下來就是編寫Handler實現(xiàn)具體的業(yè)務(wù)邏輯了。

這里以客戶端發(fā)起登錄功能為例,分3步,消息收發(fā)也是類似:

  • 先在客戶端發(fā)送登錄請求數(shù)據(jù)包。
  • 服務(wù)端接收到登錄請求數(shù)據(jù)包后,在服務(wù)端的Handler里做業(yè)務(wù)邏輯處理,然后發(fā)送響應(yīng)給客戶端。
  • 客戶端接收到登錄響應(yīng)數(shù)據(jù)包后,在客戶端的Handler里做業(yè)務(wù)邏輯處理。

效果如下:

核心代碼如下:

  • 客戶端發(fā)送請求
bootstrap.connect("127.0.0.1", 9000)
                .addListener(future -> {
                    if (future.isSuccess()) {
                        System.out.println("連接服務(wù)端成功");
                        Channel channel = ((ChannelFuture) future).channel();
                        // 連接之后,假設(shè)再這里發(fā)起各種操作指令,采用異步線程開始發(fā)送各種指令,發(fā)送數(shù)據(jù)用到的的channel是必不可少的
                        sendActionCommand(channel);
                    } else {
                        System.err.println("連接服務(wù)端失敗");
                    }
                });

private static void sendActionCommand(Channel channel) {
        // 直接采用控制臺輸入的方式,模擬操作指令
        Scanner scanner = new Scanner(System.in);
        LoginActionCommand loginActionCommand = new LoginActionCommand();
        new Thread(() -> {
            loginActionCommand.exec(scanner, channel);
        }).start();
    }
  • 服務(wù)端接受請求,并且處理
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
    LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
    loginResponsePacket.setVersion(loginRequestPacket.getVersion());
    loginResponsePacket.setUserName(loginRequestPacket.getUserName());

    if (valid(loginRequestPacket)) {
        loginResponsePacket.setSuccess(true);
        String userId = IDUtil.randomId();
        loginResponsePacket.setUserId(userId);
        System.out.println("[" + loginRequestPacket.getUserName() + "]登錄成功");
        SessionUtil.bindSession(new Session(userId, loginRequestPacket.getUserName()), ctx.channel());
    } else {
        loginResponsePacket.setReason("校驗失敗");
        loginResponsePacket.setSuccess(false);
        System.out.println("登錄失敗!");
    }

    // 登錄響應(yīng)
    ctx.writeAndFlush(loginResponsePacket);
}

private boolean valid(LoginRequestPacket loginRequestPacket) {
    System.out.println("服務(wù)端LoginRequestHandler,正在校驗客戶端登錄請求");
    return true;
}
  • 客戶端接受響應(yīng),并且處理
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) {
        String userId = loginResponsePacket.getUserId();
        String userName = loginResponsePacket.getUserName();

        if (loginResponsePacket.isSuccess()) {
            System.out.println("[" + userName + "]登錄成功,userId為: " + loginResponsePacket.getUserId());
            SessionUtil.bindSession(new Session(userId, userName), ctx.channel());
        } else {
            System.out.println("[" + userName + "]登錄失敗,原因為:" + loginResponsePacket.getReason());
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("客戶端連接被關(guān)閉!");
    }
}

6、空閑檢測和定時心跳

主流程和主要功能已經(jīng)實現(xiàn),還剩最后一個空閑檢測和定時心跳。

實現(xiàn)步驟:

  • 客戶端和服務(wù)端都先定義好空閑檢測。如果再規(guī)定的時間內(nèi)沒有數(shù)據(jù)傳輸,則關(guān)閉通道。
  • 客戶端定時發(fā)送心跳
  • 服務(wù)端處理心跳請求,發(fā)送響應(yīng)給客戶端

核心代碼:

  • 空閑檢測代碼:
/**
 * IM聊天空閑檢測器
 * 比如:20秒內(nèi)沒有數(shù)據(jù),則關(guān)閉通道
 */
public class ImIdleStateHandler extends IdleStateHandler {

    private static final int READER_IDLE_TIME = 20;

    public ImIdleStateHandler() {
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        System.out.println(READER_IDLE_TIME + "秒內(nèi)未讀到數(shù)據(jù),關(guān)閉連接!");
        ctx.channel().close();
    }
}
  • 客戶端定時心跳代碼:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
        scheduleSendHeartBeat(ctx);

        super.channelActive(ctx);
    }

    private void scheduleSendHeartBeat(ChannelHandlerContext ctx) {
        // 此處無需使用scheduleAtFixedRate,因為如果通道失效后,就無需在發(fā)起心跳了,按照目前的方式是最好的:成功一次安排一次
        ctx.executor().schedule(() -> {

            if (ctx.channel().isActive()) {
                System.out.println("定時任務(wù)發(fā)送心跳!");
                ctx.writeAndFlush(new HeartBeatRequestPacket());
                scheduleSendHeartBeat(ctx);
            }

        }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
    }
  • 服務(wù)端響應(yīng)心跳代碼:
public class ImIdleStateHandler extends IdleStateHandler {

    private static final int READER_IDLE_TIME = 20;

    public ImIdleStateHandler() {
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        System.out.println(READER_IDLE_TIME + "秒內(nèi)未讀到數(shù)據(jù),關(guān)閉連接!");
        ctx.channel().close();
    }
}

三、總結(jié)

本文介紹了Netty的核心概念,以及基本使用方法,希望能夠幫到你。本文核心詞:

  • 通信流程
  • Boss線程、Worker線程
  • 處理消息的Handler
  • 通訊協(xié)議、序列化協(xié)議、編解碼器
  • 空閑檢測、定時心跳

本文完整代碼:https://github.com/yclxiao/netty-demo.git

責(zé)任編輯:趙寧寧 來源: 不焦躁的程序員
相關(guān)推薦

2012-08-13 13:03:31

Web

2023-03-27 18:33:47

客服IM消息

2019-12-10 09:20:30

NettyBIO開發(fā)

2021-08-09 09:48:16

NettyChannelHand架構(gòu)

2023-02-10 08:16:48

WebSocket簡易聊天室

2024-10-14 08:09:08

2020-12-14 15:59:10

PythonWechaty機器人

2024-10-24 20:48:04

Netty線程Java

2018-04-20 09:36:23

NettyWebSocket京東

2016-08-05 13:19:29

GET請求github項目 POST請求

2023-11-28 08:49:01

短輪詢WebSocket長輪詢

2020-09-30 18:00:48

JavaSpring BootIM

2020-04-15 08:33:43

Netty網(wǎng)絡(luò)通信

2009-09-04 16:05:08

2022-07-28 07:00:56

Nuclei漏洞掃描器

2010-12-08 09:03:40

SQLiteAndroid

2020-12-04 10:42:54

GithubSSDNode.js

2023-09-11 10:53:32

2019-04-24 23:49:57

宜人貸蜂巢API網(wǎng)關(guān)Netty

2024-07-03 10:09:29

點贊
收藏

51CTO技術(shù)棧公眾號