自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Kafka生產(chǎn)環(huán)境實(shí)戰(zhàn)經(jīng)驗(yàn)深度總結(jié),讓你少走彎路

云計(jì)算 Kafka
本文結(jié)合本人在多個(gè)生產(chǎn)項(xiàng)目中使用 Kafka 的經(jīng)驗(yàn),圍繞以下幾個(gè)方面展開:消息丟失防范、重復(fù)消費(fèi)控制、性能瓶頸優(yōu)化、集群運(yùn)維策略,以及 Topic、分區(qū)、副本機(jī)制的設(shè)計(jì)要點(diǎn)。

1.背景

在實(shí)際項(xiàng)目中接入 Kafka 已經(jīng)成為高并發(fā)系統(tǒng)的標(biāo)配。然而,從簡(jiǎn)單的“能跑”到“穩(wěn)定高效地跑”,中間有太多坑值得記錄和總結(jié)。本文結(jié)合本人在多個(gè)生產(chǎn)項(xiàng)目中使用 Kafka 的經(jīng)驗(yàn),圍繞以下幾個(gè)方面展開:消息丟失防范、重復(fù)消費(fèi)控制、性能瓶頸優(yōu)化、集群運(yùn)維策略,以及 Topic、分區(qū)、副本機(jī)制的設(shè)計(jì)要點(diǎn)。先來(lái)看看kafka的基礎(chǔ)架構(gòu)圖,有個(gè)整體認(rèn)識(shí):

圖片圖片

接下我們我們就從生產(chǎn)者、服務(wù)端broker、服務(wù)端去講述下實(shí)戰(zhàn)經(jīng)驗(yàn)心得。

2.生產(chǎn)者如何提高吞吐量?

下面來(lái)看看生產(chǎn)者發(fā)送一條消息到kafka服務(wù)端的流程:

圖片圖片

在消息發(fā)送的過(guò)程中,涉及到了兩個(gè)線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個(gè)雙端隊(duì)列RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka服務(wù)端 Broker

可以適當(dāng)調(diào)整以下四個(gè)生產(chǎn)者的參數(shù)來(lái)提高吞吐量:

參數(shù)

說(shuō)明

batch.size

提交一批數(shù)據(jù)到緩沖區(qū)的最大值,默認(rèn) 16k。適當(dāng)增加該值,可以提高吞吐量,但是如果該值設(shè)置太大,會(huì)導(dǎo)致數(shù)據(jù)傳輸延遲增加。

linger.ms

如果數(shù)據(jù)遲遲未達(dá)到 batch.size,sender 等待 linger.time之后就會(huì)發(fā)送數(shù)據(jù)。單位 ms,默認(rèn)值是 0ms,表示沒有延遲。生產(chǎn)環(huán)境建議該值大小為 5-100ms 之間。

buffer.memory

RecordAccumulator 緩沖區(qū)總大小,默認(rèn) 32m??梢赃m當(dāng)增加該值提高緩沖區(qū)的存儲(chǔ)能力

compression.type

生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認(rèn)是 none,也就是不壓縮。支持壓縮類型:none、gzip、snappy、lz4 和 zstd。

這些參數(shù)的應(yīng)用思想都很好理解,就好比我們現(xiàn)實(shí)生活中集散中心大巴車?yán)?,一次拉一個(gè),有人就走。這種方式效率低下,浪費(fèi)資源。所以一般都是車到了多等一下,等人數(shù)差不多才走,這就是參數(shù)batch.size和linger.ms的提醒,buffer.memory其實(shí)也好理解,就好比車送到目的地只能容納100個(gè)人,你使勁送過(guò)去,收不下,只能目的把這個(gè)100個(gè)人安頓好,才能接著送,所以適當(dāng)調(diào)大,可以增加吞吐量,至于壓縮compression.type就是讓一次可以拉更多的人,就好比讓小孩子和大人用一個(gè)座位。

3.如何保證消息不丟失?

消息丟失可能發(fā)生在生產(chǎn)者發(fā)送消息、broker保存消息、消費(fèi)者消費(fèi)消息等環(huán)節(jié)。

3.1 生產(chǎn)者丟失消息

生產(chǎn)者丟失消息是比較常見的場(chǎng)景,生產(chǎn)者發(fā)送消息到kafka,因?yàn)榫W(wǎng)絡(luò)抖動(dòng)最后發(fā)現(xiàn)kakfa沒保存,這鍋該誰(shuí)背?答案是生產(chǎn)者,因?yàn)?Kafka Producer 是異步發(fā)送消息的,也就是說(shuō)如果你調(diào)用的是 producer.send(msg) 這個(gè) API,那么它通常會(huì)立即返回,但此時(shí)你不能認(rèn)為消息發(fā)送已成功完成。解決辦法也很簡(jiǎn)單:**Producer 永遠(yuǎn)要使用帶有回調(diào)通知的發(fā)送 API,也就是說(shuō)不要使用 producer.send(msg),而要使用 producer.send(msg, callback)**,通過(guò)回調(diào)callback才能真正知道消息是否成功發(fā)送

設(shè)置重試 retries 。這里的 retries 同樣是 Producer 的參數(shù),對(duì)應(yīng)前面提到的 Producer 自動(dòng)重試。當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時(shí)抖動(dòng)時(shí),消息發(fā)送可能會(huì)失敗,此時(shí)配置了 retries > 0 的 Producer 能夠自動(dòng)重試消息發(fā)送,避免消息丟失。

3.2 broker丟失消息

設(shè)置 acks = all。acks 是 Producer 的一個(gè)參數(shù),代表了你對(duì)“已提交”消息的定義。如果設(shè)置成 all,表示生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader和ISR隊(duì)列里面的所有節(jié)點(diǎn)收到數(shù)據(jù)后才應(yīng)答。

參數(shù)

說(shuō)明

acks

0:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答。
1:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader 收到數(shù)據(jù)后應(yīng)答。
-1(all):生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader+和 isr 隊(duì)列里面的所有節(jié)點(diǎn)收到數(shù)據(jù)后應(yīng)答。默認(rèn)值是-1,-1 和all 是等價(jià)的。

設(shè)置 unclean.leader.election.enable = false。這是 Broker 端的參數(shù),它控制的是哪些 Broker 有資格競(jìng)選分區(qū)的 Leader。如果一個(gè) Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會(huì)造成消息的丟失。故一般都要將該參數(shù)設(shè)置成 false,即不允許這種情況的發(fā)生。

設(shè)置 replication.factor >= 3。這也是 Broker 端的參數(shù)。其實(shí)這里想表述的是,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機(jī)制就是冗余。

設(shè)置 min.insync.replicas > 1。這依然是 Broker 端參數(shù),控制的是消息至少要被寫入到多少個(gè)副本才算是“已提交”。設(shè)置成大于 1 可以提升消息持久性。在實(shí)際環(huán)境中千萬(wàn)不要使用默認(rèn)值 1。

確保 replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個(gè)副本掛機(jī),整個(gè)分區(qū)就無(wú)法正常工作了。我們不僅要改善消息的持久性,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成。推薦設(shè)置成 replication.factor = min.insync.replicas + 1

3.3 消費(fèi)者丟失消息

Consumer 程序有個(gè)“位移”的概念,表示的是這個(gè) Consumer 當(dāng)前消費(fèi)到的 Topic 分區(qū)的位置。如果我們一次消費(fèi)offset為0-9的10條消息,拉取到消息之后就自動(dòng)提交了位移,但是消費(fèi)到位移5的時(shí)候報(bào)錯(cuò)了,那么位移5-9的消息就被丟失了。

解決辦法也很簡(jiǎn)單就是確保消息消費(fèi)完成再提交。Consumer 端有個(gè)參數(shù) enable.auto.commit,最好把它設(shè)置成 false關(guān)閉自動(dòng)提交,并采用手動(dòng)提交位移的方式。如果啟用了自動(dòng)提交,Consumer 端還有個(gè)參數(shù)就派上用場(chǎng)了:auto.commit.interval.ms。它的默認(rèn)值是 5 秒,表明 Kafka 每 5 秒會(huì)為你自動(dòng)提交一次位移。

手動(dòng)提交位移是保證消費(fèi)者消息消息過(guò)程中不丟失消息的核心所在,手動(dòng)提交分為同步和異步,同步提交會(huì)使消費(fèi)者處于阻塞狀態(tài),直到遠(yuǎn)端的 Broker 返回提交結(jié)果。而異步提交它會(huì)立即返回,不會(huì)阻塞,因此不會(huì)影響 Consumer 應(yīng)用的 TPS。但是異步條件有一個(gè)缺點(diǎn)就是發(fā)生了異常我們無(wú)法立刻感知到并相應(yīng)邏輯處理,所以代碼里面的提交位移邏輯一般是:同步+異步

try {
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            process(records); // 處理消息
            kafkaConsumer.commitAsync(); // 使用異步提交規(guī)避阻塞
        }
    } catch (Exception e) {
        handle(e); // 處理異常
    } finally {
        try {
            kafkaConsumer.commitSync(); // 最后一次提交使用同步阻塞式提交
        } finally {
            kafkaConsumer.close();
        }
    }

這段代碼同時(shí)使用了 commitSync() 和 commitAsync()。對(duì)于常規(guī)性、階段性的手動(dòng)提交,我們調(diào)用 commitAsync() 避免程序阻塞,而在 Consumer 要關(guān)閉前,我們調(diào)用 commitSync() 方法執(zhí)行同步阻塞式的位移提交,以確保 Consumer 關(guān)閉前能夠保存正確的位移數(shù)據(jù)。將兩者結(jié)合后,我們既實(shí)現(xiàn)了異步無(wú)阻塞式的位移管理,也確保了 Consumer 位移的正確性,所以,如果你需要自行編寫代碼開發(fā)一套 Kafka Consumer 應(yīng)用,那么我推薦你使用上面的代碼范例來(lái)實(shí)現(xiàn)手動(dòng)的位移提交。

關(guān)于提交位移有一個(gè)可能發(fā)生的異常:CommitFailedException,顧名思義就是 Consumer 客戶端在提交位移時(shí)出現(xiàn)了錯(cuò)誤或異常,而且還是那種不可恢復(fù)的嚴(yán)重異常。這是因?yàn)樵诶〉较⑾M(fèi)完之后提交位移這期間,消費(fèi)者組發(fā)生了重平衡。關(guān)于什么是重平衡可以看后續(xù)總結(jié)講述。

4.如何保證消息不會(huì)重復(fù)消費(fèi)?

在生產(chǎn)者端可能由于開啟了重試機(jī)制導(dǎo)致同一條消息被發(fā)送了兩次,這時(shí)候可以讓生產(chǎn)者開啟冪等性配置參數(shù):enable.idempotence 默認(rèn)為true, 即開啟的。

消費(fèi)者端就是要保證實(shí)際消費(fèi)消息的位移和提交的位移一致,使用手工同步位移。當(dāng)然我們也可以在消費(fèi)消息的代碼邏輯保證消費(fèi)的冪等性:使用唯一索引或者分布式鎖都行

5.如何解決消息積壓?jiǎn)栴}

消息積壓會(huì)導(dǎo)致很多問(wèn)題,?如磁盤被打滿、?產(chǎn)端發(fā)消息導(dǎo)致kafka性能過(guò)慢,最后導(dǎo)致出現(xiàn)服務(wù)雪崩不可用,解決方案如下:

  • 如果是Kafka消費(fèi)能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時(shí)提升消費(fèi)組的消費(fèi)者數(shù)量,消費(fèi)者數(shù) = 分區(qū)數(shù)。因?yàn)橹黝}的一個(gè)分區(qū)只能被消費(fèi)者組中一個(gè)消費(fèi)者消費(fèi),假如我們消費(fèi)者組里有3個(gè)消費(fèi)者,但是主題就一個(gè)分區(qū),這就白白空著兩個(gè)消費(fèi)者無(wú)所事事。如果已經(jīng)是多個(gè)消費(fèi)者對(duì)應(yīng)多個(gè)分區(qū)了,還是消費(fèi)比較慢,就說(shuō)明是消息消息的代碼邏輯過(guò)重處理過(guò)慢,可以引入多線程異步操作,但這時(shí)候需要自己控制代碼邏輯來(lái)保證消費(fèi)的順序性,因?yàn)橐粋€(gè)分區(qū)內(nèi)的消息是有序的,被一個(gè)消費(fèi)者順序消費(fèi),但是當(dāng)消費(fèi)者開啟多線程處理之后就不能保證順序消費(fèi)了。
  • 如果是下游的數(shù)據(jù)處理不及時(shí):提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過(guò)少(拉取數(shù)據(jù)/處理時(shí)間 < 生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會(huì)造成數(shù)據(jù)積壓。比如說(shuō)可以從一次最多拉取500條,調(diào)整為一次最多拉取1000條。簡(jiǎn)單來(lái)說(shuō)就是在消費(fèi)能力跟得上的同時(shí),盡量保證消費(fèi)速度>生產(chǎn)速度,這樣就不會(huì)堆積了。

6.如何保證消息的有序性。

生產(chǎn)者:在發(fā)送時(shí)將ack不能設(shè)置0,關(guān)閉重試,使?同步發(fā)送,等到發(fā)送成功再發(fā)送下?條。確保消息是順序發(fā)送的。

消費(fèi)者:消息是發(fā)送到?個(gè)分區(qū)中,只能有?個(gè)消費(fèi)組的消費(fèi)者來(lái)接收消息。

因此,kafka的順序消費(fèi)會(huì)犧牲掉性能。

7.什么是重平衡?

Rebalance 就是讓一個(gè) Consumer Group 下所有的 Consumer 實(shí)例就如何消費(fèi)訂閱主題的所有分區(qū)達(dá)成共識(shí)的過(guò)程。在 Rebalance 過(guò)程中,所有 Consumer 實(shí)例共同參與,在協(xié)調(diào)者組件的幫助下,完成訂閱主題分區(qū)的分配。但是,在整個(gè)過(guò)程中,所有實(shí)例都不能消費(fèi)任何消息,因此它對(duì) Consumer 的 TPS 影響很大。重平衡觸發(fā)的場(chǎng)景:

  • 消費(fèi)者組訂閱的主題的分區(qū)數(shù)增加了,注意主題分區(qū)數(shù)只能增加,不能減少
  • 消費(fèi)者組訂閱的主題數(shù)有變化,可能變多了也可能變少了。
  • 消費(fèi)者組成員有變化,可能變多了也可能變少了。

前兩個(gè)訂閱的分區(qū)數(shù)增加還是主題數(shù)變化,都是一個(gè)主動(dòng)發(fā)起Rebalance,我們是能提前感知到的。Consumer 實(shí)例增加的情況很好理解,當(dāng)我們啟動(dòng)一個(gè)配置有相同 group.id 值的 Consumer 程序時(shí),實(shí)際上就向這個(gè) Group 添加了一個(gè)新的 Consumer 實(shí)例。此時(shí),Coordinator 會(huì)接納這個(gè)新實(shí)例,將其加入到組中,并重新分配分區(qū)。通常來(lái)說(shuō),增加 Consumer 實(shí)例的操作都是計(jì)劃內(nèi)的,可能是出于增加 TPS 或提高伸縮性的需要。但是對(duì)于Consumer 實(shí)例減少,大部分不是人為操作下線的,更多情況是Consumer 實(shí)例會(huì)被 Coordinator 錯(cuò)誤地認(rèn)為“已停止”從而被“踢出”Group。如果是這個(gè)原因?qū)е碌?Rebalance,這種情況就得引起我們重視了。

Coordinator 會(huì)在什么情況下認(rèn)為某個(gè) Consumer 實(shí)例已掛從而要退組呢?

當(dāng) Consumer Group 完成 Rebalance 之后,每個(gè) Consumer 實(shí)例都會(huì)定期地向 Coordinator 發(fā)送心跳請(qǐng)求,表明它還存活著。如果某個(gè) Consumer 實(shí)例不能及時(shí)地發(fā)送這些心跳請(qǐng)求,Coordinator 就會(huì)認(rèn)為該 Consumer 已經(jīng)“死”了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。Consumer 端有個(gè)參數(shù),叫 session.timeout.ms,就是被用來(lái)表征此事的。該參數(shù)的默認(rèn)值是 145秒,即如果 Coordinator 在 45 秒之內(nèi)沒有收到 Group 下某 Consumer 實(shí)例的心跳,它就會(huì)認(rèn)為這個(gè) Consumer 實(shí)例已經(jīng)掛了??梢赃@么說(shuō),session.timout.ms 決定了 Consumer 存活性的時(shí)間間隔。

Consumer 端還有一個(gè)參數(shù),用于控制 Consumer 實(shí)際消費(fèi)能力對(duì) Rebalance 的影響,即 max.poll.interval.ms 參數(shù)。它限定了 Consumer 端應(yīng)用程序兩次調(diào)用 poll 方法的最大時(shí)間間隔。它的默認(rèn)值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內(nèi)無(wú)法消費(fèi)完 poll 方法返回的消息,那么 Consumer 會(huì)主動(dòng)發(fā)起“離開組”的請(qǐng)求,Coordinator 也會(huì)開啟新一輪 Rebalance。

8.kafka作為消息隊(duì)列為什么發(fā)送和消費(fèi)消息這么快?

  • 消息分區(qū):不受單臺(tái)服務(wù)器的限制,可以不受限的處理更多的數(shù)據(jù)
  • 順序讀寫:磁盤順序讀寫,提升讀寫效率
  • 頁(yè)緩存:把磁盤中的數(shù)據(jù)緩存到內(nèi)存中,把對(duì)磁盤的訪問(wèn)變?yōu)閷?duì)內(nèi)存的訪問(wèn)
  • 零拷貝:減少上下文切換及數(shù)據(jù)拷貝
  • 消息壓縮:減少磁盤IO和網(wǎng)絡(luò)IO
  • 分批發(fā)送:將消息打包批量發(fā)送,減少網(wǎng)絡(luò)開銷
責(zé)任編輯:武曉燕 來(lái)源: Shepherd進(jìn)階筆記
相關(guān)推薦

2011-04-29 10:31:36

數(shù)據(jù)中心虛擬化

2016-09-30 15:55:54

DataMining大數(shù)據(jù)時(shí)代

2010-03-22 11:57:23

云計(jì)算

2009-05-08 09:23:52

網(wǎng)管故障病毒

2009-02-02 13:54:49

忠告成長(zhǎng)涉世之初

2010-06-13 10:50:05

職場(chǎng)辛酸教訓(xùn)

2020-05-08 15:06:58

數(shù)據(jù)科學(xué)模型深度學(xué)習(xí)

2022-01-04 08:21:50

經(jīng)驗(yàn)職場(chǎng)工作

2018-05-29 22:24:22

程序員開發(fā)學(xué)習(xí)

2017-01-05 16:29:00

2018-07-04 13:53:08

2019-11-13 10:16:14

大數(shù)據(jù)架構(gòu)數(shù)據(jù)科學(xué)

2015-07-02 10:50:55

SDN

2011-07-07 10:49:41

JavaScript

2019-05-06 13:42:13

大數(shù)據(jù)分布式架構(gòu)

2024-11-27 08:28:37

2013-10-23 13:33:29

微軟Surface庫(kù)克

2012-04-16 09:50:08

2015-11-10 09:40:55

IT實(shí)施計(jì)劃IT
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)