詳解Kafka消息發(fā)送重試機(jī)制的案例
在 Kafka 生產(chǎn)者中實(shí)現(xiàn)消息發(fā)送的重試機(jī)制,可以通過(guò)配置 KafkaProducer 的相關(guān)屬性來(lái)實(shí)現(xiàn)。以下是一些關(guān)鍵的配置項(xiàng):
retries:設(shè)置生產(chǎn)者發(fā)送失敗后重試的次數(shù)。
retry.backoff.ms:設(shè)置生產(chǎn)者在重試前等待的時(shí)間。
buffer.memory:設(shè)置生產(chǎn)者在內(nèi)存中緩存數(shù)據(jù)的最大值,如果達(dá)到這個(gè)值,生產(chǎn)者會(huì)拒絕接受新的消息,直到當(dāng)前緩存的消息被發(fā)送出去。
batch.size:設(shè)置生產(chǎn)者在發(fā)送批次中可以包含的最大消息數(shù)。
linger.ms:設(shè)置生產(chǎn)者在發(fā)送批次之前等待更多消息的最大時(shí)間。
max.in.flight.requests.per.connection:設(shè)置每個(gè)連接最多數(shù)未完成的請(qǐng)求。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args) {
// 配置生產(chǎn)者屬性
Properties props = new Properties();
props.put("bootstrap.servers", "4.5.8.4:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("retries", 5); // 設(shè)置重試次數(shù)
props.put("retry.backoff.ms", 100); // 設(shè)置重試間隔
props.put("buffer.memory", 33554432); // 設(shè)置緩沖區(qū)大小
props.put("batch.size", 16384); // 設(shè)置批次大小
props.put("linger.ms", 1); // 設(shè)置等待時(shí)間
props.put("max.in.flight.requests.per.connection", 5); // 設(shè)置最大在途請(qǐng)求數(shù)
// 創(chuàng)建生產(chǎn)者實(shí)例
Producer<String, String> producer = new KafkaProducer<>(props);
// 發(fā)送消息
for (int i = 0; i < 1000000; i++) {
String key = "案例1=====" + i;
System.out.println("key:"+key);
String value = "Spring AI Alibaba 實(shí)現(xiàn)了與阿里云通義模型的完整適配,接下來(lái),我們將學(xué)習(xí)如何使用 spring ai alibaba 開(kāi)發(fā)一個(gè)基于通義模型服務(wù)的智能聊天應(yīng)用" + i;
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 處理消息發(fā)送失敗的情況
System.err.println("發(fā)送消息失?。? + exception.getMessage());
} else {
// 處理消息發(fā)送成功的情況
System.out.println("消息發(fā)送成功,偏移量:" + metadata.offset());
}
});
}
// 關(guān)閉生產(chǎn)者
producer.close();
}
}
在這個(gè)示例中,我們?cè)O(shè)置了重試次數(shù)、重試間隔、緩沖區(qū)大小、批次大小、等待時(shí)間和最大在途請(qǐng)求數(shù)。此外,我們還為 send 方法提供了一個(gè)回調(diào)函數(shù),用于處理消息發(fā)送成功或失敗的情況。這樣,當(dāng)消息發(fā)送失敗時(shí),生產(chǎn)者會(huì)自動(dòng)重試,直到達(dá)到配置的重試次數(shù)。如果所有重試都失敗,回調(diào)函數(shù)會(huì)收到異常通知,你可以在回調(diào)中實(shí)現(xiàn)進(jìn)一步的錯(cuò)誤處理邏輯。
?? 如何配置Kafka生產(chǎn)者的重試策略?
其實(shí)上面也有說(shuō),再次總結(jié)下
要配置 Kafka 生產(chǎn)者的重試策略,你可以按照以下步驟進(jìn)行:
- 設(shè)置重試次數(shù):
- 通過(guò)設(shè)置 retries 屬性來(lái)指定生產(chǎn)者在遇到錯(cuò)誤時(shí)重試發(fā)送消息的次數(shù)。例如,設(shè)置 retries 為 3 表示生產(chǎn)者會(huì)嘗試最多 3 次發(fā)送消息。
- 設(shè)置重試間隔:
- 使用 retry.backoff.ms 屬性來(lái)配置重試之間的時(shí)間間隔。這個(gè)設(shè)置可以防止生產(chǎn)者在連續(xù)的短時(shí)間內(nèi)發(fā)送大量重試請(qǐng)求,給 Kafka 集群或網(wǎng)絡(luò)造成壓力。
- 確保消息冪等性:
- 設(shè)置 enable.idempotence 為 true 以確保生產(chǎn)者發(fā)送消息的邏輯是冪等的,即使消息被重復(fù)發(fā)送也不會(huì)影響系統(tǒng)狀態(tài)。
- 配置確認(rèn)策略:
- 通過(guò) acks 屬性來(lái)確保消息被所有副本確認(rèn)。例如,設(shè)置 acks 為 “all” 可以確保消息被所有副本確認(rèn)后才認(rèn)為是成功發(fā)送。
- 異步發(fā)送與回調(diào):
- 使用異步發(fā)送消息,并在回調(diào)中處理發(fā)送失敗的情況。在回調(diào)中對(duì)異常進(jìn)行分類處理,對(duì)于可恢復(fù)的錯(cuò)誤進(jìn)行重試,對(duì)于不可恢復(fù)的錯(cuò)誤進(jìn)行日志記錄或報(bào)警。
- 錯(cuò)誤處理與日志記錄:
- 在回調(diào)函數(shù)中捕獲并處理異常,同時(shí)記錄詳細(xì)的錯(cuò)誤日志,便于問(wèn)題排查和監(jiān)控。
- 監(jiān)控與告警:
- 對(duì)生產(chǎn)者的關(guān)鍵性能指標(biāo)進(jìn)行監(jiān)控,如發(fā)送延遲、吞吐量等。當(dāng)指標(biāo)出現(xiàn)異常時(shí),及時(shí)觸發(fā)告警通知相關(guān)人員處理。
- 合理配置重試機(jī)制:
- 根據(jù)業(yè)務(wù)需求合理配置重試次數(shù)和重試間隔,以減少因網(wǎng)絡(luò)波動(dòng)或 Kafka 集群短暫不可用導(dǎo)致的消息丟失風(fēng)險(xiǎn)。
- 設(shè)置最大在途請(qǐng)求:
- 通過(guò) max.in.flight.requests.per.connection 屬性限制每個(gè)連接最多數(shù)未完成的請(qǐng)求,這有助于控制內(nèi)存使用和重試的并發(fā)量。
- 配置超時(shí)時(shí)間:
- Kafka 2.4 版本引入了 delivery.timeout.ms 參數(shù),它設(shè)置了發(fā)送記錄和接收確認(rèn)之間的超時(shí)時(shí)間。這個(gè)參數(shù)與 retries 結(jié)合使用,可以提供更靈活的重試控制。
通過(guò)上述配置,你可以為 Kafka 生產(chǎn)者設(shè)置一個(gè)健壯的重試策略,以確保在面對(duì)網(wǎng)絡(luò)問(wèn)題或 Kafka 集群短暫不可用時(shí),消息能夠被可靠地發(fā)送。