原來Kafka也有事務(wù)啊,再也不擔(dān)心消息不一致了
前言
現(xiàn)在假定這么一個業(yè)務(wù)場景,從kafka中的topic獲取消息數(shù)據(jù),經(jīng)過一定加工處理后,發(fā)送到另外一個topic中,要求整個過程消息不能丟失,也不能重復(fù)發(fā)送,即實現(xiàn)端到端的Exactly-Once精確一次消息投遞。這該如何實現(xiàn)呢?
Kafka事務(wù)介紹
針對上面的業(yè)務(wù)場景,kafka已經(jīng)替我們想到了,在kafka 0.11版本以后,引入了一個重大的特性:冪等性和事務(wù)。
冪等性
這里提到冪等性的原因,主要是因為事務(wù)的啟用必須要先開啟冪等性,那么什么是冪等性呢?
冪等性是指生產(chǎn)者無論向kafka broker發(fā)送多少次重復(fù)的數(shù)據(jù),broker 端只會持久化一條,保證數(shù)據(jù)不會重復(fù)。
冪等性通過生產(chǎn)者配置項enable.idempotence=true開啟,默認(rèn)情況下為true。
冪等性實現(xiàn)原理
- 每條消息都有一個主鍵,這個主鍵由 <PID, Partition, SeqNumber>組成。
- PID:ProducerID,每個生產(chǎn)者啟動時,Kafka 都會給它分配一個 ID,ProducerID 是生產(chǎn)者的唯一標(biāo)識,需要注意的是,Kafka 重啟也會重新分配 PID。
- Partition:消息需要發(fā)往的分區(qū)號。
- SeqNumber:生產(chǎn)者,他會記錄自己所發(fā)送的消息,給他們分配一個自增的 ID,這個 ID 就是 SeqNumber,是該消息的唯一標(biāo)識,每發(fā)送一條消息,序列號加 1。
- 對于主鍵相同的數(shù)據(jù),kafka 是不會重復(fù)持久化的,它只會接收一條。
冪等性缺點
根據(jù)冪等性的原理,我們發(fā)現(xiàn)它存在下面的缺點:
- 只能保證單分區(qū)、單會話內(nèi)的數(shù)據(jù)不重復(fù)
- kafka 掛掉,重新給生產(chǎn)者分配了 PID,還是有可能產(chǎn)生重復(fù)的數(shù)據(jù)
那么如何實現(xiàn)跨分區(qū)、kafka broker重啟也能保證不重復(fù)呢?這就要使用事務(wù)了。
事務(wù)
所謂事務(wù),就是要求保證原子性,要么全部成功,要么全部失敗。那么具體該如何開啟呢?
- kafka要想開啟事務(wù)必須要啟用冪等性,即生產(chǎn)者配置enable.idempotence=true
- kafka生產(chǎn)者需要配置唯一的事務(wù)idtransactional.id, 最好為其設(shè)置一個有意義的名字。
- kafka消費端也有一個配置項isolation.level和事務(wù)有很大關(guān)系。
- read_uncommitted:默認(rèn)值,消費端應(yīng)用可以看到(消費到)未提交的事務(wù),當(dāng)然對于已提交的事務(wù)也是可見的。
- read_committed:消費端應(yīng)用只能消費到提交的事務(wù)內(nèi)的消息。
Kafka事務(wù) API
現(xiàn)在我們用java的api來實現(xiàn)一下前面這個“消費-處理-生產(chǎn)“的例子吧。
- 引入依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
- 創(chuàng)建事務(wù)的生產(chǎn)者
Properties prodcuerProps = new Properties();
// kafka地址
prodcuerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
// key序列化
prodcuerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value序列化
prodcuerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 啟用冪等性
producerProps.put("enable.idempotence", "true");
// 設(shè)置事務(wù)id
producerProps.put("transactional.id", "prod-1");
KafkaProducer<String, String> producer = new KafkaProducer(prodcuerProps);
- enable.idempotence配置項目為true
- 設(shè)置transactional.id
- 創(chuàng)建事務(wù)的消費者
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put("group.id", "my-group-id");
// 設(shè)置consumer手動提交
consumerProps.put("enable.auto.commit", "false");
// 設(shè)置隔離級別,讀取事務(wù)已提交的消息
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
//訂閱主題
consumer.subscribe(Collections.singletonList("topic1"));
- enable.auto.commit=false,設(shè)置手動提交消費者offset
- 設(shè)置isolation.level=read_committed,消費事務(wù)已提交的消息
4.核心邏輯
// 初始化事務(wù)
producer.initTransactions();
while(true) {
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
if(!records.isEmpty()){
// 準(zhǔn)備一個 hashmap 來記錄:"分區(qū)-消費位移" 鍵值對
HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
// 開啟事務(wù)
producer.beginTransaction();
try {
// 獲取本批消息中所有的分區(qū)
Set<TopicPartition> partitions = records.partitions();
// 遍歷每個分區(qū)
for (TopicPartition partition : partitions) {
// 獲取該分區(qū)的消息
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
// 遍歷每條消息
for (ConsumerRecord<String, String> record : partitionRecords) {
// 執(zhí)行數(shù)據(jù)的業(yè)務(wù)處理邏輯
ProducerRecord<String, String> outRecord = new ProducerRecord<>("topic2", record.key(), record.value().toUpperCase());
// 將處理結(jié)果寫入 kafka
producer.send(outRecord);
}
// 將處理完的本分區(qū)對應(yīng)的消費位移記錄到 hashmap 中
long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 事務(wù)提交的是即將到來的偏移量,這意味著我們需要加 1
offsetsMap.put(partition,new OffsetAndMetadata(offset+1));
}
// 向事務(wù)管理器提交消費位移
producer.sendOffsetsToTransaction(offsetsMap,"groupid");
// 提交事務(wù)
producer.commitTransaction();
} catch(Exeception e) {
e.printStackTrace();
// 終止事務(wù)
producer.abortTransaction();
}
}
}
- initTransactions(): 初始化事務(wù)
- beginTransaction(): 開啟事務(wù)
- sendOffsetsToTransaction(): 在事務(wù)內(nèi)提交已經(jīng)消費的偏移量(主要用于消費者)
- commitTransaction(): 提交事務(wù)
- abortTransaction(): 放棄事務(wù)
Kafka事務(wù)實現(xiàn)原理
kafka事務(wù)的實現(xiàn)引入了事務(wù)協(xié)調(diào)器,如下圖所示:
- 生產(chǎn)者使用事務(wù)必須配置事務(wù)id, kafka根據(jù)事務(wù)id計算分配事務(wù)協(xié)調(diào)器
- 事務(wù)協(xié)調(diào)器返回pid,前面的冪等性中需要
- 開始發(fā)送消息到topic中,不過這些消息與普通的消息不同,它們帶著一個字段標(biāo)識自己是事務(wù)消息
- 當(dāng)生產(chǎn)者事務(wù)內(nèi)的消息發(fā)送完畢,會向事務(wù)協(xié)調(diào)器發(fā)送 commit 或 abort 請求,等待 kafka 響應(yīng)
- 事務(wù)協(xié)調(diào)器收到請求后先持久化到內(nèi)置事務(wù)主題__transaction_state中,__transaction_state默認(rèn)有50個分區(qū),每個分區(qū)負(fù)責(zé)一部分事務(wù)。事務(wù)劃分是根據(jù)transactional.id的hashcode值%50,計算出該事務(wù)屬于哪個分區(qū)。 該分區(qū)Leader副本所在的broker節(jié)點即為這個transactional.id對應(yīng)的Transaction Coordinator節(jié)點,這也是上面第一步中的計算邏輯。
- 事務(wù)協(xié)調(diào)器后臺會跟topic通信,告訴它們事務(wù)是成功還是失敗的。
- 如果是成功,topic會匯報自己已經(jīng)收到消息,協(xié)調(diào)者收到主題的回應(yīng)便確認(rèn)了事務(wù)完成,并持久化這一結(jié)果。
- 如果是失敗的,主題會把這個事務(wù)內(nèi)的消息丟棄,并匯報給協(xié)調(diào)者,協(xié)調(diào)者收到所有結(jié)果后再持久化這一信息,事務(wù)結(jié)束。
- 持久化第6步中的事務(wù)成功或者失敗的信息, 如果kafka broker配置max.transaction.timeout.ms之前既不提交也不中止事務(wù), kafka broker將中止事務(wù)本身。 此屬性的默認(rèn)值為 15 分鐘。
總結(jié)
本文講解了通過kafka事務(wù)可以實現(xiàn)端到端的精確一次的消息語義,通過事務(wù)機制,KAFKA 實現(xiàn)了對多個 topic 的多個 partition 的原子性的寫入,通過一個例子了解了一下如何使用事物。同時也簡單介紹了事務(wù)實現(xiàn)的原理,它底層必須要依賴kafka的冪等性機制,同時通過類似“二段提交”的方式保證事務(wù)的原子性。