面試官:你對Kafka比較熟? 那說說kafka日志段如何讀寫的吧?
之所以寫這篇文章是因為之前面試時候被面試官問到(倒)了,面試官說:“你說你對Kafka比較熟?看過源碼? 那說說kafka日志段如何讀寫的吧?”
我心里默默的說了句 “擦…我說看過一點點源碼,不是億點點。早知道不提這句了!”,那怎么辦呢,只能回家等通知了啊。
但是為了以后找回場子,咱也不能坐以待斃,日拱一卒從一點點到億點點。今天我們就來看看源碼層面來Kafka日志段的是如何讀寫的。
Kafka的存儲結(jié)構(gòu)
總所周知,Kafka的Topic可以有多個分區(qū),分區(qū)其實就是最小的讀取和存儲結(jié)構(gòu),即Consumer看似訂閱的是Topic,實則是從Topic下的某個分區(qū)獲得消息,Producer也是發(fā)送消息也是如此。
topic-partition關(guān)系
上圖是總體邏輯上的關(guān)系,映射到實際代碼中在磁盤上的關(guān)系則是如下圖所示:
每個分區(qū)對應(yīng)一個Log對象,在磁盤中就是一個子目錄,子目錄下面會有多組日志段即多Log Segment,每組日志段包含:消息日志文件(以log結(jié)尾)、位移索引文件(以index結(jié)尾)、時間戳索引文件(以timeindex結(jié)尾)。其實還有其它后綴的文件,例如.txnindex、.deleted等等。篇幅有限,暫不提起。
以下為日志的定義
以下為日志段的定義
indexIntervalBytes可以理解為插了多少消息之后再建一個索引,由此可以看出Kafka的索引其實是稀疏索引,這樣可以避免索引文件占用過多的內(nèi)存,從而可以在內(nèi)存中保存更多的索引。對應(yīng)的就是Broker 端參數(shù)log.index.interval.bytes 值,默認(rèn)4KB。
實際的通過索引查找消息過程是先通過offset找到索引所在的文件,然后通過二分法找到離目標(biāo)最近的索引,再順序遍歷消息文件找到目標(biāo)文件。這波操作時間復(fù)雜度為O(log2n)+O(m),n是索引文件里索引的個數(shù),m為稀疏程度。
這就是空間和時間的互換,又經(jīng)過數(shù)據(jù)結(jié)構(gòu)與算法的平衡,妙啊!
再說下rollJitterMs,這其實是個擾動值,對應(yīng)的參數(shù)是log.roll.jitter.ms,這其實就要說到日志段的切分了,log.segment.bytes,這個參數(shù)控制著日志段文件的大小,默認(rèn)是1G,即當(dāng)文件存儲超過1G之后就新起一個文件寫入。這是以大小為維度的,還有一個參數(shù)是log.segment.ms,以時間為維度切分。
那配置了這個參數(shù)之后如果有很多很多分區(qū),然后因為這個參數(shù)是全局的,因此同一時刻需要做很多文件的切分,這磁盤IO就頂不住了啊,因此需要設(shè)置個rollJitterMs,來岔開它們。
怎么樣有沒有聯(lián)想到redis緩存的過期時間?過期時間加個隨機(jī)數(shù),防止同一時刻大量緩存過期導(dǎo)致緩存擊穿數(shù)據(jù)庫。看看知識都是通的啊!
日志段的寫入
1、判斷下當(dāng)前日志段是否為空,空的話記錄下時間,來作為之后日志段的切分依據(jù)
2、確保位移值合法,最終調(diào)用的是AbstractIndex.toRelative(..)方法,即使判斷offset是否小于0,是否大于int最大值。
3、append消息,實際上就是通過FileChannel將消息寫入,當(dāng)然只是寫入內(nèi)存中及頁緩存,是否刷盤看配置。
4、更新日志段最大時間戳和最大時間戳對應(yīng)的位移值。這個時間戳其實用來作為定期刪除日志的依據(jù)
5、更新索引項,如果需要的話(bytesSinceLastIndexEntry > indexIntervalBytes)
最后再來個流程圖
消息寫入流程
日志段的讀取
1、根據(jù)第一條消息的offset,通過OffsetIndex找到對應(yīng)的消息所在的物理位置和大小。
2、獲取LogOffsetMetadata,元數(shù)據(jù)包含消息的offset、消息所在segment的起始o(jì)ffset和物理位置
3、判斷minOneMessage是否為true,若是則調(diào)整為必定返回一條消息大小,其實就是在單條消息大于maxSize的情況下得以返回,防止消費者餓死
4、再計算最大的fetchSize,即(最大物理位移-此消息起始物理位移)和adjustedMaxSize的最小值(這波我不是很懂,因為以上一波操作adjustedMaxSize已經(jīng)最小為一條消息的大小了)
5、調(diào)用 FileRecords 的 slice 方法從指定位置讀取指定大小的消息集合,并且構(gòu)造FetchDataInfo返回
再來個流程圖:
消息讀取流程
小結(jié)
從哪里跌倒就從哪里爬起來對吧,這波操作下來咱也不怕下次遇到面試官問了。
區(qū)區(qū)源碼不過爾爾,哈哈哈哈(首先得要有氣勢)
實際上這只是Kafka源碼的冰山一角,長路漫漫。雖說Kafka Broker都是由Scala寫的,不過語言不是問題,這不看下來也沒什么難點,注釋也很豐富。遇到不知道的語法小查一下搞定。
所以強(qiáng)烈建議大家入手源碼,從源碼上理解。今天說的 append 和 read 是很核心的功能,但一看也并不復(fù)雜,所以不要被源碼這兩個字嚇到了。
看源碼可以讓我們深入的理解內(nèi)部的設(shè)計原理,精進(jìn)我們的代碼功力(經(jīng)??粗粗?,我擦還能這么寫)。當(dāng)然還有系統(tǒng)架構(gòu)能力。
然后對我而言最重要的是可以裝逼了(哈哈哈)。
情景劇
老白正目不轉(zhuǎn)睛盯著監(jiān)控大屏,“為什么?為什么Kafka Broker物理磁盤 I/O 負(fù)載突然這么高?”。寥寥無幾的秀發(fā)矗立在老白的頭上,顯得如此的無助。
“是不是設(shè)置了 log.segment.ms參數(shù) ?試試 log.roll.jitter.ms吧”,老白抬頭間我已走出了辦公室,留下了一個偉岸的背影和一顆锃亮的光頭!
“我變禿了,也變強(qiáng)了”