真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Hadoop2.6.0學(xué)習(xí)筆記(三)Hadoop序列化

魯春利的工作筆記,誰(shuí)說(shuō)程序員不能有文藝范?

成都創(chuàng)新互聯(lián)專(zhuān)業(yè)為企業(yè)提供寧蒗網(wǎng)站建設(shè)、寧蒗做網(wǎng)站、寧蒗網(wǎng)站設(shè)計(jì)、寧蒗網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)與制作、寧蒗企業(yè)網(wǎng)站模板建站服務(wù),10多年寧蒗做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。


 

序列化和反序列化在分布式數(shù)據(jù)處理中,主要應(yīng)用于進(jìn)程建通信和永久存儲(chǔ)兩個(gè)領(lǐng)域。

序列化(serialization)就是結(jié)構(gòu)化的數(shù)據(jù)轉(zhuǎn)換為字節(jié)流以便在網(wǎng)絡(luò)上傳輸或?qū)懙酱疟P(pán)進(jìn)行永久存儲(chǔ)的過(guò)程;反序列化(deserialization)就是將字節(jié)流轉(zhuǎn)換回結(jié)構(gòu)化對(duì)象的逆過(guò)程。

Hadoop系統(tǒng)節(jié)點(diǎn)進(jìn)程間通信采用RPC實(shí)現(xiàn),Hadoop沒(méi)有采用Java的序列化機(jī)制,而是定義了兩個(gè)序列化相關(guān)的接口:Writable和Comparable,而這兩個(gè)接口由抽象出了一個(gè)WritableComparable接口。

在Hadoop中,Writable接口定義了兩個(gè)方法:

    將數(shù)據(jù)寫(xiě)入二進(jìn)制格式的DataOutput流;

    從二進(jìn)制格式讀取數(shù)據(jù)的DataInput流;

package org.apache.hadoop.io;
public interface Writable {
  /** 
   * Serialize the fields of this object to out.
   */
  void write(DataOutput out) throws IOException;

  /** 
   * Deserialize the fields of this object from in.  
   */
  void readFields(DataInput in) throws IOException;
}

    Hadoop中Writable接口的結(jié)構(gòu)為

 Writable
     WritableComparable
         IntWritable                 int(定長(zhǎng))
         VintWritable                int(變長(zhǎng))
         BooleanWritable             boolean
         ByteWritable                byte(single byte)
         ShortWritable               short
         FloatWritable               float
         DoubleWritable              double
         LongWritable                long(定長(zhǎng))
         VlongWirtable               long(變長(zhǎng))
         Text 是針對(duì)UTF-8序列的Writable類(lèi),一般認(rèn)為它等價(jià)于java.lang.String
         BytesWritable               byte(byte sequence)
   ArrayWritable                     數(shù)組
   TwoDArrayWritable                 二維數(shù)組
   MapWritable    implements Map    
   SortedMapWritable implements SortedMap

WritableComparable接口提供了類(lèi)型比較的功能,而類(lèi)型比較對(duì)MapReduce至關(guān)重要。

package org.apache.hadoop.io;
public interface WritableComparable extends Writable, Comparable {}

# WritableComparator類(lèi)是一個(gè)通用實(shí)現(xiàn)。
1. 提供對(duì)原始compare()方法的一個(gè)默認(rèn)實(shí)現(xiàn);
2. 充當(dāng)RawComparator實(shí)例的工廠(已注冊(cè)Writable實(shí)現(xiàn))
package org.apache.hadoop.io;
public class WritableComparator implements RawComparator, Configurable {
      /** For backwards compatibility. **/
      public static WritableComparator get(Class c) {
          return get(c, null);
      }
      
        /** Get a comparator for a {@link WritableComparable} implementation. */
      public static WritableComparator get(Class c, Configuration conf) {
          // ......
      }
}

簡(jiǎn)單示例

package com.lucl.hadoop.hdfs;

import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

/**
 * 
 * @author lucl
 *
 */
public class CustomizeComparator {
    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        RawComparator comparator = WritableComparator.get(IntWritable.class);
        IntWritable int003 = new IntWritable(300);
        IntWritable int004 = new IntWritable(400);
        // 利用內(nèi)存字節(jié)數(shù)字實(shí)現(xiàn)writable到bytes的轉(zhuǎn)化
        ByteArrayOutputStream bytes001 = new ByteArrayOutputStream();
        DataOutput out001 = new DataOutputStream(bytes001);
        try {
            int003.write(out001);
        } catch (IOException e) {
            e.printStackTrace();
        }
        byte [] b1 = bytes001.toByteArray();
        
        // 利用內(nèi)存字節(jié)數(shù)字實(shí)現(xiàn)int到bytes的轉(zhuǎn)化
        ByteArrayOutputStream bytes002 = new ByteArrayOutputStream();
        DataOutput out002 = new DataOutputStream(bytes002);
        try {
            int004.write(out002);
        } catch (IOException e) {
            e.printStackTrace();
        }
        byte [] b2 = bytes002.toByteArray();
        
        int comvalue = comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
        System.out.println("comvalue : " + comvalue);
        
        // 利用原始值比較int數(shù)據(jù)
        int value1 = int003.get();
        int value2 = int004.get();
        if (value1 > value2) {
            System.out.println("value1 > value2");
        } else {
            System.out.println("value1 < value2");
        }
    }
}

MapReduce程序

需要處理的數(shù)據(jù)(不同類(lèi)型網(wǎng)站的訪問(wèn)量及流量)

[hadoop@nnode code]$ hdfs dfs -text /data/HTTP_SITE_FLOW.log
視頻網(wǎng)站        15      1527
信息安全        20      3156
站點(diǎn)統(tǒng)計(jì)        24      6960
搜索引擎        28      3659
站點(diǎn)統(tǒng)計(jì)        3       1938
綜合門(mén)戶        15      1938
搜索引擎        21      9531
搜索引擎        63      11058

自定義序列化類(lèi)

package com.lucl.hadoop.mapreduce.serialize;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class CustomizeWritable implements Writable {
    private Long pv;
    private Long flow;
    
    public CustomizeWritable () {
        // ......
    }
    
    public CustomizeWritable (String pv, String flow) {
        this(Long.valueOf(pv), Long.valueOf(flow));
    }
    
    public CustomizeWritable (Long pv, Long flow) {
        this.pv = pv;
        this.flow = flow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(pv);
        out.writeLong(flow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        pv = in.readLong();
        flow = in.readLong();
    }
    
    public Long getPv() {
        return pv;
    }

    public void setPv(Long pv) {
        this.pv = pv;
    }

    public Long getFlow() {
        return flow;
    }

    public void setFlow(Long flow) {
        this.flow = flow;
    }

    @Override
    public String toString() {
        return this.pv + "\t" + this.flow;
    }
}

Mapper端代碼

package com.lucl.hadoop.mapreduce.serialize;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WritableMapper extends Mapper {
    Text text = new Text();
    
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String [] splited = value.toString().split("\t");
        
        text.set(splited[0]);
        CustomizeWritable wr = new CustomizeWritable(splited[1], splited[2]);
        
        context.write(text, wr);
    }
    
}

Reducer端代碼

package com.lucl.hadoop.mapreduce.serialize;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WritableReducer extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context)
            throws IOException, InterruptedException {
        Long pv = 0L;
        Long flow = 0L;
        
        for (CustomizeWritable customizeWritable : values) {
            pv += customizeWritable.getPv();
            flow += customizeWritable.getFlow();
        }
        
        CustomizeWritable wr = new CustomizeWritable(pv, flow);
        
        context.write(key, wr);
    }
}

驅(qū)動(dòng)類(lèi)

package com.lucl.hadoop.mapreduce.serialize;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import com.lucl.hadoop.mapreduce.customize.MyWordCountApp;

/**
 * @author lucl
 */
public class CustomizeWritableMRApp extends Configured implements Tool {
    private static final Logger logger = Logger.getLogger(MyWordCountApp.class);
    
    public static void main(String[] args) {
        try {
            ToolRunner.run(new CustomizeWritableMRApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            logger.info("Usage: wordcount  [...] ");
            System.exit(2);
        }
        
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        
        job.setJarByClass(CustomizeWritableMRApp.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        
        /**
         * map
         */
        job.setMapperClass(WritableMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomizeWritable.class);
        
        /**
         * reduce
         */
        job.setReducerClass(WritableReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CustomizeWritable.class);
        
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

程序運(yùn)行

[hadoop@nnode code]$ hadoop jar WRApp.jar /data/HTTP_SITE_FLOW.log /201511291404
15/11/29 14:44:13 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/11/29 14:44:15 INFO input.FileInputFormat: Total input paths to process : 1
15/11/29 14:44:15 INFO mapreduce.JobSubmitter: number of splits:1
15/11/29 14:44:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448763754600_0004
15/11/29 14:44:15 INFO impl.YarnClientImpl: Submitted application application_1448763754600_0004
15/11/29 14:44:15 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448763754600_0004/
15/11/29 14:44:15 INFO mapreduce.Job: Running job: job_1448763754600_0004
15/11/29 14:44:45 INFO mapreduce.Job: Job job_1448763754600_0004 running in uber mode : false
15/11/29 14:44:45 INFO mapreduce.Job:  map 0% reduce 0%
15/11/29 14:45:14 INFO mapreduce.Job:  map 100% reduce 0%
15/11/29 14:45:34 INFO mapreduce.Job:  map 100% reduce 100%
15/11/29 14:45:34 INFO mapreduce.Job: Job job_1448763754600_0004 completed successfully
15/11/29 14:45:34 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=254
                FILE: Number of bytes written=216031
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=277
                HDFS: Number of bytes written=107
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=27010
                Total time spent by all reduces in occupied slots (ms)=16429
                Total time spent by all map tasks (ms)=27010
                Total time spent by all reduce tasks (ms)=16429
                Total vcore-seconds taken by all map tasks=27010
                Total vcore-seconds taken by all reduce tasks=16429
                Total megabyte-seconds taken by all map tasks=27658240
                Total megabyte-seconds taken by all reduce tasks=16823296
        Map-Reduce Framework
                Map input records=8
                Map output records=8
                Map output bytes=232
                Map output materialized bytes=254
                Input split bytes=103
                Combine input records=0
                Combine output records=0
                Reduce input groups=5
                Reduce shuffle bytes=254
                Reduce input records=8
                Reduce output records=5
                Spilled Records=16
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=167
                CPU time spent (ms)=2320
                Physical memory (bytes) snapshot=304205824
                Virtual memory (bytes) snapshot=1695969280
                Total committed heap usage (bytes)=136450048
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=174
        File Output Format Counters 
                Bytes Written=107
[hadoop@nnode code]$

查看結(jié)果

[hadoop@nnode code]$ hdfs dfs -ls /201511291404
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-11-29 14:45 /201511291404/_SUCCESS
-rw-r--r--   2 hadoop hadoop        107 2015-11-29 14:45 /201511291404/part-r-00000
[hadoop@nnode code]$ hdfs dfs -text /201511291404/part-r-00000
信息安全        20      3156
搜索引擎        112     24248
站點(diǎn)統(tǒng)計(jì)        27      8898
綜合門(mén)戶        15      1938
視頻網(wǎng)站        15      1527
[hadoop@nnode code]$

注意:在第一次執(zhí)行的時(shí)候報(bào)錯(cuò)如下

15/11/29 14:41:28 INFO mapreduce.Job:  map 100% reduce 0%
15/11/29 14:42:04 INFO mapreduce.Job: Task Id : attempt_1448763754600_0003_r_000000_0, Status : FAILED
Error: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.lucl.hadoop.mapreduce.serialize.CustomizeWritable.()
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:146)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
        at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.NoSuchMethodException: com.lucl.hadoop.mapreduce.serialize.CustomizeWritable.()
        at java.lang.Class.getConstructor0(Class.java:2892)
        at java.lang.Class.getDeclaredConstructor(Class.java:2058)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
        ... 13 more

在網(wǎng)上查詢?cè)驗(yàn)椤霸谧远xwritable的時(shí)候需要注意,反射過(guò)程中需要調(diào)用無(wú)參構(gòu)造,需要定義無(wú)參的構(gòu)造方法。”。


文章名稱(chēng):Hadoop2.6.0學(xué)習(xí)筆記(三)Hadoop序列化
文章鏈接:http://weahome.cn/article/goeidc.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部