自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

帶你漲姿勢的認識一下Kafka Consumer

開發(fā) 架構(gòu) Kafka
之前我們介紹過了 Kafka 整體架構(gòu),Kafka 生產(chǎn)者,Kafka 生產(chǎn)的消息最終流向哪里呢?當(dāng)然是需要消費了,要不只產(chǎn)生一系列數(shù)據(jù)沒有任何作用啊。

之前我們介紹過了 Kafka 整體架構(gòu),Kafka 生產(chǎn)者,Kafka 生產(chǎn)的消息最終流向哪里呢?當(dāng)然是需要消費了,要不只產(chǎn)生一系列數(shù)據(jù)沒有任何作用啊,如果把 Kafka 比作餐廳的話,那么生產(chǎn)者就是廚師的角色,消費者就是客人,只有廚師的話,那么炒出來的菜沒有人吃也沒有意義,如果只有客人沒有廚師的話,誰會去這個店吃飯呢?!所以如果你看完前面的文章意猶未盡的話,可以繼續(xù)讓你爽一爽。如果你沒看過前面的文章,那就從現(xiàn)在開始讓你爽。

Kafka 消費者概念

應(yīng)用程序使用 KafkaConsumer 從 Kafka 中訂閱主題并接收來自這些主題的消息,然后再把他們保存起來。應(yīng)用程序首先需要創(chuàng)建一個 KafkaConsumer 對象,訂閱主題并開始接受消息,驗證消息并保存結(jié)果。一段時間后,生產(chǎn)者往主題寫入的速度超過了應(yīng)用程序驗證數(shù)據(jù)的速度,這時候該如何處理?如果只使用單個消費者的話,應(yīng)用程序會跟不上消息生成的速度,就像多個生產(chǎn)者像相同的主題寫入消息一樣,這時候就需要多個消費者共同參與消費主題中的消息,對消息進行分流處理。

Kafka 消費者從屬于消費者群組。一個群組中的消費者訂閱的都是相同的主題,每個消費者接收主題一部分分區(qū)的消息。下面是一個 Kafka 分區(qū)消費示意圖

上圖中的主題 T1 有四個分區(qū),分別是分區(qū)0、分區(qū)1、分區(qū)2、分區(qū)3,我們創(chuàng)建一個消費者群組1,消費者群組中只有一個消費者,它訂閱主題T1,接收到 T1 中的全部消息。由于一個消費者處理四個生產(chǎn)者發(fā)送到分區(qū)的消息,壓力有些大,需要幫手來幫忙分擔(dān)任務(wù),于是就演變?yōu)橄聢D

這樣一來,消費者的消費能力就大大提高了,但是在某些環(huán)境下比如用戶產(chǎn)生消息特別多的時候,生產(chǎn)者產(chǎn)生的消息仍舊讓消費者吃不消,那就繼續(xù)增加消費者。

如上圖所示,每個分區(qū)所產(chǎn)生的消息能夠被每個消費者群組中的消費者消費,如果向消費者群組中增加更多的消費者,那么多余的消費者將會閑置,如下圖所示

向群組中增加消費者是橫向伸縮消費能力的主要方式??偠灾?,我們可以通過增加消費組的消費者來進行水平擴展提升消費能力。這也是為什么建議創(chuàng)建主題時使用比較多的分區(qū)數(shù),這樣可以在消費負載高的情況下增加消費者來提升性能。另外,消費者的數(shù)量不應(yīng)該比分區(qū)數(shù)多,因為多出來的消費者是空閑的,沒有任何幫助。

Kafka 一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應(yīng)用讀取這個消息。換句話說,每個應(yīng)用都可以讀到全量的消息。為了使得每個應(yīng)用都能讀到全量消息,應(yīng)用需要有不同的消費組。對于上面的例子,假如我們新增了一個新的消費組 G2,而這個消費組有兩個消費者,那么就演變?yōu)橄聢D這樣

在這個場景中,消費組 G1 和消費組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬于不同的應(yīng)用。

總結(jié)起來就是如果應(yīng)用需要讀取全量消息,那么請為該應(yīng)用設(shè)置一個消費組;如果該應(yīng)用消費能力不足,那么可以考慮在這個消費組里增加消費者。

消費者組和分區(qū)重平衡

消費者組是什么

消費者組(Consumer Group)是由一個或多個消費者實例(Consumer Instance)組成的群組,具有可擴展性和可容錯性的一種機制。消費者組內(nèi)的消費者共享一個消費者組ID,這個ID 也叫做 Group ID,組內(nèi)的消費者共同對一個主題進行訂閱和消費,同一個組中的消費者只能消費一個分區(qū)的消息,多余的消費者會閑置,派不上用場。

我們在上面提到了兩種消費方式

  •  一個消費者群組消費一個主題中的消息,這種消費模式又稱為點對點的消費方式,點對點的消費方式又被稱為消息隊列
  •  一個主題中的消息被多個消費者群組共同消費,這種消費模式又稱為發(fā)布-訂閱模式

消費者重平衡

我們從上面的消費者演變圖中可以知道這么一個過程:最初是一個消費者訂閱一個主題并消費其全部分區(qū)的消息,后來有一個消費者加入群組,隨后又有更多的消費者加入群組,而新加入的消費者實例分攤了最初消費者的部分消息,這種把分區(qū)的所有權(quán)通過一個消費者轉(zhuǎn)到其他消費者的行為稱為重平衡,英文名也叫做 Rebalance 。如下圖所示

重平衡非常重要,它為消費者群組帶來了高可用性 和 伸縮性,我們可以放心的添加消費者或移除消費者,不過在正常情況下我們并不希望發(fā)生這樣的行為。在重平衡期間,消費者無法讀取消息,造成整個消費者組在重平衡的期間都不可用。另外,當(dāng)分區(qū)被重新分配給另一個消費者時,消息當(dāng)前的讀取狀態(tài)會丟失,它有可能還需要去刷新緩存,在它重新恢復(fù)狀態(tài)之前會拖慢應(yīng)用程序。

消費者通過向組織協(xié)調(diào)者(Kafka Broker)發(fā)送心跳來維護自己是消費者組的一員并確認其擁有的分區(qū)。對于不同不的消費群體來說,其組織協(xié)調(diào)者可以是不同的。只要消費者定期發(fā)送心跳,就會認為消費者是存活的并處理其分區(qū)中的消息。當(dāng)消費者檢索記錄或者提交它所消費的記錄時就會發(fā)送心跳。

如果過了一段時間 Kafka 停止發(fā)送心跳了,會話(Session)就會過期,組織協(xié)調(diào)者就會認為這個 Consumer 已經(jīng)死亡,就會觸發(fā)一次重平衡。如果消費者宕機并且停止發(fā)送消息,組織協(xié)調(diào)者會等待幾秒鐘,確認它死亡了才會觸發(fā)重平衡。在這段時間里,死亡的消費者將不處理任何消息。在清理消費者時,消費者將通知協(xié)調(diào)者它要離開群組,組織協(xié)調(diào)者會觸發(fā)一次重平衡,盡量降低處理停頓。

重平衡是一把雙刃劍,它為消費者群組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(bug),而這些 bug 到現(xiàn)在社區(qū)還無法修改。

重平衡的過程對消費者組有極大的影響。因為每次重平衡過程中都會導(dǎo)致萬物靜止,參考 JVM 中的垃圾回收機制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機》中 p76 關(guān)于 Serial 收集器的描述):

更重要的是它在進行垃圾收集時,必須暫停其他所有的工作線程。直到它收集結(jié)束。Stop The World 這個名字聽起來很帥,但這項工作實際上是由虛擬機在后臺自動發(fā)起并完成的,在用戶不可見的情況下把用戶正常工作的線程全部停掉,這對很多應(yīng)用來說都是難以接受的。

也就是說,在重平衡期間,消費者組中的消費者實例都會停止消費,等待重平衡的完成。而且重平衡這個過程很慢......

創(chuàng)建消費者

上面的理論說的有點多,下面就通過代碼來講解一下消費者是如何消費的

在讀取消息之前,需要先創(chuàng)建一個 KafkaConsumer 對象。創(chuàng)建 KafkaConsumer 對象與創(chuàng)建 KafkaProducer 對象十分相似 --- 把需要傳遞給消費者的屬性放在 properties 對象中,后面我們會著重討論 Kafka 的一些配置,這里我們先簡單的創(chuàng)建一下,使用3個屬性就足矣,分別是 bootstrap.server,key.deserializer,value.deserializer 。

這三個屬性我們已經(jīng)用過很多次了,如果你還不是很清楚的話,可以參考 帶你漲姿勢是認識一下Kafka Producer

還有一個屬性是 group.id 這個屬性不是必須的,它指定了 KafkaConsumer 是屬于哪個消費者群組。創(chuàng)建不屬于任何一個群組的消費者也是可以的 

  1. Properties properties = new Properties();  
  2.         properties.put("bootstrap.server","192.168.1.9:9092");     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
  3.  KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties); 

主題訂閱

創(chuàng)建好消費者之后,下一步就開始訂閱主題了。subscribe() 方法接受一個主題列表作為參數(shù),使用起來比較簡單 

  1. consumer.subscribe(Collections.singletonList("customerTopic")); 

為了簡單我們只訂閱了一個主題 customerTopic,參數(shù)傳入的是一個正則表達式,正則表達式可以匹配多個主題,如果有人創(chuàng)建了新的主題,并且主題的名字與正則表達式相匹配,那么會立即觸發(fā)一次重平衡,消費者就可以讀取新的主題。

要訂閱所有與 test 相關(guān)的主題,可以這樣做 

  1. consumer.subscribe("test.*"); 

輪詢

我們知道,Kafka 是支持訂閱/發(fā)布模式的,生產(chǎn)者發(fā)送數(shù)據(jù)給 Kafka Broker,那么消費者是如何知道生產(chǎn)者發(fā)送了數(shù)據(jù)呢?其實生產(chǎn)者產(chǎn)生的數(shù)據(jù)消費者是不知道的,KafkaConsumer 采用輪詢的方式定期去 Kafka Broker 中進行數(shù)據(jù)的檢索,如果有數(shù)據(jù)就用來消費,如果沒有就再繼續(xù)輪詢等待,下面是輪詢等待的具體實現(xiàn) 

  1. try {  
  2.   while (true) {  
  3.     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));  
  4.     for (ConsumerRecord<String, String> record : records) {  
  5.       int updateCount = 1 
  6.       if (map.containsKey(record.value())) {  
  7.         updateCount = (int) map.get(record.value() + 1);  
  8.       }  
  9.       map.put(record.value(), updateCount);  
  10.     }  
  11.   }  
  12. }finally {  
  13.   consumer.close();  
  •  這是一個無限循環(huán)。消費者實際上是一個長期運行的應(yīng)用程序,它通過輪詢的方式向 Kafka 請求數(shù)據(jù)。
  •  第三行代碼非常重要,Kafka 必須定期循環(huán)請求數(shù)據(jù),否則就會認為該 Consumer 已經(jīng)掛了,會觸發(fā)重平衡,它的分區(qū)會移交給群組中的其它消費者。傳給 poll() 方法的是一個超市時間,用 java.time.Duration 類來表示,如果該參數(shù)被設(shè)置為 0 ,poll() 方法會立刻返回,否則就會在指定的毫秒數(shù)內(nèi)一直等待 broker 返回數(shù)據(jù)。
  •  poll() 方法會返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息,記錄所在分區(qū)的信息、記錄在分區(qū)中的偏移量,以及記錄的鍵值對。我們一般會遍歷這個列表,逐條處理每條記錄。
  •  在退出應(yīng)用程序之前使用 close() 方法關(guān)閉消費者。網(wǎng)絡(luò)連接和 socket 也會隨之關(guān)閉,并立即觸發(fā)一次重平衡,而不是等待群組協(xié)調(diào)器發(fā)現(xiàn)它不再發(fā)送心跳并認定它已經(jīng)死亡。

線程安全性

在同一個群組中,我們無法讓一個線程運行多個消費者,也無法讓多個線程安全的共享一個消費者。按照規(guī)則,一個消費者使用一個線程,如果一個消費者群組中多個消費者都想要運行的話,那么必須讓每個消費者在自己的線程中運行,可以使用 Java 中的 ExecutorService 啟動多個消費者進行進行處理。

消費者配置

到目前為止,我們學(xué)習(xí)了如何使用消費者 API,不過只介紹了幾個最基本的屬性,Kafka 文檔列出了所有與消費者相關(guān)的配置說明。大部分參數(shù)都有合理的默認值,一般不需要修改它們,下面我們就來介紹一下這些參數(shù)。

  •  fetch.min.bytes

該屬性指定了消費者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費者的數(shù)據(jù)請求時,如果可用的數(shù)據(jù)量小于 fetch.min.bytes 指定的大小,那么它會等到有足夠的可用數(shù)據(jù)時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題使用頻率不是很高的時候就不用來回處理消息。如果沒有很多可用數(shù)據(jù),但消費者的 CPU 使用率很高,那么就需要把該屬性的值設(shè)得比默認值大。如果消費者的數(shù)量比較多,把該屬性的值調(diào)大可以降低 broker 的工作負載。

  •  fetch.max.wait.ms

我們通過上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的數(shù)據(jù)時才會把它返回給消費者。而 fetch.max.wait.ms 則用于指定 broker 的等待時間,默認是 500 毫秒。如果沒有足夠的數(shù)據(jù)流入 kafka 的話,消費者獲取的最小數(shù)據(jù)量要求就得不到滿足,最終導(dǎo)致 500 毫秒的延遲。如果要降低潛在的延遲,就可以把參數(shù)值設(shè)置的小一些。如果 fetch.max.wait.ms 被設(shè)置為 100 毫秒的延遲,而 fetch.min.bytes 的值設(shè)置為 1MB,那么 Kafka 在收到消費者請求后,要么返回 1MB 的數(shù)據(jù),要么在 100 ms 后返回所有可用的數(shù)據(jù)。就看哪個條件首先被滿足。

  •  max.partition.fetch.bytes

該屬性指定了服務(wù)器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù)。它的默認值時 1MB,也就是說,KafkaConsumer.poll() 方法從每個分區(qū)里返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節(jié)。如果一個主題有20個分區(qū)和5個消費者,那么每個消費者需要至少4 MB的可用內(nèi)存來接收記錄。在為消費者分配內(nèi)存時,可以給它們多分配一些,因為如果群組里有消費者發(fā)生崩潰,剩下的消費者需要處理更多的分區(qū)。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過 max.message.size 屬性配置大),否則消費者可能無法讀取這些消息,導(dǎo)致消費者一直掛起重試。 在設(shè)置該屬性時,另外一個考量的因素是消費者處理數(shù)據(jù)的時間。消費者需要頻繁的調(diào)用 poll() 方法來避免會話過期和發(fā)生分區(qū)再平衡,如果單次調(diào)用poll() 返回的數(shù)據(jù)太多,消費者需要更多的時間進行處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現(xiàn)這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。

  •  session.timeout.ms

這個屬性指定了消費者在被認為死亡之前可以與服務(wù)器斷開連接的時間,默認是 3s。如果消費者沒有在 session.timeout.ms 指定的時間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器,就會被認定為死亡,協(xié)調(diào)器就會觸發(fā)重平衡。把它的分區(qū)分配給消費者群組中的其它消費者,此屬性與 heartbeat.interval.ms 緊密相關(guān)。heartbeat.interval.ms 指定了 poll() 方法向群組協(xié)調(diào)器發(fā)送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不發(fā)送心跳。所以,這兩個屬性一般需要同時修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應(yīng)該是 1s。把 session.timeout.ms 值設(shè)置的比默認值小,可以更快地檢測和恢復(fù)崩憤的節(jié)點,不過長時間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的重平衡。把該屬性的值設(shè)置得大一些,可以減少意外的重平衡,不過檢測節(jié)點崩潰需要更長的時間。

  •  auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下的該如何處理。它的默認值是 latest,意思指的是,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)。另一個值是 earliest,意思指的是在偏移量無效的情況下,消費者將從起始位置處開始讀取分區(qū)的記錄。

  •  enable.auto.commit

我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是 true,為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為 false,由自己控制何時提交偏移量。如果把它設(shè)置為 true,還可以通過 auto.commit.interval.ms 屬性來控制提交的頻率

  •  partition.assignment.strategy

我們知道,分區(qū)會分配給群組中的消費者。PartitionAssignor 會根據(jù)給定的消費者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個消費者,Kafka 有兩個默認的分配策略Range 和 RoundRobin

  •  client.id

該屬性可以是任意字符串,broker 用他來標(biāo)識從客戶端發(fā)送過來的消息,通常被用在日志、度量指標(biāo)和配額中

  •  max.poll.records

該屬性用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢中需要處理的數(shù)據(jù)量。

  •  receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫數(shù)據(jù)時用到的 TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)置為 -1,就使用操作系統(tǒng)默認值。如果生產(chǎn)者或消費者與 broker 處于不同的數(shù)據(jù)中心內(nèi),可以適當(dāng)增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。

提交和偏移量的概念

特殊偏移

我們上面提到,消費者在每次調(diào)用poll() 方法進行定時輪詢的時候,會返回由生產(chǎn)者寫入 Kafka 但是還沒有被消費者消費的記錄,因此我們可以追蹤到哪些記錄是被群組里的哪個消費者讀取的。消費者可以使用 Kafka 來追蹤消息在分區(qū)中的位置(偏移量)

消費者會向一個叫做 _consumer_offset 的特殊主題中發(fā)送消息,這個主題會保存每次所發(fā)送消息中的分區(qū)偏移量,這個主題的主要作用就是消費者觸發(fā)重平衡后記錄偏移使用的,消費者每次向這個主題發(fā)送消息,正常情況下不觸發(fā)重平衡,這個主題是不起作用的,當(dāng)觸發(fā)重平衡后,消費者停止工作,每個消費者可能會分到對應(yīng)的分區(qū),這個主題就是讓消費者能夠繼續(xù)處理消息所設(shè)置的。

如果提交的偏移量小于客戶端最后一次處理的偏移量,那么位于兩個偏移量之間的消息就會被重復(fù)處理

如果提交的偏移量大于最后一次消費時的偏移量,那么處于兩個偏移量中間的消息將會丟失

既然_consumer_offset 如此重要,那么它的提交方式是怎樣的呢?下面我們就來說一下

提交方式

KafkaConsumer API 提供了多種方式來提交偏移量

自動提交

最簡單的方式就是讓消費者自動提交偏移量。如果 enable.auto.commit 被設(shè)置為true,那么每過 5s,消費者會自動把從 poll() 方法輪詢到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認是 5s。與消費者里的其他東西一樣,自動提交也是在輪詢中進行的。消費者在每次輪詢中會檢查是否提交該偏移量了,如果是,那么就會提交從上一次輪詢中返回的偏移量。

提交當(dāng)前偏移量

把 auto.commit.offset 設(shè)置為 false,可以讓應(yīng)用程序決定何時提交偏移量。使用 commitSync() 提交偏移量。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。

commitSync() 將會提交由 poll() 返回的最新偏移量,如果處理完所有記錄后要確保調(diào)用了 commitSync(),否則還是會有丟失消息的風(fēng)險,如果發(fā)生了在均衡,從最近一批消息到發(fā)生在均衡之間的所有消息都將被重復(fù)處理。

異步提交

異步提交 commitAsync() 與同步提交 commitSync() 最大的區(qū)別在于異步提交不會進行重試,同步提交會一致進行重試。

同步和異步組合提交

一般情況下,針對偶爾出現(xiàn)的提交失敗,不進行重試不會有太大的問題,因為如果提交失敗是因為臨時問題導(dǎo)致的,那么后續(xù)的提交總會有成功的。但是如果在關(guān)閉消費者或再均衡前的最后一次提交,就要確保提交成功。

因此,在消費者關(guān)閉之前一般會組合使用commitAsync和commitSync提交偏移量。

提交特定的偏移量

消費者API允許調(diào)用 commitSync() 和 commitAsync() 方法時傳入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。 

 

責(zé)任編輯:龐桂玉 來源: segmentfault
相關(guān)推薦

2022-12-07 08:13:55

CNI抽象接口

2023-05-03 09:09:28

Golang數(shù)組

2022-09-08 13:58:39

Spring高并發(fā)異步

2023-05-29 08:32:40

JAVA重寫重載

2013-04-17 11:21:59

Windows PhoWindows Pho

2018-12-24 09:51:22

CPU天梯圖Inter

2024-05-27 00:00:00

AmpPHP非阻塞

2018-04-02 09:07:36

CIO

2020-10-15 07:13:53

算法監(jiān)控數(shù)據(jù)

2020-02-10 14:26:10

GitHub代碼倉庫

2020-12-10 08:44:35

WebSocket輪詢Comet

2020-09-25 19:53:39

數(shù)據(jù)

2020-04-26 09:59:00

黑客機器學(xué)習(xí)網(wǎng)絡(luò)安全

2022-03-07 06:34:22

CQRS數(shù)據(jù)庫數(shù)據(jù)模型

2012-07-12 15:08:59

WebGL

2021-06-29 19:27:53

JAVA方法接口

2020-02-20 11:32:09

Kafka概念問題

2022-07-20 08:55:02

區(qū)塊鏈技術(shù)數(shù)據(jù)記錄

2022-01-17 14:25:14

索引數(shù)據(jù)庫搜索

2017-01-16 17:30:52

點贊
收藏

51CTO技術(shù)棧公眾號