This article is a detailed walkthrough and example analysis of the Hadoop Mapper. The editor found it quite practical and is sharing it here as a reference; hopefully you will take something away from it after reading.
/* Licensed to the Apache Software Foundation (ASF) under one ... */

package org.apache.hadoop.mapreduce;

import java.io.IOException;

/**
 * Maps input key/value pairs to a set of intermediate key/value pairs.
 *
 * Maps are the individual tasks which transform input records into
 * intermediate records. The transformed intermediate records need not be of
 * the same type as the input records. A given input pair may map to zero or
 * many output pairs.
 *
 * The Hadoop Map-Reduce framework spawns one map task for each
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * Mapper implementations can access the {@link Configuration} for
 * the job via the {@link JobContext#getConfiguration()}.
 *
 * The framework first calls
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, Context)} for each key/value pair in the
 * InputSplit. Finally {@link #cleanup(Context)} is called.
 *
 * All intermediate values associated with a given output key are
 * subsequently grouped by the framework, and passed to a {@link Reducer} to
 * determine the final output. Users can control the sorting and grouping by
 * specifying two key {@link RawComparator} classes.
 *
 * The Mapper outputs are partitioned per Reducer. Users can control which
 * keys (and hence records) go to which Reducer by implementing a custom
 * {@link Partitioner}.
 *
 * Users can optionally specify a combiner, via
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
 * intermediate outputs, which helps to cut down the amount of data transferred
 * from the Mapper to the Reducer.
 *
 * Applications can specify if and how the intermediate outputs are to be
 * compressed and which {@link CompressionCodec}s are to be used via the
 * Configuration.
 *
 * If the job has zero reduces then the output of the Mapper is directly
 * written to the {@link OutputFormat} without sorting by keys.
 *
 * Example:
 * <pre>
 * public class TokenCounterMapper
 *     extends Mapper<Object, Text, Text, IntWritable> {
 *
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *
 *   public void map(Object key, Text value, Context context)
 *       throws IOException, InterruptedException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.write(word, one);
 *     }
 *   }
 * }
 * </pre>
 *
 * Applications may override the {@link #run(Context)} method to exert
 * greater control on map processing e.g. multi-threaded Mappers etc.
 *
 * @see InputFormat
 * @see JobContext
 * @see Partitioner
 * @see Reducer
 */
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context
    extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN, VALUEIN> reader,
                   RecordWriter<KEYOUT, VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }

  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}
Mapper has four methods: setup, map, cleanup and run. setup and cleanup manage resources over the Mapper's lifecycle: setup is called after the Mapper has been constructed, right before the map work starts, while cleanup is called after all map calls have finished. map performs the map operation on a single input key/value pair. run drives the process just described: it calls setup, then iterates over every key/value pair in the split and maps each one, and finally calls cleanup.
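To make this lifecycle concrete, here is a minimal sketch of a user-defined Mapper that overrides all three hooks. The class name LongLineMapper, the configuration key longline.threshold and the output key are invented for this illustration and are not part of Hadoop:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical example: counts input lines longer than a configurable threshold.
public class LongLineMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private int threshold;
  private final IntWritable one = new IntWritable(1);
  private final Text outKey = new Text("long-line");

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // Runs once per task, before any map() call: read job configuration here.
    threshold = context.getConfiguration().getInt("longline.threshold", 80);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Runs once for every record in this task's split.
    if (value.getLength() > threshold) {
      context.write(outKey, one);
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // Runs once per task, after the last map() call: release resources here.
  }
}

When the framework executes run() for this class, setup is invoked once, map once per record, and cleanup once at the end, exactly as the run method shown above describes.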
Three Mapper subclasses are implemented in org.apache.hadoop.mapreduce.lib.map: InverseMapper (which swaps the input key and value), TokenCounterMapper (which tokenizes the input values and emits each word with a count of 1), and MultithreadedMapper (which runs another Mapper's map method in multiple threads).
InverseMapper source code:
/* Licensed to the Apache Software Foundation (ASF) under one ... */

package org.apache.hadoop.mapreduce.lib.map;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;

/** A {@link Mapper} that swaps keys and values. */
public class InverseMapper<K, V> extends Mapper<K, V, V, K> {

  /** The inverse function. Input keys and values are swapped. */
  @Override
  public void map(K key, V value, Context context
                  ) throws IOException, InterruptedException {
    context.write(value, key);
  }
}
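A usage note: because InverseMapper emits (value, key), the driver must declare the map output types in the swapped order. The lines below are a fragment of a driver, a sketch only, assuming the input pairs are (LongWritable, Text) as produced by TextInputFormat and an arbitrary job name:

// Requires org.apache.hadoop.mapreduce.Job and org.apache.hadoop.mapreduce.lib.map.InverseMapper.
Job job = Job.getInstance(new Configuration(), "invert");
job.setMapperClass(InverseMapper.class);
// Input pairs are (LongWritable, Text); InverseMapper emits (Text, LongWritable).
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);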
TokenCounterMapper source code:
/* Licensed to the Apache Software Foundation (ASF) under one ... */

package org.apache.hadoop.mapreduce.lib.map;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Tokenize the input values and emit each word with a count of 1.
 */
public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  @Override
  public void map(Object key, Text value, Context context
                  ) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}
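For reference, TokenCounterMapper can be plugged directly into a word-count job together with the stock IntSumReducer from org.apache.hadoop.mapreduce.lib.reduce. The driver below is a sketch under assumptions: the class name WordCountDriver is a placeholder, the input/output paths come from the command line, and the Hadoop 2.x style Job.getInstance call is used:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

// Hypothetical word-count driver built from stock library classes.
public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(TokenCounterMapper.class);
    job.setCombinerClass(IntSumReducer.class);   // local aggregation of (word, 1) pairs
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Setting the combiner here is exactly the local aggregation described in the Mapper javadoc: it reduces the amount of intermediate data shipped from the Mapper to the Reducer.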
MultithreadedMapper runs another Mapper's map method in multiple threads: it starts mapred.map.multithreadedrunner.threads (a configuration item) threads to execute the Mapper named by mapred.map.multithreadedrunner.class (another configuration item). MultithreadedMapper overrides the base Mapper's run method and starts N threads (the corresponding class is MapRunner), each of which runs the run method of mapred.map.multithreadedrunner.class (which we will call the target Mapper); this means the target Mapper's setup and cleanup are executed multiple times. The target Mappers share the same InputSplit, which means that reads from the InputSplit must be thread-safe. To achieve this, MultithreadedMapper introduces the inner classes SubMapRecordReader, SubMapRecordWriter and SubMapStatusReporter, which extend RecordReader, RecordWriter and StatusReporter respectively. They access MultithreadedMapper's Mapper.Context under mutual exclusion, providing thread-safe access to the shared InputSplit and supplying each target Mapper with the Context it needs. The implementations of these classes are all quite simple.
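In practice these two settings are usually applied through MultithreadedMapper's static helper methods rather than by spelling out the property names. A minimal sketch of the relevant driver lines, assuming a user-written, thread-safe target class named MyIoBoundMapper (a placeholder) and a thread count of 10 chosen only for illustration:

// Fragment of a driver, sketch only.
// Requires org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper.
job.setMapperClass(MultithreadedMapper.class);                   // the framework runs MultithreadedMapper.run()
MultithreadedMapper.setMapperClass(job, MyIoBoundMapper.class);  // the target Mapper executed by each MapRunner thread
MultithreadedMapper.setNumberOfThreads(job, 10);                 // number of MapRunner threads per map task

Because several MapRunner threads consume the same split concurrently, the target Mapper's logic must be thread-safe; this pattern typically pays off when the map work is I/O-bound rather than CPU-bound.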
That wraps up this example analysis of the Hadoop Mapper. Hopefully the content above has been of some help and lets you learn a little more; if you think the article is good, please share it so more people can see it.