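This article is based on delta 0.7.0 and spark 3.0.1. Spark 3.x introduced dynamic partition pruning. In "Spark's dynamic partition pruning (Dynamic partition pruning) - logical plan" we saw that a DynamicPruningSubquery is added during the logical planning stage. Here we look at how that DynamicPruningSubquery is optimized and implemented in the physical planning stage.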
We go straight to the apply method of PlanDynamicPruningFilters:
    override def apply(plan: SparkPlan): SparkPlan = {
      if (!SQLConf.get.dynamicPartitionPruningEnabled) {
        return plan
      }

      plan transformAllExpressions {
        case DynamicPruningSubquery(
            value, buildPlan, buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId) =>
          val sparkPlan = QueryExecution.createSparkPlan(
            sparkSession, sparkSession.sessionState.planner, buildPlan)
          // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
          // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
          val canReuseExchange = SQLConf.get.exchangeReuseEnabled && buildKeys.nonEmpty &&
            plan.find {
              case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _) =>
                left.sameResult(sparkPlan)
              case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right) =>
                right.sameResult(sparkPlan)
              case _ => false
            }.isDefined

          if (canReuseExchange) {
            val mode = broadcastMode(buildKeys, buildPlan)
            val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
            // plan a broadcast exchange of the build side of the join
            val exchange = BroadcastExchangeExec(mode, executedPlan)
            val name = s"dynamicpruning#${exprId.id}"
            // place the broadcast adaptor for reusing the broadcast results on the probe side
            val broadcastValues =
              SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
            DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
          } else if (onlyInBroadcast) {
            // it is not worthwhile to execute the query, so we fall-back to a true literal
            DynamicPruningExpression(Literal.TrueLiteral)
          } else {
            // we need to apply an aggregate on the buildPlan in order to be column pruned
            val alias = Alias(buildKeys(broadcastKeyIndex), buildKeys(broadcastKeyIndex).toString)()
            val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
            DynamicPruningExpression(expressions.InSubquery(
              Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
          }
      }
    }
If dynamic partition pruning is not enabled, the rule returns the plan unchanged.
QueryExecution.createSparkPlan( sparkSession, sparkSession.sessionState.planner, buildPlan)
This builds a physical plan (sparkPlan) from the logical plan buildPlan.
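Next it decides whether the exchange can be reused (canReuseExchange). This holds when spark.sql.exchange.reuse is true, buildKeys is non-empty, and the plan contains a BroadcastHashJoinExec whose build side produces the same result as the sparkPlan just built. If so, the rule continues with the following steps.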
Prepare the physical plan for execution to obtain executedPlan.
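Build the BroadcastExchangeExec, broadcastValues (a SubqueryBroadcastExec), the InSubqueryExec and the DynamicPruningExpression. BroadcastExchangeExec is the operator that performs Spark's broadcast. Note: the BroadcastExchangeExec created here is later optimized by the ReuseExchange rule and ends up being invoked through BroadcastQueryStageExec, so the join and the pruning filter share the same broadcast value.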
If the exchange cannot be reused and onlyInBroadcast is true, the rule falls back to DynamicPruningExpression(Literal.TrueLiteral), which means no pruning takes place.
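Otherwise (the exchange cannot be reused, but onlyInBroadcast is false because pruning is still expected to speed up the query), the rule aggregates buildPlan on the key used for filtering, buildKeys(broadcastKeyIndex), and wraps the resulting InSubquery over a ListQuery in a DynamicPruningExpression.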
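The reuse check and the three possible outcomes described above can be summarized in a small, self-contained Scala sketch. It is a toy model and not Spark code: Plan, Scan, BroadcastJoin, SortMergeJoin, PruningFilter and DppSketch are hypothetical names introduced only for illustration, and plain structural equality stands in for Spark's sameResult.

```scala
// Toy model only: every type below is a hypothetical stand-in, not a Spark class.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

sealed trait Plan {
  def children: Seq[Plan]

  // Pre-order search over the tree, loosely mirroring `plan.find` in the rule.
  def find(p: Plan => Boolean): Option[Plan] =
    if (p(this)) Some(this)
    else children.iterator.map(_.find(p)).collectFirst { case Some(hit) => hit }
}
case class Scan(table: String) extends Plan {
  val children: Seq[Plan] = Nil
}
case class BroadcastJoin(side: BuildSide, left: Plan, right: Plan) extends Plan {
  val children: Seq[Plan] = Seq(left, right)
}
case class SortMergeJoin(left: Plan, right: Plan) extends Plan {
  val children: Seq[Plan] = Seq(left, right)
}

// The three results the rule can produce for one DynamicPruningSubquery.
sealed trait PruningFilter
case object ReuseBroadcast extends PruningFilter    // InSubqueryExec over SubqueryBroadcastExec
case object AlwaysTrue extends PruningFilter        // Literal.TrueLiteral, i.e. no pruning
case object AggregateSubquery extends PruningFilter // InSubquery over an Aggregate of buildPlan

object DppSketch {
  // Reuse is possible only if it is enabled, there are build keys, and some
  // broadcast join already builds the same plan on its build side.
  def canReuseExchange(
      plan: Plan,
      buildPlan: Plan,
      reuseEnabled: Boolean,
      hasBuildKeys: Boolean): Boolean =
    reuseEnabled && hasBuildKeys && plan.find {
      case BroadcastJoin(BuildLeft, left, _) => left == buildPlan   // stands in for sameResult
      case BroadcastJoin(BuildRight, _, right) => right == buildPlan
      case _ => false
    }.isDefined

  // Same branch order as the rule: reuse first, then the true-literal fallback,
  // then the aggregate subquery.
  def choose(canReuse: Boolean, onlyInBroadcast: Boolean): PruningFilter =
    if (canReuse) ReuseBroadcast
    else if (onlyInBroadcast) AlwaysTrue
    else AggregateSubquery
}
```

In this model, a plan such as BroadcastJoin(BuildRight, Scan("fact"), Scan("dim")) with Scan("dim") as the build plan passes the reuse check, while the same tables under SortMergeJoin do not; the second case then resolves to AlwaysTrue or AggregateSubquery depending on onlyInBroadcast.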
That completes the analysis of the physical plan optimization for dynamic partition pruning.