這篇文章主要講解了“Spark生產(chǎn)作業(yè)容錯能力的負(fù)面影響有哪些”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Spark生產(chǎn)作業(yè)容錯能力的負(fù)面影響有哪些”吧!
創(chuàng)新互聯(lián)公司是一家專注于網(wǎng)站建設(shè)、做網(wǎng)站和綿陽主機托管的網(wǎng)絡(luò)公司,有著豐富的建站經(jīng)驗和案例。
在 Spark 中數(shù)據(jù)本地性通過 TaskLocality 來表示,有如下幾個級別,
PROCESS_LOCAL
NODE_LOCAL
NO_PREF
RACK_LOCAL
ANY
從上到下數(shù)據(jù)本地性依次遞減。
Spark 在執(zhí)行前通過數(shù)據(jù)的分區(qū)信息進行計算 Task 的 Locality,Task 總是會被優(yōu)先分配到它要計算的數(shù)據(jù)所在節(jié)點以盡可能地減少網(wǎng)絡(luò) IO。這個計算的過程通過 spark.locality.wait
默認(rèn)為3s,控制這個計算的過程。
原理這里不細講,簡而言之就是重試。Spark 規(guī)定了同一個 Job 中同一個 Stage 連續(xù)失敗重試的上限(spark.stage.maxConsecutiveAttempts
),默認(rèn)為4,也規(guī)定了一個 Stage 中 同一個 Task 可以失敗重試的次數(shù)(spark.task.maxFailures
),默認(rèn)為4。當(dāng)其中任何一個閾值達到上限,Spark 都會使整個 Job 失敗,停止可能的“無意義”的重試。
我們首先來看一個例子,如圖所示,圖為 Spark Stage 頁面下 Task Page 的詳細視圖。
第一列表示該 Task 進行了4次重試,所以這個 Task 對應(yīng)的 Job 也因此失敗了。
第三列表示該 Task 的數(shù)據(jù)本地性,都是 NODE_LOCAL 級別,對于一個從HDFS讀取數(shù)據(jù)的任務(wù),顯然獲得了最優(yōu)的數(shù)據(jù)本地性
第四列表示的是 Executor ID,我們可以看到我們?nèi)蝿?wù)的重試被分配到ID 為5和6兩個 Executor 上
第五列表示我們運行這些重試的 Task 所在的 Executor 所在的物理機地址,我們可以看到他們都被調(diào)度到了同一個
最后列表示每次重試失敗的錯誤棧
結(jié)合硬件層面的排查,發(fā)現(xiàn)是 NodeManager 物理節(jié)點上掛在的 /mnt/dfs/4,出現(xiàn)硬件故障導(dǎo)致盤只讀,ShuffleMapTask 在即將完成時,將index文件和data文件commit時,獲取index的臨時文件時候發(fā)生FileNotFoundException
。
java.io.FileNotFoundException: /mnt/dfs/4/yarn/local/usercache/da_haitao/appcache/application_1568691584183_1953115/blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/0a/shuffle_96_2685_0.index.82594412-1f46-465e-a067-2c5e386a978e (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.(FileOutputStream.java:213) at java.io.FileOutputStream. (FileOutputStream.java:162) at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:245) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109)
3.2 問題二:為什么該 Task 的4次重試都在同一個物理節(jié)點?
這是由于 Driver 在調(diào)度該 Task 的時候進行了數(shù)據(jù)本地性的運算,而且在
spark.locality.wait
默認(rèn)為3s的時間約束內(nèi)成功獲得了NODE_LOCAL級別的數(shù)據(jù)本地性,故而都調(diào)度到了同一個
NodeManger
物理節(jié)點。
1. /mnt/dfs/4/yarn/local/2. usercache/da_haitao/appcache/application_1568691584183_1953115/ blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/3. 0a/4. shuffle_96_2685_0.index5. .82594412-1f46-465e-a067-2c5e386a978e
spark.diskStore.subDirectories
默認(rèn)為64控制.data
結(jié)尾,另一個就是這個與之對應(yīng)的 .index
文件。96是 ShuffleID 表標(biāo)識是哪個Shuffle 過程,2685是 MapID 對應(yīng)的是 一個RDD 所以有分區(qū)中其中一個的順序號, 而0是一個固定值,原本表示是ReduceID,Spark Sort Based Shuffle 的實現(xiàn)不需要依賴這個值,所以被固定為了0。通過Shuffle ID和 MapId,Shufle Write 階段就可以生成類似shuffle_96_2685_0.index這樣的文件,而Shuffle Read 階段也可以通過兩個ID 定位到這個文件。scala> math.abs("shuffle_96_2685_0.index".hashCode) % 12res0: Int = 6
scala> def randomizeInPlace[T](arr: Array[Int], rand: java.util.Random = new java.util.Random): Array[Int] = {
| for (i <- (arr.length - 1) to 1 by -1) {
| val j = rand.nextInt(i + 1)
| val tmp = arr(j)
| arr(j) = arr(i)
| arr(i) = tmp
| }
| arr
| }
randomizeInPlace: [T](arr: Array[Int], rand: java.util.Random)Array[Int]
scala> randomizeInPlace(res11)
res23: Array[Int] = Array(3, 2, 4, 1)
scala> randomizeInPlace(res11)
res24: Array[Int] = Array(2, 3, 4, 1)
scala> randomizeInPlace(res11)
res25: Array[Int] = Array(2, 1, 3, 4)
scala> randomizeInPlace(res11)
res26: Array[Int] = Array(4, 2, 1, 3)
scala> randomizeInPlace(res11)
res27: Array[Int] = Array(2, 3, 4, 1)
感謝各位的閱讀,以上就是“Spark生產(chǎn)作業(yè)容錯能力的負(fù)面影響有哪些”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Spark生產(chǎn)作業(yè)容錯能力的負(fù)面影響有哪些這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!