小紅書一面:Kafka 是如何選擇 Leader 的?
Kafka作為一款優(yōu)秀的分布式消息中間件,內(nèi)部也存在一些選舉機(jī)制,這篇文章,我們將詳細(xì)地分析 Kafka如何實(shí)現(xiàn)選擇 Leader?
一、Kafka集群整體架構(gòu)
Kafka集群是由多個(gè) Kafka Broker通過連同一個(gè) Zookeeper組成,整個(gè)架構(gòu)可以抽象成下圖:
在 Kafka中,數(shù)據(jù)以 Topic的形式組織,每個(gè)主題又被劃分為多個(gè)分區(qū)(Partition),每個(gè)分區(qū)的數(shù)據(jù)在 Broker之間有多個(gè)副本(Replica),保證數(shù)據(jù)的高可用和持久性。
二、Controller的作用
Kafka Controller是一個(gè)特殊的 Broker實(shí)例,它負(fù)責(zé) Kafka集群中的領(lǐng)導(dǎo)者選舉、分區(qū)的分配、以及在 Broker上下線期間重新分配 Leader和副本。Controller通過與 Zookeeper交互來感知集群狀態(tài)的變化,從而進(jìn)行必要的領(lǐng)導(dǎo)者重新選舉。
三、選主的原理分析
1.Leader的概念
在 Kafka中,每個(gè)分區(qū)會(huì)有多個(gè)副本,其中只有一個(gè)副本是Leader,其他副本為Follower。Producer和 Consumer會(huì)向分區(qū)的Leader寫入或讀取數(shù)據(jù),F(xiàn)ollower從 Leader復(fù)制數(shù)據(jù)。這樣設(shè)計(jì)實(shí)現(xiàn)了高吞吐量的同時(shí)保證了數(shù)據(jù)的冗余。
2.選主過程
選主過程主要包括兩個(gè)方面:Controller選舉和分區(qū)Leader選舉。
(1) Controller選舉
在 Kafka啟動(dòng)時(shí),會(huì)注冊(cè)到 Zookeeper的/brokers/ids的路徑下,其中會(huì)有一個(gè) Broker節(jié)點(diǎn)通過與 Zookeeper的交互被選舉為 Controller。具體而言,Brokers通過在 Zookeeper的/controller路徑嘗試創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)(ephemeral node)來競(jìng)爭(zhēng)成為 Controller,選舉規(guī)則也很簡(jiǎn)單,誰(shuí)先注冊(cè)到 Zookeeper中的/controller節(jié)點(diǎn),誰(shuí)就是 Controller。
當(dāng)當(dāng)前Controller失效(如宕機(jī)或網(wǎng)絡(luò)問題)時(shí),Zookeeper會(huì)刪除/controller節(jié)點(diǎn),其他Broker會(huì)再次競(jìng)爭(zhēng),該過程保證了Controller的高可用。
(2) 分區(qū)Leader選舉
一旦一個(gè) Broker成為 Controller,它會(huì)獲取所有分區(qū)的最新信息,并基于持久化在 Zookeeper的數(shù)據(jù)進(jìn)行當(dāng)前各分區(qū) Leader的選舉。Controller使用 ISR(In-Sync Replica)列表,即當(dāng)前與 Leader保持同步的所有Follower副本進(jìn)行選主。默認(rèn)情況下,ISR中第一個(gè)副本被選為新的 Leader。
比如上圖中 TopicA中的 Partition0號(hào)分區(qū),選擇 broker0作為 Leader, 然后會(huì)將選擇的節(jié)點(diǎn)信息注冊(cè)到 Zookeeper的/brokers/topics路徑下,記錄誰(shuí)是 Leader,有哪些服務(wù)器可用。
Kafka 在實(shí)現(xiàn) Controller選舉方面采用了一種基于 Zookeeper的機(jī)制,這種機(jī)制充分利用了 Zookeeper的特性來確保集群的高可用性和一致性,接下來,我將深入解析這兩種選主的機(jī)制。
四、Controller選舉機(jī)制詳解
1. Zookeeper的Role
Zookeeper在Kafka中作為一個(gè)分布式協(xié)調(diào)服務(wù),其負(fù)責(zé)維護(hù)集群的元數(shù)據(jù)信息,包括Kafka節(jié)點(diǎn)的活動(dòng)狀態(tài)和每個(gè)分區(qū)的Replica信息。在Controller選舉過程中,Zookeeper充當(dāng)著協(xié)調(diào)者的角色,利用其特有的臨時(shí)節(jié)點(diǎn)機(jī)制來實(shí)現(xiàn)一個(gè)分布式的鎖。
2. 臨時(shí)節(jié)點(diǎn)
在Zookeeper中,臨時(shí)節(jié)點(diǎn)(Ephemeral Znode)是一個(gè)重要的特性,這種節(jié)點(diǎn)在客戶端會(huì)話結(jié)束時(shí)自動(dòng)被刪除。Kafka利用這一特性實(shí)現(xiàn)Controller的自動(dòng)化選舉。
3. Controller選舉過程
Kafka的Controller選舉過程主要分為以下幾個(gè)步驟:
- 初始化: 當(dāng)Kafka Broker啟動(dòng)時(shí),所有Broker都試圖成為Controller。每個(gè)Broker會(huì)進(jìn)行一次自檢,初始化必要的Controller管理器和相關(guān)結(jié)構(gòu)。
- 創(chuàng)建Zookeeper路徑: 每個(gè)Broker嘗試在Zookeeper的特定路徑(通常是/controller)下創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)。該節(jié)點(diǎn)的路徑即為Zookeeper中控制選舉的關(guān)鍵路徑。
- 競(jìng)爭(zhēng)鎖: 因?yàn)榕R時(shí)節(jié)點(diǎn)的特性,只有第一個(gè)成功創(chuàng)建的節(jié)點(diǎn)會(huì)存在于Zookeeper。因此,能創(chuàng)建成功的Broker就會(huì)成為當(dāng)前集群的Controller。這相當(dāng)于分布式鎖機(jī)制,誰(shuí)獲取到鎖誰(shuí)成為Controller。
- 故障處理與重新選舉: 如果當(dāng)前的Controller(持有Zookeeper節(jié)點(diǎn)的Broker)崩潰或因網(wǎng)絡(luò)問題與Zookeeper斷開連接,Zookeeper會(huì)自動(dòng)刪除該Broker創(chuàng)建的臨時(shí)節(jié)點(diǎn)。剩余的Broker會(huì)監(jiān)聽這個(gè)節(jié)點(diǎn)的變化(通過Zookeeper的Watcher機(jī)制),當(dāng)節(jié)點(diǎn)被刪除時(shí),會(huì)重新發(fā)起競(jìng)爭(zhēng),確保能夠快速選出一個(gè)新的Controller。
4. 實(shí)現(xiàn)細(xì)節(jié)
從實(shí)現(xiàn)的角度來看,我們可以看看 Kafka的相關(guān)主要類和方法涉及的過程:
- Zookeeper客戶端初始化: 初始化時(shí),Kafka的KafkaController類通過Zookeeper客戶端來與ZooKeeper服務(wù)建立連接,這是基礎(chǔ)。
- Controller路徑定義: 在Kafka源碼中,通常由ControllerZNodePaths.CONTROLLER_PATH常量定義Controller路徑。
在ControllerEventManager類中,核心的方法參與Zookeeper節(jié)點(diǎn)的創(chuàng)建與監(jiān)聽:
public void onControllerFailover() {
try {
// 嘗試在Zookeeper創(chuàng)建臨時(shí)節(jié)點(diǎn)
zkClient.createEphemeralPathExpectConflictHandleZnode(
ControllerZNodePaths.CONTROLLER_PATH,
controllerString(),
onControllerFailover);
// 設(shè)置Controller監(jiān)聽器
zkClient.subscribeDataChanges(ControllerZNodePaths.CONTROLLER_PATH, new ControllerChangeListener());
} catch (Exception e) {
// 異常處理
}
}
在上述代碼段中,展示了當(dāng)一個(gè) Broker準(zhǔn)備競(jìng)選為Controller時(shí),他會(huì)在Zookeeper的/controller路徑創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn),并設(shè)置對(duì)該節(jié)點(diǎn)變化的監(jiān)聽器。
5. 監(jiān)聽機(jī)制
每個(gè)Broker通過設(shè)置Watcher來監(jiān)聽/controller節(jié)點(diǎn)的刪除事件。一旦現(xiàn)有Controller的連接丟失,所有的Broker都會(huì)收到這個(gè)事件通知。這個(gè)機(jī)制確保了在現(xiàn)有Controller失效時(shí),能夠迅速選出新的Controller。
6. Leader和集群的穩(wěn)定性
一旦新的Controller被選出,它就會(huì)獲取集群的元數(shù)據(jù),并開始執(zhí)行其職責(zé),包括領(lǐng)導(dǎo)者重新選舉和分區(qū)管理等操作。為了確保集群狀態(tài)的一致性和穩(wěn)定性,Controller必須在全面獲取并更新當(dāng)前集群狀態(tài)后才能完全上線。
五、分區(qū)Leader選舉詳解
當(dāng)然,Kafka中的分區(qū)Leader選舉是確保數(shù)據(jù)高可用性和一致性的關(guān)鍵機(jī)制之一。讓我們更詳細(xì)地探討一下這一過程,包括其觸發(fā)條件、具體步驟和相關(guān)代碼實(shí)現(xiàn)細(xì)節(jié)。
1.分區(qū)Leader選舉的觸發(fā)條件
分區(qū)Leader選舉主要在以下幾種情況下被觸發(fā):
- Broker新增或宕機(jī):當(dāng)一個(gè)Broker加入集群或者從集群中失聯(lián)(掉線)時(shí),需要重新分配分區(qū)的Leader。
- ISR(In-Sync Replica)變化:ISR列表中的Replica發(fā)生變化,比如某個(gè)Replica落后過多或恢復(fù)同步。
- Controller切換:如果當(dāng)前的Controller失效,新Controller上線后需要重新確認(rèn)并分配分區(qū)的Leader。
2.選舉的具體步驟
分區(qū)Leader選舉過程主要涉及以下幾步:
(1) 獲取分區(qū)信息
一旦選舉觸發(fā),Controller需獲取每個(gè)分區(qū)的元數(shù)據(jù)信息,包括:
- 當(dāng)前Leader。
- ISR列表(保持與Leader同步的副本集合)。
- 所有分區(qū)的Replica列表。
(2) 確定新Leader
Controller根據(jù)ISR列表來選擇新的Leader,通常選擇第一個(gè)Replica作為新的Leader,這樣保證選擇的是同步的且相對(duì)最新的副本。此外,Kafka允許通過配置參數(shù)自定義選舉策略,確保更靈活地處理特殊場(chǎng)景。
(3) 更新Zookeeper
選出新的Leader后,需將這個(gè)新的Leader信息更新到Zookeeper,這包括更新分區(qū)的Leader和ISR信息。此步驟確保其他Broker即使在Controller切換的情況下,也能從Zookeeper獲取到正確的分區(qū)Leader信息。
(4) 通知各Broker
更新完Zookeeper后,Controller通過向集群中其他Broker發(fā)送Leader和ISR更新信息,通知它們?cè)摲謪^(qū)的Leader已發(fā)生改變。這涉及使用Kafka的內(nèi)部API向其他Broker推送集群狀態(tài)變更。
3.核心代碼分析
以下是分區(qū)Leader選舉過程中的一些核心代碼實(shí)現(xiàn)示例:
(1) 獲取ISR列表
public List<Integer> getIsrForPartition(Partition partition) {
// 獲取分區(qū)的ISR列表
return partition.getIsr();
}
ISR列表的獲取是選舉過程中的基礎(chǔ)步驟,確保后續(xù)的Leader選舉從一致的數(shù)據(jù)集合中挑選。
(2) 選舉新Leader
下面的代碼展示了如何選擇 Leader,通過從 ISR中挑選第一個(gè)節(jié)點(diǎn)作為新 Leader,確保選擇的 Leader始終是最新同步過的一個(gè)。
public int selectNewLeader(/* some parameters */) {
List<Integer> isr = currentIsr(partition);
if (isr.isEmpty()) {
throw new IllegalStateException("ISR should not be empty");
}
// 默認(rèn)選擇ISR列表中的第一個(gè)
int newLeader = isr.get(0);
// 更新新Leader信息到Zookeeper
zkClient.updateLeaderAndIsr(newLeader, isr);
return newLeader;
}
(3) 更新到Zookeeper
public void updateLeaderAndISR(int newLeader, List<Integer> newIsrList, Partition partition) {
zkClient.setData(ControllerZNodePaths.getTopicPartitionStatePath(partition),
new LeaderAndIsrZNodeData(newLeader, newIsrList));
}
這段代碼表示將新的 Leader和 ISR信息更新到Zookeeper,確保全局一致性。
總結(jié)
本文,我們分析了 Kafka的 Leader選舉機(jī)制原理,它通過巧妙利用 Zookeeper和 ISR列表,提升了 Kafka的可靠性和可用性,但是,因?yàn)橹囟纫蕾?Zookeeper,因此使得 Kafka也存在很多風(fēng)險(xiǎn)。作為程序員,了解 Kafka的機(jī)制,可以幫助我們更好地使用和運(yùn)維 Kafka。