自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Kafka放棄Zookeeper后如何存儲(chǔ)主題與消費(fèi)組呢?

云計(jì)算 Kafka
主題的路由信息是存儲(chǔ)在Zookeeper中,那為什么客戶端只需要Broker的地址,就可以獲取到主題的路由信息呢?

由于筆者公司目前使用的kafka版本是2.2.1,故當(dāng)下關(guān)于kafka的內(nèi)核研究目前主要是基于該版本,當(dāng)然該專欄還會(huì)繼續(xù)關(guān)注Kafka3.0。

我在使用kafka時(shí)發(fā)現(xiàn)客戶端可以不依賴Zookeeper的情況下完成消息發(fā)送、消息消費(fèi),眾所周知早期的Kafka,所有的元信息(topic、消費(fèi)組、集群)等信息都存儲(chǔ)在Zookeeper中,原先的消息發(fā)送客戶端、消息消費(fèi)客戶端都需要依賴Zookeeper。

溫馨提示:Kafka逐步開啟了去zookeeper化,到kafka2.8之前實(shí)現(xiàn)了消息發(fā)送者、消息消費(fèi)者的去zookeeper化,從2.8版之后broker也支持去zookeeper。

那kafka2.2.1版本中,主題的路由信息、消費(fèi)組信息分別是存儲(chǔ)在什么地方呢?消息發(fā)送端、消息消費(fèi)端是如何感知的呢?

溫馨提示:如果大家對(duì)Kafka有基本的了解,不防停留片刻,稍作思考。

1.主題元數(shù)據(jù)存儲(chǔ)在Zookeeper中

進(jìn)入到Kafka Broker連接的Zookeeper集群,我們不難發(fā)現(xiàn)在 /{namespace}/brokers/topics節(jié)點(diǎn)下存在該集群中所有的主題信息,展開某一個(gè)具體的主題,如下圖所示:

關(guān)于主題的元信息,其實(shí)主要包括如下信息:

  • 分區(qū)數(shù)量 每一個(gè)具體topic下會(huì)有一個(gè)partitions節(jié)點(diǎn),該節(jié)點(diǎn)下的每一個(gè)子節(jié)點(diǎn)代表一個(gè)分區(qū)。
  • 分區(qū)狀態(tài)信息 每一個(gè)分區(qū)的的狀態(tài)由葉子節(jié)點(diǎn) /{namespace}/brokers/topics/{topicName}/parttions/{partNO}/state表示,存儲(chǔ)的內(nèi)容如下:

controller_epoch 控制器當(dāng)前的選舉版本。

leader 該分區(qū)的Leader所在的Broker節(jié)點(diǎn)ID。

version 當(dāng)前的存儲(chǔ)格式版本,默認(rèn)為1。

leader_epoch 分區(qū)Leader的選舉版本。

isr 分區(qū)的ISR集合。

主題的路由信息是存儲(chǔ)在Zookeeper中,那為什么客戶端只需要Broker的地址,就可以獲取到主題的路由信息呢?

1.1 主題路由尋址

查找路由信息在Kafka2.1版本中是發(fā)送ApiKeys.METADATA請(qǐng)求,該請(qǐng)求的響應(yīng)邏輯定義在Broker中,那客戶端是如何對(duì)Broker進(jìn)行路由,Broker中的路由信息又是從何而來呢?

消息發(fā)送者首次發(fā)送METADATA定位Broker機(jī)制:首次發(fā)送請(qǐng)求會(huì)從KafkaProducer的bootstrap.servers中設(shè)置的broker列表中選擇當(dāng)前最空閑的Broker,后續(xù)能感知所有的Broker。

消息消費(fèi)者發(fā)送METADATA定位Broker機(jī)制:發(fā)送到當(dāng)前消費(fèi)組的組協(xié)調(diào)所在的Broker。

根據(jù)查閱KafkaApis的handleTopicMetadataRequest方法,進(jìn)行一些ACL校驗(yàn)后進(jìn)入其核心方法:

關(guān)鍵點(diǎn):

  • 從MetadataCache中獲取topic到路由信息。
  • 如果MetadataCache中不存在指定topic的路由信息,如果Broker允許自動(dòng)創(chuàng)建主題(auto.create.topics.enable),默認(rèn)為true,則自動(dòng)創(chuàng)建該主題的信息,并將主題信息寫入到zookeeper,具體操作:

在/brokers/topics節(jié)點(diǎn)下創(chuàng)建子節(jié)點(diǎn),子節(jié)點(diǎn)名稱為topic的名稱。

根據(jù)當(dāng)前kafka分區(qū)的機(jī)架信息,分區(qū)數(shù)、副本數(shù),broker節(jié)點(diǎn)數(shù),進(jìn)行分配,主要盡量將主分區(qū)不放在同一個(gè)機(jī)架、存儲(chǔ)在主題的節(jié)點(diǎn)信息中,例如{"version":1,"partitions":{"4":[2,0,1],"5":[0,1,2],"1":[2,1,0],"0":[1,0,2],"2":[0,2,1],"3":[1,2,0]}},其中key為分區(qū)名稱,值為副本所在的brokerId,其中排在第一位是傾向性Leader,主題中存儲(chǔ)的值是靜態(tài)數(shù)據(jù),具體還會(huì)觸發(fā)選舉,選舉算法會(huì)參考這個(gè)分配。

控制器還會(huì)注冊(cè)調(diào)用registerPartitionModificationsHandlers方法,監(jiān)聽主題信息的變化,從而觸發(fā)后續(xù)流程,啟動(dòng)分區(qū)的真正創(chuàng)建(各個(gè)分區(qū)的Leader選舉等)。

溫馨提示:Kafka開啟自動(dòng)創(chuàng)建主題,分區(qū)數(shù)量取自kafka broker中的num.partitions參數(shù),默認(rèn)為1,副本因子則取決于default.replication.factor參數(shù),默認(rèn)為1。

1.2 路由信息同步機(jī)制

MetadataCache,元信息緩存,那這里的數(shù)據(jù)又是從何而來呢?MetadataCache中路由信息的更新調(diào)用鏈如下圖所示:

Kafka的KafkaController(后續(xù)統(tǒng)稱控制器)首先會(huì)聽/brokers/topics/{topicName}節(jié)點(diǎn)內(nèi)容的變化,一旦有新主題創(chuàng)建或主題信息變更,topic變更事件就會(huì)觸發(fā),此時(shí)TopicChange的process方法會(huì)調(diào)用,最終調(diào)用updatePartitionReplicaAssignment,也就是一旦主題的信息發(fā)生變更,控制器會(huì)向所有Broker節(jié)點(diǎn)發(fā)送ApiKeys.UPDATE_METADATA,各個(gè)Broker在到該請(qǐng)求后,會(huì)更新各個(gè)Broker中的內(nèi)存緩存,供消息發(fā)送者查找topic路由信息。

即Kafka2.2版本中,topic的元信息存儲(chǔ)在Zookeeper中,同時(shí)Kafka Controller會(huì)監(jiān)聽zookeeper中相關(guān)節(jié)點(diǎn),從而感知信息變更,從而將路由信息通過RPC發(fā)送到集群內(nèi)所有的Broker中,故每一個(gè)Broker的內(nèi)存中都存儲(chǔ)一份相同的路由信息。

Kafka2.8版本開始嘗試去Zookeeper化。

思考題:為什么各個(gè)Broker不都監(jiān)聽zookeeper,從而感知topic變化,更新本地內(nèi)存呢?歡迎各位留言討論或私信dingwpmz,共同交流。

2.消費(fèi)組存儲(chǔ)在位點(diǎn)主題中

在較低版本中,啟動(dòng)Kafka消費(fèi)組需要指定zookeeper集群的地址,因?yàn)樵诘桶姹局邢M(fèi)組的元信息存儲(chǔ)在zookeeper中,具體路徑為/consumers,但后續(xù)版本中消費(fèi)端的啟動(dòng)已經(jīng)不需指定zookeeper,而是指定broker的地址列表即可,那這個(gè)時(shí)候,消費(fèi)組的信息是存儲(chǔ)在哪呢?

在前面介紹Kafka故障解決相關(guān)的文章中我們常??吹较M(fèi)組組協(xié)調(diào)器,內(nèi)部持有一個(gè)消費(fèi)組元數(shù)據(jù)管理器GroupMetadataManager,相關(guān)的代碼截圖如下所示:

在GroupMetadataManager對(duì)象中持有一個(gè)Map結(jié)構(gòu)的緩存,其鍵為消費(fèi)組的名稱,值為GroupMetadata對(duì)象,內(nèi)部記錄消費(fèi)組的狀態(tài),消費(fèi)組的成員列表,位點(diǎn)信息。

內(nèi)存的特點(diǎn):訪問高效,但隨著Broker進(jìn)程的退出而丟失,消費(fèi)組存儲(chǔ)在內(nèi)存中顯然不行,但又不在zookeeper中,那消費(fèi)組的定義信息存儲(chǔ)在什么地方呢?

2.1消費(fèi)組元信息存儲(chǔ)

消費(fèi)組的定義信息存儲(chǔ)在系統(tǒng)主題__consumer_offsets中,什么,這個(gè)主題不是用來存儲(chǔ)消費(fèi)位點(diǎn)的嗎?

原來__consumer_offsets不僅存儲(chǔ)消費(fèi)組的位點(diǎn)信息,還存儲(chǔ)消費(fèi)組的元信息,具體代碼入口:GroupMetadataManager#storeGroup,部分代碼截圖如下所示:

即消費(fèi)組元信息當(dāng)成一條消息寫入到__consumer_offsets,一條消費(fèi)組元信息存儲(chǔ)的value值,由GroupMetadataManager的groupMetadataValue方法定義,具體代碼如下:

隨著Kafka的不斷演化,存儲(chǔ)格式進(jìn)行了多次修改,對(duì)應(yīng)的版本如下:

  • V0:Kafka 0.10級(jí)以下版本
  • V1:大于 0.10,低于等于2.1版本。
  • V2:2.2版本及以后

消費(fèi)組元信息存儲(chǔ)的格式為Json,具體存儲(chǔ)的內(nèi)容:

  • protocol_type 協(xié)議版本,取自AbstractCoordinator的抽象方法protocolType(),消費(fèi)組的固定為:consumer。
  • generation 消費(fèi)組元信息的版本號(hào),每發(fā)生一次消費(fèi)組重平衡,該值會(huì)加一。
  • protocol 協(xié)議內(nèi)容,存儲(chǔ)消費(fèi)組的隊(duì)列負(fù)載算法,在構(gòu)建消費(fèi)者時(shí)可通過partition.assignment.strategy參數(shù)傳遞,可以傳遞多個(gè),消費(fèi)組具體的負(fù)載算法會(huì)選擇每一個(gè)消費(fèi)者都支持的協(xié)議進(jìn)行隊(duì)列負(fù)載,默認(rèn)的負(fù)載算法為RangeAssignor。
  • leader 當(dāng)前消費(fèi)組的Leader,通常為第一個(gè)加入該消費(fèi)組的消費(fèi)者。
  • current_state_timestamp 最新狀態(tài)變更的時(shí)間戳,該值是從V2版本開始引入。
  • members 消費(fèi)組的成員信息,每一個(gè)成員信息存儲(chǔ)的信息如下:
  • member_id 成員id,客戶端id(clientId) + uuid。

client_id 客戶端ID。

client_host 客戶端ip地址。

rebalance_timeout 重平衡時(shí)間,默認(rèn)為300000,5分鐘。

session_timeout 會(huì)話超時(shí)時(shí)間,默認(rèn)為10s。

subscription 元信息,取自AbstractCoordinator的抽象方法metadata(),消費(fèi)組的實(shí)現(xiàn)類為ConsumerCoordinator,主要是遍歷負(fù)載算法,每一個(gè)負(fù)載算法根據(jù)訂閱信息計(jì)算元信息。

assignment

各個(gè)消費(fèi)者的隊(duì)列負(fù)載情況。

溫馨提示:GroupMetadataManager的storeGroup方法的調(diào)用時(shí)間是在消費(fèi)組進(jìn)行重平衡時(shí),具體是重平衡第二階段(SYNC_GROUP)與完成重平衡。

2.2加載消息組元信息

消費(fèi)組元信息是存儲(chǔ)在 __consumer_offsets主題中,在什么時(shí)候會(huì)從該主題中加載到內(nèi)存中呢?

在__consumer_offsets的分區(qū)發(fā)生Leader選舉時(shí)會(huì)觸發(fā)將對(duì)應(yīng)分區(qū)中的數(shù)據(jù)加載到內(nèi)存,具體的處理入口在KafkaApis的handleLeaderAndIsrRequest方法,簡(jiǎn)易調(diào)用鏈如下圖所示:

3.總結(jié)

本文主要介紹了Kafka 主題與消費(fèi)組的持久化機(jī)制,在Kafka2.8版本開始,官方逐步去除對(duì)Zookeeper的依賴,那kafka3.x之后,又會(huì)是如何存儲(chǔ)消費(fèi)組、主題的信息呢?

責(zé)任編輯:武曉燕 來源: 中間件興趣圈
相關(guān)推薦

2022-05-10 15:24:34

KafkaZooKeeperKafka Raft

2021-04-21 12:29:45

KafkaZookeeper模型

2020-11-13 10:58:24

Kafka

2020-09-30 14:07:05

Kafka心跳機(jī)制API

2020-12-04 10:31:56

組消費(fèi)分區(qū)Kafka

2021-07-12 10:25:03

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2024-09-23 20:55:04

2021-04-30 08:29:16

KafkaZooKeeper分布式

2023-08-27 21:51:50

Kafka數(shù)據(jù)庫數(shù)據(jù)存儲(chǔ)

2023-11-27 17:29:43

Kafka全局順序性

2024-10-29 11:08:23

2019-12-16 09:37:19

Kafka架構(gòu)數(shù)據(jù)

2021-02-01 07:20:51

KafkaPulsar搜索

2023-06-01 08:08:38

kafka消費(fèi)者分區(qū)策略

2024-10-17 16:41:57

KafkaZooKeeper

2025-03-25 07:54:15

2021-04-28 18:28:44

KafkaZookeeper

2021-05-20 09:06:20

KafkaZookeeper分布式

2018-07-26 15:18:41

阿里JavaKafka架構(gòu)

2012-01-05 15:22:01

思科
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)