自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

警惕!這八個場景下 RocketMQ 會發(fā)生流量控制

開發(fā) 前端
本文介紹了 RocketMQ 發(fā)生流量控制的 8 個場景,其中 Broker 4 個場景,Consumer 4 個場景。Broker 的流量控制,本質(zhì)是對 Producer 的流量控制,最好的解決方法就是給 Broker 擴容,增加 Broker 寫入能力。

大家好,我是君哥。

在使用 RocketMQ 的過程中,有時候我們會看到下面的日志:

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5

這是因為 RocketMQ 觸發(fā)了流量控制。今天我們來聊一聊哪些場景下 RocketMQ 會觸發(fā)流量控制。

如上圖,生產(chǎn)者把消息寫入 Broker,Consumer 從 Broker 拉取消息。Broker 是 RocketMQ 的核心 ,觸發(fā)流量控制主要就是為了防止 Broker 壓力過大而宕機。

一、 Broker 流控

1、 broker busy

RockerMQ 默認采用異步刷盤策略,Producer 把消息發(fā)送到 Broker 后,Broker 會先把消息寫入 Page Cache,刷盤線程定時地把數(shù)據(jù)從 Page Cache 刷到磁盤上,如下圖:

那 broker busy 是怎么導(dǎo)致的呢?

Broker 默認是開啟快速失敗的,處理邏輯類是 BrokerFastFailure,這個類中有一個定時任務(wù)用來清理過期的請求,每 10 ms 執(zhí)行一次,代碼如下:

public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
cleanExpiredRequest();
}
}
}, 1000, 10, TimeUnit.MILLISECONDS);
}

(1)Page Cache 繁忙

清理過期請求之前首先會判斷 Page Cache 是否繁忙,如果繁忙,就會給 Producer 返回一個系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d"),也就是本文開頭的異常日志。那怎么判斷 Page Cache 繁忙呢?Broker 收到一條消息后會追加到 Page Cache 或者內(nèi)存映射文件,這個過程首先獲取一個 CommitLog 寫入鎖,如果持有鎖的時間大于 osPageCacheBusyTimeOutMills(默認 1s,可以配置),就認為 Page Cache 繁忙。具體代碼見 DefaultMessageStore 類 isOSPageCacheBusy 方法。

(2)清理過期請求

清理過期請求時,如果請求線程的創(chuàng)建時間到當前系統(tǒng)時間間隔大于 waitTimeMillsInSendQueue(默認 200ms,可以配置)就會清理這個請求,然后給 Producer 返回一個系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d")。

system busy

這個異常在 NettyRemotingAbstract#processRequestCommand 方法。

拒絕請求

如果 NettyRequestProcessor 拒絕了請求,就會給 Producer 返回一個系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[REJECTREQUEST]system busy, start flow control for a while")。那什么情況下請求會被拒絕呢?看下面這段代碼:

//SendMessageProcessor類
public boolean rejectRequest() {
return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}

從代碼中可以看到,請求被拒絕的情況有兩種可能,一個是 Page Cache 繁忙,另一個是 TransientStorePoolDeficient。

跟蹤 isTransientStorePoolDeficient 方法,發(fā)現(xiàn)判斷依據(jù)是在開啟 transientStorePoolEnable 配置的情況下,是否還有可用的 ByteBuffer。

注意:在開啟 transientStorePoolEnable 的情況下,寫入消息時會先寫入堆外內(nèi)存(DirectByteBuffer),然后刷入 Page Cache,最后刷入磁盤。而讀取消息是從 Page Cache,這樣可以實現(xiàn)讀寫分離,避免讀寫都在 Page Cache 帶來的問題。如下圖:

線程池拒絕

Broker 收到請求后,會把處理邏輯封裝成到 Runnable 中,由線程池來提交執(zhí)行,如果線程池滿了就會拒絕請求(這里線程池中隊列的大小默認是 10000,可以通過參數(shù) sendThreadPoolQueueCapacity 進行配置),線程池拒絕后會拋出異常 RejectedExecutionException,程序捕獲到異常后,會判斷是不是單向請求(OnewayRPC),如果不是,就會給 Producer 返回一個系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[OVERLOAD]system busy, start flow control for a while")。

判斷 OnewayRPC 的代碼如下,flag = 2 或者 3 時是單向請求:

public boolean isOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
return (this.flag & bits) == bits;
}

(3) 消息重試

Broker 發(fā)生流量控制的情況下,返回給 Producer 系統(tǒng)繁忙的狀態(tài)碼(code=2),Producer 收到這個狀態(tài)碼是不會進行重試的。下面是會進行重試的響應(yīng)碼:

//DefaultMQProducer類
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
ResponseCode.TOPIC_NOT_EXIST,
ResponseCode.SERVICE_NOT_AVAILABLE,
ResponseCode.SYSTEM_ERROR,
ResponseCode.NO_PERMISSION,
ResponseCode.NO_BUYER_ID,
ResponseCode.NOT_IN_CURRENT_UNIT
));

二、 Consumer 流控

DefaultMQPushConsumerImpl 類中有 Consumer 流控的邏輯 。

1、 緩存消息數(shù)量超過閾值

ProcessQueue 保存的消息數(shù)量超過閾值(默認 1000,可以配置),源碼如下:

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

2、緩存消息大小超過閾值

ProcessQueue 保存的消息大小超過閾值(默認 100M,可以配置),源碼如下:

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

3、 緩存消息跨度超過閾值

對于非順序消費的場景,ProcessQueue 中保存的最后一條和第一條消息偏移量之差超過閾值(默認 2000,可以配置)。源代碼如下:

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
}

4、獲取鎖失敗

對于順序消費的情況,ProcessQueue 加鎖失敗,也會延遲拉取,這個延遲時間默認是 3s,可以配置。

三、總結(jié)

本文介紹了 RocketMQ 發(fā)生流量控制的 8 個場景,其中 Broker 4 個場景,Consumer 4 個場景。Broker 的流量控制,本質(zhì)是對 Producer 的流量控制,最好的解決方法就是給 Broker 擴容,增加 Broker 寫入能力。而對于 Consumer 端的流量控制,需要解決 Consumer 端消費慢的問題,比如有第三方接口響應(yīng)慢或者有慢 SQL。

在使用的時候,根據(jù)打印的日志可以分析具體是哪種情況的流量控制,并采用相應(yīng)的措施。

責(zé)任編輯:姜華 來源: 君哥聊技術(shù)
相關(guān)推薦

2010-02-03 23:04:31

流量控制P2P華夏創(chuàng)新

2025-02-10 10:38:24

2023-08-07 09:12:51

權(quán)限SpringSecurity

2022-05-06 17:12:35

區(qū)塊鏈元宇宙

2023-10-08 12:14:42

Sentinel流量控制

2022-05-02 16:18:22

RocketMQBrokertopic

2022-05-26 00:33:29

權(quán)限TienChin項目

2011-06-23 09:09:37

流量控制

2013-07-22 14:25:29

iOS開發(fā)ASIHTTPRequ

2018-04-09 12:44:45

Docker使用場景開發(fā)

2015-01-06 09:48:34

Docker多租戶docker應(yīng)用

2024-05-13 18:33:08

SQL日期函數(shù)

2010-06-17 17:00:07

Linux流量控制

2010-06-04 10:49:58

Linux流量控制

2021-03-09 07:38:15

Percona Xtr流量控制運維

2019-10-18 15:16:10

Redis數(shù)據(jù)庫并發(fā)

2010-05-27 11:03:44

Linux流量控制

2010-06-04 11:21:42

Linux 流量控制

2009-02-05 10:13:00

局域網(wǎng)流量控制數(shù)據(jù)流量

2024-12-02 08:02:36

點贊
收藏

51CTO技術(shù)棧公眾號