MapReduce如何實現(xiàn)Reduce端重分區(qū)Join操作優(yōu)化,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
成都創(chuàng)新互聯(lián)公司是專業(yè)的巴彥淖爾網(wǎng)站建設(shè)公司,巴彥淖爾接單;提供做網(wǎng)站、網(wǎng)站設(shè)計,網(wǎng)頁設(shè)計,網(wǎng)站設(shè)計,建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行巴彥淖爾網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊,希望更多企業(yè)前來合作!一、重分區(qū)Join操作(Reduce端)
本文介紹的第一種方法是最基本的重分區(qū)Join操作,該方法允許執(zhí)行內(nèi)部和外部Join。開始之前,我們先搞清楚要解決的問題是將大型數(shù)據(jù)集Join在一起,我們選用的解決方案是Reduce端重分區(qū)Join。該方法是一種Reduce端Join實現(xiàn),利用MapReduce的sortmerge將記錄組合在一起,作為單個MapReduce作業(yè)實現(xiàn),可支持N路連接,其中N是要連接的數(shù)據(jù)集數(shù)量。
Map端負(fù)責(zé)從數(shù)據(jù)集中讀取數(shù)據(jù),確定每個Join操作的value,并將該value的key輸出,輸出key包含在reducer中并將數(shù)據(jù)集組合在一起以生成最終結(jié)果。
單個reducer調(diào)用接收map函數(shù)Join操作發(fā)出的Key對應(yīng)的所有值,并將數(shù)據(jù)分N個分區(qū),其中N是要連接的數(shù)據(jù)集數(shù)量。reducer讀取連接value的所有輸入并將它們分區(qū)到內(nèi)存中,然后跨所有分區(qū)執(zhí)行笛卡爾積,并發(fā)出每個Join操作的結(jié)果。
圖6.10 重分區(qū)Join操作的基本MapReduce實現(xiàn)
MapReduce代碼要支持這種技術(shù),需要滿足以下條件:
支持多個map類,每個map類處理不同的輸入數(shù)據(jù)集,這是通過使用MultipleInputs類完成的。
需要一種方法來標(biāo)記mapper發(fā)出的記錄,以便可以與其原點的數(shù)據(jù)集相關(guān)聯(lián),本文將使用htuple項目處理MapReduce中的數(shù)據(jù)。
重分區(qū)Join操作的代碼如下:
可以使用以下命令運行作業(yè)并查看輸出:
總結(jié)
Hadoop捆綁了一個hadoop-datajoin模塊,這是一個重分區(qū)Join操作框架,包括用于處理多個輸入數(shù)據(jù)集和執(zhí)行Join操作的管道。上述操作示例及hadoop-datajoin代碼是重分區(qū)Join的最基本形式,兩者都要求在執(zhí)行笛卡爾積之前將連接key的所有數(shù)據(jù)加載到內(nèi)存中,但如果連接key的基數(shù)大于可用內(nèi)存,那么,這種方法就不太適用。下一個技術(shù)將著眼解決此問題。
二、優(yōu)化重分區(qū)Join操作
舊版重分區(qū)Join操作實現(xiàn)會浪費大量空間,需要將給定key的所有value加載到內(nèi)存中才能執(zhí)行多路連接,將較小的數(shù)據(jù)集加載到內(nèi)存中才能迭代更大的數(shù)據(jù)集,沿途執(zhí)行Join更有效。
我們希望在MapReduce中執(zhí)行重分區(qū)Join,且無需緩存reducer中的所有記錄。優(yōu)化后的重分區(qū)Join框架將僅緩存要連接的其中一個數(shù)據(jù)集,以減少reducer中緩存的數(shù)據(jù)量。此優(yōu)化僅緩存來自兩個數(shù)據(jù)集中較小者的記錄,以減少緩存所有記錄的內(nèi)存開銷,圖6.11顯示了改進(jìn)的重分區(qū)Join實現(xiàn)。
圖6.11 重分區(qū)Join操作優(yōu)化MapReduce實現(xiàn)
該技術(shù)與舊版相比存在一定差異,此處使用輔助排序確保來自較小數(shù)據(jù)集的所有記錄在較大數(shù)據(jù)集的記錄之前到達(dá)reducer,以此來盡可能減少reducer中要緩存的數(shù)據(jù)量。此外,mapper會發(fā)出需要進(jìn)行Join操作的用戶名元組的key以及標(biāo)識原始數(shù)據(jù)集的字段。
以下代碼顯示了一個新的枚舉,顯示了用戶mapper如何填充元組字段:
需要更新MapReduce驅(qū)動程序代碼以指示元組中的哪些字段應(yīng)用于排序、分區(qū)和分組:
分區(qū)程序應(yīng)僅基于用戶名進(jìn)行分區(qū),以便用戶的所有記錄都到達(dá)同一個reducer。
排序應(yīng)使用用戶名和數(shù)據(jù)集指示符,以便首先排序較小的數(shù)據(jù)集(由于USERS常量小于USER_LOGS常量,導(dǎo)致用戶記錄在用戶登錄之前排序)。
分組應(yīng)對用戶進(jìn)行分組,以便將兩個數(shù)據(jù)集都流式傳輸?shù)酵粋€reducer調(diào)用:
最后,我們要修改reducer以緩存?zhèn)魅氲挠脩粲涗?,然后將其與用戶日志Join:
可以使用以下命令來運行作業(yè)并查看輸出:
Hive
在執(zhí)行重分區(qū)Join操作時,Hive可支持類似優(yōu)化。Hive可緩存Join鍵的所有數(shù)據(jù)集,然后流式傳輸大型數(shù)據(jù)集,使其不需要存儲在內(nèi)存中。假定在查詢時,Hive最后指定的數(shù)據(jù)集大。想象一下,你有兩個名為users和user_logs的表,而user_logs要大得多。要連接這些表,我們需要確保user_logs表被引用為查詢中的最后一個:
如果不想重新查詢,可以使用STREAMTABLE提示告訴Hive哪個表更大:
總結(jié)
此操作實現(xiàn)通過僅緩沖較小數(shù)據(jù)集的value來改進(jìn)早期技術(shù),但它仍然存在數(shù)據(jù)在map和reducer之間的傳輸問題,這是一個昂貴的網(wǎng)絡(luò)成本。此外,舊版可以支持N路連接,但是這種實現(xiàn)僅支持雙向連接。
三、使用Bloom過濾器來減少混洗數(shù)據(jù)
如果希望根據(jù)某些謂詞對數(shù)據(jù)子集執(zhí)行Join操作,例如“僅限居住在加利福尼亞地區(qū)的用戶”。到目前為止,我們還必須在reducer中執(zhí)行過濾器才可以實現(xiàn)這一目的 ,因為只有一個數(shù)據(jù)集存放了有關(guān)狀態(tài)的詳細(xì)信息——用戶日志沒有該信息。接下來,我將介紹如何在map端使用Bloom過濾器,這會對作業(yè)執(zhí)行時間產(chǎn)生很大影響。我要解決的問題是在重分區(qū)Join操作中過濾數(shù)據(jù),但要將該過濾器推送到mapper。一個可行的解決方案是使用預(yù)處理作業(yè)創(chuàng)建Bloom過濾器,然后在重分區(qū)作業(yè)中加載Bloom過濾器以過濾mapper中的記錄。
Bloom過濾器是一種非常有用的隨機(jī)數(shù)據(jù)結(jié)構(gòu),它利用位數(shù)組簡潔表明集合,并能判斷一個元素是否屬于該集合。然而,與Java中的HashSet相比,Bloom需要的內(nèi)存要少得多,因此它們非常適合處理大型數(shù)據(jù)集。此解決方案有兩個步驟,一是運行作業(yè)來生成Bloom過濾器,該過濾器將對用戶數(shù)據(jù)進(jìn)行操作,并由居住在加利福尼亞地區(qū)的用戶填充;二是在重分區(qū)Join操作中使用此Bloom過濾器丟棄不需要的用戶,該過程需要Bloom過濾器的原因是用戶日志的mapper沒有狀態(tài)的詳細(xì)信息。
圖6.12 在重分區(qū)Join中使用Bloom過濾器的兩步過程
第1步:創(chuàng)建Bloom過濾器
第一個作業(yè)是創(chuàng)建Bloom過濾器,其中包含加利福尼亞州的用戶名。mapper生成中間Bloom過濾器,reducer將其組合成一個Bloom過濾器,作業(yè)輸出是包含序列化Bloom過濾器的Avro文件:
第2步:重分區(qū)Join
重分區(qū)Join與上文提到的唯一區(qū)別是mapper加載第一步中生成的Bloom過濾器,并且在處理map記錄時,執(zhí)行針對Bloom過濾器的元素審查以確定是否應(yīng)將記錄發(fā)送給reducer。以下代碼顯示了兩件事:一般化Bloom過濾器加載、抽象mapper以及支持兩個Join數(shù)據(jù)集的子類:
以下命令運行兩個作業(yè)并轉(zhuǎn)儲Join輸出:
總結(jié)
該技術(shù)提出了一種在兩個數(shù)據(jù)集上執(zhí)行map端過濾的有效方法,以最小化mapper和reducer之間的網(wǎng)絡(luò)I/O。作為shuffle的一部分,它還減少了mapper和reducer的磁盤溢出數(shù)據(jù)量。過濾器通常是加速和優(yōu)化作業(yè)最簡單有效的方法,重分區(qū)Join也同樣適用于其他MapReduce作業(yè)。
四、reducer端Join操作可能發(fā)生數(shù)據(jù)傾斜
數(shù)據(jù)傾斜是實際操作中很容易碰到的問題,可能存在兩種類型的數(shù)據(jù)傾斜:
高Join-key基數(shù),其中有一些連接key在一個或兩個數(shù)據(jù)集中具有大量記錄,我把這種稱之為join-product偏差。
糟糕的散列分區(qū),少數(shù)reducer在總記錄數(shù)中占很大比例,我將此稱為散列分區(qū)傾斜。
五、加入具有高連接密鑰基數(shù)的大型數(shù)據(jù)集
這種技術(shù)解決了join-product的傾斜問題,下一個技術(shù)檢查了散列分區(qū)偏差?,F(xiàn)在面臨的問題是某些連接key是高基數(shù)的,這會導(dǎo)致某些reducer在嘗試緩存這些key時耗盡內(nèi)存。我們可以過濾掉這些key并將它們單獨連接或?qū)⑵湟绯龅絩educer中并安排后續(xù)作業(yè)Join。
如果提前知道了哪些Key是高基數(shù)的,則可以將其分成單獨的Join作業(yè),如果不確定高基數(shù)Key是哪些,則可能需要在reducer中構(gòu)建智能檢測并將其寫入副本文件,該文件由后續(xù)作業(yè)Join,如圖6.14所示。
圖6.13 提前知道高基數(shù)密鑰時處理傾斜
圖6.14 提前知道高基數(shù)密鑰處理時的偏差
Hive
Hive支持類似于第二種方法的偏斜緩解策略,運行作業(yè)之前可指定以下配置啟用:
可以選擇設(shè)置一些其他配置來控制在高基數(shù)key上運行的map端連接:
最后,如果在SQL中使用GROUP BY,可能還需要考慮啟用以下配置來處理分組數(shù)據(jù)中的偏差:
總結(jié)
此技術(shù)假設(shè)給定的Join鍵,只有一個數(shù)據(jù)集具有高基數(shù)出現(xiàn),因此可緩存較小數(shù)據(jù)集的map端連接。如果兩個數(shù)據(jù)集都是高基數(shù)的,那么將面臨一個昂貴的笛卡爾積運算,執(zhí)行起來會很慢,因為它不適合MapReduce的工作方式(這意味著它本身不可拆分和可并行化)。在這種情況下,我們應(yīng)該重新檢查是否有任何技術(shù)(如過濾或投影)可幫助減少執(zhí)行join所需的時間。
六、處理由散列分區(qū)生成的偏差
MapReduce的默認(rèn)分區(qū)程序是一個散列分區(qū)程序,接受每個map輸出key的散列,并對reducer數(shù)量建模,以確定key被發(fā)送到哪個reducer。散列分區(qū)程序可以很好地用作通用分區(qū)程序,但是有些數(shù)據(jù)集可能會導(dǎo)致散列分區(qū)程序因一些不成比例的密鑰散列到同一個reducer而使其重載。與大多數(shù)reducer相比,這些reducer需要更長時間才能完成。此外,當(dāng)檢查straggler reducer計數(shù)器時,會注意到發(fā)送給落后者的組數(shù)遠(yuǎn)遠(yuǎn)高于已完成的其他組。
區(qū)分高基數(shù)key與散列分區(qū)引起的偏差可以使用MapReduce reducer來識別數(shù)據(jù)傾斜類型。由性能較差的哈希分區(qū)器引入的偏差將具有更多的組(唯一密鑰)發(fā)送到這些reducer,而導(dǎo)致傾斜的高基數(shù)密鑰可以通過所有reducer中大致相等數(shù)量的組來證明,傾斜越多,reducer的記錄數(shù)量越多。
我們要解決的問題是reducer端連接需要很長時間才能完成,而落后的組需要比大多數(shù)reducer更長時間。使用范圍分區(qū)程序或編寫自定義分區(qū)程序,將偏移的key集中到一組reducer。此解決方案的目標(biāo)是省去默認(rèn)的散列分區(qū)程序,并將其替換為可以更好處理數(shù)據(jù)傾斜的內(nèi)容,本文提供兩個選項可供探索:
使用與Hadoop捆綁在一起的sampler和TotalOrderPartitioner,將散列分區(qū)程序替換為范圍分區(qū)程序。
編寫自定義分區(qū)程序,將具有數(shù)據(jù)傾斜的key路由到為傾斜key保留的Reducer。
范圍分區(qū)法
范圍分區(qū)根據(jù)預(yù)定義值分配map輸出,其中每個map接收該范圍內(nèi)的所有reducer,這正是TotalOrderPartitioner的工作原理。實際上,TeraSort使用TotalOrderPartitioner在所有Reducer之間均勻分布,以大限度減少數(shù)據(jù)傾斜。TotalOrderPartitioner附帶采樣器,可對輸入數(shù)據(jù)進(jìn)行采樣并將其寫入HDFS,然后在分區(qū)時由TotalOrderPartitioner使用。
自定義分區(qū)法
如果已經(jīng)知道哪些Key顯示數(shù)據(jù)傾斜,并且該組Key是靜態(tài)的,則可以編寫自定義分區(qū)程序以將這些高基數(shù)key推送到一組reducer。
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司行業(yè)資訊頻道,感謝您對創(chuàng)新互聯(lián)的支持。