一,基本概念
創(chuàng)新互聯(lián)主要從事成都網(wǎng)站制作、網(wǎng)站建設(shè)、外貿(mào)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)加查,十多年網(wǎng)站建設(shè)經(jīng)驗,價格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18982081108累加器是Spark的一種變量,顧名思義該變量只能增加。有以下特點:
1,累加器只能在Driver端構(gòu)建及并只能是Driver讀取結(jié)果,Task只能累加。
2,累加器不會改變Spark Lazy計算的特點。只會在Job觸發(fā)的時候進行相關(guān)累加操作。
3,現(xiàn)有累加器的類型。
相信有很多學(xué)習(xí)大數(shù)據(jù)的道友,在這里我給大家說說我滴群哦,大數(shù)據(jù)海量知識分享,784789432.在此我保證,絕對大數(shù)據(jù)的干貨,等待各位的到來,我們一同從入門到精通吧!
二,累加器的使用
Driver端初始化,并在Action之后獲取值。
val accum = sc.accumulator(0, "test Accumulator")
accum.value
Executor端進行計算
accum+=1;
三,累加器的重點類
Class Accumulator extends Accumulable
主要是實現(xiàn)了累加器的初始化及封裝了相關(guān)的累加器操作方法。同時在類對象構(gòu)建的時候向我們的Accumulators注冊了累加器。累加器的add操作的返回值類型和我們傳入的值類型可以不一樣。所以,我們一定要定義好如何累加和合并值。也即add方法
object Accumulators:
該方法在Driver端管理著我們的累加器,也包含了特定累加器的聚合操作。
trait AccumulatorParam[T] extends AccumulableParam[T, T]:
AccumulatorParam的addAccumulator操作的泛型封裝,具體的實現(xiàn)還是要再具體實現(xiàn)類里面實現(xiàn)addInPlace方法。
object AccumulatorParam:
主要是進行隱式類型轉(zhuǎn)換的操作。
TaskContextImpl:
在Executor端管理著我們的累加器。
四,累加器的源碼解析
1,Driver端的初始化
val accum = sc.accumulator(0, "test Accumulator")
val acc = new Accumulator(initialValue, param, Some(name))
主要是在Accumulable(Accumulator)中調(diào)用了,這樣我們就可以使用Accumulator使用了。
Accumulators.register(this)
2,Executor端的反序列化得到我們對象的過程
首先,我們的value_ 可以看到其并不支持序列化
@volatile @transient private var value_ : R = initialValue // Current value on master
其初始化是在我們反序列化的時候做的,反序列化還完成了Accumulator向我們的TaskContextImpl的注冊
反序列化是在調(diào)用ResultTask的RunTask方法的時候做的
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
過程中會調(diào)用
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true
// Automatically register the accumulator when it is deserialized with the task closure.
//
// Note internal accumulators sent with task are deserialized before the TaskContext is created
// and are registered in the TaskContext constructor. Other internal accumulators, such SQL
// metrics, still need to register here.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
}
}
3,累加器的累加
accum+=1;
param.addAccumulator(value_, term)
根據(jù)不同的累加器參數(shù)有不同的實現(xiàn)AccumulableParam
如,int類型。最終調(diào)用的AccumulatorParam特質(zhì)的addAccumulator方法。
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
addInPlace(t1, t2)
}
}
然后,調(diào)用的是各個具體實現(xiàn)的addInPlace方法
implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int): Int = 0
}
返回后更新了我們的Accumulators的value_的值。
4,Accumulator的各個節(jié)點累加的之后的聚合操作
在Task類的run方法里面得到并返回的
(runTask(context), context.collectAccumulators())
最終在DAGScheduler里面調(diào)用了updateAccumulators(event)
在updateAccumulators方法中
Accumulators.add(event.accumUpdates)
具體內(nèi)容如下:
def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match {
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
} else {
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
}
}
}
5,最后我們就可以獲取到累加器的值了
accum.value
五,累加器使用注意事項
累加器不會改變我們RDD的Lazy的特性,之后再Action之后完成計算和更新。
但是假如出現(xiàn)兩個Action公用一個轉(zhuǎn)化操作,如map,在map里面進行累加器累加,那么每次action都會累加,造成某些我們不需要的結(jié)果。
六,自定義累加器
自定義累加器輸出
七,總結(jié)
主要牽涉點就是序列化及類加載執(zhí)行,這是深入玩spark的必須.
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。