自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

如何在 SpringBoot 項(xiàng)目中控制 RocketMQ消費(fèi)線程數(shù)量

開(kāi)發(fā)
如何設(shè)置單個(gè) topic 消費(fèi)線程的最小數(shù)量和最大數(shù)量,用來(lái)區(qū)分不同 topic 吞吐量不同。

1 背景

最近在新項(xiàng)目開(kāi)發(fā)中遇到一個(gè)有趣的問(wèn)題,如何在 SpringBoot 項(xiàng)目中控制 RocketMQ 消費(fèi)線程數(shù)量。如何設(shè)置單個(gè) topic 消費(fèi)線程的最小數(shù)量和最大數(shù)量,用來(lái)區(qū)分不同 topic 吞吐量不同。

我們先介紹一下 RocketMQ 消息監(jiān)聽(tīng)再來(lái)說(shuō)明 RocketMQ 消費(fèi)線程。

2 RocketMQ 消息監(jiān)聽(tīng)

設(shè)置消費(fèi)者組為 my_consumer_group,監(jiān)聽(tīng) TopicTest 隊(duì)列,并使用并發(fā)消息監(jiān)聽(tīng)器MessageListenerConcurrently

1public class Consumer {
2
3 public static void main(String[] args) throws InterruptedException, MQClientException {
4 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
5 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
6 consumer.subscribe("TopicTest", "*");
7 consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
8 consumer.registerMessageListener(new MessageListenerConcurrently() {
9 @Override
10 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
11 ConsumeConcurrentlyContext context) {
12 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
13 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
14 }
15 });
16 consumer.start();
17 System.out.printf("Consumer Started.%n");
18 }
19}

3 RocketMQ 中連接結(jié)構(gòu)圖

圖片

4 消費(fèi)監(jiān)聽(tīng)器

接口:org.apache.rocketmq.client.consumer.listener.MessageListener

圖片

有兩個(gè)子接口:

- 順序消費(fèi):MessageListenerOrderly
- 并發(fā)消費(fèi): MessageListenerConcurrently

圖片

4.1 MessageListenerConcurrently

作用:consumer并發(fā)消費(fèi)消息的監(jiān)聽(tīng)器

圖片

比如,在 quick start 中,就是使用的并發(fā)消費(fèi)消息監(jiān)聽(tīng)器:?

1 consumer.registerMessageListener(new MessageListenerConcurrently() {
2 @Override
3 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
4 ConsumeConcurrentlyContext context) {
5 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
6 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
7 }
8 });

方法返回值,是個(gè)枚舉:

1 package org.apache.rocketmq.client.consumer.listener;
2
3/**
4 * 并發(fā)消費(fèi)mq消息結(jié)果
5 */
6public enum ConsumeConcurrentlyStatus {
7
8 /**
9 * Success consumption
10 * 成功消費(fèi)
11 */
12 CONSUME_SUCCESS,
13
14 /**
15 * Failure consumption,later try to consume
16 * 失敗消費(fèi),稍后嘗試消費(fèi)
17 *
18 *
19 * 如果 {@link MessageListener}返回的消費(fèi)結(jié)果為 RECONSUME_LATER,則需要將這些消息發(fā)送給Broker延遲消息。
20 * 如果給broker發(fā)送消息失敗,將延遲5s后提交線程池進(jìn)行消費(fèi)。
21 *
22 * RECONSUME_LATER的消息發(fā)送入口: MQClientAPIImpl#consumerSendMessageBack,
23 * 命令編碼: {@link org.apache.rocketmq.common.protocol.RequestCode#CONSUMER_SEND_MSG_BACK}
24 */
25 RECONSUME_LATER;
26}

畫外音:

當(dāng)前,我們?cè)诰唧w開(kāi)發(fā)中,肯定不會(huì)直接使用這種方式來(lái)寫consumer。

常用的Consumer實(shí)現(xiàn)是:基于 推 的consumer:DefaultMQPushConsumer

4.2 MessageListenerOrderly

作用:consumer順序消費(fèi)消息的監(jiān)聽(tīng)器

5 消費(fèi)線程池

5.1 DefaultMQPushConsumer

作用:基于 推 的consumer消費(fèi)者

5.2 注冊(cè)并發(fā)消息監(jiān)聽(tīng)器

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#registerMessageListener

圖片

當(dāng)使用這個(gè)方法注冊(cè)消息監(jiān)聽(tīng)器時(shí),實(shí)際上會(huì)把這個(gè)病發(fā)消息監(jiān)聽(tīng)器設(shè)置到 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#messageListenerInner屬性中。

5.3 設(shè)置 consumer 消費(fèi) service

可選有兩種:?

并發(fā)消費(fèi)的service

順序消費(fèi)的service

當(dāng)consumer在啟動(dòng)的時(shí),會(huì)使用MessageListener具體實(shí)現(xiàn)類型進(jìn)行判斷:

圖片

MessageListener 就有并發(fā)和順序兩種,所以service也有兩種。

1public synchronized void start() throws MQClientException {
2 switch (this.serviceState) {
3 case CREATE_JUST:
4
5 // 省略一部分代碼...........
6
7 // 根據(jù)注冊(cè)的監(jiān)聽(tīng)器類型[并發(fā)消息監(jiān)聽(tīng)器/順序執(zhí)行消息監(jiān)聽(tīng)器],來(lái)確定使用哪種消費(fèi)服務(wù).
8 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
9 this.consumeOrderly = true;
10 this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
11 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
12 this.consumeOrderly = false;
13 this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
14 }
15 this.consumeMessageService.start();
16
17 // 省略一部分代碼..........
18 this.serviceState = ServiceState.RUNNING;
19 break;
20 case RUNNING:
21 case START_FAILED:
22 case SHUTDOWN_ALREADY:
23 throw new MQClientException("The PushConsumer service state not OK, maybe started once");
24 default:
25 break;
26 }
27
28 // 省略一部分代碼..........
29 }

如果使用的是并發(fā)消費(fèi)的話,使用 ConsumeMessageConcurrentlyService :

在實(shí)例化的時(shí)候,會(huì)創(chuàng)建一個(gè)線程池:

圖片

1// 無(wú)界隊(duì)列,并且不可配置容量.那 DefaultMQPushConsumer#consumeThreadMax 配置就毫無(wú)意義了.
2this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
3this.consumeExecutor = new ThreadPoolExecutor(
4 this.defaultMQPushConsumer.getConsumeThreadMin(), // 默認(rèn)20
5 this.defaultMQPushConsumer.getConsumeThreadMax(), // 默認(rèn)64
6 1000 * 60,
7 TimeUnit.MILLISECONDS,
8 this.consumeRequestQueue,
9 new ThreadFactoryImpl("ConsumeMessageThread_"));

consumer消費(fèi)線程池參數(shù):

  • 默認(rèn)最小消費(fèi)線程數(shù) 20
  • 默認(rèn)最大消費(fèi)線程數(shù) 64
  • keepAliveTime = 60*1000      單位:秒
  • 隊(duì)列:new LinkedBlockingQueue<>()? 無(wú)界隊(duì)列
  • 線程名稱:前綴是:ConsumeMessageThread_

注意:因?yàn)榫€程池使用的是無(wú)界隊(duì)列,那么設(shè)置的最大線程數(shù),其實(shí)沒(méi)有什么意義。

5.4 修改線程池線程數(shù)

上面我們已經(jīng)知道了,設(shè)置線程池的最大線程數(shù)是沒(méi)什么用的。

那我們其實(shí)可以設(shè)置線程池的最小線程數(shù),來(lái)修改consumer消費(fèi)消息時(shí)的線程池大小。

1public static void main(String[] args) throws InterruptedException, MQClientException {
2 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
3
4 consumer.setConsumeThreadMin(30);
5 consumer.setConsumeThreadMax(64);
6
7 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
8 consumer.subscribe("TopicTest", "*");
9 consumer.registerMessageListener(new MessageListenerConcurrently() {
10
11 @Override
12 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
13 ConsumeConcurrentlyContext context) {
14 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
15 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
16 }
17 });
18 consumer.start();
19 System.out.printf("Consumer Started.%n");
20 }

注意:consumeThreadMin? 如果大于64,則也需要設(shè)置 consumeThreadMax 參數(shù),因?yàn)橛袀€(gè)校驗(yàn):

圖片

-修改線程池線程數(shù)-SpringBoot版

如果consumer是使用spring boot進(jìn)行集成的,則可以這樣設(shè)置消費(fèi)者線程數(shù):

圖片

責(zé)任編輯:張燕妮 來(lái)源: 中生代技術(shù)
相關(guān)推薦

2022-08-02 10:01:42

架構(gòu)

2022-12-04 23:54:39

2022-11-23 15:44:49

2020-10-27 14:15:42

SpringBoot

2017-07-04 19:02:17

ReacRedux 項(xiàng)目

2021-08-23 10:40:30

人工智能KubernetesAI

2009-04-07 09:12:35

敏捷新手入門大型開(kāi)發(fā)

2021-09-15 07:56:32

TypeScriptVue項(xiàng)目

2021-09-14 07:06:13

React項(xiàng)目TypeScript

2021-03-23 08:39:27

SpringBootRedis管道技術(shù)

2021-03-30 10:46:42

SpringBoot計(jì)數(shù)器漏桶算法

2020-03-17 08:04:11

物聯(lián)網(wǎng)隱私安全

2022-07-04 10:39:24

TienChin項(xiàng)目自定義

2022-07-07 09:00:49

RocketMQ消費(fèi)者消息消費(fèi)

2025-01-03 16:32:13

SpringBoot虛擬線程Java

2023-03-28 07:08:09

RocketMQ消費(fèi)者堆棧

2023-09-26 08:01:46

消費(fèi)者TopicRocketMQ

2022-06-09 13:52:35

Vue協(xié)作開(kāi)發(fā)項(xiàng)目

2024-07-03 13:03:30

Spring注解項(xiàng)目

2023-08-23 13:24:00

異步編程方法
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)