RocketMQ 有哪些消息類型,你知道嗎?
一、普通消息
1 消息發(fā)送分類
Producer對于消息的發(fā)送方式也有多種選擇,不同的方式會產(chǎn)生不同的系統(tǒng)效果。
同步發(fā)送消息
同步發(fā)送消息是指,Producer發(fā)出?條消息后,會在收到MQ返回的ACK之后才發(fā)下?條消息。該方式 的消息可靠性最高,但消息發(fā)送效率太低。
異步發(fā)送消息
異步發(fā)送消息是指,Producer發(fā)出消息后無需等待MQ返回ACK,直接發(fā)送下?條消息。該方式的消息可靠性可以得到保障,消息發(fā)送效率也可以。
單向發(fā)送消息
單向發(fā)送消息是指,Producer僅負責發(fā)送消息,不等待、不處理MQ的ACK。該發(fā)送方式時MQ也不返回ACK。該方式的消息發(fā)送效率最高,但消息可靠性較差。
二、順序消息
1 什么是順序消息
順序消息指的是,嚴格按照消息的發(fā)送順序進行消費的消息(FIFO)。
默認情況下生產(chǎn)者會把消息以Round Robin輪詢方式發(fā)送到不同的Queue分區(qū)隊列;而消費消息時會從多個Queue上拉取消息,這種情況下的發(fā)送和消費是不能保證順序的。如果將消息僅發(fā)送到同一個Queue中,消費時也只從這個Queue上拉取消息,就嚴格保證了消息的順序性。
2 為什么需要順序消息
例如,現(xiàn)在有TOPIC ORDER_STATUS (訂單狀態(tài)),其下有4個Queue隊列,該Topic中的不同消息用于 描述當前訂單的不同狀態(tài)。假設訂單有狀態(tài):未支付、已支付、發(fā)貨中、發(fā)貨成功、發(fā)貨失敗。
根據(jù)以上訂單狀態(tài),生產(chǎn)者從時序上可以生成如下幾個消息:
訂單T0000001:未支付 --> 訂單T0000001:已支付 --> 訂單T0000001:發(fā)貨中 --> 訂單
T0000001:發(fā)貨失敗
消息發(fā)送到MQ中之后,Queue的選擇如果采用輪詢策略,消息在MQ的存儲可能如下:
這種情況下,我們希望Consumer消費消息的順序和我們發(fā)送是一致的,然而上述MQ的投遞和消費方式,我們無法保證順序是正確的。對于順序異常的消息,Consumer即使設置有一定的狀態(tài)容錯,也不能完全處理好這么多種隨機出現(xiàn)組合情況。
基于上述的情況,可以設計如下方案:對于相同訂單號的消息,通過一定的策略,將其放置在一個Queue中,然后消費者再采用一定的策略(例如,一個線程獨立處理一個queue,保證處理消息的順序性),能夠保證消費的順序性。
3 有序性分類
根據(jù)有序范圍的不同,RocketMQ可以嚴格地保證兩種消息的有序性:分區(qū)有序與全局有序。
全局有序
當發(fā)送和消費參與的Queue只有一個時所保證的有序是整個Topic中消息的順序, 稱為全局有序。
在創(chuàng)建Topic時指定Queue的數(shù)量。有三種指定方式:
1)在代碼中創(chuàng)建Producer時,可以指定其自動創(chuàng)建的Topic的Queue數(shù)量
2)在RocketMQ可視化控制臺中手動創(chuàng)建Topic時指定Queue數(shù)量
3)使用mqadmin命令手動創(chuàng)建Topic時指定Queue數(shù)量
分區(qū)有序
如 果有多個Queue參與,其僅可保證在該Queue分區(qū)隊列上的消息順序,則稱為分區(qū)有序。
如何實現(xiàn)Queue的選擇?在定義Producer時我們可以指定消息隊列選擇器,而這個選擇器是我們自己實現(xiàn)了MessageQueueSelector接口定義的。
在定義選擇器的選擇算法時,一般需要使用選擇key。這個選擇key可以是消息key也可以是其它數(shù)據(jù)。但無論誰做選擇key,都不能重復,都是唯一的。
一般性的選擇算法是,讓選擇key(或其hash值)與該Topic所包含的Queue的數(shù)量取模,其結(jié)果即為選擇出的Queue的QueueId。
取模算法存在一個問題:不同選擇key與Queue數(shù)量取模結(jié)果可能會是相同的,即不同選擇key的消息可能會出現(xiàn)在相同的Queue,即同一個Consuemr可能會消費到不同選擇key的消息。這個問題如何解決?一般性的作法是,從消息中獲取到選擇key,對其進行判斷。若是當前Consumer需要消費的消息,則直接消費,否則,什么也不做。這種做法要求選擇key要能夠隨著消息一起被Consumer獲取到。此時使用消息key作為選擇key是比較好的做法。
以上做法會不會出現(xiàn)如下新的問題呢?不屬于那個Consumer的消息被拉取走了,那么應該消費該消息的Consumer是否還能再消費該消息呢?同一個Queue中的消息不可能被同一個Group中的 不同Consumer同時消費。所以,消費現(xiàn)一個Queue的不同選擇key的消息的Consumer一定屬于不同的Group。而不同的Group中的Consumer間的消費是相互隔離的,互不影響的。
三、延時消息
1 什么是延時消息
當消息寫入到Broker后,在指定的時長后才可被消費處理的消息,稱為延時消息。
采用RocketMQ的延時消息可以實現(xiàn)定時任務的功能,而無需使用定時器。典型的應用場景是,電商交易中超時未支付關(guān)閉訂單的場景,12306平臺訂票超時未支付取消訂票的場景。
在電商平臺中,訂單創(chuàng)建時會發(fā)送一條延遲消息。這條消息將會在30分鐘后投遞給后臺業(yè)務系 統(tǒng)(Consumer),后臺業(yè)務系統(tǒng)收到該消息后會判斷對應的訂單是否已經(jīng)完成支付。如果未完 成,則取消訂單,將商品再次放回到庫存;如果完成支付,則忽略。
在12306平臺中,車票預訂成功后就會發(fā)送一條延遲消息。這條消息將會在45分鐘后投遞給后臺業(yè)務系統(tǒng)(Consumer),后臺業(yè)務系統(tǒng)收到該消息后會判斷對應的訂單是否已經(jīng)完成支付。如 果未完成,則取消預訂,將車票再次放回到票池;如果完成支付,則忽略。
2 延時等級
延時消息的延遲時長不支持隨意時長的延遲,是通過特定的延遲等級來指定的。延時等級定義在RocketMQ服務端的MessageStoreConfig類中的如下變量中:
即,若指定的延時等級為3,則表示延遲時長為10s,即延遲等級是從1開始計數(shù)的。
當然,如果需要自定義的延時等級,可以通過在broker加載的配置中新增如下配置(例如下面增加了1天這個等級1d)。配置文件在RocketMQ安裝目錄下的conf目錄中。
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
3 延時消息實現(xiàn)原理
具體實現(xiàn)方案是:
修改消息
Producer將消息發(fā)送到Broker后,Broker會首先將消息寫入到commitlog文件,然后需要將其分發(fā)到相應的consumequeue。不過,在分發(fā)之前,系統(tǒng)會先判斷消息中是否帶有延時等級。若沒有,則直接正
常分發(fā);若有則需要經(jīng)歷一個復雜的過程:
- 修改消息的Topic為SCHEDULE_TOPIC_XXXX
- 根據(jù)延時等級,在consumequeue目錄中SCHEDULE_TOPIC_XXXX主題下創(chuàng)建出相應的queueId目錄與consumequeue文件(如果沒有這些目錄與文件的話)。
延遲等級delayLevel與queueId的對應關(guān)系為queueId = delayLevel -1 需要注意,在創(chuàng)建queueId目錄時,并不是一次性地將所有延遲等級對應的目錄全部創(chuàng)建完畢,而是用到哪個延遲等級創(chuàng)建哪個目錄
- 修改消息索引單元內(nèi)容。索引單元中的Message Tag HashCode部分原本存放的是消息的Tag的Hash值?,F(xiàn)修改為消息的投遞時間。投遞時間是指該消息被重新修改為原Topic后再次被寫入到commitlog中的時間。投遞時間 = 消息存儲時間 + 延時等級時間。消息存儲時間指的是消息被發(fā)送到Broker時的時間戳。
- 將消息索引寫入到SCHEDULE_TOPIC_XXXX主題下相應的consumequeue中
SCHEDULE_TOPIC_XXXX目錄中各個延時等級Queue中的消息是如何排序的?
是按照消息投遞時間排序的。一個Broker中同一等級的所有延時消息會被寫入到consumequeue目錄中SCHEDULE_TOPIC_XXXX目錄下相同Queue中。即一個Queue中消息投遞時間的延遲等級時間是相同的。那么投遞時間就取決于于消息存儲時間了。即按照消息被發(fā)送到Broker的時間進行排序的。
投遞延時消息
Broker內(nèi)部有?個延遲消息服務類ScheuleMessageService,其會消費SCHEDULE_TOPIC_XXXX中的消息,即按照每條消息的投遞時間,將延時消息投遞到?標Topic中。不過,在投遞之前會從commitlog
中將原來寫入的消息再次讀出,并將其原來的延時等級設置為0,即原消息變?yōu)榱艘粭l不延遲的普通消息。然后再次將消息投遞到目標Topic中。
ScheuleMessageService在Broker啟動時,會創(chuàng)建并啟動一個定時器TImer,用于執(zhí)行相應的定時任務。系統(tǒng)會根據(jù)延時等級的個數(shù),定義相應數(shù)量的TimerTask,每個TimerTask負責一個延遲 等級消息的消費與投遞。每個TimerTask都會檢測相應Queue隊列的第一條消息是否到期。若第 一條消息未到期,則后面的所有消息更不會到期(消息是按照投遞時間排序的);若第一條消息到期了,則將該消息投遞到目標Topic,即消費該消息。
將消息重新寫入commitlog
延遲消息服務類ScheuleMessageService將延遲消息再次發(fā)送給了commitlog,并再次形成新的消息索引條目,分發(fā)到相應Queue。
這其實就是一次普通消息發(fā)送。只不過這次的消息Producer是延遲消息服務類 ScheuleMessageService。
四、事務消息
1 問題引入
這里的一個需求場景是:工行用戶A向建行用戶B轉(zhuǎn)賬1萬元。
我們可以使用同步消息來處理該需求場景:
- 工行系統(tǒng)發(fā)送一個給B增款1萬元的同步消息M給Broker
- 消息被Broker成功接收后,向工行系統(tǒng)發(fā)送成功ACK
- 工行系統(tǒng)收到成功ACK后從用戶A中扣款1萬元
- 建行系統(tǒng)從Broker中獲取到消息M
- 建行系統(tǒng)消費消息M,即向用戶B中增加1萬元
這其中是有問題的:若第3步中的扣款操作失敗,但消息已經(jīng)成功發(fā)送到了Broker。對于MQ來 說,只要消息寫入成功,那么這個消息就可以被消費。此時建行系統(tǒng)中用戶B增加了1萬元。出 現(xiàn)了數(shù)據(jù)不一致問題。
2 解決思路
解決思路是,讓第1、2、3步具有原子性,要么全部成功,要么全部失敗。即消息發(fā)送成功后,必須要保證扣款成功。如果扣款失敗,則回滾發(fā)送成功的消息。而該思路即使用事務消息。這里要使用分布式事務解決方案。
使用事務消息來處理該需求場景:
- 事務管理器TM向事務協(xié)調(diào)器TC發(fā)起指令,開啟全局事務
- 工行系統(tǒng)發(fā)一個給B增款1萬元的事務消息M給TC
- TC會向Broker發(fā)送半事務消息prepareHalf,將消息M預提交到Broker。此時的建行系統(tǒng)是看不到Broker中的消息M的
- Broker會將預提交執(zhí)行結(jié)果Report給TC。
- 如果預提交失敗,則TC會向TM上報預提交失敗的響應,全局事務結(jié)束;如果預提交成功,TC會調(diào)用工行系統(tǒng)的回調(diào)操作,去完成工行用戶A的預扣款1萬元的操作
- 工行系統(tǒng)會向TC發(fā)送預扣款執(zhí)行結(jié)果,即本地事務的執(zhí)行狀態(tài)
- TC收到預扣款執(zhí)行結(jié)果后,會將結(jié)果上報給TM。
預扣款執(zhí)行結(jié)果存在三種可能性:
// 描述本地事務執(zhí)行狀態(tài)
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事務執(zhí)行成功
ROLLBACK_MESSAGE, // 本地事務執(zhí)行失敗
UNKNOW, // 不確定,表示需要進行回查以確定本地事務的執(zhí)行結(jié)果
}
- TM會根據(jù)上報結(jié)果向TC發(fā)出不同的確認指令
若預扣款成功(本地事務狀態(tài)為COMMIT_MESSAGE),則TM向TC發(fā)送Global Commit指令
若預扣款失?。ū镜厥聞諣顟B(tài)為ROLLBACK_MESSAGE),則TM向TC發(fā)送Global Rollback指令
若現(xiàn)未知狀態(tài)(本地事務狀態(tài)為UNKNOW),則會觸發(fā)工行系統(tǒng)的本地事務狀態(tài)回查操作?;夭椴僮鲿⒒夭榻Y(jié)果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report給TC。TC將結(jié)果上 報給TM,TM會再向TC發(fā)送最終確認指令Global Commit或Global Rollback
- TC在接收到指令后會向Broker與工行系統(tǒng)發(fā)出確認指令
TC接收的若是Global Commit指令,則向Broker與工行系統(tǒng)發(fā)送Branch Commit指令。此時 Broker中的消息M才可被建行系統(tǒng)看到;此時的工行用戶A中的扣款操作才真正被確認 TC接收到的若是Global Rollback指令,則向Broker與工行系統(tǒng)發(fā)送Branch Rollback指令。此時 Broker中的消息M將被撤銷;工行用戶A中的扣款操作將被回滾
以上方案就是為了確保消息投遞與扣款操作能夠在一個事務中,要成功都成功,有一個失敗,則全部回滾。
以上方案并不是一個典型的XA模式。因為XA模式中的分支事務是異步的,而事務消息方案中的消息預提交與預扣款操作間是同步的。
3 基礎
分布式事務
對于分布式事務,通俗地說就是,一次操作由若干分支操作組成,這些分支操作分屬不同應用,分布在 不同服務器上。分布式事務需要保證這些分支操作要么全部成功,要么全部失敗。分布式事務與普通事務一樣,就是為了保證操作結(jié)果的一致性。
事務消息
RocketMQ提供了類似X/Open XA的分布式事務功能,通過事務消息能達到分布式事務的最終一致。XA 是一種分布式事務解決方案,一種分布式事務處理模式。
半事務消息
暫不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了Broker,但是Broker未收到最終確認指令,此時 該消息被標記成“暫不能投遞”狀態(tài),即不能被消費者看到。處于該種狀態(tài)下的消息即半事務消息。
本地事務狀態(tài)
Producer回調(diào)操作執(zhí)行的結(jié)果為本地事務狀態(tài),其會發(fā)送給TC,而TC會再發(fā)送給TM。TM會根據(jù)TC發(fā)送來的本地事務狀態(tài)來決定全局事務確認指令。
消息回查
消息回查,即重新查詢本地事務的執(zhí)行狀態(tài)。本例就是重新到DB中查看預扣款操作是否執(zhí)行成功。
注意,消息回查不是重新執(zhí)行回調(diào)操作?;卣{(diào)操作是進行預扣款操作,而消息回查則是查看預 扣款操作執(zhí)行的結(jié)果。
引發(fā)消息回查的原因最常見的有兩個:
1)回調(diào)操作返回UNKNWON
2)TC沒有接收到TM的最終全局事務確認指令
RocketMQ中的消息回查設置
關(guān)于消息回查,有三個常見的屬性設置。它們都在broker加載的配置文件中設置,例如:
- transactinotallow=20,指定TM在20秒內(nèi)應將最終確認狀態(tài)發(fā)送給TC,否則引發(fā)消息回查。默認為60秒
- transactinotallow=5,指定最多回查5次,超過后將丟棄消息并記錄錯誤日志。默認15次。
- transactinotallow=10,指定設置的多次消息回查的時間間隔為10秒。默認為60秒。
4 XA模式三劍客
XA協(xié)議
XA(Unix Transaction)是一種分布式事務解決方案,一種分布式事務處理模式,是基于XA協(xié)議的。
XA協(xié)議由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作擴展之后的Unix事務系統(tǒng))首先提出的,并交給X/Open組織,作為資源管理器與事務管理器的接口標
準。
XA模式中有三個重要組件:TC、TM、RM。
TC
Transaction Coordinator,事務協(xié)調(diào)者。維護全局和分支事務的狀態(tài),驅(qū)動全局事務提交或回滾。
RocketMQ中Broker充當著TC。
TM
Transaction Manager,事務管理器。定義全局事務的范圍:開始全局事務、提交或回滾全局事務。它 實際是全局事務的發(fā)起者。
RocketMQ中事務消息的Producer充當著TM。
RM
Resource Manager,資源管理器。管理分支事務處理的資源,與TC交談以注冊分支事務和報告分支事務的狀態(tài),并驅(qū)動分支事務提交或回滾。
RocketMQ中事務消息的Producer及Broker均是RM。
5 注意
事務消息不支持延時消息
對于事務消息要做好冪等性檢查,因為事務消息可能不止一次被消費(因為存在回滾后再提交的情況)
五、批量消息
1 批量發(fā)送消息
發(fā)送限制
生產(chǎn)者進行消息發(fā)送時可以一次發(fā)送多條消息,這可以大大提升Producer的發(fā)送效率。不過需要注意以 下幾點:
- 批量發(fā)送的消息必須具有相同的Topic
- 批量發(fā)送的消息必須具有相同的刷盤策略
- 批量發(fā)送的消息不能是延時消息與事務消息
批量發(fā)送大小
默認情況下,一批發(fā)送的消息總大小不能超過4MB字節(jié)。如果想超出該值,有兩種解決方案:
方案一:將批量消息進行拆分,拆分為若干不大于4M的消息集合分多次批量發(fā)送
方案二:在Producer端與Broker端修改屬性
** Producer端需要在發(fā)送之前設置Producer的maxMessageSize屬性
** Broker端需要修改其加載的配置文件中的maxMessageSize屬性
生產(chǎn)者發(fā)送的消息大小
生產(chǎn)者通過send()方法發(fā)送的Message,并不是直接將Message序列化后發(fā)送到網(wǎng)絡上的,而是通過這 個Message生成了一個字符串發(fā)送出去的。這個字符串由四部分構(gòu)成:Topic、消息Body、消息日志(占20字節(jié)),及用于描述消息的一堆屬性key-value。這些屬性中包含例如生產(chǎn)者地址、生產(chǎn)時間、要發(fā)送的QueueId等。最終寫入到Broker中消息單元中的數(shù)據(jù)都是來自于這些屬性。
2 批量消費消息
修改批量屬性
Consumer的MessageListenerConcurrently監(jiān)聽接口的consumeMessage()方法的第一個參數(shù)為消息列 表,但默認情況下每次只能消費一條消息。若要使其一次可以消費多條消息,則可以通過改Consumer的consumeMessageBatchMaxSize屬性來指定。不過,該值不能超過32。因為默認情況下消 費者每次可以拉取的消息最多是32條。若要修改一次拉取的最大值,則可通過修改Consumer的pullBatchSize屬性來指定。
存在的問題
Consumer的pullBatchSize屬性與consumeMessageBatchMaxSize屬性是否設置的越大越好?當然不是。 pullBatchSize值設置的越大,Consumer每拉取一次需要的時間就會越長,且在網(wǎng)絡上傳輸出現(xiàn) 問題的可能性就越高。若在拉取過程中若出現(xiàn)了問題,那么本批次所有消息都需要全部重新拉 取。
,你consumeMessageBatchMaxSize值設置的越大,Consumer的消息并發(fā)消費能力越低,且這批被消費的消息具有相同的消費結(jié)果。因為consumeMessageBatchMaxSize指定的一批消息只會使用一 個線程進行處理,且在處理過程中只要有一個消息處理異常,則這批消息需要全部重新再次消費 處理。