spark任務(wù)運行的源碼分析
在整個spark任務(wù)的編寫、提交、執(zhí)行分三個部分:
① 編寫程序和提交任務(wù)到集群中
②sparkContext的初始化
③觸發(fā)action算子中的runJob方法,執(zhí)行任務(wù)
目前創(chuàng)新互聯(lián)建站已為成百上千的企業(yè)提供了網(wǎng)站建設(shè)、域名、雅安服務(wù)器托管、網(wǎng)站托管、企業(yè)網(wǎng)站設(shè)計、義馬網(wǎng)站維護等服務(wù),公司將堅持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。
(1)編程程序并提交到集群:
①編程spark程序的代碼
②打成jar包到集群中運行
③使用spark-submit命令提交任務(wù)
在提交任務(wù)時,需要指定 --class 程序的入口(有main方法的類),
1) spark-submit --class xxx
2) ${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.SparkSubmit $@
3) org.apache.spark.launcher.Main
submit(appArgs, uninitLog)
doRunMain()
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
childMainClass:…./.WordCount (自己編寫的代碼的主類)
mainClass = Utils.classForName(childMainClass)
val app: SparkApplication = if() {} else {new JavaMainApplication(mainClass)}
app.start(childArgs.toArray, sparkConf) // 通過反射調(diào)用mainClass執(zhí)行
// 到此為止,相當(dāng)于調(diào)用了我們的自己編寫的任務(wù)類的main方法執(zhí)行了。?。。?br/>val mainMethod = klass.getMethod("main", new ArrayString.getClass)
mainMethod.invoke(null, args)
④開始執(zhí)行自己編寫的代碼
(2)初始化sparkContext:
當(dāng)自己編寫的程序運行到:new SparkContext()時,就開始了精妙而細致的sparkContext的初始化。
sparkContext的相關(guān)介紹:sparkContext是用戶通往spark集群的唯一入口,可以用來在spark集群中創(chuàng)建RDD、累加器和廣播變量。sparkContext也是整個spark應(yīng)用程序的一個至關(guān)重要的對象,是整個應(yīng)用程序運行調(diào)度的核心(不是資源調(diào)度的核心)。在初始化sparkContext時,同時的會初始化DAGScheduler、TaskScheduler和SchedulerBackend,這些至關(guān)重要的對象。
sparkContext的構(gòu)建過程:
1)Driver端執(zhí)行的代碼:
初始化 TaskScheduler
初始化 SchedulerBackend
初始化 DAGScheduler
2)worker和master端執(zhí)行的代碼:
driver向master注冊申請資源。
Worker負責(zé)啟動executor。
(3)觸發(fā)action算子中的runJob方法:
spark任務(wù)運行總結(jié):
- 將編寫的程序打成jar包
- 調(diào)用spark-submit提交任務(wù)到集群上運行
- 運行sparkSubmit 的main方法,在這個方法中通過反射的方式創(chuàng)建我們編寫的主類的實例對象,然后調(diào)用該對象的main方法,開始執(zhí)行我們編寫的代碼
- 當(dāng)代碼運行到new SparkContext對象的的時候,就開始了復(fù)雜和精致的sparkContext對象的初始化
- 在初始化SparkContext對象的時候,會創(chuàng)建兩個特別重要的對象,分別是:DAGScheduler 和 TaskScheduler,其中【DAGScheduler 的作用】將RDD的依賴切成一個一個的stage,然后stage作為taskSet提交給Taskscheduler。
- 在構(gòu)建TaskScheduler的同時,會創(chuàng)建兩個非常重要的對象,分別是 DriverActor 和 ClientActor,DriverActor負責(zé)接收executor的反向注冊,將任務(wù)提交給executor運行,clientActor是負責(zé)向master注冊并提交任務(wù)
- 當(dāng)clientActor啟動時,會將用戶提交的任務(wù)相關(guān)的參數(shù)分裝到applicationDescription對象中去,然后提交給master進行任務(wù)注冊
- 當(dāng)master接收到clientActor提交的任務(wù)請求時,會將請求的參數(shù)進行分析,并封裝成application,然后將其持久化,然后將其加入到任務(wù)隊列waitingApps中。
- 當(dāng)輪到我們提交任務(wù)的時候,就開始執(zhí)行schedule(),進行任務(wù)資源的調(diào)度
- worker接收到master發(fā)送來的launchExecutor 時,會將其解壓并封裝到ExecutorRunner中,然后調(diào)用這個對象的start方法,啟動executor
- executor啟動后會向driver反向注冊
- driver會發(fā)送注冊成功信息,給executor
- executor接收到driver actor注冊成功信息后,就會創(chuàng)建一個線程池,用于執(zhí)行driveractor發(fā)送過來的任務(wù)
- 當(dāng)屬于這個任務(wù)的所有的 Executor 啟動并反向注冊成功后,就意味著運行這個任務(wù)的 環(huán)境已經(jīng)準(zhǔn)備好了,driver 會結(jié)束 SparkContext 對象的初始化,也就意味著 new SparkContext 這句代碼運行完成
- 當(dāng)sparkContext初始化完成之后,就會繼續(xù)運行我們的代碼,直到運行到action算子時,也就意味著觸發(fā)了一個job的提交
- driver 會將這個 job 提交給 DAGScheduler
- DAGScheduler將接收到的job,從最后一個算子開始推導(dǎo),將DAG根據(jù)依賴關(guān)系劃分成為一個個stage,然后將stage封裝成一個taskSet,并將taskSet中的task提交給taskScheduler
- taskScheduler接收到DAGScheduler發(fā)送過來的task,會拿到一個序列化器,對task進行序列化,然后將序列化好的task封裝到launchTask中,然后將launchTask發(fā)送給指定的executor中運行
- executor接收到了DriverActor 發(fā)送過來的launchTask 時,會拿到一個反序列化器,對launchTask 進行反序列化,封裝到一個TaskRunner 中,然后從executor這個線程池中獲取一個線程,將反序列化好的任務(wù)中的算子作用在RDD對應(yīng)的分區(qū)上。
- 最終當(dāng)所有的task任務(wù)完成之后,整個application執(zhí)行完成,關(guān)閉sparkContext對象。
本文題目:spark任務(wù)運行過程的源碼分析
URL分享:
http://weahome.cn/article/jpdjge.html