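Background
On Apsara Stack Enterprise Edition v_3_12, the Message Queue for RocketMQ console (Group Management) lets you view the stack trace of a single consumer under a Group ID. The expectation is that only stack traces related to that Group ID are shown, but the following scenario does not meet that expectation.
Scenario
When one program creates two consumer instances with different Group IDs and you view the stack trace of a single consumer under one of those Group IDs in the console, the stack trace also contains the consumer of the other Group ID, which makes troubleshooting confusing.
Sample code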
pom
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.8.3.Final</version>
</dependency>
code
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class Main {
    public static void main(String[] args) {
        String nameSrvAddr = "xxx";
        String accessKey = "xxx";
        String secretKey = "xxx";
        // Consumer 1: Group_ID_1 subscribes to topic xxx_1 with tag xxx_1
        String groupId1 = "Group_ID_1";
        String topic1 = "xxx_1";
        String tag1 = "xxx_1";
        BatchMessageListener batchMessageListener1 = (messages, context) -> Action.CommitMessage;
        BatchConsumerBean batchConsumerBean1 = batchConsumerBean(nameSrvAddr, accessKey, secretKey,
                groupId1, topic1, tag1, batchMessageListener1);
        batchConsumerBean1.start();
        // Consumer 2: Group_ID_2, started in the same process (same JVM)
        String groupId2 = "Group_ID_2";
        String topic2 = "xxx_2";
        String tag2 = "xxx_2";
        BatchMessageListener batchMessageListener2 = (messages, context) -> Action.CommitMessage;
        BatchConsumerBean batchConsumerBean2 = batchConsumerBean(nameSrvAddr, accessKey, secretKey,
                groupId2, topic2, tag2, batchMessageListener2);
        batchConsumerBean2.start();
    }
    // Builds a BatchConsumerBean for one Group ID with a single topic/tag subscription
    private static BatchConsumerBean batchConsumerBean(String nameSrvAddr, String accessKey, String secretKey,
                                                       String groupId, String topic, String tag,
                                                       BatchMessageListener batchMessageListener) {
        BatchConsumerBean batchConsumerBean = new BatchConsumerBean();
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        properties.put(PropertyKeyConst.GROUP_ID, groupId);
        batchConsumerBean.setProperties(properties);
        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        subscription.setExpression(tag);
        Map<Subscription, BatchMessageListener> subscriptionTable = new HashMap<>();
        subscriptionTable.put(subscription, batchMessageListener);
        batchConsumerBean.setSubscriptionTable(subscriptionTable);
        return batchConsumerBean;
    }
}
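Analysis
We first look at the objects associated with BatchConsumerBean in the sample code, then at how the console retrieves and displays a consumer's stack trace, and finally at how the naming of consumer threads changed across versions of the RocketMQ Client SDK.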
BatchConsumerBean
The sample code creates two BatchConsumerBean instances. The objects associated with each BatchConsumerBean instance are shown below:
[Figure: Objects associated with BatchConsumerBean]
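As the figure above shows, a BatchConsumerBean instance is fairly heavyweight, so the sample code could be optimized to create only one BatchConsumerBean instance. That is not closely related to this issue, so we set it aside for now.
Of the objects in the figure, the ones directly related to this issue are ClientRemotingProcessor, MQClientInstance, DefaultMQPushConsumerImpl, and ConsumerStatsManager; they are analyzed below.
Stack trace display flow
The following describes the flow when the browser requests the stack trace of a single consumer under a Group ID.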
[Figure: Stack trace display flow]
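Browser requests the console application
When you click a consumer's stack trace in the console, the browser sends an HTTP request to the console application. The main request parameters are GroupID and ClientId; each MQClientInstance instance corresponds to one ClientId.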
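To make the ClientId relationship concrete, here is a minimal, stdlib-only sketch. It assumes Apache RocketMQ 4.x behavior as we understand it: `ClientConfig.buildMQClientId()` joins the client IP and the instance name with `@` (plus `@unitName` when a unit name is set), and `MQClientManager` keeps one MQClientInstance per ClientId in a factory table. `ClientIdSketch`, `ClientInstance`, and the sample IP and instance names are hypothetical, not the SDK's API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of how a ClientId keys a shared client instance.
class ClientIdSketch {

    // Stand-in for MQClientInstance: one object per distinct ClientId.
    static final class ClientInstance {
        final String clientId;
        ClientInstance(String clientId) { this.clientId = clientId; }
    }

    // Stand-in for the factory table keyed by ClientId.
    private static final Map<String, ClientInstance> FACTORY_TABLE = new ConcurrentHashMap<>();

    // Assumed format: ip@instanceName, with @unitName appended only if it is non-blank.
    static String buildClientId(String clientIp, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder(clientIp).append('@').append(instanceName);
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append('@').append(unitName);
        }
        return sb.toString();
    }

    // Consumers that compute the same ClientId reuse the same instance.
    static ClientInstance getOrCreate(String clientId) {
        return FACTORY_TABLE.computeIfAbsent(clientId, ClientInstance::new);
    }

    public static void main(String[] args) {
        String a = buildClientId("192.168.0.10", "12345", null);
        String b = buildClientId("192.168.0.10", "12345#other", null);
        System.out.println(a);                                 // 192.168.0.10@12345
        System.out.println(getOrCreate(a) == getOrCreate(a));  // true
        System.out.println(getOrCreate(a) == getOrCreate(b));  // false
    }
}
```

Because the console addresses a consumer by (GroupID, ClientId), the request lands on one specific client instance; whether two consumers in one process share an instance depends on whether their instance names differ.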
Console application requests the Broker
After receiving the browser's request, the console application mainly does the following:
// Every consumer group has a retry topic named %RETRY% + group
String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
if (brokerDatas != null) {
    for (BrokerData brokerData : brokerDatas) {
        // Prefer the master address of this broker set
        String addr = brokerData.selectBrokerAddr();
        if (addr != null) {
            // Ask the first usable Broker for the consumer's running info (optionally with jstack)
            return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(addr, consumerGroup, clientId, jstack, timeoutMillis * 3);
        }
    }
}
- Look up the TopicRouteData of the topic %RETRY% + GroupID (the group's retry topic)
- Pick a Broker address from that TopicRouteData and send it a getConsumerRunningInfo request
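The two console-side steps above can be sketched as follows. This is a simplified, self-contained model, not the MQAdmin implementation: `Broker` is a hypothetical stand-in for BrokerData that maps broker IDs to addresses, with ID 0 treated as the master. Where the real selectBrokerAddr picks a random slave when there is no master, this sketch simply takes the first one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the console-side routing step; not the real MQAdmin code.
class ConsoleRoutingSketch {

    // Same value as MixAll.RETRY_GROUP_TOPIC_PREFIX in Apache RocketMQ.
    static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";

    // Hypothetical stand-in for BrokerData: brokerId -> address, where 0 is the master.
    static final class Broker {
        final Map<Long, String> brokerAddrs = new HashMap<>();

        // Prefer the master; otherwise fall back to any slave address (null if none).
        String selectBrokerAddr() {
            String master = brokerAddrs.get(0L);
            if (master != null) {
                return master;
            }
            return brokerAddrs.values().stream().findFirst().orElse(null);
        }
    }

    static String retryTopic(String consumerGroup) {
        return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
    }

    // Returns the first broker address that can serve the getConsumerRunningInfo call.
    static String pickBroker(List<Broker> brokerDatas) {
        if (brokerDatas != null) {
            for (Broker broker : brokerDatas) {
                String addr = broker.selectBrokerAddr();
                if (addr != null) {
                    return addr;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Broker empty = new Broker();
        Broker withMaster = new Broker();
        withMaster.brokerAddrs.put(1L, "10.0.0.2:10911");
        withMaster.brokerAddrs.put(0L, "10.0.0.1:10911");
        List<Broker> route = new ArrayList<>();
        route.add(empty);
        route.add(withMaster);
        System.out.println(retryTopic("Group_ID_1")); // %RETRY%Group_ID_1
        System.out.println(pickBroker(route));          // 10.0.0.1:10911
    }
}
```

Any Broker in the retry topic's route will do here, because each Broker tracks the consumers of that group; the next step relies on that.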
Broker requests the Consumer
After receiving the request, the Broker mainly does the following:
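// Locate the connection of the consumer identified by (consumerGroup, clientId)
ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
// Forward the original request fields and body unchanged
RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);
newRequest.setExtFields(request.getExtFields());
newRequest.setBody(request.getBody());
// Call the client over its channel and return its response
return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);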
- AdminBrokerProcessor handles the query request
- Using the GroupID and ClientId, find the channel (socket connection) of the matching Consumer instance
- Forward the request to that Consumer instance over the channel
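The Broker-side lookup is keyed by both GroupID and ClientId, so the request reaches exactly one client connection. The sketch below models this with nested maps. The real ConsumerManager keeps a per-group ConsumerGroupInfo holding Netty channels, so `BrokerLookupSketch`, `Conn`, and the sample addresses and ClientIds are hypothetical simplifications.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the broker-side lookup: group -> (clientId -> connection).
class BrokerLookupSketch {

    // Stand-in for a client channel; the real broker stores a Netty Channel.
    static final class Conn {
        final String remoteAddr;
        Conn(String remoteAddr) { this.remoteAddr = remoteAddr; }
    }

    private final Map<String, Map<String, Conn>> table = new ConcurrentHashMap<>();

    // Called when a consumer of `group` registers (e.g. via its heartbeat).
    void register(String group, String clientId, Conn conn) {
        table.computeIfAbsent(group, g -> new ConcurrentHashMap<>()).put(clientId, conn);
    }

    // Resolves at most one connection for (group, clientId); null if absent.
    Conn findChannel(String group, String clientId) {
        Map<String, Conn> byClient = table.get(group);
        return byClient == null ? null : byClient.get(clientId);
    }

    public static void main(String[] args) {
        BrokerLookupSketch broker = new BrokerLookupSketch();
        Conn c1 = new Conn("192.168.0.10:50001");
        Conn c2 = new Conn("192.168.0.10:50002");
        broker.register("Group_ID_1", "192.168.0.10@client-1", c1);
        broker.register("Group_ID_2", "192.168.0.10@client-2", c2);
        System.out.println(broker.findChannel("Group_ID_1", "192.168.0.10@client-1").remoteAddr); // 192.168.0.10:50001
        System.out.println(broker.findChannel("Group_ID_1", "192.168.0.10@client-2"));            // null
    }
}
```

The routing is therefore precise down to one client connection; the group mixing seen in the console comes from what the consumer puts in its reply, not from this lookup.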
Consumer processing logic
After receiving the request, the Consumer mainly does the following:
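// Runtime statistics (pull/consume RT and TPS) for the requested group only
ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
if (requestHeader.isJstackEnable()) {
    // Stack traces of ALL live threads in the JVM, regardless of consumer group
    Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
    String jstack = UtilAll.jstack(map);
    consumerRunningInfo.setJstack(jstack);
}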
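- Through the MQClientInstance, call consumerRunningInfo to obtain the Consumer instance's runtime information, such as pullRT, pullTPS, consumeRT, consumeOKTPS, and consumeFailedTPS
- Collect the stack traces of all threads in the JVM
- Return the resulting ConsumerRunningInfo to the Broker
Step 2 (collecting the stack traces of all JVM threads) produces the stack trace we want to view. Because Thread.getAllStackTraces() returns every live thread in the process, the result is not limited to the requested Group ID. The console currently shows mainly the threads whose names start with ConsumeMessageThread_, plus the RebalanceService thread. The expectation is that it shows only the ConsumeMessageThread_ and RebalanceService threads related to this consumer, not the threads of unrelated consumers.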
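The following sketch shows why the dump is not scoped to a group. It starts a few named threads that stand in for the consume threads of two consumers in one process, then checks which names `Thread.getAllStackTraces()` returns. The thread names and the `namesInDump` helper are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Shows that Thread.getAllStackTraces() is JVM-wide, not scoped to one consumer group.
class JvmWideDumpDemo {

    // Starts one parked thread per name, captures all stacks, and returns the thread names seen.
    static List<String> namesInDump(String... threadNames) {
        CountDownLatch started = new CountDownLatch(threadNames.length);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (String name : threadNames) {
                Thread t = new Thread(() -> {
                    started.countDown();
                    try {
                        release.await(); // stay alive until the dump has been taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, name);
                t.setDaemon(true);
                t.start();
            }
            started.await(); // every named thread is now running
            List<String> seen = new ArrayList<>();
            // Same call the consumer makes when jstack is requested: it covers every live thread.
            for (Thread t : Thread.getAllStackTraces().keySet()) {
                seen.add(t.getName());
            }
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while collecting thread names", e);
        } finally {
            release.countDown(); // let the helper threads exit
        }
    }

    public static void main(String[] args) {
        // Pretend the first name belongs to Group_ID_1's pool and the second to Group_ID_2's.
        List<String> seen = namesInDump("ConsumeMessageThread_1", "ConsumeMessageThread_2");
        System.out.println(seen.contains("ConsumeMessageThread_1")); // true
        System.out.println(seen.contains("ConsumeMessageThread_2")); // true
    }
}
```

Whichever consumer answers the request, its reply contains the other consumer's threads too; the only way to tell them apart afterwards is by thread name.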
Naming of ConsumeMessageThread threads
In the current version, the consumer threads that run business logic are named ConsumeMessageThread_<number>.
The relevant code in ConsumeMessageConcurrentlyService is:
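// This thread pool runs the business (message consumption) logic
this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,  // keep-alive time for idle threads: 60 s
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        // Fixed prefix: every consumer in the JVM names its threads ConsumeMessageThread_1, _2, ...
        new ThreadFactoryImpl("ConsumeMessageThread_"));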
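A fixed prefix combined with a per-factory counter means that two consumers in the same process produce identical thread names. The sketch below is a simplified stand-in modeled on RocketMQ's ThreadFactoryImpl (a prefix plus an AtomicLong counter, as we understand it); `PrefixThreadFactory` is a hypothetical name.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Simplified prefix + counter thread factory, modeled on ThreadFactoryImpl.
class PrefixThreadFactory implements ThreadFactory {
    private final AtomicLong threadIndex = new AtomicLong(0); // counter is per factory, not global
    private final String threadNamePrefix;

    PrefixThreadFactory(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, threadNamePrefix + threadIndex.incrementAndGet());
    }

    public static void main(String[] args) {
        // Two consumers in one process, each with its own factory but the same fixed prefix.
        ThreadFactory group1 = new PrefixThreadFactory("ConsumeMessageThread_");
        ThreadFactory group2 = new PrefixThreadFactory("ConsumeMessageThread_");
        Runnable noop = () -> { };
        System.out.println(group1.newThread(noop).getName()); // ConsumeMessageThread_1
        System.out.println(group2.newThread(noop).getName()); // ConsumeMessageThread_1
    }
}
```

Since each pool restarts its counter at 1, the names in a JVM-wide dump cannot be mapped back to a Group ID.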
Newer versions add the Group ID to the thread name. The relevant code is:
// Per-group prefix; group names longer than 100 characters are truncated
String consumeThreadPrefix = null;
if (consumerGroup.length() > 100) {
    consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup, 0, 100).append("_").toString();
} else {
    consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();
}
this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl(consumeThreadPrefix));
Thread names then take the form ConsumeMessageThread_<GroupId>_<number>, which mitigates the problem above to some extent.
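With the group-aware prefix, a tool could keep only one group's consume threads from a JVM-wide dump. The sketch below rebuilds the prefix with the same 100-character truncation rule and then filters thread names. `threadsOfGroup` is a hypothetical helper, not something the console or SDK is known to provide. It also requires the part after the prefix to be all digits, because a bare prefix match would let group `G` claim the threads of a group named `G_1`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: rebuild the group-aware prefix, then filter thread names by it.
class GroupThreadFilter {

    // Same rule as the snippet above: truncate the group to 100 chars and wrap it with '_'.
    static String consumeThreadPrefix(String consumerGroup) {
        String group = consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup;
        return "ConsumeMessageThread_" + group + "_";
    }

    // True if s is a non-empty run of ASCII digits (the thread counter).
    static boolean isCounter(String s) {
        if (s.isEmpty()) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c < '0' || c > '9') {
                return false;
            }
        }
        return true;
    }

    // Hypothetical helper: keep only the consume threads that belong to consumerGroup.
    static List<String> threadsOfGroup(List<String> threadNames, String consumerGroup) {
        String prefix = consumeThreadPrefix(consumerGroup);
        List<String> result = new ArrayList<>();
        for (String name : threadNames) {
            if (name.startsWith(prefix) && isCounter(name.substring(prefix.length()))) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> dump = Arrays.asList(
                "ConsumeMessageThread_Group_ID_1_1",
                "ConsumeMessageThread_Group_ID_1_2",
                "ConsumeMessageThread_Group_ID_2_1",
                "RebalanceService");
        System.out.println(threadsOfGroup(dump, "Group_ID_1"));
        // [ConsumeMessageThread_Group_ID_1_1, ConsumeMessageThread_Group_ID_1_2]
    }
}
```

The filter still cannot separate two groups whose names share the same first 100 characters, and it does not cover threads such as RebalanceService, which, as far as we know, is named without the group.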
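Summary
- The ONS SDK wraps the RocketMQ Client to make it easier to use from business code. Consumer objects are heavyweight, so choose an initialization approach that fits the business.
- ConsumerStatsManager records statistics about the consumer.
- ConsumeMessageConcurrentlyService improved consumer thread naming by including the Group ID.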