Spark源碼分析之分區(qū)器的作用
最近因為手抖,在Spark中給自己挖了一個數(shù)據(jù)傾斜的坑。為了解決這個問題,順便研究了下Spark分區(qū)器的原理,趁著周末加班總結(jié)一下~
先說說數(shù)據(jù)傾斜
數(shù)據(jù)傾斜是指Spark中的RDD在計算的時候,每個RDD內(nèi)部的分區(qū)包含的數(shù)據(jù)不平均。比如一共有5個分區(qū),其中一個占有了90%的數(shù)據(jù),這就導(dǎo)致本來5個分區(qū)可以5個人一起并行干活,結(jié)果四個人不怎么干活,工作全都壓到一個人身上了。遇到這種問題,網(wǎng)上有很多的解決辦法。
但是如果是底層數(shù)據(jù)的問題,無論怎么優(yōu)化,還是無法解決數(shù)據(jù)傾斜的。
比如你想要對某個rdd做groupby,然后做join操作,如果分組的key就是分布不均勻的,那么真樣都是無法優(yōu)化的。因為一旦這個key被切分,就無法完整的做join了,如果不對這個key切分,必然會造成對應(yīng)的分區(qū)數(shù)據(jù)傾斜。
不過,了解數(shù)據(jù)為什么會傾斜還是很重要的,繼續(xù)往下看吧!
分區(qū)的作用
在PairRDD即(key,value)這種格式的rdd中,很多操作都是基于key的,因此為了獨立分割任務(wù),會按照key對數(shù)據(jù)進行重組。比如groupbykey
重組肯定是需要一個規(guī)則的,最常見的就是基于Hash,Spark還提供了一種稍微復(fù)雜點的基于抽樣的Range分區(qū)方法。
下面我們先看看分區(qū)器在Spark計算流程中是怎么使用的:
Paritioner的使用
就拿groupbykey來說:
- def groupByKey(): JavaPairRDD[K, JIterable[V]] =
- fromRDD(groupByResultToJava(rdd.groupByKey()))
它會調(diào)用PairRDDFunction的groupByKey()方法
- def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
- groupByKey(defaultPartitioner(self))
- }
在這個方法里面創(chuàng)建了默認的分區(qū)器。默認的分區(qū)器是這樣定義的:
- def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
- val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
- for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
- return r.partitioner.get
- }
- if (rdd.context.conf.contains("spark.default.parallelism")) {
- new HashPartitioner(rdd.context.defaultParallelism)
- } else {
- new HashPartitioner(bySize.head.partitions.size)
- }
- }
首先獲取當前分區(qū)的分區(qū)個數(shù),如果沒有設(shè)置spark.default.parallelism參數(shù),則創(chuàng)建一個跟之前分區(qū)個數(shù)一樣的Hash分區(qū)器。
當然,用戶也可以自定義分區(qū)器,或者使用其他提供的分區(qū)器。API里面也是支持的:
- // 傳入分區(qū)器對象
- def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
- fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
- // 傳入分區(qū)的個數(shù)
- def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
- fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
HashPatitioner
Hash分區(qū)器,是最簡單也是默認提供的分區(qū)器,了解它的分區(qū)規(guī)則,對我們處理數(shù)據(jù)傾斜或者設(shè)計分組的key時,還是很有幫助的。
- class HashPartitioner(partitions: Int) extends Partitioner {
- require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
- def numPartitions: Int = partitions
- // 通過key計算其HashCode,并根據(jù)分區(qū)數(shù)取模。如果結(jié)果小于0,直接加上分區(qū)數(shù)。
- def getPartition(key: Any): Int = key match {
- case null => 0
- case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
- }
- // 對比兩個分區(qū)器是否相同,直接對比其分區(qū)個數(shù)就行
- override def equals(other: Any): Boolean = other match {
- case h: HashPartitioner =>
- h.numPartitions == numPartitions
- case _ =>
- false
- }
- override def hashCode: Int = numPartitions
- }
這里最重要的是這個Utils.nonNegativeMod(key.hashCode, numPartitions),它決定了數(shù)據(jù)進入到哪個分區(qū)。
- def nonNegativeMod(x: Int, mod: Int): Int = {
- val rawMod = x % mod
- rawMod + (if (rawMod < 0) mod else 0)
- }
說白了,就是基于這個key獲取它的hashCode,然后對分區(qū)個數(shù)取模。由于HashCode可能為負,這里直接判斷下,如果小于0,再加上分區(qū)個數(shù)即可。
因此,基于hash的分區(qū),只要保證你的key是分散的,那么最終數(shù)據(jù)就不會出現(xiàn)數(shù)據(jù)傾斜的情況。
RangePartitioner
這個分區(qū)器,適合想要把數(shù)據(jù)打散的場景,但是如果相同的key重復(fù)量很大,依然會出現(xiàn)數(shù)據(jù)傾斜的情況。
每個分區(qū)器,最核心的方法,就是getPartition
- def getPartition(key: Any): Int = {
- val k = key.asInstanceOf[K]
- var partition = 0
- if (rangeBounds.length <= 128) {
- // If we have less than 128 partitions naive search
- while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
- partition += 1
- }
- } else {
- // Determine which binary search method to use only once.
- partition = binarySearch(rangeBounds, k)
- // binarySearch either returns the match location or -[insertion point]-1
- if (partition < 0) {
- partition = -partition-1
- }
- if (partition > rangeBounds.length) {
- partition = rangeBounds.length
- }
- }
- if (ascending) {
- partition
- } else {
- rangeBounds.length - partition
- }
- }
在range分區(qū)中,會存儲一個邊界的數(shù)組,比如[1,100,200,300,400],然后對比傳進來的key,返回對應(yīng)的分區(qū)id。
那么這個邊界是怎么確定的呢?
這就是Range分區(qū)最核心的算法了,大概描述下,就是遍歷每個paritiion,對里面的數(shù)據(jù)進行抽樣,把抽樣的數(shù)據(jù)進行排序,并按照對應(yīng)的權(quán)重確定邊界。
有幾個比較重要的地方:
1 抽樣
2 確定邊界
關(guān)于抽樣,有一個很常見的算法題,即在不知道數(shù)據(jù)規(guī)模的情況下,如何以等概率的方式,隨機選擇一個值。
最笨的辦法,就是遍歷一次數(shù)據(jù),知道數(shù)據(jù)的規(guī)模,然后隨機一個數(shù),取其對應(yīng)的值。其實這樣相當于遍歷了兩次(第二次的取值根據(jù)不同的存儲介質(zhì),可能不同)。
在Spark中,是使用水塘抽樣這種算法。即首先取***個值,然后依次往后遍歷;第二個值有二分之一的幾率替換選出來的值;第三個值有三分之一的幾率替換選出來的值;…;直到遍歷到***一個值。這樣,通過依次遍歷就取出來隨機的數(shù)值了。
算法參考源碼:
- private var rangeBounds: Array[K] = {
- if (partitions <= 1) {
- Array.empty
- } else {
- // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
- // ***采樣數(shù)量不能超過1M。比如,如果分區(qū)是5,采樣數(shù)為100
- val sampleSize = math.min(20.0 * partitions, 1e6)
- // Assume the input partitions are roughly balanced and over-sample a little bit.
- // 每個分區(qū)的采樣數(shù)為平均值的三倍,避免數(shù)據(jù)傾斜造成的數(shù)據(jù)量過少
- val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
- // 真正的采樣算法(參數(shù)1:rdd的key數(shù)組, 采樣個數(shù))
- val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
- if (numItems == 0L) {
- Array.empty
- } else {
- // If a partition contains much more than the average number of items, we re-sample from it
- // to ensure that enough items are collected from that partition.
- // 如果有的分區(qū)包含的數(shù)量遠超過平均值,那么需要對它重新采樣。每個分區(qū)的采樣數(shù)/采樣返回的總的記錄數(shù)
- val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
- //保存有效的采樣數(shù)
- val candidates = ArrayBuffer.empty[(K, Float)]
- //保存數(shù)據(jù)傾斜導(dǎo)致的采樣數(shù)過多的信息
- val imbalancedPartitions = mutable.Set.empty[Int]
- sketched.foreach { case (idx, n, sample) =>
- if (fraction * n > sampleSizePerPartition) {
- imbalancedPartitions += idx
- } else {
- // The weight is 1 over the sampling probability.
- val weight = (n.toDouble / sample.size).toFloat
- for (key <- sample) {
- candidates += ((key, weight))
- }
- }
- }
- if (imbalancedPartitions.nonEmpty) {
- // Re-sample imbalanced partitions with the desired sampling probability.
- val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
- val seed = byteswap32(-rdd.id - 1)
- //基于RDD獲取采樣數(shù)據(jù)
- val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
- val weight = (1.0 / fraction).toFloat
- candidates ++= reSampled.map(x => (x, weight))
- }
- RangePartitioner.determineBounds(candidates, partitions)
- }
- }
- }
- def sketch[K : ClassTag](
- rdd: RDD[K],
- sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
- val shift = rdd.id
- // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
- val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
- val seed = byteswap32(idx ^ (shift << 16))
- val (sample, n) = SamplingUtils.reservoirSampleAndCount(
- iter, sampleSizePerPartition, seed)
- //包裝成三元組,(索引號,分區(qū)的內(nèi)容個數(shù),抽樣的內(nèi)容)
- Iterator((idx, n, sample))
- }.collect()
- val numItems = sketched.map(_._2).sum
- //返回(數(shù)據(jù)條數(shù),(索引號,分區(qū)的內(nèi)容個數(shù),抽樣的內(nèi)容))
- (numItems, sketched)
- }
真正的抽樣算法在SamplingUtils中,由于在Spark中是需要一次性取多個值的,因此直接去前n個數(shù)值,然后依次概率替換即可:
- def reservoirSampleAndCount[T: ClassTag](
- input: Iterator[T],
- k: Int,
- seed: Long = Random.nextLong())
- : (Array[T], Long) = {
- //創(chuàng)建臨時數(shù)組
- val reservoir = new Array[T](k)
- // Put the first k elements in the reservoir.
- // 取出前k個數(shù),并把對應(yīng)的rdd中的數(shù)據(jù)放入對應(yīng)的序號的數(shù)組中
- var i = 0
- while (i < k && input.hasNext) {
- val item = input.next()
- reservoir(i) = item
- i += 1
- }
- // If we have consumed all the elements, return them. Otherwise do the replacement.
- // 如果全部的元素,比要抽取的采樣數(shù)少,那么直接返回
- if (i < k) {
- // If input size < k, trim the array to return only an array of input size.
- val trimReservoir = new Array[T](i)
- System.arraycopy(reservoir, 0, trimReservoir, 0, i)
- (trimReservoir, i)
- // 否則開始抽樣替換
- } else {
- // If input size > k, continue the sampling process.
- // 從剛才的序號開始,繼續(xù)遍歷
- var l = i.toLong
- // 隨機數(shù)
- val rand = new XORShiftRandom(seed)
- while (input.hasNext) {
- val item = input.next()
- // 隨機一個數(shù)與當前的l相乘,如果小于采樣數(shù)k,就替換。(越到后面,替換的概率越小...)
- val replacementIndex = (rand.nextDouble() * l).toLong
- if (replacementIndex < k) {
- reservoir(replacementIndex.toInt) = item
- }
- l += 1
- }
- (reservoir, l)
- }
- }
確定邊界
***就可以通過獲取的樣本數(shù)據(jù),確定邊界了。
- def determineBounds[K : Ordering : ClassTag](
- candidates: ArrayBuffer[(K, Float)],
- partitions: Int): Array[K] = {
- val ordering = implicitly[Ordering[K]]
- // 數(shù)據(jù)格式為(key,權(quán)重)
- val ordered = candidates.sortBy(_._1)
- val numCandidates = ordered.size
- val sumWeights = ordered.map(_._2.toDouble).sum
- val step = sumWeights / partitions
- var cumWeight = 0.0
- var target = step
- val bounds = ArrayBuffer.empty[K]
- var i = 0
- var j = 0
- var previousBound = Option.empty[K]
- while ((i < numCandidates) && (j < partitions - 1)) {
- val (key, weight) = ordered(i)
- cumWeight += weight
- if (cumWeight >= target) {
- // Skip duplicate values.
- if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
- bounds += key
- target += step
- j += 1
- previousBound = Some(key)
- }
- }
- i += 1
- }
- bounds.toArray
- }
直接看代碼,還是有些晦澀難懂,我們舉個例子,一步一步解釋下:
按照上面的算法流程,大致可以理解:
- 抽樣-->確定邊界(排序)
首先對spark有一定了解的都應(yīng)該知道,在spark中每個RDD可以理解為一組分區(qū),這些分區(qū)對應(yīng)了內(nèi)存塊block,他們才是數(shù)據(jù)最終的載體。那么一個RDD由不同的分區(qū)組成,這樣在處理一些map,filter等算子的時候,就可以直接以分區(qū)為單位并行計算了。直到遇到shuffle的時候才需要和其他的RDD配合。
在上面的圖中,如果我們不特殊設(shè)置的話,一個RDD由3個分區(qū)組成,那么在對它進行g(shù)roupbykey的時候,就會按照3進行分區(qū)。
按照上面的算法流程,如果分區(qū)數(shù)為3,那么采樣的大小為:
- val sampleSize = math.min(20.0 * partitions, 1e6)
即采樣數(shù)為60,每個分區(qū)取60個數(shù)。但是考慮到數(shù)據(jù)傾斜的情況,有的分區(qū)可能數(shù)據(jù)很多,因此在實際的采樣時,會按照3倍大小采樣:
- val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
也就是說,最多會取60個樣本數(shù)據(jù)。
然后就是遍歷每個分區(qū),取對應(yīng)的樣本數(shù)。
- val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
- val seed = byteswap32(idx ^ (shift << 16))
- val (sample, n) = SamplingUtils.reservoirSampleAndCount(
- iter, sampleSizePerPartition, seed)
- //包裝成三元組,(索引號,分區(qū)的內(nèi)容個數(shù),抽樣的內(nèi)容)
- Iterator((idx, n, sample))
- }.collect()
然后檢查,是否有分區(qū)的樣本數(shù)過多,如果多于平均值,則繼續(xù)采樣,這時直接用sample 就可以了
- sketched.foreach { case (idx, n, sample) =>
- if (fraction * n > sampleSizePerPartition) {
- imbalancedPartitions += idx
- } else {
- // The weight is 1 over the sampling probability.
- val weight = (n.toDouble / sample.size).toFloat
- for (key <- sample) {
- candidates += ((key, weight))
- }
- }
- }
- if (imbalancedPartitions.nonEmpty) {
- // Re-sample imbalanced partitions with the desired sampling probability.
- val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
- val seed = byteswap32(-rdd.id - 1)
- //基于RDD獲取采樣數(shù)據(jù)
- val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
- val weight = (1.0 / fraction).toFloat
- candidates ++= reSampled.map(x => (x, weight))
- }
取出樣本后,就到了確定邊界的時候了。
注意每個key都會有一個權(quán)重,這個權(quán)重是 【分區(qū)的數(shù)據(jù)總數(shù)/樣本數(shù)】
- RangePartitioner.determineBounds(candidates, partitions)
首先排序val ordered = candidates.sortBy(_._1),然后確定一個權(quán)重的步長
- val sumWeights = ordered.map(_._2.toDouble).sum
- val step = sumWeights / partitions
基于該步長,確定邊界,***就形成了幾個范圍數(shù)據(jù)。
然后分區(qū)器形成二叉樹,遍歷該數(shù)確定每個key對應(yīng)的分區(qū)id
- partition = binarySearch(rangeBounds, k)
實踐 —— 自定義分區(qū)器
自定義分區(qū)器,也是很簡單的,只需要實現(xiàn)對應(yīng)的兩個方法就行:
- public class MyPartioner extends Partitioner {
- @Override
- public int numPartitions() {
- return 1000;
- }
- @Override
- public int getPartition(Object key) {
- String k = (String) key;
- int code = k.hashCode() % 1000;
- System.out.println(k+":"+code);
- return code < 0?code+1000:code;
- }
- @Override
- public boolean equals(Object obj) {
- if(obj instanceof MyPartioner){
- if(this.numPartitions()==((MyPartioner) obj).numPartitions()){
- return true;
- }
- return false;
- }
- return super.equals(obj);
- }
- }
使用的時候,可以直接new一個對象即可。
- pairRdd.groupbykey(new MyPartitioner())
這樣自定義分區(qū)器就完成了。