自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

kafka高性能原理分析,你看懂了嗎?

存儲(chǔ) 存儲(chǔ)架構(gòu)
機(jī)械結(jié)構(gòu)的磁盤,如果把消息以隨機(jī)的方式寫入到磁盤,那么磁盤首先要做的就是 尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要 找到對(duì)應(yīng)的柱面、磁頭以及對(duì)應(yīng)的扇區(qū);這個(gè)過(guò)程相對(duì)內(nèi) 存來(lái)說(shuō)會(huì)消耗大量時(shí)間,為了規(guī)避隨機(jī)讀寫帶來(lái)的時(shí)間消 耗,kafka采用順序?qū)懙姆绞酱鎯?chǔ)數(shù)據(jù)來(lái)避免這個(gè)過(guò)程。

一、消費(fèi)者消費(fèi)消息offset存儲(chǔ)

kafka的所有消息都是持久化存儲(chǔ)在broker上的,消費(fèi)者每次消費(fèi)消息是如何知道獲取哪一條呢?kafka提供一個(gè)專門的tipic存儲(chǔ)每個(gè)consumer group的消費(fèi)消息的offset,offset保證消息在分區(qū)內(nèi)部有序,所以每次消費(fèi)者都可以知道自己要從哪一條消息開始消費(fèi)。__consumer_offsets_* 的一個(gè)topic ,把 offset 信 息 寫 入 到 這 個(gè) topic 中。__consumer_offsets 默認(rèn)有50 個(gè)分區(qū)。broker按照以下規(guī)則,存儲(chǔ)消費(fèi)者組的消費(fèi)offset到對(duì)應(yīng)的 __consumer_offsets分區(qū)文件中。

Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 默 認(rèn) 情 況 下groupMetadataTopicPartitionCount 有 50 個(gè)分區(qū),假如groupid=”KafkaConsumerDemo”,計(jì)算得到的結(jié)果為:35, 意味著當(dāng)前的consumer_group 的位移信息保存在__consumer_offsets 的第 35 個(gè)分區(qū),可以用命令格式化查看分區(qū)數(shù)據(jù)

kafka-simple-consumer-shell.sh –topic __consumer_offsets –partition 35 –broker-list 192.168.0.15:9092 –formatter “kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter

或者直接使用ui工具查看分區(qū)數(shù)據(jù)。

消費(fèi)消息的offset保存,是按照整個(gè)消費(fèi)者group來(lái)分配保存的,同一個(gè)group的消費(fèi)者offset保存在同一個(gè)__consumer_offsets分區(qū)。

二、消息持久化存儲(chǔ)

首先在kafka里面,消息都是需要持久化存儲(chǔ)的,不會(huì)分持久化和非持久化消息。存儲(chǔ)的方式是基于索引文件+內(nèi)容文件的方式來(lái)進(jìn)行存儲(chǔ)。下面看一下有關(guān)存儲(chǔ)的相關(guān)內(nèi)容。

消息存儲(chǔ)的路徑

首先我們知道,一個(gè)topic可以有多個(gè)分區(qū),然后多個(gè)分區(qū)按照取模算法分配到集群中的多個(gè)broker中。其次一個(gè)topic的每一個(gè)分區(qū)的消息都是分開存儲(chǔ)的,例如一個(gè)topic test,有三個(gè)分區(qū)。就會(huì)創(chuàng)建三個(gè)文件夾 test_0,test_1,test_2,去存儲(chǔ)消息,消息的結(jié)構(gòu)上面說(shuō)了,就是index+內(nèi)容的組合。例如有一個(gè)test3p的topic,在單個(gè)broker集群環(huán)境下,可以看到在dataDir的目錄下面生成了如下三個(gè)文件夾。

圖片

總的來(lái)說(shuō)消息按照不同分區(qū)來(lái)進(jìn)行存儲(chǔ)。

消息存儲(chǔ)機(jī)制詳細(xì)解析

在對(duì)應(yīng)的分區(qū)文件夾內(nèi)部是如何存儲(chǔ)消息的呢?

log.segment.bytes?

log.segment.bytes是配置文件里面的一個(gè)重要配置,當(dāng)內(nèi)容文件達(dá)到這個(gè)配置的字節(jié)數(shù)大小時(shí),消息存儲(chǔ)的內(nèi)容文件就會(huì)分隔,新增一個(gè)內(nèi)容文件來(lái)存儲(chǔ)內(nèi)容,新內(nèi)容文件的命名是上一個(gè)內(nèi)容文件存儲(chǔ)的最后一個(gè)offset命令。

圖片

上面這圖是我設(shè)置log.segment.bytes=10000,然后不停發(fā)送消息測(cè)試結(jié)果,我發(fā)送的消息內(nèi)容大小是固定的,可以看到大約是在經(jīng)過(guò)26000個(gè)offset左右就會(huì)新加一個(gè)log文件,同時(shí)會(huì)成對(duì)新增index,timindex文件。這個(gè)就是kafka的logSegment,消息文件分片,控制文件大小可以提高io性能。

每種存儲(chǔ)文件的作用

00000000000000000000.index?

這個(gè)就是一個(gè)索引文件,里面存儲(chǔ)對(duì)消息內(nèi)容文件的物理索引,可以快速定位消息內(nèi)容所在,內(nèi)容類似下面格式。

執(zhí)行命令查看。

kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafkalogs/test3p-0/00000000000000000000.index --print-datalog

offset: 48 position: 4128
offset: 96 position: 8256
offset: 144 position: 12373

上面就是查看結(jié)果,offset就是消息在分區(qū)內(nèi)部的offset,partition就是一個(gè)物理地址,用于索引內(nèi)容,可以看出這里的索引是屬于稀疏索引,并不是每個(gè)offset都存儲(chǔ)消息的物理地址。

00000000000000000000.log?

這個(gè)就是內(nèi)容文件,同樣可以使用上面使用的命令查看內(nèi)容,截取部分結(jié)果如下。?

producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 0 payload: isAsyncSend48
offset: 151 position: 12968 CreateTime: 1534321675701 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: - payload: isAsyncSend45
offset: 152 position: 13053 CreateTime: 1534321675705 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: * payload: isAsyncSend42
offset: 153 position: 13138 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ' payload: isAsyncSend39
offset: 154 position: 13223 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: $ payload: isAsyncSend36
offset: 155 position: 13308 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ! payload: isAsyncSend33
offset: 156 position: 13393 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend30
offset: 157 position: 13478 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ayload: isAsyncSend27
offset: 158 position: 13563 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend24
offset: 159 position: 13648 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend21
offset: 160 position: 13733 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend18
offset: 161 position: 13818 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend15
offset: 162 position: 13903 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:
payload: isAsyncSend12
offset: 163 position: 13988 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend9
offset: 164 position: 14072 CreateTime: 1534321675709 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend6
offset: 165 position: 14156 CreateTime: 1534321675709 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend3

?

可以看出消息內(nèi)容文件存儲(chǔ)了offset、position和payload等內(nèi)容,通過(guò)索引就可以快速定位到position位置,找到消息內(nèi)容。

實(shí)際的查找算法過(guò)程?

1.索引文件命名是有序的,因此使用二分查找的方式,可以快速查詢到消息對(duì)應(yīng)的索引文件

2.在對(duì)應(yīng)的索引文件中,由于使用的是稀疏索引,所以利用offset查找符合offset范圍的position。

3.得到position之后自然可以快速?gòu)膒osition位置開始查找對(duì)應(yīng)offset的消息,而不必從頭搜索

三、消息日志的清理與壓縮

消息清理?

消息日志的能夠分段存儲(chǔ),一方面能夠減少單個(gè)文件 內(nèi)容的大小,另一方面,方便kafka進(jìn)行日志清理。日志的 清理策略有兩個(gè)分別是按消息時(shí)間和topic消息大小來(lái)清理。

1. 根據(jù)消息的保留時(shí)間,當(dāng)消息在 kafka 中保存的時(shí)間超 過(guò)了指定的時(shí)間,就會(huì)觸發(fā)清理過(guò)程

2. 根據(jù)topic存儲(chǔ)的數(shù)據(jù)大小,當(dāng)topic所占的日志文件大 小大于一定的閥值,則可以開始刪除最舊的消息。kafka 會(huì)啟動(dòng)一個(gè)后臺(tái)線程,定期檢查是否存在可以刪除的消 息 通過(guò) log.retention.bytes 和 log.retention.hours 這兩個(gè)參 數(shù)來(lái)設(shè)置,當(dāng)其中任意一個(gè)達(dá)到要求,都會(huì)執(zhí)行刪除。默認(rèn)的保留時(shí)間是:7天

消息壓縮?

Kafka 還提供了“日志壓縮(Log Compaction)”功能,通過(guò)這個(gè)功能可以有效的減少日志文件的大小,緩解磁盤緊 張的情況,在很多實(shí)際場(chǎng)景中,消息的 key 和 value 的值 之間的對(duì)應(yīng)關(guān)系是不斷變化的,就像數(shù)據(jù)庫(kù)中的數(shù)據(jù)會(huì)不 斷被修改一樣,消費(fèi)者只關(guān)心key對(duì)應(yīng)的最新的value。因 此,我們可以開啟 kafka 的日志壓縮功能,服務(wù)端會(huì)在后 臺(tái)啟動(dòng)啟動(dòng)Cleaner線程池,定期將相同的key進(jìn)行合并, 只保留最新的value值。

四、kafka高性能io

機(jī)械結(jié)構(gòu)的磁盤,如果把消息以隨機(jī)的方式寫入到磁盤,那么磁盤首先要做的就是 尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要 找到對(duì)應(yīng)的柱面、磁頭以及對(duì)應(yīng)的扇區(qū);這個(gè)過(guò)程相對(duì)內(nèi) 存來(lái)說(shuō)會(huì)消耗大量時(shí)間,為了規(guī)避隨機(jī)讀寫帶來(lái)的時(shí)間消 耗,kafka采用順序?qū)懙姆绞酱鎯?chǔ)數(shù)據(jù)來(lái)避免這個(gè)過(guò)程。

但是 頻繁的 I/O 操作仍然會(huì)造成磁盤的性能瓶頸,所以 kafka 還有一個(gè)重要的性能策略,零拷貝。

如果不使用零拷貝技術(shù),要把數(shù)據(jù)從磁盤讀出并且發(fā)送到網(wǎng)卡需要進(jìn)行以下步驟:

  • 操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁(yè)緩存
  • 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
  • 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到socket緩存中
  • 操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),最后將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出

這個(gè)過(guò)程涉及到4次上下文切換以及4次數(shù)據(jù)復(fù)制,并且有兩次復(fù)制操作是由 CPU 完成。但是這個(gè)過(guò)程中,數(shù)據(jù)完全沒有 進(jìn)行變化,僅僅是從磁盤復(fù)制到網(wǎng)卡緩沖區(qū)。

如果是零拷貝技術(shù)的話,,可以去掉這些沒必要的數(shù)據(jù)復(fù)制操作, 同時(shí)也會(huì)減少上下文切換次數(shù);現(xiàn)代的unix操作系統(tǒng)提供 一個(gè)優(yōu)化的代碼路徑,用于將數(shù)據(jù)直接從頁(yè)緩存?zhèn)鬏數(shù)絪ocket;在 Linux 中通過(guò) sendfile 系統(tǒng)調(diào)用來(lái)完成的。Java 提 供了訪問(wèn)這個(gè)系統(tǒng)調(diào)用的方法,F(xiàn)ileChannel.transferTo API ,這樣就可以直接跳過(guò)數(shù)據(jù)復(fù)制到用戶空間然后又從用戶控制復(fù)制到socket的過(guò)程。

責(zé)任編輯:武曉燕 來(lái)源: 碼蟲甲
相關(guān)推薦

2023-06-27 07:09:39

2024-08-12 12:30:27

2024-04-29 09:25:19

2022-11-28 07:10:57

2024-04-07 08:23:01

JS隔離JavaScript

2022-06-06 07:58:52

勒索軟件惡意軟件解密

2021-04-26 10:30:43

USB4設(shè)備Thunderbolt

2024-09-29 08:47:55

2024-05-17 09:44:49

Kubernetes均衡器Envoy

2024-03-05 18:19:07

localhostLinux數(shù)據(jù)庫(kù)

2022-03-18 00:17:30

NISTICS安全

2024-09-10 10:21:19

2018-01-04 00:10:52

物聯(lián)網(wǎng)技術(shù)信息

2022-06-15 08:00:50

磁盤RedisRocketMQ

2021-10-28 19:35:02

代碼main方法

2019-11-20 15:40:48

CPU軟件處理器

2025-04-02 08:21:10

2021-10-10 20:36:49

Android Root權(quán)限

2011-09-02 16:08:09

Sencha ToucAPI文檔

2011-06-14 12:56:55

SQL Server復(fù)災(zāi)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)