這篇文章將為大家詳細講解有關(guān)分布式數(shù)據(jù)集SparkRDD的依賴與緩存是怎樣的,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
創(chuàng)新互聯(lián)公司從2013年成立,先為利津等服務(wù)建站,利津等地企業(yè),進行企業(yè)商務(wù)咨詢服務(wù)。為利津企業(yè)網(wǎng)站制作PC+手機+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。
RDD簡介
RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個不可變、可分區(qū)、里面的元素可并行計算的集合。RDD是一個類
RDD的屬性
1.一個列表,存儲存取每個Partition的優(yōu)先位置(preferred location)。對于一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數(shù)據(jù)不如移動計算”的理念,Spark在進行任務(wù)調(diào)度的時候,會盡可能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置。
2.保存了計算每個分區(qū)的函數(shù),這個計算方法會應(yīng)用到每一個數(shù)據(jù)塊上,Spark中RDD的計算是以分片為單位的,每個RDD都會實現(xiàn)compute函數(shù)以達到這個目的。compute函數(shù)會對迭代器進行復(fù)合,不需要保存每次計算的結(jié)果。
3.RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時,Spark可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進行重新計算。
4.RDD的分片函數(shù)(Partitioner),一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時的分片數(shù)量。
5.一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務(wù)處理,并決定并行計算的粒度。用戶可以在創(chuàng)建RDD時指定RDD的分片個數(shù),如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數(shù)目。
如何創(chuàng)建RDD
1.通過序列化集合的方式創(chuàng)建RDD(parallelize,makeRDD)
2.通過讀取外部的數(shù)據(jù)源(testFile)
3.通過其他的rdd做transformation操作轉(zhuǎn)換成行的RDD
RDD的兩種算子:
1.Transformation
map(func) :返回一個新的分布式數(shù)據(jù)集,由每個原元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成
filter(func) : 返回一個新的數(shù)據(jù)集,由經(jīng)過func函數(shù)后返回值為true的原元素組成
flatMap(func) : 類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數(shù)的返回值是一個Seq,而不是單一元素)
flatMap(func) : 類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數(shù)的返回值是一個Seq,而不是單一元素)
sample(withReplacement, frac, seed) :
根據(jù)fraction指定的比例對數(shù)據(jù)進行采樣,可以選擇是否使用隨機數(shù)進行替換,seed用于指定隨機數(shù)生成器種子
union(otherDataset) : 返回一個新的數(shù)據(jù)集,由原數(shù)據(jù)集和參數(shù)聯(lián)合而成
reduceByKey(func, [numTasks]) : 在一個(K,V)對的數(shù)據(jù)集上使用,返回一個(K,V)對的數(shù)據(jù)集,key相同的值,都被使用指定的reduce函數(shù)聚合到一起。和groupbykey類似,任務(wù)的個數(shù)是可以通過第二個可選參數(shù)來配置的。
join(otherDataset, [numTasks]) :
在類型為(K,V)和(K,W)類型的數(shù)據(jù)集上調(diào)用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數(shù)據(jù)集
groupWith(otherDataset, [numTasks]) : 在類型為(K,V)和(K,W)類型的數(shù)據(jù)集上調(diào)用,返回一個數(shù)據(jù)集,組成元素為(K, Seq[V], Seq[W]) Tuples。這個操作在其它框架,稱為CoGroup
cartesian(otherDataset) : 笛卡爾積。但在數(shù)據(jù)集T和U上調(diào)用時,返回一個(T,U)對的數(shù)據(jù)集,所有元素交互進行笛卡爾積。
intersection(otherDataset):對源RDD和參數(shù)RDD求交集后返回一個新的RDD
distinct([numTasks])) 對源RDD進行去重后返回一個新的RDD
groupByKey([numTasks]) 在一個(K,V)的RDD上調(diào)用,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(K,V)的RDD上調(diào)用,返回一個(K,V)的RDD,使用指定的reduce函數(shù),將相同key的值聚合到一起,與groupByKey類似,reduce任務(wù)的個數(shù)可以通過第二個可選的參數(shù)來設(shè)置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一個(K,V)的RDD上調(diào)用,K必須實現(xiàn)Ordered接口,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類似,但是更靈活
join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個相同key對應(yīng)的所有元素對在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(K,(Iterable
2.Action
reduce(func) 通過func函數(shù)聚集RDD中的所有元素,這個功能必須是課交換且可并聯(lián)的
collect() 在驅(qū)動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素
count() 返回RDD的元素個數(shù)
first() 返回RDD的***個元素(類似于take(1))
take(n) 返回一個由數(shù)據(jù)集的前n個元素組成的數(shù)組
takeSample(withReplacement,num, [seed]) 返回一個數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機采樣的num個元素組成,可以選擇是否用隨機數(shù)替換不足的部分,seed用于指定隨機數(shù)生成器種子
takeOrdered(n, [ordering])
saveAsTextFile(path) 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對于每個元素,Spark將會調(diào)用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。
saveAsObjectFile(path)
countByKey() 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應(yīng)的元素個數(shù)。
foreach(func) 在數(shù)據(jù)集的每一個元素上,運行函數(shù)func進行更新。
RDD的依賴關(guān)系
1.窄依賴
窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用
總結(jié):窄依賴我們形象的比喻為獨生子女
2.寬依賴
寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition
總結(jié):窄依賴我們形象的比喻為超生
3.Lineage(血統(tǒng))
RDD只支持粗粒度轉(zhuǎn)換,即在大量記錄上執(zhí)行的單個操作。將創(chuàng)建RDD的一系列Lineage(即血統(tǒng))記錄下來,以便恢復(fù)丟失的分區(qū)。RDD的Lineage會記錄RDD的元數(shù)據(jù)信息和轉(zhuǎn)換行為,當該RDD的部分分區(qū)數(shù)據(jù)丟失時,它可以根據(jù)這些信息來重新運算和恢復(fù)丟失的數(shù)據(jù)分區(qū)。
DAG的生成
DAG(Directed Acyclic Graph)叫做有向無環(huán)圖,原始的RDD通過一系列的轉(zhuǎn)換就就形成了DAG,根據(jù)RDD之間的依賴關(guān)系的不同將DAG劃分成不同的Stage,對于窄依賴,partition的轉(zhuǎn)換處理在Stage中完成計算。對于寬依賴,由于有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計算,因此寬依賴是劃分Stage的依據(jù)。
RDD的緩存
Spark速度非??斓脑蛑唬褪窃诓煌僮髦锌梢栽趦?nèi)存中持久化或緩存?zhèn)€數(shù)據(jù)集。當持久化某個RDD后,每一個節(jié)點都將把計算的分片結(jié)果保存在內(nèi)存中,并在對此RDD或衍生出的RDD進行的其他動作中重用。這使得后續(xù)的動作變得更加迅速。RDD相關(guān)的持久化和緩存,是Spark最重要的特征之一??梢哉f,緩存是Spark構(gòu)建迭代式算法和快速交互式查詢的關(guān)鍵。
找依賴關(guān)系劃分stage的目的之一就是劃分緩存, 如何通過stage的劃分設(shè)置緩存?
(1)在窄依賴想設(shè)置緩存時用cache
(2)在寬依賴想設(shè)置緩存時用checkpoint
如何設(shè)置cache和checkpoint?
cache:someRDD.cache()就添加成功緩存,放入到內(nèi)存中
someRDD.persist(StorageLevel.MEMORY_AND_DISK):根據(jù)自己的需要設(shè)置緩存的位置(內(nèi)存和硬盤)
checkpoint:可以把RDD計算后的數(shù)據(jù)存儲在本地磁盤上,也可以是hdfs
sc.setCheckpointDIr("hdfs://hadoop1:9000/checkpoint")設(shè)置checkpoint的路徑 在寬依賴前設(shè)置
someRDD.checkpoint()設(shè)置checkpoint
cache 和checkpoint的區(qū)別
cache只是緩存數(shù)據(jù),不改變RDD的依賴關(guān)系,checkpoint生成了一個新的RDD,后面的RDD將依賴新的RDD依賴關(guān)系已經(jīng)改變 。數(shù)據(jù)恢復(fù)的順序:checkpoint ---》cache--》重算
關(guān)于分布式數(shù)據(jù)集SparkRDD的依賴與緩存是怎樣的就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。