今天就跟大家聊聊有關(guān)大數(shù)據(jù)中Spark數(shù)據(jù)傾斜表現(xiàn)及解決方案是什么,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
我們提供的服務(wù)有:成都做網(wǎng)站、網(wǎng)站制作、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、北海街道ssl等。為1000+企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的北海街道網(wǎng)站制作公司
Spark數(shù)據(jù)傾斜表現(xiàn)
Spark引擎的大部分task執(zhí)行時間比較一致,但是存在一些task的執(zhí)行時間特別長,例如,500個task,其中498個執(zhí)行較快,10分鐘執(zhí)行完成,剩余的兩個task需要執(zhí)行半個小時以上。
例行化執(zhí)行的代碼,某一天發(fā)生OOM問題,大概率是有數(shù)據(jù)傾斜了。
數(shù)據(jù)傾斜產(chǎn)生的原因是:shuffle的時候,需要將各個節(jié)點的相同的key拉取到同一個節(jié)點上,如果這個key對應(yīng)的數(shù)據(jù)量非常大的時候,就會發(fā)生數(shù)據(jù)傾斜。
數(shù)據(jù)傾斜只會發(fā)生在shuffle過程中,Spark引擎會觸發(fā)Shuffle的RDD算子有:distinct、repartition、reduceByKey、groupByKey、aggregateByKey、join
需要Shuffle的操作算子上直接設(shè)置并行度或者使用spark.default.parallelism設(shè)置。如果是Spark SQL,還可通過SET spark.sql.shuffle.partitions=num_tasks設(shè)置并行度。
該方法使用場景少,只能緩解數(shù)據(jù)傾斜,不能徹底解決數(shù)據(jù)傾斜。
通過Spark的Broadcast機制,將Reduce Join轉(zhuǎn)化為Map Join,避免Shuffle,從而完全消除Shuffle帶來的數(shù)據(jù)傾斜。
參與Join的一側(cè)數(shù)據(jù)集足夠小,并且主要適用于Join的場景,不適合聚合的場景,適用條件有限。
通過Spark的reduceByKey,統(tǒng)計每一個key的數(shù)量,超過指定數(shù)量的key或者數(shù)量top的key,作為異常key。當(dāng)然也可以使用Sample對RDD進(jìn)行抽樣后,進(jìn)行key的統(tǒng)計。
該方法的特點是:簡單、粗暴,有一定的適用場景。
這個可以理解為大招
對于單個RDD的Shuffle操作,如groupByKey,將key值加上一個隨機數(shù)的前綴。這樣就需要執(zhí)行二次聚合操作。
對于多個RDD的Shuffle操作,如join,將其中的一個有明顯數(shù)據(jù)傾斜的RDD的key,加上n以內(nèi)的隨機數(shù)的前綴,另一個RDD的每一個key,都加上0-n的前綴,相當(dāng)于RDD膨脹了n倍。
實際場景中可能需要上述方案的組合操作,比如:異常值過濾 + key值轉(zhuǎn)換:加隨機數(shù),可以進(jìn)行性能的優(yōu)化:根據(jù)異常值,對RDD進(jìn)行拆分:分別拆分成兩個RDD,對于沒有數(shù)據(jù)傾斜的,正常操作。對于有數(shù)據(jù)傾斜的加上隨機前綴,再進(jìn)行Shuffle操作。
看完上述內(nèi)容,你們對大數(shù)據(jù)中Spark數(shù)據(jù)傾斜表現(xiàn)及解決方案是什么有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。