這篇文章給大家分享的是有關(guān)Hadoop中數(shù)據(jù)傾斜的示例分析的內(nèi)容。小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過(guò)來(lái)看看吧。
創(chuàng)新互聯(lián)建站是創(chuàng)新、創(chuàng)意、研發(fā)型一體的綜合型網(wǎng)站建設(shè)公司,自成立以來(lái)公司不斷探索創(chuàng)新,始終堅(jiān)持為客戶(hù)提供滿(mǎn)意周到的服務(wù),在本地打下了良好的口碑,在過(guò)去的十年時(shí)間我們累計(jì)服務(wù)了上千家以及全國(guó)政企客戶(hù),如成都履帶攪拌車(chē)等企業(yè)單位,完善的項(xiàng)目管理流程,嚴(yán)格把控項(xiàng)目進(jìn)度與質(zhì)量監(jiān)控加上過(guò)硬的技術(shù)實(shí)力獲得客戶(hù)的一致夸獎(jiǎng)。
數(shù)據(jù)分布:
正常的數(shù)據(jù)分布理論上都是傾斜的,就是我們所說(shuō)的20-80原理:80%的財(cái)富集中在20%的人手中, 80%的用戶(hù)只使用20%的功能 , 20%的用戶(hù)貢獻(xiàn)了80%的訪(fǎng)問(wèn)量 , 不同的數(shù)據(jù)字段可能的數(shù)據(jù)傾斜一般有兩種情況:
一種是唯一值非常少,極少數(shù)值有非常多的記錄值(唯一值少于幾千)
一種是唯一值比較多,這個(gè)字段的某些值有遠(yuǎn)遠(yuǎn)多于其他值的記錄數(shù),但是它的占比也小于百分之一或千分之一
分區(qū):
常見(jiàn)的mapreduce分區(qū)方式為hash 和range ,
hash partition 的好處是比較彈性,跟數(shù)據(jù)類(lèi)型無(wú)關(guān),實(shí)現(xiàn)簡(jiǎn)單(設(shè)定reduce個(gè)數(shù)就好,一般不需要自己實(shí)現(xiàn))
range partition 需要實(shí)現(xiàn)者自己了解數(shù)據(jù)分布, 有時(shí)候需要手工做sample取樣. 同時(shí)也不夠彈性, 表現(xiàn)在幾個(gè)方面,1. 對(duì)同一個(gè)表的不同字段都需要實(shí)現(xiàn)不同的range partition, 對(duì)于時(shí)間這種字段根據(jù)查詢(xún)類(lèi)型的不同或者過(guò)濾條件的不同切分range 的大小都不一定.
2 .有時(shí)候可能設(shè)計(jì)使用多個(gè)字段組合的情況, 這時(shí)候又不能使用之前單個(gè)字段的partition 類(lèi), 并且多個(gè)字段組合之間有可能有隱含的聯(lián)系,比如出生日期和星座,商品和季節(jié).
3. 手工做sample 非常耗時(shí)間,需要使用者對(duì)查詢(xún)使用的數(shù)據(jù)集的分布有領(lǐng)域知識(shí).
4. 分配方式是死的,reduce 個(gè)數(shù)是確定的,一旦某種情況下發(fā)生傾斜,調(diào)整參數(shù)
其他的分區(qū)類(lèi)型還有hbase 的hregionpartitioner 或者totalorder partitioner 等.
能夠想到的關(guān)于數(shù)據(jù)傾斜的一些解決方式(歡迎補(bǔ)充,尤其是有沒(méi)有做搜索或者數(shù)據(jù)挖掘的朋友有碰到類(lèi)似問(wèn)題):
1. 增加reduce 的jvm內(nèi)存
2. 增加reduce 個(gè)數(shù)
3. customer partition
4. 其他優(yōu)化的討論.
5. reduce sort merge排序算法的討論
6. 正在實(shí)現(xiàn)中的hive skewed join.
7. pipeline
8. distinct
9. index 尤其是bitmap index
方式1:既然reduce 本身的計(jì)算需要以合適的內(nèi)存作為支持,在硬件環(huán)境容許的情況下,增加reduce 的內(nèi)存大小顯然有改善數(shù)據(jù)傾斜的可能,這種方式尤其適合數(shù)據(jù)分布第一種情況,單個(gè)值有大量記錄, 這種值的所有紀(jì)錄已經(jīng)超過(guò)了分配給reduce 的內(nèi)存,無(wú)論你怎么樣分區(qū)這種情況都不會(huì)改變. 當(dāng)然這種情況的限制也非常明顯, 1.內(nèi)存的限制存在,2.可能會(huì)對(duì)集群其他任務(wù)的運(yùn)行產(chǎn)生不穩(wěn)定的影響.
方式2: 這個(gè)對(duì)于數(shù)據(jù)分布第二種情況有效,唯一值較多,單個(gè)唯一值的記錄數(shù)不會(huì)超過(guò)分配給reduce 的內(nèi)存. 如果發(fā)生了偶爾的數(shù)據(jù)傾斜情況,增加reduce 個(gè)數(shù)可以緩解偶然情況下的某些reduce 不小心分配了多個(gè)較多記錄數(shù)的情況. 但是對(duì)于第一種數(shù)據(jù)分布無(wú)效.
方式3: 一種情況是某個(gè)領(lǐng)域知識(shí)告訴你數(shù)據(jù)分布的顯著類(lèi)型,比如hadoop definitive guide 里面的溫度問(wèn)題,一個(gè)固定的組合(觀(guān)測(cè)站點(diǎn)的位置和溫度) 的分布是固定的, 對(duì)于特定的查詢(xún)?nèi)绻懊鎯煞N方式都沒(méi)用,實(shí)現(xiàn)自己的partitioner 也許是一個(gè)好的方式.
方式4: 目前有的一些針對(duì)數(shù)據(jù)傾斜的優(yōu)化比如pig 的skewed join
http://pig.apache.org/docs/r0.7.0/piglatin_ref1.html#Skewed+Joins
pig 文檔上面說(shuō)是根據(jù)數(shù)據(jù)輸入的統(tǒng)計(jì)信息來(lái)確定分區(qū)(也就是range partition?),另外不清楚這個(gè)行為是否是動(dòng)態(tài)運(yùn)行時(shí)候才決定的,也就是運(yùn)行之前有一步pig 自動(dòng)做sample 的工作,因?yàn)閜ig 是沒(méi)有統(tǒng)計(jì)信息這一說(shuō)的.
hive 中的group by
其中最后一個(gè)參數(shù)hive.groupby.mapaggr.checkinterval 的思路跟in-memory combiner 相似, in-memeory combiner 是發(fā)生在mapper 端sort 之前,而不是現(xiàn)在的combiner發(fā)生在mapper sort 之后甚至在寫(xiě)入磁盤(pán)之后重新讀磁盤(pán)然后排序合并. in-memeory combiner 最早好像是《Data-Intensive Text Processing with MapReduce》,mapr 去年的介紹ppt 里面好像提到它們也有這個(gè)優(yōu)化. mapper 端減少數(shù)據(jù)的機(jī)會(huì)比reduce 端的要大,所以一般不會(huì)看到reduce 端的combiner 的討論,但是這種思路也有,比如google tenzing 的join 討論里面有一個(gè)prev-next 的小優(yōu)化就是基于reduce 端的combiner, 但那個(gè)前提是基于block shuffle 實(shí)現(xiàn)的基礎(chǔ)上,數(shù)據(jù)已經(jīng)排過(guò)序了,所以join 時(shí)候前一條數(shù)據(jù)跟后一條數(shù)據(jù)相同的概率很大.
hive 中的skewed join : 之前的文章已經(jīng)介紹過(guò)兩表join 中hive 的幾個(gè)優(yōu)化,其中的skewed join 的類(lèi)似思路就是上面介紹的skewed 的第二種:增加reduce 的個(gè)數(shù),hive 中是通過(guò)判斷閾值如果大于一個(gè)reduce 需要處理的數(shù)據(jù)量,重新起額外的task 來(lái)處理這些超額的reduce 本身需要處理的數(shù)據(jù), 這是一種較晚的補(bǔ)救措施,本身hive 開(kāi)始分區(qū)的時(shí)候已經(jīng)傾斜(partition 的方式不合理), 當(dāng)運(yùn)行的時(shí)候通過(guò)運(yùn)行時(shí)監(jiān)控reduce 發(fā)現(xiàn)傾斜的特殊key 然后額外的起task 去處理,效果比較一般,感興趣的同學(xué)可以參考HIVE-3086 里面我和facebook 團(tuán)隊(duì)對(duì)這種優(yōu)化思路的討論. 第六節(jié)我會(huì)討論一下我所認(rèn)為的思路和facebook 正在做的思路之間的差別.
方式5 : reduce 分配的內(nèi)存遠(yuǎn)小于處理的數(shù)據(jù)量時(shí),會(huì)產(chǎn)生multi-pass sort 的情況是瓶頸,那么就要問(wèn)
1. 這種排序是有必要的嘛?
2. 是否有其他排序算法或優(yōu)化可以根據(jù)特定情況降低他瓶頸的閾值?
3. map reduce 適合處理這種情況嘛?
關(guān)于問(wèn)題1. 如果是group by , 那么對(duì)于數(shù)據(jù)分布情況1 ,hash 比sort 好非常多,即使某一個(gè)reduce 比其他reduce 處理多的多的數(shù)據(jù),hash 的計(jì)算方式也不會(huì)差距太大.
問(wèn)題2. 一個(gè)是如果實(shí)現(xiàn)block shuffle 肯定會(huì)極大的減少排序本身的成本, 另外,如果分區(qū)之后的reduce 不是使用copy –> sort-merge –> reduce 的計(jì)算方式, 在copy 之后將每個(gè)block 的頭部信息保存在內(nèi)存中,不用sort – merge 也可以直接計(jì)算reduce, 只不過(guò)這時(shí)候變成了隨機(jī)訪(fǎng)問(wèn),而不是現(xiàn)在的sort-merge 之后的順序訪(fǎng)問(wèn). block shuffle 的實(shí)現(xiàn)有兩種類(lèi)型,一種是當(dāng)hadoop 中真正有了列數(shù)據(jù)格式的時(shí)候,數(shù)據(jù)有更大的機(jī)會(huì)已經(jīng)排過(guò)序并且按照block 來(lái)切分,一般block 為1M ( 可以關(guān)注avro-806 ) , 這時(shí)候的mapper 什么都不做,甚至連計(jì)算分區(qū)的開(kāi)銷(xiāo)都小了很多倍,直接進(jìn)入reduce 最后一步,第二種類(lèi)型為沒(méi)有列數(shù)據(jù)格式的支持,需要mapper 排序得到之后的block 的最大最小值,reduce 端在內(nèi)存中保存最大最小值,copy 完成后直接用這個(gè)值來(lái)做隨機(jī)讀然后進(jìn)行reduce. ( block shuffle 的實(shí)現(xiàn)可以關(guān)注 MAPREDUCE-4039 , hash 計(jì)算可以關(guān)注 MAPREDUCE-1639)
問(wèn)題3 . map reduce 只有兩個(gè)函數(shù),一個(gè)map 一個(gè) reduce, 一旦發(fā)生數(shù)據(jù)傾斜就是partition 失效了,對(duì)于join 的例子,某一個(gè)key 分配了過(guò)多的記錄數(shù),對(duì)于只有一次partittion的機(jī)會(huì),分配錯(cuò)了數(shù)據(jù)傾斜的傷害就已經(jīng)造成了,這種情況很難調(diào)試,但是如果你是基于map-reduce-reduce 的方式計(jì)算,那么對(duì)于同一個(gè)key 不需要分配到同一個(gè)reduce 中,在第一個(gè)reduce 中得到的結(jié)果可以在第二個(gè)reduce 才匯總?cè)ブ兀诙€(gè)reduce 不需要sort – merge 的步驟,因?yàn)榍耙粋€(gè)reduce 已經(jīng)排過(guò)序了,中間的reduce 處理的數(shù)據(jù)不用關(guān)心partition 怎么分,處理的數(shù)據(jù)量都是一樣大,而第二個(gè)reduce 又不使用sort-merge 來(lái)排序,不會(huì)遇到現(xiàn)在的內(nèi)存大小的問(wèn)題,對(duì)于skewed join 這種情況瓶頸自然小很多.
方式6: 目前hive 有幾個(gè)正在開(kāi)發(fā)中的處理skewed join 情況的jira case, HIVE-3086 , HIVE-3286 ,HIVE-3026 . 簡(jiǎn)單介紹一下就是facebook 希望通過(guò)手工處理提前枚舉的方式列出單個(gè)傾斜的值,在join 的時(shí)候?qū)⑦@些值特殊列出當(dāng)作map join 來(lái)處理,對(duì)于其他值使用原來(lái)的方式. 我個(gè)人覺(jué)得這太不伸縮了,值本身沒(méi)有考慮應(yīng)用過(guò)濾條件和優(yōu)化方式之后的數(shù)據(jù)量大小問(wèn)題,他們提前列出的值都是基于整個(gè)分區(qū)的. join key 如果為組合key 的情況也應(yīng)該沒(méi)有考慮,對(duì)metastore 的儲(chǔ)存問(wèn)題有限制,對(duì)輸入的大表和小表都會(huì)scan 兩次( 一次處理非skew key , 一次處理skew key 做map join), 對(duì)輸出表也會(huì)scan 兩次(將兩個(gè)結(jié)果進(jìn)行merge) , skew key 必須提前手工列出這又存在額外維護(hù)的成本,目前因?yàn)檫€沒(méi)有完整的開(kāi)發(fā)完到能夠投入生產(chǎn)的情況,所以等所有特性處理完了有了文檔在看看這個(gè)處理方式是否有效,我個(gè)人認(rèn)為的思路應(yīng)該是接著bucked map join 的思路往下走,只不過(guò)不用提前處理cluster key 的問(wèn)題, 這時(shí)候cluster key 的選擇應(yīng)該是join key + 某個(gè)能分散join key 的列, 這等于將大表的同一個(gè)key 的值分散到了多個(gè)不同的reduce 中,而小表的join key 也必須cluster 到跟大表對(duì)應(yīng)的同一個(gè)key , join 中對(duì)于數(shù)據(jù)分布第二種情況不用太難,增加reduce 個(gè)數(shù)就好,主要是第一種,需要大表的join key 能夠分散,對(duì)于同樣join key 的小表又能夠匹配到所有大表中的記錄. 這種思路就是不用掃描大表兩遍或者結(jié)果輸出表,不需要提前手工處理,數(shù)據(jù)是動(dòng)態(tài)sample 的應(yīng)用了過(guò)濾條件之后的數(shù)據(jù),而不是提前基于統(tǒng)計(jì)數(shù)據(jù)的不準(zhǔn)確結(jié)果. 這個(gè)基本思路跟tenzing 里面描述的distributed hash join 是一樣的,想辦法切成合適的大小然后用hash 和 map join .
方式7: 當(dāng)同時(shí)出現(xiàn)join 和group 的時(shí)候, 那么這兩個(gè)操作應(yīng)該是以pipeline (管道) 的方式執(zhí)行. 在join 的時(shí)候就可以直接使用group 的操作符減少大量的數(shù)據(jù),而不是等待join 完成,然后寫(xiě)入磁盤(pán),group 又讀取磁盤(pán)做group操作. HIVE-2206 正在做這個(gè)優(yōu)化. hive 里面是沒(méi)有pipeline 這個(gè)概念的. 像是cloudera 的crunch 或者twitter 的Scalding 都是有這種概念的.
方式8: distinct 本身就是group by 的一種簡(jiǎn)寫(xiě),我原先以為count(distinct x)這種跟group by 是一樣的,但是發(fā)現(xiàn)hive 里面distinct 明顯比group by 要慢,可能跟group by 會(huì)有map 端的combiner有關(guān), 另外觀(guān)察到hive 在預(yù)估count(distinct x) 的reduce 個(gè)數(shù)比group by 的個(gè)數(shù)要少 , 所以hive 中使用count(distinct x) , 要么盡量把reduce 個(gè)數(shù)設(shè)置大,直接設(shè)置reduce 個(gè)數(shù)或者h(yuǎn)ive.exec.reducers.bytes.per.reducer 調(diào)小,我個(gè)人比較喜歡調(diào)后面一個(gè),hive 目前的reduce 個(gè)數(shù)沒(méi)有統(tǒng)計(jì)信息的情況下就是用map端輸入之前的數(shù)值, 如果你是join 之后還用count(distinct x) 的話(huà),這個(gè)默認(rèn)值一般都會(huì)悲劇,如果有where 條件并能過(guò)濾一定數(shù)量的數(shù)據(jù),那么默認(rèn)reduce 個(gè)數(shù)可能就還好一點(diǎn). 不管怎樣,多浪費(fèi)一點(diǎn)reduce slot 總比等十幾甚至幾十分鐘要好, 或者轉(zhuǎn)換成group by 的寫(xiě)法也不錯(cuò),寫(xiě)成group by 的時(shí)候distributed by 也很有幫助.
方式9: hive 中的index 就是物化視圖,對(duì)于group by 和distinct 的情況等于變成了map 端在做計(jì)算,自然不存在傾斜. 尤其是bitmap index , 對(duì)于唯一值比較少的列優(yōu)勢(shì)更大,不過(guò)index 麻煩的地方在于需要判斷你的sql 是不是常用sql , 另外如果create index 的時(shí)候沒(méi)有選你查詢(xún)的時(shí)候用的字段,這個(gè)index 是不能用的( hive 中是永遠(yuǎn)不可能有DBMS中的用index 去lookup 或者join 原始表這種概念的)
其他建議:
網(wǎng)上能找到的另外一份很好的描述數(shù)據(jù)傾斜的資料是
http://nuage.cs.washington.edu/pubs/opencirrus2011.pdf
里面的map side skew 和expensive record 都不是關(guān)系型計(jì)算中的問(wèn)題,所以不是這篇文章關(guān)注點(diǎn). 對(duì)于關(guān)系型計(jì)算,其中數(shù)據(jù)傾斜影響最大的地方在reduce 的sort. 這篇文章里面最后總結(jié)的5點(diǎn)好的建議值得參考,
其中第三條需要你知道應(yīng)用combiner 和特殊優(yōu)化方式是否帶來(lái)了性能的提升,hive 的map aggr 在數(shù)據(jù)分布情況1效果會(huì)比較好,數(shù)據(jù)分布情況2效果就不大,還有combiner 應(yīng)用的時(shí)候是消耗了系統(tǒng)資源的,確認(rèn)這種消耗是否值得而不是任何情況下都使用combiner.
對(duì)于第四點(diǎn)關(guān)系型計(jì)算中map 傾斜情況不太常見(jiàn). 一種可以舉出來(lái)的例子是分區(qū)不合理,或者h(yuǎn)ive 中的cluster by 的key 選擇不合理(都是使用目錄的方式分區(qū), 目錄是最小處理單元了).
Use domain knowledge when choosing the
map output partitioning scheme if the reduce operation is
expensive: Range partition or some other form of explicit
partition may be better than the default hash-partition
Try different partitioning schemes on sample
workloads or collect the data distribution at the reduce input
if a MapReduce job is expected to run several times
Implement a combiner to reduce the amount
of data going into the reduce-phase and, as such, significantly
dampen the effects of any type of reduce-skew
Use a pre-processing MapReduce job that
extracts properties of the input data in the case of a longruning,
skew-prone map phase. Appropriately partitioning the
data before the real application runs can significantly reduce
skew problems in the map phase.
Best Practice 5. Design algorithms whose runtime depends
only on the amount of input data and not the data distribution.
另外一份是淘寶的數(shù)據(jù)傾斜總結(jié):
http://www.alidata.org/archives/2109
不過(guò)我個(gè)人覺(jué)得幫助不是太大,里面第一個(gè)解決方式空值產(chǎn)生的影響第一個(gè)Union All 的方式個(gè)人是極力反對(duì)的,同一個(gè)表尤其是大表掃描兩遍這額外的成本跟收益太不匹配,不推薦,第二個(gè)將特殊值變成random 的方式, 這個(gè)產(chǎn)生的結(jié)果是正確的嘛? 尤其是在各種情況下輸出結(jié)果是正確的嘛?里面背景好像是那個(gè)小表users 的主鍵為userid, 然后userid 又是join key , 而且還不為空? 不太推薦,背景條件和輸出的正確性與否存疑.
第二個(gè)數(shù)據(jù)類(lèi)型不同的問(wèn)題我覺(jué)得跟HIVE-3445 都算是數(shù)據(jù)建模的問(wèn)題,提前修改好是一樣的.
第三個(gè)是因?yàn)樘詫毜膆adoop 版本中沒(méi)有map side hash aggr 的參數(shù)吧. 而且寫(xiě)成distinct 還多了一個(gè)MR 步驟,不太推薦.
數(shù)據(jù)傾斜在MPP 中也是一個(gè)課題,這也設(shè)計(jì)到一個(gè)數(shù)據(jù)重分配的問(wèn)題,但是相對(duì)于MPP 中有比較成熟的機(jī)制,一個(gè)是mpp 在處理數(shù)據(jù)初始分布的時(shí)候總是會(huì)指定segmented by 或者distributed by 這種顯示分配到不同物理機(jī)器上的建表語(yǔ)句. 還有就是統(tǒng)計(jì)信息會(huì)幫助執(zhí)行引擎選擇合適的重新分布.但是統(tǒng)計(jì)信息也不是萬(wàn)能的,比如
1:統(tǒng)計(jì)信息的粒度和更新問(wèn)題.
2: 應(yīng)用了過(guò)濾條件之后的數(shù)據(jù)也許不符合原始期望的數(shù)據(jù)分布.
3: 統(tǒng)計(jì)信息是基于采樣的,總于真實(shí)所有數(shù)據(jù)存在誤差.
4: 統(tǒng)計(jì)信息是基于partittion 的, 對(duì)于查詢(xún)沒(méi)有涉及到partition 字段的切分就不能使用各partition 只和來(lái)表示總體的統(tǒng)計(jì)信息.
5. 臨時(shí)表或者多步驟查詢(xún)的中間過(guò)程數(shù)據(jù)沒(méi)有統(tǒng)計(jì)信息的情況.
6. 各種其他的算法優(yōu)化比如in-mapper combiner 或者google Tenzing 的prev – next combine 都會(huì)影響統(tǒng)計(jì)信息對(duì)于算法選擇的不同.
感謝各位的閱讀!關(guān)于“Hadoop中數(shù)據(jù)傾斜的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!