自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RabbitMQ都寫了,RocketMQ怎么能落下?

網(wǎng)絡(luò) 網(wǎng)絡(luò)管理
整體架構(gòu)最近看到了我在Github上寫的rabbitmq-examples陸續(xù)被人star了,就想著寫個rocketmq-examples。對rabbitmq感興趣的小伙伴可以看我之前的文章。下面把RocketMQ的各個特性簡單介紹一下,這樣在用的時候心里也更有把握。

 整體架構(gòu)最近看到了我在Github上寫的rabbitmq-examples陸續(xù)被人star了,就想著寫個rocketmq-examples。對rabbitmq感興趣的小伙伴可以看我之前的文章。下面把RocketMQ的各個特性簡單介紹一下,這樣在用的時候心里也更有把握。

[[335056]]

全網(wǎng)最全RabbitMQ總結(jié),別再說你不會RabbitMQ

RocketMQ是阿里自研的消息中間件,RocketMQ的整體架構(gòu)如下


主要有4個角色

 

Producer:消息生產(chǎn)者。類似,發(fā)信者 Consumer:消息消費(fèi)者。類似,收信者 BrokerServer:消息的存儲,投遞,查詢。類似,郵局 NameServer:注冊中心,支持Broker的動態(tài)注冊與發(fā)現(xiàn)。類似,郵局的管理結(jié)構(gòu)

再介紹幾個基本概念

Topic(主題):一類消息的集合,Topic和消息是一對多的關(guān)系。每個Broker可以存儲多個Topic的消息,每個Topic也可以分片存儲于不同的Broker

Tag(標(biāo)簽):在Topic類別下的二級子類別。如財(cái)務(wù)系統(tǒng)的所有消息的Topic為Finance_Topic,創(chuàng)建訂單消息的Tag為Create_Tag,關(guān)閉訂單消息的Tag為Close_Tag。這樣就能根據(jù)Tag消費(fèi)不同的消息,當(dāng)然你也可以為創(chuàng)建訂單和關(guān)閉訂單的消息各自創(chuàng)建一個Topic

Message Queue(消息隊(duì)列):相當(dāng)于Topic的分區(qū),用于并行發(fā)送和消費(fèi)消息。Message Queue在Broker上,一個Topic默認(rèn)的Message Queue的數(shù)量為4

Producer Group(生產(chǎn)者組):同一類Producer的集合。如果發(fā)送的是事務(wù)消息且原始生產(chǎn)者在發(fā)送之后崩潰,Broker會聯(lián)系統(tǒng)一生產(chǎn)者組內(nèi)的其他生產(chǎn)者實(shí)例以提交或回溯消費(fèi)

Consumer Group(消費(fèi)者組):同一類Consumer的集合。消費(fèi)者組內(nèi)的實(shí)例必須訂閱完全相同的Topic

Clustering(集群消費(fèi)):相同Consumer Group下的每個Consumer實(shí)例平均分?jǐn)傁?/p>

Broadcasting(廣播消費(fèi)):相同Consumer Group的每個Consumer實(shí)例都接收全量的消息

用圖演示一下Clustering和Broadcasting的區(qū)別

如果我有一條訂單程成交的消息,財(cái)務(wù)系統(tǒng)和物流系統(tǒng)都要同時訂閱消費(fèi)這條消息,該怎么辦呢?定義2個Consumer Group即可


Consumer1和Consumer2屬于一個Consumer Group,Consumer3和Consumer4屬于一個Consumer Group,消息會全量發(fā)送到這2個Consuemr Group,至于這2個Consumer Group是集群消費(fèi)還是廣播消費(fèi),自己定義即可

 

工作流程在官方文檔寫的很詳細(xì),不再深入了

https://github.com/apache/rocketmq/tree/master/docs/cn

 

Message消息的各種處理方式涉及到的內(nèi)容較多,所以我就不在文章中放代碼了,直接放GitHub了,目前還在不斷完善中

地址為:https://github.com/erlieStar/rocketmq-examples,

和之前的RabbitMQ一個風(fēng)格,基本上所有知識點(diǎn)都涉及到了

地址為:https://github.com/erlieStar/rabbitmq-example

每個消息必須屬于一個Topic。RocketMQ中每個消息具有唯一的Message Id,且可以攜帶具有業(yè)務(wù)標(biāo)識的Key,我們可以通過Topic,Message Id或Key來查詢消息

 

消息消費(fèi)的方式

  1. Pull(拉取式消費(fèi)),Consumer主動從Broker拉取消息
  2. Push(推送式消費(fèi)),Broker收到數(shù)據(jù)后會主動推送給Consumer,實(shí)時性較高

消息的過濾方式

  • 指定Tag
  • SQL92語法過濾

消息的發(fā)送方式

  • 同步,收到響應(yīng)后才會發(fā)送下一條消息
  • 異步,一直發(fā),用異步的回調(diào)函數(shù)來獲取結(jié)果
  • 單向(只管發(fā),不管結(jié)果)

消息的種類

  • 順序消息
  • 延遲消息
  • 批量消息
  • 事務(wù)消息

順序消息

順序消息分為局部有序和全局有序

官方介紹為普通順序消息和嚴(yán)格順序消息

局部有序:同一個業(yè)務(wù)相關(guān)的消息是有序的,如針對同一個訂單的創(chuàng)建和付款消息是有序的,只需要在發(fā)送的時候指定message queue即可,如下所示,將同一個orderId對應(yīng)的消息發(fā)送到同一個隊(duì)列

  1. SendResult sendResult = producer.send(message, new MessageQueueSelector() { 
  2.  /** 
  3.   * @param mqs topic對應(yīng)的message queue 
  4.   * @param msg send方法傳入的message 
  5.   * @param arg send方法傳入的orderId 
  6.   */ 
  7.  @Override 
  8.  public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { 
  9.   // 根據(jù)業(yè)務(wù)對象選擇對應(yīng)的隊(duì)列 
  10.   Integer orderId = (Integer) arg; 
  11.   int index = orderId % mqs.size(); 
  12.   return mqs.get(index); 
  13.  } 
  14. }, orderId); 

消費(fèi)者所使用的Listener必須是MessageListenerOrderly(對于一個隊(duì)列的消息采用一個線程去處理),而平常的話我們使用的是MessageListenerConcurrently

全局有序:要想實(shí)現(xiàn)全局有序,則Topic只能有一個message queue。

延遲消息

RocketMQ并不支持任意時間的延遲,需要設(shè)置幾個固定的延時等級,從1s到2h分別對應(yīng)著等級1到18

  1. // org.apache.rocketmq.store.config.MessageStoreConfig  
  2. private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" 

批量消息

批量發(fā)送消息能顯著提高傳遞小消息的性能,限制是這批消息應(yīng)該有相同的topic,相同的waitStoreMsgOK,而且不能是延時消息,一批消息的總大小不應(yīng)超過1MB

事務(wù)消息

事務(wù)在實(shí)際的業(yè)務(wù)場景中還是經(jīng)常遇到的,以轉(zhuǎn)賬為例子

張三給李四轉(zhuǎn)賬100元,可以分為如下2步

  1. 張三的賬戶減去100元
  2. 李四的賬戶加上100元

這2個操作要是同時成功,要是同時失敗,不然會造成數(shù)據(jù)不一致的情況,基于單個數(shù)據(jù)庫Connection時,我們只需要在方法上加上@Transactional注解就可以了。

如果基于多個Connection(如服務(wù)拆分,數(shù)據(jù)庫分庫分表),加@Transactional此時就不管用了,就得用到分布式事務(wù)

分布式事務(wù)的解決方案很多,RocketMQ只是其中一種方案,RocketMQ可以保證最終一致性

RocketMQ實(shí)現(xiàn)分布式事務(wù)的流程如下

 

  1. producer向mq server發(fā)送一個半消息
  2. mq server將消息持久化成功后,向發(fā)送方確認(rèn)消息已經(jīng)發(fā)送成功,此時消息并不會被consumer消費(fèi)
  3. producer開始執(zhí)行本地事務(wù)邏輯
  4. producer根據(jù)本地事務(wù)執(zhí)行結(jié)果向mq server發(fā)送二次確認(rèn),mq收到commit狀態(tài),將消息標(biāo)記為可投遞,consumer會消費(fèi)該消息。mq收到rollback則刪除半消息,consumer將不會消費(fèi)該消息,如果收到unknow狀態(tài),mq會對消息發(fā)起回查
  5. 在斷網(wǎng)或者應(yīng)用重啟等特殊情況下,步驟4提交的2次確認(rèn)有可能沒有到達(dá)mq server,經(jīng)過固定時間后mq會對該消息發(fā)起回查
  6. producer收到回查后,需要檢查本地事務(wù)的執(zhí)行狀態(tài)
  7. producer根據(jù)本地事務(wù)的最終狀態(tài),再次提交二次確認(rèn),mq仍按照步驟4對半消息進(jìn)行操作

理解了原理,看代碼實(shí)現(xiàn)就很容易了,放一個官方的example

  1. public class TransactionListenerImpl implements TransactionListener { 
  2.  
  3.     private AtomicInteger index = new AtomicInteger(0); 
  4.  
  5.     private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); 
  6.  
  7.     @Override 
  8.     public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { 
  9.         int value = index.getAndIncrement(); 
  10.         int status = value % 3; 
  11.         localTrans.put(msg.getTransactionId(), status); 
  12.         return LocalTransactionState.UNKNOW; 
  13.     } 
  14.  
  15.     @Override 
  16.     public LocalTransactionState checkLocalTransaction(MessageExt msg) { 
  17.         Integer status = localTrans.get(msg.getTransactionId()); 
  18.         if (status != null) { 
  19.             switch (status) { 
  20.                 case 0: 
  21.                     return LocalTransactionState.UNKNOW; 
  22.                 case 1: 
  23.                     return LocalTransactionState.COMMIT_MESSAGE; 
  24.                 case 2: 
  25.                     return LocalTransactionState.ROLLBACK_MESSAGE; 
  26.                 default
  27.                     return LocalTransactionState.COMMIT_MESSAGE; 
  28.             } 
  29.         } 
  30.         return LocalTransactionState.COMMIT_MESSAGE; 
  31.     } 

實(shí)現(xiàn)分布式事務(wù)需要實(shí)現(xiàn)TransactionListener接口,2個方法的作用如下

  1. executeLocalTransaction,執(zhí)行本地事務(wù)
  2. checkLocalTransaction,回查本地事務(wù)狀態(tài)

針對這個例子,所有的消息都會回查,因?yàn)榉祷氐亩际荱NKNOW,回查的時候status=1的數(shù)據(jù)會被消費(fèi),status=2的數(shù)據(jù)會被刪除,status=0的數(shù)據(jù)會一直回查,直到超過默認(rèn)的回查次數(shù)。

發(fā)送方代碼如下

  1. public class TransactionProducer { 
  2.  
  3.     public static final String RPODUCER_GROUP_NAME = "transactionProducerGroup"
  4.     public static final String TOPIC_NAME = "transactionTopic"
  5.     public static final String TAG_NAME = "transactionTag"
  6.  
  7.     public static void main(String[] args) throws Exception { 
  8.         TransactionListener transactionListener = new TransactionListenerImpl(); 
  9.         TransactionMQProducer producer = new TransactionMQProducer(RPODUCER_GROUP_NAME); 
  10.  
  11.         ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, 
  12.                 new ArrayBlockingQueue<>(100), new ThreadFactory() { 
  13.  
  14.             @Override 
  15.             public Thread newThread(Runnable r) { 
  16.                 Thread thread = new Thread(); 
  17.                 thread.setName("transaction-msg-check-thread"); 
  18.                 return thread; 
  19.             } 
  20.         }); 
  21.         producer.setExecutorService(executorService); 
  22.         producer.setTransactionListener(transactionListener); 
  23.         producer.start(); 
  24.  
  25.         for (int i = 0; i < 100; i++) { 
  26.             Message message = new Message(TOPIC_NAME, TAG_NAME, 
  27.                     ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); 
  28.             SendResult sendResult = producer.send(message); 
  29.             System.out.println(sendResult); 
  30.         } 
  31.  
  32.         TimeUnit.HOURS.sleep(1); 
  33.         producer.shutdown(); 
  34.     } 

看到這,可能有人會問了,我們先執(zhí)行本地事務(wù),執(zhí)行成功后再發(fā)送消息,這樣可以嗎?

其實(shí)這樣做還是有可能會造成數(shù)據(jù)不一致的問題。假如本地事務(wù)執(zhí)行成功,發(fā)送消息,由于網(wǎng)絡(luò)延遲,消息發(fā)送成功,但是回復(fù)超時了,拋出異常,本地事務(wù)回滾。但是消息其實(shí)投遞成功并被消費(fèi)了,此時就會造成數(shù)據(jù)不一致的情況

那消息投遞到mq server,consumer消費(fèi)失敗怎么辦?

如果是消費(fèi)超時,重試即可。如果是由于代碼等原因真的消費(fèi)失敗了,此時就得人工介入,重新手動發(fā)送消息,達(dá)到最終一致性。

消息重試

發(fā)送端重試

producer向broker發(fā)送消息后,沒有收到broker的ack時,rocketmq會自動重試。重試的次數(shù)可以設(shè)置,默認(rèn)為2次

  1. DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME); 
  2. // 同步發(fā)送設(shè)置重試次數(shù)為5次 
  3. producer.setRetryTimesWhenSendFailed(5); 
  4. // 異步發(fā)送設(shè)置重試次數(shù)為5次 
  5. producer.setRetryTimesWhenSendAsyncFailed(5); 

消費(fèi)端重試

順序消息的重試

對于順序消息,當(dāng)Consumer消費(fèi)消息失敗后,RocketMQ會不斷進(jìn)行消息重試,此時后續(xù)消息會被阻塞。所以當(dāng)使用順序消息的時候,監(jiān)控一定要做好,避免后續(xù)消息被阻塞

無序消息的重試

當(dāng)消費(fèi)模式為集群模式時,Broker才會自動進(jìn)行重試,對于廣播消息是不會進(jìn)行重試的

當(dāng)consumer消費(fèi)消息后返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表明消費(fèi)消息成功,不會進(jìn)行重試

當(dāng)consumer符合如下三種場景之一時,會對消息進(jìn)行重試

  • 返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  • 返回null
  • 主動或被動拋出異常

RocketMQ默認(rèn)每條消息會被重試16次,超過16次則不再重試,會將消息放到死信隊(duì)列,當(dāng)然我們也可以自己設(shè)置重試次數(shù)

每次重試的時間間隔如下

第幾次重試 與上次間隔時間 第幾次重試 與上次間隔時間
1 10s 10 7分鐘
2 30s 11 8分鐘
3 1分鐘 12 9分鐘
4 2分鐘 13 10分鐘
5 3分鐘 14 20分鐘
6 4分鐘 15 30分鐘
7 5分鐘 16 1小時
8 6分鐘 17 2小時

重試隊(duì)列和死信隊(duì)列

當(dāng)消息消費(fèi)失敗,會被發(fā)送到重試隊(duì)列

當(dāng)消息消費(fèi)失敗,并達(dá)到最大重試次數(shù),rocketmq并不會將消息丟棄,而是將消息發(fā)送到死信隊(duì)列

死信隊(duì)列有如下特點(diǎn)

  1. 里面存的是不能被正常消費(fèi)的消息
  2. 有效期與正常消息相同,都是3天,3天后會被刪除
  3. 每個死信隊(duì)列對應(yīng)一個Consumer Group ID,即死信隊(duì)列是消費(fèi)者組級別的
  4. 如果一個Consumer Group沒有產(chǎn)生死信消息,則RocketMQ不會創(chuàng)建對應(yīng)的死信隊(duì)列
  5. 死信隊(duì)列包含了一個Consumer Group下的所有死信消息,不管該消息屬于哪個Topic
  6. 重試隊(duì)列的命名為 %RETRY%消費(fèi)組名稱 死信隊(duì)列的命名為 %DLQ%消費(fèi)組名稱

RocketMQ高性能和高可用的方式

整體架構(gòu)

rocketmq是通過broker主從機(jī)制來實(shí)現(xiàn)高可用的。相同broker名稱,不同brokerid的機(jī)器組成一個broker組,brokerId=0表明這個broker是master,brokerId>0表明這個broker是slave。

 

消息生產(chǎn)的高可用:創(chuàng)建topic時,把topic的多個message queue創(chuàng)建在多個broker組上。這樣當(dāng)一個broker組的master不可用后,producer仍然可以給其他組的master發(fā)送消息。rocketmq目前還不支持主從切換,需要手動切換

消息消費(fèi)的高可用:consumer并不能配置從master讀還是slave讀。當(dāng)master不可用或者繁忙的時候consumer會被自動切換到從slave讀。這樣當(dāng)master出現(xiàn)故障后,consumer仍然可以從slave讀,保證了消息消費(fèi)的高可用

消息存儲結(jié)構(gòu)

RocketMQ需要保證消息的高可靠性,所以要將數(shù)據(jù)通過磁盤進(jìn)行持久化存儲。

將數(shù)據(jù)存到磁盤會不會很慢?其實(shí)磁盤有時候比你想象的快,有時候比你想象的慢。目前高性能磁盤的順序?qū)懰俣瓤梢赃_(dá)到600M/s,而磁盤的隨機(jī)寫大概只有100k/s,和順序?qū)懙男阅芟嗖?000倍,所以RocketMQ采用順序?qū)憽?/p>

并且通過mmap(零拷貝的一種實(shí)現(xiàn)方式,零拷貝可以省去用戶態(tài)到內(nèi)核態(tài)的數(shù)據(jù)拷貝,提高速度)具體原理并不是很懂,有興趣的小伙伴可以看看相關(guān)書籍

總而言之,RocketMQ通過順序?qū)懞土憧截惣夹g(shù)實(shí)現(xiàn)了高性能的消息存儲

和消息相關(guān)的文件有如下幾種

 

  • CommitLog:存儲消息的元數(shù)據(jù)
  • ConsumerQueue:存儲消息在CommitLog的索引
  • IndexFile:提供了一種通過key或者時間區(qū)間來查詢消息的方法

刷盤機(jī)制

 

  1. 同步刷盤:消息被寫入內(nèi)存的PAGECACHE,返回寫成功狀態(tài),當(dāng)內(nèi)存里的消息量積累到一定程度時,統(tǒng)一觸發(fā)寫磁盤操作,快速寫入 。吞吐量低,但不會造成消息丟失
  2. 異步刷盤:消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,給應(yīng)用返回消息寫成功的狀態(tài)。吞吐量高,當(dāng)磁盤損壞時,會丟失消息

主從復(fù)制

如果一個broker有master和slave時,就需要將master上的消息復(fù)制到slave上,復(fù)制的方式有兩種

  • 同步復(fù)制:master和slave均寫成功,才返回客戶端成功。maste掛了以后可以保證數(shù)據(jù)不丟失,但是同步復(fù)制會增加數(shù)據(jù)寫入延遲,降低吞吐量
  • 異步復(fù)制:master寫成功,返回客戶端成功。擁有較低的延遲和較高的吞吐量,但是當(dāng)master出現(xiàn)故障后,有可能造成數(shù)據(jù)丟失

負(fù)載均衡

Producer負(fù)載均衡

producer在發(fā)送消息時,默認(rèn)輪詢所有queue,消息就會被發(fā)送到不同的queue上。而queue可以分布在不同的broker上

 

Consumer負(fù)載均衡

默認(rèn)的分配算法是AllocateMessageQueueAveragely,如下圖

還有另外一種平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分?jǐn)俼ueue,只是以環(huán)狀輪流分queue的形式,如下圖:


如果consumer數(shù)量比message queue還多,則多會來的consumer會被閑置。所以不要讓consumer的數(shù)量多于message queue的數(shù)量

 

圖形化管理工具

在rocketmq-externals這個項(xiàng)目中提供了rocketmq的很多擴(kuò)展工具

github地址如下:https://github.com/apache/rocketmq-externals

其中有一個子項(xiàng)目rocketmq-console提供了rocketmq的圖像化工具,提供了很多實(shí)用的功能,如前面說的通過Topic,Message Id或Key來查詢消息,重新發(fā)送消息等,還是很方便的

本文轉(zhuǎn)載自微信公眾號「Java識堂」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系Java識堂公眾號。

 

責(zé)任編輯:武曉燕 來源: Java識堂
相關(guān)推薦

2021-08-27 14:14:39

ThreadLocal源碼操作

2025-01-02 08:31:33

2024-10-15 09:27:36

2022-07-27 22:48:29

消息中間件RocketMQ架構(gòu)設(shè)計(jì)

2024-02-06 07:56:20

數(shù)據(jù)庫分布式數(shù)據(jù)庫架構(gòu)產(chǎn)品

2018-08-14 10:04:52

搜狗

2023-03-10 08:00:03

KafkaActiveMQ

2021-11-04 12:42:55

RocketMQ啟動消費(fèi)

2019-04-11 10:26:15

架構(gòu)運(yùn)維技術(shù)

2022-10-08 09:33:00

平臺中間件

2016-11-10 21:00:49

消息存儲數(shù)據(jù)

2023-10-24 07:50:18

消息中間件MQ

2023-09-18 08:27:20

RabbitMQRocketMQKafka

2023-03-09 10:22:00

SpringBootRabbitMQ

2015-06-19 07:30:35

AWS遷移遺留應(yīng)用云遷移

2022-02-07 12:10:01

消息

2009-11-27 11:00:36

運(yùn)行VS2003

2024-11-20 08:09:19

RabbitMQ項(xiàng)目客戶端

2024-11-05 10:24:50

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號