消息隊(duì)列實(shí)現(xiàn) Exactly Once,看 Pulsar 是怎樣實(shí)現(xiàn)的
大家好 ,我是君哥。
在使用消息隊(duì)列時(shí),我們希望消息能夠精準(zhǔn)推送(Exactly Once),不會(huì)丟失、也不會(huì)重復(fù)。Exactly Once 其實(shí)是很難實(shí)現(xiàn)的,Pulsar 這款消息中間件使用事務(wù)消息實(shí)現(xiàn)了 Exactly Once,今天就帶大家了解一下。
1.一個(gè)場景
為什么需要 Exactly Once 呢?下面我們看一個(gè)轉(zhuǎn)賬場景。
客戶從轉(zhuǎn)賬 APP 上操作,從 A 賬戶向 B 賬戶轉(zhuǎn)賬 100 元,但是 B 賬戶增加金額后,給 Broker 返回 ACK 失敗,導(dǎo)致 Broker 再次給賬戶 B 推送增加金額的消息,導(dǎo)致賬戶 B 增加了兩次,最終導(dǎo)致金額不一致。
當(dāng)然,賬戶 B 通過消費(fèi)者冪等可以避免這個(gè)問題,但如果是生產(chǎn)者重復(fù)發(fā)送導(dǎo)致 Broker 保存了兩條消息呢?
2.Pulsar 去重
通過消息去重可以解決上面的消息重復(fù)問題嗎?我們看一下 Pulsar 的去重機(jī)制。
Producer 發(fā)送消息時(shí),消息體帶一個(gè) sequenceId 字段,這個(gè)字段在同一個(gè) Producer 內(nèi)是嚴(yán)格遞增的。Broker 通過<ProducerName, sequenceId> 來記錄每一個(gè) Producer 的最大 sequenceId。如果 Broker 收到 Producer 的消息小于等于保存的當(dāng)前 Producer 的 sequenceId,說明是重復(fù)消息,直接返回失敗。
消息去重從一定程度上可以避免消息重復(fù),但是只能保證在 Topic-Partition 這個(gè)維度進(jìn)行去重,如果一個(gè) Topic 對(duì)應(yīng)多個(gè) Partition,如下圖:
Producer 發(fā)送消息后,Broker1 保存成功,但是沒有返回 ack,Producer 把消息重新發(fā)送到了 Broker2,最終導(dǎo)致 Consumer 收到 2 條消息。
3.事務(wù)消息
Pusar 的事務(wù)消息不僅可以解決上面的去重問題,還可以解決一些復(fù)雜場景。比如下面這個(gè)場景:
Consumer 從 Topic1 的兩個(gè) Partition 中各消費(fèi)一條消息后,做加工計(jì)算(重復(fù)消費(fèi)會(huì)影響加工結(jié)果),然后把結(jié)果分別發(fā)送到 Topic2 的兩個(gè) Partition 中。這個(gè)復(fù)雜的事務(wù),要保證消息既不會(huì)重復(fù)也不會(huì)丟失,僅僅靠去重,就很難實(shí)現(xiàn)了。Pulsar 參考了分布式事務(wù)的主流實(shí)現(xiàn),支持了消息的分布式事務(wù)。
Pulsar 的事務(wù)模型能保證生產(chǎn)和消費(fèi)都能精確一次,即使 Broker 宕機(jī),也不會(huì)處理失敗。
同時(shí),Pulsar 事務(wù)消息支持更復(fù)雜的場景,比如:
- 生產(chǎn)者在一個(gè)事務(wù)中分別發(fā)送一條消息到不同 Partition,要不同時(shí)成功,要不同時(shí)失??;
- 消費(fèi)者從不同 Partition 消費(fèi)多條消息,要不全部成功,要不全部失敗;
- 上面兩個(gè)場景的組合,見上面的圖。
那 Pulsar 的事務(wù)消息是怎么實(shí)現(xiàn)的呢?Pulsar 參考了分布式事務(wù)的實(shí)現(xiàn)方式,我們?cè)倩仡櫼幌路植际绞聞?wù)的三個(gè)角色:
- TC: 事務(wù)協(xié)調(diào)器,管理全局事務(wù)和分支事務(wù)的狀態(tài),Pulsar 會(huì)選擇 Topic 中 Partition 所在的一個(gè) Broker 作為 TC;
- TM:管理全局事務(wù),包括開啟全局事務(wù),提交/回滾全局事務(wù)。Pulsar 使用
pulsarClient.newTransaction()
開啟一個(gè)事務(wù),這會(huì)向 TC 注冊(cè)全局事務(wù)并且獲得全局事務(wù) ID(TCID)。 - RM:管理分支事務(wù)。
下圖,我們把上面復(fù)雜的事務(wù)用分布式事務(wù)來實(shí)現(xiàn):
說明幾點(diǎn):
- Producer1 既是生產(chǎn)者也是 TM;
- Broker1 既是 TC 也是 RM;
- Producer 和 Consumer 的事務(wù)分開來管理。上圖中只是畫出了生產(chǎn)者的事務(wù)提交,消費(fèi)者類似;
- 我們知道,分布式事務(wù)的實(shí)現(xiàn)模式一般包括 AT、TCC、SAGA 和 XA,那 Pulsar 的實(shí)現(xiàn)模式是哪一種呢?對(duì)于 Producer 和 Consumer,情況不一樣。
對(duì)于 Producer 的事務(wù)消息,更像是 AT 模式,消息直接發(fā)送給 Broker 并持久化,不過持久化之前會(huì)在 TopicTransactionBuffer 中記錄元數(shù)據(jù)(類似 AT 模式中的回滾日志),全局事務(wù)回滾時(shí)可以使用這些元數(shù)據(jù)回滾消息。當(dāng)然回滾消息并不是刪除消息,而是讓消息不被消費(fèi)到,具體做法是在回滾的事務(wù)會(huì)被打上 Aborted 標(biāo)簽,根據(jù)這個(gè)標(biāo)簽來決定消息不會(huì)推送給 Consumer。
對(duì)于 Consumer 的事務(wù)消息,我個(gè)人覺得有點(diǎn)參考 XA 模式,不過這里沒有數(shù)據(jù)源代理,而是用了消息緩存,這里緩存的不是消息本身,而是消費(fèi)者的 ack 消息。也就是說消費(fèi)者消費(fèi)完成后并沒有直接發(fā)送 ack 給 Broker,而是先發(fā)送到 pendingAckSore 做緩存,在提交全局事務(wù)時(shí)才會(huì)真正地提交 ack 消息。
- 全局事務(wù)沒有提交之前,消息可能會(huì)被消費(fèi)到嗎?不會(huì),每個(gè) Topic 都會(huì)記錄自己的 maxReadPosition 屬性,標(biāo)識(shí)消費(fèi)者可以從 Broker 拉取消息的最大位置,分布式事務(wù)提交全局事務(wù)之前,maxReadPosition 是不變的,所有未提交全局事務(wù)的消息不可能被消費(fèi)到。但這里也會(huì)有一個(gè)隱患,那就是阻塞普通消息的消費(fèi),在當(dāng)前事務(wù)提交之前,普通消息即使發(fā)送成功了,消費(fèi)者也拉取不到。
4.總結(jié)
Pulsar 使用事務(wù)消息實(shí)現(xiàn)了 Exactly Once 這個(gè)消息投遞的最高要求。從上面的講解看,事務(wù)消息的實(shí)現(xiàn)還是比較復(fù)雜的,不過從 Producer 和 Consumer 端分開實(shí)現(xiàn)這個(gè)角度看 ,更容易理解一些。
最后,一起思考一個(gè)極端場景,如果分布式事務(wù)中有兩個(gè)消費(fèi)者,一個(gè)消費(fèi)者消費(fèi)成功并且發(fā)送 ack,另一個(gè)消費(fèi)者因?yàn)榇a問題消費(fèi)失敗并且沒有回復(fù) ack,最終全局事務(wù)因?yàn)槌瑫r(shí)而做回滾,那第一個(gè)消費(fèi)者已經(jīng)消費(fèi),這還能保證全局一致嗎?當(dāng)然不能,除非消費(fèi)者消費(fèi)邏輯也加入這個(gè)全局事務(wù)。
消息隊(duì)列的分布式事務(wù)一直是一個(gè)復(fù)雜的話題,分布式事務(wù)的設(shè)計(jì)思想也非常值得我們借鑒學(xué)習(xí)。但無論使用哪個(gè)中間件,消費(fèi)端冪等是保障業(yè)務(wù)正確性的底線,最靠譜的方式還是從業(yè)務(wù)代碼層面來保證冪等。