自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

生產(chǎn)環(huán)境MQ集群一個非常詭異的消費延遲排查

開發(fā) 前端
由于目前我暫時對底層存儲寫入的原理還認識不夠深入,對相關系統(tǒng)采集指標不夠敏感,當時主要分析了一下線程棧,發(fā)現(xiàn)ReputMessageService線程一直在工作,推測可能是轉發(fā)不及時,這塊我還需要更加深入去研究,如果大家對這塊有其實理解,歡迎留言,我也會在后續(xù)工作中提升這塊的技能,更加深入去理解底層的原理。

?1、問題現(xiàn)象

某一天,項目組一個同事向我反饋,他們使用公司的數(shù)據(jù)同步產(chǎn)品將MySQL數(shù)據(jù)同步到MQ集群,然后使用消費者將數(shù)據(jù)再同步到ES,反饋數(shù)據(jù)同步延遲嚴重,但對應的消費組確沒有積壓,但最近最近幾分鐘的數(shù)據(jù)都沒有同步過來。

那問題來了,消費端沒有消費積壓,而且通過查看數(shù)據(jù)同步平臺該通過任務的同步狀態(tài),同樣顯示沒有積壓,那是為什么呢?

遇到這個問題,我們應該冷靜下來,分析一下其大概的數(shù)據(jù)流向圖,梳理后如下圖所示:

圖片

通過初步的診斷,從數(shù)據(jù)同步產(chǎn)品查看Binlog同步無延遲、MQ消費無積壓,那為什么最終Es集群中的數(shù)據(jù)與MySQL有高達幾分鐘的延遲呢?

2、問題排查

根據(jù)上圖幾個關鍵組件數(shù)據(jù)同步延遲的檢測,基本就排除了數(shù)據(jù)同步組件、MQ消費端本身消費的問題,問題的癥結應該就是數(shù)據(jù)同步組件成功將數(shù)據(jù)寫入到MQ集群,并且MQ集群返回了寫入成功,但消費端并沒有及時感知這個消息,也就是說消息雖然寫入到MQ集群,但并沒有達到消費隊列。

因為如果數(shù)據(jù)同步組件如果沒有寫入成功,則MySQL Binlog日志就會出現(xiàn)延遲。但如果是MQ消費端的問題,則MQ平臺也會顯示消費組積壓。

那為什么消息服務器寫入成功,但消費組為什么感知不到呢?

首先為了驗證上述結論是否正確,我還特意去看了一下主題的詳細信息:

圖片

查看主題的統(tǒng)計信息時發(fā)現(xiàn)當前系統(tǒng)的時間為19:01分, 但主題最新的寫入時間才是18:50,兩者之間相差將近10分鐘。

備注:上述界面是我們公司內部的消息運營管理平臺,其實底層是調用了RocketMQ提供的topicStatus命令。

那這又是怎么造成的呢?

在這里我假設大家對RocketMQ底層的實現(xiàn)原理還不是特別熟悉,在這樣的情況下,我覺得我們應該首先摸清楚topicStatus這個命令返回的minOffset、maxOffset以及l(fā)astUpdate這些是的具體獲取邏輯,只有了解了這些,我們才能尋根究底,最終找到解決辦法。

2.1 問題探究與原理剖析

在這個場景中,我們可以通過對topicStatus命令進行解析,從而探究其背后的實現(xiàn)原理。

當我們在命令行中輸入 sh ./mqadmin topicStatus命令時,最終是調用defaultMQAdminExtImpl的examineTopicStats方法,最終在服務端的處理邏輯定義在AdminBrokerProcessor的getTopicStatsInfo方法中,核心代碼如下:

圖片

這里的實現(xiàn)要點:

  • 通過MessageStore的getMinOffsetInQueue獲取最小偏移量。
  • 通過MessageStore的getMaxOffsetInQueue獲取最大偏移量。
  • 最新更新時間為最大偏移量減去一(表示最新一條消息)的存儲時間

故要弄清隊列最大、最小偏移量,關鍵是要看懂getMaxOffsetInQueue或者getMinOffsetInQueue的計算邏輯。

我也注意到分析源碼雖然能直抵真相,但閱讀起來太粗糙,所以我接下來的文章會盡量避免通篇的源碼解讀,取而代之的是只點出源碼的入口處,其旁支細節(jié)將通過時序圖獲流程圖,方便感興趣的讀者朋友去探究,我重點進行知識點的提煉,降低大家的學習成本。

如果大家想成體系的研究RocketMQ,想將消息中間件當成自己職業(yè)的閃光點,強烈建議購買我的兩本關于RocketMQ的數(shù)據(jù):《RocketMQ技術內幕》與《RocketMQ實戰(zhàn)》。

MessageStore的getMaxOffsetInQueue的時序圖如下所示:

圖片

從上述時序圖我們可以得知,調用DefaultMessageStore的getMaxOffsetInQueue方法,首先是根據(jù)主題、隊列ID獲取ConsumeQueue對象(在RocketMQ中一個主題的一個隊列會對應一個ConsumeQueue,代表一個消費隊列),也就是這里獲取的偏移量指的是消費者隊列中的偏移量,而不是Commitlog文件的偏移量。

如果是找最大偏移量,就從該隊列中的找到最后一個文件,去獲取器最大的有效偏移量,也就是等于文件的起始偏移量(fileFromOffset)加上該文件當前最大可讀的偏移量(readPosition),故引起這張時序圖一個非常關鍵的點,就是如何獲取消費隊列最大的可讀偏移量,代碼見MappedFile的getReadPosition:

public int getReadPosition(){
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}

由于ConsumeQueue并沒有 transientStorePoolEnable 機制,數(shù)據(jù)直接寫入到FlieChannel中,故這里的writeBuffer為空,取的是 wrotePosition的值,那ConsumeQueue文件的wrotePosition值在什么地方更新呢?

這個可以通過查看MappedFile中修改wrotePosition的方法appendMessage方法的調用,如下圖所示:

圖片

與ConsumeQueue對應的入口主要有兩個:

  • ReputMessageService#doReput Commitlog異步轉發(fā)線程,通過該線程異步構建Consumequeue、Index等文件
  • Commitlog#recoverAbnormally RocketMQ啟動時根據(jù)Commitlog文件自動恢復Consumequeue文件

今天的主角當然不讓非ReputMessageService莫屬,這里先和大家普及一下一個最基本的知識:RocketMQ為了追求極致的順序寫,會將所有主題的消息順序寫入到一個文件(Commitlog文件),然后異步轉發(fā)到ConsumeQueue(消費隊列文件)、IndexFile(索引文件)。

其轉發(fā)服務就是通過ReputMessageService來實現(xiàn)的。

在深入介紹Commitlog文件的轉發(fā)機制之前,我在這里先問大家一個問題:消息是寫入到內存就轉發(fā)給ConsumeQueue,亦或是刷寫到磁盤后再轉發(fā)呢?

為了方便大家對這個問題的探究,其代碼的核心入口如下圖所示:

圖片

這里的關鍵實現(xiàn)要點如下:

  • 判斷是否轉發(fā)關鍵條件在于 isCommitlogAvailable()方法返回true
  • 根據(jù)轉發(fā)位點reputFromOffset,從Commitlog文件中獲取消息的物理偏移量、消息大小,tags等信息轉發(fā)到消息消費隊列、索引文件。

那isCommitlogAvailable的核心如下所示:

圖片

故轉發(fā)的關鍵就在于Commitlog的maxOffset的獲取邏輯了,其實現(xiàn)時序圖如下所示:

圖片

這里核心重點是getReadPosition方法的實現(xiàn),在RocketMQ寫Commitlog文件,為了提升寫入性能,引入了內存級讀寫分離機制,具體的實現(xiàn)原理如下圖所示:

圖片

具體在實現(xiàn)層面,就是如果transientStorePoolEnable=true,數(shù)據(jù)寫入到堆外內存(writeBuffer)中,然后再提交到FileChannel,提交的位置(commitedPosition來表示)。

大家可以分別看一下改變wrotePosition與committedPposition的調用鏈。

其中wrotePosition的調用鏈如下所示:

圖片

可以得知:wrotePosition是消息寫入到內存(pagecache或者堆外內存)都會更新,但一旦開啟了堆外內存機制,并不會取該值,所以我們可以理解為當消息寫入到Pagecache中時,就可以被轉發(fā)到消息消費隊列。

緊接著我們再看一下committedPosition的調用鏈,如下所示:

圖片

原來在RocketMQ中,如果開啟了transientStorePoolEnable機制,消息先寫入到堆外內存,然后就會向消息發(fā)送者返回發(fā)送成功,然后會有一個異步線程(CommitRealTimeService)定時將消息(默認200ms一次循環(huán))提交到FileChannel,即更新committedPosition的值,消息就會轉發(fā)給消費隊列,從而消費者就可以進行消費。

2.2 問題原因提煉

經(jīng)過上面的解析,問題應該有所眉目了。

由于我們公司為了提高RocketMQ的資源利用率,提升RocketMQ的寫入性能,我們開啟了transientStorePoolEnable機制,消息發(fā)送端寫入到堆外內存,就會返回寫入成功,這樣MySQL Binlog數(shù)據(jù)同步并不會產(chǎn)生延遲,那這里的問題,無非就2個:

  • CommitRealTimeService 線程并沒有及時將堆外內存中的數(shù)據(jù)提交到FileChannel
  • ReputMessageService線程沒有及時將數(shù)據(jù)轉發(fā)到消費隊列

由于目前我暫時對底層存儲寫入的原理還認識不夠深入,對相關系統(tǒng)采集指標不夠敏感,當時主要分析了一下線程棧,發(fā)現(xiàn)ReputMessageService線程一直在工作,推測可能是轉發(fā)不及時,這塊我還需要更加深入去研究,如果大家對這塊有其實理解,歡迎留言,我也會在后續(xù)工作中提升這塊的技能,更加深入去理解底層的原理。

也就是目前知道了問題的表象原因,雖然底層原理還未通透,但目前足以指導我們更好的處理問題:將集群內消息寫入大的主題,遷移到其他負載較低的集群,從而降低該集群的寫入壓力,當遷移了幾個主題后,果不其然,消息到達消費隊列接近實時,集群得以恢復。

責任編輯:武曉燕 來源: 中間件興趣圈
相關推薦

2023-02-23 08:02:19

PulsarJava

2024-06-28 08:28:43

反序列化filterJson

2013-12-05 10:50:13

2022-05-31 08:35:05

RocketMQACK客戶端

2018-07-20 08:44:21

Redis內存排查

2021-04-20 08:32:51

消息MQ隊列

2014-05-23 10:37:37

聊天程序PHP聊天程序

2020-09-29 15:08:47

Go UI框架開發(fā)

2024-07-02 11:29:28

Typer庫Python命令

2018-08-07 10:54:02

HTTPS郵箱瀏覽器

2025-01-08 06:00:00

Argus開源安全檢查工具

2025-01-07 13:00:00

日志分析平臺Graylog網(wǎng)絡安全

2024-05-09 08:08:32

SpringBinderJava

2024-04-24 11:42:21

Redis延遲消息數(shù)據(jù)庫

2021-04-30 07:09:48

SQLP0事故

2021-10-13 10:22:10

Python多繼承開發(fā)

2013-04-23 14:32:28

創(chuàng)業(yè)創(chuàng)業(yè)者Mark Suster

2024-07-16 18:05:19

延遲隊列MQRabbitMQ

2022-04-18 09:07:54

Linux網(wǎng)絡延遲

2019-04-15 13:15:12

數(shù)據(jù)庫MySQL死鎖
點贊
收藏

51CTO技術棧公眾號