這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)spark shuffle如何理解,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
創(chuàng)新互聯(lián)主要從事成都網(wǎng)站設(shè)計(jì)、成都做網(wǎng)站、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)白城,10余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):028-86922220
一個(gè)spark的RDD有一組固定的分區(qū)組成,每個(gè)分區(qū)有一系列的記錄組成。對(duì)于由窄依賴變換(例如map和filter)返回的RDD,會(huì)延續(xù)父RDD的分區(qū)信息,以pipeline的形式計(jì)算。每個(gè)對(duì)象僅依賴于父RDD中的單個(gè)對(duì)象。諸如coalesce之類的操作可能導(dǎo)致任務(wù)處理多個(gè)輸入分區(qū),但轉(zhuǎn)換仍然被認(rèn)為是窄依賴的,因?yàn)橐粋€(gè)父RDD的分區(qū)只會(huì)被一個(gè)子RDD分區(qū)繼承。
Spark還支持寬依賴的轉(zhuǎn)換,例如groupByKey和reduceByKey。在這些依賴項(xiàng)中,計(jì)算單個(gè)分區(qū)中的記錄所需的數(shù)據(jù)可以來自于父數(shù)據(jù)集的許多分區(qū)中。要執(zhí)行這些轉(zhuǎn)換,具有相同key的所有元組必須最終位于同一分區(qū)中,由同一任務(wù)處理。為了滿足這一要求,Spark產(chǎn)生一個(gè)shuffle,它在集群內(nèi)部傳輸數(shù)據(jù),并產(chǎn)生一個(gè)帶有一組新分區(qū)的新stage。
可以看下面的代碼片段:
sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()
上面的代碼片段只有一個(gè)action操作,count,從輸入textfile到action經(jīng)過了三個(gè)轉(zhuǎn)換操作。這段代碼只會(huì)在一個(gè)stage中運(yùn)行,因?yàn)?,三個(gè)轉(zhuǎn)換操作沒有shuffle,也即是三個(gè)轉(zhuǎn)換操作的每個(gè)分區(qū)都是只依賴于它的父RDD的單個(gè)分區(qū)。
但是,下面的單詞統(tǒng)計(jì)就跟上面有很大區(qū)別:
val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
charCounts.collect()
這段代碼里有兩個(gè)reducebykey操作,三個(gè)stage。
下面圖更復(fù)雜,因?yàn)橛幸粋€(gè)join操作:
粉框圈住的就是整個(gè)DAG的stage劃分。
在每個(gè)stage的邊界,父stage的task會(huì)將數(shù)據(jù)寫入磁盤,子stage的task會(huì)將數(shù)據(jù)通過網(wǎng)絡(luò)讀取。由于它們會(huì)導(dǎo)致很高的磁盤和網(wǎng)絡(luò)IO,所以shuffle代價(jià)相當(dāng)高,應(yīng)該盡量避免。父stage的數(shù)據(jù)分區(qū)往往和子stage的分區(qū)數(shù)不同。觸發(fā)shuffle的操作算子往往可以指定分區(qū)數(shù)的,也即是numPartitions代表下個(gè)stage會(huì)有多少個(gè)分區(qū)。就像mr任務(wù)中reducer的數(shù)據(jù)是非常重要的一個(gè)參數(shù)一樣,shuffle的時(shí)候指定分區(qū)數(shù)也將在很大程度上決定一個(gè)應(yīng)用程序的性能。
通常情況可以選擇使用產(chǎn)生相同結(jié)果的action和transform相互替換。但是并不是產(chǎn)生相同結(jié)果的算子就會(huì)有相同的性能。通常避免常見的陷阱并選擇正確的算子可以顯著提高應(yīng)用程序的性能。
當(dāng)選擇轉(zhuǎn)換操作的時(shí)候,應(yīng)最小化shuffle次數(shù)和shuffle的數(shù)據(jù)量。shuffle是非常消耗性能的操作。所有的shuffle數(shù)據(jù)都會(huì)被寫入磁盤,然后通過網(wǎng)絡(luò)傳輸。repartition , join, cogroup, 和 *By 或者 *ByKey 類型的操作都會(huì)產(chǎn)生shuffle。我們可以對(duì)一下幾個(gè)操作算子進(jìn)行優(yōu)化:
1. groupByKey某些情況下可以被reducebykey代替。
2. reduceByKey某些情況下可以被 aggregatebykey代替。
3. flatMap-join-groupBy某些情況下可以被cgroup代替。
在某些情況下,前面描述的轉(zhuǎn)換操作不會(huì)導(dǎo)致shuffle。當(dāng)先前的轉(zhuǎn)換操作已經(jīng)使用了和shuffle相同的分區(qū)器分區(qū)數(shù)據(jù)的時(shí)候,spark就不會(huì)產(chǎn)生shuffle。
舉個(gè)例子:
rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)
由于使用redcuebykey的時(shí)候沒有指定分區(qū)器,所以都是使用的默認(rèn)分區(qū)器,會(huì)導(dǎo)致rdd1和rdd2都采用的是hash分區(qū)器。兩個(gè)reducebykey操作會(huì)產(chǎn)生兩個(gè)shuffle過程。如果,數(shù)據(jù)集有相同的分區(qū)數(shù),執(zhí)行join操作的時(shí)候就不需要進(jìn)行額外的shuffle。由于數(shù)據(jù)集的分區(qū)相同,因此rdd1的任何單個(gè)分區(qū)中的key集合只能出現(xiàn)在rdd2的單個(gè)分區(qū)中。 因此,rdd3的任何單個(gè)輸出分區(qū)的內(nèi)容僅取決于rdd1中單個(gè)分區(qū)的內(nèi)容和rdd2中的單個(gè)分區(qū),并且不需要第三個(gè)shuffle。
例如,如果someRdd有四個(gè)分區(qū),someOtherRdd有兩個(gè)分區(qū),而reduceByKeys都使用三個(gè)分區(qū),運(yùn)行的任務(wù)集如下所示:
如果rdd1和rdd2使用不同的分區(qū)器或者相同的分區(qū)器不同的分區(qū)數(shù),僅僅一個(gè)數(shù)據(jù)集在join的過程中需要重新shuffle
在join的過程中為了避免shuffle,可以使用廣播變量。當(dāng)executor內(nèi)存可以存儲(chǔ)數(shù)據(jù)集,在driver端可以將其加載到一個(gè)hash表中,然后廣播到executor。然后,map轉(zhuǎn)換可以引用哈希表來執(zhí)行查找。
有時(shí)候需要打破最小化shuffle次數(shù)的規(guī)則。
當(dāng)增加并行度的時(shí)候,額外的shuffle是有利的。例如,數(shù)據(jù)中有一些文件是不可分割的,那么該大文件對(duì)應(yīng)的分區(qū)就會(huì)有大量的記錄,而不是說將數(shù)據(jù)分散到盡可能多的分區(qū)內(nèi)部來使用所有已經(jīng)申請(qǐng)cpu。在這種情況下,使用reparition重新產(chǎn)生更多的分區(qū)數(shù),以滿足后面轉(zhuǎn)換算子所需的并行度,這會(huì)提升很大性能。
使用reduce和aggregate操作將數(shù)據(jù)聚合到driver端,也是修改區(qū)數(shù)的很好的例子。
在對(duì)大量分區(qū)執(zhí)行聚合的時(shí)候,在driver的單線程中聚合會(huì)成為瓶頸。要減driver的負(fù)載,可以首先使用reducebykey或者aggregatebykey執(zhí)行一輪分布式聚合,同時(shí)將結(jié)果數(shù)據(jù)集分區(qū)數(shù)減少。實(shí)際思路是首先在每個(gè)分區(qū)內(nèi)部進(jìn)行初步聚合,同時(shí)減少分區(qū)數(shù),然后再將聚合的結(jié)果發(fā)到driver端實(shí)現(xiàn)最終聚合。典型的操作是treeReduce 和 treeAggregate。
當(dāng)聚合已經(jīng)按照key進(jìn)行分組時(shí),此方法特別適用。例如,假如一個(gè)程序計(jì)算語料庫中每個(gè)單詞出現(xiàn)的次數(shù),并將結(jié)果使用map返回到driver。一種方法是可以使用聚合操作完成在每個(gè)分區(qū)計(jì)算局部map,然后在driver中合并map??梢杂胊ggregateByKey以完全分布的方式進(jìn)行統(tǒng)計(jì),然后簡單的用collectAsMap將結(jié)果返回到driver。
上述就是小編為大家分享的spark shuffle如何理解了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。