自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

詳解Kafka消息發(fā)送重試機(jī)制的案例

開(kāi)發(fā) 前端
你可以為 Kafka 生產(chǎn)者設(shè)置一個(gè)健壯的重試策略,以確保在面對(duì)網(wǎng)絡(luò)問(wèn)題或 Kafka 集群短暫不可用時(shí),消息能夠被可靠地發(fā)送。

在 Kafka 生產(chǎn)者中實(shí)現(xiàn)消息發(fā)送的重試機(jī)制,可以通過(guò)配置 KafkaProducer 的相關(guān)屬性來(lái)實(shí)現(xiàn)。以下是一些關(guān)鍵的配置項(xiàng):

retries:設(shè)置生產(chǎn)者發(fā)送失敗后重試的次數(shù)。

retry.backoff.ms:設(shè)置生產(chǎn)者在重試前等待的時(shí)間。

buffer.memory:設(shè)置生產(chǎn)者在內(nèi)存中緩存數(shù)據(jù)的最大值,如果達(dá)到這個(gè)值,生產(chǎn)者會(huì)拒絕接受新的消息,直到當(dāng)前緩存的消息被發(fā)送出去。

batch.size:設(shè)置生產(chǎn)者在發(fā)送批次中可以包含的最大消息數(shù)。

linger.ms:設(shè)置生產(chǎn)者在發(fā)送批次之前等待更多消息的最大時(shí)間。

max.in.flight.requests.per.connection:設(shè)置每個(gè)連接最多數(shù)未完成的請(qǐng)求。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerDemo {
   
    public static void main(String[] args) {
   
        // 配置生產(chǎn)者屬性
        Properties props = new Properties();
        props.put("bootstrap.servers", "4.5.8.4:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("retries", 5); // 設(shè)置重試次數(shù)
        props.put("retry.backoff.ms", 100); // 設(shè)置重試間隔
        props.put("buffer.memory", 33554432); // 設(shè)置緩沖區(qū)大小
        props.put("batch.size", 16384); // 設(shè)置批次大小
        props.put("linger.ms", 1); // 設(shè)置等待時(shí)間
        props.put("max.in.flight.requests.per.connection", 5); // 設(shè)置最大在途請(qǐng)求數(shù)

        // 創(chuàng)建生產(chǎn)者實(shí)例
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 發(fā)送消息
        for (int i = 0; i < 1000000; i++) {
   
            String key = "案例1=====" + i;
            System.out.println("key:"+key);
            String value = "Spring AI Alibaba 實(shí)現(xiàn)了與阿里云通義模型的完整適配,接下來(lái),我們將學(xué)習(xí)如何使用 spring ai alibaba 開(kāi)發(fā)一個(gè)基于通義模型服務(wù)的智能聊天應(yīng)用" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);
            producer.send(record, (metadata, exception) -> {
   
                if (exception != null) {
   
                    // 處理消息發(fā)送失敗的情況
                    System.err.println("發(fā)送消息失?。? + exception.getMessage());
                } else {
   
                    // 處理消息發(fā)送成功的情況
                    System.out.println("消息發(fā)送成功,偏移量:" + metadata.offset());
                }
            });
        }

        // 關(guān)閉生產(chǎn)者
        producer.close();
    }
}


在這個(gè)示例中,我們?cè)O(shè)置了重試次數(shù)、重試間隔、緩沖區(qū)大小、批次大小、等待時(shí)間和最大在途請(qǐng)求數(shù)。此外,我們還為 send 方法提供了一個(gè)回調(diào)函數(shù),用于處理消息發(fā)送成功或失敗的情況。這樣,當(dāng)消息發(fā)送失敗時(shí),生產(chǎn)者會(huì)自動(dòng)重試,直到達(dá)到配置的重試次數(shù)。如果所有重試都失敗,回調(diào)函數(shù)會(huì)收到異常通知,你可以在回調(diào)中實(shí)現(xiàn)進(jìn)一步的錯(cuò)誤處理邏輯。

?? 如何配置Kafka生產(chǎn)者的重試策略?

其實(shí)上面也有說(shuō),再次總結(jié)下

要配置 Kafka 生產(chǎn)者的重試策略,你可以按照以下步驟進(jìn)行:

  1. 設(shè)置重試次數(shù):
  • 通過(guò)設(shè)置 retries 屬性來(lái)指定生產(chǎn)者在遇到錯(cuò)誤時(shí)重試發(fā)送消息的次數(shù)。例如,設(shè)置 retries 為 3 表示生產(chǎn)者會(huì)嘗試最多 3 次發(fā)送消息。
  1. 設(shè)置重試間隔:
  • 使用 retry.backoff.ms 屬性來(lái)配置重試之間的時(shí)間間隔。這個(gè)設(shè)置可以防止生產(chǎn)者在連續(xù)的短時(shí)間內(nèi)發(fā)送大量重試請(qǐng)求,給 Kafka 集群或網(wǎng)絡(luò)造成壓力。
  1. 確保消息冪等性:
  • 設(shè)置 enable.idempotence 為 true 以確保生產(chǎn)者發(fā)送消息的邏輯是冪等的,即使消息被重復(fù)發(fā)送也不會(huì)影響系統(tǒng)狀態(tài)。
  1. 配置確認(rèn)策略:
  • 通過(guò) acks 屬性來(lái)確保消息被所有副本確認(rèn)。例如,設(shè)置 acks 為 “all” 可以確保消息被所有副本確認(rèn)后才認(rèn)為是成功發(fā)送。
  1. 異步發(fā)送與回調(diào):
  • 使用異步發(fā)送消息,并在回調(diào)中處理發(fā)送失敗的情況。在回調(diào)中對(duì)異常進(jìn)行分類處理,對(duì)于可恢復(fù)的錯(cuò)誤進(jìn)行重試,對(duì)于不可恢復(fù)的錯(cuò)誤進(jìn)行日志記錄或報(bào)警。
  1. 錯(cuò)誤處理與日志記錄:
  • 在回調(diào)函數(shù)中捕獲并處理異常,同時(shí)記錄詳細(xì)的錯(cuò)誤日志,便于問(wèn)題排查和監(jiān)控。
  1. 監(jiān)控與告警:
  • 對(duì)生產(chǎn)者的關(guān)鍵性能指標(biāo)進(jìn)行監(jiān)控,如發(fā)送延遲、吞吐量等。當(dāng)指標(biāo)出現(xiàn)異常時(shí),及時(shí)觸發(fā)告警通知相關(guān)人員處理。
  1. 合理配置重試機(jī)制:
  • 根據(jù)業(yè)務(wù)需求合理配置重試次數(shù)和重試間隔,以減少因網(wǎng)絡(luò)波動(dòng)或 Kafka 集群短暫不可用導(dǎo)致的消息丟失風(fēng)險(xiǎn)。
  1. 設(shè)置最大在途請(qǐng)求:
  • 通過(guò) max.in.flight.requests.per.connection 屬性限制每個(gè)連接最多數(shù)未完成的請(qǐng)求,這有助于控制內(nèi)存使用和重試的并發(fā)量。
  • 配置超時(shí)時(shí)間:
  • Kafka 2.4 版本引入了 delivery.timeout.ms 參數(shù),它設(shè)置了發(fā)送記錄和接收確認(rèn)之間的超時(shí)時(shí)間。這個(gè)參數(shù)與 retries 結(jié)合使用,可以提供更靈活的重試控制。

通過(guò)上述配置,你可以為 Kafka 生產(chǎn)者設(shè)置一個(gè)健壯的重試策略,以確保在面對(duì)網(wǎng)絡(luò)問(wèn)題或 Kafka 集群短暫不可用時(shí),消息能夠被可靠地發(fā)送。

責(zé)任編輯:武曉燕 來(lái)源: 程序員子龍
相關(guān)推薦

2022-11-14 08:19:59

重試機(jī)制Kafka

2024-09-25 08:32:05

2025-02-26 10:49:14

2020-07-19 15:39:37

Python開(kāi)發(fā)工具

2021-02-20 10:02:22

Spring重試機(jī)制Java

2022-05-06 07:44:10

微服務(wù)系統(tǒng)設(shè)計(jì)重試機(jī)制

2023-10-27 08:20:12

springboot微服務(wù)

2017-07-02 16:50:21

2017-06-16 15:16:15

2023-11-27 07:44:59

RabbitMQ機(jī)制

2023-11-06 08:00:38

接口高可用機(jī)制

2024-08-13 15:46:57

2025-04-18 03:00:00

2024-01-04 18:01:55

高并發(fā)SpringBoot

2025-02-27 09:35:22

2023-10-13 10:44:35

OC消息發(fā)送

2024-09-30 08:30:37

2024-03-20 08:33:00

Kafka線程安全Rebalance

2021-08-30 13:08:56

Kafka網(wǎng)絡(luò)通信

2025-02-26 08:10:40

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)