自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink的八種分區(qū)策略源碼解讀

開源
Flink包含8中分區(qū)策略,這8中分區(qū)策略(分區(qū)器)分別如下面所示,本文將從源碼的角度一一解讀每個分區(qū)器的實現(xiàn)方式。

[[399426]]

本文轉(zhuǎn)載自微信公眾號「大數(shù)據(jù)技術(shù)與數(shù)倉」,作者西貝。轉(zhuǎn)載本文請聯(lián)系大數(shù)據(jù)技術(shù)與數(shù)倉公眾號。

 Flink包含8中分區(qū)策略,這8中分區(qū)策略(分區(qū)器)分別如下面所示,本文將從源碼的角度一一解讀每個分區(qū)器的實現(xiàn)方式。

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

繼承關(guān)系圖

接口

名稱

ChannelSelector

實現(xiàn)

  1. public interface ChannelSelector<T extends IOReadableWritable> { 
  2.  
  3.     /** 
  4.      * 初始化channels數(shù)量,channel可以理解為下游Operator的某個實例(并行算子的某個subtask). 
  5.      */ 
  6.     void setup(int numberOfChannels); 
  7.  
  8.     /** 
  9.      *根據(jù)當(dāng)前的record以及Channel總數(shù), 
  10.      *決定應(yīng)將record發(fā)送到下游哪個Channel。 
  11.      *不同的分區(qū)策略會實現(xiàn)不同的該方法。 
  12.      */ 
  13.     int selectChannel(T record); 
  14.  
  15.     /** 
  16.     *是否以廣播的形式發(fā)送到下游所有的算子實例 
  17.      */ 
  18.     boolean isBroadcast(); 

抽象類

名稱

StreamPartitioner

實現(xiàn)

  1. public abstract class StreamPartitioner<T> implements 
  2.         ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable { 
  3.     private static final long serialVersionUID = 1L; 
  4.  
  5.     protected int numberOfChannels; 
  6.  
  7.     @Override 
  8.     public void setup(int numberOfChannels) { 
  9.         this.numberOfChannels = numberOfChannels; 
  10.     } 
  11.  
  12.     @Override 
  13.     public boolean isBroadcast() { 
  14.         return false
  15.     } 
  16.  
  17.     public abstract StreamPartitioner<T> copy(); 

繼承關(guān)系圖

GlobalPartitioner

簡介

該分區(qū)器會將所有的數(shù)據(jù)都發(fā)送到下游的某個算子實例(subtask id = 0)

源碼解讀

  1. /** 
  2.  * 發(fā)送所有的數(shù)據(jù)到下游算子的第一個task(ID = 0) 
  3.  * @param <T> 
  4.  */ 
  5. @Internal 
  6. public class GlobalPartitioner<T> extends StreamPartitioner<T> { 
  7.     private static final long serialVersionUID = 1L; 
  8.  
  9.     @Override 
  10.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  11.         //只返回0,即只發(fā)送給下游算子的第一個task 
  12.         return 0; 
  13.     } 
  14.  
  15.     @Override 
  16.     public StreamPartitioner<T> copy() { 
  17.         return this; 
  18.     } 
  19.  
  20.     @Override 
  21.     public String toString() { 
  22.         return "GLOBAL"
  23.     } 

圖解

ShufflePartitioner

簡介

隨機選擇一個下游算子實例進行發(fā)送

源碼解讀

  1. /** 
  2.  * 隨機的選擇一個channel進行發(fā)送 
  3.  * @param <T> 
  4.  */ 
  5. @Internal 
  6. public class ShufflePartitioner<T> extends StreamPartitioner<T> { 
  7.     private static final long serialVersionUID = 1L; 
  8.  
  9.     private Random random = new Random(); 
  10.  
  11.     @Override 
  12.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  13.         //產(chǎn)生[0,numberOfChannels)偽隨機數(shù),隨機發(fā)送到下游的某個task 
  14.         return random.nextInt(numberOfChannels); 
  15.     } 
  16.  
  17.     @Override 
  18.     public StreamPartitioner<T> copy() { 
  19.         return new ShufflePartitioner<T>(); 
  20.     } 
  21.  
  22.     @Override 
  23.     public String toString() { 
  24.         return "SHUFFLE"
  25.     } 

圖解

BroadcastPartitioner

簡介

發(fā)送到下游所有的算子實例

源碼解讀

  1. /** 
  2.  * 發(fā)送到所有的channel 
  3.  */ 
  4. @Internal 
  5. public class BroadcastPartitioner<T> extends StreamPartitioner<T> { 
  6.     private static final long serialVersionUID = 1L; 
  7.     /** 
  8.      * Broadcast模式是直接發(fā)送到下游的所有task,所以不需要通過下面的方法選擇發(fā)送的通道 
  9.      */ 
  10.     @Override 
  11.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  12.         throw new UnsupportedOperationException("Broadcast partitioner does not support select channels."); 
  13.     } 
  14.  
  15.     @Override 
  16.     public boolean isBroadcast() { 
  17.         return true
  18.     } 
  19.  
  20.     @Override 
  21.     public StreamPartitioner<T> copy() { 
  22.         return this; 
  23.     } 
  24.  
  25.     @Override 
  26.     public String toString() { 
  27.         return "BROADCAST"
  28.     } 

圖解

RebalancePartitioner

簡介

通過循環(huán)的方式依次發(fā)送到下游的task

源碼解讀

  1. /** 
  2.  *通過循環(huán)的方式依次發(fā)送到下游的task 
  3.  * @param <T> 
  4.  */ 
  5. @Internal 
  6. public class RebalancePartitioner<T> extends StreamPartitioner<T> { 
  7.     private static final long serialVersionUID = 1L; 
  8.  
  9.     private int nextChannelToSendTo; 
  10.  
  11.     @Override 
  12.     public void setup(int numberOfChannels) { 
  13.         super.setup(numberOfChannels); 
  14.         //初始化channel的id,返回[0,numberOfChannels)的偽隨機數(shù) 
  15.         nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels); 
  16.     } 
  17.  
  18.     @Override 
  19.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  20.         //循環(huán)依次發(fā)送到下游的task,比如:nextChannelToSendTo初始值為0,numberOfChannels(下游算子的實例個數(shù),并行度)值為2 
  21.         //則第一次發(fā)送到ID = 1的task,第二次發(fā)送到ID = 0的task,第三次發(fā)送到ID = 1的task上...依次類推 
  22.         nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; 
  23.         return nextChannelToSendTo; 
  24.     } 
  25.  
  26.     public StreamPartitioner<T> copy() { 
  27.         return this; 
  28.     } 
  29.  
  30.     @Override 
  31.     public String toString() { 
  32.         return "REBALANCE"
  33.     } 

圖解

RescalePartitioner

簡介

基于上下游Operator的并行度,將記錄以循環(huán)的方式輸出到下游Operator的每個實例。

舉例: 上游并行度是2,下游是4,則上游一個并行度以循環(huán)的方式將記錄輸出到下游的兩個并行度上;上游另一個并行度以循環(huán)的方式將記錄輸出到下游另兩個并行度上。

若上游并行度是4,下游并行度是2,則上游兩個并行度將記錄輸出到下游一個并行度上;上游另兩個并行度將記錄輸出到下游另一個并行度上。

源碼解讀

  1. @Internal 
  2. public class RescalePartitioner<T> extends StreamPartitioner<T> { 
  3.     private static final long serialVersionUID = 1L; 
  4.  
  5.     private int nextChannelToSendTo = -1; 
  6.  
  7.     @Override 
  8.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  9.         if (++nextChannelToSendTo >= numberOfChannels) { 
  10.             nextChannelToSendTo = 0; 
  11.         } 
  12.         return nextChannelToSendTo; 
  13.     } 
  14.  
  15.     public StreamPartitioner<T> copy() { 
  16.         return this; 
  17.     } 
  18.  
  19.     @Override 
  20.     public String toString() { 
  21.         return "RESCALE"
  22.     } 

圖解

尖叫提示

Flink 中的執(zhí)行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖。

StreamGraph:是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓?fù)浣Y(jié)構(gòu)。

JobGraph:StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個符合條件的節(jié)點 chain 在一起作為一個節(jié)點,這樣可以減少數(shù)據(jù)在節(jié)點之間流動所需要的序列化/反序列化/傳輸消耗。

ExecutionGraph:JobManager 根據(jù) JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。

物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對 Job 進行調(diào)度后,在各個TaskManager 上部署 Task 后形成的“圖”,并不是一個具體的數(shù)據(jù)結(jié)構(gòu)。

而StreamingJobGraphGenerator就是StreamGraph轉(zhuǎn)換為JobGraph。在這個類中,把ForwardPartitioner和RescalePartitioner列為POINTWISE分配模式,其他的為ALL_TO_ALL分配模式。代碼如下:

  1. if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) { 
  2.             jobEdge = downStreamVertex.connectNewDataSetAsInput( 
  3.                 headVertex, 
  4.  
  5.                // 上游算子(生產(chǎn)端)的實例(subtask)連接下游算子(消費端)的一個或者多個實例(subtask) 
  6.                 DistributionPattern.POINTWISE, 
  7.                 resultPartitionType); 
  8.         } else { 
  9.             jobEdge = downStreamVertex.connectNewDataSetAsInput( 
  10.                 headVertex, 
  11.                 // 上游算子(生產(chǎn)端)的實例(subtask)連接下游算子(消費端)的所有實例(subtask) 
  12.                 DistributionPattern.ALL_TO_ALL, 
  13.                 resultPartitionType); 
  14.         } 

ForwardPartitioner

簡介

發(fā)送到下游對應(yīng)的第一個task,保證上下游算子并行度一致,即上有算子與下游算子是1:1的關(guān)系

源碼解讀

  1. /** 
  2.  * 發(fā)送到下游對應(yīng)的第一個task 
  3.  * @param <T> 
  4.  */ 
  5. @Internal 
  6. public class ForwardPartitioner<T> extends StreamPartitioner<T> { 
  7.     private static final long serialVersionUID = 1L; 
  8.  
  9.     @Override 
  10.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  11.         return 0; 
  12.     } 
  13.  
  14.     public StreamPartitioner<T> copy() { 
  15.         return this; 
  16.     } 
  17.  
  18.     @Override 
  19.     public String toString() { 
  20.         return "FORWARD"
  21.     } 

圖解

尖叫提示

在上下游的算子沒有指定分區(qū)器的情況下,如果上下游的算子并行度一致,則使用ForwardPartitioner,否則使用RebalancePartitioner,對于ForwardPartitioner,必須保證上下游算子并行度一致,否則會拋出異常

  1. //在上下游的算子沒有指定分區(qū)器的情況下,如果上下游的算子并行度一致,則使用ForwardPartitioner,否則使用RebalancePartitioner 
  2.             if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { 
  3.                 partitioner = new ForwardPartitioner<Object>(); 
  4.             } else if (partitioner == null) { 
  5.                 partitioner = new RebalancePartitioner<Object>(); 
  6.             } 
  7.  
  8.             if (partitioner instanceof ForwardPartitioner) { 
  9.                 //如果上下游的并行度不一致,會拋出異常 
  10.                 if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) { 
  11.                     throw new UnsupportedOperationException("Forward partitioning does not allow " + 
  12.                         "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() + 
  13.                         ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() + 
  14.                         " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global."); 
  15.                 } 
  16.             } 

KeyGroupStreamPartitioner

簡介

根據(jù)key的分組索引選擇發(fā)送到相對應(yīng)的下游subtask

源碼解讀

  1. /** 
  2.  * 根據(jù)key的分組索引選擇發(fā)送到相對應(yīng)的下游subtask 
  3.  * @param <T> 
  4.  * @param <K> 
  5.  */ 
  6. @Internal 
  7. public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner { 
  8. ... 
  9.  
  10.     @Override 
  11.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  12.         K key
  13.         try { 
  14.             key = keySelector.getKey(record.getInstance().getValue()); 
  15.         } catch (Exception e) { 
  16.             throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e); 
  17.         } 
  18.         //調(diào)用KeyGroupRangeAssignment類的assignKeyToParallelOperator方法,代碼如下所示 
  19.         return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels); 
  20.     } 
  21. ... 
  • org.apache.flink.runtime.state.KeyGroupRangeAssignment
  1. public final class KeyGroupRangeAssignment { 
  2. ... 
  3.  
  4.     /** 
  5.      * 根據(jù)key分配一個并行算子實例的索引,該索引即為該key要發(fā)送的下游算子實例的路由信息, 
  6.      * 即該key發(fā)送到哪一個task 
  7.      */ 
  8.     public static int assignKeyToParallelOperator(Object keyint maxParallelism, int parallelism) { 
  9.         Preconditions.checkNotNull(key"Assigned key must not be null!"); 
  10.         return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); 
  11.     } 
  12.  
  13.     /** 
  14.      *根據(jù)key分配一個分組id(keyGroupId) 
  15.      */ 
  16.     public static int assignToKeyGroup(Object keyint maxParallelism) { 
  17.         Preconditions.checkNotNull(key"Assigned key must not be null!"); 
  18.         //獲取key的hashcode 
  19.         return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); 
  20.     } 
  21.  
  22.     /** 
  23.      * 根據(jù)key分配一個分組id(keyGroupId), 
  24.      */ 
  25.     public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { 
  26.  
  27.         //與maxParallelism取余,獲取keyGroupId 
  28.         return MathUtils.murmurHash(keyHash) % maxParallelism; 
  29.     } 
  30.  
  31.     //計算分區(qū)index,即該key group應(yīng)該發(fā)送到下游的哪一個算子實例 
  32.     public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) { 
  33.         return keyGroupId * parallelism / maxParallelism; 
  34.     } 
  35. ... 

圖解

CustomPartitionerWrapper

簡介

通過Partitioner實例的partition方法(自定義的)將記錄輸出到下游。

  1. public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> { 
  2.     private static final long serialVersionUID = 1L; 
  3.  
  4.     Partitioner<K> partitioner; 
  5.     KeySelector<T, K> keySelector; 
  6.  
  7.     public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) { 
  8.         this.partitioner = partitioner; 
  9.         this.keySelector = keySelector; 
  10.     } 
  11.  
  12.     @Override 
  13.     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { 
  14.         K key
  15.         try { 
  16.             key = keySelector.getKey(record.getInstance().getValue()); 
  17.         } catch (Exception e) { 
  18.             throw new RuntimeException("Could not extract key from " + record.getInstance(), e); 
  19.         } 
  20. //實現(xiàn)Partitioner接口,重寫partition方法 
  21.         return partitioner.partition(key, numberOfChannels); 
  22.     } 
  23.  
  24.     @Override 
  25.     public StreamPartitioner<T> copy() { 
  26.         return this; 
  27.     } 
  28.  
  29.     @Override 
  30.     public String toString() { 
  31.         return "CUSTOM"
  32.     } 

比如:

  1. public class CustomPartitioner implements Partitioner<String> { 
  2.       // key: 根據(jù)key的值來分區(qū) 
  3.       // numPartitions: 下游算子并行度 
  4.       @Override 
  5.       public int partition(String keyint numPartitions) { 
  6.          return key.length() % numPartitions;//在此處定義分區(qū)策略 
  7.       } 
  8.   } 

小結(jié)

本文主要從源碼層面對Flink的8中分區(qū)策略進行了一一分析,并對每一種分區(qū)策略給出了相對應(yīng)的圖示,方便快速理解源碼。如果你覺得本文對你有用,可以關(guān)注我,了解更多精彩內(nèi)容。

 

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)技術(shù)與數(shù)倉
相關(guān)推薦

2023-03-10 15:31:45

2024-02-27 08:05:32

Flink分區(qū)機制數(shù)據(jù)傳輸

2020-07-08 12:05:55

Java線程池策略

2010-05-10 16:20:32

負(fù)載均衡策略

2009-09-25 14:20:28

Hibernate繼承映射

2011-06-09 13:48:48

程序員

2024-09-06 09:37:45

WebApp類加載器Web 應(yīng)用

2023-11-20 13:52:00

Redis數(shù)據(jù)庫

2010-06-28 09:19:07

微軟開源

2024-09-04 09:18:03

分區(qū)策略

2009-10-23 14:34:00

光纖接入技術(shù)

2010-10-11 10:31:51

MySQL分區(qū)

2023-03-30 09:06:20

HiveSpark大數(shù)據(jù)

2010-09-09 08:39:30

2016-07-05 14:09:02

AndroidJAVA內(nèi)存

2010-08-24 09:49:44

2021-08-02 10:46:02

云計算用途

2015-06-15 10:32:44

Java核心源碼解讀

2024-10-28 08:15:32

2010-01-27 10:37:17

Android圖片瀏覽
點贊
收藏

51CTO技術(shù)棧公眾號