自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ 用法詳解,你學會了嗎?

開發(fā) 架構
正常情況下生產(chǎn)者組是沒有作用的,但是在發(fā)送事務消息時,如果producer中途意外宕機,broker會主動回調(diào)producer group 內(nèi)的任意一臺機器來確認事務的狀態(tài)。

大家好,我是指北君。

圖片

消息中間件是我們工作中使用最頻繁的一類中間件,它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為異步RPC的主要手段之一。當今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發(fā)RocketMQ等。今天,指北君就來詳細講講RocketMQ生產(chǎn)者和消費者在使用時的一些注意事項。

一. 生產(chǎn)者

1.1 發(fā)送消息注意事項

1)消息大小

建議消息大小不要超過512K。

2)異步發(fā)送

默認的發(fā)送為同步發(fā)送,send方法會一直阻塞,等待broker端的響應。如果你關注性能問題,可以通過send(msg, callback)來發(fā)起異步調(diào)用。

3)生產(chǎn)者組

正常情況下生產(chǎn)者組是沒有作用的,但是在發(fā)送事務消息時,如果producer中途意外宕機,broker會主動回調(diào)producer group 內(nèi)的任意一臺機器來確認事務的狀態(tài)。(目前開源版本還不支持事務消息)。

4)線程安全問題

生產(chǎn)者實例是線程安全的,在應用中只需要實例化一次即可。

5)性能問題

如果你希望在一個jvm進程內(nèi)使用多個producer實例來提高發(fā)送能,我們建議:

使用異步發(fā)送,并且producer實例只需要3 ~ 5個即可 對每一個producer 調(diào)用 setInstanceName,區(qū)別不同的生產(chǎn)者。

6)發(fā)送超時時間

當客戶端向broker發(fā)送請求超時時,客戶端會拋出 RemotingTimeoutException,默認的超時時間是3秒。通過調(diào)用send(msg, timeout) 可以設置超時時間。超時時間建議不要設置過小,因為 broker 可能需要時間刷盤或向 slave 同步數(shù)據(jù)。

7)對于同一個應用最好只使用一個Topic,消息的子類型可以使用 tags 來標識,tags 可以由應用自由設置。當發(fā)送的消息設置了 tags 時,消費方在訂閱消息時可以使用 tags 在 broker 做消息過濾。注意這里的命名雖然是復數(shù),但是一條消息只能有一個tag。

8)消息在業(yè)務層面的唯一標識可以設置到 keys 字段,方便根據(jù) keys 來定位消息。broker 會為每個消息創(chuàng)建索引(哈希索引),應用可以通過topic 、key 查詢這條消息的內(nèi)容(MessageExt),以及消息被誰消費(MessageTrack,精確到consumer group)。由于是哈希索引,請盡量保證key 的唯一,這樣可以避免潛在的哈希沖突。

9)消息發(fā)送不管是成功還是失敗都要打印消息日志,日志內(nèi)容務必包含 sendResult 和 key 字段。

10)對于消息不可丟失的應用,務必要有消息重發(fā)機制。例如如果消息發(fā)送失敗,可以將消息存儲到數(shù)據(jù)庫,然后通過定時程序或者人工的方式觸發(fā)重發(fā)。

11)調(diào)用send 同步發(fā)送消息時,假定此時設置了 isWaitStoreMsgOK=true(default is true),只要不拋出異常就代表發(fā)送成功,但當 isWaitStoreMsgOK = false 時,發(fā)送永遠返回 SEND_OK。但是對于發(fā)送“成功”會有多個狀態(tài),在 SendStatus 中定義如下:

FLUSH_DISK_TIMEOUT

如果 broker 設置的 FlushDiskType = SYNC_FLUSH,當 broker 的在刷盤超時時(MessageStoreConfig.syncFlushTimeout,默認5秒)會返回該狀態(tài)。此時消息任然保存在內(nèi)存中,只有broker 宕機時消息才會丟失。

FLUSH_SLAVE_TIMEOU

如果 broker 的 role 是 SYNC_MASTER,當 slave 同步數(shù)據(jù)的時間超過了 MessageStoreConfig.syncFlushTimeout (默認5秒) 時會返回此狀態(tài)。此時只有主從都宕機,并且主也沒有刷盤時,消息才會丟失。

SLAVE_NOT_AVAILABLE

如果 broker 的 role 是 SYNC_MASTER,并且此時 slave 不可用時會返回該狀態(tài)。

SEND_OK

發(fā)送成功。為了保證消息不丟失還需要配置 SYNC_MASTER or SYNC_FLUSH。

12)消息重復

當發(fā)送消息時返回 FLUSH_DISK_TIMEOUT/FLUSH_SLAVE_TIMEOUT,若非常不幸的 broker 也宕機了,消息將會丟失。此時如果什么都不做,消息可能會丟失,如果重發(fā)消息,消息可能會出現(xiàn)重復。

通常我們建議發(fā)送端重發(fā)消息,由消費方來保證消息消費的冪等性。

1.2 消息發(fā)送失敗如何處理

Producer 的 send 方法本生支持內(nèi)部重試,重試邏輯如下:

至多重試3次 如果發(fā)送失敗,則輪轉到下一個broker 這個方法的總耗時時間不超過 sendMsgTimeout,默認3秒 所以發(fā)送消息已經(jīng)產(chǎn)生超時異常的話就不會再重試。以上策略仍不能保證消息發(fā)送一定成功,為保證消息發(fā)送一定成功,建議應用這么做:如果調(diào)用 send 同步發(fā)送失敗,則嘗試將消息存儲到db,由后臺線程定時重試,保證消息一定到達 Broker。

1.3 oneway 的發(fā)送形式

對于可靠性要求不高的應用,可以采用 oneway 的發(fā)送形式,oneway 形式不等待應答。

1.4 發(fā)送順序消息

順序消息分為分區(qū)有序和全局有序。

分區(qū)有序要求 producer 在send 時傳入 MessageQueueSelector 的實現(xiàn)類,最終將某一類消息發(fā)送到同一隊列。但是一旦發(fā)生通信異常、broker 重啟等,由于隊列總數(shù)發(fā)生變化,哈希取模后定位的隊列會變化,會產(chǎn)生短暫的順序不一致。如果業(yè)務能容忍在集群異常情況下(如某個 broker 宕機或者重啟)消息短暫的亂序,使用分區(qū)有序比較合適。

全局嚴格有序的消息即便在異常情況下也能保證消息的有序性,但是卻犧牲了分布式的 failover 特性,即 broker 集群中只有要一臺機器不可用,則整個集群都不可用,服務可用性會大大降低。

順序消息的缺點:

發(fā)送順序消息無法利用集群的 FailOver 特性 消費順序消息的并行度依賴于隊列數(shù)量 隊列熱點問題,個別隊列由于哈希不均導致消息過多,消費速度跟不上,產(chǎn)生消費堆積問題 遇到消費失敗的消息,無法跳過,當前隊列需要暫停 5.發(fā)送事務消息 目前暫不支持。

二. 消費者

2.1 消費者組和訂閱

不同的消費者組可以獨立消費相同的topic,這點類似于ActiveMQ的虛擬 topic. 另外對于相同的消費者組,需要確保組內(nèi)的消費者訂閱消息的規(guī)則是一致的!

MQ 里的一個Consumer Group 代表一個 Consumer 實例群組。對于大多數(shù)分布式應用來說,一個 Consumer Group 下通常會掛載多個 Consumer 實例。訂閱關系一致指的是同一個 Consumer Group 下所有 Consumer 實例的處理邏輯必須完全一致。一旦訂閱關系不一致,消息消費的邏輯就會混亂,甚至導致消息丟失。

由于 MQ 的訂閱關系主要由 Topic+Tag 共同組成,因此,保持訂閱關系一致意味著同一個 Consumer Group 下所有的實例需在以下兩方面均保持一致:

訂閱的 Topic 必須一致;訂閱的 Topic 中的 Tag 必須一致。

技術架構 > Consumer 最佳實踐 > image2017-11-15 15:50:13.png

2.2 MessageListener

1)順序消費 MessageListenerOrderly

順序消費時消費者會鎖定隊列,以確保消息被順序消費,但是這樣也會造成一定的性能損耗。當消費出現(xiàn)異常的時候,建議不要拋出異常,而是返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,讓消費暫停一會,暫停時間由 context.setSuspendCurrentQueueTimeMillis 方法指定。

2)并發(fā)消費

并發(fā)消費是推薦的消費方式,在此種模式下,消息將被并發(fā)的消費。消費出現(xiàn)異常時不建議拋出異常,只需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 即可。為了保證消息肯定被至少消費一次,消息將會被重發(fā)回 broker (topic不是原topic而是這個消費組的RETRY topic),在延遲的某個時間點(默認是10秒,業(yè)務可設置,通過 delayLevelWhenNextConsume 和 MessageStoreConfig.messageDelayLevel 設置)后,再次投遞到這個 ConsumerGroup,而如果一直這樣重復消費都持續(xù)失敗到一定次數(shù)(默認是16次,DefaultMQPushConsumer.maxReconsumeTimes),就會投遞到DLQ隊列。應用可以監(jiān)控死信隊列來做人工干預。

3)返回狀態(tài)

在并行消費時可以通過返回 RECONSUME_LATER 來告訴 Consumer 當前無法消費該消息,等延時一段時間再重新消費,但是此時消費不會停止,你可以繼續(xù)消費其他消息。但在順序消費時,因為要保證消費的順序性,所以你不能跳過失敗的消息,此時你可以通過返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 來告訴 Consumer 先暫停一會。

4)阻塞

不建議阻塞Listener,因為這會阻塞住線程池,同時也有可能造成消費者線程終止。

2.3 線程數(shù)

consumer 內(nèi)部通過一個 ThreadPoolExecutor 來消費消息,可以通過 setConsumeThreadMin 和 setConsumeThreadMax 來改變線程池的大小。

2.4 ConsumeFromWhere

當新實例啟動的時候,PushConsumer會拿到本消費組broker已經(jīng)記錄好的消費進度(consumer offset),按照這個進度發(fā)起自己的第一次Pull請求。

如果這個消費進度在Broker并沒有存儲起來,證明這個是一個全新的消費組,這時候客戶端有幾個策略可以選擇:

CONSUME_FROM_LAST_OFFSET //默認策略,從該隊列最尾開始消費,即跳過歷史消息。

CONSUME_FROM_FIRST_OFFSET //從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍。

CONSUME_FROM_TIMESTAMP//從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前 注意:這些配置只對全新的消費組有效,老的消費組都是按已經(jīng)存儲過的消費進度繼續(xù)消費。

對于老消費組想跳過歷史消息可以采用以下幾種方法:

1)判斷消息的發(fā)送時間,太老的消息直接返回 CONSUME_SUCCESS。

2)判斷消息的 offset 和 MAX_OFFSET 的差距,如果落后太多,可以直接。返回 CONSUME_SUCCESS。

3)消費者啟動前,先調(diào)整該消費組的消費進度,再開始消費??梢匀斯な褂妹?resetOffsetByTimeStamp,詳見 ResetOffsetByTimeCommand.java。

2.5 消息冪等

由于 RocketMQ 無法避免消費重復,所以如果業(yè)務對消息重復非常敏感,務必在業(yè)務層面去重。

2.6 消費速度慢處理方式

1)提高消費并行度

大部分消息消費行為都屬于 IO 密集型業(yè)務,適當?shù)奶岣卟l(fā)度可以顯著的改善消費的吞吐量。

2)批量方式消費

默認情況下 consumer 的 consumeMessageBatchMaxSize 為1,即一次只消費一個消息,如果應用可以批量消費消息,則可以很大程度上提高消費吞吐量。

3)跳過非重要消息

當消堆積嚴重時可以丟棄不重要的消息。

4)優(yōu)化消息消費過程

2.7 打印消費日志

建議在消費入口方法打印消息,方便后續(xù)排查問題,消費失敗時也打印失敗日志。

2.8 利用broker過濾消息,避免多余的消息傳輸

三. 小結

好了,RocketMQ生產(chǎn)者與消費者的使用事項就總結完畢了,相信大家對RocketMQ的使用應該會更有信心了。

責任編輯:武曉燕 來源: Java技術指北
相關推薦

2024-02-04 00:00:00

Effect數(shù)據(jù)組件

2024-01-02 12:05:26

Java并發(fā)編程

2024-10-11 09:15:33

2023-07-03 07:20:50

2023-03-26 22:31:29

2022-12-06 07:53:33

MySQL索引B+樹

2022-12-06 08:37:43

2022-04-26 08:41:54

JDK動態(tài)代理方法

2022-04-13 09:01:45

SASSCSS處理器

2023-08-08 08:23:08

Spring日志?線程池

2023-09-06 11:31:24

MERGE用法SQL

2024-09-10 10:34:48

2024-12-31 00:08:37

C#語言dynamic?

2023-03-09 07:38:58

static關鍵字狀態(tài)

2023-05-18 09:01:11

MBRGPT分區(qū)

2024-08-12 08:12:38

2024-10-12 10:25:15

2022-08-09 08:25:44

Stream創(chuàng)建流流計算

2023-01-10 08:43:15

定義DDD架構

2023-07-26 13:11:21

ChatGPT平臺工具
點贊
收藏

51CTO技術棧公眾號