關(guān)于Kafka ACK機(jī)制的詳解!
Kafka的 ACK機(jī)制是確保消息成功傳遞和處理的重要機(jī)制.這篇文章,我們將詳細(xì)分析 Kafka ACK機(jī)制,包括其原理、源碼分析、使用場景以及優(yōu)缺點(diǎn)。
ACK 方式
Kafka的 ACK機(jī)制主要用于確保生產(chǎn)者發(fā)送的消息能夠被可靠地寫入到 Kafka集群的 Topic中。ACK機(jī)制的核心思想是生產(chǎn)者發(fā)送消息后,需要等待 Kafka集群的確認(rèn)(ACK),才認(rèn)為消息發(fā)送成功。
Kafka的 ACK機(jī)制主要有三種級別:
1.acks=0
生產(chǎn)者不等待服務(wù)器的確認(rèn),消息發(fā)送后即認(rèn)為成功,不管消息是否真正寫入 Kafka,這種方式效率最高,但可靠性最低,數(shù)據(jù)可能存在丟失。
2.acks=1
生產(chǎn)者會等待來自 Leader分區(qū)的確認(rèn)。Leader分區(qū)接收到消息并寫入本地日志后即返回確認(rèn)。這種方式在 Leader分區(qū)可用時(shí)可靠,但如果 Leader分區(qū)發(fā)生故障,可能會丟失數(shù)據(jù)。從 Kafka 2.0 開始,默認(rèn)值是 acks=1
3.acks=all(或-1)
生產(chǎn)者等待所有 ISR(In-Sync Replica,同步副本)分區(qū)的確認(rèn)。只有當(dāng)消息被寫入所有同步副本后才返回確認(rèn),這種方式最可靠,但性能較低。
ISR的工作原理
ISR,全稱 In-Sync Replicas,翻譯為同步副本,它是指某個(gè)分區(qū)中的一組與 Leader副本保持同步的副本,即這些副本包含了 Leader副本中的所有已確認(rèn)消息。ISR是 Kafka 集群中用于保證數(shù)據(jù)可靠性的一個(gè)關(guān)鍵概念。
- Leader和 Follower:在 Kafka中,每個(gè)分區(qū)都有一個(gè) Leader和若干個(gè) Follower,Leader負(fù)責(zé)處理所有的讀寫請求,而 Follower則從 Leader那里拉取數(shù)據(jù)并進(jìn)行同步。
- 同步副本(ISR):ISR是一個(gè)動態(tài)的集合,包含了 Leader和所有與 Leader保持同步的 Follower,只有在 ISR中的副本才被認(rèn)為是可靠的,因?yàn)樗鼈儼伺c Leader相同的數(shù)據(jù)。
- ACK機(jī)制與 ISR:當(dāng)生產(chǎn)者發(fā)送消息并設(shè)置acks=all時(shí),Kafka只有在消息被寫入 ISR中的所有副本后才會返回確認(rèn),這確保了消息即使在 Leader故障的情況下也不會丟失,因?yàn)?ISR中的其他副本可以選舉為新的 Leader。
1.ISR的維護(hù)
Kafka通過以下機(jī)制來維護(hù)ISR:
- 加入ISR:當(dāng)一個(gè) Follower副本成功地追上了 Leader副本的日志(即復(fù)制了 Leader的所有新的消息),它會被加入到 ISR中。
- 移出ISR:當(dāng)一個(gè) Follower副本落后于 Leader超過一定的時(shí)間(由參數(shù)replica.lag.time.max.ms控制),它會被移出 ISR。
2.ISR源碼分析
以下是 Kafka中維護(hù)ISR的關(guān)鍵代碼片段(以 Kafka 2.x版本為例):
class Partition {
private Set<Replica> isr; // 當(dāng)前分區(qū)的ISR集合
public void updateISR() {
// 獲取所有副本的狀態(tài)
List<Replica> replicas = getReplicas();
// 計(jì)算新的ISR集合
Set<Replica> newIsr = new HashSet<>();
for (Replica replica : replicas) {
if (replica.isInSync()) {
newIsr.add(replica);
}
}
// 更新ISR
if (!newIsr.equals(this.isr)) {
this.isr = newIsr;
// 觸發(fā)ISR變化的事件
onISRChanged();
}
}
}
class Replica {
public boolean isInSync() {
// 判斷該副本是否與Leader同步
return this.logEndOffset >= leaderLogEndOffset - replicaLagMaxMessages;
}
}
源碼分析
以 Kafka的 Producer端代碼為例,下面是簡化后的發(fā)送消息時(shí)處理ACK機(jī)制的關(guān)鍵代碼片段:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// 構(gòu)建請求
ProduceRequest request = new ProduceRequest(record, callback);
// 發(fā)送請求
Future<RecordMetadata> future = this.sender.send(request);
// 根據(jù)ACK配置處理確認(rèn)
if (this.acks == 0) {
// 不等待確認(rèn),直接返回成功
callback.onCompletion(null, null);
} else if (this.acks == 1) {
// 等待Leader確認(rèn)
RecordMetadata metadata = future.get();
callback.onCompletion(metadata, null);
} else if (this.acks == -1 || this.acks == "all") {
// 等待所有ISR確認(rèn)
RecordMetadata metadata = future.get();
callback.onCompletion(metadata, null);
}
return future;
}
優(yōu)缺點(diǎn)
acks=0:
- 優(yōu)點(diǎn):性能最高,延遲最低。
- 缺點(diǎn):消息可能丟失,可靠性最低。
acks=1:
- 優(yōu)點(diǎn):在性能和可靠性之間取得平衡。
- 缺點(diǎn):如果領(lǐng)導(dǎo)者在消息寫入后但未同步給副本前崩潰,消息可能丟失。
acks=all:
- 優(yōu)點(diǎn):最高的可靠性,確保消息被所有同步副本確認(rèn)。
- 缺點(diǎn):性能較低,延遲較高。
缺點(diǎn):
- 性能影響:更高的ACK級別會帶來更高的延遲,降低吞吐量。
- 復(fù)雜性:需要根據(jù)具體應(yīng)用場景選擇合適的ACK配置,增加了系統(tǒng)設(shè)計(jì)的復(fù)雜性。
適用場景
- acks=0:適用于對消息丟失不敏感且追求高吞吐量的場景,例如日志收集、監(jiān)控?cái)?shù)據(jù)等。
- acks=1:適用于對消息有一定可靠性要求,但對性能要求較高的場景,例如實(shí)時(shí)數(shù)據(jù)處理。
- acks=all:適用于對消息可靠性要求極高且可以接受較低吞吐量的場景,例如金融交易、訂單處理等。
總結(jié)
本文我們分析了 Kafka的 ACK機(jī)制以及 ISR機(jī)制,從全局來看, Kafka 和 RocketMQ有著異曲同工之妙,Kafka的 ack=all 對應(yīng) RocketMQ的同步發(fā)送,ack=1 對應(yīng) RocketMQ的異步發(fā)送,ack=0 對應(yīng) RocketMQ的單向發(fā)送。
總體來說,Kafka的 ACK機(jī)制為消息的可靠傳遞提供了不同級別的保障,開發(fā)者可以根據(jù)具體的應(yīng)用需求選擇合適的 ACK配置,以在性能和可靠性之間取得平衡。