真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何進(jìn)行Spark的shuffle實(shí)現(xiàn)

這篇文章將為大家詳細(xì)講解有關(guān)如何進(jìn)行Spark的shuffle實(shí)現(xiàn),文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

創(chuàng)新互聯(lián)是一家專業(yè)提供北侖企業(yè)網(wǎng)站建設(shè),專注與做網(wǎng)站、成都網(wǎng)站設(shè)計(jì)、HTML5建站、小程序制作等業(yè)務(wù)。10年已為北侖眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)的建站公司優(yōu)惠進(jìn)行中。

Background

在MapReduce框架中,shuffle是連接Map和Reduce之間的橋梁,Map的輸出要用到Reduce中必須經(jīng)過shuffle這個(gè)環(huán)節(jié),shuffle的性能高低直接影響了整個(gè)程序的性能和吞吐量。Spark作為MapReduce框架的一種實(shí)現(xiàn),自然也實(shí)現(xiàn)了shuffle的邏輯,本文就深入研究Spark的shuffle是如何實(shí)現(xiàn)的,有什么優(yōu)缺點(diǎn),與Hadoop MapReduce的shuffle有什么不同。

Shuffle

Shuffle是MapReduce框架中的一個(gè)特定的phase,介于Map phase和Reduce phase之間,當(dāng)Map的輸出結(jié)果要被Reduce使用時(shí),輸出結(jié)果需要按key哈希,并且分發(fā)到每一個(gè)Reducer上去,這個(gè)過程就是shuffle。由于shuffle涉及到了磁盤的讀寫和網(wǎng)絡(luò)的傳輸,因此shuffle性能的高低直接影響到了整個(gè)程序的運(yùn)行效率。

下面這幅圖清晰地描述了MapReduce算法的整個(gè)流程,其中shuffle phase是介于Map phase和Reduce phase之間。

如何進(jìn)行Spark的shuffle實(shí)現(xiàn)

概念上shuffle就是一個(gè)溝通數(shù)據(jù)連接的橋梁,那么實(shí)際上shuffle這一部分是如何實(shí)現(xiàn)的的呢,下面我們就以Spark為例講一下shuffle在Spark中的實(shí)現(xiàn)。

Spark Shuffle進(jìn)化史

先以圖為例簡(jiǎn)單描述一下Spark中shuffle的整一個(gè)流程:

如何進(jìn)行Spark的shuffle實(shí)現(xiàn)

  • 首先每一個(gè)Mapper會(huì)根據(jù)Reducer的數(shù)量創(chuàng)建出相應(yīng)的bucket,bucket的數(shù)量是M×R,其中M是Map的個(gè)數(shù),R是Reduce的個(gè)數(shù)。

  • 其次Mapper產(chǎn)生的結(jié)果會(huì)根據(jù)設(shè)置的partition算法填充到每個(gè)bucket中去。這里的partition算法是可以自定義的,當(dāng)然默認(rèn)的算法是根據(jù)key哈希到不同的bucket中去。

  • 當(dāng)Reducer啟動(dòng)時(shí),它會(huì)根據(jù)自己task的id和所依賴的Mapper的id從遠(yuǎn)端或是本地的block manager中取得相應(yīng)的bucket作為Reducer的輸入進(jìn)行處理。

這里的bucket是一個(gè)抽象概念,在實(shí)現(xiàn)中每個(gè)bucket可以對(duì)應(yīng)一個(gè)文件,可以對(duì)應(yīng)文件的一部分或是其他等。

接下來我們分別從shuffle writeshuffle fetch這兩塊來講述一下Spark的shuffle進(jìn)化史。

Shuffle Write

在Spark 0.6和0.7的版本中,對(duì)于shuffle數(shù)據(jù)的存儲(chǔ)是以文件的方式存儲(chǔ)在block manager中,與rdd.persist(StorageLevel.DISk_ONLY)采取相同的策略,可以參看:

override def run(attemptId: Long): MapStatus = {  val numOutputSplits = dep.partitioner.numPartitions     ...    // Partition the map output.    val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])    for (elem <- rdd.iterator(split, taskContext)) {      val pair = elem.asInstanceOf[(Any, Any)]      val bucketId = dep.partitioner.getPartition(pair._1)      buckets(bucketId) += pair    }    ...    val blockManager = SparkEnv.get.blockManager    for (i <- 0 until numOutputSplits) {      val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i      // Get a Scala iterator from Java map      val iter: Iterator[(Any, Any)] = buckets(i).iterator      val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)      totalBytes += size    }  ...}

我已經(jīng)將一些干擾代碼刪去??梢钥吹絊park在每一個(gè)Mapper中為每個(gè)Reducer創(chuàng)建一個(gè)bucket,并將RDD計(jì)算結(jié)果放進(jìn)bucket中。需要注意的是每個(gè)bucket是一個(gè)ArrayBuffer,也就是說Map的輸出結(jié)果是會(huì)先存儲(chǔ)在內(nèi)存。

接著Spark會(huì)將ArrayBuffer中的Map輸出結(jié)果寫入block manager所管理的磁盤中,這里文件的命名方式為:shuffle_ + shuffle_id + "_" + map partition id + "_" + shuffle partition id。

早期的shuffle write有兩個(gè)比較大的問題:

  1. Map的輸出必須先全部存儲(chǔ)到內(nèi)存中,然后寫入磁盤。這對(duì)內(nèi)存是一個(gè)非常大的開銷,當(dāng)內(nèi)存不足以存儲(chǔ)所有的Map output時(shí)就會(huì)出現(xiàn)OOM。

  2. 每一個(gè)Mapper都會(huì)產(chǎn)生Reducer number個(gè)shuffle文件,如果Mapper個(gè)數(shù)是1k,Reducer個(gè)數(shù)也是1k,那么就會(huì)產(chǎn)生1M個(gè)shuffle文件,這對(duì)于文件系統(tǒng)是一個(gè)非常大的負(fù)擔(dān)。同時(shí)在shuffle數(shù)據(jù)量不大而shuffle文件又非常多的情況下,隨機(jī)寫也會(huì)嚴(yán)重降低IO的性能。

在Spark 0.8版本中,shuffle write采用了與RDD block write不同的方式,同時(shí)也為shuffle write單獨(dú)創(chuàng)建了ShuffleBlockManager,部分解決了0.6和0.7版本中遇到的問題。

首先我們來看一下Spark 0.8的具體實(shí)現(xiàn):

override def run(attemptId: Long): MapStatus = {  ...  val blockManager = SparkEnv.get.blockManager  var shuffle: ShuffleBlocks = null  var buckets: ShuffleWriterGroup = null  try {    // Obtain all the block writers for shuffle blocks.    val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)    shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)    buckets = shuffle.acquireWriters(partition)    // Write the map output to its associated buckets.    for (elem <- rdd.iterator(split, taskContext)) {      val pair = elem.asInstanceOf[Product2[Any, Any]]      val bucketId = dep.partitioner.getPartition(pair._1)      buckets.writers(bucketId).write(pair)    }    // Commit the writes. Get the size of each bucket block (total block size).    var totalBytes = 0L    val compressedSizes: Array[Byte] = buckets.writers.map { writer:   BlockObjectWriter =>      writer.commit()      writer.close()      val size = writer.size()      totalBytes += size      MapOutputTracker.compressSize(size)    }    ...  } catch { case e: Exception =>    // If there is an exception from running the task, revert the partial writes    // and throw the exception upstream to Spark.    if (buckets != null) {      buckets.writers.foreach(_.revertPartialWrites())    }    throw e  } finally {    // Release the writers back to the shuffle block manager.    if (shuffle != null && buckets != null) {      shuffle.releaseWriters(buckets)    }    // Execute the callbacks on task completion.    taskContext.executeOnCompleteCallbacks()    }  }}

在這個(gè)版本中為shuffle write添加了一個(gè)新的類ShuffleBlockManager,由ShuffleBlockManager來分配和管理bucket。同時(shí)ShuffleBlockManager為每一個(gè)bucket分配一個(gè)DiskObjectWriter,每個(gè)write handler擁有默認(rèn)100KB的緩存,使用這個(gè)write handler將Map output寫入文件中??梢钥吹浆F(xiàn)在的寫入方式變?yōu)?code>buckets.writers(bucketId).write(pair),也就是說Map output的key-value pair是逐個(gè)寫入到磁盤而不是預(yù)先把所有數(shù)據(jù)存儲(chǔ)在內(nèi)存中在整體flush到磁盤中去。

ShuffleBlockManager的代碼如下所示:

private[spark]class ShuffleBlockManager(blockManager: BlockManager) {  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {    new ShuffleBlocks {      // Get a group of writers for a map task.      override def acquireWriters(mapId: Int): ShuffleWriterGroup = {        val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024        val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>          val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, mapId)          blockManager.getDiskBlockWriter(blockId, serializer, bufferSize)        }        new ShuffleWriterGroup(mapId, writers)      }      override def releaseWriters(group: ShuffleWriterGroup) = {        // Nothing really to release here.      }    }  }}

Spark 0.8顯著減少了shuffle的內(nèi)存壓力,現(xiàn)在Map output不需要先全部存儲(chǔ)在內(nèi)存中,再flush到硬盤,而是record-by-record寫入到磁盤中。同時(shí)對(duì)于shuffle文件的管理也獨(dú)立出新的ShuffleBlockManager進(jìn)行管理,而不是與rdd cache文件在一起了。

但是這一版Spark 0.8的shuffle write仍然有兩個(gè)大的問題沒有解決:

  • 首先依舊是shuffle文件過多的問題,shuffle文件過多一是會(huì)造成文件系統(tǒng)的壓力過大,二是會(huì)降低IO的吞吐量。

  • 其次雖然Map output數(shù)據(jù)不再需要預(yù)先在內(nèi)存中evaluate顯著減少了內(nèi)存壓力,但是新引入的DiskObjectWriter所帶來的buffer開銷也是一個(gè)不容小視的內(nèi)存開銷。假定我們有1k個(gè)Mapper和1k個(gè)Reducer,那么就會(huì)有1M個(gè)bucket,于此同時(shí)就會(huì)有1M個(gè)write handler,而每一個(gè)write handler默認(rèn)需要100KB內(nèi)存,那么總共需要100GB的內(nèi)存。這樣的話僅僅是buffer就需要這么多的內(nèi)存,內(nèi)存的開銷是驚人的。當(dāng)然實(shí)際情況下這1k個(gè)Mapper是分時(shí)運(yùn)行的話,所需的內(nèi)存就只有cores * reducer numbers * 100KB大小了。但是reducer數(shù)量很多的話,這個(gè)buffer的內(nèi)存開銷也是蠻厲害的。

為了解決shuffle文件過多的情況,Spark 0.8.1引入了新的shuffle consolidation,以期顯著減少shuffle文件的數(shù)量。

首先我們以圖例來介紹一下shuffle consolidation的原理。

如何進(jìn)行Spark的shuffle實(shí)現(xiàn)

假定該job有4個(gè)Mapper和4個(gè)Reducer,有2個(gè)core,也就是能并行運(yùn)行兩個(gè)task。我們可以算出Spark的shuffle write共需要16個(gè)bucket,也就有了16個(gè)write handler。在之前的Spark版本中,每一個(gè)bucket對(duì)應(yīng)的是一個(gè)文件,因此在這里會(huì)產(chǎn)生16個(gè)shuffle文件。

而在shuffle consolidation中每一個(gè)bucket并非對(duì)應(yīng)一個(gè)文件,而是對(duì)應(yīng)文件中的一個(gè)segment,同時(shí)shuffle consolidation所產(chǎn)生的shuffle文件數(shù)量與Spark core的個(gè)數(shù)也有關(guān)系。在上面的圖例中,job的4個(gè)Mapper分為兩批運(yùn)行,在第一批2個(gè)Mapper運(yùn)行時(shí)會(huì)申請(qǐng)8個(gè)bucket,產(chǎn)生8個(gè)shuffle文件;而在第二批Mapper運(yùn)行時(shí),申請(qǐng)的8個(gè)bucket并不會(huì)再產(chǎn)生8個(gè)新的文件,而是追加寫到之前的8個(gè)文件后面,這樣一共就只有8個(gè)shuffle文件,而在文件內(nèi)部這有16個(gè)不同的segment。因此從理論上講shuffle consolidation所產(chǎn)生的shuffle文件數(shù)量為C×R,其中C是Spark集群的core number,R是Reducer的個(gè)數(shù)。

需要注意的是當(dāng) M=C時(shí)shuffle consolidation所產(chǎn)生的文件數(shù)和之前的實(shí)現(xiàn)是一樣的。

Shuffle consolidation顯著減少了shuffle文件的數(shù)量,解決了之前版本一個(gè)比較嚴(yán)重的問題,但是writer handler的buffer開銷過大依然沒有減少,若要減少writer handler的buffer開銷,我們只能減少Reducer的數(shù)量,但是這又會(huì)引入新的問題,下文將會(huì)有詳細(xì)介紹。

講完了shuffle write的進(jìn)化史,接下來要講一下shuffle fetch了,同時(shí)還要講一下Spark的aggregator,這一塊對(duì)于Spark實(shí)際應(yīng)用的性能至關(guān)重要。

Shuffle Fetch and Aggregator

Shuffle write寫出去的數(shù)據(jù)要被Reducer使用,就需要shuffle fetcher將所需的數(shù)據(jù)fetch過來,這里的fetch包括本地和遠(yuǎn)端,因?yàn)閟huffle數(shù)據(jù)有可能一部分是存儲(chǔ)在本地的。Spark對(duì)shuffle fetcher實(shí)現(xiàn)了兩套不同的框架:NIO通過socket連接去fetch數(shù)據(jù);OIO通過netty server去fetch數(shù)據(jù)。分別對(duì)應(yīng)的類是BasicBlockFetcherIteratorNettyBlockFetcherIterator。

在Spark 0.7和更早的版本中,只支持BasicBlockFetcherIterator,而BasicBlockFetcherIterator在shuffle數(shù)據(jù)量比較大的情況下performance始終不是很好,無法充分利用網(wǎng)絡(luò)帶寬,為了解決這個(gè)問題,添加了新的shuffle fetcher來試圖取得更好的性能。對(duì)于早期shuffle性能的評(píng)測(cè)可以參看Spark usergroup。當(dāng)然現(xiàn)在BasicBlockFetcherIterator的性能也已經(jīng)好了很多,使用的時(shí)候可以對(duì)這兩種實(shí)現(xiàn)都進(jìn)行測(cè)試比較。

接下來說一下aggregator。我們都知道在Hadoop MapReduce的shuffle過程中,shuffle fetch過來的數(shù)據(jù)會(huì)進(jìn)行merge sort,使得相同key下的不同value按序歸并到一起供Reducer使用,這個(gè)過程可以參看下圖:

如何進(jìn)行Spark的shuffle實(shí)現(xiàn)

所有的merge sort都是在磁盤上進(jìn)行的,有效地控制了內(nèi)存的使用,但是代價(jià)是更多的磁盤IO。

那么Spark是否也有merge sort呢,還是以別的方式實(shí)現(xiàn),下面我們就細(xì)細(xì)說明。

首先雖然Spark屬于MapReduce體系,但是對(duì)傳統(tǒng)的MapReduce算法進(jìn)行了一定的改變。Spark假定在大多數(shù)用戶的case中,shuffle數(shù)據(jù)的sort不是必須的,比如word count,強(qiáng)制地進(jìn)行排序只會(huì)使性能變差,因此Spark并不在Reducer端做merge sort。既然沒有merge sort那Spark是如何進(jìn)行reduce的呢?這就要說到aggregator了。

aggregator本質(zhì)上是一個(gè)hashmap,它是以map output的key為key,以任意所要combine的類型為value的hashmap。當(dāng)我們?cè)谧鰓ord count reduce計(jì)算count值的時(shí)候,它會(huì)將shuffle fetch到的每一個(gè)key-value pair更新或是插入到hashmap中(若在hashmap中沒有查找到,則插入其中;若查找到則更新value值)。這樣就不需要預(yù)先把所有的key-value進(jìn)行merge sort,而是來一個(gè)處理一個(gè),省下了外部排序這一步驟。但同時(shí)需要注意的是reducer的內(nèi)存必須足以存放這個(gè)partition的所有key和count值,因此對(duì)內(nèi)存有一定的要求。

在上面word count的例子中,因?yàn)関alue會(huì)不斷地更新,而不需要將其全部記錄在內(nèi)存中,因此內(nèi)存的使用還是比較少的??紤]一下如果是group by key這樣的操作,Reducer需要得到key對(duì)應(yīng)的所有value。在Hadoop MapReduce中,由于有了merge sort,因此給予Reducer的數(shù)據(jù)已經(jīng)是group by key了,而Spark沒有這一步,因此需要將key和對(duì)應(yīng)的value全部存放在hashmap中,并將value合并成一個(gè)array。可以想象為了能夠存放所有數(shù)據(jù),用戶必須確保每一個(gè)partition足夠小到內(nèi)存能夠容納,這對(duì)于內(nèi)存是一個(gè)非常嚴(yán)峻的考驗(yàn)。因此Spark文檔中建議用戶涉及到這類操作的時(shí)候盡量增加partition,也就是增加Mapper和Reducer的數(shù)量。

增加Mapper和Reducer的數(shù)量固然可以減小partition的大小,使得內(nèi)存可以容納這個(gè)partition。但是我們?cè)趕huffle write中提到,bucket和對(duì)應(yīng)于bucket的write handler是由Mapper和Reducer的數(shù)量決定的,task越多,bucket就會(huì)增加的更多,由此帶來write handler所需的buffer也會(huì)更多。在一方面我們?yōu)榱藴p少內(nèi)存的使用采取了增加task數(shù)量的策略,另一方面task數(shù)量增多又會(huì)帶來buffer開銷更大的問題,因此陷入了內(nèi)存使用的兩難境地。

為了減少內(nèi)存的使用,只能將aggregator的操作從內(nèi)存移到磁盤上進(jìn)行,Spark社區(qū)也意識(shí)到了Spark在處理數(shù)據(jù)規(guī)模遠(yuǎn)遠(yuǎn)大于內(nèi)存大小時(shí)所帶來的問題。因此PR303提供了外部排序的實(shí)現(xiàn)方案,相信在Spark 0.9 release的時(shí)候,這個(gè)patch應(yīng)該能merge進(jìn)去,到時(shí)候內(nèi)存的使用量可以顯著地減少。

shuffle作為Spark程序中很重要的一個(gè)環(huán)節(jié),直接影響了Spark程序的性能,現(xiàn)如今的Spark版本雖然shuffle實(shí)現(xiàn)還存在著種種問題,但是相比于早期版本,已經(jīng)有了很大的進(jìn)步。開源代碼就是如此不停地迭代推進(jìn),隨著Spark的普及程度越來越高,貢獻(xiàn)的人越來越多,相信后續(xù)的版本會(huì)有更大的提升。

關(guān)于如何進(jìn)行Spark的shuffle實(shí)現(xiàn)就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。


當(dāng)前題目:如何進(jìn)行Spark的shuffle實(shí)現(xiàn)
分享URL:http://weahome.cn/article/jjsdij.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部