這篇文章主要講解了“spark性能調(diào)優(yōu)的方法是什么”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“spark性能調(diào)優(yōu)的方法是什么”吧!
在民和等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站制作、做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作按需求定制設(shè)計(jì),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),成都全網(wǎng)營銷,成都外貿(mào)網(wǎng)站建設(shè),民和網(wǎng)站建設(shè)費(fèi)用合理。
分配哪些資源?executor、cpu per executor、memory per executor、driver memory
在哪里分配這些資源?在我們?cè)谏a(chǎn)環(huán)境中,提交spark作業(yè)時(shí),用的spark-submit shell腳本,里面調(diào)整對(duì)應(yīng)的參數(shù)
/usr/local/spark/bin/spark-submit \ --class cn.spark.sparktest.core.WordCountCluster \ --num-executors 3 \ 配置executor的數(shù)量 --driver-memory 100m \ 配置driver的內(nèi)存(影響很大) --executor-memory 100m \ 配置每個(gè)executor的內(nèi)存大小 --executor-cores 3 \ 配置每個(gè)executor的cpu core數(shù)量 /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
調(diào)節(jié)到多大,算是最大呢?
第一種,Spark Standalone,公司集群上,搭建了一套Spark集群,你心里應(yīng)該清楚每臺(tái)機(jī)器還能夠給你使用的,大概有多少內(nèi)存,多少cpu core;那么,設(shè)置的時(shí)候,就根據(jù)這個(gè)實(shí)際的情況,去調(diào)節(jié)每個(gè)spark作業(yè)的資源分配。比如說你的每臺(tái)機(jī)器能夠給你使用4G內(nèi)存,2個(gè)cpu core;20臺(tái)機(jī)器;executor,20;平均每個(gè)executor:4G內(nèi)存,2個(gè)cpu core。
第二種,Yarn。資源隊(duì)列。資源調(diào)度。應(yīng)該去查看,你的spark作業(yè),要提交到的資源隊(duì)列,大概有多少資源?500G內(nèi)存,100個(gè)cpu core;executor,50;平均每個(gè)executor:10G內(nèi)存,2個(gè)cpu core。
設(shè)置隊(duì)列名稱:spark.yarn.queue default
一個(gè)原則,你能使用的資源有多大,就盡量去調(diào)節(jié)到最大的大?。╡xecutor的數(shù)量,幾十個(gè)到上百個(gè)不等;executor內(nèi)存;executor cpu core)
為什么調(diào)節(jié)了資源以后,性能可以提升?
增加executor:
如果executor數(shù)量比較少,那么,能夠并行執(zhí)行的task數(shù)量就比較少,就意味著,我們的Application的并行執(zhí)行的能力就很弱。比如有3個(gè)executor,每個(gè)executor有2個(gè)cpu core,那么同時(shí)能夠并行執(zhí)行的task,就是6個(gè)。6個(gè)執(zhí)行完以后,再換下一批6個(gè)task。增加了executor數(shù)量以后,那么,就意味著,能夠并行執(zhí)行的task數(shù)量,也就變多了。比如原先是6個(gè),現(xiàn)在可能可以并行執(zhí)行10個(gè),甚至20個(gè),100個(gè)。那么并行能力就比之前提升了數(shù)倍,數(shù)十倍。相應(yīng)的,性能(執(zhí)行的速度),也能提升數(shù)倍~數(shù)十倍。
有時(shí)候數(shù)據(jù)量比較少,增加大量的task反而性能會(huì)降低,為什么?(想想就明白了,你用多了,別人用的就少了。。。。)
增加每個(gè)executor的cpu core:
也是增加了執(zhí)行的并行能力。原本20個(gè)executor,每個(gè)才2個(gè)cpu core。能夠并行執(zhí)行的task數(shù)量,就是40個(gè)task?,F(xiàn)在每個(gè)executor的cpu core,增加到了5個(gè)。能夠并行執(zhí)行的task數(shù)量,就是100個(gè)task。執(zhí)行的速度,提升了2.5倍。
SparkContext,DAGScheduler,TaskScheduler,會(huì)將我們的算子,切割成大量的task,
提交到Application的executor上面去執(zhí)行。
增加每個(gè)executor的內(nèi)存量:
增加了內(nèi)存量以后,對(duì)性能的提升,有三點(diǎn):
1、如果需要對(duì)RDD進(jìn)行cache,那么更多的內(nèi)存,就可以緩存更多的數(shù)據(jù),將更少的數(shù)據(jù)寫入磁盤,甚至不寫入磁盤。減少了磁盤IO。
2、對(duì)于shuffle操作,reduce端,會(huì)需要內(nèi)存來存放拉取的數(shù)據(jù)并進(jìn)行聚合。如果內(nèi)存不夠,也會(huì)寫入磁盤。如果給executor分配更多內(nèi)存以后,就有更少的數(shù)據(jù),需要寫入磁盤,
甚至不需要寫入磁盤。減少了磁盤IO,提升了性能。
3、對(duì)于task的執(zhí)行,可能會(huì)創(chuàng)建很多對(duì)象。如果內(nèi)存比較小,可能會(huì)頻繁導(dǎo)致JVM堆內(nèi)存滿了,然后頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。內(nèi)存加大以后,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。
Spark并行度指的是什么?
Spark作業(yè),Application,Jobs,action(collect)觸發(fā)一個(gè)job,1個(gè)job;每個(gè)job拆成多個(gè)stage,
發(fā)生shuffle的時(shí)候,會(huì)拆分出一個(gè)stage,reduceByKey。
stage0 val lines = sc.textFile("hdfs://") val words = lines.flatMap(_.split(" ")) val pairs = words.map((_,1)) val wordCount = pairs.reduceByKey(_ + _) stage1 val wordCount = pairs.reduceByKey(_ + _) wordCount.collect()
reduceByKey,stage0的task,在最后,執(zhí)行到reduceByKey的時(shí)候,會(huì)為每個(gè)stage1的task,都創(chuàng)建一份文件(也可能是合并在少量的文件里面);每個(gè)stage1的task,會(huì)去各個(gè)節(jié)點(diǎn)上的各個(gè)task創(chuàng)建的屬于自己的那一份文件里面,拉取數(shù)據(jù);每個(gè)stage1的task,拉取到的數(shù)據(jù),一定是相同key對(duì)應(yīng)的數(shù)據(jù)。對(duì)相同的key,對(duì)應(yīng)的values,才能去執(zhí)行我們自定義的function操作(_ + _)
并行度:其實(shí)就是指的是,Spark作業(yè)中,各個(gè)stage的task數(shù)量,也就代表了Spark作業(yè)的在各個(gè)階段(stage)的并行度。
如果不調(diào)節(jié)并行度,導(dǎo)致并行度過低,會(huì)怎么樣?
task沒有設(shè)置,或者設(shè)置的很少,比如就設(shè)置了,100個(gè)task。50個(gè)executor,每個(gè)executor有3個(gè)cpu core,也就是說,你的Application任何一個(gè)stage運(yùn)行的時(shí)候,都有總數(shù)在150個(gè)cpu core,可以并行運(yùn)行。但是你現(xiàn)在,只有100個(gè)task,平均分配一下,每個(gè)executor分配到2個(gè)task,ok,那么同時(shí)在運(yùn)行的task,只有100個(gè),每個(gè)executor只會(huì)并行運(yùn)行2個(gè)task。每個(gè)executor剩下的一個(gè)cpu core,就浪費(fèi)掉了。
你的資源雖然分配足夠了,但是問題是,并行度沒有與資源相匹配,導(dǎo)致你分配下去的資源都浪費(fèi)掉了。合理的并行度的設(shè)置,應(yīng)該是要設(shè)置的足夠大,大到可以完全合理的利用你的集群資源;比如上面的例子,總共集群有150個(gè)cpu core,可以并行運(yùn)行150個(gè)task。那么就應(yīng)該將你的Application的并行度,至少設(shè)置成150,才能完全有效的利用你的集群資源,讓150個(gè)task,并行執(zhí)行;而且task增加到150個(gè)以后,即可以同時(shí)并行運(yùn)行,還可以讓每個(gè)task要處理的數(shù)據(jù)量變少;比如總共150G的數(shù)據(jù)要處理,如果是100個(gè)task,每個(gè)task計(jì)算1.5G的數(shù)據(jù);現(xiàn)在增加到150個(gè)task,可以并行運(yùn)行,而且每個(gè)task主要處理1G的數(shù)據(jù)就可以。
很簡單的道理,只要合理設(shè)置并行度,就可以完全充分利用你的集群計(jì)算資源,并且減少每個(gè)task要處理的數(shù)據(jù)量,最終,就是提升你的整個(gè)Spark作業(yè)的性能和運(yùn)行速度。
task數(shù)量,至少設(shè)置成與Spark application的總cpu core數(shù)量相同(最理想情況,比如總共150個(gè)cpu core,分配了150個(gè)task,一起運(yùn)行,差不多同一時(shí)間運(yùn)行完畢)
官方是推薦,task數(shù)量,設(shè)置成spark application總cpu core數(shù)量的2~3倍,比如150個(gè)cpu core,基本要設(shè)置task數(shù)量為300~500;實(shí)際情況,與理想情況不同的,有些task會(huì)運(yùn)行的快一點(diǎn),比如50s就完了,有些task,可能會(huì)慢一點(diǎn),要1分半才運(yùn)行完,所以如果你的task數(shù)量,剛好設(shè)置的跟cpu core數(shù)量相同,可能還是會(huì)導(dǎo)致資源的浪費(fèi),因?yàn)?,比?50個(gè)task,10個(gè)先運(yùn)行完了,剩余140個(gè)還在運(yùn)行,但是這個(gè)時(shí)候,有10個(gè)cpu core就空閑出來了,就導(dǎo)致了浪費(fèi)。那如果task數(shù)量設(shè)置成cpu core總數(shù)的2~3倍,那么一個(gè)task運(yùn)行完了以后,另一個(gè)task馬上可以補(bǔ)上來,就盡量讓cpu core不要空閑,同時(shí)也是盡量提升spark作業(yè)運(yùn)行的效率和速度,提升性能。
如何設(shè)置一個(gè)Spark Application的并行度?
spark.default.parallelism SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")
默認(rèn)情況下,多次對(duì)一個(gè)RDD執(zhí)行算子,去獲取不同的RDD;都會(huì)對(duì)這個(gè)RDD以及之前的父RDD,全部重新計(jì)算一次;讀取HDFS->RDD1->RDD2-RDD4這種情況,是絕對(duì)絕對(duì),一定要避免的,一旦出現(xiàn)一個(gè)RDD重復(fù)計(jì)算的情況,就會(huì)導(dǎo)致性能急劇降低。比如,HDFS->RDD1-RDD2的時(shí)間是15分鐘,那么此時(shí)就要走兩遍,變成30分鐘
RDD架構(gòu)重構(gòu)與優(yōu)化盡量去復(fù)用RDD,差不多的RDD,可以抽取稱為一個(gè)共同的RDD,供后面的RDD計(jì)算時(shí),反復(fù)使用。
公共RDD一定要實(shí)現(xiàn)持久化。就好比北方吃餃子,現(xiàn)包現(xiàn)煮。你人來了,要點(diǎn)一盤餃子。餡料+餃子皮+水->包好的餃子,對(duì)包好的餃子去煮,煮開了以后,才有你需要的熟的,熱騰騰的餃子?,F(xiàn)實(shí)生活中,餃子現(xiàn)包現(xiàn)煮,當(dāng)然是最好的了。但是Spark中,RDD要去“現(xiàn)包現(xiàn)煮”,那就是一場(chǎng)致命的災(zāi)難。對(duì)于要多次計(jì)算和使用的公共RDD,一定要進(jìn)行持久化。持久化,也就是說,將RDD的數(shù)據(jù)緩存到內(nèi)存中/磁盤中,(BlockManager),以后無論對(duì)這個(gè)RDD做多少次計(jì)算,那么都是直接取這個(gè)RDD的持久化的數(shù)據(jù),比如從內(nèi)存中或者磁盤中,直接提取一份數(shù)據(jù)。
持久化,是可以進(jìn)行序列化的如果正常將數(shù)據(jù)持久化在內(nèi)存中,那么可能會(huì)導(dǎo)致內(nèi)存的占用過大,這樣的話,也許,會(huì)導(dǎo)致OOM內(nèi)存溢出。當(dāng)純內(nèi)存無法支撐公共RDD數(shù)據(jù)完全存放的時(shí)候,就優(yōu)先考慮,使用序列化的方式在純內(nèi)存中存儲(chǔ)。將RDD的每個(gè)partition的數(shù)據(jù),序列化成一個(gè)大的字節(jié)數(shù)組,就一個(gè)對(duì)象;序列化后,大大減少內(nèi)存的空間占用。序列化的方式,唯一的缺點(diǎn)就是,在獲取數(shù)據(jù)的時(shí)候,需要反序列化。如果序列化純內(nèi)存方式,還是導(dǎo)致OOM,內(nèi)存溢出;就只能考慮磁盤的方式,內(nèi)存+磁盤的普通方式(無序列化)。內(nèi)存+磁盤,序列化。
為了數(shù)據(jù)的高可靠性,而且內(nèi)存充足,可以使用雙副本機(jī)制,進(jìn)行持久化持久化的雙副本機(jī)制,持久化后的一個(gè)副本,因?yàn)闄C(jī)器宕機(jī)了,副本丟了,就還是得重新計(jì)算一次;持久化的每個(gè)數(shù)據(jù)單元,存儲(chǔ)一份副本,放在其他節(jié)點(diǎn)上面;從而進(jìn)行容錯(cuò);一個(gè)副本丟了,不用重新計(jì)算,還可以使用另外一份副本。這種方式,僅僅針對(duì)你的內(nèi)存資源極度充足.
持久化,很簡單,就是對(duì)RDD調(diào)用persist()方法,并傳入一個(gè)持久化級(jí)別
如果是persist(StorageLevel.MEMORY_ONLY()),純內(nèi)存,無序列化,那么就可以用cache()方法來替代
StorageLevel.MEMORY_ONLY_SER(),第二選擇
StorageLevel.MEMORY_AND_DISK(),第三選擇
StorageLevel.MEMORY_AND_DISK_SER(),第四選擇
StorageLevel.DISK_ONLY(),第五選擇
如果內(nèi)存充足,要使用雙副本高可靠機(jī)制,選擇后綴帶_2的策略
StorageLevel.MEMORY_ONLY_2()
感謝各位的閱讀,以上就是“spark性能調(diào)優(yōu)的方法是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)spark性能調(diào)優(yōu)的方法是什么這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!