作者 | 蔡柱梁
審校 | 重樓
目錄
- 前言
- 可靠性分析
- 副本設(shè)計(jì)
- leader選舉機(jī)制
- 日志同步機(jī)制
1 前言
本文里面涉及到較多基礎(chǔ)概念,如果忘記了,那么可以去看下《一文帶你快速入門kafka》。
對(duì)于一個(gè)消息中間件而言,可靠性是是至關(guān)重要的要素之一。不管是面試或者實(shí)際工作中,我們都不得不面對(duì)幾個(gè)問(wèn)題:是幾個(gè)九?消息會(huì)不會(huì)丟失?如何保證冪等?如何順序消費(fèi)?在這篇文章中,筆者會(huì)和大家一起去看 Kafka 是如何設(shè)計(jì)的。
2 可靠性分析
針對(duì)上面的幾個(gè)問(wèn)題,Kafka 需要考慮包括并不限于以下問(wèn)題:
- 可用性
– Kafka 支持分布式架構(gòu),實(shí)現(xiàn)了故障轉(zhuǎn)移,避免單點(diǎn)問(wèn)題
如何避免腦裂問(wèn)題(這個(gè)要了解 Kafka 的 leader 選舉機(jī)制)
– 多副本機(jī)制,支持容災(zāi)備份
- 數(shù)據(jù)一致性如何保證
- 數(shù)據(jù)同步要如何實(shí)現(xiàn)
- 消息問(wèn)題
– 生產(chǎn)者投遞消息
broker回復(fù)投遞成功,但是消息丟失了。出現(xiàn)這種情況,一般是以下幾種情況:
- acks 配置成 0,生產(chǎn)者以為自己投遞成功了,但其實(shí)并沒(méi)有成功寫入 leader
- 消息持久化在 leader 分區(qū),但是沒(méi)有同步給 follower 就宕機(jī)了
這個(gè)問(wèn)題也好解決:生產(chǎn)者可以在發(fā)送消息前,先將消息持久化。至多是用了存儲(chǔ)空間,現(xiàn)在磁盤空間可以說(shuō)是最不值錢的了,而且我們還可以定期進(jìn)行歸檔壓縮/刪除處理,問(wèn)題不大。
– 消費(fèi)者消費(fèi)消息遇到消息丟失或者消息重復(fù)處理
- 消息丟失
消息丟失一般是以下這幾種情況:
- 消費(fèi)者拿到消息了,但是處理過(guò)程中發(fā)生異常
- 消費(fèi)者提交消費(fèi)位移的設(shè)計(jì)不合理
針對(duì)這個(gè)問(wèn)題,我們通常拿到消息會(huì)選擇將消息持久化在本地,然后再做消息處理,處理出問(wèn)題也可以重復(fù)處理。這種設(shè)計(jì)滿足我們大多數(shù)場(chǎng)景,但是對(duì)于消息生產(chǎn)速度遠(yuǎn)高于我們持久化速度的場(chǎng)景可能就不適用了,因?yàn)槲覀円紤]消息堆積問(wèn)題。不管是這個(gè)問(wèn)題,還是有些場(chǎng)景下無(wú)法找生產(chǎn)者重新投遞消息的問(wèn)題,都讓我們期待著消息中間件可以支持消息回溯功能。
- 重復(fù)消費(fèi)
這個(gè)可以交由使用者自己做冪等處理
– 消息需要有序消費(fèi)
我們知道 Kafka 是分區(qū)內(nèi)消息有序的。當(dāng)然,需要有序的消息就只能使用一個(gè)分區(qū),無(wú)疑是以 Kafka 的水平擴(kuò)展能力作為代價(jià)的。如果是需要全局有序,而我們又確定使用 Kafka,而且單分區(qū)的吞吐量不能滿足要求,那么我們只能自己進(jìn)行額外設(shè)計(jì)來(lái)保證了。
2.1 acks配置對(duì)消息丟失的影響
2.1.1 acks=1
消息成功寫入 leader 后,就會(huì)告訴生產(chǎn)者投遞成功。
如上圖例子,一共三個(gè)分區(qū),其中 follower1 和 follower2 均屬于 ISR。假設(shè) leader 成功寫入 3 和 4 之后,ISR 還沒(méi)同步,leader 就宕機(jī)了,這樣就丟失了 3 和 4,如下圖:
2.1.2 acks=-1 或者 acks=all
消息不僅要成功寫入 leader,還要 ISR 中的所有 follower 同步完成后,才會(huì)告知生產(chǎn)者投遞成功。
還是 2.1.1 的例子,這里無(wú)非會(huì)有兩種情況:
- leader 在同步完成后宕機(jī)
- leader 在同步完成前宕機(jī)
這個(gè)配置對(duì) Kafka 的性能會(huì)有較大影響,需要自己斟酌得失。
2.2 unclean.leader.election.enable
這個(gè)配置是用來(lái)控制 Kafka 是否可以選舉非 ISR 中的副本為 leader,0.11.0.0 之后的版本默認(rèn)為 false。雖然設(shè)置為 true 可以提高 Kafka 的可用性,但是會(huì)降低 Kafka 數(shù)據(jù)的可靠性。
2.3 小結(jié)
上面提出的問(wèn)題均有指出和 Kafka 相關(guān)部分的設(shè)計(jì)是哪些,這里再總結(jié)一下:
- 如何避免腦裂問(wèn)題——了解 Kafka 的 leader 選舉機(jī)制
- 數(shù)據(jù)同步&數(shù)據(jù)一致性問(wèn)題——了解 Kafka 的多副本設(shè)計(jì)
- 消息順序消費(fèi)問(wèn)題——了解 Kafka 的日志同步機(jī)制(分區(qū)有序)
3.副本設(shè)計(jì)
副本( Replica )是分布式系統(tǒng)中常見的概念之一,指的是分布式系統(tǒng)對(duì)數(shù)據(jù)和服務(wù)提供的一種冗余方式。
我們知道 Kafka 通過(guò)多副本機(jī)制,增強(qiáng)了容災(zāi)備份的能力,并且實(shí)現(xiàn)了故障轉(zhuǎn)移。這無(wú)疑是大大提高了 Kafka 的可用性,下面筆者會(huì)帶著大家一起看 Kafka 的副本機(jī)制是如何設(shè)計(jì)的。
在此之前,先簡(jiǎn)單復(fù)習(xí)幾個(gè)相關(guān)的概念:
- 副本是相對(duì)分區(qū)而言的,即副本是指某個(gè)分區(qū)的副本
- 在多副本情況下,其中一個(gè)副本為 leader,其它均為 follower。只有 leader 對(duì)外提供服務(wù),follower 僅同步leader 數(shù)據(jù)
- 分區(qū)中所有的副本集合稱為 AR,ISR 是與 leader 保持同步狀態(tài)的副本集合,leader 也是 ISR 中的一員
- LEO 是每個(gè)分區(qū)下一條消息要寫入的位置
- HW 是 ISR 中最小的 LEO,消費(fèi)者只能拉取 HW 之前的消息
3.1 失效副本
正常情況下,分區(qū)中所有副本都應(yīng)該屬于 ISR,但是網(wǎng)絡(luò)具有不可靠性。因此,難免在某一個(gè)時(shí)刻會(huì)有一些成員會(huì)被踢出 ISR,這些副本要么處于同步失效狀態(tài),要么功能失效,這些副本統(tǒng)稱為失效副本。
功能失效指的是無(wú)法工作,比如某個(gè) broker 宕機(jī)了,那么在它上面的分區(qū)就會(huì)失效。
同步失效又是怎么判斷是否同步失效的呢?是通過(guò)參數(shù) replica.lag.time.max.ms 來(lái)判斷的,默認(rèn)是 10000 毫秒。當(dāng)某個(gè) follower 同步滯后 leader 的時(shí)間超過(guò) 10 秒,則判定為同步失效。
具體實(shí)現(xiàn)原理如下:
當(dāng) follower 將 leader LEO 之前的消息全部同步完成,那么會(huì)認(rèn)為該 follower 已經(jīng)追上 leader,并更新 lastCaughtUpTimeMs。Kafka 的副本管理器有一個(gè)副本過(guò)期檢測(cè)的定時(shí)任務(wù),如果發(fā)現(xiàn)當(dāng)前時(shí)間 - lastCaughtUpTimeMs > 10秒,則判定同步失效。
除了時(shí)間設(shè)置以外,還有另一個(gè)參數(shù) replica.lag.max.message(默認(rèn)4000,這個(gè)是 broker 級(jí)別的參數(shù)),也是用來(lái)判定失效副本的。
一般情況下,這兩個(gè)參數(shù)都是使用默認(rèn)值就可以,因?yàn)槿绻麤](méi)有調(diào)優(yōu)經(jīng)驗(yàn),自己亂配置,容易導(dǎo)致 ISR 變動(dòng)過(guò)于頻繁。同時(shí),需要監(jiān)控失效副本的數(shù)量,因?yàn)樗呛饬?Kafka 是否健康的一個(gè)很重要的指標(biāo)。
PS:新加入的副本因子/宕機(jī)恢復(fù)后重新加入的副本在追趕上 leader 之前,也會(huì)一直處于失效狀態(tài)。
3.1.1 失效副本的作用
失效副本為 Kafka 帶來(lái)了什么收益呢?為什么需要設(shè)計(jì)這么一個(gè)狀態(tài)呢?
大家不妨試想下:假設(shè)允許 ISR 中有一個(gè)副本同步一直跟不上 leader。當(dāng) leader 發(fā)生宕機(jī)時(shí),這個(gè) follower 被選舉成了新的 leader,那么這時(shí)就會(huì)發(fā)生消息丟失。
一般會(huì)造成副本失效基本是以下兩個(gè)原因:
- follower 副本進(jìn)程卡頓,在一段時(shí)間內(nèi)無(wú)法發(fā)起同步請(qǐng)求,比如說(shuō)頻繁發(fā)生 FULL GC
- follower 同步過(guò)慢,在一段時(shí)間內(nèi)無(wú)法追上 leader,比如 I/O有問(wèn)題(筆者實(shí)際工作中遇到過(guò)一次,公司搭建自己的物理機(jī)房,用了二手服務(wù)器,有一臺(tái)服務(wù)器I/O老化導(dǎo)致讀寫數(shù)據(jù)慢,導(dǎo)致了副本失效,消息堆積等問(wèn)題)
3.2 LEO 與 HW
這一小節(jié)會(huì)更進(jìn)一步去講解它們之間的關(guān)系,讓大家可以更清楚 Kafka 的副本同步機(jī)制。
假設(shè)現(xiàn)在有 3 個(gè) broker,某個(gè) topic 有 1 個(gè)分區(qū),3 個(gè)副本?,F(xiàn)在有一個(gè) producer 發(fā)送了一條消息,這 3 個(gè)副本會(huì)發(fā)生些什么操作。
具體步驟如下:
- producer 發(fā)送消息到 leader
- leader 將消息追加到日志,并且更新日志的偏移量
- follower 執(zhí)行定時(shí)任務(wù)向 leader 發(fā)送 fetch request 同步數(shù)據(jù),該請(qǐng)求會(huì)帶上自己的 LEO
- leader 讀取本地日志,并更新 follower 的信息
- leader 返回 fetch response 給 follower,response 會(huì)包含 HW
- follower 將消息追加到本地日志,并更新日志的偏移量
為了更直觀地理解上面的步驟,下面將會(huì)用圖來(lái)展示。
1.一個(gè)新建的 topic 被寫入了 5 條消息,兩個(gè) follower 去拉取數(shù)據(jù)
2.leader 給 follower 返回 fetch response,并且 leader 又被寫入了 5 條消息
其中 follower1 同步了 2 條數(shù)據(jù),而 follower2 同步了 3 條數(shù)據(jù)。
而 follower 的 HW = min(自己的LEO, 同步回來(lái)的HW)
3.follower 再次同步數(shù)據(jù),同時(shí) leader 又被寫入了 5 條消息
leader 更新了 HW
4.leader 給 follower 返回 fetch response
根據(jù)公式,follower 更新 HW = 3
在一個(gè)分區(qū)中,leader 所在 broker 會(huì)記錄所有副本的 LEO 和 自己的 HW;而 follower 所在的 broker 只會(huì)記錄自己的 LEO 和 HW。因此,在邏輯層面上,我們可以得到下圖:
0.11.0.0版本之前,Kafka 是基于 HW 的同步機(jī)制,但是這個(gè)設(shè)計(jì)有可能出現(xiàn)數(shù)據(jù)丟失和數(shù)據(jù)不一致的問(wèn)題。Kafka 后面的版本通過(guò) leader epoch 來(lái)進(jìn)行優(yōu)化。
3.3 數(shù)據(jù)丟失 & 數(shù)據(jù)不一致的解決方案
3.2小節(jié)說(shuō)到了 LEO 與 HW 的更新機(jī)制,并且提到這種設(shè)計(jì)可能會(huì)出現(xiàn)數(shù)據(jù)丟失和數(shù)據(jù)不一致。我們先一起來(lái)看下這兩個(gè)問(wèn)題是如何產(chǎn)生的。
3.3.1 數(shù)據(jù)丟失
假設(shè)某一分區(qū)在某一時(shí)刻的狀態(tài)如下圖(L 代表是 leader):
可以看見副本A的 LEO 是 2,HW 是 1;副本B的 LEO 是 2,HW 是 2。顯然,哪怕沒(méi)有新的消息寫入副本B中,副本A也要過(guò)一小段時(shí)間才能追上副本A,并更新 HW。
假設(shè)在副本A更新 HW = 2之前,A宕機(jī)了,隨后立馬就恢復(fù)。這里會(huì)有一個(gè)截?cái)鄼C(jī)制——根據(jù)宕機(jī)之前持久化的HW 恢復(fù)消息。也就是說(shuō),A只恢復(fù)了 m1,m2 丟失了。
再假設(shè) A 剛恢復(fù),B 也宕機(jī)了,A 成為了 leader。這時(shí) B 又恢復(fù)了,并成為了 follower。由于 follower 的 HW 不能比 leader 的 HW 高,所以 B 的 m2 也丟失了。
總結(jié):這里大家可以發(fā)現(xiàn) follower 的 HW 更新是有一定間隙的,像我這個(gè)例子其實(shí) follower 是拿到 m2 了,只不過(guò) HW 要追上 leader 需要等下一次的 fetch request。除非配置 acks=-1 并且配置min.insync.replicas 大于 1,unclean.leader.election.enable = true 才行。
3.3.2 數(shù)據(jù)不一致
假設(shè)某一分區(qū)在某一時(shí)刻,副本A 的 HW = 2,LEO = 2;副本B 的 HW = 1,LEO = 1。
又假設(shè)它們同時(shí)掛了,B 先恢復(fù)。這時(shí),B 會(huì)成為 leader,如下圖:
此時(shí),B 寫入新消息 m3,并將 HW、LEO 更新為 2。此時(shí),A 也恢復(fù)了。由于 A 的 HW 也是 2,所以沒(méi)有截?cái)嘞?。如下圖:
這樣一來(lái),A 中 offset = 1 的消息是 m2,B 中 offset = 1 的消息是 m3,數(shù)據(jù)不一致了。
3.3.3 leader epoch
為了解決 3.3.1 和 3.3.2 的問(wèn)題,Kafka 從 0.11.0.0 開始引入 leader epoch,在需要截?cái)鄷r(shí)使用 leader epoch 作為依據(jù),而不再是 HW。
如果看框架代碼比較多的同學(xué)應(yīng)該知道 epoch 是相當(dāng)于版本的這么一個(gè)概念。leader epoch 的初始值是 0,每變更一次 leader,leader epoch 就會(huì)增加 1。另外,每個(gè)副本中還會(huì)增加一個(gè)矢量<LeaderEpoch => StartOffset>,其中 StartOffset 是當(dāng)前 leader epoch 下寫入第一條消息的偏移量。每個(gè)副本的 Log 下都有一個(gè) leader-epoch-checkpoint 文件,在發(fā)生 leader 變更時(shí),會(huì)將對(duì)應(yīng)的矢量追加到這個(gè)文件中。
3.3.3.1 解決數(shù)據(jù)丟失問(wèn)題
還是3.3.1的例子,只不過(guò)多了 leader epoch 矢量信息。
副本A:HW=1,LEO=2,LE(leader epoch)=0,Offset(StartOffset)=0
leader-副本B:HW=2,LEO=2,LE=0,Offset(StartOffset)=0
假設(shè)在副本A更新 HW = 2之前,A宕機(jī)了,隨后立馬就恢復(fù)。不過(guò)這里不會(huì)立馬進(jìn)行截?cái)嗳罩静僮?,而是?huì)發(fā)送一個(gè) OffsetsForLeaderEpochRequest 請(qǐng)求給 B,B 作為目前的 leader 在收到請(qǐng)求之后會(huì)返回 OffsetsForLeaderEpochResponse 給 A。
我們先來(lái)看下 OffsetsForLeaderEpochRequest 和 OffsetsForLeaderEpochResponse 的數(shù)據(jù)結(jié)構(gòu)。如下圖:
- OffsetsForLeaderEpochRequest
A 會(huì)將自己的 leader epoch 信息給 leader(A的 leader epoch 這里簡(jiǎn)化成 LE_A)。這里會(huì)出現(xiàn)兩種情況:
– 變更了 leader
B 會(huì)返回 LE_A+1 的 StartOffset 給 A
– 沒(méi)有變更 leader
B 會(huì)返回 A 的 LEO 給 A
因此,我們可以把 OffsetsForLeaderEpochRequest 看作是一個(gè)查詢 follower 當(dāng)前 leader_epoch 的 LEO。
- OffsetsForLeaderEpochResponse
這個(gè)例子中,B 會(huì)返回2給 A,而此時(shí)的 A 的 LEO 剛好是 2,所以不用進(jìn)行截?cái)嗳罩?。如下圖:
如果此時(shí)B掛了,A成了 leader,并有 m3 寫入,就會(huì)得到下圖:
可以看見 m2 并沒(méi)有丟失,并且也更新了 leader_epoch 矢量為 (1,2)。
3.3.3.2 解決數(shù)據(jù)不一致問(wèn)題
上圖是3.3.2的例子。副本A是 leader,B 是 follower。
A 的 HW=2,LEO=2,LE=(0,0)
B 的 HW=1,LEO=1,LE=(0,0)
此時(shí),A 和 B 同時(shí)宕機(jī),并且 B 先恢復(fù)成為了 leader。此時(shí),epoch 變成了 1。另外,新消息 m3 成功寫入,就會(huì)得到下圖:
接著,A 也恢復(fù)了,這時(shí) A 不會(huì)急著截?cái)嗳罩?,而是給 leader 發(fā)送 OffsetsForLeaderEpochRequest,B 會(huì)返回 LEO = 1 給 A。因此,A 會(huì)截?cái)嗳罩荆瑒h除 m2。之后,再給 B 發(fā)送 fetch request,得到 B 的響應(yīng)并更新后,將得到下圖:
這樣數(shù)據(jù)不一致問(wèn)題就解決了。
這里大家可能會(huì)有疑問(wèn),m2不是丟失了嗎?是的,這種設(shè)計(jì)因?yàn)楦戮哂幸欢ǖ拈g隙,并且沒(méi)有事務(wù)管理,所以會(huì)有丟失消息的風(fēng)險(xiǎn)。
從 CAP 定理來(lái)看,這里的設(shè)計(jì)屬于 AP。為什么這么說(shuō)呢?大家不妨想一下,如果為了不丟失數(shù)據(jù),這里加了事務(wù)控制的設(shè)計(jì),那么對(duì)于分區(qū)而言它的吞吐量是會(huì)下降的,甚至是不可用的,因?yàn)轫憫?yīng)速度是由短板的副本所決定的。對(duì)于定位是高吞吐量的 Kafka 而言,這顯然是不可接受的。
3.4 小結(jié)
Kafka 通過(guò)多副本機(jī)制增強(qiáng)了容災(zāi)備份的能力,并且基于多副本機(jī)制實(shí)現(xiàn)了故障轉(zhuǎn)移,避免了單點(diǎn)問(wèn)題,但同時(shí)也引進(jìn)了新的問(wèn)題——數(shù)據(jù)丟失和數(shù)據(jù)不一致。從 0.11.0.0 版本開始,Kafka 增加了 leader epoch,它對(duì)這兩個(gè)問(wèn)題進(jìn)行了優(yōu)化。雖然無(wú)法完全避免消息丟失,但是從實(shí)際的使用角度而言,這個(gè)問(wèn)題其實(shí)并不大。有實(shí)際工作經(jīng)驗(yàn)的同學(xué)應(yīng)該都知道,我們發(fā)送消息難以避免需要重推,哪怕消息中間件做到了百分百不丟失,其實(shí)我們?cè)谑褂脮r(shí)仍然會(huì)做防止消息丟失的設(shè)計(jì)。相對(duì)而言,數(shù)據(jù)一致性就更重要了,否則很容易讓訂閱消息的下游系統(tǒng)出現(xiàn)臟數(shù)據(jù)。
4 leader 選舉機(jī)制
在 Kafka 集群中會(huì)有一個(gè)或者多個(gè) broker,其中有一個(gè) broker 會(huì)被選舉為控制器,它負(fù)責(zé)管理整個(gè)集群中所有分區(qū)和副本的狀態(tài)。分區(qū)的 leader 出現(xiàn)故障時(shí),由控制器負(fù)責(zé)為其選舉新的 leader;當(dāng)某個(gè)分區(qū)的 ISR 發(fā)生變化時(shí),由控制器負(fù)責(zé)通知所有 broker 更新其元數(shù)據(jù)信息;當(dāng)某個(gè) topic 的分區(qū)數(shù)量發(fā)生變化時(shí),還是由控制器負(fù)責(zé)分區(qū)的重新分配。因此,只要控制器正常工作,分區(qū)的 leader 就是唯一的,不會(huì)有腦裂問(wèn)題。
那么, Kafka 是如何保證控制器只有一個(gè)的呢?如果控制器發(fā)生異常了怎么辦?控制器的選舉和異?;謴?fù)又是怎樣的?
4.1 控制器
控制器是 broker 維度的角色,它負(fù)責(zé)管理整個(gè)集群中所有分區(qū)和副本的狀態(tài)。
Kafka 中的控制器選舉工作依賴于 ZooKeeper,成功競(jìng)選為控制器的 broker 會(huì)在 ZooKeeper 中創(chuàng)建 /controller 臨時(shí)節(jié)點(diǎn),節(jié)點(diǎn)會(huì)存儲(chǔ)以下信息:
{
"version ": 1,
"brokerid": 0,
"timestamp": "1529210278988"
}
其中 version 目前是固定值不用管,brokerid 是成為控制器的 broker 的 id,timestamp 是 broker.id=0 的 broker 成為控制器的時(shí)間戳。
在任意時(shí)刻,Kafka 集群中有且僅有一個(gè)控制器。每個(gè) broker 啟動(dòng)時(shí)會(huì)嘗試讀取 ZooKeeper 的 /controller 節(jié)點(diǎn)的 brokerid 的值,如果 brokerid ≠ -1,則表示當(dāng)前集群已有控制器,broker 就會(huì)放棄競(jìng)選;如果不存在 /controller 節(jié)點(diǎn),broker 就會(huì)嘗試創(chuàng)建節(jié)點(diǎn),創(chuàng)建成功的 broker 就會(huì)成為控制器,將自己的 ID 賦予 brokerid,而對(duì)于創(chuàng)建節(jié)點(diǎn)失敗的 broker 則會(huì)在內(nèi)存中保存當(dāng)前控制器的 brokerid 值,這個(gè)值標(biāo)識(shí)為 activeControllerId。
上面是啟動(dòng) Kafka 集群以及正常情況下添加 broker 情況下的選舉過(guò)程。那么當(dāng)控制器出現(xiàn)故障時(shí),就需要重新選舉了。ZooKeeper 中還有一個(gè)與控制器有關(guān)的 /controller_epoch 節(jié)點(diǎn),該節(jié)點(diǎn)是持久節(jié)點(diǎn),里面存儲(chǔ)了一個(gè)整型的 controller_epoch 值,初始值是 1。當(dāng)控制器發(fā)生變化時(shí),controller_epoch 就會(huì)加 1。每個(gè)和控制器交互的請(qǐng)求一定會(huì)帶上 controller_epoch,當(dāng)控制器發(fā)現(xiàn)請(qǐng)求帶上的 controller_epoch 比自己內(nèi)存的小,那么這個(gè)請(qǐng)求則是無(wú)效請(qǐng)求;如果請(qǐng)求帶上的 controller_epoch 比自己內(nèi)存的大,說(shuō)明自己不再是控制器。由此可見,Kafka 是通過(guò) controller_epoch 來(lái)保證控制器的唯一性,進(jìn)而保證相關(guān)操作的一致性。
這里再擴(kuò)展一下,說(shuō)下作為控制器的 broker 多出來(lái)的責(zé)任:
- 監(jiān)聽分區(qū)相關(guān)的變化
– 在 ZooKeeper 的 /admin/reassign_partitions 節(jié)點(diǎn)注冊(cè) PartitionReassignmentHandler,用來(lái)處理分區(qū)重分配的動(dòng)作。
– 在 ZooKeeper 的 /isr_change_notification 節(jié)點(diǎn)注冊(cè) IsrChangeNotificetionHandler,用來(lái)處理 ISR 集合變更的動(dòng)作。
– 在 ZooKeeper 的 /admin/preferred-replica-election 節(jié)點(diǎn)注冊(cè) PreferredReplicaElectionHandler,用來(lái)處理優(yōu)先副本的選舉動(dòng)作。
- 監(jiān)聽主題相關(guān)的變化
– 在 ZooKeeper 的 /brokers/topics 節(jié)點(diǎn)注冊(cè) TopicChangeHandler,用來(lái)處理主題增減的變化。
– 在 ZooKeeper 的 /admin/delete_topics 節(jié)點(diǎn)注冊(cè) TopicDeletionHandler,用來(lái)處理刪除主題的動(dòng)作。
- 監(jiān)聽 broker 相關(guān)的變化
在 ZooKeeper 的 /brokers/ids 節(jié)點(diǎn)注冊(cè) BrokerChangeHandler,用來(lái)處理 broker 增減的變化。
- 從 ZooKeeper 中讀取當(dāng)前所有與主題、分區(qū)及 broker 有關(guān)的信息并進(jìn)行相應(yīng)的管理
對(duì)所有主題對(duì)應(yīng)的在 ZooKeeper 中的 /brokers/topics/ 節(jié)點(diǎn)添加 PartitionModificationsHandler,用來(lái)監(jiān)聽主題中的分區(qū)分配變化。
- 啟動(dòng)并管理分區(qū)狀態(tài)機(jī)和副本狀態(tài)機(jī)
- 更新集群的元數(shù)據(jù)信息
- 如果設(shè)置了 auto.leader.rebalance.enable = true,則還會(huì)開啟一個(gè)名為“auto-leader-rebalance-task”的定時(shí)任務(wù)來(lái)負(fù)責(zé)維護(hù)分區(qū)的優(yōu)先副本的均衡
成功競(jìng)選控制器的 broker 會(huì)在當(dāng)選后,讀取包括不限于上面提到的在 ZooKeeper 中的節(jié)點(diǎn)的數(shù)據(jù),初始化上下文信息,并且進(jìn)行管理。在 Kafka 中,因?yàn)闀?huì)有大量需要讀取或者更新上下文信息的操作,所以這里會(huì)有多線程問(wèn)題。如果單純采用鎖機(jī)制實(shí)現(xiàn),那么整體性能會(huì)大打折扣。因此,控制器采用的是單線程基于事件隊(duì)列的模型。將所有相關(guān)的操作、事件進(jìn)一步封裝成一個(gè)個(gè)事件,按照事件發(fā)生的順序存入 LinkedBlockingQueue 中,最后再使用一個(gè)專用線程按 FIFO 的原則處理各個(gè)事件。
控制器、非控制器 broker、ZooKeeper 的關(guān)系圖如下:
只有控制器會(huì)注冊(cè)相應(yīng)的監(jiān)聽器關(guān)注節(jié)點(diǎn)的數(shù)據(jù)變化,其他 broker 則不關(guān)注這些節(jié)點(diǎn)的數(shù)據(jù)變化(除了 /controller)。因?yàn)樗?broker 都會(huì)關(guān)心當(dāng)前的控制器到底是誰(shuí),當(dāng) /controller 的數(shù)據(jù)發(fā)生變化時(shí),就要更新自己內(nèi)存中的 activeControllerId。如果原來(lái)是控制器的 broker,發(fā)現(xiàn)自己現(xiàn)在不是了,就需要關(guān)閉資源,如注銷只有控制器才需要的監(jiān)聽器等。不管什么原因造成 /controller 的 brokerid 變更,再重新選舉控制器之前,要先確定參選的 broker 里面是否有前控制器,如果有,就要先“退位”,再開始新的選舉。
優(yōu)點(diǎn):只有控制器注冊(cè)監(jiān)聽器,可以有效避免嚴(yán)重依賴 ZooKeeper 的設(shè)計(jì)的通病——腦裂、羊群效應(yīng)、ZooKeeper 過(guò)載。
5 日志同步機(jī)制
在多副本的設(shè)計(jì)中,要實(shí)現(xiàn)數(shù)據(jù)一致性和順序性,最簡(jiǎn)單有效的辦法就是選舉 leader,由 leader 負(fù)責(zé)寫入順序,follower 復(fù)制同步即可。只要 leader 不出問(wèn)題,如宕機(jī)、腦裂等,那么就不需要擔(dān)心 follower 的數(shù)據(jù)同步問(wèn)題。
不過(guò),一個(gè)分布式系統(tǒng)肯定需要考慮故障轉(zhuǎn)移的。這時(shí)就需要考慮 leader 宕機(jī)后,選舉新 leader 的問(wèn)題。上面講了控制器會(huì)保證分區(qū) leader 的唯一性,但是數(shù)據(jù)丟失的問(wèn)題,還是需要 follower 里面有跟上 leader 的才行。因此,Kafka 里面會(huì)有 ISR 這么一個(gè)概念。另外,如果需要做到告知客戶端成功提交了某條消息,就需要保證新 leader 里面必需有這條消息,那么需要配置 acks=all 等相關(guān)配置。
作者介紹
蔡柱梁,51CTO社區(qū)編輯,從事Java后端開發(fā)8年,做過(guò)傳統(tǒng)項(xiàng)目廣電BOSS系統(tǒng),后投身互聯(lián)網(wǎng)電商,負(fù)責(zé)過(guò)訂單,TMS,中間件等。