這篇文章主要介紹“coalesce與repartition怎么使用”,在日常操作中,相信很多人在coalesce與repartition怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”coalesce與repartition怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
為定邊等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計制作服務(wù),及定邊網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為成都網(wǎng)站建設(shè)、成都網(wǎng)站制作、定邊網(wǎng)站設(shè)計,以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!
def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer:Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
coalesce算子最基本的功能就是返回一個numPartitions個partition的RDD。
這個算子的結(jié)果默認(rèn)是窄依賴,舉個例子
coalesce(100)
如果你想把1000個partition減少到100個partition,此時不會發(fā)生shuffle,而是每一個你設(shè)定的新partition都會替代原來的10個partition。如果初始的最大partition是100個,而你想用coalesce(1000)把partition數(shù)增至1000,這是不行的。
現(xiàn)在有一個需求,需要將某一個文件做ETL,最后想輸出成一個文件,你會怎么辦呢?
這樣么?
val logs=sc.textFile(args(0),6)//你想初始化6個分區(qū),并行執(zhí)行,之后再合并成1個文件 logs.map(x=>{ if(x.split("\t").length==72){ val clean=parse(x) //此處是進(jìn)行了ETL clean } }).coalesce(2).saveAsTextFile(args(1))
如果你同意的話,可以寫個demo測試一下,你會發(fā)現(xiàn),僅僅有一個task!在生產(chǎn)上這是絕對不行!因?yàn)樯鲜鯡TL的spark job僅僅有一個stage,你雖然初始化RDD是設(shè)定的6個partition,但是在action之前你使用了.coalesce(1),此時會優(yōu)先使用coalesce里面的partition數(shù)量初始化RDD,所以僅僅有一個task。生產(chǎn)中文件很大的話,你就只能用兩個節(jié)點(diǎn)處理,這樣無法發(fā)揮集群的優(yōu)勢了。解決:要在coalesce中加shuffle=tule
val logs=sc.textFile(args(0),6) logs.map(x=>{ if(x.split("\t").length==72){ val clean=parse(x) //此處是進(jìn)行了ETL clean } }).coalesce(2,shuffle = true).saveAsTextFile(args(1))
這樣,我們就會有兩個stage,stage1是6個并行高速ETL處理,stage2是通過shuffle合并成2個文件
如下圖
我們知道了,可以手動設(shè)定shuffle的發(fā)生,那么問題來了,剛剛我們不能將初始化的分區(qū)數(shù)變大,如果加上shuffle可不可以呢?答案是可以的~
如果出事RDD為100個分區(qū),你覺得并行度不夠,你可以coalesce(1000,shuffle = true),將分區(qū)數(shù)增加到1000(默認(rèn)hash partitioner進(jìn)行重新),當(dāng)然你也可以使用自定義分區(qū)器,但是一定要序列化。
coalesce算子默認(rèn)只能減少分區(qū)數(shù)量,但是可以通過開啟shuffle增加分區(qū)數(shù)量
coalesce的作用常常是減少分區(qū)數(shù),已達(dá)到輸出時合并小文件的效果。
在一個stage中,coalesce中設(shè)定的分區(qū)數(shù)是優(yōu)先級最高的,如果想增加并行度,并合并文件,那么請開啟coalesce中的shuffle,這樣就會變成兩個stage。達(dá)到并行且合并的效果。
/** * Return a new RDD that has exactly numPartitions partitions. * * Can increase or decrease the level of parallelism in this RDD. Internally, this uses * a shuffle to redistribute data. * * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, * which can avoid performing a shuffle. * * TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207. */ def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
這個算子前后是一個寬依賴,字面就是重新分區(qū)的意思,與coalesce不同,repartition一定會將分區(qū)變成numPartitions個的!通過看源碼可知,它底層時調(diào)用的coalesce算子,并且使用該算子一定會shuffle。
到此,關(guān)于“coalesce與repartition怎么使用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!