真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Spark2.x中如何用源碼剖析SortShuffleWriter具體實(shí)現(xiàn)

這篇文章給大家介紹Spark2.x中如何用源碼剖析SortShuffleWriter具體實(shí)現(xiàn),內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

創(chuàng)新互聯(lián)技術(shù)團(tuán)隊(duì)10余年來(lái)致力于為客戶(hù)提供網(wǎng)站制作、成都網(wǎng)站制作、品牌網(wǎng)站建設(shè)成都營(yíng)銷(xiāo)網(wǎng)站建設(shè)、搜索引擎SEO優(yōu)化等服務(wù)。經(jīng)過(guò)多年發(fā)展,公司擁有經(jīng)驗(yàn)豐富的技術(shù)團(tuán)隊(duì),先后服務(wù)、推廣了近1000家網(wǎng)站,包括各類(lèi)中小企業(yè)、企事單位、高校等機(jī)構(gòu)單位。

 一、概述

這里講解Spark Shuffle Write的第三種實(shí)現(xiàn)SortShuffleWriter,在ShuffleWrite階段,如果不滿(mǎn)足UnsafeShuffleWriter、BypassMergeSortShuffleWriter兩種條件,最后代碼執(zhí)行SortShuffleWriter,這里來(lái)看看他的具體實(shí)現(xiàn):

二、具體實(shí)現(xiàn)

    這里直接看Write()函數(shù),代碼如下:

 /** Write a bunch of records to this task's output */  override def write(records: Iterator[Product2[K, V]]): Unit = {    // 根據(jù)是否在map端進(jìn)行數(shù)據(jù)合并初始化ExternalSorter    //ExternalSorter初始化對(duì)應(yīng)參數(shù)的含義    // aggregator:在RDD shuffle時(shí),map/reduce-side使用的aggregator    // partitioner:對(duì)shuffle的輸出,使用哪種partitioner對(duì)數(shù)據(jù)做分區(qū),比如hashPartitioner或者rangePartitioner    // ordering:根據(jù)哪個(gè)key做排序    // serializer:使用哪種序列化,如果沒(méi)有顯示指定,默認(rèn)使用spark.serializer參數(shù)值    sorter = if (dep.mapSideCombine) {      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")      new ExternalSorter[K, V, C](        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)    } else {      // 如果沒(méi)有map-side聚合,那么創(chuàng)建sorter對(duì)象時(shí)候,aggregator和ordering將不傳入對(duì)應(yīng)的值      new ExternalSorter[K, V, V](        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)    }    //通過(guò)insertAll方法先寫(xiě)數(shù)據(jù)到buffer    sorter.insertAll(records)
   // 構(gòu)造最終的輸出文件實(shí)例,其中文件名為(reduceId為0):    // "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)    //在輸出文件名后加上uuid用于標(biāo)識(shí)文件正在寫(xiě)入,結(jié)束后重命名    val tmp = Utils.tempFileWith(output)
   try {      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
     //將排序后的record寫(xiě)入輸出文件      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)      //生成index文件,也就是每個(gè)reduce通過(guò)該index文件得知它哪些是屬于它的數(shù)據(jù)      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)      //構(gòu)造MapStatus返回結(jié)果,里面含有ShuffleWriter輸出結(jié)果的位置信息      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)    } finally {      if (tmp.exists() && !tmp.delete()) {        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")      }    }  }      

    其中ExternalSorter是SortShuffleWriter一個(gè)排序類(lèi),這個(gè)類(lèi)用于對(duì)一些(K, V)類(lèi)型的key-value對(duì)進(jìn)行排序,如果需要就進(jìn)行merge,生的結(jié)果是一些(K, C)類(lèi)型的key-combiner對(duì)。combiner就是對(duì)同樣key的value進(jìn)行合并的結(jié)果。它首先使用一個(gè)Partitioner來(lái)把key分到不同的partition,然后,如果有必要的話(huà),就把每個(gè)partition內(nèi)部的key按照一個(gè)特定的Comparator來(lái)進(jìn)行排序。它可以輸出只一個(gè)分區(qū)了的文件,其中不同的partition位于這個(gè)文件的不同區(qū)域(在字節(jié)層面上每個(gè)分區(qū)是連續(xù)的),這樣就適用于shuffle時(shí)對(duì)數(shù)據(jù)的抓取。

2.這里接著看上面代碼第14行的 sorter.insertAll(records)函數(shù),里面其實(shí)干了很多事情,代碼如下:

 def insertAll(records: Iterator[Product2[K, V]]): Unit = {       //這里獲取Map是否聚合標(biāo)識(shí)    val shouldCombine = aggregator.isDefined    //根據(jù)是否進(jìn)行Map端聚合,來(lái)決定使用map還是buffer,    // 如果需要通過(guò)key做map-side聚合,則使用PartitionedAppendOnlyMap;    // 如果不需要,則使用PartitionedPairBuffer    if (shouldCombine) {      // 使用AppendOnlyMap優(yōu)先在內(nèi)存中進(jìn)行combine      // 獲取aggregator的mergeValue函數(shù),用于merge新的值到聚合記錄      val mergeValue = aggregator.get.mergeValue       // 獲取aggregator的createCombiner函數(shù),用于創(chuàng)建聚合的初始值      val createCombiner = aggregator.get.createCombiner      var kv: Product2[K, V] = null      val update = (hadValue: Boolean, oldValue: C) => {      //創(chuàng)建update函數(shù),如果有值進(jìn)行mergeValue,如果沒(méi)有則createCombiner        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)      }      while (records.hasNext) {        addElementsRead()        kv = records.next()        ////通過(guò)key計(jì)算partition ID,通過(guò)partition ID對(duì)數(shù)據(jù)進(jìn)行排序        //這里的partitionID其實(shí)就是Reduce個(gè)數(shù)        // 對(duì)key計(jì)算分區(qū),然后開(kāi)始進(jìn)行merge        map.changeValue((getPartition(kv._1), kv._1), update)         // 如果需要溢寫(xiě)內(nèi)存數(shù)據(jù)到磁盤(pán)        maybeSpillCollection(usingMap = true)      }    } else {      // Stick values into our buffer      while (records.hasNext) {        addElementsRead()        val kv = records.next()        //通過(guò)key計(jì)算partition ID,通過(guò)partition ID對(duì)數(shù)據(jù)進(jìn)行排序        //這里的partitionID其實(shí)就是Reduce個(gè)數(shù)        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])        // 當(dāng)buffer達(dá)到內(nèi)存限制時(shí)(buffer默認(rèn)大小32k,由spark.shuffle.file.buffer參數(shù)決定),會(huì)將buffer中的數(shù)據(jù)spill到文件中        maybeSpillCollection(usingMap = false)      }    }  }

3.下面繼續(xù)跟蹤maybeSpillCollection()函數(shù),如何對(duì)內(nèi)存數(shù)據(jù)溢寫(xiě)的,代碼如下:

private def maybeSpillCollection(usingMap: Boolean): Unit = {    var estimatedSize = 0L   // 如果是map ,也就是Map端需要聚合的情況    if (usingMap) {      //這里預(yù)估一個(gè)值,根據(jù)預(yù)估值判斷是否需要溢寫(xiě),      // 如果需要,溢寫(xiě)完成后重新初始化一個(gè)map      estimatedSize = map.estimateSize()      if (maybeSpill(map, estimatedSize)) {        map = new PartitionedAppendOnlyMap[K, C]      }     // 這里執(zhí)行的map不需要聚合的情況    } else {      //這里預(yù)估一個(gè)值,根據(jù)預(yù)估值判斷是否需要溢寫(xiě),      // 如果需要,溢寫(xiě)完成后重新初始化一個(gè)buffer       estimatedSize = buffer.estimateSize()      if (maybeSpill(buffer, estimatedSize)) {        buffer = new PartitionedPairBuffer[K, C]      }    }    if (estimatedSize > _peakMemoryUsedBytes) {      _peakMemoryUsedBytes = estimatedSize    }  }

4.上面涉及到溢寫(xiě)判斷函數(shù)maybeSpill,我們看下他是如何進(jìn)行判斷的,代碼如下:

// maybeSpill函數(shù)判斷大體分了三步// 1.為當(dāng)前線(xiàn)程嘗試獲取amountToRequest大小的內(nèi)存(amountToRequest = 2 * currentMemory - myMemoryThreshold)。// 2.如果獲得的內(nèi)存依然不足(myMemoryThreshold <= currentMemory),則調(diào)用spill,執(zhí)行溢出操作。內(nèi)存不足可能是申請(qǐng)到的內(nèi)存為0或者已經(jīng)申請(qǐng)得到的內(nèi)存大小超過(guò)了myMemoryThreshold。// 3.溢出后續(xù)處理,如elementsRead歸零,已溢出內(nèi)存字節(jié)數(shù)(memoryBytesSpilled)增加線(xiàn)程當(dāng)前內(nèi)存大小(currentMemory),釋放當(dāng)前線(xiàn)程占用的內(nèi)存。 protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {    var shouldSpill = false    //其中內(nèi)存閾值myMemoryThreshold  由參數(shù)spark.shuffle.spill.initialMemoryThreshold控制,默認(rèn)是5M    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {      // Claim up to double our current memory from the shuffle memory pool      val amountToRequest = 2 * currentMemory - myMemoryThreshold      //底層調(diào)用TaskMemoryManager的acquireExecutionMemory方法分配內(nèi)存      val granted = acquireMemory(amountToRequest)      // 更新現(xiàn)在內(nèi)存閥值      myMemoryThreshold += granted     //再次判斷當(dāng)前內(nèi)存是否大于閥值,如果還是大于閥值則spill      shouldSpill = currentMemory >= myMemoryThreshold    }    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold    // Actually spill    if (shouldSpill) {      _spillCount += 1      logSpillage(currentMemory)      //進(jìn)行spill,這了溢寫(xiě)肯定先寫(xiě)到緩沖區(qū),后寫(xiě)到磁盤(pán),      //有個(gè)比較重要的參數(shù)spark.shuffle.file.buffer  默認(rèn)32k, 優(yōu)化時(shí)常進(jìn)行調(diào)整      spill(collection)      _elementsRead = 0      _memoryBytesSpilled += currentMemory      releaseMemory()    }    shouldSpill  }

    里面還有更深層次的代碼,這里就不再跟蹤了,只要是了解了整個(gè)大體思路即可,有興趣的自己去跟蹤看下即可。

   為方便大家理解,下面給大家畫(huà)了下SorteShuffleWriter執(zhí)行的流程圖,BypassMergeSortShuffleWriter和UnsafeShuffleWriter的處理流程與這個(gè)流程基本一致,只是具體的實(shí)現(xiàn)稍有差異,水平有限,僅供參考:

Spark2.x中如何用源碼剖析SortShuffleWriter具體實(shí)現(xiàn)

  

關(guān)于Spark2.x中如何用源碼剖析SortShuffleWriter具體實(shí)現(xiàn)就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。


新聞標(biāo)題:Spark2.x中如何用源碼剖析SortShuffleWriter具體實(shí)現(xiàn)
文章起源:http://weahome.cn/article/iehicd.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部