This article explains how to define your own data types, input formats, and output formats in Hadoop. The explanations are kept simple and clear so they are easy to learn from; work through them step by step and study the topic along the way.
I: Hadoop's built-in data types.
Hadoop provides the following built-in data types. They all implement the WritableComparable interface, so data declared with these types can be serialized for network transfer and file storage, and compared for ordering.
BooleanWritable | Standard Boolean value |
ByteWritable | Single-byte numeric value |
DoubleWritable | Double-precision floating-point value |
FloatWritable | Single-precision floating-point value |
IntWritable | Integer value |
LongWritable | Long integer value |
Text | Text stored in UTF-8 format |
NullWritable | Placeholder used when the key or value in a <key, value> pair is not needed |
// A quick look at these types
IntWritable iw = new IntWritable(1);
System.out.println(iw.get());    // 1
BooleanWritable bw = new BooleanWritable(true);
System.out.println(bw.get());    // true
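Text and NullWritable from the table above can be exercised the same way. This is a small hedged addition, not part of the original snippet (it assumes org.apache.hadoop.io.Text and org.apache.hadoop.io.NullWritable are imported):

// Hedged addition: Text and NullWritable in the same style as the snippet above
Text t = new Text("hadoop");
System.out.println(t.toString());       // hadoop
NullWritable n = NullWritable.get();    // singleton placeholder that carries no data
System.out.println(n);                  // NullWritable prints as "(null)"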
II: User-defined data types in Hadoop.
When defining your own data type, two basic requirements must be met:
1. Implement the Writable interface, so the data can be serialized for network transfer and file input/output.
2. If the data will be used as a key, or needs to be compared by value, it must also implement the WritableComparable interface.
// Writable source from Hadoop 2.6.4:
public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
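As a bridge to the full UserInfo example in section V, here is a minimal sketch of what a custom type implementing both interfaces typically looks like. LongPairWritable is a hypothetical name chosen for illustration, not something from this article or from Hadoop itself:

// Minimal sketch of a custom WritableComparable (hypothetical LongPairWritable)
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class LongPairWritable implements WritableComparable<LongPairWritable> {

    private long first;
    private long second;

    public LongPairWritable() {                     // no-arg constructor required for deserialization
    }

    public LongPairWritable(long first, long second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void write(DataOutput out) throws IOException {   // serialize fields in a fixed order
        out.writeLong(first);
        out.writeLong(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // deserialize in the same order
        first = in.readLong();
        second = in.readLong();
    }

    @Override
    public int compareTo(LongPairWritable o) {                 // ordering used when this type is a key
        int cmp = Long.compare(first, o.first);
        return cmp != 0 ? cmp : Long.compare(second, o.second);
    }
}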
III: Hadoop's built-in input formats and RecordReader.
An input format (InputFormat) describes the input specification of a MapReduce job. The MapReduce framework relies on the input format to validate the input specification, to split the input files into InputSplits, and to provide the machinery that reads records from each split one by one and turns them into the key/value pairs fed to the Map phase.
Hadoop ships with a rich set of built-in input formats; the most commonly used are TextInputFormat and KeyValueTextInputFormat.
TextInputFormat is the system default input format. It splits text files into blocks and reads them line by line for the Map tasks to process. For each line read, the key is the byte offset of that line within the file, and the value is the content of the line.
// Part of the TextInputFormat source:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }
  // ....
}
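The "textinputformat.record.delimiter" key read in the snippet above can be set on the job configuration to split records on something other than a newline. A hedged sketch (assuming the usual imports for Configuration, Job, and TextInputFormat; the blank-line delimiter is just an illustrative assumption):

// Hedged sketch: a custom record delimiter with the stock TextInputFormat
Configuration conf = new Configuration();
conf.set("textinputformat.record.delimiter", "\n\n");   // assumption: records are blank-line-separated blocks
Job job = Job.getInstance(conf, "custom-delimiter");    // the Job copies this configuration
job.setInputFormatClass(TextInputFormat.class);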
KeyValueTextInputFormat is another commonly used input format. It reads a text file whose lines each store a key/value pair separated by a delimiter (a tab character by default): the part of the line before the separator becomes the key, and the rest of the line becomes the value.
// Part of the KeyValueTextInputFormat source:
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
  // ...
  public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
      TaskAttemptContext context) throws IOException {
    context.setStatus(genericSplit.toString());
    return new KeyValueLineRecordReader(context.getConfiguration());
  }
}
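The KeyValueLineRecordReader returned above splits each line at the first separator character, a tab by default, and the separator can be changed through the configuration. A hedged sketch (assuming the usual imports for Configuration, Job, and KeyValueTextInputFormat):

// Hedged sketch: read "key:value" lines with the stock KeyValueTextInputFormat
Configuration conf = new Configuration();
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ":");
Job job = Job.getInstance(conf, "kv-input");
job.setInputFormatClass(KeyValueTextInputFormat.class);   // both key and value are Text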
RecordReader: every input format has a corresponding RecordReader, whose job is to turn the records of a file into concrete key/value pairs. The default RecordReader of TextInputFormat is LineRecordReader, and the default RecordReader of KeyValueTextInputFormat is KeyValueLineRecordReader.
IV: Hadoop's built-in output formats and RecordWriter.
An output format (OutputFormat) describes the output specification of a MapReduce job. The MapReduce framework relies on the output format to validate the output specification and to write out the job's result data.
Likewise, the most commonly used output format is TextOutputFormat, which is also the system default. It writes the computed results line by line to a text file in the form "key + \t + value".
As with input formats, each output format comes with a corresponding RecordWriter that defines exactly how results are written to the output file. The default RecordWriter of TextOutputFormat is LineRecordWriter, which writes each result record to the text file as "key + \t + value".
// Part of the TextOutputFormat source:
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {

  protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    // ...
    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      // ...
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    private void writeObject(Object o) throws IOException {
      // ...
    }

    public synchronized void write(K key, V value) throws IOException {
      // ...
      out.write(newline);
    }
  }

  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    // ...
  }
}
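The separator LineRecordWriter writes between key and value defaults to a tab but is read from the configuration, so it can be changed without writing a custom output format. A hedged sketch (assuming the usual imports for Configuration, Job, and TextOutputFormat):

// Hedged sketch: comma-separated output with the stock TextOutputFormat
Configuration conf = new Configuration();
conf.set("mapreduce.output.textoutputformat.separator", ",");
Job job = Job.getInstance(conf, "csv-style-output");
job.setOutputFormatClass(TextOutputFormat.class);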
V: A small example that prints UserInfo, implementing a simple user-defined data type, input format, and output format. (Put simply, it imitates the Hadoop source with very few changes.)
The example source code follows:
1. Define your own UserInfo as the data type.
package com.hadoop.mapreduce.test4.outputformat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class UserInfo implements WritableComparable<UserInfo> {

    private int id;
    private String name;
    private int age;
    private String sex;
    private String address;

    public UserInfo() {
    }

    public UserInfo(int id, String name, int age, String sex, String address) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.address = address;
    }

    // ordinary JavaBean getters and setters ...

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = in.readUTF();
        this.age = in.readInt();
        this.sex = in.readUTF();
        this.address = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
        out.writeInt(age);
        out.writeUTF(sex);
        out.writeUTF(address);
    }

    @Override
    public String toString() {
        return "Id:" + id + ", Name:" + name + ", Age:" + age + ", Sex:" + sex + ", Address:" + address;
    }

    @Override
    public int compareTo(UserInfo userInfo) {
        return 0;
    }
}
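Note that compareTo above always returns 0, so every pair of UserInfo objects compares as equal. That is harmless in this example because UserInfo is only used as a value, but if UserInfo were ever used as a key, a real ordering would be needed. A hedged sketch of such an ordering (an assumption, comparing by id; not part of the original example):

// Hedged alternative to the compareTo above, ordering UserInfo objects by id
@Override
public int compareTo(UserInfo userInfo) {
    return Integer.compare(this.id, userInfo.id);
}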
2. Define your own input format: UserInfoTextInputFormat.
package com.hadoop.mapreduce.test4.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class UserInfoTextInputFormat extends FileInputFormat<Text, UserInfo> {

    @Override
    public RecordReader<Text, UserInfo> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        context.setStatus(split.toString());
        UserInfoRecordReader userInforRecordReader = new UserInfoRecordReader(context.getConfiguration());
        return userInforRecordReader;
    }
}
3. Define your own RecordReader: UserInfoRecordReader.
package com.hadoop.mapreduce.test4.outputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class UserInfoRecordReader extends RecordReader<Text, UserInfo> {

    public static final String KEY_VALUE_SEPERATOR =
            "mapreduce.input.keyvaluelinerecordreader.key.value.separator";

    private final LineRecordReader lineRecordReader;
    private byte separator = (byte) '\t';
    private Text innerValue;
    private Text key;
    private UserInfo value;

    public Class getKeyClass() {
        return Text.class;
    }

    public UserInfoRecordReader(Configuration conf) throws IOException {
        lineRecordReader = new LineRecordReader();
        String sepStr = conf.get(KEY_VALUE_SEPERATOR, "\t");
        this.separator = (byte) sepStr.charAt(0);
    }

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        lineRecordReader.initialize(genericSplit, context);
    }

    public static int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < (start + length); i++) {
            if (utf[i] == sep) {
                return i; // return the position of the separator
            }
        }
        return -1;
    }

    public static void setKeyValue(Text key, UserInfo value, byte[] line, int lineLen, int pos) {
        if (pos == -1) {
            key.set(line, 0, lineLen);
            value.setId(0);
            value.setName("");
            value.setAge(0);
            value.setSex("");
            value.setAddress("");
        } else {
            key.set(line, 0, pos); // the key runs from position 0 up to the separator
            Text text = new Text();
            text.set(line, pos + 1, lineLen - pos - 1);
            System.out.println("text value: " + text);
            String[] str = text.toString().split(",");
            for (int i = 0; i < str.length; i++) {
                String[] strKeyValue = str[i].split(":");
                System.out.println("strKeyValue --> " + value);
                if ("ID".equals(strKeyValue[0])) {
                    value.setId(Integer.parseInt(strKeyValue[1]));
                } else if ("Name".equals(strKeyValue[0])) {
                    value.setName(strKeyValue[1]);
                } else if ("Age".equals(strKeyValue[0])) {
                    value.setAge(Integer.parseInt(strKeyValue[1]));
                } else if ("Sex".equals(strKeyValue[0])) {
                    value.setSex(strKeyValue[1]);
                } else if ("Address".equals(strKeyValue[0])) {
                    value.setAddress(strKeyValue[1]);
                }
            }
            // System.out.println("key--> " + key);
            // System.out.println("value--> " + value + "\n\n");
        }
    }

    public synchronized boolean nextKeyValue() throws IOException {
        byte[] line = null;
        int lineLen = -1;
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new UserInfo();
        }
        if (lineRecordReader.nextKeyValue()) {
            innerValue = lineRecordReader.getCurrentValue();
            line = innerValue.getBytes();
            lineLen = innerValue.getLength();
        } else {
            return false;
        }
        if (line == null) {
            return false;
        }
        int pos = findSeparator(line, 0, lineLen, this.separator);
        setKeyValue(key, value, line, lineLen, pos);
        return true;
    }

    public Text getCurrentKey() {
        return key;
    }

    public UserInfo getCurrentValue() {
        return value;
    }

    public float getProgress() throws IOException {
        return lineRecordReader.getProgress();
    }

    public synchronized void close() throws IOException {
        lineRecordReader.close();
    }
}
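To make the parsing concrete, here is a hedged walk-through, as comments, of how the reader above handles one line of the sample input shown in section 6:

// One input line: the key and the record are separated by a tab, the record's
// fields are separated by ',' and each field is a "Name:value" pair:
//   "1\tID:221,Name:xj,Age:22,Sex:man,Address:hunan,"
// findSeparator(...) -> index of the '\t'
// key                -> "1"  (the bytes before the separator)
// value              -> UserInfo with Id:221, Name:xj, Age:22, Sex:man, Address:hunan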
4. Define your own output format: UserInfoTextOutputFormat.
package com.hadoop.mapreduce.test4.outputformat;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class UserInfoTextOutputFormat<K, V> extends FileOutputFormat<K, V> {

    public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";

    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {

        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
                // System.out.println("newline --> " + newline);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                System.out.println("o instanceof Text --> True : " + to.toString());
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
                System.out.println("o instanceof Text --> false : " + o.toString());
            }
        }

        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            System.out.println("nullKey--> " + nullKey + " , nullValue--> " + nullValue);
            if (nullKey && nullValue) {
                return;
            }
            System.out.println(" nullkey --> " + nullKey + ", !nullkey -->" + nullKey);
            if (!nullKey) {
                writeObject(key);
            }
            System.out.println("(nullKey || nullValue) --> " + (nullKey || nullValue));
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get(SEPERATOR, "---->");
        System.out.println("keyValueSeparator---> " + keyValueSeparator);
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        System.out.println("file --> Path : " + file);
        FileSystem fs = file.getFileSystem(conf);
        if (!isCompressed) {
            FSDataOutputStream fileOut = fs.create(file, false);
            System.out.println("if---isCompressed-->: " + fileOut);
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
            FSDataOutputStream fileOut = fs.create(file, false);
            System.out.println("else---isCompressed-->: " + fileOut);
            return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
                    keyValueSeparator);
        }
    }
}
5. The test class: PrintUserInfo.
package com.hadoop.mapreduce.test4.outputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PrintUserInfo {

    public static final IntWritable ONE = new IntWritable(1);

    public static class UserInfoMapper extends Mapper<Text, UserInfo, Text, UserInfo> {
        @Override
        protected void map(Text key, UserInfo value,
                Mapper<Text, UserInfo, Text, UserInfo>.Context context)
                throws IOException, InterruptedException {
            super.map(key, value, context); // identity map: forward each key/value pair unchanged
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "UserInfo");
            job.setJarByClass(PrintUserInfo.class);
            job.setMapperClass(UserInfoMapper.class);
            // custom input format:
            job.setInputFormatClass(UserInfoTextInputFormat.class);
            // custom output format:
            job.setOutputFormatClass(UserInfoTextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            // the user-defined data type:
            job.setMapOutputValueClass(UserInfo.class);
            FileInputFormat.addInputPath(job, new Path("hdfs://192.168.226.129:9000/rootdir/mapuserinfo.txt"));
            FileOutputFormat.setOutputPath(job, new Path(
                    "hdfs://192.168.226.129:9000/rootdir/data/output7/" + System.currentTimeMillis() + "/"));
            System.exit(job.waitForCompletion(true) ? 0 : 1); // run the job
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
6. Output:
1. The data file (the leading number and the rest of each line are separated by a tab, which is the key/value separator UserInfoRecordReader expects):
1	ID:221,Name:xj,Age:22,Sex:man,Address:hunan,
2	ID:222,Name:cc,Age:21,Sex:Woman,Address:miluo,
2. The result file (key and value joined by the "---->" separator that UserInfoTextOutputFormat falls back to in getRecordWriter):
1---->Id:221, Name:xj, Age:22, Sex:man, Address:hunan
2---->Id:222, Name:cc, Age:21, Sex:Woman, Address:miluo
Thank you for reading. That covers how to define your own data types, input formats, and output formats in Hadoop. Having worked through this article you should have a much better feel for the topic; the details are best confirmed by trying it out in practice yourself.