Spark的閉包清理機(jī)制怎么理解,相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。
成都創(chuàng)新互聯(lián)2013年至今,先為興山等服務(wù)建站,興山等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為興山企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問(wèn)題。
關(guān)于Spark任務(wù)運(yùn)行時(shí)發(fā)生不可序列話的問(wèn)題。今天就統(tǒng)一講解一下這塊的內(nèi)容。
首先,要先讀懂scala的閉包是怎么回事兒。
簡(jiǎn)單理解scala的閉包
接著就是要理解Spark 算子閉包生成及我們編寫的閉包執(zhí)行的原理。接下來(lái)我們就拿map和mapPartition兩個(gè)算子來(lái)開(kāi)啟本文講解:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
對(duì)于任務(wù)劃分,調(diào)度,執(zhí)行,結(jié)果返回的原理浪尖就不在這里擴(kuò)展了,浪尖在知識(shí)星球里分享過(guò)一套Spark 源碼的視頻,可以參考閱讀。
map和mapPartitions的區(qū)別面試常考的,對(duì)于兩者的區(qū)別從源碼里看很明顯,一個(gè)是f被迭代器迭代調(diào)用,一個(gè)是f的參數(shù)是迭代器。浪尖很早以前發(fā)過(guò)一篇文章,徹底講解過(guò)foreach和foreachPartition的區(qū)別??梢詤⒖祭斫?/p>
Spark源碼系列之foreach和foreachPartition的區(qū)別
回到正題,之所以會(huì)發(fā)生不可序列化的錯(cuò)誤,主要原因是傳遞給map的f函數(shù)不是在driver端執(zhí)行的,所以會(huì)被序列化傳輸?shù)絜xecutor節(jié)點(diǎn),然后在executor節(jié)點(diǎn)反序列化然后執(zhí)行。假如f函數(shù)里引用了map外部不可序列化的對(duì)象就會(huì)報(bào)不可序列化的異常。
但是,很多時(shí)候我們并沒(méi)有直接去在閉包里使用不可序列化的對(duì)象,這個(gè)時(shí)候報(bào)異常就有點(diǎn)不合適了。比如下面的例子:
* class SomethingNotSerializable {
* def someValue = 1
* def scope(name: String)(body: => Unit) = body
* def someMethod(): Unit = scope("one") {
* def x = someValue
* def y = 2
* scope("two") { println(y + 1) }
* }
* }
此示例中,scope(two) 不可序列化,因?yàn)樗昧藄cope(one)(通過(guò)y),而scope(one)引用了SomethingNotSerializable(通過(guò)someValue)。但是,其實(shí)scope(two)并不直接依賴于SomethingNotSerializable。假如這種情況下拋出不可序列化異常就不科學(xué)了,所以Spark會(huì)對(duì)閉包進(jìn)行一些清理操作,也即是本文中所要講的。
主要工具類是ClosureCleaner。該工具的主要作用是遍歷閉包的層次結(jié)構(gòu),并且將沒(méi)有被閉包實(shí)際引用的鏈路設(shè)置為null,但是仍然包含在已經(jīng)編譯的匿名類中。請(qǐng)注意直接修改封閉中的閉包是不安全的,因?yàn)榭赡苡衅渌a路徑會(huì)依賴于他們。所以,我們會(huì)克隆封閉中的閉包并且相應(yīng)地設(shè)置父指針。
默認(rèn)情況下,可以傳遞清除閉包。這就意味著,我們需要檢測(cè)封閉對(duì)象是否由起始對(duì)象實(shí)際引用,(要么直接引用要么間接引用),如果沒(méi)有被實(shí)際使用則從層次結(jié)構(gòu)中切斷這些閉包。換句話說(shuō),除了清空無(wú)用字段的引用之外,也會(huì)將沒(méi)有被起始閉包引用的引用封閉對(duì)象的父指針清空。傳遞性的確定是通過(guò)遍歷閉包所調(diào)用的
再回到前面的例子,scope(two) 不可序列化,因?yàn)樗昧藄cope(one)(通過(guò)y),而scope(one)引用了SomethingNotSerializable(通過(guò)someValue)。但是,其實(shí)scope(two)并不直接依賴于SomethingNotSerializable。這就意味著我們可以安全的將其副本scope(one)的父指針清空,同時(shí)將其設(shè)置為scope(two)的父級(jí),這樣scope(two)就不再需要間接傳遞引用SomethingNotSerializable了。
解決方法
實(shí)現(xiàn)序列化是最直接的,假如不能的話。那就讀下面的話:
那么為了不實(shí)現(xiàn)序列化還能盡量避免不可序列化錯(cuò)誤,就不要在map等算子里引用外部變量,而是直接在算子中實(shí)例化,假如每次實(shí)例化代價(jià)高,那就使用mapPartitions。
看完上述內(nèi)容,你們掌握Spark的閉包清理機(jī)制怎么理解的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!