真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Hadoop2.6.0學(xué)習(xí)筆記(七)MapReduce分區(qū)

魯春利的工作筆記,誰(shuí)說(shuō)程序員不能有文藝范?

創(chuàng)新互聯(lián)2013年至今,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目網(wǎng)站設(shè)計(jì)制作、成都網(wǎng)站設(shè)計(jì)網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢(mèng)想脫穎而出為使命,1280元博野做網(wǎng)站,已為上家服務(wù),為博野各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:028-86922220


MapReduce中map task任務(wù)的數(shù)量是由spli分片決定,那么reduce task的數(shù)量由什么來(lái)確定的呢?就是這里要討論的MapReduce分區(qū)。默認(rèn)情況下,MapReduce中使用的是HashPartitioner。

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner extends Partitioner {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

在HashPartitioner中g(shù)etPartition()方法有三個(gè)形參,key、value分別指的是Mapper任務(wù)的輸出,numReduceTasks指的是設(shè)置的Reducer任務(wù)數(shù)量,默認(rèn)值是1。通過(guò)取key的hashCode,然后通過(guò)和Integer.MAX_VALUE與運(yùn)算被轉(zhuǎn)換為一個(gè)非負(fù)整數(shù),任何整數(shù)與1相除的余數(shù)肯定是0。也就是說(shuō)getPartition(…)方法的返回值總是0,也就是Mapper任務(wù)的輸出總是送給一個(gè)Reducer任務(wù),最終只能輸出到一個(gè)文件中。

示例:對(duì)于通過(guò)不同協(xié)議訪問(wèn)某些url數(shù)據(jù)進(jìn)行統(tǒng)計(jì)(日志五元組)

原始數(shù)據(jù)

[hadoop@nnode code]$ hdfs dfs -text /http_interceptor_20130913.txt
2013-09-13 16:04:08     www.subnetc1.com        192.168.1.7     80      192.168.1.139   18863   FTP     www.subnetc1.com/index.html
2013-09-13 16:04:08     www.subnetc2.com        192.168.1.7     80      192.168.1.159   14100   HTTP    www.subnetc2.com/index.html
2013-09-13 16:04:08     www.subnetc3.com        192.168.1.7     80      192.168.1.130   4927    HTTPS   www.subnetc3.com/index.html
2013-09-13 16:04:08     www.subnetc4.com        192.168.1.7     80      192.168.1.154   39044   HTTP    www.subnetc4.com/index.html
[hadoop@nnode code]$

實(shí)現(xiàn)Mapper

package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 
 * @author luchunli
 * @description 實(shí)現(xiàn)Mapper
 *
 */
public class ProtocolMapper extends Mapper {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String [] values = value.toString().split("\t");
        if (null == values || values.length != 8) {
            return;
        }
        Text newKey = new Text();
        Text newValue = new Text();
        newKey.set(values[6].trim());
        newValue.set(values[7].trim());
        
        context.write(newKey, newValue);
    }
}

實(shí)現(xiàn)Reducer

package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 
 * @author luchunli
 * @description 實(shí)現(xiàn)Reducer
 *
 */
public class ProtocolReducer extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context)
            throws IOException, InterruptedException {
        StringBuffer sbf = new StringBuffer();
        for (Text text : values) {
            sbf.append(text.toString());
            sbf.append(";");
        }
        context.write(key, new Text(sbf.toString()));
    }
}

實(shí)現(xiàn)Partitioner

package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 
 * @author luchunli
 * @description 自定義分區(qū)類
 *
 */
public class ProtocolPartitioner extends Partitioner {

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        if (key.toString().equals("FTP")) {
            return 0;
        } 
        if (key.toString().equals("HTTP")) {
            return 1;
        }
        if (key.toString().equals("HTTPS")) {
            return 2;
        }
        return 0;
    }

}

實(shí)現(xiàn)驅(qū)動(dòng)器類

package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ProtocolDriver extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new ProtocolDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        
        job.setJarByClass(ProtocolDriver.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        
        job.setMapperClass(ProtocolMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        // 設(shè)置task reduce的個(gè)數(shù)
        job.setNumReduceTasks(3);
        job.setPartitionerClass(ProtocolPartitioner.class);
        
        job.setReducerClass(ProtocolReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        // job.setOutputFormatClass(ProtocolOutputFormat.class);
        
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        return job.waitForCompletion(true) ? 0 : 1;
    }

}

調(diào)用執(zhí)行

[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500018
15/12/05 21:41:12 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:41:13 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0008
15/12/05 21:41:13 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0008
15/12/05 21:41:14 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0008/
15/12/05 21:41:14 INFO mapreduce.Job: Running job: job_1449302623953_0008
15/12/05 21:41:43 INFO mapreduce.Job: Job job_1449302623953_0008 running in uber mode : false
15/12/05 21:41:43 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 21:42:12 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 21:42:32 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 21:42:52 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 21:42:55 INFO mapreduce.Job: Job job_1449302623953_0008 completed successfully
15/12/05 21:42:55 INFO mapreduce.Job: Counters: 50
        File System Counters
                FILE: Number of bytes read=158
                FILE: Number of bytes written=431827
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=532
                HDFS: Number of bytes written=130
                HDFS: Number of read operations=12
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=6
        Job Counters 
                Killed reduce tasks=1
                Launched map tasks=1
                Launched reduce tasks=4
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=26277
                Total time spent by all reduces in occupied slots (ms)=105054
                Total time spent by all map tasks (ms)=26277
                Total time spent by all reduce tasks (ms)=105054
                Total vcore-seconds taken by all map tasks=26277
                Total vcore-seconds taken by all reduce tasks=105054
                Total megabyte-seconds taken by all map tasks=26907648
                Total megabyte-seconds taken by all reduce tasks=107575296
        Map-Reduce Framework
                Map input records=4
                Map output records=4
                Map output bytes=132
                Map output materialized bytes=158
                Input split bytes=109
                Combine input records=0
                Combine output records=0
                Reduce input groups=3
                Reduce shuffle bytes=158
                Reduce input records=4
                Reduce output records=3
                Spilled Records=8
                Shuffled Maps =3
                Failed Shuffles=0
                Merged Map outputs=3
                GC time elapsed (ms)=410
                CPU time spent (ms)=4360
                Physical memory (bytes) snapshot=515862528
                Virtual memory (bytes) snapshot=3399213056
                Total committed heap usage (bytes)=167907328
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=423
        File Output Format Counters 
                Bytes Written=130
[hadoop@nnode code]$

查看結(jié)果

[hadoop@nnode code]$ hdfs dfs -ls /2015120500018
Found 4 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 21:42 /2015120500018/_SUCCESS
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 21:42 /2015120500018/part-r-00000
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 21:42 /2015120500018/part-r-00001
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 21:42 /2015120500018/part-r-00002
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00000
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00001
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00002
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$

上述生成的文件命名格式是MapReduce根據(jù)任務(wù)自動(dòng)生成的,我們可以通過(guò)自定義OutputFormat來(lái)自定義輸出文件的名稱。

Hadoop2.6.0學(xué)習(xí)筆記(七)MapReduce分區(qū)

自定義的OutputFormat代碼如下,這里和之前的MultipleWorkCount的區(qū)別在于本示例中直接通過(guò)FSDataOutputStream來(lái)實(shí)現(xiàn),而不是之前調(diào)用LineRecordWriter的方式。

package com.lucl.hadoop.mapreduce.part;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 
 * @author luchunli
 * @description 自定義OutputFormat
 */
public class ProtocolOutputFormat extends TextOutputFormat {
    protected static class ProtocolRecordWriter extends RecordWriter {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
          try {
            newline = "\n".getBytes(utf8);
          } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
          }
        }
        
        protected TaskAttemptContext context = null;
        
        protected HashMap recordStream = null;
        protected Path workPath = null;
        
        public ProtocolRecordWriter () {}
        
        public ProtocolRecordWriter (TaskAttemptContext context, Path workPath) {
            this.context = context;
            this.workPath = workPath;
            recordStream = new HashMap();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
              boolean nullKey = key == null;
              boolean nullValue = value == null;
              if (nullKey && nullValue) {
                return;
              }
              DataOutputStream out = recordStream.get(key);
              if (null == out) {
                  Path file = new Path(workPath, key + ".txt");
                  out = file.getFileSystem(this.context.getConfiguration()).create(file, false);
                  recordStream.put(key, out);
              }
              if (!nullKey) {
                 out.write(key.getBytes(), 0, key.getLength());
              }
              if (!(nullKey || nullValue)) {
                out.write("\t".getBytes());
              }
              if (!nullValue) {
                 out.write(value.getBytes(), 0, value.getLength());
              }
              out.write(newline);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {
            for (DataOutputStream out : recordStream.values()) {
                out.close();
            }
            recordStream.clear();
            recordStream = null;
        }
    }
     
    @Override
    public RecordWriter getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Path workPath = this.getTaskOutputPath(context);
        return new ProtocolRecordWriter(context, workPath);
    }
    
    private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(context);
        
        if (committer instanceof FileOutputCommitter) {
            // Get the directory that the task should write results into.
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            // Get the {@link Path} to the output directory for the map-reduce job.
            // context.getConfiguration().get(FileOutputFormat.OUTDIR);
            Path outputPath = super.getOutputPath(context);
            if (null == outputPath) {
                throw new IOException("Undefined job output-path.");
            }
            workPath = outputPath;
        }
        
        return workPath;
    }
}

再次運(yùn)行

[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500020
15/12/05 21:59:28 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:59:30 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0010
15/12/05 21:59:30 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0010
15/12/05 21:59:31 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0010/
15/12/05 21:59:31 INFO mapreduce.Job: Running job: job_1449302623953_0010
15/12/05 22:00:00 INFO mapreduce.Job: Job job_1449302623953_0010 running in uber mode : false
15/12/05 22:00:00 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 22:00:29 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 22:00:48 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 22:01:07 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 22:01:07 INFO mapreduce.Job: Job job_1449302623953_0010 completed successfully
15/12/05 22:01:07 INFO mapreduce.Job: Counters: 50
        File System Counters
                FILE: Number of bytes read=158
                FILE: Number of bytes written=432595
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=532
                HDFS: Number of bytes written=130
                HDFS: Number of read operations=12
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=6
        Job Counters 
                Killed reduce tasks=1
                Launched map tasks=1
                Launched reduce tasks=4
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=26075
                Total time spent by all reduces in occupied slots (ms)=92427
                Total time spent by all map tasks (ms)=26075
                Total time spent by all reduce tasks (ms)=92427
                Total vcore-seconds taken by all map tasks=26075
                Total vcore-seconds taken by all reduce tasks=92427
                Total megabyte-seconds taken by all map tasks=26700800
                Total megabyte-seconds taken by all reduce tasks=94645248
        Map-Reduce Framework
                Map input records=4
                Map output records=4
                Map output bytes=132
                Map output materialized bytes=158
                Input split bytes=109
                Combine input records=0
                Combine output records=0
                Reduce input groups=3
                Reduce shuffle bytes=158
                Reduce input records=4
                Reduce output records=3
                Spilled Records=8
                Shuffled Maps =3
                Failed Shuffles=0
                Merged Map outputs=3
                GC time elapsed (ms)=339
                CPU time spent (ms)=4690
                Physical memory (bytes) snapshot=513667072
                Virtual memory (bytes) snapshot=3405312000
                Total committed heap usage (bytes)=167907328
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=423
        File Output Format Counters 
                Bytes Written=130
[hadoop@nnode code]$

查看結(jié)果

[hadoop@nnode code]$ hdfs dfs -ls /2015120500020
Found 4 items
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 22:01 /2015120500020/FTP.txt
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 22:00 /2015120500020/HTTP.txt
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 22:01 /2015120500020/HTTPS.txt
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 22:01 /2015120500020/_SUCCESS
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/FTP.txt
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTP.txt
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTPS.txt
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$

文章名稱:Hadoop2.6.0學(xué)習(xí)筆記(七)MapReduce分區(qū)
文章鏈接:http://weahome.cn/article/ghoepd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部