真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark源碼系列之累加器實(shí)現(xiàn)機(jī)制及自定義累加器

一,基本概念

網(wǎng)站設(shè)計(jì)制作過(guò)程拒絕使用模板建站;使用PHP+MYSQL原生開(kāi)發(fā)可交付網(wǎng)站源代碼;符合網(wǎng)站優(yōu)化排名的后臺(tái)管理系統(tǒng);做網(wǎng)站、成都網(wǎng)站設(shè)計(jì)收費(fèi)合理;免費(fèi)進(jìn)行網(wǎng)站備案等企業(yè)網(wǎng)站建設(shè)一條龍服務(wù).我們是一家持續(xù)穩(wěn)定運(yùn)營(yíng)了十年的創(chuàng)新互聯(lián)建站網(wǎng)站建設(shè)公司。

累加器是Spark的一種變量,顧名思義該變量只能增加。有以下特點(diǎn):

1,累加器只能在Driver端構(gòu)建及并只能是Driver讀取結(jié)果,Task只能累加。

2,累加器不會(huì)改變Spark Lazy計(jì)算的特點(diǎn)。只會(huì)在Job觸發(fā)的時(shí)候進(jìn)行相關(guān)累加操作。

3,現(xiàn)有累加器的類(lèi)型。
相信有很多學(xué)習(xí)大數(shù)據(jù)的道友,在這里我給大家說(shuō)說(shuō)我滴群哦,大數(shù)據(jù)海量知識(shí)分享,784789432.在此我保證,絕對(duì)大數(shù)據(jù)的干貨,等待各位的到來(lái),我們一同從入門(mén)到精通吧!

二,累加器的使用

Driver端初始化,并在Action之后獲取值。

val accum = sc.accumulator(0, "test Accumulator")
accum.value

Executor端進(jìn)行計(jì)算

accum+=1;

三,累加器的重點(diǎn)類(lèi)

Class Accumulator extends Accumulable

主要是實(shí)現(xiàn)了累加器的初始化及封裝了相關(guān)的累加器操作方法。同時(shí)在類(lèi)對(duì)象構(gòu)建的時(shí)候向我們的Accumulators注冊(cè)了累加器。累加器的add操作的返回值類(lèi)型和我們傳入的值類(lèi)型可以不一樣。所以,我們一定要定義好如何累加和合并值。也即add方法

object Accumulators:

該方法在Driver端管理著我們的累加器,也包含了特定累加器的聚合操作。

trait AccumulatorParam[T] extends AccumulableParam[T, T]:

AccumulatorParam的addAccumulator操作的泛型封裝,具體的實(shí)現(xiàn)還是要再具體實(shí)現(xiàn)類(lèi)里面實(shí)現(xiàn)addInPlace方法。

object AccumulatorParam:

主要是進(jìn)行隱式類(lèi)型轉(zhuǎn)換的操作。

TaskContextImpl:

在Executor端管理著我們的累加器。

四,累加器的源碼解析

1,Driver端的初始化

val accum = sc.accumulator(0, "test Accumulator")

val acc = new Accumulator(initialValue, param, Some(name))

主要是在Accumulable(Accumulator)中調(diào)用了,這樣我們就可以使用Accumulator使用了。

Accumulators.register(this)

2,Executor端的反序列化得到我們對(duì)象的過(guò)程

首先,我們的value_ 可以看到其并不支持序列化

@volatile @transient private var value_ : R = initialValue // Current value on master

其初始化是在我們反序列化的時(shí)候做的,反序列化還完成了Accumulator向我們的TaskContextImpl的注冊(cè)

反序列化是在調(diào)用ResultTask的RunTask方法的時(shí)候做的

val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

過(guò)程中會(huì)調(diào)用

private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true
// Automatically register the accumulator when it is deserialized with the task closure.
//
// Note internal accumulators sent with task are deserialized before the TaskContext is created
// and are registered in the TaskContext constructor. Other internal accumulators, such SQL
// metrics, still need to register here.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
}
}

3,累加器的累加

accum+=1;

param.addAccumulator(value_, term)

根據(jù)不同的累加器參數(shù)有不同的實(shí)現(xiàn)AccumulableParam

如,int類(lèi)型。最終調(diào)用的AccumulatorParam特質(zhì)的addAccumulator方法。

trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
addInPlace(t1, t2)
}
}

然后,調(diào)用的是各個(gè)具體實(shí)現(xiàn)的addInPlace方法

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int): Int = 0
}

返回后更新了我們的Accumulators的value_的值。

4,Accumulator的各個(gè)節(jié)點(diǎn)累加的之后的聚合操作

在Task類(lèi)的run方法里面得到并返回的

(runTask(context), context.collectAccumulators())

最終在DAGScheduler里面調(diào)用了updateAccumulators(event)

在updateAccumulators方法中

Accumulators.add(event.accumUpdates)

具體內(nèi)容如下:

def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match {
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
} else {
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
}
}
}

5,最后我們就可以獲取到累加器的值了

accum.value

五,累加器使用注意事項(xiàng)

累加器不會(huì)改變我們RDD的Lazy的特性,之后再Action之后完成計(jì)算和更新。

但是假如出現(xiàn)兩個(gè)Action公用一個(gè)轉(zhuǎn)化操作,如map,在map里面進(jìn)行累加器累加,那么每次action都會(huì)累加,造成某些我們不需要的結(jié)果。

六,自定義累加器

自定義累加器輸出

七,總結(jié)

主要牽涉點(diǎn)就是序列化及類(lèi)加載執(zhí)行,這是深入玩spark的必須.

文章題目:spark源碼系列之累加器實(shí)現(xiàn)機(jī)制及自定義累加器
標(biāo)題路徑:http://weahome.cn/article/ipeoig.html

其他資訊

在線咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部