In this article I'd like to share how to write a custom Hadoop MapReduce InputFormat to control how input files are split. I hope you get something useful out of it; let's dig in!
We have already implemented a secondary sort on cookieId and time. Now a new question comes up: what if we need to analyze the data both per cookieId and per cookieId&time combination? The cleanest approach is a custom InputFormat that lets MapReduce read all records of one cookieId at a time and then cut them into sessions by time. The pseudocode looks like this:
for OneSplit in MyInputFormat.getSplit() // OneSplit holds all records of one cookieId
    for session in OneSplit              // session is OneSplit cut a second time, by time
        for line in session              // line is one record of the session, i.e. one line of the raw log
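To make the pseudocode a little more concrete, here is a rough map-side sketch of the session split. It assumes the custom InputFormat hands the mapper one value containing all lines of a single cookieId, that fields are whitespace-separated with the time in the second column, and that "a new session starts whenever the time value changes" is an acceptable session rule; the SessionMapper name and these choices are illustrative assumptions, not part of the original code.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: each value holds every line of one cookieId; records
// are grouped into sessions by the time field (second column).
public class SessionMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String prevTime = null;
        int sessionId = 0;
        for (String line : value.toString().split("\n")) {
            if (line.isEmpty()) {
                continue;
            }
            String[] fields = line.split("\\s+"); // cookieId, time, url, ...
            if (prevTime != null && !prevTime.equals(fields[1])) {
                sessionId++;                      // time changed: new session
            }
            prevTime = fields[1];
            // Emit <cookieId_sessionId, original line> for downstream analysis.
            context.write(new Text(fields[0] + "_" + sessionId), new Text(line));
        }
    }
}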
1. How it works:
InputFormat is one of the most common concepts in MapReduce. What does it actually do while a job runs?
InputFormat is, in fact, an interface with two methods:
public interface InputFormat<K, V> {
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

    RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
                                       Reporter reporter) throws IOException;
}
These two methods do the following work:
The getSplits method cuts the input data into splits; the number of splits is the number of map tasks, and a split's size defaults to the HDFS block size, i.e. 64 MB.
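For reference, a split's size is not hard-wired to the block size: FileInputFormat clamps it between a configured minimum and maximum, with the block size as the default. In the new (org.apache.hadoop.mapreduce) API the computation boils down to roughly the following; the SplitSizeDemo wrapper is only there so the snippet runs on its own.

public class SplitSizeDemo {
    // Mirrors FileInputFormat's logic: block size by default, clamped by the
    // configured minimum and maximum split sizes.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        // With the defaults (minSize = 1, maxSize = Long.MAX_VALUE) a 64 MB
        // block yields 64 MB splits.
        System.out.println(computeSplitSize(64L << 20, 1L, Long.MAX_VALUE));
    }
}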
The getRecordReader method parses each split into records, and then parses each record into a <key, value> pair.
In other words, InputFormat carries out the chain: InputFile --> splits --> <key, value> pairs.
So which InputFormats does the framework provide out of the box? TextInputFormat is the most commonly used one: its <key, value> pair is <byte offset of the line in the file, content of the line>.
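For example, for a small input file containing the two lines

hello world
foo bar

TextInputFormat hands the mapper the pairs <0, "hello world"> and <12, "foo bar">, where 12 is the byte offset of the second line (11 bytes of text plus the newline).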
However, the handful of fixed ways the framework offers for turning an InputFile into <key, value> pairs sometimes cannot satisfy our needs. That is when we define our own InputFormat, so that the Hadoop framework parses the InputFile into <key, value> pairs exactly the way we want.
Before tackling a custom InputFormat, you need to understand the following abstract classes, interfaces, and how they relate to each other:
InputFormat (interface), FileInputFormat (abstract class), TextInputFormat (class), RecordReader (interface) and LineRecordReader (class) are related as follows:
FileInputFormat implements InputFormat
TextInputFormat extends FileInputFormat
TextInputFormat.getRecordReader calls LineRecordReader
LineRecordReader implements RecordReader
The InputFormat interface has already been described in detail above.
Now look at FileInputFormat: it implements the getSplits method of the InputFormat interface and leaves getRecordReader and isSplitable to concrete subclasses (such as TextInputFormat). isSplitable usually does not need to change, so a custom InputFormat only has to implement getRecordReader. The core of that method is to call LineRecordReader; that is, LineRecordReader is the class that actually "parses each split into records, and then parses each record into a <key, value> pair".
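For orientation, in the new (org.apache.hadoop.mapreduce) API the whole of TextInputFormat amounts to little more than the sketch below. This is a simplified paraphrase rather than the exact Hadoop source, and the class name is made up, but it is exactly the shape the custom TrackInputFormat in the code section mirrors, with LineRecordReader swapped for its own reader.

package MyInputFormat; // package chosen only so the sketch compiles standalone

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Simplified paraphrase of TextInputFormat: the only method it really has to
// supply is createRecordReader, and all parsing work is delegated to
// LineRecordReader, which turns a split into <byte offset, line> records.
public class SimplifiedTextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        return new LineRecordReader();
    }
}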
The (old-API) RecordReader interface itself looks like this:
public interface RecordReader<K, V> {
    boolean next(K key, V value) throws IOException;

    K createKey();

    V createValue();

    long getPos() throws IOException;

    void close() throws IOException;

    float getProgress() throws IOException;
}
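One caveat: the interface above belongs to the old org.apache.hadoop.mapred API. The code in the next section is written against the new org.apache.hadoop.mapreduce API, where RecordReader is an abstract class with a slightly different set of methods; in outline it looks like this:

import java.io.Closeable;
import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Outline of org.apache.hadoop.mapreduce.RecordReader (new API): these are
// the methods a custom reader such as TrackRecordReader has to provide.
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {

    public abstract void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException;

    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

    public abstract float getProgress() throws IOException, InterruptedException;

    public abstract void close() throws IOException;
}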
So the heart of a custom InputFormat is a class of your own, similar to LineRecordReader, that implements (or, in the new API, extends) RecordReader; and the heart of that class, in turn, is overriding the handful of methods RecordReader declares. In short, defining an InputFormat mostly means defining your own LineRecordReader-like RecordReader.
2. Code:
package MyInputFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {

    @SuppressWarnings("deprecation")
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        return new TrackRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Only allow splitting when the file is not compressed.
        CompressionCodec codec = new CompressionCodecFactory(
                context.getConfiguration()).getCodec(file);
        return codec == null;
    }
}
package MyInputFormat;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Treats keys as byte offsets in the file and values as one record, where a
 * record is everything up to the custom separator.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {
    private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private NewLineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;
    // Record separator: one logical record ends with "END" plus a newline.
    private byte[] separator = "END\n".getBytes();

    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        boolean skipFirstLine = false;
        if (codec != null) {
            in = new NewLineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                this.start -= separator.length;
                fileIn.seek(start);
            }
            in = new NewLineReader(fileIn, job);
        }
        if (skipFirstLine) {
            // Skip the first (partial) record and re-establish "start".
            start += in.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (pos < end) {
            newSize = in.readLine(value, maxLineLength, Math.max(
                    (int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }
            LOG.info("Skipped line of size " + newSize + " at pos "
                    + (pos - newSize));
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        }
        return true;
    }

    public LongWritable getCurrentKey() {
        return key;
    }

    public Text getCurrentValue() {
        return value;
    }

    public float getProgress() {
        if (start == end) {
            return 0.0f;
        }
        return Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    // NewLineReader is the helper that reads records delimited by `separator`
    // instead of '\n'; a sketch of it follows this listing.
}
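TrackRecordReader relies on a NewLineReader helper that is not shown in the listing above. Below is a minimal, unoptimized sketch of what such a reader could look like, assuming the contract used above: readLine fills the value with one record (separator stripped), returns the number of bytes consumed including the separator, and returns 0 at end of stream. The byte-by-byte scan, the buffer size, and the duplicated separator constant are illustrative choices, not the author's original helper.

package MyInputFormat;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

// Minimal sketch of a reader that treats a multi-byte separator ("END\n")
// rather than '\n' as the record delimiter; it must match the separator
// declared in TrackRecordReader.
public class NewLineReader {

    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;

    private final byte[] separator = "END\n".getBytes();
    private final InputStream in;
    private final byte[] buffer;
    private int bufferLength = 0; // bytes currently held in the buffer
    private int bufferPosn = 0;   // next byte to consume

    public NewLineReader(InputStream in, Configuration conf) {
        this.in = in;
        this.buffer = new byte[conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)];
    }

    public void close() throws IOException {
        in.close();
    }

    // Fills str with the next record (separator stripped) and returns the
    // number of bytes consumed including the separator; returns 0 at EOF.
    public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
            throws IOException {
        str.clear();
        Text record = new Text();
        long bytesConsumed = 0;
        int matched = 0; // how many separator bytes have been matched so far
        while (bytesConsumed < maxBytesToConsume) {
            if (bufferPosn >= bufferLength) { // refill the buffer
                bufferPosn = 0;
                bufferLength = in.read(buffer);
                if (bufferLength <= 0) {
                    break;                    // end of stream
                }
            }
            byte b = buffer[bufferPosn++];
            bytesConsumed++;
            record.append(new byte[] { b }, 0, 1);
            if (b == separator[matched]) {
                matched++;
                if (matched == separator.length) { // full separator seen
                    str.set(record.getBytes(), 0, Math.min(maxLineLength,
                            record.getLength() - separator.length));
                    return (int) bytesConsumed;
                }
            } else {
                // Naive restart; good enough for a separator like "END\n".
                matched = (b == separator[0]) ? 1 : 0;
            }
        }
        // End of stream (or byte limit hit): return whatever was accumulated.
        str.set(record.getBytes(), 0, Math.min(maxLineLength, record.getLength()));
        return (int) bytesConsumed;
    }
}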
package MyInputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TestMyInputFormat {

    public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("key:\t " + key);
            System.out.println("value:\t " + value);
            System.out.println("-------------------------");
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Path outPath = new Path("/hive/11");
        FileSystem.get(conf).delete(outPath, true);
        Job job = new Job(conf, "TestMyInputFormat");
        job.setInputFormatClass(TrackInputFormat.class);
        job.setJarByClass(TestMyInputFormat.class);
        job.setMapperClass(TestMyInputFormat.MapperClass.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
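Assuming the three classes are packaged into a jar (the jar name and the input path below are placeholders), the test job can be launched with something like:

hadoop jar myinputformat.jar MyInputFormat.TestMyInputFormat /path/to/test_input

Since the job runs with setNumReduceTasks(0) and the mapper only prints, the key/value pairs show up in the map task's stdout logs, while the (empty) map output goes to the hard-coded /hive/11 directory.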
3. Test data:
cookieId time url cookieOverFlag
1    a    1_hao123
1    a    1_baidu
1    b    1_google    2END
2    c    2_google
2    c    2_hao123
2    c    2_google    1END
3    a    3_baidu
3    a    3_sougou
3    b    3_soso      2END
4. Result:
key:     0
value:   1    a    1_hao123
         1    a    1_baidu
         1    b    1_google    2
-------------------------
key:     47
value:   2    c    2_google
         2    c    2_hao123
         2    c    2_google    1
-------------------------
key:     96
value:   3    a    3_baidu
         3    a    3_sougou
         3    b    3_soso      2
-------------------------
Having read this article, you should now have a reasonable grasp of how to customize a Hadoop MapReduce InputFormat to split input files. If you would like to learn more, follow the 创新互联 industry news channel. Thanks for reading!