自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ-Streams架構(gòu)設(shè)計(jì)淺析

精選
開發(fā) 架構(gòu)
本期將帶領(lǐng)大家從源碼的角度,解析RocketMQ-Streams的構(gòu)建,數(shù)據(jù)流轉(zhuǎn)過程。也會(huì)討論RocketMQ-Streams是如何實(shí)現(xiàn)故障恢復(fù)和擴(kuò)縮容的。

作者 |倪澤

RocketMQ-Streams 是一款輕量級(jí)流處理引擎,應(yīng)用以SDK 的形式嵌入并啟動(dòng),即可進(jìn)行流處理計(jì)算,不依賴于其他組件,最低1核1G可部署,在資源敏感場(chǎng)景具有很大優(yōu)勢(shì)。同時(shí)它支持 UTF/UTAF/UTDF 多種計(jì)算類型。目前已經(jīng)廣泛運(yùn)用于安全,風(fēng)控,邊緣計(jì)算等場(chǎng)景。

本期將帶領(lǐng)大家從源碼的角度,解析RocketMQ-Streams的構(gòu)建,數(shù)據(jù)流轉(zhuǎn)過程。也會(huì)討論RocketMQ-Streams是如何實(shí)現(xiàn)故障恢復(fù)和擴(kuò)縮容的。

一、使用示例

代碼示例:

public class RocketMQWindowExample {
public static void main(String[] args) {
DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
source.fromRocketmq(
"topicName",
"groupName",
false,
"namesrvAddr")
.map(message -> JSONObject.parseObject((String) message))
.window(TumblingWindow.of(Time.seconds(10)))
.groupBy("groupByKey")
.sum("字段名", "輸出別名")
.count("total")
.waterMark(5)
.setLocalStorageOnly(true)
.toDataSteam()
.toPrint(1)
.start();

}

}

pom文件依賴:

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-clients</artifactId>
<version>1.0.1-preview</version>
</dependency>

上述代碼是一個(gè)簡(jiǎn)單的使用例子,它主要的功能是從RocketMQ中指定topic讀取數(shù)據(jù),經(jīng)過轉(zhuǎn)化成JSON格式,以groupByKey字段值分組、10秒一個(gè)窗口,對(duì)OutFlow字段值進(jìn)行累加,結(jié)果輸出到total字段,并打印到控制臺(tái)上。上述計(jì)算中還允許輸入亂序5秒,即窗口時(shí)間到達(dá)后不會(huì)馬上觸發(fā),而是會(huì)等待5s,如果這個(gè)段時(shí)間內(nèi),有窗口數(shù)據(jù)到達(dá)依然有效。上述setLocalStorageOnly為true表示不對(duì)狀態(tài)進(jìn)行遠(yuǎn)程存儲(chǔ),僅使用RocksDB做本地存儲(chǔ)。目前1.0.1的RocketMQ-Streams版本依然使用Mysql作為遠(yuǎn)程狀態(tài)存儲(chǔ),下一版本將使用RocketMQ作為遠(yuǎn)程狀態(tài)存儲(chǔ)。

二、RocketMQ總體架構(gòu)圖

RocketMQ-Streams 作為輕量流處理引擎,本質(zhì)上是作為RocketMQ 的客戶端消費(fèi)數(shù)據(jù),一個(gè)流處理實(shí)例可以處理多個(gè)隊(duì)列,而一個(gè)隊(duì)列只能被一個(gè)實(shí)例消費(fèi)。若干RocketMQ-Streams 實(shí)例組成消費(fèi)者組共同消費(fèi)數(shù)據(jù),通過擴(kuò)容實(shí)例達(dá)到增加處理能力的消費(fèi),減少實(shí)例則會(huì)發(fā)生rebalance,消費(fèi)的隊(duì)列自動(dòng)重平衡到其他消費(fèi)實(shí)例上。從上述圖中,我們還可以看出計(jì)算實(shí)例間不需要直接交換任何數(shù)據(jù),可各自獨(dú)立完成所有計(jì)算處理。這種架構(gòu)簡(jiǎn)化了RocketMQ-Streams 本身的設(shè)計(jì),同時(shí)也可非常方便的進(jìn)行實(shí)例擴(kuò)縮容。

處理拓?fù)?/h4>

處理器拓?fù)錇閼?yīng)用定義了流處理過程的計(jì)算邏輯,它由一系列的處理器節(jié)點(diǎn)和數(shù)據(jù)流向組成。例如,在開頭的代碼示例中,整個(gè)處理拓?fù)溆蓅ource、map、groupBy、sum、count、print等處理節(jié)點(diǎn)組成。有兩種特殊的處理節(jié)點(diǎn):

  • source節(jié)點(diǎn)

他沒有任何上游節(jié)點(diǎn),從外部讀入數(shù)據(jù)到RocketMQ-Streams,并交由下游處理。

  • sink節(jié)點(diǎn)

他沒有任何下游節(jié)點(diǎn),他將處理后的數(shù)據(jù)寫出到外部。

處理拓?fù)鋬H僅是流處理代碼的邏輯抽象,在流計(jì)算啟動(dòng)時(shí)將會(huì)被實(shí)例化。為了設(shè)計(jì)簡(jiǎn)單,目前一個(gè)流處理實(shí)例中僅有一張計(jì)算拓?fù)洹?/p>

在所有流處理算子之中,有兩種特別的算子,一種是涉及數(shù)據(jù)分組的算子groupBy,另一種是有狀態(tài)計(jì)算例如count等。這兩種算子會(huì)影響整個(gè)計(jì)算拓?fù)涞臉?gòu)建,下面將具體分析RocketMQ-Streams是如何處理他們的。

groupBy

分組算子groupBy特殊是因?yàn)榻?jīng)過groupBy操作,后續(xù)算子期望對(duì)相同key的數(shù)據(jù)進(jìn)行操作,例如經(jīng)過groupBy("年級(jí)")之后再進(jìn)行sum就是對(duì)按照年級(jí)分組求和,這就要求需要將具有相同“年級(jí)”的數(shù)據(jù)重新路由到一個(gè)流計(jì)算實(shí)例上處理,如果不這樣做,每個(gè)實(shí)例上得出的結(jié)果都將是不完整的,整體輸出結(jié)果也將是錯(cuò)誤的。

RocketMQ-Streams 采用 shuffle topic 這種方式來處理。具體說來,計(jì)算實(shí)例將groupBy數(shù)據(jù)重新發(fā)回RocketMQ的一個(gè)topic,并且在發(fā)回過程中按照key的hash值來選擇目標(biāo)隊(duì)列,再從這個(gè)topic讀取數(shù)據(jù)進(jìn)行后續(xù)流處理。按照key hash后相同的key一定在一個(gè)隊(duì)列里面,而一個(gè)隊(duì)列只會(huì)被一個(gè)流處理實(shí)例消費(fèi),這樣就達(dá)到相同key被路由到一個(gè)實(shí)例上處理的效果。

有狀態(tài)算子

有狀態(tài)算子與無狀態(tài)算子相對(duì)。如果計(jì)算結(jié)果只與當(dāng)前輸入有關(guān),和上一次輸入無關(guān)就是無狀態(tài)算子,例如filter、map、foreach結(jié)果只與當(dāng)前輸入有關(guān)系。還有一種算子的輸出結(jié)果不僅與當(dāng)前算子有關(guān)系還與上一次輸入有關(guān),例如sum,需要對(duì)一段時(shí)間內(nèi)輸入進(jìn)行求和,他就是有狀態(tài)算子。

RocketMQ-Streams利用RocksDB作為本地存儲(chǔ),Mysql作為遠(yuǎn)程存儲(chǔ)來保存狀態(tài)數(shù)據(jù)。他具體做法是:

當(dāng)發(fā)現(xiàn)消息來自新的隊(duì)列時(shí),檢查是否需要加載狀態(tài),如果需要異步加載狀態(tài)到RocksDB。

數(shù)據(jù)到達(dá)有狀態(tài)算子時(shí),如果加載完成使用RocksDB中狀態(tài)進(jìn)行計(jì)算,如果沒有,使用Mysql中狀態(tài)計(jì)算。

計(jì)算完成后,將狀態(tài)數(shù)據(jù)保存到RocksDB和Mysql中。

窗口觸發(fā)后,從RocksDB中查詢出狀態(tài)數(shù)據(jù),并將結(jié)果向下游算子傳遞。

整體數(shù)據(jù)流向圖如下:

三、擴(kuò)縮容與故障恢復(fù)

擴(kuò)縮容和故障恢復(fù)是一個(gè)硬幣的兩面,即同一個(gè)事物的兩種表達(dá),計(jì)算集群如果能正確擴(kuò)縮容就等于具備故障恢復(fù)的能力,反之亦然。通過前面介紹我們知道,RocketMQ-Streams具有非常良好的擴(kuò)縮容性能,擴(kuò)容時(shí)只需要新部署一個(gè)流計(jì)算實(shí)例即可,縮容時(shí)停止計(jì)算實(shí)例即可。對(duì)于無狀態(tài)的計(jì)算來說比較簡(jiǎn)單,擴(kuò)容后,數(shù)據(jù)計(jì)算不需要之前的狀態(tài)。有狀態(tài)計(jì)算的擴(kuò)縮容涉及到狀態(tài)的遷移。有狀態(tài)的擴(kuò)縮容可由下圖表示:

當(dāng)計(jì)算實(shí)例從3個(gè)縮容到2個(gè),借助于RocketMQ的rebalance,MQ會(huì)在計(jì)算實(shí)例之間重新分配。

Instance1上消費(fèi)的MQ2和MQ3被分配到Instance2和Instance3上,這兩個(gè)MQ的狀態(tài)數(shù)據(jù)也需要遷移到Instance2和Instance3上,這也暗示,狀態(tài)數(shù)據(jù)是根據(jù)源數(shù)據(jù)分片保存的;擴(kuò)容則是剛好相反的過程。

具體實(shí)現(xiàn)上,RocketMQ-Streams采用系統(tǒng)消息來觸發(fā)狀態(tài)的加載和持久化。

系統(tǒng)消息類別:

//新增消費(fèi)隊(duì)列
NewSplitMessage

//不在消費(fèi)某個(gè)隊(duì)列
RemoveSplitMessage

//客戶端持久化消費(fèi)位點(diǎn)到MQ
CheckPointMessage

當(dāng)發(fā)現(xiàn)消息來自一個(gè)新的RocketMQ隊(duì)列(MessageQueue),RocketMQ-Streams之前沒有處理過來自該隊(duì)列的消息,會(huì)先于數(shù)據(jù)前發(fā)送NewSplitMessage消息,通過處理拓?fù)湎掠嗡阕觽鬟f,當(dāng)有狀態(tài)算子收到該消息時(shí)會(huì)將新增隊(duì)列對(duì)應(yīng)的狀態(tài)加載到本地內(nèi)存RocksDB中,當(dāng)數(shù)據(jù)真正到達(dá)時(shí),就根據(jù)這個(gè)狀態(tài)繼續(xù)計(jì)算。

當(dāng)因?yàn)橛?jì)算實(shí)例增加或者RocketMQ集群變動(dòng),rebalance后,計(jì)算實(shí)例不再消費(fèi)某個(gè)隊(duì)列(MessageQueue)時(shí),會(huì)發(fā)出RemoveSplitMessage消息,有狀態(tài)算子刪除本地RocksDB中的狀態(tài)。

CheckPointMessage是一種特別的系統(tǒng)消息,他的作用與實(shí)現(xiàn)exactly-once有關(guān)。我們?cè)跀U(kuò)縮容過程中需要做到exactly-once,才能保證擴(kuò)縮容或故障恢復(fù)對(duì)計(jì)算結(jié)果沒有影響。RocketMQ-streams向broker提交消費(fèi)offset前會(huì)產(chǎn)生CheckPointMessage消息,向下游拓?fù)鋫鬟f,他將保證即將提交消費(fèi)位點(diǎn)的所有消息都已經(jīng)被sink處理掉。

開源地址:

RocketMQ-Streams 倉庫地址:

https://github.com/apache/rocketmq-streams

RocketMQ 倉庫地址:

https://github.com/apache/rocketmq

作者:倪澤,RocketMQ 資深貢獻(xiàn)者, RocketMQ-Streams 維護(hù)者之一,阿里云技術(shù)專家。

責(zé)任編輯:武曉燕 來源: 阿里巴巴中間件
相關(guān)推薦

2010-07-14 09:01:07

架構(gòu)設(shè)計(jì)

2020-04-22 14:25:48

云開發(fā)高可用架構(gòu)

2013-05-27 10:58:28

Tumblr架構(gòu)設(shè)計(jì)雅虎收購

2023-07-05 08:00:52

MetrAuto系統(tǒng)架構(gòu)

2015-06-02 04:17:44

架構(gòu)設(shè)計(jì)審架構(gòu)設(shè)計(jì)說明書

2025-04-15 04:00:00

2015-06-02 04:34:05

架構(gòu)設(shè)計(jì)

2012-06-07 10:45:12

軟件架構(gòu)設(shè)計(jì)原則

2019-11-25 10:58:19

Tomcat架構(gòu)Web

2021-10-28 06:17:46

架構(gòu)設(shè)計(jì)組件

2009-02-01 10:17:19

Java架構(gòu)設(shè)計(jì)設(shè)計(jì)模式

2023-05-12 08:06:46

Kubernetes多云架構(gòu)

2024-04-17 08:03:45

架構(gòu)設(shè)計(jì)Java

2009-07-10 09:31:57

MyEclipse U

2017-11-17 07:06:27

互聯(lián)網(wǎng)分層架構(gòu)APP

2024-08-18 14:09:24

2021-07-21 16:30:38

iOSAPP架構(gòu)

2012-09-19 13:46:37

存儲(chǔ)存儲(chǔ)設(shè)計(jì)快速表態(tài)

2013-09-02 17:46:41

MVC架構(gòu)設(shè)計(jì)MVC架構(gòu)設(shè)計(jì)

2023-12-27 13:54:00

RocketMQJava架構(gòu)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)