上圖是一個job的提交流程圖,job提交的具體步驟如下
創(chuàng)新互聯(lián)公司長期為成百上千客戶提供的網(wǎng)站建設(shè)服務(wù),團隊從業(yè)經(jīng)驗10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為道外企業(yè)提供專業(yè)的網(wǎng)站制作、網(wǎng)站設(shè)計,道外網(wǎng)站改版等技術(shù)服務(wù)。擁有10年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。
- 一旦有action,就會觸發(fā)DagScheduler.runJob來提交任務(wù),主要是先生成邏輯執(zhí)行圖DAG,然后調(diào)用 finalStage = newStage() 來劃分 stage。
- new Stage() 的時候會調(diào)用 finalRDD 的 getParentStages();
- getParentStages() 從 finalRDD 出發(fā),反向 visit 邏輯執(zhí)行圖,遇到 NarrowDependency 就將依賴的 RDD 加入到 stage,遇到 ShuffleDependency 切開 stage,并遞歸到 ShuffleDepedency 依賴的 stage。
- 一個 ShuffleMapStage(不是最后形成 result 的 stage)形成后,會將該 stage 最后一個 RDD 注冊到MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size),這一步很重要,因為 shuffle 過程需要 MapOutputTrackerMaster 來指示 ShuffleMapTask 輸出數(shù)據(jù)的位置。
- 之后是submitStage(finalStage)
- 先確定該 stage 的 missingParentStages,使用getMissingParentStages(stage)。如果 parentStages 都可能已經(jīng)執(zhí)行過了,那么就為空了。
- 如果 missingParentStages 不為空,那么先遞歸提交 missing 的 parent stages,并將自己加入到 waitingStages 里面,等到 parent stages 執(zhí)行結(jié)束后,會觸發(fā)提交 waitingStages 里面的 stage。
- 如果 missingParentStages 為空,說明該 stage 可以立即執(zhí)行,那么就調(diào)用submitMissingTasks(stage, jobId)來生成和提交具體的 task。如果 stage 是 ShuffleMapStage,那么 new 出來與該 stage 最后一個 RDD 的 partition 數(shù)相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出來與 stage 最后一個 RDD 的 partition 個數(shù)相同的 ResultTasks。一個 stage 里面的 task 組成一個 TaskSet,最后調(diào)用taskScheduler.submitTasks(taskSet)來提交一整個 taskSet。
- taskScheduler會把task發(fā)給DriverActor進程,DriverActor序列話之后發(fā)給exector真正執(zhí)行。
上圖是task執(zhí)行流程,具體執(zhí)行過程如下
- Worker 端接收到 tasks 后,executor 將 task 包裝成 taskRunner,并從線程池中抽取出一個空閑線程運行 task。
- Executor 收到 serialized 的 task 后,先 deserialize 出正常的 task,然后運行 task 得到其執(zhí)行結(jié)果 directResult,這個結(jié)果要送回到 driver 那里。
- 如果 result 比較大(比如 groupByKey 的 result)先把 result 存放到本地的“內(nèi)存+磁盤”上,由 blockManager 來管理,只把存儲位置信息(indirectResult)發(fā)送給 driver。
- ShuffleMapTask 生成的是 MapStatus,MapStatus 包含兩項內(nèi)容:一是該 task 所在的 BlockManager 的 BlockManagerId(實際是 executorId + host, port, nettyPort),二是 task 輸出的每個 FileSegment 大小。
- ResultTask 生成的 result 的是 func 在 partition 上的執(zhí)行結(jié)果。**比如 count() 的 func 就是統(tǒng)計 partition 中 records 的個數(shù)。
- Driver 收到 task 的執(zhí)行結(jié)果 result 后會進行一系列的操作:
- a,首先告訴 taskScheduler 這個 task 已經(jīng)執(zhí)行完,然后去分析 result。
- b,如果是 ResultTask 的 result,那么可以使用 ResultHandler 對 result 進行 driver 端的計算(比如 count() 會對所有 ResultTask 的 result 作 sum)
- c,如果 result 是 ShuffleMapTask 的 MapStatus,那么需要將 MapStatus(ShuffleMapTask 輸出的 FileSegment 的位置和大小信息)存放到 mapOutputTrackerMaster 中的 mapStatuses 數(shù)據(jù)結(jié)構(gòu)中以便以后 reducer shuffle 的時候查詢
- d,如果 driver 收到的 task 是該 stage 中的最后一個 task,那么可以 submit 下一個 stage,如果該 stage 已經(jīng)是最后一個 stage,那么告訴 dagScheduler job 已經(jīng)完成
當前文章:spark(二):spark架構(gòu)及物理執(zhí)行圖
分享鏈接:
http://weahome.cn/article/jjjpcd.html