真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flink中分區(qū)策略源碼是什么

這篇文章將為大家詳細講解有關Flink中分區(qū)策略源碼是什么,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

創(chuàng)新互聯(lián)是一家專業(yè)提供江津企業(yè)網(wǎng)站建設,專注與網(wǎng)站設計制作、網(wǎng)站制作、H5場景定制、小程序制作等業(yè)務。10年已為江津眾多企業(yè)、政府機構(gòu)等服務。創(chuàng)新互聯(lián)專業(yè)的建站公司優(yōu)惠進行中。

繼承關系圖

接口

名稱

ChannelSelector

實現(xiàn)

public interface ChannelSelector {      /**      * 初始化channels數(shù)量,channel可以理解為下游Operator的某個實例(并行算子的某個subtask).      */     void setup(int numberOfChannels);      /**      *根據(jù)當前的record以及Channel總數(shù),      *決定應將record發(fā)送到下游哪個Channel。      *不同的分區(qū)策略會實現(xiàn)不同的該方法。      */     int selectChannel(T record);      /**     *是否以廣播的形式發(fā)送到下游所有的算子實例      */     boolean isBroadcast(); }

抽象類

名稱

StreamPartitioner

實現(xiàn)

public abstract class StreamPartitioner implements         ChannelSelector>>, Serializable {     private static final long serialVersionUID = 1L;      protected int numberOfChannels;      @Override     public void setup(int numberOfChannels) {         this.numberOfChannels = numberOfChannels;     }      @Override     public boolean isBroadcast() {         return false;     }      public abstract StreamPartitioner copy(); }

繼承關系圖

Flink中分區(qū)策略源碼是什么

GlobalPartitioner

簡介

該分區(qū)器會將所有的數(shù)據(jù)都發(fā)送到下游的某個算子實例(subtask id = 0)

源碼解讀

/**  * 發(fā)送所有的數(shù)據(jù)到下游算子的第一個task(ID = 0)  * @param   */ @Internal public class GlobalPartitioner extends StreamPartitioner {     private static final long serialVersionUID = 1L;      @Override     public int selectChannel(SerializationDelegate> record) {         //只返回0,即只發(fā)送給下游算子的第一個task         return 0;     }      @Override     public StreamPartitioner copy() {         return this;     }      @Override     public String toString() {         return "GLOBAL";     } }

圖解

Flink中分區(qū)策略源碼是什么

ShufflePartitioner

簡介

隨機選擇一個下游算子實例進行發(fā)送

源碼解讀

/**  * 隨機的選擇一個channel進行發(fā)送  * @param   */ @Internal public class ShufflePartitioner extends StreamPartitioner {     private static final long serialVersionUID = 1L;      private Random random = new Random();      @Override     public int selectChannel(SerializationDelegate> record) {         //產(chǎn)生[0,numberOfChannels)偽隨機數(shù),隨機發(fā)送到下游的某個task         return random.nextInt(numberOfChannels);     }      @Override     public StreamPartitioner copy() {         return new ShufflePartitioner();     }      @Override     public String toString() {         return "SHUFFLE";     } }

圖解

Flink中分區(qū)策略源碼是什么

BroadcastPartitioner

簡介

發(fā)送到下游所有的算子實例

源碼解讀

/**  * 發(fā)送到所有的channel  */ @Internal public class BroadcastPartitioner extends StreamPartitioner {     private static final long serialVersionUID = 1L;     /**      * Broadcast模式是直接發(fā)送到下游的所有task,所以不需要通過下面的方法選擇發(fā)送的通道      */     @Override     public int selectChannel(SerializationDelegate> record) {         throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");     }      @Override     public boolean isBroadcast() {         return true;     }      @Override     public StreamPartitioner copy() {         return this;     }      @Override     public String toString() {         return "BROADCAST";     } }

圖解

Flink中分區(qū)策略源碼是什么

RebalancePartitioner

簡介

通過循環(huán)的方式依次發(fā)送到下游的task

源碼解讀

/**  *通過循環(huán)的方式依次發(fā)送到下游的task  * @param   */ @Internal public class RebalancePartitioner extends StreamPartitioner {     private static final long serialVersionUID = 1L;      private int nextChannelToSendTo;      @Override     public void setup(int numberOfChannels) {         super.setup(numberOfChannels);         //初始化channel的id,返回[0,numberOfChannels)的偽隨機數(shù)         nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);     }      @Override     public int selectChannel(SerializationDelegate> record) {         //循環(huán)依次發(fā)送到下游的task,比如:nextChannelToSendTo初始值為0,numberOfChannels(下游算子的實例個數(shù),并行度)值為2         //則第一次發(fā)送到ID = 1的task,第二次發(fā)送到ID = 0的task,第三次發(fā)送到ID = 1的task上...依次類推         nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;         return nextChannelToSendTo;     }      public StreamPartitioner copy() {         return this;     }      @Override     public String toString() {         return "REBALANCE";     } }

圖解

Flink中分區(qū)策略源碼是什么

RescalePartitioner

簡介

基于上下游Operator的并行度,將記錄以循環(huán)的方式輸出到下游Operator的每個實例。

舉例:  上游并行度是2,下游是4,則上游一個并行度以循環(huán)的方式將記錄輸出到下游的兩個并行度上;上游另一個并行度以循環(huán)的方式將記錄輸出到下游另兩個并行度上。

若上游并行度是4,下游并行度是2,則上游兩個并行度將記錄輸出到下游一個并行度上;上游另兩個并行度將記錄輸出到下游另一個并行度上。

源碼解讀

@Internal public class RescalePartitioner extends StreamPartitioner {     private static final long serialVersionUID = 1L;      private int nextChannelToSendTo = -1;      @Override     public int selectChannel(SerializationDelegate> record) {         if (++nextChannelToSendTo >= numberOfChannels) {             nextChannelToSendTo = 0;         }         return nextChannelToSendTo;     }      public StreamPartitioner copy() {         return this;     }      @Override     public String toString() {         return "RESCALE";     } }

圖解

Flink中分區(qū)策略源碼是什么

尖叫提示

Flink 中的執(zhí)行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph ->  物理執(zhí)行圖。

StreamGraph:是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結(jié)構(gòu)。

JobGraph:StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個符合條件的節(jié)點  chain 在一起作為一個節(jié)點,這樣可以減少數(shù)據(jù)在節(jié)點之間流動所需要的序列化/反序列化/傳輸消耗。

ExecutionGraph:JobManager 根據(jù) JobGraph  生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。

物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對 Job 進行調(diào)度后,在各個TaskManager 上部署 Task  后形成的“圖”,并不是一個具體的數(shù)據(jù)結(jié)構(gòu)。

而StreamingJobGraphGenerator就是StreamGraph轉(zhuǎn)換為JobGraph。在這個類中,把ForwardPartitioner和RescalePartitioner列為POINTWISE分配模式,其他的為ALL_TO_ALL分配模式。代碼如下:

if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {             jobEdge = downStreamVertex.connectNewDataSetAsInput(                 headVertex,                 // 上游算子(生產(chǎn)端)的實例(subtask)連接下游算子(消費端)的一個或者多個實例(subtask)                 DistributionPattern.POINTWISE,                 resultPartitionType);         } else {             jobEdge = downStreamVertex.connectNewDataSetAsInput(                 headVertex,                 // 上游算子(生產(chǎn)端)的實例(subtask)連接下游算子(消費端)的所有實例(subtask)                 DistributionPattern.ALL_TO_ALL,                 resultPartitionType);         }

ForwardPartitioner

簡介

發(fā)送到下游對應的第一個task,保證上下游算子并行度一致,即上有算子與下游算子是1:1的關系

源碼解讀

/**  * 發(fā)送到下游對應的第一個task  * @param   */ @Internal public class ForwardPartitioner extends StreamPartitioner {     private static final long serialVersionUID = 1L;      @Override     public int selectChannel(SerializationDelegate> record) {         return 0;     }      public StreamPartitioner copy() {         return this;     }      @Override     public String toString() {         return "FORWARD";     } }

圖解

Flink中分區(qū)策略源碼是什么

尖叫提示

在上下游的算子沒有指定分區(qū)器的情況下,如果上下游的算子并行度一致,則使用ForwardPartitioner,否則使用RebalancePartitioner,對于ForwardPartitioner,必須保證上下游算子并行度一致,否則會拋出異常

//在上下游的算子沒有指定分區(qū)器的情況下,如果上下游的算子并行度一致,則使用ForwardPartitioner,否則使用RebalancePartitioner             if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {                 partitioner = new ForwardPartitioner();             } else if (partitioner == null) {                 partitioner = new RebalancePartitioner();             }              if (partitioner instanceof ForwardPartitioner) {                 //如果上下游的并行度不一致,會拋出異常                 if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {                     throw new UnsupportedOperationException("Forward partitioning does not allow " +                         "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +                         ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +                         " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");                 }             }

KeyGroupStreamPartitioner

簡介

根據(jù)key的分組索引選擇發(fā)送到相對應的下游subtask

源碼解讀

/**  * 根據(jù)key的分組索引選擇發(fā)送到相對應的下游subtask  * @param   * @param   */ @Internal public class KeyGroupStreamPartitioner extends StreamPartitioner implements ConfigurableStreamPartitioner { ...      @Override     public int selectChannel(SerializationDelegate> record) {         K key;         try {             key = keySelector.getKey(record.getInstance().getValue());         } catch (Exception e) {             throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);         }         //調(diào)用KeyGroupRangeAssignment類的assignKeyToParallelOperator方法,代碼如下所示         return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);     } ... }
  • org.apache.flink.runtime.state.KeyGroupRangeAssignment

public final class KeyGroupRangeAssignment { ...      /**      * 根據(jù)key分配一個并行算子實例的索引,該索引即為該key要發(fā)送的下游算子實例的路由信息,      * 即該key發(fā)送到哪一個task      */     public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {         Preconditions.checkNotNull(key, "Assigned key must not be null!");         return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));     }      /**      *根據(jù)key分配一個分組id(keyGroupId)      */     public static int assignToKeyGroup(Object key, int maxParallelism) {         Preconditions.checkNotNull(key, "Assigned key must not be null!");         //獲取key的hashcode         return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);     }      /**      * 根據(jù)key分配一個分組id(keyGroupId),      */     public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {          //與maxParallelism取余,獲取keyGroupId         return MathUtils.murmurHash(keyHash) % maxParallelism;     }      //計算分區(qū)index,即該key group應該發(fā)送到下游的哪一個算子實例     public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {         return keyGroupId * parallelism / maxParallelism;     } ...

圖解

Flink中分區(qū)策略源碼是什么

CustomPartitionerWrapper

簡介

通過Partitioner實例的partition方法(自定義的)將記錄輸出到下游。

public class CustomPartitionerWrapper extends StreamPartitioner {     private static final long serialVersionUID = 1L;      Partitioner partitioner;     KeySelector keySelector;      public CustomPartitionerWrapper(Partitioner partitioner, KeySelector keySelector) {         this.partitioner = partitioner;         this.keySelector = keySelector;     }      @Override     public int selectChannel(SerializationDelegate> record) {         K key;         try {             key = keySelector.getKey(record.getInstance().getValue());         } catch (Exception e) {             throw new RuntimeException("Could not extract key from " + record.getInstance(), e);         } //實現(xiàn)Partitioner接口,重寫partition方法         return partitioner.partition(key, numberOfChannels);     }      @Override     public StreamPartitioner copy() {         return this;     }      @Override     public String toString() {         return "CUSTOM";     } }

比如:

public class CustomPartitioner implements Partitioner {       // key: 根據(jù)key的值來分區(qū)       // numPartitions: 下游算子并行度       @Override       public int partition(String key, int numPartitions) {          return key.length() % numPartitions;//在此處定義分區(qū)策略       }   }

關于“Flink中分區(qū)策略源碼是什么”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。


網(wǎng)站欄目:Flink中分區(qū)策略源碼是什么
文章來源:http://weahome.cn/article/jsoddd.html

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部