Kafka放棄Zookeeper后如何存儲(chǔ)主題與消費(fèi)組呢?
由于筆者公司目前使用的kafka版本是2.2.1,故當(dāng)下關(guān)于kafka的內(nèi)核研究目前主要是基于該版本,當(dāng)然該專欄還會(huì)繼續(xù)關(guān)注Kafka3.0。
我在使用kafka時(shí)發(fā)現(xiàn)客戶端可以不依賴Zookeeper的情況下完成消息發(fā)送、消息消費(fèi),眾所周知早期的Kafka,所有的元信息(topic、消費(fèi)組、集群)等信息都存儲(chǔ)在Zookeeper中,原先的消息發(fā)送客戶端、消息消費(fèi)客戶端都需要依賴Zookeeper。
溫馨提示:Kafka逐步開啟了去zookeeper化,到kafka2.8之前實(shí)現(xiàn)了消息發(fā)送者、消息消費(fèi)者的去zookeeper化,從2.8版之后broker也支持去zookeeper。
那kafka2.2.1版本中,主題的路由信息、消費(fèi)組信息分別是存儲(chǔ)在什么地方呢?消息發(fā)送端、消息消費(fèi)端是如何感知的呢?
溫馨提示:如果大家對(duì)Kafka有基本的了解,不防停留片刻,稍作思考。
1.主題元數(shù)據(jù)存儲(chǔ)在Zookeeper中
進(jìn)入到Kafka Broker連接的Zookeeper集群,我們不難發(fā)現(xiàn)在 /{namespace}/brokers/topics節(jié)點(diǎn)下存在該集群中所有的主題信息,展開某一個(gè)具體的主題,如下圖所示:
關(guān)于主題的元信息,其實(shí)主要包括如下信息:
- 分區(qū)數(shù)量 每一個(gè)具體topic下會(huì)有一個(gè)partitions節(jié)點(diǎn),該節(jié)點(diǎn)下的每一個(gè)子節(jié)點(diǎn)代表一個(gè)分區(qū)。
- 分區(qū)狀態(tài)信息 每一個(gè)分區(qū)的的狀態(tài)由葉子節(jié)點(diǎn) /{namespace}/brokers/topics/{topicName}/parttions/{partNO}/state表示,存儲(chǔ)的內(nèi)容如下:
controller_epoch 控制器當(dāng)前的選舉版本。
leader 該分區(qū)的Leader所在的Broker節(jié)點(diǎn)ID。
version 當(dāng)前的存儲(chǔ)格式版本,默認(rèn)為1。
leader_epoch 分區(qū)Leader的選舉版本。
isr 分區(qū)的ISR集合。
主題的路由信息是存儲(chǔ)在Zookeeper中,那為什么客戶端只需要Broker的地址,就可以獲取到主題的路由信息呢?
1.1 主題路由尋址
查找路由信息在Kafka2.1版本中是發(fā)送ApiKeys.METADATA請(qǐng)求,該請(qǐng)求的響應(yīng)邏輯定義在Broker中,那客戶端是如何對(duì)Broker進(jìn)行路由,Broker中的路由信息又是從何而來呢?
消息發(fā)送者首次發(fā)送METADATA定位Broker機(jī)制:首次發(fā)送請(qǐng)求會(huì)從KafkaProducer的bootstrap.servers中設(shè)置的broker列表中選擇當(dāng)前最空閑的Broker,后續(xù)能感知所有的Broker。
消息消費(fèi)者發(fā)送METADATA定位Broker機(jī)制:發(fā)送到當(dāng)前消費(fèi)組的組協(xié)調(diào)所在的Broker。
根據(jù)查閱KafkaApis的handleTopicMetadataRequest方法,進(jìn)行一些ACL校驗(yàn)后進(jìn)入其核心方法:
關(guān)鍵點(diǎn):
- 從MetadataCache中獲取topic到路由信息。
- 如果MetadataCache中不存在指定topic的路由信息,如果Broker允許自動(dòng)創(chuàng)建主題(auto.create.topics.enable),默認(rèn)為true,則自動(dòng)創(chuàng)建該主題的信息,并將主題信息寫入到zookeeper,具體操作:
在/brokers/topics節(jié)點(diǎn)下創(chuàng)建子節(jié)點(diǎn),子節(jié)點(diǎn)名稱為topic的名稱。
根據(jù)當(dāng)前kafka分區(qū)的機(jī)架信息,分區(qū)數(shù)、副本數(shù),broker節(jié)點(diǎn)數(shù),進(jìn)行分配,主要盡量將主分區(qū)不放在同一個(gè)機(jī)架、存儲(chǔ)在主題的節(jié)點(diǎn)信息中,例如{"version":1,"partitions":{"4":[2,0,1],"5":[0,1,2],"1":[2,1,0],"0":[1,0,2],"2":[0,2,1],"3":[1,2,0]}},其中key為分區(qū)名稱,值為副本所在的brokerId,其中排在第一位是傾向性Leader,主題中存儲(chǔ)的值是靜態(tài)數(shù)據(jù),具體還會(huì)觸發(fā)選舉,選舉算法會(huì)參考這個(gè)分配。
控制器還會(huì)注冊(cè)調(diào)用registerPartitionModificationsHandlers方法,監(jiān)聽主題信息的變化,從而觸發(fā)后續(xù)流程,啟動(dòng)分區(qū)的真正創(chuàng)建(各個(gè)分區(qū)的Leader選舉等)。
溫馨提示:Kafka開啟自動(dòng)創(chuàng)建主題,分區(qū)數(shù)量取自kafka broker中的num.partitions參數(shù),默認(rèn)為1,副本因子則取決于default.replication.factor參數(shù),默認(rèn)為1。
1.2 路由信息同步機(jī)制
MetadataCache,元信息緩存,那這里的數(shù)據(jù)又是從何而來呢?MetadataCache中路由信息的更新調(diào)用鏈如下圖所示:
Kafka的KafkaController(后續(xù)統(tǒng)稱控制器)首先會(huì)聽/brokers/topics/{topicName}節(jié)點(diǎn)內(nèi)容的變化,一旦有新主題創(chuàng)建或主題信息變更,topic變更事件就會(huì)觸發(fā),此時(shí)TopicChange的process方法會(huì)調(diào)用,最終調(diào)用updatePartitionReplicaAssignment,也就是一旦主題的信息發(fā)生變更,控制器會(huì)向所有Broker節(jié)點(diǎn)發(fā)送ApiKeys.UPDATE_METADATA,各個(gè)Broker在到該請(qǐng)求后,會(huì)更新各個(gè)Broker中的內(nèi)存緩存,供消息發(fā)送者查找topic路由信息。
即Kafka2.2版本中,topic的元信息存儲(chǔ)在Zookeeper中,同時(shí)Kafka Controller會(huì)監(jiān)聽zookeeper中相關(guān)節(jié)點(diǎn),從而感知信息變更,從而將路由信息通過RPC發(fā)送到集群內(nèi)所有的Broker中,故每一個(gè)Broker的內(nèi)存中都存儲(chǔ)一份相同的路由信息。
Kafka2.8版本開始嘗試去Zookeeper化。
思考題:為什么各個(gè)Broker不都監(jiān)聽zookeeper,從而感知topic變化,更新本地內(nèi)存呢?歡迎各位留言討論或私信dingwpmz,共同交流。
2.消費(fèi)組存儲(chǔ)在位點(diǎn)主題中
在較低版本中,啟動(dòng)Kafka消費(fèi)組需要指定zookeeper集群的地址,因?yàn)樵诘桶姹局邢M(fèi)組的元信息存儲(chǔ)在zookeeper中,具體路徑為/consumers,但后續(xù)版本中消費(fèi)端的啟動(dòng)已經(jīng)不需指定zookeeper,而是指定broker的地址列表即可,那這個(gè)時(shí)候,消費(fèi)組的信息是存儲(chǔ)在哪呢?
在前面介紹Kafka故障解決相關(guān)的文章中我們常??吹较M(fèi)組組協(xié)調(diào)器,內(nèi)部持有一個(gè)消費(fèi)組元數(shù)據(jù)管理器GroupMetadataManager,相關(guān)的代碼截圖如下所示:
在GroupMetadataManager對(duì)象中持有一個(gè)Map結(jié)構(gòu)的緩存,其鍵為消費(fèi)組的名稱,值為GroupMetadata對(duì)象,內(nèi)部記錄消費(fèi)組的狀態(tài),消費(fèi)組的成員列表,位點(diǎn)信息。
內(nèi)存的特點(diǎn):訪問高效,但隨著Broker進(jìn)程的退出而丟失,消費(fèi)組存儲(chǔ)在內(nèi)存中顯然不行,但又不在zookeeper中,那消費(fèi)組的定義信息存儲(chǔ)在什么地方呢?
2.1消費(fèi)組元信息存儲(chǔ)
消費(fèi)組的定義信息存儲(chǔ)在系統(tǒng)主題__consumer_offsets中,什么,這個(gè)主題不是用來存儲(chǔ)消費(fèi)位點(diǎn)的嗎?
原來__consumer_offsets不僅存儲(chǔ)消費(fèi)組的位點(diǎn)信息,還存儲(chǔ)消費(fèi)組的元信息,具體代碼入口:GroupMetadataManager#storeGroup,部分代碼截圖如下所示:
即消費(fèi)組元信息當(dāng)成一條消息寫入到__consumer_offsets,一條消費(fèi)組元信息存儲(chǔ)的value值,由GroupMetadataManager的groupMetadataValue方法定義,具體代碼如下:
隨著Kafka的不斷演化,存儲(chǔ)格式進(jìn)行了多次修改,對(duì)應(yīng)的版本如下:
- V0:Kafka 0.10級(jí)以下版本
- V1:大于 0.10,低于等于2.1版本。
- V2:2.2版本及以后
消費(fèi)組元信息存儲(chǔ)的格式為Json,具體存儲(chǔ)的內(nèi)容:
- protocol_type 協(xié)議版本,取自AbstractCoordinator的抽象方法protocolType(),消費(fèi)組的固定為:consumer。
- generation 消費(fèi)組元信息的版本號(hào),每發(fā)生一次消費(fèi)組重平衡,該值會(huì)加一。
- protocol 協(xié)議內(nèi)容,存儲(chǔ)消費(fèi)組的隊(duì)列負(fù)載算法,在構(gòu)建消費(fèi)者時(shí)可通過partition.assignment.strategy參數(shù)傳遞,可以傳遞多個(gè),消費(fèi)組具體的負(fù)載算法會(huì)選擇每一個(gè)消費(fèi)者都支持的協(xié)議進(jìn)行隊(duì)列負(fù)載,默認(rèn)的負(fù)載算法為RangeAssignor。
- leader 當(dāng)前消費(fèi)組的Leader,通常為第一個(gè)加入該消費(fèi)組的消費(fèi)者。
- current_state_timestamp 最新狀態(tài)變更的時(shí)間戳,該值是從V2版本開始引入。
- members 消費(fèi)組的成員信息,每一個(gè)成員信息存儲(chǔ)的信息如下:
- member_id 成員id,客戶端id(clientId) + uuid。
client_id 客戶端ID。
client_host 客戶端ip地址。
rebalance_timeout 重平衡時(shí)間,默認(rèn)為300000,5分鐘。
session_timeout 會(huì)話超時(shí)時(shí)間,默認(rèn)為10s。
subscription 元信息,取自AbstractCoordinator的抽象方法metadata(),消費(fèi)組的實(shí)現(xiàn)類為ConsumerCoordinator,主要是遍歷負(fù)載算法,每一個(gè)負(fù)載算法根據(jù)訂閱信息計(jì)算元信息。
assignment
各個(gè)消費(fèi)者的隊(duì)列負(fù)載情況。
溫馨提示:GroupMetadataManager的storeGroup方法的調(diào)用時(shí)間是在消費(fèi)組進(jìn)行重平衡時(shí),具體是重平衡第二階段(SYNC_GROUP)與完成重平衡。
2.2加載消息組元信息
消費(fèi)組元信息是存儲(chǔ)在 __consumer_offsets主題中,在什么時(shí)候會(huì)從該主題中加載到內(nèi)存中呢?
在__consumer_offsets的分區(qū)發(fā)生Leader選舉時(shí)會(huì)觸發(fā)將對(duì)應(yīng)分區(qū)中的數(shù)據(jù)加載到內(nèi)存,具體的處理入口在KafkaApis的handleLeaderAndIsrRequest方法,簡(jiǎn)易調(diào)用鏈如下圖所示:
3.總結(jié)
本文主要介紹了Kafka 主題與消費(fèi)組的持久化機(jī)制,在Kafka2.8版本開始,官方逐步去除對(duì)Zookeeper的依賴,那kafka3.x之后,又會(huì)是如何存儲(chǔ)消費(fèi)組、主題的信息呢?