真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

hadoop-Mapper的示例分析

這篇文章將為大家詳細講解有關hadoop-Mapper的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

成都創(chuàng)新互聯(lián)公司專注于寶雞企業(yè)網(wǎng)站建設,響應式網(wǎng)站開發(fā),成都商城網(wǎng)站開發(fā)。寶雞網(wǎng)站建設公司,為寶雞等地區(qū)提供建站服務。全流程定制網(wǎng)站設計,專業(yè)設計,全程項目跟蹤,成都創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務

* Licensed to the Apache Software Foundation (ASF) under one

package org.apache.hadoop.mapreduce;

import java.io.IOException;

/** 
 * Maps input key/value pairs to a set of intermediate key/value pairs.  
 * 
 * 

Maps are the individual tasks which transform input records into a   * intermediate records. The transformed intermediate records need not be of   * the same type as the input records. A given input pair may map to zero or   * many output pairs.

   *   * 

The Hadoop Map-Reduce framework spawns one map task for each   * {@link InputSplit} generated by the {@link InputFormat} for the job.  * Mapper implementations can access the {@link Configuration} for   * the job via the {@link JobContext#getConfiguration()}.  *   * 

The framework first calls   * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by  * {@link #map(Object, Object, Context)}   * for each key/value pair in the InputSplit. Finally   * {@link #cleanup(Context)} is called.

 *   * 

All intermediate values associated with a given output key are   * subsequently grouped by the framework, and passed to a {@link Reducer} to    * determine the final output. Users can control the sorting and grouping by   * specifying two key {@link RawComparator} classes.

 *  * 

The Mapper outputs are partitioned per   * Reducer. Users can control which keys (and hence records) go to   * which Reducer by implementing a custom {@link Partitioner}.  *   * 

Users can optionally specify a combiner, via   * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the   * intermediate outputs, which helps to cut down the amount of data transferred   * from the Mapper to the Reducer.  *   * 

Applications can specify if and how the intermediate  * outputs are to be compressed and which {@link CompressionCodec}s are to be  * used via the Configuration.

 *    * 

If the job has zero  * reduces then the output of the Mapper is directly written  * to the {@link OutputFormat} without sorting by keys.

 *   * 

Example:

 * 

 * public class TokenCounterMapper 
 *     extends Mapper{
 *    
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *   
 *   public void map(Object key, Text value, Context context) throws IOException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.collect(word, one);
 *     }
 *   }
 * }
 * 

 *  * 

Applications may override the {@link #run(Context)} method to exert   * greater control on map processing e.g. multi-threaded Mappers   * etc.

 *   * @see InputFormat  * @see JobContext  * @see Partitioner    * @see Reducer  */ public class Mapper {   public class Context      extends MapContext {     public Context(Configuration conf, TaskAttemptID taskid,                    RecordReader reader,                    RecordWriter writer,                    OutputCommitter committer,                    StatusReporter reporter,                    InputSplit split) throws IOException, InterruptedException {       super(conf, taskid, reader, writer, committer, reporter, split);     }   }      /**    * Called once at the beginning of the task.    */   protected void setup(Context context                        ) throws IOException, InterruptedException {     // NOTHING   }   /**    * Called once for each key/value pair in the input split. Most applications    * should override this, but the default is the identity function.    */   @SuppressWarnings("unchecked")   protected void map(KEYIN key, VALUEIN value,                       Context context) throws IOException, InterruptedException {     context.write((KEYOUT) key, (VALUEOUT) value);   }   /**    * Called once at the end of the task.    */   protected void cleanup(Context context                          ) throws IOException, InterruptedException {     // NOTHING   }      /**    * Expert users can override this method for more complete control over the    * execution of the Mapper.    * @param context    * @throws IOException    */   public void run(Context context) throws IOException, InterruptedException {     setup(context);     while (context.nextKeyValue()) {       map(context.getCurrentKey(), context.getCurrentValue(), context);     }     cleanup(context);   } }

Mapper的四個方法是setup,map,cleanup和run。其中,setup和cleanup用于管理Mapper生命周期中的資源,setup在完成Mapper構(gòu)造,即將開始執(zhí)行map動作前調(diào)用,cleanup則在所有的map動作完成后被調(diào)用。方法map用于對一次輸入的key/value對進行map動作。run方法執(zhí)行了上面描述的過程,它調(diào)用setup,讓后迭代所有的key/value對,進行map,最后調(diào)用cleanup。

org.apache.hadoop.mapreduce.lib.map中實現(xiàn)了Mapper的三個子類,分別是InverseMapper(將輸入 map為輸出),MultithreadedMapper(多線程執(zhí)行map方法)和TokenCounterMapper(對輸入的value分解為token并計數(shù))。其中最復雜的是MultithreadedMapper,我們就以它為例,來分析Mapper的實現(xiàn)。

InverseMapper源代碼:

 * Licensed to the Apache Software Foundation (ASF) under one


package org.apache.hadoop.mapreduce.lib.map;


import java.io.IOException;


/** A {@link Mapper} that swaps keys and values. */
public class InverseMapper extends Mapper {


  /** The inverse function.  Input keys and values are swapped.*/
  @Override
  public void map(K key, V value, Context context
                  ) throws IOException, InterruptedException {
    context.write(value, key);
  }
  
}

TokenCountMapper源代碼:

 * Licensed to the Apache Software Foundation (ASF) under one


package org.apache.hadoop.mapreduce.lib.map;


import java.io.IOException;


/**
 * Tokenize the input values and emit each word with a count of 1.
 */
public class TokenCounterMapper extends Mapper{
    
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  
  @Override
  public void map(Object key, Text value, Context context
                  ) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}

MultithreadedMapper會啟動多個線程執(zhí)行另一個Mapper的map方法,它會啟動mapred.map.multithreadedrunner.threads(配置項)個線程執(zhí)行Mapper:mapred.map.multithreadedrunner.class(配置項)。MultithreadedMapper重寫了基類Mapper的run方法,啟動N個線程(對應的類為MapRunner)執(zhí)行mapred.map.multithreadedrunner.class(我們稱為目標Mapper)的run方法(就是說,目標Mapper的setup和cleanup會被執(zhí)行多次)。目標Mapper共享同一份InputSplit,這就意味著,對InputSplit的數(shù)據(jù)讀必須線程安全。為此,MultithreadedMapper引入了內(nèi)部類SubMapRecordReader,SubMapRecordWriter,SubMapStatusReporter,分別繼承自RecordReader,RecordWriter和StatusReporter,它們通過互斥訪問MultithreadedMapper的Mapper.Context,實現(xiàn)了對同一份InputSplit的線程安全訪問,為Mapper提供所需的Context。這些類的實現(xiàn)方法都很簡單。

關于“hadoop-Mapper的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。


網(wǎng)站名稱:hadoop-Mapper的示例分析
標題網(wǎng)址:http://weahome.cn/article/ggjcjg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部