自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ Consumer 啟動(dòng)時(shí)都干了些啥?

開發(fā) 架構(gòu)
關(guān)于 RocketMQ 的一些基礎(chǔ)概念、一些底層實(shí)現(xiàn)之前都已在文章 RocketMQ基礎(chǔ)概念剖析&源碼解析 中寫過了,沒有相關(guān)上下文的可以先去補(bǔ)齊一部分。

[[433300]]

可能我們對(duì) RocketMQ 的消費(fèi)者認(rèn)知乍一想很簡單,就是一個(gè)拿來消費(fèi)消息的客戶端而已,你只需要指定對(duì)應(yīng)的 Topic 和 ConsumerGroup,剩下的就是只需要:

  • 接收消息
  • 處理消息

就完事了。

簡略消費(fèi)模型

當(dāng)然,可能在實(shí)際業(yè)務(wù)場景下,確實(shí)是這樣。但是如果我們不清楚 Consumer 啟動(dòng)之后到底會(huì)做些什么,底層的實(shí)現(xiàn)的一些細(xì)節(jié),在面對(duì)復(fù)雜業(yè)務(wù)場景時(shí),排查起來就會(huì)如同大海撈針般迷茫。

相反,你如果了解其中的細(xì)節(jié),那么在排查問題時(shí)就會(huì)有更多的上下文,就有可能會(huì)提出更多的解決方案。

關(guān)于 RocketMQ 的一些基礎(chǔ)概念、一些底層實(shí)現(xiàn)之前都已在文章 RocketMQ基礎(chǔ)概念剖析&源碼解析 中寫過了,沒有相關(guān)上下文的可以先去補(bǔ)齊一部分。

簡單示例

整體邏輯

首先我們還是從一個(gè)簡單的例子來看一下,RocketMQ Consumer 的基本使用。從使用入手,一點(diǎn)點(diǎn)了解細(xì)節(jié)。

  1. public class Consumer { 
  2.  
  3.     public static void main(String[] args) throws InterruptedException, MQClientException { 
  4.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); 
  5.  
  6.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); 
  7.  
  8.         consumer.subscribe("TopicTest""*"); 
  9.  
  10.         consumer.registerMessageListener(new MessageListenerConcurrently() { 
  11.  
  12.             @Override 
  13.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
  14.                 ConsumeConcurrentlyContext context) { 
  15.                 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); 
  16.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 
  17.             } 
  18.         }); 
  19.  
  20.         consumer.start(); 
  21.  
  22.         System.out.printf("Consumer Started.%n"); 
  23.     } 

代碼看著肯定有些難度,下面的流程圖和上面的代碼邏輯等價(jià),可以結(jié)合著一起看。

Consumer使用示例

消費(fèi)點(diǎn)策略

這里除了像 Topic、注冊(cè)消息監(jiān)聽器這種常規(guī)的內(nèi)容之外,setConsumeFromWhere 值得我們更多的關(guān)注。它決定了消費(fèi)者將從哪里開始消費(fèi),可選的值有三個(gè):

三個(gè)可選的 ConsumeFromWhere 的值

實(shí)際上 ConsumeFromWhere 的枚舉類源碼中還有另外三個(gè)值,但是已經(jīng)被棄用了。但是這個(gè)配置僅對(duì)新的 ConsumerGroup 有效,已經(jīng)存在的 ConsumerGroup 會(huì)繼續(xù)按照上次消費(fèi)到的 Offset 繼續(xù)消費(fèi)。

其實(shí)也很好理解,假設(shè)有 1000 條消息,你的服務(wù)已經(jīng)消費(fèi)到了 500 條了,然后你上線新的東西將服務(wù)重新啟動(dòng),然后又從頭開始消費(fèi)了?這不扯嗎?

緩存訂閱的 Topic 信息

看起來就一行 consumer.subscribe("TopicTest", "*"),實(shí)際上背后做了很多事情,這里先給大家把簡單的流程畫出來。

subscribe_topic

subscribe 函數(shù)的第一個(gè)參數(shù)就是我們需要消費(fèi)的 Topic,這個(gè)自不必多說。第二個(gè)參數(shù)說復(fù)雜點(diǎn)叫過濾表達(dá)式字符串,說簡單點(diǎn)其實(shí)就是你要訂閱的消息的 Tag。

每個(gè)消息都會(huì)有一個(gè)自己的 Tag 這個(gè)如果你不清楚的話,可以考慮去看看上面那篇文章

這里我們傳的是 *,代表訂閱所有類別的消息。當(dāng)然我們也可以傳入 tagA || tagB || tagC 這種,代表我們只消費(fèi)打了這三種 Tag 的消息。

RocketMQ 會(huì)根據(jù)我們傳入的這兩個(gè)參數(shù),構(gòu)造出 SubscriptionData ,放入一個(gè)位于內(nèi)存的 ConcurrentHashMap 中維護(hù)起來,簡單來說就一句話,把這個(gè)訂閱的 Topic 緩存下來。

在緩存完之后會(huì)進(jìn)行一個(gè)比較關(guān)鍵的操作,那就是開始向所有的 Broker 發(fā)送心跳。Consumer 客戶端會(huì)將:

  • 消費(fèi)者的名稱
  • 消費(fèi)類型 代表是通過 Push 或者 Pull 的模式消費(fèi)消息
  • 消費(fèi)模型 指集群消費(fèi)(CLUSTERING)或者是廣播消費(fèi)(BROADCASTING)
  • 消費(fèi)點(diǎn)策略 也就是類似 CONSUME_FROM_LAST_OFFSET 這種
  • 消費(fèi)者的訂閱數(shù)據(jù)集合 一個(gè)消費(fèi)者可以監(jiān)聽多個(gè) Topic
  • 生產(chǎn)者的集合 當(dāng)前實(shí)例上注冊(cè)的生產(chǎn)的集合

沒錯(cuò),在 Consumer 實(shí)例啟動(dòng)之后還會(huì)去運(yùn)行 Producer 的相關(guān)代碼。此外,如果一個(gè)客戶端即沒有配置生產(chǎn)者、也沒有配置消費(fèi)者,那么是不會(huì)執(zhí)行心跳的邏輯的,因?yàn)闆]有意義。

啟動(dòng)消費(fèi)者實(shí)例

上文提到的核心邏輯其實(shí)都在這里,我們?cè)谙旅嬖敿?xì)討論,所以簡單示例到這里就結(jié)束了。

進(jìn)入啟動(dòng)核心邏輯

在啟動(dòng)的核心入口類中,總共對(duì) 4 種狀態(tài)進(jìn)行了分別處理,分別是:

  • CREATE_JUST
  • RUNNING
  • START_FAILED
  • SHUTDOWN_ALREADY

但我們由于是剛剛創(chuàng)建,會(huì)走到 CREATE_JUST 的邏輯中來,我們就重點(diǎn)來看 Consumer 剛剛啟動(dòng)時(shí)會(huì)做些什么。

檢查配置

基操,跟我們平時(shí)寫的業(yè)務(wù)代碼沒有什么兩樣,檢查配置中的各種參數(shù)是否合法。

配置項(xiàng)太多了就不贅述,大家只需要知道 RocketMQ 啟動(dòng)的時(shí)候會(huì)對(duì)配置中的參數(shù)進(jìn)行校驗(yàn)就知道了。

算了,還是列一列吧:

  • 消費(fèi)者組的名稱是不是空
  • 消費(fèi)者組的名稱不能是被 RocketMQ 保留使用的名稱,即 —— DEFAULT_CONSUMER
  • 消費(fèi)模型(CLUSTERING、BROADCASTING)是否有配置
  • 消費(fèi)點(diǎn)策略(例如 CONSUME_FROM_LAST_OFFSET)是否配置
  • 判斷消費(fèi)的方式是否合法,只能是順序消費(fèi)或者并發(fā)消費(fèi)
  • 消費(fèi)者組的最小消費(fèi)線程、最大消費(fèi)線程數(shù)量是否在規(guī)定的范圍內(nèi),這個(gè)范圍是指(1, 1000),左開右開。還有就是最小不能大于最大這種判斷
  • ......等等等等

所以你看到了, 即使是牛X的開源框架也會(huì)有這種繁瑣的、常見的業(yè)務(wù)代碼。

改變實(shí)例名稱

instanceName 會(huì)從系統(tǒng)的配置項(xiàng) rocketmq.client.name 中獲取,如果沒有配置就會(huì)設(shè)置為 DEFAULT。,并且消費(fèi)模型是 CLUSTERING(默認(rèn)情況就是),就會(huì)將 DEFAULT 改成 ${PID}#${System.nanoTime()} 的字符串,這里舉個(gè)例子。

  1. instanceName = "90762#75029316672643" 

為什么要單獨(dú)把這個(gè)提出來講呢?這相當(dāng)于是給每個(gè)實(shí)例一個(gè)唯一標(biāo)識(shí),這個(gè)唯一標(biāo)識(shí)其實(shí)很重要,如果一個(gè)消費(fèi)者組的 instanceName 相同,那么可能就會(huì)造成重復(fù)消費(fèi)、或者消息堆積的問題的問題,造成消息堆積的這個(gè)點(diǎn)比較有意思,后續(xù)我有時(shí)間應(yīng)該會(huì)單獨(dú)寫一篇文章來討論。

但眼尖的同學(xué)可能已經(jīng)看到了,instanceName 的組成不是 PID 和 System.nanoTime?PID 可能由于獲取的是 Docker 容器宿主機(jī)器的 PID,可能是一樣的,可以理解。那 System.nanoTime 呢?這也能重復(fù)?

實(shí)際上從 RocketMQ 的 Github 這個(gè)提交記錄來看,至少在 2021年3月16號(hào)之前,這個(gè)問題還是有可能存在的。

RocketMQ 官方 Github 的提交記錄

RocketMQ 官方在 3月16號(hào)的提交修復(fù)了這個(gè)問題,給大家看看改了啥:

提交具體內(nèi)容

在原來的版本中,instanceName 就只由 PID 組成,就完全可能造成不同的消費(fèi)者實(shí)例擁有相同的 instanceName。

熟悉的 RocketMQ 的同學(xué)有疑問,在 Broker 側(cè)對(duì) Consumer 的唯一標(biāo)識(shí)不是 clientID 嗎?沒錯(cuò),但 clientID 是由 clientIP 和 instanceName 一起組成的。

而 clientIP 上面也提到過了,可能由于 Docker 的原因獲取到相同的,會(huì)最終導(dǎo)致 clientID 相同。

OK,關(guān)于改變實(shí)例的名稱就到這,確實(shí)沒想到講了這么多。

實(shí)例化消費(fèi)者

關(guān)鍵變量名為 mQClientFactory

接下來就會(huì)實(shí)例化消費(fèi)者實(shí)例,在上面 改變實(shí)例名稱 中講到的 clientID 就是在這一步做的初始化。這里就不給大家列源碼了,你就需要知道這個(gè)地方會(huì)實(shí)例化出來一個(gè)消費(fèi)者就 OK 了,不要過多的糾結(jié)于細(xì)節(jié)。

然后會(huì)給 Rebalance 的實(shí)現(xiàn)設(shè)置上一些屬性,例如消費(fèi)者組名稱、消息模型、Rebalance 采取的策略、剛剛實(shí)例化出來的消費(fèi)者實(shí)例。

這個(gè) Rebalance 的策略默認(rèn)為:

AllocateMessageQueueAveragely 就是一個(gè)把 Messsage Queue 平均分配給消費(fèi)者的策略,更多的細(xì)節(jié)也可以參考我上面的那篇文章。

除此之外,還會(huì)初始化拉取消息的核心實(shí)現(xiàn) PullAPIWrapper。

初始化 offsetStore

這里會(huì)根據(jù)不同的消息模型(即 BROADCASTING 或者 CLUSTERING),實(shí)例化不同的 offsetStore 實(shí)現(xiàn)。

  • BROADCASTING 采用的實(shí)現(xiàn)為 LocalFileOffsetStore
  • CLUSTERING 采用的實(shí)現(xiàn)為 RemoteBrokerOffsetStore

區(qū)別就是 LocalFileOffsetStore 是在本地管理 Offset,而 RemoteBrokerOffsetStore 則是將 offset 交給 Broker 進(jìn)行原

啟動(dòng) ConsumeMessageService

緩存消費(fèi)者組

接下來會(huì)將消費(fèi)者組在當(dāng)前的客戶端實(shí)例中緩存起來,具體是在一個(gè)叫 consumerTable 的內(nèi)存 concurrentHashMap 中。

其實(shí)源碼中叫 registerConsumer:

registerConsumer 源碼

但我認(rèn)為給大家「翻譯」成緩存更合理,因?yàn)樗椭皇前褬?gòu)建好的 consumer 實(shí)例給緩存到 map 中,僅此而已。哦對(duì),還做了個(gè)如果存在就返回 false,代表實(shí)際上并沒有注冊(cè)成功。

那為啥需要返回 false 呢?你如果存在了不執(zhí)行緩存邏輯就好嗎?甚至外面還要根據(jù)這個(gè) false 來拋出 MQClientException 異常?

如果注冊(cè)失敗,拋出異常

為啥呢?假設(shè)你同事 A 已經(jīng)使用了名稱 consumer_group_name_for_a ,線上正在正常的運(yùn)行消費(fèi)消息。得,你加了個(gè)功能需要監(jiān)聽 MQ,也使用了 consumer_group_name_for_a,你想想如果 RocketMQ 不做校驗(yàn),你倒是注冊(cè)成功了,但是你同事 A 估計(jì)要罵娘了:“咋回事?咋開始重復(fù)消費(fèi)了?”

啟動(dòng) mQClientFactory

這個(gè) mQClientFactory 就是在 實(shí)例化消費(fèi)者 步驟中創(chuàng)建的消費(fèi)者實(shí)例,最后會(huì)通過調(diào)用 mQClientFactory.start()。

這就是最后的核心邏輯了。

初始化 NameServer 地址

初始化用于通信的 Netty 客戶端

初始化 Netty 客戶端

啟動(dòng)一堆定時(shí)任務(wù)

這個(gè)一堆沒有夸張,確實(shí)很多,舉個(gè)例子:

  • 剛剛上面那一步,如果 NameServer 沒有獲取到,就會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù)隔一段時(shí)間去拉一次
  • 比如,還會(huì)啟動(dòng)定時(shí)任務(wù)隔一段時(shí)間去 NameServer 拉一次指定 Topic 的路由數(shù)據(jù)。這個(gè)路由數(shù)據(jù)具體是指像 MessageQueue 相關(guān)的數(shù)據(jù),例如有多少個(gè)寫隊(duì)列、多少個(gè)讀隊(duì)列,還有就是該 Topic 所分布的 Broker 的 brokerName、集群和 IP 地址等相關(guān)的數(shù)據(jù),這些大致就叫路由數(shù)據(jù)
  • 再比如,啟動(dòng)發(fā)送心跳的定時(shí)任務(wù),不啟動(dòng)這個(gè)心跳不動(dòng)
  • 再比如,Broker 有可能會(huì)掛對(duì)吧?客戶端這邊是不是需要及時(shí)的把 offline 的 Broker 給干掉呢?所以 RocketMQ 有個(gè) cleanOfflineBroker 方法就是專門拿來干這個(gè)的
  • 然后有一個(gè)比較關(guān)鍵的就是持久化 offset,這里由于是采用的 CLUSTERING 消費(fèi),故會(huì)定時(shí)將當(dāng)前消費(fèi)者消費(fèi)的情況上報(bào)給 Broker

 

責(zé)任編輯:武曉燕 來源: SH的全棧筆記
相關(guān)推薦

2015-07-07 17:21:46

2021-02-27 11:03:26

算法職責(zé)ICBU

2021-03-08 08:03:44

注解Spring配置

2016-11-24 23:32:32

技術(shù)面試團(tuán)隊(duì)協(xié)作解決問題

2014-09-05 10:02:55

微軟

2018-10-23 17:08:11

CIOIT人才

2020-01-09 13:31:50

AI 數(shù)據(jù)人工智能

2010-05-25 18:57:42

啟動(dòng)postfix

2017-12-25 13:51:32

LinuxUbuntu LinuLXD容器

2021-08-26 11:09:51

systemdLinux

2021-08-26 13:55:45

systemdLinux目標(biāo)

2018-09-18 11:12:04

2021-09-24 18:36:48

數(shù)據(jù)平臺(tái)傳輸

2010-05-06 18:42:15

Unix系統(tǒng)

2019-04-22 12:25:40

UbuntuLinux IP地址

2018-07-11 05:56:19

2019-08-28 20:30:09

2021-03-05 06:27:38

MySQL日志機(jī)制

2024-08-19 04:00:00

2022-02-17 08:20:17

Spring執(zhí)行代碼SpringBoot
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)