RDD
全稱為 Resilient Distributed Datasets,是 Spark 最基本的數(shù)據(jù)抽象,它是只讀的、分區(qū)記錄的集合,支持并行操作,可以由外部數(shù)據(jù)集或其他 RDD 轉(zhuǎn)換而來,它具有以下特性:
RDD[T]
抽象類的部分相關(guān)代碼如下:
// 由子類實(shí)現(xiàn)以計(jì)算給定分區(qū)
def compute(split: Partition, context: TaskContext): Iterator[T]
// 獲取所有分區(qū)
protected def getPartitions: Array[Partition]
// 獲取所有依賴關(guān)系
protected def getDependencies: Seq[Dependency[_]] = deps
// 獲取優(yōu)先位置列表
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
// 分區(qū)器 由子類重寫以指定它們的分區(qū)方式
@transient val partitioner: Option[Partitioner] = None
RDD 有兩種創(chuàng)建方式,分別介紹如下:
這里使用 spark-shell
進(jìn)行測(cè)試,啟動(dòng)命令如下:
spark-shell --master local[4]
啟動(dòng) spark-shell
后,程序會(huì)自動(dòng)創(chuàng)建應(yīng)用上下文,相當(dāng)于執(zhí)行了下面的 Scala 語句:
val conf = new SparkConf().setAppName("Spark shell").setMaster("local[4]")
val sc = new SparkContext(conf)
由現(xiàn)有集合創(chuàng)建 RDD,你可以在創(chuàng)建時(shí)指定其分區(qū)個(gè)數(shù),如果沒有指定,則采用程序所分配到的 CPU 的核心數(shù):
val data = Array(1, 2, 3, 4, 5)
// 由現(xiàn)有集合創(chuàng)建 RDD,默認(rèn)分區(qū)數(shù)為程序所分配到的 CPU 的核心數(shù)
val dataRDD = sc.parallelize(data)
// 查看分區(qū)數(shù)
dataRDD.getNumPartitions
// 明確指定分區(qū)數(shù)
val dataRDD = sc.parallelize(data,2)
執(zhí)行結(jié)果如下:
引用外部存儲(chǔ)系統(tǒng)中的數(shù)據(jù)集,例如本地文件系統(tǒng),HDFS,HBase 或支持 Hadoop InputFormat 的任何數(shù)據(jù)源。
val fileRDD = sc.textFile("/usr/file/emp.txt")
// 獲取第一行文本
fileRDD.take(1)
使用外部存儲(chǔ)系統(tǒng)時(shí)需要注意以下兩點(diǎn):
兩者都可以用來讀取外部文件,但是返回格式是不同的:
RDD[String]
,返回的是就是文件內(nèi)容,RDD 中每一個(gè)元素對(duì)應(yīng)一行數(shù)據(jù);RDD[(String, String)]
,元組中第一個(gè)參數(shù)是文件路徑,第二個(gè)參數(shù)是文件內(nèi)容;def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...}
def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions): RDD[(String, String)]={..}
RDD 支持兩種類型的操作:transformations(轉(zhuǎn)換,從現(xiàn)有數(shù)據(jù)集創(chuàng)建新數(shù)據(jù)集)和 actions(在數(shù)據(jù)集上運(yùn)行計(jì)算后將值返回到驅(qū)動(dòng)程序)。RDD 中的所有轉(zhuǎn)換操作都是惰性的,它們只是記住這些轉(zhuǎn)換操作,但不會(huì)立即執(zhí)行,只有遇到 action 操作后才會(huì)真正的進(jìn)行計(jì)算,這類似于函數(shù)式編程中的惰性求值。
val list = List(1, 2, 3)
// map 是一個(gè) transformations 操作,而 foreach 是一個(gè) actions 操作
sc.parallelize(list).map(_ * 10).foreach(println)
// 輸出: 10 20 30
Spark 速度非??斓囊粋€(gè)原因是 RDD 支持緩存。成功緩存后,如果之后的操作使用到了該數(shù)據(jù)集,則直接從緩存中獲取。雖然緩存也有丟失的風(fēng)險(xiǎn),但是由于 RDD 之間的依賴關(guān)系,如果某個(gè)分區(qū)的緩存數(shù)據(jù)丟失,只需要重新計(jì)算該分區(qū)即可。
Spark 支持多種緩存級(jí)別 :
Storage Level (存儲(chǔ)級(jí)別) |
Meaning(含義) |
---|---|
MEMORY_ONLY | 默認(rèn)的緩存級(jí)別,將 RDD 以反序列化的 Java 對(duì)象的形式存儲(chǔ)在 JVM 中。如果內(nèi)存空間不夠,則部分分區(qū)數(shù)據(jù)將不再緩存。 |
MEMORY_AND_DISK | 將 RDD 以反序列化的 Java 對(duì)象的形式存儲(chǔ) JVM 中。如果內(nèi)存空間不夠,將未緩存的分區(qū)數(shù)據(jù)存儲(chǔ)到磁盤,在需要使用這些分區(qū)時(shí)從磁盤讀取。 |
MEMORY_ONLY_SER | 將 RDD 以序列化的 Java 對(duì)象的形式進(jìn)行存儲(chǔ)(每個(gè)分區(qū)為一個(gè) byte 數(shù)組)。這種方式比反序列化對(duì)象節(jié)省存儲(chǔ)空間,但在讀取時(shí)會(huì)增加 CPU 的計(jì)算負(fù)擔(dān)。僅支持 Java 和 Scala 。 |
MEMORY_AND_DISK_SER | 類似于 MEMORY_ONLY_SER ,但是溢出的分區(qū)數(shù)據(jù)會(huì)存儲(chǔ)到磁盤,而不是在用到它們時(shí)重新計(jì)算。僅支持 Java 和 Scala。 |
DISK_ONLY | 只在磁盤上緩存 RDD |
MEMORY_ONLY_2 , MEMORY_AND_DISK_2 , etc | 與上面的對(duì)應(yīng)級(jí)別功能相同,但是會(huì)為每個(gè)分區(qū)在集群中的兩個(gè)節(jié)點(diǎn)上建立副本。 |
OFF_HEAP | 與 MEMORY_ONLY_SER 類似,但將數(shù)據(jù)存儲(chǔ)在堆外內(nèi)存中。這需要啟用堆外內(nèi)存。 |
啟動(dòng)堆外內(nèi)存需要配置兩個(gè)參數(shù):
- spark.memory.offHeap.enabled :是否開啟堆外內(nèi)存,默認(rèn)值為 false,需要設(shè)置為 true;
- spark.memory.offHeap.size : 堆外內(nèi)存空間的大小,默認(rèn)值為 0,需要設(shè)置為正值。
緩存數(shù)據(jù)的方法有兩個(gè):persist
和 cache
。cache
內(nèi)部調(diào)用的也是 persist
,它是 persist
的特殊化形式,等價(jià)于 persist(StorageLevel.MEMORY_ONLY)
。示例如下:
// 所有存儲(chǔ)級(jí)別均定義在 StorageLevel 對(duì)象中
fileRDD.persist(StorageLevel.MEMORY_AND_DISK)
fileRDD.cache()
Spark 會(huì)自動(dòng)監(jiān)視每個(gè)節(jié)點(diǎn)上的緩存使用情況,并按照最近最少使用(LRU)的規(guī)則刪除舊數(shù)據(jù)分區(qū)。當(dāng)然,你也可以使用 RDD.unpersist()
方法進(jìn)行手動(dòng)刪除。
在 Spark 中,一個(gè)任務(wù)對(duì)應(yīng)一個(gè)分區(qū),通常不會(huì)跨分區(qū)操作數(shù)據(jù)。但如果遇到 reduceByKey
等操作,Spark 必須從所有分區(qū)讀取數(shù)據(jù),并查找所有鍵的所有值,然后匯總在一起以計(jì)算每個(gè)鍵的最終結(jié)果 ,這稱為 Shuffle
。
Shuffle 是一項(xiàng)昂貴的操作,因?yàn)樗ǔ?huì)跨節(jié)點(diǎn)操作數(shù)據(jù),這會(huì)涉及磁盤 I/O,網(wǎng)絡(luò) I/O,和數(shù)據(jù)序列化。某些 Shuffle 操作還會(huì)消耗大量的堆內(nèi)存,因?yàn)樗鼈兪褂枚褍?nèi)存來臨時(shí)存儲(chǔ)需要網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)。Shuffle 還會(huì)在磁盤上生成大量中間文件,從 Spark 1.3 開始,這些文件將被保留,直到相應(yīng)的 RDD 不再使用并進(jìn)行垃圾回收,這樣做是為了避免在計(jì)算時(shí)重復(fù)創(chuàng)建 Shuffle 文件。如果應(yīng)用程序長(zhǎng)期保留對(duì)這些 RDD 的引用,則垃圾回收可能在很長(zhǎng)一段時(shí)間后才會(huì)發(fā)生,這意味著長(zhǎng)時(shí)間運(yùn)行的 Spark 作業(yè)可能會(huì)占用大量磁盤空間,通??梢允褂?spark.local.dir
參數(shù)來指定這些臨時(shí)文件的存儲(chǔ)目錄。
由于 Shuffle 操作對(duì)性能的影響比較大,所以需要特別注意使用,以下操作都會(huì)導(dǎo)致 Shuffle:
repartition
和 coalesce
;groupByKey
和 reduceByKey
,但 countByKey
除外;cogroup
和 join
。RDD 和它的父 RDD(s) 之間的依賴關(guān)系分為兩種不同的類型:
如下圖,每一個(gè)方框表示一個(gè) RDD,帶有顏色的矩形表示分區(qū):
區(qū)分這兩種依賴是非常有用的:
RDD(s) 及其之間的依賴關(guān)系組成了 DAG(有向無環(huán)圖),DAG 定義了這些 RDD(s) 之間的 Lineage(血統(tǒng)) 關(guān)系,通過血統(tǒng)關(guān)系,如果一個(gè) RDD 的部分或者全部計(jì)算結(jié)果丟失了,也可以重新進(jìn)行計(jì)算。那么 Spark 是如何根據(jù) DAG 來生成計(jì)算任務(wù)呢?主要是根據(jù)依賴關(guān)系的不同將 DAG 劃分為不同的計(jì)算階段 (Stage):
更多大數(shù)據(jù)系列文章可以參見 GitHub 開源項(xiàng)目: 大數(shù)據(jù)入門指南
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。