本篇內(nèi)容介紹了“Spark的基礎(chǔ)介紹和操作調(diào)優(yōu)”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
大城網(wǎng)站制作公司哪家好,找成都創(chuàng)新互聯(lián)!從網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、響應(yīng)式網(wǎng)站建設(shè)等網(wǎng)站項(xiàng)目制作,到程序開發(fā),運(yùn)營(yíng)維護(hù)。成都創(chuàng)新互聯(lián)成立于2013年到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來(lái)保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選成都創(chuàng)新互聯(lián)。
在討論spark調(diào)優(yōu)之前,先看看spark里的一些概念。
Action是得到非RDD結(jié)果的RDD操作。如Spark中有如下常見action操作: reduce, collect, count, first, take, takeSample, countByKey, saveAsTextFile
每個(gè)spark的action會(huì)被分解成一個(gè)job。
一個(gè)job會(huì)被分成多組task,每組task稱為一個(gè)stage。stage的劃分界限為以下兩種task之一:
shuffleMapTask - 所有的wide transformation之前,可以簡(jiǎn)單認(rèn)為是shuffle之前
resultTask - 可以簡(jiǎn)單認(rèn)為是take()之類的操作
RDD 包含固定數(shù)目的 partition, 每個(gè) partiton 包含若干的 record。
narrow tansformation(比如 map 和 filter)返回的 RDD,一個(gè) partition 中的 record 只需要從父 RDD 對(duì)應(yīng)的 partition 中的 record 計(jì)算得到。同樣narrow transformation不會(huì)改變partition的個(gè)數(shù)。
被送到executor上執(zhí)行的工作單元; 一個(gè)task只能做一個(gè)stage中的一個(gè)partition的數(shù)據(jù)。
調(diào)整在 stage 邊屆時(shí)的 partition 個(gè)數(shù)經(jīng)常可以很大程度上影響程序的執(zhí)行效率;
associative reductive operation, 能使用reduceByKey時(shí)不使用groupByKey,因?yàn)間rouByKey會(huì)把所有數(shù)據(jù)shuffle一遍,而reduceByKey只會(huì)Shuffle reduce的結(jié)果。
輸入和輸出結(jié)果不一樣時(shí),不使用reduceByKey,而使用aggregateByKey;
aggregateByKey: Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
不要用flatMap-join-groupBy的模式,可以用cogroup;
當(dāng)兩個(gè)reduceByKey的結(jié)果join時(shí),如果大家的partition都一樣,則spark不會(huì)在join時(shí)做shuffle;
當(dāng)一個(gè)內(nèi)存能放得下的數(shù)據(jù)集join時(shí),可以考慮broadcast而不使用join;
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)
spark中的資源可以簡(jiǎn)單歸結(jié)為CPU和內(nèi)存,而以下的參數(shù)會(huì)影響內(nèi)存和CPU的使用。
executor 越大并行性越好,越大每個(gè)executor所有的內(nèi)存就越??;
core,越大并行性越好;
HDFS client 在大量并發(fā)線程是時(shí)性能問題。大概的估計(jì)是每個(gè) executor 中最多5個(gè)并行的 task 就可以占滿的寫入帶寬。
partition,如果比excutor*core小則很傻;越多每個(gè)partition占用的內(nèi)存就越少;足夠大以后對(duì)性能提升不再有用。
core = min(5,cpu核數(shù));
executor = instance數(shù) * cpu核數(shù) / core
平均每instance的executor個(gè)數(shù)決定executor.memory,從而決定shuffle.memory和storage.memory;
估計(jì)總數(shù)據(jù)量,即最大的shuffle時(shí)的數(shù)據(jù)大?。╯park driver運(yùn)行記錄中會(huì)有shuffle size);
用4的結(jié)果除以3得到partition數(shù),如果很小,把partition設(shè)成和(executor*core)的若干倍.
“Spark的基礎(chǔ)介紹和操作調(diào)優(yōu)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!