這篇文章主要為大家展示了“hadoop中map如何輸出”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“hadoop中map如何輸出”這篇文章吧。
成都創(chuàng)新互聯(lián)公司是一家專業(yè)提供欽州企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站設(shè)計、網(wǎng)站制作、html5、小程序制作等業(yè)務(wù)。10年已為欽州眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站設(shè)計公司優(yōu)惠進(jìn)行中。Mapper 的輸入官方文檔如下
The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
mapper的輸出是已經(jīng)排序并且針對每個reducer劃分開的,那么hadoop代碼是如何劃分的,這里將跟從代碼分析。
還是根據(jù)官方示例WordCount的示例
第一次分析為了簡化map的輸出復(fù)雜情況,
只分析一個文檔,并且其中只有10個'單詞',分別為“J", .."c", "b", "a" ( 這里10個字母最好是亂序的,后面會看到其排序),
注釋掉設(shè)置combine class的代碼。
可以追蹤到最終實際是由org.apache.hadoop.mapred.MapTask.MapOutputBuffer.collect(K, V, int)
這里因為我們的output 只有10個Record 且每個大小都比較小,所以跳過了spill了處理以及combine處理,主要代碼如下,
public synchronized void collect(K key, V value, final int partition ) throws IOException {
{
...
keySerializer.serialize(key);
...
valSerializer.serialize(value);
.... kvmeta.put(kvindex + PARTITION, partition); kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); ...
}
這里實際是將(K,V) 序列化到了byte數(shù)組org.apache.hadoop.mapred.MapTask.MapOutputBuffer.kvbuffer 中,
并將(K,V)在內(nèi)存中的位置信息 以及 其partition(相同partition的record由同一個reducer處理) 消息 存在 kvmeta 中.
到此map的輸出都存在了內(nèi)存中
可以找到在 org.apache.hadoop.mapred.MapTask.MapOutputBuffer.sortAndSpill() 中找到有使用,設(shè)置斷點,看到如下,
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException { ...
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
...
for (int i = 0; i < partitions; ++i) {
...
if (combinerRunner == null) {
// spill directly DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
....
writer.append(key, value);
++spindex;
}
} ...
spillRec.putIndex(rec, i);
}
...
indexCacheList.add(spillRec);
...}
這里有三個操作,
1. Sorter.sort :是以partition 和key 來排序的,目的是聚合相同partition的record, 并以key的順序排列。
2. writer.append : 將序列化的record 寫入輸出流,這里寫入到文件spill0.out
3. indexCacheList.add : 每個spillRec記錄某個spill out文件中包含的partition信息。
在此設(shè)置斷點,可以看到這里我們只有一個spill文件,不需要merge,
這里只是唯一的spillRec 寫入到到文件中, file.out.index
將spill0.out 重命名為file.out, 可以vim打開這個文件看到里面存在順序號的字符。
private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException {
...
sameVolRename(filename[0],
mapOutputFile.getOutputFileForWriteInVolume(filename[0]));...
indexCacheList.get(0).writeToFile(
mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
...}
總結(jié)如下:
1. map的輸出首先序列化到內(nèi)存中kvbuffer,kvmeta
2. sortAndSpill 會將內(nèi)存中的record寫入到文件中
3. merge將spill出的文件merge問一個文件file.out,并將每個文件中partition的信息寫入file.out.index
還沒分析的情況:
map 輸出大量數(shù)據(jù),出現(xiàn)多個spill 文件的復(fù)雜情況的細(xì)節(jié)(1. 異步spill, 2. merge 多個文件)
以上是“hadoop中map如何輸出”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。