???
?RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變(RDD中的數(shù)據(jù),不能增刪改),可分區(qū)、元素可并行計(jì)算的集合。
?具有數(shù)據(jù)流的模型的特點(diǎn),自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性。RDD允許用戶在執(zhí)行多個(gè)查詢時(shí)顯示的將工作集緩存在內(nèi)存中。后續(xù)的查詢能夠重用工作集,這極大地提升了查詢速度。
?RDD可以從 三方面理解:
?? - 數(shù)據(jù)集:RDD是數(shù)據(jù)集合的抽象,是復(fù)雜物理介質(zhì)上存在數(shù)據(jù)的一種邏輯視圖。從外部看RDD的確可以被看待成經(jīng)過(guò)封裝,帶擴(kuò)展特性(如容錯(cuò)性)的數(shù)據(jù)集合。
?? - 分布式:RDD的數(shù)據(jù)可能存儲(chǔ)在多個(gè)節(jié)點(diǎn)的磁盤上或者內(nèi)存中,也就是所謂的多級(jí)存儲(chǔ)。
?? - 彈性:雖然 RDD 內(nèi)部存儲(chǔ)的數(shù)據(jù)是只讀的,但是,我們可以去修改(例如通 過(guò) repartition 轉(zhuǎn)換操作)并行計(jì)算計(jì)算單元的劃分結(jié)構(gòu),也就是分區(qū)的數(shù)量。
?總之:RDD就是一個(gè)大集合,將所有的數(shù)據(jù)都加載到內(nèi)存中,方便多次進(jìn)行重用。它的數(shù)據(jù)可以在多個(gè)節(jié)點(diǎn)上,并且RDD可以保存在內(nèi)存中,當(dāng)如果某個(gè)階段的RDD丟失,不需要重新計(jì)算,只需要提取上一次的RDD,在相應(yīng)的計(jì)算即可。
成都創(chuàng)新互聯(lián)公司是一家集網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、網(wǎng)站頁(yè)面設(shè)計(jì)、網(wǎng)站優(yōu)化SEO優(yōu)化為一體的專業(yè)網(wǎng)絡(luò)公司,已為成都等多地近百家企業(yè)提供網(wǎng)站建設(shè)服務(wù)。追求良好的瀏覽體驗(yàn),以探求精品塑造與理念升華,設(shè)計(jì)最適合用戶的網(wǎng)站頁(yè)面。 合作只是第一步,服務(wù)才是根本,我們始終堅(jiān)持講誠(chéng)信,負(fù)責(zé)任的原則,為您進(jìn)行細(xì)心、貼心、認(rèn)真的服務(wù),與眾多客戶在蓬勃發(fā)展的市場(chǎng)環(huán)境中,互促共生。
??
??一個(gè)分區(qū)通常與一個(gè)任務(wù)向關(guān)聯(lián),分區(qū)的個(gè)數(shù)決定了并行的粒度。分區(qū)的個(gè)數(shù)可以在創(chuàng)建RDD的時(shí)候指定,如果不指定,那么默認(rèn)的由節(jié)點(diǎn)的cores個(gè)數(shù)決定。最終每一個(gè)分區(qū)會(huì)被映射成為BlockManager 中的一個(gè)Block,而這個(gè)Block會(huì)被下一個(gè)task使用進(jìn)行計(jì)算。
??每一個(gè)RDD都會(huì)實(shí)現(xiàn)compute,用于分區(qū)進(jìn)行計(jì)算
??RDD 的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的 RDD,所以 RDD 之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark 可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù), 而不是對(duì) RDD 的所有分區(qū)進(jìn)行重新計(jì)算。
寬依賴和窄依賴:
窄依賴(完全依賴):一個(gè)父分區(qū)唯一對(duì)應(yīng)一個(gè)子分區(qū),例:map操作
寬依賴(部分依賴):一個(gè)父分區(qū)對(duì)應(yīng)多個(gè)子分區(qū),如:reduce、group操作
區(qū)分寬依賴和窄依賴:當(dāng)前這個(gè)算子的執(zhí)行過(guò)程中是否有shuffle操作。
??當(dāng)前 Spark 中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的 HashPartitioner,另外一個(gè)是基于范圍的 RangePartitioner。只有對(duì)于 key-value 的 RDD,才會(huì)有 Partitioner,非 key-value的 RDD 的 Parititioner 的值是 None。Partitioner 函數(shù)不但決定了 RDD 本身的分片數(shù)量,也決 定了 parent RDD Shuffle 輸出時(shí)的分片數(shù)量。
??一個(gè)列表,存儲(chǔ)存取每個(gè) Partition 的優(yōu)先位置(preferred location)。按照”移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark 在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。而這個(gè)列表中就存放著每個(gè)分區(qū)的優(yōu)先位置。
??RDD編程中有兩種中形式:Transformation(轉(zhuǎn)換)和Action(行動(dòng))。
?Transformation:表示把一個(gè)RDD ---->RDD。
?Action:表示把RDD----?集合或者scala對(duì)象。
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext()
//由一個(gè)已經(jīng)存在的 Scala 數(shù)據(jù)集合創(chuàng)建
val arr=Array(1,2,3,4)
val arr1RDD: RDD[Int] = sc.parallelize(arr)
val arr2RDD: RDD[Int] = sc.makeRDD(arr)
//由外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)創(chuàng)建(HDFS、HBase...)
val HDFSRDD: RDD[String] = sc.textFile("/data/input")
}
}
??官網(wǎng):http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
?注意:RDD中所有的轉(zhuǎn)換(Transformation)都是延遲加載,也就是說(shuō),他們并不是直接計(jì)算結(jié)果,相反的,他們只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集,上的一個(gè)轉(zhuǎn)換動(dòng)作,只有當(dāng)發(fā)生一個(gè)要求返回一個(gè)Driver動(dòng)作的時(shí)候,這些轉(zhuǎn)換才真正運(yùn)行。
map()算子:
val HDFSRDD: RDD[String] = sc.textFile("/data/input")
/**
* map 算子,返回一個(gè)新的RDD,該RDD由每一個(gè)輸入元素經(jīng)過(guò)function函數(shù)轉(zhuǎn)換后組成
*/
val mapRDD: RDD[(String, Int)] = HDFSRDD.map(ele=>(ele,1))
flatMap()算子:
val arr=Array("hive hbase hadoop","spark hadoop","yarn hdfs")
val lineRDD: RDD[String] = sc.parallelize(arr)
/**
* flagMap:類似于map,但是每一個(gè)元素輸入的元素可以被
* 映射成為0個(gè)或者多個(gè)輸出的元素(返回的是一個(gè)序列,而不是單一的元素)
*/
//返回一個(gè)集合hive hbase hadoop spark hadoop yarn hdfs
val wordRDD: RDD[String] = lineRDD.flatMap(line=>line.split("\\s+"))
filter()算子:
val arr=Array(1,2,3,4,5)
val arrRDD: RDD[Int] = sc.parallelize(arr)
/**
* filter過(guò)濾:返回一個(gè)新的RDD,該RDD由經(jīng)過(guò)func函數(shù)計(jì)算后返回
* 值為true的輸入元素組成
*/
val filterRDD: RDD[Int] = arrRDD.filter(num=>num%2==0)
mapPartitions()算子:
val hdfsRDD: RDD[String] = sc.textFile("/data/input")
/**
* mapPartitions與map的唯一區(qū)別就是,mapPartitions迭代的是一個(gè)分區(qū),
* 而map遍歷的每一個(gè)元素,mapPartitions參數(shù)是一個(gè)迭代對(duì)象,返回的也是一個(gè)迭代對(duì)象
*/
val partitionRDD: RDD[String] = hdfsRDD.mapPartitions((x: Iterator[String]) => {
val temp = x.toList.map(line => line + "!")
temp.toIterator
})
mapPartitionsWithIndex()算子:
val hdfsRDD: RDD[String] = sc.textFile("/data/input")
/**
* 第一個(gè)參數(shù)是分區(qū)編號(hào):分區(qū)編號(hào)是從0開(kāi)始的不間斷的連續(xù)編號(hào)
* 第二個(gè)參數(shù)和mapPartitions相同
*/
val partitionRDD: RDD[String] = hdfsRDD.mapPartitionsWithIndex((parnum:Int,x: Iterator[String]) => {
println(parnum) //分區(qū)編號(hào)
val temp = x.toList.map(line => line + "!")
temp.toIterator
})
sample()算子:
val list=1 to 5000
/**
* sample方法有三個(gè)參數(shù):
* withReplacement:代表是否有放回的抽?。╢alse 不放回,true:放回)
* fraction:抽取樣本空間占總體的比例,(以分?jǐn)?shù)的形式) 0<=fraction <=1
* seed:隨機(jī)數(shù)生成器,new Random().nextInt(10),不傳表示使用系統(tǒng)的
* 注意:我們使用的sample算子,不能保證提供集合大小就恰巧是rdd.size()*fraction,結(jié)果大小會(huì)上下浮動(dòng)
* sample在做抽樣調(diào)查的時(shí)候,特別受用
*/
val listRDD: RDD[Int] = sc.parallelize(list)
val sampleRDD: RDD[Int] = listRDD.sample(false,0.2)
println(sampleRDD.count()) //大概是5000*0.2 上下浮動(dòng)
groupByKey()算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* groupByKey,分組
* 建議groupByKey在實(shí)踐中,能不用就不用,主要因?yàn)間roupByKey的效率低,
* 因?yàn)橛写罅康臄?shù)據(jù)在網(wǎng)絡(luò)中傳輸,而且還沒(méi)有進(jìn)行本地的預(yù)處理
* 可以使用reduceByKey或者aggregateByKey或者combineByKey代替這個(gè)groupByKey
*/
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
//分組
val groupRDD: RDD[(String, Iterable[Int])] = stuRDD.groupByKey()
//求平均值
val result: RDD[(String, Double)] = groupRDD.map { case (name, score) => {
val avg = score.sum.toDouble / (score.size)
(name, avg)
}
}
reduceByKey算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* reduceByKey:在一個(gè)(K,V)對(duì)的數(shù)據(jù)集上使用,返回一個(gè)(K,V)對(duì)的數(shù)據(jù)
* 集,key 相同的值,都被使用指定的 reduce 函數(shù)聚合
* 到一起。和 groupByKey 類似,任務(wù)的個(gè)數(shù)是可以通過(guò)
* 第二個(gè)可選參數(shù)來(lái)配置的。
*/
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
//分組,求總分
val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
sumRDD.foreach(println) //打?。海╤base,36)(math,18)(hbase,18)
sortByKey()算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* sortByKey:在一個(gè)(K,V)的 RDD 上調(diào)用,K 必須實(shí)現(xiàn) Ordered 接口,
* 返回一個(gè)按照 key 進(jìn)行排序的(K,V)的 RDD
*/
//分組,求總分,排序
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18)
val sortRDD: RDD[(String, Int)] = sumRDD.map(kv=>(kv._2,kv._1)).sortByKey().map(kv=>(kv._2,kv._1))
sortRDD.foreach(println)
sortBy算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* sortBy(func,[ascending], [numTasks])
* 與 sortByKey 類似,但是更靈活
* 第一個(gè)參數(shù)是根據(jù)什么排序
* 第二個(gè)是怎么排序,true 正序,false 倒序
* 第三個(gè)排序后分區(qū)數(shù),默認(rèn)與原 RDD 一樣
*/
//分組,求總分,排序
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
sumRDD.foreach(println) //打?。海╤base,36)(math,18)(hbase,18)
val sortRDD: RDD[(String, Int)] = sumRDD.sortBy(kv=>kv._2,false,2)
aggregateByKey()算子:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext()
/**
* aggregateByKey(zeroValue)(seqOp,combOp, [numTasks])
* 先按分區(qū)聚合再總的聚合,每次要跟初始值交流
* zeroValue:初始值
* seqOp:迭代操作,拿RDD中的每一個(gè)元素跟初始值進(jìn)行合并
* combOp:分區(qū)結(jié)果的最終合并
* numTasks:分區(qū)個(gè)數(shù)
* aggregate+groupByKey=aggregateByKey
* aggregate對(duì)單個(gè)值進(jìn)行RDD,aggregateByKey對(duì)(K,V)值進(jìn)行RDD
*/
//aggregate
val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val listRDD: RDD[Int] = sc.parallelize(list)
//求平均值
/**
* seqOp: (U, T) => U
* combOp: (U, U) => U
* u:(Int,Int) 總和,總次數(shù)
*/
val result: (Int, Int) = listRDD.aggregate(0, 0)((u: (Int, Int), x: Int) => {
(u._1 + x, u._2 + 1)
}
, (u1: (Int, Int), u2: (Int, Int)) => {
(u1._1 + u2._1, u1._2 + u2._2)
})
println(result._1 / result._2)
//aggregateByKey已經(jīng)根據(jù)(k,v)k 進(jìn)行分組,以下的操作,是對(duì)v進(jìn)行操作
//以下操作時(shí)求平均值
val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
val stuRDD: RDD[(String, Int)] = sc.parallelize(list1)
val reslutRDD2: RDD[(String, (Int, Int))] = stuRDD.aggregateByKey((0, 0))((x: (Int, Int), y: Int) => {
(x._1 + y, x._2 + 1)
}, (x: (Int, Int), y: (Int, Int)) => {
(x._1 + y._1, x._2 + y._2)
})
reslutRDD2.foreach(kv=>{
val name=kv._1
val avg=kv._2._1.toDouble/kv._2._2
})
}
}
foldLeft()算子:(不是spark的算子,是scala的高級(jí)操作)
/**
* foldLeft
* (zeroValue: T) 初值值
* (B, A) => B B是一個(gè)元組,B._1 表示累加元素,B._2 表示個(gè)數(shù), A 表示下一個(gè)元素
*/
//aggregate
val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val result: (Int, Int) = list.foldLeft((0,0))((x, y)=>{(x._1+y,x._2+1)})
println(result._1.toDouble/result._2)
combineByKey()算子:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
/**
* combineByKey:
* 合并相同的 key 的值 rdd1.combineByKey(x => x, (a: Int,
* b: Int) => a + b, (m: Int, n: Int) => m + n)
*/
//求平均值
val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
val listRDD: RDD[(String, Int)] = sc.parallelize(list1)
/**
* createCombiner: V => C,
* mergeValue: (C, V) => C,
* mergeCombiners: (C, C) => C): RDD[(K, C)]
*/
val resultRDD: RDD[(String, (Int, Int))] = listRDD.combineByKey(x => {
(x, 1)
},
(x: (Int, Int), y: Int) => {
(x._1 + y, x._2 + 1)
},
(x: (Int, Int), y: (Int, Int)) => {
(x._1 + y._1, x._2 + y._2)
})
resultRDD.foreach{case (name,(sum,count))=>{
val avg=sum.toDouble/count
println(s"${name}:${avg}")
}}
}
}
連接操作:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val arr1 = Array(1, 2, 4, 5)
val arr1RDD = sc.parallelize(arr1)
val arr2 = Array(4, 5, 6, 7)
val arr2RDD = sc.parallelize(arr2)
//cartesian 笛卡爾積
val cartesianRDD: RDD[(Int, Int)] = arr1RDD.cartesian(arr2RDD)
//union : 連接
val unionRDD: RDD[Int] = arr1RDD.union(arr2RDD)
//subtract,求,差集
val sbutractRDD: RDD[Int] = arr1RDD.subtract(arr2RDD)
//join
val list1 = List(("a", 1), ("b", 2), ("c", 3))
val list1RDD = sc.parallelize(list1)
val list2 = List(("a", "zs"), ("b", "sl"))
val list2RDD = sc.parallelize(list2)
/**
* 根據(jù)元組中的key進(jìn)行join 操作,相同的key向連接
* 返回的是RDD[(String, (Int, String))] (key,連接結(jié)果)
*/
val joinRDD: RDD[(String, (Int, String))] = list1RDD.join(list2RDD)
//cogroup
/**
* (String key ,
* (Iterable[Int] arr1中的相應(yīng)的key所有value的集合
* , Iterable[String])) arr2中的相應(yīng)的key所有value的集合
*/
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = list1RDD.cogroup(list2RDD)
}
}
分區(qū)操作:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")
/**
* 表示在執(zhí)行了filter操作之后,由于大量的數(shù)據(jù)被過(guò)濾,導(dǎo)致之前設(shè)定的分區(qū)task個(gè)數(shù),
* 處理剩下的數(shù)據(jù)導(dǎo)致資源浪費(fèi),為了合理高效的利用資源,
* 可以對(duì)task重新定義,在coalesce方法中的分區(qū)個(gè)數(shù)一定要小于之前設(shè)置的分區(qū)個(gè)數(shù)。
*/
hdfsRDD.coalesce(2)
//打亂數(shù)據(jù),重新分區(qū),分區(qū)規(guī)則為隨機(jī)分區(qū)
hdfsRDD.repartition(3)
//自定義分區(qū)規(guī)則(注意,只在有key-value的RDD中可以使用)
var arr1 = Array(("a", 1), ("a", 2), ("c", 1), ("b", 2), ("d", 2)
("b", 2), ("e", 2)
, ("b", 2)
, ("f", 2), ("g", 2), ("h", 2))
val arrRDD: RDD[(String, Int)] = sc.parallelize(arr1,4)
arrRDD.partitionBy(new MyPartitioner(3))
}
}
class MyPartitioner(val numPTN:Int) extends Partitioner{
//分區(qū)個(gè)數(shù)
override def numPartitions: Int = numPTN
//分區(qū)規(guī)則
override def getPartition(key: Any): Int = {
val num=key.hashCode()&Integer.MAX_VALUE%numPTN
return num
}
}
總結(jié):
- Transformation返回的仍然是一個(gè)RDD
- 它使用了鏈?zhǔn)秸{(diào)用的設(shè)計(jì)模式,對(duì)一個(gè) RDD 進(jìn)行計(jì) 算后,變換成另外一個(gè) RDD,然后這個(gè) RDD 又可以進(jìn)行另外一次轉(zhuǎn)換。這個(gè)過(guò)程是分布式的。
常見(jiàn)操作:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
val listRDD: RDD[(String, Int)] = sc.parallelize(list)
//action rdd ---map
listRDD.reduceByKeyLocally((x,y)=>x+y)
//調(diào)用collect的目的是:觸發(fā)所有的計(jì)算,最終收集當(dāng)前這個(gè)調(diào)用者RDD的所有數(shù)據(jù),返回到客戶端,如果數(shù)據(jù)量比較大,謹(jǐn)慎使用
listRDD.collect()
//統(tǒng)計(jì)RDD中有多少記錄
listRDD.count()
//取出RDD中的第一條記錄
listRDD.first()
//取出RDD前幾條記錄
listRDD.take(5)
//隨機(jī)采樣
listRDD.takeSample(false,20)
//按照某種格式,排序后的前幾條
listRDD.top(50)
//按照升序或者降序,取相應(yīng)的條數(shù)的記錄(其中的元素必須繼承Ordered)
listRDD.takeOrdered(3)
//統(tǒng)計(jì)每一個(gè)key中的value有多少個(gè)
listRDD.countByKey()
//統(tǒng)計(jì)有多少個(gè)元素
listRDD.countByValue()
//遍歷RDD中每一個(gè)元素
listRDD.foreach(kv=>{})
//分區(qū)遍歷RDD中的元素
listRDD.foreachPartition(kv=>{})
//將RDD的結(jié)果,保存到相應(yīng)的文件系統(tǒng)中(注意這個(gè)目錄一定是不存在的目錄)
listRDD.saveAsTextFile("/data/output")
}
}
總結(jié):Action返回值不是一個(gè)RDD。它要么是一個(gè)scala的集合,要么是一個(gè)值,要么是空。最終返回到Driver程序,或者把RDD寫(xiě)入到文件系統(tǒng)中。