高級(jí)測試:如何使用Flink對Strom任務(wù)的邏輯功能進(jìn)行復(fù)現(xiàn)測試?
Flink和Strom都是時(shí)下較為流行的數(shù)據(jù)流平臺(tái),考慮以下一種應(yīng)用場景:已經(jīng)使用Strom完成了對于某一邏輯功能的開發(fā),如果現(xiàn)在期望使用Flink實(shí)現(xiàn)相同的邏輯,那么就需要考慮如何使用Flink來對Strom任務(wù)的邏輯功能進(jìn)行最簡單的復(fù)現(xiàn)測試。
使用Flink來測試Strom任務(wù)的邏輯主要存在兩個(gè)最基本的問題:第一,Storm通過自定義的Bolt類實(shí)現(xiàn)自定義的邏輯,在Flink中如何實(shí)現(xiàn)?第二,Storm按照自定義標(biāo)準(zhǔn)實(shí)現(xiàn)數(shù)據(jù)分發(fā)的邏輯,在Flink中如何實(shí)現(xiàn)?
本文主要通過兩個(gè)最基本的Flink程序?qū)嵗龑ι鲜鰞蓚€(gè)使用Flink測試Strom任務(wù)邏輯存在的基本問題進(jìn)行解答。
第一個(gè)問題,我們可以通過Flink的ProcessFuction類進(jìn)行實(shí)現(xiàn),通過繼承該類,在該類的processElement方法中實(shí)現(xiàn)自定義邏輯。ProcessFuction類如下圖所示,我們可以通過var1這個(gè)參數(shù)直接獲取當(dāng)前流中的數(shù)據(jù),然后進(jìn)行自定義的邏輯加工,再通過Collector類var3的collect方法將處理后的數(shù)據(jù)發(fā)送到下一個(gè)流中。
假設(shè)某一Strom任務(wù)的功能邏輯是:① 對初始數(shù)據(jù)源(一個(gè)字符串)末尾添加一個(gè)字符串。② 然后再次添加另一個(gè)字符串。
我們以上述對字符串加工的Strom任務(wù)為例,說明Flink程序如何通過ProcessFuction類對該任務(wù)實(shí)現(xiàn)復(fù)現(xiàn)測試。
(1)Flink主程序,假設(shè)初始數(shù)據(jù)源為“abc”。
(2)第一個(gè)業(yè)務(wù)加工類,給數(shù)據(jù)流末尾添加“def”。
(3)第二個(gè)業(yè)務(wù)加工類,給數(shù)據(jù)流末尾添加“ghi”。
(4)執(zhí)行Flink程序,觀察輸出結(jié)果,“abc”被二次加工為“abcdefghi”。
第二個(gè)分發(fā)數(shù)據(jù)的問題,我們假設(shè)某一Strom任務(wù)的功能邏輯是對數(shù)據(jù)源(股票對象)進(jìn)行分類,將股價(jià)高于X的分為一類,將股價(jià)小于等于X的分為另一類。
我們以上述對股票數(shù)據(jù)對象分類處理的Strom任務(wù)為例,說明Flink程序如何通過旁路輸出特性實(shí)現(xiàn)對數(shù)據(jù)流按照自定義標(biāo)準(zhǔn)分類,輸出到不同的子數(shù)據(jù)流中處理。
Flink 的旁路輸出依然涉及ProcessFunction類的processElement方法,該方法的Context類型的var2參數(shù)的主要作用是利用其output方法進(jìn)行旁路輸出(我們用于進(jìn)行數(shù)據(jù)分流)。
Flink的旁路輸出特性可以用來對數(shù)據(jù)進(jìn)行分流,通過創(chuàng)建一個(gè)流的標(biāo)簽(OutputTag),再利用這個(gè)OutputTag標(biāo)簽對象作為參數(shù),調(diào)用初始/父級(jí)數(shù)據(jù)流的getSideOutput(OutputTag)方法獲取子數(shù)據(jù)流。
每個(gè)流標(biāo)簽都有一個(gè)id,也可以不創(chuàng)建對象,只要流標(biāo)簽的id相同,其中的數(shù)據(jù)就相同。因此,可以通過匿名內(nèi)部類的形式來獲取子數(shù)據(jù)流。第一個(gè)參數(shù)是id,第二個(gè)參數(shù)是數(shù)據(jù)類型(不能省略)。
(1)創(chuàng)建股票類Stock,屬性包括名稱和價(jià)格。
(2)創(chuàng)建消費(fèi)消息的Flink程序。
(3)創(chuàng)建生產(chǎn)消息的Flink程序。
我們用“STOCK_LOW_PRICE”和“STOCK_HIGH_PRICE”這兩個(gè)ID作為兩個(gè)旁路輸出標(biāo)簽的ID。
在processElement方法中,我們通過判斷股票的價(jià)格是否大于50區(qū)分出低價(jià)股和高價(jià)股,利用Context對象的output方法進(jìn)行旁路輸出,把price小于50的Stock對象輸出到ID為“STOCK_LOW_PRICE”的低價(jià)股標(biāo)簽旁路中,而把price大于等于50的Stock對象輸出到ID為“STOCK_HIGH_PRICE”的高價(jià)股標(biāo)簽旁路中。
(4)依次啟動(dòng)消費(fèi)者程序、生產(chǎn)者程序,觀察消費(fèi)者程序控制臺(tái)中的輸出:
此時(shí),桌面生成了兩個(gè)文件夾,當(dāng)中記錄了股票數(shù)據(jù),result1記錄了小于50的低價(jià)股,result2中記錄了股價(jià)大于等于50的高價(jià)股。
? ?