val acc = sc.accumulator(0, “Error Accumulator”)
val data = sc.parallelize(1 to 10)
val newData = data.map(x => {
if (x % 2 == 0) {
accum += 1
}
})
newData.count
acc.value
newData.foreach(println)
acc.value
上述現(xiàn)象,會造成acc.value的最終值變?yōu)?0
站在用戶的角度思考問題,與客戶深入溝通,找到普寧網(wǎng)站設(shè)計與普寧網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗(yàn)好的作品,建站類型包括:網(wǎng)站設(shè)計制作、網(wǎng)站制作、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、空間域名、虛擬空間、企業(yè)郵箱。業(yè)務(wù)覆蓋普寧地區(qū)。
Spark中的一系列transform操作都會構(gòu)造成一長串的任務(wù)鏈,此時就需要通過一個action操作來觸發(fā)(lazy的特性),accumulator也是如此。
原因就在于第二次action操作的時候,又執(zhí)行了一次累加器的操作,同個累加器,在原有的基礎(chǔ)上又加了5,從而變成了10
通過上述的現(xiàn)象描述,我們可以很快知道解決的方法:只進(jìn)行一次action操作?;诖耍覀冎灰袛嗳蝿?wù)之間的依賴關(guān)系就可以了,即使用cache、persist。這樣操作之后,那么后續(xù)的累加器操作就不會受前面的transform操作影響了
需求
使用Accumulators統(tǒng)計emp表中NULL出現(xiàn)的次數(shù)以及正常數(shù)據(jù)的條數(shù) & 打印正常數(shù)據(jù)的信息
數(shù)據(jù)
7369 SMITH CLERK 7902 1980-12-17 800.00 20
7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-2-22 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-4-2 2975.00 20
7654 MARTIN SALESMAN 7698 1981-9-28 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-5-1 2850.00 30
7782 CLARK MANAGER 7839 1981-6-9 2450.00 10
7788 SCOTT ANALYST 7566 1987-4-19 3000.00 20
7839 KING PRESIDENT 1981-11-17 5000.00 10
7844 TURNER SALESMAN 7698 1981-9-8 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-5-23 1100.00 20
7900 JAMES CLERK 7698 1981-12-3 950.00 30
7902 FORD ANALYST 7566 1981-12-3 3000.00 20
7934 MILLER CLERK 7782 1982-1-23 1300.00 10
遇到的坑 & 解決方法
現(xiàn)象描述 & 原因分析:
我們都知道,spark中的一系列transform操作會構(gòu)成一串長的任務(wù)鏈,此時就需要通過一個action操作來觸發(fā); accumulator也是一樣的,只有當(dāng)action操作執(zhí)行時,才會觸發(fā)accumulator的執(zhí)行; 因此在一個action操作之前,我們調(diào)用accumulator的value方法是無法查看其數(shù)值的,肯定是沒有任何變化的; 所以在對normalData進(jìn)行foreach操作之后,即action操作之后,我們會發(fā)現(xiàn)累加器的數(shù)值就變成了11; 之后,我們對normalData再進(jìn)行一次count操作之后,即又一次的action操作之后,其實(shí)這時候,又去執(zhí)行了一次前面的transform操作; 因此累加器的值又增加了11,變成了22
解決辦法:
經(jīng)過上面的分析,我們可以知道,使用累加器的時候,我們只有使用一次action操作才能夠保證結(jié)果的準(zhǔn)確性 因此,我們面對這種情況,是有辦法的,做法就是切斷它們相互之間的依賴關(guān)系即可 因此對normalData使用cache方法,當(dāng)RDD第一次被計算出來時,就會被直接緩存起來 再調(diào)用時,相同的計算操作就不會再重新計算一遍
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用Spark Accumulators完成Job的數(shù)據(jù)量處理
* 統(tǒng)計emp表中NULL出現(xiàn)的次數(shù)以及正常數(shù)據(jù)的條數(shù) & 打印正常數(shù)據(jù)的信息
*/
object AccumulatorsApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("AccumulatorsApp")
val sc = new SparkContext(conf)
val lines = sc.textFile("E:/emp.txt")
// long類型的累加器值
val nullNum = sc.longAccumulator("NullNumber")
val normalData = lines.filter(line => {
var flag = true
val splitLines = line.split("\t")
for (splitLine <- splitLines){
if ("".equals(splitLine)){
flag = false
nullNum.add(1)
}
}
flag
})
// 使用cache方法,將RDD的第一次計算結(jié)果進(jìn)行緩存;防止后面RDD進(jìn)行重復(fù)計算,導(dǎo)致累加器的值不準(zhǔn)確
normalData.cache()
// 打印每一條正常數(shù)據(jù)
normalData.foreach(println)
// 打印正常數(shù)據(jù)的條數(shù)
println("NORMAL DATA NUMBER: " + normalData.count())
// 打印emp表中NULL出現(xiàn)的次數(shù)
println("NULL: " + nullNum.value)
sc.stop()
}
}