探秘 Kafka 的內(nèi)部機制原理
我是碼哥,可以叫我靚仔,愿大家擁抱硬核技術(shù)和對象,面向人民幣編程。
簡介
kafka是一個分布式消息隊列。具有高性能、持久化、多副本備份、橫向擴展能力。生產(chǎn)者往隊列里寫消息,消費者從隊列里取消息進行業(yè)務(wù)邏輯。一般在架構(gòu)設(shè)計中起到解耦、削峰、異步處理的作用。
kafka對外使用topic的概念,生產(chǎn)者往topic里寫消息,消費者從讀消息。為了做到水平擴展,一個topic實際是由多個partition組成的,遇到瓶頸時,可以通過增加partition的數(shù)量來進行橫向擴容。單個parition內(nèi)是保證消息有序。
每新寫一條消息,kafka就是在對應(yīng)的文件append寫,所以性能非常高。
kafka的總體數(shù)據(jù)流是這樣的:
kafka data flow
大概用法就是,Producers往Brokers里面的指定Topic中寫消息,Consumers從Brokers里面拉去指定Topic的消息,然后進行業(yè)務(wù)處理。圖中有兩個topic,topic 0有兩個partition,topic 1有一個partition,三副本備份??梢钥吹絚onsumer gourp 1中的consumer 2沒有分到partition處理,這是有可能出現(xiàn)的,下面會講到。
關(guān)于broker、topics、partitions的一些元信息用zk來存,監(jiān)控和路由啥的也都會用到zk。
生產(chǎn)
基本流程是這樣的:
kafka sdk product flow.png
創(chuàng)建一條記錄,記錄中一個要指定對應(yīng)的topic和value,key和partition可選。先序列化,然后按照topic和partition,放進對應(yīng)的發(fā)送隊列中。kafka produce都是批量請求,會積攢一批,然后一起發(fā)送,不是調(diào)send()就進行立刻進行網(wǎng)絡(luò)發(fā)包。如果partition沒填,那么情況會是這樣的:
key有填 按照key進行哈希,相同key去一個partition。(如果擴展了partition的數(shù)量那么就不能保證了)
key沒填 round-robin來選partition
這些要發(fā)往同一個partition的請求按照配置,攢一波,然后由一個單獨的線程一次性發(fā)過去。
API
有high level api,替我們把很多事情都干了,offset,路由啥都替我們干了,用以來很簡單。還有simple api,offset啥的都是要我們自己記錄。
partition
當存在多副本的情況下,會盡量把多個副本,分配到不同的broker上。kafka會為partition選出一個leader,之后所有該partition的請求,實際操作的都是leader,然后再同步到其他的follower。 當一個broker歇菜后,所有l(wèi)eader在該broker上的partition都會重新選舉,選出一個leader。(這里不像分布式文件存儲系統(tǒng)那樣會自動進行復制保持副本數(shù))
然后這里就涉及兩個細節(jié):怎么分配partition,怎么選leader。
關(guān)于partition的分配,還有l(wèi)eader的選舉,總得有個執(zhí)行者。在kafka中,這個執(zhí)行者就叫controller。 kafka使用zk在broker中選出一個controller,用于partition分配和leader選舉。
partition的分配
將所有Broker(假設(shè)共n個Broker)和待分配的Partition排序
將第i個Partition分配到第(i mod n)個Broker上 (這個就是leader)
將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上
leader容災
controller會在Zookeeper的/brokers/ids節(jié)點上注冊Watch,一旦有broker宕機,它就能知道。當broker宕機后,controller就會給受到影響的partition選出新leader。controller從zk的/brokers/topics/[topic]/partitions/[partition]/state中,讀取對應(yīng)partition的ISR(in-sync replica已同步的副本)列表,選一個出來做leader。選出leader后,更新zk,然后發(fā)送LeaderAndISRRequest給受影響的broker,讓它們改變知道這事。為什么這里不是使用zk通知,而是直接給broker發(fā)送rpc請求,我的理解可能是這樣做zk有性能問題吧。
如果ISR列表是空,那么會根據(jù)配置,隨便選一個replica做leader,或者干脆這個partition就是歇菜。如果ISR列表的有機器,但是也歇菜了,那么還可以等ISR的機器活過來。
多副本同步
這里的策略,服務(wù)端這邊的處理是follower從leader批量拉取數(shù)據(jù)來同步。但是具體的可靠性,是由生產(chǎn)者來決定的。生產(chǎn)者生產(chǎn)消息的時候,通過request.required.acks參數(shù)來設(shè)置數(shù)據(jù)的可靠性。
acks | what happen |
0 | which means that the producer never waits for an acknowledgement from the broker.發(fā)過去就完事了,不關(guān)心broker是否處理成功,可能丟數(shù)據(jù)。 |
1 | which means that the producer gets an acknowledgement after the leader replica has received the data. 當寫Leader成功后就返回,其他的replica都是通過fetcher去同步的,所以kafka是異步寫,主備切換可能丟數(shù)據(jù)。 |
-1 | which means that the producer gets an acknowledgement after all in-sync replicas have received the data. 要等到isr里所有機器同步成功,才能返回成功,延時取決于最慢的機器。強一致,不會丟數(shù)據(jù)。 |
在acks=-1的時候,如果ISR少于min.insync.replicas指定的數(shù)目,那么就會返回不可用。
這里ISR列表中的機器是會變化的,根據(jù)配置replica.lag.time.max.ms,多久沒同步,就會從ISR列表中剔除。以前還有根據(jù)落后多少條消息就踢出ISR,在1.0版本后就去掉了,因為這個值很難取,在高峰的時候很容易出現(xiàn)節(jié)點不斷的進出ISR列表。
從ISA中選出leader后,follower會從把自己日志中上一個高水位后面的記錄去掉,然后去和leader拿新的數(shù)據(jù)。因為新的leader選出來后,follower上面的數(shù)據(jù),可能比新leader多,所以要截取。這里高水位的意思,對于partition和leader,就是所有ISR中都有的最新一條記錄。消費者最多只能讀到高水位;
從leader的角度來說高水位的更新會延遲一輪,例如寫入了一條新消息,ISR中的broker都fetch到了,但是ISR中的broker只有在下一輪的fetch中才能告訴leader。
也正是由于這個高水位延遲一輪,在一些情況下,kafka會出現(xiàn)丟數(shù)據(jù)和主備數(shù)據(jù)不一致的情況,0.11開始,使用leader epoch來代替高水位。(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection)
思考:當acks=-1時
是follwers都來fetch就返回成功,還是等follwers第二輪fetch?
leader已經(jīng)寫入本地,但是ISR中有些機器失敗,那么怎么處理呢?
消費
訂閱topic是以一個消費組來訂閱的,一個消費組里面可以有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個partition。換句話來說,就是一個partition,只能被消費組里的一個消費者消費 ,但是可以同時被多個消費組消費。因此,如果消費組內(nèi)的消費者如果比partition多的話,那么就會有個別消費者一直空閑。
API
訂閱topic時,可以用正則表達式,如果有新topic匹配上,那能自動訂閱上。
offset的保存
一個消費組消費partition,需要保存offset記錄消費到哪,以前保存在zk中,由于zk的寫性能不好,以前的解決方法都是consumer每隔一分鐘上報一次。這里zk的性能嚴重影響了消費的速度,而且很容易出現(xiàn)重復消費。
在0.10版本后,kafka把這個offset的保存,從zk總剝離,保存在一個名叫__consumeroffsets topic的topic中。寫進消息的key由groupid、topic、partition組成,value是偏移量offset。topic配置的清理策略是compact。總是保留最新的key,其余刪掉。一般情況下,每個key的offset都是緩存在內(nèi)存中,查詢的時候不用遍歷partition,如果沒有緩存,第一次就會遍歷partition建立緩存,然后查詢返回。
確定consumer group位移信息寫入__consumers_offsets的哪個partition,具體計算公式:
__consumers_offsets partition =
Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區(qū)。
思考:如果正在跑的服務(wù),修改了offsets.topic.num.partitions,那么offset的保存是不是就亂套了?
分配partition--reblance
生產(chǎn)過程中broker要分配partition,消費過程這里,也要分配partition給消費者。類似broker中選了一個controller出來,消費也要從broker中選一個coordinator,用于分配partition。下面從頂向下,分別闡述一下
- 怎么選coordinator。
- 交互流程。
- reblance的流程。
選coordinator
- 看offset保存在那個partition
- 該partition leader所在的broker就是被選定的coordinator
這里我們可以看到,consumer group的coordinator,和保存consumer group offset的partition leader是同一臺機器。
交互流程
把coordinator選出來之后,就是要分配了 整個流程是這樣的:
- consumer啟動、或者coordinator宕機了,consumer會任意請求一個broker,發(fā)送ConsumerMetadataRequest請求,broker會按照上面說的方法,選出這個consumer對應(yīng)coordinator的地址。
- consumer 發(fā)送heartbeat請求給coordinator,返回IllegalGeneration的話,就說明consumer的信息是舊的了,需要重新加入進來,進行reblance。返回成功,那么consumer就從上次分配的partition中繼續(xù)執(zhí)行。
reblance流程
- consumer給coordinator發(fā)送JoinGroupRequest請求。
- 這時其他consumer發(fā)heartbeat請求過來時,coordinator會告訴他們,要reblance了。
- 其他consumer發(fā)送JoinGroupRequest請求。
- 所有記錄在冊的consumer都發(fā)了JoinGroupRequest請求之后,coordinator就會在這里consumer中隨便選一個leader。然后回JoinGroupRespone,這會告訴consumer你是follower還是leader,對于leader,還會把follower的信息帶給它,讓它根據(jù)這些信息去分配partition
5. consumer向coordinator發(fā)送SyncGroupRequest,其中l(wèi)eader的 SyncGroupRequest會包含分配的情況。6、coordinator回包,把分配的情況告訴consumer,包括leader。
當partition或者消費者的數(shù)量發(fā)生變化時,都得進行reblance。列舉一下會reblance的情況:
- 增加partition
- 增加消費者
- 消費者主動關(guān)閉
- 消費者宕機了
- coordinator自己也宕機了
消息投遞語義
kafka支持3種消息投遞語義 At most once:最多一次,消息可能會丟失,但不會重復 At least once:最少一次,消息不會丟失,可能會重復 Exactly once:只且一次,消息不丟失不重復,只且消費一次(0.11中實現(xiàn),僅限于下游也是kafka)
在業(yè)務(wù)中,常常都是使用At least once的模型,如果需要可重入的話,往往是業(yè)務(wù)自己實現(xiàn)。
At least once
先獲取數(shù)據(jù),再進行業(yè)務(wù)處理,業(yè)務(wù)處理成功后commit offset。1、生產(chǎn)者生產(chǎn)消息異常,消息是否成功寫入不確定,重做,可能寫入重復的消息 2、消費者處理消息,業(yè)務(wù)處理成功后,更新offset失敗,消費者重啟的話,會重復消費
At most once
先獲取數(shù)據(jù),再commit offset,最后進行業(yè)務(wù)處理。1、生產(chǎn)者生產(chǎn)消息異常,不管,生產(chǎn)下一個消息,消息就丟了 2、消費者處理消息,先更新offset,再做業(yè)務(wù)處理,做業(yè)務(wù)處理失敗,消費者重啟,消息就丟了
Exactly once
思路是這樣的,首先要保證消息不丟,再去保證不重復。所以盯著At least once的原因來搞。首先想出來的:
- 生產(chǎn)者重做導致重復寫入消息----生產(chǎn)保證冪等性
- 消費者重復消費---消滅重復消費,或者業(yè)務(wù)接口保證冪等性重復消費也沒問題
由于業(yè)務(wù)接口是否冪等,不是kafka能保證的,所以kafka這里提供的exactly once是有限制的,消費者的下游也必須是kafka。 所以一下討論的,沒特殊說明,消費者的下游系統(tǒng)都是kafka(注:使用kafka conector,它對部分系統(tǒng)做了適配,實現(xiàn)了exactly once)。
生產(chǎn)者冪等性好做,沒啥問題。
解決重復消費有兩個方法:
- 下游系統(tǒng)保證冪等性,重復消費也不會導致多條記錄。
- 把commit offset和業(yè)務(wù)處理綁定成一個事務(wù)。
本來exactly once實現(xiàn)第1點就ok了。
但是在一些使用場景下,我們的數(shù)據(jù)源可能是多個topic,處理后輸出到多個topic,這時我們會希望輸出時要么全部成功,要么全部失敗。這就需要實現(xiàn)事務(wù)性。 既然要做事務(wù),那么干脆把重復消費的問題從根源上解決,把commit offset和輸出到其他topic綁定成一個事務(wù)。
生產(chǎn)冪等性
思路是這樣的,為每個producer分配一個pid,作為該producer的唯一標識。producer會為每一個<topic,partition>維護一個單調(diào)遞增的seq。類似的,broker也會為每個<pid,topic,partition>記錄下最新的seq。當req_seq == broker_seq+1時,broker才會接受該消息。因為:
- 消息的seq比broker的seq大超過時,說明中間有數(shù)據(jù)還沒寫入,即亂序了。
- 消息的seq不比broker的seq小,那么說明該消息已被保存。
解決重復生產(chǎn)
事務(wù)性/原子性廣播
場景是這樣的:
- 先從多個源topic中獲取數(shù)據(jù)。
- 做業(yè)務(wù)處理,寫到下游的多個目的topic。
- 更新多個源topic的offset。
其中第2、3點作為一個事務(wù),要么全成功,要么全失敗。這里得益與offset實際上是用特殊的topic去保存,這兩點都歸一為寫多個topic的事務(wù)性處理。
基本思路是這樣的:引入tid(transaction id),和pid不同,這個id是應(yīng)用程序提供的,用于標識事務(wù),和producer是誰并沒關(guān)系。就是任何producer都可以使用這個tid去做事務(wù),這樣進行到一半就死掉的事務(wù),可以由另一個producer去恢復。
同時為了記錄事務(wù)的狀態(tài),類似對offset的處理,引入transaction coordinator用于記錄transaction log。在集群中會有多個transaction coordinator,每個tid對應(yīng)唯一一個transaction coordinator。注:transaction log刪除策略是compact,已完成的事務(wù)會標記成null,compact后不保留。
做事務(wù)時,先標記開啟事務(wù),寫入數(shù)據(jù),全部成功就在transaction log中記錄為prepare commit狀態(tài),否則寫入prepare abort的狀態(tài)。
之后再去給每個相關(guān)的partition寫入一條marker(commit或者abort)消息,標記這個事務(wù)的message可以被讀取或已經(jīng)廢棄。
成功后在transaction log記錄下commit/abort狀態(tài),至此事務(wù)結(jié)束。
數(shù)據(jù)流:
Kafka Transactions Data Flow.png
- 首先使用tid請求任意一個broker(代碼中寫的是負載最小的broker),找到對應(yīng)的transaction coordinator。
- 請求transaction coordinator獲取到對應(yīng)的pid,和pid對應(yīng)的epoch,這個epoch用于防止僵死進程復活導致消息錯亂,當消息的epoch比當前維護的epoch小時,拒絕掉。tid和pid有一一對應(yīng)的關(guān)系,這樣對于同一個tid會返回相同的pid。
- client先請求transaction coordinator記錄<topic,partition>的事務(wù)狀態(tài),初始狀態(tài)是BEGIN,如果是該事務(wù)中第一個到達的<topic,partition>,同時會對事務(wù)進行計時;client輸出數(shù)據(jù)到相關(guān)的partition中;client再請求transaction coordinator記錄offset的<topic,partition>事務(wù)狀態(tài);client發(fā)送offset commit到對應(yīng)offset partition。
- client發(fā)送commit請求,transaction coordinator記錄prepare commit/abort,然后發(fā)送marker給相關(guān)的partition。全部成功后,記錄commit/abort的狀態(tài),最后這個記錄不需要等待其他replica的ack,因為prepare不丟就能保證最終的正確性了。
這里prepare的狀態(tài)主要是用于事務(wù)恢復,例如給相關(guān)的partition發(fā)送控制消息,沒發(fā)完就宕機了,備機起來后,producer發(fā)送請求獲取pid時,會把未完成的事務(wù)接著完成。
當partition中寫入commit的marker后,相關(guān)的消息就可被讀取。所以kafka事務(wù)在prepare commit到commit這個時間段內(nèi),消息是逐漸可見的,而不是同一時刻可見。
詳細細節(jié)可看:https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-TransactionalGuarantees
消費事務(wù)
前面都是從生產(chǎn)的角度看待事務(wù)。還需要從消費的角度去考慮一些問題。消費時,partition中會存在一些消息處于未commit狀態(tài),即業(yè)務(wù)方應(yīng)該看不到的消息,需要過濾這些消息不讓業(yè)務(wù)看到,kafka選擇在消費者進程中進行過來,而不是在broker中過濾,主要考慮的還是性能。
kafka高性能的一個關(guān)鍵點是zero copy,如果需要在broker中過濾,那么勢必需要讀取消息內(nèi)容到內(nèi)存,就會失去zero copy的特性。
文件組織
kafka的數(shù)據(jù),實際上是以文件的形式存儲在文件系統(tǒng)的。topic下有partition,partition下有segment,segment是實際的一個個文件,topic和partition都是抽象概念。
在目錄/${topicName}-{$partitionid}/下,存儲著實際的log文件(即segment),還有對應(yīng)的索引文件。
每個segment文件大小相等,文件名以這個segment中最小的offset命名,文件擴展名是.log;segment對應(yīng)的索引的文件名字一樣,擴展名是.index。有兩個index文件,一個是offset index用于按offset去查message,一個是time index用于按照時間去查,其實這里可以優(yōu)化合到一起,下面只說offset index??傮w的組織是這樣的:
kafka 文件組織.png
為了減少索引文件的大小,降低空間使用,方便直接加載進內(nèi)存中,這里的索引使用稀疏矩陣,不會每一個message都記錄下具體位置,而是每隔一定的字節(jié)數(shù),再建立一條索引。索引包含兩部分,分別是baseOffset,還有position。
baseOffset:意思是這條索引對應(yīng)segment文件中的第幾條message。這樣做方便使用數(shù)值壓縮算法來節(jié)省空間。例如kafka使用的是varint。
position:在segment中的絕對位置。
查找offset對應(yīng)的記錄時,會先用二分法,找出對應(yīng)的offset在哪個segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍歷查找message。
常用配置項
broker配置
配置項 | 作用 |
broker.id | broker的唯一標識 |
auto.create.topics.auto | 設(shè)置成true,就是遇到?jīng)]有的topic自動創(chuàng)建topic。 |
log.dirs | log的目錄數(shù),目錄里面放partition,當生成新的partition時,會挑目錄里partition數(shù)最少的目錄放。 |
topic配置
配置項 | 作用 |
num.partitions | 新建一個topic,會有幾個partition。 |
log.retention.ms | 對應(yīng)的還有minutes,hours的單位。日志保留時間,因為刪除是文件維度而不是消息維度,看的是日志文件的mtime。 |
log.retention.bytes | partion最大的容量,超過就清理老的。注意這個是partion維度,就是說如果你的topic有8個partition,配置1G,那么平均分配下,topic理論最大值8G。 |
log.segment.bytes | 一個segment的大小。超過了就滾動。 |
log.segment.ms | 一個segment的打開時間,超過了就滾動。 |
message.max.bytes | message最大多大 |
關(guān)于日志清理,默認當前正在寫的日志,是怎么也不會清理掉的。還有0.10之前的版本,時間看的是日志文件的mtime,但這個指是不準確的,有可能文件被touch一下,mtime就變了。
因此在0.10版本開始,改為使用該文件最新一條消息的時間來判斷。按大小清理這里也要注意,Kafka在定時任務(wù)中嘗試比較當前日志量總大小是否超過閾值至少一個日志段的大小。如果超過但是沒超過一個日志段,那么就不會刪除。