自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

消費(fèi)者實(shí)現(xiàn)邏輯-kafka知識(shí)體系(四)

開發(fā) 架構(gòu) Kafka
Consumer Group 是 Kafka 提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制。Kafka 僅僅使用 Consumer Group 這一種機(jī)制,卻同時(shí)實(shí)現(xiàn)了傳統(tǒng)消息引擎系統(tǒng)的兩大模型:消息隊(duì)列模型、發(fā)布 / 訂閱模型。

[[410017]]

上篇文章分享kafka broker 的實(shí)現(xiàn)原理、數(shù)據(jù)的存儲(chǔ)結(jié)構(gòu)和消息持久化相關(guān)的東西,那消息存儲(chǔ)完了之后,怎么被消費(fèi)端消費(fèi)呢,本文來聊一聊Kafka 消費(fèi)端的那些事兒。

1)拉取機(jī)制

Kafka生產(chǎn)端是推的機(jī)制即Push,消費(fèi)端是拉的機(jī)制即Pull。

2)Pull的優(yōu)缺點(diǎn)

優(yōu)點(diǎn)是消費(fèi)端可以自己控制消息的讀取速度和數(shù)量;

缺點(diǎn)是不知道服務(wù)端有沒有數(shù)據(jù),所以要一直pull或隔一定時(shí)間pull,可能要pull多次并等待。

3)消息投遞語義:

Kafka默認(rèn)保證at-least-once delivery,容許用戶實(shí)現(xiàn)at-most-once語義,exactly-once的實(shí)現(xiàn)取決于目的存儲(chǔ)系統(tǒng)。

4)分區(qū)分配策略

RangeAssignor:按照分區(qū)范圍分配,當(dāng)前默認(rèn)策略;

RoundRobinAssignor:輪詢的方式分配;

StickyAssignor:Kafka 0.11版本引入,根據(jù)更多指標(biāo)比如負(fù)載,盡可能均勻。

這些前面的文章中也有提到。

消費(fèi)者組

Consumer Group 是 Kafka 提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制。Kafka 僅僅使用 Consumer Group 這一種機(jī)制,卻同時(shí)實(shí)現(xiàn)了傳統(tǒng)消息引擎系統(tǒng)的兩大模型:消息隊(duì)列模型、發(fā)布 / 訂閱模型。

理想情況下,Consumer 實(shí)例的數(shù)量應(yīng)該等于該 Group 訂閱主題的分區(qū)總數(shù)。

【消費(fèi)者和消費(fèi)組】

Kafka消費(fèi)者是消費(fèi)組的一部分,當(dāng)多個(gè)消費(fèi)者形成一個(gè)消費(fèi)組來消費(fèi)主題時(shí),每個(gè)消費(fèi)者會(huì)收到不同分區(qū)的消息。假設(shè)有一個(gè)T1主題,該主題有4個(gè)分區(qū);同時(shí)我們有一個(gè)消費(fèi)組G1,這個(gè)消費(fèi)組只有一個(gè)消費(fèi)者C1。那么消費(fèi)者C1將會(huì)收到這4個(gè)分區(qū)的消息,如下所示:

Kafka一個(gè)很重要的特性就是,只需寫入一次消息,可以支持任意多的應(yīng)用讀取這個(gè)消息。換句話說,每個(gè)應(yīng)用都可以讀到全量的消息。為了使得每個(gè)應(yīng)用都能讀到全量消息,應(yīng)用需要有不同的消費(fèi)組。對(duì)于上面的例子,假如我們新增了一個(gè)新的消費(fèi)組G2,而這個(gè)消費(fèi)組有兩個(gè)消費(fèi)者,那么會(huì)是這樣的:

這里值得我們注意的是:

  • 一個(gè)topic 可以被 多個(gè) 消費(fèi)者組 消費(fèi),

但是每個(gè) 消費(fèi)者組 消費(fèi)的數(shù)據(jù)是 互不干擾 的,也就是說,每個(gè) 消費(fèi)組 消費(fèi)的都是 完整的數(shù)據(jù) 。

  • 一個(gè)分區(qū)只能被 同一個(gè)消費(fèi)組內(nèi) 的一個(gè) 消費(fèi)者 消費(fèi),

而 不能拆給多個(gè)消費(fèi)者 消費(fèi),也就是說如果你某個(gè) 消費(fèi)者組內(nèi)的消費(fèi)者數(shù) 比 該 Topic 的分區(qū)數(shù)還多,那么多余的消費(fèi)者是不起作用的

消費(fèi)者分區(qū)分配的過程

那么我們現(xiàn)在就來看看分配過程是怎么樣的。

1.確定 群組協(xié)調(diào)器

每當(dāng)我們創(chuàng)建一個(gè)消費(fèi)組,kafka 會(huì)為我們分配一個(gè) broker 作為該消費(fèi)組的 coordinator(協(xié)調(diào)器)

2.注冊消費(fèi)者 并選出 leader consumer

當(dāng)我們的有了 coordinator 之后,消費(fèi)者將會(huì)開始往該 coordinator上進(jìn)行注冊,第一個(gè)注冊的 消費(fèi)者將成為該消費(fèi)組的 leader,后續(xù)的 作為 follower

3.當(dāng) leader 選出來后,

他會(huì)從coordinator那里實(shí)時(shí)獲取分區(qū) 和 consumer 信息,并根據(jù)分區(qū)策略給每個(gè)consumer 分配 分區(qū),并將分配結(jié)果告訴 coordinator。

4.follower 消費(fèi)者將從 coordinator 那里獲取到自己相關(guān)的分區(qū)信息進(jìn)行消費(fèi),

對(duì)于所有的 follower 消費(fèi)者而言,他們只知道自己消費(fèi)的分區(qū),并不知道其他消費(fèi)者的存在。

5.至此,消費(fèi)者都知道自己的消費(fèi)的分區(qū),

分區(qū)過程結(jié)束,當(dāng)發(fā)生 分區(qū)再均衡 的時(shí)候,leader 將會(huì)重復(fù)分配過程

具體的流程圖可以翻閱前面的文章。

關(guān)于位移

【位移 offset】

  • 每個(gè)消費(fèi)者在消費(fèi)消息的過程中必然需要有個(gè)字段記錄它當(dāng)前消費(fèi)到了分區(qū)的哪個(gè)位置上,這個(gè)字段就是消費(fèi)者位移(Consumer Offset),它是消費(fèi)者消費(fèi)進(jìn)度的指示器。
  • 看上去Offset 就是一個(gè)數(shù)值而已,其實(shí)對(duì)于 Consumer Group 而言,它是一組 KV 對(duì),Key 是分區(qū),V 對(duì)應(yīng) Consumer 消費(fèi)該分區(qū)的最新位移 TopicPartition->long
  • 不過切記的是消費(fèi)者位移是下一條消息的位移,而不是目前最新消費(fèi)消息的位移。
  • 提交位移主要是為了表征 Consumer 的消費(fèi)進(jìn)度,這樣當(dāng) Consumer 發(fā)生故障重啟之后,就能夠從 Kafka 中讀取之前提交的位移值,然后從相應(yīng)的位移處繼續(xù)消費(fèi),從而避免整個(gè)消費(fèi)過程重來一遍。

【位移的保存】

其實(shí)Consumer 端應(yīng)用程序在提交位移時(shí),其實(shí)是向 Coordinator 所在的 Broker 提交位移。同樣地,當(dāng) Consumer 應(yīng)用啟動(dòng)時(shí),也是向 Coordinator 所在的 Broker 發(fā)送各種請求,然后由 Coordinator 負(fù)責(zé)執(zhí)行消費(fèi)者組的注冊、成員管理記錄等元數(shù)據(jù)管理操作。

老版本的 Consumer Group 把位移保存在 ZooKeeper 中,新版本的 Consumer Group 中,Kafka 社區(qū)重新設(shè)計(jì)了 Consumer Group 的位移管理方式,采用了將位移保存在 Kafka內(nèi)部主題的方法,也就是__consumer_offsets,俗稱位移主題。至于為什么放棄kafka 保存位移請看我前面的文章《基礎(chǔ)概念、架構(gòu)和新版的升級(jí)Kafka知識(shí)體系1》。

【位移主題的數(shù)據(jù)格式】

key

  • 位移主題的 Key 中應(yīng)該保存 3 部分內(nèi)容:Group ID,主題名,分區(qū)號(hào)

value

  • 主要保存的是offset 的信息,當(dāng)然還有時(shí)間戳等信息,你還記得你可以根據(jù)時(shí)間重置一個(gè)消費(fèi)者開始消費(fèi)的地方嗎

【位移的提交】

1. 自動(dòng)提交

最簡單的提交方式是讓消費(fèi)者自動(dòng)提交偏移量,如果 enable.auto.commit 被設(shè)為 true,那么每過 5s,消費(fèi)者會(huì)自動(dòng)把從 poll() 方法接收到的最大偏移量提交上去。

可能造成的問題:數(shù)據(jù)重復(fù)讀

假設(shè)我們?nèi)匀皇褂媚J(rèn)的 5s 提交時(shí)間間隔,在最近一次提交之后的 3s 發(fā)生了再均衡,再均衡之后,消費(fèi)者從最后一次提交的偏移量位置開始讀取消息。這個(gè)時(shí)候偏移量已經(jīng)落后了 3s,所以在這 3s內(nèi)到達(dá)的消息會(huì)被重復(fù)處理??梢酝ㄟ^修改提交時(shí)間間隔來更頻繁地提交偏移量,減小可能出現(xiàn)重復(fù)消息的時(shí)間窗,不過這種情況是無法完全避免的。

2. 手動(dòng)提交

2.1 同步提交

同步存在的問題

  • 從名字上來看,它是一個(gè)同步操作,即該方法會(huì)一直等待,直到位移被成功提交才會(huì)返回。如果提交過程中出現(xiàn)異常,該方法會(huì)將異常信息拋出。
  • commitSync()的問題在于,Consumer程序會(huì)處于阻塞狀態(tài),直到遠(yuǎn)端的Broker返回提交結(jié)果,這個(gè)狀態(tài)才會(huì)結(jié)束,需要注意的是同步提交會(huì)在提交失敗之后進(jìn)行重試
  • 在任何系統(tǒng)中,因?yàn)槌绦蚨琴Y源限制而導(dǎo)致的阻塞都可能是系統(tǒng)的瓶頸,會(huì)影響整個(gè)應(yīng)用程序的 TPS,影響吞吐量。

2.2 異步提交

手動(dòng)提交有一個(gè)不足之處,在 broker 對(duì)提交請求作出回應(yīng)之前,應(yīng)用程序會(huì)一直阻塞,這樣會(huì)限制應(yīng)用程序的吞吐量。我們可以通過降低提交頻率來提升吞吐量,但如果發(fā)生了再均衡,會(huì)增加重復(fù)消息的數(shù)量。

這時(shí)可以使用異步提交,只管發(fā)送提交請求,無需等待 broker 的響應(yīng)。它之所以不進(jìn)行重試,是因?yàn)樵谒盏椒?wù)器響應(yīng)的時(shí)候,可能有一個(gè)更大的偏移量已經(jīng)提交成功。

假設(shè)我們發(fā)出一個(gè)請求用于提交偏移量2000,這個(gè)時(shí)候發(fā)生了短暫的通信問題,服務(wù)器收不到請求,自然也不會(huì)作出任何響應(yīng)。與此同時(shí),我們處理了另外一批消息,并成功提交了偏移量3000。如果commitAsync()重新嘗試提交偏移量2000,它有可能在偏移量3000之后提交成功。這個(gè)時(shí)候如果發(fā)生再均衡,就會(huì)出現(xiàn)重復(fù)消息。

異步存在的問題

  • commitAsync 的問題在于,出現(xiàn)問題時(shí)它不會(huì)自動(dòng)重試。因?yàn)樗钱惒讲僮?,倘若提交失敗后自?dòng)重試,那么它重試時(shí)提交的位移值可能早已經(jīng)“過期”或不是最新值了。因此,異步提交的重試其實(shí)沒有意義,所以 commitAsync 是不會(huì)重試的,所以只要在程序停止前最后一次提交成功即可。
  • 這里提供一個(gè)解決方案,那就是不論成功還是失敗我們都將offsets信息記錄下來,如果最后一次提交成功那就忽略,如果最后一次沒有提交成功,我們可以在下次重啟的時(shí)候手動(dòng)指定offset

綜合異步和同步來提交

同時(shí)使用了 commitSync() 和 commitAsync()。對(duì)于常規(guī)性、階段性的手動(dòng)提交,我們調(diào)用 commitAsync() 避免程序阻塞,而在 Consumer 要關(guān)閉前,我們調(diào)用 commitSync() 方法執(zhí)行同步阻塞式的位移提交,以確保 Consumer 關(guān)閉前能夠保存正確的位移數(shù)據(jù)。

關(guān)于再均衡Rebalance

分區(qū)的所有權(quán)從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者,這樣的行為被稱為再均衡(Rebalance)。再均衡非常重要,為消費(fèi)者組帶來了高可用性和伸縮性,可以放心的增加或移除消費(fèi)者。以下是觸發(fā)再均衡的三種行為:

  1. 當(dāng)一個(gè) 消費(fèi)者 加入組時(shí),讀取了原本由其他消費(fèi)者讀取的分區(qū),會(huì)觸發(fā)再均衡。
  2. 當(dāng)一個(gè) 消費(fèi)者 離開組時(shí)(被關(guān)閉或發(fā)生崩潰),原本由它讀取的分區(qū)將被組里的其他 消費(fèi)者 來讀取,會(huì)觸發(fā)再均衡。
  3. 當(dāng) Topic 發(fā)生變化時(shí),比如添加了新的分區(qū),會(huì)發(fā)生分區(qū)重分配,會(huì)觸發(fā)再均衡。

分區(qū)再均衡 期間該 Topic 是不可用的,所以Rebalance 實(shí)在是太慢了!!!

這里再補(bǔ)充一下生產(chǎn)環(huán)境中因?yàn)椴徽_的配置引起的不需要的分區(qū)再均衡。

正常集群變動(dòng)不再考慮范圍內(nèi):

1.防止 因?yàn)槲茨芗皶r(shí)發(fā)送心跳,導(dǎo)致Consumer 超時(shí)被踢出消費(fèi)者組。

這里可以設(shè)置 session.timeout.ms超時(shí)時(shí)間 和 heartbeat.interval.ms 心跳間隔一般可以把 超時(shí)時(shí)間設(shè)置為 心跳間隔的 3倍。

2.Consumer消費(fèi)時(shí)間過長導(dǎo)致的。

Consumer端如果無法在規(guī)定時(shí)間內(nèi)消費(fèi)完 poll 來的消息,那么就認(rèn)為該消費(fèi)者有問題,從而該消費(fèi)者會(huì)自主離組,所以我們可以設(shè)置 max.poll.interval.ms比處理時(shí)間略長。

3.從第二點(diǎn)我們還可能引申一點(diǎn)就是,如果集群經(jīng)常發(fā)生 分區(qū)在均衡,

那么你可能需要去觀察下消費(fèi)者執(zhí)行任務(wù)的耗時(shí),特別注意觀察下 GC 的占用時(shí)間。

往往線上出問題也是因?yàn)榕渲貌缓侠韺?dǎo)致的。

 

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2021-07-12 10:25:03

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2021-07-05 06:26:08

生產(chǎn)者kafka架構(gòu)

2021-07-07 07:06:31

Brokerkafka架構(gòu)

2023-06-01 08:08:38

kafka消費(fèi)者分區(qū)策略

2015-07-28 17:52:36

IOS知識(shí)體系

2021-07-14 17:18:14

RocketMQ消息分布式

2012-03-08 11:13:23

企業(yè)架構(gòu)

2017-06-22 13:07:21

2021-07-13 11:52:47

順序消息RocketMQkafka

2021-10-26 10:50:25

Kafkabroker

2021-12-22 11:00:05

模型Golang語言

2017-02-27 16:42:23

Spark識(shí)體系

2017-04-03 15:35:13

知識(shí)體系架構(gòu)

2021-07-02 06:27:00

Kafka架構(gòu)主從架構(gòu)

2015-08-26 09:39:30

java消費(fèi)者

2017-05-16 12:30:21

Python多線程生產(chǎn)者消費(fèi)者模式

2022-08-02 10:01:42

架構(gòu)

2021-07-08 07:16:24

RocketMQ數(shù)據(jù)結(jié)構(gòu)Message

2022-07-07 09:00:49

RocketMQ消費(fèi)者消息消費(fèi)

2018-09-26 06:45:23

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)