真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

5.sparkcore之RDD編程

??spark提供了對數(shù)據(jù)的核心抽象——彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset,簡稱RDD)。RDD是一個(gè)分布式的數(shù)據(jù)集合,數(shù)據(jù)可以跨越集群中的多個(gè)機(jī)器節(jié)點(diǎn),被分區(qū)并行執(zhí)行。
?在spark中,對數(shù)據(jù)的所有操作不外乎創(chuàng)建RDD、轉(zhuǎn)化已有RDD及調(diào)用RDD操作進(jìn)行求值。spark會自動(dòng)地將RDD中的數(shù)據(jù)分發(fā)到集群中并行執(zhí)行。

專注于為中小企業(yè)提供網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計(jì)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)金牛免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上千多家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

五大特性

  • a list of partitions
    ?RDD是一個(gè)由多個(gè)partition(某個(gè)節(jié)點(diǎn)里的某一片連續(xù)的數(shù)據(jù))組成的的list;將數(shù)據(jù)加載為RDD時(shí),一般會遵循數(shù)據(jù)的本地性(一般一個(gè)hdfs里的block會加載為一個(gè)partition)。
  • a function for computing each split
    ?RDD的每個(gè)partition中都會有function,即函數(shù)應(yīng)用,其作用是實(shí)現(xiàn)RDD之間partition的轉(zhuǎn)換。
  • a list of dependencies on other RDDs
    ?RDD會記錄它的依賴,為了容錯(cuò)(重算,cache,checkpoint),即內(nèi)存中的RDD操作出錯(cuò)或丟失時(shí)會進(jìn)行重算。
  • Optionally,a Partitioner for Key-value RDDs
    ?可選項(xiàng),如果RDD里面存的數(shù)據(jù)是key-value形式,則可以傳遞一個(gè)自定義的Partitioner進(jìn)行重新分區(qū),例如自定義的Partitioner是基于key進(jìn)行分區(qū),那則會將不同RDD里面的相同key的數(shù)據(jù)放到同一個(gè)partition里面。
  • Optionally, a list of preferred locations to compute each split on
    ?可選項(xiàng),最優(yōu)的位置去計(jì)算每個(gè)分片,即數(shù)據(jù)的本地性。

    創(chuàng)建RDD

    ??spark提供了兩種創(chuàng)建RDD的方式:讀取外部數(shù)據(jù)源、將驅(qū)動(dòng)器程序中的集合進(jìn)行并行化。

    并行化集合

    ??使用sparkContext的parallelize()方法將集合并行化。
    ?parallelize()方法第二個(gè)參數(shù)可指定分區(qū)數(shù)。spark會為每個(gè)分區(qū)創(chuàng)建一個(gè)task任務(wù),通常每個(gè)cpu需要2-4個(gè)分區(qū)。spark會自動(dòng)地根據(jù)集群大小設(shè)置分區(qū)數(shù),也支持通過parallelize()方法的第二個(gè)參數(shù)手動(dòng)指定。

    scala
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
    java
    List data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD distData = sc.parallelize(data);
    python
    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data)

    ??注:除了開發(fā)和測試外,這種方式用得不多。這種方式需要把整個(gè)數(shù)據(jù)集先放到一臺機(jī)器的內(nèi)存中。

    讀取外部數(shù)據(jù)源

    ??spark可接入多種hadoop支持的數(shù)據(jù)源來創(chuàng)建分布式數(shù)據(jù)集。包括:本地文件系統(tǒng)、HDFS、Cassandra、HBase、Amazon S3等。
    ?spark支持多種存儲格式,包括textFiles、SequenceFiles及其他hadoop存儲格式。

    scala

    scala> val distFile = sc.textFile("data.txt")
    distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at :26

    java

    JavaRDD distFile = sc.textFile("data.txt");

    python

    >>> distFile = sc.textFile("data.txt")

RDD操作

??RDD支持兩種操作:轉(zhuǎn)化操作和行動(dòng)操作。
5.spark core之RDD編程

轉(zhuǎn)化操作

??RDD的轉(zhuǎn)化操作會返回一個(gè)新的RDD。轉(zhuǎn)化操作是惰性求值的,只有行動(dòng)操作用到轉(zhuǎn)化操作生成的RDD時(shí),才會真正進(jìn)行轉(zhuǎn)化。
5.spark core之RDD編程
?spark使用lineage(血統(tǒng))來記錄轉(zhuǎn)化操作生成的不同RDD之間的依賴關(guān)系。依賴分為窄依賴(narrow dependencies)和寬依賴(wide dependencies)。

  • 窄依賴
    • 子RDD的每個(gè)分區(qū)依賴于常數(shù)個(gè)父分區(qū)
    • 輸入輸出一對一,結(jié)果RDD的分區(qū)結(jié)構(gòu)不變,主要是map、flatMap
    • 輸入輸出一對一,但結(jié)果RDD的分區(qū)結(jié)構(gòu)發(fā)生變化,如union、coalesce
    • 從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample
  • 寬依賴

    • 子RDD的每個(gè)分區(qū)依賴于所有父RDD分區(qū)
    • 對單個(gè)RDD基于key進(jìn)行重組和reduce,如groupByKey、reduceByKey
    • 對兩個(gè)RDD基于key進(jìn)行合并和重組,如join
      5.spark core之RDD編程

      行動(dòng)操作

      ??行動(dòng)操作則會向驅(qū)動(dòng)器程序返回結(jié)果或把結(jié)果寫入外部系統(tǒng),會觸發(fā)實(shí)際的計(jì)算。
      5.spark core之RDD編程

      緩存方式

      ??RDD通過persist方法或cache方法可以將前面的計(jì)算結(jié)果緩存,但是并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存,而是觸發(fā)后面的action時(shí),該RDD將會被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用。
      ?cache最終也是調(diào)用了persist方法,默認(rèn)的存儲級別是僅在內(nèi)存存儲一份。
      5.spark core之RDD編程
      ?Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
      5.spark core之RDD編程
      ?緩存有可能丟失,RDD的緩存容錯(cuò)機(jī)制保證即使緩存丟失也能保證計(jì)算正確執(zhí)行。通過基于RDD的一系列轉(zhuǎn)換,丟失的數(shù)據(jù)會被重算,由于RDD的各個(gè)Partition是相對獨(dú)立的,因此只需要計(jì)算丟失的部分即可,并不需要重算全部Partition。

      容錯(cuò)機(jī)制

      • Lineage機(jī)制

        • RDD的Lineage記錄的是粗粒度的特定數(shù)據(jù)Transformation操作行為。當(dāng)RDD的部分分區(qū)數(shù)據(jù)丟失時(shí),可以通過Lineage來重新運(yùn)算和恢復(fù)丟失的數(shù)據(jù)分區(qū)。這種粗顆粒的數(shù)據(jù)模型,限制了Spark的運(yùn)用場合,所以Spark并不適用于所有高性能要求的場景,但同時(shí)相比細(xì)顆粒度的數(shù)據(jù)模型,也帶來了性能的提升。

        • Spark Lineage機(jī)制是通過RDD的依賴關(guān)系來執(zhí)行的

          • 窄依賴可以在某個(gè)計(jì)算節(jié)點(diǎn)上直接通過計(jì)算父RDD的某塊數(shù)據(jù)計(jì)算得到子RDD對應(yīng)的某塊數(shù)據(jù)。

          • 寬依賴則要等到父RDD所有數(shù)據(jù)都計(jì)算完成后,將父RDD的計(jì)算結(jié)果進(jìn)行hash并傳到對應(yīng)節(jié)點(diǎn)上之后才能計(jì)算子RDD。寬依賴要將祖先RDD中的所有數(shù)據(jù)塊全部重新計(jì)算,所以在長“血統(tǒng)”鏈特別是有寬依賴的時(shí)候,需要在適當(dāng)?shù)臅r(shí)機(jī)設(shè)置數(shù)據(jù)檢查點(diǎn)。
      • Checkpoint機(jī)制

        • 簡介

          • 當(dāng)RDD的action算子觸發(fā)計(jì)算結(jié)束后會執(zhí)行checkpoint;Task計(jì)算失敗的時(shí)候會從checkpoint讀取數(shù)據(jù)進(jìn)行計(jì)算。
        • 實(shí)現(xiàn)方式(checkpoint有兩種實(shí)現(xiàn)方式,如果代碼中沒有設(shè)置checkpoint,則使用local的checkpoint模式,如果設(shè)置路徑,則使用reliable的checkpoint模式。)

          • LocalRDDCheckpointData:臨時(shí)存儲在本地executor的磁盤和內(nèi)存上。該實(shí)現(xiàn)的特點(diǎn)是比較快,適合lineage信息需要經(jīng)常被刪除的場景(如GraphX),可容忍executor掛掉。

          • ReliableRDDCheckpointData:存儲在外部可靠存儲(如hdfs),可以達(dá)到容忍driver 掛掉情況。雖然效率沒有存儲本地高,但是容錯(cuò)級別最好。

忠于技術(shù),熱愛分享。歡迎關(guān)注公眾號:java大數(shù)據(jù)編程,了解更多技術(shù)內(nèi)容。

5.spark core之RDD編程


網(wǎng)頁標(biāo)題:5.sparkcore之RDD編程
標(biāo)題URL:http://weahome.cn/article/jpjcid.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部