這篇文章主要介紹“ReduceTask流程是怎樣的”,在日常操作中,相信很多人在ReduceTask流程是怎樣的問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對(duì)大家解答”ReduceTask流程是怎樣的”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!
目前創(chuàng)新互聯(lián)已為上1000+的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)絡(luò)空間、成都網(wǎng)站托管、企業(yè)網(wǎng)站設(shè)計(jì)、清原網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。
1、最終的文件就是 file.out 和 file.out.index ,等待reduce的拷貝.
2、在LocalJobRunner$Job中的run方法中: //LocalJobRunner類中555行 if (numReduceTasks > 0) { //判斷reduceTask的個(gè)數(shù) //創(chuàng)建Runnable對(duì)象: LocalJobRunner$Job$ReduceTaskRunnable ListreduceRunnables = getReduceTaskRunnables( jobId, mapOutputFiles); //創(chuàng)建線程池 ExecutorService reduceService = createReduceExecutor(); //將所有的LocalJobRunner$Job$ReduceTaskRunnable 提交到線程池執(zhí)行. runTasks(reduceRunnables, reduceService, "reduce"); }
3、進(jìn)入runTasks(reduceRunnables, reduceService, "reduce");方法 //559行 for (Runnable r : runnables) { //循環(huán)每個(gè)Runnable,提交給線程池去執(zhí)行. service.submit(r); }
4、線程執(zhí)行的時(shí)候,要運(yùn)行LocalJobRunner$Job$ReduceTaskRunnable 中run方法
5、創(chuàng)建ReduceTask對(duì)象 //LocalJobRunner類~332行 ReduceTask reduce = new ReduceTask(systemJobFile.toString(),reduceId, taskId,mapIds.size(), 1); 6、執(zhí)行ReduceTask中的run方法 //LocalJobRunner類 --> 347行reduce.run(localConf, Job.this); --> //進(jìn)入run方法 7、調(diào)到ReduceTask的run方法內(nèi) //ReduceTask類~320行 initialize(job, getJobID(), reporter, useNewApi); //初始化~333行 sortPhase.complete(); //排序~382行 RawComparator comparator = job.getOutputValueGroupingComparator(); //387行 獲取分組比較器
8、進(jìn)入下列代碼(390行) runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); 進(jìn)入runNewReducer方法內(nèi) //ReduceTask~577行 --獲取job的相關(guān)信息 org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter); --反射的操作創(chuàng)建reduce對(duì)象 ,例如: WordCountReducer org.apache.hadoop.mapreduce.Reducerreducer = (org.apache.hadoop.mapreduce.Reducer ) ReflectionUtils.newInstance(taskContext.getReducerClass(), job); --創(chuàng)建RecordWriter對(duì)象 org.apache.hadoop.mapreduce.RecordWriter trackedRW = new NewTrackingRecordWriter (this, taskContext);
9、向下走,定位到reducer.run(reducerContext);方法 --> 然后進(jìn)入(Reducer的run方法) //~628行 setup(context); reduce(context.getCurrentKey(), context.getValues(), context); //執(zhí)行到WordCountReducer中的reduce方法,是一個(gè)循環(huán)調(diào)用過程. context.write(key,outv); //數(shù)據(jù)寫出源碼流程如下: ①:reduceContext.write(key, value); ②:output.write(key, value); //進(jìn)入到ReduceTask的write方法 //557行 ③:real.write(key,value); //real :TextOutputFormat$LineRecordWriter 進(jìn)入到real.write()方法 //TextOutputFormat類~84行 writeObject(key); //寫出key writeObject(value); //寫出value 寫出key的源碼~簡單看下: //TextOutputFormat類~75行 private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(StandardCharsets.UTF_8)); //調(diào)用對(duì)象的toString方法,將返回的字符串轉(zhuǎn)換成字節(jié),通過流寫出 } }
10、cleanup(context); //清除生相關(guān)的文件,生成分區(qū)文件
源碼總結(jié)說明: 1. 看源碼目的: 熟悉整個(gè)MR的流程,能夠?qū)⑽覀冎v解的知識(shí)點(diǎn)對(duì)應(yīng)到源碼中具體的位置. 為面試做準(zhǔn)備. 2. 在整個(gè)MR中 ,會(huì)有N個(gè)MapTask(按照切片數(shù)量決定個(gè)數(shù))和 N個(gè)ReduceTask(自行設(shè)置個(gè)數(shù)) --在集群中的效果是多個(gè)MapTask并行運(yùn)行, 并行數(shù)由集群的資源來決定. --多個(gè)ReduceTask并行運(yùn)行,并行數(shù)由集群的資源來決定. 一般來說,ReduceTask的數(shù)量比較少,基本上都 能夠同時(shí)并行.
到此,關(guān)于“ReduceTask流程是怎樣的”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!