這篇文章主要講解了“Spark編程知識點(diǎn)有哪些”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Spark編程知識點(diǎn)有哪些”吧!
我們提供的服務(wù)有:成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、安陽ssl等。為上千多家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的安陽網(wǎng)站制作公司
#Spark開發(fā)指南#
##簡介## 總的來說,每一個(gè)Spark應(yīng)用程序,都是由一個(gè)驅(qū)動(dòng)程序組成,它運(yùn)行用戶的main函數(shù),并且在一個(gè)集群上執(zhí)行各種各樣的并行操作。Spark提供的主要的抽象(概念)是一個(gè)彈性分布式數(shù)據(jù)集,它是一個(gè)元素集合,劃分到集群的不同節(jié)點(diǎn)上,可以被并行操作。RDDs的創(chuàng)建可以從Hadoop文件系統(tǒng)(或者任何支持Hadoop的文件系統(tǒng))上的一個(gè)文件開始,或者通過轉(zhuǎn)換這個(gè)驅(qū)動(dòng)程序中已存在的Scala集合而來。用戶也可以使Spark持久化一個(gè)RDD到內(nèi)存中,使其能在并行操作中被有效的重用。最后,RDDs能自動(dòng)從節(jié)點(diǎn)故障中恢復(fù)。
Spark中的第二個(gè)抽象(概念)是共享變量,他可以在并行操作中使用。默認(rèn)情況下,Spark通過不同節(jié)點(diǎn)上的一系列任務(wù)來并行運(yùn)行一個(gè)函數(shù)。他將每一個(gè)函數(shù)中用的到變量的拷貝傳遞到每一個(gè)任務(wù)中。有時(shí)候,一個(gè)變量需要在不同的任務(wù)之間,或者任務(wù)和驅(qū)動(dòng)程序之間共享。Spark支持兩種類型的共享變量:廣播變量,可以再所有節(jié)點(diǎn)的內(nèi)存中緩存一個(gè)值,累加器,一個(gè)只能做加法的變量,例如計(jì)數(shù)器和求和。
本指南通過每一種Spark支持的語言來展示Spark的每個(gè)特性。It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.
##接入Spark##
###Java###
Spark1.0.2工作在Java6或者java6以后之上。如果你在使用Java8,Spark支持lamdba表達(dá)式來簡化函數(shù)編寫,否則,你可以使用org.apache.spark.api.java.function 包下的類。
用Java編寫Spark應(yīng)用,你需要添加Spark的依賴,Spark可以通過Maven Central使用:
groupId=org.apache.spark artifactId=spark-core_2.10 version=1.0.2
另外,如果你想訪問一個(gè)HDFS集群,你需要根據(jù)你的HDFS版本添加一個(gè)hadoop-client依賴。一些常用的HDFS版本標(biāo)簽顯示在頁面。
groupId=org.apache.hadoop artifactId=hadoop-client version=
最后,你需要在你的程序中導(dǎo)入一些Spark類,通過添加如下幾行:
import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.SparkConf
##初始化Spark##
###Java### Spark程序需要做的第一件事就是創(chuàng)建一個(gè)JavaSparkContext對象 ,它將告訴Spark怎樣訪問一個(gè)集群。創(chuàng)建一個(gè)SparkContext,你首先必須創(chuàng)建SparkConf對象,它包含關(guān)于你的應(yīng)用程序的信息。
SparkConf conf=new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc=new JavaSparkContext(conf);
appName參數(shù)是你的應(yīng)用程序的名字,將會(huì)在集群的UI上顯示。master是Spark、Mesos、或者YARN 集群URL,或者一個(gè)專用的字符串”local“使其在本地模式下運(yùn)行。在實(shí)踐中,當(dāng)運(yùn)行在一個(gè)集群上,你將不會(huì)想要把master硬編碼到程序中,而是通過使用spark-submit運(yùn)行程序并且接受master。但是,在本地測試或者單元測試中,你可以傳遞”local“在進(jìn)程內(nèi)運(yùn)行Spark。
##彈性分布式數(shù)據(jù)集## Spark反復(fù)圍繞的一個(gè)概念是彈性分布式數(shù)據(jù)集。它是一個(gè)有容錯(cuò)機(jī)制的元素集合,并且可以被并行操作。有兩種創(chuàng)建RDDs的方法。并行化你的驅(qū)動(dòng)程序中已存在的集合,或者引用一個(gè)外部存儲系統(tǒng)的數(shù)據(jù)集,例如一個(gè)共享文件系統(tǒng),HDFS、HBase、或者任何可以提供一個(gè)Hadoop InputFormat的數(shù)據(jù)源。
###并行集合### 并行集合通過調(diào)用JavaSparkContext的parallelize方法,在你的驅(qū)動(dòng)程序中已存在的Collection上創(chuàng)建。集合的元素將會(huì)拷貝組成一個(gè)可以被并行操作的分布式數(shù)據(jù)集。例如,下面是如何創(chuàng)建一個(gè)包含數(shù)字1到5的并行集合:
List
data=Arrays.asList(1,2,3,4,5); JavaRDD distData=sc.parallelize(data);
一旦創(chuàng)建,分布式數(shù)據(jù)集(distData)就可以并行操作。例如,我們可以調(diào)用 distData.reduce((a,b)->a+b)來將列表中的元素相加。我們稍后將會(huì)在分布式數(shù)據(jù)集的操作中描述。
注意:在這個(gè)指南中,我們經(jīng)常使用簡潔的Java8 lamdba語法來定義java functions,但是在老的Java版本中,你可以實(shí)現(xiàn)org.apache.spark.api.java.function包中的接口。我們將會(huì)在下面詳細(xì)描述passing functions to Spark。
并行集合的另一個(gè)重要的參數(shù)是數(shù)據(jù)集被切分成切片(slices)的數(shù)量。Spark將會(huì)為集群中的每一個(gè)slice運(yùn)行一個(gè)task。通常情況下,你要為集群中的每個(gè)CPU 2-4個(gè)slice。通常,Spark會(huì)嘗試根據(jù)你的集群自動(dòng)設(shè)置slice的數(shù)量。然而,你可以手動(dòng)的設(shè)置它,把它作為第二個(gè)參數(shù)傳遞給parallelize(例如:sc.parallelize(data,10)).
###外部數(shù)據(jù)集### Spark可以通過任何Hadoop支持的存儲源創(chuàng)建分布式數(shù)據(jù)集。包括你的本地文件系統(tǒng),HDFS,Cassandra,HBase,Amazon S3等等。Spark支持text files(文本文件),SequenceFiles(序列化文件),和任何其他的Hadoop InputFormat(輸入格式)。
Text file 可以通過使用SparkContext的textFile方式創(chuàng)建。這個(gè)方法接受一個(gè)文件的URI(或者機(jī)器上的一個(gè)本地路徑,或者h(yuǎn)dfs://,s3n:// 等URI)并且把這個(gè)文件讀成一個(gè)行的集合。下面是一個(gè)調(diào)用的例子:
JavaRDD
distFile=sc.textFile("data.txt");
一旦創(chuàng)建,distFile可以被進(jìn)行數(shù)據(jù)集操作。例如:我們可以通過使用map和reduce將所有數(shù)據(jù)行的長度相加.例如:distFile.map(s->s.length()).reduce((a,b)->(a+b)).
Spark讀文件時(shí)的一些注意事項(xiàng):
如果使用本地文件系統(tǒng)上的路徑,
Spark的所有基于文件的輸入方法,包括textFile,支持運(yùn)行目錄,壓縮文件盒通配符。例如,你可以食用textFile("/my/directory/"),textFile("/my/directory/.txt"),和textFile("/my/directory/.gz")
textFile方法也可以接受一個(gè)可選的第二參數(shù)來控制這個(gè)文件的slice數(shù)目。默認(rèn)情況下,Spark為每一個(gè)文件創(chuàng)建一個(gè)slice(HDFS中block默認(rèn)為64MB)。但是你可以通過傳遞一個(gè)較大的值來指定一個(gè)跟高的slice值。注意你的slice數(shù)不能小于block數(shù)。
除了文本文件,Spark的Java API 也支持集中其他數(shù)據(jù)格式。
JavaSparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
對于序列化文件(SequenceFiles),使用SparkContext的sequenceFile[K,V],K和V是文件中key和value的類型。他們必須是Hadoop的Writeable接口的子類,像IntWriteable和Text。
對于其他的Hadoop輸入格式,你可以使用JavaSparkContext.hadoopRDD方法。它可以接受任意(類型)的JobConf和輸入格式類,key類和value類。按照像Hadoop Job一樣,來設(shè)置輸入源就可以了。你也可以為InputFormats使用JavaSparkContext.newHadoopRDD,基于”new“MapReduce API(org.apache.hadoop.mapreduce).
JavaRDD.saveAsObjectFile 和JavaContext.objectFile支持以一種由Java對象序列化組成的簡單的格式保存RDD。雖然這不是有效地專門的格式向Avro,但是它提供了一個(gè)簡單的方式存儲RDD。
###RDD操作### RDDs支持兩種類型的操作:轉(zhuǎn)換(transformations),它從一個(gè)現(xiàn)有的數(shù)據(jù)集創(chuàng)建一個(gè)新的數(shù)據(jù)集。動(dòng)作(actions),它在數(shù)據(jù)集上運(yùn)行計(jì)算后,返回一個(gè)值給驅(qū)動(dòng)程序。例如:map就是一個(gè)轉(zhuǎn)換,它將數(shù)據(jù)集的每一個(gè)元素傳遞給一個(gè)函數(shù),并且返回一個(gè)新的RDD表示結(jié)果。另一方面,reduce是一個(gè)動(dòng)作,他通過一些行數(shù)將一些RDD的所有元素聚合起來,并把最終的結(jié)果返回給驅(qū)動(dòng)程序(不過還有一個(gè)并行的reduceByKey,它返回一個(gè)分布式數(shù)據(jù)集)。
Spark中的所有轉(zhuǎn)換都是惰性的,也就是說,他們不會(huì)立即計(jì)算出結(jié)果。相反,他們只是記住應(yīng)用到這些基礎(chǔ)數(shù)據(jù)集(例如file)上的轉(zhuǎn)換。只有當(dāng)發(fā)生一個(gè)需要返回一個(gè)結(jié)果給驅(qū)動(dòng)程序的動(dòng)作時(shí),這些轉(zhuǎn)換才真正執(zhí)行。這樣的設(shè)計(jì)使得Spark運(yùn)行更加高效——例如,我們可以實(shí)現(xiàn),通過map創(chuàng)建一個(gè)數(shù)據(jù)集,并在reduce中使用,最終只返回reduce的結(jié)果給驅(qū)動(dòng)程序,而不是整個(gè)大的新數(shù)據(jù)集。
默認(rèn)情況下,每一個(gè)轉(zhuǎn)換過的RDD都會(huì)在你在它上面運(yùn)行一個(gè)action時(shí)重新計(jì)算。然而,你也可以使用persist方法(或者cache)持久化一個(gè)RDD到內(nèi)存中。在這種情況下,Spark將會(huì)在集群中,保存相關(guān)元素,下次你訪問這個(gè)RDD時(shí),它將能夠更快速訪問,。在磁盤上持久化數(shù)據(jù)集,或者在集群間復(fù)制數(shù)據(jù)集也是支持的。
####基本操作#### 為了說明RDD基礎(chǔ),考慮下面的簡單的程序:
JavaDDD
lines=sc.textFile("data.txtt"); JavaRDD lineLengths=lines.map(s->s.length()); int totalLength=lineLengths.reduce((a,b)->a+b);
第一行通過一個(gè)外部文件定義了一個(gè)基本的RDD。這個(gè)數(shù)據(jù)集未被加載到內(nèi)存,也未在上面執(zhí)行動(dòng)作。lines僅僅是這個(gè)文件的一個(gè)指針。第二行定義了lineLengths作為map轉(zhuǎn)換的結(jié)果。此外,lineLengths因?yàn)槎栊詻]有立即計(jì)算。最后,我們運(yùn)行reduce,他是一個(gè)action。這時(shí)候,Spark將這個(gè)計(jì)算拆分成不同的task,并使其運(yùn)行在獨(dú)立的機(jī)器上,并且每臺機(jī)器運(yùn)行它自己的map部分和本地的reducation,僅僅返回他的結(jié)果給驅(qū)動(dòng)程序。
如果我們想在以后重復(fù)使用lineLengths,我們可以添加:
lineLengths.persist();
在reduce之前,這將導(dǎo)致lineLengths在第一次被計(jì)算之后被保存在內(nèi)存中。
####傳遞Functions到Spark#### Spark的API,在很大程度上依賴于傳遞函數(shù)使其驅(qū)動(dòng)程序在集群上運(yùn)行。在Java中,函數(shù)有實(shí)現(xiàn)了org.apache.spark.api.java.function包中接口的類表示。有兩種創(chuàng)建這樣的函數(shù)的方式:
在你自己的類中實(shí)現(xiàn)Function接口,可以是匿名內(nèi)部類,后者命名類,并且你要傳遞他的一個(gè)實(shí)例到Spark
在Java8中,使用lamdba表達(dá)式來簡潔的定義一種實(shí)現(xiàn)
為了簡潔起見,本指南中的大多數(shù)使用lamdba語法,它易于使用,所有的APIs in long-form,例如,我們可以編寫上面的代碼如下:
JavaRDDlines = sc.textFile("data.txt"); JavaRDD lineLengths = lines.map(new Function () { public Integer call(String s) { return s.length(); } }); int totalLength = lineLengths.reduce(new Function2 () { public Integer call(Integer a, Integer b) { return a + b; } });
或者,如果編寫內(nèi)聯(lián)函數(shù)顯得很笨拙:
class GetLength implements Function{ public Integer call(String s) { return s.length(); } } class Sum implements Function2 { public Integer call(Integer a, Integer b) { return a + b; } } JavaRDD lines = sc.textFile("data.txt"); JavaRDD lineLengths = lines.map(new GetLength()); int totalLength = lineLengths.reduce(new Sum());
Note that anonymous inner classes in Java can also access variables in the enclosing scope as long as they are marked final. Spark will ship copies of these variables to each worker node as it does for other languages
####Wroking with Key-Value Pairs使用鍵/值對工作#### 雖然大多數(shù)Spark操作工作在包含各種類型的對象的RDDs之上,一些特殊的操作僅僅能夠使用包含key-value對的RDDs。最常見的操作之一是分布式”shuffle“操作,例如通過key分組或者聚合元素。
在Java中,key-value對使用scala標(biāo)準(zhǔn)包下的scala Tuple2類表示。你可以簡單的調(diào)用new Tuple2(a,b)去創(chuàng)建一個(gè)tuuple,并且通過tuple._1()和tuple._2()訪問它的字段。
key-value對的RDDs通過JavaPairRDD表示。你可以通過JavaRDDs構(gòu)建JavaPairRDDs,使用指定的map操作版本,像mapToPair和flatMapToPair。JavaPair將不僅擁有標(biāo)準(zhǔn)RDD函數(shù),并且有特殊的key-value函數(shù)。
例如,下面的代碼在key-value對上使用reduceByKey操作來計(jì)算在一個(gè)文件中每行文本出現(xiàn)的次數(shù)和。
JavaRDD
lines=sc.textFile("data.txt"); JavaPairRDD pairs=lines.mapToPair(s->new Tuple2(s,1)) JavaPairRDD counts=pairs.reduceByKey((a,b)->a+b);
我們也可以使用counts.sortByKey(),例如,按照字母順序排序這個(gè)鍵值對。并且最后調(diào)用counts.collect()作為一個(gè)對象數(shù)組返回給驅(qū)動(dòng)程序。
注意:當(dāng)使用自定義的對象作為key-value對操作的key時(shí),你必須確保自定義equals()方法伴隨著一個(gè)匹配的hashCode()方法。有關(guān)詳情,參考 Object.hashCode() 文檔大綱中列出的規(guī)定。
####轉(zhuǎn)換#### 下面的表格列出了Spark支持的常見的轉(zhuǎn)換。更多信息可以參考RDD API 文檔和pair RDD 函數(shù)文檔。
####動(dòng)作#### 下面的表格列出了Spark支持的常見的動(dòng)作。更多信息可以參考RDD API 文檔和pair RDD 函數(shù)文檔。
###RDD持久化### Spark最重要的一個(gè)功能是在不同的操作間,持久化(或者緩存)一個(gè)數(shù)據(jù)集到內(nèi)存中。當(dāng)你持久化一個(gè)RDD時(shí),每一個(gè)節(jié)點(diǎn)都把它計(jì)算的分片結(jié)果保存在內(nèi)存中,并且在對此數(shù)據(jù)集(或者衍生出的數(shù)據(jù)集)進(jìn)行其他動(dòng)作時(shí)重用。這將使后續(xù)的動(dòng)作變得更快(通過快109倍以上)。緩存是(Spark)迭代算法和快速交互使用的關(guān)鍵工具。
你可以使用persist()和cache()方法來標(biāo)記一個(gè)將要持久化的RDD。第一次他被一個(gè)動(dòng)作進(jìn)行計(jì)算,他將會(huì)保留在這個(gè)節(jié)點(diǎn)的內(nèi)存中。Spark的緩存有容錯(cuò)性-如果RDD的任何一個(gè)分區(qū)丟失了,他會(huì)通過使用最初創(chuàng)建的它轉(zhuǎn)換操作,自動(dòng)重新計(jì)算。
此外,每一個(gè)持久化RDD可以使用不同的存儲級別存儲。允許你,例如,持久化數(shù)據(jù)集到磁盤,持久化數(shù)據(jù)集到內(nèi)存作為序列化的Java對象(節(jié)省空間),跨節(jié)點(diǎn)復(fù)制,或者 store it off-heap in Tachyon。這些級別通過傳遞一個(gè)StorageLevel對象(Scala,Java,Python)到persist()來設(shè)置。cache()方法是使用默認(rèn)存儲級別的快捷方法,即StorageLevel.MEMORY_ONLY(存儲反序列化對象到內(nèi)存),完整的存儲級別設(shè)置為:
Spark也會(huì)在shuffle操作(例如,reduceByKey)中自動(dòng)的持久化一些中間數(shù)據(jù)。甚至當(dāng)用戶未調(diào)用persist方法。這樣做是為了阻止在進(jìn)行shuffle操作時(shí)由于一個(gè)節(jié)點(diǎn)故障而重新計(jì)算整個(gè)輸入。我們依然推薦用戶在作為結(jié)果的RDD上調(diào)用persist如果想打算重用它。
####存儲級別的選擇####
####移除數(shù)據(jù)#### Spark自動(dòng)監(jiān)視每一個(gè)節(jié)點(diǎn)上的緩存使用,并且使用LRU方式刪除老的數(shù)據(jù)分區(qū)。如果你想手工的刪除yige RDD而不是等他自動(dòng)從緩存中清除,使用RDD.unpersist()方法。
##共享變量## 通常,當(dāng)傳遞給Spark操作(例如map或者reduce)的函數(shù)在遠(yuǎn)程集群節(jié)點(diǎn)上運(yùn)行時(shí),它實(shí)際上操作的是這個(gè)函數(shù)使用到的所有變量的獨(dú)立拷貝。這些變量被拷貝到每一臺機(jī)器,并且在遠(yuǎn)程機(jī)器上的對這些變量的所有更新都不會(huì)傳回給驅(qū)動(dòng)程序。通??磥?,在不同的任務(wù)之間讀寫變量是低效的。然而,Spark還是為兩種常見的使用模式提供了兩種有限的共享變量:廣播變量和累加器。
###廣播變量### 廣播變量允許程序員保存一個(gè)只讀的變量緩存在每一臺機(jī)器上,而不是每個(gè)任務(wù)都保存一份拷貝。它們可以這樣被使用,例如:以一種高效的方式給每一個(gè)節(jié)點(diǎn)一個(gè)大的輸入數(shù)據(jù)集。Spark會(huì)嘗試使用一種高效的廣播算法來分配廣播變量,以減小通信的代價(jià)。
廣播變量通過調(diào)用SparkContext.broadcast(v)方法從變量v創(chuàng)建。廣播變量是一個(gè)v的包裝器。它的值可以通過調(diào)用value方法訪問。下面的代碼展示了這些:
BroadcastbroadcastVar = sc.broadcast(new int[] {1, 2, 3}); broadcastVar.value(); // returns [1, 2, 3]
在廣播變量被創(chuàng)建后,它應(yīng)該在集群上任何函數(shù)中代替v被使用,使v不再傳遞到這些節(jié)點(diǎn)上。此外,對象v在被廣播后不能被修改,這樣可以保證所有節(jié)點(diǎn)獲得的廣播變量的值是相同的(例如,這個(gè)變量在之后被傳遞到一個(gè)新的節(jié)點(diǎn))。
###累加器### 累加器是一種只能通過關(guān)聯(lián)操作進(jìn)行”加“操作的變量。因此可以高效的支持并行計(jì)算。它們可以用于實(shí)現(xiàn)計(jì)數(shù)器(*例如在MapReduce中)和求和。Spark原生支持?jǐn)?shù)字類型的累加器。開發(fā)者也可以自己添加新的支持類型。
一個(gè)累加器可以通過調(diào)用SparkContext.accumulator(v)方法從一個(gè)初始值v中創(chuàng)建。運(yùn)行在集群上的任務(wù),可以通過使用add方法或者+=操作(在Scala和Python中)來給它加值。然而,他們不能讀取這個(gè)值。只有驅(qū)動(dòng)程序可以使用value的方法來讀取累加器的值。
如下的代碼,展示了如何利用累加器,將一個(gè)數(shù)組里面的所有元素相加:
Accumulatoraccum = sc.accumulator(0); sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); // ... // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s accum.value(); // returns 10
雖然這段代碼使用了內(nèi)置支持Integer類型的累加器。但是開發(fā)者也可以通過實(shí)現(xiàn)AccumulatorParam創(chuàng)建自己的類型 。AccumulatorParam接口有兩個(gè)方法:zero為你的數(shù)據(jù)類型提供一個(gè)"zero value",addInPlace將兩個(gè)值相加。例如,假設(shè)我們有一個(gè)向量類來表示數(shù)學(xué)向量,我們可以這樣寫:
class VectorAccumulatorParam implements AccumulatorParam{ public Vector zero(Vector initialValue) { return Vector.zeros(initialValue.size()); } public Vector addInPlace(Vector v1, Vector v2) { v1.addInPlace(v2); return v1; } } // Then, create an Accumulator of this type: Accumulator vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());
在Java中,Spark也支持更通用的Accumulable接口來累加數(shù)據(jù),他們的計(jì)算結(jié)果類型和相加的元素的類型不一樣(例如,收集同樣的元素構(gòu)建一個(gè)list)。
感謝各位的閱讀,以上就是“Spark編程知識點(diǎn)有哪些”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Spark編程知識點(diǎn)有哪些這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!