這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)Spark SQL中Not in Subquery為何低效以及如何規(guī)避,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
十余年的濉溪網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。營銷型網(wǎng)站建設(shè)的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整濉溪建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)公司從事“濉溪網(wǎng)站設(shè)計(jì)”,“濉溪網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。
首先看個(gè)Not in Subquery的SQL:
// test_partition1 和 test_partition2為Hive外部分區(qū)表select * from test_partition1 t1 where t1.id not in (select id from test_partition2);
== Parsed Logical Plan ==
'Project [*]
+- 'Filter NOT 't1.id IN (list#3 [])
: +- 'Project ['id]
: +- 'UnresolvedRelation `test_partition2`
+- 'SubqueryAlias `t1`
+- 'UnresolvedRelation `test_partition1`
== Analyzed Logical Plan ==
id: string, name: string, dt: string
Project [id#4, name#5, dt#6]
+- Filter NOT id#4 IN (list#3 [])
: +- Project [id#7]
: +- SubqueryAlias `default`.`test_partition2`
: +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
+- SubqueryAlias `t1`
+- SubqueryAlias `default`.`test_partition1`
+- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
== Optimized Logical Plan ==
Join LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7)))
:- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
+- Project [id#7]
+- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7)))
:- Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
+- BroadcastExchange IdentityBroadcastMode
+- Scan hive default.test_partition2 [id#7], HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
提起B(yǎng)roadcastNestedLoopJoin,不得不提Nested Loop Join,它在很多RDBMS中得到應(yīng)用,比如MySQL。它的工作方式是循環(huán)從一張表(outer table)中讀取數(shù)據(jù),然后訪問另一張表(inner table,通常有索引),將outer表中的每一條數(shù)據(jù)與inner表中的數(shù)據(jù)進(jìn)行join,類似一個(gè)嵌套的循環(huán)并且在循環(huán)的過程中進(jìn)行數(shù)據(jù)的比對(duì)校驗(yàn)是否滿足一定條件。
對(duì)于被連接的數(shù)據(jù)集較小的情況下,Nested Loop Join是個(gè)較好的選擇。但是當(dāng)數(shù)據(jù)集非常大時(shí),從它的執(zhí)行原理可知,效率會(huì)很低甚至可能影響整個(gè)服務(wù)的穩(wěn)定性。
而Spark SQL中的BroadcastNestedLoopJoin就類似于Nested Loop Join,只不過加上了廣播表(build table)而已。
BroadcastNestedLoopJoin是一個(gè)低效的物理執(zhí)行計(jì)劃,內(nèi)部實(shí)現(xiàn)將子查詢(select id from test_partition2)進(jìn)行廣播,然后test_partition1每一條記錄通過loop遍歷廣播的數(shù)據(jù)去匹配是否滿足一定條件。
private def leftExistenceJoin(
// 廣播的數(shù)據(jù)
relation: Broadcast[Array[InternalRow]],
exists: Boolean): RDD[InternalRow] = {
assert(buildSide == BuildRight)
/* streamed對(duì)應(yīng)物理計(jì)劃中:
Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
*/
streamed.execute().mapPartitionsInternal { streamedIter =>
val buildRows = relation.value
val joinedRow = new JoinedRow
// 條件是否定義。此處為Some(((id#4 = id#7) || isnull((id#4 = id#7))))
if (condition.isDefined) {
streamedIter.filter(l =>
// exists主要是為了根據(jù)joinType來進(jìn)一步條件判斷數(shù)據(jù)的返回與否,此處joinType為L(zhǎng)eftAnti
buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
)
// else
} else if (buildRows.nonEmpty == exists) {
streamedIter
} else {
Iterator.empty
}
}
}
由于BroadcastNestedLoopJoin的低效率執(zhí)行,可能導(dǎo)致長(zhǎng)時(shí)間占用executor資源,影響集群性能。同時(shí),因?yàn)樽硬樵兊慕Y(jié)果集要進(jìn)行廣播,如果數(shù)據(jù)量特別大,對(duì)driver端也是一個(gè)嚴(yán)峻的考驗(yàn),極有可能帶來OOM的風(fēng)險(xiǎn)。因此,在實(shí)際生產(chǎn)中,要盡可能利用其他效率相對(duì)高的SQL來避免使用Not in Subquery。
雖然通過改寫Not in Subquery的SQL,進(jìn)行低效率的SQL到高效率的SQL過渡,能夠避免上面所說的問題。但是這往往建立在我們發(fā)現(xiàn)任務(wù)執(zhí)行慢甚至失敗,然后排查任務(wù)中的SQL,發(fā)現(xiàn)"問題"SQL的前提下。那么如何在任務(wù)執(zhí)行前,就"檢查"出這樣的SQL,從而進(jìn)行提前預(yù)警呢?
這里筆者給出一個(gè)思路,就是解析Spark SQL計(jì)劃,根據(jù)Spark SQL的join策略匹配條件等,來判斷任務(wù)中是否使用了低效的Not in Subquery進(jìn)行預(yù)警,然后通知業(yè)務(wù)方進(jìn)行修改。同時(shí),我們?cè)趯?shí)際完成數(shù)據(jù)的ETL處理等分析時(shí),也要事前避免類似的低性能SQL。
上述就是小編為大家分享的Spark SQL中Not in Subquery為何低效以及如何規(guī)避了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。