自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

消息中間件應(yīng)用的常見(jiàn)問(wèn)題與方案

開(kāi)發(fā)
本文我們針對(duì)消息隊(duì)列中間件使用中的典型問(wèn)題作一番分析。

?1. 引言

消息隊(duì)列(MQ)中間件已經(jīng)普及很多年了,在互聯(lián)網(wǎng)應(yīng)用中,通常稍大一些的應(yīng)用,我們都可以見(jiàn)到MQ的身影。當(dāng)前市面上有很多中消息中間件,包括但不限于RabbitMQ、RocketMQ、ActiveMQ、Kafka(流處理中間件) 等。很多開(kāi)發(fā)人員已經(jīng)熟練地掌握了一個(gè)或者多個(gè)消息中間件的使用。但是仍然有一些小伙伴們對(duì)消息中間件不是特別熟悉,因?yàn)楦鞣N原因不能深入的去學(xué)習(xí)了解個(gè)中原理和細(xì)節(jié),導(dǎo)致使用的時(shí)候可能出現(xiàn)這樣那樣的問(wèn)題。在這里,我們就針對(duì)消息隊(duì)列中間件使用中的典型問(wèn)題作一番分析(包括順序消息、可靠性保證、消息冪等、延時(shí)消息等),并提供一些解決方案。

2. 消息中間件應(yīng)用背景

2.1 消息中間件基本思想

我們?cè)趩蝹€(gè)系統(tǒng)中,一些業(yè)務(wù)處理可以順序依次的進(jìn)行。而涉及到跨系統(tǒng)(有時(shí)候系統(tǒng)內(nèi)部亦然)的時(shí)候,會(huì)產(chǎn)生比較復(fù)雜數(shù)據(jù)交互(也可以理解為消息傳遞)的需求,這些數(shù)據(jù)的交互傳遞方式,可以是同步也可以是異步的。在異步傳遞數(shù)據(jù)的情況下,往往需要一個(gè)載體,來(lái)臨時(shí)存儲(chǔ)與分發(fā)消息。在此基礎(chǔ)上,專門針對(duì)消息接收、存儲(chǔ)、轉(zhuǎn)發(fā)而設(shè)計(jì)與開(kāi)發(fā)出來(lái)的專業(yè)應(yīng)用程序,都可以理解為消息隊(duì)列中間件。

引申一下:如果我們自己簡(jiǎn)單的使用一張數(shù)據(jù)庫(kù)表,來(lái)記錄數(shù)據(jù),然后接受數(shù)據(jù)存儲(chǔ)在數(shù)據(jù)表,通過(guò)定時(shí)任務(wù)再將數(shù)據(jù)表的數(shù)據(jù)分發(fā)出去,那么我們已經(jīng)實(shí)現(xiàn)了一個(gè)最簡(jiǎn)單的消息系統(tǒng)(這就是本地消息表)。

我們可以認(rèn)為消息中間件的基本思想就是 利用高效可靠的消息傳遞機(jī)制進(jìn)行異步的數(shù)據(jù)傳輸。在這個(gè)基本思想的指導(dǎo)下,不同的消息中間,因?yàn)槠鋫?cè)重場(chǎng)景目的不同,在功能、性能、整體設(shè)計(jì)理念上又各有差別。

消息隊(duì)列(MQ)本身是實(shí)現(xiàn)了生產(chǎn)者到消費(fèi)者的單向通信模型,RabbitMQ、RocketMQ、Kafka這些常用的MQ都是指實(shí)現(xiàn)了這個(gè)模型的消息中間件。目前最常用的幾個(gè)消息中間件主要有,RabbitMQ、RocketMQ、Kafka(分布式流處理平臺(tái))、Pulsar(分布式消息流平臺(tái))。這里我將兩個(gè)流處理平臺(tái)納入其中了, 更早的一些其他消息中間件已經(jīng)慢慢淡出視野。業(yè)務(wù)選型的時(shí)候我們遵循兩個(gè)主要的原則:最大熟悉程度原則(便于運(yùn)維、使用可靠)、業(yè)務(wù)契合原則(中間件性能可以支撐業(yè)務(wù)體量、滿足業(yè)務(wù)功能需求)。

這幾個(gè)常用的消息中間件選型對(duì)比,很容易找到,這里就不詳細(xì)描述了。大概說(shuō)一下:Pulsar目前用的不如 RabbitMQ、RocketMQ、Kafka多。RabbitMQ主要偏重是高可靠消息,RocketMQ性能和功能并重,Kafka主要是在大數(shù)據(jù)處理中應(yīng)用比較多(Pulsar比較類似)。

2.2 引入消息中間件的意義

我們先簡(jiǎn)單舉例介紹一下異步、解耦、削峰的意義與價(jià)值(參考下面這張流程圖):

對(duì)于一個(gè)用戶注冊(cè)接口,假設(shè)有2個(gè)業(yè)務(wù)點(diǎn),分別是注冊(cè)、發(fā)放新人福利,各需要50ms去處理邏輯。如果我們將這兩個(gè)業(yè)務(wù)流程耦合在一個(gè)接口,那么總計(jì)需要100ms處理完成。但是該流程中,用戶注冊(cè)時(shí)候,可以不用關(guān)心自己的福利是否立即發(fā)放,只要盡快注冊(cè)成功返回?cái)?shù)據(jù)即可,后續(xù)新人福利這一部分業(yè)務(wù)可以在主流程之外處理。我們?nèi)绻麑⑵鋭冸x出來(lái),接口主流程中只處理登陸邏輯,并通過(guò)MQ推送一條消息,通過(guò)異步方式處理后續(xù)的發(fā)放新人福利邏輯,這樣即可保證注冊(cè)接口50ms左右即能獲取結(jié)果。

而發(fā)放新人福利的業(yè)務(wù),則通過(guò)異步任務(wù)慢慢處理。通過(guò)拆分業(yè)務(wù)點(diǎn),我們已經(jīng)做到解耦,注冊(cè)的附屬業(yè)務(wù)中增加或減少功能點(diǎn)都不會(huì)影響主流程。另外如果一個(gè)業(yè)務(wù)主流程在某個(gè)點(diǎn)請(qǐng)求并發(fā)比較高,正好通過(guò)異步方式,可以將壓力分散到更長(zhǎng)的時(shí)間段中去,達(dá)到減輕固定時(shí)間段處理壓力的目的,這就是流量削峰。

另外,單線程模型的語(yǔ)言,通常對(duì)消息中間件的需求更強(qiáng)烈。多線程模型的語(yǔ)言,或者協(xié)程型語(yǔ)言,雖然可以通過(guò)自身的多線程(或協(xié)程)機(jī)制,來(lái)實(shí)現(xiàn)業(yè)務(wù)內(nèi)部的異步處理,但是考慮到持久化問(wèn)題以及管理難度,還是成熟的中間件更適合用來(lái)做異步數(shù)據(jù)通信,中間件還能實(shí)現(xiàn)分布式系統(tǒng)之間的數(shù)據(jù)異步通信。

2.3 消息中間件的應(yīng)用場(chǎng)景

 消息中間件的應(yīng)用場(chǎng)景主要有:

  • 異步通信:可以用于業(yè)務(wù)系統(tǒng)內(nèi)部的異步通信,也可以用于分布式系統(tǒng)信息交互。

  • 系統(tǒng)解耦:將不同性質(zhì)的業(yè)務(wù)進(jìn)行隔離切分,提升性能,主附流程分層,按照重要性進(jìn)行隔離,減少異常影響。

  • 流量削峰:間歇性突刺流量分散處理,減少系統(tǒng)壓力,提升系統(tǒng)可用性。

  • 分布式事務(wù)一致性:RocketMQ提供的事務(wù)消息功能可以處理分布式事務(wù)一致性(如電商訂單場(chǎng)景)。當(dāng)然,也可以使用分布式事務(wù)中間件。

  • 消息順序收發(fā):這是最基礎(chǔ)的功能,先進(jìn)先出,消息隊(duì)列必備。

  • 延時(shí)消息:延遲觸發(fā)的業(yè)務(wù)場(chǎng)景,如下單后延遲取消未支付訂單等。

  • 大數(shù)據(jù)處理:日志處理,kafka。

  • 分布式緩存同步:消費(fèi)MySQLbinlog日志進(jìn)行緩存同步,或者業(yè)務(wù)變動(dòng)直接推送到MQ消費(fèi)。

所以,如果你的業(yè)務(wù)中有以上列舉的場(chǎng)景,或者類似的功能、性能需求,那么趕快引入「消息中間件」來(lái)提升你的業(yè)務(wù)性能吧。

3. 引入消息中間件帶來(lái)的一系列問(wèn)題

雖然消息中間件引入有以上那么多好處,但是使用的時(shí)候依然會(huì)存在很多問(wèn)題。例如:

  • 引入消息中間件增加了系統(tǒng)復(fù)雜度,怎么使用維護(hù);
  • 消息發(fā)送失敗怎么辦(消息丟失);
  • 為了確保能發(fā)成功,消息重復(fù)發(fā)送了怎么辦(消息重復(fù));
  • 消息在中間件流轉(zhuǎn)出現(xiàn)異常怎么處理;
  • 消息消費(fèi)時(shí)候,如果消費(fèi)流程失敗了怎么處理,還能不能重新從中間件獲取到這條消息;
  • 消費(fèi)失敗如果還能獲取,那會(huì)不會(huì)出現(xiàn)失敗情況下,一直重復(fù)消費(fèi)同一條消息,從而流程卡死;
  • 消費(fèi)失敗如果不能再獲取,那么我們?cè)撛趺创_保這條消息能再次被處理;
  • 重復(fù)消費(fèi)到相同的消息流程怎么處理,會(huì)不會(huì)導(dǎo)致業(yè)務(wù)異常;
  • 那么我們?cè)撛趺创_保消費(fèi)流程只成功執(zhí)行一次;
  • 對(duì)于那些有順序的消息我們應(yīng)該怎么保證發(fā)送和消費(fèi)的順序一致;
  • 消息太多了,怎么保證消費(fèi)腳本消費(fèi)速度,以便更得上業(yè)務(wù)的處理需求,避免消息無(wú)限積壓;
  • 我想要發(fā)送的消息,等上幾秒鐘的時(shí)間再消費(fèi)到,該怎么做;

當(dāng)然我們對(duì)于以上的這些問(wèn)題,針對(duì)業(yè)務(wù)開(kāi)發(fā)者來(lái)說(shuō),可以進(jìn)行提煉,得到以下幾個(gè)重點(diǎn)問(wèn)題:

  • 消息順序性保證
  • 避免消息丟失
  • 消息的重復(fù)問(wèn)題
  • 消息積壓處理
  • 延遲消息處理

4. 問(wèn)題的解決方案

4.1 消息順序性保證

常規(guī)的消息中間件和流處理中間件,本身設(shè)計(jì)一般都能支持順序消息,但是根據(jù)中間件本身不同的設(shè)計(jì)目標(biāo),有不同的原理架構(gòu),導(dǎo)致我們業(yè)務(wù)中使用中間件的時(shí)候,要針對(duì)性做不同的處理。

以下幾個(gè)常用消息或流中間件的順序消息設(shè)計(jì)以及使用中亂序問(wèn)題分析:

RabbitMQ:

RabbitMQ的單個(gè)隊(duì)列(queue)自身,可以保證消息的先進(jìn)先出,在設(shè)計(jì)上,RabbitMQ所提供的單個(gè)隊(duì)列數(shù)據(jù)是存儲(chǔ)在單個(gè)broker節(jié)點(diǎn)上的,在開(kāi)啟鏡像隊(duì)列的情況下,鏡像的隊(duì)列也只是作為消息副本而存在,服務(wù)依然由主隊(duì)列提供。這種情況下在單個(gè)隊(duì)列上進(jìn)行消費(fèi),天然就是順序性的。不過(guò)由于單個(gè)隊(duì)列支持多消費(fèi)者同時(shí)消費(fèi),我們?cè)陂_(kāi)啟多個(gè)消費(fèi)者消費(fèi)統(tǒng)一隊(duì)列上的數(shù)據(jù)時(shí)候,消息分散到多個(gè)消費(fèi)者上,在并發(fā)高的時(shí)候,多個(gè)消費(fèi)者無(wú)法保證處理消息的順序性。

 解決方法就是對(duì)于需要強(qiáng)制順序的消息,使用同一個(gè)MQ隊(duì)列,并且針對(duì)單個(gè)隊(duì)列只開(kāi)啟一個(gè)消費(fèi)者消費(fèi)(保證并發(fā)處理時(shí)候的順序性,多線程同理)。由此引發(fā)的單個(gè)隊(duì)列吞吐下降的問(wèn)題,可以采取kafka的設(shè)計(jì)思想,針對(duì)單一任務(wù)開(kāi)啟一組多個(gè)隊(duì)列,將需要順序的消息按照其固定標(biāo)識(shí)(例如:ID)進(jìn)行路由,分散到這一組隊(duì)列中,相同標(biāo)識(shí)的消息進(jìn)入到相同的隊(duì)列,單個(gè)隊(duì)列使用單個(gè)消費(fèi)者消費(fèi),這樣即可以保證消息的順序與吞吐。

如圖所示:

Kafka:

Kafka是流處理中間件,在其設(shè)計(jì)中,沒(méi)有隊(duì)列的概念,消息的收發(fā)依賴于Topic,單個(gè)topic可以有多個(gè)partition(分區(qū)),這些partition可以分散到多臺(tái)broker節(jié)點(diǎn)上,并且partition還可以設(shè)置副本備份以保證其高可用。

Kafka同一個(gè)topic可以有多個(gè)消費(fèi)者,甚至消費(fèi)組。Kafka中消息消費(fèi)一般使用消費(fèi)組(消費(fèi)組可以互不干涉的消費(fèi)同一個(gè)topic下的消息)來(lái)進(jìn)行消費(fèi),消費(fèi)組中可以有多個(gè)消費(fèi)者。同一個(gè)消費(fèi)組消費(fèi)單個(gè)topic下的多個(gè)partition時(shí),將由kafka來(lái)調(diào)節(jié)消費(fèi)組中消費(fèi)者與partiton的消費(fèi)進(jìn)度與均衡。但是有一點(diǎn)是可以保證的:那就是單個(gè)partition在同一個(gè)消費(fèi)組中只能被一個(gè)消費(fèi)者消費(fèi)。

以上的設(shè)計(jì)理念下,Kafka內(nèi)部保證在同一個(gè)partition中的消息是順序的,不保證topic下的消息的順序性。Kafka的消息生產(chǎn)者發(fā)送消息的時(shí)候,是可以選擇將消息發(fā)送到哪個(gè)partition中的,我們只要將需要順序處理的消息,發(fā)送到topic下相同的partition,即可保證消息消費(fèi)的順序性。(多線程語(yǔ)言使用單個(gè)消費(fèi)者,多線程處理數(shù)據(jù)時(shí),需要自己去保證處理的順序,這里略過(guò))。

RocketMQ:

RocketMQ的一些基本概念和原理,可以通過(guò)阿里云的官網(wǎng)做一些了解: 什么是消息隊(duì)列RocketMQ版?- 消息隊(duì)列RocketMQ版 - 阿里云【1】 

RocketMQ的消息收發(fā)也是基于Topic的,Topic下有多個(gè) Queue, 分布在一個(gè)或多個(gè) Broker 上,用來(lái)保證消息的高性能收發(fā)( 與Kafka的Topic-Partition機(jī)制 有些類似,但內(nèi)部實(shí)現(xiàn)原理并不相同 )。

RocketMQ支持局部順序消息消費(fèi),也就是保證同一個(gè)消息隊(duì)列上的消息順序消費(fèi)。不支持消息全局順序消費(fèi),如果要實(shí)現(xiàn)某一個(gè)主題的全局順序消息消費(fèi),可以將該主題的隊(duì)列數(shù)量設(shè)置為1,犧牲高可用性。具體圖解可以參考阿里云文檔: 順序消息2.0 - 消息隊(duì)列RocketMQ版 - 阿里云【2】

4.2 避免消息丟失

消息丟失需要分為三部分來(lái)看:消息生產(chǎn)者發(fā)送消息到消息中間件的過(guò)程不發(fā)生消息丟失,消息在消息中間件中從接受存儲(chǔ)到被消費(fèi)的過(guò)程中消息不丟失, 消息消費(fèi)的過(guò)程中保證能消費(fèi)到中間件發(fā)送的消息而不會(huì)丟失。

生產(chǎn)者發(fā)送消息不丟失:

消息中間件一般都有消息發(fā)送確認(rèn)機(jī)制(ACK), 對(duì)于客戶端來(lái)說(shuō),只要配置好消息發(fā)送需要ACK確認(rèn),就可以根據(jù)返回的結(jié)果來(lái)判斷消息是否成功發(fā)送到中間件中。這一步通常與中間件的消息接受存儲(chǔ)流程設(shè)計(jì)有關(guān)系。根據(jù)中間件的設(shè)計(jì),我們通常采取的措施如下:

  • 開(kāi)啟MQ的ACK(或confirm)機(jī)制,直接獲知消息發(fā)送結(jié)果
  • 開(kāi)啟消息隊(duì)列的持久化機(jī)制(落盤(pán),如果需要特殊設(shè)置的話)
  • 中間件本身做好高可用部署
  • 消息發(fā)送失敗補(bǔ)償設(shè)計(jì)(重試等)

在具體的業(yè)務(wù)設(shè)計(jì)中,如果消息發(fā)送失敗,我們可以根據(jù)業(yè)務(wù)重要程度,做相應(yīng)的補(bǔ)償,例如:

  1. 消息失敗重試機(jī)制(發(fā)送失敗,繼續(xù)重發(fā),可以設(shè)置重試上限)
  2. 如果依然失敗,根據(jù)消息重要性,選擇降級(jí)方案:直接丟棄或者降級(jí)到其他中間件或載體(同時(shí)需要相應(yīng)的降級(jí)補(bǔ)償推送或消費(fèi)設(shè)計(jì))

消息中間件消息不丟失:

數(shù)消息中間件的消息接收存儲(chǔ)機(jī)制各不相同,但是會(huì)根據(jù)其特性設(shè)計(jì),最大限度保證消息不會(huì)丟失:

RabbitMQ消息接收與保存:

  • RabbitMQ 消息發(fā)送可以開(kāi)啟發(fā)送者confirm模式,所有消息是否發(fā)送成功都會(huì)通知發(fā)送者;
  • 需要開(kāi)啟隊(duì)列消息持久化保證消息落盤(pán);
  • RabbitMQ通過(guò)鏡像隊(duì)列來(lái)保證消息隊(duì)列的高可用,但是鏡像隊(duì)列只有Master提供服務(wù),其他slave只提供備份服務(wù);
  • master宕機(jī)會(huì)從slave中選擇一個(gè)成為新的master提供服務(wù);
  • master的生產(chǎn)與消費(fèi)的最新?tīng)顟B(tài)都會(huì)廣播到slave;

RocketMQ消息接受與保存:

  • RocketMQ普通消息發(fā)送有三種方式:同步(Sync)發(fā)送、異步(Async)發(fā)送和單向(Oneway)發(fā)送,其區(qū)別與準(zhǔn)確性保證可以參看 「 發(fā)送普通消息(三種方式) - 消息隊(duì)列RocketMQ版 - 阿里云」 【3】
  • 具體的RocketMQ內(nèi)部設(shè)計(jì)的HA機(jī)制是主從同步機(jī)制,消息發(fā)送到Topic下并具體消息隊(duì)列的Master Broker中后,會(huì)將消息同步到Slave。
  • 只有Master Broker才可以接收生產(chǎn)者發(fā)送的消息。而消費(fèi)者,可以從Master也可以從Slave拉取并消費(fèi)消息。

Kafka在消息接受到保存所做的設(shè)計(jì)有:

  • 分區(qū)副本方式的設(shè)計(jì)保證消息的高可用,在創(chuàng)建topic的時(shí)候都可以設(shè)置分區(qū)副本的數(shù)量;
  • 生產(chǎn)者可以選擇接收不同類型的確認(rèn)(ACK),比如在消息被完全提交時(shí)候(寫(xiě)入所有同步副本)的確認(rèn),或者在消息被寫(xiě)入首領(lǐng)副本時(shí)的確認(rèn),或者在消息被發(fā)送到網(wǎng)絡(luò)時(shí)確認(rèn);
  • Kafka的消息,寫(xiě)入分區(qū)的時(shí)候僅僅是保存在某幾個(gè)分區(qū)副本文件系統(tǒng)內(nèi)存中,并不是直接刷到磁盤(pán)了,因此宕機(jī)時(shí)候,單個(gè)副本仍然可能丟失數(shù)據(jù)。Kafka不能保證單個(gè)分區(qū)副本的數(shù)據(jù)一定不丟失,而是靠分區(qū)副本機(jī)制來(lái)確保消息的完善性(分布到不同的broker上)。

積壓消息保存時(shí)效問(wèn)題

  • Kafka對(duì)于topic下的數(shù)據(jù),有容量上限、時(shí)間上限兩種消息存儲(chǔ)上限規(guī)則,觸發(fā)其中任何一個(gè)規(guī)則,都會(huì)刪除淘汰之前的消息。這個(gè)尤其需要注意。
  • RocketMQ,消息在服務(wù)器存儲(chǔ)時(shí)間也有上限,達(dá)到上限的消息將會(huì)被刪除。也需要做相應(yīng)的考量。
  • 受持久化磁盤(pán)容量的影響,存儲(chǔ)積壓的數(shù)據(jù)不能超過(guò)磁盤(pán)的上限。
  • 如果業(yè)務(wù)消費(fèi)有異常,需要給足充足的冗余量,避免因?yàn)橄M(fèi)不及時(shí)而丟失數(shù)據(jù)。

消費(fèi)者消費(fèi)消息不丟失:

  • 消息消費(fèi)時(shí)候,也要開(kāi)啟相應(yīng)的ACK機(jī)制,消息消費(fèi)成功即ACK(對(duì)于Kafka就是更新消費(fèi)的offset);
  • 對(duì)于RocketMQ這種有消息重新消費(fèi)設(shè)計(jì)的,需要設(shè)置最大消費(fèi)次數(shù),嘗試失敗的消息重復(fù)消費(fèi)。

消息ACK帶來(lái)兩個(gè)問(wèn)題:

  • 消息消費(fèi)失敗如果不能ACK可能會(huì)導(dǎo)致消息消費(fèi)無(wú)限阻塞在某條消息處;
  • 消息失敗重新消費(fèi)導(dǎo)致消息消費(fèi)重復(fù)。

無(wú)限阻塞的問(wèn)題,可以參考RocketMQ消費(fèi)失敗的重試機(jī)制,對(duì)消息重試做一定的設(shè)計(jì):

  1. 在消息體上設(shè)計(jì)重試次數(shù)的屬性,消費(fèi)失敗的消息增加重試次數(shù)后重新發(fā)送到中間件,等待下一次消費(fèi),本次消費(fèi)成功發(fā)回消息直接ACK。
  2. 消息重試次數(shù)達(dá)到上限之后,如果仍不能成功,則啟用降級(jí)方案,將消息存儲(chǔ)到異常信息持久化載體如DB中。
  3. 手動(dòng)或者定時(shí)任務(wù)補(bǔ)償處理失敗的消息。

消息重復(fù)消費(fèi)問(wèn)題參考下一個(gè)小節(jié)。

4.3 消息的重復(fù)問(wèn)題(消費(fèi)冪等)

在分析常用中間件的時(shí)候,我們往往會(huì)發(fā)現(xiàn),中間件設(shè)計(jì)者將這個(gè)問(wèn)題的處理,下放給中間件使用者,也就是業(yè)務(wù)開(kāi)發(fā)者了。誠(chéng)然,業(yè)務(wù)消費(fèi)處理的邏輯比消息生產(chǎn)者復(fù)雜得多。生產(chǎn)者只需要保證將消息成功發(fā)送到中間件即可,而消費(fèi)者需要在消費(fèi)腳本中處理各種復(fù)雜的業(yè)務(wù)邏輯。

解決消息重復(fù)消費(fèi)的問(wèn)題,核心是使用唯一標(biāo)識(shí),來(lái)標(biāo)記某條消息是否已經(jīng)處理過(guò)。具體方案可選的則有很多,比如:

  • 使用數(shù)據(jù)庫(kù)自增主鍵,或者唯一鍵來(lái)保證數(shù)據(jù)不會(huì)重復(fù)變動(dòng);
  • 使用中間狀態(tài),以及狀態(tài)變動(dòng)有序性來(lái)判斷業(yè)務(wù)是否以已經(jīng)被處理;
  • 利用一張日志表來(lái)記錄已經(jīng)處理成功的消息的 ID,如果新到的消息 ID 已經(jīng)在日志表中,那么就不再處理這條消息;
  • 或者消息唯一標(biāo)識(shí),在Redis等NoSQL中維護(hù)一個(gè)處理緩存,判斷是否已經(jīng)處理過(guò);
  • 如果消費(fèi)者業(yè)務(wù)流程比較長(zhǎng),則需要開(kāi)發(fā)者自己保證整個(gè)業(yè)務(wù)消費(fèi)邏輯中數(shù)據(jù)處理的事務(wù)性。

4.4 消息積壓處理

通常我們?cè)谝胂⒅虚g件的時(shí)候,已經(jīng)會(huì)評(píng)估與測(cè)試消息消費(fèi)的生產(chǎn)與消費(fèi)速率,盡量使其達(dá)到平衡。但業(yè)務(wù)也有一些不可預(yù)知的突發(fā)情況,可能會(huì)造成消息的大量積壓。在這個(gè)時(shí)候,我們可以采取如下的方式,來(lái)做處理:

臨時(shí)緊急擴(kuò)容

  1. 通過(guò)增加消費(fèi)腳本的方式,提升消費(fèi)速率,如果下游沒(méi)有限制的話,可以很快的減少消息積壓。
  2. 如果消費(fèi)者下游數(shù)據(jù)處理能力有限,我們可以考慮建立臨時(shí)隊(duì)列,通過(guò)臨時(shí)腳本,將消息快速轉(zhuǎn)移到臨時(shí)隊(duì)列,優(yōu)先保證線上業(yè)務(wù)能順利貫通,而后開(kāi)啟更多的消費(fèi)腳本處理積壓的數(shù)據(jù)。(順序消息需要額外處理,并保證最終處理的順序)
  3. 優(yōu)化消費(fèi)腳本的處理速度,突破下游限制,如果有可能,可以考慮批量處理,下游擴(kuò)容等方式。

消息積壓預(yù)防

  • 做好業(yè)務(wù)設(shè)計(jì)與降級(jí),避免產(chǎn)生無(wú)效消息占用資源
  • 根據(jù)消息積壓程度,動(dòng)態(tài)增減消費(fèi)者數(shù)量,減少消息積壓
  • 做好消息積壓處理緊急預(yù)案,異常情況根據(jù)預(yù)案設(shè)計(jì),迅速針對(duì)處理

4.5 延遲消息處理

延遲消息這一項(xiàng)功能,在部分MQ中間件中有實(shí)現(xiàn)。延時(shí)消息和定時(shí)消息其實(shí)可以互相轉(zhuǎn)換。

RocketMQ:

RocketMQ定時(shí)消息不支持任意的時(shí)間精度(出于性能考量)。只支持特定級(jí)別的延遲消息。消息延遲級(jí)別在broker端通過(guò)messageDelayLevel配置。其內(nèi)部對(duì)每一個(gè)延遲級(jí)別創(chuàng)建對(duì)應(yīng)的消息消費(fèi)隊(duì)列,然后創(chuàng)建對(duì)應(yīng)延遲級(jí)別的定時(shí)任務(wù),從消息消費(fèi)隊(duì)列中將消息拉取并恢復(fù)消息的原主題和原消息消費(fèi)隊(duì)列。

RabbitMQ:

RabbitMQ實(shí)現(xiàn)延遲消息通常有兩個(gè)方案:一是創(chuàng)建一個(gè)消息延遲死信隊(duì)列,搭配一個(gè)死信轉(zhuǎn)發(fā)隊(duì)列來(lái)實(shí)現(xiàn)消費(fèi)延時(shí)。但是該方式如果前一個(gè)消息沒(méi)達(dá)到TTL時(shí)間,后一個(gè)消息即便達(dá)到了,也不會(huì)被轉(zhuǎn)發(fā)到轉(zhuǎn)發(fā)隊(duì)列中;另一個(gè)是使用延時(shí)Exchange插件(rabbitmq_delayed_message_exchange),消息在達(dá)到TTL之后才會(huì)轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列中并被消費(fèi)。

Kafka本身不支持延時(shí)消息或定時(shí)消息, 想要實(shí)現(xiàn)消息的延時(shí),需要使用其他的方案。

借助數(shù)據(jù)庫(kù)與定時(shí)任務(wù)實(shí)現(xiàn)延時(shí)消息:

常用數(shù)據(jù)庫(kù)的索引結(jié)構(gòu)都支持?jǐn)?shù)據(jù)的順序索引。借助數(shù)據(jù)庫(kù)可以很方便地實(shí)現(xiàn)任意時(shí)間消息的延時(shí)消費(fèi)。使用一張表存儲(chǔ)數(shù)據(jù)的消費(fèi)時(shí)間,開(kāi)啟定時(shí)任務(wù),在滿足條件之后將該消息提取出來(lái),后續(xù)轉(zhuǎn)發(fā)到順序隊(duì)列去處理或者直接處理都可以(已處理需要做標(biāo)記,后續(xù)不再出現(xiàn)),但是直接處理需要考慮吞吐量和并發(fā)重復(fù)性等問(wèn)題。不如單個(gè)腳本轉(zhuǎn)發(fā)到普通隊(duì)列去處理方便。數(shù)據(jù)庫(kù)支持的定時(shí)任務(wù)消息積壓是可控的,但是吞吐量會(huì)有局限。

借助Reids的有序列表實(shí)現(xiàn)延時(shí)消息:

Reids的有序列表zset結(jié)構(gòu),可以實(shí)現(xiàn)延時(shí)消息。將消息的消費(fèi)時(shí)間作為分值,把消息添加到zset中。使用 zrangebyscore 命令消費(fèi)消息 # 命令格式 zrangebysocre key min max withscores limit 0 1   消費(fèi)最早的一條消息 #min max 分別表示開(kāi)始的分值與結(jié)束的分值區(qū)間,分別使用 0和當(dāng)前時(shí)間戳,可以查出達(dá)到消費(fèi)時(shí)間的消息 # withscores 表示查詢的數(shù)據(jù)要帶分值。limit 后面 就是查詢的起始 offset 和數(shù)量 zrangebyscore key 0 {當(dāng)前時(shí)間戳} withscores limit 0 1  。

當(dāng)然,這個(gè)方案也有局限性,首先,redis必須配置持久化防止消息丟失(如果配置不合理不能100%保證,但是每個(gè)命令都持久化會(huì)造成性能下降,需要權(quán)衡);其次,如果延時(shí)消息過(guò)多會(huì)造成消息的積壓形成大key;再次,需要自己做重復(fù)消費(fèi)和消費(fèi)失敗的平衡處理(當(dāng)然有可能,還是建議開(kāi)啟單個(gè)消費(fèi)進(jìn)程將延時(shí)消息轉(zhuǎn)移到普通隊(duì)列去消費(fèi))。

基于時(shí)間輪的任務(wù)調(diào)度:

在很多軟件中,都有基于時(shí)間輪實(shí)現(xiàn)定時(shí)任務(wù)的實(shí)現(xiàn),使用時(shí)間輪以及多級(jí)時(shí)間輪可以實(shí)現(xiàn)延時(shí)任務(wù)調(diào)度。如果我們希望自己實(shí)現(xiàn)延時(shí)任務(wù)隊(duì)列,可以考慮使用此算法來(lái)實(shí)現(xiàn)任務(wù)的調(diào)度,但是需要自己根據(jù)具體的需求去設(shè)計(jì)支持任務(wù)的延時(shí)上限以及調(diào)度的時(shí)間粒度(多層級(jí))。時(shí)間輪算法我這里就先不講解了,感興趣的可以自己去搜索了解。

5. 總結(jié)

通過(guò)以上幾個(gè)小節(jié)的介紹,相信各位已經(jīng)能很自然的理解:消息隊(duì)列、異步解耦的功能與核心思想,并且對(duì)如何使用MQ來(lái)架構(gòu)自己的業(yè)務(wù)有了一定的認(rèn)知。大多數(shù)MQ使用中的問(wèn)題,只是要求我們多思考,將細(xì)節(jié)思慮周到,以保證業(yè)務(wù)的高可用。甚至,我們還可以在這幾個(gè)解決方案中提煉一些核心出來(lái),以便在業(yè)務(wù)中參照類似的思想,優(yōu)化我們的業(yè)務(wù)。比如,消息順序性保證 其核心是順序消息生產(chǎn)者發(fā)送到唯一分區(qū),再維持固定分區(qū)的單消費(fèi)者順序消費(fèi);避免消息丟失的核心是每個(gè)步驟的確認(rèn)與降級(jí)機(jī)制;消費(fèi)冪等的核心是唯一性標(biāo)識(shí)與步進(jìn)狀態(tài);消息積壓處理的核心是快速響應(yīng)應(yīng)急預(yù)案;延遲消息的核心是消息排序,優(yōu)化點(diǎn)是性能提升。

科學(xué)的方法有歸納和演繹,學(xué)習(xí)問(wèn)題處理方案的過(guò)程中,提煉出相應(yīng)的核心思想,并在使用中演繹,將這些歸納總結(jié)的知識(shí)點(diǎn),再應(yīng)用到業(yè)務(wù)中去,更加得心應(yīng)手的處理相應(yīng)的事務(wù),構(gòu)建出高可用的業(yè)務(wù)架構(gòu),這才是我們最需要做到的。

責(zé)任編輯:張燕妮 來(lái)源: 得物技術(shù)
相關(guān)推薦

2021-12-14 10:39:12

中間件ActiveMQRabbitMQ

2023-06-29 10:10:06

Rocket MQ消息中間件

2023-10-24 07:50:18

消息中間件MQ

2022-11-18 07:54:02

Go中間件項(xiàng)目

2018-02-01 10:19:22

中間件服務(wù)器系統(tǒng)

2015-08-11 11:16:36

淘寶中間件

2022-11-02 10:08:46

分布式高并發(fā)消息中間件

2022-10-28 13:33:05

Push模式互聯(lián)網(wǎng)高并發(fā)

2011-10-24 07:41:38

SOA中間件應(yīng)用服務(wù)器

2022-08-09 08:31:29

RocketMQ消息中間件

2023-05-08 08:09:26

路由元信息謂詞

2011-12-15 01:10:03

ibmdw

2011-11-28 17:53:55

淘寶aDev技術(shù)沙龍

2019-07-19 07:56:13

消息隊(duì)列消息代理消息中間件

2023-10-16 12:25:48

2024-01-24 08:19:02

Stream應(yīng)用場(chǎng)景注解

2022-02-13 23:04:28

RedisRabbitMQKafka

2022-10-21 10:48:17

消息中間件互聯(lián)網(wǎng)應(yīng)用協(xié)議

2021-03-06 08:02:39

MySQL集群服務(wù)器

2016-11-11 21:00:46

中間件
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)