kafka高性能原理分析,你看懂了嗎?
一、消費(fèi)者消費(fèi)消息offset存儲(chǔ)
kafka的所有消息都是持久化存儲(chǔ)在broker上的,消費(fèi)者每次消費(fèi)消息是如何知道獲取哪一條呢?kafka提供一個(gè)專門的tipic存儲(chǔ)每個(gè)consumer group的消費(fèi)消息的offset,offset保證消息在分區(qū)內(nèi)部有序,所以每次消費(fèi)者都可以知道自己要從哪一條消息開始消費(fèi)。__consumer_offsets_* 的一個(gè)topic ,把 offset 信 息 寫 入 到 這 個(gè) topic 中。__consumer_offsets 默認(rèn)有50 個(gè)分區(qū)。broker按照以下規(guī)則,存儲(chǔ)消費(fèi)者組的消費(fèi)offset到對(duì)應(yīng)的 __consumer_offsets分區(qū)文件中。
Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 默 認(rèn) 情 況 下groupMetadataTopicPartitionCount 有 50 個(gè)分區(qū),假如groupid=”KafkaConsumerDemo”,計(jì)算得到的結(jié)果為:35, 意味著當(dāng)前的consumer_group 的位移信息保存在__consumer_offsets 的第 35 個(gè)分區(qū),可以用命令格式化查看分區(qū)數(shù)據(jù)
kafka-simple-consumer-shell.sh –topic __consumer_offsets –partition 35 –broker-list 192.168.0.15:9092 –formatter “kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter”
或者直接使用ui工具查看分區(qū)數(shù)據(jù)。
消費(fèi)消息的offset保存,是按照整個(gè)消費(fèi)者group來(lái)分配保存的,同一個(gè)group的消費(fèi)者offset保存在同一個(gè)__consumer_offsets分區(qū)。
二、消息持久化存儲(chǔ)
首先在kafka里面,消息都是需要持久化存儲(chǔ)的,不會(huì)分持久化和非持久化消息。存儲(chǔ)的方式是基于索引文件+內(nèi)容文件的方式來(lái)進(jìn)行存儲(chǔ)。下面看一下有關(guān)存儲(chǔ)的相關(guān)內(nèi)容。
消息存儲(chǔ)的路徑
首先我們知道,一個(gè)topic可以有多個(gè)分區(qū),然后多個(gè)分區(qū)按照取模算法分配到集群中的多個(gè)broker中。其次一個(gè)topic的每一個(gè)分區(qū)的消息都是分開存儲(chǔ)的,例如一個(gè)topic test,有三個(gè)分區(qū)。就會(huì)創(chuàng)建三個(gè)文件夾 test_0,test_1,test_2,去存儲(chǔ)消息,消息的結(jié)構(gòu)上面說(shuō)了,就是index+內(nèi)容的組合。例如有一個(gè)test3p的topic,在單個(gè)broker集群環(huán)境下,可以看到在dataDir的目錄下面生成了如下三個(gè)文件夾。
總的來(lái)說(shuō)消息按照不同分區(qū)來(lái)進(jìn)行存儲(chǔ)。
消息存儲(chǔ)機(jī)制詳細(xì)解析
在對(duì)應(yīng)的分區(qū)文件夾內(nèi)部是如何存儲(chǔ)消息的呢?
log.segment.bytes?
log.segment.bytes是配置文件里面的一個(gè)重要配置,當(dāng)內(nèi)容文件達(dá)到這個(gè)配置的字節(jié)數(shù)大小時(shí),消息存儲(chǔ)的內(nèi)容文件就會(huì)分隔,新增一個(gè)內(nèi)容文件來(lái)存儲(chǔ)內(nèi)容,新內(nèi)容文件的命名是上一個(gè)內(nèi)容文件存儲(chǔ)的最后一個(gè)offset命令。
上面這圖是我設(shè)置log.segment.bytes=10000,然后不停發(fā)送消息測(cè)試結(jié)果,我發(fā)送的消息內(nèi)容大小是固定的,可以看到大約是在經(jīng)過(guò)26000個(gè)offset左右就會(huì)新加一個(gè)log文件,同時(shí)會(huì)成對(duì)新增index,timindex文件。這個(gè)就是kafka的logSegment,消息文件分片,控制文件大小可以提高io性能。
每種存儲(chǔ)文件的作用
00000000000000000000.index?
這個(gè)就是一個(gè)索引文件,里面存儲(chǔ)對(duì)消息內(nèi)容文件的物理索引,可以快速定位消息內(nèi)容所在,內(nèi)容類似下面格式。
執(zhí)行命令查看。
kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafkalogs/test3p-0/00000000000000000000.index --print-datalog
offset: 48 position: 4128
offset: 96 position: 8256
offset: 144 position: 12373
上面就是查看結(jié)果,offset就是消息在分區(qū)內(nèi)部的offset,partition就是一個(gè)物理地址,用于索引內(nèi)容,可以看出這里的索引是屬于稀疏索引,并不是每個(gè)offset都存儲(chǔ)消息的物理地址。
00000000000000000000.log?
這個(gè)就是內(nèi)容文件,同樣可以使用上面使用的命令查看內(nèi)容,截取部分結(jié)果如下。?
producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 0 payload: isAsyncSend48
offset: 151 position: 12968 CreateTime: 1534321675701 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: - payload: isAsyncSend45
offset: 152 position: 13053 CreateTime: 1534321675705 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: * payload: isAsyncSend42
offset: 153 position: 13138 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ' payload: isAsyncSend39
offset: 154 position: 13223 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: $ payload: isAsyncSend36
offset: 155 position: 13308 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ! payload: isAsyncSend33
offset: 156 position: 13393 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend30
offset: 157 position: 13478 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ayload: isAsyncSend27
offset: 158 position: 13563 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend24
offset: 159 position: 13648 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend21
offset: 160 position: 13733 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend18
offset: 161 position: 13818 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend15
offset: 162 position: 13903 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:
payload: isAsyncSend12
offset: 163 position: 13988 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend9
offset: 164 position: 14072 CreateTime: 1534321675709 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend6
offset: 165 position: 14156 CreateTime: 1534321675709 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend3
?
可以看出消息內(nèi)容文件存儲(chǔ)了offset、position和payload等內(nèi)容,通過(guò)索引就可以快速定位到position位置,找到消息內(nèi)容。
實(shí)際的查找算法過(guò)程?
1.索引文件命名是有序的,因此使用二分查找的方式,可以快速查詢到消息對(duì)應(yīng)的索引文件
2.在對(duì)應(yīng)的索引文件中,由于使用的是稀疏索引,所以利用offset查找符合offset范圍的position。
3.得到position之后自然可以快速?gòu)膒osition位置開始查找對(duì)應(yīng)offset的消息,而不必從頭搜索
三、消息日志的清理與壓縮
消息清理?
消息日志的能夠分段存儲(chǔ),一方面能夠減少單個(gè)文件 內(nèi)容的大小,另一方面,方便kafka進(jìn)行日志清理。日志的 清理策略有兩個(gè)分別是按消息時(shí)間和topic消息大小來(lái)清理。
1. 根據(jù)消息的保留時(shí)間,當(dāng)消息在 kafka 中保存的時(shí)間超 過(guò)了指定的時(shí)間,就會(huì)觸發(fā)清理過(guò)程
2. 根據(jù)topic存儲(chǔ)的數(shù)據(jù)大小,當(dāng)topic所占的日志文件大 小大于一定的閥值,則可以開始刪除最舊的消息。kafka 會(huì)啟動(dòng)一個(gè)后臺(tái)線程,定期檢查是否存在可以刪除的消 息 通過(guò) log.retention.bytes 和 log.retention.hours 這兩個(gè)參 數(shù)來(lái)設(shè)置,當(dāng)其中任意一個(gè)達(dá)到要求,都會(huì)執(zhí)行刪除。默認(rèn)的保留時(shí)間是:7天
消息壓縮?
Kafka 還提供了“日志壓縮(Log Compaction)”功能,通過(guò)這個(gè)功能可以有效的減少日志文件的大小,緩解磁盤緊 張的情況,在很多實(shí)際場(chǎng)景中,消息的 key 和 value 的值 之間的對(duì)應(yīng)關(guān)系是不斷變化的,就像數(shù)據(jù)庫(kù)中的數(shù)據(jù)會(huì)不 斷被修改一樣,消費(fèi)者只關(guān)心key對(duì)應(yīng)的最新的value。因 此,我們可以開啟 kafka 的日志壓縮功能,服務(wù)端會(huì)在后 臺(tái)啟動(dòng)啟動(dòng)Cleaner線程池,定期將相同的key進(jìn)行合并, 只保留最新的value值。
四、kafka高性能io
機(jī)械結(jié)構(gòu)的磁盤,如果把消息以隨機(jī)的方式寫入到磁盤,那么磁盤首先要做的就是 尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要 找到對(duì)應(yīng)的柱面、磁頭以及對(duì)應(yīng)的扇區(qū);這個(gè)過(guò)程相對(duì)內(nèi) 存來(lái)說(shuō)會(huì)消耗大量時(shí)間,為了規(guī)避隨機(jī)讀寫帶來(lái)的時(shí)間消 耗,kafka采用順序?qū)懙姆绞酱鎯?chǔ)數(shù)據(jù)來(lái)避免這個(gè)過(guò)程。
但是 頻繁的 I/O 操作仍然會(huì)造成磁盤的性能瓶頸,所以 kafka 還有一個(gè)重要的性能策略,零拷貝。
如果不使用零拷貝技術(shù),要把數(shù)據(jù)從磁盤讀出并且發(fā)送到網(wǎng)卡需要進(jìn)行以下步驟:
- 操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁(yè)緩存
- 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
- 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到socket緩存中
- 操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),最后將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出
這個(gè)過(guò)程涉及到4次上下文切換以及4次數(shù)據(jù)復(fù)制,并且有兩次復(fù)制操作是由 CPU 完成。但是這個(gè)過(guò)程中,數(shù)據(jù)完全沒有 進(jìn)行變化,僅僅是從磁盤復(fù)制到網(wǎng)卡緩沖區(qū)。
如果是零拷貝技術(shù)的話,,可以去掉這些沒必要的數(shù)據(jù)復(fù)制操作, 同時(shí)也會(huì)減少上下文切換次數(shù);現(xiàn)代的unix操作系統(tǒng)提供 一個(gè)優(yōu)化的代碼路徑,用于將數(shù)據(jù)直接從頁(yè)緩存?zhèn)鬏數(shù)絪ocket;在 Linux 中通過(guò) sendfile 系統(tǒng)調(diào)用來(lái)完成的。Java 提 供了訪問(wèn)這個(gè)系統(tǒng)調(diào)用的方法,F(xiàn)ileChannel.transferTo API ,這樣就可以直接跳過(guò)數(shù)據(jù)復(fù)制到用戶空間然后又從用戶控制復(fù)制到socket的過(guò)程。