自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

深度好文! 圖解Kafka Producer 內(nèi)存池架構設計

開發(fā) 架構
kafka為了提高Producer客戶端的發(fā)送吞吐量和提高性能,選擇了將消息暫時緩存起來,等到滿足一定的條件, 再進行批量發(fā)送, 這樣可以減少網(wǎng)絡請求,提高吞吐量。

在閱讀本文之前, 希望你可以思考一下下面幾個問題, 帶著問題去閱讀文章會獲得更好的效果。

  1. 發(fā)送消息的時候, 當Broker掛掉了,消息體還能寫入到消息緩存中嗎?
  2. 當消息還存儲在緩存中的時候, 假如Producer客戶端掛掉了,消息是不是就丟失了?
  3. 當最新的ProducerBatch還有空余的內(nèi)存,但是接下來的一條消息很大,不足以加上上一個Batch中,會怎么辦呢?
  4. 那么創(chuàng)建ProducerBatch的時候,應該分配多少的內(nèi)存呢?

什么是消息累加器RecordAccumulator

kafka為了提高Producer客戶端的發(fā)送吞吐量和提高性能,選擇了將消息暫時緩存起來,等到滿足一定的條件, 再進行批量發(fā)送, 這樣可以減少網(wǎng)絡請求,提高吞吐量。

而緩存這個消息的就是RecordAccumulator類.

上圖就是整個消息存放的緩存模型,我們接下來一個個來講解。

消息緩存模型

上圖表示的就是 消息緩存的模型, 生產(chǎn)的消息就是暫時存放在這個里面。

  1. 每條消息,我們按照TopicPartition維度,把他們放在不同的Deque<ProducerBatch> 隊列里面。TopicPartition相同,會在相同Deque<ProducerBatch> 的里面。
  2. ProducerBatch : 表示同一個批次的消息, 消息真正發(fā)送到Broker端的時候都是按照批次來發(fā)送的, 這個批次可能包含一條或者多條消息。
  3. 如果沒有找到消息對應的ProducerBatch隊列, 則創(chuàng)建一個隊列。
  4. 找到ProducerBatch隊列隊尾的Batch,發(fā)現(xiàn)Batch還可以塞下這條消息,則將消息直接塞到這個Batch中
  5. 找到ProducerBatch隊列隊尾的Batch,發(fā)現(xiàn)Batch中剩余內(nèi)存,不夠塞下這條消息,則會創(chuàng)建新的Batch
  6. 當消息發(fā)送成功之后, Batch會被釋放掉。

ProducerBatch的內(nèi)存大小

那么創(chuàng)建ProducerBatch的時候,應該分配多少的內(nèi)存呢?

先說結論: 當消息預估內(nèi)存大于batch.size的時候,則按照消息預估內(nèi)存創(chuàng)建, 否則按照batch.size的大小創(chuàng)建(默認16k).

我們來看一段代碼,這段代碼就是在創(chuàng)建ProducerBatch的時候預估內(nèi)存的大小

RecordAccumulator#append

/**
* 公眾號: 石臻臻的雜貨鋪
* 微信:szzdzhp001
**/
// 找到 batch.size 和 這條消息在batch中的總內(nèi)存大小的 最大值
int size = Math.max(this.batchSizeAbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagiccompressionkeyvalueheaders));
// 申請內(nèi)存
buffer = free.allocate(sizemaxTimeToBlock);
  1. 假設當前生產(chǎn)了一條消息為M, 剛好消息M找不到可以存放消息的ProducerBatch(不存在或者滿了),那么這個時候就需要創(chuàng)建一個新的ProducerBatch了
  2. 預估消息的大小 跟batch.size 默認大小16384(16kb). 對比,取最大值用于申請的內(nèi)存大小的值。

那么, 這個消息的預估是如何預估的?純粹的是消息體的大小嗎?

DefaultRecordBatch#estimateBatchSizeUpperBound

預估需要的Batch大小,是一個預估值,因為沒有考慮壓縮算法從額外開銷

/**
* 使用給定的鍵和值獲取只有一條記錄的批次大小的上限。
* 這只是一個估計,因為它沒有考慮使用的壓縮算法的額外開銷。
**/
static int estimateBatchSizeUpperBound(ByteBuffer keyByteBuffer valueHeader[] headers) {
return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(keyvalueheaders);
}
  1. 預估這個消息M的大小 + 一個RECORD_BATCH_OVERHEAD的大小
  2. RECORD_BATCH_OVERHEAD是一個Batch里面的一些基本元信息,總共占用了 61B
  3. 消息M的大小也并不是單單的只有消息體的大小,總大小=(key,value,headers)的大小+MAX_RECORD_OVERHEAD
  4. MAX_RECORD_OVERHEAD :一條消息頭最大占用空間, 最大值為21B

也就是說創(chuàng)建一個ProducerBatch,最少就要83B .

比如我發(fā)送一條消息 " 1 " , 預估得到的大小是 86B, 跟batch.size(默認16384) 相比取最大值。那么申請內(nèi)存的時候取最大值 16384 。

關于Batch的結構和消息的結構,我們回頭單獨用一篇文章來講解。

內(nèi)存分配

我們都知道RecordAccumulator里面的緩存大小是一開始定義好的, 由buffer.memory控制, 默認33554432 (32M)

當生產(chǎn)的速度大于發(fā)送速度的時候,就可能出現(xiàn)Producer寫入阻塞。

而且頻繁的創(chuàng)建和釋放ProducerBatch,會導致頻繁GC, 所有kafka中有個緩存池的概念,這個緩存池會被重復使用,但是只有固定( batch.size)的大小才能夠使用緩存池。

PS:以下16k指得是 batch.size的默認值.

Batch的創(chuàng)建和釋放

1. 內(nèi)存16K 緩存池中有可用內(nèi)存

①. 創(chuàng)建Batch的時候, 會去緩存池中,獲取隊首的一塊內(nèi)存ByteBuffer 使用。

②. 消息發(fā)送完成,釋放Batch, 則會把這個ByteBuffer,放到緩存池的隊尾中,并且調(diào)用ByteBuffer.clear 清空數(shù)據(jù)。以便下次重復使用

2. 內(nèi)存16K 緩存池中無可用內(nèi)存

①. 創(chuàng)建Batch的時候, 去非緩存池中的內(nèi)存獲取一部分內(nèi)存用于創(chuàng)建Batch. 注意:這里說的獲取內(nèi)存給Batch, 其實就是讓 非緩存池nonPooledAvailableMemory 減少 16K 的內(nèi)存, 然后Batch正常創(chuàng)建就行了, 不要誤以為好像真的發(fā)生了內(nèi)存的轉(zhuǎn)移。

②. 消息發(fā)送完成,釋放Batch, 則會把這個ByteBuffer,放到緩存池的隊尾中,并且調(diào)用ByteBuffer.clear 清空數(shù)據(jù), 以便下次重復使用

3. 內(nèi)存非16K 非緩存池中內(nèi)存夠用

①. 創(chuàng)建Batch的時候, 去非緩存池(nonPooledAvailableMemory)內(nèi)存獲取一部分內(nèi)存用于創(chuàng)建Batch. 注意:這里說的獲取內(nèi)存給Batch, 其實就是讓 非緩存池(nonPooledAvailableMemory) 減少對應的內(nèi)存, 然后Batch正常創(chuàng)建就行了, 不要誤以為好像真的發(fā)生了內(nèi)存的轉(zhuǎn)移。

②. 消息發(fā)送完成,釋放Batch, 純粹的是在非緩存池(nonPooledAvailableMemory)中加上剛剛釋放的Batch內(nèi)存大小。當然這個Batch會被GC掉

4. 內(nèi)存非16K  非緩存池內(nèi)存不夠用

①. 先嘗試將 緩存池中的內(nèi)存一個一個釋放到 非緩存池中, 直到非緩存池中的內(nèi)存夠用與創(chuàng)建Batch了

②. 創(chuàng)建Batch的時候, 去非緩存池(nonPooledAvailableMemory)內(nèi)存獲取一部分內(nèi)存用于創(chuàng)建Batch. 注意:這里說的獲取內(nèi)存給Batch, 其實就是讓 非緩存池(nonPooledAvailableMemory) 減少對應的內(nèi)存, 然后Batch正常創(chuàng)建就行了, 不要誤以為好像真的發(fā)生了內(nèi)存的轉(zhuǎn)移。

③. 消息發(fā)送完成,釋放Batch, 純粹的是在非緩存池(nonPooledAvailableMemory)中加上剛剛釋放的Batch內(nèi)存大小。當然這個Batch會被GC掉

例如: 下面我們需要創(chuàng)建 48k的batch, 因為超過了16k,所以需要在非緩存池中分配內(nèi)存, 但是非緩存池中當前可用內(nèi)存為0 , 分配不了, 這個時候就會嘗試去 緩存池里面釋放一部分內(nèi)存到 非緩存池。

釋放第一個ByteBuffer(16k) 不夠,則繼續(xù)釋放第二個,直到釋放了3個之后總共48k,發(fā)現(xiàn)內(nèi)存這時候夠了, 再去創(chuàng)建Batch。

注意:這里我們涉及到的 非緩存池中的內(nèi)存分配, 僅僅指的的內(nèi)存數(shù)字的增加和減少。

問題和答案

  1. 發(fā)送消息的時候, 當Broker掛掉了,消息體還能寫入到消息緩存中嗎?

當Broker掛掉了,Producer會提示下面的警告??,  但是發(fā)送消息過程中

這個消息體還是可以寫入到 消息緩存中的,也僅僅是寫到到緩存中而已。

WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available

  1. 當最新的ProducerBatch還有空余的內(nèi)存,但是接下來的一條消息很大,不足以加上上一個Batch中,會怎么辦呢?

那么會創(chuàng)建新的ProducerBatch。

  1. 那么創(chuàng)建ProducerBatch的時候,應該分配多少的內(nèi)存呢?

觸發(fā)創(chuàng)建ProducerBatch的那條消息預估大小大于batch.size ,則以預估內(nèi)存創(chuàng)建。否則,以batch.size創(chuàng)建。

還有一個問題供大家思考:

當消息還存儲在緩存中的時候, 假如Producer客戶端掛掉了,消息是不是就丟失了?

責任編輯:張燕妮 來源: 石臻臻的雜貨鋪
相關推薦

2022-02-23 15:08:18

開發(fā)分布式Java

2023-02-22 08:12:30

KafkaSender 線程

2023-03-15 08:17:27

Kafka網(wǎng)絡通信組件

2023-12-26 08:16:56

Kafka緩存架構客戶端

2012-05-11 10:38:15

Cloud Found

2021-11-01 17:17:13

Kafka 高并發(fā)場景

2024-03-14 08:33:13

kafka三高架構Zookeeper

2024-08-23 16:04:45

2024-10-30 10:06:51

2021-12-07 07:32:09

kafka架構原理

2022-11-07 09:25:02

Kafka存儲架構

2025-01-15 08:10:29

Java架構代碼

2022-08-07 13:06:43

NGINX服務器

2013-05-27 10:58:28

Tumblr架構設計雅虎收購

2023-07-03 17:15:12

系統(tǒng)架構設計

2021-04-09 08:54:14

Kafka源碼架構開發(fā)技術

2021-06-09 10:29:23

Kafka架構組件

2015-06-02 04:17:44

架構設計審架構設計說明書

2025-04-15 04:00:00

2023-07-05 08:00:52

MetrAuto系統(tǒng)架構
點贊
收藏

51CTO技術棧公眾號