這篇文章專治MQ中間件各種疑難雜癥
原創(chuàng)【51CTO.com原創(chuàng)稿件】上周分享的一篇《面試大殺器:為什么一定要用MQ中間件?》受到了大家的一致好評,今天這篇文章為大家總結(jié)下 MQ 應(yīng)用中的一些疑難雜癥。
消息隊列有什么優(yōu)點和缺點?
為什么使用消息隊列?假設(shè)你的業(yè)務(wù)場景遇到個技術(shù)挑戰(zhàn),如果不用 MQ 可能會很麻煩,但是你用了 MQ 之后會帶給你很多好處。
消息隊列 MQ 的常見使用場景其實有很多,但是比較核心的有如下三個:
- 解耦
- 異步
- 削峰
解耦:A 系統(tǒng)發(fā)送個數(shù)據(jù)到 BCD 三個系統(tǒng),接口調(diào)用發(fā)送,那如果 E 系統(tǒng)也要這個數(shù)據(jù)呢?那如果 C 系統(tǒng)現(xiàn)在不需要了呢?
現(xiàn)在 A 系統(tǒng)又要發(fā)送第二種數(shù)據(jù)了呢?而且 A 系統(tǒng)要時時刻刻考慮 BCDE 四個系統(tǒng)如果掛了咋辦?要不要重發(fā)?我要不要把消息存起來?
你需要去考慮一下你負(fù)責(zé)的系統(tǒng)中是否有類似的場景,就是一個系統(tǒng)或者一個模塊,調(diào)用了多個系統(tǒng)或者模塊,互相之間的調(diào)用很復(fù)雜,維護起來很麻煩。
但是,這個調(diào)用是不需要直接同步調(diào)用接口的,如果用 MQ 給他異步化解耦,也是可以的。你就只需要去考慮在你的項目里,是不是可以運用這個 MQ 去進(jìn)行系統(tǒng)的解耦。
異步:A 系統(tǒng)接收一個請求,需要在自己本地寫庫,還需要在 BCD 三個系統(tǒng)寫庫,自己本地寫庫要 30ms,BCD 三個系統(tǒng)分別寫庫要 300ms、450ms、200ms。
最終請求總延時是 30 + 300 + 450 + 200 = 980ms,接近 1s,異步后,BCD 三個系統(tǒng)分別寫庫的時間,A 系統(tǒng)就不再考慮了。
削峰:每天 0 點到 16 點,A 系統(tǒng)風(fēng)平浪靜,每秒并發(fā)請求數(shù)量就 100 個。結(jié)果每次一到 16 點~23 點,每秒并發(fā)請求數(shù)量突然會暴增到 10000 條。
但是系統(tǒng)的處理能力就只能是每秒鐘處理 1000 個請求啊。怎么辦?需要我們進(jìn)行流量的削峰,讓系統(tǒng)可以平緩的處理突增的請求。
優(yōu)點上面已經(jīng)說了,就是在特殊場景下有其對應(yīng)的好處,解耦、異步、削峰,那么消息隊列有什么缺點?
系統(tǒng)可用性降低:系統(tǒng)引入的外部依賴越多,越容易掛掉,本來你就是 A 系統(tǒng)調(diào)用 BCD 三個系統(tǒng)的接口就好了。
ABCD 四個系統(tǒng)好好的,沒啥問題,你偏加個 MQ 進(jìn)來,萬一 MQ 掛了怎么辦?MQ 掛了,整套系統(tǒng)崩潰了,業(yè)務(wù)也就停頓了。
系統(tǒng)復(fù)雜性提高:硬生生加個 MQ 進(jìn)來,怎么保證消息沒有重復(fù)消費?怎么處理消息丟失的情況?怎么保證消息傳遞的順序性?
一致性問題:A 系統(tǒng)處理完了直接返回成功了,大家都以為你這個請求就成功了。
但問題是,要是 BCD 三個系統(tǒng)那里,BD 兩個系統(tǒng)寫庫成功了,結(jié)果 C 系統(tǒng)寫庫失敗了,你這數(shù)據(jù)就不一致了。
所以消息隊列實際是一種非常復(fù)雜的架構(gòu),你引入它有很多好處,但是也得針對它帶來的壞處做各種額外的技術(shù)方案和架構(gòu)來規(guī)避掉。
常見消息隊列的比較如下圖:
如何解決重復(fù)消費?
消息重復(fù)的原因
消息發(fā)送端應(yīng)用的消息重復(fù)發(fā)送,有以下幾種情況:
- 消息發(fā)送端發(fā)送消息給消息中間件,消息中間件收到消息并成功存儲,而這時消息中間件出現(xiàn)了問題,導(dǎo)致應(yīng)用端沒有收到消息發(fā)送成功的返回因而進(jìn)行重試產(chǎn)生了重復(fù)。
- 消息中間件因為負(fù)載高響應(yīng)變慢,成功把消息存儲到消息存儲中后,返回“成功”這個結(jié)果時超時。
- 消息中間件將消息成功寫入消息存儲,在返回結(jié)果時網(wǎng)絡(luò)出現(xiàn)問題,導(dǎo)致應(yīng)用發(fā)送端重試,而重試時網(wǎng)絡(luò)恢復(fù),由此導(dǎo)致重復(fù)。
可以看到,通過消息發(fā)送端產(chǎn)生消息重復(fù)的主要原因是消息成功進(jìn)入消息存儲后,因為各種原因使得消息發(fā)送端沒有收到“成功”的返回結(jié)果,并且又有重試機制,因而導(dǎo)致重復(fù)。
消息到達(dá)了消息存儲,由消息中間件進(jìn)行向外的投遞時產(chǎn)生重復(fù),有以下幾種情況:
- 消息被投遞到消息接收者應(yīng)用進(jìn)行處理,處理完畢后應(yīng)用出問題了,消息中間件不知道消息處理結(jié)果,會再次投遞。
- 消息被投遞到消息接收者應(yīng)用進(jìn)行處理,處理完畢后網(wǎng)絡(luò)出現(xiàn)問題了,消息中間件沒有收到消息處理結(jié)果,會再次投遞。
- 消息被投遞到消息接收者應(yīng)用進(jìn)行處理,處理時間比較長,消息中間件因為消息超時會再次投遞。
- 消息被投遞到消息接收者應(yīng)用進(jìn)行處理,處理完畢后消息中間件出問題了,沒能收到消息結(jié)果并處理,會再次投遞。
- 消息被投遞到消息接收者應(yīng)用進(jìn)行處理,處理完畢后消息中間件收到結(jié)果但是遇到消息存儲故障,沒能更新投遞狀態(tài),會再次投遞。
可以看到,在投遞過程中產(chǎn)生的消息重復(fù)接收主要是因為消息接收者成功處理完消息后,消息中間件不能及時更新投遞狀態(tài)造成的。
如何解決重復(fù)消費
那么有什么辦法可以解決呢?主要是要求消息接收者來處理這種重復(fù)的情況,也就是要求消息接收者的消息處理是冪等操作。
什么是冪等性?對于消息接收端的情況,冪等的含義是采用同樣的輸入多次調(diào)用處理函數(shù),得到同樣的結(jié)果。
例如,一個 SQL 操作:
- update stat_table set count= 10 where id =1
這個操作多次執(zhí)行,id 等于 1 的記錄中的 count 字段的值都為 10,這個操作就是冪等的,我們不用擔(dān)心這個操作被重復(fù)。
再來看另外一個 SQL 操作:
- update stat_table set count= count +1 where id= 1;
這樣的 SQL 操作就不是冪等的,一旦重復(fù),結(jié)果就會產(chǎn)生變化。
因此應(yīng)對消息重復(fù)的辦法是使消息接收端的處理是一個冪等操作。這樣的做法降低了消息中間件的整體復(fù)雜性,不過也給使用消息中間件的消息接收端應(yīng)用帶來了一定的限制和門檻。
①MVCC
多版本并發(fā)控制,樂觀鎖的一種實現(xiàn),在生產(chǎn)者發(fā)送消息時進(jìn)行數(shù)據(jù)更新時需要帶上數(shù)據(jù)的版本號,消費者去更新時需要去比較持有數(shù)據(jù)的版本號,版本號不一致的操作無法成功。
例如博客點贊次數(shù)自動 +1 的接口:
- public boolean addCount(Long id, Long version);
- update blogTable set count= count+1,version=version+1 where id=321 and version=123
每一個 version 只有一次執(zhí)行成功的機會,一旦失敗了生產(chǎn)者必須重新獲取數(shù)據(jù)的新版本號再次發(fā)起更新。
②去重表
利用數(shù)據(jù)庫表單的特性來實現(xiàn)冪等,常用的一個思路是在表上構(gòu)建索引,保證某一類數(shù)據(jù)一旦執(zhí)行完畢,后續(xù)同樣的請求不再重復(fù)處理了(利用一張日志表來記錄已經(jīng)處理成功的消息的 id,如果新到的消息 id 已經(jīng)在日志表中,那么就不再處理這條消息。)
以電商平臺為例子,電商平臺上的訂單 id 就是最適合的 token。當(dāng)用戶下單時,會經(jīng)歷多個環(huán)節(jié),比如生成訂單,減庫存,減優(yōu)惠券等等。
每一個環(huán)節(jié)執(zhí)行時都先檢測一下該訂單 id 是否已經(jīng)執(zhí)行過這一步驟,對未執(zhí)行的請求,執(zhí)行操作并緩存結(jié)果,而對已經(jīng)執(zhí)行過的 id,則直接返回之前的執(zhí)行結(jié)果,不做任何操作。
這樣可以在更大程度上避免操作的重復(fù)執(zhí)行問題,緩存起來的執(zhí)行結(jié)果也能用于事務(wù)的控制等。
如何保證消息的可靠性傳輸?
ActiveMQ
要保證消息的可靠性,除了消息的持久化,還包括兩個方面:
- 生產(chǎn)者發(fā)送的消息可以被 ActiveMQ 收到。
- 消費者收到了 ActiveMQ 發(fā)送的消息。
①生產(chǎn)者
非持久化又不在事務(wù)中的消息,可能會有消息的丟失。為保證消息可以被 ActiveMQ 收到,我們應(yīng)該采用事務(wù)消息或持久化消息。
②消費者
消費者對消息的確認(rèn)有四種機制:
- AUTO_ACKNOWLEDGE=1:自動確認(rèn)
- CLIENT_ACKNOWLEDGE=2:客戶端手動確認(rèn)
- DUPS_OK_ACKNOWLEDGE=3:自動批量確認(rèn)
- SESSION_TRANSACTED=0:事務(wù)提交并確認(rèn)
ACK_MODE 描述了 Consumer 與 Broker 確認(rèn)消息的方式(時機),比如當(dāng)消息被 Consumer 接收之后,Consumer 將在何時確認(rèn)消息。
所以 ack_mode 描述的不是 Producer 與 Broker 之間的關(guān)系,而是 Customer 與 Broker 之間的關(guān)系。
對于 Broker 而言,只有接收到 ACK 指令,才會認(rèn)為消息被正確的接收或者處理成功了。通過 ACK,可以在 Consumer 與 Broker 之間建立一種簡單的“擔(dān)保”機制。
AUTO_ACKNOWLEDGE:自動確認(rèn),“同步”(receive)方法返回 message 給消息時會立即確認(rèn)。
在"異步"(messageListener)方式中,將會首先調(diào)用listener.onMessage(message)。
如果 onMessage 方法正常結(jié)束,消息將會正常確認(rèn);如果 onMessage 方法異常,將導(dǎo)致消費者要求 ActiveMQ 重發(fā)消息。
CLIENT_ACKNOWLEDGE:客戶端手動確認(rèn),這就意味著 AcitveMQ 將不會“自作主張”的為你 ACK 任何消息,開發(fā)者需要自己擇機確認(rèn)。
我們可以在當(dāng)前消息處理成功之后,立即調(diào)用 message.acknowledge() 方法來"逐個"確認(rèn)消息,這樣可以盡可能的減少因網(wǎng)絡(luò)故障而導(dǎo)致消息重發(fā)的個數(shù)。
當(dāng)然也可以處理多條消息之后,間歇性的調(diào)用 ACKNOWLEDGE 方法來一次確認(rèn)多條消息,減少 ACK 的次數(shù)來提升 Consumer 的效率,不過需要自行權(quán)衡。
DUPS_OK_ACKNOWLEDGE:類似于 AUTO_ACK 確認(rèn)機制,為自動批量確認(rèn)而生,而且具有“延遲”確認(rèn)的特點,ActiveMQ 會根據(jù)內(nèi)部算法,在收到一定數(shù)量的消息自動進(jìn)行確認(rèn)。
在此模式下,可能會出現(xiàn)重復(fù)消息,什么時候?當(dāng) Consumer 故障重啟后,那些尚未 ACK 的消息會重新發(fā)送過來。
SESSION_TRANSACTED:當(dāng) Session 使用事務(wù)時,就是使用此模式。當(dāng)決定事務(wù)中的消息可以確認(rèn)時,必須調(diào)用 session.commit() 方法,Commit 方法將會導(dǎo)致當(dāng)前 Session 的事務(wù)中所有消息立即被確認(rèn)。
在事務(wù)開始之后的任何時機調(diào)用 rollback(),意味著當(dāng)前事務(wù)的結(jié)束,事務(wù)中所有的消息都將被重發(fā)。當(dāng)然在 Commit 之前拋出異常,也會導(dǎo)致事務(wù)的 rollback。
RabbitMQ
①生產(chǎn)者弄丟了數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時候,可能數(shù)據(jù)就在半路給搞丟了,因為網(wǎng)絡(luò)啥的問題,都有可能。
此時可以選擇用 RabbitMQ 提供的事務(wù)功能,就是生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟 RabbitMQ 事務(wù)(channel.txSelect),然后發(fā)送消息,如果消息沒有成功被 RabbitMQ 接收到,那么生產(chǎn)者會收到異常報錯。
此時就可以回滾事務(wù)(channel.txRollback),然后重試發(fā)送消息;如果收到了消息,那么可以提交事務(wù)(channel.txCommit)。
但是問題是,RabbitMQ 事務(wù)機制一搞,基本上吞吐量會下來,因為太耗性能。
所以一般來說,如果要確保 RabbitMQ 的消息別丟,可以開啟 Confirm 模式。
在生產(chǎn)者那里設(shè)置開啟 Confirm 模式之后,你每次寫的消息都會分配一個 id,然后如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個 ACK 消息,告訴你說這個消息 OK 了。
如果 RabbitMQ 沒能處理這個消息,會回調(diào)你一個 nack 接口,告訴你這個消息接收失敗,你可以重試。
而且你可以結(jié)合這個機制,自己在內(nèi)存里維護每個消息 id 的狀態(tài),如果超過一定時間還沒接收到這個消息的回調(diào),那么你可以重發(fā)。
事務(wù)機制和 Cnofirm 機制的不同在于:事務(wù)機制是同步的,你提交一個事務(wù)之后會阻塞在那兒。
但是 Confirm 機制是異步的,你發(fā)送個消息之后就可以發(fā)送下一個消息,然后那個消息 RabbitMQ 接收了之后會異步回調(diào)你一個接口通知你這個消息接收到了。
所以一般在生產(chǎn)者這塊避免數(shù)據(jù)丟失,都是用 Confirm 機制的。
②RabbitMQ 弄丟了數(shù)據(jù)
就是 RabbitMQ 自己弄丟了數(shù)據(jù),這個你必須開啟 RabbitMQ 的持久化,就是消息寫入之后會持久化到磁盤,哪怕是 RabbitMQ 自己掛了,恢復(fù)之后會自動讀取之前存儲的數(shù)據(jù),一般數(shù)據(jù)不會丟。
除非極其罕見的是,RabbitMQ 還沒持久化,自己就掛了,可能導(dǎo)致少量數(shù)據(jù)會丟失的,但是這個概率較小。
設(shè)置持久化有兩個步驟:
- 創(chuàng)建 queue 和交換器的時候?qū)⑵湓O(shè)置為持久化的,這樣就可以保證 RabbitMQ 持久化相關(guān)的元數(shù)據(jù),但是不會持久化 queue 里的數(shù)據(jù)。
- 發(fā)送消息的時候?qū)⑾⒌?deliveryMode 設(shè)置為 2,就是將消息設(shè)置為持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去。
必須要同時設(shè)置這兩個持久化才行,RabbitMQ 哪怕是掛了,再次重啟,也會從磁盤上重啟恢復(fù) queue,恢復(fù)這個 queue 里的數(shù)據(jù)。
而且持久化可以跟生產(chǎn)者那邊的 Confirm 機制配合起來,只有消息被持久化到磁盤之后,才會通知生產(chǎn)者 ACK 了。
所以哪怕是在持久化到磁盤之前,RabbitMQ 掛了,數(shù)據(jù)丟了,生產(chǎn)者收不到 ACK,你也是可以自己重發(fā)的。
哪怕是你給 RabbitMQ 開啟了持久化機制,也有一種可能,就是這個消息寫到了 RabbitMQ 中,但是還沒來得及持久化到磁盤上,結(jié)果不巧,此時 RabbitMQ 掛了,就會導(dǎo)致內(nèi)存里的一點點數(shù)據(jù)會丟失。
③消費端弄丟了數(shù)據(jù)
RabbitMQ 如果丟失了數(shù)據(jù),主要是因為你消費的時候,剛消費到,還沒處理,結(jié)果進(jìn)程掛了,比如重啟了,那么就尷尬了,RabbitMQ 認(rèn)為你都消費了,這數(shù)據(jù)就丟了。
這個時候得用 RabbitMQ 提供的 ACK 機制,簡單來說,就是你關(guān)閉 RabbitMQ 自動 ACK,可以通過一個 API 來調(diào)用就行,然后每次你自己代碼里確保處理完的時候,再程序里 ACK 一把。
這樣的話,如果你還沒處理完,不就沒有 ACK?那 RabbitMQ 就認(rèn)為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 Consumer 去處理,消息是不會丟的。
Kafka
①消費端弄丟了數(shù)據(jù)
只有一個可能導(dǎo)致消費者弄丟數(shù)據(jù)的情況,就是說,你那個消費到了這個消息,然后消費者那邊自動提交了 Offset,讓 Kafka 以為你已經(jīng)消費好了這個消息。
其實你剛準(zhǔn)備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。
大家都知道 Kafka 會自動提交 Offset,那么只要關(guān)閉自動提交 Offset,在處理完之后自己手動提交 Offset,就可以保證數(shù)據(jù)不會丟。
但是此時確實還是會重復(fù)消費,比如你剛處理完,還沒提交 Offset,結(jié)果自己掛了,此時肯定會重復(fù)消費一次,自己保證冪等性就好了。
生產(chǎn)環(huán)境碰到的一個問題,就是說我們的 Kafka 消費者消費到了數(shù)據(jù)之后是寫到一個內(nèi)存的 queue 里先緩沖一下,結(jié)果有的時候,你剛把消息寫入內(nèi)存 queue,然后消費者會自動提交 Offset。
然后此時我們重啟了系統(tǒng),就會導(dǎo)致內(nèi)存 queue 里還沒來得及處理的數(shù)據(jù)就丟失了。
②Kafka 弄丟了數(shù)據(jù)
這塊比較常見的一個場景,就是 Kafka 某個 Broker 宕機,然后重新選舉 Partition 的 Leader 時。
大家想想,要是此時其他的 Follower 剛好還有些數(shù)據(jù)沒有同步,結(jié)果此時 Leader 掛了,然后選舉某個 Follower 成 Leader 之后,他不就少了一些數(shù)據(jù)?這就丟了一些數(shù)據(jù)啊。
所以此時一般是要求起碼設(shè)置如下四個參數(shù):
- 給這個 Topic 設(shè)置 replication.factor 參數(shù):這個值必須大于 1,要求每個 Partition 必須有至少 2 個副本。
- 在 Kafka 服務(wù)端設(shè)置 min.insync.replicas 參數(shù):這個值必須大于 1,這個是要求一個 Leader 至少感知到有至少一個 Follower 還跟自己保持聯(lián)系,沒掉隊,這樣才能確保 Leader 掛了還有一個 Follower 吧。
- 在 Producer 端設(shè)置 acks=all:這個是要求每條數(shù)據(jù),必須是寫入所有 Replica 之后,才能認(rèn)為是寫成功了。
- 在 Producer 端設(shè)置 retries=MAX(很大很大很大的一個值,反復(fù)重試的意思):這個是要求一旦寫入失敗,就循環(huán)重試,卡在這里了。
③生產(chǎn)者會不會弄丟數(shù)據(jù)
如果按照上述的思路設(shè)置了 ack=all,一定不會丟,要求是,你的 Leader 接收到消息,所有的 Follower 都同步到了消息之后,才認(rèn)為本次寫成功了。如果沒滿足這個條件,生產(chǎn)者會自動不斷的重試,重試無數(shù)次。
消息的順序性
從根本上說,異步消息是不應(yīng)該有順序依賴的,在 MQ 上估計是沒法解決。
要實現(xiàn)嚴(yán)格的順序消息,簡單且可行的辦法就是:保證生產(chǎn)者、MQServer、消費者是一對一對一的關(guān)系。
ActiveMQ
①通過高級特性 Consumer 獨有消費者(exclusive consumer)
- queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
- consumer = session.createConsumer(queue);
當(dāng)在接收信息的時候,有多個獨占消費者的時候,只有一個獨占消費者可以接收到消息。
獨占消息就是在有多個消費者同時消費一個 queue 時,可以保證只有一個消費者可以消費消息。
這樣雖然保證了消息的順序問題,不過也帶來了一個問題,就是這個 queue 的所有消息將只會在這一個主消費者上消費,其他消費者將閑置,達(dá)不到負(fù)載均衡分配。
而實際業(yè)務(wù)我們可能更多的是這樣的場景,比如一個訂單會發(fā)出一組順序消息,我們只要求這一組消息是順序消費的,而訂單與訂單之間又是可以并行消費的,不需要順序,因為順序也沒有任何意義。
有沒有辦法做到呢?可以利用 ActiveMQ 的另一個高級特性之 messageGroup。
②利用 ActiveMQ 的高級特性:Message Groups
Message Groups 特性是一種負(fù)載均衡的機制。在一個消息被分發(fā)到 Consumer 之前,Broker 首先檢查消息 JMSXGroupID 屬性。
如果存在,那么 Broker 會檢查是否有某個 Consumer 擁有這個 Message Group。
如果沒有,那么 Broker 會選擇一個 Consumer,并將它關(guān)聯(lián)到這個 Message Group。
此后,這個 Consumer 會接收這個 Message Group 的所有消息,直到 Consumer 被關(guān)閉。
Message Group 被關(guān)閉,通過發(fā)送一個消息,并設(shè)置這個消息的 JMSXGroupSeq 為 -1。
- bytesMessage.setStringProperty("JMSXGroupID", "constact-20100000002");
- bytesMessage.setIntProperty("JMSXGroupSeq", -1);
如上圖所示,同一個 queue 中,擁有相同 JMSXGroupID 的消息將發(fā)往同一個消費者,解決順序問題;不同分組的消息又能被其他消費者并行消費,解決負(fù)載均衡的問題。
RabbitMQ
如果有順序依賴的消息,要保證消息有一個 hashKey,類似于數(shù)據(jù)庫表分區(qū)的的分區(qū) key 列。保證對同一個 key 的消息發(fā)送到相同的隊列。
A 用戶產(chǎn)生的消息(包括創(chuàng)建消息和刪除消息)都按 A 的 hashKey 分發(fā)到同一個隊列。
只需要把強相關(guān)的兩條消息基于相同的路由就行了,也就是說經(jīng)過 m1 和 m2 的在路由表里的路由是一樣的,那自然 m1 會優(yōu)先于 m2 去投遞。而且一個 queue 只對應(yīng)一個 Consumer。
Kafka
一個 Topic,一個 Partition,一個 Consumer,內(nèi)部單線程消費。
如何解決消息隊列的延時以及過期失效問題?RabbitMQ 是可以設(shè)置過期時間的,就是 TTL。
如果消息在 queue 中積壓超過一定的時間,而又沒有設(shè)置死信隊列機制,就會被 RabbitMQ 給清理掉,這個數(shù)據(jù)就沒了。ActiveMQ 則通過更改配置,支持消息的定時發(fā)送。
有幾百萬消息持續(xù)積壓幾小時怎么解決?
發(fā)生了線上故障,幾千萬條數(shù)據(jù)在 MQ 里積壓很久。是修復(fù) Consumer 的問題,讓他恢復(fù)消費速度,然后等待幾個小時消費完畢?這是個解決方案,不過有時候我們還會進(jìn)行臨時緊急擴容。
一個消費者一秒是 1000 條,3 個消費者一秒是 3000 條,一分鐘是 18 萬條。
所以如果積壓了幾百萬到上千萬的數(shù)據(jù),即使消費者恢復(fù)了,也需要大概一小時的時間才能恢復(fù)過來。
一般這個時候,只能操作臨時緊急擴容了,具體操作步驟和思路如下:
- 先修復(fù) Consumer 的問題,確保其恢復(fù)消費速度,然后將現(xiàn)有 Consumer 都停掉。
- 新建一個 Topic,Partition 是原來的 10 倍,臨時建立好原先 10 倍或者 20 倍的 queue 數(shù)量。
然后寫一個臨時的分發(fā)數(shù)據(jù)的 Consumer 程序,這個程序部署上去消費積壓的數(shù)據(jù),消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數(shù)量的 queue。
- 接著臨時征用 10 倍的機器來部署 Consumer,每一批 Consumer 消費一個臨時 queue 的數(shù)據(jù)。
- 這種做法相當(dāng)于是臨時將 queue 資源和 Consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數(shù)據(jù)。
- 等快速消費完積壓數(shù)據(jù)之后,再恢復(fù)原先部署架構(gòu),重新用原先的 Consumer 機器來消費消息。
Kafka是如何實現(xiàn)高性能的?
①宏觀架構(gòu)層面利用 Partition 實現(xiàn)并行處理
Kafka 中每個 Topic 都包含一個或多個 Partition,不同 Partition 可位于不同節(jié)點。
同時 Partition 在物理上對應(yīng)一個本地文件夾,每個 Partition 包含一個或多個 Segment,每個 Segment 包含一個數(shù)據(jù)文件和一個與之對應(yīng)的索引文件。
在邏輯上,可以把一個 Partition 當(dāng)作一個非常長的數(shù)組,可通過這個“數(shù)組”的索引(Offset)去訪問其數(shù)據(jù)。
一方面,由于不同 Partition 可位于不同機器,因此可以充分利用集群優(yōu)勢,實現(xiàn)機器間的并行處理。
另一方面,由于 Partition 在物理上對應(yīng)一個文件夾,即使多個 Partition 位于同一個節(jié)點,也可通過配置讓同一節(jié)點上的不同 Partition 置于不同的 disk drive 上,從而實現(xiàn)磁盤間的并行處理,充分發(fā)揮多磁盤的優(yōu)勢。
利用多磁盤的具體方法是,將不同磁盤 mount 到不同目錄,然后在 server.properties 中,將 log.dirs 設(shè)置為多目錄(用逗號分隔)。
Kafka 會自動將所有 Partition 盡可能均勻分配到不同目錄也即不同目錄(也即不同 disk)上。
Partition 是最小并發(fā)粒度,Partition 個數(shù)決定了可能的并行度。
②ISR 實現(xiàn)可用性與數(shù)據(jù)一致性的動態(tài)平衡
常用數(shù)據(jù)復(fù)制及一致性方案有如下幾種:
Master-Slave:
- RDBMS 的讀寫分離即為典型的 Master-Slave 方案。
- 同步復(fù)制可保證強一致性但會影響可用性。
- 異步復(fù)制可提供高可用性但會降低一致性。
WNR:
- 主要用于去中心化的分布式系統(tǒng)中。
- N 代表總副本數(shù),W 代表每次寫操作要保證的最少寫成功的副本數(shù),R 代表每次讀至少要讀取的副本數(shù)。
- 當(dāng) W+R>N 時,可保證每次讀取的數(shù)據(jù)至少有一個副本擁有新的數(shù)據(jù)。
- 多個寫操作的順序難以保證,可能導(dǎo)致多副本間的寫操作順序不一致。Dynamo 通過向量時鐘保證最終一致性。
Paxos 及其變種:
- Google 的 Chubby,Zookeeper 的原子廣播協(xié)議(Zab),RAFT 等。
基于 ISR 的數(shù)據(jù)復(fù)制方案:Kafka 的數(shù)據(jù)復(fù)制是以 Partition 為單位的。而多個備份間的數(shù)據(jù)復(fù)制,通過 Follower 向 Leader 拉取數(shù)據(jù)完成。
從這一點來講,Kafka 的數(shù)據(jù)復(fù)制方案接近于上文所講的 Master-Slave 方案。
不同的是,Kafka 既不是完全的同步復(fù)制,也不是完全的異步復(fù)制,而是基于 ISR 的動態(tài)復(fù)制方案。
ISR,也即 In-Sync Replica。每個 Partition 的 Leader 都會維護這樣一個列表,該列表中,包含了所有與之同步的 Replica(包含 Leader 自己)。
每次數(shù)據(jù)寫入時,只有 ISR 中的所有 Replica 都復(fù)制完,Leader 才會將其置為 Commit,它才能被 Consumer 所消費。
這種方案,與同步復(fù)制非常接近。但不同的是,這個 ISR 是由 Leader 動態(tài)維護的。
如果 Follower 不能緊“跟上”Leader,它將被 Leader 從 ISR 中移除,待它又重新“跟上”Leader 后,會被 Leader 再次加到 ISR 中。每次改變 ISR 后,Leader 都會將新的 ISR 持久化到 Zookeeper 中。
由于 Leader 可移除不能及時與之同步的 Follower,故與同步復(fù)制相比可避免最慢的 Follower 拖慢整體速度,也即 ISR 提高了系統(tǒng)可用性。
ISR 中的所有 Follower 都包含了所有 Commit 過的消息,而只有 Commit 過的消息才會被 Consumer 消費。
故從 Consumer 的角度而言,ISR 中的所有 Replica 都始終處于同步狀態(tài),從而與異步復(fù)制方案相比提高了數(shù)據(jù)一致性。
ISR 可動態(tài)調(diào)整,極限情況下,可以只包含 Leader,極大提高了可容忍的宕機的 Follower 的數(shù)量。
與 Majority Quorum 方案相比,容忍相同個數(shù)的節(jié)點失敗,所要求的總節(jié)點數(shù)少了近一半。
③具體實現(xiàn)層面高效使用磁盤特性和操作系統(tǒng)特性
將寫磁盤的過程變?yōu)轫樞驅(qū)?/strong>
Kafka 的整個設(shè)計中,Partition 相當(dāng)于一個非常長的數(shù)組,而 Broker 接收到的所有消息順序?qū)懭脒@個大數(shù)組中。
同時 Consumer 通過 Offset 順序消費這些數(shù)據(jù),并且不刪除已經(jīng)消費的數(shù)據(jù),從而避免了隨機寫磁盤的過程。
由于磁盤有限,不可能保存所有數(shù)據(jù),實際上作為消息系統(tǒng) Kafka 也沒必要保存所有數(shù)據(jù),需要刪除舊的數(shù)據(jù)。
而這個刪除過程,并非通過使用“讀-寫”模式去修改文件,而是將 Partition 分為多個 Segment,每個 Segment 對應(yīng)一個物理文件,通過刪除整個文件的方式去刪除 Partition 內(nèi)的數(shù)據(jù)。
這種方式清除舊數(shù)據(jù)的方式,也避免了對文件的隨機寫操作。在存儲機制上,使用了 Log Structured Merge Trees(LSM) 。
注:Log Structured Merge Trees(LSM),谷歌 “BigTable” 的論文中提出,LSM 是當(dāng)前被用在許多產(chǎn)品的文件結(jié)構(gòu)策略:HBase,Cassandra,LevelDB,SQLite,Kafka。
LSM 被設(shè)計來提供比傳統(tǒng)的 B+ 樹或者 ISAM 更好的寫操作吞吐量,通過消去隨機的本地更新操作來達(dá)到這個目標(biāo)。
這個問題的本質(zhì)還是磁盤隨機操作慢,順序讀寫快。這兩種操作存在巨大的差距,無論是磁盤還是 SSD,而且快至少三個數(shù)量級。
充分利用 Page Cache
使用 Page Cache 的好處如下:
- I/O Scheduler 會將連續(xù)的小塊寫組裝成大塊的物理寫從而提高性能。
- I/O Scheduler 會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間。
- 充分利用所有空閑內(nèi)存(非 JVM 內(nèi)存)。如果使用應(yīng)用層 Cache(即 JVM 堆內(nèi)存),會增加 GC 負(fù)擔(dān)。
- 讀操作可直接在 Page Cache 內(nèi)進(jìn)行。如果消費和生產(chǎn)速度相當(dāng),甚至不需要通過物理磁盤(直接通過 Page Cache)交換數(shù)據(jù)。
- 如果進(jìn)程重啟,JVM 內(nèi)的 Cache 會失效,但 Page Cache 仍然可用。
Broker 收到數(shù)據(jù)后,寫磁盤時只是將數(shù)據(jù)寫入 Page Cache,并不保證數(shù)據(jù)一定完全寫入磁盤。
從這一點看,可能會造成機器宕機時,Page Cache 內(nèi)的數(shù)據(jù)未寫入磁盤從而造成數(shù)據(jù)丟失。
但是這種丟失只發(fā)生在機器斷電等造成操作系統(tǒng)不工作的場景,而這種場景完全可以由 Kafka 層面的 Replication 機制去解決。
如果為了保證這種情況下數(shù)據(jù)不丟失而強制將 Page Cache 中的數(shù)據(jù) Flush 到磁盤,反而會降低性能。
也正因如此,Kafka 雖然提供了 flush.messages 和 flush.ms 兩個參數(shù)將 Page Cache 中的數(shù)據(jù)強制 Flush 到磁盤,但是 Kafka 并不建議使用。
如果數(shù)據(jù)消費速度與生產(chǎn)速度相當(dāng),甚至不需要通過物理磁盤交換數(shù)據(jù),而是直接通過 Page Cache 交換數(shù)據(jù)。同時,F(xiàn)ollower 從 Leader Fetch 數(shù)據(jù)時,也可通過 Page Cache 完成。
注:Page Cache,又稱 pcache,其中文名稱為頁高速緩沖存儲器,簡稱頁高緩。
Page Cache 的大小為一頁,通常為 4K。在 Linux 讀寫文件時,它用于緩存文件的邏輯內(nèi)容,從而加快對磁盤上映像和數(shù)據(jù)的訪問。 這是 Linux 操作系統(tǒng)的一個特色。
支持多 Disk Drive
Broker 的 log.dirs 配置項,允許配置多個文件夾。如果機器上有多個 Disk Drive,可將不同的 Disk 掛載到不同的目錄,然后將這些目錄都配置到 log.dirs 里。
Kafka 會盡可能將不同的 Partition 分配到不同的目錄,也即不同的 Disk 上,從而充分利用了多 Disk 的優(yōu)勢。
零拷貝
Kafka 中存在大量的網(wǎng)絡(luò)數(shù)據(jù)持久化到磁盤(Producer 到 Broker)和磁盤文件通過網(wǎng)絡(luò)發(fā)送(Broker 到 Consumer)的過程。這一過程的性能直接影響 Kafka 的整體吞吐量。
傳統(tǒng)模式下的四次拷貝與四次上下文切換,以將磁盤文件通過網(wǎng)絡(luò)發(fā)送為例。
傳統(tǒng)模式下,一般使用如下偽代碼所示的方法先將文件數(shù)據(jù)讀入內(nèi)存,然后通過 Socket 將內(nèi)存中的數(shù)據(jù)發(fā)送出去。
- buffer = File.readSocket.send(buffer)
這一過程實際上發(fā)生了四次數(shù)據(jù)拷貝:
- 首先通過系統(tǒng)調(diào)用將文件數(shù)據(jù)讀入到內(nèi)核態(tài) Buffer(DMA 拷貝)。
- 然后應(yīng)用程序?qū)?nèi)存態(tài) Buffer 數(shù)據(jù)讀入到用戶態(tài) Buffer(CPU 拷貝)。
- 接著用戶程序通過 Socket 發(fā)送數(shù)據(jù)時將用戶態(tài) Buffer 數(shù)據(jù)拷貝到內(nèi)核態(tài) Buffer(CPU 拷貝)。
- 通過 DMA 拷貝將數(shù)據(jù)拷貝到 NIC Buffer。同時,還伴隨著四次上下文切換。
而 Linux 2.4+ 內(nèi)核通過 sendfile 系統(tǒng)調(diào)用,提供了零拷貝。數(shù)據(jù)通過 DMA 拷貝到內(nèi)核態(tài) Buffer 后,直接通過 DMA 拷貝到 NIC Buffer,無需 CPU 拷貝。這也是零拷貝這一說法的來源。
除了減少數(shù)據(jù)拷貝外,因為整個讀文件-網(wǎng)絡(luò)發(fā)送由一個 sendfile 調(diào)用完成,整個過程只有兩次上下文切換,因此大大提高了性能。
從具體實現(xiàn)來看,Kafka 的數(shù)據(jù)傳輸通過 Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法實現(xiàn)零拷貝。
注: transferTo 和 transferFrom 并不保證一定能使用零拷貝。實際上是否能使用零拷貝與操作系統(tǒng)相關(guān),如果操作系統(tǒng)提供 sendfile 這樣的零拷貝系統(tǒng)調(diào)用,則這兩個方法會通過這樣的系統(tǒng)調(diào)用充分利用零拷貝的優(yōu)勢,否則并不能通過這兩個方法本身實現(xiàn)零拷貝。
減少網(wǎng)絡(luò)開銷批處理
批處理是一種常用的用于提高 I/O 性能的方式。對 Kafka 而言,批處理既減少了網(wǎng)絡(luò)傳輸?shù)?Overhead,又提高了寫磁盤的效率。
Kafka 的 send 方法并非立即將消息發(fā)送出去,而是通過 batch.size 和 linger.ms 控制實際發(fā)送頻率,從而實現(xiàn)批量發(fā)送。
由于每次網(wǎng)絡(luò)傳輸,除了傳輸消息本身以外,還要傳輸非常多的網(wǎng)絡(luò)協(xié)議本身的一些內(nèi)容(稱為 Overhead),所以將多條消息合并到一起傳輸,可有效減少網(wǎng)絡(luò)傳輸?shù)?Overhead,進(jìn)而提高了傳輸效率。
數(shù)據(jù)壓縮降低網(wǎng)絡(luò)負(fù)載
Kafka 從 0.7 開始,即支持將數(shù)據(jù)壓縮后再傳輸給 Broker。除了可以將每條消息單獨壓縮然后傳輸外,Kafka 還支持在批量發(fā)送時,將整個 Batch 的消息一起壓縮后傳輸。
數(shù)據(jù)壓縮的一個基本原理是,重復(fù)數(shù)據(jù)越多壓縮效果越好。因此將整個 Batch 的數(shù)據(jù)一起壓縮能更大幅度減小數(shù)據(jù)量,從而更大程度提高網(wǎng)絡(luò)傳輸效率。
Broker 接收消息后,并不直接解壓縮,而是直接將消息以壓縮后的形式持久化到磁盤。Consumer Fetch 到數(shù)據(jù)后再解壓縮。
因此 Kafka 的壓縮不僅減少了 Producer 到 Broker 的網(wǎng)絡(luò)傳輸負(fù)載,同時也降低了 Broker 磁盤操作的負(fù)載,也降低了 Consumer 與 Broker 間的網(wǎng)絡(luò)傳輸量,從而極大得提高了傳輸效率,提高了吞吐量。
高效的序列化方式
Kafka 消息的 Key 和 Payload(或者說 Value)的類型可自定義,只需同時提供相應(yīng)的序列化器和反序列化器即可。
因此用戶可以通過使用快速且緊湊的序列化-反序列化方式(如 Avro,Protocal Buffer)來減少實際網(wǎng)絡(luò)傳輸和磁盤存儲的數(shù)據(jù)規(guī)模,從而提高吞吐率。
這里要注意,如果使用的序列化方法太慢,即使壓縮比非常高,最終的效率也不一定高。
【51CTO原創(chuàng)稿件,合作站點轉(zhuǎn)載請注明原文作者和出處為51CTO.com】