真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

怎么分析spark計算框架

本篇文章給大家分享的是有關(guān)怎么分析spark計算框架,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

創(chuàng)新互聯(lián)企業(yè)建站,十余年網(wǎng)站建設(shè)經(jīng)驗,專注于網(wǎng)站建設(shè)技術(shù),精于網(wǎng)頁設(shè)計,有多年建站和網(wǎng)站代運營經(jīng)驗,設(shè)計師為客戶打造網(wǎng)絡(luò)企業(yè)風格,提供周到的建站售前咨詢和貼心的售后服務(wù)。對于網(wǎng)站建設(shè)、做網(wǎng)站中不同領(lǐng)域進行深入了解和探索,創(chuàng)新互聯(lián)在網(wǎng)站建設(shè)中充分了解客戶行業(yè)的需求,以靈動的思維在網(wǎng)頁中充分展現(xiàn),通過對客戶行業(yè)精準市場調(diào)研,為客戶提供的解決方案。

首先明確一點:學計算框架主要就是學2部分:1.資源調(diào)度 2.任務(wù)調(diào)度

寫一個spark程序包含加載配置文件,創(chuàng)建上下文,創(chuàng)建RDD , 調(diào)用RDD的算子,用戶在算子中自定義的函數(shù)

map端:狹窄的理解是MapReduce中的map端,本質(zhì)就是將數(shù)據(jù)變成你想要的形式,例如:按照空格切分,乘2等等操作。

shuffle : 分為shuffle write(臨時存到本地磁盤)和shuffle read(從磁盤拉數(shù)據(jù),同一個分區(qū)的拉到一個partition上)階段,本質(zhì)就是數(shù)據(jù)的規(guī)整,例如同一個分區(qū)的拉到一塊。

reduce端:狹窄的理解是MapReduce中的reduce端,本質(zhì)就是數(shù)據(jù)的聚合

寬泛的理解2個stage之間,前面的可以說是map端,后面的stage可以理解為reduce端,中間正好需要shuffle過程,且shuffle過程需要再shuffle write階段將數(shù)據(jù)暫時存到本地磁盤上。

spark專業(yè)術(shù)語:

任務(wù)相關(guān)的專業(yè)術(shù)語:

1.application:用戶寫的應(yīng)用程序(包含2部分:Driver Program(運行應(yīng)用的main()方法,創(chuàng)建spark上下文 )和Executor Program(用戶在算子中自定義的函數(shù)))

2.job:一個action類算子觸發(fā)執(zhí)行的操作,有多少個action類算子就有多少個job,一個應(yīng)用程序可以有多個job.

3.stage(階段):一組任務(wù)(task)就是一個stage,例如MapReduce中一組的map task(一個切片對應(yīng)一個map task),一個job中可以有有多個stage(根據(jù)寬依賴為分界線來劃分的)

.4.task(任務(wù):底層就是一個thread(線程)):在集群運行時最小的執(zhí)行單元

集群相關(guān)的專業(yè)術(shù)語:

Master:資源管理的主節(jié)點

Worker:資源管理的從節(jié)點

Executor:執(zhí)行任務(wù)的進程,運行在worker節(jié)點上,負責運行task,負責將數(shù)據(jù)存儲到內(nèi)存或磁盤,每個application有多個獨立的Executors

ThreadPool:線程池,存在與Executor進程中,task在線程池中運行

RDD的依賴關(guān)系

RDD有5大特性:

1.一個RDD有多個partition組成。

2.每個算子實質(zhì)上作用于每個partition上。

3.每個RDD依賴其父RDD.

4.可選項 :分區(qū)器是作用于KV格式的RDD上

5.可選項:RDD會提供一系列的最佳的計算位置

父RDD不知道其子RDD,但是子RDD知道的的所有父RDD

1.窄依賴:父RDD與子RDD,partition的關(guān)系是一對一,這種情況并沒有shuffle過程

例如:map(x=>x.split(" "))

2.寬依賴 : 父RDD與子RDD,partition之間的關(guān)系是一對多,這種情況下一般都會導致shuffle數(shù)據(jù)規(guī)整的過程

例如:groupByKey()->相同key的二元組一定在同一個分區(qū)中,無參的情況下子RDD的分區(qū)數(shù)等于父RDD的分區(qū)數(shù)(也就是會先計算key的hash函數(shù)再與父RDD的分區(qū)數(shù)求余,所以最終的數(shù)據(jù)一定會散落在這幾個分區(qū)中),當然你可以傳入?yún)?shù),這個參數(shù)用于鎖定該子RDD有多少個分區(qū),后面調(diào)優(yōu)的時候會用到。

groupBy:根據(jù)指定的作為分組依據(jù),同sortBy和sortByKey

寬窄依賴的作用是:將job切割成多個stage.從祖先RDD開始找,如果是窄依賴繼續(xù)往下找,以寬依賴為切割點,分為2個stage

那么為什么要劃分出stage呢?因為每個stage中的RDD都是窄依賴,沒有shuffle過程,且每個partition都是一對一的關(guān)系,所以可以在后面以管道的形式使每個partition上的task并行處理 (簡單說就是為了是每個task以管道的形式進行計算)

關(guān)于stage的一個結(jié)論:stage與stage之間是寬依賴,stage內(nèi)部都是窄依賴

形成一個DAG(有向無環(huán)圖)需要從最后一個RDD往前回溯:因為子RDD知道父RDD,但是父RDD不知道子RDD

怎么分析spark計算框架

RDD中不是存儲的真實數(shù)據(jù),而是存儲的對數(shù)據(jù)處理的邏輯過程

對于KV格式的RDD應(yīng)該說:存儲的邏輯過程的返回類型是二元組類型我們稱為是KV格式的RDD

怎么分析spark計算框架

每個task作用于partition所在的block或副本所在的節(jié)點上(計算向數(shù)據(jù)移動,本地化可以大大減少網(wǎng)絡(luò)傳輸),這里task的計算邏輯(也就是這個展開式),處理的結(jié)果并沒有落地(存到磁盤的意思),而是以管道的模式,一條一條數(shù)據(jù)的從partition(邏輯上的,數(shù)據(jù)存在block上)中讀到內(nèi)存,在內(nèi)存中一直連續(xù)的執(zhí)行,直到最后執(zhí)行完這個task才可能會落地,一條接著一條的流式處理,一個task中的數(shù)據(jù)像流水線一樣,多個task是并行計算的。

偽代碼中的輸出:一條filter的輸出,一條map的輸出,交替出現(xiàn),而不是先將filter中的所有數(shù)據(jù)都打印出來,再打印map的數(shù)據(jù)。

從這里就能明顯感覺到spark計算框架比MapReduce計算框架的優(yōu)勢:基于內(nèi)存迭代,不需要落地,不需要存儲到磁盤,減少了磁盤IO,大大提高了效率。

幾個問題:

1.stage中的task(管道模式)并行計算,什么時候會落地磁盤呢?

①如果是stage后面是action類算子

collect:將每個管道中的輸出結(jié)果收集到driver端的內(nèi)存中

saveAsTextFile:將每個管道中的輸出結(jié)果保存到指定目錄,可以是本地磁盤,也可以是hdfs中

count:將管道的計算結(jié)果統(tǒng)計記錄數(shù),返回給Driver

②如果是stage后面是stage

在shuffle write節(jié)點會寫到本地磁盤暫時存儲,因為內(nèi)存中的數(shù)據(jù)不夠穩(wěn)定,為了防止reduce task拉取數(shù)據(jù)失敗

2.spark在計算過程中,是不是非常消耗內(nèi)存?

不是,正常使用,因為管道是很細的不會導致內(nèi)存過大,多個task并行運算,也是正常使用,但是如果使用控制類算子的 cache,就會消耗大量內(nèi)存,因為如果一個rdd調(diào)用cache(),會將這個管道,開一個口,將數(shù)據(jù)復制一份放到內(nèi)存中存儲,方便下次運行,但是非常消耗內(nèi)存。

3.RDD彈性分布式數(shù)據(jù)集,為什么不存儲數(shù)據(jù),還依然叫數(shù)據(jù)集?

因為它有處理數(shù)據(jù)的能力,可以通過生活的例子來舉例說明:例如:滴滴雖然每年一直虧損,但是市值依然很高,因為他雖然沒錢,但有創(chuàng)造錢的能力

對比一下spark和MapReduce的計算模式的差異:

mapreduce是1+1=2 2+1=3

spark是1+1+1=3

spark的任務(wù)調(diào)度過程:

怎么分析spark計算框架

1.首先編寫一個Application(上面的這個程序缺少一個action算子),一個spark應(yīng)用程序是基于RDD來操作的,會先創(chuàng)建出相應(yīng)的RDD對象,然后建立一個系統(tǒng)DAG(有向無環(huán)圖)

2.DAGScheduler(有向無環(huán)圖調(diào)度器)分割這個DAG,將其分割成多個stage,每個stage中有一組的task,所以也叫TaskSet(任務(wù)集合),一個stage就是一個TaskSet

3.將TaskSet提交給TaskScheduler(任務(wù)調(diào)度器),經(jīng)由集群管理者發(fā)送任務(wù)到worker節(jié)點運行,監(jiān)控task,會重試失敗的task和掉隊的task,不可能無限重試,所以限制重試次數(shù)為3次,默認最大失敗次數(shù)為4次,如果重試了3次還是失敗,此時TaskScheduler會向DAGScheduler匯報當前失敗的task所在的stage失敗,此時DAGScheduler收到匯報也會重試該stage,重試次數(shù)默認為4次,注意此時已經(jīng)成功執(zhí)行的task不需要再重新執(zhí)行了,只需要提交失敗的task就行,如果stage重試4次失敗,說明這個job就徹底失敗了,job沒有重試。

那么問題是發(fā)送到哪個work節(jié)點呢?最好是存儲節(jié)點(HDFS)包含計算節(jié)點(這里是spark集群),因為這樣為了數(shù)據(jù)本地化。根據(jù)文件名就可以獲得該文件的所有信息,根據(jù)文件名可以獲得每一個block的位置,以及block所在節(jié)點的ip等,然后就將task發(fā)送到該節(jié)點運行就行。

4.task放到work節(jié)點的executor進程中的線程池中運行

怎么分析spark計算框架

spark資源調(diào)度的方式

粗粒度的資源調(diào)度

在任務(wù)執(zhí)行前申請到所需的所有資源,當所有 task 執(zhí)行完畢后再釋放資源

優(yōu)點:task 直接使用已經(jīng)申請好的資源,執(zhí)行效率高

缺點:所有的 task 執(zhí)行完畢才釋放資源,可能導致集群資源浪費,例如只剩一個 task 遲遲不能結(jié)束,那么大量資源將被閑置

細粒度的資源調(diào)度

任務(wù)執(zhí)行時,task 自己去申請資源,執(zhí)行完畢后釋放資源

優(yōu)點:使集群資源得以充分利用

缺點:task 需要自己申請資源,執(zhí)行效率低

spark on standalone 執(zhí)行流程

1> worker 節(jié)點啟動,向 master 匯報信息,該信息被存儲在 workers 對象中,workers 底層使用 HashSet 數(shù)據(jù)結(jié)構(gòu),為了防止同一臺 worker 節(jié)點在 master 中注冊兩次(worker 節(jié)點掛掉但是迅速恢復可能會導致此問題)

2> 在客戶端提交任務(wù),這里以客戶端提交方式為例,首先客戶端會啟動 driver 進程,然后構(gòu)建Spark Application的運行環(huán)境,創(chuàng)建 SparkContext 對象,這會創(chuàng)建并初始化 TaskScheduler 和 DAGScheduler 兩個對象

3> 當兩個對象創(chuàng)建完成后,TaskScheduler 會向 master 為 Application 申請資源, Application 的信息會注冊在 master 上的 waitingApps 對象中,waitingApps 使用 ArrayBuffer 存儲數(shù)據(jù)

4> 當 waitingApps 集合中的元素發(fā)生變化時會回調(diào) schedule() 方法,這時 master 就知道有 Appliacation 在請求執(zhí)行。master 會去讀取 workers 來獲取自己掌握的 worker 節(jié)點,然后在資源充足的 worker 節(jié)點上為 Appliacation 分配資源 -> 通知 worker 節(jié)點啟動Executor 進程,Executor 進程啟動時會在內(nèi)部初始化一個線程池,用來執(zhí)行 task

–master 采用輪循方式分配資源,確保整個集群的資源得到充分利用,并有利于后面分發(fā) task 時實現(xiàn)數(shù)據(jù)本地化–每一個 worker 節(jié)點上默認為 Applacation 啟動 1 個 Executor 進程,該 Executor 進程默認使用 1G 內(nèi)存和該 worker 節(jié)點上空閑的所有的核可通過在提交任務(wù)時使用 - -executor-cores 和 - -executor-memory 來手動指定每個 Executor 使用的資源–spark 采用粗粒度的資源調(diào)度,當所有 task 都執(zhí)行完畢后,才進行資源回收

5> 當 Executor 成功啟動后,會去向 TaskScheduler 反向注冊,此時 TaskScheduler 就得到所有成功啟動的 Executor 的信息

6> SparkContext 對象解析代碼構(gòu)建DAG(有向無環(huán)圖)交給 DAGScheduler,每一個 job 會構(gòu)建一個DAG圖,DAGScheduler 根據(jù) DAG 中 RDD 的寬窄依賴將其切分成一個個 stage,每個 stage 中包含一組 task,每個 task 因為都是窄依賴,不會產(chǎn)生 shuffle,所以都是 pipeline(管道) 計算模式

7> DAGScheduler 將一個 stage 封裝到一個 taskSet 中,傳給 TaskScheduler,TaskScheduler拿到后遍歷 taskSet ,得到一個個 task,解讀其要計算的數(shù)據(jù),然后調(diào)用 HDFS 的 API 得到數(shù)據(jù)所在的位置

8> 本著計算向數(shù)據(jù)靠攏的原則,TaskScheduler 將 task 分發(fā)到其所要計算的數(shù)據(jù)所在的節(jié)點的 Executor 進程中,task 最后會被封裝到線程池里的一個線程中執(zhí)行,task 執(zhí)行的過程中 TaskScheduler 會對其進行監(jiān)控

9> 如果 task 執(zhí)行失敗,TaskScheduler 會進行重試,再次分發(fā)該 task ,最多重試3次;

如果 task 陷入掙扎并且 spark 開啟了推測執(zhí)行,TaskScheduler 會換一個節(jié)點分發(fā)陷入掙扎的 task,兩個 task 誰先執(zhí)行完就以誰的結(jié)果為準

陷入掙扎的判定標準:當75%的 task 已經(jīng)執(zhí)行完畢后,這時 TaskScheduler 每隔10ms會計算一次剩余 task 當前執(zhí)行時間的中值 t,然后以 t 的1.5倍 為標準,未執(zhí)行完的 task 當前執(zhí)行時間如果大于 t*1.5 則該 task 被判定為陷入掙扎的 task

10> 如果3次重試后 task 依然執(zhí)行失敗,該 task 所在的 stage 就會被判定為失敗,TaskScheduler 會向 DAGScheduler 反饋,DAGScheduler 會重試失敗的 stage,最多重試4次,如果4次重試后該 stage 依然失敗,則該 job 被判定為失敗,程序中止

DAGScheduler 重試 stage 時只會重試 stage 中失敗的 task

11> 當所有 task 成功執(zhí)行完畢后或 job 失敗,driver 會通知 master, master 會通知 worker kill 掉 Executor,完成資源回收

以上就是怎么分析spark計算框架,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


標題名稱:怎么分析spark計算框架
地址分享:http://weahome.cn/article/jgieoh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部