Kafka 原理以及分區(qū)分配策略剖析
一、簡介
流處理平臺有以下3個特性:
- 可以讓你發(fā)布和訂閱流式的記錄。這一方面與消息隊列或者企業(yè)消息系統(tǒng)類似。
- 可以儲存流式的記錄,并且有較好的容錯性。
- 可以在流式記錄產(chǎn)生時就進(jìn)行處理。
1.1 消息隊列的兩種模式
1.1.1 點對點模式
生產(chǎn)者將消息發(fā)送到queue中,然后消費(fèi)者從queue中取出并且消費(fèi)消息。消息被消費(fèi)以后,queue中不再存儲,所以消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue支持存在多個消費(fèi)者,但是對一個消息而言,只能被一個消費(fèi)者消費(fèi)。
1.1.2 發(fā)布/訂閱模式
生產(chǎn)者將消息發(fā)布到topic中,同時可以有多個消費(fèi)者訂閱該消息。和點對點方式不同,發(fā)布到topic的消息會被所有訂閱者消費(fèi)。
1.2 Kafka 適合什么樣的場景
它可以用于兩大類別的應(yīng)用:
- 構(gòu)造實時流數(shù)據(jù)管道,它可以在系統(tǒng)或應(yīng)用之間可靠地獲取數(shù)據(jù)。(相當(dāng)于message queue)。
- 構(gòu)建實時流式應(yīng)用程序,對這些流數(shù)據(jù)進(jìn)行轉(zhuǎn)換或者影響。(就是流處理,通過kafka stream topic和topic之間內(nèi)部進(jìn)行變化)。
為了理解Kafka是如何做到以上所說的功能,從下面開始,我們將深入探索Kafka的特性。
首先是一些概念:
- Kafka作為一個集群,運(yùn)行在一臺或者多臺服務(wù)器上。
- Kafka 通過 topic 對存儲的流數(shù)據(jù)進(jìn)行分類。
- 每條記錄中包含一個key,一個value和一個timestamp(時間戳)。
1.3 主題和分區(qū)
Kafka的消息通過主題(Topic)進(jìn)行分類,就好比是數(shù)據(jù)庫的表,或者是文件系統(tǒng)里的文件夾。主題可以被分為若干個分區(qū)(Partition),一個分區(qū)就是一個提交日志。消息以追加的方式寫入分區(qū),然后以先進(jìn)先出的順序讀取。注意,由于一個主題一般包含幾個分區(qū),因此無法在整個主題范圍內(nèi)保證消息的順序,但可以保證消息在單個分區(qū)內(nèi)的順序。主題是邏輯上的概念,在物理上,一個主題是橫跨多個服務(wù)器的。
Kafka 集群保留所有發(fā)布的記錄(無論他們是否已被消費(fèi)),并通過一個可配置的參數(shù)——保留期限來控制(可以同時配置時間和消息大小,以較小的那個為準(zhǔn))。舉個例子, 如果保留策略設(shè)置為2天,一條記錄發(fā)布后兩天內(nèi),可以隨時被消費(fèi),兩天過后這條記錄會被拋棄并釋放磁盤空間。
有時候我們需要增加分區(qū)的數(shù)量,比如為了擴(kuò)展主題的容量、降低單個分區(qū)的吞吐量或者要在單個消費(fèi)者組內(nèi)運(yùn)行更多的消費(fèi)者(因為一個分區(qū)只能由消費(fèi)者組里的一個消費(fèi)者讀?。?。從消費(fèi)者的角度來看,基于鍵的主題添加分區(qū)是很困難的,因為分區(qū)數(shù)量改變,鍵到分區(qū)的映射也會變化,所以對于基于鍵的主題來說,建議在一開始就設(shè)置好分區(qū),避免以后對其進(jìn)行調(diào)整。
(注意:不能減少分區(qū)的數(shù)量,因為如果刪除了分區(qū),分區(qū)里面的數(shù)據(jù)也一并刪除了,導(dǎo)致數(shù)據(jù)不一致。如果一定要減少分區(qū)的數(shù)量,只能刪除topic重建)
1.4 生產(chǎn)者和消費(fèi)者
生產(chǎn)者(發(fā)布者)創(chuàng)建消息,一般情況下,一個消息會被發(fā)布到一個特定的主題上。生產(chǎn)者在默認(rèn)情況下把消息均衡的分布到主題的所有分區(qū)上,而并不關(guān)心特定消息會被寫入哪個分區(qū)。不過,生產(chǎn)者也可以把消息直接寫到指定的分區(qū)。這通常通過消息鍵和分區(qū)器來實現(xiàn),分區(qū)器為鍵生成一個散列值,并將其映射到指定的分區(qū)上。生產(chǎn)者也可以自定義分區(qū)器,根據(jù)不同的業(yè)務(wù)規(guī)則將消息映射到分區(qū)。
消費(fèi)者(訂閱者)讀取消息,消費(fèi)者可以訂閱一個或者多個主題,并按照消息生成的順序讀取它們。消費(fèi)者通過檢查消息的偏移量來區(qū)分已經(jīng)讀取過的消息。偏移量是一種元數(shù)據(jù),它是一個不斷遞增的整數(shù)值,在創(chuàng)建消息時,kafka會把它添加到消息里。在給定的分區(qū)里,每個消息的偏移量都是唯一的。消費(fèi)者把每個分區(qū)最后讀取的消息偏移量保存在zookeeper或者kafka上,如果消費(fèi)者關(guān)閉或者重啟,它的讀取狀態(tài)不會丟失。
消費(fèi)者是消費(fèi)者組的一部分,也就是說,會有一個或者多個消費(fèi)共同讀取一個主題。消費(fèi)者組保證每個分區(qū)只能被同一個組內(nèi)的一個消費(fèi)者使用。如果一個消費(fèi)者失效,群組里的其他消費(fèi)者可以接管失效消費(fèi)者的工作。
1.5 broker和集群
broker:一個獨(dú)立的kafka服務(wù)器被稱為broker。broker接收來自生產(chǎn)者的消息,為消息設(shè)置偏移量,并提交消息到磁盤保存。broker為消費(fèi)者提供服務(wù),對讀取分區(qū)的請求作出相應(yīng),返回已經(jīng)提交到磁盤上的消息。
集群:交給同一個zookeeper集群來管理的broker節(jié)點就組成了kafka的集群。
broker是集群的組成部分,每個集群都有一個broker同時充當(dāng)集群控制器的角色??刂破髫?fù)責(zé)管理工作,包括將分區(qū)分配給broker和監(jiān)控broker。在broker中,一個分區(qū)從屬于一個broker,該broker被稱為分區(qū)的首領(lǐng)。一個分區(qū)可以分配給多個broker(Topic設(shè)置了多個副本的時候),這時會發(fā)生分區(qū)復(fù)制。如下圖:
broker如何處理請求:broker會在它所監(jiān)聽的每個端口上運(yùn)行一個Acceptor線程,這個線程會創(chuàng)建一個連接并把它交給Processor線程去處理。Processor線程(也叫網(wǎng)絡(luò)線程)的數(shù)量是可配的,Processor線程負(fù)責(zé)從客戶端獲取請求信息,把它們放進(jìn)請求隊列,然后從響應(yīng)隊列獲取響應(yīng)信息,并發(fā)送給客戶端。如下圖所示:
生產(chǎn)請求和獲取請求都必須發(fā)送給分區(qū)的首領(lǐng)副本(分區(qū)Leader)。如果broker收到一個針對特定分區(qū)的請求,而該分區(qū)的首領(lǐng)在另外一個broker上,那么發(fā)送請求的客戶端會收到一個“非分區(qū)首領(lǐng)”的錯誤響應(yīng)。Kafka客戶端要自己負(fù)責(zé)把生產(chǎn)請求和獲取請求發(fā)送到正確的broker上。
客戶端如何知道該往哪里發(fā)送請求呢?客戶端使用了另外一種請求類型——元數(shù)據(jù)請求。這種請求包含了客戶端感興趣的主題列表,服務(wù)器的響應(yīng)消息里指明了這些主題所包含的分區(qū)、每個分區(qū)都有哪些副本,以及哪個副本是首領(lǐng)。元數(shù)據(jù)請求可以發(fā)給任意一個broker,因為所有的broker都緩存了這些信息??蛻舳司彺孢@些元數(shù)據(jù),并且會定時從broker請求刷新這些信息。此外如果客戶端收到“非首領(lǐng)”錯誤,它會在嘗試重新發(fā)送請求之前,先刷新元數(shù)據(jù)。
1.6 Kafka 基礎(chǔ)架構(gòu)
二、Kafka架構(gòu)深入
2.1 Kafka工作流程及文件存儲機(jī)制
2.1.1 工作流程
Kafka中消息是以topic進(jìn)行分類的,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息,都是面向topic的。
Topic是邏輯上的概念,而partition(分區(qū))是物理上的概念,每個partition對應(yīng)于一個log文件,該log文件中存儲的就是producer生產(chǎn)的數(shù)據(jù)。Producer生產(chǎn)的數(shù)據(jù)會被不斷追加到該log文件末端,且每條數(shù)據(jù)都有自己的offset。消費(fèi)者組中的每個消費(fèi)者,都會實時記錄自己消費(fèi)到哪個offset,以便出錯恢復(fù)時,從上次的位置繼續(xù)消費(fèi)。
2.1.2 文件存儲機(jī)制
由于生產(chǎn)者生產(chǎn)的消息會不斷追加到log文件末尾,為防止log文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引的機(jī)制,將每個partition分為多個segment。(由log.segment.bytes決定,控制每個segment的大小,也可通過log.segment.ms控制,指定多長時間后日志片段會被關(guān)閉)每個segment對應(yīng)兩個文件——“.index”文件和“.log”文件。這些文件位于一個文件夾下,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號。例如:bing這個topic有3個分區(qū),則其對應(yīng)的文件夾為:bing-0、bing-1和bing-2。
索引文件和日志文件命名規(guī)則:每個 LogSegment 都有一個基準(zhǔn)偏移量,用來表示當(dāng)前 LogSegment 中第一條消息的 offset。偏移量是一個 64位的長整形數(shù),固定是20位數(shù)字,長度未達(dá)到,用 0 進(jìn)行填補(bǔ)。如下圖所示:
index和log文件以當(dāng)前segment的第一條消息的offset命名。index文件記錄的是數(shù)據(jù)文件的offset和對應(yīng)的物理位置,正是有了這個index文件,才能對任一數(shù)據(jù)寫入和查看擁有O(1)的復(fù)雜度,index文件的粒度可以通過參數(shù)log.index.interval.bytes來控制,默認(rèn)是是每過4096字節(jié)記錄一條index。下圖為index文件和log文件的結(jié)構(gòu)示意圖:
查找message的流程(比如要查找offset為170417的message):
- 首先用二分查找確定它是在哪個Segment文件中,其中0000000000000000000.index為最開始的文件,第二個文件為0000000000000170410.index(起始偏移為170410+1 = 170411),而第三個文件為0000000000000239430.index(起始偏移為239430+1 = 239431)。所以這個offset = 170417就落在第二個文件中。其他后續(xù)文件可以依此類推,以起始偏移量命名并排列這些文件,然后根據(jù)二分查找法就可以快速定位到具體文件位置。
- 用該offset減去索引文件的編號,即170417 - 170410 = 7,也用二分查找法找到索引文件中等于或者小于7的最大的那個編號??梢钥闯鑫覀兡軌蛘业絒4,476]這組數(shù)據(jù),476即offset=170410 + 4 = 170414的消息在log文件中的偏移量。
- 打開數(shù)據(jù)文件(0000000000000170410.log),從位置為476的那個地方開始順序掃描直到找到offset為170417的那條Message。
2.1.3 數(shù)據(jù)過期機(jī)制
當(dāng)日志片段大小達(dá)到log.segment.bytes指定的上限(默認(rèn)是1GB)或者日志片段打開時長達(dá)到log.segment.ms時,當(dāng)前日志片段就會被關(guān)閉,一個新的日志片段被打開。如果一個日志片段被關(guān)閉,就開始等待過期。當(dāng)前正在寫入的片段叫做活躍片段,活躍片段永遠(yuǎn)不會被刪除,所以如果你要保留數(shù)據(jù)1天,但是片段包含5天的數(shù)據(jù),那么這些數(shù)據(jù)就會被保留5天,因為片段被關(guān)閉之前,這些數(shù)據(jù)無法被刪除。
2.2 Kafka生產(chǎn)者
2.2.1 分區(qū)策略
為什么要分區(qū)
- 多Partition分布式存儲,利于集群數(shù)據(jù)的均衡。
- 并發(fā)讀寫,加快讀寫速度。
- 加快數(shù)據(jù)恢復(fù)的速率:當(dāng)某臺機(jī)器掛了,每個Topic僅需恢復(fù)一部分的數(shù)據(jù),多機(jī)器并發(fā)。
分區(qū)的原則
- 指明partition的情況下,使用指定的partition;
- 沒有指明partition,但是有key的情況下,將key的hash值與topic的partition數(shù)進(jìn)行取余得到partition值;
- 既沒有指定partition,也沒有key的情況下,第一次調(diào)用時隨機(jī)生成一個整數(shù)(后面每次調(diào)用在這個整數(shù)上自增),將這個值與topic可用的partition數(shù)取余得到partition值,也就是常說的round-robin算法。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
//key為空時,獲取一個自增的計數(shù),然后對分區(qū)做取模得到分區(qū)編號
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
// key不為空時,通過key的hash對分區(qū)取模(疑問:為什么這里不像上面那樣,使用availablePartitions呢?)
// 根據(jù)《Kafka權(quán)威指南》Page45理解:為了保證相同的鍵,總是能路由到固定的分區(qū),如果使用可用分區(qū),那么因為分區(qū)數(shù)變化,會導(dǎo)致相同的key,路由到不同分區(qū)
// 所以如果要使用key來映射分區(qū),最好在創(chuàng)建主題的時候就把分區(qū)規(guī)劃好
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
//為每個topic維護(hù)了一個AtomicInteger對象,每次獲取時+1
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
2.2.2 數(shù)據(jù)可靠性保證
kafka提供了哪些方面的保證
- kafka可以保證分區(qū)消息的順序。如果使用同一個生產(chǎn)者往同一個分區(qū)寫入消息,而且消息B在消息A之后寫入,那么kafka可以保證消息B的偏移量比消息A的偏移量大,而且消費(fèi)者會先讀取到消息A再讀取消息B。
- 只有當(dāng)消息被寫入分區(qū)的所有副本時,它才被認(rèn)為是“已提交”的。生產(chǎn)者可以選擇接收不同類型的確認(rèn),比如在消息被完全提交時的確認(rèn)、在消息被寫入分區(qū)首領(lǐng)時的確認(rèn),或者在消息被發(fā)送到網(wǎng)絡(luò)時的確認(rèn)。
- 只要還有一個副本是活躍的,那么已經(jīng)提交的信息就不會丟失。
- 消費(fèi)者只能讀取到已經(jīng)提交的消息。
復(fù)制
Kafka的復(fù)制機(jī)制和分區(qū)的多副本架構(gòu)是kafka可靠性保證的核心。把消息寫入多個副本可以使kafka在發(fā)生奔潰時仍能保證消息的持久性。
kafka的topic被分成多個分區(qū),分區(qū)是基本的數(shù)據(jù)塊。每個分區(qū)可以有多個副本,其中一個是首領(lǐng)。所有事件都是發(fā)給首領(lǐng)副本,或者直接從首領(lǐng)副本讀取事件。其他副本只需要與首領(lǐng)副本保持同步,并及時復(fù)制最新的事件。
Leader維護(hù)了一個動態(tài)的in-sync replica set(ISR),意為和leader保持同步的follower集合。當(dāng)ISR中的follower完成數(shù)據(jù)同步后,leader就會發(fā)送ack。如果follower長時間未向leader同步數(shù)據(jù),則該follower將被踢出ISR,該時間閾值由replica.lag.time.max.ms參數(shù)設(shè)定。Leader不可用時,將會從ISR中選舉新的leader。滿足以下條件才能被認(rèn)為是同步的:
- 與zookeeper之間有一個活躍的會話,也就是說,它在過去的6s(可配置)內(nèi)向zookeeper發(fā)送過心跳。
- 在過去的10s(可配置)內(nèi)從首領(lǐng)那里獲取過最新的數(shù)據(jù)。
影響Kafka消息存儲可靠性的配置
ack應(yīng)答機(jī)制
對于某些不太重要的數(shù)據(jù),對數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒有必要等ISR中的follower全部接收成功。所以Kafka提供了三種可靠性級別,用戶可以根據(jù)對可靠性和延遲的要求進(jìn)行權(quán)衡。acks:
- 0: producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一接收到還沒寫入磁盤就已經(jīng)返回,當(dāng)broker故障時可能丟失數(shù)據(jù);
- 1: producer等待leader的ack,partition的leader落盤成功后返回ack,如果在follower同步成功之前l(fā)eader故障,那么將會丟失數(shù)據(jù);
- -1(all):producer等待broker的ack,partition的leader和ISR里的follower全部落盤成功后才返回ack。但是如果在follower同步完成后,broker發(fā)送ack之前,leader發(fā)生故障,那么會造成重復(fù)數(shù)據(jù)。(極端情況下也有可能丟數(shù)據(jù):ISR中只有一個Leader時,相當(dāng)于1的情況)。
消費(fèi)一致性保證
(1)follower故障
follower發(fā)生故障后會被臨時踢出ISR,待該follower恢復(fù)后,follower會讀取本地磁盤記錄的上次的HW,并將log文件高于HW的部分截取掉,從HW開始向leader進(jìn)行同步。
等該follower的LEO大于等于該P(yáng)artition的HW,即follower追上leader之后,就可以重新加入ISR了。
(2)leader故障
leader發(fā)生故障后,會從ISR中選出一個新的leader,之后為了保證多個副本之間的數(shù)據(jù)一致性,其余的follower會先將各自的log文件高于HW的部分截掉,然后從新的leader同步數(shù)據(jù)。
注意:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。
2.2.3 消息發(fā)送流程
Kafka 的producer 發(fā)送消息采用的是異步發(fā)送的方式。在消息發(fā)送過程中,涉及到了兩個線程——main線程和sender線程,以及一個線程共享變量——RecordAccumulator。main線程將消息發(fā)送給RecordAccumulator,sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka broker。
為了提高效率,消息被分批次寫入kafka。批次就是一組消息,這些消息屬于同一個主題和分區(qū)。(如果每一個消息都單獨(dú)穿行于網(wǎng)絡(luò),會導(dǎo)致大量的網(wǎng)絡(luò)開銷,把消息分成批次傳輸可以減少網(wǎng)絡(luò)開銷。不過要在時間延遲和吞吐量之間做出權(quán)衡:批次越大,單位時間內(nèi)處理的消息就越多,單個消息的傳輸時間就越長)。批次數(shù)據(jù)會被壓縮,這樣可以提升數(shù)據(jù)的傳輸和存儲能力,但要做更多的計算處理。
相關(guān)參數(shù):
- batch.size:只有數(shù)據(jù)積累到batch.size后,sender才會發(fā)送數(shù)據(jù)。(單位:字節(jié),注意:不是消息個數(shù))。
- linger.ms:如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待 linger.ms之后也會發(fā)送數(shù)據(jù)。(單位:毫秒)。
- client.id:該參數(shù)可以是任意字符串,服務(wù)器會用它來識別消息的來源,還可用用在日志和配額指標(biāo)里。
- max.in.flight.requests.per.connection:該參數(shù)指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個消息。它的值越高,就會占用越多的內(nèi)存,不過也會提升吞吐量。把它設(shè)置為1可以保證消息時按發(fā)送的順序?qū)懭敕?wù)器的,即使發(fā)生了重試。
2.3 Kafka消費(fèi)者
2.3.1 消費(fèi)方式
consumer采用pull(拉)的模式從broker中讀取數(shù)據(jù)。
push(推)模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因為消息發(fā)送速率是由broker決定的。它的目標(biāo)是盡可能以最快的速度傳遞消息,但是這樣容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式可以根據(jù)consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
pull模式的不足之處是,如果kafka沒有數(shù)據(jù),消費(fèi)者可能會陷入循環(huán)中,一直返回空數(shù)據(jù)。針對這一點,kafka的消費(fèi)者在消費(fèi)數(shù)據(jù)時會傳入一個時長參數(shù)timeout,如果當(dāng)前沒有數(shù)據(jù)可消費(fèi),consumer會等待一段時間后再返回。
2.3.2 分區(qū)分配策略
一個consumer group中有多個consumer,一個topic有多個partition,所以必然會涉及到partition的分配問題,即確定哪個partition由哪個consumer來消費(fèi)。Kafka提供了3種消費(fèi)者分區(qū)分配策略:RangeAssigor、RoundRobinAssignor、StickyAssignor。
PartitionAssignor接口用于用戶定義實現(xiàn)分區(qū)分配算法,以實現(xiàn)Consumer之間的分區(qū)分配。消費(fèi)組的成員訂閱它們感興趣的Topic并將這種訂閱關(guān)系傳遞給作為訂閱組協(xié)調(diào)者的Broker。協(xié)調(diào)者選擇其中的一個消費(fèi)者來執(zhí)行這個消費(fèi)組的分區(qū)分配并將分配結(jié)果轉(zhuǎn)發(fā)給消費(fèi)組內(nèi)所有的消費(fèi)者。Kafka默認(rèn)采用RangeAssignor的分配算法。
2.3.2.1 RangeAssignor
RangeAssignor對每個Topic進(jìn)行獨(dú)立的分區(qū)分配。對于每一個Topic,首先對分區(qū)按照分區(qū)ID進(jìn)行排序,然后訂閱這個Topic的消費(fèi)組的消費(fèi)者再進(jìn)行排序,之后盡量均衡的將分區(qū)分配給消費(fèi)者。這里只能是盡量均衡,因為分區(qū)數(shù)可能無法被消費(fèi)者數(shù)量整除,那么有一些消費(fèi)者就會多分配到一些分區(qū)。分配示意圖如下:
分區(qū)分配的算法如下:
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<TopicPartition>());
//for循環(huán)對訂閱的多個topic分別進(jìn)行處理
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List<String> consumersForTopic = topicEntry.getValue();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
//對消費(fèi)者進(jìn)行排序
Collections.sort(consumersForTopic);
//計算平均每個消費(fèi)者分配的分區(qū)數(shù)
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
//計算平均分配后多出的分區(qū)數(shù)
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
//計算第i個消費(fèi)者,分配分區(qū)的起始位置
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
//計算第i個消費(fèi)者,分配到的分區(qū)數(shù)量
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
這種分配方式明顯的一個問題是隨著消費(fèi)者訂閱的Topic的數(shù)量的增加,不均衡的問題會越來越嚴(yán)重,比如上圖中4個分區(qū)3個消費(fèi)者的場景,C0會多分配一個分區(qū)。如果此時再訂閱一個分區(qū)數(shù)為4的Topic,那么C0又會比C1、C2多分配一個分區(qū),這樣C0總共就比C1、C2多分配兩個分區(qū)了,而且隨著Topic的增加,這個情況會越來越嚴(yán)重。分配結(jié)果:
訂閱2個Topic,每個Topic4個分區(qū),共3個Consumer
- C0:[T0P0,T0P1,T1P0,T1P1]
- C1:[T0P2,T1P2]
- C2:[T0P3,T1P3]
2.3.2.2 RoundRobinAssignor
RoundRobinAssignor的分配策略是將消費(fèi)組內(nèi)訂閱的所有Topic的分區(qū)及所有消費(fèi)者進(jìn)行排序后盡量均衡的分配(RangeAssignor是針對單個Topic的分區(qū)進(jìn)行排序分配的)。如果消費(fèi)組內(nèi),消費(fèi)者訂閱的Topic列表是相同的(每個消費(fèi)者都訂閱了相同的Topic),那么分配結(jié)果是盡量均衡的(消費(fèi)者之間分配到的分區(qū)數(shù)的差值不會超過1)。如果訂閱的Topic列表是不同的,那么分配結(jié)果是不保證“盡量均衡”的,因為某些消費(fèi)者不參與一些Topic的分配。
以上兩個topic的情況,相比于之前RangeAssignor的分配策略,可以使分區(qū)分配的更均衡。不過考慮這種情況,假設(shè)有三個消費(fèi)者分別為C0、C1、C2,有3個Topic T0、T1、T2,分別擁有1、2、3個分區(qū),并且C0訂閱T0,C1訂閱T0和T1,C2訂閱T0、T1、T2,那么RoundRobinAssignor的分配結(jié)果如下:
看上去分配已經(jīng)盡量的保證均衡了,不過可以發(fā)現(xiàn)C2承擔(dān)了4個分區(qū)的消費(fèi)而C1訂閱了T1,是不是把T1P1交給C1消費(fèi)能更加的均衡呢?
2.3.2.3 StickyAssignor
StickyAssignor分區(qū)分配算法,目的是在執(zhí)行一次新的分配時,能在上一次分配的結(jié)果的基礎(chǔ)上,盡量少的調(diào)整分區(qū)分配的變動,節(jié)省因分區(qū)分配變化帶來的開銷。Sticky是“粘性的”,可以理解為分配結(jié)果是帶“粘性的”——每一次分配變更相對上一次分配做最少的變動。其目標(biāo)有兩點:
- 分區(qū)的分配盡量的均衡。
- 每一次重分配的結(jié)果盡量與上一次分配結(jié)果保持一致。
當(dāng)這兩個目標(biāo)發(fā)生沖突時,優(yōu)先保證第一個目標(biāo)。第一個目標(biāo)是每個分配算法都盡量嘗試去完成的,而第二個目標(biāo)才真正體現(xiàn)出StickyAssignor特性的。
StickyAssignor算法比較復(fù)雜,下面舉例來說明分配的效果(對比RoundRobinAssignor),前提條件:
- 有4個Topic:T0、T1、T2、T3,每個Topic有2個分區(qū)。
- 有3個Consumer:C0、C1、C2,所有Consumer都訂閱了這4個分區(qū)。
上面紅色的箭頭代表的是有變動的分區(qū)分配,可以看出,StickyAssignor的分配策略,變動較小。
2.3.3 offset的維護(hù)
由于Consumer在消費(fèi)過程中可能會出現(xiàn)斷電宕機(jī)等故障,Consumer恢復(fù)后,需要從故障前的位置繼續(xù)消費(fèi),所以Consumer需要實時記錄自己消費(fèi)到哪個位置,以便故障恢復(fù)后繼續(xù)消費(fèi)。Kafka0.9版本之前,Consumer默認(rèn)將offset保存在zookeeper中,從0.9版本開始,Consumer默認(rèn)將offset保存在Kafka一個內(nèi)置的名字叫_consumeroffsets的topic中。默認(rèn)是無法讀取的,可以通過設(shè)置consumer.properties中的exclude.internal.topics=false來讀取。
2.3.4 kafka高效讀寫數(shù)據(jù)(了解)
順序?qū)懘疟P
Kafka 的 producer生產(chǎn)數(shù)據(jù),要寫入到log文件中,寫的過程是一直追加到文件末端,為順序?qū)?。?shù)據(jù)表明,同樣的磁盤,順序?qū)懩艿?00M/s,而隨機(jī)寫只有100K/s。這與磁盤的機(jī)械結(jié)構(gòu)有關(guān),順序?qū)懼钥欤且驗槠涫∪チ舜罅看蓬^尋址的時間。
零拷貝技術(shù)
零拷貝主要的任務(wù)就是避免CPU將數(shù)據(jù)從一塊存儲拷貝到另外一塊存儲,主要就是利用各種零拷貝技術(shù),避免讓CPU做大量的數(shù)據(jù)拷貝任務(wù),減少不必要的拷貝,或者讓別的組件來做這一類簡單的數(shù)據(jù)傳輸任務(wù),讓CPU解脫出來專注于別的任務(wù)。這樣就可以讓系統(tǒng)資源的利用更加有效。