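In MapReduce, because of the sort step, MapTask and ReduceTask are connected as a workflow rather than as a data-flow (streaming) architecture. Until a MapTask has finished and its output has been sorted and merged, the ReduceTask has no input data from it, so even a ReduceTask that has already been created can only sleep and wait for MapTasks to complete before it can fetch data from the MapTask nodes. The final output of a MapTask is a single merged spill file, which can be accessed through a web (HTTP) address. For this reason ReduceTasks are usually started only when the MapTasks are close to finishing; starting them earlier wastes container resources.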
A ReduceTask runs as a thread inside the YarnChild Java virtual machine. We start our look at the reduce phase from ReduceTask.run.
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
if (isMapOrReduce()) {
/* Register the phases that the reduce task goes through, so that the current state of the task can be reported to the parent */
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
TaskReporter reporter = startReporter(umbilical);
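// Set up and start the reporter thread that communicates with the parent (the TaskTracker in Hadoop 1.x, the MRAppMaster under YARN)
boolean useNewApi = job.getUseNewReducer();
// When the job client initializes the job, the new API is used by default; see Job.setUseNewAPI()
initialize(job, getJobID(), reporter, useNewApi);
/* Initializes the task, mainly settings related to task output, such as creating the committer and setting the working directory */
// check if it is a cleanupJobTask
/* The following three if statements act according to the task type. The methods they call all belong to the Task class, so they are the same for MapTask and ReduceTask */
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return; // this task exists only for job cleanup, so stop here
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
// mainly creates the FileSystem object for the working directory
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
// sets the current task phase to cleanup and deletes the working directory
}
// From here on the task actually acts as a reducer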
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
// Create the combineCollector if a combiner is configured
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
// Looks up mapreduce.job.reduce.shuffle.consumer.plugin.class in the configuration; the default is Shuffle.class
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
// Create the shuffle plugin object
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
// Create the ShuffleConsumerPlugin.Context object
shuffleConsumerPlugin.init(shuffleContext);
// This actually calls Shuffle.init; the key lines of that method are excerpted below.
this.localMapFiles = context.getLocalMapFiles();
scheduler = new ShuffleSchedulerImpl(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
// Create the scheduler that the shuffle needs
merger = createMergeManager(context);
// Create the shuffle's internal merger; the body of createMergeManager is:
return new MergeManagerImpl(reduceId, jobConf, context.getLocalFS(),
context.getLocalDirAllocator(), reporter, context.getCodec(),
context.getCombinerClass(), context.getCombineCollector(),
context.getSpilledRecordsCounter(),
context.getReduceCombineInputCounter(),
context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
context.getMapOutputFile());
// Creates the MergeManagerImpl object and its merge threads
rIter = shuffleConsumerPlugin.run();
// Copy the output files from every Mapper, merge and sort them, and wait until this has finished
// free up the data structures
mapOutputFilesOnDisk.clear();
sortPhase.complete();
// The sort phase is complete
setPhase(TaskStatus.Phase.REDUCE);
// Enter the reduce phase
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
// Reduce is the final phase of the reduce task. It prepares the map output keyClass ("mapred.mapoutput.key.class" or "mapred.output.key.class"), the valueClass ("mapred.mapoutput.value.class" or "mapred.output.value.class"), and the Comparator ("mapred.output.value.groupfn.class" or "mapred.output.key.comparator.class")
if (useNewApi) {
// Depending on useNewApi, either runNewReducer or runOldReducer is executed. We analyze runNewReducer.
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
// runNewReducer: (0) writes some information to the reporter; (1) obtains a TaskAttemptContext and uses it to create the reducer, the output, and the tracking RecordWriter that counts output records, then creates the Context that collects the reduce results; (2) calls reducer.run(reducerContext) to start the reduce
} else { // old API
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
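(1) The reduce task is divided into three phases: copy (fetching the map output data from remote nodes), sort (sorting all of the data), and reduce (the aggregation, that is, the reducer we write ourselves). A Progress object is registered for each of the three phases and is used to report status to the TaskTracker.
(2) Lines 15-40 of the code above are essentially the same as the corresponding part of the source-level analysis of MapTask execution, which can be consulted alongside it.
(3) codec = initCodec() checks whether the map output is compressed. If it is, the compression codec instance is returned; otherwise null is returned. We discuss the uncompressed case here.
(4) We discuss fully distributed Hadoop, that is, isLocal == false. In the older Hadoop 1.x code a ReduceCopier object, reduceCopier, is constructed and reduceCopier.fetchOutputs() is called to copy the output of each Mapper to the local node; in the code listed above the same work is done by shuffleConsumerPlugin.run().
(5) The copy phase then completes, the next phase is set to sort, and the status information is updated.
(6) The KV iterator is chosen according to isLocal. In fully distributed mode reduceCopier.createKVIterator(job, rfs, reporter) serves as the KV iterator; in the code listed above this is the rIter returned by shuffleConsumerPlugin.run().
(7) The sort phase completes, the next phase is set to reduce, and the status information is updated.
(8) Some configuration is then read, and the processing path is chosen according to whether the new API is in use. Here it is the new API, so runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass) is called to execute the reducer.
(9) done(umbilical, reporter) performs the cleanup work at the end of the task: it updates the counters (updateCounters()); if the task needs to commit, it sets the task state to COMMIT_PENDING, reports through TaskUmbilicalProtocol that the task is finished and waiting to commit, and then calls commit; it sets the task-done flag; it stops the Reporter communication thread; it sends the final status report (sendLastUpdate); and it reports the final state through TaskUmbilicalProtocol (sendDone).
Some people divide a Reduce Task into five phases. (1) Shuffle phase, also called the copy phase: a piece of data is copied remotely from each MapTask; if its size exceeds a threshold it is written to disk, otherwise it is kept in memory. (2) Merge phase: while the remote copying is going on, the Reduce Task runs two background threads that merge the in-memory data and the on-disk files, to prevent excessive memory use and too many files on disk. (3) Sort phase: the input of the user-written reduce method is grouped by key, so the copied data must be sorted; merge sort is used because the output of each Map Task is already sorted. (4) Reduce phase: each group of data is passed in turn to the user-written reduce method. (5) Write phase: the results are written to HDFS.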
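The five phases above are a finer breakdown; the code uses three phases: copy, sort, and reduce. When we run an MR program in Eclipse, the reduce percentage shown on the console is split across these three phases, each accounting for 33.3%.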
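The equal weighting of the three phases can be illustrated with a minimal sketch. This is not Hadoop's Progress class; the class name ReducePhaseProgress and its methods are hypothetical, and the only assumption taken from the text is that each phase contributes one third of the total.

```java
// Hypothetical sketch, not Hadoop's Progress class: three equally weighted phases.
class ReducePhaseProgress {
    // Fraction complete (0.0 to 1.0) of each phase.
    private double copy;
    private double sort;
    private double reduce;

    void setCopy(double fraction)   { copy = clamp(fraction); }
    void setSort(double fraction)   { sort = clamp(fraction); }
    void setReduce(double fraction) { reduce = clamp(fraction); }

    // Overall progress is the plain average, so each phase contributes one third.
    double overall() { return (copy + sort + reduce) / 3.0; }

    private static double clamp(double f) { return Math.max(0.0, Math.min(1.0, f)); }

    public static void main(String[] args) {
        ReducePhaseProgress p = new ReducePhaseProgress();
        p.setCopy(1.0);   // copy phase finished
        p.setSort(0.5);   // sort phase half done
        System.out.printf("reduce task progress: %.1f%%%n", p.overall() * 100);
    }
}
```

With copy finished and sort half done, the sketch reports the task as half complete, which is why a reducer can sit at roughly 33% for a long time while it is still waiting on slow mappers.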
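Here shuffleConsumerPlugin is an instance of some class that implements ShuffleConsumerPlugin. The class can be set with the configuration option mapreduce.job.reduce.shuffle.consumer.plugin.class; by default Shuffle is used. As we saw in the code, shuffleConsumerPlugin.run (normally Shuffle.run) has to complete first, because this is the step in which the merged spill files of the Mappers are transferred to the Reducer side over HTTP. Only once the data is there can runNewReducer or runOldReducer proceed. You can think of the Shuffle object as the porter for the MapTask output. Moreover, the shuffle does not hand data to the Reducer while it is still carrying it: all of the MapTask data must be carried over and merge-sorted before it is offered to the corresponding Reducer.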
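Because each map output arrives already sorted, the final ordering can be produced by a k-way merge instead of a full sort. The following is a minimal sketch of that technique on plain strings; the class KWayMergeSketch and its Cursor helper are hypothetical and do not reflect how MergeManagerImpl is actually structured.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a k-way merge over already sorted runs.
class KWayMergeSketch {
    // One cursor per sorted run: the current head value plus the rest of the run.
    private static final class Cursor {
        final String head;
        final Iterator<String> rest;
        Cursor(String head, Iterator<String> rest) { this.head = head; this.rest = rest; }
    }

    static List<String> merge(List<List<String>> sortedRuns) {
        // The heap always exposes the smallest head among all runs.
        PriorityQueue<Cursor> heap =
            new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.head));
        for (List<String> run : sortedRuns) {
            Iterator<String> it = run.iterator();
            if (it.hasNext()) heap.add(new Cursor(it.next(), it));
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();
            out.add(c.head);
            // Advance the run we just consumed from and put it back on the heap.
            if (c.rest.hasNext()) heap.add(new Cursor(c.rest.next(), c.rest));
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> runs = new ArrayList<>();
        runs.add(List.of("apple", "melon"));
        runs.add(List.of("banana", "cherry", "zebra"));
        System.out.println(merge(runs));
    }
}
```

Each output record costs one heap operation, so merging k runs with n records in total takes on the order of n log k comparisons.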
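In general MapTasks and ReduceTasks are in a many-to-many relationship. Suppose there are M Mappers and N Reducers. The N Reducers correspond to N partitions, so the output of every Mapper is divided into N partitions and each Reducer is responsible for one of them. Each Reducer therefore fetches its own part of the data from every Mapper and ends up with M pieces from M different Mappers. Merging the M pieces gives one complete final partition, sorted where necessary, and only then does it become the actual input of the Reducer. This process of moving and regrouping data is called the shuffle. The shuffle is expensive: it consumes a lot of network bandwidth because large amounts of data are transferred, and it also introduces latency, because the M Mappers finish at different speeds, yet the shuffle cannot complete until every Mapper has finished and the reduce cannot start until the shuffle has completed. Of course this latency is not caused by the shuffle itself: if the Reducer did not need all of its partition data to be present and sorted, it would not have to synchronize with the slowest Mapper. This is the price paid for sorting.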
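The M-by-N regrouping can be sketched in a few lines of plain Java. The partition formula below (non-negative hash modulo the number of reducers) is the usual hash-partitioning rule; everything else, including the class name ShufflePartitionSketch and the use of bare string keys, is a simplifying assumption for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: M mapper outputs regrouped into N sorted reducer inputs.
class ShufflePartitionSketch {
    // Plain hash partitioning: non-negative hash modulo the reducer count.
    static int partitionOf(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // mapperOutputs.get(m) is the list of keys emitted by mapper m.
    // Returns, for each reducer r, the sorted union of partition r from all mappers.
    static List<List<String>> shuffle(List<List<String>> mapperOutputs, int numReducers) {
        List<List<String>> reducerInputs = new ArrayList<>();
        for (int r = 0; r < numReducers; r++) {
            reducerInputs.add(new ArrayList<>());
        }
        for (List<String> oneMapper : mapperOutputs) {
            for (String key : oneMapper) {
                reducerInputs.get(partitionOf(key, numReducers)).add(key);
            }
        }
        for (List<String> input : reducerInputs) {
            Collections.sort(input);   // the reducer sees its partition in key order
        }
        return reducerInputs;
    }

    public static void main(String[] args) {
        List<List<String>> mappers = new ArrayList<>();
        mappers.add(List.of("apple", "pear", "kiwi"));
        mappers.add(List.of("pear", "fig"));
        System.out.println(shuffle(mappers, 2));
    }
}
```

Because the partition depends only on the key, every occurrence of the same key lands at the same reducer no matter which mapper emitted it.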
So the shuffle plays a very important role in the MapReduce framework. Let us first look at an outline of the Shuffle class:
public class Shuffle implements ShuffleConsumerPlugin, ExceptionReporter
private ShuffleConsumerPlugin.Context context;
private TaskAttemptID reduceId;
private JobConf jobConf;
private TaskUmbilicalProtocol umbilical;
private ShuffleSchedulerImpl scheduler;
private MergeManager merger;
private Task reduceTask; //Used for status updates
private Map localMapFiles;
public void init(ShuffleConsumerPlugin.Context context)
public RawKeyValueIterator run() throws IOException, InterruptedException
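We saw that ReduceTask.run calls Shuffle.init, and init creates the ShuffleSchedulerImpl and MergeManagerImpl objects. What they are used for is explained later.
Next comes the call to Shuffle.run. Although Shuffle has a run method, it is not a thread; it merely uses that name.
Let us look at ReduceTask.run -> Shuffle.run: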
public RawKeyValueIterator run() throws IOException, InterruptedException {
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher eventFetcher =
new EventFetcher(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
// EventFetcher extends Thread, so it is a thread
// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher[] fetchers = new Fetcher[numFetchers];
// This array acts as a simple pool of fetcher threads
if (isLocal) {
// The map outputs are local files (Mapper and Reducer run on the same machine), so fetch locally
fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
// LocalFetcher extends Fetcher and is also a thread.
fetchers[0].start(); // there is only one local fetcher
} else {
// Mappers and Reducer are on different machines, so the fetchers must pull from multiple nodes
for (int i=0; i < numFetchers; ++i) {
// Start all of the fetchers
fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
// Create the Fetcher thread
fetchers[i].start();
// Cross-node fetching needs several fetchers, and all of them must be started
}
}
// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
// Wait until all fetchers have finished; each time the wait times out, report progress
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
// Shut down the eventFetcher: the shuffle copy is done and the data of all MapTasks has been copied over
// Stop the map-output fetcher threads
for (Fetcher fetcher : fetchers) {
fetcher.shutDown(); // shut down every fetcher
}
// stop the scheduler
scheduler.close();
// The shuffle scheduler is no longer needed, so close it
copyPhase.complete(); // copy is already complete
// The file copy phase is finished
// What follows is the merge sort on the reduce side
taskStatus.setPhase(TaskStatus.Phase.SORT);
// Enter the sort phase
reduceTask.statusUpdate(umbilical);
// Report to the MRAppMaster through umbilical to update the status
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
// Finish merging and sorting; returns kvIter, an iterator over the merged records.
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}
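There are only two ways to move data from MapTask to ReduceTask: either the MapTask pushes it or the ReduceTask pulls it. Hadoop uses the second, which amounts to copying files. Before Shuffle.run is entered, ReduceTask.run has already called the init function, shuffleConsumerPlugin.init(shuffleContext). init created the scheduler and the merger used for merge-sorting, and run then creates the EventFetcher thread and several Fetcher threads. A Fetcher, as the name says, fetches: it pulls data from the MapTask nodes. Note, however, that although EventFetcher is also a fetcher, what it fetches are events, not the data itself. We can regard it as the event-driven control for the Fetcher process.
The number of Fetcher threads varies. In Uber mode the MapTasks and the ReduceTask run on the same node, so the map outputs are local files and a single Fetcher, a LocalFetcher, is enough. Outside Uber mode there may be many MapTasks, generally not on the same node as the ReduceTask. In that case the number of Fetchers is configurable, with a default of 5. The fetchers array acts as the Fetcher thread pool.
After the EventFetcher and the Fetcher pool have been created, run enters a while loop. The loop copies nothing itself; it only waits and reports progress, so the actual work is all done in the threads, that is, by the EventFetcher and the Fetchers. The EventFetcher plays a pivotal role.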
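The pattern of a coordinating thread that only waits while worker threads do the copying can be sketched as follows. This is a simplified illustration, not the Hadoop implementation: the class WaitForFetchersSketch, the integer "outputs", and the 50 ms wait interval are all assumptions made for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: the caller waits in a timed loop while fetcher threads work.
class WaitForFetchersSketch {
    static int copyAll(int numOutputs, int numFetchers) {
        BlockingQueue<Integer> pending = new LinkedBlockingQueue<>();
        for (int i = 0; i < numOutputs; i++) pending.add(i);
        CountDownLatch remaining = new CountDownLatch(numOutputs);
        AtomicInteger copied = new AtomicInteger();
        AtomicReference<Throwable> failure = new AtomicReference<>();

        Thread[] fetchers = new Thread[numFetchers];
        for (int i = 0; i < numFetchers; i++) {
            fetchers[i] = new Thread(() -> {
                try {
                    while (pending.poll() != null) {
                        copied.incrementAndGet();   // stands in for copying one map output
                        remaining.countDown();
                    }
                } catch (Throwable t) {
                    failure.compareAndSet(null, t); // remember the first error for the waiter
                }
            });
            fetchers[i].start();
        }
        try {
            // The coordinating thread copies nothing; it wakes up periodically,
            // checks whether a worker reported an error, and keeps waiting.
            while (!remaining.await(50, TimeUnit.MILLISECONDS)) {
                if (failure.get() != null) {
                    throw new IllegalStateException("error in fetcher", failure.get());
                }
            }
            for (Thread f : fetchers) f.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return copied.get();
    }

    public static void main(String[] args) {
        System.out.println("copied " + copyAll(20, 5) + " outputs");
    }
}
```

The timed wait is what gives the coordinating thread a chance to report progress and notice worker failures instead of blocking indefinitely.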
Let us look at an excerpt of the EventFetcher source, keeping only the key parts:
class EventFetcher extends Thread {
private final TaskAttemptID reduce;
private final TaskUmbilicalProtocol umbilical;
private final ShuffleScheduler scheduler;
private final int maxEventsToFetch;
public void run() {
int failures = 0;
LOG.info(reduce + " Thread started: " + getName());
try {
while (!stopped && !Thread.currentThread().isInterrupted()) { // loop until stopped or interrupted
try {
int numNewMaps = getMapCompletionEvents();
// Fetch the map-completion events. The source of getMapCompletionEvents follows:
protected int getMapCompletionEvents()
throws IOException, InterruptedException {
int numNewMaps = 0;
TaskCompletionEvent events[] = null;
do {
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(
(org.apache.hadoop.mapred.JobID)reduce.getJobID(),
fromEventIdx,
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID)reduce);
// Through umbilical, ask the MRAppMaster for the map-completion events
events = update.getMapTaskCompletionEvents();
// Details of how the individual MapTasks finished
LOG.debug("Got " + events.length + " map completion events from " +
fromEventIdx);
assert !update.shouldReset() : "Unexpected legacy state";
// An assertion: the update must not ask for a reset
// Update the last seen event ID
fromEventIdx += events.length;
// Process the TaskCompletionEvents:
// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
// fetching from those maps.
// 3. Remove TIPFAILED maps from neededOutputs since we don't need their
// outputs at all.
for (TaskCompletionEvent event : events) {
// For each event report received
scheduler.resolve(event);
// This calls ShuffleSchedulerImpl.resolve, whose source is:
public void resolve(TaskCompletionEvent event) {
switch (event.getTaskStatus()) {
case SUCCEEDED: // the map attempt succeeded
URI u = getBaseURI(reduceId, event.getTaskTrackerHttp()); // build its base URI
addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(),
event.getTaskAttemptId());
// Record the host of this MapTask for the Fetchers to use. The source of getBaseURI is:
static URI getBaseURI(TaskAttemptID reduceId, String url) {
StringBuffer baseUrl = new StringBuffer(url);
if (!url.endsWith("/")) {
baseUrl.append("/");
}
baseUrl.append("mapOutput?job=");
baseUrl.append(reduceId.getJobID());
baseUrl.append("&reduce=");
baseUrl.append(reduceId.getTaskID().getId());
baseUrl.append("&map=");
URI u = URI.create(baseUrl.toString());
return u;
// Gathers the pieces of information and assembles them into the URI object.
}
Back to resolve:
maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
// Track the longest map run time seen so far
break;
case FAILED:
case KILLED:
case OBSOLETE: // the MapTask attempt failed, was killed, or is obsolete
obsoleteMapOutput(event.getTaskAttemptId()); // mark the output of this attempt as obsolete
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'"); // log it
break;
case TIPFAILED: // the task as a whole (TIP) failed
tipFailed(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'"); // log it
break;
}
}
Back to getMapCompletionEvents:
if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) { // the event reports success
++numNewMaps; // count one more newly completed map
}
}
} while (events.length == maxEventsToFetch);
return numNewMaps;
}
Back to EventFetcher.run:
failures = 0;
if (numNewMaps > 0) {
LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
}
LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME);
}
} catch (InterruptedException e) {
LOG.info("EventFetcher is interrupted.. Returning");
return;
} catch (IOException ie) {
LOG.info("Exception in getting events", ie);
// check to see whether to abort
if (++failures >= MAX_RETRIES) {
throw new IOException("too many failures downloading events", ie); // the number of failures has reached the retry limit
}
// sleep for a bit
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(RETRY_PERIOD);
}
}
}
} catch (InterruptedException e) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
return;
}
}
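MapTask and ReduceTask have no direct relationship. A MapTask does not know which nodes the ReduceTasks are on; it only reports its progress events to the MRAppMaster. The ReduceTask uses the "umbilical cord" to call getMapCompletionEvents and obtain from the MRAppMaster the event reports of MapTasks that have finished. A few MapTasks may fail, but the great majority succeed, and for each one that succeeds the Fetchers go and request its output data. This information is passed along through the scheduler, that is, the ShuffleSchedulerImpl object. ShuffleSchedulerImpl is nothing special; it is just an ordinary object.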
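The paging logic of getMapCompletionEvents (ask for at most maxEventsToFetch events starting at fromEventIdx, and keep asking while a full batch comes back) can be sketched without any RPC. In this sketch the class EventPagingSketch, the in-memory serverEvents list, and the use of plain status strings are hypothetical stand-ins for the MRAppMaster and TaskCompletionEvent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of index-based event paging; no real RPC is involved.
class EventPagingSketch {
    private final List<String> serverEvents; // stands in for the application master's event log
    private final int maxEventsToFetch;
    private int fromEventIdx = 0;            // index of the first event not yet seen

    EventPagingSketch(List<String> serverEvents, int maxEventsToFetch) {
        this.serverEvents = serverEvents;
        this.maxEventsToFetch = maxEventsToFetch;
    }

    // Stand-in for the remote call: at most 'max' events starting at index 'from'.
    private List<String> fetch(int from, int max) {
        int to = Math.min(serverEvents.size(), from + max);
        return new ArrayList<>(serverEvents.subList(from, to));
    }

    // Drains everything currently available and counts the successful map events.
    int pollOnce() {
        int numNewMaps = 0;
        List<String> batch;
        do {
            batch = fetch(fromEventIdx, maxEventsToFetch);
            fromEventIdx += batch.size();    // remember how far we have read
            for (String status : batch) {
                if (status.equals("SUCCEEDED")) numNewMaps++;
            }
        } while (batch.size() == maxEventsToFetch); // a full batch means there may be more
        return numNewMaps;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>(List.of("SUCCEEDED", "FAILED", "SUCCEEDED"));
        EventPagingSketch poller = new EventPagingSketch(log, 2);
        System.out.println("new maps: " + poller.pollOnce());
    }
}
```

Keeping the read position on the client side means a later poll only returns events that arrived after the previous one.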
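fetchers works like a thread pool holding several threads (five by default). These threads wait for notification from the EventFetcher, and as soon as a MapTask has completed they go and fetch its data.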
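The hand-off between the event-polling thread and the waiting fetcher threads is a classic monitor pattern. The sketch below shows it with Java's built-in wait and notifyAll; the class PendingHostsSketch, the method addKnownHost, and the host string "node-1:13562" are hypothetical and only loosely modeled on the scheduler described above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: fetchers block until the event thread announces a ready host.
class PendingHostsSketch {
    private final Deque<String> pendingHosts = new ArrayDeque<>();

    // Called by the event-polling thread when a map output becomes available.
    synchronized void addKnownHost(String host) {
        pendingHosts.add(host);
        notifyAll();                       // wake up any fetcher waiting in getHost()
    }

    // Called by a fetcher thread; blocks until some host is ready.
    synchronized String getHost() throws InterruptedException {
        while (pendingHosts.isEmpty()) {   // loop guards against spurious wake-ups
            wait();
        }
        return pendingHosts.poll();
    }

    // Runs one fetcher thread and one announcement, then returns what was fetched.
    static String demo() {
        PendingHostsSketch scheduler = new PendingHostsSketch();
        String[] fetched = new String[1];
        Thread fetcher = new Thread(() -> {
            try {
                fetched[0] = scheduler.getHost();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();
        scheduler.addKnownHost("node-1:13562");
        try {
            fetcher.join();                // join makes the fetcher's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return fetched[0];
    }

    public static void main(String[] args) {
        System.out.println("fetcher picked up " + demo());
    }
}
```

The sketch works whether the announcement happens before or after the fetcher starts waiting, because the condition is rechecked under the lock.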
Let us look at the run method of the Fetcher thread class:
public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
// Get from the scheduler a node whose MapTask has completed successfully.
metrics.threadBusy();
// Mark this thread as busy
// Shuffle
copyFromHost(host);
// Start copying the data from this node
} finally {
if (host != null) { // a host was obtained
scheduler.freeHost(host);
// Mark the host as free again so that it can be scheduled for further fetches.
metrics.threadFree();
}
}
}
} catch (InterruptedException ie) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
}
}
The key function here is copyFromHost, which fetches the data.
protected void copyFromHost(MapHost host) throws IOException {
// reset retryStartTime for a new host
// This runs on the node of the ReduceTask
retryStartTime = 0;
// Get completed maps on 'host'
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
// The completed MapTask attempts on the target node.
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
if (maps.size() == 0) {
return; // nothing to fetch, so return immediately
}
if(LOG.isDebugEnabled()) {
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+ maps);
}
// List of maps to be fetched yet
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
// The MapTask attempts that have completed and are waiting to be shuffled.
// Construct the url and connect
DataInputStream input = null;
URL url = getMapOutputURL(host, maps);
// Build the URL for the node hosting the MapTasks. The source of getMapOutputURL follows:
private URL getMapOutputURL(MapHost host, Collection<TaskAttemptID> maps
) throws MalformedURLException {
// Get the base url
StringBuffer url = new StringBuffer(host.getBaseUrl());
boolean first = true;
for (TaskAttemptID mapId : maps) {
if (!first) {
url.append(",");
}
url.append(mapId); // append the map attempt ID to the URL
first = false;
}
LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
// log it
return new URL(url.toString());
// return the URL
}
Back to copyFromHost:
try {
setupConnectionsWithRetry(host, remaining, url);
// Open an HTTP connection to the remote host. setupConnectionsWithRetry opens the connection by calling openConnectionWithRetry:
openConnectionWithRetry(host, remaining, url);
That code in turn calls openConnection(url); let us follow it.
The main steps of opening the connection are:
protected synchronized void openConnection(URL url)
throws IOException {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
// The connection is an HttpURLConnection
if (sslShuffle) { // the shuffle is configured to use SSL
HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
// Cast conn to the HTTPS connection type
try {
httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory()); // install the SSL socket factory
} catch (GeneralSecurityException ex) {
throw new IOException(ex);
}
httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
}
connection = conn;
}
setupConnectionsWithRetry then continues:
setupShuffleConnection(encHash);
// Set up the shuffle connection
connect(connection, connectionTimeout);
// verify that the thread wasn't stopped during calls to connect
if (stopped) {
return;
}
verifyConnection(url, msgToEncode, encHash);
}
// At this point the connection has been established and verified.
if (stopped) {
abortConnect(host, remaining);
// This closes the connection; it is worth stepping into, where you can see that it also puts the remaining map outputs back on the list of outputs waiting to be fetched
return;
}
} catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException;
ioErrs.increment(1);
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
" map outputs", ie);
Back to copyFromHost:
input = new DataInputStream(connection.getInputStream());
// Create the input stream object.
try {
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
// Continue while there are still outputs to fetch and no task has failed
try {
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
// Copy the data over. The source of copyMapOutput follows:
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
mapId = TaskAttemptID.forName(header.mapId);
// Get the map ID
compressedLength = header.compressedLength;
decompressedLength = header.uncompressedLength;
forReduce = header.forReduce;
} catch (IllegalArgumentException e) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
//Don't know which one was bad, so consider all of them as bad
return remaining.toArray(new TaskAttemptID[remaining.size()]);
}
InputStream is = input;
is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
compressedLength -= CryptoUtils.cryptoPadding(jobConf);
decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
// Wrap the stream for decryption if needed, and subtract the crypto padding from both lengths
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
return new TaskAttemptID[] {mapId};
}
if(LOG.isDebugEnabled()) {
LOG.debug("header: " + mapId + ", len: " + compressedLength +
", decomp len: " + decompressedLength);
}
try {
mapOutput = merger.reserve(mapId, decompressedLength, id);
// Reserve a MapOutput from the merger: either in memory or on disk.
} catch (IOException ioe) {
// kill this reduce attempt
ioErrs.increment(1);
scheduler.reportLocalError(ioe);
// Report the error
return EMPTY_ATTEMPT_ID_ARRAY;
}
// Check if we can shuffle now ...
if (mapOutput == null) {
LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
//Not an error but wait to process data.
return EMPTY_ATTEMPT_ID_ARRAY;
}
// The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
// on decompression failures. Catching and re-throwing as IOException
// to allow fetch failure logic to be processed
try {
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map "
+ mapOutput.getMapId() + " decomp: " + decompressedLength);
mapOutput.shuffle(host, is, compressedLength, decompressedLength,
metrics, reporter);
// Copy the Mapper's file content across nodes into the reducer's memory or onto its disk.
} catch (java.lang.InternalError e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e);
throw new IOException(e);
}
// Inform the shuffle scheduler
long endTime = Time.monotonicNow();
// Reset retryStartTime as map task make progress if retried before.
retryStartTime = 0;
scheduler.copySucceeded(mapId, host, compressedLength,
startTime, endTime, mapOutput);
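// Tell the scheduler that the copy of one map output has completed.
remaining.remove(mapId);
// The output of this MapTask has now been shuffled
metrics.successFetch();
return null; // the exception handling that follows is omitted here
Here mapOutput is the storage that holds the output of a MapTask. Depending on the size of the output and the state of memory, it may be an in-memory output or an on-disk output. In-memory space has to be reserved, because there is more than one Fetcher. We take InMemoryMapOutput as the example.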
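The memory-or-disk decision, together with the WAIT outcome seen in copyMapOutput when reserve returns null, can be sketched as a small reservation routine. This is a simplified model under stated assumptions: the class ReserveSketch, the enum Placement, and the two thresholds are hypothetical names, and the real MergeManagerImpl.reserve has more state than is shown here.

```java
// Hypothetical sketch of a memory-or-disk reservation decision shared by several fetchers.
class ReserveSketch {
    enum Placement { MEMORY, DISK, WAIT }

    private final long memoryLimit;       // total bytes allowed for in-memory outputs
    private final long maxSingleShuffle;  // largest single output allowed in memory
    private long usedMemory = 0;

    ReserveSketch(long memoryLimit, long maxSingleShuffle) {
        this.memoryLimit = memoryLimit;
        this.maxSingleShuffle = maxSingleShuffle;
    }

    // synchronized because several fetcher threads reserve concurrently.
    synchronized Placement reserve(long requestedSize) {
        if (requestedSize > maxSingleShuffle) {
            return Placement.DISK;        // too large for memory: stream it to disk
        }
        if (usedMemory > memoryLimit) {
            return Placement.WAIT;        // memory is over the limit: the caller retries later
        }
        // The check is made before adding, so the limit can be exceeded by one output.
        usedMemory += requestedSize;
        return Placement.MEMORY;
    }

    // Called when an in-memory output has been merged away and its space is free again.
    synchronized void release(long size) {
        usedMemory -= size;
    }

    public static void main(String[] args) {
        ReserveSketch merger = new ReserveSketch(100, 40);
        System.out.println(merger.reserve(50));  // larger than the single-output limit
        System.out.println(merger.reserve(30));  // fits in memory
    }
}
```

Returning WAIT instead of blocking lets the fetcher give the host back to the scheduler and try again once a merge has freed memory.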
Call chain:
Fetcher.run --> copyFromHost --> copyMapOutput --> merger.reserve (MergeManagerImpl.reserve) --> InMemoryMapOutput.shuffle
public void shuffle(MapHost host, InputStream input,
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
// Copy the spill file data from the Mapper across nodes
IFileInputStream checksumIn =
new IFileInputStream(input, compressedLength, conf);
// Input stream that verifies the checksum
input = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
// Compression is in use
decompressor.reset();
// Reset the decompressor
input = codec.createInputStream(input, decompressor);
// Input stream wrapped with the decompressor
}
try {
IOUtils.readFully(input, memory, 0, memory.length);
// Read the data of this partition from the Mapper side into the memory buffer of the Reducer.
metrics.inputBytes(memory.length);
reporter.progress(); // report progress
LOG.info("Read " + memory.length + " bytes from map-output for " +
getMapId());
/**
We've gotten the amount of data we were expecting. Verify the
decompressor has nothing more to offer. This action also forces the
decompressor to read any trailing bytes that weren't critical
for decompression, which is necessary to keep the stream
*/
if (input.read() >= 0 ) {
throw new IOException("Unexpected extra bytes from input stream for " +
getMapId());
}
} catch (IOException ioe) {
// Close the streams
IOUtils.cleanup(LOG, input);
// Re-throw
throw ioe;
} finally {
CodecPool.returnDecompressor(decompressor);
// Return the decompressor to the pool
}
}
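Once the data belonging to this partition has been copied from the remote spill file, control returns to copyFromHost, which notifies the scheduler through scheduler.copySucceeded and removes the ID of this MapTask from the remaining set. It then goes around the loop again to copy the data of the next MapTask, until all of the data belonging to this partition has been copied over.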
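The core of the in-memory shuffle is "read exactly the announced number of bytes, then check that nothing is left over". A minimal sketch of that check with only java.io follows; the class ExactReadSketch and the method readExactly are hypothetical, and checksum verification and decompression are deliberately left out.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch: read a length-announced payload fully and reject trailing bytes.
class ExactReadSketch {
    static byte[] readExactly(InputStream in, int expectedLength) {
        byte[] memory = new byte[expectedLength];
        try {
            // readFully keeps reading until the buffer is full, and throws
            // EOFException (an IOException) if the stream ends too early.
            new DataInputStream(in).readFully(memory);
            // The sender announced the length up front, so any further byte is an error.
            if (in.read() >= 0) {
                throw new IOException("Unexpected extra bytes from input stream");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return memory;
    }

    public static void main(String[] args) {
        byte[] data = readExactly(new ByteArrayInputStream(new byte[] {1, 2, 3}), 3);
        System.out.println("read " + data.length + " bytes");
    }
}
```

Both a short stream and an overlong one are rejected, which is what lets the caller treat the transfer as failed and fetch the output again.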
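That is the Fetcher process on the Reducer side: it sends an HTTP GET request to the Mapper side and downloads the file. On the MapTask side there is a corresponding server. We do not go into the source code of that network protocol here; interested readers can study it on their own.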
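The URL that such a GET request targets can be assembled as in getBaseURI and getMapOutputURL shown earlier: a base address, the job and reduce partition as query parameters, and a comma-separated list of map attempt IDs. The sketch below reproduces that string layout with plain Java; the class MapOutputUrlSketch is hypothetical, and the host, job ID, and attempt IDs in the example are made-up values.

```java
import java.net.URI;
import java.util.List;

// Hypothetical sketch of how the shuffle request URL is put together.
class MapOutputUrlSketch {
    // Base URL: address, then the job and reduce partition, ending with an open "map=".
    static String baseUrl(String httpAddress, String jobId, int reducePartition) {
        StringBuilder sb = new StringBuilder(httpAddress);
        if (!httpAddress.endsWith("/")) {
            sb.append('/');
        }
        return sb.append("mapOutput?job=").append(jobId)
                 .append("&reduce=").append(reducePartition)
                 .append("&map=").toString();
    }

    // Full URL: the base followed by the wanted map attempt IDs, separated by commas.
    static URI fetchUrl(String baseUrl, List<String> mapAttemptIds) {
        return URI.create(baseUrl + String.join(",", mapAttemptIds));
    }

    public static void main(String[] args) {
        String base = baseUrl("http://node-1:13562", "job_1_0001", 3);
        System.out.println(fetchUrl(base,
            List.of("attempt_1_0001_m_000000_0", "attempt_1_0001_m_000001_0")));
    }
}
```

Listing several map attempts in one request is what allows a fetcher to pull all completed outputs from one host over a single connection.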