本篇內(nèi)容介紹了“Spark的RDD如何創(chuàng)建”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
網(wǎng)站建設(shè)哪家好,找成都創(chuàng)新互聯(lián)公司!專注于網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、重慶小程序開發(fā)公司、集團企業(yè)網(wǎng)站建設(shè)等服務(wù)項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了息縣免費建站歡迎大家使用!
一:Scala
Scala 是一門現(xiàn)代的多范式編程語言,志在以簡練、優(yōu)雅及類型安全的方式來表達常用編程模式。它平滑地集成了面向?qū)ο蠛秃瘮?shù)語言的特性。Scala 運行于 Java 平臺(JVM,Java 虛擬機),并兼容現(xiàn)有的 Java 程序。
執(zhí)行以下命令,啟動spark-shell:
hadoop@master:/mysoftware/spark-1.6.1$ spark-shell
二:彈性分布式數(shù)據(jù)集(RDD)
1.RDD(Resilient Distributed Dataset,彈性分布式數(shù)據(jù)集)。
Spark是一個分布式計算框架,而RDD是其對分布式內(nèi)存數(shù)據(jù)的抽象,可以認為RDD就是Spark分布式算法的數(shù)據(jù)結(jié)構(gòu)。而RDD之上的操作是Spark分布式算法的核心原語,由數(shù)據(jù)結(jié)構(gòu)和原語設(shè)計上層算法。Spark最終會將算法翻譯為DAG形式的工作流進行調(diào)度,并進行分布式任務(wù)的發(fā)布。
RDD,它在集群中的多臺機器上進行了數(shù)據(jù)分區(qū),邏輯上可以認為是一個分布式的數(shù)組,而數(shù)組中每個記錄可以是用戶自定義的任意數(shù)據(jù)結(jié)構(gòu)。RDD是Spark的核心數(shù)據(jù)結(jié)構(gòu),通過RDD的依賴關(guān)系形成Spark的調(diào)度順序,通過對RDD的操作形成了整個Spark程序。
2.RDD的創(chuàng)建方式
2.1 從Hadoop文件系統(tǒng)(或與Hadoop兼容的其他持久化存儲系統(tǒng),如Hive,HBase)輸出(HDFS)創(chuàng)建。
2.2 從父RDD轉(zhuǎn)換得到新的RDD
2.3 通過parallelize或makeRDD將單擊數(shù)據(jù)創(chuàng)建為分布式RDD。
scala> var textFile = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt"); textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt MapPartitionsRDD[1] at textFile at:27 scala> val a = sc.parallelize(1 to 9, 3) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :27 scala>
3.RDD的兩種操作算子: 轉(zhuǎn)換(Transformation),行動(Action)。
3.1 轉(zhuǎn)換(Transformation):延遲計算的,也就是說從一個RDD轉(zhuǎn)換生成另外一個RDD的轉(zhuǎn)換操作不是馬上執(zhí)行,需要等到有Action操作的時候才會真正觸發(fā)運算。
3.2 行動(Action):Action算子會觸發(fā)Spark提交作業(yè)(Job),并將數(shù)據(jù)輸出Spark系統(tǒng)。
4.RDD的重要內(nèi)部屬性。
4.1 分區(qū)列表:通過分區(qū)列表可以找到一個RDD中包含的所有分區(qū)及其所在地址。
4.2 計算每個分片的函數(shù):通過函數(shù)可以對每個數(shù)據(jù)塊進行RDD需要進行的用戶自定義函數(shù) 運算。
4.3 對父RDD的依賴列表:為了能夠回溯帶父RDD,為容錯等提供支持。
4.4 對 key-value pair數(shù)據(jù)類型RDD的分區(qū)器,控制分區(qū)策略和分區(qū)數(shù)。通過分區(qū)函數(shù)可以確定數(shù)據(jù)記錄在各個分區(qū)和節(jié)點上的分配,減少分布不平衡。
4.5 每個數(shù)據(jù)分區(qū)的地址列表(如 HDFS 上的數(shù)據(jù)塊的地址)。
如果數(shù)據(jù)有副本,則通過地址列表可以獲知單個數(shù)據(jù)塊的所有副本地址,為負載均衡和容錯提供支持。
4. Spark 計算工作流
途中描述了Spark的輸入,運行轉(zhuǎn)換,輸出。在運行轉(zhuǎn)換中通過算子對RDD進行轉(zhuǎn)換。算子RDD中定義的函數(shù),可以對RDD中的數(shù)據(jù)進行轉(zhuǎn)換和操作。
輸入:在Spark程序運行中,數(shù)據(jù)從外部數(shù)據(jù)空間(eg:HDFS)輸入到Spark,數(shù)據(jù)就進入了Spark運行時數(shù)據(jù)空間,會轉(zhuǎn)化為Spark中的數(shù)據(jù)塊,通過BlockManager進行管理。
運行:在Spark數(shù)據(jù)輸入形成RDD后,便可以通過變換算子fliter等,對數(shù)據(jù)操作并將RDD轉(zhuǎn)換為新的RDD,通過行動Action算子,觸發(fā)Spark提交作業(yè)。如果數(shù)據(jù)需要服用,可以通過Cache算子,將數(shù)據(jù)緩存到內(nèi)存。
輸出:程序運行結(jié)束數(shù)據(jù)會輸出Spark運行時空間,存儲到分布式存儲中(如saveAsTextFile 輸出到 HDFS)或 Scala 數(shù)據(jù)或集合中( collect 輸出到 Scala 集合,count 返回 Scala Int 型數(shù)據(jù))。
Spark的核心數(shù)據(jù)模型是RDD,但RDD是個抽象類,具體由各子類實現(xiàn),如MappedRDD,ShuffledRDD等子類。Spark將常用的大數(shù)據(jù)操作都轉(zhuǎn)換成為RDD的子類。
對其一些基本操作的使用:
scala> 3*7 res0: Int = 21 scala> var textFile = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt"); textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt MapPartitionsRDD[1] at textFile at:27 scala> textFile.count() res1: Long = 3 scala> textFile.first() res2: String = 1 spark scala> textFile.filter(line => line.contains("berg")).count() res3: Long = 1 scala> textFile.filter(line => line.contains("bergs")).count() res4: Long = 0 scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res5: Int = 1 scala> textFile.map(line => line.split("\t").size).reduce((a, b) => if (a > b) a else b) res6: Int = 2
“Spark的RDD如何創(chuàng)建”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!