自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ的編解碼技術(shù)細(xì)節(jié)

開發(fā) 前端
RocketMQ序列化、反序列化有兩種方式:一種是將數(shù)據(jù)通過FastJSON將數(shù)據(jù)轉(zhuǎn)化成JSON字符串,然后轉(zhuǎn)化成byte[]數(shù)組進(jìn)行編解碼。另一種方式是RocketMQ定義了一套自己的編解碼,將每個(gè)字段分別進(jìn)行編解碼。編碼不論采用那種方式,都最終都編碼為byte[]。

 [[405364]]

本文轉(zhuǎn)載自微信公眾號「漫漫技術(shù)路」,作者劉蒞。轉(zhuǎn)載本文請聯(lián)系漫漫技術(shù)路公眾號。

從上一篇文章中,我們了解的RocketMQ不同組件之間,數(shù)據(jù)是如何通過網(wǎng)絡(luò)傳輸?shù)?,總結(jié)以下幾點(diǎn)

  • RocketMQ的網(wǎng)絡(luò)傳輸模塊,在remoting子模塊下,入口可以參考RemotingNettyServer和RemotingNettyClient兩個(gè)類。
  • RocketMQ是依靠Netty,與各個(gè)組件進(jìn)行數(shù)據(jù)傳輸。
  • RocketMQ序列化、反序列化有兩種方式:一種是將數(shù)據(jù)通過FastJSON將數(shù)據(jù)轉(zhuǎn)化成JSON字符串,然后轉(zhuǎn)化成byte[]數(shù)組進(jìn)行編解碼。另一種方式是RocketMQ定義了一套自己的編解碼,將每個(gè)字段分別進(jìn)行編解碼。編碼不論采用那種方式,都最終都編碼為byte[]。

今天,我們分析RocketMQ的編解碼細(xì)節(jié)。在本篇文章中,可以學(xué)到:

  • RocketMQ網(wǎng)絡(luò)協(xié)議。
  • RocketMQ在傳輸數(shù)據(jù)時(shí),內(nèi)存的分配。
  • RocketMQ編解碼細(xì)節(jié)。

編碼流程

首先,我們編碼器org.apache.rocketmq.remoting.netty.NettyEncoder入手

  1. @ChannelHandler.Sharable 
  2. public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> { 
  3.     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); 
  4.  
  5.     @Override 
  6.     public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out
  7.         throws Exception { 
  8.         try { 
  9.             ByteBuffer header = remotingCommand.encodeHeader(); 
  10.             out.writeBytes(header); 
  11.             byte[] body = remotingCommand.getBody(); 
  12.             if (body != null) { 
  13.                 out.writeBytes(body); 
  14.             } 
  15.         } catch (Exception e) { 
  16.             log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); 
  17.             if (remotingCommand != null) { 
  18.                 log.error(remotingCommand.toString()); 
  19.             } 
  20.             RemotingUtil.closeChannel(ctx.channel()); 
  21.         } 
  22.     } 

從上面的代碼,是RocketMQ將RemotingCommand對象編碼成ByteBuf的唯一入口。

我們可以看到,先將header部分編碼成ByteBuf,然后將body部分追加到ByteBuf里。

body部分編碼很容易理解,那么header部分是怎么編碼的呢?

  1. public ByteBuffer encodeHeader() { 
  2.     return encodeHeader(this.body != null ? this.body.length : 0); 
  3.  
  4. public ByteBuffer encodeHeader(final int bodyLength) { 
  5.     // 1> header length size 
  6.     int length = 4; 
  7.  
  8.     // 2> header data length 
  9.     byte[] headerData; 
  10.     headerData = this.headerEncode(); 
  11.  
  12.     length += headerData.length; 
  13.  
  14.     // 3> body data length 
  15.     length += bodyLength; 
  16.  
  17.     ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); 
  18.  
  19.     // length 
  20.     result.putInt(length); 
  21.  
  22.     // header length 
  23.     result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); 
  24.  
  25.     // header data 
  26.     result.put(headerData); 
  27.  
  28.     result.flip(); 
  29.  
  30.     return result; 

我們一步一步看,邏輯如下

  • 定義變量length,其初始值是4,經(jīng)過幾步操作,其值是4+header.length+body.length。
  • 調(diào)用headerEnocde()方法編碼header部分。
  • 創(chuàng)建ByteBuffer并申請4+length-body.length大小的內(nèi)存,其實(shí)就是4+4+header.length大小的內(nèi)存。
  • 將數(shù)據(jù)寫入ByteBuffer,完成header部分的編碼。

我們畫一張圖來表示當(dāng)前內(nèi)存都存的什么數(shù)據(jù)。

我們可以很清楚的看到,每部分?jǐn)?shù)據(jù),分別存儲在ButyBuf的什么位置,這里需要特別強(qiáng)調(diào)的是,byte[4]部分存儲的什么,通過源碼,進(jìn)一步分析。

  1. result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); 
  2.   
  3. public static byte[] markProtocolType(int source, SerializeType type) { 
  4.     byte[] result = new byte[4]; 
  5.   
  6.     result[0] = type.getCode(); 
  7.     result[1] = (byte) ((source >> 16) & 0xFF); 
  8.     result[2] = (byte) ((source >> 8) & 0xFF); 
  9.     result[3] = (byte) (source & 0xFF); 
  10.     return result; 
  11.   
  12. public enum SerializeType { 
  13.     JSON((byte) 0), 
  14.     ROCKETMQ((byte) 1); 

markProtocolType方法,第一個(gè)參數(shù)source表示header部分的長度,第二個(gè)參數(shù)type是編碼類型的枚舉,返回值是byte[]。

那么markProtocolType方法是做什么的呢?我們將type字段傳入JSON和ROCKETMQ這兩個(gè)枚舉值,分別看一下,返回的是什么。

返回值共四個(gè)字節(jié),只有第一個(gè)字節(jié)不同,第四個(gè)字節(jié)是header部分的長度,前文已經(jīng)提到過,RocketMQ對header部分,可以采用兩種編解碼方式。

對!!沒錯,第一個(gè)字節(jié)就是標(biāo)識編解碼類型的。

解碼流程

接下來我們來看解碼,解碼與編碼稍有不同,解碼器繼承Netty提供的LengthFieldBasedFrameDecoder解碼器,我們來看org.apache.rocketmq.remoting.netty.NettyDecoder的源碼。

  1. public class NettyDecoder extends LengthFieldBasedFrameDecoder { 
  2.     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); 
  3.  
  4.     private static final int FRAME_MAX_LENGTH = 
  5.         Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength""16777216")); 
  6.  
  7.     public NettyDecoder() { 
  8.         super(FRAME_MAX_LENGTH, 0, 4, 0, 4); 
  9.     } 
  10.  
  11.     @Override 
  12.     public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { 
  13.         ByteBuf frame = null
  14.         try { 
  15.             frame = (ByteBuf) super.decode(ctx, in); 
  16.             if (null == frame) { 
  17.                 return null
  18.             } 
  19.  
  20.             ByteBuffer byteBuffer = frame.nioBuffer(); 
  21.  
  22.             return RemotingCommand.decode(byteBuffer); 
  23.         } catch (Exception e) { 
  24.             log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); 
  25.             RemotingUtil.closeChannel(ctx.channel()); 
  26.         } finally { 
  27.             if (null != frame) { 
  28.                 frame.release(); 
  29.             } 
  30.         } 
  31.  
  32.         return null
  33.     } 

其步驟如下:

  • 調(diào)用LengthFieldBasedFrameDecoder的decode方法,初次解碼。
  • 調(diào)用RemotingCommand.decode()方法,完成對header、body部分的解碼,并轉(zhuǎn)化為RemotingCommand對象。

為什么要經(jīng)過兩次解碼?

熟悉LengthFieldBasedFrameDecoder解碼器的朋友都知道,LengthFieldBasedFrameDecoder解碼器是Netty提供的一種非常靈活的解碼器。

它在RocketMQ的NettyDecoder類中是這樣被構(gòu)造的。

  1. public NettyDecoder() { 
  2.     super(FRAME_MAX_LENGTH, 0, 4, 0, 4); 

LengthFieldBasedFrameDecoder我在這里不詳細(xì)解釋,按上面的構(gòu)造方法,意思是跳過開頭前4個(gè)字節(jié)。

構(gòu)造出來LengthFieldBasedFrameDecoder后對RocketMQ協(xié)議進(jìn)行初次解碼,解碼結(jié)果如下:

我們可以看到,把前四個(gè)字節(jié),也就是把存儲length字段的那部分內(nèi)存截去了,只剩byte[]+header+body部分。

我們再來看RemotingCommand的解碼邏輯

  1. public static RemotingCommand decode(final ByteBuffer byteBuffer) { 
  2.     int length = byteBuffer.limit(); 
  3.     int oriHeaderLen = byteBuffer.getInt(); 
  4.     int headerLength = getHeaderLength(oriHeaderLen); 
  5.  
  6.     byte[] headerData = new byte[headerLength]; 
  7.     byteBuffer.get(headerData); 
  8.  
  9.     RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); 
  10.  
  11.     int bodyLength = length - 4 - headerLength; 
  12.     byte[] bodyData = null
  13.     if (bodyLength > 0) { 
  14.         bodyData = new byte[bodyLength]; 
  15.         byteBuffer.get(bodyData); 
  16.     } 
  17.     cmd.body = bodyData; 
  18.  
  19.     return cmd; 
  20.  
  21. private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) { 
  22.     switch (type) { 
  23.         case JSON: 
  24.             RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); 
  25.             resultJson.setSerializeTypeCurrentRPC(type); 
  26.             return resultJson; 
  27.         case ROCKETMQ: 
  28.             RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); 
  29.             resultRMQ.setSerializeTypeCurrentRPC(type); 
  30.             return resultRMQ; 
  31.         default
  32.             break; 
  33.     } 
  34.     return null

只剩byte[4]+header+body三個(gè)部分,解碼邏輯便很清晰。可以分為以下幾個(gè)步驟

  • 求出數(shù)據(jù)的長度、header的長度。
  • 根據(jù)編碼類型,解碼。比如通過JSON方式編碼,則通過JSON方式解碼。
  • 算出body的長度,解碼body。
  • 將最終解碼生成的RemotingCommand對象,發(fā)送給pipeline的下一個(gè)handler處理。

 

 

責(zé)任編輯:武曉燕 來源: 漫漫技術(shù)路
相關(guān)推薦

2024-04-25 17:07:33

無源光網(wǎng)絡(luò)PON接入網(wǎng)技術(shù)

2015-04-13 10:12:08

Windows容器技術(shù)Nano Server

2017-11-10 08:35:06

存儲FCoE網(wǎng)絡(luò)

2014-05-29 09:34:25

2019-05-06 10:51:49

總監(jiān)技術(shù)場景

2019-05-13 08:51:53

總監(jiān)技術(shù)CTO

2013-06-26 09:42:25

技術(shù)服務(wù)器內(nèi)存虛擬化

2020-04-03 09:05:43

麻將 AI Suphx神經(jīng)網(wǎng)絡(luò)

2023-05-08 07:20:22

Doris分析型數(shù)據(jù)庫

2018-07-17 09:34:15

Service Mes技術(shù)Kubernetes

2015-07-27 09:44:38

Amazon EC2云平臺CoreOS容器

2022-04-28 07:59:11

Polkitpkexec漏洞

2021-03-16 15:49:30

架構(gòu)運(yùn)維技術(shù)

2018-04-20 14:37:43

互聯(lián)網(wǎng)技術(shù)細(xì)節(jié)

2022-06-29 13:59:40

家居應(yīng)用鴻蒙

2009-12-02 11:03:29

AMD

2020-12-21 06:58:12

Web安全編解碼工具

2022-09-05 08:12:28

Google二進(jìn)制Protobuf

2020-09-21 05:58:40

深度學(xué)習(xí)算法目標(biāo)檢測

2016-03-31 15:11:47

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號