常用消息隊(duì)列框架與技術(shù)選型
又是一年雙11季,土豪們買買買,程序員看看熱鬧,聊聊技術(shù)。海量的訂單、支付請(qǐng)求以及庫(kù)存更新等任務(wù),離不開分布式架構(gòu)(SOFAStack)、分布式數(shù)據(jù)庫(kù)(OceanBase)、分布式緩存(Tair)、數(shù)據(jù)處理(Flink)等一系列框架的支持。而消息隊(duì)列作為連接這些組件的重要紐帶,可以實(shí)現(xiàn)各組件之間的異步通信和解耦。本文接下來就聊聊消息隊(duì)列那些事兒~
消息隊(duì)列給我們帶來什么?
消息中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用解耦,異步消息,流量削峰等問題,實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性的系統(tǒng)架構(gòu)。
- 應(yīng)用解耦
在分布式系統(tǒng)中,服務(wù)之間可能會(huì)有依賴關(guān)系,如果直接進(jìn)行服務(wù)調(diào)用,會(huì)增加服務(wù)之間的耦合度。使用消息隊(duì)列可以將服務(wù)之間的通信轉(zhuǎn)化為消息的發(fā)送和接收,降低服務(wù)之間的耦合度。
降低系統(tǒng)耦合性(源于網(wǎng)絡(luò))
- 流量削峰/數(shù)據(jù)緩沖
在高并發(fā)場(chǎng)景下,瞬間的請(qǐng)求量可能會(huì)超出系統(tǒng)的承受能力,導(dǎo)致系統(tǒng)癱瘓。使用消息隊(duì)列可以實(shí)現(xiàn)流量削峰,將請(qǐng)求放入消息隊(duì)列中,由消費(fèi)者服務(wù)異步消費(fèi)請(qǐng)求,有效降低瞬間的請(qǐng)求量,保護(hù)系統(tǒng)穩(wěn)定性。
削峰/限流(源于網(wǎng)絡(luò))
- 異步處理:
在分布式系統(tǒng)中,不同服務(wù)之間的調(diào)用可能會(huì)因?yàn)榫W(wǎng)絡(luò)延遲或者服務(wù)負(fù)載高等原因?qū)е抡{(diào)用時(shí)間較長(zhǎng)。使用消息隊(duì)列可以實(shí)現(xiàn)異步處理,將請(qǐng)求放入消息隊(duì)列中,由消費(fèi)者服務(wù)異步消費(fèi)請(qǐng)求,提高系統(tǒng)的并發(fā)性和吞吐量。
異步提高性能(源于網(wǎng)絡(luò))
常用的消息隊(duì)列框架?
目前在市面上比較主流的消息隊(duì)列中間件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ 等這幾種。
RocketMQ和Kafka 都是高吞吐量、高可用的分布式消息隊(duì)列系統(tǒng),相較于早期較活躍的ActiveMQ 和RabbitMQ 還是有著明顯優(yōu)勢(shì)的,特別是在雙11這樣的場(chǎng)景,吞吐量的重要性是不言而喻的。
接下來從多個(gè)維度重點(diǎn)對(duì)RocketMQ與Kafka對(duì)比:
數(shù)據(jù)可靠性
- RocketMQ:支持異步實(shí)時(shí)刷盤、同步刷盤、同步復(fù)制、異步復(fù)制?!巴剿⒈P”,可以提高單機(jī)的可靠性,避免數(shù)據(jù)丟失。
- kafka:使用異步刷盤方式,異步復(fù)制/同步復(fù)制。采用的是異步刷盤的方式,可能會(huì)存在一定的數(shù)據(jù)丟失風(fēng)險(xiǎn)。不過,Kafka也提供了一些可靠性保障的機(jī)制,例如副本機(jī)制和ISR機(jī)制等,可以在一定程度上保證數(shù)據(jù)的可靠性。
在同步復(fù)制方面,RocketMQ可以利用IO組的commit機(jī)制,批量傳輸數(shù)據(jù),因此性能上可能比Kafka要更好一些。而Kafka的同步復(fù)制是以partition為單位進(jìn)行的,一個(gè)Kafka實(shí)例上可能有多個(gè)partition,這可能會(huì)影響性能。
單機(jī)支持的隊(duì)列數(shù)
- kafka單機(jī)若超過超過一定數(shù)量的partition/隊(duì)列,CPU load會(huì)發(fā)生明顯飆高,partition越多,CPU load越高,發(fā)消息的響應(yīng)時(shí)間變長(zhǎng)。
- RocketMQ單機(jī)支持最高5萬(wàn)個(gè)隊(duì)列,CPU load不會(huì)發(fā)生明顯變化。
隊(duì)列多有什么好處呢?
單機(jī)可以創(chuàng)建更多個(gè)topic, 因?yàn)槊總€(gè)topic都是有一組隊(duì)列組成。
消費(fèi)者的集群規(guī)模和隊(duì)列數(shù)成正比,隊(duì)列越多,消費(fèi)類集群可以越大。
消息投遞的實(shí)時(shí)性
- kafka只支持pull模式,實(shí)時(shí)性取決于pull時(shí)間間隔(0.8以后版本支持長(zhǎng)輪詢)
- rocketmq有pull(長(zhǎng)輪詢)、push兩種模式 (雖然這個(gè)push模式是假push),push模式延遲肯定是比pull模式延遲低。
push模式是基于pull模式的,本地有個(gè)定時(shí)線程去pull broker的消息,緩存到本地,然后push到消費(fèi)線程。
消費(fèi)失敗重試
- Kafka本身不支持消費(fèi)失敗重試,但是可以通過設(shè)置消費(fèi)者的參數(shù)來實(shí)現(xiàn)重試機(jī)制。如,設(shè)置消費(fèi)者的max.poll.retries
- RocketMQ消費(fèi)失敗支持定時(shí)重試,每次重試間隔時(shí)間順延。
這里的重試指可靠的重試,即失敗重試的消息不是因?yàn)閏onsumer宕機(jī)而導(dǎo)致的消息丟失。
嚴(yán)格保證消息有序
- kafka可保證同一個(gè)partition上的消息有序,但一旦broker宕機(jī),就會(huì)產(chǎn)生消息亂序。
- Rocket支持嚴(yán)格的消息順序,一臺(tái)broker宕機(jī),發(fā)送消息會(huì)失敗,但不會(huì)亂序。舉例:MySQL的二進(jìn)制日志分發(fā)需要保證嚴(yán)格的順序。
定時(shí)消息
- kafka不支持定時(shí)消息
- 開源版本的RocketMQ僅支持定時(shí)級(jí)別,定時(shí)級(jí)別用戶可定制
分布式事務(wù)消息
- kafka不支持分布式事務(wù)消息
- RocketMQ支持分布式事務(wù)消息。
消息查詢
- Kafka本身不提供內(nèi)置的消息查詢功能
- RocketMQ支持根據(jù)消息標(biāo)識(shí)(發(fā)送消息時(shí)指定一個(gè)消息key, 任意字符串,如指定為訂單編號(hào))查詢消息,也支持根據(jù)消息內(nèi)容查詢消息。
消息回溯
- kafka可按照消息的offset來回溯消息
- RocketMQ支持按照時(shí)間來回溯消息,精度到毫秒,例如從一天的幾點(diǎn)幾分幾秒幾毫秒來重新消費(fèi)消息。
RocketMQ按時(shí)間做回溯消息的典型應(yīng)用場(chǎng)景為,consumer做訂單分析,但是由于程序邏輯或依賴的系統(tǒng)發(fā)生故障等原因,導(dǎo)致今天處理的消息全部無效,需要從昨天的零點(diǎn)重新處理。
消息并行度
- kafka的消息并行度,依賴于topic里配置的partition數(shù),如果partition數(shù)為10,那么最多10臺(tái)機(jī)器來消費(fèi),每臺(tái)機(jī)器只能開啟一個(gè)線程;或者一臺(tái)機(jī)器消費(fèi),最多開啟10個(gè)線程。消費(fèi)的并行度與partition個(gè)數(shù)一致。
- RocketMQ并行消費(fèi)分兩種情況:1)順序消費(fèi)方式的并行度與kafka一致;2)亂序消費(fèi)方式的并行度取決于consumer的線程數(shù),如topic配置10個(gè)隊(duì)列,10臺(tái)機(jī)器消費(fèi),每臺(tái)機(jī)器100個(gè)線程,那么并行度為1000。
消息隊(duì)列如何選型?
- ActiveMQ 的社區(qū)算是比較成熟,但是較目前來說,ActiveMQ 的性能比較差,而且版本迭代很慢,不推薦使用。
- RabbitMQ 在吞吐量方面雖然稍遜于 Kafka、RocketMQ ,由于它基于 Erlang 開發(fā),所以并發(fā)能力很強(qiáng),性能極其好,延時(shí)很低,達(dá)到微秒級(jí)。但是也因?yàn)榛?Erlang 開發(fā),所以國(guó)內(nèi)很少有公司有實(shí)力做 Erlang 源碼級(jí)別的研究和定制。如果業(yè)務(wù)場(chǎng)景對(duì)并發(fā)量要求不是太高(十萬(wàn)級(jí)、百萬(wàn)級(jí)),那這幾種消息隊(duì)列中,RabbitMQ 或許是你的首選。
- RocketMQ 阿里開源,久經(jīng)雙十一考驗(yàn),可以定制自己公司的 MQ。且支持事務(wù)消息,對(duì)消息一致性要求比較高的場(chǎng)景優(yōu)先考慮。
- Kafka 的特點(diǎn)其實(shí)很明顯,就是僅僅提供較少的核心功能,但是提供超高的吞吐量,ms 級(jí)的延遲,極高的可用性以及可靠性,而且分布式可以任意擴(kuò)展。同時(shí) Kafka 最好是支撐較少的 topic 數(shù)量即可,保證其超高吞吐量。Kafka 唯一的一點(diǎn)劣勢(shì)是有可能消息重復(fù)消費(fèi),那么對(duì)數(shù)據(jù)準(zhǔn)確性會(huì)造成極其輕微的影響,在大數(shù)據(jù)領(lǐng)域中以及日志采集中,這點(diǎn)輕微影響可以忽略這個(gè)特性天然適合大數(shù)據(jù)實(shí)時(shí)計(jì)算以及日志收集。如果是大數(shù)據(jù)領(lǐng)域的實(shí)時(shí)計(jì)算、日志采集等場(chǎng)景,用 Kafka 可謂是行業(yè)標(biāo)準(zhǔn)。
消息隊(duì)列常見問題
如何避免重復(fù)消費(fèi)?如何保證冪等性?
冪等性:就是用戶對(duì)于同一操作發(fā)起的一次請(qǐng)求或者多次請(qǐng)求的結(jié)果是一致的,不會(huì)因?yàn)槎啻吸c(diǎn)擊而產(chǎn)生了副作用問題
我們先來了解一下產(chǎn)生消息重復(fù)消費(fèi)的原因,對(duì)于MQ的使用,有三個(gè)角色:生產(chǎn)者、MQ、消費(fèi)者,那么消息的重復(fù)這三者會(huì)出現(xiàn):
- 生產(chǎn)者:生產(chǎn)者可能會(huì)推送重復(fù)的數(shù)據(jù)到MQ中,有可能controller接口重復(fù)提交了兩次,也可能是重試機(jī)制導(dǎo)致的
- MQ:假設(shè)網(wǎng)絡(luò)出現(xiàn)了波動(dòng),消費(fèi)者消費(fèi)完一條消息后,發(fā)送ack時(shí),MQ還沒來得及接受,突然掛了,導(dǎo)致MQ以為消費(fèi)者還未消費(fèi)該條消息,MQ回復(fù)后會(huì)再次推送了這條消息,導(dǎo)致出現(xiàn)重復(fù)消費(fèi)。
- 消費(fèi)者:消費(fèi)者接收到消息后,正準(zhǔn)備發(fā)送ack到MQ,突然消費(fèi)者掛了,還沒得及發(fā)送ack,這時(shí)MQ以為消費(fèi)者還沒消費(fèi)該消息,消費(fèi)者重啟后,MQ再次推送該條消息。
如何解決呢?在正常情況下,生產(chǎn)者是客戶,我們很難避免出現(xiàn)用戶重復(fù)點(diǎn)擊的情況,而MQ是允許存在多條一樣的消息,但消費(fèi)者是不允許出現(xiàn)消費(fèi)兩條一樣的數(shù)據(jù),所以冪等性一般是在消費(fèi)端實(shí)現(xiàn)的:
- 狀態(tài)判斷:消費(fèi)者把消費(fèi)消息記錄到redis中,再次消費(fèi)時(shí)先到redis判斷是否存在該數(shù)據(jù),存在則表示消費(fèi)過,直接丟棄
- 業(yè)務(wù)判斷:消費(fèi)完數(shù)據(jù)后,都是需要插入到數(shù)據(jù)庫(kù)中,使用數(shù)據(jù)庫(kù)的唯一約束防止重復(fù)消費(fèi)。插入數(shù)據(jù)庫(kù)前先查詢是否存在該數(shù)據(jù),存在則直接丟棄消息,這種方式是比較簡(jiǎn)單粗暴地解決問題
如何解決消息丟失?
消息丟失屬于比較常見的問題。一般有生產(chǎn)端丟失、MQ服務(wù)丟失、消費(fèi)端丟失等三種情況。針對(duì)各種情況應(yīng)對(duì)方式也不一樣。
生產(chǎn)端丟失的解決方案主要有。
- 開啟confirm模式,生產(chǎn)著收到MQ發(fā)回的confirm確認(rèn)之后,再進(jìn)行消息刪除,否則消息重推。
- 生產(chǎn)者端消息保存的數(shù)據(jù)庫(kù),由后臺(tái)定時(shí)程序異步推送,收到confirm確認(rèn)則認(rèn)為成功,否則消息重推,重推多次均未成功,則認(rèn)為發(fā)送失敗。
MQ服務(wù)丟失則主要是開啟消息持久化,讓消息及時(shí)保存到磁盤。
消費(fèi)端消息丟失則關(guān)閉自動(dòng)ack確認(rèn),消息消費(fèi)成功后手動(dòng)發(fā)送ack確認(rèn)。消息消費(fèi)失敗,則重新消費(fèi)。
(3)如何保證消息有序性
在生產(chǎn)端發(fā)布消息時(shí),每次法發(fā)布消息都把上一條消息的ID記錄到消息體中,消費(fèi)者接收到消息時(shí),做如下操作:
- 先根據(jù)上一條Id去檢查是否存在上一條消息還沒被消費(fèi),如果不存在(消費(fèi)后去掉id),則正常進(jìn)行,如果正常操作
- 如果存在,則根據(jù)id到數(shù)據(jù)庫(kù)檢查是否被消費(fèi),如果被消費(fèi),則正常操作
- 如果還沒被消費(fèi),則休眠一定時(shí)間(比如30ms),再重新檢查,如被消費(fèi),則正常操作
- 如果還沒被消費(fèi),則拋出異常
(4) 如何解決消息積壓?jiǎn)栴}?
所謂的消息積壓,即生成者生成消息太快,而消費(fèi)者處理消息太慢,從而導(dǎo)致消費(fèi)端消息積壓在MQ中無法處理的問題。遇到這種消息積壓的情況,可以根據(jù)消息重要程度,分為兩種情況處理:
- 如果消息可以被丟棄,那么直接丟棄就好了
- 一般情況下,消息是不可以被丟棄的,這樣就需要考慮策略了,可以將原本的消費(fèi)端重新部署為一個(gè)新的消息隊(duì)列(MQ)實(shí)例,并在后續(xù)增加消費(fèi)端,以形成另一條生產(chǎn)-消息-消費(fèi)的線路。
PS:實(shí)際項(xiàng)目中是否需要使用消息隊(duì)列以及如何使用,還是要根據(jù)業(yè)務(wù)特點(diǎn)進(jìn)行選擇,一個(gè)UV沒幾個(gè)的系統(tǒng),使用消息隊(duì)列,則純粹是老板掏錢、研發(fā)受罪了。