Kafka Consumer 消費(fèi)消息和 Rebalance 機(jī)制
Kafka Consumer
Kafka 有消費(fèi)組的概念,每個(gè)消費(fèi)者只能消費(fèi)所分配到的分區(qū)的消息,每一個(gè)分區(qū)只能被一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者所消費(fèi),所以同一個(gè)消費(fèi)組中消費(fèi)者的數(shù)量如果超過(guò)了分區(qū)的數(shù)量,將會(huì)出現(xiàn)有些消費(fèi)者分配不到消費(fèi)的分區(qū)。消費(fèi)組與消費(fèi)者關(guān)系如下圖所示:
consumer group
Kafka Consumer Client 消費(fèi)消息通常包含以下步驟:
- 配置客戶端,創(chuàng)建消費(fèi)者
- 訂閱主題
- 拉去消息并消費(fèi)
- 提交消費(fèi)位移
- 關(guān)閉消費(fèi)者實(shí)例
過(guò)程
因?yàn)?Kafka 的 Consumer 客戶端是線程不安全的,為了保證線程安全,并提升消費(fèi)性能,可以在 Consumer 端采用類似 Reactor 的線程模型來(lái)消費(fèi)數(shù)據(jù)。
消費(fèi)模型
Kafka consumer 參數(shù)
- bootstrap.servers:連接 broker 地址,host:port 格式。
- group.id:消費(fèi)者隸屬的消費(fèi)組。
- key.deserializer:與生產(chǎn)者的key.serializer對(duì)應(yīng),key 的反序列化方式。
- value.deserializer:與生產(chǎn)者的value.serializer對(duì)應(yīng),value 的反序列化方式。
- session.timeout.ms:coordinator 檢測(cè)失敗的時(shí)間。默認(rèn) 10s 該參數(shù)是 Consumer Group 主動(dòng)檢測(cè) (組內(nèi)成員 comsummer) 崩潰的時(shí)間間隔,類似于心跳過(guò)期時(shí)間。
- auto.offset.reset:該屬性指定了消費(fèi)者在讀取一個(gè)沒(méi)有偏移量后者偏移量無(wú)效(消費(fèi)者長(zhǎng)時(shí)間失效當(dāng)前的偏移量已經(jīng)過(guò)時(shí)并且被刪除了)的分區(qū)的情況下,應(yīng)該作何處理,默認(rèn)值是 latest,也就是從最新記錄讀取數(shù)據(jù)(消費(fèi)者啟動(dòng)之后生成的記錄),另一個(gè)值是 earliest,意思是在偏移量無(wú)效的情況下,消費(fèi)者從起始位置開始讀取數(shù)據(jù)。
- enable.auto.commit:否自動(dòng)提交位移,如果為false,則需要在程序中手動(dòng)提交位移。對(duì)于精確到一次的語(yǔ)義,最好手動(dòng)提交位移
- fetch.max.bytes:?jiǎn)未卫?shù)據(jù)的最大字節(jié)數(shù)量
- max.poll.records:?jiǎn)未?poll 調(diào)用返回的最大消息數(shù),如果處理邏輯很輕量,可以適當(dāng)提高該值。但是max.poll.records條數(shù)據(jù)需要在在 session.timeout.ms 這個(gè)時(shí)間內(nèi)處理完 。默認(rèn)值為 500
- request.timeout.ms:一次請(qǐng)求響應(yīng)的最長(zhǎng)等待時(shí)間。如果在超時(shí)時(shí)間內(nèi)未得到響應(yīng),kafka 要么重發(fā)這條消息,要么超過(guò)重試次數(shù)的情況下直接置為失敗。
Kafka Rebalance
rebalance 本質(zhì)上是一種協(xié)議,規(guī)定了一個(gè) consumer group 下的所有 consumer 如何達(dá)成一致來(lái)分配訂閱 topic 的每個(gè)分區(qū)。比如某個(gè) group 下有 20 個(gè) consumer,它訂閱了一個(gè)具有 100 個(gè)分區(qū)的 topic。正常情況下,Kafka 平均會(huì)為每個(gè) consumer 分配 5 個(gè)分區(qū)。這個(gè)分配的過(guò)程就叫 rebalance。
什么時(shí)候 rebalance?
這也是經(jīng)常被提及的一個(gè)問(wèn)題。rebalance 的觸發(fā)條件有三種:
- 組成員發(fā)生變更(新 consumer 加入組、已有 consumer 主動(dòng)離開組或已有 consumer 崩潰了——這兩者的區(qū)別后面會(huì)談到)
- 訂閱主題數(shù)發(fā)生變更
- 訂閱主題的分區(qū)數(shù)發(fā)生變更
如何進(jìn)行組內(nèi)分區(qū)分配?
Kafka 默認(rèn)提供了兩種分配策略:Range 和 Round-Robin。當(dāng)然 Kafka 采用了可插拔式的分配策略,你可以創(chuàng)建自己的分配器以實(shí)現(xiàn)不同的分配策略。
kafka 高頻面試題
- Kafka 有哪些命令行工具?你用過(guò)哪些?/bin目錄,管理 kafka 集群、管理 topic、生產(chǎn)和消費(fèi) kafka。
- Kafka Producer 的執(zhí)行過(guò)程?攔截器,序列化器,分區(qū)器和累加器。
- Kafka Producer 有哪些常見(jiàn)配置?broker 配置,ack 配置,網(wǎng)絡(luò)和發(fā)送參數(shù),壓縮參數(shù),ack 參數(shù)。
- 如何讓 Kafka 的消息有序?Kafka 在 Topic 級(jí)別本身是無(wú)序的,只有 partition 上才有序,所以為了保證處理順序,可以自定義分區(qū)器,將需順序處理的數(shù)據(jù)發(fā)送到同一個(gè) partition。
- Producer 如何保證數(shù)據(jù)發(fā)送不丟失?ack 機(jī)制,重試機(jī)制。
- 如何提升 Producer 的性能?批量,異步,壓縮。
- 如果同一 group 下 consumer 的數(shù)量大于 part 的數(shù)量,kafka 如何處理?多余的 Part 將處于無(wú)用狀態(tài),不消費(fèi)數(shù)據(jù)。
- Kafka Consumer 是否是線程安全的?不安全,單線程消費(fèi),多線程處理。
- 講一下你使用 Kafka Consumer 消費(fèi)消息時(shí)的線程模型,為何如此設(shè)計(jì)?拉取和處理分離。
- Kafka Consumer 的常見(jiàn)配置?broker, 網(wǎng)絡(luò)和拉取參數(shù),心跳參數(shù)。
- Consumer 什么時(shí)候會(huì)被踢出集群?奔潰,網(wǎng)絡(luò)異常,處理時(shí)間過(guò)長(zhǎng)提交位移超時(shí)。
- 當(dāng)有 Consumer 加入或退出時(shí),Kafka 會(huì)作何反應(yīng)?進(jìn)行 Rebalance。
- 什么是 Rebalance,何時(shí)會(huì)發(fā)生 Rebalance?topic 變化,consumer 變化。