自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

聊聊Flink:Flink的分區(qū)機制

開發(fā) 前端
flink中的重分區(qū)算子定義上下游subtask之間數(shù)據(jù)傳遞的方式,SubTask之間進行數(shù)據(jù)傳遞模式有兩種,一種是one-to-one(forwarding)模式,另一種是redistributing的模式。

一、前言

flink任務(wù)在執(zhí)行過程中,一個流(stream)包含一個或多個分區(qū)(Stream partition)。TaskManager中的一個slot的subtask就是一個stream partition(流分區(qū)),一個Job的流(stream)分布在多個不同的Slot上執(zhí)行。每一個算子可以包含一個或多個子任務(wù)(subtask),這些subtask執(zhí)行在不同的分區(qū)中,本質(zhì)是在不同的線程、不同的物理機或不同的容器中彼此互不依賴地執(zhí)行。

1.1 Flink數(shù)據(jù)傳輸

  • 組件之間的通信消息傳輸,即Client、JobManager、TaskManager之間的信息傳遞,采用Akka框架(主要用作組件間的協(xié)同,如心跳檢測、狀態(tài)上報、指標統(tǒng)計、作業(yè)提交和部署等)。
  • 算子之間的流數(shù)據(jù)傳輸

本地線程內(nèi)的流數(shù)據(jù)傳輸(同一個SubTask中):同一個SubTask內(nèi)的兩個Operator(屬于同一個OperatorChain)之間的數(shù)據(jù)傳輸是方法調(diào)用,即上游算子處理完數(shù)據(jù)后,直接調(diào)用下游算子的processElement方法。

本地線程間的流數(shù)據(jù)傳輸(同一個TaskManager的不同SubTask中):即同一個TaskManager(JVM進程)中的不同Task(線程,本質(zhì)上是SubTask)的算子之間的數(shù)據(jù)傳輸,通過本地內(nèi)存進行數(shù)據(jù)傳遞,存在數(shù)據(jù)序列化和反序列過程。

跨網(wǎng)絡(luò)的流數(shù)據(jù)傳輸(不同TaskManager的SubTask中):采用Netty框架,通過Socket傳遞,也存在數(shù)據(jù)序列化和反序列過程。

flink中的重分區(qū)算子定義上下游subtask之間數(shù)據(jù)傳遞的方式,SubTask之間進行數(shù)據(jù)傳遞模式有兩種,一種是one-to-one(forwarding)模式,另一種是redistributing的模式。

1.2 重分區(qū)算子數(shù)據(jù)傳遞的兩種方式

  • One-to-one:數(shù)據(jù)不需要重新分布,上游SubTask生產(chǎn)的數(shù)據(jù)與下游SubTask受到的數(shù)據(jù)完全一致,數(shù)據(jù)不需要重分區(qū),也就是數(shù)據(jù)不需要經(jīng)過IO,比如下圖中source->map的數(shù)據(jù)傳遞形式就是One-to-One方式。常見的map、fliter、flatMap等算子的SubTask的數(shù)據(jù)傳遞都是one-to-one的對應(yīng)關(guān)系。類似于spark中的窄依賴。
  • Redistributing:數(shù)據(jù)需要通過shuffle過程重新分區(qū),需要經(jīng)過IO,比如上圖中的map->keyBy。創(chuàng)建的keyBy、broadcast、rebalance、shuffle等算子的SubTask的數(shù)據(jù)傳遞都是Redistributing方式,但它們具體數(shù)據(jù)傳遞方式是不同的。類似于spark中的寬依賴。

圖片圖片

flink中的重分區(qū)算子除了keyBy以外,還有broadcast、rebalance、shuffle、rescale、global、partitionCustom等多種算子,它們的分區(qū)方式各不相同。需要注意的是,這些算子中除了keyBy能將DataStream轉(zhuǎn)化為KeyedStream外,其它重分區(qū)算子均不會改變Stream的類型。

二、分區(qū)策略

數(shù)據(jù)在算子之間流動需要依靠分區(qū)策略(分區(qū)器),F(xiàn)link目前內(nèi)置了以下幾種分區(qū)策略和自定義分區(qū)策略。已實現(xiàn)的分區(qū)策略對應(yīng)的API為:

圖片圖片

自定義分區(qū)策略的API為CustomPartitionerWrapper。

各個API的繼承關(guān)系如下圖所示:

圖片圖片

ChannelSelector是分區(qū)策略的頂層接口,其決定了記錄應(yīng)該寫入哪個邏輯通道,通道可理解為下游算子的某個實例,或下游并行算子的某個子任務(wù)。該接口的定義源碼如下:

圖片圖片

抽象類StreamPartitioner實現(xiàn)了ChannelSelector接口,是一個用于流程序的特殊的ChannelSelector,其中定義了一些通用的分區(qū)策略方法。Flink中的所有分區(qū)策略(分區(qū)器)都繼承了StreamPartitioner類,并且實現(xiàn)了各自獨有的分區(qū)規(guī)則。

三、內(nèi)置分區(qū)策略

3.1 BinaryHashPartitioner

該分區(qū)策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一種針對BinaryRowData的哈希分區(qū)器。BinaryRowData是RowData的實現(xiàn),可以顯著減少Java對象的序列化/反序列化。RowData用于表示結(jié)構(gòu)化數(shù)據(jù)類型,運行時通過Table API或SQL管道傳遞的所有頂級記錄都是RowData的實例。關(guān)于BinaryHashPartitioner,我們這里不做過多講解。

3.2 BroadcastPartitioner

廣播分區(qū)策略將上游數(shù)據(jù)記錄輸出到下游算子的每個并行實例中,即下游每個分區(qū)都會有上游的所有數(shù)據(jù)。使用DataStream的broadcast()方法即可設(shè)置該DataStream向下游發(fā)送數(shù)據(jù)時使用廣播分區(qū)策略。

來一段代碼演示下:

/**
 * 微信公眾號:老周聊架構(gòu)
 */
public class PartitionerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);
        //1.分區(qū)策略前的操作
        //輸出dataStream每個元素及所屬的子任務(wù)編號
        dataStream.map(new RichMapFunction<Integer, Object>() {
            @Override
            public Object map(Integer value) throws Exception {
                System.out.println(String.format("元素值: %s, 分區(qū)策略前,子任務(wù)編號: %s", value,
                        getRuntimeContext().getIndexOfThisSubtask()));
                return value;
            }
        });
        //2.設(shè)置分區(qū)策略
        //設(shè)置DataStream向下游發(fā)送數(shù)據(jù)時使用的策略
        DataStream<Integer> dataStreamAfter = dataStream.broadcast();
        //3.分區(qū)策略后的操作
        dataStreamAfter.map(new RichMapFunction<Integer, Object>() {
            @Override
            public Object map(Integer value) throws Exception {
                System.out.println(String.format("元素值: %s, 分區(qū)策略后,子任務(wù)編號: %s", value,
                        getRuntimeContext().getIndexOfThisSubtask()));
                return value;
            }
        }).print();
        env.execute("PartitionerTest Job");
    }
}

直接IDEA控制臺輸出:

圖片圖片

從輸出結(jié)果可以看出,數(shù)據(jù)共分為3個分區(qū)(編號為0、1、2)。執(zhí)行分區(qū)策略前,每個元素所屬的分區(qū):

圖片圖片

執(zhí)行分區(qū)策略后,每個元素所屬的分區(qū)如下:

圖片圖片

對比表發(fā)現(xiàn),廣播分區(qū)策略將上游每個元素分別發(fā)送到了下游算子的所有分區(qū),這種策略會把數(shù)據(jù)復(fù)制多份,向下游算子的每個分區(qū)發(fā)送一份。

圖片圖片

我們把上面的任務(wù)提交到Flink,同樣也可以看出前面分區(qū)前每個子任務(wù)兩條數(shù)據(jù),分區(qū)后每個子任務(wù)六條數(shù)據(jù)。

圖片圖片

圖片圖片

3.3 ForwardPartitioner

轉(zhuǎn)發(fā)分區(qū)策略只將元素轉(zhuǎn)發(fā)給本地運行的下游算子的實例,即將元素發(fā)送到與當前算子實例在同一個TaskManager的下游算子實例,而不需要進行網(wǎng)絡(luò)傳輸。要求上下游算子并行度一樣,這樣上下游算子可以同屬一個子任務(wù)。

這里把上面的代碼調(diào)整下:

dataStream.forward()

IDEA控制臺輸出:

圖片圖片

從輸出結(jié)果可以看出,數(shù)據(jù)共分為3個分區(qū)(編號為0、1、2)。執(zhí)行分區(qū)策略前,每個元素所屬的分區(qū):

圖片圖片

執(zhí)行分區(qū)策略后,每個元素所屬的分區(qū)如下:

圖片圖片

對比發(fā)現(xiàn),轉(zhuǎn)發(fā)分區(qū)策略將上游同一個分區(qū)的元素發(fā)送到了下游同一個分區(qū)中。使用數(shù)據(jù)流圖表示如下圖:

圖片圖片

在上下游的算子沒有指定分區(qū)策略的情況下,如果上下游的算子并行度一致,則默認使用ForwardPartitioner,否則使用RebalancePartitioner。在StreamGraph類的源碼中可以看到該規(guī)則:

圖片圖片

對于ForwardPartitioner,必須保證上下游算子并行度一致,否則會拋出異常。

圖片圖片

3.4 GlobalPartitioner

全局分區(qū)策略將上游所有元素發(fā)送到下游子任務(wù)編號等于0的分區(qū)算子實例上(下游第一個實例)。

這里把上面的代碼調(diào)整下:

dataStream.global()

IDEA控制臺輸出:

圖片圖片

分區(qū)前:

圖片圖片

分區(qū)后:

圖片圖片

全局分區(qū)策略將上游所有分區(qū)中的所有元素發(fā)送到了下游編號為0的分區(qū)中:

圖片圖片

3.5 .KeyGroupStreamPartitioner

Key分區(qū)策略根據(jù)元素Key的Hash值輸出到下游算子指定的實例。keyBy()算子底層正是使用的該分區(qū)策略,底層最終會調(diào)用KeyGroupStreamPartitioner的selectChannel()方法,計算每個Key對應(yīng)的通道索引(通道編號,可理解為分區(qū)編號),根據(jù)通道索引將Key發(fā)送到下游相應(yīng)的分區(qū)中。selectChannel()方法源碼如下:

圖片圖片

圖片圖片

總的來說,F(xiàn)link底層計算通道索引(分區(qū)編號)的流程如下:

  • 計算Key的HashCode值。
  • 將Key的HashCode值進行特殊的Hash處理,即MathUtils.murmurHash(keyHash),返回一個非負哈希碼。
  • 將非負哈希碼除以最大并行度取余數(shù),得到keyGroupId,即Key組索引。
  • 使用公式keyGroupId×parallelism/maxParallelism得到分區(qū)編號。parallelism為當前算子的并行度,即通道數(shù)量;maxParallelism為系統(tǒng)默認支持的最大并行度,即128。

3.6 RebalancePartitioner

平衡分區(qū)策略使用循環(huán)遍歷下游分區(qū)的方式,將上游元素均勻分配給下游算子的每個實例。每個下游算子的實例都具有相等的負載。當數(shù)據(jù)流中的元素存在數(shù)據(jù)傾斜時,使用該策略對性能有很大的提升。

這里把上面的代碼調(diào)整下:

dataStream.setParallelism(2);

dataStreamAfter.setParallelism(3);

dataStream.rebalance()

IDEA控制臺輸出:

圖片圖片

分區(qū)前:

圖片圖片

分區(qū)后:

圖片圖片

平衡分區(qū)策略將上游所有元素均勻發(fā)送到了下游算子的所有分區(qū):

圖片圖片

3.7 RescalePartitioner

重新調(diào)節(jié)分區(qū)策略基于上下游算子的并行度,將元素以循環(huán)的方式輸出到下游算子的每個實例。類似于平衡分區(qū)策略,但又與平衡分區(qū)策略不同。

上游算子將元素發(fā)送到下游哪一個算子實例,取決于上游和下游算子的并行度。例如,如果上游算子的并行度為2,而下游算子的并行度為4,那么一個上游算子實例將把元素均勻分配給兩個下游算子實例,而另一個上游算子實例將把元素均勻分配給另外兩個下游算子實例。相反,如果下游算子的并行度為2,而上游算子的并行度為4,那么兩個上游算子實例將分配給一個下游算子實例,而另外兩個上游算子實例將分配給另一個下游算子實例。

假設(shè)上游算子并行度為2,分區(qū)編號為A和B,下游算子并行度為4,分區(qū)編號為1、2、3、4,那么A將把數(shù)據(jù)循環(huán)發(fā)送給1和2,B則把數(shù)據(jù)循環(huán)發(fā)送給3和4。假設(shè)上游算子并行度為4,編號為A、B、C、D,下游算子并行度為2,編號為1、2,那么A和B把數(shù)據(jù)發(fā)送給1,C和D則把數(shù)據(jù)發(fā)送給2。

這里把上面的代碼調(diào)整下:

dataStream.rescale()

同時將第一個map算子的并行度設(shè)置為2,第二個map算子的并行度設(shè)置為4。

IDEA控制臺輸出:

圖片圖片

分區(qū)前:

圖片圖片

分區(qū)后:

圖片圖片

圖片圖片

接下來改變map算子的并行度,將第一個map算子的并行度設(shè)置為4,第二個map算子的并行度設(shè)置為2。

圖片圖片

如果想將元素均勻地輸出到下游算子的每個實例,以實現(xiàn)負載均衡,同時又不希望使用平衡分區(qū)策略的全局負載均衡,則可以使用重新調(diào)節(jié)分區(qū)策略。該策略會盡可能避免數(shù)據(jù)在網(wǎng)絡(luò)間傳輸,而能否避免還取決于TaskManager的Task Slot數(shù)量、上下游算子的并行度等。

3.8 ShufflePartitioner

隨機分區(qū)策略將上游算子元素輸出到下游算子的隨機實例中。元素會被均勻分配到下游算子的每個實例。這種策略可以實現(xiàn)計算任務(wù)的負載均衡。

這里把上面的代碼調(diào)整下:

dataStream.shuffle()

這里就不做過多演示了。我們下面來看下自定義分區(qū)策略。

四、自定義分區(qū)策略

自定義分區(qū)策略的API為CustomPartitionerWrapper。該策略允許開發(fā)者自定義規(guī)則將上游算子元素發(fā)送到下游指定的算子實例中。

4.1 新建自定義分區(qū)器

新建分區(qū)器類MyCustomPartitioner并實現(xiàn)接口Partitioner(Object表示分區(qū)Key的數(shù)據(jù)類型),實現(xiàn)其中未實現(xiàn)的方法partition(),在該方法中添加相應(yīng)的分區(qū)邏輯。

/**
 * 自定義分區(qū)策略
 * 微信公眾號:老周聊架構(gòu)
 */
public class MyCustomPartitioner implements Partitioner {
    @Override
    public int partition(Object key, int numPartitions) {
        if (key.equals("chinese")) {
            return 0;
        } else if (key.equals("math")) {
            return 1;
        } else {
            return 2;
        }
    }
}

上述代碼通過partition()方法取得分區(qū)編號,將Key值等于chinese的元素分配到編號為0的分區(qū),將Key值等于math的元素分配到編號為1的分區(qū),其余元素分配到編號為2的分區(qū)。

4.2 使用自定義分區(qū)器

調(diào)用DataStream的partitionCustom()方法傳入自定義分區(qū)器類MyCustomPartitioner的實例,可以對DataStream按照自定義規(guī)則進行重新分區(qū),代碼如下:

/**
 * 自定義分區(qū)策略
 * 微信公眾號:老周聊架構(gòu)
 */
public class CustomPartitionerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataStream<String> dataStream = env.fromElements("chinese,98", "math,88", "english,96");
        //1.分區(qū)策略前的操作
        //輸出dataStream每個元素及所屬的子任務(wù)編號
        SingleOutputStreamOperator<Map<String, Integer>> dataStreamBefore =
                dataStream.map(new RichMapFunction<String, Map<String, Integer>>() {
                    @Override
                    public Map<String, Integer> map(String value) throws Exception {
                        System.out.println(String.format("元素值: %s, 分區(qū)策略前,子任務(wù)編號: %s", value,
                                getRuntimeContext().getIndexOfThisSubtask()));
                        Map<String, Integer> map = new HashMap<>();
                        map.put(value.split(",")[0], Integer.parseInt(value.split(",")[1]));
                        return map;
                    }
                }).setParallelism(2);

        //2.設(shè)置分區(qū)策略
        //設(shè)置DataStream向下游發(fā)送數(shù)據(jù)時使用的策略
        DataStream<Map<String, Integer>> dataStreamAfter = dataStreamBefore.partitionCustom(new MyCustomPartitioner(), value -> value);

        //3.分區(qū)策略后的操作
        dataStreamAfter.map(new RichMapFunction<Map<String, Integer>, Map<String, Integer>>() {
            @Override
            public Map<String, Integer> map(Map<String, Integer> value) throws Exception {
                System.out.println(String.format("元素值: %s, 分區(qū)策略后,子任務(wù)編號: %s", value,
                        getRuntimeContext().getIndexOfThisSubtask()));
                return value;
            }
        }).setParallelism(3).print();
        env.execute("CustomPartitionerTest Job");
    }
}

分區(qū)前:

圖片圖片

分區(qū)后:

圖片圖片

自定義分區(qū)策略將上游所有元素按照自定義的規(guī)則發(fā)送到了下游的3個分區(qū)中。

把任務(wù)給到Flink上去跑,發(fā)現(xiàn):

圖片圖片

這是因為泛型擦除,下面的DataStream泛型需要指定類型,不能

圖片圖片

小知識:

在編譯之后程序會采取去泛型化的措施。也就是說Java中的泛型,只在編譯階段有效。在編譯過程中,正確檢驗泛型結(jié)果后,在運行時會將泛型的相關(guān)信息擦除,編譯器只會在對象進入JVM和離開JVM的邊界處添加類型檢查和轉(zhuǎn)換的方法,泛型的信息不會進入到運行時階段,這就是所謂的Java類型擦除。

類型加好以后,再跑一下任務(wù),會出現(xiàn)任務(wù)成功。

圖片圖片

圖片圖片


責任編輯:武曉燕 來源: 老周聊架構(gòu)
相關(guān)推薦

2024-01-29 08:07:42

FlinkYARN架構(gòu)

2024-04-09 07:50:59

Flink語義Watermark

2022-01-14 07:56:38

Checkpoint機制Flink

2023-03-22 18:34:30

Flink調(diào)度部署

2021-11-02 06:58:55

FlinkWindow機制

2022-12-08 07:17:49

2021-05-14 08:33:02

Flink策略源碼

2024-03-27 10:08:05

Flink觸發(fā)器Trigger

2022-02-09 15:23:41

大數(shù)據(jù)流計算Spark

2022-05-19 08:47:30

Flinkwatermark窗口計算

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-11 19:00:54

Java中斷機制

2023-02-24 16:46:25

Glide緩存機制

2024-06-03 08:26:35

2021-05-06 11:54:40

大數(shù)據(jù)Flink

2024-08-05 00:10:00

2020-12-02 08:43:00

Flink SQLHBase場景

2021-09-08 10:36:01

Flink阿里云

2021-06-08 23:18:24

RestApiFlink metriFlink

2021-12-09 06:59:24

FlinkSQL 開發(fā)
點贊
收藏

51CTO技術(shù)棧公眾號