本篇內(nèi)容主要講解“spark的RDD、算子、持久化算子分別是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“spark的RDD、算子、持久化算子分別是什么”吧!
創(chuàng)新互聯(lián)主要從事成都網(wǎng)站制作、做網(wǎng)站、網(wǎng)頁設計、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務。立足成都服務大石橋,十多年網(wǎng)站建設經(jīng)驗,價格優(yōu)惠、服務專業(yè),歡迎來電咨詢建站服務:18980820575
一:RDD的介紹
RDD(Resilient Distributed Dateset),彈性分布式數(shù)據(jù)集。
RDD的五大特性:
1.RDD是由一系列的partition組成的。
2.函數(shù)是作用在每一個partition(split)上的。
3.RDD之間有一系列的依賴關系。
4.分區(qū)器是作用在K,V格式的RDD上。
5.RDD提供一系列最佳的計算位置。
注意:
textFile方法底層封裝的是讀取MR讀取文件的方式,讀取文件之前先split,默認split大小是一個block大小。
RDD實際上不存儲數(shù)據(jù),這里方便理解,暫時理解為存儲數(shù)據(jù)。
什么是K,V格式的RDD?
如果RDD里面存儲的數(shù)據(jù)都是二元組對象,那么這個RDD我們就叫做K,V格式的RDD。
哪里體現(xiàn)RDD的彈性(容錯)?
partition數(shù)量,大小沒有限制,體現(xiàn)了RDD的彈性。
RDD之間依賴關系,可以基于上一個RDD重新計算出RDD。
哪里體現(xiàn)RDD的分布式?
RDD是由Partition組成,partition是分布在不同節(jié)點上的。RDD提供計算最佳位置,體現(xiàn)了數(shù)據(jù)本地化。體現(xiàn)了大數(shù)據(jù)中“計算移動數(shù)據(jù)不移動”的理念。
RDD的創(chuàng)建方式
XX.parallelize() XX.makeRDD()
二、算子
Transformations轉(zhuǎn)換算子有如下:
filter 過濾符合條件的記錄數(shù),true保留,false過濾掉 map 將一個RDD中的每個數(shù)據(jù)項,通過map中的函數(shù)映射變?yōu)橐粋€新的元素 特點:輸入一條,輸出一條數(shù)據(jù) flatMap 先map后flat。與map類似,每個輸入項可以映射為0到多個輸出項 sample 隨機抽樣算子,根據(jù)傳進去的小數(shù)按比例進行又放回或者無放回的抽樣 reduceByKey 將相同的Key根據(jù)相應的邏輯進行處理 sortByKey/sortBy 作用在K,V格式的RDD上,對key進行升序或者降序排序 join,leftOuterJoin,rightOuterJoin,fullOuterJoin 作用在K,V格式的RDD上。根據(jù)K進行連接,對(K,V)join(K,W)返回(K,(V,W)),join后的分區(qū)數(shù)與父RDD分區(qū)數(shù)多的那一個相同。 union 合并兩個數(shù)據(jù)集。兩個數(shù)據(jù)集的類型要一致,返回新的RDD的分區(qū)數(shù)是合并RDD分區(qū)數(shù)的總和。 intersection 取兩個數(shù)據(jù)集的交集,返回新的RDD與父RDD分區(qū)多的一致 subtract 取兩個數(shù)據(jù)集的差集,結(jié)果RDD的分區(qū)數(shù)與subtract前面的RDD的分區(qū)數(shù)一致。 mapPartitions 與map類似,遍歷的單位是每個partition上的數(shù)據(jù)。 distinct 實際內(nèi)部使用的是map+reduceByKey+map,就是去重的意思 cogroup 當調(diào)用類型(K,V)和(K,W)的數(shù)據(jù)上時,返回一個數(shù)據(jù)集(K,(Iterable,Iterable )),子RDD的分區(qū)與父RDD多的一致 mapPartitionWithIndex 類似于mapPartitions,除此之外還會攜帶分區(qū)的索引值。 repartition 增加或減少分區(qū)。會產(chǎn)生shuffle。(多個分區(qū)分到一個分區(qū)不會產(chǎn)生shuffle) coalesce 用來減少分區(qū),第二個參數(shù)是減少分區(qū)的過程中是否產(chǎn)生shuffle。true為產(chǎn)生shuffle,false不產(chǎn)生shuffle。默認是false。 如果coalesce設置的分區(qū)數(shù)比原來的RDD的分區(qū)數(shù)還多的話,第二個參數(shù)設置為false不會起作用,如果設置成true,效果和repartition一樣。即repartition(numPartitions) = coalesce(numPartitions,true) groupByKey 作用在K,V格式的RDD上。根據(jù)Key進行分組。作用在(K,V),返回(K,Iterable )。 zip 將兩個RDD中的元素(KV格式/非KV格式)變成一個KV格式的RDD,兩個RDD的每個分區(qū)元素個數(shù)必須相同 zipWithIndex 該函數(shù)將RDD中的元素和這個元素在RDD中的索引號(從0開始)組合成(K,V)對。
Action行動算子有如下:
count 返回數(shù)據(jù)集中的元素數(shù)。會在結(jié)果計算完成后回收到Driver端 take(n) 返回一個包含數(shù)據(jù)集前n個元素的集合 first first=take(1),返回數(shù)據(jù)集中的第一個元素 foreach 循環(huán)遍歷數(shù)據(jù)集中的每個元素,運行相應的邏輯 collect 將計算結(jié)果回收到Driver端
轉(zhuǎn)換算子和行動算子的區(qū)別:
大多數(shù)情況下轉(zhuǎn)換算子返回的內(nèi)心是RDD類型,行動算子返回的類型大多常常是普通類型,當不知道算子是屬于那個類型算子的時候,可以用這個推測
最后一種叫做:控制算子,分別是cache、persist、checkpoint,cache和persist都是懶執(zhí)行的。必須有一個action類算子觸發(fā)執(zhí)行。checkpoint算子不僅能將RDD持久化到磁盤,還能切斷RDD之間的依賴關系
三、案列
案列1:cache
請求數(shù)據(jù):words數(shù)據(jù)有約1700萬條記錄,文件大小約200m大小
結(jié)果:
沒用做持久化算子處理:56995 毫秒 cache: 274 毫秒
scala代碼
def main(args: Array[String]): Unit = { /*cache*/ val conf = new SparkConf() conf.setAppName("spark04") conf.setMaster("local") val context = new SparkContext(conf) val sc = context.textFile("./words") val result = sc.flatMap(_.split("")).map((_,1)).reduceByKey((_+_)) val startTime1 = System.currentTimeMillis() //默認將RDD的數(shù)據(jù)持久化到內(nèi)存中。cache是懶執(zhí)行,使用persist時候,與cache搭配使用,效率要高點 //不能這樣寫:rdd.cache().count()返回的不是持久化的RDD,而是一個數(shù)值了 result.count() println("沒用做持久化算子處理:"+(System.currentTimeMillis()-startTime1)+" 毫秒") val startTime2 = System.currentTimeMillis() //默認將RDD的數(shù)據(jù)持久化到內(nèi)存中。cache是懶執(zhí)行 result.cache() //cache和persist都是懶執(zhí)行,必須有一個action類算子觸發(fā)執(zhí)行 result.count() println("cache: "+(System.currentTimeMillis()-startTime2).toString+" 毫秒") }
案列2: persist
數(shù)據(jù):words數(shù)據(jù)有約1700萬條記錄,文件大小約200m大小
結(jié)果:
沒用做持久化算子處理:55350毫秒 persist: 312 毫秒
scala代碼
def main(args: Array[String]): Unit = { /*cache*/ val conf = new SparkConf() conf.setAppName("spark04") conf.setMaster("local") val context = new SparkContext(conf) val sc = context.textFile("./words") val result = sc.flatMap(_.split("")).map((_,1)).reduceByKey((_+_)) val startTime1 = System.currentTimeMillis() result.count() println("沒用做持久化算子處理:"+(System.currentTimeMillis()-startTime1)+" 毫秒") val startTime2 = System.currentTimeMillis() //默認將RDD的數(shù)據(jù)持久化到內(nèi)存中。cache是懶執(zhí)行,使用persist時候,與cache搭配使用,效率要高點 //不能這樣寫:rdd.persist().count()返回的不是持久化的RDD,而是一個數(shù)值了 //persist()內(nèi)部就是調(diào)用cache() result.persist() //1cache和persist都是懶執(zhí)行,必須有一個action類算子觸發(fā)執(zhí)行。 result.count() println("persist: "+(System.currentTimeMillis()-startTime2).toString+" 毫秒") } }
案列3: checkpoint
數(shù)據(jù):words數(shù)據(jù)有約1700萬條記錄,文件大小約200m大小
結(jié)果:
沒用做持久化算子處理:55575 毫秒 checkpoint: 55851 毫秒 同時會在指定的輸出路徑中多出持久化到硬盤的數(shù)據(jù)文件 由于要持久化到硬盤中,速度要慢很多 可以與result.cache()搭配使用 搭配使用后的運行結(jié)果 checkpoint: 54621 毫秒 比之前的少了1000多毫秒,result使用cache后,以前處理的數(shù)據(jù)都放到緩存中去了,所以要稍微快一點
scala代碼
def main(args: Array[String]): Unit = { /*cache*/ val conf = new SparkConf() conf.setAppName("spark04") conf.setMaster("local") val context = new SparkContext(conf) //設置checkpoint輸出路徑 context.setCheckpointDir("./checkpoint_file") //數(shù)據(jù)來源 val sc = context.textFile("./words") val result = sc.flatMap(_.split("")).map((_,1)).reduceByKey((_+_)) val startTime1 = System.currentTimeMillis() result.count() println("沒用做持久化算子處理:"+(System.currentTimeMillis()-startTime1)+" 毫秒") val startTime2 = System.currentTimeMillis() //不能這樣:rdd.checkpoint().count() result.checkpoint() /** 一個action類原子就是會啟動一個job 1.當RDD的job執(zhí)行完畢后,會從finalRDD從后往前回溯。 2.當回溯到某一個RDD調(diào)用了checkpoint方法,會對當前的RDD做一個標記。 3.Spark框架會自動啟動一個新的job,重新計算這個RDD的數(shù)據(jù),將數(shù)據(jù)持久化到HDFS上。 優(yōu)化:對RDD執(zhí)行checkpoint之前,最好對這個RDD先執(zhí)行cache,這樣新啟動的job只需要將內(nèi)存中的數(shù)據(jù)拷貝到HDFS上就可以,省去了重新計算這一步 */ //result.cache() result.count() println("checkpoint: "+(System.currentTimeMillis()-startTime2).toString+" 毫秒") }
總結(jié):
cache和persist的注意事項: 1.cache和persist都是懶執(zhí)行,必須有一個action類算子觸發(fā)執(zhí)行。 2.cache和persist算子的返回值可以賦值給一個變量,在其他job中直接使用這個變量就是使用持久化的數(shù)據(jù)了。持久化的單位是partition。 3.cache和persist算子后不能立即緊跟action算子。 4.cache和persist算子持久化的數(shù)據(jù)當applilcation執(zhí)行完成之后會被清除。 錯誤:rdd.cache().count() 返回的不是持久化的RDD,而是一個數(shù)值了。 checkpoint checkpoint將RDD持久化到磁盤,還可以切斷RDD之間的依賴關系。checkpoint目錄數(shù)據(jù)當application執(zhí)行完之后不會被清除。 checkpoint 的執(zhí)行原理: 1.當RDD的job執(zhí)行完畢后,會從finalRDD從后往前回溯。 2.當回溯到某一個RDD調(diào)用了checkpoint方法,會對當前的RDD做一個標記。 3.Spark框架會自動啟動一個新的job,重新計算這個RDD的數(shù)據(jù),將數(shù)據(jù)持久化到HDFS上。 優(yōu)化:對RDD執(zhí)行checkpoint之前,最好對這個RDD先執(zhí)行cache,這樣新啟動的job只需要將內(nèi)存中的數(shù)據(jù)拷貝到HDFS上就可以,省去了重新計算這一步
到此,相信大家對“spark的RDD、算子、持久化算子分別是什么”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關內(nèi)容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!