本篇內(nèi)容主要講解“MapReduce大型集群上的簡化數(shù)據(jù)怎么處理”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“MapReduce大型集群上的簡化數(shù)據(jù)怎么處理”吧!
成都創(chuàng)新互聯(lián)公司主營彌渡網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,手機(jī)APP定制開發(fā),彌渡h5微信小程序開發(fā)搭建,彌渡網(wǎng)站營銷推廣歡迎彌渡等地區(qū)企業(yè)咨詢
MapReduce是一個編程模型,以及處理和生成大型數(shù)據(jù)集的一個相關(guān)實現(xiàn),它適合各種各樣的現(xiàn)實任務(wù)。用戶指定計算的map和reduce函數(shù)。底層運行系統(tǒng)自動地將大規(guī)模集群機(jī)器間的計算并行化,處理機(jī)器故障,以及調(diào)度機(jī)器間通信以充分利用網(wǎng)絡(luò)和磁盤。程序員會發(fā)現(xiàn)這個系統(tǒng)很好使用:在過去的去年中,超過一萬個不同的MapReduce程序已經(jīng)在Google內(nèi)部實現(xiàn),平均每天有十萬個MapReuce作業(yè)在Google集群上被執(zhí)行,每天總共處理20PB以上的數(shù)據(jù)。
在MapReduce開發(fā)之前,作者和其他許多的Google員工實現(xiàn)了數(shù)以百計的處理大量原始數(shù)據(jù)(如抓取到的文檔、Web請求日志等等)的專用計算方法,以計算各種導(dǎo)出的數(shù)據(jù),如倒排索引、Web文檔圖結(jié)構(gòu)的各種表示、每個host抓取到的頁面數(shù)的總結(jié)、某一天最頻繁的一組查詢。大多數(shù)這樣的計算在概念上是非常簡單的,然而它們的輸入數(shù)據(jù)量通常非常大。為了在合理的時間內(nèi)完成這些計算,它們必須分布到成百上千的機(jī)器上。如何并行化計算,分發(fā)數(shù)據(jù),以及處理故障,這些問題結(jié)合起來,往往會讓程序員使用大量復(fù)雜代碼來處理,而掩蓋了原本簡單的計算。
為了應(yīng)對這一復(fù)雜性,我們設(shè)計了一個新的抽象,它允許我們表達(dá)試圖執(zhí)行的簡單計算,但將并行化、容錯、數(shù)據(jù)分布和負(fù)載均衡等凌亂的細(xì)節(jié)隱藏到了庫中。這個抽象的靈感來源于出現(xiàn)在Lisp和許多其他函數(shù)式語言中的map和reduce原語。我們實現(xiàn)了大部分的計算,包括為輸入的每一個邏輯記錄應(yīng)用一個map操作以計算一組中間鍵值對,然后對所有共享同一個鍵的值應(yīng)用一個reduce操作以恰當(dāng)?shù)亟Y(jié)合導(dǎo)出的數(shù)據(jù)。此函數(shù)式模型支持用戶自定義map和reduce操作,使我們能非常容易地并行處理大型計算,和使用再執(zhí)行(reexecution)作為主要的容錯機(jī)制。
這項工作的主要貢獻(xiàn)就是一個簡單而強(qiáng)大的接口,它完成自動并行化、大規(guī)模分布計算,結(jié)合該接口的一個實現(xiàn)在大型商用PC集群上獲得了很高的性能表現(xiàn)。該編程模型還可以用于同一臺機(jī)器上多個核心間的并行計算。
第2部分描述了基本的編程模型并給出幾個例子。第3部分描述了MapReduce接口專門針對基于集群的計算環(huán)境的一個實現(xiàn)。第4部分描述了我們發(fā)現(xiàn)的這個編程模型的幾個很有用的改進(jìn)(refinements)。第5部分描述了對各種不同任務(wù)的實現(xiàn)的性能度量。第6部分探索了MapReduce在Google中的應(yīng)用,包括使用它作為重寫我們的生產(chǎn)索引系統(tǒng)的基礎(chǔ)的一些經(jīng)驗。第7部分討論了相關(guān)和未來的工作。
這個計算需要一組輸入鍵/值對,并生成一組輸出鍵/值對。MapReduce庫的使用者將計算表達(dá)為兩個函數(shù):map和reduce。
map,由用戶編寫,需要一對輸入并生成一組中間鍵/值對。MapReduce庫將所有與相同鍵值 I 相關(guān)聯(lián)的值組合到一起,并將它們傳遞給reduce函數(shù)。
Reduce函數(shù),同樣由用戶編寫,接受中間鍵 I 和這個鍵的一組值。它將這些值合并以形成一組可能更小的值。通常每次reduce調(diào)用只生成0個或1個輸出值。中間值靠一個迭代器提供給用戶的reduce函數(shù)。這使我們能夠處理大量太大以至于不能裝入內(nèi)存的值列表。
考慮一下在一個巨大的文檔集合中統(tǒng)計每個單詞出現(xiàn)次數(shù)的問題。使用者會編寫與下面?zhèn)未a類似的代碼:
map(String key, String value); // key: document name // value: document contents for each word w in value: EmitIntermediate(w, "1"); reduce(String key, String values); // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));
map函數(shù)發(fā)出每個單詞加一個相關(guān)的出現(xiàn)次數(shù)(count)(在這個簡單例子中僅為1)。reduce函數(shù)對發(fā)給一個單詞的所有數(shù)(count)求和。
此外,用戶編寫代碼將輸入和輸出文件名以及可選的調(diào)優(yōu)參數(shù)填入mapreduce規(guī)范對象中。然后調(diào)用MapReduce函數(shù),將它傳遞給規(guī)范對象。用戶的代碼與MapReduce庫(C++實現(xiàn))相連接。我們最初的MapReduce資料中有這個例子的完整程序【8】。
盡管前面的偽代碼是按照輸入輸出字符串形式編寫的,概念上由用戶提供的map和reduce函數(shù)是有相關(guān)類型的。
map (k1, v1) --> list(k2, v2) reduce (k2, list(v2)) --> list(v2)
也就是說,輸入鍵和值與輸出鍵和值來自不同的域。此外,中間鍵和值與輸出鍵和值來自同一個域。
MapRedue接口的許多不同實現(xiàn)都是可能的。正確的選擇取決于環(huán)境。例如,一種實現(xiàn)可能適合一個小型的共享內(nèi)存的機(jī)器,另外一種可能適合一個大型的NUMA多處理器,而另外一種可能適合一個更大的聯(lián)網(wǎng)計算機(jī)集合。在我們最初的文章發(fā)表以后,已經(jīng)發(fā)展出了很多MapReduce的開源實現(xiàn)【1, 2】,MapReduce在各種問題領(lǐng)域的適用性也得到了研究【7, 16】。
這一部分描述了我們的一種MapReduce實現(xiàn),其目標(biāo)是目前廣泛應(yīng)用在Google中的計算環(huán)境:由交換千兆以太網(wǎng)連接在一起的大型PC集群【4】。在該環(huán)境中,機(jī)器通常運行Linux系統(tǒng),有雙核 x86 處理器以及4-8GB內(nèi)存。個別機(jī)器擁有1GB/s的網(wǎng)絡(luò)帶寬,但每臺機(jī)器等分的帶寬遠(yuǎn)遠(yuǎn)低于1GB/s。一個計算集群包含了成千上萬臺機(jī)器,因此機(jī)器故障是很常見的。存儲由直接附在單獨機(jī)器上的廉價IDE磁盤提供。GFS,Google內(nèi)部開發(fā)的一個分布式文件系統(tǒng)【10】,用來管理存儲在這些磁盤上的數(shù)據(jù)。文件系統(tǒng)使用復(fù)制來提供不可靠的硬件之上的可用性與可靠性。
使用者提交 jobs 給調(diào)度系統(tǒng)。每個 job 包含一組任務(wù),且由調(diào)度程序映射(mapped)到集群間的一組可用的機(jī)器上。
通過自動將輸入數(shù)據(jù)分割為一個有M個分裂(splits)的組,map調(diào)用分布在多臺機(jī)器間。輸入分裂可以由不同的機(jī)器并行處理。reduce調(diào)用通過利用分割函數(shù)(比如,hash(key) mod R)將中間鍵空間劃分為R片進(jìn)行分布。分割數(shù)R和分割函數(shù)都是由使用者指定的。
圖1展示了在我們的實現(xiàn)中MapReduce操作的整體流程。當(dāng)用戶程序調(diào)用MapReduce函數(shù),以下順序行為將會發(fā)生(圖1中標(biāo)記的數(shù)字對應(yīng)下面列中的數(shù)字)。
用戶程序中的MapReduce庫首先將輸入文件劃分為M片,通常每片16~64MB(由用戶通過可選參數(shù)控制)。然后啟動集群中程序的多個副本。
這些程序副本中有一個特殊的master副本。其他副本則是由master分配了work的workers。集群中需要分配 M 個 map tasks 和 R 個 reduce tasks。master挑選閑置的workers且為每個worker分配一個 map task 或 reduce task。
分配了 map task 的一個worker讀取相應(yīng)輸入劃分的內(nèi)容。它從輸入數(shù)據(jù)中解析出鍵/值對并將每一對傳遞給用戶定義的map函數(shù)。由map函數(shù)產(chǎn)生的中間鍵/值對緩沖在內(nèi)存中。
緩沖區(qū)的鍵/值對定期地寫入本地磁盤,由partition函數(shù)劃分到 R 個區(qū)域中。這些本地磁盤上的緩沖對的位置被傳遞會master,它將負(fù)責(zé)轉(zhuǎn)發(fā)這些位置給 reduce workers。
當(dāng)一個 reduce worker 被 master 通知了這些位置后,它使用遠(yuǎn)程進(jìn)程調(diào)用來讀取來自map workers的本地磁盤中的緩沖數(shù)據(jù)。當(dāng)reduce worker讀取到了所有分區(qū)中的中間數(shù)據(jù)后,它按照中間鍵將其排序,從而使所有相同鍵的出現(xiàn)次數(shù)組合在了一起。排序是必要的,因為通常很多不同的鍵被map到了同一個reduce task。如果中間數(shù)據(jù)太大以至于不能放在內(nèi)存中,還需要使用一個外部的排序。
reduce worker對排序好的中間數(shù)據(jù)執(zhí)行迭代,對每個唯一的中間鍵,它將這個鍵以及相應(yīng)的一組中間值傳遞個用戶的 reduce 函數(shù)。reduce 函數(shù)的輸出被附加到這個reduce分區(qū)的最終輸出文件中。
當(dāng)所有的 map tasks 和 reduce tasks 都完成后,master喚醒用戶程序。在這一點上,用戶程序的MapReduce調(diào)用返回到用戶代碼處。
成功完成后,mapreduce執(zhí)行的輸出可以在R個輸出文件中獲得(每個reduce task一個,由用戶指定文件名)。通常,用戶無需將這R個輸出文件合并到一個文件中;他們通常將這些文件作為另一個MapReduce調(diào)用的輸入,或者在來自另外一個可以處理劃分到了多個文件中的輸入的分布式應(yīng)用程序中使用它們。
master中有多種數(shù)據(jù)結(jié)構(gòu)。對每一個map task和reduce task,它存儲了其狀態(tài)信息(限制,進(jìn)行,或完成)和worker機(jī)器的身份(對于非閑置tasks)。
master是map tasks傳播中間文件區(qū)域位置到reduce tasks的導(dǎo)管。因此,對于每個完成了的map task,master存儲由這個map task生成的R個中間文件區(qū)域的位置和大小。master在map tasks稱后接收到這些位置和大小信息的更新。這些信息將逐步推送到正在進(jìn)行reduce tasks的workers中。
由于MapReduce庫旨在幫助利用成百上千的機(jī)器來處理大量數(shù)據(jù),它必須優(yōu)雅地容忍機(jī)器故障。
master會定期地ping每一個worker。如果在一定時間內(nèi)沒有收到來自某臺worker的響應(yīng),master將這個worker標(biāo)記為故障。任何由worker完成的map tasks都被重置為初始閑置狀態(tài),因而可以在其他的workers中調(diào)度。同樣,在故障worker上的任何正在進(jìn)行的map task和reduce task也被重置為閑置狀態(tài)以便進(jìn)行重新調(diào)度。
故障worker上已完成的map task需要重新執(zhí)行,因為它們的輸出存儲在了故障機(jī)器的本地磁盤中導(dǎo)致無法訪問。已完成的reduce tasks無需重新執(zhí)行,因為它們的輸出存儲在了全局文件系統(tǒng)中。
當(dāng)一個map task首先由worker A執(zhí)行然后又由worker B執(zhí)行(因為A發(fā)生了故障),所有執(zhí)行reduce task的workers將被通知重新執(zhí)行。任何還未從worker A讀取數(shù)據(jù)的reduce task將從worker B讀取數(shù)據(jù)。
MapReduce適應(yīng)于大規(guī)模的worker故障。例如,在一個MapReduce操作中,在運行中的集群上的網(wǎng)絡(luò)維護(hù)導(dǎo)致了一組80臺機(jī)器在幾分鐘內(nèi)無法到達(dá)。MapReduce master簡單地重新執(zhí)行無法到達(dá)的worker機(jī)器的工作且繼續(xù)前進(jìn),最終完成MapReduce操作。
當(dāng)用戶提供的map和reduce操作是它們他們的輸入值的特定函數(shù)時,我們的分布式實現(xiàn)生成的輸出將與整個程序的無錯順序執(zhí)行生成的輸出相同。
我們依靠map和reduce任務(wù)輸出的原子的提交來實現(xiàn)這一性質(zhì)。每個正在進(jìn)行的task將其輸出寫入私有臨時文件中。一個reduce task生成一個這樣的文件,map task生成R個這樣的文件(每個reduce task一個)。當(dāng)一個map task完成后,worker發(fā)送一條消息給master,這條消息中包含了R個臨時文件的名字。如果master接收到了來自一個已完成的map task的完成消息,它將忽略這條消息。否則,它將這R個文件名記錄到master數(shù)據(jù)結(jié)構(gòu)中。
當(dāng)一個reduce task完成后,reduce worker自動重命名其臨時輸出文件為最終輸出文件。如果同一個reduce task在多臺機(jī)器上執(zhí)行,同一個最終輸出文件的多個重命名調(diào)用將被執(zhí)行。我們依靠由底層文件系統(tǒng)提供的原子的重命名操作來保證最終文件系統(tǒng)狀態(tài)僅包含來自一個reduce任務(wù)執(zhí)行生成的數(shù)據(jù)。
絕大多數(shù)的map和reduce操作是確定的,事實上,我們的語義等價于這種情況下的一次順序執(zhí)行,這使得程序員能夠非常容易地推斷程序的行為。當(dāng) map 和/或 reduce 操作不確定時,我們提供了較弱但仍然合理的語義。在不確定操作存在時,一個特定reduce task R1的輸出等價于由非確定性程序的一次順序執(zhí)行R1生成的輸出。然而,另一個不同的reduce task R2的輸出可能對應(yīng)該非確定性程序的另一個不同順序執(zhí)行R2的輸出。
考慮map task M和reduce task R1和R2。令 e(Ri) 作為作為R1的執(zhí)行(這確實是一個這樣的執(zhí)行)。較弱的語義出現(xiàn)因為 e(R1) 可能讀取了M的一次執(zhí)行生成的輸出,e(R2)可能讀取了M的另一次執(zhí)行生成的輸出。
在我們的計算環(huán)境中,網(wǎng)絡(luò)帶寬是一個相對稀缺的資源。我們靠充分利用輸入數(shù)據(jù)(由GFS管理【10】)存儲在組成集群的機(jī)器的本地磁盤中這一事實來節(jié)省網(wǎng)絡(luò)帶寬。GFS將每個文件分成64MB的塊且在不同機(jī)器上存儲了每個塊的多個副本(通常3個)。MapReeuce master考慮每個輸入文件的位置信息且試圖調(diào)度一臺含有相應(yīng)輸入數(shù)據(jù)的機(jī)器上的一個map task。如果失敗,它將試圖調(diào)度與該任務(wù)的輸入的復(fù)制品相鄰的一個map task(例如,同一網(wǎng)絡(luò)交換機(jī)中包含相同數(shù)據(jù)的兩臺機(jī)器)。當(dāng)在一個集群的 workers 重要部分運行大型MapReduce操作時,大多數(shù)輸入數(shù)據(jù)都是本地讀取的,并不消耗網(wǎng)絡(luò)帶寬。
我們將map階段細(xì)分為M個片段,reduce階段細(xì)分為R個片段,如前所述。理想情況下,M和R應(yīng)該遠(yuǎn)高于worker機(jī)器的數(shù)量。每個worker執(zhí)行多個不同tasks改善了負(fù)載均衡,且當(dāng)一個worker故障后加快了恢復(fù)速度:它完成的多個map tasks可以分布到所有其他worker機(jī)器上重新執(zhí)行。
由于master必須做O(M+R)此調(diào)度決策和在內(nèi)存中保持O(M*R)個狀態(tài),如前所述,在我們的實現(xiàn)中M和R的數(shù)量大小是有實際界限的。(然而,內(nèi)存的使用量很小。O(M*R)個狀態(tài)中大約包含每個map/reduce task對一字節(jié)的數(shù)據(jù)。)
此外,R通常受到用戶限制,因為每個reduce task的輸出最終保存在一個單獨的輸出文件中。在實踐中,我們傾向于選擇M因而每個獨立task大約有16MB到64MB的輸入數(shù)據(jù)(因而之前所述的局部優(yōu)化達(dá)到最搞笑),且我們讓R是我們希望使用的機(jī)器數(shù)量的一個小的倍數(shù)。我們通常以M=200000, R=5000,使用2000臺worker機(jī)器執(zhí)行MapReduce。
延長MapReduce操作總時間的一個普遍原因是一個掉隊者(straggler),也就是說,在這個計算中有一臺機(jī)器花了異常長的時間來完成最后幾個map或reduce tasks。掉隊者會以一大堆的理由出現(xiàn)。比如說,一臺擁有壞磁盤的機(jī)器可能經(jīng)歷頻繁的矯正錯誤從而使讀取性能從30MB/s降低到了1MB/s。集群調(diào)度系統(tǒng)可能在這個機(jī)器上調(diào)度了其他任務(wù),導(dǎo)致它更慢地執(zhí)行MapReduce代碼,由于競爭CPU、內(nèi)存、本地磁盤或網(wǎng)絡(luò)帶寬等資源。我們經(jīng)歷的一個最近的問題是機(jī)器初始化代碼中的一個bug導(dǎo)致處理器緩存失效:受影響的機(jī)器計算速度放慢了100倍。
我們有一個通用機(jī)制來減輕掉隊者問題。當(dāng)一個MapReduce操作接近完成時,master將調(diào)度還在進(jìn)行的任務(wù)的備份執(zhí)行。無論是原始或者備份執(zhí)行完成,這個任務(wù)都被標(biāo)記為完成。我們調(diào)整了這個機(jī)制,因而它增加了該計算的計算資源的使用,但不超過幾個百分點。我們發(fā)現(xiàn)它大大降低了完成大型MapReduce操作的時間。作為一個例子,當(dāng)沒有備份task機(jī)制時,在5.3部分描述的排序程序多花了44%的時間完成。
雖然由簡單編寫的map和reduce函數(shù)提供的基本功能已足以滿足大多數(shù)需求,我們發(fā)現(xiàn)了一些有用的擴(kuò)展。這包括:
用戶指定的分區(qū)(partition)函數(shù)來決定如何將中間鍵值對映射到R個reduce碎片;
排序保證:我們的實現(xiàn)保證這R個reduce分區(qū)中的每個,中間鍵值對都按鍵的升序處理;
用戶指定的結(jié)合(combiner)函數(shù)的作用是,在同一個map task內(nèi),對按照同一個鍵生成的中間值進(jìn)行局部結(jié)合,以減少必須在網(wǎng)絡(luò)間傳輸?shù)闹虚g數(shù)據(jù)數(shù)量;
自定義輸入輸出類型,為了讀新的輸入格式和生成新的輸出格式;
在單機(jī)上執(zhí)行簡單debug和小規(guī)模測試的一種方式。
在【8】中有對這幾項的詳細(xì)討論。
在此部分,我們利用大型集群上的兩個計算來測量MapReduce的性能表現(xiàn)。一個計算通過搜索大約1TB的數(shù)據(jù)來找到一個特定的模式。另一個計算對大約1TB的數(shù)據(jù)進(jìn)行排序。這兩個程序代表由MapReduce用戶編寫的真正程序的一個大的子集-----程序的一個類用來從一個表示(representation)向另一個表示shuffle數(shù)據(jù),另一個類從大數(shù)據(jù)集中提取小部分關(guān)注的數(shù)據(jù)。
所有程序都在一個擁有大約1800臺機(jī)器的集群上執(zhí)行。每臺機(jī)器擁有兩個支持超線程的2GHz的Intel Xeon處理器,4GB內(nèi)存,兩個160GB的IDE磁盤,和千兆以太網(wǎng)接入。這些機(jī)器被安排在一個二級樹形的交換網(wǎng)絡(luò)中,該網(wǎng)絡(luò)根部大約有100~200Gbps的聚合帶寬。所有機(jī)器都在同一個托管設(shè)施中,因此任何一對機(jī)器間的往返通信時間不超過1毫秒。
雖然有4GB內(nèi)存,但是大約1~1.5GB保留給了運行在集群上的其他任務(wù)。這些程序在一個周末的下午執(zhí)行,此時CPUs,磁盤和網(wǎng)絡(luò)帶寬基本都空閑。
grep程序掃描了10^10個100字節(jié)的記錄,搜索一個相對稀有的三字符模式串(該模式串大約出現(xiàn)在92337個記錄中)。輸入被劃分為了大約64MB大小的片(M=15000),整個輸出都放在了一個文件中(R=1)。
圖2展示了計算隨時間推移的進(jìn)展。Y軸顯示了輸入數(shù)據(jù)的掃描速率。隨著安排到MapReduce計算的機(jī)器越來越多,速率也在逐步提升,當(dāng)安排了1764個workers時速度達(dá)到峰值30GB/s以上。map任務(wù)結(jié)束后,速率來時下降且在大約80秒時到達(dá)0。整個計算從開始到結(jié)束大約花費了150秒。這包括1分鐘的啟動消耗。這個消耗來自向所有workers機(jī)器傳播程序、延遲與GFS的交互以開啟一組1000個輸入文件,和獲取局部優(yōu)化所需的信息。
到此,相信大家對“MapReduce大型集群上的簡化數(shù)據(jù)怎么處理”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!