真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Shuffle流程是什么

本篇內(nèi)容介紹了“Shuffle流程是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的云巖網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!

shuffle流程源碼解讀

1、從WordCountMapper類中的map方法中寫出kv后,進(jìn)入shuffle流程	--context.write(outK,outV);
進(jìn)入TaskInputOutputContext中的write()方法			--看下就過
進(jìn)入WrappedMapper.java中的mapContext.write(key, value);方法	//112行
進(jìn)入TaskInputOutputContextImpl.java 中output.write(key, value);方法 	//89行
最終定位到MapTask的write()方法內(nèi),	//726行
2、重點(diǎn)步驟,收集器對(duì)象將kv收集到緩沖區(qū),并在收集前將kv的分區(qū)號(hào)計(jì)算出來(lái).
collector.collect(key, value,partitioner.getPartition(key, value, partitions));
第一次進(jìn)入該方法時(shí),因?yàn)闆]有設(shè)置reduce的個(gè)數(shù),所以最終返回的永遠(yuǎn)是0號(hào)分區(qū)
3、定位到MapTask類中的collect方法并進(jìn)入    		//1082行
bufferRemaining -= METASIZE;	//計(jì)算緩沖區(qū)剩余大小,該行代碼前面的代碼是對(duì)kv類型的一個(gè)判斷
如果bufferRemaining < 0 則開始進(jìn)行溢寫操作,內(nèi)部是對(duì)數(shù)據(jù)的一些校驗(yàn)和計(jì)算
4、定位到startSpill(); --1126行  	//只有當(dāng)溢寫數(shù)據(jù)大小滿足80%時(shí),才會(huì)觸發(fā)該操作
WordCountMapper持續(xù)往緩沖區(qū)寫數(shù)據(jù),當(dāng)達(dá)到溢寫條件80%時(shí),開始溢寫
5、進(jìn)入到startSpill()方法內(nèi)部		--MapTask類1590行
spillReady.signal(); //1602行  		--線程通信, 通知溢寫線程開始干活
//執(zhí)行溢寫線程(MapTask內(nèi)部類SpillThread)的run方法
//run方法中調(diào)用MapTask$MapOutputBuffer中的sortAndSpill()方法
直接執(zhí)行下面的排序和溢寫方法		--sortAndSpill()方法  	--MapTask的1605行
6、定位到1615行
final SpillRecord spillRec = new SpillRecord(partitions); //根據(jù)分區(qū)數(shù)創(chuàng)建溢寫記錄對(duì)象
--排序按照分區(qū)排序,溢寫按照分區(qū)溢寫

final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);//獲取溢寫文件名稱
 ///tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619
 _0001/attempt_local1440922619_0001_m_000000_0/output/(spill0.out),這時(shí)還沒有溢寫文件,只有目錄

out = rfs.create(filename);		//創(chuàng)建執(zhí)行改步后,在上述的目錄下生成溢寫文件spill0.out文件

Shuffle流程是什么

7、繼續(xù)向下走,定位到MapTask類的1625行
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);		//溢寫前排序

8、定位到1629行,進(jìn)入for循環(huán)	--按照分區(qū)進(jìn)行溢寫

9、分析for循環(huán)內(nèi)代碼,看具體溢寫過程
	9.1 先有一個(gè)writer對(duì)象,通過該對(duì)象來(lái)完成數(shù)據(jù)溢寫
		writer = new Writer(job, partitionOut, keyClass, valClass, codec,
	9.2 判斷是否有設(shè)置combinerRunner對(duì)象
		如果有,則按照設(shè)置的combinerRunner業(yè)務(wù)去處理;
		如果沒有,則走默認(rèn)的溢寫規(guī)則

10、執(zhí)行到1667行,即writer.close();方法,本次溢寫完畢,此時(shí)我們?cè)偃タ匆鐚懳募pill0.out文件有數(shù)據(jù)

Shuffle流程是什么

11、if (totalIndexCacheMemory >= indexCacheMemoryLimit(大小為:1M)) {}	//MapTask類的1685行
// 如果索引數(shù)據(jù)超過指定的內(nèi)存大小,也需要溢寫到文件中.(該現(xiàn)象一般情況很難發(fā)生.)
12、當(dāng)本次溢寫完畢之后,繼續(xù)回到WordCountMapper類中的map方法內(nèi)的context.write(outk,outv);方法處

--說明:因?yàn)槲覀兪褂帽镜豥ebug模式調(diào)試,所以看不到并行的效果,只能是串行效果,因此看到的是當(dāng)內(nèi)存內(nèi)讀取滿足
80%時(shí),發(fā)生溢寫操作,其實(shí)溢寫并未停止,只不過我們看不到,剩余的溢寫數(shù)據(jù)在20%內(nèi)存進(jìn)行
13、如上溢寫過程,在整個(gè)mapTask中會(huì)出現(xiàn)N次,具體多少看數(shù)據(jù)量. 如果map中最后的數(shù)據(jù)寫到緩沖區(qū),但是沒有滿足
80%溢寫條件的情況,最終也需要將緩沖區(qū)的數(shù)據(jù)刷寫到磁盤(最后一次溢寫)。

最后一次會(huì)發(fā)生在 MapTask中關(guān)閉 NewOutputCollector對(duì)象的時(shí)候.
即在該行代碼處發(fā)生    output.close(mapperContext);	--MapTask的805行

14、進(jìn)入output.close(mapperContext);方法內(nèi)	--MapTask的732行
定位到collector.flush();方法 // 735行
-->將緩沖區(qū)的數(shù)據(jù)刷寫到磁盤-->重新走sortAndSpill()方法(最后一次刷寫)

Shuffle流程是什么 Shuffle流程是什么

上述流程,每發(fā)生一次溢寫就會(huì)生成一個(gè)溢寫小文件(溢寫文件內(nèi)的數(shù)據(jù)是排好序的)
最終所有的數(shù)據(jù)都寫到磁盤中后,在磁盤上就是多個(gè)溢寫文件, 比如:spill0.out,spill1.out,...spillN.out
15、溢寫全部完成之后,就進(jìn)入歸并操作		--MapTask的1527行
mergeParts();方法,進(jìn)入該方法,定位到MapTask的1844行
filename[0]: /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local
1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/spill0.out

Shuffle流程是什么

16、繼續(xù)向下走,定位到MapTask的1880行
Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);
   --歸并后,最終輸出的文件路徑
/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00
01/attempt_local1440922619_0001_m_000000_0/output/file.out

17、繼續(xù)向下走,定位到MapTask的1882行
Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
   --歸并后,最終輸出文件的索引文件
/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00
01/attempt_local1440922619_0001_m_000000_0/output/file.out.index

18、創(chuàng)建file.out 文件
	FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

19、for (int parts = 0; parts < partitions; parts++) {}	//1925行,按照分區(qū)進(jìn)行歸并排序

20、for循環(huán)內(nèi)具體的歸并操作	//1950行
	RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                         keyClass, valClass, codec,
                         segmentList, mergeFactor,
                         new Path(mapId.toString()),
                         job.getOutputKeyComparator(), reporter, sortSegments,
                         null, spilledRecordsCounter, sortPhase.phase(),
                         TaskType.MAP);
21、歸并后的數(shù)據(jù)寫出到文件
Writer writer = new Writer(job, finalPartitionOut, 
keyClass, valClass, codec,spilledRecordsCounter); //1961行

//歸并也可以使用combiner,但是前提條件是設(shè)置了combiner,并且溢寫次數(shù)大于等于3 
if (combinerRunner == null || numSpills < minSpillsForCombine(3)) {
      Merger.writeFile(kvIter, writer, reporter, job);
} else {
      combineCollector.setWriter(writer);
      combinerRunner.combine(kvIter, combineCollector);
}

22、歸并完成
writer.close();		//1972行

Shuffle流程是什么

23、寫出索引文件
spillRec.writeToFile(finalIndexFile, job);	//1986行

24、刪除所有的溢寫文件spill0.out spill1.out ... spill0.out,只保留最終的輸出文件。
for(int i = 0; i < numSpills; i++) {
       rfs.delete(filename[i],true);
}

Shuffle流程是什么

“Shuffle流程是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!


網(wǎng)站標(biāo)題:Shuffle流程是什么
文章出自:http://weahome.cn/article/pcjedo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部