真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

好程序員大數(shù)據(jù)學(xué)習(xí)路線分享彈性分布式數(shù)據(jù)集RDD

好程序員大數(shù)據(jù)學(xué)習(xí)路線分享彈性分布式數(shù)據(jù)集RDD,RDD定義,RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個不可變(數(shù)據(jù)和元數(shù)據(jù))、可分區(qū)、里面的元素可并行計算的集合。

創(chuàng)新互聯(lián)建站是一家專業(yè)提供義安企業(yè)網(wǎng)站建設(shè),專注與成都做網(wǎng)站、網(wǎng)站制作、成都h5網(wǎng)站建設(shè)、小程序制作等業(yè)務(wù)。10年已為義安眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。

RDD的特點(diǎn):自動容錯,位置感知性調(diào)度和可伸縮性

RDD的屬性

1.一組分片

即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務(wù)處理,并決定并行計算的粒度。用戶可以在創(chuàng)建RDD時指定RDD的分片個數(shù),如果沒有指定,那么就會采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

2.一個計算每個分區(qū)的函數(shù)。

Spark中RDD的計算是以分片為單位的,每個RDD都會實(shí)現(xiàn)compute函數(shù)以達(dá)到這個目的。compute函數(shù)會對迭代器進(jìn)行復(fù)合,不需要保存每次計算的結(jié)果。

3.RDD之間的依賴關(guān)系。

RDD的每次轉(zhuǎn)換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關(guān)系。

容錯處理:在部分分區(qū)數(shù)據(jù)丟失時,Spark可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進(jìn)行重新計算。

4.一個Partitioner,分區(qū)器

即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時的分片數(shù)量。

5.一個列表

存儲存取每個Partition的優(yōu)先位置(preferred location)。-> 就近原則

對于一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數(shù)據(jù)不如移動計算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時候,會盡可能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置。

RDD類型

1.Transformation -> 記錄計算過程(記錄參數(shù),計算方法)

轉(zhuǎn)換

含義

map(func)

返回一個新的RDD,該RDD由每一個輸入元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成

filter(func)

返回一個新的RDD,該RDD由經(jīng)過func函數(shù)計算后返回值為true的輸入元素組成

flatMap(func)

類似于map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應(yīng)該返回一個序列,而不是單一元素)

mapPartitions(func)

類似于map,但獨(dú)立地在RDD的每一個分片上運(yùn)行,因此在類型為T的RDD上運(yùn)行時,func的函數(shù)類型必須是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

類似于mapPartitions,但func帶有一個整數(shù)參數(shù)表示分片的索引值,因此在類型為T的RDD上運(yùn)行時,func的函數(shù)類型必須是

(Int, Iterator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根據(jù)fraction指定的比例對數(shù)據(jù)進(jìn)行采樣,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換,seed用于指定隨機(jī)數(shù)生成器種子

union(otherDataset)

對源RDD和參數(shù)RDD求并集后返回一個新的RDD

intersection(otherDataset)

diff -> 差集

對源RDD和參數(shù)RDD求交集后返回一個新的RDD

distinct([numTasks]))

?????????[改變分區(qū)數(shù)]

對源RDD進(jìn)行去重后返回一個新的RDD

groupByKey([numTasks])

在一個(K,V)的RDD上調(diào)用,返回一個(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一個(K,V)的RDD上調(diào)用,返回一個(K,V)的RDD,使用指定的reduce函數(shù),將相同key的值聚合到一起,與groupByKey類似,reduce任務(wù)的個數(shù)可以通過第二個可選的參數(shù)來設(shè)置

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])


sortByKey([ascending], [numTasks])

在一個(K,V)的RDD上調(diào)用,K必須實(shí)現(xiàn)Ordered接口,返回一個按照key進(jìn)行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

與sortByKey類似,但是更靈活

join(otherDataset, [numTasks])

在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個相同key對應(yīng)的所有元素對在一起的(K,(V,W))的RDD

cogroup(otherDataset, [numTasks])

在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(K,(Iterable,Iterable))類型的RDD

cartesian(otherDataset)

笛卡爾積

pipe(command, [envVars])


coalesce(numPartitions)


repartition(numPartitions)

?重新分區(qū)

repartitionAndSortWithinPartitions(partitioner)


2.Action ?-> 觸發(fā)生成job(一個job對應(yīng)一個action算子)

動作

含義

reduce(func)

通過func函數(shù)聚集RDD中的所有元素,這個功能必須是可交換且可并聯(lián)的

collect()

在驅(qū)動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素

count()

返回RDD的元素個數(shù)

first()

返回RDD的第一個元素(類似于take(1))

take(n)

取數(shù)據(jù)集的前n個元素組成的數(shù)組

takeSample(withReplacement,num, [seed])

返回一個數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個元素組成,可以選擇是否用隨機(jī)數(shù)替換不足的部分,seed用于指定隨機(jī)數(shù)生成器種子

takeOrdered(n,?[ordering])

takeOrdered和top類似,只不過以和top相反的順序返回元素

saveAsTextFile(path)

將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對于每個元素,Spark將會調(diào)用toString方法,將它裝換為文件中的文本

saveAsSequenceFile(path)?

將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。

saveAsObjectFile(path)?


countByKey()

針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應(yīng)的元素個數(shù)。

foreach(func)

在數(shù)據(jù)集的每一個元素上,運(yùn)行函數(shù)func進(jìn)行更新。

創(chuàng)建RDD

Linux進(jìn)入sparkShell:

/usr/local/spark.../bin/spark-shell \

--master spark://hadoop01:7077 \

--executor-memory 512m \

--total-executor-cores 2

或在Maven下:

object lx03 {

??def main(args: Array[String]): Unit = {

????val conf : SparkConf = new SparkConf()

??????.setAppName("SparkAPI")

??????.setMaster("local[*]")

????val sc: SparkContext = new SparkContext(conf)

????//通過并行化生成rdd

????val rdd1: RDD[Int] = sc.parallelize(List(24,56,3,2,1))

????//對add1的每個元素乘以2然后排序

????val rdd2: RDD[Int] = rdd1.map(_ * 2).sortBy(x => x,true)

????println(rdd2.collect().toBuffer)

????//過濾出大于等于10的元素

// ???val rdd3: RDD[Int] = rdd2.filter(_ >= 10)

// ???println(rdd3.collect().toBuffer)

??}

練習(xí)2

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))

//將rdd1里面的每一個元素先切分在壓平

val rdd2 = rdd1.flatMap(_.split(' '))

rdd2.collect

//復(fù)雜的:

val rdd1 = sc.parallelize(List(List("a b c", "a b b"), List("e f g", "a f g"), List("h i j", "a a b")))

//將rdd1里面的每一個元素先切分在壓平

val rdd2 = rdd1.flatMap(_.flatMap(_.split(" ")))

練習(xí)3

val rdd1 = sc.parallelize(List(5, 6, 4, 3))

val rdd2 = sc.parallelize(List(1, 2, 3, 4))

//求并集

val rdd3 = rdd1.union(rdd2)

//求交集

val rdd4 = rdd1.intersection(rdd2)

//去重

rdd3.distinct.collect

rdd4.collect

練習(xí)4

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

//求join

val rdd3 = rdd1.join(rdd2) ?-> 相同的key組成新的key,value

//結(jié)果: Array[(String,(Int,Int))] = Array((tom,(1,1)),(jerry,(3,2)))

rdd3.collect

//求左連接和右連接

val rdd3 = rdd1.leftOuterJoin(rdd2)

rdd3.collect

val rdd3 = rdd1.rightOuterJoin(rdd2)

rdd3.collect

//求并集

val rdd4 = rdd1 union rdd2

//按key進(jìn)行分組

rdd4.groupByKey

rdd4.collect

//分別用groupByKey和reduceByKey實(shí)現(xiàn)單詞計數(shù)

val rdd3 = rdd1 union rdd2

rdd3.groupByKey().mapValues(_.sum).collect

rdd3.reduceByKey(_+_).collect

groupByKey和reduceByKey的區(qū)別

reduceByKey算子比較特殊,它首先會進(jìn)行局部聚合,再全局聚合,我們只需要傳一個局部聚合的函數(shù)就可以了

好程序員大數(shù)據(jù)學(xué)習(xí)路線分享彈性分布式數(shù)據(jù)集RDD

練習(xí)5

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

//cogroup

val rdd3 = rdd1.cogroup(rdd2)

//注意cogroup與groupByKey的區(qū)別

rdd3.collect

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))

//reduce聚合

val rdd2 = rdd1.reduce(_ + _)

//按value的降序排序

val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))

rdd5.collect

//笛卡爾積

val rdd3 = rdd1.cartesian(rdd2)

計算元素個數(shù)

scala> val rdd1 = sc.parallelize(List(2,3,1,5,7,3,4))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :27

scala> rdd1.count

res0: Long = 7 ?

top先升序排序在取值

scala> rdd1.top(3)

res1: Array[Int] = Array(7, 5, 4) ??????????????????????????????????????????????

scala> rdd1.top(0)

res2: Array[Int] = Array()

scala> rdd1.top(100)

res3: Array[Int] = Array(7, 5, 4, 3, 3, 2, 1)

take原集合前N個,有幾個取幾個

scala> rdd1.take(3)

res4: Array[Int] = Array(2, 3, 1)

scala> rdd1.take(100)

res5: Array[Int] = Array(2, 3, 1, 5, 7, 3, 4)

scala> rdd1.first

res6: Int = 2

takeordered倒序排序再取值

scala> rdd1.takeOrdered(3)

res7: Array[Int] = Array(1, 2, 3)

scala> rdd1.takeOrdered(30)

res8: Array[Int] = Array(1, 2, 3, 3, 4, 5, 7)

?????????????????????????????

生成RDD的兩種方式

1.并行化方式生成 (默認(rèn)分區(qū)兩個)

手動指定分區(qū)

scala> val rdd1 = sc.parallelize(List(1,2,3,5))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at :27

scala> rdd1.partitions.length ?//獲取分區(qū)數(shù)

res9: Int = 2

scala> val rdd1 = sc.parallelize(List(1,2,3,5),3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :27

scala> rdd1.partitions.length

res10: Int = 3

2.使用textFile讀取文件存儲系統(tǒng)里的數(shù)據(jù) ?

scala> val rdd2 = sc.textFile("hdfs://hadoop01:9000/wordcount/input/a.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at :27

scala> rdd2.collect ?//調(diào)用算子得到RDD顯示結(jié)果

res11: Array[(String, Int)] = Array((hello,6), (beijing,1), (java,1), (gp1808,1), (world,1), (good,1), (qianfeng,1))

scala> val rdd2 = ?sc.textFile("hdfs://hadoop01:9000/wordcount/input/a.txt",4).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at reduceByKey at :27

scala> rdd2.partitions.length ???//也可以自己指定分區(qū)數(shù)

res15: Int = 4


當(dāng)前標(biāo)題:好程序員大數(shù)據(jù)學(xué)習(xí)路線分享彈性分布式數(shù)據(jù)集RDD
轉(zhuǎn)載來源:http://weahome.cn/article/ipphpg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部