Kafka集群是如何選擇Leader,你知道嗎?
前言
kafka集群是由多個(gè)broker節(jié)點(diǎn)組成,這里面包含了許多的知識(shí)點(diǎn),以下的這些問(wèn)題你都知道嗎?
- 你知道topic的分區(qū)leader是怎么選舉的嗎?
- 你知道zookeeper中存儲(chǔ)了kafka的什么信息嗎?起到什么做呢?
- 你知道kafka消息文件是怎么存儲(chǔ)的嗎?
- 如果kafka中l(wèi)eader節(jié)點(diǎn)或者follower節(jié)點(diǎn)發(fā)生故障,消息會(huì)丟失嗎?如何保證消息的一致性和可靠性呢?
如果你對(duì)這些問(wèn)題比較模糊的話,那么很有必要看看本文,去了解以下kafka的核心設(shè)計(jì),本文主要基于kafka3.x版本講解。
kafka broker核心機(jī)制
kafka集群整體架構(gòu)
kafka集群是由多個(gè)kafka broker通過(guò)連同一個(gè)zookeeper組成,那么他們是如何協(xié)同工作對(duì)外提供服務(wù)的呢?zookeeper中又存儲(chǔ)了什么信息呢?
- kafka broker啟動(dòng)后,會(huì)在zookeeper的/brokers/ids路徑下注冊(cè)。
- 同時(shí),其中一個(gè)broker會(huì)被選舉為控制器(Kafka Controller)。選舉規(guī)則也很簡(jiǎn)單,誰(shuí)先注冊(cè)到zookeeper中的/controller節(jié)點(diǎn),誰(shuí)就是控制器。Controller主要負(fù)責(zé)管理整個(gè)集群中所有分區(qū)和副本的狀態(tài)。
- Kafka Controller會(huì)進(jìn)行Leader選擇,比如上圖中針對(duì)TopicA中的0號(hào)分區(qū),選擇broker0作為L(zhǎng)eader, 然后會(huì)將選擇的節(jié)點(diǎn)信息注冊(cè)到zookeeper的/brokers/topics路徑下,記錄誰(shuí)是Leader,有哪些服務(wù)器可用。
- 被選舉為L(zhǎng)eader的topic分區(qū)提供對(duì)外的讀寫服務(wù)。為什么只有Leader節(jié)點(diǎn)提供讀寫服務(wù),而不是設(shè)計(jì)成主從方式,F(xiàn)ollower提供讀服務(wù)呢?
- 為了保證數(shù)據(jù)的一致性,因?yàn)橄⑼窖舆t,可能導(dǎo)致消費(fèi)者從不同節(jié)點(diǎn)讀取導(dǎo)致不一致。
- kafka設(shè)計(jì)目的是分布式日志系統(tǒng),不是一個(gè)讀多寫少的場(chǎng)景,kafka的讀寫基本是對(duì)等的。
- 主從方式的話帶來(lái)設(shè)計(jì)上的復(fù)雜度。
kafka leader選舉機(jī)制
那么問(wèn)題來(lái)了,kafka中topic分區(qū)是如何選擇leader的呢?為了更好的闡述,我們先來(lái)理解下面3個(gè)概念。
- ****ISR:表示和 Leader 保持同步的 Follower 集合。如果 Follower 長(zhǎng)時(shí)間未向 Leader 發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該 Follower 將被踢出 ISR。該時(shí)間閾值由replica.lag.time.max.ms參數(shù)設(shè)定,默認(rèn) 30s。Leader 發(fā)生故障之后,就會(huì)從 ISR 中選舉新的Leader。
- ****OSR:表示 Follower 與 Leader 副本同步時(shí),延遲過(guò)多的副本。
- ****AR: 指的是分區(qū)中的所有副本,所以AR = ISR + OSR。
Kafka Controller選舉Leader的規(guī)則:在isr隊(duì)列中存活為前提,按照AR中排在前面的優(yōu)先。例如ar[1,0,2], isr [1,0,2],那么leader就會(huì)按照1,0,2的順序輪詢。而AR中的這個(gè)順序kafka會(huì)進(jìn)行打散,分?jǐn)俴afka broker的壓力。
當(dāng)運(yùn)行中的控制器突然宕機(jī)或意外終止時(shí),Kafka 通過(guò)監(jiān)聽(tīng)zookeeper能夠快速地感知到,并立即啟用備用控制器來(lái)代替之前失敗的控制器。這個(gè)過(guò)程就被稱為 Failover,該過(guò)程是自動(dòng)完成的,無(wú)需你手動(dòng)干預(yù)。
開始的時(shí)候,Broker 0 是控制器。當(dāng) Broker 0 宕機(jī)后,ZooKeeper 通過(guò)`` Watch 機(jī)制感知到并刪除了 /controller 臨時(shí)節(jié)點(diǎn)。之后,所有存活的 Broker 開始競(jìng)選新的控制器身份。Broker 3最終贏得了選舉,成功地在 ZooKeeper 上重建了 /controller 節(jié)點(diǎn)。之后,Broker 3 會(huì)從 ZooKeeper 中讀取集群元數(shù)據(jù)信息,并初始化到自己的緩存中,后面就有Broker 3來(lái)接管選擇Leader的功能了。
Leader 和 Follower 故障處理機(jī)制
如果topic分區(qū)的leader和follower發(fā)生了故障,那么對(duì)于數(shù)據(jù)的一致性和可靠性會(huì)有什么樣的影響呢?
- LEO(Log End Offset):每個(gè)副本的最后一個(gè)offset,LEO就是最新的offset + 1。
- HW(High Watermark):水位線,所有副本中最小的LEO ,消費(fèi)者只能看到這個(gè)水位線左邊的消息,從而保證數(shù)據(jù)的一致性。
上圖所示,如果follower發(fā)生故障怎么辦?
- Follower發(fā)生故障后會(huì)被臨時(shí)踢出ISR隊(duì)列。
- 這個(gè)期間Leader和Follower繼續(xù)接收數(shù)據(jù)。
- 待該Follower恢復(fù)后,F(xiàn)ollower會(huì)讀取本地磁盤記錄的上次的HW,并將log文件高于HW的部分截取掉,從HW開始向Leader進(jìn)行同步。
- 等該Follower的LEO大于等于該P(yáng)artition的HW,即Follower追上Leader之后,就可以重新加入ISR了。
如果leader發(fā)生故障怎么辦?
- Leader發(fā)生故障之后,會(huì)從ISR中選出一個(gè)新的Leader
- 為保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的Follower會(huì)先將各自的log文件高于HW的部分截掉,然后從新的Leader同步數(shù)據(jù)。
所以為了讓kafka broker保證消息的可靠性和一致性,我們要做如下的配置:
- 設(shè)置 生產(chǎn)者producer 的配置acks=all或者-1。leader 在返回確認(rèn)或錯(cuò)誤響應(yīng)之前,會(huì)等待所有副本收到悄息,需要配合min.insync.replicas配置使用。這樣就意味著leader和follower的LEO對(duì)齊。
- 設(shè)置topic 的配置replication.factor>=3副本大于3個(gè),并且 min.insync.replicas>=2表示至少兩個(gè)副本應(yīng)答。
- 設(shè)置broker配置unclean.leader.election.enable=false,默認(rèn)也是false,表示不對(duì)落后leader很多的follower也就是非ISR隊(duì)列中的副本選擇為L(zhǎng)eader, 這樣可以避免數(shù)據(jù)丟失和數(shù)據(jù) 不一致,但是可用性會(huì)降低。
Leader Partition 負(fù)載平衡
正常情況下,Kafka本身會(huì)自動(dòng)把Leader Partition均勻分散在各個(gè)機(jī)器上,來(lái)保證每臺(tái)機(jī)器的讀寫吞吐量都是均勻的。但是如果某些broker宕機(jī),會(huì)導(dǎo)致Leader Partition過(guò)于集中在其他少部分幾臺(tái)broker上,這會(huì)導(dǎo)致少數(shù)幾臺(tái)broker的讀寫請(qǐng)求壓力過(guò)高,其他宕機(jī)的broker重啟之后都是follower partition,讀寫請(qǐng)求很低,造成集群負(fù)載不均衡。那么該如何負(fù)載平衡呢?
- 自動(dòng)負(fù)載均衡
通過(guò)broker配置設(shè)置自動(dòng)負(fù)載均衡。
- auto.leader.rebalance.enable:默認(rèn)是 true。自動(dòng) Leader Partition 平衡。生產(chǎn)環(huán)境中,leader 重選舉的代價(jià)比較大,可能會(huì)帶來(lái)性能影響,建議設(shè)置為 false 關(guān)閉。
- leader.imbalance.per.broker.percentage:默認(rèn)是 10%。每個(gè) broker 允許的不平衡的 leader的比率。如果每個(gè) broker 超過(guò)了這個(gè)值,控制器會(huì)觸發(fā) leader 的平衡。
- leader.imbalance.check.interval.seconds:默認(rèn)值 300 秒。檢查 leader 負(fù)載是否平衡的間隔時(shí)間。
- 手動(dòng)負(fù)載均衡
- 對(duì)所有topic進(jìn)行負(fù)載均衡
./bin/kafka-preferred-replica-election.sh --zookeeper hadoop16:2181,hadoop17:2181,hadoop18:2181/kafka08
- 對(duì)指定topic負(fù)載均衡
cat topicPartitionList.json
{
"partitions":
[
{"topic":"test.example","partition": "0"}
]
}
./bin/kafka-preferred-replica-election.sh --zookeeper hadoop16:2181,hadoop17:2181,hadoop18:2181/kafka08 --path-to-json-file topicPartitionList.json
kafka的存儲(chǔ)機(jī)制
kafka消息最終會(huì)存儲(chǔ)到磁盤文件中,那么是如何存儲(chǔ)的呢?清理策略是什么呢?
一個(gè)topic分為多個(gè)partition,每個(gè)partition對(duì)應(yīng)于一個(gè)log文件,為防止log文件過(guò)大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引機(jī)制,每個(gè)partition分為多個(gè)segment。每個(gè)segment包括:“.index”文件、“.log”文件和.timeindex等文件,Producer生產(chǎn)的數(shù)據(jù)會(huì)被不斷追加到該log文件末端。
上圖中t1即為一個(gè)topic的名稱,而“t1-0/t1-1”則表明這個(gè)目錄是t1這個(gè)topic的哪個(gè)partition。
kafka中的索引文件以稀疏索引(sparseindex)的方式構(gòu)造消息的索引,如下圖所示:
1.根據(jù)目標(biāo)offset定位segment文件
2.找到小于等于目標(biāo)offset的最大offset對(duì)應(yīng)的索引項(xiàng)
3.定位到log文件
4.向下遍歷找到目標(biāo)Record
注意:index為稀疏索引,大約每往log文件寫入4kb數(shù)據(jù),會(huì)往index文件寫入一條索引。通過(guò)參數(shù)log.index.interval.bytes控制,默認(rèn)4kb。
那kafka中磁盤文件保存多久呢?
kafka 中默認(rèn)的日志保存時(shí)間為 7 天,可以通過(guò)調(diào)整如下參數(shù)修改保存時(shí)間。
- log.retention.hours,最低優(yōu)先級(jí)小時(shí),默認(rèn) 7 天。
- log.retention.minutes,分鐘。
- log.retention.ms,最高優(yōu)先級(jí)毫秒。
- log.retention.check.interval.ms,負(fù)責(zé)設(shè)置檢查周期,默認(rèn) 5 分鐘。
kafka broker重要參數(shù)
前面講解了kafka broker中的核心機(jī)制,我們?cè)賮?lái)看下重要的配置參數(shù)。
首先來(lái)說(shuō)下kafka服務(wù)端配置屬性Update Mode的作用:
- read-only。被標(biāo)記為read-only 的參數(shù)和原來(lái)的參數(shù)行為一樣,只有重啟 Broker,才能令修改生效。
- per-broker。被標(biāo)記為 per-broker 的參數(shù)屬于動(dòng)態(tài)參數(shù),修改它之后,無(wú)需重啟就會(huì)在對(duì)應(yīng)的 broker 上生效。
- cluster-wide。被標(biāo)記為 cluster-wide 的參數(shù)也屬于動(dòng)態(tài)參數(shù),修改它之后,會(huì)在整個(gè)集群范圍內(nèi)生效,也就是說(shuō),對(duì)所有 broker 都生效。也可以為具體的 broker 修改cluster-wide 參數(shù)。
Broker重要參數(shù)
參數(shù)名稱 | 描述 |
replica.lag.time.max.ms | ISR 中,如果 Follower 長(zhǎng)時(shí)間未向 Leader 發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該 Follower 將被踢出 ISR。該時(shí)間閾值,默認(rèn) 30s。 |
auto.leader.rebalance.enable | 默認(rèn)是 true。自動(dòng) Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默認(rèn)是 10%。每個(gè) broker 允許的不平衡的 leader的比率。如果每個(gè) broker 超過(guò)了這個(gè)值,控制器會(huì)觸發(fā) leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默認(rèn)值 300 秒。檢查 leader 負(fù)載是否平衡的間隔時(shí)間。 |
log.segment.bytes | Kafka 中 log 日志是分成一塊塊存儲(chǔ)的,此配置是指 log 日志劃分 成塊的大小,默認(rèn)值 1G。 |
log.index.interval.bytes | 默認(rèn) 4kb,kafka 里面每當(dāng)寫入了 4kb 大小的日志(.log),然后就往 index 文件里面記錄一個(gè)索引。 |
log.retention.hours | Kafka 中數(shù)據(jù)保存的時(shí)間,默認(rèn) 7 天。 |
log.retention.minutes | Kafka 中數(shù)據(jù)保存的時(shí)間,分鐘級(jí)別,默認(rèn)關(guān)閉。 |
log.retention.ms | Kafka 中數(shù)據(jù)保存的時(shí)間,毫秒級(jí)別,默認(rèn)關(guān)閉。 |
log.retention.check.interval.ms | 檢查數(shù)據(jù)是否保存超時(shí)的間隔,默認(rèn)是 5 分鐘。 |
log.retention.bytes | 默認(rèn)等于-1,表示無(wú)窮大。超過(guò)設(shè)置的所有日志總大小,刪除最早的 segment。 |
log.cleanup.policy | 默認(rèn)是 delete,表示所有數(shù)據(jù)啟用刪除策略;如果設(shè)置值為 compact,表示所有數(shù)據(jù)啟用壓縮策略。 |
num.io.threads | 默認(rèn)是 8。負(fù)責(zé)寫磁盤的線程數(shù)。整個(gè)參數(shù)值要占總核數(shù)的 50%。 |
num.replica.fetchers | 副本拉取線程數(shù),這個(gè)參數(shù)占總核數(shù)的 50%的 1/3 |
num.network.threads | 默認(rèn)是 3。數(shù)據(jù)傳輸線程數(shù),這個(gè)參數(shù)占總核數(shù)的50%的 2/3 。 |
log.flush.interval.messages | 強(qiáng)制頁(yè)緩存刷寫到磁盤的條數(shù),默認(rèn)是 long 的最大值,9223372036854775807。一般不建議修改,交給系統(tǒng)自己管理。 |
log.flush.interval.ms | 每隔多久,刷數(shù)據(jù)到磁盤,默認(rèn)是 null。一般不建議修改,交給系統(tǒng)自己管理。 |
總結(jié)
Kafka集群的分區(qū)多副本架構(gòu)是 Kafka 可靠性保證的核心,把消息寫入多個(gè)副本可以使 Kafka 在發(fā)生崩潰時(shí)仍能保證消息的持久性。本文圍繞這樣的核心架構(gòu)講解了其中的一些核心機(jī)制,包括Leader的選舉、消息的存儲(chǔ)機(jī)制等等。