如果一批消息按照順序消費,是不可能出現(xiàn)第 100 條消息消費成功了,但第 50 條消費失敗的情況,因為第 50 條消息失敗的時候,應該退出循環(huán),不再繼續(xù)進行消費。
大家好,我是君哥。
最近有讀者參加面試時被問了一個問題,如果消費者拉取了一批消息,比如 100 條,第 100 條消息消費成功了,但是第 50 條消費失敗,偏移量會怎樣更新?就著這個問題,今天來聊一下,如果一批消息有消費失敗的情況時,偏移量怎么保存。
1 拉取消息
1.1 封裝拉取請求
以 RocketMQ 推模式為例,RocketMQ 消費者啟動代碼如下:
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
上面的 DefaultMQPushConsumer 是一個推模式的消費者,啟動方法是 start。消費者啟動后會觸發(fā)重平衡線程(RebalanceService),這個線程的任務是在死循環(huán)中不停地進行重平衡,最終封裝拉取消息的請求到 pullRequestQueue。這個過程涉及到的 UML 類圖如下:

1.2 處理拉取請求
封裝好拉取消息的請求 PullRequest 后,RocketMQ 就會不停地從 pullRequestQueue 獲取消息拉取請求進行處理。UML 類圖如下:

拉取消息的入口方法是一個死循環(huán),代碼如下:
//PullMessageService
public void run(){
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
這里拉取到消息后,提交給 PullCallback 這個回調(diào)函數(shù)進行處理。
拉取到的消息首先被 put 到 ProcessQueue 中的 msgTreeMap 上,然后被封裝到 ConsumeRequest 這個線程類來處理。把代碼精簡后,ConsumeRequest 處理邏輯如下:
//ConsumeMessageConcurrentlyService.java
public void run(){
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
try {
//1.執(zhí)行消費邏輯,這里的邏輯是在文章開頭的代碼中定義的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
}
if (!processQueue.isDropped()) {
//2.處理消費結(jié)果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
2 處理消費結(jié)果
2.1 并發(fā)消息
并發(fā)消息處理消費結(jié)果的代碼做精簡后如下:
//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
){
int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
break;
case RECONSUME_LATER:
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
}
break;
default:
break;
}
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
從上面的代碼可以看出,如果處理消息的邏輯是串行的,比如文章開頭的代碼使用 for 循環(huán)來處理消息,那如果在某一條消息處理失敗了,直接退出循環(huán),給 ConsumeConcurrentlyContext 的 ackIndex 變量賦值為消息列表中失敗消息的位置,這樣這條失敗消息后面的消息就不再處理了,發(fā)送給 Broker 等待重新拉取。代碼如下:
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
for (int i = 0; i < msgs.size(); i++) {
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
context.setAckIndex(i);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
消費成功的消息則從 ProcessQueue 中的 msgTreeMap 中移除,并且返回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:集群模式偏移量保存在 Broker 端,更新偏移量需要發(fā)送消息到 Broker,而廣播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。
如果處理消息的邏輯是并行的,處理消息失敗后給 ackIndex 賦值是沒有意義的,因為可能有多條消息失敗,給 ackIndex 變量賦值并不準確。最好的方法就是給 ackIndex 賦值 0,整批消息全部重新消費,這樣又可能帶來冥等問題。
2.2 順序消息
對于順序消息,從 msgTreeMap 取出消息后,先要放到 consumingMsgOrderlyTreeMap 上面,更新偏移量時,是從 consumingMsgOrderlyTreeMap 上取最大的消息偏移量(lastKey)。
3 總結(jié)
回到開頭的問題,如果一批消息按照順序消費,是不可能出現(xiàn)第 100 條消息消費成功了,但第 50 條消費失敗的情況,因為第 50 條消息失敗的時候,應該退出循環(huán),不再繼續(xù)進行消費。
如果是并發(fā)消費,如果出現(xiàn)了這種情況,建議是整批消息全部重新消費,也就是給 ackIndex 賦值 0,這樣必須考慮冥等問題。