真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

MapReduce的output輸出過程是什么-創(chuàng)新互聯(lián)

本篇內容主要講解“MapReduce的output輸出過程是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“MapReduce的output輸出過程是什么”吧!

創(chuàng)新互聯(lián)建站是創(chuàng)新、創(chuàng)意、研發(fā)型一體的綜合型網(wǎng)站建設公司,自成立以來公司不斷探索創(chuàng)新,始終堅持為客戶提供滿意周到的服務,在本地打下了良好的口碑,在過去的十余年時間我們累計服務了上千家以及全國政企客戶,如成都VR全景等企業(yè)單位,完善的項目管理流程,嚴格把控項目進度與質量監(jiān)控加上過硬的技術實力獲得客戶的一致贊美。

1、首先看 ReduceTask.run()  這個執(zhí)行入口

//--------------------------ReduceTask.java
public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    if (this.isMapOrReduce()) {
        this.copyPhase = this.getProgress().addPhase("copy");
        this.sortPhase = this.getProgress().addPhase("sort");
        this.reducePhase = this.getProgress().addPhase("reduce");
    }

    TaskReporter reporter = this.startReporter(umbilical);
    boolean useNewApi = job.getUseNewReducer();
    //reducetask初始化工作
    this.initialize(job, this.getJobID(), reporter, useNewApi);
    if (this.jobCleanup) {
        this.runJobCleanupTask(umbilical, reporter);
    } else if (this.jobSetup) {
        this.runJobSetupTask(umbilical, reporter);
    } else if (this.taskCleanup) {
        this.runTaskCleanupTask(umbilical, reporter);
    } else {
        this.codec = this.initCodec();
        RawKeyValueIterator rIter = null;
        ShuffleConsumerPlugin shuffleConsumerPlugin = null;
        Class combinerClass = this.conf.getCombinerClass();
        CombineOutputCollector combineCollector = null != combinerClass ? new CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf) : null;
        Class clazz = job.getClass("mapreduce.job.reduce.shuffle.consumer.plugin.class", Shuffle.class, ShuffleConsumerPlugin.class);
        shuffleConsumerPlugin = (ShuffleConsumerPlugin)ReflectionUtils.newInstance(clazz, job);
        LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
        Context shuffleContext = new Context(this.getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, this.codec, combinerClass, combineCollector, this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter, this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter, this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile, this.localMapFiles);
        shuffleConsumerPlugin.init(shuffleContext);
        rIter = shuffleConsumerPlugin.run();
        this.mapOutputFilesOnDisk.clear();
        this.sortPhase.complete();
        this.setPhase(Phase.REDUCE);
        this.statusUpdate(umbilical);
        Class keyClass = job.getMapOutputKeyClass();
        Class valueClass = job.getMapOutputValueClass();
        RawComparator comparator = job.getOutputValueGroupingComparator();
        //開始運行reducetask
        if (useNewApi) {
            this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        } else {
            this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        }

        shuffleConsumerPlugin.close();
        this.done(umbilical, reporter);
    }

和MapTask類似,主要有 this.initialize() 以及 this.runNewReducer() 這兩個方法。做了初始化以及開始運行task的操作。

2、this.initialize()

//----------------------------------------ReduceTask.java
public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException {

    //創(chuàng)建上下文對象
    this.jobContext = new JobContextImpl(job, id, reporter);
    this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
    //修改reducetask的狀態(tài)為運行中
    if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
        this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
    }

    if (useNewApi) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("using new api for output committer");
        }

        //反射獲取outputformat類對象。getOutputFormatClass這個方法在JobContextImpl中。
        //默認是TextOutputFormat.class
        this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job);
        this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
    } else {
        this.committer = this.conf.getOutputCommitter();
    }

    //獲取輸出路徑
    Path outputPath = FileOutputFormat.getOutputPath(this.conf);
    if (outputPath != null) {
        if (this.committer instanceof FileOutputCommitter) {
            FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext));
        } else {
            FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
        }
    }

    this.committer.setupTask(this.taskContext);
    Class clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class);
    this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf);
    LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);
    if (this.pTree != null) {
        this.pTree.updateProcessTree();
        this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();
    }

}

主要就是初始化上下文對象,獲取outputformat對象。

3、this.runNewReducer()

//-----------------------------------------------ReduceTask.java
private  void runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final TaskReporter reporter, final RawKeyValueIterator rIter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException, ClassNotFoundException {
    //匿名內部類,用于構建key,value的迭代器
    rIter = new RawKeyValueIterator() {
        public void close() throws IOException {
            rIter.close();
        }

        public DataInputBuffer getKey() throws IOException {
            return rIter.getKey();
        }

        public Progress getProgress() {
            return rIter.getProgress();
        }

        public DataInputBuffer getValue() throws IOException {
            return rIter.getValue();
        }

        public boolean next() throws IOException {
            boolean ret = rIter.next();
            reporter.setProgress(rIter.getProgress().getProgress());
            return ret;
        }
    };
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
    //反射獲取Reducer對象
    org.apache.hadoop.mapreduce.Reducer reducer = (org.apache.hadoop.mapreduce.Reducer)ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    //獲取RecordWriter對象,用于將結果寫入到文件中
    org.apache.hadoop.mapreduce.RecordWriter trackedRW = new ReduceTask.NewTrackingRecordWriter(this, taskContext);
    job.setBoolean("mapred.skip.on", this.isSkipping());
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    //創(chuàng)建reduceContext對象,用于reduce任務
    org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass);

    //開始運行reduce
    try {
        reducer.run(reducerContext);
    } finally {
        //關閉輸出流
        trackedRW.close(reducerContext);
    }

}

可以看到,主要做了以下工作:
1)獲取reducer對象,用于運行run() ,也就是運行reduce方法
2)創(chuàng)建 RecordWriter對象
3)創(chuàng)建reduceContext
4)開始運行reducer中的run

4、ReduceTask.NewTrackingRecordWriter()

//--------------------------------------NewTrackingRecordWriter.java
static class NewTrackingRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter {
    private final org.apache.hadoop.mapreduce.RecordWriter real;
    private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
    private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
    private final List fsStats;

    NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext) throws InterruptedException, IOException {
        this.outputRecordCounter = reduce.reduceOutputCounter;
        this.fileOutputByteCounter = reduce.fileOutputByteCounter;
        List matchedStats = null;
        if (reduce.outputFormat instanceof FileOutputFormat) {
            matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(taskContext), taskContext.getConfiguration());
        }

        this.fsStats = matchedStats;
        long bytesOutPrev = this.getOutputBytes(this.fsStats);
        //通過outputFormat創(chuàng)建RecordWriter對象
        this.real = reduce.outputFormat.getRecordWriter(taskContext);
        long bytesOutCurr = this.getOutputBytes(this.fsStats);
        this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
    }
    .....................
}

重點的就是通過outputFormat.getRecordWriter來創(chuàng)建 RecordWriter 對象。
上面也說到,outputFormat默認就是 TextOutputFormat,所以下面看看
TextOutputFormat.getRecordWriter()

5、TextOutputFormat.getRecordWriter()

public class TextOutputFormat extends FileOutputFormat {
    public TextOutputFormat() {
    }

    //可以看到,返回的是靜態(tài)內部類TextOutputFormat.LineRecordWriter
    public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
        boolean isCompressed = getCompressOutput(job);
        //key和value的分隔符,默認是 \t
        String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
        //分為壓縮和非壓縮輸出
        if (!isCompressed) {
            //獲取輸出路徑
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            //創(chuàng)建輸出流
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new TextOutputFormat.LineRecordWriter(fileOut, keyValueSeparator);
        } else {
            Class codecClass = getOutputCompressorClass(job, GzipCodec.class);
            CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job);
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            //返回LineRecordWriter對象
            return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
        }
    }

    //這里就是 LineRecordWriter 類
    protected static class LineRecordWriter implements RecordWriter {
        private static final byte[] NEWLINE;
        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8);
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text)o;
                this.out.write(to.getBytes(), 0, to.getLength());
            } else {
                this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
            }

        }

        //將KV輸出
        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (!nullKey || !nullValue) {
                //先寫key
                if (!nullKey) {
                    this.writeObject(key);
                }

                //接著寫入key和value之間的分隔符
                if (!nullKey && !nullValue) {
                    this.out.write(this.keyValueSeparator);
                }

                //最后寫入value
                if (!nullValue) {
                    this.writeObject(value);
                }

                //接著寫入新的一行
                this.out.write(NEWLINE);
            }
        }

        public synchronized void close(Reporter reporter) throws IOException {
            this.out.close();
        }

        static {
            NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
        }
    }
}

可以看到,最終返回的RecordWriter對象是 LineRecordWriter 類型的。
接著回到3中,看 reduceContext這個對象的類

6、reduceContext = ReduceTask.createReduceContext()

protected static  Reducer.Context createReduceContext(Reducer reducer, Configuration job, org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter, org.apache.hadoop.mapreduce.Counter inputKeyCounter, org.apache.hadoop.mapreduce.Counter inputValueCounter, RecordWriter output, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException {
    ReduceContext reduceContext = new ReduceContextImpl(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass);
    Reducer.Context reducerContext = (new WrappedReducer()).getReducerContext(reduceContext);
    return reducerContext;
}

可以看到reducerContext是一個ReduceContextImpl類對象。
下面看看ReduceContextImpl 這個類的構造方法

//---------------------------------ReduceContextImpl.java
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input, Counter inputKeyCounter, Counter inputValueCounter, RecordWriter output, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws InterruptedException, IOException {
    //父類是 TaskInputOutputContextImpl,把outputformat對象傳遞進去了
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(this.buffer);
    this.valueDeserializer = this.serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(this.buffer);
    this.hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
}

這里面,它繼續(xù)調用了父類的構造方法,把outputformat對象傳遞進去了。
繼續(xù)看看父類 TaskInputOutputContextImpl

public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriter output, OutputCommitter committer, StatusReporter reporter) {
    //可以看到這里的output就是recordWriter對象
    super(conf, taskid, reporter);
    this.output = output;
    this.committer = committer;
}

//這里的邏輯其實就是先讀取KV到 this.key和this.value中,如果沒有KV就返回false,否則返回true
public abstract boolean nextKeyValue() throws IOException, InterruptedException;

public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

//調用recordWriter的write方法,將KV輸出,默認是LineRecordWriter這個類
public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
    this.output.write(key, value);

可以看到,這里有3個抽象方法(在子類ReduceContextImpl中實現(xiàn)了邏輯,和RecordWriter無關),以及write這個具體方法。分別用于獲取KV以及將結果KV寫入。write這個寫入方法,就是調用的 recordWriter的write方法,也就是5中創(chuàng)建的LineRecordWriter對象中的write方法,將KV輸出。

7、reducer.run()

public void run(Reducer.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                Iterator iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator)iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }

    }

可以看到,這里就是調用6中創(chuàng)建的 reduceContext中的方法來獲取KV。而且在reduce方法中,我們會通過 context.write(key,value)來將結果KV輸出。調用的其實就是 LineRecordWriter對象中的write方法。

到此,相信大家對“MapReduce的output輸出過程是什么”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!

另外有需要云服務器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。


標題名稱:MapReduce的output輸出過程是什么-創(chuàng)新互聯(lián)
當前鏈接:http://weahome.cn/article/eejsd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部