真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark(二):spark架構(gòu)及物理執(zhí)行圖

spark(二):spark架構(gòu)及物理執(zhí)行圖
上圖是一個job的提交流程圖,job提交的具體步驟如下

創(chuàng)新互聯(lián)公司長期為成百上千客戶提供的網(wǎng)站建設(shè)服務(wù),團隊從業(yè)經(jīng)驗10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為道外企業(yè)提供專業(yè)的網(wǎng)站制作、網(wǎng)站設(shè)計,道外網(wǎng)站改版等技術(shù)服務(wù)。擁有10年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。

  1. 一旦有action,就會觸發(fā)DagScheduler.runJob來提交任務(wù),主要是先生成邏輯執(zhí)行圖DAG,然后調(diào)用 finalStage = newStage() 來劃分 stage。
  2. new Stage() 的時候會調(diào)用 finalRDD 的 getParentStages();
  3. getParentStages() 從 finalRDD 出發(fā),反向 visit 邏輯執(zhí)行圖,遇到 NarrowDependency 就將依賴的 RDD 加入到 stage,遇到 ShuffleDependency 切開 stage,并遞歸到 ShuffleDepedency 依賴的 stage。
  4. 一個 ShuffleMapStage(不是最后形成 result 的 stage)形成后,會將該 stage 最后一個 RDD 注冊到MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size),這一步很重要,因為 shuffle 過程需要 MapOutputTrackerMaster 來指示 ShuffleMapTask 輸出數(shù)據(jù)的位置。
  5. 之后是submitStage(finalStage)
  6. 先確定該 stage 的 missingParentStages,使用getMissingParentStages(stage)。如果 parentStages 都可能已經(jīng)執(zhí)行過了,那么就為空了。
  7. 如果 missingParentStages 不為空,那么先遞歸提交 missing 的 parent stages,并將自己加入到 waitingStages 里面,等到 parent stages 執(zhí)行結(jié)束后,會觸發(fā)提交 waitingStages 里面的 stage。
  8. 如果 missingParentStages 為空,說明該 stage 可以立即執(zhí)行,那么就調(diào)用submitMissingTasks(stage, jobId)來生成和提交具體的 task。如果 stage 是 ShuffleMapStage,那么 new 出來與該 stage 最后一個 RDD 的 partition 數(shù)相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出來與 stage 最后一個 RDD 的 partition 個數(shù)相同的 ResultTasks。一個 stage 里面的 task 組成一個 TaskSet,最后調(diào)用taskScheduler.submitTasks(taskSet)來提交一整個 taskSet。
  9. taskScheduler會把task發(fā)給DriverActor進程,DriverActor序列話之后發(fā)給exector真正執(zhí)行。

spark(二):spark架構(gòu)及物理執(zhí)行圖
上圖是task執(zhí)行流程,具體執(zhí)行過程如下

  1. Worker 端接收到 tasks 后,executor 將 task 包裝成 taskRunner,并從線程池中抽取出一個空閑線程運行 task。
  2. Executor 收到 serialized 的 task 后,先 deserialize 出正常的 task,然后運行 task 得到其執(zhí)行結(jié)果 directResult,這個結(jié)果要送回到 driver 那里。
  3. 如果 result 比較大(比如 groupByKey 的 result)先把 result 存放到本地的“內(nèi)存+磁盤”上,由 blockManager 來管理,只把存儲位置信息(indirectResult)發(fā)送給 driver。
  4. ShuffleMapTask 生成的是 MapStatus,MapStatus 包含兩項內(nèi)容:一是該 task 所在的 BlockManager 的 BlockManagerId(實際是 executorId + host, port, nettyPort),二是 task 輸出的每個 FileSegment 大小。
  5. ResultTask 生成的 result 的是 func 在 partition 上的執(zhí)行結(jié)果。**比如 count() 的 func 就是統(tǒng)計 partition 中 records 的個數(shù)。
  6. Driver 收到 task 的執(zhí)行結(jié)果 result 后會進行一系列的操作:
  7. a,首先告訴 taskScheduler 這個 task 已經(jīng)執(zhí)行完,然后去分析 result。
  8. b,如果是 ResultTask 的 result,那么可以使用 ResultHandler 對 result 進行 driver 端的計算(比如 count() 會對所有 ResultTask 的 result 作 sum)
  9. c,如果 result 是 ShuffleMapTask 的 MapStatus,那么需要將 MapStatus(ShuffleMapTask 輸出的 FileSegment 的位置和大小信息)存放到 mapOutputTrackerMaster 中的 mapStatuses 數(shù)據(jù)結(jié)構(gòu)中以便以后 reducer shuffle 的時候查詢
  10. d,如果 driver 收到的 task 是該 stage 中的最后一個 task,那么可以 submit 下一個 stage,如果該 stage 已經(jīng)是最后一個 stage,那么告訴 dagScheduler job 已經(jīng)完成

當前文章:spark(二):spark架構(gòu)及物理執(zhí)行圖
分享鏈接:http://weahome.cn/article/jjjpcd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部