這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)如何進(jìn)行Spark Shuffle 原理分析,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
10年積累的網(wǎng)站制作、成都網(wǎng)站制作經(jīng)驗(yàn),可以快速應(yīng)對客戶對網(wǎng)站的新想法和需求。提供各種問題對應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識你,你也不認(rèn)識我。但先網(wǎng)站設(shè)計(jì)后付款的網(wǎng)站建設(shè)流程,更有江北免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。
Shuffle就是對數(shù)據(jù)進(jìn)行重組,由于分布式計(jì)算的特性和要求,在實(shí)現(xiàn)細(xì)節(jié)上更加繁瑣和復(fù)雜。 在MapReduce框架,Shuffle是連接Map和Reduce之間的橋梁,Map階段通過shuffle讀取數(shù)據(jù)并輸出到對應(yīng)的Reduce;而Reduce階段負(fù)責(zé)從Map端拉取數(shù)據(jù)并進(jìn)行計(jì)算。在整個shuffle過程中,往往伴隨著大量的磁盤和網(wǎng)絡(luò)I/O。所以shuffle性能的高低也直接決定了整個程序的性能高低。Spark也會有自己的shuffle實(shí)現(xiàn)過程。
在DAG調(diào)度的過程中,Stage階段的劃分是根據(jù)是否有shuffle過程,也就是存在wide Dependency寬依賴的時候,需要進(jìn)行shuffle,這時候會將作業(yè)job劃分成多個Stage,每一個stage內(nèi)部有很多可以并行運(yùn)行的task。 stage與stage之間的過程就是shuffle階段,在Spark的中,負(fù)責(zé)shuffle過程的執(zhí)行、計(jì)算和處理的組件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager隨著Spark的發(fā)展有兩種實(shí)現(xiàn)的方式,分別為HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。
在Spark 1.2以前,默認(rèn)的shuffle計(jì)算引擎是HashShuffleManager。 該ShuffleManager-HashShuffleManager有著一個非常嚴(yán)重的弊端,就是會產(chǎn)生大量的中間磁盤文件,進(jìn)而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以后的版本中,默認(rèn)的ShuffleManager改成了SortShuffleManager。 SortShuffleManager相較于HashShuffleManager來說,有了一定的改進(jìn)。主要就在于每個Task在進(jìn)行shuffle操作時,雖然也會產(chǎn)生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數(shù)據(jù)時,只要根據(jù)索引讀取每個磁盤文件中的部分?jǐn)?shù)據(jù)即可。
Hash shuffle
一種是普通運(yùn)行機(jī)制
另一種是合并的運(yùn)行機(jī)制。
HashShuffleManager的運(yùn)行機(jī)制主要分成兩種
合并機(jī)制主要是通過復(fù)用buffer來優(yōu)化Shuffle過程中產(chǎn)生的小文件的數(shù)量。
Hash shuffle是不具有排序的Shuffle。
這里我們先明確一個假設(shè)前提:每個Executor只有1個CPU core,也就是說,無論這個Executor上分配多少個task線程,同一時間都只能執(zhí)行一個task線程。 圖中有3個ReduceTask,從ShuffleMapTask 開始那邊各自把自己進(jìn)行 Hash 計(jì)算(分區(qū)器:hash/numreduce取模),分類出3個不同的類別,每個 ShuffleMapTask 都分成3種類別的數(shù)據(jù),想把不同的數(shù)據(jù)匯聚然后計(jì)算出最終的結(jié)果,所以ReduceTask 會在屬于自己類別的數(shù)據(jù)收集過來,匯聚成一個同類別的大集合,每1個 ShuffleMapTask 輸出3份本地文件,這里有4個 ShuffleMapTask,所以總共輸出了4 x 3個分類文件 = 12個本地小文件。
主要就是在一個stage結(jié)束計(jì)算之后,為了下一個stage可以執(zhí)行shuffle類的算子(比如reduceByKey,groupByKey),而將每個task處理的數(shù)據(jù)按key進(jìn)行“分區(qū)”。所謂“分區(qū)”,就是對相同的key執(zhí)行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬于reduce端的stage的一個task。在將數(shù)據(jù)寫入磁盤之前,會先將數(shù)據(jù)寫入內(nèi)存緩沖中,當(dāng)內(nèi)存緩沖填滿之后,才會溢寫到磁盤文件中去。 那么每個執(zhí)行shuffle write的task,要為下一個stage創(chuàng)建多少個磁盤文件呢? 很簡單,下一個stage的task有多少個,當(dāng)前stage的每個task就要創(chuàng)建多少份磁盤文件。比如下一個stage總共有100個task,那么當(dāng)前stage的每個task都要創(chuàng)建100份磁盤文件。如果當(dāng)前stage有50個task,總共有10個Executor,每個Executor執(zhí)行5個Task,那么每個Executor上總共就要創(chuàng)建500個磁盤文件,所有Executor上會創(chuàng)建5000個磁盤文件。由此可見,未經(jīng)優(yōu)化的shuffle write操作所產(chǎn)生的磁盤文件的數(shù)量是極其驚人的。
shuffle read,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計(jì)算結(jié)果中的所有相同key,從各個節(jié)點(diǎn)上通過網(wǎng)絡(luò)都拉取到自己所在的節(jié)點(diǎn)上,然后進(jìn)行key的聚合或連接等操作。由于shuffle write的過程中,task給Reduce端的stage的每個task都創(chuàng)建了一個磁盤文件,因此shuffle read的過程中,每個task只要從上游stage的所有task所在節(jié)點(diǎn)上,拉取屬于自己的那一個磁盤文件即可。 shuffle read的拉取過程是一邊拉取一邊進(jìn)行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數(shù)據(jù),然后通過內(nèi)存中的一個Map進(jìn)行聚合等操作。聚合完一批數(shù)據(jù)后,再拉取下一批數(shù)據(jù),并放到buffer緩沖中進(jìn)行聚合操作。以此類推,直到最后將所有數(shù)據(jù)到拉取完,并得到最終的結(jié)果。
(1)buffer起到的是緩存作用,緩存能夠加速寫磁盤,提高計(jì)算的效率,buffer的默認(rèn)大小32k。 (2)分區(qū)器:根據(jù)hash/numRedcue取模決定數(shù)據(jù)由幾個Reduce處理,也決定了寫入幾個buffer中 (3)block file:磁盤小文件,從圖中我們可以知道磁盤小文件的個數(shù)計(jì)算公式: block file=M*R (4) M為map task的數(shù)量,R為Reduce的數(shù)量,一般Reduce的數(shù)量等于buffer的數(shù)量,都是由分區(qū)器決定的
(1).Shuffle階段在磁盤上會產(chǎn)生海量的小文件,建立通信和拉取數(shù)據(jù)的次數(shù)變多,此時會產(chǎn)生大量耗時低效的 IO 操作 (因?yàn)楫a(chǎn)生過多的小文件) (2).可能導(dǎo)致OOM,大量耗時低效的 IO 操作 ,導(dǎo)致寫磁盤時的對象過多,讀磁盤時候的對象也過多,這些對象存儲在堆內(nèi)存中,會導(dǎo)致堆內(nèi)存不足,相應(yīng)會導(dǎo)致頻繁的GC,GC會導(dǎo)致OOM。由于內(nèi)存中需要保存海量文件操作句柄和臨時信息,如果數(shù)據(jù)處理的規(guī)模比較龐大的話,內(nèi)存不可承受,會出現(xiàn) OOM 等問題
合并機(jī)制就是復(fù)用buffer緩沖區(qū),開啟合并機(jī)制的配置是spark.shuffle.consolidateFiles。該參數(shù)默認(rèn)值為false,將其設(shè)置為true即可開啟優(yōu)化機(jī)制。通常來說,如果我們使用HashShuffleManager,那么都建議開啟這個選項(xiàng)。
這里有6個這里有6個shuffleMapTask,數(shù)據(jù)類別還是分成3種類型,因?yàn)镠ash算法會根據(jù)你的 Key 進(jìn)行分類,在同一個進(jìn)程中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer里,然后把Buffer中的數(shù)據(jù)寫入以Core數(shù)量為單位的本地文件中,(一個Core只有一種類型的Key的數(shù)據(jù)),每1個Task所在的進(jìn)程中,分別寫入共同進(jìn)程中的3份本地文件,這里有6個shuffleMapTasks,所以總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。
(1).啟動HashShuffle的合并機(jī)制ConsolidatedShuffle的配置 spark.shuffle.consolidateFiles=true(2).block file=Core*R Core為CPU的核數(shù),R為Reduce的數(shù)量
如果 Reducer 端的并行任務(wù)或者是數(shù)據(jù)分片過多的話則 Core * Reducer Task 依舊過大,也會產(chǎn)生很多小文件。
SortShuffleManager的運(yùn)行機(jī)制主要分成兩種,
一種是普通運(yùn)行機(jī)制
另一種是bypass運(yùn)行機(jī)制
在該模式下,數(shù)據(jù)會先寫入一個數(shù)據(jù)結(jié)構(gòu),聚合算子寫入Map,一邊通過Map局部聚合,一邊寫入內(nèi)存。Join算子寫入ArrayList直接寫入內(nèi)存中。然后需要判斷是否達(dá)到閾值(5M),如果達(dá)到就會將內(nèi)存數(shù)據(jù)結(jié)構(gòu)的數(shù)據(jù)寫入到磁盤,清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)。 在溢寫磁盤前,先根據(jù)key進(jìn)行排序,排序過后的數(shù)據(jù),會分批寫入到磁盤文件中。默認(rèn)批次為10000條,數(shù)據(jù)會以每批一萬條寫入到磁盤文件。寫入磁盤文件通過緩沖區(qū)溢寫的方式,每次溢寫都會產(chǎn)生一個磁盤文件,也就是說一個task過程會產(chǎn)生多個臨時文件 。 最后在每個task中,將所有的臨時文件合并,這就是merge過程,此過程將所有臨時文件讀取出來,一次寫入到最終文件。意味著一個task的所有數(shù)據(jù)都在這一個文件中。同時單獨(dú)寫一份索引文件,標(biāo)識下游各個task的數(shù)據(jù)在文件中的索引start offset和end offset。 這樣算來如果第一個stage 50個task,每個Executor執(zhí)行一個task,那么無論下游有幾個task,就需要50*2=100個磁盤文件。
1. 小文件明顯變少了,一個task只生成一個file文件 2. file文件整體有序,加上索引文件的輔助,查找變快,雖然排序浪費(fèi)一些性能,但是查找變快很多
shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值
不是聚合類的shuffle算子(比如reduceByKey)
該機(jī)制與sortshuffle的普通機(jī)制相比,在shuffleMapTask不多的情況下,首先寫的機(jī)制是不同,其次不會進(jìn)行排序。這樣就可以節(jié)約一部分性能開銷。
在shuffleMapTask數(shù)量小于默認(rèn)值200時,啟用bypass模式的sortShuffle(原因是數(shù)據(jù)量本身比較少,沒必要進(jìn)行sort全排序,因?yàn)閿?shù)據(jù)量少本身查詢速度就快,正好省了sort的那部分性能開銷。) 該機(jī)制與普通SortShuffleManager運(yùn)行機(jī)制的不同在于: 第一: 磁盤寫機(jī)制不同; 第二: 不會進(jìn)行sort排序;
碰到ShuffleDenpendency就進(jìn)行stage的劃分,ShuffleMapStage: 為shuffle提供數(shù)據(jù)的中間stage,ResultStage: 為一個action操作計(jì)算結(jié)果的stage。
解決的一個問題是resut task如何知道從哪個Executor去拉取Shuffle data
ShuffleWriter
(1)HashShuffleWriter
特點(diǎn):根據(jù)Hash分區(qū),分區(qū)數(shù)是m * n 個。
val counts: RDD[(String, Int)] = wordCount.reduceByKey(new HashPartitioner(2), (x, y) => x + y)
(2)SortShuffleWriter
特點(diǎn):
a、文件數(shù)量為m
b、如果需要排序或者需要combine,那么每一個partition數(shù)據(jù)排序要自己實(shí)現(xiàn)。(SortShuffleWriter里的sort指的是對partition的分區(qū)號進(jìn)行排序)
c、數(shù)據(jù)先放在內(nèi)存,內(nèi)存不夠則寫到磁盤中,最后再全部寫到磁盤。
(3)BypassMergeSortShuffleWriter
這種模式同時具有HashShuffleWriter和SortShuffleter的特點(diǎn)。因?yàn)槠鋵?shí)HashShufflerWriter的性能不錯,但是如果task數(shù)太多的話,性能話下降,所以Spark在task數(shù)較少的時候自動使用了這種模式,一開始還是像HashShufflerWriter那種生成多個文件,但是最后會把多個文件合并成一個文件。然后下游來讀取文件。默認(rèn)map的分區(qū)需要小于spark.shuffle.sort.bypassMergeThreshold(默認(rèn)是200),因?yàn)槿绾畏謪^(qū)數(shù)太多,產(chǎn)生的小文件就會很多性能就會下降。
默認(rèn)值:32k
參數(shù)說明:該參數(shù)用于設(shè)置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數(shù)據(jù)寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。
調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當(dāng)增加這個參數(shù)的大?。ū热?4k),從而減少shuffle write過程中溢寫磁盤文件的次數(shù),也就可以減少磁盤IO次數(shù),進(jìn)而提升性能。在實(shí)踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會有1%~5%的提升。
默認(rèn)值:48m
參數(shù)說明:該參數(shù)用于設(shè)置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數(shù)據(jù)。
調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當(dāng)增加這個參數(shù)的大?。ū热?6m),從而減少拉取數(shù)據(jù)的次數(shù),也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù),進(jìn)而提升性能。在實(shí)踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會有1%~5%的提升。
默認(rèn)值:3
參數(shù)說明:shuffle read task從shuffle write task所在節(jié)點(diǎn)拉取屬于自己的數(shù)據(jù)時,如果因?yàn)榫W(wǎng)絡(luò)異常導(dǎo)致拉取失敗,是會自動進(jìn)行重試的。該參數(shù)就代表了可以重試的最大次數(shù)。如果在指定次數(shù)之內(nèi)拉取還是沒有成功,就可能會導(dǎo)致作業(yè)執(zhí)行失敗。
調(diào)優(yōu)建議:對于那些包含了特別耗時的shuffle操作的作業(yè),建議增加重試最大次數(shù)(比如60次),以避免由于JVM的full gc或者網(wǎng)絡(luò)不穩(wěn)定等因素導(dǎo)致的數(shù)據(jù)拉取失敗。在實(shí)踐中發(fā)現(xiàn),對于針對超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過程,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性。
默認(rèn)值:5s
參數(shù)說明:具體解釋同上,該參數(shù)代表了每次重試?yán)?shù)據(jù)的等待間隔,默認(rèn)是5s。
調(diào)優(yōu)建議:建議加大間隔時長(比如60s),以增加shuffle操作的穩(wěn)定性。
(Spark1.6是這個參數(shù),1.6以后參數(shù)變了,具體參考上一講Spark內(nèi)存模型知識)
默認(rèn)值:0.2
參數(shù)說明:該參數(shù)代表了Executor內(nèi)存中,分配給shuffle read task進(jìn)行聚合操作的內(nèi)存比例,默認(rèn)是20%。
調(diào)優(yōu)建議:在資源參數(shù)調(diào)優(yōu)中講解過這個參數(shù)。如果內(nèi)存充足,而且很少使用持久化操作,建議調(diào)高這個比例,給shuffle read的聚合操作更多內(nèi)存,以避免由于內(nèi)存不足導(dǎo)致聚合過程中頻繁讀寫磁盤。在實(shí)踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù)可以將性能提升10%左右。
默認(rèn)值:sort
參數(shù)說明:該參數(shù)用于設(shè)置ShuffleManager的類型。Spark 1.5以后,有三個可選項(xiàng):hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默認(rèn)選項(xiàng),但是Spark 1.2以及之后的版本默認(rèn)都是SortShuffleManager了。Spark1.6以后把hash方式給移除了,tungsten-sort與sort類似,但是使用了tungsten計(jì)劃中的堆外內(nèi)存管理機(jī)制,內(nèi)存使用效率更高。
調(diào)優(yōu)建議:由于SortShuffleManager默認(rèn)會對數(shù)據(jù)進(jìn)行排序,因此如果你的業(yè)務(wù)邏輯中需要該排序機(jī)制的話,則使用默認(rèn)的SortShuffleManager就可以;而如果你的業(yè)務(wù)邏輯不需要對數(shù)據(jù)進(jìn)行排序,那么建議參考后面的幾個參數(shù)調(diào)優(yōu),通過bypass機(jī)制或優(yōu)化的HashShuffleManager來避免排序操作,同時提供較好的磁盤讀寫性能。這里要注意的是,tungsten-sort要慎用,因?yàn)橹鞍l(fā)現(xiàn)了一些相應(yīng)的bug。
默認(rèn)值:200
參數(shù)說明:當(dāng)ShuffleManager為SortShuffleManager時,如果shuffle read task的數(shù)量小于這個閾值(默認(rèn)是200),則shuffle write過程中不會進(jìn)行排序操作,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù),但是最后會將每個task產(chǎn)生的所有臨時磁盤文件都合并成一個文件,并會創(chuàng)建單獨(dú)的索引文件。
調(diào)優(yōu)建議:當(dāng)你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數(shù)調(diào)大一些,大于shuffle read task的數(shù)量。那么此時就會自動啟用bypass機(jī)制,map-side就不會進(jìn)行排序了,減少了排序的性能開銷。但是這種方式下,依然會產(chǎn)生大量的磁盤文件,因此shuffle write性能有待提高。
上述就是小編為大家分享的如何進(jìn)行Spark Shuffle 原理分析了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。