Kafka大廠高頻面試題:在保證高性能、高吞吐的同時(shí)保證高可用性
Kafka的消息傳輸保障機(jī)制非常直觀。當(dāng)producer向broker發(fā)送消息時(shí),一旦這條消息被commit,由于副本機(jī)制(replication)的存在,它就不會(huì)丟失。但是如果producer發(fā)送數(shù)據(jù)給broker后,遇到的網(wǎng)絡(luò)問題而造成通信中斷,那producer就無法判斷該條消息是否已經(jīng)提交(commit)。雖然Kafka無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但是producer可以retry多次,確保消息已經(jīng)正確傳輸?shù)絙roker中,所以目前Kafka實(shí)現(xiàn)的是at least once。
一、冪等性
1.場景
所謂冪等性,就是對接口的多次調(diào)用所產(chǎn)生的結(jié)果和調(diào)用一次是一致的。生產(chǎn)者在進(jìn)行重試的時(shí)候有可能會(huì)重復(fù)寫入消息,而使用Kafka的冪等性功能就可以避免這種情況。
冪等性是有條件的:
只能保證 Producer 在單個(gè)會(huì)話內(nèi)不丟不重,如果 Producer 出現(xiàn)意外掛掉再重啟是無法保證的(冪等性情況下,是無法獲取之前的狀態(tài)信息,因此是無法做到跨會(huì)話級(jí)別的不丟不重)。
冪等性不能跨多個(gè) Topic-Partition,只能保證單個(gè) partition 內(nèi)的冪等性,當(dāng)涉及多個(gè)Topic-Partition 時(shí),這中間的狀態(tài)并沒有同步。
Producer 使用冪等性的示例非常簡單,與正常情況下 Producer 使用相比變化不大,只需要把Producer 的配置 enable.idempotence 設(shè)置為 true 即可,如下所示:
- Properties props = new Properties();
- props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
- props.put("acks", "all"); // 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all
- props.put("bootstrap.servers", "localhost:9092");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- KafkaProducer producer = new KafkaProducer(props);
- producer.send(new ProducerRecord(topic, "test");
二、事務(wù)
1.場景
冪等性并不能跨多個(gè)分區(qū)運(yùn)作,而事務(wù)可以彌補(bǔ)這個(gè)缺憾,事務(wù)可以保證對多個(gè)分區(qū)寫入操作的原子性。操作的原子性是指多個(gè)操作要么全部成功,要么全部失敗,不存在部分成功部分失敗的可能。
為了實(shí)現(xiàn)事務(wù),網(wǎng)絡(luò)故障必須提供唯一的transactionalId,這個(gè)參數(shù)通過客戶端程序來進(jìn)行設(shè)定。
見代碼庫:
com.heima.kafka.chapter7.ProducerTransactionSend
- properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
2.前期準(zhǔn)備
事務(wù)要求生產(chǎn)者開啟冪等性特性,因此通過將transactional.id參數(shù)設(shè)置為非空從而開啟事務(wù)特性的同時(shí)需要將ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG設(shè)置為true(默認(rèn)值為true),如果顯示設(shè)置為false,則會(huì)拋出異常。
KafkaProducer提供了5個(gè)與事務(wù)相關(guān)的方法,詳細(xì)如下:
- //初始化事務(wù),前提是配置了transactionalId
- public void initTransactions()
- //開啟事務(wù)
- public void beginTransaction()
- //為消費(fèi)者提供事務(wù)內(nèi)的位移提交操作
- public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
- //提交事務(wù)
- public void commitTransaction()
- //終止事務(wù),類似于回滾
- public void abortTransaction()
3.案例解析
見代碼庫:
- com.heima.kafka.chapter7.ProducerTransactionSend
消息發(fā)送端
- /**
- * Kafka Producer事務(wù)的使用
- */
- public class ProducerTransactionSend {
- public static final String topic = "topic-transaction";
- public static final String brokerList = "localhost:9092";
- public static final String transactionId = "transactionId";
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
- KafkaProducer<String, String> producer = new KafkaProducer<> (properties);
- producer.initTransactions();
- producer.beginTransaction();
- try {
- //處理業(yè)務(wù)邏輯并創(chuàng)建ProducerRecord
- ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");
- producer.send(record1);
- ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");
- producer.send(record2);
- ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");
- producer.send(record3);
- //處理一些其它邏輯
- producer.commitTransaction();
- } catch (ProducerFencedException e) {
- producer.abortTransaction();
- }
- producer.close();
- }
- }
模擬事務(wù)回滾案例
- try {
- //處理業(yè)務(wù)邏輯并創(chuàng)建ProducerRecord
- ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");
- producer.send(record1);
- //模擬事務(wù)回滾案例
- System.out.println(1/0);
- ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");
- producer.send(record2);
- ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");
- producer.send(record3);
- //處理一些其它邏輯
- producer.commitTransaction();
- } catch (ProducerFencedException e) {
- producer.abortTransaction();
- }
從上面案例中,msg1發(fā)送成功之后,出現(xiàn)了異常事務(wù)進(jìn)行了回滾,則msg1消費(fèi)端也收不到消息。
三、控制器
在Kafka集群中會(huì)有一個(gè)或者多個(gè)broker,其中有一個(gè)broker會(huì)被選舉為控制器(Kafka Controller),它負(fù)責(zé)管理整個(gè)集群中所有分區(qū)和副本的狀態(tài)。當(dāng)某個(gè)分區(qū)的leader副本出現(xiàn)故障時(shí),由控制器負(fù)責(zé)為該分區(qū)選舉新的leader副本。當(dāng)檢測到某個(gè)分區(qū)的ISR集合發(fā)生變化時(shí),由控制器負(fù)責(zé)通知所有broker更新其元數(shù)據(jù)信息。當(dāng)使用kafka-topics.sh腳本為某個(gè)topic增加分區(qū)數(shù)量時(shí),同樣還是由控制器負(fù)責(zé)分區(qū)的重新分配。
Kafka中的控制器選舉的工作依賴于Zookeeper,成功競選為控制器的broker會(huì)在Zookeeper中創(chuàng)建/controller這個(gè)臨時(shí)(EPHEMERAL)節(jié)點(diǎn),此臨時(shí)節(jié)點(diǎn)的內(nèi)容參考如下:
1.ZooInspector管理
使用zookeeper圖形化的客戶端工具(ZooInspector)提供的jar來進(jìn)行管理,啟動(dòng)如下:
- 定位到j(luò)ar所在目錄
- 運(yùn)行jar文件 java -jar zookeeper-dev-ZooInspector.jar
- 連接Zookeeper
- {"version":1,"brokerid":0,"timestamp":"1529210278988"}
其中version在目前版本中固定為1,brokerid表示稱為控制器的broker的id編號(hào),timestamp表示競選稱為控制器時(shí)的時(shí)間戳。
在任意時(shí)刻,集群中有且僅有一個(gè)控制器。每個(gè)broker啟動(dòng)的時(shí)候會(huì)去嘗試去讀取/controller節(jié)點(diǎn)的brokerid的值,如果讀取到brokerid的值不為-1,則表示已經(jīng)有其它broker節(jié)點(diǎn)成功競選為控制器,所以當(dāng)前broker就會(huì)放棄競選;如果Zookeeper中不存在/controller這個(gè)節(jié)點(diǎn),或者這個(gè)節(jié)點(diǎn)中的數(shù)據(jù)異常,那么就會(huì)嘗試去創(chuàng)建/controller這個(gè)節(jié)點(diǎn),當(dāng)前broker去創(chuàng)建節(jié)點(diǎn)的時(shí)候,也有可能其他broker同時(shí)去嘗試創(chuàng)建這個(gè)節(jié)點(diǎn),只有創(chuàng)建成功的那個(gè)broker才會(huì)成為控制器,而創(chuàng)建失敗的broker則表示競選失敗。每個(gè)broker都會(huì)在內(nèi)存中保存當(dāng)前控制器的brokerid值,這個(gè)值可以標(biāo)識(shí)為activeControllerId。
Zookeeper中還有一個(gè)與控制器有關(guān)的/controller_epoch節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)是持久(PERSISTENT)節(jié)點(diǎn),節(jié)點(diǎn)中存放的是一個(gè)整型的controller_epoch值。controller_epoch用于記錄控制器發(fā)生變更的次數(shù),即記錄當(dāng)前的控制器是第幾代控制器,我們也可以稱之為“控制器的紀(jì)元”。
controller_epoch的初始值為1,即集群中第一個(gè)控制器的紀(jì)元為1,當(dāng)控制器發(fā)生變更時(shí),沒選出一個(gè)新的控制器就將該字段值加1。每個(gè)和控制器交互的請求都會(huì)攜帶上controller_epoch這個(gè)字段,如果請求的controller_epoch值小于內(nèi)存中的controller_epoch值,則認(rèn)為這個(gè)請求是向已經(jīng)過期的控制器所發(fā)送的請求,那么這個(gè)請求會(huì)被認(rèn)定為無效的請求。如果請求的controller_epoch值大于內(nèi)存中的controller_epoch值,那么則說明已經(jīng)有新的控制器當(dāng)選了。由此可見,Kafka通過controller_epoch來保證控制器的唯一性,進(jìn)而保證相關(guān)操作的一致性。
具備控制器身份的broker需要比其他普通的broker多一份職責(zé),具體細(xì)節(jié)如下:
- 監(jiān)聽partition相關(guān)的變化。
- 監(jiān)聽topic相關(guān)的變化。
- 監(jiān)聽broker相關(guān)的變化。
- 從Zookeeper中讀取獲取當(dāng)前所有與topic、partition以及broker有關(guān)的信息并進(jìn)行相應(yīng)的管理。
四、可靠性保證
- 可靠性保證:確保系統(tǒng)在各種不同的環(huán)境下能夠發(fā)生一致的行為
- Kafka的保證
- 保證分區(qū)消息的順序如果使用同一個(gè)生產(chǎn)者往同一個(gè)分區(qū)寫入消息,而且消息B在消息A之后寫入那么Kafka可以保證消息B的偏移量比消息A的偏移量大,而且消費(fèi)者會(huì)先讀取消息A再讀取消息B
- 只有當(dāng)消息被寫入分區(qū)的所有同步副本時(shí)(文件系統(tǒng)緩存),它才被認(rèn)為是已提交
- 生產(chǎn)者可以選擇接收不同類型的確認(rèn),控制參數(shù) acks
- 只要還有一個(gè)副本是活躍的,那么已提交的消息就不會(huì)丟失
- 消費(fèi)者只能讀取已經(jīng)提交的消息
1. 失效副本
怎么樣判定一個(gè)分區(qū)是否有副本是處于同步失效狀態(tài)的呢?從Kafka 0.9.x版本開始通過唯一的一個(gè)參數(shù)replica.lag.time.max.ms(默認(rèn)大小為10,000)來控制,當(dāng)ISR中的一個(gè)follower副本滯后leader副本的時(shí)間超過參數(shù)replica.lag.time.max.ms指定的值時(shí)即判定為副本失效,需要將此follower副本剔出除ISR之外。具體實(shí)現(xiàn)原理很簡單,當(dāng)follower副本將leader副本的LEO(Log End Offset,每個(gè)分區(qū)最后一條消息的位置)之前的日志全部同步時(shí),則認(rèn)為該follower副本已經(jīng)追趕上leader副本,此時(shí)更新該副本的lastCaughtUpTimeMs標(biāo)識(shí)。Kafka的副本管理器(ReplicaManager)啟動(dòng)時(shí)會(huì)啟動(dòng)一個(gè)副本過期檢測的定時(shí)任務(wù),而這個(gè)定時(shí)任務(wù)會(huì)定時(shí)檢查當(dāng)前時(shí)間與副本的lastCaughtUpTimeMs差值是否大于參數(shù)replica.lag.time.max.ms指定的值。千萬不要錯(cuò)誤地認(rèn)為follower副本只要拉取leader副本的數(shù)據(jù)就會(huì)更新lastCaughtUpTimeMs,試想當(dāng)leader副本的消息流入速度大于follower副本的拉取速度時(shí),follower副本一直不斷的拉取leader副本的消息也不能與leader副本同步,如果還將此follower副本置于ISR中,那么當(dāng)leader副本失效,而選取此follower副本為新的leader副本,那么就會(huì)有嚴(yán)重的消息丟失。
2.副本復(fù)制
Kafka 中的每個(gè)主題分區(qū)都被復(fù)制了 n 次,其中的 n 是主題的復(fù)制因子(replication factor)。這允許Kafka 在集群服務(wù)器發(fā)生故障時(shí)自動(dòng)切換到這些副本,以便在出現(xiàn)故障時(shí)消息仍然可用。Kafka 的復(fù)制是以分區(qū)為粒度的,分區(qū)的預(yù)寫日志被復(fù)制到 n 個(gè)服務(wù)器。 在 n 個(gè)副本中,一個(gè)副本作為 leader,其他副本成為 followers。顧名思義,producer 只能往 leader 分區(qū)上寫數(shù)據(jù)(讀也只能從 leader 分區(qū)上進(jìn)行),followers 只按順序從 leader 上復(fù)制日志。
一個(gè)副本可以不同步Leader有如下幾個(gè)原因 慢副本:在一定周期時(shí)間內(nèi)follower不能追趕上leader。最常見的原因之一是I / O瓶頸導(dǎo)致follower追加復(fù)制消息速度慢于從leader拉取速度。 卡住副本:在一定周期時(shí)間內(nèi)follower停止從leader拉取請求。follower replica卡住了是由于GC暫?;騠ollower失效或死亡。
新啟動(dòng)副本:當(dāng)用戶給主題增加副本因子時(shí),新的follower不在同步副本列表中,直到他們完全趕上了leader日志。
如何確定副本是滯后的:
- replica.lag.max.messages=4
在服務(wù)端現(xiàn)在只有一個(gè)參數(shù)需要配置replica.lag.time.max.ms。這個(gè)參數(shù)解釋replicas響應(yīng)partition leader的最長等待時(shí)間。檢測卡住或失敗副本的探測——如果一個(gè)replica失敗導(dǎo)致發(fā)送拉取請求時(shí)間間隔超過replica.lag.time.max.ms。Kafka會(huì)認(rèn)為此replica已經(jīng)死亡會(huì)從同步副本列表從移除。檢測慢副本機(jī)制發(fā)生了變化——如果一個(gè)replica開始落后leader超過replica.lag.time.max.ms。Kafka會(huì)認(rèn)為太緩慢并且會(huì)從同步副本列表中移除。除非replica請求leader時(shí)間間隔大于replica.lag.time.max.ms,因此即使leader使流量激增和大批量寫消息。Kafka也不會(huì)從同步副本列表從移除該副本。
Leader Epoch引用
數(shù)據(jù)丟失場景
數(shù)據(jù)出現(xiàn)不一致場景
Kafka 0.11.0.0.版本解決方案
造成上述兩個(gè)問題的根本原因在于HW值被用于衡量副本備份的成功與否以及在出現(xiàn)failture時(shí)作為日志截?cái)嗟囊罁?jù),但HW值得更新是異步延遲的,特別是需要額外的FETCH請求處理流程才能更新,故這中間發(fā)生的任何崩潰都可能導(dǎo)致HW值的過期。鑒于這些原因,Kafka 0.11引入了leader epoch來取代HW值。Leader端多開辟一段內(nèi)存區(qū)域?qū)iT保存leader的epoch信息,這樣即使出現(xiàn)上面的兩個(gè)場景也能很好地規(guī)避這些問題。
所謂leader epoch實(shí)際上是一對值:(epoch,offset)。epoch表示leader的版本號(hào),從0開始,當(dāng)leader變更過1次時(shí)epoch就會(huì)+1,而offset則對應(yīng)于該epoch版本的leader寫入第一條消息的位移。因此假設(shè)有兩對值:
- (0, 0)
- (1, 120)
則表示第一個(gè)leader從位移0開始寫入消息;共寫了120條[0, 119];而第二個(gè)leader版本號(hào)是1,從位移120處開始寫入消息。
leader broker中會(huì)保存這樣的一個(gè)緩存,并定期地寫入到一個(gè)checkpoint文件中。
避免數(shù)據(jù)丟失:
避免數(shù)據(jù)不一致
六、消息重復(fù)的場景及解決方案
1.生產(chǎn)者端重復(fù)
生產(chǎn)發(fā)送的消息沒有收到正確的broke響應(yīng),導(dǎo)致producer重試。
producer發(fā)出一條消息,broke落盤以后因?yàn)榫W(wǎng)絡(luò)等種種原因發(fā)送端得到一個(gè)發(fā)送失敗的響應(yīng)或者網(wǎng)絡(luò)中斷,然后producer收到一個(gè)可恢復(fù)的Exception重試消息導(dǎo)致消息重復(fù)。
解決方案:
- 啟動(dòng)kafka的冪等性
要啟動(dòng)kafka的冪等性,無需修改代碼,默認(rèn)為關(guān)閉,需要修改配置文件:enable.idempotence=true 同時(shí)要求 ack=all 且 retries>1。
- ack=0,不重試。
可能會(huì)丟消息,適用于吞吐量指標(biāo)重要性高于數(shù)據(jù)丟失,例如:日志收集。
消費(fèi)者端重復(fù)
根本原因
數(shù)據(jù)消費(fèi)完沒有及時(shí)提交offset到broker。
解決方案
取消自動(dòng)自動(dòng)提交
每次消費(fèi)完或者程序退出時(shí)手動(dòng)提交。這可能也沒法保證一條重復(fù)。
下游做冪等
一般的解決方案是讓下游做冪等或者盡量每消費(fèi)一條消息都記錄offset,對于少數(shù)嚴(yán)格的場景可能需要把offset或唯一ID,例如訂單ID和下游狀態(tài)更新放在同一個(gè)數(shù)據(jù)庫里面做事務(wù)來保證精確的一次更新或者在下游數(shù)據(jù)表里面同時(shí)記錄消費(fèi)offset,然后更新下游數(shù)據(jù)的時(shí)候用消費(fèi)位點(diǎn)做樂觀鎖拒絕掉舊位點(diǎn)的數(shù)據(jù)更新。
七、__consumer_offsets
_consumer_offsets是一個(gè)內(nèi)部topic,對用戶而言是透明的,除了它的數(shù)據(jù)文件以及偶爾在日志中出現(xiàn)這兩點(diǎn)之外,用戶一般是感覺不到這個(gè)topic的。不過我們的確知道它保存的是Kafka新版本consumer的位移信息。
1.何時(shí)創(chuàng)建
一般情況下,當(dāng)集群中第一有消費(fèi)者消費(fèi)消息時(shí)會(huì)自動(dòng)創(chuàng)建主題__consumer_offsets,分區(qū)數(shù)可以通過offsets.topic.num.partitions參數(shù)設(shè)定,默認(rèn)值為50,如下:
2.解析分區(qū)
見代碼庫:
- com.heima.kafka.chapter7.ConsumerOffsetsAnalysis
獲取所有分區(qū):
總結(jié)
本章主要講解了Kafka相關(guān)穩(wěn)定性的操作,包括冪等性、事務(wù)的處理,同時(shí)對可靠性保證與一致性保證做了講解,講解了消息重復(fù)以及解決方案。