This article walks through the output phase of MapReduce, i.e. how a ReduceTask writes its results out, by following the Hadoop source code. The walkthrough starts from ReduceTask.run().
創(chuàng)新互聯(lián)建站是創(chuàng)新、創(chuàng)意、研發(fā)型一體的綜合型網(wǎng)站建設公司,自成立以來公司不斷探索創(chuàng)新,始終堅持為客戶提供滿意周到的服務,在本地打下了良好的口碑,在過去的十余年時間我們累計服務了上千家以及全國政企客戶,如成都VR全景等企業(yè)單位,完善的項目管理流程,嚴格把控項目進度與質量監(jiān)控加上過硬的技術實力獲得客戶的一致贊美。//--------------------------ReduceTask.java public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException { job.setBoolean("mapreduce.job.skiprecords", this.isSkipping()); if (this.isMapOrReduce()) { this.copyPhase = this.getProgress().addPhase("copy"); this.sortPhase = this.getProgress().addPhase("sort"); this.reducePhase = this.getProgress().addPhase("reduce"); } TaskReporter reporter = this.startReporter(umbilical); boolean useNewApi = job.getUseNewReducer(); //reducetask初始化工作 this.initialize(job, this.getJobID(), reporter, useNewApi); if (this.jobCleanup) { this.runJobCleanupTask(umbilical, reporter); } else if (this.jobSetup) { this.runJobSetupTask(umbilical, reporter); } else if (this.taskCleanup) { this.runTaskCleanupTask(umbilical, reporter); } else { this.codec = this.initCodec(); RawKeyValueIterator rIter = null; ShuffleConsumerPlugin shuffleConsumerPlugin = null; Class combinerClass = this.conf.getCombinerClass(); CombineOutputCollector combineCollector = null != combinerClass ? new CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf) : null; Class extends ShuffleConsumerPlugin> clazz = job.getClass("mapreduce.job.reduce.shuffle.consumer.plugin.class", Shuffle.class, ShuffleConsumerPlugin.class); shuffleConsumerPlugin = (ShuffleConsumerPlugin)ReflectionUtils.newInstance(clazz, job); LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin); Context shuffleContext = new Context(this.getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, this.codec, combinerClass, combineCollector, this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter, this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter, this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile, this.localMapFiles); shuffleConsumerPlugin.init(shuffleContext); rIter = shuffleConsumerPlugin.run(); this.mapOutputFilesOnDisk.clear(); this.sortPhase.complete(); this.setPhase(Phase.REDUCE); this.statusUpdate(umbilical); Class keyClass = job.getMapOutputKeyClass(); Class valueClass = job.getMapOutputValueClass(); RawComparator comparator = job.getOutputValueGroupingComparator(); //開始運行reducetask if (useNewApi) { this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } else { this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } shuffleConsumerPlugin.close(); this.done(umbilical, reporter); }
Similar to MapTask, the two key methods are this.initialize() and this.runNewReducer(): they perform the initialization and then actually run the task.
//---------------------------------------- ReduceTask.java
public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi)
        throws IOException, ClassNotFoundException, InterruptedException {
    // create the context objects
    this.jobContext = new JobContextImpl(job, id, reporter);
    this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
    // switch the ReduceTask state to RUNNING
    if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
        this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
    }

    if (useNewApi) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("using new api for output committer");
        }
        // Get the OutputFormat class object by reflection. getOutputFormatClass() lives in
        // JobContextImpl; the default is TextOutputFormat.class.
        this.outputFormat = (OutputFormat) ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job);
        this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
    } else {
        this.committer = this.conf.getOutputCommitter();
    }

    // get the output path
    Path outputPath = FileOutputFormat.getOutputPath(this.conf);
    if (outputPath != null) {
        if (this.committer instanceof FileOutputCommitter) {
            FileOutputFormat.setWorkOutputPath(this.conf,
                    ((FileOutputCommitter) this.committer).getTaskAttemptPath(this.taskContext));
        } else {
            FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
        }
    }

    this.committer.setupTask(this.taskContext);
    Class<? extends ResourceCalculatorProcessTree> clazz = this.conf.getClass(
            "mapreduce.job.process-tree.class", (Class) null, ResourceCalculatorProcessTree.class);
    this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
            (String) System.getenv().get("JVM_PID"), clazz, this.conf);
    LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);
    if (this.pTree != null) {
        this.pTree.updateProcessTree();
        this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();
    }
}
The work here boils down to initializing the context objects and obtaining the OutputFormat object (TextOutputFormat by default).
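Both the OutputFormat class and the output path that initialize() resolves come from the job configuration. As a quick illustration (this is not part of the Hadoop source above; the class name and path argument are just placeholders), a minimal driver sketch could look like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OutputDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "output-demo");
        job.setJarByClass(OutputDriver.class);
        // If this line is omitted, JobContextImpl.getOutputFormatClass() falls back to
        // TextOutputFormat.class, which is exactly the default initialize() resolves by reflection.
        job.setOutputFormatClass(TextOutputFormat.class);
        // FileOutputFormat.getOutputPath() in initialize() reads the path configured here.
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}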
Next comes runNewReducer():

//----------------------------------------------- ReduceTask.java
private void runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final TaskReporter reporter,
        RawKeyValueIterator rIter, RawComparator comparator, Class keyClass, Class valueClass)
        throws IOException, InterruptedException, ClassNotFoundException {
    // anonymous inner class used to build the key/value iterator
    final RawKeyValueIterator rawIter = rIter;
    rIter = new RawKeyValueIterator() {
        public void close() throws IOException {
            rawIter.close();
        }

        public DataInputBuffer getKey() throws IOException {
            return rawIter.getKey();
        }

        public Progress getProgress() {
            return rawIter.getProgress();
        }

        public DataInputBuffer getValue() throws IOException {
            return rawIter.getValue();
        }

        public boolean next() throws IOException {
            boolean ret = rawIter.next();
            reporter.setProgress(rawIter.getProgress().getProgress());
            return ret;
        }
    };

    TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
    // get the Reducer object by reflection
    org.apache.hadoop.mapreduce.Reducer reducer =
            (org.apache.hadoop.mapreduce.Reducer) ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    // get the RecordWriter object, used to write the results out to a file
    org.apache.hadoop.mapreduce.RecordWriter trackedRW = new ReduceTask.NewTrackingRecordWriter(this, taskContext);
    job.setBoolean("mapred.skip.on", this.isSkipping());
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    // create the reduceContext object used by the reduce task
    org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job,
            this.getTaskID(), rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW,
            this.committer, reporter, comparator, keyClass, valueClass);

    // start running reduce
    try {
        reducer.run(reducerContext);
    } finally {
        // close the output stream
        trackedRW.close(reducerContext);
    }
}
As you can see, runNewReducer() mainly does the following:
1) obtains the Reducer object whose run() method, and therefore our reduce() method, will be executed (see the illustrative driver fragment right after this list);
2) creates the RecordWriter object;
3) creates the reduceContext;
4) runs the Reducer's run() method.
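For orientation only, here is an illustrative driver fragment (MyReducer is a hypothetical user class, not part of the source above): the reflection calls in steps 1) and 2) instantiate whatever was registered on the Job.

// illustrative driver fragment (assumed), not from the Hadoop source
job.setReducerClass(MyReducer.class);        // resolved via taskContext.getReducerClass() in step 1)
job.setOutputKeyClass(Text.class);           // output key type handed to the RecordWriter from step 2)
job.setOutputValueClass(IntWritable.class);  // output value type handed to the RecordWriter from step 2)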
//-------------------------------------- NewTrackingRecordWriter.java
static class NewTrackingRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter {
    private final org.apache.hadoop.mapreduce.RecordWriter real;
    private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
    private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
    private final List fsStats;

    NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext)
            throws InterruptedException, IOException {
        this.outputRecordCounter = reduce.reduceOutputCounter;
        this.fileOutputByteCounter = reduce.fileOutputByteCounter;
        List matchedStats = null;
        if (reduce.outputFormat instanceof FileOutputFormat) {
            matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(taskContext),
                    taskContext.getConfiguration());
        }
        this.fsStats = matchedStats;

        long bytesOutPrev = this.getOutputBytes(this.fsStats);
        // create the RecordWriter object through the outputFormat
        this.real = reduce.outputFormat.getRecordWriter(taskContext);
        long bytesOutCurr = this.getOutputBytes(this.fsStats);
        this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
    }
    .....................
}
The key point is that the RecordWriter object is created through outputFormat.getRecordWriter(). As mentioned above, the default outputFormat is TextOutputFormat, so let's look at TextOutputFormat.getRecordWriter().
//---------------------------------------- TextOutputFormat.java
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
    public TextOutputFormat() {
    }

    // As you can see, what gets returned is the static inner class TextOutputFormat.LineRecordWriter
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress)
            throws IOException {
        boolean isCompressed = getCompressOutput(job);
        // separator between key and value, "\t" by default
        String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
        // two branches: uncompressed and compressed output
        if (!isCompressed) {
            // get the output path
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            // create the output stream
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new TextOutputFormat.LineRecordWriter(fileOut, keyValueSeparator);
        } else {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job);
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            // return the LineRecordWriter object, wrapping a compressed stream
            return new TextOutputFormat.LineRecordWriter(
                    new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
        }
    }

    // this is the LineRecordWriter class
    protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
        private static final byte[] NEWLINE;
        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8);
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                this.out.write(to.getBytes(), 0, to.getLength());
            } else {
                this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
            }
        }

        // write out the key/value pair
        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (!nullKey || !nullValue) {
                // write the key first
                if (!nullKey) {
                    this.writeObject(key);
                }
                // then the separator between key and value
                if (!nullKey && !nullValue) {
                    this.out.write(this.keyValueSeparator);
                }
                // then the value
                if (!nullValue) {
                    this.writeObject(value);
                }
                // and finally the newline
                this.out.write(NEWLINE);
            }
        }

        public synchronized void close(Reporter reporter) throws IOException {
            this.out.close();
        }

        static {
            NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
        }
    }
}
As you can see, the RecordWriter object that is ultimately returned is a LineRecordWriter.
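The separator that LineRecordWriter puts between key and value is read from the configuration key seen in getRecordWriter() above, so it can be changed from the driver. A small illustrative fragment (driver context assumed, not from the source above):

// override the separator read in getRecordWriter(); the default is "\t"
Configuration conf = new Configuration();
conf.set("mapreduce.output.textoutputformat.separator", ",");
Job job = Job.getInstance(conf, "csv-style-output");
// With this setting LineRecordWriter.write(key, value) emits "key,value" lines
// instead of the default "key\tvalue".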
Now back to step 3 of the list above: let's look at the class of the reduceContext object, created by createReduceContext().
//---------------------------------------- Task.java
protected static Reducer.Context createReduceContext(Reducer reducer, Configuration job,
        org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter,
        org.apache.hadoop.mapreduce.Counter inputKeyCounter,
        org.apache.hadoop.mapreduce.Counter inputValueCounter,
        RecordWriter output, OutputCommitter committer, StatusReporter reporter,
        RawComparator comparator, Class keyClass, Class valueClass)
        throws IOException, InterruptedException {
    ReduceContext reduceContext = new ReduceContextImpl(job, taskId, rIter, inputKeyCounter,
            inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass);
    Reducer.Context reducerContext = (new WrappedReducer()).getReducerContext(reduceContext);
    return reducerContext;
}
As you can see, the underlying context is a ReduceContextImpl object (WrappedReducer then wraps it into the Reducer.Context handed to the reducer).
Let's look at the constructor of ReduceContextImpl.
//--------------------------------- ReduceContextImpl.java
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input,
        Counter inputKeyCounter, Counter inputValueCounter, RecordWriter output, OutputCommitter committer,
        StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass)
        throws InterruptedException, IOException {
    // The parent class is TaskInputOutputContextImpl; the RecordWriter (output) is passed up to it.
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(this.buffer);
    this.valueDeserializer = this.serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(this.buffer);
    this.hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
}
Here the constructor calls its parent's constructor and passes the RecordWriter (the output parameter) up to it.
Let's continue with the parent class, TaskInputOutputContextImpl.
//--------------------------------- TaskInputOutputContextImpl.java
public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriter output,
        OutputCommitter committer, StatusReporter reporter) {
    // the output passed in here is the RecordWriter object
    super(conf, taskid, reporter);
    this.output = output;
    this.committer = committer;
}

// The logic behind these is to read the next KV pair into this.key and this.value;
// nextKeyValue() returns false when there are no more pairs, true otherwise.
public abstract boolean nextKeyValue() throws IOException, InterruptedException;

public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

// Calls the RecordWriter's write method to output the KV pair; by default this is LineRecordWriter.
public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
    this.output.write(key, value);
}
As you can see, there are three abstract methods here (their logic is implemented in the subclass ReduceContextImpl and has nothing to do with the RecordWriter), plus the concrete write() method. The abstract methods fetch the input KV pairs; write() emits a result KV pair by delegating to the RecordWriter's write method, i.e. the write method of the LineRecordWriter object created in TextOutputFormat.getRecordWriter() above.
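Because write() simply delegates to whatever RecordWriter the configured OutputFormat returned, swapping the output format only means supplying a different getRecordWriter(). Below is a sketch of a custom FileOutputFormat following the same delegation chain as TextOutputFormat/LineRecordWriter; all class names here are illustrative, not part of Hadoop.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PipeSeparatedOutputFormat extends FileOutputFormat<Text, IntWritable> {
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // create the task's output file and stream, similar to what TextOutputFormat does
        Path file = getDefaultWorkFile(context, ".txt");
        final FSDataOutputStream out = file.getFileSystem(context.getConfiguration()).create(file, false);
        return new RecordWriter<Text, IntWritable>() {
            @Override
            public void write(Text key, IntWritable value) throws IOException {
                // This is the method TaskInputOutputContextImpl.write() ultimately calls.
                out.writeBytes(key.toString() + "|" + value.get() + "\n");
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException {
                out.close();
            }
        };
    }
}

It would be registered in the driver with job.setOutputFormatClass(PipeSeparatedOutputFormat.class), after which context.write(key, value) in the reducer produces "key|value" lines instead of tab-separated ones.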
Finally, let's look at Reducer.run(), which drives the whole loop:

//---------------------------------------- Reducer.java
public void run(Reducer.Context context) throws IOException, InterruptedException {
    this.setup(context);
    try {
        while (context.nextKey()) {
            this.reduce(context.getCurrentKey(), context.getValues(), context);
            Iterator iter = context.getValues().iterator();
            if (iter instanceof ValueIterator) {
                ((ValueIterator) iter).resetBackupStore();
            }
        }
    } finally {
        this.cleanup(context);
    }
}
可以看到,這里就是調用6中創(chuàng)建的 reduceContext中的方法來獲取KV。而且在reduce方法中,我們會通過 context.write(key,value)來將結果KV輸出。調用的其實就是 LineRecordWriter對象中的write方法。
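To tie this together, here is a minimal word-count style Reducer sketch (class and field names are illustrative, not from the source above). Each context.write() call below goes through TaskInputOutputContextImpl.write() and ends up in LineRecordWriter.write(), producing one key-tab-value line in the task's output file.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        // Delegates to the RecordWriter (LineRecordWriter by default):
        // the output file gets a line like "hello\t3".
        context.write(key, result);
    }
}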
That wraps up the MapReduce output path: ReduceTask obtains an OutputFormat (TextOutputFormat by default), the OutputFormat supplies a RecordWriter (LineRecordWriter), and every context.write() in the reducer ends up as one line in the output file.