自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

新同事上來(lái)就用Kafka,瑟瑟發(fā)抖...

開(kāi)發(fā) 架構(gòu) 開(kāi)發(fā)工具 Kafka
事務(wù)是一個(gè)程序執(zhí)行單元,里面的所有操作要么全部執(zhí)行成功,要么全部執(zhí)行失敗。

 

圖片來(lái)自 包圖網(wǎng)

RocketMQ、Kafka 和 Pulsar 都是當(dāng)今業(yè)界應(yīng)用十分廣泛的開(kāi)源消息隊(duì)列(MQ)組件。

筆者在工作中遇到關(guān)于 MQ 選型相關(guān)的內(nèi)容,了解到關(guān)于“事務(wù)消息”這個(gè)概念在不同的 MQ 組件里有不同內(nèi)涵。

故借此文,試著淺析一番這三種消息隊(duì)列(MQ)的事務(wù)消息有何異同,目的是形成關(guān)于消息隊(duì)列事務(wù)消息的全景視圖,給有類似業(yè)務(wù)需求的同學(xué)提供一些參考和借鑒。

消息隊(duì)列演化

消息隊(duì)列(Message Queue,簡(jiǎn)稱 MQ),是指在消息的傳輸中保存消息的容器或服務(wù),是一種異步的服務(wù)間通信方式,適用于無(wú)服務(wù)器和微服務(wù)架構(gòu),是分布式系統(tǒng)實(shí)現(xiàn)高性能、高可用、可伸縮等高級(jí)特效的重要組件。

常見(jiàn)的主流消息隊(duì)列有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ、Pulsar 等。

而在公司內(nèi)有 TubeMQ、Ckafka、TDMQ、CMQ、CDMQ、Hippo 等。

①Kafka

Apache Kafka 是由 Apache 軟件基金會(huì)開(kāi)發(fā)的一個(gè)開(kāi)源消息系統(tǒng)項(xiàng)目,由 Scala 寫(xiě)成。

Kafka 最初是由 LinkedIn 開(kāi)發(fā),并于 2011 年初開(kāi)源。2012 年 10 月從 Apache Incubator 畢業(yè)。該項(xiàng)目的目標(biāo)是為處理實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一、高通量、低等待的平臺(tái)。

Kafka 是一個(gè)分布式的、分區(qū)的、多復(fù)本的日志提交服務(wù)。它通過(guò)一種獨(dú)一無(wú)二的設(shè)計(jì)提供了一個(gè)消息系統(tǒng)的功能。

其整體架構(gòu)圖如下所示:

②RocketMQ

Apache RocketMQ 是一個(gè)分布式消息和流媒體平臺(tái),具有低延遲、強(qiáng)一致、高性能和可靠性、萬(wàn)億級(jí)容量和靈活的可擴(kuò)展性。它有借鑒 Kafka 的設(shè)計(jì)思想,但不是 Kafka 的拷貝。

其整體架構(gòu)圖如下所示:

③Pulsar

Apache Pulsar 是 Apache 軟件基金會(huì)頂級(jí)項(xiàng)目,是下一代云原生分布式消息流平臺(tái)。

它集消息、存儲(chǔ)、輕量化函數(shù)式計(jì)算為一體,采用計(jì)算與存儲(chǔ)分離架構(gòu)設(shè)計(jì),支持多租戶、持久化存儲(chǔ)、多機(jī)房跨區(qū)域數(shù)據(jù)復(fù)制,具有強(qiáng)一致性、高吞吐、低延時(shí)及高可擴(kuò)展性等流數(shù)據(jù)存儲(chǔ)特性,被看作是云原生時(shí)代實(shí)時(shí)消息流傳輸、存儲(chǔ)和計(jì)算最佳解決方案。

其整體架構(gòu)圖如下所示:

背景知識(shí)

①什么是事務(wù)?

事務(wù):是一個(gè)程序執(zhí)行單元,里面的所有操作要么全部執(zhí)行成功,要么全部執(zhí)行失敗。

一個(gè)事務(wù)有四個(gè)基本特性,也就是我們常說(shuō)的(ACID):

  • Atomicity(原子性):事務(wù)是一個(gè)不可分割的整體,事務(wù)內(nèi)所有操作要么全做成功,要么全失敗。
  • Consistency(一致性):事務(wù)執(zhí)行前后,數(shù)據(jù)從一個(gè)狀態(tài)到另一個(gè)狀態(tài)必須是一致的(A 向 B 轉(zhuǎn)賬,不能出現(xiàn) A 扣了錢(qián),B 卻沒(méi)收到)。
  • Isolation(隔離性):多個(gè)并發(fā)事務(wù)之間相互隔離,不能互相干擾。
  • Durablity(持久性):事務(wù)完成后,對(duì)數(shù)據(jù)的更改是永久保存的,不能回滾。

分布式事務(wù):是指事務(wù)的參與者、支持事務(wù)的服務(wù)器、資源服務(wù)器以及事務(wù)管理器分別位于不同的分布式系統(tǒng)的不同節(jié)點(diǎn)之上。分布式事務(wù)通常用于在分布式系統(tǒng)中保證不同節(jié)點(diǎn)之間的數(shù)據(jù)一致性。

分布式事務(wù)的解決方案一般有以下幾種:

XA(2PC/3PC):最具有代表性的是由 Oracle Tuxedo 系統(tǒng)提出的 XA 分布式事務(wù)協(xié)議。XA 中大致分為兩部分:事務(wù)管理器和本地資源管理器。

其中本地資源管理器往往由數(shù)據(jù)庫(kù)實(shí)現(xiàn),比如 Oracle、DB2 這些商業(yè)數(shù)據(jù)庫(kù)都實(shí)現(xiàn)了 XA 接口,而事務(wù)管理器作為全局的調(diào)度者,負(fù)責(zé)各個(gè)本地資源的提交和回滾。

XA 協(xié)議通常包含兩階段提交(2PC)和三階段提交(3PC)兩種實(shí)現(xiàn)。兩階段提交顧名思義就是要進(jìn)行兩個(gè)階段的提交:第一階段,準(zhǔn)備階段(投票階段);第二階段,提交階段(執(zhí)行階段)。

實(shí)現(xiàn)過(guò)程如下所示:

二階段提交看似能夠提供原子性的操作,但它存在著一些缺陷,三段提交(3PC)是對(duì)兩段提交(2PC)的一種升級(jí)優(yōu)化,有興趣的可以深入了解一下,這里不再贅述。

TCC(Try-Confirm-Cancel):是 Try、Commit、Cancel 三種指令的縮寫(xiě),又被稱補(bǔ)償事務(wù)。

其邏輯模式類似于 XA 兩階段提交,事務(wù)處理流程也很相似,但 2PC 是應(yīng)用于在 DB 層面,TCC 則可以理解為在應(yīng)用層面的 2PC,是需要我們編寫(xiě)業(yè)務(wù)邏輯來(lái)實(shí)現(xiàn)。

TCC 它的核心思想是:“針對(duì)每個(gè)操作都要注冊(cè)一個(gè)與其對(duì)應(yīng)的確認(rèn)(Try)和補(bǔ)償(Cancel)”。

消息事務(wù):所謂的消息事務(wù)就是基于消息隊(duì)列的兩階段提交,本質(zhì)上是對(duì)消息隊(duì)列的一種特殊利用。

它是將本地事務(wù)和發(fā)消息放在了一個(gè)分布式事務(wù)里,保證要么本地操作成功成功并且對(duì)外發(fā)消息成功,要么兩者都失敗。

基于消息隊(duì)列的兩階段提交往往用在高并發(fā)場(chǎng)景下,將一個(gè)分布式事務(wù)拆成一個(gè)消息事務(wù)(A 系統(tǒng)的本地操作+發(fā)消息)+B 系統(tǒng)的本地操作。

其中 B 系統(tǒng)的操作由消息驅(qū)動(dòng),只要消息事務(wù)成功,那么 A 操作一定成功,消息也一定發(fā)出來(lái)了。

這時(shí)候 B 會(huì)收到消息去執(zhí)行本地操作,如果本地操作失敗,消息會(huì)重投,直到 B 操作成功,這樣就變相地實(shí)現(xiàn)了 A 與 B 的分布式事務(wù)。

原理如下:

雖然上面的方案能夠完成 A 和 B 的操作,但是 A 和 B 并不是強(qiáng)一致的,而是最終一致(Eventually consistent)的。而這也是滿足 BASE 理論的要求的。

這里引申一下,BASE 是 Basically Available(基本可用)、Soft state(軟狀態(tài))和 Eventually consistent(最終一致性)三個(gè)短語(yǔ)的縮寫(xiě)。

BASE 理論是對(duì) CAP 中 AP(CAP 已經(jīng)被證實(shí)一個(gè)分布式系統(tǒng)最多只能同時(shí)滿足 CAP 三項(xiàng)中的兩項(xiàng))的一個(gè)擴(kuò)展,通過(guò)犧牲強(qiáng)一致性來(lái)獲得可用性。

當(dāng)出現(xiàn)故障允許部分不可用但要保證核心功能可用,允許數(shù)據(jù)在一段時(shí)間內(nèi)是不一致的,但最終達(dá)到一致?tīng)顟B(tài)。滿足 BASE 理論的事務(wù),我們稱之為“柔性事務(wù)”。

②什么是 Exactly-once (精確一次)語(yǔ)義?

在分布式系統(tǒng)中,任何節(jié)點(diǎn)都有可能出現(xiàn)異常甚至宕機(jī)。在消息隊(duì)列中也一樣,當(dāng) Producer 在生產(chǎn)消息時(shí),可能會(huì)發(fā)生 Broker 宕機(jī)不可用,或者網(wǎng)絡(luò)突然中斷等異常情況。

根據(jù)在發(fā)生異常時(shí) Producer 處理消息的方式,系統(tǒng)可以具備以下三種消息語(yǔ)義。

At-least-once(至少一次)語(yǔ)義:Producer 通過(guò)接收 Broker 的 ACK(消息確認(rèn))通知來(lái)確保消息成功寫(xiě)入 Topic。

然而,當(dāng) Producer 接收 ACK 通知超時(shí),或者收到 Broker 出錯(cuò)信息時(shí),會(huì)嘗試重新發(fā)送消息。

如果 Broker 正好在成功把消息寫(xiě)入到 Topic,但還沒(méi)有給 Producer 發(fā)送 ACK 時(shí)宕機(jī),Producer 重新發(fā)送的消息會(huì)被再次寫(xiě)入到 Topic,最終導(dǎo)致消息被重復(fù)分發(fā)至 Consumer。即:消息不會(huì)丟失,但有可能被重復(fù)發(fā)送。

At-most-once(最多一次)語(yǔ)義:當(dāng) Producer 在接收 ACK 超時(shí),或者收到 Broker 出錯(cuò)信息時(shí)不重發(fā)消息,那就有可能導(dǎo)致這條消息丟失,沒(méi)有寫(xiě)入到 Topic 中,也不會(huì)被 Consumer 消費(fèi)到。

在某些場(chǎng)景下,為了避免發(fā)生重復(fù)消費(fèi),我們可以容許消息丟失的發(fā)生。即:消息可能會(huì)丟失,但絕不會(huì)被重復(fù)發(fā)送。

Exactly-once(精確一次)語(yǔ)義:Exactly-once 語(yǔ)義保證了即使 Producer 多次發(fā)送同一條消息到服務(wù)端,服務(wù)端也僅僅會(huì)記錄一次。

Exactly-once 語(yǔ)義是最可靠的,同時(shí)也是最難理解的。Exactly-once 語(yǔ)義需要消息隊(duì)列服務(wù)端,消息生產(chǎn)端和消費(fèi)端應(yīng)用三者的協(xié)同才能實(shí)現(xiàn)。

比如,當(dāng)消費(fèi)端應(yīng)用成功消費(fèi)并且 ACK 了一條消息之后,又把消費(fèi)位點(diǎn)回滾到之前的一個(gè)消息 ID,那么從那個(gè)消息 ID 往后的所有消息都會(huì)被消費(fèi)端應(yīng)用重新消費(fèi)到。即:消息不會(huì)丟失,也不會(huì)被重復(fù)發(fā)送。

RocketMQ、Kafka、Pulsar 事務(wù)消息

①RocketMQ 的事務(wù)消息

RocketMQ 在 4.3.0 版中已經(jīng)支持分布式事務(wù)消息,這里 RocketMQ 采用了 2PC 的思想來(lái)實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來(lái)處理二階段超時(shí)或者失敗的消息。

流程如下圖所示:

其具體工作流程分為正常事務(wù)消息的發(fā)送及提交和不正常情況下事務(wù)消息的補(bǔ)償流程:

  • 在消息隊(duì)列上開(kāi)啟一個(gè)事務(wù)主題。
  • 事務(wù)中第一個(gè)執(zhí)行的服務(wù)發(fā)送一條“半消息”(半消息和普通消息的唯一區(qū)別是,在事務(wù)提交之前,對(duì)于消費(fèi)者來(lái)說(shuō),這個(gè)消息是不可見(jiàn)的)給消息隊(duì)列。
  • 半消息發(fā)送成功后,發(fā)送半消息的服務(wù)就會(huì)開(kāi)始執(zhí)行本地事務(wù),根據(jù)本地事務(wù)執(zhí)行結(jié)果來(lái)決定事務(wù)消息提交或者回滾。
  • 本地事務(wù)成功后會(huì)讓這個(gè)“半消息”變成正常消息,供分布式事務(wù)后面的步驟執(zhí)行自己的本地事務(wù)。

這里的事務(wù)消息,Producer 不會(huì)因?yàn)?Consumer 消費(fèi)失敗而做回滾,采用事務(wù)消息的應(yīng)用,其所追求的是高可用和最終一致性,消息消費(fèi)失敗的話,RocketMQ 自己會(huì)負(fù)責(zé)重推消息,直到消費(fèi)成功。

補(bǔ)償流程:RocketMQ 提供事務(wù)反查來(lái)解決異常情況,如果 RocketMQ 沒(méi)有收到提交或者回滾的請(qǐng)求,Broker 會(huì)定時(shí)到生產(chǎn)者上去反查本地事務(wù)的狀態(tài),然后根據(jù)生產(chǎn)者本地事務(wù)的狀態(tài)來(lái)處理這個(gè)“半消息”是提交還是回滾。

值得注意的是我們需要根據(jù)自己的業(yè)務(wù)邏輯來(lái)實(shí)現(xiàn)反查邏輯接口,然后根據(jù)返回值 Broker 決定是提交還是回滾。

而且這個(gè)反查接口需要是無(wú)狀態(tài)的,請(qǐng)求到任意一個(gè)生產(chǎn)者節(jié)點(diǎn)都會(huì)返回正確的數(shù)據(jù)。

其中,補(bǔ)償流程用于解決消息 Commit 或者 Rollback 發(fā)生超時(shí)或者失敗的情況。在 RocketMQ 事務(wù)消息的主要流程中,一階段的消息如何對(duì)用戶不可見(jiàn)。

其中,事務(wù)消息相對(duì)普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對(duì)用戶是不可見(jiàn)的。

那么,如何做到寫(xiě)入消息但是對(duì)用戶不可見(jiàn)呢?RocketMQ 事務(wù)消息的做法是:如果消息是“半消息”,將備份原消息的主題與消息消費(fèi)隊(duì)列,然后改變主題為 RMQ_SYS_TRANS_HALF_TOPIC。

由于消費(fèi)組未訂閱該主題,故消費(fèi)端無(wú)法消費(fèi)“半消息”的消息,然后 RocketMQ 會(huì)開(kāi)啟一個(gè)定時(shí)任務(wù),從 Topic 為 RMQ_SYS_TRANS_HALF_TOPIC 中拉取消息進(jìn)行消費(fèi)。

根據(jù)生產(chǎn)者組獲取一個(gè)服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請(qǐng)求,根據(jù)事務(wù)狀態(tài)來(lái)決定是提交或回滾消息。

講到這里大家就明白了,這里說(shuō)的就是上文提到分布式事務(wù)中的消息事務(wù),目的是在分布式事務(wù)中實(shí)現(xiàn)系統(tǒng)的最終一致性。

②Kafka 的事務(wù)消息

與 RocketMQ 的事務(wù)消息用途不同,Kafka 的事務(wù)基本上是配合其冪等機(jī)制來(lái)實(shí)現(xiàn) Exactly-once(見(jiàn)上文)語(yǔ)義的。

開(kāi)發(fā)此功能的原因可以總結(jié)如下:

流處理的需求:隨著流處理的興起,對(duì)具有更強(qiáng)處理保證的流處理應(yīng)用的需求也在增長(zhǎng)。

例如,在金融行業(yè),金融機(jī)構(gòu)使用流處理引擎為用戶處理借款和信貸。這種類型的用例要求每條消息都只處理一次,無(wú)一例外。

換句話說(shuō),如果流處理應(yīng)用程序消費(fèi)消息 A 并將結(jié)果作為消息 B(B = f(A)),那么恰好一次處理保證意味著當(dāng)且僅當(dāng) B 被成功生產(chǎn)后 A 才能被標(biāo)記為消費(fèi),反之亦然。

事務(wù) API 使流處理應(yīng)用程序能夠在一個(gè)原子操作中使用、處理和生成消息。這意味著,事務(wù)中的一批消息可以從許多主題分區(qū)接收、生成和確認(rèn)。一個(gè)事務(wù)涉及的所有操作都作為整體成功或失敗。

目前,Kafka 默認(rèn)提供的交付可靠性保障是 At-least-once。如果消息成功“提交”,但 Broker 的應(yīng)答沒(méi)有成功發(fā)送回 Producer 端(比如網(wǎng)絡(luò)出現(xiàn)瞬時(shí)抖動(dòng)),那么 Producer 就無(wú)法確定消息是否真的提交成功了。

因此,它只能選擇重試,這就是 Kafka 默認(rèn)提供 At-least-once 保障的原因,不過(guò)這會(huì)導(dǎo)致消息重復(fù)發(fā)送。

大部分用戶還是希望消息只會(huì)被交付一次,這樣的話,消息既不會(huì)丟失,也不會(huì)被重復(fù)處理。

或者說(shuō),即使 Producer 端重復(fù)發(fā)送了相同的消息,Broker 端也能做到自動(dòng)去重。

在下游 Consumer 看來(lái),消息依然只有一條。那么問(wèn)題來(lái)了,Kafka 是怎么做到精確一次的呢?

簡(jiǎn)單來(lái)說(shuō),這是通過(guò)兩種機(jī)制:

  • 冪等性(Idempotence)
  • 事務(wù)(Transaction)

冪等性 Producer:“冪等”這個(gè)詞原是數(shù)學(xué)領(lǐng)域中的概念,指的是某些操作或函數(shù)能夠被執(zhí)行多次,但每次得到的結(jié)果都是不變的。

冪等性有很多好處,其最大的優(yōu)勢(shì)在于我們可以安全地重試任何冪等性操作,反正它們也不會(huì)破壞我們的系統(tǒng)狀態(tài)。

如果是非冪等性操作,我們還需要擔(dān)心某些操作執(zhí)行多次對(duì)狀態(tài)的影響,但對(duì)于冪等性操作而言,我們根本無(wú)需擔(dān)心此事。

在 Kafka 中,Producer 默認(rèn)不是冪等性的,但我們可以創(chuàng)建冪等性 Producer。它其實(shí)是 0.11.0.0 版本引入的新功能。

enable.idempotence 被設(shè)置成 true 后,Producer 自動(dòng)升級(jí)成冪等性 Producer,其他所有的代碼邏輯都不需要改變。

Kafka 自動(dòng)幫你做消息的重復(fù)去重。Kafka 為了實(shí)現(xiàn)冪等性,它在底層設(shè)計(jì)架構(gòu)中引入了 ProducerID 和 SequenceNumber。

ProducerID:在每個(gè)新的 Producer 初始化時(shí),會(huì)被分配一個(gè)唯一的 ProducerID,用來(lái)標(biāo)識(shí)本次會(huì)話。

SequenceNumber:對(duì)于每個(gè) ProducerID,Producer 發(fā)送數(shù)據(jù)的每個(gè) Topic 和 Partition 都對(duì)應(yīng)一個(gè)從 0 開(kāi)始單調(diào)遞增的 SequenceNumber 值。

Broker 在內(nèi)存維護(hù)(pid,seq)映射,收到消息后檢查 seq。Producer 在收到明確的的消息丟失 ack,或者超時(shí)后未收到 ack,要進(jìn)行重試。

  • new_seq=old_seq+1:正常消息。
  • new_seq<=old_seq:重復(fù)消息。
  • new_seq>old_seq+1:消息丟失。

另外我們需要了解冪等性 Producer 的作用范圍。首先,它只能保證單分區(qū)上的冪等性,即一個(gè)冪等性 Producer 能夠保證某個(gè)主題的一個(gè)分區(qū)上不出現(xiàn)重復(fù)消息,它無(wú)法實(shí)現(xiàn)多個(gè)分區(qū)的冪等性。

其次,它只能實(shí)現(xiàn)單會(huì)話上的冪等性,不能實(shí)現(xiàn)跨會(huì)話的冪等性。這里的會(huì)話,你可以理解為 Producer 進(jìn)程的一次運(yùn)行。當(dāng)你重啟了 Producer 進(jìn)程之后,這種冪等性保證就喪失了。

如果想實(shí)現(xiàn)多分區(qū)以及多會(huì)話上的消息無(wú)重復(fù),應(yīng)該怎么做呢?答案就是事務(wù)(transaction)或者依賴事務(wù)型 Producer。這也是冪等性 Producer 和事務(wù)型 Producer 的最大區(qū)別。

事務(wù)型 Producer:能夠保證將消息原子性地寫(xiě)入到多個(gè)分區(qū)中。這批消息要么全部寫(xiě)入成功,要么全部失敗。

另外,事務(wù)型 Producer 也不受進(jìn)程的重啟影響。Producer 重啟后,Kafka 依然保證它們發(fā)送消息的 Exactly-once 處理。

和普通 Producer 代碼相比,事務(wù)型 Producer 的顯著特點(diǎn)是調(diào)用了一些事務(wù) API。

如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它們分別對(duì)應(yīng)事務(wù)的初始化、事務(wù)開(kāi)始、事務(wù)提交以及事務(wù)終止。

Kafka 事務(wù)消息是由 Producer、事務(wù)協(xié)調(diào)器、Broker、組協(xié)調(diào)器、Consumer 等共同參與實(shí)現(xiàn)的。

Producer:為 Producer 指定固定的 TransactionalId(事務(wù) id),可以穿越 Producer 的多次會(huì)(Producer 重啟/斷線重連)中,持續(xù)標(biāo)識(shí) Producer 的身份。

每個(gè)生產(chǎn)者增加一個(gè) epoch。用于標(biāo)識(shí)同一個(gè) TransactionalId 在一次事務(wù)中的 epoch,每次初始化事務(wù)時(shí)會(huì)遞增,從而讓服務(wù)端可以知道生產(chǎn)者請(qǐng)求是否舊的請(qǐng)求。

使用 epoch 標(biāo)識(shí) Producer 的每一次“重生”,可以防止同一 Producer 存在多個(gè)會(huì)話。

Producer 遵從冪等消息的行為,并在發(fā)送的 BatchRecord 中增加事務(wù) id 和 epoch。

事務(wù)協(xié)調(diào)器(Transaction Coordinator):引入事務(wù)協(xié)調(diào)器,類似于消費(fèi)組負(fù)載均衡的協(xié)調(diào)者,每一個(gè)實(shí)現(xiàn)事務(wù)的生產(chǎn)端都被分配到一個(gè)事務(wù)協(xié)調(diào)者。以兩階段提交的方式,實(shí)現(xiàn)消息的事務(wù)提交。

事務(wù)協(xié)調(diào)器使用一個(gè)特殊的 Topic:即事務(wù) Topic,事務(wù) Topic 本身也是持久化的,日志信息記錄事務(wù)狀態(tài)信息,由事務(wù)協(xié)調(diào)者寫(xiě)入。

事務(wù)協(xié)調(diào)器通過(guò) RPC 調(diào)用,協(xié)調(diào) Broker 和 Consumer 實(shí)現(xiàn)事務(wù)的兩階段提交。

每一個(gè) Broker 都會(huì)啟動(dòng)一個(gè)事務(wù)協(xié)調(diào)器,使用 hash(TransactionalId)確定 Producer 對(duì)應(yīng)的事務(wù)協(xié)調(diào)器,使得整個(gè)集群的負(fù)載均衡。

Broker:引入控制消息(Control Messages):這些消息是客戶端產(chǎn)生的并寫(xiě)入到主題的特殊消息,但對(duì)于使用者來(lái)說(shuō)不可見(jiàn)。它們是用來(lái)讓 Broker 告知消費(fèi)者之前拉取的消息是否被原子性提交。

Broker 處理事務(wù)協(xié)調(diào)器的 commit/abort 控制消息,把控制消息向正常消息一樣寫(xiě)入 Topic(圖中標(biāo) c 的消息,和正常消息交織在一起,用來(lái)確認(rèn)事務(wù)提交的日志偏移),并向前推進(jìn)消息提交偏移 hw。

組協(xié)調(diào)器:如果在事務(wù)過(guò)程中,提交了消費(fèi)偏移,組協(xié)調(diào)器在 offset log 中寫(xiě)入事務(wù)消費(fèi)偏移。當(dāng)事務(wù)提交時(shí),在 offset log 中寫(xiě)入事務(wù) offset 確認(rèn)消息。

Consumer:Consumer 過(guò)濾未提交消息和事務(wù)控制消息,使這些消息對(duì)用戶不可見(jiàn)。

有兩種實(shí)現(xiàn)方式:

Consumer 緩存方式:設(shè)置 isolation.level=read_uncommitted,此時(shí) topic 的所有消息對(duì) Consumer 都可見(jiàn)。

Consumer 緩存這些消息,直到收到事務(wù)控制消息。若事務(wù) commit,則對(duì)外發(fā)布這些消息;若事務(wù) abort,則丟棄這些消息。

Broker 過(guò)濾方式:設(shè)置 isolation.level=read_committed,此時(shí) topic 中未提交的消息對(duì) Consumer 不可見(jiàn),只有在事務(wù)結(jié)束后,消息才對(duì) Consumer 可見(jiàn)。

Broker 給 Consumer 的 BatchRecord 消息中,會(huì)包含以列表,指明哪些是“abort”事務(wù),Consumer 丟棄 abort 事務(wù)的消息即可。

因?yàn)槭聞?wù)機(jī)制會(huì)影響消費(fèi)者所能看到的消息的范圍,它不只是簡(jiǎn)單依賴高水位來(lái)判斷。

它依靠一個(gè)名為 LSO(Log Stable Offset)的位移值來(lái)判斷事務(wù)型消費(fèi)者的可見(jiàn)性。

③Pulsar 的事務(wù)消息

Apache Pulsar 在 2.8.0 正式支持了事務(wù)相關(guān)的功能,Pulsar 這里提供的事務(wù)區(qū)別于 RocketMQ 中 2PC 那種事務(wù)的實(shí)現(xiàn)方式,沒(méi)有本地事務(wù)回查的機(jī)制,更類似于 Kafka 的事務(wù)實(shí)現(xiàn)機(jī)制。

Apache Pulsar 中的事務(wù)主要用來(lái)保證類似 Pulsar Functions 這種流計(jì)算場(chǎng)景中 Exactly-once 語(yǔ)義的實(shí)現(xiàn)。

這也符合 Apache Pulsar 本身 Event Streaming 的定位,即保證端到端(End-to-End)的事務(wù)實(shí)現(xiàn)的語(yǔ)義。

在 Pulsar 中,對(duì)于事務(wù)語(yǔ)義是這樣定義的:允許事件流應(yīng)用將消費(fèi)、處理、生產(chǎn)消息整個(gè)過(guò)程定義為一個(gè)原子操作,即生產(chǎn)者或消費(fèi)者能夠處理跨多個(gè)主題和分區(qū)的消息,并確保這些消息作為一個(gè)單元被處理。

Pulsar 事務(wù)具有以下語(yǔ)義:

  • 事務(wù)中的所有操作都作為一個(gè)單元提交。要么提交所有消息,要么都不提交。
  • 每條消息只寫(xiě)入或處理一次,不會(huì)丟失數(shù)據(jù)或重復(fù)(即使發(fā)生故障)。
  • 如果事務(wù)中止,則此事務(wù)中的所有寫(xiě)入和確認(rèn)都將回滾。

事務(wù)中的批量消息可以被以多分區(qū)接收、生產(chǎn)和確認(rèn):

  • 消費(fèi)者只能讀取已提交(確認(rèn))的消息。換句話說(shuō),Broker 不傳遞屬于打開(kāi)事務(wù)的事務(wù)消息或?qū)儆谥兄故聞?wù)的消息。
  • 跨多個(gè)分區(qū)的消息寫(xiě)入是原子性的。
  • 跨多個(gè)訂閱的消息確認(rèn)是原子性的。訂閱下的消費(fèi)者在確認(rèn)帶有事務(wù) ID 的消息時(shí),只會(huì)成功確認(rèn)一次消息。

Pulsar 事務(wù)消息由以下幾個(gè)關(guān)鍵點(diǎn)構(gòu)成:

事務(wù) ID(TxnID):標(biāo)識(shí) Pulsar 中的唯一事務(wù)。事務(wù) ID 長(zhǎng)度是 128-bit。最高 16 位保留給事務(wù)協(xié)調(diào)器的 ID,其余位用于每個(gè)事務(wù)協(xié)調(diào)器中單調(diào)遞增的數(shù)字。

事務(wù)協(xié)調(diào)器(TC):是運(yùn)行在 Pulsar Broker 中的一個(gè)模塊。它維護(hù)事務(wù)的整個(gè)生命周期,并防止事務(wù)進(jìn)入錯(cuò)誤狀態(tài);它處理事務(wù)超時(shí),并確保事務(wù)在事務(wù)超時(shí)后中止。

事務(wù)日志:所有事務(wù)元數(shù)據(jù)都保存在事務(wù)日志中。事務(wù)日志由 Pulsar 主題記錄。如果事務(wù)協(xié)調(diào)器崩潰,它可以從事務(wù)日志恢復(fù)事務(wù)元數(shù)據(jù)。

事務(wù)日志存儲(chǔ)事務(wù)狀態(tài),而不是事務(wù)中的實(shí)際消息(實(shí)際消息存儲(chǔ)在實(shí)際的主題分區(qū)中)。

事務(wù)緩存:向事務(wù)內(nèi)的主題分區(qū)生成的消息存儲(chǔ)在該主題分區(qū)的事務(wù)緩沖區(qū)(TB)中。

在提交事務(wù)之前,事務(wù)緩沖區(qū)中的消息對(duì)消費(fèi)者不可見(jiàn)。當(dāng)事務(wù)中止時(shí),事務(wù)緩沖區(qū)中的消息將被丟棄。

事務(wù)緩沖區(qū)將所有正在進(jìn)行和中止的事務(wù)存儲(chǔ)在內(nèi)存中。所有消息都發(fā)送到實(shí)際的分區(qū) Pulsar 主題。

提交事務(wù)后,事務(wù)緩沖區(qū)中的消息對(duì)消費(fèi)者具體化(可見(jiàn))。事務(wù)中止時(shí),事務(wù)緩沖區(qū)中的消息將被丟棄。

待確認(rèn)狀態(tài):掛起確認(rèn)狀態(tài)在事務(wù)完成之前維護(hù)事務(wù)中的消息確認(rèn)。如果消息處于掛起確認(rèn)狀態(tài),則在該消息從掛起確認(rèn)狀態(tài)中移除之前,其他事務(wù)無(wú)法確認(rèn)該消息。

掛起的確認(rèn)狀態(tài)被保留到掛起的確認(rèn)日志中(cursor ledger)。新啟動(dòng)的 broker 可以從掛起的確認(rèn)日志中恢復(fù)狀態(tài),以確保狀態(tài)確認(rèn)不會(huì)丟失。

處理流程一般分為以下幾個(gè)步驟:

  • 開(kāi)啟事務(wù)。
  • 使用事務(wù)發(fā)布消息。
  • 使用事務(wù)確認(rèn)消息。
  • 結(jié)束事務(wù)。

Pulsar 的事務(wù)處理流程與 Kafka 的事務(wù)處理思路大致上保持一致,大家都有一個(gè) TC 以及對(duì)應(yīng)的一個(gè)用于持久化 TC 所有操作的 Topic 來(lái)記錄所有事務(wù)狀態(tài)變更的請(qǐng)求。

同樣的在事務(wù)開(kāi)始階段也都有一個(gè)專門(mén)的 Topic 來(lái)去查詢 TC 對(duì)應(yīng)的 Owner Broker 的位置在哪里。

不同的是:

  • Kafka 中對(duì)于未確認(rèn)的消息是維護(hù)在 Broker 端的,但是 Pulsar 的是維護(hù)在 Client 端的,通過(guò) Transaction Timeout 來(lái)決定這個(gè)事務(wù)是否執(zhí)行成功,所以有了 Transaction Timeout 的存在之后,就可以確保 Client 和 Broker 側(cè)事務(wù)處理的一致性。
  • 由于 Kafka 本身沒(méi)有單條消息的 Ack,所以 Kafka 的事務(wù)處理只能是順序執(zhí)行的,當(dāng)一個(gè)事務(wù)請(qǐng)求被阻塞之后,會(huì)阻塞后續(xù)所有的事務(wù)請(qǐng)求,但是 Pulsar 是可以對(duì)消息進(jìn)行單條 Ack 的,所以在這里每一個(gè)事務(wù)的 Ack 動(dòng)作是獨(dú)立的,不會(huì)出現(xiàn)事務(wù)阻塞的情況。

結(jié)論

RocketMQ 和 Kafka/Pulsar 的事務(wù)消息實(shí)用的場(chǎng)景是不一樣的。

RocketMQ 中的事務(wù),它解決的問(wèn)題是,確保執(zhí)行本地事務(wù)和發(fā)消息這兩個(gè)操作,要么都成功,要么都失敗。

并且 RocketMQ 增加了一個(gè)事務(wù)反查的機(jī)制,來(lái)盡量提高事務(wù)執(zhí)行的成功率和數(shù)據(jù)一致性。

Kafka 中的事務(wù),它解決的問(wèn)題是,確保在一個(gè)事務(wù)中發(fā)送的多條消息,要么都成功,要么都失敗。

這里面的多條消息不一定要在同一個(gè)主題和分區(qū)中,可以是發(fā)往多個(gè)主題和分區(qū)的消息。

當(dāng)然也可以在 Kafka 事務(wù)執(zhí)行過(guò)程中開(kāi)啟本地事務(wù)來(lái)實(shí)現(xiàn)類似 RocketMQ 事務(wù)消息的效果。

但是 Kafka 是沒(méi)有事務(wù)消息反查機(jī)制的,它是直接拋出異常的,用戶可以根據(jù)異常來(lái)實(shí)現(xiàn)自己的重試等方法保證事務(wù)正常運(yùn)行。

它們的共同點(diǎn)就是:都是通過(guò)兩階段提交來(lái)實(shí)現(xiàn)事務(wù)的,事務(wù)消息都保存在單獨(dú)的主題上。

不同的地方就是 RocketMQ 是通過(guò)“半消息”來(lái)實(shí)現(xiàn)的,Kafka 是直接將消息發(fā)送給對(duì)應(yīng)的 topic,通過(guò)客戶端來(lái)過(guò)濾實(shí)現(xiàn)的。

而且它們兩個(gè)使用的場(chǎng)景區(qū)別是非常之大的,RockteMQ 主要解決的是基于本地事務(wù)和消息的數(shù)據(jù)一致性,而 Kafka 的事務(wù)則是用于實(shí)現(xiàn)它的 Exactly-once 機(jī)制,應(yīng)用于實(shí)時(shí)流計(jì)算的場(chǎng)景中。

Pulsar 的事務(wù)消息和 Kafka 應(yīng)用場(chǎng)景和語(yǔ)義類似,只是由于底層實(shí)現(xiàn)機(jī)制有差別,在一些細(xì)節(jié)上有區(qū)別。

相信看到這里就非常清楚了,對(duì)于事務(wù)消息如何選型和應(yīng)用,首先要明白你的業(yè)務(wù)需求是什么。

是要實(shí)現(xiàn)分布式事務(wù)的最終一致性,還是要實(shí)現(xiàn) Exactly-once (精確一次)語(yǔ)義?明白之后需求,選擇什么組件就十分明確了。

作者:劉若愚

簡(jiǎn)介:微信支付后臺(tái)開(kāi)發(fā)工程師,碩士畢業(yè)于北京大學(xué)。深度參與騰訊 WXG 境外支付團(tuán)隊(duì)多個(gè)重要業(yè)務(wù)的研發(fā)工作,有豐富的后臺(tái)開(kāi)發(fā)經(jīng)驗(yàn)。騰訊技術(shù)分享達(dá)人,社會(huì)招聘伯樂(lè)。

編輯:陶家龍 

出處:轉(zhuǎn)載自公眾號(hào)云加社區(qū)(ID:QcloudCommunity)

 

責(zé)任編輯:武曉燕 來(lái)源: 云加社區(qū)
相關(guān)推薦

2021-04-06 06:07:37

ZAB 協(xié)議原子廣播協(xié)議網(wǎng)絡(luò)協(xié)議

2021-03-25 08:45:15

MySQL

2021-01-11 07:48:59

CTO團(tuán)隊(duì)職場(chǎng)

2022-07-03 06:26:53

JetBrains插件

2020-09-18 10:00:33

iOS蘋(píng)果瀏覽器

2020-11-23 10:06:00

互聯(lián)網(wǎng)數(shù)據(jù)技術(shù)

2021-05-02 23:13:35

人工智能自動(dòng)化人臉識(shí)別

2019-10-23 09:50:53

微信支付寶

2019-07-15 14:13:58

人工智能職業(yè)被取代

2021-12-08 23:30:14

互聯(lián)網(wǎng)裁員危機(jī)

2019-05-13 09:23:50

GitHub代碼開(kāi)發(fā)者

2022-11-24 15:05:51

谷歌碼農(nóng)

2017-11-07 11:49:23

工信部套餐運(yùn)營(yíng)商

2022-04-22 15:28:22

算法MIT數(shù)據(jù)

2025-04-07 03:00:00

Dreamer世界模型

2020-09-04 14:30:36

程序員AI產(chǎn)品

2022-02-23 12:06:54

勒索軟件網(wǎng)絡(luò)攻擊網(wǎng)絡(luò)安全

2023-03-16 17:15:56

Meta硅谷

2022-01-26 10:52:21

代碼Python數(shù)據(jù)庫(kù)

2017-12-18 10:09:38

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)