這篇文章給大家介紹Spark2.x中如何用源碼剖析SortShuffleWriter具體實(shí)現(xiàn),內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。
創(chuàng)新互聯(lián)技術(shù)團(tuán)隊(duì)10余年來(lái)致力于為客戶(hù)提供網(wǎng)站制作、成都網(wǎng)站制作、品牌網(wǎng)站建設(shè)、成都營(yíng)銷(xiāo)網(wǎng)站建設(shè)、搜索引擎SEO優(yōu)化等服務(wù)。經(jīng)過(guò)多年發(fā)展,公司擁有經(jīng)驗(yàn)豐富的技術(shù)團(tuán)隊(duì),先后服務(wù)、推廣了近1000家網(wǎng)站,包括各類(lèi)中小企業(yè)、企事單位、高校等機(jī)構(gòu)單位。
一、概述
這里講解Spark Shuffle Write的第三種實(shí)現(xiàn)SortShuffleWriter,在ShuffleWrite階段,如果不滿(mǎn)足UnsafeShuffleWriter、BypassMergeSortShuffleWriter兩種條件,最后代碼執(zhí)行SortShuffleWriter,這里來(lái)看看他的具體實(shí)現(xiàn):
二、具體實(shí)現(xiàn)
這里直接看Write()函數(shù),代碼如下:
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
// 根據(jù)是否在map端進(jìn)行數(shù)據(jù)合并初始化ExternalSorter
//ExternalSorter初始化對(duì)應(yīng)參數(shù)的含義
// aggregator:在RDD shuffle時(shí),map/reduce-side使用的aggregator
// partitioner:對(duì)shuffle的輸出,使用哪種partitioner對(duì)數(shù)據(jù)做分區(qū),比如hashPartitioner或者rangePartitioner
// ordering:根據(jù)哪個(gè)key做排序
// serializer:使用哪種序列化,如果沒(méi)有顯示指定,默認(rèn)使用spark.serializer參數(shù)值
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// 如果沒(méi)有map-side聚合,那么創(chuàng)建sorter對(duì)象時(shí)候,aggregator和ordering將不傳入對(duì)應(yīng)的值
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
//通過(guò)insertAll方法先寫(xiě)數(shù)據(jù)到buffer
sorter.insertAll(records)
// 構(gòu)造最終的輸出文件實(shí)例,其中文件名為(reduceId為0):
// "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
//在輸出文件名后加上uuid用于標(biāo)識(shí)文件正在寫(xiě)入,結(jié)束后重命名
val tmp = Utils.tempFileWith(output)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
//將排序后的record寫(xiě)入輸出文件
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
//生成index文件,也就是每個(gè)reduce通過(guò)該index文件得知它哪些是屬于它的數(shù)據(jù)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
//構(gòu)造MapStatus返回結(jié)果,里面含有ShuffleWriter輸出結(jié)果的位置信息
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
其中ExternalSorter是SortShuffleWriter一個(gè)排序類(lèi),這個(gè)類(lèi)用于對(duì)一些(K, V)類(lèi)型的key-value對(duì)進(jìn)行排序,如果需要就進(jìn)行merge,生的結(jié)果是一些(K, C)類(lèi)型的key-combiner對(duì)。combiner就是對(duì)同樣key的value進(jìn)行合并的結(jié)果。它首先使用一個(gè)Partitioner來(lái)把key分到不同的partition,然后,如果有必要的話(huà),就把每個(gè)partition內(nèi)部的key按照一個(gè)特定的Comparator來(lái)進(jìn)行排序。它可以輸出只一個(gè)分區(qū)了的文件,其中不同的partition位于這個(gè)文件的不同區(qū)域(在字節(jié)層面上每個(gè)分區(qū)是連續(xù)的),這樣就適用于shuffle時(shí)對(duì)數(shù)據(jù)的抓取。
2.這里接著看上面代碼第14行的 sorter.insertAll(records)函數(shù),里面其實(shí)干了很多事情,代碼如下:
def insertAll(records: Iterator[Product2[K, V]]): Unit = { //這里獲取Map是否聚合標(biāo)識(shí) val shouldCombine = aggregator.isDefined //根據(jù)是否進(jìn)行Map端聚合,來(lái)決定使用map還是buffer, // 如果需要通過(guò)key做map-side聚合,則使用PartitionedAppendOnlyMap; // 如果不需要,則使用PartitionedPairBuffer if (shouldCombine) { // 使用AppendOnlyMap優(yōu)先在內(nèi)存中進(jìn)行combine // 獲取aggregator的mergeValue函數(shù),用于merge新的值到聚合記錄 val mergeValue = aggregator.get.mergeValue // 獲取aggregator的createCombiner函數(shù),用于創(chuàng)建聚合的初始值 val createCombiner = aggregator.get.createCombiner var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { //創(chuàng)建update函數(shù),如果有值進(jìn)行mergeValue,如果沒(méi)有則createCombiner if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (records.hasNext) { addElementsRead() kv = records.next() ////通過(guò)key計(jì)算partition ID,通過(guò)partition ID對(duì)數(shù)據(jù)進(jìn)行排序 //這里的partitionID其實(shí)就是Reduce個(gè)數(shù) // 對(duì)key計(jì)算分區(qū),然后開(kāi)始進(jìn)行merge map.changeValue((getPartition(kv._1), kv._1), update) // 如果需要溢寫(xiě)內(nèi)存數(shù)據(jù)到磁盤(pán) maybeSpillCollection(usingMap = true) } } else { // Stick values into our buffer while (records.hasNext) { addElementsRead() val kv = records.next() //通過(guò)key計(jì)算partition ID,通過(guò)partition ID對(duì)數(shù)據(jù)進(jìn)行排序 //這里的partitionID其實(shí)就是Reduce個(gè)數(shù) buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) // 當(dāng)buffer達(dá)到內(nèi)存限制時(shí)(buffer默認(rèn)大小32k,由spark.shuffle.file.buffer參數(shù)決定),會(huì)將buffer中的數(shù)據(jù)spill到文件中 maybeSpillCollection(usingMap = false) } } }
3.下面繼續(xù)跟蹤maybeSpillCollection()函數(shù),如何對(duì)內(nèi)存數(shù)據(jù)溢寫(xiě)的,代碼如下:
private def maybeSpillCollection(usingMap: Boolean): Unit = { var estimatedSize = 0L // 如果是map ,也就是Map端需要聚合的情況 if (usingMap) { //這里預(yù)估一個(gè)值,根據(jù)預(yù)估值判斷是否需要溢寫(xiě), // 如果需要,溢寫(xiě)完成后重新初始化一個(gè)map estimatedSize = map.estimateSize() if (maybeSpill(map, estimatedSize)) { map = new PartitionedAppendOnlyMap[K, C] } // 這里執(zhí)行的map不需要聚合的情況 } else { //這里預(yù)估一個(gè)值,根據(jù)預(yù)估值判斷是否需要溢寫(xiě), // 如果需要,溢寫(xiě)完成后重新初始化一個(gè)buffer estimatedSize = buffer.estimateSize() if (maybeSpill(buffer, estimatedSize)) { buffer = new PartitionedPairBuffer[K, C] } } if (estimatedSize > _peakMemoryUsedBytes) { _peakMemoryUsedBytes = estimatedSize } }
4.上面涉及到溢寫(xiě)判斷函數(shù)maybeSpill,我們看下他是如何進(jìn)行判斷的,代碼如下:
// maybeSpill函數(shù)判斷大體分了三步// 1.為當(dāng)前線(xiàn)程嘗試獲取amountToRequest大小的內(nèi)存(amountToRequest = 2 * currentMemory - myMemoryThreshold)。// 2.如果獲得的內(nèi)存依然不足(myMemoryThreshold <= currentMemory),則調(diào)用spill,執(zhí)行溢出操作。內(nèi)存不足可能是申請(qǐng)到的內(nèi)存為0或者已經(jīng)申請(qǐng)得到的內(nèi)存大小超過(guò)了myMemoryThreshold。// 3.溢出后續(xù)處理,如elementsRead歸零,已溢出內(nèi)存字節(jié)數(shù)(memoryBytesSpilled)增加線(xiàn)程當(dāng)前內(nèi)存大小(currentMemory),釋放當(dāng)前線(xiàn)程占用的內(nèi)存。 protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { var shouldSpill = false //其中內(nèi)存閾值myMemoryThreshold 由參數(shù)spark.shuffle.spill.initialMemoryThreshold控制,默認(rèn)是5M if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool val amountToRequest = 2 * currentMemory - myMemoryThreshold //底層調(diào)用TaskMemoryManager的acquireExecutionMemory方法分配內(nèi)存 val granted = acquireMemory(amountToRequest) // 更新現(xiàn)在內(nèi)存閥值 myMemoryThreshold += granted //再次判斷當(dāng)前內(nèi)存是否大于閥值,如果還是大于閥值則spill shouldSpill = currentMemory >= myMemoryThreshold } shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold // Actually spill if (shouldSpill) { _spillCount += 1 logSpillage(currentMemory) //進(jìn)行spill,這了溢寫(xiě)肯定先寫(xiě)到緩沖區(qū),后寫(xiě)到磁盤(pán), //有個(gè)比較重要的參數(shù)spark.shuffle.file.buffer 默認(rèn)32k, 優(yōu)化時(shí)常進(jìn)行調(diào)整 spill(collection) _elementsRead = 0 _memoryBytesSpilled += currentMemory releaseMemory() } shouldSpill }
里面還有更深層次的代碼,這里就不再跟蹤了,只要是了解了整個(gè)大體思路即可,有興趣的自己去跟蹤看下即可。
為方便大家理解,下面給大家畫(huà)了下SorteShuffleWriter執(zhí)行的流程圖,BypassMergeSortShuffleWriter和UnsafeShuffleWriter的處理流程與這個(gè)流程基本一致,只是具體的實(shí)現(xiàn)稍有差異,水平有限,僅供參考:
關(guān)于Spark2.x中如何用源碼剖析SortShuffleWriter具體實(shí)現(xiàn)就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。