spark的RDD以及代碼實(shí)操是怎樣進(jìn)行的,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
創(chuàng)新互聯(lián)專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都做網(wǎng)站、成都網(wǎng)站制作、大姚網(wǎng)絡(luò)推廣、微信小程序開發(fā)、大姚網(wǎng)絡(luò)營銷、大姚企業(yè)策劃、大姚品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運(yùn)營等,從售前售中售后,我們都將竭誠為您服務(wù),您的肯定,是我們最大的嘉獎;創(chuàng)新互聯(lián)為所有大學(xué)生創(chuàng)業(yè)者提供大姚建站搭建服務(wù),24小時服務(wù)熱線:18980820575,官方網(wǎng)址:www.cdcxhl.com
??在開始學(xué)習(xí)Spark工作原理之前, 先來介紹一下Spark中兩個最為重要的概念-- 彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets, RDD) 和算子(Operation).
RDD背景
??Spark的核心是建立在RDD之上, 使Spark中的各個組件可以無縫進(jìn)行集成, 從而在一個應(yīng)用程序中完成大數(shù)據(jù)計算. 這也是為什么說在SparkCore中一切得計算都是基于RDD來完成的. RDD的設(shè)計理念源自AMP實(shí)驗(yàn)室發(fā)表的論文–Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing.
??MapReduce計算框架在實(shí)際應(yīng)用中, 許多迭代式算法和交互式數(shù)據(jù)挖掘過程中的計算結(jié)果會寫到磁盤, 然后再重復(fù)使用, 這就帶來了大量的磁盤IO和序列化開銷. 為解決中間過程數(shù)據(jù)落地花費(fèi)大量時間的需求, 出現(xiàn)了一種抽象的數(shù)據(jù)結(jié)構(gòu), 讓我們不必再考慮數(shù)據(jù)的分布式特性, 只需保存具體的邏輯轉(zhuǎn)換表達(dá)式即可, 這種數(shù)據(jù)結(jié)構(gòu)就是RDD.
??RDD之間的轉(zhuǎn)換操作使父子RDD之間具有依賴關(guān)系, 滿足條件的RDD之間形成管道(Pipeline), 從而避免中間結(jié)果落地, 極大的降低了磁盤IO和序列化消耗的時間.
RDD介紹
??RDD(彈性分布式數(shù)據(jù)集), 雖然叫做數(shù)據(jù)集, 但RDD并不像集合一樣存儲真實(shí)的數(shù)據(jù), 而是存儲這些數(shù)據(jù)轉(zhuǎn)換的邏輯, 可以將RDD理解為一個大的數(shù)據(jù)集合以分布式的形式保存在集群服務(wù)器的內(nèi)存中. 每個RDD可以分成多個分區(qū), 每個分區(qū)就是一個數(shù)據(jù)集片段, 并且一個RDD的不同分區(qū)可以被保存到集群中不同的節(jié)點(diǎn)上(但是同一個分區(qū)不能被拆分保存), 從而可以在集群中的不同節(jié)點(diǎn)上進(jìn)行并行計算.
??RDD提供了一種高度受限的共享內(nèi)存模型, 即RDD是只讀的記錄分區(qū)的集合, 不能直接修改, 只能基于穩(wěn)定的物理存儲中的數(shù)據(jù)集來創(chuàng)建RDD, 或者通過在其他RDD上執(zhí)行轉(zhuǎn)換操作(如map、join和groupBy) 創(chuàng)建得到新的RDD.
Operation介紹
??算子(Operation)是Spark中定義的函數(shù), 用于對RDD中的數(shù)據(jù)結(jié)構(gòu)進(jìn)行操作和轉(zhuǎn)換等. Spark中的算子可以分為4類:
創(chuàng)建類(creation)算子, 用于將內(nèi)存中的集合或外部文件創(chuàng)建為RDD.
轉(zhuǎn)換(transformation)算子, 用于將一種格式的RDD轉(zhuǎn)換為其他自定義格式.
緩存(cache)算子, 用于將RDD緩存在內(nèi)存(memory)或磁盤(disk)中, 一般后續(xù)計算會用到重復(fù)數(shù)據(jù)時才會使用.
行動(action)算子, 用于觸發(fā)執(zhí)行Spark作業(yè), 并將計算結(jié)果保存為集合, 標(biāo)量或保存到外部文件, 數(shù)據(jù)庫中.
??典型的RDD執(zhí)行過程如下:
讀入外部數(shù)據(jù)源(或者內(nèi)存中的集合) ,然后Create RDD;
RDD經(jīng)過一系列Transformation, 每一次都會產(chǎn)生不同的RDD, 供給下一個Transformation 使用;
最后一個RDD經(jīng)Action進(jìn)行處理, 得到最后想要的值, 并進(jìn)行后續(xù)輸出操作.
??需注意: RDD采用惰性調(diào)用, 即在RDD的執(zhí)行過程中, 如圖所示, 真正的計算發(fā)生在RDD的Action操作, 對于Action之前的所有Transformation操作, Spark只是記錄下Transformation操作應(yīng)用的一些基礎(chǔ)數(shù)據(jù)集以及RDD生成的軌跡, 即相互之間的依賴關(guān)系, 而不會觸發(fā)真正的計算.
??RDD提供的轉(zhuǎn)換接口都非常簡單, 都是類似map, filter, groupBy, join等粗粒度的數(shù)據(jù)轉(zhuǎn)換操作, 而不是針對某個數(shù)據(jù)項(xiàng)的細(xì)粒度修改. 因此, RDD比較適合對于數(shù)據(jù)集中元素執(zhí)行相同操作的批處理式應(yīng)用, 而不適合用于需要異步/細(xì)粒度狀態(tài)的應(yīng)用, 比如Web應(yīng)用系統(tǒng), 增量式的網(wǎng)頁爬蟲等.
??轉(zhuǎn)換和行動兩種類型的算子, 前者指定RDD之間的相互依賴關(guān)系, 后者用于執(zhí)行計算并指定輸出的形式. 兩類操作的主要區(qū)別是, 轉(zhuǎn)換操作接受RDD并返回RDD, 而行動操作(如count、collect等) 接受RDD但是返回非RDD(即輸出一個值或結(jié)果).
RDD五大特性
RDD是由一系列的Partition(分區(qū))組成;
每一個函數(shù)作用在每一個分區(qū)上;
RDD之間存在依賴關(guān)系;
[可選項(xiàng)]分區(qū)器作用在KV格式的RDD上;
[可選項(xiàng)]RDD會提供最佳計算位置.
??接下來, 結(jié)合Spark實(shí)現(xiàn)的WC案例, 來理解這五個特性以及其他注意點(diǎn)(圖中綠色為block塊, 藍(lán)色為Partition分區(qū)):
HDFS存儲文件是以block塊的形式, Spark應(yīng)用在讀取HDFS上的數(shù)據(jù)后, 會將同一個block塊中的數(shù)據(jù)轉(zhuǎn)換邏輯保存在同一個Partition中, 一個文件對應(yīng)的所有Partition構(gòu)成一個RDD. 即一個RDD中的Partition個數(shù)等于這個文件存儲在HDFS中的block個數(shù). 但有一個例外, 如果一個block塊的最后存儲了某個數(shù)據(jù)的大部分字節(jié)后達(dá)到block規(guī)定的大小, 僅有少量字節(jié)存儲在另外一個block塊中, 這時這多余的小部分?jǐn)?shù)據(jù)會放在與大部分?jǐn)?shù)據(jù)相同的Partition中, 即Partition數(shù)小于block塊數(shù).
Spark中沒有讀文件的方法, 但Spark依然能夠讀取文件內(nèi)容依賴的是MapReduce中讀文件的方法. MR讀文件前, 會先將文件劃分為一個個的split(切片), 一個split的大小 = 一個block的大小; 但這個文件的split個數(shù) ≈ 存儲這個文件的block個數(shù)(同上一個例外情況); 一個RDD中Partition的個數(shù) = 這個文件切分的split個數(shù).
每一個函數(shù)作用在每一個分區(qū)上, 即每個函數(shù)會在每一個Partition中各執(zhí)行一次.
RDD之間存在依賴關(guān)系, 通過一個算子關(guān)聯(lián)的兩個RDD稱為父子RDD, 父子RDD之間存在寬窄依賴(后續(xù)講解), 子RDD知道它的父RDD是誰, 但父RDD不知道它的子RDD有誰. 這種依賴關(guān)系的優(yōu)勢在于當(dāng)數(shù)據(jù)因某種情形丟失時, 可以通過算子和父RDD重寫計算出子RDD, 從而提高了計算的容錯性. (RDD的依賴關(guān)系也被稱為RDD的血統(tǒng)–Lineage)
KV格式的RDD指RDD中的數(shù)據(jù)是二元組類型, 對于這類RDD可以使用分區(qū)器按照Key或者Value進(jìn)行分組, 進(jìn)而完成聚合計算. 在WC中, pairRDD和restRDD均為KV格式的RDD. 分區(qū)器用于決定數(shù)據(jù)被放到哪一個reduce task中處理.
每一個算子作用在每一個Partition上, Partition會分布式的存儲在集群各個節(jié)點(diǎn)的內(nèi)存中, 對一個Partition的連續(xù)處理可以看作是一個task任務(wù), 每一個task計算任務(wù)都在數(shù)據(jù)所在節(jié)點(diǎn)上執(zhí)行, 從而實(shí)現(xiàn)數(shù)據(jù)本地化, 減少網(wǎng)絡(luò)IO. 簡單來說, RDD會提供一個方法接口, 調(diào)用這個接口就能直接拿到這個RDD所有Partition的位置, 拿到位置之后就可以分發(fā)task了. 至于這個接口是什么不需要我們關(guān)心, Spark應(yīng)用在執(zhí)行時會自動尋找.
實(shí)際操作:
案例說明
??大數(shù)據(jù)分析處理萬變不離其宗, 核心思想就是一個WorldCount–單詞統(tǒng)計. 單詞統(tǒng)計, 顧名思義就是將一個文件中出現(xiàn)的所有單詞讀一遍, 并對相同單詞的個數(shù)進(jìn)行統(tǒng)計. 如何處理這個文件? 如何得到每一個單詞? 如何對相同的單詞進(jìn)行統(tǒng)計? 這三個問題是需要解決的核心問題, 接下來就一起來看看是如何對一個文件進(jìn)行WordCount的.
??首先, 來看一下我們測試的數(shù)據(jù), 在這匹數(shù)據(jù)中, 同一行中每個單詞之間使用制表符’\t’ 來分隔, 接下來我們先對這批數(shù)據(jù)的計算思想進(jìn)行解析, 然后再分別使用MapReduce和Spark技術(shù)的API編碼實(shí)現(xiàn).
??通過對這兩種技術(shù)編碼的比較, 可以幫助大家更好的理解之前所說的Spark在表達(dá)能力上相較于Hadoop(MR)的優(yōu)勢 Spark優(yōu)勢鏈接. 除此之外, 更重要的一點(diǎn)是引入SparkCore中彈性分布式數(shù)據(jù)集(RDD) 的概念, 對RDD有一定認(rèn)識之后, 將有利于學(xué)習(xí)RDD的具體原理以及如何使用等知識.
??在Spark中, 一切計算都是基于RDD實(shí)現(xiàn)的, RDD可以看作是一個集合, 類似于Scala中的List, Map, 它有著與這些普通集合相同的方法(map, flatmap, foreach…), 但是RDD是重新寫的這些方法, 初次之外還有許多其他的方法, 這些方法在Spark中稱為算子, 之后的博客中會對它們進(jìn)行詳細(xì)介紹.
計算分析
無論是MapReduce還是Spark, 在讀取數(shù)據(jù)時都是一行一行讀取的而且讀取的數(shù)據(jù)都是字符串類型, 因此在處理時要把一行數(shù)據(jù)看成一條記錄;
既然一行是一條記錄, 那么我們在處理時只需要關(guān)注這一條記錄即可, 其余記錄格式與之相同, 不相同格式的數(shù)據(jù)一般為臟數(shù)據(jù), 需要過濾掉. 相同格式的按照規(guī)律進(jìn)行切分(split).
數(shù)據(jù)切分完成后, 就可以得到每一個單詞, 然后將每一個單詞當(dāng)作key, 把它的value置為1, 得到一些列KV格式的數(shù)據(jù), 這些數(shù)據(jù)中有的key相同, 有的key不同, 但value都是1.
對這一系列KV格式的數(shù)據(jù)進(jìn)行統(tǒng)計, 先按照Key進(jìn)行分組, 相同Key, 即同一個單詞為一組, 這個Key對應(yīng)多個Value, 構(gòu)成一個有一個或多個元素1組成的集合. 然后再將同一個Key中所有的Value進(jìn)行累加, 累加完成之后將累加值最為新的Value, Key還是原來的Key.
最新的KV格式的數(shù)據(jù)中, Key代表的是出現(xiàn)的每一個單詞, Value則對應(yīng)該單詞出現(xiàn)的次數(shù).
??圖解:
————————————————
代碼實(shí)現(xiàn)
在SparkCore中一切得計算都是基于RDD(彈性分布式數(shù)據(jù)集), R(Resilient) D(Distributed ) D(Dataset). RDD 調(diào)用的方法稱為算子,一般情況下RDD的算子返回的還是RDD. 先對RDD有個大概的了解, 之后再對其進(jìn)行詳細(xì)地介紹.
??準(zhǔn)備環(huán)境:
Scala運(yùn)行環(huán)境
導(dǎo)入jar包, 開發(fā)Spark應(yīng)用程序時, 只需要導(dǎo)入一個整合包即可.
??用Spark寫WC:
package com.hpe.spark.core
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WCSpark {
def main(args: Array[String]): Unit = {
//創(chuàng)建配置對象
val conf = new SparkConf()
//設(shè)置App的名稱-->方便在監(jiān)控頁面找到
conf.setAppName("WCSpark")
//設(shè)置Spark的運(yùn)行模式-->local本地運(yùn)行-->用于測試環(huán)境
conf.setMaster("local")
//創(chuàng)建Spark上下文 他是通往集群的唯一通道
val sc = new SparkContext(conf)
// textFile()讀取上述數(shù)據(jù),讀取時是一行行讀取,可以是本地也可是HDFS的數(shù)據(jù),返回RDD類型的數(shù)據(jù)
val lineRDD = sc.textFile("d:/wc.txt")
// 基于lineRDD中的數(shù)據(jù)按照\t進(jìn)行分詞
val wordRDD = lineRDD.flatMap { _.split("\t") }
// 將wordRDD中的每一條數(shù)據(jù)封裝成一個二元組,每一個單詞計數(shù)為1 pairRDD[(K:word V:1)]
val pairRDD = wordRDD.map { (_,1) }
// 將pairRDD中相同的單詞分為一組,對組內(nèi)的數(shù)據(jù)進(jìn)行累加
val restRDD = pairRDD.reduceByKey((v1,v2)=>v1+v2)
//可簡寫為:val restRDD = pairRDD.reduceByKey(_+_)
// 根據(jù)單詞出現(xiàn)的次數(shù)來排序,sortBy():根據(jù)指定字段來排序,false:指定為降序;
// foreach對RDD中排好序的數(shù)據(jù)進(jìn)行遍歷
restRDD
.sortBy(x=>x._2, false)
.foreach(println)
//一直啟動,為查看而寫
while(true){}
//釋放資源
sc.stop()
}
}
??但從代碼的編寫上來看, 不難發(fā)現(xiàn), Spark的表達(dá)能力著實(shí)比MR強(qiáng), 上述代碼中間處理部分其實(shí)還可以更加簡潔:
val lineRDD = sc.textFile("d:/wc.txt") .flatMap { _.split("\t") } .map { (_,1) } .reduceByKey(_+_) .sortBy(_._2, false) .foreach(println)
??MR中復(fù)雜的程序, 在Spark中了了幾行就可以輕松解決, 既可以看出Scala語言的靈活性, 又表現(xiàn)了Spark超強(qiáng)的表達(dá)能力, 因此Spark在計算上逐漸取代MR.
??這里最后一句while(true){} ,讓程序一直執(zhí)行, 可以在WebUI的監(jiān)控頁面http://localhost:4040進(jìn)行查看。
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對創(chuàng)新互聯(lián)的支持。