MapReduce源于Google一篇論文,它充分借鑒了“分而治之”的思想,將一個(gè)數(shù)據(jù)處理過(guò)程拆分為主要的Map(映射)與Reduce(歸約)兩步。簡(jiǎn)單地說(shuō),MapReduce就是"任務(wù)的分解與結(jié)果的匯總"。
為云夢(mèng)等地區(qū)用戶提供了全套網(wǎng)頁(yè)設(shè)計(jì)制作服務(wù),及云夢(mèng)網(wǎng)站建設(shè)行業(yè)解決方案。主營(yíng)業(yè)務(wù)為網(wǎng)站建設(shè)、成都網(wǎng)站制作、云夢(mèng)網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!
MapReduce (MR) 是一個(gè)基于磁盤運(yùn)算的框架,賊慢,慢的主要原因:
1)MR是進(jìn)程級(jí)別的,一個(gè)MR任務(wù)會(huì)創(chuàng)建多個(gè)進(jìn)程(map task和reduce task都是進(jìn)程),進(jìn)程的創(chuàng)建和銷毀等過(guò)程需要耗很多的時(shí)間。?
2)磁盤I/O問(wèn)題,? MapReduce作業(yè)通常都是數(shù)據(jù)密集型作業(yè),大量的中間結(jié)果需要寫到磁盤上并通過(guò)網(wǎng)絡(luò)進(jìn)行傳輸,這耗去了大量的時(shí)間。?
注:mapreduce 1.x架構(gòu)有兩個(gè)進(jìn)程:
JobTracker :負(fù)責(zé)資源管理、作業(yè)調(diào)度、監(jiān)控TaskTracker。
TaskTracker:任務(wù)的執(zhí)行者。運(yùn)行 map task 和 reduce task。
在2.x的時(shí)候由yarn取代他們的工作了。
MapReduce工作流程:
HDFS上的文件—>InputFormat—>Map階段—>shuffle階段(橫跨Mapper和Reducer,在Mapper輸出數(shù)據(jù)之前和Reducer接收數(shù)據(jù)之后都有進(jìn)行)—>Reduce階段 —>OutputFormat —>HDFS:output.txt
InputFormat接口:將輸入數(shù)據(jù)進(jìn)行分片(split),輸入分片的大小一般和hdfs的blocksize相同(128M)。
Map階段: Map會(huì)讀取輸入分片數(shù)據(jù),一個(gè)輸入分片(input split)針對(duì)一個(gè)map任務(wù),進(jìn)行map邏輯處理(用戶自定義)
Reduce階段:對(duì)已排序輸出中的每個(gè)鍵調(diào)用reduce函數(shù)。reduce task 個(gè)數(shù)通過(guò)setNumReduceTasks設(shè)定,即mapreduce.job.reduces參數(shù)的默認(rèn)值1。此階段的輸出直接寫到輸出文件系統(tǒng),一般為hdfs。
MapReduce Shffle詳解
為了確保每個(gè)reducer的的輸入都是按鍵排序的,系統(tǒng)執(zhí)行排序的過(guò)程,即將map task的輸出通過(guò)一定規(guī)則傳給reduce task,這個(gè)過(guò)程成為shuffle。
Shuffle階段一部分是在map task 中進(jìn)行的, 這里稱為Map shuffle , 還有一部分是在reduce task 中進(jìn)行的, 這里稱為Reduce shffle。
Map Shuffle階段
Map在做輸出時(shí)候會(huì)在內(nèi)存里開(kāi)啟一個(gè)環(huán)形緩沖區(qū),默認(rèn)大小是100M(參數(shù):mapreduce.task.io.sort.mb),Map中的outputCollect會(huì)把輸出的所有kv對(duì)收集起來(lái),存到這個(gè)環(huán)形緩沖區(qū)中。
環(huán)形緩沖區(qū):本質(zhì)上是一個(gè)首尾相連的數(shù)組,這個(gè)數(shù)組會(huì)被一分為二,一邊用來(lái)寫索引,一邊用來(lái)寫數(shù)據(jù)。一旦這個(gè)環(huán)形緩沖區(qū)中的內(nèi)容達(dá)到閾值(默認(rèn)是0.8,參數(shù):mapreduce.map.sort.spill.percent),一個(gè)后臺(tái)線程就會(huì)把內(nèi)容溢寫(spill)到磁盤上,在這過(guò)程中,map輸出并不會(huì)停止往緩沖區(qū)寫入數(shù)據(jù)(反向?qū)?,到達(dá)閾值后,再反向,以此類推),但如果在此期間緩沖區(qū)被寫滿,map會(huì)被阻塞直到寫磁盤過(guò)程完成。溢寫過(guò)程按照輪詢方式將緩沖區(qū)的內(nèi)容寫到mapred.local.dir指定的作業(yè)特定子目錄中的目錄中,map任務(wù)結(jié)束后刪除。
相關(guān)概念了解:
Combiner: 本地的reducer,運(yùn)行combiner使得map輸出結(jié)果更緊湊,可以減少寫到磁盤的數(shù)據(jù)和傳遞給reducer的數(shù)據(jù)??赏ㄟ^(guò)編程自定義(沒(méi)有定義默認(rèn)沒(méi)有)。適用場(chǎng)景:求和、次數(shù)等 (做 ‘’+‘’ 法的場(chǎng)景) 【如平均數(shù)等場(chǎng)景不適合用】。
Partitioner:分區(qū),按照一定規(guī)則,把數(shù)據(jù)分成不同的區(qū),Partitioner決定map task輸出的數(shù)據(jù)交由哪個(gè)reduce task處理, 分區(qū)規(guī)則可通過(guò)編程自定義,一般自定義一個(gè)partition對(duì)應(yīng)一個(gè)reduce task,默認(rèn)是按照key的hashcode進(jìn)行分區(qū)。注:默認(rèn)reduce task 設(shè)置為1,所以不執(zhí)行partition,執(zhí)行partition操作時(shí)會(huì)先判斷reduce task是否大于1 。
Spill:每次溢寫會(huì)生成一個(gè)溢寫文件(spill file),因此在map任務(wù)寫完其最后一個(gè)輸出記錄之后,會(huì)有多個(gè)溢寫文件。在Map 任務(wù)完成前,所有的spill file將會(huì)進(jìn)行歸并排序?yàn)橐粋€(gè)分區(qū)且有序的文件。這是一個(gè)多路歸并過(guò)程,最大歸并路數(shù)由默認(rèn)是10(參數(shù):mapreduce.task.io.sort.factor)。如果有定義combiner,且至少存在3個(gè)(參數(shù):mapreduce.map.combine.minspills )溢出文件時(shí),則combiner就會(huì)在輸出文件寫到磁盤之前再次運(yùn)行。當(dāng)spill 文件歸并完畢后,Map 將刪除所有的臨時(shí)spill 文件,通知appmaster, map task已經(jīng)完成。
Map階段壓縮:在將壓縮map輸出寫到磁盤的過(guò)程中對(duì)它進(jìn)行壓縮加快寫磁盤的速度、更加節(jié)約時(shí)間、減少傳給reducer的數(shù)據(jù)量。將mapreduce.map.output.compress設(shè)置為true(默認(rèn)為false),就可以啟用這個(gè)功能。使用的壓縮庫(kù)由參數(shù)mapreduce.map.output.compress.codec指定。注:此時(shí)建議優(yōu)先使用效率比較高的壓縮模式。
Reduce Shuffle階段
????Reducer是通過(guò)HTTP的方式得到輸出文件的分區(qū)。使用netty進(jìn)行數(shù)據(jù)傳輸(RPC協(xié)議),默認(rèn)情況下netty的工作線程數(shù)是處理器數(shù)的2倍。一個(gè)reduce task 對(duì)應(yīng)一個(gè)分區(qū)。
????
????在reduce端獲取所有的map輸出之前,Reduce端的線程會(huì)周期性的詢問(wèn)appmaster 關(guān)于map的輸出。App Master是知道m(xù)ap的輸出和host之間的關(guān)系。在reduce端獲取所有的map輸出之前,Reduce端的線程會(huì)周期性的詢問(wèn)master 關(guān)于map的輸出。Reduce并不會(huì)在獲取到map輸出之后就立即刪除hosts,因?yàn)閞educe有可能運(yùn)行失敗。相反,是等待appmaster的刪除消息來(lái)決定刪除host。
????
????當(dāng)map任務(wù)的完成數(shù)占總map任務(wù)的0.05(參數(shù):mapreduce.job.reduce.slowstart.completedmaps),reduce任務(wù)就開(kāi)始復(fù)制它的輸出,復(fù)制階段把Map輸出復(fù)制到Reducer的內(nèi)存或磁盤。復(fù)制線程的數(shù)量由mapreduce.reduce.shuffle.parallelcopies參數(shù)來(lái)決定,默認(rèn)是 5。
????
????如果map輸出相當(dāng)小,會(huì)被復(fù)制到reduce任務(wù)JVM的內(nèi)存(緩沖區(qū)大小由mapreduce.reduce.shuffle.input.buffer.percent屬性控制,指定用于此用途的堆空間的百分比,默認(rèn)為0.7),如果緩沖區(qū)空間不足,map輸出會(huì)被復(fù)制到磁盤。一旦內(nèi)存緩沖區(qū)達(dá)到閾值(參數(shù):mapreduce.reduce.shuffle.merge.percent,默認(rèn)0.66)或達(dá)到map的輸出閾值(參數(shù):mapreduce.reduce.merge.inmem.threshold,默認(rèn)1000)則合并后溢寫到磁盤中。如果指定combiner,則在合并期間運(yùn)行它已降低寫入磁盤的數(shù)據(jù)量。隨著磁盤上副本的增多,后臺(tái)線程會(huì)將它們合并為更大的,排序好的文件。注:為了合并,壓縮的map輸出都必須在內(nèi)存中解壓縮。
????復(fù)制完所有的map輸出后,reduce任務(wù)進(jìn)入歸并排序階段,這個(gè)階段將合并map的輸出,維持其順序排序。這是循環(huán)進(jìn)行的。目標(biāo)是合并最小數(shù)據(jù)量的文件以便最后一趟剛好滿足合并系數(shù)(參數(shù):mapreduce.task.io.sort.factor,默認(rèn)10)。
????
????因此,如果有40個(gè)文件(包括磁盤和內(nèi)存),不會(huì)在四趟中每趟合并10個(gè)文件而得到4個(gè)文件,再將4個(gè)文件合并到reduce。而是第一趟只合并4個(gè)文件,隨后的三塘合并10個(gè)文件。最后一趟中,4個(gè)已經(jīng)合并的文件和剩余的6個(gè)文件合計(jì)十個(gè)文件直接合并到reduce。
????
????這并沒(méi)有改變合并的次數(shù),它只是一個(gè)優(yōu)化措施,盡量減少寫到磁盤的數(shù)據(jù)量。因?yàn)樽詈笠惶丝偸侵苯雍喜⒌絩educe,沒(méi)有磁盤往返。
????至此,Shuffle階段結(jié)束。
Shuffle總結(jié)
????1)map task收集map()方法輸出的kv對(duì),放到內(nèi)存環(huán)形緩沖區(qū)中
????2)從內(nèi)存環(huán)形緩沖區(qū)不斷將文件經(jīng)過(guò)分區(qū)、排序、combine(可選)溢寫(spill)到本地磁盤
????3)多個(gè)溢出文件會(huì)歸并排序成大的spill file
????4)reduce task根據(jù)自己的分區(qū)號(hào),去各個(gè)map task機(jī)器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)
????5)reduce task會(huì)取到同一個(gè)分區(qū)的來(lái)自不同maptask的結(jié)果文件,reduce task會(huì)將這些文件再進(jìn)行歸并排序
????6)合并成大文件后,shuffle過(guò)程結(jié)束
MapReduce調(diào)優(yōu)
輸入階段:處理小文件問(wèn)題:
Map階段:
? ? 1)減少溢寫(spill)次數(shù)。
? ? 2)減少合并(merge)次數(shù)。
? ? 3)不影響業(yè)務(wù)邏輯前提下,設(shè)置combine。
? ? 4)啟用壓縮。
Reduce階段:
? ? 1)合理設(shè)置map和reduce數(shù)。
? ? 2)合理設(shè)置map、reduce共存。
? ? 3)規(guī)避使用reduce:因?yàn)閞educe在用于連接數(shù)據(jù)集的時(shí)候?qū)?huì)產(chǎn)生大量的網(wǎng)絡(luò)消耗。
? ? 4)合理設(shè)置reduce端的buffer:默認(rèn)情況下,數(shù)據(jù)達(dá)到一個(gè)閾值的時(shí)候,buffer中的數(shù)據(jù)就會(huì)寫入磁盤,然后reduce會(huì)從磁盤中獲得所有的數(shù)據(jù)。也就是說(shuō),buffer和reduce是沒(méi)有直接關(guān)聯(lián)的,中間多個(gè)一個(gè)寫磁盤->讀磁盤的過(guò)程,既然有這個(gè)弊端,那么就可以通過(guò)參數(shù)來(lái)配置,使得buffer中的一部分?jǐn)?shù)據(jù)可以直接輸送到reduce,從而減少IO開(kāi)銷。