肝了很久的字節(jié)跳動消息隊列面經(jīng),我不信你能看完!!
作者個人研發(fā)的在高并發(fā)場景下,提供的簡單、穩(wěn)定、可擴展的延遲消息隊列框架,具有精準的定時任務和延遲隊列處理功能。自開源半年多以來,已成功為十幾家中小型企業(yè)提供了精準定時調(diào)度方案,經(jīng)受住了生產(chǎn)環(huán)境的考驗。為使更多童鞋受益,現(xiàn)給出開源框架地址:https://github.com/sunshinelyz/mykit-delay
寫在前面
又到了年底跳槽高峰季,很多小伙伴出去面試時,不少面試官都會問到消息隊列的問題,不少小伙伴回答的不是很完美,有些小伙伴是心里知道答案,嘴上卻沒有很好的表達出來,究其根本原因,還是對相關的知識點理解的不夠透徹。今天,我們就一起來探討下這個話題。注:文章有點長,你說你能一鼓作氣看完,我有點不信!!
文章已收錄到:
https://github.com/sunshinelyz/technology-binghe
https://gitee.com/binghe001/technology-binghe
什么是消息隊列?
消息隊列(Message Queue)是在消息的傳輸過程中保存消息的容器,是應用間的通信方式。消息發(fā)送后可以立即返回,由消息系統(tǒng)保證消息的可靠傳輸,消息發(fā)布者只管把消息寫到隊列里面而不用考慮誰需要消息,而消息的使用者也不需要知道誰發(fā)布的消息,只管到消息隊列里面取,這樣生產(chǎn)和消費便可以做到分離。
為什么要使用消息隊列?
優(yōu)點:
- 異步處理:例如短信通知、終端狀態(tài)推送、App推送、用戶注冊等
- 數(shù)據(jù)同步:業(yè)務數(shù)據(jù)推送同步
- 重試補償:記賬失敗重試
- 系統(tǒng)解耦:通訊上下行、終端異常監(jiān)控、分布式事件中心
- 流量消峰:秒殺場景下的下單處理
- 發(fā)布訂閱:HSF的服務狀態(tài)變化通知、分布式事件中心
- 高并發(fā)緩沖:日志服務、監(jiān)控上報
使用消息隊列比較核心的作用就是:解耦、異步、削峰。
缺點:
系統(tǒng)可用性降低 系統(tǒng)引入的外部依賴越多,越容易掛掉?如何保證消息隊列的高可用?
系統(tǒng)復雜度提高 怎么保證消息沒有重復消費?怎么處理消息丟失的情況?怎么保證消息傳遞的順序性?
一致性問題 A 系統(tǒng)處理完了直接返回成功了,人都以為你這個請求就成功了;但是問題是,要是 BCD 三個系統(tǒng)那里,BD 兩個系統(tǒng)寫庫成功了,結果 C 系統(tǒng)寫庫失敗了,咋整?你這數(shù)據(jù)就不一致了。
以下主要討論的RabbitMQ和Kafka兩種消息隊列。
如何保證消息隊列的高可用?
RabbitMQ的高可用
RabbitMQ的高可用是基于主從(非分布式)做高可用性。RabbitMQ 有三種模式:單機模式(Demo級別)、普通集群模式(無高可用性)、鏡像集群模式(高可用性)。
- 普通集群模式
普通集群模式,意思就是在多臺機器上啟動多個 RabbitMQ 實例,每個機器啟動一個。你創(chuàng)建的 queue,只會放在一個 RabbitMQ 實例上,但是每個實例都同步 queue 的元數(shù)據(jù)(元數(shù)據(jù)可以認為是 queue 的一些配置信息,通過元數(shù)據(jù),可以找到 queue 所在實例)。你消費的時候,實際上如果連接到了另外一個實例,那么那個實例會從 queue 所在實例上拉取數(shù)據(jù)過來。
這種方式確實很麻煩,也不怎么好,沒做到所謂的分布式,就是個普通集群。因為這導致你要么消費者每次隨機連接一個實例然后拉取數(shù)據(jù),要么固定連接那個 queue 所在實例消費數(shù)據(jù),前者有數(shù)據(jù)拉取的開銷,后者導致單實例性能瓶頸。
而且如果那個放 queue 的實例宕機了,會導致接下來其他實例就無法從那個實例拉取,如果你開啟了消息持久化,讓 RabbitMQ 落地存儲消息的話,消息不一定會丟,得等這個實例恢復了,然后才可以繼續(xù)從這個 queue 拉取數(shù)據(jù)。
所以這個事兒就比較尷尬了,這就沒有什么所謂的高可用性,這方案主要是提高吞吐量的,就是說讓集群中多個節(jié)點來服務某個 queue 的讀寫操作。
- 鏡像集群模式
這種模式,才是所謂的 RabbitMQ 的高可用模式。跟普通集群模式不一樣的是,在鏡像集群模式下,你創(chuàng)建的 queue,無論元數(shù)據(jù)還是 queue 里的消息都會存在于多個實例上,就是說,每個 RabbitMQ 節(jié)點都有這個 queue 的一個完整鏡像,包含 queue 的全部數(shù)據(jù)的意思。然后每次你寫消息到 queue 的時候,都會自動把消息同步到多個實例的 queue 上。
那么如何開啟這個鏡像集群模式呢?其實很簡單,RabbitMQ 有很好的管理控制臺,就是在后臺新增一個策略,這個策略是鏡像集群模式的策略,指定的時候是可以要求數(shù)據(jù)同步到所有節(jié)點的,也可以要求同步到指定數(shù)量的節(jié)點,再次創(chuàng)建 queue 的時候,應用這個策略,就會自動將數(shù)據(jù)同步到其他的節(jié)點上去了。
這樣的話,好處在于,你任何一個機器宕機了,沒事兒,其它機器(節(jié)點)還包含了這個 queue 的完整數(shù)據(jù),別的 consumer 都可以到其它節(jié)點上去消費數(shù)據(jù)。壞處在于,第一,這個性能開銷也太大了吧,消息需要同步到所有機器上,導致網(wǎng)絡帶寬壓力和消耗很重!第二,這么玩兒,不是分布式的,就沒有擴展性可言了,如果某個 queue 負載很重,你加機器,新增的機器也包含了這個 queue 的所有數(shù)據(jù),并沒有辦法線性擴展你的 queue。你想,如果這個 queue 的數(shù)據(jù)量很大,大到這個機器上的容量無法容納了,此時該怎么辦呢?
Kafka的高可用
Kafka 一個最基本的架構認識:由多個 broker 組成,每個 broker 是一個節(jié)點;你創(chuàng)建一個 topic,這個 topic 可以劃分為多個 partition,每個 partition 可以存在于不同的 broker 上,每個 partition 就放一部分數(shù)據(jù)。
這就是天然的分布式消息隊列,就是說一個 topic 的數(shù)據(jù),是分散放在多個機器上的,每個機器就放一部分數(shù)據(jù)。
實際上 RabbmitMQ 之類的,并不是分布式消息隊列,它就是傳統(tǒng)的消息隊列,只不過提供了一些集群、HA(High Availability, 高可用性) 的機制而已,因為無論怎么玩兒,RabbitMQ 一個 queue 的數(shù)據(jù)都是放在一個節(jié)點里的,鏡像集群下,也是每個節(jié)點都放這個 queue 的完整數(shù)據(jù)。
Kafka 0.8 以前,是沒有 HA 機制的,就是任何一個 broker 宕機了,那個 broker 上的 partition 就廢了,沒法寫也沒法讀,沒有什么高可用性可言。
比如說,我們假設創(chuàng)建了一個 topic,指定其 partition 數(shù)量是 3 個,分別在三臺機器上。但是,如果第二臺機器宕機了,會導致這個 topic 的 1/3 的數(shù)據(jù)就丟了,因此這個是做不到高可用的。
Kafka 0.8 以后,提供了 HA 機制,就是 replica(復制品) 副本機制。每個 partition 的數(shù)據(jù)都會同步到其它機器上,形成自己的多個 replica 副本。所有 replica 會選舉一個 leader 出來,那么生產(chǎn)和消費都跟這個 leader 打交道,然后其他 replica 就是 follower。寫的時候,leader 會負責把數(shù)據(jù)同步到所有 follower 上去,讀的時候就直接讀 leader 上的數(shù)據(jù)即可。只能讀寫 leader?很簡單,要是你可以隨意讀寫每個 follower,那么就要 care 數(shù)據(jù)一致性的問題,系統(tǒng)復雜度太高,很容易出問題。Kafka 會均勻地將一個 partition 的所有 replica 分布在不同的機器上,這樣才可以提高容錯性。
這么搞,就有所謂的高可用性了,因為如果某個 broker 宕機了,沒事兒,那個 broker上面的 partition 在其他機器上都有副本的。如果這個宕機的 broker 上面有某個 partition 的 leader,那么此時會從 follower 中重新選舉一個新的 leader 出來,大家繼續(xù)讀寫那個新的 leader 即可。這就有所謂的高可用性了。
寫數(shù)據(jù)的時候,生產(chǎn)者就寫 leader,然后 leader 將數(shù)據(jù)落地寫本地磁盤,接著其他 follower 自己主動從 leader 來 pull 數(shù)據(jù)。一旦所有 follower 同步好數(shù)據(jù)了,就會發(fā)送 ack 給 leader,leader 收到所有 follower 的 ack 之后,就會返回寫成功的消息給生產(chǎn)者。(當然,這只是其中一種模式,還可以適當調(diào)整這個行為)
消費的時候,只會從 leader 去讀,但是只有當一個消息已經(jīng)被所有 follower 都同步成功返回 ack 的時候,這個消息才會被消費者讀到。
如何保證消息不重復消費(冪等性)?
首先,所有的消息隊列都會有這樣重復消費的問題,因為這是不MQ來保證,而是我們自己開發(fā)保證的,我們使用Kakfa來討論是如何實現(xiàn)的。
Kakfa有個offset的概念,就是每個消息寫進去都會有一個offset值,代表消費的序號,然后consumer消費了數(shù)據(jù)之后,默認每隔一段時間會把自己消費過的消息的offset值提交,表示我已經(jīng)消費過了,下次要是我重啟啥的,就讓我從當前提交的offset處來繼續(xù)消費。
但是凡事總有意外,比如我們之前生產(chǎn)經(jīng)常遇到的,就是你有時候重啟系統(tǒng),看你怎么重啟了,如果碰到點著急的,直接 kill 進程了,再重啟。這會導致 consumer 有些消息處理了,但是沒來得及提交 offset,尷尬了。重啟之后,少數(shù)消息會再次消費一次。
其實重復消費不可怕,可怕的是你沒考慮到重復消費之后,怎么保證冪等性。
舉個例子吧。假設你有個系統(tǒng),消費一條消息就往數(shù)據(jù)庫里插入一條數(shù)據(jù),要是你一個消息重復兩次,你不就插入了兩條,這數(shù)據(jù)不就錯了?但是你要是消費到第二次的時候,自己判斷一下是否已經(jīng)消費過了,若是就直接扔了,這樣不就保留了一條數(shù)據(jù),從而保證了數(shù)據(jù)的正確性。一條數(shù)據(jù)重復出現(xiàn)兩次,數(shù)據(jù)庫里就只有一條數(shù)據(jù),這就保證了系統(tǒng)的冪等性。冪等性,通俗點說,就一個數(shù)據(jù),或者一個請求,給你重復來多次,你得確保對應的數(shù)據(jù)是不會改變的,不能出錯。
所以第二個問題來了,怎么保證消息隊列消費的冪等性?
其實還是得結合業(yè)務來思考,我這里給幾個思路:
- 比如你拿個數(shù)據(jù)要寫庫,你先根據(jù)主鍵查一下,如果這數(shù)據(jù)都有了,你就別插入了,update 一下好吧。
- 比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
- 比如你不是上面兩個場景,那做的稍微復雜一點,你需要讓生產(chǎn)者發(fā)送每條數(shù)據(jù)的時候,里面加一個全局唯一的 id,類似訂單 id 之類的東西,然后你這里消費到了之后,先根據(jù)這個 id 去比如 Redis 里查一下,之前消費過嗎?如果沒有消費過,你就處理,然后這個 id 寫 Redis。如果消費過了,那你就別處理了,保證別重復處理相同的消息即可。
- 比如基于數(shù)據(jù)庫的唯一鍵來保證重復數(shù)據(jù)不會重復插入多條。因為有唯一鍵約束了,重復數(shù)據(jù)插入只會報錯,不會導致數(shù)據(jù)庫中出現(xiàn)臟數(shù)據(jù)。
當然,如何保證 MQ 的消費是冪等性的,需要結合具體的業(yè)務來看。
如何保證消息的可靠傳輸(不丟失)?
這個是肯定的,MQ的基本原則就是數(shù)據(jù)不能多一條,也不能少一條,不能多其實就是我們前面重復消費的問題。不能少,就是數(shù)據(jù)不能丟,像計費,扣費的一些信息,是肯定不能丟失的。
數(shù)據(jù)的丟失問題,可能出現(xiàn)在生產(chǎn)者、MQ、消費者中,咱們從 RabbitMQ 和 Kafka 分別來分析一下吧。
RabbitMQ如何保證消息的可靠
生產(chǎn)者丟數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時候,可能數(shù)據(jù)就在半路給搞丟了,因為網(wǎng)絡問題啥的,都有可能。
此時可以選擇用 RabbitMQ 提供的事務功能,就是生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟 RabbitMQ 事務channel.txSelect,然后發(fā)送消息,如果消息沒有成功被 RabbitMQ 接收到,那么生產(chǎn)者會收到異常報錯,此時就可以回滾事務channel.txRollback,然后重試發(fā)送消息;如果收到了消息,那么可以提交事務channel.txCommit。
- // 開啟事務
- channel.txSelect
- try {
- // 這里發(fā)送消息
- } catch (Exception e) {
- channel.txRollback
- // 這里再次重發(fā)這條消息
- }
- // 提交事務
- channel.txCommit
但是問題是,RabbitMQ 事務機制(同步)一搞,基本上吞吐量會下來,因為太耗性能。
所以一般來說,如果你要確保說寫 RabbitMQ 的消息別丟,可以開啟 confirm 模式,在生產(chǎn)者那里設置開啟 confirm 模式之后,你每次寫的消息都會分配一個唯一的 id,然后如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個 ack 消息,告訴你說這個消息 ok 了。如果 RabbitMQ 沒能處理這個消息,會回調(diào)你的一個 nack 接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在內(nèi)存里維護每個消息 id 的狀態(tài),如果超過一定時間還沒接收到這個消息的回調(diào),那么你可以重發(fā)。
事務機制和 confirm 機制最大的不同在于,事務機制是同步的,你提交一個事務之后會阻塞在那兒,但是 confirm 機制是異步的,你發(fā)送個消息之后就可以發(fā)送下一個消息,然后那個消息 RabbitMQ 接收了之后會異步回調(diào)你的一個接口通知你這個消息接收到了。
所以一般在生產(chǎn)者這塊避免數(shù)據(jù)丟失,都是用 confirm 機制的。
RabbitMQ丟數(shù)據(jù)
就是 RabbitMQ 自己弄丟了數(shù)據(jù),這個你必須開啟 RabbitMQ 的持久化,就是消息寫入之后會持久化到磁盤,哪怕是 RabbitMQ 自己掛了,恢復之后會自動讀取之前存儲的數(shù)據(jù),一般數(shù)據(jù)不會丟。除非極其罕見的是,RabbitMQ 還沒持久化,自己就掛了,可能導致少量數(shù)據(jù)丟失,但是這個概率較小。
設置持久化有兩個步驟:
- 創(chuàng)建 queue 的時候?qū)⑵湓O置為持久化 這樣就可以保證 RabbitMQ 持久化 queue 的元數(shù)據(jù),但是它是不會持久化 queue 里的數(shù)據(jù)的。
- 第二個是發(fā)送消息的時候?qū)⑾⒌?deliveryMode 設置為 2 就是將消息設置為持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去。
必須要同時設置這兩個持久化才行,RabbitMQ 哪怕是掛了,再次重啟,也會從磁盤上重啟恢復 queue,恢復這個 queue 里的數(shù)據(jù)。
注意,哪怕是你給 RabbitMQ 開啟了持久化機制,也有一種可能,就是這個消息寫到了 RabbitMQ 中,但是還沒來得及持久化到磁盤上,結果不巧,此時 RabbitMQ 掛了,就會導致內(nèi)存里的一點點數(shù)據(jù)丟失。
所以,持久化可以跟生產(chǎn)者那邊的 confirm 機制配合起來,只有消息被持久化到磁盤之后,才會通知生產(chǎn)者 ack 了,所以哪怕是在持久化到磁盤之前,RabbitMQ 掛了,數(shù)據(jù)丟了,生產(chǎn)者收不到 ack,你也是可以自己重發(fā)的。
消費者丟數(shù)據(jù)
RabbitMQ 如果丟失了數(shù)據(jù),主要是因為你消費的時候,剛消費到,還沒處理,結果進程掛了,比如重啟了,那么就尷尬了,RabbitMQ 認為你都消費了,這數(shù)據(jù)就丟了。
這個時候得用 RabbitMQ 提供的 ack 機制,簡單來說,就是你必須關閉 RabbitMQ 的自動ack,可以通過一個 api 來調(diào)用就行,然后每次你自己代碼里確保處理完的時候,再在程序里ack 一把。這樣的話,如果你還沒處理完,不就沒有 ack 了?那 RabbitMQ 就認為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,消息是不會丟的。
Kakfa如何保證消息的可靠
- 消費者丟數(shù)據(jù)
唯一可能導致消費者弄丟數(shù)據(jù)的情況,就是說,你消費到了這個消息,然后消費者那邊自動提交了 offset,讓 Kafka 以為你已經(jīng)消費好了這個消息,但其實你才剛準備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。
這不是跟 RabbitMQ 差不多嗎,大家都知道 Kafka 會自動提交 offset,那么只要關閉自動提交 offset,在處理完之后自己手動提交 offset,就可以保證數(shù)據(jù)不會丟。但是此時確實還是可能會有重復消費,比如你剛處理完,還沒提交 offset,結果自己掛了,此時肯定會重復消費一次,自己保證冪等性就好了。
生產(chǎn)環(huán)境碰到的一個問題,就是說我們的 Kafka 消費者消費到了數(shù)據(jù)之后是寫到一個內(nèi)存的 queue 里先緩沖一下,結果有的時候,你剛把消息寫入內(nèi)存 queue,然后消費者會自動提交 offset。然后此時我們重啟了系統(tǒng),就會導致內(nèi)存 queue 里還沒來得及處理的數(shù)據(jù)就丟失了。
- Kafka丟數(shù)據(jù)
這塊比較常見的一個場景,就是 Kafka 某個 broker 宕機,然后重新選舉 partition 的 leader。大家想想,要是此時其他的 follower 剛好還有些數(shù)據(jù)沒有同步,結果此時 leader 掛了,然后選舉某個 follower 成 leader 之后,不就少了一些數(shù)據(jù)?這就丟了一些數(shù)據(jù)啊。
生產(chǎn)環(huán)境也遇到過,我們也是,之前 Kafka 的 leader 機器宕機了,將 follower 切換為 leader 之后,就會發(fā)現(xiàn)說這個數(shù)據(jù)就丟了。
所以此時一般是要求起碼設置如下 4 個參數(shù):
我們生產(chǎn)環(huán)境就是按照上述要求配置的,這樣配置之后,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發(fā)生故障,進行 leader 切換時,數(shù)據(jù)不會丟失。
- 給 topic 設置 replication.factor 參數(shù):這個值必須大于 1,要求每個 partition 必須有至少 2 個副本。
- 在 Kafka 服務端設置 min.insync.replicas 參數(shù):這個值必須大于 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟自己保持聯(lián)系,沒掉隊,這樣才能確保 leader 掛了還有一個 follower 吧。
- 在 producer 端設置 acks=all:這個是要求每條數(shù)據(jù),必須是寫入所有 replica 之后,才能認為是寫成功了。
- 在 producer 端設置 retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。
- 生產(chǎn)者丟數(shù)據(jù)
如果按照上述的思路設置了 acks=all,一定不會丟,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才認為本次寫成功了。如果沒滿足這個條件,生產(chǎn)者會自動不斷的重試,重試無限次。
如何保證消息的順序性?
我舉個例子,我們以前做過一個 mysql binlog 同步的系統(tǒng),壓力還是非常大的,日同步數(shù)據(jù)要達到上億,就是說數(shù)據(jù)從一個 mysql 庫原封不動地同步到另一個 mysql 庫里面去(mysql -> mysql)。常見的一點在于說比如大數(shù)據(jù) team,就需要同步一個 mysql 庫過來,對公司的業(yè)務系統(tǒng)的數(shù)據(jù)做各種復雜的操作。
你在 mysql 里增刪改一條數(shù)據(jù),對應出來了增刪改 3 條 binlog 日志,接著這三條 binlog發(fā)送到 MQ 里面,再消費出來依次執(zhí)行,起碼得保證人家是按照順序來的吧?不然本來是:增加、修改、刪除;你楞是換了順序給執(zhí)行成刪除、修改、增加,不全錯了么。
本來這個數(shù)據(jù)同步過來,應該最后這個數(shù)據(jù)被刪除了;結果你搞錯了這個順序,最后這個數(shù)據(jù)保留下來了,數(shù)據(jù)同步就出錯了。
先看看順序會錯亂的倆場景:
- RabbitMQ:一個 queue,多個 consumer。比如,生產(chǎn)者向 RabbitMQ 里發(fā)送了三條數(shù)據(jù),順序依次是 data1/data2/data3,壓入的是 RabbitMQ 的一個內(nèi)存隊列。有三個消費者分別從 MQ 中消費這三條數(shù)據(jù)中的一條,結果消費者2先執(zhí)行完操作,把 data2 存入數(shù)據(jù)庫,然后是 data1/data3。這不明顯亂了。
- Kafka:比如說我們建了一個 topic,有三個 partition。生產(chǎn)者在寫的時候,其實可以指定一個 key,比如說我們指定了某個訂單 id 作為 key,那么這個訂單相關的數(shù)據(jù),一定會被分發(fā)到同一個 partition 中去,而且這個 partition 中的數(shù)據(jù)一定是有順序的。消費者從 partition 中取出來數(shù)據(jù)的時候,也一定是有順序的。到這里,順序還是 ok 的,沒有錯亂。接著,我們在消費者里可能會搞多個線程來并發(fā)處理消息。因為如果消費者是單線程消費處理,而處理比較耗時的話,比如處理一條消息耗時幾十 ms,那么 1 秒鐘只能處理幾十條消息,這吞吐量太低了。而多個線程并發(fā)跑的話,順序可能就亂掉了。
RabbitMQ解決方案
拆分多個 queue,每個 queue 一個 consumer,就是多一些 queue 而已,確實是麻煩點;或者就一個 queue 但是對應一個 consumer,然后這個 consumer 內(nèi)部用內(nèi)存隊列做排隊,然后分發(fā)給底層不同的 worker 來處理。
Kafka解決方案
- 一個 topic,一個 partition,一個 consumer,內(nèi)部單線程消費,單線程吞吐量太低,一般不會用這個。
- 寫 N 個內(nèi)存 queue,具有相同 key 的數(shù)據(jù)都到同一個內(nèi)存 queue;然后對于 N 個線程,每個線程分別消費一個內(nèi)存 queue 即可,這樣就能保證順序性。
如何處理消息推積?
大量消息在 mq 里積壓了幾個小時了還沒解決
一個消費者一秒是 1000 條,一秒 3 個消費者是 3000 條,一分鐘就是 18 萬條。所以如果你積壓了幾百萬到上千萬的數(shù)據(jù),即使消費者恢復了,也需要大概 1 小時的時間才能恢復過來。
一般這個時候,只能臨時緊急擴容了,具體操作步驟和思路如下:
- 先修復 consumer 的問題,確保其恢復消費速度,然后將現(xiàn)有 consumer 都停掉。
- 新建一個 topic,partition 是原來的 10 倍,臨時建立好原先 10 倍的 queue 數(shù)量。
- 然后寫一個臨時的分發(fā)數(shù)據(jù)的 consumer 程序,這個程序部署上去消費積壓的數(shù)據(jù),消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數(shù)量的 queue。
- 接著臨時征用 10 倍的機器來部署 consumer,每一批 consumer 消費一個臨時 queue 的數(shù)據(jù)。這種做法相當于是臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數(shù)據(jù)。
- 等快速消費完積壓數(shù)據(jù)之后,得恢復原先部署的架構,重新用原先的 consumer 機器來消費消息。
mq 中的消息過期失效了
假設你用的是 RabbitMQ,RabbtiMQ 是可以設置過期時間的,也就是 TTL。如果消息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉,這個數(shù)據(jù)就沒了。那這就是第二個坑了。這就不是說數(shù)據(jù)會大量積壓在 mq 里,而是大量的數(shù)據(jù)會直接搞丟。
這個情況下,就不是說要增加 consumer 消費積壓的消息,因為實際上沒啥積壓,而是丟了大量的消息。我們可以采取一個方案,就是批量重導,這個我們之前線上也有類似的場景干過。就是大量積壓的時候,我們當時就直接丟棄數(shù)據(jù)了,然后等過了高峰期以后,比如大家一起喝咖啡熬夜到晚上12點以后,用戶都睡覺了。這個時候我們就開始寫程序,將丟失的那批數(shù)據(jù),寫個臨時程序,一點一點的查出來,然后重新灌入 mq 里面去,把白天丟的數(shù)據(jù)給他補回來。也只能是這樣了。
假設 1 萬個訂單積壓在 mq 里面,沒有處理,其中 1000 個訂單都丟了,你只能手動寫程序把那 1000 個訂單給查出來,手動發(fā)到 mq 里去再補一次。
mq 都快寫滿了
如果消息積壓在 mq 里,你很長時間都沒有處理掉,此時導致 mq 都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執(zhí)行的太慢了,你臨時寫程序,接入數(shù)據(jù)來消費,消費一個丟棄一個,都不要了,快速消費掉所有的消息。然后走第二個方案,到了晚上再補數(shù)據(jù)吧。
參考資料:
- Kafa深度解析
- RabbitMQ源碼解析
本文轉載自微信公眾號「冰河技術」,可以通過以下二維碼關注。轉載本文請聯(lián)系冰河技術公眾號。