這篇文章將為大家詳細(xì)講解有關(guān)Spark Join原理是什么,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對相關(guān)知識(shí)有一定的了解。
創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站制作、做網(wǎng)站、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的南昌網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
數(shù)據(jù)分析中將兩個(gè)數(shù)據(jù)集進(jìn)行 Join 操作是很常見的場景。在 Spark 的物理計(jì)劃階段,Spark 的 Join Selection 類會(huì)根 據(jù) Join hints 策略、Join 表的大小、 Join 是等值 Join 還是不等值以及參與 Join 的 key 是否可以排序等條件來選擇最 終的 Join 策略,最后 Spark 會(huì)利用選擇好的 Join 策略執(zhí)行最終的計(jì)算。當(dāng)前 Spark 一共支持五種 Join 策略:
Broadcast hash join (BHJ)
Shuffle hash join(SHJ)
Shuffle sort merge join (SMJ)
Shuffle-and-replicate nested loop join,又稱笛卡爾積(Cartesian product join)
Broadcast nested loop join (BNLJ)
其中 BHJ
和 SMJ
這兩種 Join 策略是我們運(yùn)行 Spark 作業(yè)最常見的。JoinSelection
會(huì)先根據(jù) Join
的 Key 為等值 Join 來選擇 Broadcast hash join
、Shuffle hash join
以及 Shuffle sort merge join
中的一個(gè);如果 Join 的 Key 為不等值 Join 或者沒有指定 Join 條件,則會(huì)選擇 Broadcast nested loop join
或 Shuffle-and-replicate nested loop join
。 不同的 Join 策略在執(zhí)行上效率差別很大,了解每種 Join 策略的執(zhí)行過程和適用條件是很有必要的。
Broadcast Hash Join
的實(shí)現(xiàn)是將小表的數(shù)據(jù)廣播到 Spark
所有的 Executor
端,這個(gè)廣播過程和我們自己去廣播數(shù) 據(jù)沒什么區(qū)別:
利用 collect 算子將小表的數(shù)據(jù)從 Executor 端拉到 Driver 端 在 Driver 端調(diào)用 sparkContext.broadcast 廣播到所有 Executor 端 在 Executor 端使用廣播的數(shù)據(jù)與大表進(jìn)行 Join 操作(實(shí)際上是執(zhí)行map操作)
這種 Join 策略避免了 Shuffle 操作。一般而言,Broadcast Hash Join 會(huì)比其他 Join 策略執(zhí)行的要快。
使用這種 Join 策略必須滿足以下條件: 小表的數(shù)據(jù)必須很小,可以通過 spark.sql.autoBroadcastJoinThreshold
參數(shù)來配置,默認(rèn)是 10MB 如果內(nèi)存比較大,可以將閾值適當(dāng)加大 將 spark.sql.autoBroadcastJoinThreshold
參數(shù)設(shè)置為 -1,可以關(guān)閉這種連接方式 只能用于等值 Join,不要求參與 Join 的 keys 可排序
當(dāng)表中的數(shù)據(jù)比較大,又不適合使用廣播,這個(gè)時(shí)候就可以考慮使用 Shuffle Hash Join
。 Shuffle Hash Join
同樣是在大表和小表進(jìn)行 Join 的時(shí)候選擇的一種策略。它的計(jì)算思想是:把大表和小表按照相同 的分區(qū)算法和分區(qū)數(shù)進(jìn)行分區(qū)(根據(jù)參與 Join 的 keys 進(jìn)行分區(qū)),這樣就保證了 hash 值一樣的數(shù)據(jù)都分發(fā)到同一 個(gè)分區(qū)中,然后在同一個(gè) Executor 中兩張表 hash 值一樣的分區(qū)就可以在本地進(jìn)行 hash Join 了。在進(jìn)行 Join 之 前,還會(huì)對小表的分區(qū)構(gòu)建 Hash Map。Shuffle hash join
利用了分治思想,把大問題拆解成小問題去解決。
要啟用 Shuffle Hash Join
必須滿足以下條件: 僅支持等值 Join,不要求參與 Join 的 Keys 可排序 spark.sql.join.preferSortMergeJoin
參數(shù)必須設(shè)置為 false,參數(shù)是從 Spark 2.0.0 版本引入的,默認(rèn)值為 true,也就是默認(rèn)情況下選擇 Sort Merge Join 小表的大小(plan.stats.sizeInBytes
)必須小于 spark.sql.autoBroadcastJoinThreshold
* spark.sql.shuffle.partitions
(默認(rèn)值200) 而且小表大?。╯tats.sizeInBytes)的三倍必須小于等于大表的大小(stats.sizeInBytes),也就是 a.stats.sizeInBytes * 3 < = b.stats.sizeInBytes
前面兩種 Join 策略對表的大小都有條件的,如果參與 Join 的表都很大,這時(shí)候就得考慮用 Shuffle Sort Merge Join 了。 Shuffle Sort Merge Join
的實(shí)現(xiàn)思想: 將兩張表按照 join key
進(jìn)行shuffle
,保證join key
值相同的記錄會(huì)被分在相應(yīng)的分區(qū) 對每個(gè)分區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序 排序后再對相應(yīng)的分區(qū)內(nèi)的記錄進(jìn)行連接 無論分區(qū)有多大,Sort Merge Join
都不用把一側(cè)的數(shù)據(jù)全部加載到內(nèi)存中,而是即用即丟;因?yàn)閮蓚€(gè)序列都有序。從 頭遍歷,碰到key相同的就輸出,如果不同,左邊小就繼續(xù)取左邊,反之取右邊。從而大大提高了大數(shù)據(jù)量下sql join
的穩(wěn)定性。
要啟用 Shuffle Sort Merge Join
必須滿足以下條件:
僅支持等值 Join
,并且要求參與 Join
的 Keys 可排序
如果 Spark 中兩張參與 Join
的表沒指定連接條件,那么會(huì)產(chǎn)生 Cartesian product join,
這個(gè) Join 得到的結(jié)果其實(shí)
就是兩張表行數(shù)的乘積。
可以把 Broadcast nested loop join 的執(zhí)行看做下面的計(jì)算:
for record_1 in relation_1
:
for record_2 in relation_2:
join condition is executed
可以看出 Broadcast nested loop join 在某些情況會(huì)對某張表重復(fù)掃描多次,效率非常低下。從名字可以看出,這種
join 會(huì)根據(jù)相關(guān)條件對小表進(jìn)行廣播,以減少表的掃描次數(shù)。
Broadcast nested loop join
支持等值和不等值 Join,支持所有的 Join 類型。
關(guān)于Spark Join原理是什么就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。