[TOC]
成都創(chuàng)新互聯(lián)公司2013年至今,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目做網(wǎng)站、網(wǎng)站制作網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢(mèng)想脫穎而出為使命,1280元宜豐做網(wǎng)站,已為上家服務(wù),為宜豐各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:18982081108
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//創(chuàng)建spark配置文件對(duì)象.設(shè)置app名稱,master地址,local表示為本地模式。
//如果是提交到集群中,通常不指定。因?yàn)榭赡茉诙鄠€(gè)集群匯上跑,寫死不方便
val conf = new SparkConf().setAppName("wordCount")
//創(chuàng)建spark context對(duì)象
val sc = new SparkContext(conf)
sc.textFile(args(0)).flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile(args(1))
sc.stop()
}
}
核心代碼很簡(jiǎn)單,首先看 textFile這個(gè)函數(shù)
SparkContext.scala
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
//指定文件路徑、輸入的格式類為textinputformat,輸出的key類型為longwritable,輸出的value類型為text
//map(pair => pair._2.toString)取出前面的value,然后將value轉(zhuǎn)為string類型
//最后將處理后的value返回成一個(gè)新的list,也就是RDD[String]
//setName(path) 設(shè)置該file名字為路徑
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
關(guān)鍵性的操作就是:
返回了一個(gè)hadoopFile,它有幾個(gè)參數(shù):
path:文件路徑
classOf[TextInputFormat]:這個(gè)其實(shí)就是輸入文件的處理類,也就是我們mr中分析過的TextInputFormat,其實(shí)就是直接拿過來的用的,不要懷疑,就是醬紫的
classOf[LongWritable], classOf[Text]:這兩個(gè)其實(shí)可以猜到了,就是輸入的key和value的類型。
接著執(zhí)行了一個(gè)map(pair => pair._2.toString),將KV中的value轉(zhuǎn)為string類型
我們接著看看hadoopFile 這個(gè)方法
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
FileSystem.getLocal(hadoopConfiguration)
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
//看到這里,最后返回的是一個(gè) HadoopRDD 對(duì)象
//指定sc對(duì)象,配置文件、輸入方法類、KV類型、分區(qū)個(gè)數(shù)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
最后返回HadoopRDD對(duì)象。
接著就是flatMap(.split(" ")) .map((,1)),比較簡(jiǎn)單
flatMap(_.split(" "))
就是將輸入每一行,按照空格切割,然后切割后的元素稱為一個(gè)新的數(shù)組。
然后將每一行生成的數(shù)組合并成一個(gè)大數(shù)組。
map((_,1))
將每個(gè)元素進(jìn)行1的計(jì)數(shù),組成KV對(duì),K是元素,V是1
接著看.reduceByKey(_+_)
這個(gè)其實(shí)就是將同一key的KV進(jìn)行聚合分組,然后將同一key的value進(jìn)行相加,最后就得出某個(gè)key對(duì)應(yīng)的value,也就是某個(gè)單詞的個(gè)數(shù)
看看這個(gè)函數(shù)
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
這個(gè)過程中會(huì)分區(qū),默認(rèn)分區(qū)數(shù)是2,使用的是HashPartitioner進(jìn)行分區(qū),可以指定分區(qū)的最小個(gè)數(shù)
圖2.1 spark資源調(diào)度
1、執(zhí)行提交命令,會(huì)在client客戶端啟動(dòng)一個(gè)spark-submit進(jìn)程(用來為Driver申請(qǐng)資源)。
2、為Driver向Master申請(qǐng)資源,在Master的waitingDrivers 集合中添加這個(gè)Driver要申請(qǐng)的信息。Master查看works集合,挑選出合適的Work節(jié)點(diǎn)。
3、在選中的Work節(jié)點(diǎn)中啟動(dòng)Driver進(jìn)程(Driver進(jìn)程已經(jīng)啟動(dòng)了,spark-submit的使命已經(jīng)完成了,關(guān)閉該進(jìn)程)。所以其實(shí)driver也需要資源,也只是跑在executor上的一個(gè)線程而已
4、Driver進(jìn)程為要運(yùn)行的Application申請(qǐng)資源(這個(gè)資源指的是Executor進(jìn)程)。此時(shí)Master的waitingApps 中要添加這個(gè)Application申請(qǐng)的資源信息。這時(shí)要根據(jù)申請(qǐng)資源的要求去計(jì)算查看需要用到哪些Worker節(jié)點(diǎn)(每一個(gè)節(jié)點(diǎn)要用多少資源)。在這些節(jié)點(diǎn)啟動(dòng)Executor進(jìn)程。
(注:輪詢啟動(dòng)Executor。Executor占用這個(gè)節(jié)點(diǎn)1G內(nèi)存和這個(gè)Worker所能管理的所有的core)
5、此時(shí)Driver就可以分發(fā)任務(wù)到各個(gè)Worker節(jié)點(diǎn)的Executor進(jìn)程中運(yùn)行了。
Master中的三個(gè)集合
val works = new HashSet[WorkInfo]()
??works 集合采用HashSet數(shù)組存儲(chǔ)work的節(jié)點(diǎn)信息,可以避免存放重復(fù)的work節(jié)點(diǎn)。為什么要避免重復(fù)?首先我們要知道work節(jié)點(diǎn)有可能因?yàn)槟承┰驋斓?,掛掉之后下一次與master通信時(shí)會(huì)報(bào)告給master,這個(gè)節(jié)點(diǎn)掛掉了,然后master會(huì)在works對(duì)象里把這個(gè)節(jié)點(diǎn)去掉,等下次再用到這個(gè)節(jié)點(diǎn)是時(shí)候,再加進(jìn)來。這樣來說,理論上是不會(huì)有重復(fù)的work節(jié)點(diǎn)的??墒怯幸环N特殊情況:work掛掉了,在下一次通信前又自己?jiǎn)?dòng)了,這時(shí)works里面就會(huì)有重復(fù)的work信息。
??val waitingDrivers = new ArrayBuffer[DriverInfo]()
??當(dāng)客戶端向master為Driver申請(qǐng)資源時(shí),會(huì)將要申請(qǐng)的Driver的相關(guān)信息封裝到master節(jié)點(diǎn)的DriverInfo這個(gè)泛型里,然后添加到waitingDrivers 里。master會(huì)監(jiān)控這個(gè)waitingDrivers 對(duì)象,當(dāng)waitingDrivers集合中的元素不為空時(shí),說明有客戶端向master申請(qǐng)資源了。此時(shí)應(yīng)該先查看一下works集合,找到符合要求的worker節(jié)點(diǎn),啟動(dòng)Driver。當(dāng)Driver啟動(dòng)成功后,會(huì)把這個(gè)申請(qǐng)信息從waitingDrivers 對(duì)象中移除。
?? val waitingApps = new ArrayBuffer[ApplicationInfo]()
??Driver啟動(dòng)成功后,會(huì)為application向master申請(qǐng)資源,這個(gè)申請(qǐng)信息封存到master節(jié)點(diǎn)的waitingApps 對(duì)象中。同樣的,當(dāng)waitingApps 集合不為空,說明有Driver向Master為當(dāng)前的Application申請(qǐng)資源。此時(shí)查看workers集合,查找到合適的Worker節(jié)點(diǎn)啟動(dòng)Executor進(jìn)程,默認(rèn)的情況下每一個(gè)Worker只是為每一個(gè)Application啟動(dòng)一個(gè)Executor,這個(gè)Executor會(huì)使用1G內(nèi)存和所有的core。啟動(dòng)Executor后把申請(qǐng)信息從waitingApps 對(duì)象中移除。
??注意點(diǎn):上面說到master會(huì)監(jiān)控這三個(gè)集合,那么到底是怎么監(jiān)控的呢???
??master并不是分出來線程專門的對(duì)這三個(gè)集合進(jìn)行監(jiān)控,相對(duì)而言這樣是比較浪費(fèi)資源的。master實(shí)際上是‘監(jiān)控’這三個(gè)集合的改變,當(dāng)這三個(gè)集合中的某一個(gè)集合發(fā)生變化時(shí)(新增或者刪除),那么就會(huì)調(diào)用schedule()方法。schedule方法中封裝了上面提到的處理邏輯。
1、默認(rèn)情況下,每一個(gè)Worker只會(huì)為每一個(gè)Application啟動(dòng)一個(gè)Executor。每個(gè)Executor默認(rèn)使用1G內(nèi)存和這個(gè)Worker所能管理的所有的core。
2、如果想要在一個(gè)Worker上啟動(dòng)多個(gè)Executor,在提交Application的時(shí)候要指定Executor使用的core數(shù)量(避免使用該worker所有的core)。提交命令:spark-submit --executor-cores
3、默認(rèn)情況下,Executor的啟動(dòng)方式是輪詢啟動(dòng),一定程度上有利于數(shù)據(jù)的本地化。
什么是輪詢啟動(dòng)???為什么要輪訓(xùn)啟動(dòng)呢???
??輪詢啟動(dòng):輪詢啟動(dòng)就是一個(gè)個(gè)的啟動(dòng)。例如這里有5個(gè)人,每個(gè)人要發(fā)一個(gè)蘋果+一個(gè)香蕉。輪詢啟動(dòng)的分發(fā)思路就是:五個(gè)人先一人分一個(gè)蘋果,分發(fā)完蘋果再分發(fā)香蕉。
??為什么要使用輪詢啟動(dòng)的方式呢???我們做大數(shù)據(jù)計(jì)算首先肯定想的是計(jì)算找數(shù)據(jù)。在數(shù)據(jù)存放的地方直接計(jì)算,而不是把數(shù)據(jù)搬過來再計(jì)算。我們有n臺(tái)Worker節(jié)點(diǎn),如果只是在數(shù)據(jù)存放的節(jié)點(diǎn)計(jì)算。只用了幾臺(tái)Worker去計(jì)算,大部分的worker都是閑置的。這種方案肯定不可行。所以我們就使用輪詢方式啟動(dòng)Executor,先在每一臺(tái)節(jié)點(diǎn)都允許一個(gè)任務(wù)。
??存放數(shù)據(jù)的節(jié)點(diǎn)由于不需要網(wǎng)絡(luò)傳輸數(shù)據(jù),所以肯定速度快,執(zhí)行的task數(shù)量就會(huì)比較多。這樣不會(huì)浪費(fèi)集群資源,也可以在存放數(shù)據(jù)的節(jié)點(diǎn)進(jìn)行計(jì)算,在一定程度上也有利于數(shù)據(jù)的本地化。
粗粒度(富二代):
在任務(wù)執(zhí)行之前,會(huì)先將資源申請(qǐng)完畢,當(dāng)所有的task執(zhí)行完畢,才會(huì)釋放這部分資源。
優(yōu)點(diǎn):每一個(gè)task執(zhí)行前。不需要自己去申請(qǐng)資源了,節(jié)省啟動(dòng)時(shí)間。
缺點(diǎn):等到所有的task執(zhí)行完才會(huì)釋放資源(也就是整個(gè)job執(zhí)行完成),集群的資源就無法充分利用。
這是spark使用的調(diào)度粒度,主要是為了讓stage,job,task的執(zhí)行效率高一點(diǎn)
細(xì)粒度(窮二代):
Application提交的時(shí)候,每一個(gè)task自己去申請(qǐng)資源,task申請(qǐng)到資源才會(huì)執(zhí)行,執(zhí)行完這個(gè)task會(huì)立刻釋放資源。
優(yōu)點(diǎn):每一個(gè)task執(zhí)行完畢之后會(huì)立刻釋放資源,有利于充分利用資源。
缺點(diǎn):由于需要每一個(gè)task自己去申請(qǐng)資源,導(dǎo)致task啟動(dòng)時(shí)間過長(zhǎng),進(jìn)而導(dǎo)致stage、job、application啟動(dòng)時(shí)間延長(zhǎng)。
我們提交任務(wù)時(shí),可以指定一些資源限制的參數(shù):
--executor-cores : 單個(gè)executor使用的core數(shù)量,不指定的話默認(rèn)使用該worker所有能調(diào)用的core
--executor-memory : 單個(gè)executor使用的內(nèi)存大小,如1G。默認(rèn)是1G
--total-executor-cores : 整個(gè)application最多使用的core數(shù)量,防止獨(dú)占整個(gè)集群資源
https://blog.csdn.net/qq_33247435/article/details/83653584#3Spark_51
一個(gè)application的調(diào)度到完成,需要經(jīng)過以下階段:
application-->資源調(diào)度-->任務(wù)調(diào)度(task)-->并行計(jì)算-->完成
圖3.1 spark調(diào)度流程
可以看到,driver啟動(dòng)后,會(huì)有下面兩個(gè)對(duì)象:
DAGScheduler:
據(jù)RDD的寬窄依賴關(guān)系將DAG有向無環(huán)圖切割成一個(gè)個(gè)的stage,將stage封裝給另一個(gè)對(duì)象taskSet,taskSet=stage,然后將一個(gè)個(gè)的taskSet給taskScheduler。
taskScheduler:
taskSeheduler拿倒taskSet之后,會(huì)遍歷這個(gè)taskSet,拿到每一個(gè)task,然后去調(diào)用HDFS上的方法,獲取數(shù)據(jù)的位置,根據(jù)獲得的數(shù)據(jù)位置分發(fā)task到響應(yīng)的Worker節(jié)點(diǎn)的Executor進(jìn)程中的線程池中執(zhí)行。并且會(huì)根據(jù)每個(gè)task的執(zhí)行情況監(jiān)控,等到所有task執(zhí)行完成后,就告訴master將所喲executor殺死
任務(wù)調(diào)度中主要涉涉及到以下流程:
1)、DAGScheduler:根據(jù)RDD的寬窄依賴關(guān)系將DAG有向無環(huán)圖切割成一個(gè)個(gè)的stage,將stage封裝給另一個(gè)對(duì)象taskSet,taskSet=stage,然后將一個(gè)個(gè)的taskSet給taskScheduler。
2)、taskScheduler:taskSeheduler拿倒taskSet之后,會(huì)遍歷這個(gè)taskSet,拿到每一個(gè)task,然后去調(diào)用HDFS上的方法,獲取數(shù)據(jù)的位置,根據(jù)獲得的數(shù)據(jù)位置分發(fā)task到響應(yīng)的Worker節(jié)點(diǎn)的Executor進(jìn)程中的線程池中執(zhí)行。
3)、taskScheduler:taskScheduler節(jié)點(diǎn)會(huì)跟蹤每一個(gè)task的執(zhí)行情況,若執(zhí)行失敗,TaskScher會(huì)嘗試重新提交,默認(rèn)會(huì)重試提交三次,如果重試三次依然失敗,那么這個(gè)task所在的stage失敗,此時(shí)TaskScheduler向DAGScheduler做匯報(bào)。
4)DAGScheduler:接收到stage失敗的請(qǐng)求后,,此時(shí)DAGSheduler會(huì)重新提交這個(gè)失敗的stage,已經(jīng)成功的stage不會(huì)重復(fù)提交,只會(huì)重試這個(gè)失敗的stage。
(注:如果DAGScheduler重試了四次依然失敗,那么這個(gè)job就失敗了,job不會(huì)重試
掉隊(duì)任務(wù)的概念:
當(dāng)所有的task中,75%以上的task都運(yùn)行成功了,就會(huì)每隔一百秒計(jì)算一次,計(jì)算出目前所有未成功任務(wù)執(zhí)行時(shí)間的中位數(shù)*1.5,凡是比這個(gè)時(shí)間長(zhǎng)的task都是掙扎的task。
總的調(diào)度流程:
=======================================資源調(diào)度=========================================
1、啟動(dòng)Master和備用Master(如果是高可用集群需要啟動(dòng)備用Master,否則沒有備用Master)。
2、啟動(dòng)Worker節(jié)點(diǎn)。Worker節(jié)點(diǎn)啟動(dòng)成功后會(huì)向Master注冊(cè)。在works集合中添加自身信息。
3、在客戶端提交Application,啟動(dòng)spark-submit進(jìn)程。偽代碼:spark-submit --master --deploy-mode cluster --class jarPath
4、Client向Master為Driver申請(qǐng)資源。申請(qǐng)信息到達(dá)Master后在Master的waitingDrivers集合中添加該Driver的申請(qǐng)信息。
5、當(dāng)waitingDrivers集合不為空,調(diào)用schedule()方法,Master查找works集合,在符合條件的Work節(jié)點(diǎn)啟動(dòng)Driver。啟動(dòng)Driver成功后,waitingDrivers集合中的該條申請(qǐng)信息移除。Client客戶端的spark-submit進(jìn)程關(guān)閉。
(Driver啟動(dòng)成功后,會(huì)創(chuàng)建DAGScheduler對(duì)象和TaskSchedule對(duì)象)
6、當(dāng)TaskScheduler創(chuàng)建成功后,會(huì)向Master會(huì)Application申請(qǐng)資源。申請(qǐng)請(qǐng)求發(fā)送到Master端后會(huì)在waitingApps集合中添加該申請(qǐng)信息。
7、當(dāng)waitingApps集合中的元素發(fā)生改變,會(huì)調(diào)用schedule()方法。查找works集合,在符合要求的worker節(jié)點(diǎn)啟動(dòng)Executor進(jìn)程。
8、當(dāng)Executor進(jìn)程啟動(dòng)成功后會(huì)將waitingApps集合中的該申請(qǐng)信息移除。并且向TaskSchedule反向注冊(cè)。此時(shí)TaskSchedule就有一批Executor的列表信息。
=======================================任務(wù)調(diào)度=========================================
9、根據(jù)RDD的寬窄依賴,切割job,劃分stage。每一個(gè)stage是由一組task組成的。每一個(gè)task是一個(gè)pipleline計(jì)算模式。
10、TaskScheduler會(huì)根據(jù)數(shù)據(jù)位置分發(fā)task。(taskScheduler是如何拿到數(shù)據(jù)位置的???TaskSchedule調(diào)用HDFS的api,拿到數(shù)據(jù)的block塊以及block塊的位置信息)
11、TaskSchedule分發(fā)task并且監(jiān)控task的執(zhí)行情況。
12、若task執(zhí)行失敗或者掙扎。會(huì)重試這個(gè)task。默認(rèn)會(huì)重試三次。
13、若重試三次依舊失敗。會(huì)把這個(gè)task返回給DAGScheduler,DAGScheduler會(huì)重試這個(gè)失敗的stage(只重試失敗的這個(gè)stage)。默認(rèn)重試四次。
14、告訴master,將集群中的executor殺死,釋放資源。