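A Source-Code Walkthrough of Flink's Eight Partitioning Strategies
This article is reposted from the WeChat official account 「大数据技术与数仓」, written by 西贝. To repost it, please contact the 大数据技术与数仓 official account.
Flink ships with eight partitioning strategies (partitioners), listed below. This article walks through the source code of each one to show how it is implemented.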
- GlobalPartitioner
- ShufflePartitioner
- RebalancePartitioner
- RescalePartitioner
- BroadcastPartitioner
- ForwardPartitioner
- KeyGroupStreamPartitioner
- CustomPartitionerWrapper
Class hierarchy
Interface
Name
ChannelSelector
Implementation
- public interface ChannelSelector<T extends IOReadableWritable> {
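- /**
- * Initializes the number of channels. A channel can be thought of as one instance of the downstream operator (one subtask of a parallel operator).
- */
- void setup(int numberOfChannels);
- /**
- * Decides, based on the current record and the total number of channels,
- * which downstream channel the record should be sent to.
- * Each partitioning strategy implements this method differently.
- */
- int selectChannel(T record);
- /**
- * Whether records are broadcast to all downstream operator instances.
- */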
- boolean isBroadcast();
- }
Abstract class
Name
StreamPartitioner
Implementation
- public abstract class StreamPartitioner<T> implements
- ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
- private static final long serialVersionUID = 1L;
- protected int numberOfChannels;
- @Override
- public void setup(int numberOfChannels) {
- this.numberOfChannels = numberOfChannels;
- }
- @Override
- public boolean isBroadcast() {
- return false;
- }
- public abstract StreamPartitioner<T> copy();
- }
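To make the setup / selectChannel / isBroadcast contract concrete, here is a minimal sketch that mirrors its shape without depending on Flink. `SimpleSelector` and `ModuloSelector` are hypothetical names introduced only for illustration; they are not Flink classes, and the hash-modulo rule is just an assumed example strategy.

```java
// Simplified stand-in for the ChannelSelector contract; not the Flink interface itself.
interface SimpleSelector<T> {
    void setup(int numberOfChannels);
    int selectChannel(T record);
    boolean isBroadcast();
}

// Hypothetical selector: routes a record by its hash code modulo the channel count.
class ModuloSelector<T> implements SimpleSelector<T> {
    private int numberOfChannels;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public int selectChannel(T record) {
        // floorMod keeps the result in [0, numberOfChannels) even for negative hash codes
        return Math.floorMod(record.hashCode(), numberOfChannels);
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }
}

class SelectorContractDemo {
    public static void main(String[] args) {
        ModuloSelector<Integer> selector = new ModuloSelector<>();
        selector.setup(3); // setup is called once, before any record is routed
        for (int record : new int[] {0, 1, 2, 3, -1}) {
            System.out.println(record + " -> channel " + selector.selectChannel(record));
        }
    }
}
```

The point of the sketch is the call order: the channel count is fixed once in setup, and every later selectChannel call must return an index inside that range.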
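Inheritance diagram
GlobalPartitioner
Overview
This partitioner sends every record to a single downstream operator instance, the one with subtask id = 0.
Source code walkthrough
- /**
- * Sends all records to the first task of the downstream operator (ID = 0).
- * @param <T>
- */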
- @Internal
- public class GlobalPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
- @Override
- public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
- // Always returns 0, so records only go to the first task of the downstream operator
- return 0;
- }
- @Override
- public StreamPartitioner<T> copy() {
- return this;
- }
- @Override
- public String toString() {
- return "GLOBAL";
- }
- }
Diagram
ShufflePartitioner
Overview
Sends each record to a randomly chosen downstream operator instance.
Source code walkthrough
- /**
- * Picks a channel at random for each record.
- * @param <T>
- */
- @Internal
- public class ShufflePartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
- private Random random = new Random();
- @Override
- public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
- // Draws a pseudo-random number in [0, numberOfChannels), so the record goes to a random downstream task
- return random.nextInt(numberOfChannels);
- }
- @Override
- public StreamPartitioner<T> copy() {
- return new ShufflePartitioner<T>();
- }
- @Override
- public String toString() {
- return "SHUFFLE";
- }
- }
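As a rough illustration of what random selection does to the load, the sketch below counts how many records land on each channel. `ShuffleSketch` is a hypothetical name, and the fixed seed is an assumption made only so the demo is repeatable; the partitioner above uses an unseeded `Random`.

```java
import java.util.Random;

class ShuffleSketch {
    // Counts how many of `records` land on each channel when the channel is picked at random.
    static int[] distribute(int records, int numberOfChannels, long seed) {
        Random random = new Random(seed); // seeded only to make the demo repeatable
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[random.nextInt(numberOfChannels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = distribute(10_000, 4, 42L);
        for (int channel = 0; channel < counts.length; channel++) {
            System.out.println("channel " + channel + ": " + counts[channel]);
        }
    }
}
```

The counts come out roughly even but not identical, which is the practical difference from a round-robin strategy.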
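Diagram
BroadcastPartitioner
Overview
Sends each record to every downstream operator instance.
Source code walkthrough
- /**
- * Sends records to all channels.
- */
- @Internal
- public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
- /**
- * In broadcast mode records go straight to every downstream task, so there is no channel to select through this method.
- */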
- @Override
- public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
- throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
- }
- @Override
- public boolean isBroadcast() {
- return true;
- }
- @Override
- public StreamPartitioner<T> copy() {
- return this;
- }
- @Override
- public String toString() {
- return "BROADCAST";
- }
- }
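Since selectChannel throws here, whoever writes the records has to check isBroadcast() first. The sketch below shows one way such a dispatch could look. `BroadcastDispatchSketch` and `targetChannels` are hypothetical names, and Flink's actual record writer is more involved than this.

```java
import java.util.ArrayList;
import java.util.List;

class BroadcastDispatchSketch {
    // Returns the channels a record would be written to.
    // `broadcast` plays the role of isBroadcast(); `selected` plays the role of selectChannel(record).
    static List<Integer> targetChannels(boolean broadcast, int selected, int numberOfChannels) {
        List<Integer> targets = new ArrayList<>();
        if (broadcast) {
            // Broadcast: every channel gets a copy, so no selection is needed
            for (int channel = 0; channel < numberOfChannels; channel++) {
                targets.add(channel);
            }
        } else {
            targets.add(selected);
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println("broadcast: " + targetChannels(true, 0, 3));
        System.out.println("selected:  " + targetChannels(false, 2, 3));
    }
}
```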
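Diagram
RebalancePartitioner
Overview
Sends records to the downstream tasks one after another in round-robin fashion.
Source code walkthrough
- /**
- * Sends records to the downstream tasks in round-robin fashion.
- * @param <T>
- */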
- @Internal
- public class RebalancePartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
- private int nextChannelToSendTo;
- @Override
- public void setup(int numberOfChannels) {
- super.setup(numberOfChannels);
- // Initializes the starting channel id to a pseudo-random number in [0, numberOfChannels)
- nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
- }
- @Override
- public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
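- // Cycles through the downstream tasks. For example, if nextChannelToSendTo starts at 0 and numberOfChannels (the number of downstream operator instances, i.e. the parallelism) is 2,
- // the first record goes to the task with ID = 1, the second to ID = 0, the third to ID = 1, and so on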
- nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
- return nextChannelToSendTo;
- }
- public StreamPartitioner<T> copy() {
- return this;
- }
- @Override
- public String toString() {
- return "REBALANCE";
- }
- }
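The round-robin step can be traced with a small standalone sketch. `RebalanceSketch` is a hypothetical class, and its `start` argument is an assumption that replaces the random initial value from setup() so the sequence is predictable.

```java
class RebalanceSketch {
    private final int numberOfChannels;
    private int nextChannelToSendTo;

    // `start` stands in for the random initial value chosen in setup().
    RebalanceSketch(int numberOfChannels, int start) {
        this.numberOfChannels = numberOfChannels;
        this.nextChannelToSendTo = start;
    }

    // Same update rule as the partitioner above: advance first, then return.
    int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    public static void main(String[] args) {
        RebalanceSketch partitioner = new RebalanceSketch(2, 0);
        for (int i = 0; i < 4; i++) {
            System.out.println("record " + i + " -> channel " + partitioner.selectChannel());
        }
    }
}
```

Because the counter is advanced before it is returned, the first record goes to the channel after the starting value, which matches the ID = 1, 0, 1 sequence described in the comment.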
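Diagram
RescalePartitioner
Overview
Based on the parallelism of the upstream and downstream operators, each upstream instance emits records round-robin to the downstream instances it is connected to.
Example: if the upstream parallelism is 2 and the downstream parallelism is 4, one upstream instance emits round-robin to two of the downstream instances, and the other upstream instance emits round-robin to the remaining two.
If the upstream parallelism is 4 and the downstream parallelism is 2, two upstream instances emit to one downstream instance, and the other two emit to the other downstream instance.
Source code walkthrough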
- @Internal
- public class RescalePartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
- private int nextChannelToSendTo = -1;
- @Override
- public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
- if (++nextChannelToSendTo >= numberOfChannels) {
- nextChannelToSendTo = 0;
- }
- return nextChannelToSendTo;
- }
- public StreamPartitioner<T> copy() {
- return this;
- }
- @Override
- public String toString() {
- return "RESCALE";
- }
- }
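The partitioner only cycles over the channels that one upstream subtask can see. The sketch below replays that counter and then maps the local channel to a downstream subtask for the 2-to-4 example. `RescaleSketch` and `downstreamSubtask` are hypothetical, and the mapping assumes evenly divisible parallelism with consecutive blocks of downstream subtasks per upstream subtask.

```java
class RescaleSketch {
    private final int numberOfChannels; // channels visible to this one upstream subtask
    private int nextChannelToSendTo = -1;

    RescaleSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    // Same counter logic as the partitioner above: pre-increment, wrap to 0 at the end.
    int selectChannel() {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    // Assumed mapping (fan-out case only): local channel -> downstream subtask index.
    static int downstreamSubtask(int upstreamIndex, int localChannel,
                                 int upstreamParallelism, int downstreamParallelism) {
        int fanOut = downstreamParallelism / upstreamParallelism;
        return upstreamIndex * fanOut + localChannel;
    }

    public static void main(String[] args) {
        int upstream = 2;
        int downstream = 4;
        for (int upstreamIndex = 0; upstreamIndex < upstream; upstreamIndex++) {
            RescaleSketch partitioner = new RescaleSketch(downstream / upstream);
            for (int i = 0; i < 4; i++) {
                int local = partitioner.selectChannel();
                System.out.println("upstream " + upstreamIndex + " record " + i + " -> downstream "
                        + downstreamSubtask(upstreamIndex, local, upstream, downstream));
            }
        }
    }
}
```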
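Diagram
Note
Flink's execution graph can be viewed as four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph.
StreamGraph: the initial graph generated from the code the user writes with the Stream API. It represents the topology of the program.
JobGraph: produced by optimizing the StreamGraph; it is the data structure submitted to the JobManager. The main optimization is chaining eligible nodes together into a single node, which reduces the serialization, deserialization, and transfer cost of moving data between nodes.
ExecutionGraph: generated by the JobManager from the JobGraph. It is the parallelized version of the JobGraph and the core data structure of the scheduling layer.
Physical execution graph: the "graph" formed once the JobManager has scheduled the job according to the ExecutionGraph and deployed tasks on the TaskManagers. It is not a concrete data structure.
StreamingJobGraphGenerator is the class that converts a StreamGraph into a JobGraph. In this class, ForwardPartitioner and RescalePartitioner are assigned the POINTWISE distribution pattern, and all other partitioners are assigned ALL_TO_ALL. The code is as follows: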
- if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
- jobEdge = downStreamVertex.connectNewDataSetAsInput(
- headVertex,
- // Each instance (subtask) of the upstream operator (producer) connects to one or more instances (subtasks) of the downstream operator (consumer)
- DistributionPattern.POINTWISE,
- resultPartitionType);
- } else {
- jobEdge = downStreamVertex.connectNewDataSetAsInput(
- headVertex,
- // Each instance (subtask) of the upstream operator (producer) connects to all instances (subtasks) of the downstream operator (consumer)
- DistributionPattern.ALL_TO_ALL,
- resultPartitionType);
- }
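The difference between the two patterns is easiest to see from the consumer side. The sketch below lists which upstream subtasks feed a given downstream subtask. `DistributionPatternSketch` is hypothetical and deliberately covers only evenly divisible parallelism; it is a simplified view, not Flink's connection code.

```java
import java.util.ArrayList;
import java.util.List;

class DistributionPatternSketch {
    // Returns the upstream subtasks that feed one downstream subtask.
    // Only evenly divisible parallelism is handled; other cases are out of scope here.
    static List<Integer> upstreamSources(int downstreamIndex, int upstream, int downstream,
                                         boolean pointwise) {
        List<Integer> sources = new ArrayList<>();
        if (!pointwise) {
            // ALL_TO_ALL: every upstream subtask connects to every downstream subtask
            for (int i = 0; i < upstream; i++) {
                sources.add(i);
            }
        } else if (upstream >= downstream && upstream % downstream == 0) {
            // POINTWISE fan-in: a block of consecutive upstream subtasks feeds one downstream subtask
            int factor = upstream / downstream;
            for (int i = 0; i < factor; i++) {
                sources.add(downstreamIndex * factor + i);
            }
        } else if (downstream % upstream == 0) {
            // POINTWISE fan-out: several consecutive downstream subtasks share one upstream subtask
            sources.add(downstreamIndex / (downstream / upstream));
        } else {
            throw new IllegalArgumentException("sketch only covers evenly divisible parallelism");
        }
        return sources;
    }

    public static void main(String[] args) {
        System.out.println("POINTWISE 4 -> 2, downstream 0: " + upstreamSources(0, 4, 2, true));
        System.out.println("POINTWISE 2 -> 4, downstream 3: " + upstreamSources(3, 2, 4, true));
        System.out.println("ALL_TO_ALL 2 -> 4, downstream 1: " + upstreamSources(1, 2, 4, false));
    }
}
```

Under POINTWISE each subtask talks to a small, fixed group of peers, while ALL_TO_ALL creates a connection between every pair, which is why partitioners that can route to any subtask need the latter.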
ForwardPartitioner
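Overview
Sends each record to the corresponding downstream task. The upstream and downstream operators must have the same parallelism, so upstream and downstream subtasks are connected 1:1. selectChannel returns 0 because, under the POINTWISE pattern, that is the only channel each upstream subtask has.
Source code walkthrough
- /**
- * Sends records to the corresponding downstream task, which is the sender's first and only channel.
- * @param <T>
- */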
- @Internal
- public class ForwardPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
- @Override
- public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
- return 0;
- }
- public StreamPartitioner<T> copy() {
- return this;
- }
- @Override
- public String toString() {
- return "FORWARD";
- }
- }
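Diagram
Note
When no partitioner is specified between an upstream and a downstream operator, ForwardPartitioner is used if the two have the same parallelism; otherwise RebalancePartitioner is used. ForwardPartitioner requires equal upstream and downstream parallelism, and an exception is thrown if they differ.
- // When no partitioner is specified: use ForwardPartitioner if the parallelism matches, otherwise RebalancePartitioner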
- if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
- partitioner = new ForwardPartitioner<Object>();
- } else if (partitioner == null) {
- partitioner = new RebalancePartitioner<Object>();
- }
- if (partitioner instanceof ForwardPartitioner) {
- // Throws if the upstream and downstream parallelism differ
- if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
- throw new UnsupportedOperationException("Forward partitioning does not allow " +
- "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
- ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
- " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
- }
- }
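The decision above can be condensed into a small standalone function for experimenting with different parallelism values. `DefaultPartitionerSketch.resolve` is hypothetical, and partitioners are represented as plain strings purely for illustration.

```java
class DefaultPartitionerSketch {
    // `userChoice` is null when no partitioner was requested explicitly.
    static String resolve(String userChoice, int upstreamParallelism, int downstreamParallelism) {
        String partitioner = userChoice;
        if (partitioner == null) {
            // Default: FORWARD when the parallelism matches, REBALANCE otherwise
            partitioner = (upstreamParallelism == downstreamParallelism) ? "FORWARD" : "REBALANCE";
        }
        if ("FORWARD".equals(partitioner) && upstreamParallelism != downstreamParallelism) {
            // An explicitly requested forward edge cannot change parallelism
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism: "
                            + upstreamParallelism + " -> " + downstreamParallelism);
        }
        return partitioner;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, 2, 2));
        System.out.println(resolve(null, 2, 4));
        try {
            resolve("FORWARD", 2, 4);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that the exception can only be reached when forward partitioning was requested explicitly; the default path never picks it for mismatched parallelism.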
KeyGroupStreamPartitioner
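Overview
Sends each record to the downstream subtask that corresponds to the key group index of its key.
Source code walkthrough
- /**
- * Selects the target downstream subtask based on the key group index of the key.
- * @param <T>
- * @param <K>
- */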
- @Internal
- public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
- ...
- @Override
- public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
- K key;
- try {
- key = keySelector.getKey(record.getInstance().getValue());
- } catch (Exception e) {
- throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
- }
- // Calls assignKeyToParallelOperator of KeyGroupRangeAssignment, shown below
- return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
- }
- ...
- }
- org.apache.flink.runtime.state.KeyGroupRangeAssignment
- public final class KeyGroupRangeAssignment {
- ...
- /**
- * Assigns the key to the index of a parallel operator instance. That index is the routing information for the key,
- * i.e. which downstream task the key is sent to.
- */
- public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
- Preconditions.checkNotNull(key, "Assigned key must not be null!");
- return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
- }
- /**
- * Assigns the key to a key group id (keyGroupId).
- */
- public static int assignToKeyGroup(Object key, int maxParallelism) {
- Preconditions.checkNotNull(key, "Assigned key must not be null!");
- // Take the key's hashCode
- return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
- }
- /**
- * Computes the key group id (keyGroupId) from the key's hash code.
- */
- public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
- // Apply murmurHash, then take the remainder modulo maxParallelism to get the keyGroupId
- return MathUtils.murmurHash(keyHash) % maxParallelism;
- }
- // Computes the partition index, i.e. which downstream operator instance this key group is sent to
- public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
- return keyGroupId * parallelism / maxParallelism;
- }
- ...
- }
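The two-step mapping (key to key group, key group to operator index) is easier to follow with numbers. With maxParallelism = 128 and parallelism = 4, the formula keyGroupId * parallelism / maxParallelism sends key groups 0 to 31 to subtask 0, 32 to 63 to subtask 1, 64 to 95 to subtask 2, and 96 to 127 to subtask 3. The sketch below reproduces that arithmetic; `KeyGroupSketch` is hypothetical, and `nonNegativeHash` is an assumed stand-in for MathUtils.murmurHash, so the key groups it prints will not match Flink's.

```java
class KeyGroupSketch {
    // Stand-in for MathUtils.murmurHash: any function returning a non-negative int
    // is enough to illustrate the arithmetic. This is NOT Flink's hash.
    static int nonNegativeHash(int code) {
        return (code * 0x9E3779B9) >>> 1;
    }

    // Step 1: key -> key group id in [0, maxParallelism)
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return nonNegativeHash(key.hashCode()) % maxParallelism;
    }

    // Step 2: key group id -> operator index in [0, parallelism), same formula as above
    static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : new String[] {"flink", "spark", "storm"}) {
            int keyGroup = assignToKeyGroup(key, maxParallelism);
            int subtask = computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println(key + " -> key group " + keyGroup + " -> subtask " + subtask);
        }
    }
}
```

Because the key group depends only on the key and maxParallelism, changing the parallelism moves whole contiguous ranges of key groups between subtasks rather than reshuffling individual keys.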
Diagram
CustomPartitionerWrapper
Overview
Routes records downstream using the user-defined partition method of a Partitioner instance.
- public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
- Partitioner<K> partitioner;
- KeySelector<T, K> keySelector;
- public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
- this.partitioner = partitioner;
- this.keySelector = keySelector;
- }
- @Override
- public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
- K key;
- try {
- key = keySelector.getKey(record.getInstance().getValue());
- } catch (Exception e) {
- throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
- }
- // Delegates to the user's Partitioner implementation, whose overridden partition method picks the channel
- return partitioner.partition(key, numberOfChannels);
- }
- @Override
- public StreamPartitioner<T> copy() {
- return this;
- }
- @Override
- public String toString() {
- return "CUSTOM";
- }
- }
For example:
- public class CustomPartitioner implements Partitioner<String> {
- // key: the value the partition is chosen from
- // numPartitions: parallelism of the downstream operator
- @Override
- public int partition(String key, int numPartitions) {
- return key.length() % numPartitions; // define the partitioning strategy here
- }
- }
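To see what the length-based rule does to concrete keys, the same expression can be run without Flink. `LengthPartitionerSketch` is a hypothetical standalone class that copies only the partition logic from the example above.

```java
class LengthPartitionerSketch {
    // Same rule as the CustomPartitioner example, as a plain static method so it runs without Flink.
    static int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        for (String key : new String[] {"a", "ab", "abc", "abcd", "flink"}) {
            System.out.println(key + " -> partition " + partition(key, numPartitions));
        }
    }
}
```

All keys of the same length end up on the same subtask, so this rule is only a demonstration; a real strategy would usually be chosen to spread the load. In a Flink job, a partitioner like this is typically attached with `DataStream.partitionCustom(partitioner, keySelector)`.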
Summary
This article analyzed Flink's eight partitioning strategies one by one at the source-code level and paired each strategy with a diagram to make the source easier to follow.