帶你漲姿勢的認識一下Kafka Consumer
之前我們介紹過了 Kafka 整體架構(gòu),Kafka 生產(chǎn)者,Kafka 生產(chǎn)的消息最終流向哪里呢?當(dāng)然是需要消費了,要不只產(chǎn)生一系列數(shù)據(jù)沒有任何作用啊,如果把 Kafka 比作餐廳的話,那么生產(chǎn)者就是廚師的角色,消費者就是客人,只有廚師的話,那么炒出來的菜沒有人吃也沒有意義,如果只有客人沒有廚師的話,誰會去這個店吃飯呢?!所以如果你看完前面的文章意猶未盡的話,可以繼續(xù)讓你爽一爽。如果你沒看過前面的文章,那就從現(xiàn)在開始讓你爽。
Kafka 消費者概念
應(yīng)用程序使用 KafkaConsumer 從 Kafka 中訂閱主題并接收來自這些主題的消息,然后再把他們保存起來。應(yīng)用程序首先需要創(chuàng)建一個 KafkaConsumer 對象,訂閱主題并開始接受消息,驗證消息并保存結(jié)果。一段時間后,生產(chǎn)者往主題寫入的速度超過了應(yīng)用程序驗證數(shù)據(jù)的速度,這時候該如何處理?如果只使用單個消費者的話,應(yīng)用程序會跟不上消息生成的速度,就像多個生產(chǎn)者像相同的主題寫入消息一樣,這時候就需要多個消費者共同參與消費主題中的消息,對消息進行分流處理。
Kafka 消費者從屬于消費者群組。一個群組中的消費者訂閱的都是相同的主題,每個消費者接收主題一部分分區(qū)的消息。下面是一個 Kafka 分區(qū)消費示意圖
上圖中的主題 T1 有四個分區(qū),分別是分區(qū)0、分區(qū)1、分區(qū)2、分區(qū)3,我們創(chuàng)建一個消費者群組1,消費者群組中只有一個消費者,它訂閱主題T1,接收到 T1 中的全部消息。由于一個消費者處理四個生產(chǎn)者發(fā)送到分區(qū)的消息,壓力有些大,需要幫手來幫忙分擔(dān)任務(wù),于是就演變?yōu)橄聢D
這樣一來,消費者的消費能力就大大提高了,但是在某些環(huán)境下比如用戶產(chǎn)生消息特別多的時候,生產(chǎn)者產(chǎn)生的消息仍舊讓消費者吃不消,那就繼續(xù)增加消費者。
如上圖所示,每個分區(qū)所產(chǎn)生的消息能夠被每個消費者群組中的消費者消費,如果向消費者群組中增加更多的消費者,那么多余的消費者將會閑置,如下圖所示
向群組中增加消費者是橫向伸縮消費能力的主要方式??偠灾?,我們可以通過增加消費組的消費者來進行水平擴展提升消費能力。這也是為什么建議創(chuàng)建主題時使用比較多的分區(qū)數(shù),這樣可以在消費負載高的情況下增加消費者來提升性能。另外,消費者的數(shù)量不應(yīng)該比分區(qū)數(shù)多,因為多出來的消費者是空閑的,沒有任何幫助。
Kafka 一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應(yīng)用讀取這個消息。換句話說,每個應(yīng)用都可以讀到全量的消息。為了使得每個應(yīng)用都能讀到全量消息,應(yīng)用需要有不同的消費組。對于上面的例子,假如我們新增了一個新的消費組 G2,而這個消費組有兩個消費者,那么就演變?yōu)橄聢D這樣
在這個場景中,消費組 G1 和消費組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬于不同的應(yīng)用。
總結(jié)起來就是如果應(yīng)用需要讀取全量消息,那么請為該應(yīng)用設(shè)置一個消費組;如果該應(yīng)用消費能力不足,那么可以考慮在這個消費組里增加消費者。
消費者組和分區(qū)重平衡
消費者組是什么
消費者組(Consumer Group)是由一個或多個消費者實例(Consumer Instance)組成的群組,具有可擴展性和可容錯性的一種機制。消費者組內(nèi)的消費者共享一個消費者組ID,這個ID 也叫做 Group ID,組內(nèi)的消費者共同對一個主題進行訂閱和消費,同一個組中的消費者只能消費一個分區(qū)的消息,多余的消費者會閑置,派不上用場。
我們在上面提到了兩種消費方式
- 一個消費者群組消費一個主題中的消息,這種消費模式又稱為點對點的消費方式,點對點的消費方式又被稱為消息隊列
- 一個主題中的消息被多個消費者群組共同消費,這種消費模式又稱為發(fā)布-訂閱模式
消費者重平衡
我們從上面的消費者演變圖中可以知道這么一個過程:最初是一個消費者訂閱一個主題并消費其全部分區(qū)的消息,后來有一個消費者加入群組,隨后又有更多的消費者加入群組,而新加入的消費者實例分攤了最初消費者的部分消息,這種把分區(qū)的所有權(quán)通過一個消費者轉(zhuǎn)到其他消費者的行為稱為重平衡,英文名也叫做 Rebalance 。如下圖所示
重平衡非常重要,它為消費者群組帶來了高可用性 和 伸縮性,我們可以放心的添加消費者或移除消費者,不過在正常情況下我們并不希望發(fā)生這樣的行為。在重平衡期間,消費者無法讀取消息,造成整個消費者組在重平衡的期間都不可用。另外,當(dāng)分區(qū)被重新分配給另一個消費者時,消息當(dāng)前的讀取狀態(tài)會丟失,它有可能還需要去刷新緩存,在它重新恢復(fù)狀態(tài)之前會拖慢應(yīng)用程序。
消費者通過向組織協(xié)調(diào)者(Kafka Broker)發(fā)送心跳來維護自己是消費者組的一員并確認其擁有的分區(qū)。對于不同不的消費群體來說,其組織協(xié)調(diào)者可以是不同的。只要消費者定期發(fā)送心跳,就會認為消費者是存活的并處理其分區(qū)中的消息。當(dāng)消費者檢索記錄或者提交它所消費的記錄時就會發(fā)送心跳。
如果過了一段時間 Kafka 停止發(fā)送心跳了,會話(Session)就會過期,組織協(xié)調(diào)者就會認為這個 Consumer 已經(jīng)死亡,就會觸發(fā)一次重平衡。如果消費者宕機并且停止發(fā)送消息,組織協(xié)調(diào)者會等待幾秒鐘,確認它死亡了才會觸發(fā)重平衡。在這段時間里,死亡的消費者將不處理任何消息。在清理消費者時,消費者將通知協(xié)調(diào)者它要離開群組,組織協(xié)調(diào)者會觸發(fā)一次重平衡,盡量降低處理停頓。
重平衡是一把雙刃劍,它為消費者群組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(bug),而這些 bug 到現(xiàn)在社區(qū)還無法修改。
重平衡的過程對消費者組有極大的影響。因為每次重平衡過程中都會導(dǎo)致萬物靜止,參考 JVM 中的垃圾回收機制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機》中 p76 關(guān)于 Serial 收集器的描述):
更重要的是它在進行垃圾收集時,必須暫停其他所有的工作線程。直到它收集結(jié)束。Stop The World 這個名字聽起來很帥,但這項工作實際上是由虛擬機在后臺自動發(fā)起并完成的,在用戶不可見的情況下把用戶正常工作的線程全部停掉,這對很多應(yīng)用來說都是難以接受的。
也就是說,在重平衡期間,消費者組中的消費者實例都會停止消費,等待重平衡的完成。而且重平衡這個過程很慢......
創(chuàng)建消費者
上面的理論說的有點多,下面就通過代碼來講解一下消費者是如何消費的
在讀取消息之前,需要先創(chuàng)建一個 KafkaConsumer 對象。創(chuàng)建 KafkaConsumer 對象與創(chuàng)建 KafkaProducer 對象十分相似 --- 把需要傳遞給消費者的屬性放在 properties 對象中,后面我們會著重討論 Kafka 的一些配置,這里我們先簡單的創(chuàng)建一下,使用3個屬性就足矣,分別是 bootstrap.server,key.deserializer,value.deserializer 。
這三個屬性我們已經(jīng)用過很多次了,如果你還不是很清楚的話,可以參考 帶你漲姿勢是認識一下Kafka Producer
還有一個屬性是 group.id 這個屬性不是必須的,它指定了 KafkaConsumer 是屬于哪個消費者群組。創(chuàng)建不屬于任何一個群組的消費者也是可以的
- Properties properties = new Properties();
- properties.put("bootstrap.server","192.168.1.9:9092"); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
- KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
主題訂閱
創(chuàng)建好消費者之后,下一步就開始訂閱主題了。subscribe() 方法接受一個主題列表作為參數(shù),使用起來比較簡單
- consumer.subscribe(Collections.singletonList("customerTopic"));
為了簡單我們只訂閱了一個主題 customerTopic,參數(shù)傳入的是一個正則表達式,正則表達式可以匹配多個主題,如果有人創(chuàng)建了新的主題,并且主題的名字與正則表達式相匹配,那么會立即觸發(fā)一次重平衡,消費者就可以讀取新的主題。
要訂閱所有與 test 相關(guān)的主題,可以這樣做
- consumer.subscribe("test.*");
輪詢
我們知道,Kafka 是支持訂閱/發(fā)布模式的,生產(chǎn)者發(fā)送數(shù)據(jù)給 Kafka Broker,那么消費者是如何知道生產(chǎn)者發(fā)送了數(shù)據(jù)呢?其實生產(chǎn)者產(chǎn)生的數(shù)據(jù)消費者是不知道的,KafkaConsumer 采用輪詢的方式定期去 Kafka Broker 中進行數(shù)據(jù)的檢索,如果有數(shù)據(jù)就用來消費,如果沒有就再繼續(xù)輪詢等待,下面是輪詢等待的具體實現(xiàn)
- try {
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
- for (ConsumerRecord<String, String> record : records) {
- int updateCount = 1;
- if (map.containsKey(record.value())) {
- updateCount = (int) map.get(record.value() + 1);
- }
- map.put(record.value(), updateCount);
- }
- }
- }finally {
- consumer.close();
- }
- 這是一個無限循環(huán)。消費者實際上是一個長期運行的應(yīng)用程序,它通過輪詢的方式向 Kafka 請求數(shù)據(jù)。
- 第三行代碼非常重要,Kafka 必須定期循環(huán)請求數(shù)據(jù),否則就會認為該 Consumer 已經(jīng)掛了,會觸發(fā)重平衡,它的分區(qū)會移交給群組中的其它消費者。傳給 poll() 方法的是一個超市時間,用 java.time.Duration 類來表示,如果該參數(shù)被設(shè)置為 0 ,poll() 方法會立刻返回,否則就會在指定的毫秒數(shù)內(nèi)一直等待 broker 返回數(shù)據(jù)。
- poll() 方法會返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息,記錄所在分區(qū)的信息、記錄在分區(qū)中的偏移量,以及記錄的鍵值對。我們一般會遍歷這個列表,逐條處理每條記錄。
- 在退出應(yīng)用程序之前使用 close() 方法關(guān)閉消費者。網(wǎng)絡(luò)連接和 socket 也會隨之關(guān)閉,并立即觸發(fā)一次重平衡,而不是等待群組協(xié)調(diào)器發(fā)現(xiàn)它不再發(fā)送心跳并認定它已經(jīng)死亡。
線程安全性
在同一個群組中,我們無法讓一個線程運行多個消費者,也無法讓多個線程安全的共享一個消費者。按照規(guī)則,一個消費者使用一個線程,如果一個消費者群組中多個消費者都想要運行的話,那么必須讓每個消費者在自己的線程中運行,可以使用 Java 中的 ExecutorService 啟動多個消費者進行進行處理。
消費者配置
到目前為止,我們學(xué)習(xí)了如何使用消費者 API,不過只介紹了幾個最基本的屬性,Kafka 文檔列出了所有與消費者相關(guān)的配置說明。大部分參數(shù)都有合理的默認值,一般不需要修改它們,下面我們就來介紹一下這些參數(shù)。
- fetch.min.bytes
該屬性指定了消費者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費者的數(shù)據(jù)請求時,如果可用的數(shù)據(jù)量小于 fetch.min.bytes 指定的大小,那么它會等到有足夠的可用數(shù)據(jù)時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題使用頻率不是很高的時候就不用來回處理消息。如果沒有很多可用數(shù)據(jù),但消費者的 CPU 使用率很高,那么就需要把該屬性的值設(shè)得比默認值大。如果消費者的數(shù)量比較多,把該屬性的值調(diào)大可以降低 broker 的工作負載。
- fetch.max.wait.ms
我們通過上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的數(shù)據(jù)時才會把它返回給消費者。而 fetch.max.wait.ms 則用于指定 broker 的等待時間,默認是 500 毫秒。如果沒有足夠的數(shù)據(jù)流入 kafka 的話,消費者獲取的最小數(shù)據(jù)量要求就得不到滿足,最終導(dǎo)致 500 毫秒的延遲。如果要降低潛在的延遲,就可以把參數(shù)值設(shè)置的小一些。如果 fetch.max.wait.ms 被設(shè)置為 100 毫秒的延遲,而 fetch.min.bytes 的值設(shè)置為 1MB,那么 Kafka 在收到消費者請求后,要么返回 1MB 的數(shù)據(jù),要么在 100 ms 后返回所有可用的數(shù)據(jù)。就看哪個條件首先被滿足。
- max.partition.fetch.bytes
該屬性指定了服務(wù)器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù)。它的默認值時 1MB,也就是說,KafkaConsumer.poll() 方法從每個分區(qū)里返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節(jié)。如果一個主題有20個分區(qū)和5個消費者,那么每個消費者需要至少4 MB的可用內(nèi)存來接收記錄。在為消費者分配內(nèi)存時,可以給它們多分配一些,因為如果群組里有消費者發(fā)生崩潰,剩下的消費者需要處理更多的分區(qū)。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過 max.message.size 屬性配置大),否則消費者可能無法讀取這些消息,導(dǎo)致消費者一直掛起重試。 在設(shè)置該屬性時,另外一個考量的因素是消費者處理數(shù)據(jù)的時間。消費者需要頻繁的調(diào)用 poll() 方法來避免會話過期和發(fā)生分區(qū)再平衡,如果單次調(diào)用poll() 返回的數(shù)據(jù)太多,消費者需要更多的時間進行處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現(xiàn)這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。
- session.timeout.ms
這個屬性指定了消費者在被認為死亡之前可以與服務(wù)器斷開連接的時間,默認是 3s。如果消費者沒有在 session.timeout.ms 指定的時間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器,就會被認定為死亡,協(xié)調(diào)器就會觸發(fā)重平衡。把它的分區(qū)分配給消費者群組中的其它消費者,此屬性與 heartbeat.interval.ms 緊密相關(guān)。heartbeat.interval.ms 指定了 poll() 方法向群組協(xié)調(diào)器發(fā)送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不發(fā)送心跳。所以,這兩個屬性一般需要同時修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應(yīng)該是 1s。把 session.timeout.ms 值設(shè)置的比默認值小,可以更快地檢測和恢復(fù)崩憤的節(jié)點,不過長時間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的重平衡。把該屬性的值設(shè)置得大一些,可以減少意外的重平衡,不過檢測節(jié)點崩潰需要更長的時間。
- auto.offset.reset
該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下的該如何處理。它的默認值是 latest,意思指的是,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)。另一個值是 earliest,意思指的是在偏移量無效的情況下,消費者將從起始位置處開始讀取分區(qū)的記錄。
- enable.auto.commit
我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是 true,為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為 false,由自己控制何時提交偏移量。如果把它設(shè)置為 true,還可以通過 auto.commit.interval.ms 屬性來控制提交的頻率
- partition.assignment.strategy
我們知道,分區(qū)會分配給群組中的消費者。PartitionAssignor 會根據(jù)給定的消費者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個消費者,Kafka 有兩個默認的分配策略Range 和 RoundRobin
- client.id
該屬性可以是任意字符串,broker 用他來標(biāo)識從客戶端發(fā)送過來的消息,通常被用在日志、度量指標(biāo)和配額中
- max.poll.records
該屬性用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢中需要處理的數(shù)據(jù)量。
- receive.buffer.bytes 和 send.buffer.bytes
socket 在讀寫數(shù)據(jù)時用到的 TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)置為 -1,就使用操作系統(tǒng)默認值。如果生產(chǎn)者或消費者與 broker 處于不同的數(shù)據(jù)中心內(nèi),可以適當(dāng)增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。
提交和偏移量的概念
特殊偏移
我們上面提到,消費者在每次調(diào)用poll() 方法進行定時輪詢的時候,會返回由生產(chǎn)者寫入 Kafka 但是還沒有被消費者消費的記錄,因此我們可以追蹤到哪些記錄是被群組里的哪個消費者讀取的。消費者可以使用 Kafka 來追蹤消息在分區(qū)中的位置(偏移量)
消費者會向一個叫做 _consumer_offset 的特殊主題中發(fā)送消息,這個主題會保存每次所發(fā)送消息中的分區(qū)偏移量,這個主題的主要作用就是消費者觸發(fā)重平衡后記錄偏移使用的,消費者每次向這個主題發(fā)送消息,正常情況下不觸發(fā)重平衡,這個主題是不起作用的,當(dāng)觸發(fā)重平衡后,消費者停止工作,每個消費者可能會分到對應(yīng)的分區(qū),這個主題就是讓消費者能夠繼續(xù)處理消息所設(shè)置的。
如果提交的偏移量小于客戶端最后一次處理的偏移量,那么位于兩個偏移量之間的消息就會被重復(fù)處理
如果提交的偏移量大于最后一次消費時的偏移量,那么處于兩個偏移量中間的消息將會丟失
既然_consumer_offset 如此重要,那么它的提交方式是怎樣的呢?下面我們就來說一下
提交方式
KafkaConsumer API 提供了多種方式來提交偏移量
自動提交
最簡單的方式就是讓消費者自動提交偏移量。如果 enable.auto.commit 被設(shè)置為true,那么每過 5s,消費者會自動把從 poll() 方法輪詢到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認是 5s。與消費者里的其他東西一樣,自動提交也是在輪詢中進行的。消費者在每次輪詢中會檢查是否提交該偏移量了,如果是,那么就會提交從上一次輪詢中返回的偏移量。
提交當(dāng)前偏移量
把 auto.commit.offset 設(shè)置為 false,可以讓應(yīng)用程序決定何時提交偏移量。使用 commitSync() 提交偏移量。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。
commitSync() 將會提交由 poll() 返回的最新偏移量,如果處理完所有記錄后要確保調(diào)用了 commitSync(),否則還是會有丟失消息的風(fēng)險,如果發(fā)生了在均衡,從最近一批消息到發(fā)生在均衡之間的所有消息都將被重復(fù)處理。
異步提交
異步提交 commitAsync() 與同步提交 commitSync() 最大的區(qū)別在于異步提交不會進行重試,同步提交會一致進行重試。
同步和異步組合提交
一般情況下,針對偶爾出現(xiàn)的提交失敗,不進行重試不會有太大的問題,因為如果提交失敗是因為臨時問題導(dǎo)致的,那么后續(xù)的提交總會有成功的。但是如果在關(guān)閉消費者或再均衡前的最后一次提交,就要確保提交成功。
因此,在消費者關(guān)閉之前一般會組合使用commitAsync和commitSync提交偏移量。
提交特定的偏移量
消費者API允許調(diào)用 commitSync() 和 commitAsync() 方法時傳入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。