螞蟻面試:Kafka 如何做壓測(cè)?如何保證系統(tǒng)穩(wěn)定?
Kafka是大數(shù)據(jù)領(lǐng)域應(yīng)用非常廣泛的消息中間件,如何確定Kafka集群的最大吞吐量和延遲呢?又如何保證Kafka集群的穩(wěn)定呢?今天我們來(lái)介紹Kafka壓測(cè)方案,來(lái)確認(rèn)Kafka集群的各類指標(biāo)。
一、Kafka自帶性能測(cè)試工具
Kafka提供了內(nèi)置的性能測(cè)試工具,可以用于生產(chǎn)者和消費(fèi)者的基準(zhǔn)測(cè)試:
- 生產(chǎn)者性能測(cè)試工具:kafka-producer-perf-test.sh
- 消費(fèi)者性能測(cè)試工具:kafka-consumer-perf-test.sh
第三方壓測(cè)工具JMeter:
- 可以使用JMeter的Kafka插件進(jìn)行壓測(cè)Tsung
- 支持Kafka協(xié)議的分布式壓測(cè)工具Gatling
- 可以通過(guò)Kafka插件進(jìn)行壓測(cè)
二、壓測(cè)場(chǎng)景設(shè)計(jì)
1. 生產(chǎn)者性能測(cè)試
測(cè)試不同消息大小、批處理設(shè)置和壓縮算法對(duì)生產(chǎn)者性能的影響:
# 測(cè)試100字節(jié)消息,無(wú)壓縮
/opt/kafka/bin/kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 10000000 \
--record-size 100 \
--throughput -1 \
--producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \
acks=1 \
batch.size=16384 \
linger.ms=0 \
compression.type=none
# 測(cè)試1KB消息,使用lz4壓縮
/opt/kafka/bin/kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 10000000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \
acks=1 \
batch.size=65536 \
linger.ms=10 \
compression.type=lz4
2. 消費(fèi)者性能測(cè)試
測(cè)試不同消費(fèi)者組配置和分區(qū)數(shù)對(duì)消費(fèi)性能的影響:
# 基本消費(fèi)者性能測(cè)試
/opt/kafka/bin/kafka-consumer-perf-test.sh \
--bootstrap-server broker1:9092,broker2:9092,broker3:9092 \
--topic test-topic \
--messages 10000000 \
--threads 1 \
--print-metrics
# 多線程消費(fèi)者測(cè)試
/opt/kafka/bin/kafka-consumer-perf-test.sh \
--bootstrap-server broker1:9092,broker2:9092,broker3:9092 \
--topic test-topic \
--messages 10000000 \
--threads 8 \
--print-metrics
3. 端到端延遲測(cè)試
測(cè)量從生產(chǎn)到消費(fèi)的端到端延遲:
# 創(chuàng)建一個(gè)具有多個(gè)分區(qū)的測(cè)試主題
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker1:9092 \
--create \
--topic latency-test \
--partitions 8 \
--replication-factor 3
# 使用自定義Java程序測(cè)量端到端延遲
// 生產(chǎn)者代碼示例
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10000; i++) {
long timestamp = System.currentTimeMillis();
ProducerRecord<String, String> record =
new ProducerRecord<>("latency-test", null, timestamp, "key-" + i, "value-" + timestamp);
producer.send(record);
Thread.sleep(100); // 每秒發(fā)送10條消息
}
producer.close();
// 消費(fèi)者代碼示例
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
props.put("group.id", "latency-test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("latency-test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
long latency = System.currentTimeMillis() - record.timestamp();
System.out.printf("Offset = %d, Latency = %d ms%n", record.offset(), latency);
}
}
4. 吞吐量與延遲權(quán)衡測(cè)試
測(cè)試不同配置下吞吐量與延遲的權(quán)衡關(guān)系:
# 高吞吐量配置測(cè)試
/opt/kafka/bin/kafka-producer-perf-test.sh \
--topic throughput-test \
--num-records 5000000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \
acks=1 \
batch.size=131072 \
linger.ms=50 \
compression.type=lz4 \
buffer.memory=67108864
# 低延遲配置測(cè)試
/opt/kafka/bin/kafka-producer-perf-test.sh \
--topic latency-test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \
acks=1 \
batch.size=8192 \
linger.ms=0 \
compression.type=none
三、壓測(cè)指標(biāo)分析
1. 生產(chǎn)者關(guān)鍵指標(biāo)
- 吞吐量(Throughput):每秒處理的消息數(shù)或字節(jié)數(shù)
- 延遲(Latency):消息從發(fā)送到確認(rèn)的時(shí)間
- CPU使用率:生產(chǎn)者進(jìn)程的CPU使用情況
- 內(nèi)存使用率:生產(chǎn)者進(jìn)程的內(nèi)存使用情況
- 批處理率:每批次的平均消息數(shù)
2. 消費(fèi)者關(guān)鍵指標(biāo)
- 吞吐量:每秒消費(fèi)的消息數(shù)或字節(jié)數(shù)
- 延遲:消息從生產(chǎn)到消費(fèi)的時(shí)間
- 消費(fèi)者滯后(Consumer Lag):消費(fèi)者落后于生產(chǎn)者的消息數(shù)
- 處理時(shí)間:消費(fèi)者處理每條消息的時(shí)間
- 提交率:偏移量提交的頻率和成功率
3. Broker關(guān)鍵指標(biāo)
- 請(qǐng)求處理率:每秒處理的請(qǐng)求數(shù)
- 請(qǐng)求隊(duì)列大?。旱却幚淼恼?qǐng)求數(shù)
- 網(wǎng)絡(luò)吞吐量:進(jìn)出Broker的網(wǎng)絡(luò)流量
- 磁盤使用率:日志文件的增長(zhǎng)速率
- GC暫停時(shí)間:垃圾收集對(duì)性能的影響
四、壓測(cè)結(jié)果解讀
1. 生產(chǎn)者性能分析
以下是一個(gè)典型的生產(chǎn)者性能測(cè)試結(jié)果示例:
100000 records sent, 25000.0 records/sec (24.41 MB/sec), 15.2 ms avg latency, 293.0 ms max latency.
200000 records sent, 26315.8 records/sec (25.67 MB/sec), 12.8 ms avg latency, 128.0 ms max latency.
300000 records sent, 27272.7 records/sec (26.61 MB/sec), 11.5 ms avg latency, 98.0 ms max latency.
結(jié)果解讀:
- 吞吐量隨時(shí)間穩(wěn)定在約26,000條記錄/秒(約25MB/秒)
- 平均延遲約為13毫秒,最大延遲為293毫秒
- 隨著測(cè)試進(jìn)行,延遲趨于穩(wěn)定,表明系統(tǒng)性能良好
2. 消費(fèi)者性能分析
以下是一個(gè)典型的消費(fèi)者性能測(cè)試結(jié)果示例:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2023-05-01 10:00:00, 2023-05-01 10:01:00, 1024.00, 17.07, 1048576, 17476.27, 20, 60000, 17.07, 17476.27
結(jié)果解讀:
- 消費(fèi)速率為17.07MB/秒,約17,476條消息/秒
- 重平衡時(shí)間為20毫秒,表明消費(fèi)者組協(xié)調(diào)效率高
- 獲取時(shí)間為60秒,與測(cè)試持續(xù)時(shí)間一致
3. 瓶頸識(shí)別與解決
常見(jiàn)的性能瓶頸及解決方案:
CPU瓶頸:
- 增加broker數(shù)量
- 優(yōu)化消息壓縮算法
- 調(diào)整JVM參數(shù)
內(nèi)存瓶頸:
- 增加堆內(nèi)存大小
- 優(yōu)化生產(chǎn)者/消費(fèi)者客戶端緩沖區(qū)大小
- 減少不必要的對(duì)象創(chuàng)建
磁盤I/O瓶頸:
- 使用更快的存儲(chǔ)(如SSD)
- 增加數(shù)據(jù)目錄數(shù)量,分散I/O負(fù)載
- 優(yōu)化日志段大小和刷盤策略
網(wǎng)絡(luò)瓶頸:
- 增加網(wǎng)絡(luò)帶寬
- 優(yōu)化消息批處理大小
- 使用更高效的壓縮算法