Lu Chunli's work notes. Who says programmers can't have a literary side?
In MapReduce, the number of map tasks is determined by the input splits. What about the reduce side? The number of reduce tasks is set on the job, and which reduce task each map output record is sent to is decided by the partitioner, which is the topic of this note. By default, MapReduce uses HashPartitioner.
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
In HashPartitioner, getPartition() takes three parameters: key and value are the Mapper output key and value, and numReduceTasks is the configured number of reducer tasks, which defaults to 1. The key's hashCode is ANDed with Integer.MAX_VALUE to turn it into a non-negative integer, and the remainder modulo numReduceTasks selects the partition. Since any integer modulo 1 is 0, with the default single reducer getPartition(...) always returns 0: all Mapper output goes to one Reducer task and ends up in a single output file.
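To make that concrete, here is a minimal standalone sketch (the class name HashPartitionDemo and the sample keys are mine, for illustration only) that applies the same hash-and-modulo formula to a few keys, once with the default single reducer and once with three reducers:

import org.apache.hadoop.io.Text;

public class HashPartitionDemo {
    // Same formula HashPartitioner uses: non-negative hash modulo reducer count.
    static int partition(Text key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] keys = {"FTP", "HTTP", "HTTPS"};   // sample keys for illustration
        for (String k : keys) {
            Text key = new Text(k);
            // With the default of 1 reducer every key lands in partition 0.
            System.out.println(k + " -> 1 reducer: partition " + partition(key, 1)
                    + ", 3 reducers: partition " + partition(key, 3));
        }
    }
}

With one reducer every key maps to partition 0; with three reducers the keys spread across partitions 0-2 according to their hash codes.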
Example: aggregate, by protocol, the URLs accessed over different protocols (five-tuple style log data).
Raw data
[hadoop@nnode code]$ hdfs dfs -text /http_interceptor_20130913.txt
2013-09-13 16:04:08  www.subnetc1.com  192.168.1.7  80  192.168.1.139  18863  FTP    www.subnetc1.com/index.html
2013-09-13 16:04:08  www.subnetc2.com  192.168.1.7  80  192.168.1.159  14100  HTTP   www.subnetc2.com/index.html
2013-09-13 16:04:08  www.subnetc3.com  192.168.1.7  80  192.168.1.130  4927   HTTPS  www.subnetc3.com/index.html
2013-09-13 16:04:08  www.subnetc4.com  192.168.1.7  80  192.168.1.154  39044  HTTP   www.subnetc4.com/index.html
[hadoop@nnode code]$
Implementing the Mapper
package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author luchunli
 * @description Mapper implementation
 */
public class ProtocolMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is a tab-separated record with 8 fields;
        // field 6 is the protocol and field 7 is the URL.
        String[] values = value.toString().split("\t");
        if (null == values || values.length != 8) {
            return;
        }
        Text newKey = new Text();
        Text newValue = new Text();
        newKey.set(values[6].trim());    // protocol (FTP/HTTP/HTTPS)
        newValue.set(values[7].trim());  // requested URL
        context.write(newKey, newValue);
    }
}
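As a quick sanity check on the field indices, the following throwaway snippet (the class name and the hard-coded line are mine, assuming the sample record above is tab-separated) splits one input line the same way the mapper does and prints the protocol and URL fields:

public class FieldSplitCheck {
    public static void main(String[] args) {
        // One record from the sample input, tab-separated into 8 fields.
        String line = "2013-09-13 16:04:08\twww.subnetc1.com\t192.168.1.7\t80"
                + "\t192.168.1.139\t18863\tFTP\twww.subnetc1.com/index.html";
        String[] values = line.split("\t");
        // values[6] is the protocol key, values[7] is the URL value.
        System.out.println(values.length + " fields, key=" + values[6]
                + ", value=" + values[7]);
    }
}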
Implementing the Reducer
package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description Reducer implementation
 */
public class ProtocolReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all URLs seen for this protocol, separated by ';'.
        StringBuffer sbf = new StringBuffer();
        for (Text text : values) {
            sbf.append(text.toString());
            sbf.append(";");
        }
        context.write(key, new Text(sbf.toString()));
    }
}
Implementing the Partitioner
package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author luchunli
 * @description Custom partitioner
 */
public class ProtocolPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        // Route each protocol to its own reducer: FTP -> 0, HTTP -> 1, HTTPS -> 2.
        if (key.toString().equals("FTP")) {
            return 0;
        }
        if (key.toString().equals("HTTP")) {
            return 1;
        }
        if (key.toString().equals("HTTPS")) {
            return 2;
        }
        return 0;
    }
}
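To verify the routing without submitting a job, you can call getPartition() directly; the small main below is a hypothetical local check (the class name PartitionerCheck is mine) that prints the partition index chosen for each protocol key with three reduce tasks:

package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        ProtocolPartitioner partitioner = new ProtocolPartitioner();
        for (String protocol : new String[] {"FTP", "HTTP", "HTTPS"}) {
            // The value does not affect the routing decision, so an empty Text is enough.
            int part = partitioner.getPartition(new Text(protocol), new Text(""), 3);
            System.out.println(protocol + " -> partition " + part);
        }
    }
}

It should print FTP -> partition 0, HTTP -> partition 1 and HTTPS -> partition 2, matching the three part files produced below.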
Implementing the driver class
package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ProtocolDriver extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new ProtocolDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(ProtocolDriver.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(ProtocolMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Set the number of reduce tasks; it must cover the partition
        // indices 0..2 returned by ProtocolPartitioner.
        job.setNumReduceTasks(3);
        job.setPartitionerClass(ProtocolPartitioner.class);

        job.setReducerClass(ProtocolReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // job.setOutputFormatClass(ProtocolOutputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Running the job
[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500018
15/12/05 21:41:12 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:41:13 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0008
15/12/05 21:41:13 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0008
15/12/05 21:41:14 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0008/
15/12/05 21:41:14 INFO mapreduce.Job: Running job: job_1449302623953_0008
15/12/05 21:41:43 INFO mapreduce.Job: Job job_1449302623953_0008 running in uber mode : false
15/12/05 21:41:43 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 21:42:12 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 21:42:32 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 21:42:52 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 21:42:55 INFO mapreduce.Job: Job job_1449302623953_0008 completed successfully
15/12/05 21:42:55 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=158
        FILE: Number of bytes written=431827
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=532
        HDFS: Number of bytes written=130
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Job Counters
        Killed reduce tasks=1
        Launched map tasks=1
        Launched reduce tasks=4
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=26277
        Total time spent by all reduces in occupied slots (ms)=105054
        Total time spent by all map tasks (ms)=26277
        Total time spent by all reduce tasks (ms)=105054
        Total vcore-seconds taken by all map tasks=26277
        Total vcore-seconds taken by all reduce tasks=105054
        Total megabyte-seconds taken by all map tasks=26907648
        Total megabyte-seconds taken by all reduce tasks=107575296
    Map-Reduce Framework
        Map input records=4
        Map output records=4
        Map output bytes=132
        Map output materialized bytes=158
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=158
        Reduce input records=4
        Reduce output records=3
        Spilled Records=8
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=410
        CPU time spent (ms)=4360
        Physical memory (bytes) snapshot=515862528
        Virtual memory (bytes) snapshot=3399213056
        Total committed heap usage (bytes)=167907328
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=423
    File Output Format Counters
        Bytes Written=130
[hadoop@nnode code]$
Checking the results
[hadoop@nnode code]$ hdfs dfs -ls /2015120500018
Found 4 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 21:42 /2015120500018/_SUCCESS
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 21:42 /2015120500018/part-r-00000
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 21:42 /2015120500018/part-r-00001
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 21:42 /2015120500018/part-r-00002
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00000
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00001
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00002
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$
The output file names above (part-r-0000N) are generated automatically by MapReduce for each task. By supplying a custom OutputFormat we can choose the names of the output files ourselves.
The custom OutputFormat is shown below. The difference from the earlier MultipleWorkCount example is that here records are written directly through an FSDataOutputStream, rather than by delegating to LineRecordWriter as before.
package com.lucl.hadoop.mapreduce.part;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author luchunli
 * @description Custom OutputFormat
 */
public class ProtocolOutputFormat extends TextOutputFormat<Text, Text> {

    protected static class ProtocolRecordWriter extends RecordWriter<Text, Text> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected TaskAttemptContext context = null;
        // One open stream per distinct key, so each protocol gets its own file.
        protected HashMap<Text, DataOutputStream> recordStream = null;
        protected Path workPath = null;

        public ProtocolRecordWriter() {
        }

        public ProtocolRecordWriter(TaskAttemptContext context, Path workPath) {
            this.context = context;
            this.workPath = workPath;
            recordStream = new HashMap<Text, DataOutputStream>();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            boolean nullKey = key == null;
            boolean nullValue = value == null;
            if (nullKey && nullValue) {
                return;
            }
            DataOutputStream out = recordStream.get(key);
            if (null == out) {
                // Name the file after the key, e.g. FTP.txt, HTTP.txt, HTTPS.txt.
                Path file = new Path(workPath, key + ".txt");
                out = file.getFileSystem(this.context.getConfiguration()).create(file, false);
                // Store a copy of the key, since Hadoop reuses the Text object between calls.
                recordStream.put(new Text(key), out);
            }
            if (!nullKey) {
                out.write(key.getBytes(), 0, key.getLength());
            }
            if (!(nullKey || nullValue)) {
                out.write("\t".getBytes());
            }
            if (!nullValue) {
                out.write(value.getBytes(), 0, value.getLength());
            }
            out.write(newline);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (DataOutputStream out : recordStream.values()) {
                out.close();
            }
            recordStream.clear();
            recordStream = null;
        }
    }

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Path workPath = this.getTaskOutputPath(context);
        return new ProtocolRecordWriter(context, workPath);
    }

    private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(context);
        if (committer instanceof FileOutputCommitter) {
            // Get the directory that the task should write results into.
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            // Get the {@link Path} to the output directory for the map-reduce job.
            // context.getConfiguration().get(FileOutputFormat.OUTDIR);
            Path outputPath = super.getOutputPath(context);
            if (null == outputPath) {
                throw new IOException("Undefined job output-path.");
            }
            workPath = outputPath;
        }
        return workPath;
    }
}
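To switch the job over to this OutputFormat, the line that was left commented out in ProtocolDriver.run() needs to be enabled. A minimal sketch of the relevant portion of run() follows; everything else in the driver stays exactly as shown earlier.

// ... inside ProtocolDriver.run(), after the reducer configuration ...
job.setReducerClass(ProtocolReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

// Use the custom format so each protocol is written to <key>.txt
// instead of the default part-r-0000N files.
job.setOutputFormatClass(ProtocolOutputFormat.class);

FileOutputFormat.setOutputPath(job, new Path(args[1]));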
Running again
[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500020
15/12/05 21:59:28 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:59:30 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0010
15/12/05 21:59:30 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0010
15/12/05 21:59:31 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0010/
15/12/05 21:59:31 INFO mapreduce.Job: Running job: job_1449302623953_0010
15/12/05 22:00:00 INFO mapreduce.Job: Job job_1449302623953_0010 running in uber mode : false
15/12/05 22:00:00 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 22:00:29 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 22:00:48 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 22:01:07 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 22:01:07 INFO mapreduce.Job: Job job_1449302623953_0010 completed successfully
15/12/05 22:01:07 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=158
        FILE: Number of bytes written=432595
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=532
        HDFS: Number of bytes written=130
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Job Counters
        Killed reduce tasks=1
        Launched map tasks=1
        Launched reduce tasks=4
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=26075
        Total time spent by all reduces in occupied slots (ms)=92427
        Total time spent by all map tasks (ms)=26075
        Total time spent by all reduce tasks (ms)=92427
        Total vcore-seconds taken by all map tasks=26075
        Total vcore-seconds taken by all reduce tasks=92427
        Total megabyte-seconds taken by all map tasks=26700800
        Total megabyte-seconds taken by all reduce tasks=94645248
    Map-Reduce Framework
        Map input records=4
        Map output records=4
        Map output bytes=132
        Map output materialized bytes=158
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=158
        Reduce input records=4
        Reduce output records=3
        Spilled Records=8
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=339
        CPU time spent (ms)=4690
        Physical memory (bytes) snapshot=513667072
        Virtual memory (bytes) snapshot=3405312000
        Total committed heap usage (bytes)=167907328
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=423
    File Output Format Counters
        Bytes Written=130
[hadoop@nnode code]$
Checking the results
[hadoop@nnode code]$ hdfs dfs -ls /2015120500020
Found 4 items
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 22:01 /2015120500020/FTP.txt
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 22:00 /2015120500020/HTTP.txt
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 22:01 /2015120500020/HTTPS.txt
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 22:01 /2015120500020/_SUCCESS
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/FTP.txt
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTP.txt
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTPS.txt
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$