本篇內(nèi)容介紹了“Shuffle流程是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的云巖網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
1、從WordCountMapper類中的map方法中寫出kv后,進(jìn)入shuffle流程 --context.write(outK,outV); 進(jìn)入TaskInputOutputContext中的write()方法 --看下就過 進(jìn)入WrappedMapper.java中的mapContext.write(key, value);方法 //112行 進(jìn)入TaskInputOutputContextImpl.java 中output.write(key, value);方法 //89行 最終定位到MapTask的write()方法內(nèi), //726行
2、重點(diǎn)步驟,收集器對(duì)象將kv收集到緩沖區(qū),并在收集前將kv的分區(qū)號(hào)計(jì)算出來(lái). collector.collect(key, value,partitioner.getPartition(key, value, partitions)); 第一次進(jìn)入該方法時(shí),因?yàn)闆]有設(shè)置reduce的個(gè)數(shù),所以最終返回的永遠(yuǎn)是0號(hào)分區(qū)
3、定位到MapTask類中的collect方法并進(jìn)入 //1082行 bufferRemaining -= METASIZE; //計(jì)算緩沖區(qū)剩余大小,該行代碼前面的代碼是對(duì)kv類型的一個(gè)判斷 如果bufferRemaining < 0 則開始進(jìn)行溢寫操作,內(nèi)部是對(duì)數(shù)據(jù)的一些校驗(yàn)和計(jì)算
4、定位到startSpill(); --1126行 //只有當(dāng)溢寫數(shù)據(jù)大小滿足80%時(shí),才會(huì)觸發(fā)該操作 WordCountMapper持續(xù)往緩沖區(qū)寫數(shù)據(jù),當(dāng)達(dá)到溢寫條件80%時(shí),開始溢寫
5、進(jìn)入到startSpill()方法內(nèi)部 --MapTask類1590行 spillReady.signal(); //1602行 --線程通信, 通知溢寫線程開始干活 //執(zhí)行溢寫線程(MapTask內(nèi)部類SpillThread)的run方法 //run方法中調(diào)用MapTask$MapOutputBuffer中的sortAndSpill()方法 直接執(zhí)行下面的排序和溢寫方法 --sortAndSpill()方法 --MapTask的1605行
6、定位到1615行 final SpillRecord spillRec = new SpillRecord(partitions); //根據(jù)分區(qū)數(shù)創(chuàng)建溢寫記錄對(duì)象 --排序按照分區(qū)排序,溢寫按照分區(qū)溢寫 final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);//獲取溢寫文件名稱 ///tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619 _0001/attempt_local1440922619_0001_m_000000_0/output/(spill0.out),這時(shí)還沒有溢寫文件,只有目錄 out = rfs.create(filename); //創(chuàng)建執(zhí)行改步后,在上述的目錄下生成溢寫文件spill0.out文件
7、繼續(xù)向下走,定位到MapTask類的1625行 sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //溢寫前排序 8、定位到1629行,進(jìn)入for循環(huán) --按照分區(qū)進(jìn)行溢寫 9、分析for循環(huán)內(nèi)代碼,看具體溢寫過程 9.1 先有一個(gè)writer對(duì)象,通過該對(duì)象來(lái)完成數(shù)據(jù)溢寫 writer = new Writer(job, partitionOut, keyClass, valClass, codec, 9.2 判斷是否有設(shè)置combinerRunner對(duì)象 如果有,則按照設(shè)置的combinerRunner業(yè)務(wù)去處理; 如果沒有,則走默認(rèn)的溢寫規(guī)則 10、執(zhí)行到1667行,即writer.close();方法,本次溢寫完畢,此時(shí)我們?cè)偃タ匆鐚懳募pill0.out文件有數(shù)據(jù)
11、if (totalIndexCacheMemory >= indexCacheMemoryLimit(大小為:1M)) {} //MapTask類的1685行 // 如果索引數(shù)據(jù)超過指定的內(nèi)存大小,也需要溢寫到文件中.(該現(xiàn)象一般情況很難發(fā)生.)
12、當(dāng)本次溢寫完畢之后,繼續(xù)回到WordCountMapper類中的map方法內(nèi)的context.write(outk,outv);方法處 --說明:因?yàn)槲覀兪褂帽镜豥ebug模式調(diào)試,所以看不到并行的效果,只能是串行效果,因此看到的是當(dāng)內(nèi)存內(nèi)讀取滿足 80%時(shí),發(fā)生溢寫操作,其實(shí)溢寫并未停止,只不過我們看不到,剩余的溢寫數(shù)據(jù)在20%內(nèi)存進(jìn)行
13、如上溢寫過程,在整個(gè)mapTask中會(huì)出現(xiàn)N次,具體多少看數(shù)據(jù)量. 如果map中最后的數(shù)據(jù)寫到緩沖區(qū),但是沒有滿足 80%溢寫條件的情況,最終也需要將緩沖區(qū)的數(shù)據(jù)刷寫到磁盤(最后一次溢寫)。 最后一次會(huì)發(fā)生在 MapTask中關(guān)閉 NewOutputCollector對(duì)象的時(shí)候. 即在該行代碼處發(fā)生 output.close(mapperContext); --MapTask的805行 14、進(jìn)入output.close(mapperContext);方法內(nèi) --MapTask的732行 定位到collector.flush();方法 // 735行 -->將緩沖區(qū)的數(shù)據(jù)刷寫到磁盤-->重新走sortAndSpill()方法(最后一次刷寫)
上述流程,每發(fā)生一次溢寫就會(huì)生成一個(gè)溢寫小文件(溢寫文件內(nèi)的數(shù)據(jù)是排好序的) 最終所有的數(shù)據(jù)都寫到磁盤中后,在磁盤上就是多個(gè)溢寫文件, 比如:spill0.out,spill1.out,...spillN.out
15、溢寫全部完成之后,就進(jìn)入歸并操作 --MapTask的1527行 mergeParts();方法,進(jìn)入該方法,定位到MapTask的1844行 filename[0]: /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local 1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/spill0.out
16、繼續(xù)向下走,定位到MapTask的1880行 Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize); --歸并后,最終輸出的文件路徑 /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00 01/attempt_local1440922619_0001_m_000000_0/output/file.out 17、繼續(xù)向下走,定位到MapTask的1882行 Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); --歸并后,最終輸出文件的索引文件 /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00 01/attempt_local1440922619_0001_m_000000_0/output/file.out.index 18、創(chuàng)建file.out 文件 FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); 19、for (int parts = 0; parts < partitions; parts++) {} //1925行,按照分區(qū)進(jìn)行歸并排序 20、for循環(huán)內(nèi)具體的歸并操作 //1950行 RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, mergeFactor, new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, null, spilledRecordsCounter, sortPhase.phase(), TaskType.MAP);
21、歸并后的數(shù)據(jù)寫出到文件 Writerwriter = new Writer (job, finalPartitionOut, keyClass, valClass, codec,spilledRecordsCounter); //1961行 //歸并也可以使用combiner,但是前提條件是設(shè)置了combiner,并且溢寫次數(shù)大于等于3 if (combinerRunner == null || numSpills < minSpillsForCombine(3)) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); } 22、歸并完成 writer.close(); //1972行
23、寫出索引文件 spillRec.writeToFile(finalIndexFile, job); //1986行 24、刪除所有的溢寫文件spill0.out spill1.out ... spill0.out,只保留最終的輸出文件。 for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true); }
“Shuffle流程是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!