真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何自定義hadoopMapReduceInputFormat切分輸入文件

小編給大家分享一下如何自定義hadoop MapReduce InputFormat切分輸入文件,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),貴州企業(yè)網(wǎng)站建設(shè),貴州品牌網(wǎng)站建設(shè),網(wǎng)站定制,貴州網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷,網(wǎng)絡(luò)優(yōu)化,貴州網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。

我們實(shí)現(xiàn)了按 cookieId 和 time 進(jìn)行二次排序,現(xiàn)在又有新問(wèn)題:假如我需要按 cookieId 和 cookieId&time 的組合進(jìn)行分析呢?此時(shí)最好的辦法是自定義 InputFormat,讓 mapreduce 一次讀取一個(gè) cookieId 下的所有記錄,然后再按 time 進(jìn)行切分 session,邏輯偽碼如下:

for OneSplit in MyInputFormat.getSplit() // OneSplit 是某個(gè) cookieId 下的所有記錄

    for session in OneSplit // session 是按 time 把 OneSplit 進(jìn)行了二次分割

        for line in session // line 是 session 中的每條記錄,對(duì)應(yīng)原始日志的某條記錄

1、原理:

InputFormat是MapReduce中一個(gè)很常用的概念,它在程序的運(yùn)行中到底起到了什么作用呢?

InputFormat其實(shí)是一個(gè)接口,包含了兩個(gè)方法:

public interface InputFormat {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
 

  RecordReader createRecordReader(InputSplit split, 

                                  TaskAttemptContext context)  throws IOException;

}

這兩個(gè)方法有分別完成著以下工作:

      方法 getSplits 將輸入數(shù)據(jù)切分成splits,splits的個(gè)數(shù)即為map tasks的個(gè)數(shù),splits的大小默認(rèn)為塊大小,即64M

     方法  getRecordReader 將每個(gè) split   解析成records, 再依次將record解析成對(duì)

也就是說(shuō) InputFormat完成以下工作:

 InputFile -->  splits  --> 


 

系統(tǒng)常用的  InputFormat 又有哪些呢?

                      如何自定義hadoop MapReduce InputFormat切分輸入文件

其中Text InputFormat便是最常用的,它的 就代表 <行偏移,該行內(nèi)容>


 

然而系統(tǒng)所提供的這幾種固定的將  InputFile轉(zhuǎn)換為 的方式有時(shí)候并不能滿足我們的需求:

此時(shí)需要我們自定義   InputFormat ,從而使Hadoop框架按照我們預(yù)設(shè)的方式來(lái)將

InputFile解析為

在領(lǐng)會(huì)自定義   InputFormat 之前,需要弄懂一下幾個(gè)抽象類、接口及其之間的關(guān)系:


 

InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),

RecordReader  (interface), Line  RecordReader(class)的關(guān)系

       FileInputFormat implements   InputFormat

       TextInputFormat extends   FileInputFormat

       TextInputFormat.get  RecordReader calls   Line  RecordReader

       Line  RecordReader   implements   RecordReader


 

對(duì)于InputFormat接口,上面已經(jīng)有詳細(xì)的描述

再看看 FileInputFormat,它實(shí)現(xiàn)了 InputFormat接口中的 getSplits方法,而將 getRecordReader與isSplitable留給具體類(如 TextInputFormat )實(shí)現(xiàn), isSplitable方法通常不用修改,所以只需要在自定義的 InputFormat中實(shí)現(xiàn)

getRecordReader方法即可,而該方法的核心是調(diào)用  Line  RecordReader(即由LineRecorderReader類來(lái)實(shí)現(xiàn) "  將每個(gè)s  plit解析成records, 再依次將record解析成對(duì)"  ),該方法實(shí)現(xiàn)了接口RecordReader


  public interface RecordReader {

  boolean   next(K key, V value) throws IOException;
  K   createKey();
  V   createValue();
  long   getPos() throws IOException;
  public void   close() throws IOException;
  float   getProgress() throws IOException;
}


 

     因此自定義InputFormat的核心是自定義一個(gè)實(shí)現(xiàn)接口RecordReader類似于LineRecordReader的類,該類的核心也正是重寫(xiě)接口RecordReader中的幾大方法,

     定義一個(gè)InputFormat的核心是定義一個(gè)類似于LineRecordReader的,自己的RecordReader


 

2、代碼:

package MyInputFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TrackInputFormat extends FileInputFormat {

	@SuppressWarnings("deprecation")
	@Override
	public RecordReader createRecordReader(
			InputSplit split, TaskAttemptContext context) {
		return new TrackRecordReader();
	}

	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		CompressionCodec codec = new CompressionCodecFactory(
				context.getConfiguration()).getCodec(file);
		return codec == null;
	}

}

package MyInputFormat;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Treats keys as offset in file and value as line.
 * 
 * @deprecated Use
 *             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}
 *             instead.
 */
public class TrackRecordReader extends RecordReader {
	private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);

	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private NewLineReader in;
	private int maxLineLength;
	private LongWritable key = null;
	private Text value = null;
	// ----------------------
	// 行分隔符,即一條記錄的分隔符
	private byte[] separator = "END\n".getBytes();

	// --------------------

	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException {
		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
				Integer.MAX_VALUE);
		start = split.getStart();
		end = start + split.getLength();
		final Path file = split.getPath();
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);

		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		boolean skipFirstLine = false;
		if (codec != null) {
			in = new NewLineReader(codec.createInputStream(fileIn), job);
			end = Long.MAX_VALUE;
		} else {
			if (start != 0) {
				skipFirstLine = true;
				this.start -= separator.length;//
				// --start;
				fileIn.seek(start);
			}
			in = new NewLineReader(fileIn, job);
		}
		if (skipFirstLine) { // skip first line and re-establish "start".
			start += in.readLine(new Text(), 0,
					(int) Math.min((long) Integer.MAX_VALUE, end - start));
		}
		this.pos = start;
	}

	public boolean nextKeyValue() throws IOException {
		if (key == null) {
			key = new LongWritable();
		}
		key.set(pos);
		if (value == null) {
			value = new Text();
		}
		int newSize = 0;
		while (pos < end) {
			newSize = in.readLine(value, maxLineLength,
					Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
							maxLineLength));
			if (newSize == 0) {
				break;
			}
			pos += newSize;
			if (newSize < maxLineLength) {
				break;
			}

			LOG.info("Skipped line of size ">

package MyInputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TestMyInputFormat {

	public static class MapperClass extends Mapper {

		public void map(LongWritable key, Text value, Context context) throws IOException,
				InterruptedException {
			System.out.println("key:\t " + key);
			System.out.println("value:\t " + value);
			System.out.println("-------------------------");
		}
	}

	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		 Path outPath = new Path("/hive/11");
		 FileSystem.get(conf).delete(outPath, true);
		Job job = new Job(conf, "TestMyInputFormat");
		job.setInputFormatClass(TrackInputFormat.class);
		job.setJarByClass(TestMyInputFormat.class);
		job.setMapperClass(TestMyInputFormat.MapperClass.class);
		job.setNumReduceTasks(0);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

3、測(cè)試數(shù)據(jù):

  cookieId    time     url                 cookieOverFlag

1       a        1_hao123
1       a        1_baidu
1       b        1_google       2END
2       c        2_google
2       c        2_hao123
2       c        2_google       1END
3       a        3_baidu
3       a        3_sougou
3       b        3_soso         2END

4、結(jié)果:

key:	 0
value:	 1	a	1_hao123	
1	a	 1_baidu	
1	b	 1_google	2
-------------------------
key:	 47
value:	 2	c	 2_google	
2	c	 2_hao123	
2	c	 2_google	1
-------------------------
key:	 96
value:	 3	a	 3_baidu	
3	a	 3_sougou	
3	b	 3_soso	2
-------------------------

看完了這篇文章,相信你對(duì)“如何自定義hadoop MapReduce InputFormat切分輸入文件”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!


當(dāng)前標(biāo)題:如何自定義hadoopMapReduceInputFormat切分輸入文件
標(biāo)題來(lái)源:http://weahome.cn/article/ghppge.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部