自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

阿里二面:RocketMQ 消費者拉取一批消息,其中部分消費失敗了,偏移量怎樣更新?

人工智能 機器學習
如果一批消息按照順序消費,是不可能出現(xiàn)第 100 條消息消費成功了,但第 50 條消費失敗的情況,因為第 50 條消息失敗的時候,應該退出循環(huán),不再繼續(xù)進行消費。

大家好,我是君哥。

最近有讀者參加面試時被問了一個問題,如果消費者拉取了一批消息,比如 100 條,第 100 條消息消費成功了,但是第 50 條消費失敗,偏移量會怎樣更新?就著這個問題,今天來聊一下,如果一批消息有消費失敗的情況時,偏移量怎么保存。

1 拉取消息

1.1 封裝拉取請求

以 RocketMQ 推模式為例,RocketMQ 消費者啟動代碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

上面的 DefaultMQPushConsumer 是一個推模式的消費者,啟動方法是 start。消費者啟動后會觸發(fā)重平衡線程(RebalanceService),這個線程的任務是在死循環(huán)中不停地進行重平衡,最終封裝拉取消息的請求到 pullRequestQueue。這個過程涉及到的 UML 類圖如下:

圖片

1.2 處理拉取請求

封裝好拉取消息的請求 PullRequest 后,RocketMQ 就會不停地從 pullRequestQueue 獲取消息拉取請求進行處理。UML 類圖如下:

圖片

拉取消息的入口方法是一個死循環(huán),代碼如下:

//PullMessageService
public void run(){
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}

log.info(this.getServiceName() + " service end");
}

這里拉取到消息后,提交給 PullCallback 這個回調(diào)函數(shù)進行處理。

拉取到的消息首先被 put 到 ProcessQueue 中的 msgTreeMap 上,然后被封裝到 ConsumeRequest 這個線程類來處理。把代碼精簡后,ConsumeRequest 處理邏輯如下:

//ConsumeMessageConcurrentlyService.java
public void run(){
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
try {
//1.執(zhí)行消費邏輯,這里的邏輯是在文章開頭的代碼中定義的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
}
if (!processQueue.isDropped()) {
//2.處理消費結(jié)果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}

2 處理消費結(jié)果

2.1 并發(fā)消息

并發(fā)消息處理消費結(jié)果的代碼做精簡后如下:

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
){
int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
break;
case RECONSUME_LATER:
break;
default:
break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
}
break;
default:
break;
}

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

從上面的代碼可以看出,如果處理消息的邏輯是串行的,比如文章開頭的代碼使用 for 循環(huán)來處理消息,那如果在某一條消息處理失敗了,直接退出循環(huán),給 ConsumeConcurrentlyContext 的 ackIndex 變量賦值為消息列表中失敗消息的位置,這樣這條失敗消息后面的消息就不再處理了,發(fā)送給 Broker 等待重新拉取。代碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
for (int i = 0; i < msgs.size(); i++) {
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
context.setAckIndex(i);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

消費成功的消息則從 ProcessQueue 中的 msgTreeMap 中移除,并且返回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:集群模式偏移量保存在 Broker 端,更新偏移量需要發(fā)送消息到 Broker,而廣播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。

如果處理消息的邏輯是并行的,處理消息失敗后給 ackIndex 賦值是沒有意義的,因為可能有多條消息失敗,給 ackIndex 變量賦值并不準確。最好的方法就是給 ackIndex 賦值 0,整批消息全部重新消費,這樣又可能帶來冥等問題。

2.2 順序消息

對于順序消息,從 msgTreeMap 取出消息后,先要放到 consumingMsgOrderlyTreeMap 上面,更新偏移量時,是從 consumingMsgOrderlyTreeMap 上取最大的消息偏移量(lastKey)。

3 總結(jié)

回到開頭的問題,如果一批消息按照順序消費,是不可能出現(xiàn)第 100 條消息消費成功了,但第 50 條消費失敗的情況,因為第 50 條消息失敗的時候,應該退出循環(huán),不再繼續(xù)進行消費。

如果是并發(fā)消費,如果出現(xiàn)了這種情況,建議是整批消息全部重新消費,也就是給 ackIndex 賦值 0,這樣必須考慮冥等問題。

責任編輯:武曉燕 來源: 君哥聊技術
相關推薦

2022-03-14 11:05:01

RocketMQRedis緩存

2022-06-02 10:54:16

BrokerRocketMQ

2023-03-14 08:45:25

RocketMQ消息消費

2021-12-17 08:17:00

RocketMQ數(shù)據(jù)結(jié)構(gòu)消息中間件

2022-08-15 10:45:34

RocketMQ消息隊列

2024-01-24 09:00:31

SSD訂閱關系內(nèi)存

2024-04-22 00:00:00

RocketMQ優(yōu)化位點

2022-07-07 09:00:49

RocketMQ消費者消息消費

2022-11-08 07:36:17

RocketMQ消費者消息堆積

2009-04-15 11:17:23

2021-07-12 10:25:03

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2011-07-22 16:25:38

CA TechnoloIT消費化

2011-08-05 16:21:24

2021-04-20 08:32:51

消息MQ隊列

2023-06-01 08:08:38

kafka消費者分區(qū)策略

2024-03-14 11:58:43

2022-05-09 11:15:05

RocketMQPULL 模式PUSH 模式

2015-08-26 09:39:30

java消費者

2025-02-26 07:53:21

2023-03-28 07:08:09

RocketMQ消費者堆棧
點贊
收藏

51CTO技術棧公眾號