自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

高級(jí)測試:如何使用Flink對Strom任務(wù)的邏輯功能進(jìn)行復(fù)現(xiàn)測試?

開發(fā) 測試
本文主要通過兩個(gè)最基本的Flink程序?qū)嵗龑ι鲜鰞蓚€(gè)使用Flink測試Strom任務(wù)邏輯存在的基本問題進(jìn)行解答。

Flink和Strom都是時(shí)下較為流行的數(shù)據(jù)流平臺(tái),考慮以下一種應(yīng)用場景:已經(jīng)使用Strom完成了對于某一邏輯功能的開發(fā),如果現(xiàn)在期望使用Flink實(shí)現(xiàn)相同的邏輯,那么就需要考慮如何使用Flink來對Strom任務(wù)的邏輯功能進(jìn)行最簡單的復(fù)現(xiàn)測試。

使用Flink來測試Strom任務(wù)的邏輯主要存在兩個(gè)最基本的問題:第一,Storm通過自定義的Bolt類實(shí)現(xiàn)自定義的邏輯,在Flink中如何實(shí)現(xiàn)?第二,Storm按照自定義標(biāo)準(zhǔn)實(shí)現(xiàn)數(shù)據(jù)分發(fā)的邏輯,在Flink中如何實(shí)現(xiàn)?

本文主要通過兩個(gè)最基本的Flink程序?qū)嵗龑ι鲜鰞蓚€(gè)使用Flink測試Strom任務(wù)邏輯存在的基本問題進(jìn)行解答。

第一個(gè)問題,我們可以通過Flink的ProcessFuction類進(jìn)行實(shí)現(xiàn),通過繼承該類,在該類的processElement方法中實(shí)現(xiàn)自定義邏輯。ProcessFuction類如下圖所示,我們可以通過var1這個(gè)參數(shù)直接獲取當(dāng)前流中的數(shù)據(jù),然后進(jìn)行自定義的邏輯加工,再通過Collector類var3的collect方法將處理后的數(shù)據(jù)發(fā)送到下一個(gè)流中。

假設(shè)某一Strom任務(wù)的功能邏輯是:① 對初始數(shù)據(jù)源(一個(gè)字符串)末尾添加一個(gè)字符串。② 然后再次添加另一個(gè)字符串。

我們以上述對字符串加工的Strom任務(wù)為例,說明Flink程序如何通過ProcessFuction類對該任務(wù)實(shí)現(xiàn)復(fù)現(xiàn)測試。

(1)Flink主程序,假設(shè)初始數(shù)據(jù)源為“abc”。

(2)第一個(gè)業(yè)務(wù)加工類,給數(shù)據(jù)流末尾添加“def”。

(3)第二個(gè)業(yè)務(wù)加工類,給數(shù)據(jù)流末尾添加“ghi”。

(4)執(zhí)行Flink程序,觀察輸出結(jié)果,“abc”被二次加工為“abcdefghi”。

第二個(gè)分發(fā)數(shù)據(jù)的問題,我們假設(shè)某一Strom任務(wù)的功能邏輯是對數(shù)據(jù)源(股票對象)進(jìn)行分類,將股價(jià)高于X的分為一類,將股價(jià)小于等于X的分為另一類。

我們以上述對股票數(shù)據(jù)對象分類處理的Strom任務(wù)為例,說明Flink程序如何通過旁路輸出特性實(shí)現(xiàn)對數(shù)據(jù)流按照自定義標(biāo)準(zhǔn)分類,輸出到不同的子數(shù)據(jù)流中處理。

Flink 的旁路輸出依然涉及ProcessFunction類的processElement方法,該方法的Context類型的var2參數(shù)的主要作用是利用其output方法進(jìn)行旁路輸出(我們用于進(jìn)行數(shù)據(jù)分流)。

Flink的旁路輸出特性可以用來對數(shù)據(jù)進(jìn)行分流,通過創(chuàng)建一個(gè)流的標(biāo)簽(OutputTag),再利用這個(gè)OutputTag標(biāo)簽對象作為參數(shù),調(diào)用初始/父級(jí)數(shù)據(jù)流的getSideOutput(OutputTag)方法獲取子數(shù)據(jù)流。

每個(gè)流標(biāo)簽都有一個(gè)id,也可以不創(chuàng)建對象,只要流標(biāo)簽的id相同,其中的數(shù)據(jù)就相同。因此,可以通過匿名內(nèi)部類的形式來獲取子數(shù)據(jù)流。第一個(gè)參數(shù)是id,第二個(gè)參數(shù)是數(shù)據(jù)類型(不能省略)。

(1)創(chuàng)建股票類Stock,屬性包括名稱和價(jià)格。

(2)創(chuàng)建消費(fèi)消息的Flink程序。

(3)創(chuàng)建生產(chǎn)消息的Flink程序。

我們用“STOCK_LOW_PRICE”和“STOCK_HIGH_PRICE”這兩個(gè)ID作為兩個(gè)旁路輸出標(biāo)簽的ID。

在processElement方法中,我們通過判斷股票的價(jià)格是否大于50區(qū)分出低價(jià)股和高價(jià)股,利用Context對象的output方法進(jìn)行旁路輸出,把price小于50的Stock對象輸出到ID為“STOCK_LOW_PRICE”的低價(jià)股標(biāo)簽旁路中,而把price大于等于50的Stock對象輸出到ID為“STOCK_HIGH_PRICE”的高價(jià)股標(biāo)簽旁路中。

(4)依次啟動(dòng)消費(fèi)者程序、生產(chǎn)者程序,觀察消費(fèi)者程序控制臺(tái)中的輸出:

此時(shí),桌面生成了兩個(gè)文件夾,當(dāng)中記錄了股票數(shù)據(jù),result1記錄了小于50的低價(jià)股,result2中記錄了股價(jià)大于等于50的高價(jià)股。

? ?

責(zé)任編輯:趙寧寧 來源: 今日頭條
相關(guān)推薦

2013-05-24 09:25:27

2020-07-07 13:00:00

Linux壓力測試

2009-12-01 19:12:41

Visual Stud

2010-08-31 08:57:02

谷歌即時(shí)搜索功能

2010-12-27 09:19:23

2022-01-10 07:17:02

安全工具CFB模糊測試

2025-01-27 11:52:23

2021-01-05 08:00:00

Windows 10工具GPU

2009-12-10 14:52:21

VS2005 Expr

2022-07-21 08:43:01

功能測試測試

2011-03-04 09:09:07

BlueJ

2013-05-17 13:31:58

2017-12-12 13:17:36

機(jī)器學(xué)習(xí)代碼單元測試

2021-07-03 08:54:49

LinuxSysbench性能

2021-03-28 23:03:50

Python程序員編碼

2023-06-06 16:10:11

2020-11-05 18:30:32

接口測試

2012-11-01 11:32:23

IBMdw

2012-11-01 11:37:05

JavaScript單元測試測試工具

2009-12-09 09:49:56

VS .NET 200
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)