六個技術點帶你理解 Kafka 高性能背后的原理
大家好,我是君哥。
Kafka 是一款性能非常優(yōu)秀的消息隊列,每秒處理的消息體量可以達到千萬級別。今天來聊一聊 Kafka 高性能背后的技術原理。
1、批量發(fā)送
Kafka 收發(fā)消息都是批量進行處理的。我們看一下 Kafka 生產(chǎn)者發(fā)送消息的代碼:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
//省略前面代碼
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
//把消息追加到之前緩存的這一批消息上
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
//積累到設置的緩存大小,則發(fā)送出去
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch /**省略 catch 代碼*/
}
從代碼中可以看到,生產(chǎn)者調用 doSend 方法后,并不會直接把消息發(fā)送出去,而是把消息緩存起來,緩存消息量達到配置的批量大小后,才會發(fā)送出去。
注意:從上面 accumulator.append 代碼可以看到,一批消息屬于同一個 topic 下面的同一個 partition。
Broker 收到消息后,并不會把批量消息解析成單條消息后落盤,而是作為批量消息進行落盤,同時也會把批量消息直接同步給其他副本。
消費者拉取消息,也不會按照單條進行拉取,而是按照批量進行拉取,拉取到一批消息后,再解析成單條消息進行消費。
使用批量收發(fā)消息,減輕了客戶端和 Broker 的交互次數(shù),提升了 Broker 處理能力。
2、消息壓縮
如果消息體比較大,Kafka 消息吞吐量要達到千萬級別,網(wǎng)卡支持的網(wǎng)絡傳輸帶寬會是一個瓶頸。Kafka 的解決方案是消息壓縮。發(fā)送消息時,如果增加參數(shù) compression.type,就可以開啟消息壓縮:
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//開啟消息壓縮
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key1", "value1");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("sending message error: ", e);
} else {
logger.info("sending message successful, Offset: ", metadata.offset());
}
}
});
producer.close();
}
如果 compression.type 的值設置為 none,則不開啟壓縮。那消息是在什么時候進行壓縮呢?前面提到過,生產(chǎn)者緩存一批消息后才會發(fā)送,在發(fā)送這批消息之前就會進行壓縮,代碼如下:
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// ...
try {
// ...
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
//...
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
//這批消息緩存已滿,這里進行壓縮
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
上面的 recordsBuilder 方法最終調用了下面 MemoryRecordsBuilder 的構造方法。
public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset,
long logAppendTime,
long producerId,
short producerEpoch,
int baseSequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit) {
//省略其他代碼
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}
上面的 wrapForOutput 方法會根據(jù)配置的壓縮算法進行壓縮或者選擇不壓縮。目前 Kafka 支持的壓縮算法包括:gzip、snappy、lz4,從 2.1.0 版本開始,Kafka 支持 Zstandard 算法。
在 Broker 端,會解壓 header 做一些校驗,但不會解壓消息體。消息體的解壓是在消費端,消費者拉取到一批消息后,首先會進行解壓,然后進行消息處理。
因為壓縮和解壓都是耗費 CPU 的操作,所以在開啟消息壓縮時,也要考慮生產(chǎn)者和消費者的 CPU 資源情況。
有了消息批量收集和壓縮,kafka 生產(chǎn)者發(fā)送消息的過程如下圖:
3、磁盤順序讀寫
順序讀寫省去了尋址的時間,只要一次尋址,就可以連續(xù)讀寫。
在固態(tài)硬盤上,順序讀寫的性能是隨機讀寫的好幾倍。而在機械硬盤上,尋址時需要移動磁頭,這個機械運動會花費很多時間,因此機械硬盤的順序讀寫性能是隨機讀寫的幾十倍。
Kafka 的 Broker 在寫消息數(shù)據(jù)時,首先為每個 Partition 創(chuàng)建一個文件,然后把數(shù)據(jù)順序地追加到該文件對應的磁盤空間中,如果這個文件寫滿了,就再創(chuàng)建一個新文件繼續(xù)追加寫。這樣大大減少了尋址時間,提高了讀寫性能。
4、PageCache
在 Linux 系統(tǒng)中,所有文件 IO 操作都要通過 PageCache,PageCache 是磁盤文件在內(nèi)存中建立的緩存。當應用程序讀寫文件時,并不會直接讀寫磁盤上的文件,而是操作 PageCache。
應用程序寫文件時,都先會把數(shù)據(jù)寫入 PageCache,然后操作系統(tǒng)定期地將 PageCache 的數(shù)據(jù)寫到磁盤上。如下圖:
而應用程序在讀取文件數(shù)據(jù)時,首先會判斷數(shù)據(jù)是否在 PageCache 中,如果在則直接讀取,如果不在,則讀取磁盤,并且將數(shù)據(jù)緩存到 PageCache。
Kafka 充分利用了 PageCache 的優(yōu)勢,當生產(chǎn)者生產(chǎn)消息的速率和消費者消費消息的速率差不多時,Kafka 基本可以不用落盤就能完成消息的傳輸。
5、零拷貝
Kafka Broker 將消息發(fā)送給消費端時,即使命中了 PageCache,也需要將 PageCache 中的數(shù)據(jù)先復制到應用程序的內(nèi)存空間,然后從應用程序的內(nèi)存空間復制到 Socket 緩存區(qū),將數(shù)據(jù)發(fā)送出去。如下圖:
Kafka 采用了零拷貝技術把數(shù)據(jù)直接從 PageCache 復制到 Socket 緩沖區(qū)中,這樣數(shù)據(jù)不用復制到用戶態(tài)的內(nèi)存空間,同時 DMA 控制器直接完成數(shù)據(jù)復制,不需要 CPU 參與。如下圖:
Java 零拷貝技術采用 FileChannel.transferTo() 方法,底層調用了 sendfile 方法。
6、mmap
Kafka 的日志文件分為數(shù)據(jù)文件(.log)和索引文件(.index),Kafka 為了提高索引文件的讀取性能,對索引文件采用了 mmap 內(nèi)存映射,將索引文件映射到進程的內(nèi)存空間,這樣讀取索引文件就不需要從磁盤進行讀取。如下圖:
7、總結
本文介紹了 Kafka 實現(xiàn)高性能用到的關鍵技術,這些技術可以為我們學習和工作提供參考。