這篇文章給大家介紹Spark 3.0 AQE及CBO的示例分析,內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
創(chuàng)新互聯(lián)公司網(wǎng)站建設(shè)公司一直秉承“誠信做人,踏實做事”的原則,不欺瞞客戶,是我們最起碼的底線! 以服務(wù)為基礎(chǔ),以質(zhì)量求生存,以技術(shù)求發(fā)展,成交一個客戶多一個朋友!專注中小微企業(yè)官網(wǎng)定制,成都網(wǎng)站建設(shè)、成都網(wǎng)站制作,塑造企業(yè)網(wǎng)絡(luò)形象打造互聯(lián)網(wǎng)企業(yè)效應。
Spark3.0已經(jīng)發(fā)布半年之久,這次大版本的升級主要是集中在性能優(yōu)化和文檔豐富上,其中46%的優(yōu)化都集中在Spark SQL上,SQL優(yōu)化里最引人注意的非Adaptive Query Execution莫屬了。
Adaptive Query Execution(AQE)是英特爾大數(shù)據(jù)技術(shù)團隊和百度大數(shù)據(jù)基礎(chǔ)架構(gòu)部工程師在Spark 社區(qū)版本的基礎(chǔ)上,改進并實現(xiàn)的自適應執(zhí)行引擎。近些年來,Spark SQL 一直在針對CBO 特性進行優(yōu)化,而且做得十分成功。
首先,我們先來介紹另一個基于規(guī)則優(yōu)化(Rule-Based Optimization,簡稱RBO)的優(yōu)化器,這是一種經(jīng)驗式、啟發(fā)式的優(yōu)化思路,優(yōu)化規(guī)則都已經(jīng)預先定義好,只需要將SQL往這些規(guī)則上套就可以。簡單的說,RBO就像是一個經(jīng)驗豐富的老司機,基本套路全都知道。
然而世界上有一種東西叫做 – 不按套路來。與其說它不按套路來,倒不如說它本身并沒有什么套路。最典型的莫過于復雜Join算子優(yōu)化,對于這些Join來說,通常有兩個選擇題要做:
Join應該選擇哪種算法策略來執(zhí)行?BroadcastJoin or ShuffleHashJoin or SortMergeJoin?不同的執(zhí)行策略對系統(tǒng)的資源要求不同,執(zhí)行效率也有天壤之別,同一個SQL,選擇到合適的策略執(zhí)行可能只需要幾秒鐘,而如果沒有選擇到合適的執(zhí)行策略就可能會導致系統(tǒng)OOM。
對于雪花模型或者星型模型來講,多表Join應該選擇什么樣的順序執(zhí)行?不同的Join順序意味著不同的執(zhí)行效率,比如A join B join C,A、B表都很大,C表很小,那A join B很顯然需要大量的系統(tǒng)資源來運算,執(zhí)行時間必然不會短。而如果使用A join C join B的執(zhí)行順序,因為C表很小,所以A join C會很快得到結(jié)果,而且結(jié)果集會很小,再使用小的結(jié)果集 join B,性能顯而易見會好于前一種方案。
大家想想,這有什么固定的優(yōu)化規(guī)則么?并沒有。說白了,你需要知道更多關(guān)于表的基礎(chǔ)信息(表大小、表記錄總條數(shù)等),再通過一定規(guī)則代價評估才能從中選擇一條最優(yōu)的執(zhí)行計劃。所以,CBO 意為基于代價優(yōu)化策略,它需要計算所有可能執(zhí)行計劃的代價,并挑選出代價最小的執(zhí)行計劃。
AQE對于整體的Spark SQL的執(zhí)行過程做了相應的調(diào)整和優(yōu)化,它最大的亮點是可以根據(jù)已經(jīng)完成的計劃結(jié)點真實且精確的執(zhí)行統(tǒng)計結(jié)果來不停的反饋并重新優(yōu)化剩下的執(zhí)行計劃。
CBO 會計算一些和業(yè)務(wù)數(shù)據(jù)相關(guān)的統(tǒng)計數(shù)據(jù),來優(yōu)化查詢,例如行數(shù)、去重后的行數(shù)、空值、最大最小值等。Spark會根據(jù)這些數(shù)據(jù),自動選擇BHJ或者SMJ,對于多Join場景下的Cost-based Join Reorder,來達到優(yōu)化執(zhí)行計劃的目的。
但是,由于這些統(tǒng)計數(shù)據(jù)是需要預先處理的,會過時,所以我們在用過時的數(shù)據(jù)進行判斷,在某些情況下反而會變成負面效果,拉低了SQL執(zhí)行效率。
Spark3.0的AQE框架用了三招解決這個問題:
動態(tài)合并shuffle分區(qū)(Dynamically coalescing shuffle partitions)
動態(tài)調(diào)整Join策略(Dynamically switching join strategies)
動態(tài)優(yōu)化數(shù)據(jù)傾斜Join(Dynamically optimizing skew joins)
下面我們來詳細介紹這三個特性。
動態(tài)合并 shuffle 的分區(qū)
在我們處理的數(shù)據(jù)量級非常大時,shuffle通常來說是最影響性能的。因為shuffle是一個非常耗時的算子,它需要通過網(wǎng)絡(luò)移動數(shù)據(jù),分發(fā)給下游算子。 在shuffle中,partition的數(shù)量十分關(guān)鍵。partition的最佳數(shù)量取決于數(shù)據(jù),而數(shù)據(jù)大小在不同的query不同stage都會有很大的差異,所以很難去確定一個具體的數(shù)目:
如果partition過少,每個partition數(shù)據(jù)量就會過多,可能就會導致大量數(shù)據(jù)要落到磁盤上,從而拖慢了查詢。
如果partition過多,每個partition數(shù)據(jù)量就會很少,就會產(chǎn)生很多額外的網(wǎng)絡(luò)開銷,并且影響Spark task scheduler,從而拖慢查詢。
為了解決該問題,我們在最開始設(shè)置相對較大的shuffle partition個數(shù),通過執(zhí)行過程中shuffle文件的數(shù)據(jù)來合并相鄰的小partitions。 例如,假設(shè)我們執(zhí)行SELECT max(i) FROM tbl GROUP BY j,表tbl只有2個partition并且數(shù)據(jù)量非常小。我們將初始shuffle partition設(shè)為5,因此在分組后會出現(xiàn)5個partitions。若不進行AQE優(yōu)化,會產(chǎn)生5個tasks來做聚合結(jié)果,事實上有3個partitions數(shù)據(jù)量是非常小的。
然而在這種情況下,AQE只會生成3個reduce task。
動態(tài)切換join策略
Spark 支持許多 Join 策略,其中 broadcast hash join 通常是性能最好的,前提是參加 join 的一張表的數(shù)據(jù)能夠裝入內(nèi)存。由于這個原因,當 Spark 估計參加 join 的表數(shù)據(jù)量小于廣播大小的閾值時,其會將 Join 策略調(diào)整為 broadcast hash join。但是,很多情況都可能導致這種大小估計出錯——例如存在一個非常有選擇性的過濾器。
由于AQE擁有精確的上游統(tǒng)計數(shù)據(jù),因此可以解決該問題。比如下面這個例子,右表的實際大小為15M,而在該場景下,經(jīng)過filter過濾后,實際參與join的數(shù)據(jù)大小為8M,小于了默認broadcast閾值10M,應該被廣播。
在我們執(zhí)行過程中轉(zhuǎn)化為BHJ的同時,我們甚至可以將傳統(tǒng)shuffle優(yōu)化為本地shuffle(例如shuffle讀在mapper而不是基于reducer)來減小網(wǎng)絡(luò)開銷。
動態(tài)優(yōu)化數(shù)據(jù)傾斜
Join里如果出現(xiàn)某個key的數(shù)據(jù)傾斜問題,那么基本上就是這個任務(wù)的性能殺手了。在AQE之前,用戶沒法自動處理Join中遇到的這個棘手問題,需要借助外部手動收集數(shù)據(jù)統(tǒng)計信息,并做額外的加鹽,分批處理數(shù)據(jù)等相對繁瑣的方法來應對數(shù)據(jù)傾斜問題。
數(shù)據(jù)傾斜本質(zhì)上是由于集群上數(shù)據(jù)在分區(qū)之間分布不均勻所導致的,它會拉慢join場景下整個查詢。AQE根據(jù)shuffle文件統(tǒng)計數(shù)據(jù)自動檢測傾斜數(shù)據(jù),將那些傾斜的分區(qū)打散成小的子分區(qū),然后各自進行join。
我們可以看下這個場景,Table A join Table B,其中Table A的partition A0數(shù)據(jù)遠大于其他分區(qū)。
AQE會將partition A0切分成2個子分區(qū),并且讓他們獨自和Table B的partition B0進行join。
如果不做這個優(yōu)化,SMJ將會產(chǎn)生4個tasks并且其中一個執(zhí)行時間遠大于其他。經(jīng)優(yōu)化,這個join將會有5個tasks,但每個task執(zhí)行耗時差不多相同,因此個整個查詢帶來了更好的性能。
如何開啟AQE
我們可以設(shè)置參數(shù)spark.sql.adaptive.enabled為true來開啟AQE,在Spark 3.0中默認是false,并滿足以下條件:
非流式查詢
包含至少一個exchange(如join、聚合、窗口算子)或者一個子查詢
AQE通過減少了對靜態(tài)統(tǒng)計數(shù)據(jù)的依賴,成功解決了Spark CBO的一個難以處理的trade off(生成統(tǒng)計數(shù)據(jù)的開銷和查詢耗時)以及數(shù)據(jù)精度問題。相比之前具有局限性的CBO,現(xiàn)在就顯得非常靈活。
Adaptive Execution 模式是在使用Spark物理執(zhí)行計劃注入生成的。在QueryExecution類中有 preparations 一組優(yōu)化器來對物理執(zhí)行計劃進行優(yōu)化, InsertAdaptiveSparkPlan 就是第一個優(yōu)化器。
InsertAdaptiveSparkPlan 使用 PlanAdaptiveSubqueries Rule對部分SubQuery處理后,將當前 Plan 包裝成 AdaptiveSparkPlanExec 。
當執(zhí)行 AdaptiveSparkPlanExec 的 collect() 或 take() 方法時,全部會先執(zhí)行 getFinalPhysicalPlan() 方法生成新的SparkPlan,再執(zhí)行對應的SparkPlan對應的方法。
// QueryExecution類 lazy val executedPlan: SparkPlan = { executePhase(QueryPlanningTracker.PLANNING) { QueryExecution.prepareForExecution(preparations, sparkPlan.clone()) } } protected def preparations: Seq[Rule[SparkPlan]] = { QueryExecution.preparations(sparkSession, Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this)))) } private[execution] def preparations( sparkSession: SparkSession, adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = { // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op // as the original plan is hidden behind `AdaptiveSparkPlanExec`. adaptiveExecutionRule.toSeq ++ Seq( PlanDynamicPruningFilters(sparkSession), PlanSubqueries(sparkSession), EnsureRequirements(sparkSession.sessionState.conf), ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf, sparkSession.sessionState.columnarRules), CollapseCodegenStages(sparkSession.sessionState.conf), ReuseExchange(sparkSession.sessionState.conf), ReuseSubquery(sparkSession.sessionState.conf) ) } // InsertAdaptiveSparkPlan override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false) private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match { // ...some checking case _ if shouldApplyAQE(plan, isSubquery) => if (supportAdaptive(plan)) { try { // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. // Fall back to non-AQE mode if AQE is not supported in any of the sub-queries. val subqueryMap = buildSubqueryMap(plan) val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap) val preprocessingRules = Seq( planSubqueriesRule) // Run pre-processing rules. val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules) logDebug(s"Adaptive execution enabled for plan: $plan") AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery) } catch { case SubqueryAdaptiveNotSupportedException(subquery) => logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + s"but is not supported for sub-query: $subquery.") plan } } else { logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + s"but is not supported for query: $plan.") plan } case _ => plan }
AQE對Stage 分階段提交執(zhí)行和優(yōu)化過程如下:
private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized { // 第一次調(diào)用 getFinalPhysicalPlan方法時為false,等待該方法執(zhí)行完畢,全部Stage不會再改變,直接返回最終plan if (isFinalPlan) return currentPhysicalPlan // In case of this adaptive plan being executed out of `withActive` scoped functions, e.g., // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be // created in the middle of the execution. context.session.withActive { val executionId = getExecutionId var currentLogicalPlan = currentPhysicalPlan.logicalLink.get var result = createQueryStages(currentPhysicalPlan) val events = new LinkedBlockingQueue[StageMaterializationEvent]() val errors = new mutable.ArrayBuffer[Throwable]() var stagesToReplace = Seq.empty[QueryStageExec] while (!result.allChildStagesMaterialized) { currentPhysicalPlan = result.newPlan // 接下來有哪些Stage要執(zhí)行,參考 createQueryStages(plan: SparkPlan) 方法 if (result.newStages.nonEmpty) { stagesToReplace = result.newStages ++ stagesToReplace // onUpdatePlan 通過listener更新UI executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan))) // Start materialization of all new stages and fail fast if any stages failed eagerly result.newStages.foreach { stage => try { // materialize() 方法對Stage的作為一個單獨的Job提交執(zhí)行,并返回 SimpleFutureAction 來接收執(zhí)行結(jié)果 // QueryStageExec: materialize() -> doMaterialize() -> // ShuffleExchangeExec: -> mapOutputStatisticsFuture -> ShuffleExchangeExec // SparkContext: -> submitMapStage(shuffleDependency) stage.materialize().onComplete { res => if (res.isSuccess) { events.offer(StageSuccess(stage, res.get)) } else { events.offer(StageFailure(stage, res.failed.get)) } }(AdaptiveSparkPlanExec.executionContext) } catch { case e: Throwable => cleanUpAndThrowException(Seq(e), Some(stage.id)) } } } // Wait on the next completed stage, which indicates new stats are available and probably // new stages can be created. There might be other stages that finish at around the same // time, so we process those stages too in order to reduce re-planning. // 等待,直到有Stage執(zhí)行完畢 val nextMsg = events.take() val rem = new util.ArrayList[StageMaterializationEvent]() events.drainTo(rem) (Seq(nextMsg) ++ rem.asScala).foreach { case StageSuccess(stage, res) => stage.resultOption = Some(res) case StageFailure(stage, ex) => errors.append(ex) } // In case of errors, we cancel all running stages and throw exception. if (errors.nonEmpty) { cleanUpAndThrowException(errors, None) } // Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less // than that of the current plan; otherwise keep the current physical plan together with // the current logical plan since the physical plan's logical links point to the logical // plan it has originated from. // Meanwhile, we keep a list of the query stages that have been created since last plan // update, which stands for the "semantic gap" between the current logical and physical // plans. And each time before re-planning, we replace the corresponding nodes in the // current logical plan with logical query stages to make it semantically in sync with // the current physical plan. Once a new plan is adopted and both logical and physical // plans are updated, we can clear the query stage list because at this point the two plans // are semantically and physically in sync again. // 對前面的Stage替換為 LogicalQueryStage 節(jié)點 val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace) // 再次調(diào)用optimizer 和planner 進行優(yōu)化 val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan) val origCost = costEvaluator.evaluateCost(currentPhysicalPlan) val newCost = costEvaluator.evaluateCost(newPhysicalPlan) if (newCost < origCost || (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) { logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan") cleanUpTempTags(newPhysicalPlan) currentPhysicalPlan = newPhysicalPlan currentLogicalPlan = newLogicalPlan stagesToReplace = Seq.empty[QueryStageExec] } // Now that some stages have finished, we can try creating new stages. // 進入下一輪循環(huán),如果存在Stage執(zhí)行完畢, 對應的resultOption 會有值,對應的allChildStagesMaterialized 屬性 = true result = createQueryStages(currentPhysicalPlan) } // Run the final plan when there's no more unfinished stages. // 所有前置stage全部執(zhí)行完畢,根據(jù)stats信息優(yōu)化物理執(zhí)行計劃,確定最終的 physical plan currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules) isFinalPlan = true executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan))) currentPhysicalPlan } }
// SparkContext /** * Submit a map stage for execution. This is currently an internal API only, but might be * promoted to DeveloperApi in the future. */ private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C]) : SimpleFutureAction[MapOutputStatistics] = { assertNotStopped() val callSite = getCallSite() var result: MapOutputStatistics = null val waiter = dagScheduler.submitMapStage( dependency, (r: MapOutputStatistics) => { result = r }, callSite, localProperties.get) new SimpleFutureAction[MapOutputStatistics](waiter, result) } // DAGScheduler def submitMapStage[K, V, C]( dependency: ShuffleDependency[K, V, C], callback: MapOutputStatistics => Unit, callSite: CallSite, properties: Properties): JobWaiter[MapOutputStatistics] = { val rdd = dependency.rdd val jobId = nextJobId.getAndIncrement() if (rdd.partitions.length == 0) { throw new SparkException("Can't run submitMapStage on RDD with 0 partitions") } // We create a JobWaiter with only one "task", which will be marked as complete when the whole // map stage has completed, and will be passed the MapOutputStatistics for that stage. // This makes it easier to avoid race conditions between the user code and the map output // tracker that might result if we told the user the stage had finished, but then they queries // the map output tracker and some node failures had caused the output statistics to be lost. val waiter = new JobWaiter[MapOutputStatistics]( this, jobId, 1, (_: Int, r: MapOutputStatistics) => callback(r)) eventProcessLoop.post(MapStageSubmitted( jobId, dependency, callSite, waiter, Utils.cloneProperties(properties))) waiter }
當前,AdaptiveSparkPlanExec 中對物理執(zhí)行的優(yōu)化器列表如下:
// AdaptiveSparkPlanExec @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReuseAdaptiveSubquery(conf, context.subqueryCache), CoalesceShufflePartitions(context.session), // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs' // added by `CoalesceShufflePartitions`. So they must be executed after it. OptimizeSkewedJoin(conf), OptimizeLocalShuffleReader(conf), ApplyColumnarRulesAndInsertTransitions(conf, context.session.sessionState.columnarRules), CollapseCodegenStages(conf) )
其中 OptimizeSkewedJoin方法就是針對最容易出現(xiàn)數(shù)據(jù)傾斜的Join進行的優(yōu)化:
AQE模式下,每個Stage執(zhí)行之前,前置依賴Stage已經(jīng)全部執(zhí)行完畢,那么就可以獲取到每個Stage的stats信息。 當發(fā)現(xiàn)shuffle partition的輸出超過partition size的中位數(shù)的5倍,且partition的輸出大于 256M 會被判斷產(chǎn)生數(shù)據(jù)傾斜, 將partition 數(shù)據(jù)按照targetSize進行切分為N份。 targetSize = max(64M, 非數(shù)據(jù)傾斜partition的平均大小)。
優(yōu)化前 shuffle 如下:
優(yōu)化后 shuffle:
FreeWheel團隊通過高效的敏捷開發(fā)趕在 2020 年圣誕廣告季之前在生產(chǎn)環(huán)境順利發(fā)布上線,整體性能提升高達 40%(對于大 batch)的數(shù)據(jù),AWS Cost 平均節(jié)省 25%~30%之間,大約每年至少能為公司節(jié)省百萬成本。
打開 Spark 3.0 AQE 的新特性,主要配置如下:
"spark.sql.adaptive.enabled": true, "spark.sql.adaptive.coalescePartitions.enabled": true, "spark.sql.adaptive.coalescePartitions.minPartitionNum": 1, "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB"
需要注意的是,AQE 特性只是在 reducer 階段不用指定 reducer 的個數(shù),但并不代表你不再需要指定任務(wù)的并行度了。因為 map 階段仍然需要將數(shù)據(jù)劃分為合適的分區(qū)進行處理,如果沒有指定并行度會使用默認的 200,當數(shù)據(jù)量過大時,很容易出現(xiàn) OOM。建議還是按照任務(wù)之前的并行度設(shè)置來配置參數(shù)spark.sql.shuffle.partitions和spark.default.parallelism。
我們來仔細看一下為什么升級到 3.0 以后可以減少運行時間,又能節(jié)省集群的成本。 以 Optimus 數(shù)據(jù)建模里的一張表的運行情況為例:
在 reduce 階段從沒有 AQE 的40320個 tasks 銳減到4580個 tasks,減少了一個數(shù)量級。
下圖里下半部分是沒有 AQE 的 Spark 2.x 的 task 情況,上半部分是打開 AQE 特性后的 Spark 3.x 的情況。
從更詳細的運行時間圖來看,shuffler reader后同樣的 aggregate 的操作等時間也從4.44h到2.56h,節(jié)省將近一半。
左邊是 spark 2.x 的運行指標明細,右邊是打開 AQE 后通過custom shuffler reader后的運行指標情況。
AQE性能
AQE對于整體的 Spark SQL 的執(zhí)行過程做了相應的調(diào)整和優(yōu)化(如下圖),它最大的亮點是可以根據(jù)已經(jīng)完成的計劃結(jié)點真實且精確的執(zhí)行統(tǒng)計結(jié)果來不停的反饋并重新優(yōu)化剩下的執(zhí)行計劃。
AQE 自動調(diào)整 reducer 的數(shù)量,減小 partition 數(shù)量。Spark 任務(wù)的并行度一直是讓用戶比較困擾的地方。如果并行度太大的話,會導致 task 過多,overhead 比較大,整體拉慢任務(wù)的運行。而如果并行度太小的,數(shù)據(jù)分區(qū)會比較大,容易出現(xiàn) OOM 的問題,并且資源也得不到合理的利用,并行運行任務(wù)優(yōu)勢得不到最大的發(fā)揮。
而且由于 Spark Context 整個任務(wù)的并行度,需要一開始設(shè)定好且沒法動態(tài)修改,這就很容易出現(xiàn)任務(wù)剛開始的時候數(shù)據(jù)量大需要大的并行度,而運行的過程中通過轉(zhuǎn)化過濾可能最終的數(shù)據(jù)集已經(jīng)變得很小,最初設(shè)定的分區(qū)數(shù)就顯得過大了。AQE 能夠很好的解決這個問題,在 reducer 去讀取數(shù)據(jù)時,會根據(jù)用戶設(shè)定的分區(qū)數(shù)據(jù)的大小(spark.sql.adaptive.advisoryPartitionSizeInBytes)來自動調(diào)整和合并(Coalesce)小的 partition,自適應地減小 partition 的數(shù)量,以減少資源浪費和 overhead,提升任務(wù)的性能。
由上面單張表可以看到,打開 AQE 的時候極大的降低了 task 的數(shù)量,除了減輕了 Driver 的負擔,也減少啟動 task 帶來的 schedule,memory,啟動管理等 overhead,減少 cpu 的占用,提升的 I/O 性能。
拿歷史 Data Pipelines 為例,同時會并行有三十多張表在 Spark 里運行,每張表都有極大的性能提升,那么也使得其他的表能夠獲得資源更早更多,互相受益,那么最終整個的數(shù)據(jù)建模過程會自然而然有一個加速的結(jié)果。
大 batch(>200G)相對小 batch(< 100G )有比較大的提升,有高達 40%提升,主要是因為大 batch 本身數(shù)據(jù)量大,需要機器數(shù)多,設(shè)置并發(fā)度也更大,那么 AQE 展現(xiàn)特性的時刻會更多更明顯。而小 batch 并發(fā)度相對較低,那么提升也就相對會少一些,不過也是有 27.5%左右的加速。
內(nèi)存優(yōu)化
除了因為 AQE 的打開,減少過碎的 task 對于 memory 的占用外,Spark 3.0 也在其他地方做了很多內(nèi)存方面的優(yōu)化,比如 Aggregate 部分指標瘦身、Netty 的共享內(nèi)存 Pool 功能、Task Manager 死鎖問題、避免某些場景下從網(wǎng)絡(luò)讀取 shuffle block等等,來減少內(nèi)存的壓力。一系列內(nèi)存的優(yōu)化加上 AQE 特性疊加從前文內(nèi)存實踐圖中可以看到集群的內(nèi)存使用同時有30%左右的下降。
升級主要的實踐成果如下:
性能提升明顯
歷史數(shù)據(jù) Pipeline 對于大 batch 的數(shù)據(jù)(200~400G/每小時)性能提升高達40%, 對于小 batch(小于 100G/每小時)提升效果沒有大 batch 提升的那么明顯,每天所有 batches平均提升水平27.5%左右。
預測數(shù)據(jù)性能平均提升30%。由于數(shù)據(jù)輸入源不一樣,目前是分別兩個 pipelines 在跑歷史和預測數(shù)據(jù),產(chǎn)生的表的數(shù)目也不太一樣,因此做了分別的評估。
以歷史數(shù)據(jù)上線后的端到端到運行時間為例(如下圖),肉眼可見上線后整體 pipeline 的運行時間有了明顯的下降,能夠更快的輸出數(shù)據(jù)供下游使用。
集群內(nèi)存使用降低
集群內(nèi)存使用對于大 batch 達降低30%左右,每天平均平均節(jié)省25%左右。
以歷史數(shù)據(jù)上線后的運行時集群的 memory 在 ganglia 上的截圖為例(如下圖),整體集群的內(nèi)存使用從 41.2T 降到 30.1T,這意味著我們可以用更少的機器花更少的錢來跑同樣的 Spark 任務(wù)。
AWS Cost 降低
Pipelines 做了自動的 Scale In/Scale Out 策略: 在需要資源的時候擴集群的 Task 結(jié)點,在任務(wù)結(jié)束后自動去縮集群的 Task 結(jié)點,且會根據(jù)每次 batch 數(shù)據(jù)的大小通過算法學習得到最佳的機器數(shù)。通過升級到 Spark 3.0 后,由于現(xiàn)在任務(wù)跑的更快并且需要的機器更少,上線后統(tǒng)計 AWS Cost 每天節(jié)省30%左右,大約一年能為公司節(jié)省百萬成本。
關(guān)于Spark 3.0 AQE及CBO的示例分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。