這篇文章將為大家詳細(xì)講解有關(guān)Hive數(shù)據(jù)傾斜的示例分析,小編覺得挺實(shí)用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
網(wǎng)站設(shè)計(jì)制作、做網(wǎng)站介紹好的網(wǎng)站是理念、設(shè)計(jì)和技術(shù)的結(jié)合。創(chuàng)新互聯(lián)擁有的網(wǎng)站設(shè)計(jì)理念、多方位的設(shè)計(jì)風(fēng)格、經(jīng)驗(yàn)豐富的設(shè)計(jì)團(tuán)隊(duì)。提供PC端+手機(jī)端網(wǎng)站建設(shè),用營銷思維進(jìn)行網(wǎng)站設(shè)計(jì)、采用先進(jìn)技術(shù)開源代碼、注重用戶體驗(yàn)與SEO基礎(chǔ),將技術(shù)與創(chuàng)意整合到網(wǎng)站之中,以契合客戶的方式做到創(chuàng)意性的視覺化效果。
首先介紹 “數(shù)據(jù)傾斜” 的概念。
“傾斜”應(yīng)該來自于統(tǒng)計(jì)學(xué)里的的偏態(tài)分布,數(shù)據(jù)處理中的傾斜和此相關(guān)。
對于分布式數(shù)據(jù)處理來說,我們希望數(shù)據(jù)平均分布到每個處理節(jié)點(diǎn),但是實(shí)際上由于業(yè)務(wù)數(shù)據(jù)本身的問題或者分布算法的問題,每個節(jié)點(diǎn)分配到的數(shù)據(jù)量很可能并不是我們預(yù)想的那樣。
也就是說,只有待分到最多數(shù)據(jù)的節(jié)點(diǎn)處理完數(shù)據(jù),整個數(shù)據(jù)處理任務(wù)才能完成,時(shí)分布式的意義就大打折扣 ,想想那個卡死的 99% 。
實(shí)際上,即使每個節(jié)點(diǎn)分配到的數(shù)據(jù)量大致相同,數(shù)據(jù)仍可能傾斜,比如考慮統(tǒng)計(jì)詞頻的極端問題,如果某個節(jié)點(diǎn)分配到的詞都是一個詞,那么顯此節(jié)點(diǎn)需要的耗時(shí)將很長,即使其數(shù)據(jù)量和其他節(jié)點(diǎn)的數(shù)據(jù)量相同。
Hive 的優(yōu)化正是采用各種措施和方法對上述場景的傾斜問題進(jìn)行優(yōu)化和處理。
其實(shí)在實(shí)際 Hive SQL 開發(fā)的過程中, Hive SQL 性能的問題上實(shí)際只有一小部分和數(shù)據(jù)傾相關(guān)。
很多時(shí)候, Hive SQL 運(yùn)行得慢是由開發(fā)人員對于使用的數(shù)據(jù)了解不夠以及一些不良的使用習(xí)慣引起的。
開發(fā)人員 要確定以下幾點(diǎn):
「需要計(jì)算的指標(biāo)真的需要從數(shù)據(jù)倉庫的公共明細(xì)層來自行匯總么?」 是不是數(shù)據(jù)公共層團(tuán)隊(duì)開發(fā)的公共匯總層已經(jīng)可以滿足自己的需求?對于大眾的、 KPI 相關(guān)的指標(biāo)等通常設(shè)計(jì)良好的數(shù)據(jù)倉庫公共層肯定已經(jīng)包含了,直接使用即可。
「真的需要掃描這么多分區(qū)么?」 比如對于銷售明細(xì)事務(wù)表來說,掃描一年的分區(qū)和掃描一周的分區(qū)所帶來的計(jì)算、 IO 開銷完全是兩個量級,所耗費(fèi)的時(shí)間肯定也是不同的。作為開發(fā)人員,我們需要仔細(xì)考慮業(yè)務(wù)的需求,盡量不要浪費(fèi)計(jì)算和存儲資源!
「盡量不要使用 select * from your_table 這樣的方式,用到哪些列就指定哪些列?!?如 select coll, col2 from your_table ,另外, where 條件中也盡量添加過濾條件,以去掉無關(guān)的數(shù)據(jù)行,從而減少整個 MapReduce 任務(wù)中需要處理、分發(fā)的數(shù)據(jù)量
「輸入文件不要是大量的小文件?!?Hive 的默認(rèn) Input Split 是 128MB (可配置),小文件可先合并成大文件。
在保證了上述幾點(diǎn)之后,有的時(shí)候發(fā)現(xiàn) Hive SQL 還是要運(yùn)行很長時(shí)間,甚至運(yùn)行不出來, 這時(shí)就需要真正的 Hive 優(yōu)化技術(shù)了!
Hive SQL 性能問題基本上大部分都和 join 相關(guān),對于和 join 無關(guān)的問題主要有 group by 相關(guān)的傾斜和 count distinct 相關(guān)的優(yōu)化。
group by 引起的傾斜優(yōu)化
group by 引起的傾斜主要是輸入數(shù)據(jù)行按照 「group by 列分布不均勻」 引起的。
比如,假設(shè)按照供應(yīng)商對銷售明細(xì)事實(shí)表來統(tǒng)計(jì)訂單數(shù),那么部分大供應(yīng)商的訂單量顯然非常多,而多數(shù)供應(yīng)商的訂單量就一般,由于 group by 的時(shí)候是按照供應(yīng)商的 ID 分發(fā)到每個 Reduce Task ,那么此時(shí)分配到大供應(yīng)商的 Reduce Task 就分配了更多的訂單,從而導(dǎo)致數(shù)據(jù)傾斜。
對于 group by 引起的傾斜,優(yōu)化措施非常簡單,只需設(shè)置下面參數(shù)即可:
set hive.map.aggr = true set hive.groupby.skewindata=true
此時(shí)Hive 在數(shù)據(jù)傾斜的時(shí)候會進(jìn)行負(fù)載均衡,生成的查詢計(jì)劃會有兩個 MapReduce Job。
第一個 MapReduce Job 中,Map 的輸出結(jié)果集合會隨機(jī)分布到 Reduce 中,每個Reduce 做部分聚合操作并輸出結(jié)果。這樣處理的結(jié)果是相同的 GroupBy Key 有可能被分布到不同的 Reduce 中,從而達(dá)到負(fù)載均衡的目的;
第二個 MapReduce Job 再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果,按照 GroupBy Key 分布到 Reduce 中(這過程可以保證相同的GroupBy Key被分布到同一個Reduce中),最后完成最終的聚合操作。
count distinct 優(yōu)化
在 Hive 開發(fā)過程中,應(yīng)該小心使用 count distinct ,因?yàn)楹苋菀滓鹦阅軉栴},比如下面的 SQL:
select count(distinct user) from some_table
由于必須去重,因此 Hive 將會把 Map 階段的輸出全部分布到 Reduce Task 上,此時(shí)很容易引起性能問題。對于這種情況,可以通過先 group by 再 count 的方式來優(yōu)化,優(yōu)化后的 SQL 如下:
select count(*) from ( select user from some_table group by user ) tmp;
原理為:先利用 group by 去重,再統(tǒng)計(jì) group by 的行數(shù)目。
join 相關(guān)的優(yōu)化主要分為 mapjoin 可以解決的優(yōu)化 ( 即大表 join 小表) 和 mapjoin 無法解決的優(yōu)化( 即大表 join 大表 )。大表 join 小表相對容易解決,大表 join 大表相對復(fù)雜和難以解決,但也不是不可解決的,只是相對比較麻煩而已。
首先介紹大表 join 小表優(yōu)化 。仍以銷售明細(xì)事實(shí)表為例來說明大表 join 小表的場景。
假如供應(yīng)商會進(jìn)行評級,比如(五星、四星、 兩星、 一星),此時(shí)業(yè)務(wù)人員希望能夠分析各供應(yīng)商星級的每天銷售情況及其占比。
開發(fā)人員一般會寫出如下 SQL:
select Seller_srar, count(order_id) as ordre_cnt from ( select order_id,seller_id from dwd_sls_fact_detail_table where partition_value ='20170101' ) a Left outer join( select seller_id,seller_star from dim_seller where partition_value='20170101' ) b on a.seller_id = b.seller_id group by b.seller_star;
但正如上述所言,現(xiàn)實(shí)世界的二八準(zhǔn)則將導(dǎo)致訂單集中在部分供應(yīng)商上,而好的供應(yīng)商的評級通常會更高,此時(shí)更加劇了數(shù)據(jù)傾斜的程度。如果不加以優(yōu)化,上述 SQL 將會耗費(fèi)很長時(shí)間,甚至運(yùn)行不出結(jié)果!
通常來說,供應(yīng)商是有限的,比如上千家、上萬家,數(shù)據(jù)量不會很大,而銷售明細(xì)事實(shí)表比較大,這就是典型的大表 join 小表問題,可以通過 mapjoin 的方式來優(yōu)化,只需添加 mapjoin hint 即可,優(yōu)化后的 SQL 如下:
select /*+mapjoin(b)*/ Seller_srar, count(order_id) as ordre_cnt from ( select order_id,seller_id from dwd_sls_fact_detail_table where partition_value ='20170101' ) a Left outer join( select seller_id,seller_star from dim_seller where partition_value='20170101' ) b on a.seller_id = b.seller_id group by b.seller_star;
/*+mapjoin(b)*/ 即 mapjoin himt,如果需要 mapjoin 多個表,則格式為/*+mapjoin(b,c,d)*/ 。
Hive 對于 mapjoin 是默認(rèn)開啟的,設(shè)置參數(shù)為:
Set hive.auto.convert.join=ture;
mapjoin 優(yōu)化是在 Map 階段進(jìn)行 join ,而不是像通常那樣在 Reduce 階段按照 join 列進(jìn)行分發(fā)后在每個 Reduce 任務(wù)節(jié)點(diǎn)上進(jìn)行 join ,不需要分發(fā)也就沒有傾斜的問題,相反 Hive 會將小表全量復(fù)制到每個 Map 任務(wù)節(jié)點(diǎn)(對于本例是 dim_seller ,當(dāng)然僅全量復(fù)制 b表 sql 指定的列),然后每個 Map 任務(wù)節(jié)點(diǎn)執(zhí)行 lookup 小表即可。
「從上述分析可以看出,小表不能太大,否則全量復(fù)制分發(fā)得不償失?!?/p>
實(shí)際上 Hive 根據(jù)參數(shù) hive.mapjoin.smalltable.filesize ( 0.11.0 本后是 hive.auto.convert.join.noconditionaltask.size )來確定小表的大小是否滿足條件(默認(rèn) 25M)。
實(shí)際中此參數(shù)值所允許的最大值可以修改,但是一般最大不能超過 1GB (太大的話 Map 任務(wù)所在的節(jié)點(diǎn)內(nèi)存會撐爆, Hive 會報(bào)錯 。另外需要注意的是, HDFS 顯示的文件大小是壓縮后的大小, 當(dāng)實(shí)際加載到內(nèi)存的時(shí)候,容量會增大很多,很多場景下可能會膨脹 10 倍)。
如果上述 mapjoin 中小表 dim_seller 很大呢?比如超過了 1GB 的大小?這種就是大表join 大表的問題 。
這類問題相對比較復(fù)雜,我們首先引入具體的問題場景,然后基于此介紹各種優(yōu)化方案。
問題場景
我們先假設(shè)一個問題場景:
A 表為一個匯總表,匯總的是賣家買家最近 N 天交易匯總信息,即對于每個賣家最近 N 天,其每個買家共成交了多少單、總金額是多少,我們這里 N 先只取 90 天,匯總值僅取成交單數(shù) 。A 表的字段有:buyer_id 、seller_id 和 pay_cnt_90d 。
B 表為賣家基本信息表,其中包含賣家的一個分層評級信息,比如把賣家分為 6 個級別:S0、S1、S2、S3、S4、S5、S6 。
要獲得的結(jié)果是每個買家在各個級別賣家的成交比例信息,比如:
某買家 S0:10%; S1:20%; S2:20%; S3:10%; S4:20%; S4:10%; S5:10%。
B表的字段有:seller_id 和 s_level。
正如 mapjoin 中的例子一樣,我們的第一反應(yīng)是直接 join 表并統(tǒng)計(jì):
select m.buyer_id ,sum(pay_cnt_90d) as pay_cnt_90d ,sum(case when m.s_level=O then pay_cnt_90d end) as pay_cnt_90d_s0 ,sum(case when m.s_level=l then pay_cnt_90d end) as pay_cnt_90d_sl ,sum(case when m.s_level=2 then pay_cnt_90d end) as pay_cnt_90d_s2 ,sum(case when m.s level=3 then pay cnt 90d end) as pay_cnt_90d_s3 ,sum(case when m.s_level=4 then pay_cnt_90d end) as pay_cnt_90d_s4 ,sum(case when m.s_level=S then pay_cnt_90d end) as pay_cnt_90d_s5 from ( select a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d from ( select buyer_id ,seller_id,pay_cnt_90d from table A ) a join ( select seller_id,s_level from table B ) b on a.seller_id=b.seller_id ) m group by m.buyer_id
但是此 SQL 會引起數(shù)據(jù)傾斜,原因在于賣家的二八準(zhǔn)則。某些賣家 90 天內(nèi)會有幾百萬甚至上千萬的買家,但是大部分賣家 90 天內(nèi)的買家數(shù)目并不多, join table_A 和table_B 的時(shí)候 ODPS 會按照 Seller_id 進(jìn)行分發(fā), table_A 的大賣家引起了數(shù)據(jù)傾斜。
「但是本數(shù)據(jù)傾斜問題無法用 mapjoin table_B 解決,因?yàn)橘u家有超過千萬條、文件大小幾個GB ,超過了 mapjoin 表最大 1GB 的限制?!?/p>
方案 1:轉(zhuǎn)化為 mapjoin
大表無法直接mapjoin,那么是否可以間接呢?實(shí)際上此思路有兩種途徑:限制行和限制列。
限制行: 不需要join B全表,只需要join其在A表中存在的。對于本問題場景,就是過濾掉 90 天內(nèi)沒有成交的賣家。
限制列: 只取需要的字段。
select m.buyer_id ,sum(pay_cnt_90d) as pay_cnt_90d ,sum(case when m.s_level=O then pay_cnt_90d end) as pay_cnt_90d_s0 ,sum(case when m.s_level=l then pay_cnt_90d end) as pay_cnt_90d_sl ,sum(case when m.s_level=2 then pay_cnt_90d end) as pay_cnt_90d_s2 ,sum(case when m.s level=3 then pay cnt 90d end) as pay_cnt_90d_s3 ,sum(case when m.s_level=4 then pay_cnt_90d end) as pay_cnt_90d_s4 ,sum(case when m.s_level=S then pay_cnt_90d end) as pay_cnt_90d_s5 from ( select /*+mapjoin(b)*/ a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d from ( select buyer_id ,seller_id,pay_cnt_90d from table_A ) a join ( select b0.seller id,s_level from table_B b0 join (select seller_id from table_A group by seller_id) a0 on b0.seller_id=a0.seller_id ) b on a.seller_id=b.seller_id ) m group by m.buyer_id
此方案在一些情況下可以起作用,但很多時(shí)候還是無法解決上述問題,因?yàn)榇蟛糠仲u家盡管 90 買家不多 ,但還是有一些的,過濾后的 B 表仍然很大。
方案 2:join 時(shí)用 case when 語句
應(yīng)用場景為: 傾斜的值是明確的而且數(shù)量很少,比如null值引起的傾斜。
將這些引起傾斜的值隨機(jī)分發(fā)到Reduce,其主要核心邏輯在于 join 時(shí)對這些特殊值concat 隨機(jī)數(shù),從而達(dá)到隨機(jī)分發(fā)的目的。核心邏輯如下:
Select a.user_id,a.order_id,b.user_id From table_a a Join table_b b On (case when a.user_id is null then concat ('hive' ,rand()) else a.user_id end)=b.user_id
Hive已對此進(jìn)行了優(yōu)化,不需要修改SQL,只需要設(shè)置參數(shù);比如 table_B 的值 "0" 和 "1" 引起傾斜,只需要如下設(shè)置:
set hive.optimize.skewinfo=table_B:(seller_id)[("0")("1")]; set hive.optimize.skewjoin=true;
但是方案二還是不能解決上述問題,因?yàn)閮A斜的賣家大量存在而且動態(tài)變化。
方案 3:倍數(shù)B表,再取模join
通用方案
是建立一個numbers表,其值只有一列int行,比如從1到10(具體根據(jù)傾斜程度確定),然后放大B表10倍,再取模join。
select m,buer_id ,sum(pay_cnt_90d) as pay_cnt_90d ,sum(case when m.s_level=O then pay_cnt_90d end) as pay cnt 90d so ,sum(case when m.s_level=l then pay cnt 90d end) as pay cnt 90d_sl ,sum(case when m.s_level=2 then pay_cnt_90d end) as pay_cnt_90d s2 ,sum(case when m.s_level=3 then pay_cnt_90d end) as pay_cnt_90d_s3 ,sum(case when m.s_level=4 then pay_cnt_90d end) as pay cnt 90d s4 ,sum(case when m.s level=S then pay cnt 90d end) as pay cnt 90d s5 from ( select a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d from ( select buyer_id,seller_id,pay_cnt_90d from table_A ) a JOin ( select /*+mapjoin(members)*/ seller_id,s_level,member from table_B join numbers ) b on a.seller_id=b.seller_id and mod(a.pay_cnt_90d,10)+1=b.number ) m group by m.buyer_id
思路核心在于:既然按照seller_id分發(fā)會傾斜,那么再人工增加一列進(jìn)行分發(fā),這樣之前傾斜的值的傾斜程度會減少為原來的1/10??梢酝ㄟ^配置numbers表修改放大倍數(shù)來降低傾斜程度,但弊端就是B表會膨脹N倍。
專有方案
通用方案思路是把B表的每條數(shù)據(jù)都放大了相同的倍數(shù),實(shí)際上只需要把大賣家放大倍數(shù)即可。
首先需要知道大賣家的名單,即先建立一個臨時(shí)表動態(tài)存放每日最新的大賣家(比如dim_big_seller),同時(shí)此表的大賣家要膨脹預(yù)先設(shè)定的倍數(shù)(比如1000倍)。
在A表和 B表中分別新建一個 join 列,其邏輯為:如果是大賣家,那么 concat 一個隨 機(jī)分配正整數(shù)(0到預(yù)定義的倍數(shù)之間,本例為0~1000 );如果不是,保持不變。
相比通用方案,專用方案的運(yùn)行效率明顯好了很多,因?yàn)橹皇菍表中大賣家的行數(shù)放大了 1000 倍,其他賣家的行數(shù)保持不變,但同時(shí)也可以看到代碼也復(fù)雜了很多,而且必須首先建立大賣家表。
方案 4:動態(tài)一分為二
實(shí)際上方案 2 和 3 都用到了一分為二的思想,但是都不徹底,對于 mapjoin 不能解決的 問題,終極解決方案就是動態(tài)一分為 ,即對傾斜的鍵值和不傾斜的鍵值分開處理,不傾 斜的正常 join 即可,傾斜的把它們找出來然后做 mapjoin ,最后 union all 其結(jié)果即可。
但是此種解決方案比較麻煩,代碼會變得復(fù)雜而且需要一個臨時(shí)表存放傾斜的鍵值。
- 對于 90 天買家數(shù)超過 10000 的賣家直接 map join ,對于其他賣家正常 join 即可
- 對于 90 天買家數(shù)超過 10000 的賣家直接 map join ,對于其他賣家正常 join 即可 select m.buyer_id ,sum(pay_cnt_90d) as pay_cnt_90d ,sum(case when rn.s_level=O then pay_cnt_90d end) as pay_cnt_90d_s0 ,sum(case when rn.s_level=l then pay_cnt_90d end) as pay_cnt_90d_sl ,sum(case when rn.s_level=2 then pay_cnt_90d end) as pay_cnt_90d_s2 ,sum(case when rn.s_level=3 then pay_cnt_90d end) as pay_cnt_90d_s3 ,sum(case when rn.s_level=4 then pay_cnt_90d end) as pay_cnt_90d_s4 ,sum(case when rn.s_level=S then pay_cnt_90d end) as pay_cnt_90d_s5 from ( select a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d from ( select buyer_id,seller_id,pay_cnt_90d from table_A ) a join ( select seller_id ,a.s_level from table_A a left outer join tmp_table_B b on a.user_id = b.seller_id where b.seller_id is null ) b on a.seller id=b.seller id union all select /*+mapjoin(b)*/ a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d from select buyer_id,seller_id,pay_cnt_90d from table A ) a join select seller_id,s_level from table B ) b on a.seller id=b.seller id ) m group by m.buyer_id ) m group by m.byer_id
總結(jié)起來,方案 1、2 以及方案 3 中的通用方案不能保證解決大表 join 大表問題,因?yàn)樗鼈兌即嬖诜N種不同的限制和特定的使用場景。
而方案 3 的專用方案和方案 4 是比較推薦的優(yōu)化方案,但是它們都需要新建一個臨時(shí)表來存放每日動態(tài)變化的大賣家 。
相對方案 4 來說,方案 3 的專用方案不需要對代碼框架進(jìn)行修改,但是 B 表會被放大,所以一定要是維度表,不然統(tǒng)計(jì)結(jié)果會是錯誤的 。方案 4 的解決方案最通用,自由度最高,但是對代碼的更改也最大,甚至需要更改代碼框架,可作為終極方案來使用。
關(guān)于“Hive數(shù)據(jù)傾斜的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。