Lu Chunli's work notes: who says a programmer can't have a touch of style?
In distributed data processing, serialization and deserialization are used mainly in two areas: interprocess communication and permanent storage.
Serialization is the process of turning structured data into a byte stream so that it can be transmitted over the network or written to disk for permanent storage; deserialization is the reverse process of turning a byte stream back into structured objects.
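To make the two directions concrete, here is a minimal sketch (my own illustration, not part of the original notes) that serializes a Hadoop IntWritable into an in-memory byte array and then reads it back:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

public class RoundTripDemo {
    public static void main(String[] args) throws IOException {
        // Serialization: structured object -> byte stream
        IntWritable original = new IntWritable(42);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialization: byte stream -> structured object
        IntWritable restored = new IntWritable();
        restored.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(restored.get());   // prints 42
    }
}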
Communication between Hadoop node processes is implemented with RPC. Hadoop does not use Java's built-in serialization mechanism; instead it defines two serialization-related interfaces, Writable and Comparable, and from these two a combined WritableComparable interface is derived.
In Hadoop, the Writable interface defines two methods:
write(DataOutput out): writes the object's fields to a binary DataOutput stream;
readFields(DataInput in): reads the object's fields from a binary DataInput stream.
package org.apache.hadoop.io;

public interface Writable {
  /** Serialize the fields of this object to out. */
  void write(DataOutput out) throws IOException;

  /** Deserialize the fields of this object from in. */
  void readFields(DataInput in) throws IOException;
}
The common Writable / WritableComparable implementations in Hadoop are:
IntWritable        - int (fixed length)
VIntWritable       - int (variable length)
BooleanWritable    - boolean
ByteWritable       - byte (single byte)
ShortWritable      - short
FloatWritable      - float
DoubleWritable     - double
LongWritable       - long (fixed length)
VLongWritable      - long (variable length)
Text               - Writable for UTF-8 sequences; generally treated as the equivalent of java.lang.String
BytesWritable      - byte sequence
ArrayWritable      - array of Writables
TwoDArrayWritable  - two-dimensional array of Writables
MapWritable        - implements Map<Writable, Writable>
SortedMapWritable  - implements SortedMap<WritableComparable, Writable>
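The difference between the fixed-length and variable-length wrappers can be seen by counting the bytes each one writes. The following small sketch is my own illustration (the class and helper names are not from the original notes):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.Writable;

public class EncodingSizeDemo {
    // Serialize a Writable into memory and return the number of bytes produced
    private static int serializedLength(Writable w) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        w.write(new DataOutputStream(bytes));
        return bytes.toByteArray().length;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(serializedLength(new IntWritable(163)));   // always 4 bytes
        System.out.println(serializedLength(new VIntWritable(163)));  // 2 bytes for this small value
    }
}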
The WritableComparable interface adds type comparison, which is critical to MapReduce because intermediate keys are sorted during processing.
package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}

The WritableComparator class is a general-purpose implementation that serves two purposes:
1. it provides a default implementation of the raw compare() method, which deserializes the two objects from their byte streams and calls compareTo();
2. it acts as a factory for RawComparator instances of registered Writable implementations.

package org.apache.hadoop.io;

public class WritableComparator implements RawComparator, Configurable {
  /** For backwards compatibility. **/
  public static WritableComparator get(Class<? extends WritableComparable> c) {
    return get(c, null);
  }

  /** Get a comparator for a {@link WritableComparable} implementation. */
  public static WritableComparator get(Class<? extends WritableComparable> c, Configuration conf) {
    // ......
  }
}
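As an illustration of the interface itself, a custom key type only needs compareTo() on top of write()/readFields() to be usable as a sortable MapReduce key. The class name SiteCountKey below is hypothetical and not part of the original example:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type: sortable by its numeric count
public class SiteCountKey implements WritableComparable<SiteCountKey> {
    private long count;

    public SiteCountKey() {            // no-arg constructor required for reflection
    }

    public SiteCountKey(long count) {
        this.count = count;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        count = in.readLong();
    }

    @Override
    public int compareTo(SiteCountKey other) {   // used by MapReduce to sort keys
        return Long.compare(this.count, other.count);
    }
}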
A simple example
package com.lucl.hadoop.hdfs;

import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author lucl
 */
public class CustomizeComparator {
    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);

        IntWritable int003 = new IntWritable(300);
        IntWritable int004 = new IntWritable(400);

        // Convert the first Writable to bytes through an in-memory byte array stream
        ByteArrayOutputStream bytes001 = new ByteArrayOutputStream();
        DataOutput out001 = new DataOutputStream(bytes001);
        try {
            int003.write(out001);
        } catch (IOException e) {
            e.printStackTrace();
        }
        byte[] b1 = bytes001.toByteArray();

        // Convert the second Writable to bytes through an in-memory byte array stream
        ByteArrayOutputStream bytes002 = new ByteArrayOutputStream();
        DataOutput out002 = new DataOutputStream(bytes002);
        try {
            int004.write(out002);
        } catch (IOException e) {
            e.printStackTrace();
        }
        byte[] b2 = bytes002.toByteArray();

        // Compare the two values directly on their serialized byte representations
        int comvalue = comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
        System.out.println("comvalue : " + comvalue);

        // Compare the same values as plain ints after deserialization
        int value1 = int003.get();
        int value2 = int004.get();
        if (value1 > value2) {
            System.out.println("value1 > value2");
        } else {
            System.out.println("value1 < value2");
        }
    }
}
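With the values 300 and 400, both paths should agree: the raw byte comparison should print a negative comvalue (the registered IntWritable comparator returns -1 here), followed by "value1 < value2".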
The MapReduce program
The data to process (page views and traffic volume for different types of websites):
[hadoop@nnode code]$ hdfs dfs -text /data/HTTP_SITE_FLOW.log
視頻網(wǎng)站    15    1527
信息安全    20    3156
站點(diǎn)統(tǒng)計(jì)    24    6960
搜索引擎    28    3659
站點(diǎn)統(tǒng)計(jì)    3     1938
綜合門(mén)戶    15    1938
搜索引擎    21    9531
搜索引擎    63    11058
The custom serialization class
package com.lucl.hadoop.mapreduce.serialize;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class CustomizeWritable implements Writable {
    private Long pv;
    private Long flow;

    public CustomizeWritable() {
        // no-arg constructor required: Hadoop instantiates this class via reflection
    }

    public CustomizeWritable(String pv, String flow) {
        this(Long.valueOf(pv), Long.valueOf(flow));
    }

    public CustomizeWritable(Long pv, Long flow) {
        this.pv = pv;
        this.flow = flow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(pv);
        out.writeLong(flow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        pv = in.readLong();
        flow = in.readLong();
    }

    public Long getPv() {
        return pv;
    }

    public void setPv(Long pv) {
        this.pv = pv;
    }

    public Long getFlow() {
        return flow;
    }

    public void setFlow(Long flow) {
        this.flow = flow;
    }

    @Override
    public String toString() {
        return this.pv + "\t" + this.flow;
    }
}
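Before wiring the class into a job, it can be worth checking that write() and readFields() round-trip correctly. This quick check is my own sketch (the test class name is hypothetical); it reuses the CustomizeWritable class above:

package com.lucl.hadoop.mapreduce.serialize;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CustomizeWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        CustomizeWritable original = new CustomizeWritable(15L, 1527L);

        // write() -> byte array
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // byte array -> readFields()
        CustomizeWritable restored = new CustomizeWritable();
        restored.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(restored);   // expected: 15    1527 (tab-separated)
    }
}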
The Mapper code
package com.lucl.hadoop.mapreduce.serialize;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WritableMapper extends Mapper<LongWritable, Text, Text, CustomizeWritable> {
    Text text = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is: site type \t page views \t traffic
        String[] splited = value.toString().split("\t");
        text.set(splited[0]);
        CustomizeWritable wr = new CustomizeWritable(splited[1], splited[2]);
        context.write(text, wr);
    }
}
The Reducer code
package com.lucl.hadoop.mapreduce.serialize;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WritableReducer extends Reducer<Text, CustomizeWritable, Text, CustomizeWritable> {
    @Override
    protected void reduce(Text key, Iterable<CustomizeWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum page views and traffic for each site type
        Long pv = 0L;
        Long flow = 0L;
        for (CustomizeWritable customizeWritable : values) {
            pv += customizeWritable.getPv();
            flow += customizeWritable.getFlow();
        }
        CustomizeWritable wr = new CustomizeWritable(pv, flow);
        context.write(key, wr);
    }
}
The driver class
package com.lucl.hadoop.mapreduce.serialize;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/**
 * @author lucl
 */
public class CustomizeWritableMRApp extends Configured implements Tool {
    private static final Logger logger = Logger.getLogger(CustomizeWritableMRApp.class);

    public static void main(String[] args) {
        try {
            ToolRunner.run(new CustomizeWritableMRApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            logger.info("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(CustomizeWritableMRApp.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        /**
         * map
         */
        job.setMapperClass(WritableMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomizeWritable.class);

        /**
         * reduce
         */
        job.setReducerClass(WritableReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CustomizeWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Running the program
[hadoop@nnode code]$ hadoop jar WRApp.jar /data/HTTP_SITE_FLOW.log /201511291404
15/11/29 14:44:13 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/11/29 14:44:15 INFO input.FileInputFormat: Total input paths to process : 1
15/11/29 14:44:15 INFO mapreduce.JobSubmitter: number of splits:1
15/11/29 14:44:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448763754600_0004
15/11/29 14:44:15 INFO impl.YarnClientImpl: Submitted application application_1448763754600_0004
15/11/29 14:44:15 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448763754600_0004/
15/11/29 14:44:15 INFO mapreduce.Job: Running job: job_1448763754600_0004
15/11/29 14:44:45 INFO mapreduce.Job: Job job_1448763754600_0004 running in uber mode : false
15/11/29 14:44:45 INFO mapreduce.Job:  map 0% reduce 0%
15/11/29 14:45:14 INFO mapreduce.Job:  map 100% reduce 0%
15/11/29 14:45:34 INFO mapreduce.Job:  map 100% reduce 100%
15/11/29 14:45:34 INFO mapreduce.Job: Job job_1448763754600_0004 completed successfully
15/11/29 14:45:34 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=254
                FILE: Number of bytes written=216031
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=277
                HDFS: Number of bytes written=107
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=27010
                Total time spent by all reduces in occupied slots (ms)=16429
                Total time spent by all map tasks (ms)=27010
                Total time spent by all reduce tasks (ms)=16429
                Total vcore-seconds taken by all map tasks=27010
                Total vcore-seconds taken by all reduce tasks=16429
                Total megabyte-seconds taken by all map tasks=27658240
                Total megabyte-seconds taken by all reduce tasks=16823296
        Map-Reduce Framework
                Map input records=8
                Map output records=8
                Map output bytes=232
                Map output materialized bytes=254
                Input split bytes=103
                Combine input records=0
                Combine output records=0
                Reduce input groups=5
                Reduce shuffle bytes=254
                Reduce input records=8
                Reduce output records=5
                Spilled Records=16
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=167
                CPU time spent (ms)=2320
                Physical memory (bytes) snapshot=304205824
                Virtual memory (bytes) snapshot=1695969280
                Total committed heap usage (bytes)=136450048
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=174
        File Output Format Counters
                Bytes Written=107
[hadoop@nnode code]$
Checking the result
[hadoop@nnode code]$ hdfs dfs -ls /201511291404
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-11-29 14:45 /201511291404/_SUCCESS
-rw-r--r--   2 hadoop hadoop        107 2015-11-29 14:45 /201511291404/part-r-00000
[hadoop@nnode code]$ hdfs dfs -text /201511291404/part-r-00000
信息安全    20     3156
搜索引擎    112    24248
站點(diǎn)統(tǒng)計(jì)    27     8898
綜合門(mén)戶    15     1938
視頻網(wǎng)站    15     1527
[hadoop@nnode code]$
Note: the first run failed with the following error.
15/11/29 14:41:28 INFO mapreduce.Job:  map 100% reduce 0%
15/11/29 14:42:04 INFO mapreduce.Job: Task Id : attempt_1448763754600_0003_r_000000_0, Status : FAILED
Error: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.lucl.hadoop.mapreduce.serialize.CustomizeWritable.<init>()
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:146)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
        at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.NoSuchMethodException: com.lucl.hadoop.mapreduce.serialize.CustomizeWritable.<init>()
        at java.lang.Class.getConstructor0(Class.java:2892)
        at java.lang.Class.getDeclaredConstructor(Class.java:2058)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
        ... 13 more
Searching online showed the cause: when defining a custom Writable, the framework creates instances through reflection, which calls the no-argument constructor, so a no-argument constructor must be defined.
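In other words, the version of CustomizeWritable shown earlier already contains the fix; the essential part is simply the public no-argument constructor that Hadoop's ReflectionUtils needs during deserialization:

public class CustomizeWritable implements Writable {
    private Long pv;
    private Long flow;

    // Hadoop creates key/value instances via reflection during deserialization,
    // so a public no-argument constructor is mandatory.
    public CustomizeWritable() {
    }

    // ... remaining constructors, write(), readFields(), getters/setters as shown above
}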