RDD的依賴關系分為兩種:窄依賴(Narrow Dependencies)與寬依賴(Wide Dependencies,源碼中稱為Shuffle Dependencies)
依賴有2個作用,其一用來解決數據容錯的高效性;其二用來劃分stage。
窄依賴:每個父RDD的一個Partition最多被子RDD的一個Partition所使用(1:1 或 n:1)。例如map、filter、union等操作都會產生窄依賴;
子RDD分區(qū)通常對應常數個父RDD分區(qū)(O(1),與數據規(guī)模無關。
寬依賴:一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都會產生寬依賴;(1:m 或 n:m)
(子RDD分區(qū)通常對應所有的父RDD分區(qū)(O(n),與數據規(guī)模有關)
相比于寬依賴,窄依賴對優(yōu)化很有利 ,主要基于以下兩點:
1、寬依賴往往對應著shuffle操作,需要在運行過程中將同一個父RDD的分區(qū)傳入到不同的子RDD分區(qū)中,中間可能涉及多個節(jié)點之間的數據傳輸;而窄依賴的每個父RDD的分區(qū)只會傳入到一個子RDD分區(qū)中,通常可以在一個節(jié)點內完成轉換。
2、當RDD分區(qū)丟失時(某個節(jié)點故障),spark會對數據進行重算。
? 對于窄依賴,由于父RDD的一個分區(qū)只對應一個子RDD分區(qū),這樣只需要重算和子RDD分區(qū)對應的父RDD分區(qū)即可,所以這個重算對數據的利用率是100%的;
? 對于寬依賴,重算的父RDD分區(qū)對應多個子RDD分區(qū),這樣實際上父RDD 中只有一部分的數據是被用于恢復這個丟失的子RDD分區(qū)的,另一部分對應子RDD的其它未丟失分區(qū),這就造成了多余的計算;更一般的,寬依賴中子RDD分區(qū)通常來自多個父RDD分區(qū),極端情況下,所有的父RDD分區(qū)都要進行重新計算。
? 如下圖所示,b1分區(qū)丟失,則需要重新計算a1,a2和a3,這就產生了冗余計算(a1,a2,a3中對應b2的數據)。
區(qū)分這兩種依賴很有用。首先,窄依賴允許在一個集群節(jié)點上以流水線的方式(pipeline)計算所有父分區(qū)。例如,逐個元素地執(zhí)行map、然后filter操作;而寬依賴則需要首先計算好所有父分區(qū)數據,然后在節(jié)點之間進行Shuffle,這與MapReduce類似。第二,窄依賴能夠更有效地進行失效節(jié)點的恢復,即只需重新計算丟失RDD分區(qū)的父分區(qū),而且不同節(jié)點之間可以并行計算;而對于一個寬依賴關系的Lineage圖,單個節(jié)點失效可能導致這個RDD的所有祖先丟失部分分區(qū),因而需要整體重新計算。
【誤解】之前一直理解錯了,以為窄依賴中每個子RDD可能對應多個父RDD,當子RDD丟失時會導致多個父RDD進行重新計算,所以窄依賴不如寬依賴有優(yōu)勢。而實際上應該深入到分區(qū)級別去看待這個問題,而且重算的效用也不在于算的多少,而在于有多少是冗余的計算。窄依賴中需要重算的都是必須的,所以重算不冗余。
窄依賴的函數有:map、filter、union、join(父RDD是hash-partitioned )、mapPartitions、mapValues
寬依賴的函數有:groupByKey、join(父RDD不是hash-partitioned )、partitionBy
依賴的繼承關系:
val rdd1 = sc.parallelize(1 to 10, 1)
val rdd2 = sc.parallelize(11 to 20, 1)
val rdd3 = rdd1.union(rdd2)
rdd3.dependencies.size
// 長度為2,值為rdd1、rdd2,意為rdd3依賴rdd1、rdd2
rdd3.dependencies
// 結果:
rdd3.dependencies(0).rdd.collect
// 打印rdd1的數據
rdd3.dependencies(1).rdd.collect
// 打印rdd2的數據
rdd3.dependencies(3).rdd.collect
// 數組越界,報錯
哪些RDD Actions對應shuffleDependency?下面的join(r5)好像就沒有shuffleDependency
val r1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"))
val r2 = r1.keyBy(_.length)
val r3 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"))
val r4 = r3.keyBy(_.length)
val r5 = r2.join(r4)
回答:join不一定會有shuffleDependency,上面的操作中就沒有。
redueceByKey會產生shuffleDependency。
注意上面操作中的keyBy,和我的想象不太一樣。要注意一下。
keyBy:與map操作較為類似,給每個元素增加了一個key
以下這個例子有點意思:
val r1 = sc.textFile("hdfs:///user/hadoop/data/block_test1.csv")
r1
val r2 = r1.dependencies(0).rdd
r2.partitions.size
r2.preferredLocations(r2.partitions(0))
r2.preferredLocations(r2.partitions(3))
有意思的地方在于(查找依賴、優(yōu)先位置):
1、r1的類型為MapPartitionsRDD
2、r1依賴于r2,如果沒有這個賦值語句是看不出來的。r2的類型為:HadoopRDD
3、可以檢索r2各個分區(qū)的位置,該hdfs文件系統(tǒng)的副本數設置為2
一般來說,分布式數據集的容錯性有兩種方式:數據檢查點和記錄數據的更新(CheckPoint Data,和Logging The Updates)。
面向大規(guī)模數據分析,數據檢查點操作成本很高,需要通過數據中心的網絡連接在機器之間復制龐大的數據集,而網絡帶寬往往比內存帶寬低得多,同時還需要消耗更多的存儲資源。
因此,Spark選擇記錄更新的方式。但是,如果更新粒度太細太多,那么記錄更新成本也不低。因此,RDD只支持粗粒度轉換,即只記錄單個塊上執(zhí)行的單個操作(記錄如何從其他RDD轉換而來,即lineage),然后將創(chuàng)建RDD的一系列變換序列(每個RDD都包含了他是如何由其他RDD變換過來的以及如何重建某一塊數據的信息。因此RDD的容錯機制又稱“血統(tǒng)(Lineage)”容錯)記錄下來,以便恢復丟失的分區(qū)。
Lineage本質上很類似于數據庫中的重做日志(Redo Log),只不過這個重做日志粒度很大,是對全局數據做同樣的重做進而恢復數據。
Lineage容錯原理:在容錯機制中,如果一個節(jié)點死機了,而且運算窄依賴,則只要把丟失的父RDD分區(qū)重算即可,不依賴于其他節(jié)點。而寬依賴需要父RDD的所有分區(qū)都存在,重算就很昂貴了??梢赃@樣理解開銷的經濟與否:在窄依賴中,在子RDD的分區(qū)丟失、重算父RDD分區(qū)時,父RDD相應分區(qū)的所有數據都是子RDD分區(qū)的數據,并不存在冗余計算。在寬依賴情況下,丟失一個子RDD分區(qū)重算的每個父RDD的每個分區(qū)的所有數據并不是都給丟失的子RDD分區(qū)用的,會有一部分數據相當于對應的是未丟失的子RDD分區(qū)中需要的數據,這樣就會產生冗余計算開銷,這也是寬依賴開銷更大的原因。因此如果使用Checkpoint算子來做檢查點,不僅要考慮Lineage是否足夠長,也要考慮是否有寬依賴,對寬依賴加Checkpoint是最物有所值的。
Checkpoint機制。在以下2種情況下,RDD需要加檢查點:
? DAG中的Lineage過長,如果重算,則開銷太大(如在多次迭代中)
? 在寬依賴上做Checkpoint獲得的收益更大
由于RDD是只讀的,所以Spark的RDD計算中一致性不是主要關心的內容,內存相對容易管理,這也是設計者很有遠見的地方,這樣減少了框架的復雜性,提升了性能和可擴展性,為以后上層框架的豐富奠定了強有力的基礎。
在RDD計算中,通過檢查點機制進行容錯,傳統(tǒng)做檢查點有兩種方式:通過冗余數據和日志記錄更新操作。在RDD中的doCheckPoint方法相當于通過冗余數據來緩存數據,而之前介紹的血統(tǒng)就是通過相當粗粒度的記錄更新操作來實現容錯的。
檢查點(本質是通過將RDD寫入Disk做檢查點)是為了通過lineage做容錯的輔助,Lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節(jié)點出現問題而丟失分區(qū),從做檢查點的RDD開始重做Lineage,就會減少開銷。
1、從本質上說:checkpoint是容錯機制;cache是優(yōu)化機制
2、checkpoint將數據寫到共享存儲中(hdfs);cache通常是內存中
3、運算時間很長或運算量太大才能得到的 RDD,computing chain 過長或依賴其他 RDD 很多的RDD,需要做checkpoint。會被重復使用的(但不能太大)RDD,做cache。
實際上,將 ShuffleMapTask 的輸出結果存放到本地磁盤也算是 checkpoint,只不過這個checkpoint 的主要目的是去 partition 輸出數據。
4、RDD 的checkpoint 操作完成后會斬斷l(xiāng)ineage,cache操作對lineage沒有影響。
checkpoint 在 Spark Streaming中特別重要,spark streaming 中對于一些有狀態(tài)的操作,這在某些 stateful 轉換中是需要的,在這種轉換中,生成 RDD 需要依賴前面的 batches,會導致依賴鏈隨著時間而變長。為了避免這種沒有盡頭的變長,要定期將中間生成的 RDDs 保存到可靠存儲來切斷依賴鏈,必須隔一段時間進行一次checkpoint。
cache 和 checkpoint 是有顯著區(qū)別的,緩存把 RDD 計算出來然后放在內存中, 但是RDD 的依賴鏈(相當于數據庫中的redo 日志),也不能丟掉,當某個點某個 executor 宕了,上面cache 的RDD就會丟掉,需要通過依賴鏈重放計算出來,不同的是,checkpoint 是把 RDD 保存在 HDFS中,是多副本可靠存儲,所以依賴鏈就可以丟掉了,即斬斷了依賴鏈,是通過復制實現的高容錯。
注意:checkpoint需要把 job 重新從頭算一遍,最好先cache一下,checkpoint就可以直接保存緩存中的 RDD 了,就不需要重頭計算一遍了,對性能有極大的提升。
checkpoint 的正確使用姿勢
val data = sc.textFile("/tmp/spark/1.data").cache() // 注意要cache
sc.setCheckpointDir("/tmp/spark/checkpoint")
data.checkpoint
data.count
//問題:cache和checkpoint有沒有先后的問題;有了cache可以避免第二次計算,我在代碼中可以看見相關的說明!?。?/p>
使用很簡單, 就是設置一下 checkpoint 目錄,然后再rdd上調用 checkpoint 方法, action 的時候就對數據進行了 checkpoint
checkpoint 寫流程
RDD checkpoint 過程中會經過以下幾個狀態(tài),
[ Initialized –> marked for checkpointing –> checkpointing in progress –> checkpointed ]
另外有需要云服務器可以了解下創(chuàng)新互聯scvps.cn,海內外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。