真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Spark系列(三)——彈性式數(shù)據(jù)集RDDs

彈性式數(shù)據(jù)集RDDs

一、RDD簡介

RDD 全稱為 Resilient Distributed Datasets,是 Spark 最基本的數(shù)據(jù)抽象,它是只讀的、分區(qū)記錄的集合,支持并行操作,可以由外部數(shù)據(jù)集或其他 RDD 轉(zhuǎn)換而來,它具有以下特性:

松原網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)公司!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、自適應網(wǎng)站建設(shè)等網(wǎng)站項目制作,到程序開發(fā),運營維護。創(chuàng)新互聯(lián)公司2013年至今到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)公司。

  • 一個 RDD 由一個或者多個分區(qū)(Partitions)組成。對于 RDD 來說,每個分區(qū)會被一個計算任務所處理,用戶可以在創(chuàng)建 RDD 時指定其分區(qū)個數(shù),如果沒有指定,則默認采用程序所分配到的 CPU 的核心數(shù);
  • RDD 擁有一個用于計算分區(qū)的函數(shù) compute;
  • RDD 會保存彼此間的依賴關(guān)系,RDD 的每次轉(zhuǎn)換都會生成一個新的依賴關(guān)系,這種 RDD 之間的依賴關(guān)系就像流水線一樣。在部分分區(qū)數(shù)據(jù)丟失后,可以通過這種依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是對 RDD 的所有分區(qū)進行重新計算;
  • Key-Value 型的 RDD 還擁有 Partitioner(分區(qū)器),用于決定數(shù)據(jù)被存儲在哪個分區(qū)中,目前 Spark 中支持 HashPartitioner(按照哈希分區(qū)) 和 RangeParationer(按照范圍進行分區(qū));
  • 一個優(yōu)先位置列表 (可選),用于存儲每個分區(qū)的優(yōu)先位置 (prefered location)。對于一個 HDFS 文件來說,這個列表保存的就是每個分區(qū)所在的塊的位置,按照“移動數(shù)據(jù)不如移動計算“的理念,Spark 在進行任務調(diào)度的時候,會盡可能的將計算任務分配到其所要處理數(shù)據(jù)塊的存儲位置。

RDD[T] 抽象類的部分相關(guān)代碼如下:

// 由子類實現(xiàn)以計算給定分區(qū)
def compute(split: Partition, context: TaskContext): Iterator[T]

// 獲取所有分區(qū)
protected def getPartitions: Array[Partition]

// 獲取所有依賴關(guān)系
protected def getDependencies: Seq[Dependency[_]] = deps

// 獲取優(yōu)先位置列表
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

// 分區(qū)器 由子類重寫以指定它們的分區(qū)方式
@transient val partitioner: Option[Partitioner] = None

二、創(chuàng)建RDD

RDD 有兩種創(chuàng)建方式,分別介紹如下:

2.1 由現(xiàn)有集合創(chuàng)建

這里使用 spark-shell 進行測試,啟動命令如下:

spark-shell --master local[4]

啟動 spark-shell 后,程序會自動創(chuàng)建應用上下文,相當于執(zhí)行了下面的 Scala 語句:

val conf = new SparkConf().setAppName("Spark shell").setMaster("local[4]")
val sc = new SparkContext(conf)

由現(xiàn)有集合創(chuàng)建 RDD,你可以在創(chuàng)建時指定其分區(qū)個數(shù),如果沒有指定,則采用程序所分配到的 CPU 的核心數(shù):

val data = Array(1, 2, 3, 4, 5)
// 由現(xiàn)有集合創(chuàng)建 RDD,默認分區(qū)數(shù)為程序所分配到的 CPU 的核心數(shù)
val dataRDD = sc.parallelize(data) 
// 查看分區(qū)數(shù)
dataRDD.getNumPartitions
// 明確指定分區(qū)數(shù)
val dataRDD = sc.parallelize(data,2)


2.2 引用外部存儲系統(tǒng)中的數(shù)據(jù)集

引用外部存儲系統(tǒng)中的數(shù)據(jù)集,例如本地文件系統(tǒng),HDFS,HBase 或支持 Hadoop InputFormat 的任何數(shù)據(jù)源。

val fileRDD = sc.textFile("/usr/file/emp.txt")
// 獲取第一行文本
fileRDD.take(1)

使用外部存儲系統(tǒng)時需要注意以下兩點:

  • 如果在集群環(huán)境下從本地文件系統(tǒng)讀取數(shù)據(jù),則要求該文件必須在集群中所有機器上都存在,且路徑相同;
  • 支持目錄路徑,支持壓縮文件,支持使用通配符。

2.3 textFile & wholeTextFiles

兩者都可以用來讀取外部文件,但是返回格式是不同的:

  • textFile:其返回格式是 RDD[String] ,返回的是就是文件內(nèi)容,RDD 中每一個元素對應一行數(shù)據(jù);
  • wholeTextFiles:其返回格式是 RDD[(String, String)],元組中第一個參數(shù)是文件路徑,第二個參數(shù)是文件內(nèi)容;
  • 兩者都提供第二個參數(shù)來控制最小分區(qū)數(shù);
  • 從 HDFS 上讀取文件時,Spark 會為每個塊創(chuàng)建一個分區(qū)。
def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...}
def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions): RDD[(String, String)]={..}

三、操作RDD

RDD 支持兩種類型的操作:transformations(轉(zhuǎn)換,從現(xiàn)有數(shù)據(jù)集創(chuàng)建新數(shù)據(jù)集)和 actions(在數(shù)據(jù)集上運行計算后將值返回到驅(qū)動程序)。RDD 中的所有轉(zhuǎn)換操作都是惰性的,它們只是記住這些轉(zhuǎn)換操作,但不會立即執(zhí)行,只有遇到 action 操作后才會真正的進行計算,這類似于函數(shù)式編程中的惰性求值。

val list = List(1, 2, 3)
// map 是一個 transformations 操作,而 foreach 是一個 actions 操作
sc.parallelize(list).map(_ * 10).foreach(println)
// 輸出: 10 20 30

四、緩存RDD

4.1 緩存級別

Spark 速度非??斓囊粋€原因是 RDD 支持緩存。成功緩存后,如果之后的操作使用到了該數(shù)據(jù)集,則直接從緩存中獲取。雖然緩存也有丟失的風險,但是由于 RDD 之間的依賴關(guān)系,如果某個分區(qū)的緩存數(shù)據(jù)丟失,只需要重新計算該分區(qū)即可。

Spark 支持多種緩存級別 :

Storage Level
(存儲級別)
Meaning(含義)
MEMORY_ONLY默認的緩存級別,將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中。如果內(nèi)存空間不夠,則部分分區(qū)數(shù)據(jù)將不再緩存。
MEMORY_AND_DISK將 RDD 以反序列化的 Java 對象的形式存儲 JVM 中。如果內(nèi)存空間不夠,將未緩存的分區(qū)數(shù)據(jù)存儲到磁盤,在需要使用這些分區(qū)時從磁盤讀取。
MEMORY_ONLY_SER
將 RDD 以序列化的 Java 對象的形式進行存儲(每個分區(qū)為一個 byte 數(shù)組)。這種方式比反序列化對象節(jié)省存儲空間,但在讀取時會增加 CPU 的計算負擔。僅支持 Java 和 Scala 。
MEMORY_AND_DISK_SER
類似于 MEMORY_ONLY_SER,但是溢出的分區(qū)數(shù)據(jù)會存儲到磁盤,而不是在用到它們時重新計算。僅支持 Java 和 Scala。
DISK_ONLY只在磁盤上緩存 RDD
MEMORY_ONLY_2,
MEMORY_AND_DISK_2, etc
與上面的對應級別功能相同,但是會為每個分區(qū)在集群中的兩個節(jié)點上建立副本。
OFF_HEAPMEMORY_ONLY_SER 類似,但將數(shù)據(jù)存儲在堆外內(nèi)存中。這需要啟用堆外內(nèi)存。

啟動堆外內(nèi)存需要配置兩個參數(shù):

  • spark.memory.offHeap.enabled:是否開啟堆外內(nèi)存,默認值為 false,需要設(shè)置為 true;
  • spark.memory.offHeap.size: 堆外內(nèi)存空間的大小,默認值為 0,需要設(shè)置為正值。

4.2 使用緩存

緩存數(shù)據(jù)的方法有兩個:persistcachecache 內(nèi)部調(diào)用的也是 persist,它是 persist 的特殊化形式,等價于 persist(StorageLevel.MEMORY_ONLY)。示例如下:

// 所有存儲級別均定義在 StorageLevel 對象中
fileRDD.persist(StorageLevel.MEMORY_AND_DISK)
fileRDD.cache()

4.3 移除緩存

Spark 會自動監(jiān)視每個節(jié)點上的緩存使用情況,并按照最近最少使用(LRU)的規(guī)則刪除舊數(shù)據(jù)分區(qū)。當然,你也可以使用 RDD.unpersist() 方法進行手動刪除。

五、理解shuffle

5.1 shuffle介紹

在 Spark 中,一個任務對應一個分區(qū),通常不會跨分區(qū)操作數(shù)據(jù)。但如果遇到 reduceByKey 等操作,Spark 必須從所有分區(qū)讀取數(shù)據(jù),并查找所有鍵的所有值,然后匯總在一起以計算每個鍵的最終結(jié)果 ,這稱為 Shuffle。

5.2 Shuffle的影響

Shuffle 是一項昂貴的操作,因為它通常會跨節(jié)點操作數(shù)據(jù),這會涉及磁盤 I/O,網(wǎng)絡(luò) I/O,和數(shù)據(jù)序列化。某些 Shuffle 操作還會消耗大量的堆內(nèi)存,因為它們使用堆內(nèi)存來臨時存儲需要網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)。Shuffle 還會在磁盤上生成大量中間文件,從 Spark 1.3 開始,這些文件將被保留,直到相應的 RDD 不再使用并進行垃圾回收,這樣做是為了避免在計算時重復創(chuàng)建 Shuffle 文件。如果應用程序長期保留對這些 RDD 的引用,則垃圾回收可能在很長一段時間后才會發(fā)生,這意味著長時間運行的 Spark 作業(yè)可能會占用大量磁盤空間,通??梢允褂?spark.local.dir 參數(shù)來指定這些臨時文件的存儲目錄。

5.3 導致Shuffle的操作

由于 Shuffle 操作對性能的影響比較大,所以需要特別注意使用,以下操作都會導致 Shuffle:

  • 涉及到重新分區(qū)操作: 如 repartitioncoalesce;
  • 所有涉及到 ByKey 的操作:如 groupByKeyreduceByKey,但 countByKey 除外;
  • 聯(lián)結(jié)操作:如 cogroupjoin。

五、寬依賴和窄依賴

RDD 和它的父 RDD(s) 之間的依賴關(guān)系分為兩種不同的類型:

  • 窄依賴 (narrow dependency):父 RDDs 的一個分區(qū)最多被子 RDDs 一個分區(qū)所依賴;
  • 寬依賴 (wide dependency):父 RDDs 的一個分區(qū)可以被子 RDDs 的多個子分區(qū)所依賴。

如下圖,每一個方框表示一個 RDD,帶有顏色的矩形表示分區(qū):

區(qū)分這兩種依賴是非常有用的:

  • 首先,窄依賴允許在一個集群節(jié)點上以流水線的方式(pipeline)對父分區(qū)數(shù)據(jù)進行計算,例如先執(zhí)行 map 操作,然后執(zhí)行 filter 操作。而寬依賴則需要計算好所有父分區(qū)的數(shù)據(jù),然后再在節(jié)點之間進行 Shuffle,這與 MapReduce 類似。
  • 窄依賴能夠更有效地進行數(shù)據(jù)恢復,因為只需重新對丟失分區(qū)的父分區(qū)進行計算,且不同節(jié)點之間可以并行計算;而對于寬依賴而言,如果數(shù)據(jù)丟失,則需要對所有父分區(qū)數(shù)據(jù)進行計算并再次 Shuffle。

六、DAG的生成

RDD(s) 及其之間的依賴關(guān)系組成了 DAG(有向無環(huán)圖),DAG 定義了這些 RDD(s) 之間的 Lineage(血統(tǒng)) 關(guān)系,通過血統(tǒng)關(guān)系,如果一個 RDD 的部分或者全部計算結(jié)果丟失了,也可以重新進行計算。那么 Spark 是如何根據(jù) DAG 來生成計算任務呢?主要是根據(jù)依賴關(guān)系的不同將 DAG 劃分為不同的計算階段 (Stage):

  • 對于窄依賴,由于分區(qū)的依賴關(guān)系是確定的,其轉(zhuǎn)換操作可以在同一個線程執(zhí)行,所以可以劃分到同一個執(zhí)行階段;
  • 對于寬依賴,由于 Shuffle 的存在,只能在父 RDD(s) 被 Shuffle 處理完成后,才能開始接下來的計算,因此遇到寬依賴就需要重新劃分階段。

參考資料

  1. 張安站 . Spark 技術(shù)內(nèi)幕:深入解析 Spark 內(nèi)核架構(gòu)設(shè)計與實現(xiàn)原理[M] . 機械工業(yè)出版社 . 2015-09-01
  2. RDD Programming Guide
  3. RDD:基于內(nèi)存的集群計算容錯抽象

更多大數(shù)據(jù)系列文章可以參見 GitHub 開源項目大數(shù)據(jù)入門指南


網(wǎng)站題目:Spark系列(三)——彈性式數(shù)據(jù)集RDDs
文章來源:http://weahome.cn/article/ijecpe.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部