真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark的RDD、算子、持久化算子分別是什么

本篇內(nèi)容主要講解“spark的RDD、算子、持久化算子分別是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“spark的RDD、算子、持久化算子分別是什么”吧!

創(chuàng)新互聯(lián)主要從事成都網(wǎng)站制作、做網(wǎng)站、網(wǎng)頁設計、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務。立足成都服務大石橋,十多年網(wǎng)站建設經(jīng)驗,價格優(yōu)惠、服務專業(yè),歡迎來電咨詢建站服務:18980820575

一:RDD的介紹

RDD(Resilient Distributed Dateset),彈性分布式數(shù)據(jù)集。
RDD的五大特性:
1.RDD是由一系列的partition組成的。
2.函數(shù)是作用在每一個partition(split)上的。
3.RDD之間有一系列的依賴關系。
4.分區(qū)器是作用在K,V格式的RDD上。
5.RDD提供一系列最佳的計算位置。

  1. 注意:

  • textFile方法底層封裝的是讀取MR讀取文件的方式,讀取文件之前先split,默認split大小是一個block大小。

  • RDD實際上不存儲數(shù)據(jù),這里方便理解,暫時理解為存儲數(shù)據(jù)。

  • 什么是K,V格式的RDD?

  • 如果RDD里面存儲的數(shù)據(jù)都是二元組對象,那么這個RDD我們就叫做K,V格式的RDD。

  • 哪里體現(xiàn)RDD的彈性(容錯)?

  • partition數(shù)量,大小沒有限制,體現(xiàn)了RDD的彈性。

  • RDD之間依賴關系,可以基于上一個RDD重新計算出RDD。

  • 哪里體現(xiàn)RDD的分布式?

  • RDD是由Partition組成,partition是分布在不同節(jié)點上的。RDD提供計算最佳位置,體現(xiàn)了數(shù)據(jù)本地化。體現(xiàn)了大數(shù)據(jù)中“計算移動數(shù)據(jù)不移動”的理念。

RDD的創(chuàng)建方式

XX.parallelize()
XX.makeRDD()

二、算子

Transformations轉(zhuǎn)換算子有如下:

filter
過濾符合條件的記錄數(shù),true保留,false過濾掉

map
將一個RDD中的每個數(shù)據(jù)項,通過map中的函數(shù)映射變?yōu)橐粋€新的元素
特點:輸入一條,輸出一條數(shù)據(jù)

flatMap
先map后flat。與map類似,每個輸入項可以映射為0到多個輸出項

sample
隨機抽樣算子,根據(jù)傳進去的小數(shù)按比例進行又放回或者無放回的抽樣

reduceByKey
將相同的Key根據(jù)相應的邏輯進行處理

sortByKey/sortBy
作用在K,V格式的RDD上,對key進行升序或者降序排序

join,leftOuterJoin,rightOuterJoin,fullOuterJoin
作用在K,V格式的RDD上。根據(jù)K進行連接,對(K,V)join(K,W)返回(K,(V,W)),join后的分區(qū)數(shù)與父RDD分區(qū)數(shù)多的那一個相同。

union
合并兩個數(shù)據(jù)集。兩個數(shù)據(jù)集的類型要一致,返回新的RDD的分區(qū)數(shù)是合并RDD分區(qū)數(shù)的總和。

intersection
取兩個數(shù)據(jù)集的交集,返回新的RDD與父RDD分區(qū)多的一致

subtract
取兩個數(shù)據(jù)集的差集,結(jié)果RDD的分區(qū)數(shù)與subtract前面的RDD的分區(qū)數(shù)一致。

mapPartitions
與map類似,遍歷的單位是每個partition上的數(shù)據(jù)。

distinct
實際內(nèi)部使用的是map+reduceByKey+map,就是去重的意思

cogroup 
當調(diào)用類型(K,V)和(K,W)的數(shù)據(jù)上時,返回一個數(shù)據(jù)集(K,(Iterable,Iterable)),子RDD的分區(qū)與父RDD多的一致

mapPartitionWithIndex
類似于mapPartitions,除此之外還會攜帶分區(qū)的索引值。

repartition
增加或減少分區(qū)。會產(chǎn)生shuffle。(多個分區(qū)分到一個分區(qū)不會產(chǎn)生shuffle)

coalesce
用來減少分區(qū),第二個參數(shù)是減少分區(qū)的過程中是否產(chǎn)生shuffle。true為產(chǎn)生shuffle,false不產(chǎn)生shuffle。默認是false。
如果coalesce設置的分區(qū)數(shù)比原來的RDD的分區(qū)數(shù)還多的話,第二個參數(shù)設置為false不會起作用,如果設置成true,效果和repartition一樣。即repartition(numPartitions) = coalesce(numPartitions,true)

groupByKey
作用在K,V格式的RDD上。根據(jù)Key進行分組。作用在(K,V),返回(K,Iterable )。

zip
將兩個RDD中的元素(KV格式/非KV格式)變成一個KV格式的RDD,兩個RDD的每個分區(qū)元素個數(shù)必須相同

zipWithIndex
該函數(shù)將RDD中的元素和這個元素在RDD中的索引號(從0開始)組合成(K,V)對。

Action行動算子有如下:

count
返回數(shù)據(jù)集中的元素數(shù)。會在結(jié)果計算完成后回收到Driver端

take(n)
返回一個包含數(shù)據(jù)集前n個元素的集合

first
first=take(1),返回數(shù)據(jù)集中的第一個元素

foreach
循環(huán)遍歷數(shù)據(jù)集中的每個元素,運行相應的邏輯

collect
將計算結(jié)果回收到Driver端

轉(zhuǎn)換算子和行動算子的區(qū)別:

大多數(shù)情況下轉(zhuǎn)換算子返回的內(nèi)心是RDD類型,行動算子返回的類型大多常常是普通類型,當不知道算子是屬于那個類型算子的時候,可以用這個推測

最后一種叫做:控制算子,分別是cache、persist、checkpoint,cache和persist都是懶執(zhí)行的。必須有一個action類算子觸發(fā)執(zhí)行。checkpoint算子不僅能將RDD持久化到磁盤,還能切斷RDD之間的依賴關系

三、案列

案列1:cache

請求數(shù)據(jù):words數(shù)據(jù)有約1700萬條記錄,文件大小約200m大小

結(jié)果:

 沒用做持久化算子處理:56995 毫秒
 cache: 274 毫秒

scala代碼

 def main(args: Array[String]): Unit = {
    /*cache*/
   val conf = new SparkConf()
    conf.setAppName("spark04")
    conf.setMaster("local")

    val context = new SparkContext(conf)
    val sc = context.textFile("./words")
    val result = sc.flatMap(_.split("")).map((_,1)).reduceByKey((_+_))

    val startTime1 = System.currentTimeMillis()
    //默認將RDD的數(shù)據(jù)持久化到內(nèi)存中。cache是懶執(zhí)行,使用persist時候,與cache搭配使用,效率要高點
    //不能這樣寫:rdd.cache().count()返回的不是持久化的RDD,而是一個數(shù)值了
    result.count()
    println("沒用做持久化算子處理:"+(System.currentTimeMillis()-startTime1)+" 毫秒")

    val startTime2 = System.currentTimeMillis()
    //默認將RDD的數(shù)據(jù)持久化到內(nèi)存中。cache是懶執(zhí)行
    result.cache()
    //cache和persist都是懶執(zhí)行,必須有一個action類算子觸發(fā)執(zhí)行
    result.count()
    println("cache: "+(System.currentTimeMillis()-startTime2).toString+" 毫秒")
  }

案列2: persist

數(shù)據(jù):words數(shù)據(jù)有約1700萬條記錄,文件大小約200m大小

結(jié)果:

沒用做持久化算子處理:55350毫秒 persist: 312 毫秒

scala代碼

 def main(args: Array[String]): Unit = {
    /*cache*/
   val conf = new SparkConf()
    conf.setAppName("spark04")
    conf.setMaster("local")

    val context = new SparkContext(conf)
    val sc = context.textFile("./words")
    val result = sc.flatMap(_.split("")).map((_,1)).reduceByKey((_+_))

    val startTime1 = System.currentTimeMillis()
    result.count()
    println("沒用做持久化算子處理:"+(System.currentTimeMillis()-startTime1)+" 毫秒")

    val startTime2 = System.currentTimeMillis()
    //默認將RDD的數(shù)據(jù)持久化到內(nèi)存中。cache是懶執(zhí)行,使用persist時候,與cache搭配使用,效率要高點
    //不能這樣寫:rdd.persist().count()返回的不是持久化的RDD,而是一個數(shù)值了
    //persist()內(nèi)部就是調(diào)用cache()
    result.persist()
    //1cache和persist都是懶執(zhí)行,必須有一個action類算子觸發(fā)執(zhí)行。
    result.count()
    println("persist: "+(System.currentTimeMillis()-startTime2).toString+" 毫秒")
  }
}

案列3: checkpoint

數(shù)據(jù):words數(shù)據(jù)有約1700萬條記錄,文件大小約200m大小

結(jié)果:

沒用做持久化算子處理:55575 毫秒
checkpoint: 55851 毫秒
同時會在指定的輸出路徑中多出持久化到硬盤的數(shù)據(jù)文件

由于要持久化到硬盤中,速度要慢很多
可以與result.cache()搭配使用

搭配使用后的運行結(jié)果
checkpoint: 54621 毫秒
比之前的少了1000多毫秒,result使用cache后,以前處理的數(shù)據(jù)都放到緩存中去了,所以要稍微快一點

scala代碼

 def main(args: Array[String]): Unit = {
    /*cache*/
   val conf = new SparkConf()
    conf.setAppName("spark04")
    conf.setMaster("local")

    val context = new SparkContext(conf)
    //設置checkpoint輸出路徑
    context.setCheckpointDir("./checkpoint_file")
    //數(shù)據(jù)來源
    val sc = context.textFile("./words")

    val result = sc.flatMap(_.split("")).map((_,1)).reduceByKey((_+_))

    val startTime1 = System.currentTimeMillis()
    result.count()
    println("沒用做持久化算子處理:"+(System.currentTimeMillis()-startTime1)+" 毫秒")

    val startTime2 = System.currentTimeMillis()
    //不能這樣:rdd.checkpoint().count()
    result.checkpoint()
    /** 一個action類原子就是會啟動一個job
      1.當RDD的job執(zhí)行完畢后,會從finalRDD從后往前回溯。
      2.當回溯到某一個RDD調(diào)用了checkpoint方法,會對當前的RDD做一個標記。
      3.Spark框架會自動啟動一個新的job,重新計算這個RDD的數(shù)據(jù),將數(shù)據(jù)持久化到HDFS上。
      優(yōu)化:對RDD執(zhí)行checkpoint之前,最好對這個RDD先執(zhí)行cache,這樣新啟動的job只需要將內(nèi)存中的數(shù)據(jù)拷貝到HDFS上就可以,省去了重新計算這一步
      */
    //result.cache()
    result.count()
    println("checkpoint: "+(System.currentTimeMillis()-startTime2).toString+" 毫秒")
  }

總結(jié):

cache和persist的注意事項:
1.cache和persist都是懶執(zhí)行,必須有一個action類算子觸發(fā)執(zhí)行。
2.cache和persist算子的返回值可以賦值給一個變量,在其他job中直接使用這個變量就是使用持久化的數(shù)據(jù)了。持久化的單位是partition。
3.cache和persist算子后不能立即緊跟action算子。
4.cache和persist算子持久化的數(shù)據(jù)當applilcation執(zhí)行完成之后會被清除。
錯誤:rdd.cache().count() 返回的不是持久化的RDD,而是一個數(shù)值了。

checkpoint
checkpoint將RDD持久化到磁盤,還可以切斷RDD之間的依賴關系。checkpoint目錄數(shù)據(jù)當application執(zhí)行完之后不會被清除。
checkpoint 的執(zhí)行原理:
1.當RDD的job執(zhí)行完畢后,會從finalRDD從后往前回溯。
2.當回溯到某一個RDD調(diào)用了checkpoint方法,會對當前的RDD做一個標記。
3.Spark框架會自動啟動一個新的job,重新計算這個RDD的數(shù)據(jù),將數(shù)據(jù)持久化到HDFS上。
優(yōu)化:對RDD執(zhí)行checkpoint之前,最好對這個RDD先執(zhí)行cache,這樣新啟動的job只需要將內(nèi)存中的數(shù)據(jù)拷貝到HDFS上就可以,省去了重新計算這一步

到此,相信大家對“spark的RDD、算子、持久化算子分別是什么”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關內(nèi)容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!


當前文章:spark的RDD、算子、持久化算子分別是什么
本文路徑:http://weahome.cn/article/pooisi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部