自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

在線等,Kafka如果丟了消息怎么辦?

開(kāi)發(fā) 架構(gòu) 開(kāi)發(fā)工具 Kafka
Kafka 存在丟消息的問(wèn)題,消息丟失會(huì)發(fā)生在 Broker,Producer 和 Consumer 三種。

[[383294]]

圖片來(lái)自 Pexels

 

Broker

Broker

丟失消息是由于 Kafka 本身的原因造成的,Kafka 為了得到更高的性能和吞吐量,將數(shù)據(jù)異步批量的存儲(chǔ)在磁盤中。

消息的刷盤過(guò)程,為了提高性能,減少刷盤次數(shù),Kafka 采用了批量刷盤的做法。即,按照一定的消息量,和時(shí)間間隔進(jìn)行刷盤。

這種機(jī)制也是由于 Linux 操作系統(tǒng)決定的。將數(shù)據(jù)存儲(chǔ)到 Linux 操作系統(tǒng)種,會(huì)先存儲(chǔ)到頁(yè)緩存(Page cache)中,按照時(shí)間或者其他條件進(jìn)行刷盤(從 Page Cache 到 file),或者通過(guò) fsync 命令強(qiáng)制刷盤。

數(shù)據(jù)在Page Cache中時(shí),如果系統(tǒng)掛掉,數(shù)據(jù)會(huì)丟失。

 

Broker 在 Linux 服務(wù)器上高速讀寫以及同步到 Replica

上圖簡(jiǎn)述了 Broker 寫數(shù)據(jù)以及同步的一個(gè)過(guò)程。Broker 寫數(shù)據(jù)只寫到 Page Cache 中,而 Page Cache 位于內(nèi)存。

這部分?jǐn)?shù)據(jù)在斷電后是會(huì)丟失的。Page Cache 的數(shù)據(jù)通過(guò) Linux 的 flusher 程序進(jìn)行刷盤。

刷盤觸發(fā)條件有三:

  • 主動(dòng)調(diào)用 sync 或 fsync 函數(shù)。
  • 可用內(nèi)存低于閾值。
  • dirty data 時(shí)間達(dá)到閾值。dirty 是 Page Cache 的一個(gè)標(biāo)識(shí)位,當(dāng)有數(shù)據(jù)寫入到 Page Cache 時(shí),Page Cache 被標(biāo)注為 dirty,數(shù)據(jù)刷盤以后,dirty 標(biāo)志清除。

Broker 配置刷盤機(jī)制,是通過(guò)調(diào)用 fsync 函數(shù)接管了刷盤動(dòng)作。從單個(gè) Broker 來(lái)看,Page Cache 的數(shù)據(jù)會(huì)丟失。

Kafka 沒(méi)有提供同步刷盤的方式。同步刷盤在 RocketMQ 中有實(shí)現(xiàn),實(shí)現(xiàn)原理是將異步刷盤的流程進(jìn)行阻塞,等待響應(yīng),類似 Ajax 的 callback 或者是 Java 的 future。

下面是一段 RocketMQ 的源碼:

  1. GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 
  2. service.putRequest(request); 
  3. boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盤 

也就是說(shuō),理論上,要完全讓 Kafka 保證單個(gè) Broker 不丟失消息是做不到的,只能通過(guò)調(diào)整刷盤機(jī)制的參數(shù)緩解該情況。

比如,減少刷盤間隔,減少刷盤數(shù)據(jù)量大小。時(shí)間越短,性能越差,可靠性越好(盡可能可靠)。這是一個(gè)選擇題。

為了解決該問(wèn)題,Kafka 通過(guò) Producer 和 Broker 協(xié)同處理單個(gè) Broker 丟失參數(shù)的情況。

一旦 Producer 發(fā)現(xiàn) Broker 消息丟失,即可自動(dòng)進(jìn)行 retry。除非 retry 次數(shù)超過(guò)閾值(可配置),消息才會(huì)丟失。

此時(shí)需要生產(chǎn)者客戶端手動(dòng)處理該情況。那么 Producer 是如何檢測(cè)到數(shù)據(jù)丟失的呢?是通過(guò) ack 機(jī)制,類似于 http 的三次握手的方式。

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:

acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1.

acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

acks=allThis means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

http://kafka.apache.org/20/documentation.html

以上的引用是 Kafka 官方對(duì)于參數(shù) acks 的解釋(在老版本中,該參數(shù)是 request.required.acks):

①acks=0,Producer 不等待 Broker 的響應(yīng),效率最高,但是消息很可能會(huì)丟。

②acks=1,leader broker 收到消息后,不等待其他 follower 的響應(yīng),即返回 ack。也可以理解為 ack 數(shù)為 1。

此時(shí),如果 follower 還沒(méi)有收到 leader 同步的消息 leader 就掛了,那么消息會(huì)丟失。

按照上圖中的例子,如果 leader 收到消息,成功寫入 PageCache 后,會(huì)返回 ack,此時(shí) Producer 認(rèn)為消息發(fā)送成功。

但此時(shí),按照上圖,數(shù)據(jù)還沒(méi)有被同步到 follower。如果此時(shí) leader 斷電,數(shù)據(jù)會(huì)丟失。

③acks=-1,leader broker 收到消息后,掛起,等待所有 ISR 列表中的 follower 返回結(jié)果后,再返回 ack。

-1 等效與 all。這種配置下,只有 leader 寫入數(shù)據(jù)到 pagecache 是不會(huì)返回 ack 的,還需要所有的 ISR 返回“成功”才會(huì)觸發(fā) ack。

如果此時(shí)斷電,Producer 可以知道消息沒(méi)有被發(fā)送成功,將會(huì)重新發(fā)送。如果在 follower 收到數(shù)據(jù)以后,成功返回 ack,leader 斷電,數(shù)據(jù)將存在于原來(lái)的 follower 中。在重新選舉以后,新的 leader 會(huì)持有該部分?jǐn)?shù)據(jù)。

數(shù)據(jù)從 leader 同步到 follower,需要 2 步:

  • 數(shù)據(jù)從 Page Cache 被刷盤到 disk。因?yàn)橹挥?disk 中的數(shù)據(jù)才能被同步到 replica。
  • 數(shù)據(jù)同步到 replica,并且 replica 成功將數(shù)據(jù)寫入 Page Cache。在 Producer 得到 ack 后,哪怕是所有機(jī)器都停電,數(shù)據(jù)也至少會(huì)存在于 leader 的磁盤內(nèi)。

上面第三點(diǎn)提到了 ISR 的列表的 follower,需要配合另一個(gè)參數(shù)才能更好的保證 ack 的有效性。

ISR 是 Broker 維護(hù)的一個(gè)“可靠的 follower 列表”,in-sync Replica 列表,Broker 的配置包含一個(gè)參數(shù):min.insync.replicas。

該參數(shù)表示 ISR 中最少的副本數(shù)。如果不設(shè)置該值,ISR 中的 follower 列表可能為空。此時(shí)相當(dāng)于 acks=1。

如上圖中: 

  • acks=0,總耗時(shí) f(t)=f(1)。
  • acks=1,總耗時(shí) f(t)=f(1)+f(2)。
  • acks=-1,總耗時(shí) f(t)=f(1)+max( f(A) , f(B) )+f(2)。

性能依次遞減,可靠性依次升高。

Producer

Producer丟失消息,發(fā)生在生產(chǎn)者客戶端。

為了提升效率,減少 IO,Producer 在發(fā)送數(shù)據(jù)時(shí)可以將多個(gè)請(qǐng)求進(jìn)行合并后發(fā)送。被合并的請(qǐng)求咋發(fā)送一線緩存在本地 buffer 中。

緩存的方式和前文提到的刷盤類似,Producer 可以將請(qǐng)求打包成“塊”或者按照時(shí)間間隔,將 buffer 中的數(shù)據(jù)發(fā)出。

通過(guò) buffer 我們可以將生產(chǎn)者改造為異步的方式,而這可以提升我們的發(fā)送效率。

但是,buffer 中的數(shù)據(jù)就是危險(xiǎn)的。在正常情況下,客戶端的異步調(diào)用可以通過(guò) callback 來(lái)處理消息發(fā)送失敗或者超時(shí)的情況。

但是,一旦 Producer 被非法的停止了,那么 buffer 中的數(shù)據(jù)將丟失,Broker 將無(wú)法收到該部分?jǐn)?shù)據(jù)。

又或者,當(dāng) Producer 客戶端內(nèi)存不夠時(shí),如果采取的策略是丟棄消息(另一種策略是 block 阻塞),消息也會(huì)被丟失。

抑或,消息產(chǎn)生(異步產(chǎn)生)過(guò)快,導(dǎo)致掛起線程過(guò)多,內(nèi)存不足,導(dǎo)致程序崩潰,消息丟失。

 

Producer 采取批量發(fā)送的示意圖

異步發(fā)送消息生產(chǎn)速度過(guò)快的示意圖 

根據(jù)上圖,可以想到幾個(gè)解決的思路:

  • 異步發(fā)送消息改為同步發(fā)送消?;蛘?service 產(chǎn)生消息時(shí),使用阻塞的線程池,并且線程數(shù)有一定上限。整體思路是控制消息產(chǎn)生速度。
  • 擴(kuò)大 Buffer 的容量配置。這種方式可以緩解該情況的出現(xiàn),但不能杜絕。
  • service 不直接將消息發(fā)送到 buffer(內(nèi)存),而是將消息寫到本地的磁盤中(數(shù)據(jù)庫(kù)或者文件),由另一個(gè)(或少量)生產(chǎn)線程進(jìn)行消息發(fā)送。相當(dāng)于是在 buffer 和 service 之間又加了一層空間更加富裕的緩沖層。

Consumer

Consumer 消費(fèi)消息有下面幾個(gè)步驟:

  • 接收消息
  • 處理消息
  • 反饋“處理完畢”(commited)

Consumer的消費(fèi)方式主要分為兩種:

  • 自動(dòng)提交 offset,Automatic Offset Committing
  • 手動(dòng)提交 offset,Manual Offset Control

Consumer 自動(dòng)提交的機(jī)制是根據(jù)一定的時(shí)間間隔,將收到的消息進(jìn)行 commit。commit 過(guò)程和消費(fèi)消息的過(guò)程是異步的。

也就是說(shuō),可能存在消費(fèi)過(guò)程未成功(比如拋出異常),commit 消息已經(jīng)提交了。此時(shí)消息就丟失了。

  1. Properties props = new Properties(); 
  2. props.put("bootstrap.servers""localhost:9092"); 
  3. props.put("group.id""test"); 
  4. // 自動(dòng)提交開(kāi)關(guān) 
  5. props.put("enable.auto.commit""true"); 
  6. // 自動(dòng)提交的時(shí)間間隔,此處是1s 
  7. props.put("auto.commit.interval.ms""1000"); 
  8. props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer"); 
  9. props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer"); 
  10. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
  11. consumer.subscribe(Arrays.asList("foo""bar")); 
  12. while (true) { 
  13.         // 調(diào)用poll后,1000ms后,消息狀態(tài)會(huì)被改為 committed 
  14.   ConsumerRecords<String, String> records = consumer.poll(100); 
  15.   for (ConsumerRecord<String, String> record : records) 
  16.     insertIntoDB(record); // 將消息入庫(kù),時(shí)間可能會(huì)超過(guò)1000ms 

上面的示例是自動(dòng)提交的例子。如果此時(shí),insertIntoDB(record) 發(fā)生異常,消息將會(huì)出現(xiàn)丟失。

接下來(lái)是手動(dòng)提交的例子:

  1. Properties props = new Properties(); 
  2. props.put("bootstrap.servers""localhost:9092"); 
  3. props.put("group.id""test"); 
  4. // 關(guān)閉自動(dòng)提交,改為手動(dòng)提交 
  5. props.put("enable.auto.commit""false"); 
  6. props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer"); 
  7. props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer"); 
  8. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
  9. consumer.subscribe(Arrays.asList("foo""bar")); 
  10. final int minBatchSize = 200; 
  11. List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
  12. while (true) { 
  13.         // 調(diào)用poll后,不會(huì)進(jìn)行auto commit 
  14.   ConsumerRecords<String, String> records = consumer.poll(100); 
  15.   for (ConsumerRecord<String, String> record : records) { 
  16.     buffer.add(record); 
  17.   } 
  18.   if (buffer.size() >= minBatchSize) { 
  19.     insertIntoDb(buffer); 
  20.                 // 所有消息消費(fèi)完畢以后,才進(jìn)行commit操作 
  21.     consumer.commitSync(); 
  22.     buffer.clear(); 
  23.  } 

將提交類型改為手動(dòng)以后,可以保證消息“至少被消費(fèi)一次”(at least once)。但此時(shí)可能出現(xiàn)重復(fù)消費(fèi)的情況,重復(fù)消費(fèi)不屬于本篇討論范圍。

上面兩個(gè)例子,是直接使用 Consumer 的 High level API,客戶端對(duì)于 offset 等控制是透明的。

也可以采用 Low level API 的方式,手動(dòng)控制 offset,也可以保證消息不丟,不過(guò)會(huì)更加復(fù)雜。

  1. try { 
  2.      while(running) { 
  3.          ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
  4.          for (TopicPartition partition : records.partitions()) { 
  5.              List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); 
  6.              for (ConsumerRecord<String, String> record : partitionRecords) { 
  7.                  System.out.println(record.offset() + ": " + record.value()); 
  8.              } 
  9.              long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 
  10.              // 精確控制offset 
  11.              consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); 
  12.          } 
  13.      } 
  14.  } finally { 
  15.    consumer.close(); 
  16.  } 

作者:DongGuoChao

編輯:陶家龍

出處:https://blog.dogchao.cn/?p=305

責(zé)任編輯:武曉燕 來(lái)源: blog.dogchao
相關(guān)推薦

2022-10-31 09:30:32

kafkaconsumer服務(wù)端

2021-01-05 10:48:38

RedisAOF日志RDB快照

2022-04-22 10:30:07

框架JavaScript前端

2021-11-16 07:02:05

函數(shù)Python返回值

2021-01-23 23:18:21

Windows 10Windows微軟

2018-09-04 16:01:33

工作挑戰(zhàn)性技術(shù)

2024-12-12 14:56:48

消息積壓MQ分區(qū)

2018-12-29 08:31:35

2023-07-07 00:54:05

2017-12-08 11:14:21

2017-06-30 13:23:59

SaaS供應(yīng)商破產(chǎn)

2015-10-10 10:42:03

企業(yè)級(jí)云服務(wù)AWS

2022-02-17 08:57:18

內(nèi)存設(shè)計(jì)進(jìn)程

2022-07-14 10:23:39

數(shù)據(jù)

2022-07-14 10:16:22

Flink

2010-12-22 14:40:51

3Q大戰(zhàn)

2018-10-30 10:49:53

物聯(lián)網(wǎng)IoT智能設(shè)備

2013-02-28 11:00:51

IE10瀏覽器

2020-06-19 07:42:04

人工智能

2015-10-28 17:09:13

技術(shù)創(chuàng)業(yè)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)