我怎么不知道RocketMQ生產(chǎn)者有這么多用法?
本文轉(zhuǎn)載自微信公眾號(hào)「大魚仙人」,作者大魚。轉(zhuǎn)載本文請(qǐng)聯(lián)系大魚仙人公眾號(hào)。
前言
消息隊(duì)列RocketMQ版是阿里云基于Apache RocketMQ構(gòu)建的低延遲、高并發(fā)、高可用、高可靠的分布式消息中間件。
看過我之前幾篇文章的應(yīng)該都大概隊(duì)消息隊(duì)列有個(gè)概念了,都明白了,那這個(gè)消息從何而來呢?
所謂黃河之水天上來,大自然間每一個(gè)事物都不是平白無故來的吧?????怎么來的,????它母親生產(chǎn)的;香奈兒????怎么來的,機(jī)器加原料生產(chǎn)的;就連平時(shí)吃的大米,也是有出處的;咱們是怎么來的,咱們當(dāng)然是偉大的母親生產(chǎn)下來的了
順便感謝一下偉大的母親,周日記得給她打個(gè)電話哦
下面進(jìn)入主題,這是分割線
消息隊(duì)列RocketMQ版既可為分布式應(yīng)用系統(tǒng)提供異步解耦和削峰填谷的能力,同時(shí)也具備互聯(lián)網(wǎng)應(yīng)用所需的海量消息堆積、高吞吐、可靠重試等特性。下面列舉了一些特點(diǎn)
- 消息查詢:消息隊(duì)列RocketMQ版提供了三種消息查詢的方式,分別是按Message ID、Message Key以及Topic查詢
- 查詢消息軌跡:通過消息軌跡,能清晰定位消息從生產(chǎn)者發(fā)出,經(jīng)由消息隊(duì)列RocketMQ版服務(wù)端,投遞給消息消費(fèi)者的完整鏈路,方便定位排查問題
- 集群消費(fèi)和廣播消費(fèi):當(dāng)使用集群消費(fèi)模式時(shí),消息隊(duì)列RocketMQ版認(rèn)為任意一條消息只需要被消費(fèi)者集群內(nèi)的任意一個(gè)消費(fèi)者處理即可;當(dāng)使用廣播消費(fèi)模式時(shí),消息隊(duì)列RocketMQ版會(huì)將每條消息推送給消費(fèi)者集群內(nèi)所有注冊(cè)過的消費(fèi)者,保證消息至少被每臺(tái)機(jī)器消費(fèi)一次
- 重置消費(fèi)位點(diǎn):根據(jù)時(shí)間或位點(diǎn)重置消費(fèi)進(jìn)度,允許用戶進(jìn)行消息回溯或者丟棄堆積消息
- 死信隊(duì)列:將無法正常消費(fèi)的消息儲(chǔ)存到特殊的死信隊(duì)列供后續(xù)處理
- 全球信息路由:用于全球不同地域之間的消息同步,保證地域之間的數(shù)據(jù)一致性
客戶端,其實(shí)很容易理解了,我們可以把RocketMQ理解成一個(gè)消息服務(wù),既然是一個(gè)服務(wù),我們就需要調(diào)用這個(gè)服務(wù),那么調(diào)用這個(gè)服務(wù)的時(shí)候,這個(gè)消息從哪里來,這個(gè)就是要根據(jù)業(yè)務(wù)場(chǎng)景來定了,所以啊,消息的生產(chǎn)者Producer屬于一個(gè)客戶端;消息產(chǎn)生了,總不能一直放著吧,總要有人處理掉這些消息吧,這也是業(yè)務(wù)決定的,所以消息的消費(fèi)者consumer也是屬于客戶端。
下面啊,大魚就帶著大家一起來看看這客戶端的用處
生產(chǎn)者Producer
生產(chǎn)者Producer,顧名思義,就是負(fù)責(zé)生產(chǎn)消息的,此時(shí)大家應(yīng)該腦子有很多問號(hào)才對(duì),比如Producer發(fā)消息發(fā)到哪里了,流程是怎么樣的,發(fā)的消息都是什么類型的等等這些,這些問題搞懂了的話,Producer這個(gè)客戶端基本就搞定了
魚魚教大家一個(gè)小技巧,學(xué)習(xí)一個(gè)東西,先搞懂大體流程,再拆分而細(xì)攻之,最后再統(tǒng)籌理解,這樣效果會(huì)很好,獨(dú)家秘方
接下來我從消息是如何發(fā)送的(負(fù)載均衡、容錯(cuò)機(jī)制)、消息發(fā)給誰和存儲(chǔ)到哪里、消息的類型三方面來介紹Producer
1、消息是如何發(fā)送的?
首先,消息總不能產(chǎn)生了哪里也不去吧,那產(chǎn)生這個(gè)消息就沒有任何意義了,所以這個(gè)消息總要發(fā)送到一個(gè)地方去,接力傳遞,看下面這個(gè)圖
Producer會(huì)首先從本地緩存中獲取到指定的Topic,如果找到就直接根據(jù)這個(gè)Topic發(fā)送產(chǎn)生的消息,緩存大家都明白啊,就是為了優(yōu)化速度,減少網(wǎng)絡(luò)傳輸。
沒有的話,就要去NameServer獲取最新的Topic列表(這個(gè)是Broker啟動(dòng)的時(shí)候注冊(cè)到NameServer上的),通過一定的策略選擇一個(gè)MessageQueue隊(duì)列,獲取這個(gè)mq所在的Broker地址,也是先從本地緩存中獲取,如果獲取不到則請(qǐng)求NameServer獲取(NameServer中也同樣注冊(cè)了Broker地址和Topic的映射關(guān)系),進(jìn)行發(fā)送消息
發(fā)送失敗的話,會(huì)有重試機(jī)制,默認(rèn)是重試三次
其實(shí)保存這么多,既能減少和NameServer之間的網(wǎng)絡(luò)傳輸,又能減小NameServer的壓力,NameServer本身就是屬于輕量級(jí)的設(shè)計(jì),這樣也有利于減輕NameServer的壓力,NameServer我也會(huì)單獨(dú)寫一篇來介紹
負(fù)載均衡
我們知道消息發(fā)送的時(shí)候會(huì)首先選擇一個(gè)對(duì)應(yīng)的Topic,每個(gè)Topic會(huì)對(duì)應(yīng)多個(gè)MessageQueue,這樣就有一個(gè)問題,發(fā)消息的時(shí)候要是做不到雨露均沾,可能就會(huì)有的隊(duì)列多,有的隊(duì)列少這樣的問題,就會(huì)造成資源的浪費(fèi)
RocketMQ采用了樸素的方式,沒錯(cuò),就是輪詢,高端的食材往往只需要最樸素的烹飪方式~
生產(chǎn)者通過輪詢某個(gè) Topic 下的所有 MessageQueue 的方式來實(shí)現(xiàn)發(fā)送方的負(fù)載均衡,簡(jiǎn)單來說就是人人都有份,如下圖:
通過這種方式,可以將一個(gè) Topic 的消息分散到多個(gè) MessageQueue 上,進(jìn)而分散到多個(gè) Broker 上。
發(fā)送消息的容錯(cuò)機(jī)制:
Producer 作為發(fā)送消息的一方,有3種容錯(cuò)機(jī)制:
- 本地緩存:把從 NameSever 獲取的信息緩存到本地,以防 NameSever 宕機(jī)
- 不可用Broker集合:Producer有一個(gè) Broker 的容錯(cuò)機(jī)制,開關(guān)sendLatencyFaultEnable可以開啟,RocketMq內(nèi)部會(huì)維護(hù)一個(gè)故障Broker的HashMap,把一定延遲級(jí)別的Broker放入這個(gè)map,下次選擇Broker的時(shí)候,就會(huì)規(guī)避不可用的Broker。
- 重試:Producer發(fā)送消息時(shí),有一個(gè)重試機(jī)制,默認(rèn)重試3次。死信隊(duì)列 Consumer消費(fèi)重試超過指定次數(shù),進(jìn)入死信隊(duì)列
通過這種方式,可以將一個(gè) Topic 的消息分散到多個(gè) MessageQueue 上,進(jìn)而分散到多個(gè) Broker 上。
2、消息發(fā)給誰和存儲(chǔ)在哪里?
Producer連接NameSever
Producer 通過 NameSever 獲取指定 Topic 的 Broker 路由信息,并在本地保存一份緩存數(shù)據(jù),比如一個(gè)Topic有哪些 MessageQueue,MessageQueue 在哪幾臺(tái) Broker 上,Broker 的ip.port等等。Producer 發(fā)送消息只發(fā)到 Master Broker上,Slave 通過主從同步獲取數(shù)據(jù)。
那么 Produce 是怎么連接NameSever 的呢
- 連接:?jiǎn)蝹€(gè)生產(chǎn)者者和一臺(tái) Nameserver 保持長(zhǎng)連接,定時(shí)查詢topic配置信息,如果該nameserver掛掉,生產(chǎn)者會(huì)自動(dòng)連接下一個(gè)nameserver,直到有可用連接為止,并能自動(dòng)重連。
- 輪詢時(shí)間:默認(rèn)情況下,生產(chǎn)者每隔30秒從nameserver獲取所有topic的最新隊(duì)列情況,這意味著某個(gè)broker如果宕機(jī),生產(chǎn)者最多要30秒才能感知,在此期間,發(fā)往該broker的消息發(fā)送失敗。該時(shí)間由DefaultMQProducer的pollNameServerInteval參數(shù)決定,可手動(dòng)配置。
- 心跳:與nameserver沒有心跳
Producer連接Broker
- 連接:生產(chǎn)者 跟 Topic 涉及的所有Broker 保持長(zhǎng)連接。
- 心跳:默認(rèn)情況下,生產(chǎn)者每隔30秒向所有broker發(fā)送心跳。broker每隔10秒鐘(此時(shí)間無法更改),掃描所有還存活的連接,若某個(gè)連接2分鐘內(nèi)(當(dāng)前時(shí)間與最后更新時(shí)間差值超過2分鐘,此時(shí)間無法更改)沒有發(fā)送心跳數(shù)據(jù),則關(guān)閉連接
Producer連接上Broker之后,消息會(huì)通過輪詢的方式發(fā)送到Broker上,并且存儲(chǔ)在Broker中的CommitLog中,這里面存儲(chǔ)的是原始消息,還有一個(gè)ConsumeQueue用于存儲(chǔ)投遞到某一個(gè)queue的消息的位置信息。當(dāng)然,消息隊(duì)列會(huì)持久化到磁盤中的,不影響內(nèi)存,當(dāng)然也會(huì)定期清理消息。
那消費(fèi)完的消息去了哪里呢?什么時(shí)候清理物理消息文件呢?還有這樣設(shè)計(jì)的好處呢?
這些我們都留在下下一篇中,也就是Broker篇,讓你透徹了解Broker這個(gè)大腦是如何助力RocketMQ支持這么高的吞吐量的
總之啊,這個(gè)問題值得大家深入研究一下,如果再面試的時(shí)候,你不僅能說出RocketMQ的用處,你還能說出它的存儲(chǔ)原理和尋址原理,那面試官就愛上你了。此時(shí)你再拿出王炸,就是解決各種實(shí)際問題的能力,比如如何處理重復(fù)消息啊、如何保證消息的順序性啊、在分布式系統(tǒng)中如何保證分布式事務(wù)啊
面試官當(dāng)場(chǎng)給你發(fā)offer,say:How much money do you expect to work for us ?
3、消息的種類
RocketMQ種的消息種類大致可以分為四種:普通消息、定時(shí)和延時(shí)消息、順序消息、事務(wù)消息四種類型,這是重點(diǎn)!
簡(jiǎn)單介紹下四種類型
- 普通消息:消息隊(duì)列RocketMQ版中無特性的消息,區(qū)別于有特性的定時(shí)和延時(shí)消息、順序消息和事務(wù)消息。
- 定時(shí)和延時(shí)消息:允許消息生產(chǎn)者對(duì)指定消息進(jìn)行定時(shí)(延時(shí))投遞,最長(zhǎng)支持40天。
- 順序消息:允許消息消費(fèi)者按照消息發(fā)送的順序?qū)ο⑦M(jìn)行消費(fèi)。
- 事務(wù)消息:實(shí)現(xiàn)類似X或Open XA的分布事務(wù)功能,以達(dá)到事務(wù)最終一致性狀態(tài)。
消息隊(duì)列RocketMQ提供的四種消息類型所對(duì)應(yīng)的Topic不能混用,例如,創(chuàng)建的普通消息的Topic只能用于收發(fā)普通消息,不能用于收發(fā)其他類型的消息;同理,事務(wù)消息的Topic也只能收發(fā)事務(wù)消息,不能用于收發(fā)其他類型的消息,以此類推
普通消息
普通消息:消息隊(duì)列RocketMQ中無特性的消息,區(qū)別于有特性的定時(shí)和延時(shí)消息、順序消息和事務(wù)消息
普通消息以三種發(fā)送方式:同步Sync發(fā)送、異步Async發(fā)送和單向Oneway發(fā)送
同步就是我們發(fā)送了消息之后必須等到服務(wù)器響應(yīng)之后才能發(fā)送下一個(gè);異步適用于對(duì)時(shí)間較敏感的業(yè)務(wù)場(chǎng)景,異步不需要等待服務(wù)器的響應(yīng)就可以連續(xù)發(fā)送消息;單向則比異步用時(shí)更短,一般在微秒級(jí)別,但是可靠性會(huì)降低,因?yàn)橹还馨l(fā)送,不等待服務(wù)器響應(yīng),也沒有回調(diào)函數(shù)觸發(fā)
同步發(fā)送
同步,消息發(fā)送方發(fā)出一條消息后,會(huì)在收到服務(wù)端返回響應(yīng)之后才發(fā)下一條消息的通訊方式
異步發(fā)送
異步發(fā)送是指發(fā)送方發(fā)出一條消息后,不等服務(wù)端返回響應(yīng),接著發(fā)送下一條消息的通訊方式
消息隊(duì)列RocketMQ版的異步發(fā)送,需要實(shí)現(xiàn)異步發(fā)送回調(diào)接口(SendCallback)。消息發(fā)送方在發(fā)送了一條消息后,不需要等待服務(wù)端響應(yīng)即可發(fā)送第二條消息。發(fā)送方通過回調(diào)接口接收服務(wù)端響應(yīng),并處理響應(yīng)結(jié)果
一般用于對(duì)時(shí)間較敏感的業(yè)務(wù)場(chǎng)景
單向發(fā)送
發(fā)送方只負(fù)責(zé)發(fā)送消息,不等待服務(wù)端返回響應(yīng)且沒有回調(diào)函數(shù)觸發(fā),即只發(fā)送請(qǐng)求不等待應(yīng)答。此方式發(fā)送消息的過程耗時(shí)非常短,一般在微秒級(jí)別
應(yīng)用于對(duì)可靠性要求并不高的場(chǎng)景,比如日志收集
定時(shí)和延時(shí)消息
定時(shí)和延時(shí)消息:允許消息生產(chǎn)者對(duì)指定消息進(jìn)行定時(shí)(延時(shí))投遞,最長(zhǎng)支持40天
延時(shí)消息用于指定消息發(fā)送到消息隊(duì)列RocketMQ版的服務(wù)端后,延時(shí)一段時(shí)間才被投遞到客戶端進(jìn)行消費(fèi)(例如3秒后才被消費(fèi)),適用于解決一些消息生產(chǎn)和消費(fèi)有時(shí)間窗口要求的場(chǎng)景,或者通過消息觸發(fā)延遲任務(wù)的場(chǎng)景,類似于延遲隊(duì)列。
定時(shí)消息可以做到在指定時(shí)間戳之后才可被消費(fèi)者消費(fèi),適用于對(duì)消息生產(chǎn)和消費(fèi)有時(shí)間窗口要求,或者利用消息觸發(fā)定時(shí)任務(wù)的場(chǎng)景。
適用場(chǎng)景
通過消息來觸發(fā)一些定時(shí)任務(wù),這個(gè)時(shí)候這個(gè)定時(shí)消息就派上用場(chǎng)了,比如在某一時(shí)間向用戶發(fā)送的提醒消息;一些消息生產(chǎn)和消費(fèi)之間有時(shí)間窗口,比如典型的電商里面的超時(shí)未支付關(guān)閉訂單的場(chǎng)景,這時(shí)延時(shí)消息就派上用場(chǎng)了,超時(shí)未完成支付就關(guān)閉訂單
定時(shí)消息的精度會(huì)有1s~2s的延遲誤差
其實(shí)定時(shí)消息和延時(shí)消息在使用的時(shí)候也是有一些差別的,用過的應(yīng)該都知道,給大家提一下,定時(shí)消息需要明確指定消息發(fā)送時(shí)間點(diǎn)之后的某一時(shí)間點(diǎn)作為消息投遞的時(shí)間點(diǎn);延時(shí)消息則需要設(shè)定一個(gè)延時(shí)的時(shí)間長(zhǎng)度,長(zhǎng)度是固定的,但是時(shí)刻點(diǎn)不是固定,是根據(jù)發(fā)送消息的時(shí)間點(diǎn)有關(guān)的,消息將從當(dāng)前發(fā)送時(shí)間點(diǎn)開始延遲固定時(shí)間之后才開始投遞,這個(gè)大家應(yīng)該都很清楚了,淘寶下個(gè)單,給你留30分鐘時(shí)間支付,超時(shí)未支付則關(guān)閉訂單
順序消息
順序消息:允許消息消費(fèi)者按照消息發(fā)送的順序進(jìn)行消息的發(fā)送
順序消息分為兩類:
- 全局順序:對(duì)于指定的一個(gè)Topic,所有消息按照嚴(yán)格的先入先出FIFO(First In First Out)的順序進(jìn)行發(fā)布和消費(fèi)。
- 分區(qū)順序:對(duì)于指定的一個(gè)Topic,所有消息根據(jù)Sharding Key進(jìn)行區(qū)塊分區(qū)。同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的FIFO順序進(jìn)行發(fā)布和消費(fèi)。Sharding Key是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的Key是完全不同的概念。
其實(shí)這也是個(gè)比較經(jīng)典的問題,面試也是比較常問的,就是如何保證順序性?魚魚反正會(huì)回答,你會(huì)嗎?
如果遇到這個(gè)問題,首先你要分情況說明,就是分為全局順序和分區(qū)順序這兩種情況:
1、全局順序適用于性能要求不高,所有的消息都要嚴(yán)格按照先進(jìn)先出的順序來發(fā)布和消費(fèi)的場(chǎng)景。這種情況我也沒遇到過,一般也不太會(huì)使用全局有序這種
2、分區(qū)順序適用于性能要求比較高,以Sharding Key作為分區(qū)字段,在用一個(gè)區(qū)塊中嚴(yán)格按照先進(jìn)先出的順序發(fā)布和消費(fèi)。比如用戶注冊(cè)的時(shí)候的驗(yàn)證碼,以用戶ID作為Sharding Key,那么同一個(gè)用戶發(fā)送的消息都會(huì)按照發(fā)布的先后順序來消費(fèi),再比如就是電商中的訂單流程問題
阿里巴巴集團(tuán)內(nèi)部電商系統(tǒng)均使用分區(qū)順序消息,既保證業(yè)務(wù)的順序,同時(shí)又能保證業(yè)務(wù)的高性能。別問我怎么知道的,阿里云官網(wǎng)寫的
順序消息常見問題
為什么全局順序消息性能一般?
全局順序消息是嚴(yán)格按照FIFO的消息阻塞原則,即上一條消息沒有被成功消費(fèi),那么下一條消息會(huì)一直被存儲(chǔ)到Topic隊(duì)列中。如果想提高全局順序消息的TPS,可以升級(jí)實(shí)例配置,同時(shí)消息客戶端應(yīng)用盡量減少處理本地業(yè)務(wù)邏輯的耗時(shí)。
順序消息支持哪種消息發(fā)送方式?是否支持集群消費(fèi)和廣播消費(fèi)?
順序消息只支持可靠同步發(fā)送方式,不支持異步發(fā)送方式,否則將無法嚴(yán)格保證順序。順序消息暫時(shí)僅支持集群消費(fèi)模式,不支持廣播消費(fèi)模式。
事務(wù)消息
事務(wù)消息:實(shí)現(xiàn)類似X或者Open XA的分布式事務(wù)功能,以達(dá)到最終一致性
消息隊(duì)列RocketMQ版提供類似X或Open XA的分布式事務(wù)功能,通過消息隊(duì)列RocketMQ版事務(wù)消息,能達(dá)到分布式事務(wù)的最終一致。
半事務(wù)消息:暫不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了消息隊(duì)列RocketMQ版服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對(duì)該消息的二次確認(rèn),此時(shí)該消息被標(biāo)記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半事務(wù)消息。
消息回查:由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,消息隊(duì)列RocketMQ版服務(wù)端通過掃描發(fā)現(xiàn)某條消息長(zhǎng)期處于“半事務(wù)消息”時(shí),需要主動(dòng)向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit或是Rollback),該詢問過程即消息回查。
跟小仙來看看事務(wù)消息發(fā)送步驟:
1、發(fā)送方將半事務(wù)消息發(fā)送到服務(wù)端Broker,服務(wù)端會(huì)將消息持久化,成功之后會(huì)返回ACK確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半事務(wù)消息
2、發(fā)送方開始執(zhí)行本地事務(wù)的邏輯
3、發(fā)送方會(huì)根據(jù)本地事務(wù)的執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn),決定Commit還是Rollback,服務(wù)端收到Commit之后則把這個(gè)消息標(biāo)記為可投遞,發(fā)送到消費(fèi)方;服務(wù)端收到Rollback之后則刪除半事務(wù)消息,服務(wù)端不會(huì)發(fā)送,則消費(fèi)方也不會(huì)收到
如可是如果斷網(wǎng)或者應(yīng)用重啟這些情況,上述的步驟的二次確認(rèn)信息無法到達(dá)服務(wù)端,怎么辦?
這里其實(shí)有個(gè)回查機(jī)制,發(fā)送方發(fā)送消息之后,需要本地執(zhí)行事務(wù),如果事務(wù)執(zhí)行的過程出現(xiàn)卡死的情況,或者事務(wù)執(zhí)行結(jié)果因?yàn)榫W(wǎng)絡(luò)等問題,無法傳遞事務(wù)結(jié)果到服務(wù)端,服務(wù)端會(huì)執(zhí)行一個(gè)回查機(jī)制,來確認(rèn)這個(gè)半事務(wù)消息的最終提交情況。
總結(jié)
消息隊(duì)列RocketMQ版的消費(fèi)者和生產(chǎn)者客戶端對(duì)象是線程安全的,可以在多個(gè)線程之間共享使用??梢栽诜?wù)器上(或者多臺(tái)服務(wù)器)部署多個(gè)生產(chǎn)者和消費(fèi)者實(shí)例,也可以在同一個(gè)生產(chǎn)者或消費(fèi)者實(shí)例里采用多線程發(fā)送或接收消息,從而提高消息發(fā)送或接收TPS。避免為每個(gè)線程創(chuàng)建一個(gè)客戶端實(shí)例。
好了,回顧一下本篇的內(nèi)容吧
1、消息發(fā)送的負(fù)載均衡、容錯(cuò)機(jī)制
2、消息發(fā)送流程和存儲(chǔ)(具體如何存儲(chǔ)會(huì)在Broker篇說,因?yàn)檫@些東西都存儲(chǔ)在Broker的CommitLog和ConsumerQueue中了)
3、消息的類型:普通消息(同步發(fā)送、異步發(fā)送、單向發(fā)送)、定時(shí)和延時(shí)消息、順序消息(全局順序和部分順序)、事務(wù)消息