1,什么是mapreduce ?
成都創(chuàng)新互聯(lián)主營淮安網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,重慶APP開發(fā)公司,淮安h5小程序設(shè)計(jì)搭建,淮安網(wǎng)站營銷推廣歡迎淮安等地區(qū)企業(yè)咨詢
Mapreduce是一種編程模型,是一種編程方法,抽象理論。
hadoop要分布式包括兩部分,一是分布式文件系統(tǒng)hdfs,一部是分布式計(jì)算框,就是mapreduce,缺一不可,也就是說,可以通過mapreduce很容易在hadoop平臺(tái)上進(jìn)行分布式的計(jì)算編程。MR由兩個(gè)階段組成,map和reduce,用戶只需要實(shí)現(xiàn)map()和reduce()兩個(gè)函數(shù),即可實(shí)現(xiàn)分布式計(jì)算
2,mapreduce工作原理和執(zhí)行步驟見下:
Shuffle的本意是洗牌、混亂的意思,類似于java中的Collections.shuffle(List)方法,它會(huì)隨機(jī)地打亂參數(shù)list里的元素順序。MapReduce中的Shuffle過程。
所謂Shuffle過程可以大致的理解成:怎樣把map task的輸出結(jié)果有效地傳送到reduce輸入端。也可以這樣理解, Shuffle描述著數(shù)據(jù)從map task輸出到reduce task輸入的這段過程。
上圖表示的是Shuffle的整個(gè)過程。在Hadoop這樣的集群環(huán)境中,大部分map task與reduce task的執(zhí)行是在不同的節(jié)點(diǎn)上。當(dāng)然很多情況下Reduce執(zhí)行時(shí)需要跨節(jié)點(diǎn)去讀取其它節(jié)點(diǎn)上的map task結(jié)果,并存儲(chǔ)到本地。如果集群正在運(yùn)行的job有很多,那么task的正常執(zhí)行對集群內(nèi)部的網(wǎng)絡(luò)資源消耗會(huì)很嚴(yán)重。這種網(wǎng)絡(luò)消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。另外在節(jié)點(diǎn)內(nèi),相比于內(nèi)存,磁盤IO對job完成時(shí)間的影響也是比較大的,spark 就是基于這點(diǎn)對hadoop做出了改進(jìn),將map和reduce的所有任務(wù)都在內(nèi)存中進(jìn)行,并且中間接過都保存在內(nèi)存中,從而比hadoop的速度要快100倍以上。從最基本的要求來說,我們對Shuffle過程希望做到:
完整地從map task端讀取數(shù)據(jù)到reduce 端。
在跨節(jié)點(diǎn)讀取數(shù)據(jù)時(shí),盡可能地減少對帶寬的不必要消耗。
減少磁盤IO對task執(zhí)行的影響。
Shuffle實(shí)際上包括map端和reduce端的兩個(gè)過程,在map端中我們稱之為前半段,在reduce端我們稱之為后半段。
Shuffle前半段過程主要包括:
1、split過程
2、partition過程:partition是分割map每個(gè)節(jié)點(diǎn)的結(jié)果,按照key分別映射給不同的reduce,也是可以自定義的。
3、溢寫過程
4、Merge過程
每個(gè)map task都有一個(gè)內(nèi)存緩沖區(qū),存儲(chǔ)著map的輸出結(jié)果,當(dāng)緩沖區(qū)快滿的時(shí)候需要將緩沖區(qū)的數(shù)據(jù)以一個(gè)臨時(shí)文件的方式存放到磁盤,當(dāng)整個(gè)map task結(jié)束后再對磁盤中這個(gè)map task產(chǎn)生的所有臨時(shí)文件做合并,生成最終的正式輸出文件,然后等待reduce task來讀取數(shù)據(jù)。 下面可以將Shuffle過程主要分為四個(gè)步驟:(結(jié)合WordCount的例子來進(jìn)行說明)
1、split過程:在map task執(zhí)行時(shí),它的輸入數(shù)據(jù)來源于HDFS的block,當(dāng)然在MapReduce概念中,map task只讀取split。Split與block的對應(yīng)關(guān)系可能是多對一,默認(rèn)是一對一。在WordCount例子里,假設(shè)map的輸入數(shù)據(jù)都是像“aaa”這樣的字符串。
2、partiton過程:在經(jīng)過mapper的運(yùn)行后,我們得知mapper的輸出是這樣一個(gè)key/value對: key是“aaa”, value是數(shù)值1。因?yàn)楫?dāng)前map端只做加1的操作,在reduce task里才去合并結(jié)果集。前面我們知道這個(gè)job有3個(gè)reduce task,到底當(dāng)前的“aaa”應(yīng)該交由哪個(gè)reduce去做呢,這個(gè)主要有partition來決定。下面就說明如何決定由哪個(gè)reduce去做這個(gè)事情。
MapReduce提供Partitioner接口,它的作用就是根據(jù)key或value及reduce的數(shù)量來決定當(dāng)前的這對輸出數(shù)據(jù)最終應(yīng)該交由哪個(gè)reduce task處理。默認(rèn)是對key hash后再以reduce task數(shù)量取模。默認(rèn)的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以自己重新實(shí)現(xiàn)partition的接口并設(shè)置到j(luò)ob上即可。
在我們的例子中,“aaa”經(jīng)過Partitioner后返回0,也就是這對值應(yīng)當(dāng)交由第一個(gè)reducer來處理。接下來,需要將數(shù)據(jù)寫入內(nèi)存緩沖區(qū)中,緩沖區(qū)的作用是批量收集map結(jié)果,減少磁盤IO的影響。我們的key/value對以及Partition的結(jié)果都會(huì)被寫入緩沖區(qū)。當(dāng)然寫入之前,key與value值都會(huì)被序列化成字節(jié)數(shù)組。
3、溢寫過程:這個(gè)內(nèi)存緩沖區(qū)是有大小限制的,默認(rèn)是100MB,也可以通過設(shè)置配置文件中的參數(shù)mapreduce.task.io.sort.mb來設(shè)置。當(dāng)map task的輸出結(jié)果很多時(shí),就可能會(huì)撐爆內(nèi)存,所以需要在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時(shí)寫入磁盤,然后重新利用這塊緩沖區(qū)。這個(gè)從內(nèi)存往磁盤寫數(shù)據(jù)的過程被稱為Spill,中文可譯為溢寫,字面意思很直觀。這個(gè)溢寫是由單獨(dú)線程來完成,不影響往緩沖區(qū)寫map結(jié)果的線程。溢寫線程啟動(dòng)時(shí)不應(yīng)該阻止map的結(jié)果輸出,所以整個(gè)緩沖區(qū)有個(gè)溢寫的比例spill.percent。這個(gè)比例默認(rèn)是0.8,也就是當(dāng)緩沖區(qū)的數(shù)據(jù)已經(jīng)達(dá)到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動(dòng),鎖定這80MB的內(nèi)存,執(zhí)行溢寫過程。Map task的輸出結(jié)果還可以往剩下的20MB內(nèi)存中寫,互不影響。
當(dāng)溢寫線程啟動(dòng)后,需要對這80MB空間內(nèi)的key做排序(Sort)。排序是MapReduce模型默認(rèn)的行為,這里的排序也是對序列化的字節(jié)做的排序。
在這里我們可以想想,因?yàn)閙ap task的輸出是需要發(fā)送到不同的reduce端去,而內(nèi)存緩沖區(qū)沒有對將發(fā)送到相同reduce端的數(shù)據(jù)做合并,那么這種合并應(yīng)該是體現(xiàn)是磁盤文件中的。從官方圖上也可以看到寫到磁盤中的溢寫文件是對不同的reduce端的數(shù)值做過合并。所以溢寫過程一個(gè)很重要的細(xì)節(jié)在于,如果有很多個(gè)key/value對需要發(fā)送到某個(gè)reduce端去,那么需要將這些key/value值拼接到一塊,減少與partition相關(guān)的索引記錄。
在針對每個(gè)reduce端而合并數(shù)據(jù)時(shí),有些數(shù)據(jù)可能像這樣:“aaa”/1, “aaa”/1。對于WordCount例子,就是簡單地統(tǒng)計(jì)單詞出現(xiàn)的次數(shù),如果在同一個(gè)map task的結(jié)果中有很多個(gè)像“aaa”一樣出現(xiàn)多次的key,我們就應(yīng)該把它們的值合并到一塊,這個(gè)過程叫reduce也叫combine。但MapReduce的術(shù)語中,reduce只指reduce端執(zhí)行從多個(gè)map task取數(shù)據(jù)做計(jì)算的過程。除reduce外,非正式地合并數(shù)據(jù)只能算做combine了。其實(shí)大家知道的,MapReduce中將Combiner等同于Reducer。
如果client設(shè)置過Combiner,那么現(xiàn)在就是使用Combiner的時(shí)候了。將有相同key的key/value對的value加起來,減少溢寫到磁盤的數(shù)據(jù)量。Combiner會(huì)優(yōu)化MapReduce的中間結(jié)果,所以它在整個(gè)模型中會(huì)多次使用。那么哪些場景才能使用Combiner呢?從這里分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計(jì)算結(jié)果。所以從我的想法來看,Combiner只應(yīng)該用于那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結(jié)果的場景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它對job執(zhí)行效率有幫助,反之會(huì)影響reduce的最終結(jié)果。
4、merge過程:merge是將多個(gè)溢寫文件合并到一個(gè)文件。每次溢寫會(huì)在磁盤上生成一個(gè)溢寫文件,如果map的輸出結(jié)果真的很大,有多次這樣的溢寫發(fā)生,磁盤上相應(yīng)的就會(huì)有多個(gè)溢寫文件存在。當(dāng)map task真正完成時(shí),內(nèi)存緩沖區(qū)中的數(shù)據(jù)也全部溢寫到磁盤中形成一個(gè)溢寫文件。最終磁盤中會(huì)至少有一個(gè)這樣的溢寫文件存在(如果map的輸出結(jié)果很少,當(dāng)map執(zhí)行完成時(shí),只會(huì)產(chǎn)生一個(gè)溢寫文件),因?yàn)樽罱K的文件只有一個(gè),所以需要將這些溢寫文件歸并到一起,這個(gè)過程就叫做Merge。Merge是怎樣的?如前面的例子,“aaa”從某個(gè)map task讀取過來時(shí)值是5,從另外一個(gè)map 讀取時(shí)值是8,因?yàn)樗鼈冇邢嗤膋ey,所以得merge成group。什么是group。對于“aaa”就是像這樣的:{“aaa”, [5, 8, 2, …]},數(shù)組中的值就是從不同溢寫文件中讀取出來的,然后再把這些值加起來。請注意,因?yàn)閙erge是將多個(gè)溢寫文件合并到一個(gè)文件,所以可能也有相同的key存在,在這個(gè)過程中如果client設(shè)置過Combiner,也會(huì)使用Combiner來合并相同的key。
至此,map端的所有工作都已結(jié)束,最終生成的這個(gè)文件也存放在TaskTracker夠得著的某個(gè)本地目錄內(nèi)。每個(gè)reduce task不斷地通過RPC從JobTracker那里獲取map task是否完成的信息,如果reduce task得到通知,獲知某臺(tái)TaskTracker上的map task執(zhí)行完成,Shuffle的后半段過程開始啟動(dòng)。
簡單地說,reduce task在執(zhí)行之前的工作就是不斷地拉取當(dāng)前job里每個(gè)map task的最終結(jié)果,然后對從不同地方拉取過來的數(shù)據(jù)不斷地做merge,也最終形成一個(gè)文件作為reduce task的輸入文件。
Shuffle在reduce端的過程也能用三點(diǎn)來概括。當(dāng)前reduce copy數(shù)據(jù)的前提是它要從JobTracker獲得有哪些map task已執(zhí)行結(jié)束。Reducer真正運(yùn)行之前,所有的時(shí)間都是在拉取數(shù)據(jù),做merge,且不斷重復(fù)地在做。如前面的方式一樣,下面我也分段地描述reduce 端的Shuffle細(xì)節(jié):
1. Copy過程,簡單地拉取數(shù)據(jù)。Reduce進(jìn)程啟動(dòng)一些數(shù)據(jù)copy線程(Fetcher),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。因?yàn)閙ap task早已結(jié)束,這些文件就歸TaskTracker管理在本地磁盤中。
2. Merge階段。這里的merge如map端的merge動(dòng)作,只是數(shù)組中存放的是不同map端copy來的數(shù)值。Copy過來的數(shù)據(jù)會(huì)先放入內(nèi)存緩沖區(qū)中,這里的緩沖區(qū)大小要比map端的更為靈活,它基于JVM的heap size設(shè)置,因?yàn)镾huffle階段Reducer不運(yùn)行,所以應(yīng)該把絕大部分的內(nèi)存都給Shuffle用。這里需要強(qiáng)調(diào)的是,merge有三種形式:1)內(nèi)存到內(nèi)存 2)內(nèi)存到磁盤 3)磁盤到磁盤。默認(rèn)情況下第一種形式不啟用,讓人比較困惑,是吧。當(dāng)內(nèi)存中的數(shù)據(jù)量到達(dá)一定閾值,就啟動(dòng)內(nèi)存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個(gè)過程中如果你設(shè)置有Combiner,也是會(huì)啟用的,然后在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運(yùn)行,直到?jīng)]有map端的數(shù)據(jù)時(shí)才結(jié)束,然后啟動(dòng)第三種磁盤到磁盤的merge方式生成最終的那個(gè)文件。
3. Reducer的輸入文件。不斷地merge后,最后會(huì)生成一個(gè)“最終文件”。為什么加引號?因?yàn)檫@個(gè)文件可能存在于磁盤上,也可能存在于內(nèi)存中。對我們來說,當(dāng)然希望它存放于內(nèi)存中,直接作為Reducer的輸入,但默認(rèn)情況下,這個(gè)文件是存放于磁盤中的。當(dāng)Reducer的輸入文件已定,整個(gè)Shuffle才最終結(jié)束。然后就是Reducer執(zhí)行,把結(jié)果放到HDFS上。
#######################總結(jié)############################
Shuffle產(chǎn)生的意義是什么?x
Shuffle過程的期望可以有:
完整地從map task端拉取數(shù)據(jù)到reduce 端。
在跨節(jié)點(diǎn)拉取數(shù)據(jù)時(shí),盡可能地減少對帶寬的不必要消耗。
減少磁盤IO對task執(zhí)行的影響。
每個(gè)map task都有一個(gè)內(nèi)存緩沖區(qū),存儲(chǔ)著map的輸出結(jié)果,當(dāng)緩沖區(qū)快滿的時(shí)候需要將緩沖區(qū)的數(shù)據(jù)該如何處理?
每個(gè)map task都有一個(gè)內(nèi)存緩沖區(qū),存儲(chǔ)著map的輸出結(jié)果,當(dāng)緩沖區(qū)快滿的時(shí)候需要將緩沖區(qū)的數(shù)據(jù)以一個(gè)臨時(shí)文件的方式存放到磁盤,當(dāng)整個(gè)map task結(jié)束后再對磁盤中這個(gè)map task產(chǎn)生的所有臨時(shí)文件做合并,生成最終的正式輸出文件,然后等待reduce task來拉數(shù)據(jù)。
MapReduce提供Partitioner接口,它的作用是什么?
MapReduce提供Partitioner接口,它的作用就是根據(jù)key或value及reduce的數(shù)量來決定當(dāng)前的這對輸出數(shù)據(jù)最終應(yīng)該交由哪個(gè)reduce task處理。默認(rèn)對key hash后再以reduce task數(shù)量取模。默認(rèn)的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以訂制并設(shè)置到j(luò)ob上。
什么是溢寫?
在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時(shí)寫入磁盤,然后重新利用這塊緩沖區(qū)。這個(gè)從內(nèi)存往磁盤寫數(shù)據(jù)的過程被稱為Spill,中文可譯為溢寫。
溢寫是為什么不影響往緩沖區(qū)寫map結(jié)果的線程?
溢寫線程啟動(dòng)時(shí)不應(yīng)該阻止map的結(jié)果輸出,所以整個(gè)緩沖區(qū)有個(gè)溢寫的比例spill.percent。這個(gè)比例默認(rèn)是0.8,也就是當(dāng)緩沖區(qū)的數(shù)據(jù)已經(jīng)達(dá)到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動(dòng),鎖定這80MB的內(nèi)存,執(zhí)行溢寫過程。Map task的輸出結(jié)果還可以往剩下的20MB內(nèi)存中寫,互不影響。
當(dāng)溢寫線程啟動(dòng)后,需要對這80MB空間內(nèi)的key做排序(Sort)。排序是MapReduce模型默認(rèn)的行為,這里的排序也是對誰的排序?
當(dāng)溢寫線程啟動(dòng)后,需要對這80MB空間內(nèi)的key做排序(Sort)。排序是MapReduce模型默認(rèn)的行為,這里的排序也是對序列化的字節(jié)做的排序。
溢寫過程中如果有很多個(gè)key/value對需要發(fā)送到某個(gè)reduce端去,那么如何處理這些key/value值?
如果有很多個(gè)key/value對需要發(fā)送到某個(gè)reduce端去,那么需要將這些key/value值拼接到一塊,減少與partition相關(guān)的索引記錄。
哪些場景才能使用Combiner呢?
Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計(jì)算結(jié)果。所以從我的想法來看,Combiner只應(yīng)該用于那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結(jié)果的場景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它對job執(zhí)行效率有幫助,反之會(huì)影響reduce的最終結(jié)果。
Merge的作用是什么?
最終磁盤中會(huì)至少有一個(gè)這樣的溢寫文件存在(如果map的輸出結(jié)果很少,當(dāng)map執(zhí)行完成時(shí),只會(huì)產(chǎn)生一個(gè)溢寫文件),因?yàn)樽罱K的文件只有一個(gè),所以需要將這些溢寫文件歸并到一起,這個(gè)過程就叫做Merge
每個(gè)reduce task不斷的通過什么協(xié)議從JobTracker那里獲取map task是否完成的信息?
每個(gè)reduce task不斷地通過RPC從JobTracker那里獲取map task是否完成的信息
reduce中Copy過程采用是什么協(xié)議?
Copy過程,簡單地拉取數(shù)據(jù)。Reduce進(jìn)程啟動(dòng)一些數(shù)據(jù)copy線程(Fetcher),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。
reduce中merge過程有幾種方式?
merge有三種形式:1)內(nèi)存到內(nèi)存 2)內(nèi)存到磁盤 3)磁盤到磁盤。默認(rèn)情況下第一種形式不啟用,讓人比較困惑,是吧。當(dāng)內(nèi)存中的數(shù)據(jù)量到達(dá)一定閾值,就啟動(dòng)內(nèi)存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個(gè)過程中如果你設(shè)置有Combiner,也是會(huì)啟用的,然后在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運(yùn)行,直到?jīng)]有map端的數(shù)據(jù)時(shí)才結(jié)束,然后啟動(dòng)第三種磁盤到磁盤的merge方式生成最終的那個(gè)文件。