Spark閉包中driver及executor程序代碼是怎樣執(zhí)行的,針對(duì)這個(gè)問(wèn)題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問(wèn)題的小伙伴找到更簡(jiǎn)單易行的方法。
侯馬ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書(shū)未來(lái)市場(chǎng)廣闊!成為成都創(chuàng)新互聯(lián)公司的ssl證書(shū)銷(xiāo)售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話(huà)聯(lián)系或者加微信:13518219792(備注:SSL證書(shū)合作)期待與您的合作!
閉包的作用可以理解為:函數(shù)可以訪(fǎng)問(wèn)函數(shù)外部定義的變量,但是函數(shù)內(nèi)部對(duì)該變量進(jìn)行的修改,在函數(shù)外是不可見(jiàn)的,即對(duì)函數(shù)外源變量不會(huì)產(chǎn)生影響。
其實(shí),在學(xué)習(xí)Spark時(shí),一個(gè)比較難理解的點(diǎn)就是,在集群模式下,定義的變量和方法作用域的范圍和生命周期。這在你操作RDD時(shí),比如調(diào)用一些函數(shù)map、foreach時(shí),訪(fǎng)問(wèn)其外部變量進(jìn)行操作時(shí),很容易產(chǎn)生疑惑。為什么我本地程序運(yùn)行良好且結(jié)果正確,放到集群上卻得不到想要的結(jié)果呢?
首先通過(guò)下邊對(duì)RDD中的元素進(jìn)行求和的示例,來(lái)看相同的代碼本地模式和集群模式運(yùn)行結(jié)果的區(qū)別:Spark為了執(zhí)行
任務(wù),會(huì)將RDD的操作分解為多個(gè)task,并且這些task是由executor執(zhí)行的。
在執(zhí)行之前,Spark會(huì)計(jì)算task的閉包即定義的一些變量和方法,比如例子中的counter變量和foreach方法,并且閉包必須對(duì)executor而言是可見(jiàn)的,這些閉包會(huì)被序列化發(fā)送到每個(gè)executor。在集群
模式下,driver和executor運(yùn)行在不同的JVM進(jìn)程中,發(fā)送給每個(gè)executor的閉包中的變量是driver端變量的副本。
因此,當(dāng)foreach函數(shù)內(nèi)引用counter時(shí),其實(shí)處理的只是driver端變量的副本,與driver端本身的counter無(wú)關(guān)。
driver節(jié)點(diǎn)的內(nèi)存中仍有一個(gè)計(jì)數(shù)器,但該變量對(duì)executor是不可見(jiàn)的!
executor只能看到序列化閉包的副本。
因此,上述例子輸出的counter最終值仍然為零,因?yàn)閏ounter上的所有操作都只是引用了序列化閉包內(nèi)的值。在本地模式下,往往driver和executor運(yùn)行在同一JVM進(jìn)程中。那么這些閉包將會(huì)被共享,executor操作的counter和driver持有的counter是同一個(gè),那么counter在處理后最終值為6。但是在生產(chǎn)中,我們的任務(wù)都是在集群模式下運(yùn)行,如何能滿(mǎn)足這種業(yè)務(wù)場(chǎng)景呢?
這就必須引出一個(gè)后續(xù)要重點(diǎn)講解的概念:Accumulator即累加器。Spark中的累加器專(zhuān)門(mén)用于提供一種機(jī)制,用于在集群中的各個(gè)worker節(jié)點(diǎn)之間執(zhí)行時(shí)安全地更新變量。
一般來(lái)
說(shuō),closures - constructs比如循環(huán)或本地定義的方法,就不應(yīng)該被用來(lái)改變一些全局狀態(tài),Spark并沒(méi)有定義或保證對(duì)從閉包外引用的對(duì)象進(jìn)行更新的行為。
如果你這樣操作只會(huì)導(dǎo)致一些代碼在本地模式下能夠達(dá)到預(yù)期的效果,但是在分布式環(huán)境下卻事與愿違。
如果需要某些全局聚合,請(qǐng)改用累加器。
對(duì)于其他的業(yè)務(wù)場(chǎng)景,我們適時(shí)考慮引入外部存儲(chǔ)系統(tǒng)、廣播變量等。
閉包函數(shù)從產(chǎn)生到在executor執(zhí)行經(jīng)歷了什么?首先,對(duì)RDD相關(guān)的操作需要傳入閉包函數(shù),如果這個(gè)函數(shù)需要訪(fǎng)問(wèn)外部定義的變量,就需要滿(mǎn)足一定條件(比如必須可被序列化),否則會(huì)拋出運(yùn)行時(shí)異常。閉包函數(shù)在最終傳入到executor執(zhí)行,需要經(jīng)歷以下步驟:
1.driver通過(guò)反射,運(yùn)行時(shí)找到閉包訪(fǎng)問(wèn)的變量,并封裝成一個(gè)對(duì)象,然后序列化該對(duì)象
2.將序列化后的對(duì)象通過(guò)網(wǎng)絡(luò)傳輸?shù)絯orker節(jié)點(diǎn)
3.worker節(jié)點(diǎn)反序列化閉包對(duì)象
4.worker節(jié)點(diǎn)的executor執(zhí)行閉包函數(shù)
簡(jiǎn)而言之,就是要通過(guò)網(wǎng)絡(luò)傳遞函數(shù)、然后執(zhí)行,期間會(huì)經(jīng)歷序列化和反序列化,所以要求被傳遞的變量必須可以被序列化和反序列化,否則會(huì)拋類(lèi)似Error:Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects這樣的異常。即使是本地執(zhí)行時(shí),也會(huì)按照上述的步驟執(zhí)行,這也是為什么不允許在RDD內(nèi)部直接操作RDD的原因(SparkContext不支持序列化)。同時(shí),在這些算子閉包內(nèi)修改外部定義的變量不會(huì)被反饋到driver端。driver是運(yùn)行用戶(hù)編寫(xiě)Application 的main()函數(shù)的地方,具體負(fù)責(zé)DAG的構(gòu)建、任務(wù)的劃分、task的生成與調(diào)度等。job,stage,task生成都離不開(kāi)rdd自身,rdd的相關(guān)的操作不能缺少driver端的sparksession/sparkcontext。
executor是真正執(zhí)行task地方,而task執(zhí)行離不開(kāi)具體的數(shù)據(jù),這些task運(yùn)行的結(jié)果可以是shuffle中間結(jié)果,也可以持久化到外部存儲(chǔ)系統(tǒng)。一般都是將結(jié)果、狀態(tài)等匯集到driver。但是,目前executor之間不能互相通信,只能借助第三方來(lái)實(shí)現(xiàn)數(shù)據(jù)的共享或者通信。編寫(xiě)的Spark程序代碼,運(yùn)行在driver端還是executor端呢?先看個(gè)簡(jiǎn)單例子:通常我們?cè)诒镜販y(cè)試程序的時(shí)候,要打印RDD中的數(shù)據(jù)。在本地模式下,直接使用rdd.foreach(println)或rdd.map(println)在單臺(tái)機(jī)器上,能夠按照預(yù)期打印并輸出所有RDD的元素。但是,在集群模式下,由executor執(zhí)行輸出寫(xiě)入的是executor的stdout,而不是driver上的stdout,所以driver的stdout不會(huì)顯示這些!要想在driver端打印所有元素,可以使用collect()方法先將RDD數(shù)據(jù)帶到driver節(jié)點(diǎn),然后在調(diào)用foreach(println)(但需要注意一點(diǎn),由于會(huì)把RDD中所有元素都加載到driver端,可能引起driver端內(nèi)存不足導(dǎo)致OOM。如果你只是想獲取RDD中的部分元素,可以考慮使用take或者top方法)總之,在這里RDD中的元素即為具體的數(shù)據(jù),對(duì)這些數(shù)據(jù)的操作都是由負(fù)責(zé)task執(zhí)行的executor處理的,所以想在driver端輸出這些數(shù)據(jù)就必須先將數(shù)據(jù)加載到driver端進(jìn)行處理。最后做個(gè)總結(jié):所有對(duì)RDD具體數(shù)據(jù)的操作都是在executor上執(zhí)行的,所有對(duì)rdd自身的操作都是在driver上執(zhí)行的。比如foreach、foreachPartition都是針對(duì)rdd內(nèi)部數(shù)據(jù)進(jìn)行處理的,所以我們傳遞給這些算子的函數(shù)都是執(zhí)行于executor端的。但是像foreachRDD、transform則是對(duì)RDD本身進(jìn)行一列操作,所以它的參數(shù)函數(shù)是執(zhí)行在driver端的,那么它內(nèi)部是可以使用外部變量,比如在SparkStreaming程序中操作offset、動(dòng)態(tài)更新廣播變量等。
關(guān)于Spark閉包中driver及executor程序代碼是怎樣執(zhí)行的問(wèn)題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒(méi)有解開(kāi),可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。
文章題目:Spark閉包中driver及executor程序代碼是怎樣執(zhí)行的
網(wǎng)站網(wǎng)址:
http://weahome.cn/article/ggchhg.html