真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

大數(shù)據(jù)開發(fā)中Spark-RDD的持久化和緩存該如何實現(xiàn)

大數(shù)據(jù)開發(fā)中Spark-RDD的持久化和緩存該如何實現(xiàn),相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

創(chuàng)新互聯(lián)公司專注于企業(yè)網(wǎng)絡(luò)營銷推廣、網(wǎng)站重做改版、湄潭網(wǎng)站定制設(shè)計、自適應(yīng)品牌網(wǎng)站建設(shè)、H5建站、商城建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站制作、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計等建站業(yè)務(wù),價格優(yōu)惠性價比高,為湄潭等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。

1.RDD緩存機制 cache, persist

Spark 速度非??斓囊粋€原因是 RDD 支持緩存。成功緩存后,如果之后的操作使用到了該數(shù)據(jù)集,則直接從緩存中獲取。雖然緩存也有丟失的風(fēng)險,但是由于 RDD 之間的依賴關(guān)系,如果某個分區(qū)的緩存數(shù)據(jù)丟失,只需要重新計算該分區(qū)即可。

涉及到的算子:persist、cache、unpersist;都是 Transformation

緩存是將計算結(jié)果寫入不同的介質(zhì),用戶定義可定義存儲級別(存儲級別定義了緩存存儲的介質(zhì),目前支持內(nèi)存、堆 外內(nèi)存、磁盤);

通過緩存,Spark避免了RDD上的重復(fù)計算,能夠極大地提升計算速度; RDD持久化或緩存,是Spark最重要的特征之一??梢哉f,緩存是Spark構(gòu)建迭代式算法和快速交互式查詢的關(guān)鍵因 素;

Spark速度非??斓脑蛑?,就是在內(nèi)存中持久化(或緩存)一個數(shù)據(jù)集。當(dāng)持久化一個RDD后,每一個節(jié)點都將 把計算的分片結(jié)果保存在內(nèi)存中,并在對此數(shù)據(jù)集(或者衍生出的數(shù)據(jù)集)進(jìn)行的其他動作(Action)中重用。這使 得后續(xù)的動作變得更加迅速;使用persist()方法對一個RDD標(biāo)記為持久化。之所以說“標(biāo)記為持久化”,是因為出現(xiàn)persist()語句的地方,并不會馬 上計算生成RDD并把它持久化,而是要等到遇到第一個行動操作觸發(fā)真正計算以后,才會把計算結(jié)果進(jìn)行持久化;通過persist()或cache()方法可以標(biāo)記一個要被持久化的RDD,持久化被觸發(fā),RDD將會被保留在計算節(jié)點的內(nèi)存中 并重用;

什么時候緩存數(shù)據(jù),需要對空間和速度進(jìn)行權(quán)衡。一般情況下,如果多個動作需要用到某個 RDD,而它的計算代價 又很高,那么就應(yīng)該把這個 RDD 緩存起來;

緩存有可能丟失,或者存儲于內(nèi)存的數(shù)據(jù)由于內(nèi)存不足而被刪除。RDD的緩存的容錯機制保證了即使緩存丟失也能保 證計算的正確執(zhí)行。通過基于RDD的一系列的轉(zhuǎn)換,丟失的數(shù)據(jù)會被重算。RDD的各個Partition是相對獨立的,因此 只需要計算丟失的部分即可,并不需要重算全部Partition。

啟動堆外內(nèi)存需要配置兩個參數(shù):

  • spark.memory.offHeap.enabled:是否開啟堆外內(nèi)存,默認(rèn)值為 false,需要設(shè)置為 true;

  • spark.memory.offHeap.size: 堆外內(nèi)存空間的大小,默認(rèn)值為 0,需要設(shè)置為正值。

1.1 緩存級別

Spark 速度非??斓囊粋€原因是 RDD 支持緩存。成功緩存后,如果之后的操作使用到了該數(shù)據(jù)集,則直接從緩存中獲取。雖然緩存也有丟失的風(fēng)險,但是由于 RDD 之間的依賴關(guān)系,如果某個分區(qū)的緩存數(shù)據(jù)丟失,只需要重新計算該分區(qū)即可。

大數(shù)據(jù)開發(fā)中Spark-RDD的持久化和緩存該如何實現(xiàn)

大數(shù)據(jù)開發(fā)中Spark-RDD的持久化和緩存該如何實現(xiàn)

Spark 支持多種緩存級別 :

Storage Level(存儲級別)Meaning(含義)
MEMORY_ONLY默認(rèn)的緩存級別,將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中。如果內(nèi)存空間不夠,則部分分區(qū)數(shù)據(jù)將不再緩存。
MEMORY_AND_DISK將 RDD 以反序列化的 Java 對象的形式存儲 JVM 中。如果內(nèi)存空間不夠,將未緩存的分區(qū)數(shù)據(jù)存儲到磁盤,在需要使用這些分區(qū)時從磁盤讀取。
MEMORY_ONLY_SER將 RDD 以序列化的 Java 對象的形式進(jìn)行存儲(每個分區(qū)為一個 byte 數(shù)組)。這種方式比反序列化對象節(jié)省存儲空間,但在讀取時會增加 CPU 的計算負(fù)擔(dān)。僅支持 Java 和 Scala 。
MEMORY_AND_DISK_SER類似于 MEMORY_ONLY_SER,但是溢出的分區(qū)數(shù)據(jù)會存儲到磁盤,而不是在用到它們時重新計算。僅支持 Java 和 Scala。
DISK_ONLY只在磁盤上緩存 RDD
MEMORY_ONLY_2, MEMORY_AND_DISK_2與上面的對應(yīng)級別功能相同,但是會為每個分區(qū)在集群中的兩個節(jié)點上建立副本。
OFF_HEAPMEMORY_ONLY_SER 類似,但將數(shù)據(jù)存儲在堆外內(nèi)存中。這需要啟用堆外內(nèi)存。

啟動堆外內(nèi)存需要配置兩個參數(shù):

  • spark.memory.offHeap.enabled:是否開啟堆外內(nèi)存,默認(rèn)值為 false,需要設(shè)置為 true;

  • spark.memory.offHeap.size: 堆外內(nèi)存空間的大小,默認(rèn)值為 0,需要設(shè)置為正值。

1.2 使用緩存

緩存數(shù)據(jù)的方法有兩個:persistcache 。cache 內(nèi)部調(diào)用的也是 persist,它是 persist 的特殊化形式,等價于 persist(StorageLevel.MEMORY_ONLY)。示例如下:

// 所有存儲級別均定義在 StorageLevel 對象中
fileRDD.persist(StorageLevel.MEMORY_AND_DISK)
fileRDD.cache()

被緩存的RDD在DAG圖中有一個綠色的圓點。

大數(shù)據(jù)開發(fā)中Spark-RDD的持久化和緩存該如何實現(xiàn)

1.3 移除緩存

Spark 會自動監(jiān)視每個節(jié)點上的緩存使用情況,并按照最近最少使用(LRU)的規(guī)則刪除舊數(shù)據(jù)分區(qū)。當(dāng)然,你也可以使用 RDD.unpersist() 方法進(jìn)行手動刪除。

2.RDD容錯機制Checkpoint

2.1 涉及到的算子:checkpoint;也是 Transformation

Spark中對于數(shù)據(jù)的保存除了持久化操作之外,還提供了檢查點的機制;檢查點本質(zhì)是通過將RDD寫入高可靠的磁盤,主要目的是為了容錯。檢查點通過將數(shù)據(jù)寫入到HDFS文件系統(tǒng)實現(xiàn)了

RDD的檢查點功能。Lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節(jié)點出現(xiàn)問題而丟失分區(qū),從

做檢查點的RDD開始重做Lineage,就會減少開銷。

2.2 cache 和 checkpoint 區(qū)別

cache 和 checkpoint 是有顯著區(qū)別的,緩存把 RDD 計算出來然后放在內(nèi)存中,但是 RDD 的依賴鏈不能丟掉, 當(dāng)某個點某個 executor 宕了,上面 cache 的RDD就會丟掉, 需要通過依賴鏈重放計算。不同的是,checkpoint 是把

RDD 保存在 HDFS中,是多副本可靠存儲,此時依賴鏈可以丟掉,所以斬斷了依賴鏈。

2.3 checkpoint適合場景

以下場景適合使用檢查點機制:

  1. DAG中的Lineage過長,如果重算,則開銷太大

  2. 在寬依賴上做 Checkpoint 獲得的收益更大

與cache類似 checkpoint 也是 lazy 的。

val rdd1 = sc.parallelize(1 to 100000)
// 設(shè)置檢查點目錄

sc.setCheckpointDir("/tmp/checkpoint")

val rdd2 = rdd1.map(_*2)

rdd2.checkpoint

// checkpoint是lazy操作

rdd2.isCheckpointed

// checkpoint之前的rdd依賴關(guān)系

rdd2.dependencies(0).rdd

rdd2.dependencies(0).rdd.collect

// 執(zhí)行一次action,觸發(fā)checkpoint的執(zhí)行

rdd2.count

rdd2.isCheckpointed

// 再次查看RDD的依賴關(guān)系??梢钥吹絚heckpoint后,RDD的lineage被截斷,變成從checkpointRDD開始

rdd2.dependencies(0).rdd

rdd2.dependencies(0).rdd.collect

//查看RDD所依賴的checkpoint文件

rdd2.getCheckpointFile

看完上述內(nèi)容,你們掌握大數(shù)據(jù)開發(fā)中Spark-RDD的持久化和緩存該如何實現(xiàn)的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!


網(wǎng)站標(biāo)題:大數(shù)據(jù)開發(fā)中Spark-RDD的持久化和緩存該如何實現(xiàn)
URL分享:http://weahome.cn/article/jhogec.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部