這篇文章主要講解了“Hadoop和Spark的Shuffle過(guò)程有什么不同”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Hadoop和Spark的Shuffle過(guò)程有什么不同”吧!
愛(ài)民ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書未來(lái)市場(chǎng)廣闊!成為創(chuàng)新互聯(lián)的ssl證書銷售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:028-86922220(備注:SSL證書合作)期待與您的合作!
一、前言
對(duì)于基于MapReduce編程范式的分布式計(jì)算來(lái)說(shuō),本質(zhì)上而言,就是在計(jì)算數(shù)據(jù)的交、并、差、聚合、排序等過(guò)程。而分布式計(jì)算分而治之的思想,讓每個(gè)節(jié)點(diǎn)只計(jì)算部分?jǐn)?shù)據(jù),也就是只處理一個(gè)分片,那么要想求得某個(gè)key對(duì)應(yīng)的全量數(shù)據(jù),那就必須把相同key的數(shù)據(jù)匯集到同一個(gè)Reduce任務(wù)節(jié)點(diǎn)來(lái)處理,那么Mapreduce范式定義了一個(gè)叫做Shuffle的過(guò)程來(lái)實(shí)現(xiàn)這個(gè)效果。
二、編寫本文的目的
本文旨在剖析Hadoop和Spark的Shuffle過(guò)程,并對(duì)比兩者Shuffle的差異。
三、Hadoop的Shuffle過(guò)程
Shuffle描述的是數(shù)據(jù)從Map端到Reduce端的過(guò)程,大致分為排序(sort)、溢寫(spill)、合并(merge)、拉取拷貝(Copy)、合并排序(merge sort)這幾個(gè)過(guò)程,大體流程如下:
![image](/upload/otherpic61/e4ccedfb6ccaaa0d3c0ad5b3b7ab83d96dd9fed2.png)
上圖的Map的輸出的文件被分片為紅綠藍(lán)三個(gè)分片,這個(gè)分片的就是根據(jù)Key為條件來(lái)分片的,分片算法可以自己實(shí)現(xiàn),例如Hash、Range等,最終Reduce任務(wù)只拉取對(duì)應(yīng)顏色的數(shù)據(jù)來(lái)進(jìn)行處理,就實(shí)現(xiàn)把相同的Key拉取到相同的Reduce節(jié)點(diǎn)處理的功能。下面分開(kāi)來(lái)說(shuō)Shuffle的的各個(gè)過(guò)程。
Map端做了下圖所示的操作:
1、Map端sort
Map端的輸出數(shù)據(jù),先寫環(huán)形緩存區(qū)kvbuffer,當(dāng)環(huán)形緩沖區(qū)到達(dá)一個(gè)閥值(可以通過(guò)配置文件設(shè)置,默認(rèn)80),便要開(kāi)始溢寫,但溢寫之前會(huì)有一個(gè)sort操作,這個(gè)sort操作先把Kvbuffer中的數(shù)據(jù)按照partition值和key兩個(gè)關(guān)鍵字來(lái)排序,移動(dòng)的只是索引數(shù)據(jù),排序結(jié)果是Kvmeta中數(shù)據(jù)按照partition為單位聚集在一起,同一partition內(nèi)的按照key有序。
2、spill(溢寫) 當(dāng)排序完成,便開(kāi)始把數(shù)據(jù)刷到磁盤,刷磁盤的過(guò)程以分區(qū)為單位,一個(gè)分區(qū)寫完,寫下一個(gè)分區(qū),分區(qū)內(nèi)數(shù)據(jù)有序,最終實(shí)際上會(huì)多次溢寫,然后生成多個(gè)文件 3、merge(合并) spill會(huì)生成多個(gè)小文件,對(duì)于Reduce端拉取數(shù)據(jù)是相當(dāng)?shù)托У?,那么這時(shí)候就有了merge的過(guò)程,合并的過(guò)程也是同分片的合并成一個(gè)片段(segment),最終所有的segment組裝成一個(gè)最終文件,那么合并過(guò)程就完成了,如下圖所示
至此,Map的操作就已經(jīng)完成,Reduce端操作即將登場(chǎng)
Reduce操作
總體過(guò)程如下圖的紅框處:
![image](/upload/otherpic61/71a52ed4799d3dbbde4552028f3aea05bc1c98c0.png) 1、拉取拷貝(fetch copy)
Reduce任務(wù)通過(guò)向各個(gè)Map任務(wù)拉取對(duì)應(yīng)分片。這個(gè)過(guò)程都是以Http協(xié)議完成,每個(gè)Map節(jié)點(diǎn)都會(huì)啟動(dòng)一個(gè)常駐的HTTP server服務(wù),Reduce節(jié)點(diǎn)會(huì)請(qǐng)求這個(gè)Http Server拉取數(shù)據(jù),這個(gè)過(guò)程完全通過(guò)網(wǎng)絡(luò)傳輸,所以是一個(gè)非常重量級(jí)的操作。
2、合并排序
Reduce端,拉取到各個(gè)Map節(jié)點(diǎn)對(duì)應(yīng)分片的數(shù)據(jù)之后,會(huì)進(jìn)行再次排序,排序完成,結(jié)果丟給Reduce函數(shù)進(jìn)行計(jì)算。
四、總結(jié)
至此整個(gè)shuffle過(guò)程完成,***總結(jié)幾點(diǎn):
shuffle過(guò)程就是為了對(duì)key進(jìn)行全局聚合
排序操作伴隨著整個(gè)shuffle過(guò)程,所以Hadoop的shuffle是sort-based的
Spark shuffle相對(duì)來(lái)說(shuō)更簡(jiǎn)單,因?yàn)椴灰笕钟行?,所以沒(méi)有那么多排序合并的操作。Spark shuffle分為write和read兩個(gè)過(guò)程。我們先來(lái)看shuffle write。
一、shuffle write
shuffle write的處理邏輯會(huì)放到該ShuffleMapStage的***(因?yàn)閟park以shuffle發(fā)生與否來(lái)劃分stage,也就是寬依賴),final RDD的每一條記錄都會(huì)寫到對(duì)應(yīng)的分區(qū)緩存區(qū)bucket,如下圖所示:
說(shuō)明:
上圖有2個(gè)CPU,可以同時(shí)運(yùn)行兩個(gè)ShuffleMapTask
每個(gè)task將寫一個(gè)buket緩沖區(qū),緩沖區(qū)的數(shù)量和reduce任務(wù)的數(shù)量相等
每個(gè)buket緩沖區(qū)會(huì)生成一個(gè)對(duì)應(yīng)ShuffleBlockFile
ShuffleMapTask 如何決定數(shù)據(jù)被寫到哪個(gè)緩沖區(qū)呢?這個(gè)就是跟partition算法有關(guān)系,這個(gè)分區(qū)算法可以是hash的,也可以是range的
最終產(chǎn)生的ShuffleBlockFile會(huì)有多少呢?就是ShuffleMapTask 數(shù)量乘以reduce的數(shù)量,這個(gè)是非常巨大的
那么有沒(méi)有辦法解決生成文件過(guò)多的問(wèn)題呢?有,開(kāi)啟FileConsolidation即可,開(kāi)啟FileConsolidation之后的shuffle過(guò)程如下:
在同一核CPU執(zhí)行先后執(zhí)行的ShuffleMapTask可以共用一個(gè)bucket緩沖區(qū),然后寫到同一份ShuffleFile里去,上圖所示的ShuffleFile實(shí)際上是用多個(gè)ShuffleBlock構(gòu)成,那么,那么每個(gè)worker最終生成的文件數(shù)量,變成了cpu核數(shù)乘以reduce任務(wù)的數(shù)量,大大縮減了文件量。
二、Shuffle read
Shuffle write過(guò)程將數(shù)據(jù)分片寫到對(duì)應(yīng)的分片文件,這時(shí)候萬(wàn)事具備,只差去拉取對(duì)應(yīng)的數(shù)據(jù)過(guò)來(lái)計(jì)算了。
那么Shuffle Read發(fā)送的時(shí)機(jī)是什么?是要等所有ShuffleMapTask執(zhí)行完,再去fetch數(shù)據(jù)嗎?理論上,只要有一個(gè) ShuffleMapTask執(zhí)行完,就可以開(kāi)始fetch數(shù)據(jù)了,實(shí)際上,spark必須等到父stage執(zhí)行完,才能執(zhí)行子stage,所以,必須等到所有 ShuffleMapTask執(zhí)行完畢,才去fetch數(shù)據(jù)。fetch過(guò)來(lái)的數(shù)據(jù),先存入一個(gè)Buffer緩沖區(qū),所以這里一次性fetch的FileSegment不能太大,當(dāng)然如果fetch過(guò)來(lái)的數(shù)據(jù)大于每一個(gè)閥值,也是會(huì)spill到磁盤的。
fetch的過(guò)程過(guò)來(lái)一個(gè)buffer的數(shù)據(jù),就可以開(kāi)始聚合了,這里就遇到一個(gè)問(wèn)題,每次fetch部分?jǐn)?shù)據(jù),怎么能實(shí)現(xiàn)全局聚合呢?以word count的reduceByKey(《Spark RDD操作之ReduceByKey 》)為例,假設(shè)單詞hello有十個(gè),但是一次fetch只拉取了2個(gè),那么怎么全局聚合呢?Spark的做法是用HashMap,聚合操作實(shí)際上是map.put(key,map.get(key)+1),將map中的聚合過(guò)的數(shù)據(jù)get出來(lái)相加,然后put回去,等到所有數(shù)據(jù)fetch完,也就完成了全局聚合。
感謝各位的閱讀,以上就是“Hadoop和Spark的Shuffle過(guò)程有什么不同”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Hadoop和Spark的Shuffle過(guò)程有什么不同這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!