今天就跟大家聊聊有關(guān)Spark操作中的之a(chǎn)ggregate、aggregateByKey怎么理解,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
在安鄉(xiāng)等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供做網(wǎng)站、網(wǎng)站設(shè)計 網(wǎng)站設(shè)計制作按需設(shè)計網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),成都全網(wǎng)營銷推廣,外貿(mào)營銷網(wǎng)站建設(shè),安鄉(xiāng)網(wǎng)站建設(shè)費用合理。
1. aggregate函數(shù)
將每個分區(qū)里面的元素進(jìn)行聚合,然后用combine函數(shù)將每個分區(qū)的結(jié)果和初始值(zeroValue)進(jìn)行combine操作。這個函數(shù)最終返回的類型不需要和RDD中元素類型一致。
seqOp操作會聚合各分區(qū)中的元素,然后combOp操作把所有分區(qū)的聚合結(jié)果再次聚合,兩個操作的初始值都是zeroValue. seqOp的操作是遍歷分區(qū)中的所有元素(T),第一個T跟zeroValue做操作,結(jié)果再作為與第二個T做操作的zeroValue,直到遍歷完整個分區(qū)。combOp操作是把各分區(qū)聚合的結(jié)果,再聚合。aggregate函數(shù)返回一個跟RDD不同類型的值。因此,需要一個操作seqOp來把分區(qū)中的元素T合并成一個U,另外一個操作combOp把所有U聚合。
例子程序:
scala> val rdd = List(1,2,3,4,5,6,7,8,9)rdd: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> rdd.par.aggregate((0,0))((acc,number) => (acc._1 + number, acc._2 + 1),(par1,par2) => (par1._1 + par2._1, par1._2 + par2._2))res0: (Int, Int) = (45,9)scala> res0._1 / res0._2res1: Int = 5
過程大概這樣:
首先,初始值是(0,0),這個值在后面2步會用到。然后,(acc,number) => (acc._1 + number, acc._2 + 1),number即是函數(shù)定義中的T,這里即是List中的元素。所以acc._1 + number,acc._2 + 1的過程如下。
1. 0+1, 0+12. 1+2, 1+13. 3+3, 2+14. 6+4, 3+15. 10+5, 4+16. 15+6, 5+17. 21+7, 6+18. 28+8, 7+19. 36+9, 8+1
結(jié)果即是(45,9)。這里演示的是單線程計算過程,實際Spark執(zhí)行中是分布式計算,可能會把List分成多個分區(qū),假如3個,p1(1,2,3,4),p2(5,6,7,8),p3(9),經(jīng)過計算各分區(qū)的的結(jié)果(10,4),(26,4),(9,1),這樣,執(zhí)行(par1,par2) =>(par1._1 + par2._1, par1._2 + par2._2)就是(10+26+9,4+4+1)即(45,9),再求平均值就簡單了。
2. aggregateByKey函數(shù):
對PairRDD中相同的Key值進(jìn)行聚合操作,在聚合過程中同樣使用了一個中立的初始值。和aggregate函數(shù)類似,aggregateByKey返回值的類型不需要和RDD中value的類型一致。因為aggregateByKey是對相同Key中的值進(jìn)行聚合操作,所以aggregateByKey'函數(shù)最終返回的類型還是PairRDD,對應(yīng)的結(jié)果是Key和聚合后的值,而aggregate函數(shù)直接返回的是非RDD的結(jié)果。
例子程序:
import org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject AggregateByKeyOp { def main(args:Array[String]){ val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey").setMaster("local") val sc: SparkContext = new SparkContext(sparkConf) val data=List((1,3),(1,2),(1,4),(2,3)) val rdd=sc.parallelize(data, 2)
//合并不同partition中的值,a,b得數(shù)據(jù)類型為zeroValue的數(shù)據(jù)類型 def combOp(a:String,b:String):String={ println("combOp: "+a+"\t"+b) a+b }
//合并在同一個partition中的值,a的數(shù)據(jù)類型為zeroValue的數(shù)據(jù)類型,b的數(shù)據(jù)類型為原value的數(shù)據(jù)類型 def seqOp(a:String,b:Int):String={ println("SeqOp:"+a+"\t"+b) a+b } rdd.foreach(println)
//zeroValue:中立值,定義返回value的類型,并參與運算
//seqOp:用來在同一個partition中合并值
//combOp:用來在不同partiton中合并值
val aggregateByKeyRDD=rdd.aggregateByKey("100")(seqOp, combOp) sc.stop() }}
運行結(jié)果:
將數(shù)據(jù)拆分成兩個分區(qū)
//分區(qū)一數(shù)據(jù)(1,3)(1,2)//分區(qū)二數(shù)據(jù)(1,4)(2,3)
//分區(qū)一相同key的數(shù)據(jù)進(jìn)行合并seq: 100 3
//(1,3)開始和中立值進(jìn)行合并 合并結(jié)果為 1003seq: 1003 2 //(1,2)再次合并 結(jié)果為 10032
//分區(qū)二相同key的數(shù)據(jù)進(jìn)行合并seq: 100 4
//(1,4) 開始和中立值進(jìn)行合并 1004seq: 100 3 //(2,3) 開始和中立值進(jìn)行合并 1003
將兩個分區(qū)的結(jié)果進(jìn)行合并/
/key為2的,只在一個分區(qū)存在,不需要合并 (2,1003)(2,1003)
//key為1的, 在兩個分區(qū)存在,并且數(shù)據(jù)類型一致,合并comb: 10032 1004(1,100321004)
看完上述內(nèi)容,你們對Spark操作中的之a(chǎn)ggregate、aggregateByKey怎么理解有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。