RocketMQ結(jié)合源碼告訴你消息量大為啥不需要手動壓縮消息
背景
最近同事發(fā)現(xiàn)線上發(fā)送的RocketMQ消息太大,同事為了節(jié)省網(wǎng)絡帶寬和存儲空間,手動壓縮消息然后再進行消息發(fā)送,發(fā)現(xiàn)磁盤也沒有明顯的縮減。
所以我打算結(jié)合源碼告訴他RocketMQ自帶的消息壓縮。
RocketMQ版本
- 5.1.0
為什么需要壓縮消息
首先說一下為什么需要消息壓縮,原因其實很簡單。就是為了節(jié)省網(wǎng)絡帶寬和存儲空間。
在哪里壓縮消息
我們的消息壓縮可以在很多個地方進行。
有兩種方案
在client端進行壓縮
比如我們可以在Producer發(fā)送消息的時候進行消息壓縮。
然后將壓縮后的消息發(fā)送到Broker,broker只管存儲。
等到consumer需要消息的時候,原封不動的推送給消費者,由consumer自己進行解壓縮。
這種方式的好處是broker不需要關(guān)心消息的壓縮和解壓縮,只需要存儲消息即可。
在broker端進行壓縮
這種方式就是Producer發(fā)送消息的時候,不進行壓縮。
RocketMQ在存儲的時候自己進行消息壓縮,consumer進行消息拉取的時候,broker進行消息解壓縮,然后推送給consumer。
這種方式就是消耗broker cpu,也不能節(jié)省網(wǎng)絡帶寬,只能節(jié)省存儲空間。
所以很明顯是在client端進行壓縮比較好。
源碼分析
這里我們來具體結(jié)合源碼分析下:
在消息發(fā)送org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl方法中會對消息進行壓縮判斷。
圖片
tryToCompressMessage 消息壓縮
什么消息會被壓縮呢?
private boolean tryToCompressMessage(final Message msg) {
if (msg instanceof MessageBatch) {
//batch does not support compressing right now
return false;
}
byte[] body = msg.getBody();
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
byte[] data = compressor.compress(body, compressLevel);
if (data != null) {
msg.setBody(data);
return true;
}
} catch (IOException e) {
log.error("tryToCompressMessage exception", e);
log.warn(msg.toString());
}
}
}
return false;
}
- 批量消息不支持壓縮。
- 消息體長度大于defaultMQProducer.getCompressMsgBodyOverHowmuch()的時候進行壓縮。默認1024 * 4 = 4kb。
- 壓縮算法是什么呢?
RocketMQ目前提供三種壓縮算法
- LZ4
- ZSTD
- ZLIB
默認壓縮算法為ZLIB。
private CompressionType compressType = CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));
壓縮等級為5。
private int compressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
- 消息壓縮完后會通過sysFlag進行標記,表示消息進行了壓縮,方便后續(xù)解壓。
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= compressType.getCompressionFlag();
消息解壓
消息解壓主要是在方法org.apache.rocketmq.common.message.MessageDecoder.decode(java.nio.ByteBuffer, boolean, boolean, boolean, boolean, boolean)中進行的。
在client拉取到消息成功后對PullResult對象進行處理執(zhí)行decodesBatch方法。
圖片
消息解析decodesBatch方法會調(diào)用org.apache.rocketmq.common.message.MessageDecoder.decode(java.nio.ByteBuffer, boolean, boolean, boolean, boolean, boolean)方法。
decode方法會對消息進行解壓。
總結(jié)
- 消息壓縮主要是為了節(jié)省網(wǎng)絡帶寬和存儲空間。
- RocketMQ提供了三種壓縮算法,分別是LZ4、ZSTD、ZLIB,默認為ZLIB。
- 消息壓縮主要是在Producer發(fā)送消息的時候進行壓縮,broker只管存儲。
- 消息解壓主要是在Consumer拉取消息的時候進行解壓。
- RocketMQ消息壓縮僅支持單條消息壓縮,不支持批量消息壓縮。
- 一般消息壓縮都會選擇在client端進行壓縮,這樣可以節(jié)省broker的cpu。