Kafka消費(fèi)位點(diǎn)管理沒(méi)你想的那么簡(jiǎn)單
背景
如果你習(xí)慣了使用RocketMQ這種自動(dòng)擋管理消費(fèi)位點(diǎn),消息失敗重試的方式。你再來(lái)使用kafka,會(huì)發(fā)現(xiàn)kafka這種手動(dòng)擋的消費(fèi)位點(diǎn)管理就沒(méi)那么容易了
熟悉RocketMQ的小伙伴都知道RocketMQ已經(jīng)默認(rèn)幫我實(shí)現(xiàn)好了消息消費(fèi)失敗重試,消費(fèi)位點(diǎn)自動(dòng)提交,死信隊(duì)列等功能,那么kafka是否也是如此呢?
kafka消費(fèi)位點(diǎn)管理
kafka消費(fèi)位點(diǎn)有兩種管理方式
- 手動(dòng)提交消費(fèi)位點(diǎn)
- 自動(dòng)提交消費(fèi)位點(diǎn)
自動(dòng)提交消費(fèi)位點(diǎn)
想要設(shè)置自動(dòng)提交消費(fèi)位點(diǎn)我們只需要設(shè)置兩個(gè)屬性
- ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 自動(dòng)提交消費(fèi)位點(diǎn)
- ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG 自動(dòng)提交消費(fèi)位點(diǎn)的時(shí)間間隔
一個(gè)簡(jiǎn)單的消費(fèi)代碼如下
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自動(dòng)提交消費(fèi)位點(diǎn)的時(shí)間間隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
try {
handlerMessage(record);
} catch (Exception e) {
log.error("處理消息異常: {}", record, e);
// 循環(huán)繼續(xù)
}
}
}
自動(dòng)提交消費(fèi)位點(diǎn)有幾個(gè)缺點(diǎn)
- 會(huì)出現(xiàn)重復(fù)消費(fèi):比如Consumer每5秒自動(dòng)提交一次位移,如果在第4秒時(shí),消費(fèi)了消息,但是還沒(méi)有提交位移,此時(shí)Consumer掛掉了,那么下次Consumer啟動(dòng)時(shí),會(huì)從上次提交的位移開(kāi)始消費(fèi),這樣就會(huì)導(dǎo)致消息重復(fù)消費(fèi)。 當(dāng)然比如出現(xiàn)Rebalance也是會(huì)出現(xiàn)重復(fù)消費(fèi)的情況
- 無(wú)法精準(zhǔn)控制消費(fèi)位點(diǎn)
手動(dòng)提交消費(fèi)位點(diǎn)
手動(dòng)提交消費(fèi)位點(diǎn)又分兩種
- 同步提交(commitSync)
- 異步提交(commitAsync)
同步提交(commitSync)
同步提交的方式很簡(jiǎn)單,就是每次消費(fèi)完通過(guò)調(diào)用API consumer.commitSync。
相關(guān)的代碼如下:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
// 注意這里消費(fèi)業(yè)務(wù)邏輯上消費(fèi)失敗后的消息處理
handlerMessage(records);
try {
// 消費(fèi)成功后手動(dòng)提交位點(diǎn)
consumer.commitSync();
} catch (CommitFailedException e) {
// 消費(fèi)位點(diǎn)提交失敗異常處理
handleError(e);
}
}
同步提交的方式有一個(gè)缺點(diǎn),調(diào)用commitSync()時(shí),Consumer會(huì)處于阻塞狀態(tài),直到broker返回提交成功,嚴(yán)重影響消費(fèi)性能。
異步提交(commitAsync)
異步提交的方式很簡(jiǎn)單,就是每次消費(fèi)完通過(guò)調(diào)用API consumer.commitAsync。
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
handlerMessage(records); // 處理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handleError(exception);
});
}
commitAsync主要是提供了異步回調(diào),通過(guò)回調(diào)來(lái)通知消費(fèi)位點(diǎn)是否提交成功。
異步提交消費(fèi)位點(diǎn)也有一些缺點(diǎn),比如消費(fèi)位點(diǎn)不能重復(fù)提交。因?yàn)樘峤晃稽c(diǎn)失敗后,重新提交位點(diǎn)可能更晚的消費(fèi)位點(diǎn)已經(jīng)提交了,這里提交已經(jīng)是沒(méi)有意義的了。
spring-kafka消息消費(fèi)
可以看到不管是同步提交消費(fèi)位點(diǎn)還是異步提交消費(fèi)位點(diǎn),都有一些問(wèn)題,想要寫(xiě)出生產(chǎn)可用的消費(fèi)代碼,需要注意的細(xì)節(jié)非常多。
比如消費(fèi)失敗后的消息如何處理,是停止消費(fèi)跳出循環(huán),還是說(shuō)記錄消費(fèi)失敗的消息,人工處理等。
這里我們可以簡(jiǎn)單看看spring-kafka是如何消費(fèi)消息的。
我們簡(jiǎn)單看看主流程代碼:
圖片
這里我們忽略源碼的一些其他細(xì)節(jié)。只分析主要的消費(fèi)流程。
- invokeOnMessage(cRecord); 處理消息
可以看到invokeOnMessage是被整個(gè)try-catch包裹的,這樣就保證了消費(fèi)失敗后不會(huì)影響整個(gè)消費(fèi)流程。
具體我們先看看消息正常處理的邏輯。
private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {
if (cRecord.value() instanceof DeserializationException ex) {
throw ex;
}
if (cRecord.key() instanceof DeserializationException ex) {
throw ex;
}
if (cRecord.value() == null && this.checkNullValueForExceptions) {
checkDeser(cRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
}
if (cRecord.key() == null && this.checkNullKeyForExceptions) {
checkDeser(cRecord, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
}
doInvokeOnMessage(cRecord);
if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {
ackCurrent(cRecord);
}
if (this.isCountAck || this.isTimeOnlyAck) {
doProcessCommits();
}
}
這里主要是一些異常校驗(yàn),然后就是判斷是否可以提交消費(fèi)位點(diǎn)。如果可以則調(diào)用doProcessCommits()進(jìn)行正常的消費(fèi)位點(diǎn)提交。
- doProcessCommits() 消費(fèi)位點(diǎn)處理
如果消費(fèi)位點(diǎn)提交失敗也會(huì)進(jìn)行一些異常處理。
private void doProcessCommits() {
if (!this.autoCommit && !this.isRecordAck) {
try {
processCommits();
}
catch (CommitFailedException cfe) {
if (this.remainingRecords != null && !this.isBatchListener) {
ConsumerRecords<K, V> pending = this.remainingRecords;
this.remainingRecords = null;
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
for (ConsumerRecord<K, V> kvConsumerRecord : pending) {
records.add(kvConsumerRecord);
}
this.commonErrorHandler.handleRemaining(cfe, records, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer);
}
}
}
}
如果消費(fèi)位點(diǎn)提交失敗則會(huì)調(diào)用commonErrorHandler進(jìn)行異常處理。
commonErrorHandler有多個(gè)實(shí)現(xiàn)類,有一個(gè)默認(rèn)實(shí)現(xiàn)DefaultErrorHandler
- 消息消費(fèi)失敗異常處理
如果消息消費(fèi)失敗,也提供了一個(gè)異常處理擴(kuò)展invokeErrorHandler(cRecord, iterator, e);
里面實(shí)際使用的也是DefaultErrorHandler
核心的處理邏輯主要還是在SeekUtils中封裝
- DefaultErrorHandler
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container) {
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
getFailureTracker(), this.logger, getLogLevel());
}
- SeekUtils
public static void seekOrRecover(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container, boolean commitRecovered,
RecoveryStrategy recovery, LogAccessor logger, Level level) {}
可以看到有一個(gè)RecoveryStrategy參數(shù),這個(gè)是消息消費(fèi)失敗如何恢復(fù),比如我們需要手動(dòng)增加一個(gè)類似死信隊(duì)列的topic,這里消息消費(fèi)失敗就會(huì)自動(dòng)發(fā)送到我們的死信隊(duì)列
死信隊(duì)列的topic名字生成規(guī)則主要是topicName + -dlt
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + "-dlt", cr.partition());
總結(jié)
可以看到如果我們單純的使用kafka-client原生的sdk來(lái)進(jìn)行消息消費(fèi),是非常容易出現(xiàn)問(wèn)題的。
我們需要很多細(xì)節(jié),比如
- 消息消費(fèi)失敗了如何處理,是否需要重試,如果重試還是失敗怎么辦?丟掉還是手動(dòng)處理丟到自己創(chuàng)建的死信隊(duì)列中。
- 消費(fèi)位點(diǎn)提交失敗了如何處理。
- 消費(fèi)位點(diǎn)是使用同步提交還是異步提交?或者混合提交?
所以如果spring boot項(xiàng)目還是建議使用spring相關(guān)已經(jīng)封裝好的kafka sdk。
非必要盡量不要使用原生的kafka-client sdk。