真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Hadoop用戶怎么自定義

這篇文章主要講解了“Hadoop用戶怎么自定義”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Hadoop用戶怎么自定義”吧!

公司主營業(yè)務(wù):成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)推出麻栗坡免費(fèi)做網(wǎng)站回饋大家。

一:Hadoop內(nèi)置的數(shù)據(jù)類型。

    Hadoop提供如下內(nèi)置的數(shù)據(jù)類型,這些數(shù)據(jù)類型都實(shí)現(xiàn)了WritableComparable接口,以便用這些類型定義的數(shù)據(jù)可以被序列化進(jìn)行網(wǎng)絡(luò)傳輸和文件存儲,以及進(jìn)行大小比較。

BooleanWritable標(biāo)準(zhǔn)布爾型數(shù)值
ByteWritable單字節(jié)數(shù)值
DoubleWritable雙字節(jié)數(shù)
FloatWritable浮點(diǎn)數(shù)
IntWritable整型數(shù)
LongWritable長整型數(shù)
Text使用UTF-8格式存儲的文本
NullWritable當(dāng)中的key或value為空時(shí)使用
//簡單知道這些類型
IntWritable iw = new IntWritable(1);
System.out.println(  iw.get() );  // 1 
	
BooleanWritable bw = new BooleanWritable(true);
System.out.println(  bw.get() );  // true

二:Hadoop-用戶自定義的數(shù)據(jù)類型。

    自定義數(shù)據(jù)類型時(shí),需滿足兩個(gè)基本要求,即

        1.實(shí)現(xiàn)Writable接口,以便該數(shù)據(jù)能被序列化后完成網(wǎng)絡(luò)傳輸或文件輸入/輸出。

        2.如果該數(shù)據(jù)需要作為主鍵key使用,或需要比較數(shù)值大小時(shí),則需要實(shí)現(xiàn)WritableComparable接口。

//Hadoop2.6.4版 - Writable源碼:
public interface Writable {
 
  void write(DataOutput out) throws IOException;

  void readFields(DataInput in) throws IOException;

}
public interface WritableComparable extends Writable, Comparable {}

三:Hadoop內(nèi)置的數(shù)據(jù)輸入格式和RecordReader。

    數(shù)據(jù)輸入格式(InputFormat)用于描述MapReduce作業(yè)的數(shù)據(jù)輸入規(guī)范。MapReduce框架依靠數(shù)據(jù)輸入格式完成輸入規(guī)范檢查、對數(shù)據(jù)文件進(jìn)行輸入分塊(InputSplit),以及提供從輸入分塊中將數(shù)據(jù)記錄逐一讀出、并轉(zhuǎn)換為Map過程的輸入鍵值對等功能。

    Hadoop提供了豐富的內(nèi)置數(shù)據(jù)輸入格式,最常用的數(shù)據(jù)輸入格式包括:TextInputFormat 和 KeyValueInputFormat。

    TextInputFormat是系統(tǒng)默認(rèn)的數(shù)據(jù)輸入格式,可以將文本文件分塊并逐行讀入以便Map節(jié)點(diǎn)進(jìn)行處理。讀入一行時(shí),所產(chǎn)生的主鍵key就是當(dāng)前行在整個(gè)文本文件中的字節(jié)偏移位置,而value就是該行的內(nèi)容。

//TextInputFormat部分源碼:
public class TextInputFormat extends FileInputFormat {

  @Override
  public RecordReader 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  //....
}

    KeyValueTextInputFormat是另一個(gè)常用的數(shù)據(jù)輸入格式,可將一個(gè)按照格式逐行存放的文本文件逐行讀出,并自動解析生成相應(yīng)的key和value。

//KeyValueTextInputFormat部分源碼:
public class KeyValueTextInputFormat extends FileInputFormat {

  // ...

  public RecordReader createRecordReader(InputSplit genericSplit,
      TaskAttemptContext context) throws IOException {
    
    context.setStatus(genericSplit.toString());
    return new KeyValueLineRecordReader(context.getConfiguration());
  }

}

    RecordReader:對于一個(gè)數(shù)據(jù)輸入格式,都需要有一個(gè)對應(yīng)的RecordReader,主要用于將一個(gè)文件中的數(shù)據(jù)記錄拆分成具體的鍵值對。TextInputFormat的默認(rèn)RecordReader是LineRecordReader,而KeyValueTextInputFormat的默認(rèn)RecordReader是KeyValueLineRecordReader。

四:Hadoop內(nèi)置的數(shù)據(jù)輸出格式與RecordWriter。

    數(shù)據(jù)輸出格式(OutputFormat)用于描述MapReduce作業(yè)的數(shù)據(jù)輸出規(guī)范。MapReduce框架依靠數(shù)據(jù)輸出格式完成輸出規(guī)范檢查以及提供作業(yè)結(jié)果數(shù)據(jù)輸出功能。

    同樣,最常用的數(shù)據(jù)輸出格式是TextOutputFormat,也是系統(tǒng)默認(rèn)的數(shù)據(jù)輸出格式,可以將計(jì)算結(jié)果以 “key + \t + vaue”的形式逐行輸出到文本文件中。

    與數(shù)據(jù)輸入格式類似樣,數(shù)據(jù)輸出格式也提供一個(gè)對應(yīng)的RecordWriter,以便系統(tǒng)明確輸出結(jié)果寫入到文件中的具體格式。TextInputFormat的默認(rèn)RecordWriter是LineRecordWriter,其實(shí)際操作是將結(jié)果數(shù)據(jù)以“key + \t + value”的形式輸出到文本文件中。

//TextOutputFormat的部分源碼:
public class TextOutputFormat extends FileOutputFormat {

  protected static class LineRecordWriter extends RecordWriter {
    // ...

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      //...
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    private void writeObject(Object o) throws IOException {
       // ...
    }

    public synchronized void write(K key, V value) throws IOException {
      //...
      out.write(newline);
    }

  }

  public RecordWriter getRecordWriter(TaskAttemptContext job
                         ) throws IOException, InterruptedException {
     // ...
  }
}

五:通過打印輸出UserInfo小例子來實(shí)現(xiàn)簡單的用戶自定義數(shù)據(jù)類型,數(shù)據(jù)輸入格式,數(shù)據(jù)輸出格式。 (簡單的說就是模仿源碼,基本上沒多大變化)。

        以下附上案例源碼:

1.定義自己的UserInfo,作為數(shù)據(jù)類型。

package com.hadoop.mapreduce.test4.outputformat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class UserInfo implements WritableComparable {

	private int id;
	private String name;
	private int age;
	private String sex;
	private String address;
	
	public UserInfo() {
	}
	public UserInfo(int id, String name, int age, String sex, String address) {
		this.id = id;
		this.name = name;
		this.age = age;
		this.sex = sex;
		this.address = address;
	}

	// JavaBean 普通的get set方法....

	@Override
	public void readFields(DataInput in) throws IOException {
		this.id = in.readInt();
		this.name = in.readUTF();
		this.age = in.readInt();
		this.sex = in.readUTF();
		this.address = in.readUTF();
	}
	
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(id);
		out.writeUTF(name);
		out.writeInt(age);
		out.writeUTF(sex);
		out.writeUTF(address);
	}

	@Override
	public String toString() {
		return "Id:" + id + ", Name:" + name + ", Age:" + age + ", Sex:" + sex + ", Address:" + address ;
	}

	@Override
	public int compareTo(UserInfo userInfo) {
		return 0;
	}
}

2.定制自己的數(shù)據(jù)輸入格式:UserInfoTextInputFormat。

package com.hadoop.mapreduce.test4.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class UserInfoTextInputFormat extends FileInputFormat {
	@Override
	public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		context.setStatus(split.toString());
		UserInfoRecordReader userInforRecordReader = new UserInfoRecordReader(context.getConfiguration() );
		return userInforRecordReader;
	}
}

3.定制自己的RecordReader:UserInfoRecordReader。

package com.hadoop.mapreduce.test4.outputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class UserInfoRecordReader extends RecordReader {
	public static final String KEY_VALUE_SEPERATOR = 
			"mapreduce.input.keyvaluelinerecordreader.key.value.separator";

	private final LineRecordReader lineRecordReader;

	private byte separator = (byte) '\t';

	private Text innerValue;
	private Text key;

	private UserInfo value;


	public Class getKeyClass() { 
		return Text.class;
	}

	public UserInfoRecordReader(Configuration conf)throws IOException {
		lineRecordReader = new LineRecordReader();
		String sepStr = conf.get(KEY_VALUE_SEPERATOR,"\t");
		this.separator = (byte) sepStr.charAt(0);
	}

	public void initialize(InputSplit genericSplit,TaskAttemptContext context) throws IOException {
		lineRecordReader.initialize(genericSplit, context);
	}

	public static int findSeparator(byte[] utf, int start, int length, byte sep) {
		for (int i = start; i < (start + length); i++) {
			if (utf[i] == sep) {
				return i;
			}
		}
		return -1; //將這個(gè)截取標(biāo)識符的位置給返回回去。
	}

	public static void setKeyValue(Text key, UserInfo value, byte[] line,int lineLen, int pos) {
		if (pos == -1) {
			key.set(line, 0, lineLen);
			value.setId(0);
			value.setName("");
			value.setAge(0);
			value.setSex("");
			value.setAddress("");
		} else {
			key.set(line, 0, pos); //設(shè)置鍵  從 第 0位置 到 截取標(biāo)識符的位置
			Text text = new Text();
			text.set(line, pos + 1, lineLen - pos - 1);
			System.out.println("text的值: "+text);
			String[] str = text.toString().split(",");
			for (int i=0;i"+value);
				if("ID".equals(strKeyValue[0])){
					value.setId(Integer.parseInt( strKeyValue[1]) );
				}else if("Name".equals(strKeyValue[0])){
					value.setName( strKeyValue[1]);
				}else if("Age".equals(strKeyValue[0])){
					value.setAge(Integer.parseInt( strKeyValue[1]) );
				}else if("Sex".equals(strKeyValue[0])){
					value.setSex(strKeyValue[1] );
				}else if("Address".equals(strKeyValue[0])){
					value.setAddress(strKeyValue[1] );
				}
			}
//			System.out.println( "key--> " + key);
//			System.out.println( "value--> "+value +"\n\n");
		}
	}
	
	public synchronized boolean nextKeyValue()throws IOException {
		byte[] line = null;
		int lineLen = -1;
		if (key == null) {
			key = new Text();
		}
		if (value == null) {
			value = new UserInfo(); 
		}
		if (lineRecordReader.nextKeyValue()) {
			innerValue = lineRecordReader.getCurrentValue();
			line = innerValue.getBytes();
			lineLen = innerValue.getLength();
		} else {
			return false;
		}
		if (line == null){
			return false;
		}

		int pos = findSeparator(line, 0, lineLen, this.separator);
		setKeyValue(key, value, line, lineLen, pos);
		return true;
	}

	public Text getCurrentKey() {
		return key;
	}
	public UserInfo getCurrentValue() {
		return value;
	}

	public float getProgress() throws IOException {
		return lineRecordReader.getProgress();
	}

	public synchronized void close() throws IOException { 
		lineRecordReader.close();
	}

}

3.定制自己的輸出格式:UserInfoTextOutputFormat。

package com.hadoop.mapreduce.test4.outputformat;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class UserInfoTextOutputFormat extends FileOutputFormat {
	public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
	protected static class LineRecordWriter extends RecordWriter {
		private static final String utf8 = "UTF-8";
		private static final byte[] newline;
		static {
			try {
				newline = "\n".getBytes(utf8);
				//System.out.println(  "newline --> " + newline);
			} catch (UnsupportedEncodingException uee) {
				throw new IllegalArgumentException("can't find " + utf8 + " encoding");
			}
		}

		protected DataOutputStream out;
		private final byte[] keyValueSeparator;

		public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
			this.out = out;
			try {
				this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
			} catch (UnsupportedEncodingException uee) {
				throw new IllegalArgumentException("can't find " + utf8 + " encoding");
			}
		}

		public LineRecordWriter(DataOutputStream out) {
			this(out, "\t");
		}

		private void writeObject(Object o) throws IOException {
			if (o instanceof Text) {
				Text to = (Text) o;
				System.out.println(  "o instanceof Text  --> True : "+ to.toString()  );
				out.write(to.getBytes(), 0, to.getLength());
			} else {
				out.write(o.toString().getBytes(utf8));
				System.out.println( "o instanceof Text  --> false : "+ o.toString()  );
			}
		}

		public synchronized void write(K key, V value) throws IOException {
			boolean nullKey = key == null || key instanceof NullWritable;
			boolean nullValue = value == null || value instanceof NullWritable;
			System.out.println(  "nullKey--> "+nullKey +" ,  nullValue--> "+nullValue);
			if (nullKey && nullValue) {
				return;
			}
			System.out.println( " nullkey --> "+ nullKey + ", !nullkey -->"+nullKey);
			if (!nullKey) {
				writeObject(key);
			}
			System.out.println( "(nullKey || nullValue) --> " + (nullKey || nullValue) );
			if (!(nullKey || nullValue)) {
				out.write(keyValueSeparator);
			}
			if (!nullValue) {
				writeObject(value);
			}
			out.write(newline);
		}

		public synchronized void close(TaskAttemptContext context) throws IOException {
			out.close();
		}
	}

	public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
		Configuration conf = job.getConfiguration();
		boolean isCompressed = getCompressOutput(job);
		String keyValueSeparator= conf.get(SEPERATOR, "---->");
		System.out.println(  "keyValueSeparator---> "+keyValueSeparator);
		CompressionCodec codec = null;
		String extension = "";
		if (isCompressed) {
			Class codecClass = 
					getOutputCompressorClass(job, GzipCodec.class);
			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
			extension = codec.getDefaultExtension();
		}
		Path file = getDefaultWorkFile(job, extension);
		System.out.println(  "file --> Path : "+ file  );
		FileSystem fs = file.getFileSystem(conf);
		
		if (!isCompressed) {
			FSDataOutputStream fileOut = fs.create(file, false);
			System.out.println( "if---isCompressed-->: "+fileOut);
			return new LineRecordWriter(fileOut, keyValueSeparator);
		} else {
			FSDataOutputStream fileOut = fs.create(file, false);
			System.out.println( "else---isCompressed-->: "+fileOut);
			return new LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
		}
	}
}

5.測試類:PrintUserInfo

package com.hadoop.mapreduce.test4.outputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PrintUserInfo {
	public static final IntWritable ONE = new IntWritable(1);
	public static class UserInfoMapper extends Mapper{
		@Override
		protected void map(Text key, UserInfo value, Mapper.Context context)
				throws IOException, InterruptedException {
			super.map(key, value, context);
		}
	}
	
	public static void main(String[] args) {
		try {
			Configuration conf = new Configuration();
			Job job = Job.getInstance(conf, "UserInfo");
			
			job.setJarByClass(PrintUserInfo.class);
			job.setMapperClass(UserInfoMapper.class);
			
			//定制輸入格式:
			job.setInputFormatClass(UserInfoTextInputFormat.class);
			//定制輸出格式:
			job.setOutputFormatClass(UserInfoTextOutputFormat.class);
			
			job.setMapOutputKeyClass(Text.class);
			//用的自己定義的數(shù)據(jù)類型
			job.setMapOutputValueClass(UserInfo.class);
			
			FileInputFormat.addInputPath(job, new Path("hdfs://192.168.226.129:9000/rootdir/mapuserinfo.txt"));
			FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.226.129:9000/rootdir/data/output7/"+System.currentTimeMillis()+"/"));
			System.exit(job.waitForCompletion(true) ? 0 : 1);//執(zhí)行job
			
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

6.輸出結(jié)果:

1.數(shù)據(jù)文件:

1	ID:221,Name:xj,Age:22,Sex:man,Address:hunan,
2	ID:222,Name:cc,Age:21,Sex:Woman,Address:miluo,

2.結(jié)果文件:

1---->Id:221, Name:xj, Age:22, Sex:man, Address:hunan
2---->Id:222, Name:cc, Age:21, Sex:Woman, Address:miluo

感謝各位的閱讀,以上就是“Hadoop用戶怎么自定義”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Hadoop用戶怎么自定義這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!


標(biāo)題名稱:Hadoop用戶怎么自定義
網(wǎng)址分享:http://weahome.cn/article/jopcid.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部