圖解 Kafka 源碼實(shí)現(xiàn)機(jī)制之客戶端緩存架構(gòu)設(shè)計(jì)
大家好,我是 華仔, 又跟大家見面了。
上篇主要帶大家深度剖析了「Kafka 網(wǎng)絡(luò)層收發(fā)總流程」,今天主要聊聊 「Kafka 客戶端消息緩存架構(gòu)設(shè)計(jì)」,深度剖析下消息是如何進(jìn)行緩存的。
認(rèn)真讀完這篇文章,我相信你會(huì)對(duì) Kafka 客戶端緩存架構(gòu)的源碼有更加深刻的理解。
這篇文章干貨很多,希望你可以耐心讀完。
一、總體概述
通過場(chǎng)景驅(qū)動(dòng)的方式,當(dāng)被發(fā)送消息通過網(wǎng)絡(luò)請(qǐng)求封裝、NIO多路復(fù)用器監(jiān)聽網(wǎng)絡(luò)讀寫事件并進(jìn)行消息網(wǎng)絡(luò)收發(fā)后,回頭來看看消息是如何在客戶端緩存的?
大家都知道 Kafka 是一款超高吞吐量的消息系統(tǒng),主要體現(xiàn)在「異步發(fā)送」、「批量發(fā)送」、「消息壓縮」。
跟本篇相關(guān)的是「批量發(fā)送」即生產(chǎn)者會(huì)將消息緩存起來,等滿足一定條件后,Sender 子線程再把消息批量發(fā)送給 Kafka Broker。
這樣好處就是「盡量減少網(wǎng)絡(luò)請(qǐng)求次數(shù),提升網(wǎng)絡(luò)吞吐量」。
為了方便大家理解,所有的源碼只保留骨干。
二、消息如何在客戶端緩存的
既然是批量發(fā)送,那么消息肯定要進(jìn)行緩存的,那消息被緩存在哪里呢?又是如何管理的?
通過下面簡(jiǎn)化流程圖可以看出,待發(fā)送消息主要被緩存在 RecordAccumulator 里。
我以一個(gè)真實(shí)生活場(chǎng)景類比解說一下會(huì)更好理解。
既然說 RecordAccumulator 像一個(gè)累積消息的倉庫,就拿快遞倉庫類比。
上圖是一個(gè)快遞倉庫,堆滿了貨物??梢钥吹椒謷T把不同目的地的包裹放入對(duì)應(yīng)目的地的貨箱,每裝滿一箱就放置在對(duì)應(yīng)的區(qū)域。
那么分揀員就是指 RecordAccumulator,而貨箱以及各自所屬的堆放區(qū)域,就是 RecordAccumulator 中緩存消息的地方。所有封箱的都會(huì)等待 sender 來取貨發(fā)送出去。
如果你看懂了上圖,就大概理解了 RecordAccumulator 的架構(gòu)設(shè)計(jì)和運(yùn)行邏輯。
總結(jié)下倉庫里有什么:
- 分揀員
- 貨物
- 目的地
- 貨箱
- 堆放區(qū)域
記住這些概念,都會(huì)體現(xiàn)在源碼里,流程如下圖所示:
從上面圖中可以看出:
- 至少有一個(gè)業(yè)務(wù)主線程和一個(gè) sender 線程同時(shí)操作 RecordAccumulator,所以它必須是線程安全的。
- 在它里面有一個(gè) ConcurrentMap 集合「Kafka 自定義的 CopyOnWriteMap」。key:TopicPartiton, value:Deque<ProducerBatch>,即以主題分區(qū)為單元,把消息以 ProducerBatch 為單位累積緩存,多個(gè) ProducerBatch 保存在 Deque 隊(duì)列中。當(dāng) Deque 中最新的 batch 不能容納消息時(shí),就會(huì)創(chuàng)建新的 batch 來繼續(xù)緩存,并將其加入 Deque。
- 通過 ProducerBatch 進(jìn)行緩存數(shù)據(jù),為了減少頻繁申請(qǐng)銷毀內(nèi)存造成 Full GC 問題,Kafka 設(shè)計(jì)了經(jīng)典的「緩存池 BufferPool 機(jī)制」。
綜上可以得出 RecordAccumulator 類中有三個(gè)重要的組件:「消息批次 ProducerBatch」、「自定義 CopyOnWriteMap」、「緩存池 BufferPool 機(jī)制」。
由于篇幅原因,RecordAccumulator 類放到下篇來講解。
先來看看 ProducerBatch,它是消息緩存及發(fā)送消息的最小單位。
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java。
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java。
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java。
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java。
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java。
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java。
通過調(diào)用關(guān)系可以看出,ProducerBatch 依賴 MemoryRecordsBuilder,而 MemoryRecordsBuilder 依賴 MemoryRecords 構(gòu)建,所以 「MemoryRecords 才是真正用來保存消息的地方」。
1、MemoryRecords
import java.nio.ByteBuffer;
public class MemoryRecords extends AbstractRecords {
public static MemoryRecordsBuilder builder(..){
// 重載builder
return builder(...);
}
public static MemoryRecordsBuilder builder(
ByteBuffer buffer,
// 消息版本
byte magic,
// 消息壓縮類型
CompressionType compressionType,
// 時(shí)間戳
TimestampType timestampType,
// 基本位移
long baseOffset,
// 日志追加時(shí)間
long logAppendTime,
// 生產(chǎn)者id
long producerId,
// 生產(chǎn)者版本
short producerEpoch,
// 批次序列號(hào)
int baseSequence,
boolean isTransactional,
// 是否是控制類的批次
boolean isControlBatch,
// 分區(qū)leader的版本
int partitionLeaderEpoch) {
// 初始化MemoryRecordsBuilder類
return new MemoryRecordsBuilder(...);
}
}
該類比較簡(jiǎn)單,通過 builder 方法可以看出依賴 ByteBuffer 來存儲(chǔ)消息。MemoryRecordsBuilder 類的構(gòu)建是通過 MemoryRecords.builder() 來初始化的。
來看看 MemoryRecordsBuilder 類的實(shí)現(xiàn)。
2、MemoryRecordBuilder
public class MemoryRecordsBuilder implements AutoCloseable {
// 寫操作關(guān)閉的輸出流
private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
// 當(dāng)向某個(gè)ByteBuffer關(guān)閉輸出流寫數(shù)據(jù)時(shí)拋異常
public void write(int b) {
throw new ...;
}
});
// 日志時(shí)間
private final TimestampType timestampType;
// 消息壓縮類型
private final CompressionType compressionType;
// kafka對(duì)OutputStream接口的實(shí)現(xiàn)類,對(duì)ByteBuffer實(shí)現(xiàn)了自動(dòng)擴(kuò)容功能
private final ByteBufferOutputStream bufferStream;
// 消息的版本
private final byte magic;
// ByteBuffer的最初始位置
private final int initialPosition;
// 基本位移
private final long baseOffset;
// 消息追加的時(shí)間
private final long logAppendTime;
// 是否是控制類的批次
private final boolean isControlBatch;
// 分區(qū)leader的版本
private final int partitionLeaderEpoch;
// 寫入上限
private final int writeLimit;
// batch頭大小字節(jié)數(shù)
private final int batchHeaderSizeInBytes;
// 評(píng)估壓縮率
private float estimatedCompressionRatio = 1.0F;
// 對(duì)bufferStream添加壓縮功能
private DataOutputStream appendStream;
// 是否是事務(wù)批次
private boolean isTransactional;
// 生產(chǎn)者id
private long producerId;
// 生產(chǎn)者版本
private short producerEpoch;
// 批次序列號(hào)
private int baseSequence;
// 壓縮前要寫入的消息體大小字節(jié)數(shù)
private int uncompressedRecordsSizeInBytes = 0;
// 壓縮前寫入的記錄數(shù)(不包括頭)
private int numRecords = 0;
// 實(shí)際壓縮率
private float actualCompressionRatio = 1;
// 最大時(shí)間戳
private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
// 最大時(shí)間戳偏移量
private long offsetOfMaxTimestamp = -1;
// 最后的偏移量
private Long lastOffset = null;
// 第一次追加消息的時(shí)間戳
private Long firstTimestamp = null;
// 真正保存消息的地方
private MemoryRecords builtRecords;
從該類屬性字段來看比較多,這里只講2個(gè)關(guān)于字節(jié)流的字段。
- CLOSED_STREAM:當(dāng)關(guān)閉某個(gè) ByteBuffer 也會(huì)把它對(duì)應(yīng)的寫操作輸出流設(shè)置為 CLOSED_STREAM,目的就是防止再向該 ByteBuffer 寫數(shù)據(jù),否則就拋異常。
- bufferStream:首先 MemoryRecordsBuilder 依賴 ByteBuffer 來完成消息存儲(chǔ)。它會(huì)將 ByteBuffer 封裝成 ByteBufferOutputStream 并實(shí)現(xiàn)了 Java NIO 的 OutputStream,這樣就可以按照流的方式寫數(shù)據(jù)了。同時(shí) ByteBufferOutputStream 提供了自動(dòng)擴(kuò)容 ByteBuffer 能力。
來看看它的初始化構(gòu)造方法。
public MemoryRecordsBuilder(ByteBuffer buffer,...) {
// 將MemoryRecordsBuilder關(guān)聯(lián)的ByteBuffer封裝成ByteBufferOutputStream流
this(new ByteBufferOutputStream(buffer), ...);
}
// 構(gòu)造方法
public MemoryRecordsBuilder(
ByteBufferOutputStream bufferStream,
...
int writeLimit) {
....
// 初始位置
this.initialPosition = bufferStream.position();
// 1. 根據(jù)不同消息版本計(jì)算批次Batch頭的長(zhǎng)度
this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);
// 2. 調(diào)整對(duì)應(yīng)的position
bufferStream.position(initialPosition + batchHeaderSizeInBytes);
this.bufferStream = bufferStream;
// 3. 在bufferStream流外層套一層壓縮流,再套一層DataOutputStream流
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}
}
從構(gòu)造函數(shù)可以看出,除了基本字段的賦值之外,會(huì)做以下3件事情:
- 根據(jù)消息版本、壓縮類型來計(jì)算批次 Batch 頭的大小長(zhǎng)度。
- 通過調(diào)整 bufferStream 的 position,使其跳過 Batch 頭部位置,就可以直接寫入消息了。
- 對(duì) bufferStream 增加壓縮功能。
看到這里,挺有意思的,不知讀者是否意識(shí)到這里涉及到 「ByteBuffer」、「bufferStream」 、「appendStream」。
三者的關(guān)系是通過「裝飾器模式」實(shí)現(xiàn)的,即 bufferStream 對(duì) ByteBuffer 裝飾實(shí)現(xiàn)擴(kuò)容功能,而 appendStream 又對(duì) bufferStream 裝飾實(shí)現(xiàn)壓縮功能。
來看看它的核心方法。
(1)appendWithOffset()
// 追加新記錄
public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
}
// 計(jì)算下一個(gè)連續(xù)偏移量
private long nextSequentialOffset() {
// lastOffset用來記錄當(dāng)前寫入Record的offset,每次當(dāng)有新Record寫入時(shí),都會(huì)遞增它。
return lastOffset == null ? baseOffset : lastOffset + 1;
}
// 根據(jù)偏移量追加消息
private Long appendWithOffset(
long offset,
boolean isControlRecord,
long timestamp,
ByteBuffer key,
ByteBuffer value,
Header[] headers) {
try {
// 檢查isControl標(biāo)志是否一致
if (isControlRecord != isControlBatch)
throw new ...;
// 保證offset是遞增的
if (lastOffset != null && offset <= lastOffset)
throw new ...;
// 檢查時(shí)間戳
if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
throw new ...;
// 只有V2版本才有header
if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
throw new ...;
// 更新firstTimestamp
if (firstTimestamp == null)
firstTimestamp = timestamp;
// V2版本消息寫入
if (magic > RecordBatch.MAGIC_VALUE_V1) {
appendDefaultRecord(offset, timestamp, key, value, headers);
return null;
} else {
//V0、V1 版本消息寫入(此處不進(jìn)行剖析)
return appendLegacyRecord(offset, timestamp, key, value, magic);
}
} catch (IOException e) {
// 拋異常
}
}
該方法主要用來根據(jù)偏移量追加寫消息,會(huì)根據(jù)消息版本來寫對(duì)應(yīng)消息,但需要明確的是 ProducerBatch 對(duì)標(biāo) V2 版本。
來看看 V2 版本消息寫入邏輯。
private void appendDefaultRecord(
long offset,
long timestamp,
ByteBuffer key,
ByteBuffer value,
Header[] headers) throws IOException {
// 1. 檢查appendStream狀態(tài)是否可以寫
ensureOpenForRecordAppend();
// 2. 計(jì)算寫入多少偏移量
int offsetDelta = (int) (offset - baseOffset);
// 3.計(jì)算本次寫與第一次寫之間時(shí)間差
long timestampDelta = timestamp - firstTimestamp;
// 4.使用DefaultRecord.writeTo()方法會(huì)按照V2 版本格式寫入appendStream流中,并返回壓縮前的消息大小
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
// 5. 消息寫入成功后更新RecordBatch的元信息
recordWritten(offset, timestamp, sizeInBytes);
}
// 判斷appendStream狀態(tài)是否為CLOSED_STREAM
private void ensureOpenForRecordAppend() {
if (appendStream == CLOSED_STREAM)
throw new ...;
}
// 消息寫入成功后更新RecordBatch的元信息
private void recordWritten(long offset, long timestamp, int size) {
....
// 壓縮前寫入的記錄數(shù) + 1
numRecords += 1;
// 壓縮前要寫入的消息體大小字節(jié)數(shù) + size
uncompressedRecordsSizeInBytes += size;
// 最后的偏移量 + offset
lastOffset = offset;
if (magic > RecordBatch.MAGIC_VALUE_V0 && timestamp > maxTimestamp) {
// 賦值最大時(shí)間戳
maxTimestamp = timestamp;
// 賦值最大時(shí)間戳偏移量
offsetOfMaxTimestamp = offset;
}
}
該方法主要用來寫入 V2 版本消息的,主要做以下5件事情:
- 檢查是否可寫:判斷 appendStream 狀態(tài)是否為 CLOSED_STREAM,如果不是就可寫,否則拋異常。
- 計(jì)算本次要寫入多少偏移量。
- 計(jì)算本次寫入和第一次寫的時(shí)間差。
- 按照 V2 版本格式寫入 appendStream 流中,并返回壓縮前的消息大小。
- 成功后更新 RecordBatch 的元信息。
(2)hasRoomFor()
public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
// 檢查兩個(gè)狀態(tài)
// (1)appendStream流狀態(tài)
// (2)當(dāng)前已經(jīng)寫入的預(yù)估字節(jié)數(shù)是否超過了writeLimit寫入上限
if (isFull())
return false;
// 每個(gè)RecordBatch至少可以寫入一個(gè)Record,此時(shí)如果一個(gè)Record都沒有,則可以繼續(xù)寫入
if (numRecords == 0)
return true;
final int recordSize;
if (magic < RecordBatch.MAGIC_VALUE_V2) {
// 預(yù)估V0、V1舊版本的Record大小
recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
} else {
// 預(yù)估V2版本寫入的Record大小
int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
...
recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
}
// 已寫入字節(jié)數(shù) + 本次寫入Record的預(yù)估字節(jié)數(shù)不能超過writeLimit寫入上限
return this.writeLimit >= estimatedBytesWritten() + recordSize;
}
public boolean isFull() {
return appendStream == CLOSED_STREAM ||
(this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
}
該方法主要用來估計(jì)當(dāng)前 MemoryRecordsBuilder 是否還有空間來容納要寫入的 Record,會(huì)在下面 ProducerBatch.tryAppend() 里面調(diào)用。
最后來看看小節(jié)開始提到的自動(dòng)擴(kuò)容功能。
(3)expandBuffer()
public class ByteBufferOutputStream extends OutputStream {
// 擴(kuò)容因子1.1倍
private static final float REALLOCATION_FACTOR = 1.1f;
// 初始容量
private final int initialCapacity;
// 初始位置
private final int initialPosition;
// 計(jì)算是否需要擴(kuò)容
public void ensureRemaining(int remainingBytesRequired) {
// 當(dāng)寫入字節(jié)數(shù)大于buffer當(dāng)前剩余字節(jié)數(shù)就開啟擴(kuò)容
if (remainingBytesRequired > buffer.remaining())
expandBuffer(remainingBytesRequired);
}
// 擴(kuò)容
private void expandBuffer(int remainingRequired) {
// 1. 評(píng)估需要多少空間
int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired);
// 2. 申請(qǐng)新的ByteBuffer
ByteBuffer temp = ByteBuffer.allocate(expandSize);
// 3. 獲取寫入上限
int limit = limit();
// 4. 寫狀態(tài)轉(zhuǎn)換為讀狀態(tài)
buffer.flip();
// 5. 將buffer讀到新申請(qǐng)的temp里
temp.put(buffer);
// 6. 修改寫模式的limit上限
buffer.limit(limit);
// 7. 更新原來的buffer的position,防止被重復(fù)消費(fèi)
buffer.position(initialPosition);
// 8. 將引用指向新申請(qǐng)的ByteBuffer
buffer = temp;
}
}
該方法主要用來判斷是否需要擴(kuò)容 ByteBuffer 的,即當(dāng)寫入字節(jié)數(shù)大于 buffer 當(dāng)前剩余字節(jié)數(shù)就開啟擴(kuò)容,擴(kuò)容需要做以下3件事情:
- 評(píng)估需要多少空間: 在「擴(kuò)容空間」、「真正需要多少字節(jié)」之間取最大值,此處通過「擴(kuò)容因子」來計(jì)算主要是因?yàn)閿U(kuò)容是需要消耗系統(tǒng)資源的,如果每次都按實(shí)際數(shù)據(jù)大小來進(jìn)行分配空間,會(huì)浪費(fèi)不必要的系統(tǒng)資源。
- 申請(qǐng)新的空間:根據(jù)擴(kuò)容多少申請(qǐng)新的 ByteBuffer,然后將原來的 ByteBuffer 數(shù)據(jù)拷貝進(jìn)去,對(duì)應(yīng)源碼步驟:「3 - 7」。
- 最后將引用指向新申請(qǐng)的 ByteBuffer。
接下來看看 ProducerBatch 的實(shí)現(xiàn)。
3、ProducerBatch
public final class ProducerBatch {
// 批次最終狀態(tài)
private enum FinalState { ABORTED, FAILED, SUCCEEDED }
// 批次創(chuàng)建時(shí)間
final long createdMs;
// 批次對(duì)應(yīng)的主題分區(qū)
final TopicPartition topicPartition;
// 請(qǐng)求結(jié)果的future
final ProduceRequestResult produceFuture;
// 用來存儲(chǔ)消息的callback和響應(yīng)數(shù)據(jù)
private final List<Thunk> thunks = new ArrayList<>();
// 封裝MemoryRecords對(duì)象,用來存儲(chǔ)消息的ByteBuffer
private final MemoryRecordsBuilder recordsBuilder;
// batch的失敗重試次數(shù)
private final AtomicInteger attempts = new AtomicInteger(0);
// 是否是被分裂的批次
private final boolean isSplitBatch;
// ProducerBatch的最終狀態(tài)
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
// Record個(gè)數(shù)
int recordCount;
// 最大Record字節(jié)數(shù)
int maxRecordSize;
// 最后一次失敗重試發(fā)送的時(shí)間戳
private long lastAttemptMs;
// 最后一次向該P(yáng)roducerBatch追加Record的時(shí)間戳
private long lastAppendTime;
// Sender子線程拉取批次的時(shí)間
private long drainedMs;
// 是否正在重試過,如果ProducerBatch中的數(shù)據(jù)發(fā)送失敗,則會(huì)重新嘗試發(fā)送
private boolean retry;
}
// 構(gòu)造函數(shù)
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
...
// 請(qǐng)求結(jié)果的future
this.produceFuture = new ProduceRequestResult(topicPartition);
...
}
一個(gè) ProducerBatch 會(huì)存放一條或多條消息,通常把它稱為「批次消息」。
先來看看幾個(gè)重要字段:
- topicPartition:批次對(duì)應(yīng)的主題分區(qū),當(dāng)前 ProducerBatch 中緩存的 Record 都會(huì)發(fā)送給該 TopicPartition。
- produceFuture:請(qǐng)求結(jié)果的 Future,通過 ProduceRequestResult 類實(shí)現(xiàn)。
- thunks:Thunk 對(duì)象集合,用來存儲(chǔ)消息的 callback 和每個(gè) Record 關(guān)聯(lián)的 Feture 響應(yīng)數(shù)據(jù)。
- recordsBuilder:封裝 MemoryRecords 對(duì)象,用來存儲(chǔ)消息的 ByteBuffer。
- attemps:batch 的失敗重試次數(shù),通過 AtomicInteger 提供原子操作來進(jìn)行 Integer 的使用,適合高并發(fā)情況下的使用。
- isSplitBatch:是否是被分裂的批次,因單個(gè)消息過大導(dǎo)致一個(gè) ProducerBatch 存不下,被分裂成多個(gè) ProducerBatch 來存儲(chǔ)的情況。
- drainedMs:Sender 子線程拉取批次的時(shí)間。
- retry:如果 ProducerBatch 中的數(shù)據(jù)發(fā)送失敗,則會(huì)重新嘗試發(fā)送。
在構(gòu)造函數(shù)中,有個(gè)重要的依賴組件就是 「ProduceRequestResult」,而它是「異步獲取消息生產(chǎn)結(jié)果的類」,簡(jiǎn)單剖析下。
(1)ProduceRequestResult 類
public class ProduceRequestResult {
// 通過一個(gè)count為1的CountDownLatch對(duì)象間接地實(shí)現(xiàn)了Future的功能。
private final CountDownLatch latch = new CountDownLatch(1);
private final TopicPartition topicPartition;
// 用來記錄broker端關(guān)聯(lián)ProducerBatch中第一條Record分配的offset值
// 這樣每個(gè)Record的真實(shí)offset就可以根據(jù)自身在ProducerBatch的位置計(jì)算出來了(baseOffset + relativeOffset)
private volatile Long baseOffset = null;
// 構(gòu)造函數(shù)
public ProduceRequestResult(TopicPartition topicPartition) {
this.topicPartition = topicPartition;
}
// 當(dāng)?shù)鹊巾憫?yīng)會(huì)會(huì)調(diào)該函數(shù)喚醒阻塞的主線程
public void done() {
if (baseOffset == null)
throw new ...;
this.latch.countDown();
}
// 調(diào)用await()方法的線程會(huì)被掛起,它會(huì)等待直到count值為0才繼續(xù)執(zhí)行
public void await() throws InterruptedException {
latch.await();
}
// 和await()類似,只不過等待一定的時(shí)間后count值還沒變?yōu)?的話就會(huì)繼續(xù)執(zhí)行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return latch.await(timeout, unit);
}
}
該類通過 CountDownLatch(1) 間接地實(shí)現(xiàn)了 Future 功能,并讓其他所有線程都在這個(gè)鎖上等待,此時(shí)只需要調(diào)用一次 countDown() 方法就可以讓其他所有等待的線程同時(shí)恢復(fù)執(zhí)行。
當(dāng) Producer 發(fā)送消息時(shí)會(huì)間接調(diào)用「ProduceRequestResult.await」,此時(shí)線程就會(huì)等待服務(wù)端的響應(yīng)。當(dāng)服務(wù)端響應(yīng)時(shí)調(diào)用「ProduceRequestResult.done」,該方法調(diào)用了「CountDownLatch.countDown」喚醒了阻塞在「CountDownLatch.await」上的主線程。這些線程后續(xù)可以通過 ProduceRequestResult 的 error 字段來判斷本次請(qǐng)求成功還是失敗。
接下來看看 ProducerBatch 類的重要方法。
(2)tryAppend()
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
// 1.檢查MemoryRecordsBuilder是否還有空間寫入
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
// 2.調(diào)用append()方法寫入Record
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
// 3. 更新最大Record字節(jié)數(shù)
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),recordsBuilder.compressionType(), key, value, headers));
...
// 4.構(gòu)建FutureRecordMetadata對(duì)象
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,timestamp, checksum,key == null ? -1 : key.length,value == null ? -1 : value.length, Time.SYSTEM);
// 5. 將Callback和FutureRecordMetadata記錄到thunks集合中
thunks.add(new Thunk(callback, future));
// 6. 更新Record記錄數(shù)
this.recordCount++;
// 7. 返回FutureRecordMetadata
return future;
}
}
該方法主要用來嘗試追加寫消息的,主要做以下6件事情:
- 通過 MemoryRecordsBuilder 的 hasRoomFor() 檢查當(dāng)前 ProducerBatch 是否還有足夠的空間來存儲(chǔ)此次寫入的 Record。
- 調(diào)用 MemoryRecordsBuilder.append() 方法將 Record 追加到 ByteBuffer 中。
- 創(chuàng)建 FutureRecordMetadata 對(duì)象,底層繼承了 Future 接口,對(duì)應(yīng)此次 Record 的發(fā)送。
- 將 Future 和消息的 callback 回調(diào)封裝成 Thunk 對(duì)象,放入 thunks 集合中。
- 更新 Record 記錄數(shù)。
- 返回 FutureRecordMetadata。
可以看出該方法只是讓 Producer 主線程完成了消息的緩存,并沒有實(shí)現(xiàn)真正的網(wǎng)絡(luò)發(fā)送。
接下來簡(jiǎn)單看看 FutureRecordMetadata,它實(shí)現(xiàn)了 JDK 中 concurrent 的 Future 接口。除了維護(hù) ProduceRequestResult 對(duì)象外還維護(hù)了 relativeOffset 等字段,其中 relativeOffset 用來記錄對(duì)應(yīng) Record 在 ProducerBatch 中的偏移量。
該類有2個(gè)值得注意的方法,get() 和 value()。
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
...
// 依賴ProduceRequestResult的CountDown來實(shí)現(xiàn)阻塞等待
boolean occurred = this.result.await(timeout, unit);
...
// 調(diào)用value()方法返回RecordMetadata對(duì)象
return valueOrError();
}
RecordMetadata valueOrError() throws ExecutionException {
...
return value();
}
該方法主要依賴 ProduceRequestResult 的 CountDown 來實(shí)現(xiàn)阻塞等待,最后調(diào)用 value() 返回 RecordMetadata 對(duì)象。
RecordMetadata value() {
...
// 將 partition、baseOffset、relativeOffset、時(shí)間戳(LogAppendTime | CreateTimeStamp)等信息封裝成 RecordMetadata 對(duì)象返回
return new RecordMetadata(
result.topicPartition(),
...);
}
private long timestamp() {
return result.hasLogAppendTime() ? result.logAppendTime() : createTimestamp;
}
該方法主要通過各種參數(shù)封裝成 RecordMetadata 對(duì)象返回。
了解了 ProducerBatch 是如何寫入數(shù)據(jù)的,我們?cè)賮砜纯?done() 方法。當(dāng) Producer 收到 Broker 端「正常」|「超時(shí)」|「異?!箌「關(guān)閉生產(chǎn)者」等響應(yīng)都會(huì)調(diào)用 ProducerBatch 的 done()方法。
(3)done()
public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
// 1.根據(jù)exception決定本次ProducerBatch發(fā)送的最終狀態(tài)
final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
....
// 2.通過CAS操作更新finalState狀態(tài),只有第一次更新的時(shí)候,才會(huì)觸發(fā)completeFutureAndFireCallbacks()方法
if (this.finalState.compareAndSet(null, tryFinalState)) {
// 3.執(zhí)行回調(diào)
completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
return true;
}
....
return false;
}
該方法主要用來是否可以執(zhí)行回調(diào)操作,即當(dāng)收到該批次響應(yīng)后,判斷批次 Batch 最終狀態(tài)是否可以執(zhí)行回調(diào)操作。
(4)completeFutureAndFireCallbacks()
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
// 1.更新ProduceRequestResult中的相關(guān)字段
produceFuture.set(baseOffset, logAppendTime, exception);
// 2.遍歷thunks集合,觸發(fā)每個(gè)Record的Callback回調(diào)
for (Thunk thunk : thunks) {
try {
if (exception == null) {
// 3.獲取消息元數(shù)據(jù)
RecordMetadata metadata = thunk.future.value();
if (thunk.callback != null)
//4.調(diào)用回調(diào)方法
thunk.callback.onCompletion(metadata, null);
} else {
if (thunk.callback != null)
// 4.調(diào)用回調(diào)方法
thunk.callback.onCompletion(null, exception);
}
}
....
}
// 4.調(diào)用底層 CountDownLatch.countDown()方法,阻塞在其上的主線程。
produceFuture.done();
}
該方法主要用來調(diào)用回調(diào)方法和完成 future,主要做以下3件事情:
- 更新 ProduceRequestResult 中的相關(guān)字段,包括基本位移、消息追加的時(shí)間、異常。
- 遍歷 thunks 集合,觸發(fā)每個(gè) Record 的 Callback 回調(diào)。
- 調(diào)用底層 CountDownLatch.countDown()方法,阻塞在其上的主線程。
至此我們已經(jīng)講解了 ProducerBatch 「如何緩存消息」、「如何處理響應(yīng)」、「如何處理回調(diào)」三個(gè)最重要方法。
通過一張圖來描述下緩存消息的存儲(chǔ)結(jié)構(gòu):
接下來看看 Kafka 生產(chǎn)端最經(jīng)典的 「緩沖池架構(gòu)」。
三、客戶端緩存池架構(gòu)設(shè)計(jì)
為什么客戶端需要緩存池這個(gè)經(jīng)典架構(gòu)設(shè)計(jì)呢?
主要原因就是頻繁的創(chuàng)建和釋放 ProducerBatch 會(huì)導(dǎo)致 Full GC 問題,所以 Kafka 針對(duì)這個(gè)問題實(shí)現(xiàn)了一個(gè)非常優(yōu)秀的機(jī)制,就是「緩存池 BufferPool 機(jī)制」。即每個(gè) Batch 底層都對(duì)應(yīng)一塊內(nèi)存空間,這個(gè)內(nèi)存空間就是專門用來存放消息,用完歸還就行。
接下來看看緩存池的源碼設(shè)計(jì)。
1、BufferPool
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java。
public class BufferPool {
// 整個(gè)BufferPool總內(nèi)存大小 默認(rèn)32M
private final long totalMemory;
// 當(dāng)前BufferPool管理的單個(gè)ByteBuffer大小,16k
private final int poolableSize;
// 因?yàn)橛卸嗑€程并發(fā)分配和回收ByteBuffer,用鎖控制并發(fā),保證線程安全。
private final ReentrantLock lock;
// 對(duì)應(yīng)一個(gè)ArrayDeque<ByteBuffer> 隊(duì)列,其中緩存了固定大小的 ByteBuffer 對(duì)象
private final Deque<ByteBuffer> free;
// 此隊(duì)列記錄因申請(qǐng)不到足夠空間而阻塞的線程對(duì)應(yīng)的Condition 對(duì)象
private final Deque<Condition> waiters;
// 非池化可用的內(nèi)存即totalMemory減去free列表中的全部ByteBuffer的大小
private long nonPooledAvailableMemory;
// 構(gòu)造函數(shù)
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
...
// 總的內(nèi)存
this.totalMemory = memory;
// 默認(rèn)的池外內(nèi)存,就是總的內(nèi)存
this.nonPooledAvailableMemory = memory;
}
}
先來看看上面幾個(gè)重要字段:
- totalMemory:整個(gè) BufferPool 內(nèi)存大小「buffer.memory」,默認(rèn)是32M。
- poolableSize:池化緩存池一塊內(nèi)存塊的大小「batch.size」,默認(rèn)是16k。
- lock:當(dāng)有多線程并發(fā)分配和回收 ByteBuffer 時(shí),為了保證線程的安全,使用鎖來控制并發(fā)。
- free:池化的 free 隊(duì)列,其中緩存了指定大小的 ByteBuffer 對(duì)象。
- waiters:阻塞線程對(duì)應(yīng)的 Condition 隊(duì)列,當(dāng)有申請(qǐng)不到足夠內(nèi)存的線程時(shí),為了等待其他線程釋放內(nèi)存而阻塞等待,對(duì)應(yīng)的 Condition 對(duì)象會(huì)進(jìn)入該隊(duì)列。
- nonPooledAvailableMemory:非池化可用內(nèi)存。
可以看出它只會(huì)針對(duì)固定大小「poolableSize 16k」的 ByteBuffer 進(jìn)行管理,ArrayDeque 的初始化大小是16,此時(shí) BufferPool 的狀態(tài)如下圖:
接下來看看 BufferPool 的重要方法。
(1)allocate()
// 分配指定空間的緩存,如果緩沖區(qū)中沒有足夠的空閑空間,那么會(huì)阻塞線程,直到超時(shí)或得到足夠空間
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
// 1.判斷申請(qǐng)的內(nèi)存是否大于總內(nèi)存
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of "+ this.totalMemory + " on memory allocations.");
// 初始化buffer
ByteBuffer buffer = null;
// 2.加鎖,保證線程安全。
this.lock.lock();
// 如果當(dāng)前BufferPool處于關(guān)閉狀態(tài),則直接拋出異常
if (this.closed) {
this.lock.unlock();
throw new KafkaException("Producer closed while allocating memory");
}
....
try {
// 3.申請(qǐng)內(nèi)存大小恰好為16k 且free緩存池不為空
if (size == poolableSize && !this.free.isEmpty())
// 從free隊(duì)列取出一個(gè)ByteBuffer
return this.free.pollFirst();
// 對(duì)于申請(qǐng)內(nèi)存大小非16k情況
// 先計(jì)算free緩存池總空間大小,判斷是否足夠
int freeListSize = freeSize() * this.poolableSize;
// 4.當(dāng)前BufferPool能夠釋放出申請(qǐng)內(nèi)存大小的空間
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// 5.如果size大于非池化可用內(nèi)存大小,就循環(huán)從free緩存池里釋放出來空閑Bytebuffer補(bǔ)充到nonPooledAvailableMemory中,直到滿足size大小為止。
freeUp(size);
// 釋放非池化可用內(nèi)存大小
this.nonPooledAvailableMemory -= size;
} else {
// 如果當(dāng)前BufferPool不夠提供申請(qǐng)內(nèi)存大小,則需要阻塞當(dāng)前線程
// 累計(jì)已經(jīng)釋放的內(nèi)存
int accumulated = 0;
// 創(chuàng)建對(duì)應(yīng)的Condition,阻塞自己等待別的線程釋放內(nèi)存
Condition moreMemory = this.lock.newCondition();
try {
// 計(jì)算當(dāng)前線程最大阻塞時(shí)長(zhǎng)
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
// 把自己添加到等待隊(duì)列中末尾,保持公平性,先來的先獲取內(nèi)存,防止饑餓
this.waiters.addLast(moreMemory);
// 循環(huán)等待直到分配成功或超時(shí)
while (accumulated < size) {
....
try {
// 當(dāng)前線程阻塞等待,返回結(jié)果為false則表示阻塞超時(shí)
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} finally {
....
}
....
// 申請(qǐng)內(nèi)存大小是16k,且free緩存池有了空閑的ByteBuffer
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
// 從free隊(duì)列取出一個(gè)ByteBuffer
buffer = this.free.pollFirst();
// 計(jì)算累加器
accumulated = size;
} else {
// 釋放空間給非池化可用內(nèi)存,并繼續(xù)等待空閑空間,如果分配多了只取夠size的空間
freeUp(size - accumulated);
int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
// 釋放非池化可用內(nèi)存大小
this.nonPooledAvailableMemory -= got;
// 累計(jì)分配了多少空間
accumulated += got;
}
}
accumulated = 0;
} finally {
// 如果循環(huán)有異常,將已釋放的空間歸還給非池化可用內(nèi)存
this.nonPooledAvailableMemory += accumulated;
//把自己從等待隊(duì)列中移除并結(jié)束
this.waiters.remove(moreMemory);
}
}
} finally {
// 當(dāng)非池化可用內(nèi)存有內(nèi)存或free緩存池有空閑ByteBufer且等待隊(duì)列里有線程正在等待
try {
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
// 喚醒隊(duì)列里正在等待的線程
this.waiters.peekFirst().signal();
} finally {
// 解鎖
lock.unlock();
}
}
// 說明空間足夠,并且有足夠空閑的了??梢詧?zhí)行真正的分配空間了。
if (buffer == null)
// 沒有正好的buffer,從緩沖區(qū)外(JVM Heap)中直接分配內(nèi)存
return safeAllocateByteBuffer(size);
else
// 直接復(fù)用free緩存池的ByteBuffer
return buffer;
}
private ByteBuffer safeAllocateByteBuffer(int size) {
boolean error = true;
try {
//分配空間
ByteBuffer buffer = allocateByteBuffer(size);
error = false;
//返回buffer
return buffer;
} finally {
if (error) {
//分配失敗了, 加鎖,操作內(nèi)存pool
this.lock.lock();
try {
//歸還空間給非池化可用內(nèi)存
this.nonPooledAvailableMemory += size;
if (!this.waiters.isEmpty())
//有其他在等待的線程的話,喚醒其他線程
this.waiters.peekFirst().signal();
} finally {
// 加鎖不忘解鎖
this.lock.unlock();
}
}
}
}
protected ByteBuffer allocateByteBuffer(int size) {
// 從JVM Heap中分配空間
return ByteBuffer.allocate(size);
}
// 不斷從free隊(duì)列中釋放空閑的ByteBuffer來補(bǔ)充非池化可用內(nèi)存
private void freeUp(int size) {
while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}
該方法主要用來嘗試分配 ByteBuffer,這里分4種情況說明下:
情況1:申請(qǐng)16k且free緩存池有可用內(nèi)存
此時(shí)會(huì)直接從 free 緩存池中獲取隊(duì)首的 ByteBuffer 分配使用,用完后直接將 ByteBuffer 放到 free 緩存池的隊(duì)尾中,并調(diào)用 clear() 清空數(shù)據(jù),以便下次重復(fù)使用。
情況2:申請(qǐng)16k且free緩存池?zé)o可用內(nèi)存
此時(shí) free 緩存池?zé)o可用內(nèi)存,只能從非池化可用內(nèi)存中獲取16k內(nèi)存來分配,用完后直接將 ByteBuffer 放到 free 緩存池的隊(duì)尾中,并調(diào)用 clear() 清空數(shù)據(jù),以便下次重復(fù)使用。
情況3:申請(qǐng)非16k且free緩存池?zé)o可用內(nèi)存
此時(shí) free 緩存池?zé)o可用內(nèi)存,且申請(qǐng)的是非16k,只能從非池化可用內(nèi)存(空間夠分配)中獲取一部分內(nèi)存來分配,用完后直接將申請(qǐng)到的內(nèi)存空間釋放到非池化可用內(nèi)存中,后續(xù)會(huì)被 GC 掉。
情況4:申請(qǐng)非16k且free緩存池有可用內(nèi)存,但非池化可用內(nèi)存不夠
此時(shí) free 緩存池有可用內(nèi)存,但申請(qǐng)的是非16k,先嘗試從 free 緩存池中將 ByteBuffer 釋放到非池化可用內(nèi)存中,直到滿足申請(qǐng)內(nèi)存大小(size),然后從非池化可用內(nèi)存獲取對(duì)應(yīng)內(nèi)存大小來分配,用完后直接將申請(qǐng)到的內(nèi)存空間釋放到到非池化可用內(nèi)存中,后續(xù)會(huì)被 GC 掉。
(2)deallocate()
public void deallocate(ByteBuffer buffer, int size) {
// 1.加鎖,保證線程安全。
lock.lock();
try {
// 2.如果待釋放的size大小為16k,則直接放入free隊(duì)列中
if (size == this.poolableSize && size == buffer.capacity()) {
// 清空buffer
buffer.clear();
// 釋放buffer到free隊(duì)列里
this.free.add(buffer);
} else {
//如果非16k,則由JVM GC來回收ByteBuffer并增加非池化可用內(nèi)存
this.nonPooledAvailableMemory += size;
}
// 3.喚醒waiters中的第一個(gè)阻塞線程
Condition moreMem = this.waiters.peekFirst();
if (moreMem != null)
moreMem.signal();
} finally {
// 釋放鎖
lock.unlock();
}
}
該方法主要用來嘗試釋放 ByteBuffer 空間,主要做以下幾件事情:
- 先加鎖,保證線程安全。
- 如果待釋放的 size 大小為16k,則直接放入 free 隊(duì)列中。
- 否則由 JVM GC 來回收 ByteBuffer 并增加 nonPooledAvailableMemory。
- 當(dāng)有 ByteBuffer 回收了,喚醒 waiters 中的第一個(gè)阻塞線程。
最后來看看 kafka 自定義的支持「讀寫分離場(chǎng)景」CopyOnWriteMap 的實(shí)現(xiàn)。
2、CopyOnWriteMap
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java。
通過 RecordAccumulator 類的屬性字段中可以看到,CopyOnWriteMap 中 key 為主題分區(qū),value 為向這個(gè)分區(qū)發(fā)送的 Deque<ProducerBatch> 隊(duì)列集合。
我們知道生產(chǎn)消息時(shí),要發(fā)送的分區(qū)是很少變動(dòng)的,所以寫操作會(huì)很少。大部分情況都是先獲取分區(qū)對(duì)應(yīng)的隊(duì)列,然后將 ProducerBatch 放入隊(duì)尾,所以讀操作是很頻繁的,這就是個(gè)典型的「讀多寫少」的場(chǎng)景。
所謂 「CopyOnWrite」 就是當(dāng)寫的時(shí)候會(huì)拷貝一份來進(jìn)行寫操作,寫完了再替換原來的集合。
來看看它的源碼實(shí)現(xiàn)。
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
// volatile Map
private volatile Map<K, V> map;
// 構(gòu)造函數(shù)
public CopyOnWriteMap() {
this.map = Collections.emptyMap();
}
該類只有一個(gè)重要的字段 Map,是通過「volatile」來修飾的,目的就是在多線程的場(chǎng)景下,當(dāng) Map 發(fā)生變化的時(shí)候其他的線程都是可見的。
接下來看幾個(gè)重要方法,都比較簡(jiǎn)單,但是實(shí)現(xiàn)非常經(jīng)典。
03.2.1 get()
// 獲取集合中隊(duì)列
public V get(Object k) {
return map.get(k);
}
該方法主要用來讀取集合中的隊(duì)列,可以看到讀操作并沒有加鎖,多線程并發(fā)讀取的場(chǎng)景并不會(huì)阻塞,可以實(shí)現(xiàn)高并發(fā)讀取。如果隊(duì)列已經(jīng)存在了就直接返回即可。
(2)putIfAbsent()
public synchronized V putIfAbsent(K k, V v) {
if (!containsKey(k))
return put(k, v);
else
return get(k);
}
// 判斷隊(duì)列是否存在
public boolean containsKey(Object k) {
return map.containsKey(k);
}
該方法主要用來獲取或者設(shè)置隊(duì)列,會(huì)被多個(gè)線程并發(fā)執(zhí)行,通過「synchronized」來修飾可以保證線程安全的,除非隊(duì)列不存在才會(huì)去設(shè)置。
(3)put()
public synchronized V put(K k, V v) {
Map<K, V> copy = new HashMap<K, V>(this.map);
V prev = copy.put(k, v);
this.map = Collections.unmodifiableMap(copy);
return prev;
}
該方法主要用來設(shè)置隊(duì)列的, put 時(shí)也是通過「synchronized」來修飾的,可以保證同一時(shí)間只有一個(gè)線程會(huì)來更新這個(gè)值。
那為什么說寫操作不會(huì)阻塞讀操作呢?
- 首先重新創(chuàng)建一個(gè) HashMap 集合副本。
- 通過「volatile」寫的方式賦值給對(duì)應(yīng)集合里。
- 把新的集合設(shè)置成「不可修改的 map」,并賦值給字段 map。
這就實(shí)現(xiàn)了讀寫分離。對(duì)于 Producer 最最核心,會(huì)出現(xiàn)多線程并發(fā)訪問的就是緩存池。因此這塊的高并發(fā)設(shè)計(jì)相當(dāng)重要。
四、總結(jié)
這里,我們一起來總結(jié)一下這篇文章的重點(diǎn)。
- 帶你先整體的梳理了 Kafka 客戶端消息批量發(fā)送的好處。
- 通過一個(gè)真實(shí)生活場(chǎng)景類比來帶你理解 RecordAccumulator 內(nèi)部構(gòu)造,并且深度剖析了消息是如何在客戶端緩存的,以及內(nèi)部各組件實(shí)現(xiàn)原理。
- 帶你深度剖析了 Kafka 客戶端非常重要的 BufferPool 、CopyOnWriteMap 的實(shí)現(xiàn)原理。