??spark提供了對數(shù)據(jù)的核心抽象——彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset,簡稱RDD)。RDD是一個(gè)分布式的數(shù)據(jù)集合,數(shù)據(jù)可以跨越集群中的多個(gè)機(jī)器節(jié)點(diǎn),被分區(qū)并行執(zhí)行。
?在spark中,對數(shù)據(jù)的所有操作不外乎創(chuàng)建RDD、轉(zhuǎn)化已有RDD及調(diào)用RDD操作進(jìn)行求值。spark會自動(dòng)地將RDD中的數(shù)據(jù)分發(fā)到集群中并行執(zhí)行。
專注于為中小企業(yè)提供網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計(jì)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)金牛免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上千多家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
??spark提供了兩種創(chuàng)建RDD的方式:讀取外部數(shù)據(jù)源、將驅(qū)動(dòng)器程序中的集合進(jìn)行并行化。
??使用sparkContext的parallelize()方法將集合并行化。
?parallelize()方法第二個(gè)參數(shù)可指定分區(qū)數(shù)。spark會為每個(gè)分區(qū)創(chuàng)建一個(gè)task任務(wù),通常每個(gè)cpu需要2-4個(gè)分區(qū)。spark會自動(dòng)地根據(jù)集群大小設(shè)置分區(qū)數(shù),也支持通過parallelize()方法的第二個(gè)參數(shù)手動(dòng)指定。
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
List data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD distData = sc.parallelize(data);
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
??注:除了開發(fā)和測試外,這種方式用得不多。這種方式需要把整個(gè)數(shù)據(jù)集先放到一臺機(jī)器的內(nèi)存中。
??spark可接入多種hadoop支持的數(shù)據(jù)源來創(chuàng)建分布式數(shù)據(jù)集。包括:本地文件系統(tǒng)、HDFS、Cassandra、HBase、Amazon S3等。
?spark支持多種存儲格式,包括textFiles、SequenceFiles及其他hadoop存儲格式。
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at :26
JavaRDD distFile = sc.textFile("data.txt");
>>> distFile = sc.textFile("data.txt")
??RDD支持兩種操作:轉(zhuǎn)化操作和行動(dòng)操作。
??RDD的轉(zhuǎn)化操作會返回一個(gè)新的RDD。轉(zhuǎn)化操作是惰性求值的,只有行動(dòng)操作用到轉(zhuǎn)化操作生成的RDD時(shí),才會真正進(jìn)行轉(zhuǎn)化。
?spark使用lineage(血統(tǒng))來記錄轉(zhuǎn)化操作生成的不同RDD之間的依賴關(guān)系。依賴分為窄依賴(narrow dependencies)和寬依賴(wide dependencies)。
寬依賴
對兩個(gè)RDD基于key進(jìn)行合并和重組,如join
??行動(dòng)操作則會向驅(qū)動(dòng)器程序返回結(jié)果或把結(jié)果寫入外部系統(tǒng),會觸發(fā)實(shí)際的計(jì)算。
??RDD通過persist方法或cache方法可以將前面的計(jì)算結(jié)果緩存,但是并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存,而是觸發(fā)后面的action時(shí),該RDD將會被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用。
?cache最終也是調(diào)用了persist方法,默認(rèn)的存儲級別是僅在內(nèi)存存儲一份。
?Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
?緩存有可能丟失,RDD的緩存容錯(cuò)機(jī)制保證即使緩存丟失也能保證計(jì)算正確執(zhí)行。通過基于RDD的一系列轉(zhuǎn)換,丟失的數(shù)據(jù)會被重算,由于RDD的各個(gè)Partition是相對獨(dú)立的,因此只需要計(jì)算丟失的部分即可,并不需要重算全部Partition。
Lineage機(jī)制
RDD的Lineage記錄的是粗粒度的特定數(shù)據(jù)Transformation操作行為。當(dāng)RDD的部分分區(qū)數(shù)據(jù)丟失時(shí),可以通過Lineage來重新運(yùn)算和恢復(fù)丟失的數(shù)據(jù)分區(qū)。這種粗顆粒的數(shù)據(jù)模型,限制了Spark的運(yùn)用場合,所以Spark并不適用于所有高性能要求的場景,但同時(shí)相比細(xì)顆粒度的數(shù)據(jù)模型,也帶來了性能的提升。
Spark Lineage機(jī)制是通過RDD的依賴關(guān)系來執(zhí)行的
窄依賴可以在某個(gè)計(jì)算節(jié)點(diǎn)上直接通過計(jì)算父RDD的某塊數(shù)據(jù)計(jì)算得到子RDD對應(yīng)的某塊數(shù)據(jù)。
Checkpoint機(jī)制
簡介
實(shí)現(xiàn)方式(checkpoint有兩種實(shí)現(xiàn)方式,如果代碼中沒有設(shè)置checkpoint,則使用local的checkpoint模式,如果設(shè)置路徑,則使用reliable的checkpoint模式。)
LocalRDDCheckpointData:臨時(shí)存儲在本地executor的磁盤和內(nèi)存上。該實(shí)現(xiàn)的特點(diǎn)是比較快,適合lineage信息需要經(jīng)常被刪除的場景(如GraphX),可容忍executor掛掉。
忠于技術(shù),熱愛分享。歡迎關(guān)注公眾號:java大數(shù)據(jù)編程,了解更多技術(shù)內(nèi)容。