場景題:如何提升Kafka效率?
Kafka 以其高吞吐量、低延遲和可擴展性而備受青睞。無論是在實時數據分析、日志收集還是事件驅動架構中,Kafka 都扮演著關鍵角色。
但是,如果 Kafka 使用不當,也可能會面臨性能瓶頸,影響系統的整體效率。所以,了解如何提升 Kafka 的運行效率?對于生產環(huán)境的使用和面試都是至關重要的。
那么,提升 Kafka 性能的有效手段都有哪些呢?接下來,我們一起來看。
性能調優(yōu)主要手段
Kafka 性能調優(yōu)的主要手段有以下幾個:
- 分區(qū)擴展
- 消息批發(fā)送(重要)
- 消息批獲取(重要)
- 配置調優(yōu)
- JVM 調優(yōu)
圖片
1.分區(qū)擴展
在 Kafka 架構中,使用多分區(qū)(Partition)來實現數據分片功能。也就是 Kafka 會將多條消息并發(fā)存儲到一個主題(Topic)的多個 Broker(Kafka 服務)中的多個 Partition 中,以實現并行操作的功能,極大地提高了整體系統的讀寫能力,如下圖所示:
圖片
數據分片是一種技術將大數據分割成更小、更易于管理的片段(稱為“分片”),并將分片都存儲在不同的服務器上,從而實現了數據的水平拆分。通過數據分片,可以有效地解決單一數據庫的性能瓶頸、存儲限制以及高可用性等問題。
因此,增加更多的 Broker,擴展更多的分區(qū) Partition 是提升 Kafka 性能的關鍵,如下圖所示:
圖片
2.消息批發(fā)送(重要)
Kafka 默認是不支持批量發(fā)送消息的,然而開啟批量發(fā)送消息可以提升 Kafka 整體運行效率。
為什么要批量發(fā)送消息?
批量發(fā)送消息有以下優(yōu)點:
- 減少網絡開銷:當生產者發(fā)送消息給 Kafka 時,如果每次只發(fā)送一條消息,那么就需要建立一次 TCP 連接,這涉及到三次握手的過程。而如果采用批量發(fā)送的方式,則可以在一次 TCP 連接中發(fā)送多條消息,減少了網絡連接建立和斷開的次數,從而降低了網絡開銷。
- 減少 I/O 操作:批量發(fā)送意味著一次寫入操作可以處理更多的數據。這對于磁盤 I/O 來說是一個優(yōu)勢,因為一次大的寫操作比多次小的寫操作更高效。
- 提高吞吐量:由于減少了通信次數,批量發(fā)送可以提高單位時間內發(fā)送的消息數量,即提高了吞吐量。
那么,想要實現 Kafka 批量消息發(fā)送只需要正確配置以下 3 個參數即可:
- batch-size:定義了 Kafka 生產者嘗試批量發(fā)送的消息的最大大?。ㄒ宰止?jié)為單位),生產者收集到足夠多的消息達到這個大小時,它會嘗試發(fā)送這些消息給 Kafka Broker,默認值為 16KB。
- buffer-memory:指定了 Kafka 生產者可以用來緩沖待發(fā)送消息的總內存空間,如果生產者試圖發(fā)送的消息超過了這個限制,生產者將會阻塞,直到有足夠空間可用或者消息被發(fā)送出去,默認值為 32MB。
- linger.ms:生產者在嘗試發(fā)送消息前等待的最長時間(以毫秒為單位)。默認情況下,linger.ms 的值為 0,這意味著立即發(fā)送。
以上 3 個參數滿足任一個都會立即(批量)發(fā)送。
因此我們如果需要匹配發(fā)送,主要需要調整的參數是 linger.ms,如下配置所示:
spring:
kafka:
bootstrap-servers: localhost:9092 # Kafka服務器地址
consumer:
group-id: my-group # 消費者組ID
auto-offset-reset: earliest # 自動重置偏移量到最早的可用消息
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 鍵的反序列化器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化器
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 鍵的序列化器
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化器
batch-size: 16384
buffer-memory: 33554432
properties:
linger:
ms: 2000
3.消息批獲?。ㄖ匾?/span>
Kafka 默認每次拉取一條消息,而使用批量獲取消息可以有效提升 Kafka 運行效率。
為什么要批量獲取消息?
批量獲取消息有以下優(yōu)點:
- 降低客戶端處理開銷:對于客戶端來說,每次處理一個消息需要進行一系列的操作,如解包、解析、處理邏輯等。如果每次只拉取一個消息,客戶端會頻繁地進行這些操作,帶來較大的處理開銷。而批量拉取消息時,客戶端可以一次性處理多個消息,減少了處理單個消息的頻率,從而降低了客戶端的處理開銷。
- 減少網絡往返次數:每次拉取一個消息時,客戶端需要與 Kafka 服務器進行多次網絡往返,包括發(fā)送請求、接收響應等。這些網絡往返會帶來一定的延遲。而批量拉取消息時,客戶端可以一次性拉取多個消息,減少了網絡往返的次數,從而降低了網絡延遲。
- 優(yōu)化內存使用:批量拉取消息可以更好地規(guī)劃和利用內存。客戶端可以一次性分配足夠的內存來存儲批量拉取的消息,避免了頻繁地分配和釋放小內存塊的操作。這樣可以提高內存的使用效率,減少內存碎片的產生,進而提升系統的運行效率。
- 提高吞吐量:批量拉取消息可以提高單位時間內處理的消息數量,從而提升了 Kafka 的吞吐量。
想要實現批量讀取數據需要做以下兩步調整:
- 在配置文件中設置批讀?。?/li>
spring.kafka.listener.type=batch
- 消費者使用 List<ConsumerRecord> 接收消息,具體實現代碼如下:
@KafkaListener(topics = TOPIC)
public void listen(List<ConsumerRecord<?, ?>> consumerRecords) {
for (int i = 0; i < consumerRecords.size(); i++) {
System.out.println("監(jiān)聽到消息:" + consumerRecords.get(i).value());
}
System.out.println("------------end------------");
}
以上程序的執(zhí)行結果如下:
圖片
從執(zhí)行結果可以看出:只有一個“end”打印,這說明 Kafka 一次拉取了一批數據,而不是一個數據,否則就會有多個“end”。
4.配置調優(yōu)
合理設置 Kafka 的配置也可以一定程度的提升 Kafka 的效率,例如以下這些配置:
- 配置文件刷盤策略:調整 flush.ms 和 flush.messages 參數,控制數據何時寫入磁盤。較小的值可以降低延遲,而較大的值可以提高吞吐量。
- 網絡和 IO 操作線程配置優(yōu)化:num.network.threads 應該設置為 CPU 核心數加 1,以充分利用硬件資源。調整 socket.send.buffer.bytes 和 socket.receive.buffer.bytes 以優(yōu)化網絡緩沖區(qū)大小,緩沖區(qū)越大,吞吐量也越高。
5.JVM 調優(yōu)
因為 Kafka 是用 Java 和 Scala 兩種語言編寫的,而 Java 和 Scala 都是運行在 JVM 上的,因此保證 JVM 的高效運行,設置合理的垃圾回收器,也能間接的保證 Kafka 的運行效率。例如,對于大內存機器,可以使用 G1 垃圾收集器來減少 GC 暫停時間,并為操作系統留出足夠的內存用于頁面緩存。