自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ結(jié)合源碼告訴你消息量大為啥不需要手動壓縮消息

開發(fā) 前端
RocketMQ在存儲的時候自己進行消息壓縮,consumer進行消息拉取的時候,broker進行消息解壓縮,然后推送給consumer,這種方式就是消耗broker?cpu,也不能節(jié)省網(wǎng)絡帶寬,只能節(jié)省存儲空間,所以很明顯是在client端進行壓縮比較好。

背景

最近同事發(fā)現(xiàn)線上發(fā)送的RocketMQ消息太大,同事為了節(jié)省網(wǎng)絡帶寬和存儲空間,手動壓縮消息然后再進行消息發(fā)送,發(fā)現(xiàn)磁盤也沒有明顯的縮減。

所以我打算結(jié)合源碼告訴他RocketMQ自帶的消息壓縮。

RocketMQ版本

  • 5.1.0

為什么需要壓縮消息

首先說一下為什么需要消息壓縮,原因其實很簡單。就是為了節(jié)省網(wǎng)絡帶寬和存儲空間。

在哪里壓縮消息

我們的消息壓縮可以在很多個地方進行。

有兩種方案

在client端進行壓縮

比如我們可以在Producer發(fā)送消息的時候進行消息壓縮。

然后將壓縮后的消息發(fā)送到Broker,broker只管存儲。

等到consumer需要消息的時候,原封不動的推送給消費者,由consumer自己進行解壓縮。

這種方式的好處是broker不需要關(guān)心消息的壓縮和解壓縮,只需要存儲消息即可。

在broker端進行壓縮

這種方式就是Producer發(fā)送消息的時候,不進行壓縮。

RocketMQ在存儲的時候自己進行消息壓縮,consumer進行消息拉取的時候,broker進行消息解壓縮,然后推送給consumer。

這種方式就是消耗broker cpu,也不能節(jié)省網(wǎng)絡帶寬,只能節(jié)省存儲空間。

所以很明顯是在client端進行壓縮比較好。

源碼分析

這里我們來具體結(jié)合源碼分析下:

在消息發(fā)送org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl方法中會對消息進行壓縮判斷。

圖片圖片

tryToCompressMessage 消息壓縮

什么消息會被壓縮呢?

private boolean tryToCompressMessage(final Message msg) {
        if (msg instanceof MessageBatch) {
            //batch does not support compressing right now
            return false;
        }
        byte[] body = msg.getBody();
        if (body != null) {
            if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
                try {
                    byte[] data = compressor.compress(body, compressLevel);
                    if (data != null) {
                        msg.setBody(data);
                        return true;
                    }
                } catch (IOException e) {
                    log.error("tryToCompressMessage exception", e);
                    log.warn(msg.toString());
                }
            }
        }

        return false;
    }
  • 批量消息不支持壓縮。
  • 消息體長度大于defaultMQProducer.getCompressMsgBodyOverHowmuch()的時候進行壓縮。默認1024 * 4 = 4kb。
  • 壓縮算法是什么呢?

RocketMQ目前提供三種壓縮算法

  • LZ4
  • ZSTD
  • ZLIB

默認壓縮算法為ZLIB。

private CompressionType compressType = CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));

壓縮等級為5。

private int compressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
  • 消息壓縮完后會通過sysFlag進行標記,表示消息進行了壓縮,方便后續(xù)解壓。
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    sysFlag |= compressType.getCompressionFlag();

消息解壓

消息解壓主要是在方法org.apache.rocketmq.common.message.MessageDecoder.decode(java.nio.ByteBuffer, boolean, boolean, boolean, boolean, boolean)中進行的。

在client拉取到消息成功后對PullResult對象進行處理執(zhí)行decodesBatch方法。

圖片圖片

消息解析decodesBatch方法會調(diào)用org.apache.rocketmq.common.message.MessageDecoder.decode(java.nio.ByteBuffer, boolean, boolean, boolean, boolean, boolean)方法。

decode方法會對消息進行解壓。

圖片

總結(jié)

  • 消息壓縮主要是為了節(jié)省網(wǎng)絡帶寬和存儲空間。
  • RocketMQ提供了三種壓縮算法,分別是LZ4、ZSTD、ZLIB,默認為ZLIB。
  • 消息壓縮主要是在Producer發(fā)送消息的時候進行壓縮,broker只管存儲。
  • 消息解壓主要是在Consumer拉取消息的時候進行解壓。
  • RocketMQ消息壓縮僅支持單條消息壓縮,不支持批量消息壓縮。
  • 一般消息壓縮都會選擇在client端進行壓縮,這樣可以節(jié)省broker的cpu。
責任編輯:武曉燕 來源: 小奏技術(shù)
相關(guān)推薦

2022-09-26 10:43:13

RocketMQ保存消息

2012-08-23 09:50:07

測試測試人員軟件測試

2010-11-23 10:55:47

跳槽

2011-09-02 09:45:39

交互設計Android

2018-01-29 13:18:42

前端JavaScript

2021-05-26 10:19:01

jreJava應用程序

2022-12-22 10:03:18

消息集成

2019-07-15 08:00:00

AI人工智能

2021-05-07 15:18:26

比特幣禁令監(jiān)管

2024-10-11 09:15:33

2017-03-13 13:54:40

戴爾

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-04-09 09:08:09

Kafka消息架構(gòu)

2020-07-28 08:28:07

JavaScriptswitch開發(fā)

2022-06-07 17:01:31

UI框架前端

2023-04-26 10:06:08

RocketMQ屬性Consumer

2019-07-17 06:17:01

UbuntuUbuntu LTSNvidia驅(qū)動

2022-04-21 08:01:34

React框架action

2009-11-23 12:45:22

點贊
收藏

51CTO技術(shù)棧公眾號