真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark常用算子有哪些-創(chuàng)新互聯(lián)

這篇文章將為大家詳細(xì)講解有關(guān)spark常用算子有哪些,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

成都創(chuàng)新互聯(lián)公司主營右江網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,app軟件定制開發(fā),右江h(huán)5小程序開發(fā)搭建,右江網(wǎng)站營銷推廣歡迎右江等地區(qū)企業(yè)咨詢
一些經(jīng)常用到的RDD算子
map:將rdd的值輸入,并返回一個自定義的類型,如下輸入原始類型,輸出一個tuple類型的數(shù)組
scala> val rdd1 = sc.parallelize(List("a","b","c","d"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at :24
scala> rdd1.map((_,1)).collect
res1: Array[(String, Int)] = Array((a,1), (b,1), (c,1), (d,1))                  
-----------------------------------------------------------------------------------------------------------------
mapPartitionsWithIndex:輸出數(shù)據(jù)對應(yīng)的分區(qū)以及分區(qū)的值
scala> val rdd1 = sc.parallelize(List("a","b","c","d"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at :24
scala>     val func = (xpar:Int,y:Iterator[String])=>{
     |       y.toList.map(x=>"partition:"+xpar+" value:"+x).iterator
     |     }
func: (Int, Iterator[String]) => Iterator[String] = 
scala> rdd1.mapPartitionsWithIndex(func).collect
res2: Array[String] = Array(partition:0 value:a, partition:0 value:b, partition:1 value:c, partition:1 value:d)
----------------------------------------------------------------------------------------------------------------------
aggregate(zeroValue)(seqOp, combOp):對rdd的數(shù)據(jù)先按照分區(qū)匯總?cè)缓髮⒎謪^(qū)的數(shù)據(jù)在匯總(迭代匯總,seqOp或者combOp的值會和下一個值進(jìn)行比較)
scala> val rdd1 = sc.parallelize(List("a","b","c","d"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at :24
scala> rdd1.aggregate("")(_+_,_+_)
res3: String = abcd
-----------------------------------------------------------------------------------------------------------------------
aggregateByKey:適用于那種鍵值對類型的RDD,會根據(jù)key進(jìn)行對value的操作,類似aggregate
scala> val rdd = sc.parallelize(List((1,1),(1,2),(2,2),(2,3)), 2)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[12] at parallelize at :24
scala> rdd.aggregateByKey(0)((x,y)=>x+y, (x,y)=>(x+y)).collect
res36: Array[(Int, Int)] = Array((2,5), (1,3))
-------------------------------------------------------------------------------------------------------------------------
coalesce, repartition:repartition與coalesce相似,只不過repartition內(nèi)部調(diào)用了coalesce,coalesce傳入的參數(shù)比repartition傳入的參數(shù)多一個,repartition有該參數(shù)的默認(rèn)值,即:是否進(jìn)行shuffule
scala> val rdd = sc.parallelize(List(1,2,3,4,5), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at :24
scala> rdd.repartition(3)
res42: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at repartition at :27
scala> res42.partitions.length
res43: Int = 3
-----------------------------------------------------------------------------------------------------------------------
collectAsMap:將結(jié)果一map方式展示
scala> val rdd = sc.parallelize(List(("a",2),("b",10),("x",22)), 2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at :24
scala> rdd.collectAsMap
res44: scala.collection.Map[String,Int] = Map(b -> 10, a -> 2, x -> 22)
-----------------------------------------------------------------------------------------------------------------------
combineByKey : 和reduceByKey是相同的效果。需要三個參數(shù) 第一個每個key對應(yīng)的value 第二個,局部的value操作, 第三個:全局value操作
scala> val rdd = sc.parallelize(List(("a",2),("b",10),("x",22),("a",200),("x",89)), 2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at :24
scala> rdd.combineByKey(x=>x, (a:Int,b:Int)=>a+b, (a:Int,b:Int)=>a+b)
res45: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[31] at combineByKey at :27
scala> res45.collect
res46: Array[(String, Int)] = Array((x,111), (b,10), (a,202))
---------------------------------------------------------------------------------------------------------------------------
countByKey:通過Key統(tǒng)計條數(shù)
scala> val rdd = sc.parallelize(List(("a",2),("b",10),("x",22),("a",200),("x",89)), 2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at :24
scala> rdd.countByKey
res49: scala.collection.Map[String,Long] = Map(x -> 2, b -> 1, a -> 2)
------------------------------------------------------------------------------------------------------------------------
filterByRange:返回符合過濾返回的數(shù)據(jù)
scala> val rdd = sc.parallelize(List(("a",2),("b",10),("x",22),("a",200),("x",89)), 2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[36] at parallelize at :24
scala> rdd.filterByRange("a","b")
res51: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[37] at filterByRange at :27
scala> res51.collect
res52: Array[(String, Int)] = Array((a,2), (b,10), (a,200))
------------------------------------------------------------------------------------------------------------
flatMapValues
scala>  val rdd = sc.parallelize(List(("a"->"1 2 3 "),("b"->"1 2 3 "),("x"->"1 2 3 "),("a"->"1 2 3 "),("x"->"1 2 3 ")), 2)
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[39] at parallelize at :24
scala>  rdd.flatMapValues(x=>x.split(" ")).collect
res53: Array[(String, String)] = Array((a,1), (a,2), (a,3), (b,1), (b,2), (b,3), (x,1), (x,2), (x,3), (a,1), (a,2), (a,3), (x,1), (x,2), (x,3))
----------------------------------------------------------------------------------------------------------------
foldByKey:通過key聚集數(shù)據(jù)然后做操作
scala> val rdd = sc.parallelize(List(("a",2),("b",10),("x",22),("a",200),("x",89)), 2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at parallelize at :24
scala> rdd.foldByKey(0)(_+_).collect
res55: Array[(String, Int)] = Array((x,111), (b,10), (a,202))
----------------------------------------------------------------------------------------------------------------
keyBy : 以傳入的參數(shù)做key
scala> val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[43] at parallelize at :24
scala> val rdd2 = rdd1.keyBy(_.length).collect
rdd2: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
----------------------------------------------------------------------------------------------------------------
keys values
scala> val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[45] at parallelize at :24
scala> val rdd2 = rdd1.map(x=>(x.length,x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[47] at map at :26
scala> rdd2.keys
res63: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[48] at keys at :29
scala> rdd2.keys.collect
res64: Array[Int] = Array(3, 6, 6, 3, 8)
scala> rdd2.values.collect
res65: Array[String] = Array(dog, salmon, salmon, rat, elephant)

關(guān)于spark常用算子有哪些就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。


分享文章:spark常用算子有哪些-創(chuàng)新互聯(lián)
標(biāo)題鏈接:http://weahome.cn/article/phspi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部