創(chuàng)新互聯(lián)公司總部坐落于成都市區(qū),致力網(wǎng)站建設(shè)服務(wù)有成都做網(wǎng)站、網(wǎng)站建設(shè)、外貿(mào)營銷網(wǎng)站建設(shè)、網(wǎng)絡(luò)營銷策劃、網(wǎng)頁設(shè)計、網(wǎng)站維護、公眾號搭建、小程序開發(fā)、軟件開發(fā)等為企業(yè)提供一整套的信息化建設(shè)解決方案。創(chuàng)造真正意義上的網(wǎng)站建設(shè),為互聯(lián)網(wǎng)品牌在互動行銷領(lǐng)域創(chuàng)造價值而不懈努力!
1.啟動spark集群,就是執(zhí)行sbin/start-all.sh,啟動master和多個worker節(jié)點,master主要作為集群的管理和監(jiān)控,worker節(jié)點主要擔任運行各個application的任務(wù)。master節(jié)點需要讓worker節(jié)點匯報自身狀況,比如CPU,內(nèi)存多大,這個過程都是通過心跳機制來完成的
2.master收到worker的匯報信息之后,會給予worker信息
3.driver提交任務(wù)給spark集群[driver和master之間的通信是通過AKKAactor來做的,也就是說master是akkaactor異步通信模型中的一個actor模型,driver也是一樣,driver異步向mater發(fā)送注冊信息(registerApplication)異步注冊信息]
4.master節(jié)點對application預(yù)估,7個G的內(nèi)存完成任務(wù),對任務(wù)進行分配,每一個worker節(jié)點上都分配3.5G的內(nèi)存去執(zhí)行任務(wù),在master就對各個worker上的任務(wù)進行整體的監(jiān)控調(diào)度
5.worker節(jié)點領(lǐng)到任務(wù),開始執(zhí)行,在worker節(jié)點上啟動相應(yīng)的executor進程來執(zhí)行,每個executor中都有一個線程池的概念,里面存有多個task線程
6.executor會從線程池中取出task去計算rddpatition中的數(shù)據(jù),transformation操作,action操作
7.worker節(jié)點向driver節(jié)點匯報計算狀態(tài)
通過本地并行化集合創(chuàng)建RDD
public class JavaLocalSumApp{ public static void main(String[] args){ SparkConf conf = new SparkConf().setAppName("JavaLocalSumApp"); JavaSparkContext sc = new JavaSparkContext(conf); Listlist = Arrays.asList(1,3,4,5,6,7,8); //通過本地并行化集合創(chuàng)建RDD JavaRDD listRDD = sc.parallelize(list); //求和 Integer sum = listRDD.reduce(new Function2 (){ @Override public Integer call(Integer v1,Integer v2) throws Exception{ return v1+v2; } } ); System.out.println(sum) } } //java 中的函數(shù)式編程,需要將編譯器設(shè)置成1.8 listRDD.reduce((v1,v2)=> v1+v2)
Sparktransformation和action操作
RDD:彈性分布式數(shù)據(jù)集,是一種集合,支持多種來源,有容錯機制,可以被緩存,支持并行操作,一個RDD代表一個分區(qū)里的數(shù)據(jù)集
RDD有兩種操作算子:
Transformation(轉(zhuǎn)化):Transformation屬于延遲計算,當一個RDD轉(zhuǎn)換成另一個RDD時并沒有立即進行轉(zhuǎn)換,緊緊是記住了數(shù)據(jù)集的邏輯操作
Action(執(zhí)行):觸發(fā)Spark作業(yè)的運行,真正觸發(fā)轉(zhuǎn)換算子的計算
spark算子的作用
該圖描述的是Spark在運行轉(zhuǎn)換中通過算子對RDD進行轉(zhuǎn)換,算子是RDD中定義的函數(shù),可以對RDD中的數(shù)據(jù)進行轉(zhuǎn)換和操作。
輸入:在Spark程序運行中,數(shù)據(jù)從外部數(shù)據(jù)空間(如分布式存儲:textFile讀取HDFS等,parallelize方法輸入Scala集合或數(shù)據(jù))輸入Spark ,數(shù)據(jù)進入Spark運行時數(shù)據(jù)空間,轉(zhuǎn)化為Spark中的數(shù)據(jù)塊,通過BlockManager進行管理
運行:在Spark數(shù)據(jù)輸入形成RDD后便可以通過變換算子,如filter等。對數(shù)據(jù)進行操作并將RDD轉(zhuǎn)換為新的RDD,通過Action算子,觸發(fā)Spark提交作業(yè),如果數(shù)據(jù)需要復(fù)用,可以通過Cache算子,將數(shù)據(jù)緩存到內(nèi)存
輸出:程序運行結(jié)束數(shù)據(jù)會輸出Spark運行時空間,存儲到分布式存儲中(如saveAsTextFile輸出到HDFS),或Scala數(shù)據(jù)或集合中(collect輸出到Scala集合,count返回Scala int 型數(shù)據(jù))
Transformation 和 Actions操作概況
Transformation
map(func):返回一個新的分布式數(shù)據(jù)集,由每個原元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成
filter(func) :返回一個新的數(shù)據(jù)集,由經(jīng)過func函數(shù)
flatMap(func):類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數(shù)的返回值是一個Seq,而不是單一元素)
sample(withReplacement, frac, seed): 根據(jù)給定的隨機種子seed,隨機抽樣出數(shù)量為frac的數(shù)據(jù)
union(otherDataset): 返回一個新的數(shù)據(jù)集,由原數(shù)據(jù)集和參數(shù)聯(lián)合而成
roupByKey([numTasks]): 在一個由(K,V)對組成的數(shù)據(jù)集上調(diào)用,返回一個(K,Seq[V])對的數(shù)據(jù)集。注意:默認情況下,使用8個并行任務(wù)進行分組,你可以傳入numTask可選參數(shù),根據(jù)數(shù)據(jù)量設(shè)置不同數(shù)目的Task
reduceByKey(func, [numTasks]): 在一個(K,V)對的數(shù)據(jù)集上使用,返回一個(K,V)對的數(shù)據(jù)集,key相同的值,都被使用指定的reduce函數(shù)聚合到一起。和groupbykey類似,任務(wù)的個數(shù)是可以通過第二個可選參數(shù)來配置的。
join(otherDataset, [numTasks]): 在類型為(K,V)和(K,W)類型的數(shù)據(jù)集上調(diào)用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數(shù)據(jù)集
groupWith(otherDataset, [numTasks]): 在類型為(K,V)和(K,W)類型的數(shù)據(jù)集上調(diào)用,返回一個數(shù)據(jù)集,組成元素為(K, Seq[V], Seq[W]) Tuples。這個操作在其它框架,稱為CoGroup
cartesian(otherDataset): 笛卡爾積。但在數(shù)據(jù)集T和U上調(diào)用時,返回一個(T,U)對的數(shù)據(jù)集,所有元素交互進行笛卡爾積。
Actions操作
reduce(func): 通過函數(shù)func聚集數(shù)據(jù)集中的所有元素。Func函數(shù)接受2個參數(shù),返回一個值。這個函數(shù)必須是關(guān)聯(lián)性的,確??梢员徽_的并發(fā)執(zhí)行
collect(): 在Driver的程序中,以數(shù)組的形式,返回數(shù)據(jù)集的所有元素。這通常會在使用filter或者其它操作后,返回一個足夠小的數(shù)據(jù)子集再使用,直接將整個RDD集Collect返回,很可能會讓Driver程序OOM
count(): 返回數(shù)據(jù)集的元素個數(shù)
take(n): 返回一個數(shù)組,由數(shù)據(jù)集的前n個元素組成。注意,這個操作目前并非在多個節(jié)點上,并行執(zhí)行,而是Driver程序所在機器,單機計算所有的元素(Gateway的內(nèi)存壓力會增大,需要謹慎使用)
first(): 返回數(shù)據(jù)集的第一個元素(類似于take(1))
saveAsTextFile(path): 將數(shù)據(jù)集的元素,以textfile的形式,保存到本地文件系統(tǒng),hdfs或者任何其它hadoop支持的文件系統(tǒng)。Spark將會調(diào)用每個元素的toString方法,并將它轉(zhuǎn)換為文件中的一行文本
saveAsSequenceFile(path): 將數(shù)據(jù)集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統(tǒng),hdfs或者任何其它hadoop支持的文件系統(tǒng)。RDD的元素必須由key-value對組成,并都實現(xiàn)了Hadoop的Writable接口,或隱式可以轉(zhuǎn)換為Writable(Spark包括了基本類型的轉(zhuǎn)換,例如Int,Double,String等等)
foreach(func): 在數(shù)據(jù)集的每一個元素上,運行函數(shù)func。這通常用于更新一個累加器變量,或者和外部存儲系統(tǒng)做交互
WordCount執(zhí)行過程
總結(jié)
以上就是本文關(guān)于Spark 調(diào)度架構(gòu)原理詳解的全部內(nèi)容,希望對大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站其他相關(guān)專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!