p>首先編寫WordCountDriver:
創(chuàng)新互聯(lián)公司服務(wù)項(xiàng)目包括鄂溫克網(wǎng)站建設(shè)、鄂溫克網(wǎng)站制作、鄂溫克網(wǎng)頁(yè)制作以及鄂溫克網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,鄂溫克網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到鄂溫克省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 這個(gè)程序相當(dāng)于一個(gè)yarn集群的客戶端,
* 需要在此封裝我們的mr程序的相關(guān)運(yùn)行參數(shù),指定jar包,
* 最后提交給yarn
* */
public class WordcountDriver
{
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
{
Configuration conf=new Configuration();
/*其實(shí)如果在本地運(yùn)行MR程序其實(shí)不用配置下面的代碼程序,在MR默認(rèn)下就是本地運(yùn)行*/
/**下面這段代碼配置的是在本地模式下運(yùn)行MR程序*/
/**是否運(yùn)行為本地模式,就是看這個(gè)參數(shù)值是否為local,默認(rèn)就是local;*/
//conf.set("mapreduce.framework.name", "local"); //在本地運(yùn)行MR程序
//本地模式運(yùn)行MR程序時(shí),輸入輸出的數(shù)據(jù)可以在本地,也可以在hdfs上
//到底在哪里,就看以下兩行配置用哪一行了,默認(rèn)是“file:///”
/**conf.set("fs.defaultFS", "hdfs://hadoop1:9000");*/ //使用的是HDFS系統(tǒng)
//conf.set("fs.defaultFS", "file:///"); //使用的是本地Windows磁盤
/**運(yùn)行集群模式,就是把程序提交到y(tǒng)arn中去運(yùn)行
* 要想運(yùn)行為集群模式,以下3個(gè)參數(shù)要指定為集群上的值
* */
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hadoop1");
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
Job job = Job.getInstance(conf);
/**要想在Windows的Eclipse上運(yùn)行程序,并提交到hadoop的YARN集群上需要指定jar包,如下:*/
/**job.setJar("c:/wc.jar");*/
//job.setJar("/home/hadoop/wc.jar"); //這種是將程序打包成jar包,放到指定的位置,缺乏靈活性,不建議使用;
//指定本程序的jar包所在的本地路徑
job.setJarByClass(WordcountDriver.class);
//指定本業(yè)務(wù)job要使用的mapper/reducer業(yè)務(wù)類
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducerr.class);
//指定mapper輸出數(shù)據(jù)的kv類型;
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定最終輸出的數(shù)據(jù)的kv類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定需要使用的combiner,以及用哪一個(gè)類作為combiner的邏輯
/*job.setCombinerClass(WordcountCombiner.class);*/
job.setCombinerClass(WordcountReducerr.class);
/**因?yàn)閏ombiner的工作原理通reducecer的作用是一樣的,所以直接反射調(diào)用reducerr類其實(shí)作用是一樣的*/
/**此處為之后為測(cè)試添加的*/
//如果不設(shè)置InputFormat,它默認(rèn)使用的是TextInputFormat.class
/**job.setInputFormatClass(CombineTextInputFormatInputFormatInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
*/
//指定job的輸入原始文件所在目錄
//FileInputFormat.setInputPaths(job, new Path("/wordcount/input")); //此處添加的路徑為HDFS文件系統(tǒng)的路徑;
FileInputFormat.setInputPaths(job, new Path(args[0])); //傳一個(gè)路徑參數(shù)
//指定job的輸出結(jié)果所在目錄
FileOutputFormat.setOutputPath(job, new Path(args[1])); //傳一個(gè)參數(shù)進(jìn)來(lái)作為輸出的路徑參數(shù)
//將job中配置的相關(guān)參數(shù),以及job所用的Java類所在的jar包,提交給yarn去運(yùn)行;
/*job.submit(); */
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
其次編寫WordCountMapper:
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
//這是一個(gè)簡(jiǎn)單的MapReduce例子,進(jìn)行單詞數(shù)量的統(tǒng)計(jì)操作;
import org.apache.hadoop.mapreduce.Mapper;
/**
* KEYIN:默認(rèn)情況下,是mr框架所讀到的一行文本的起始偏移量,Long類型,但是在Hadoop中有更精簡(jiǎn)的序列化接口,因此采用LongWritable類型;
* VALUEIN:默認(rèn)情況下,是mr框架所讀到的一行文本的內(nèi)容,String類型的,同上用Text(org.apache.hadoop.io.Text)類型;
* KEYOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的key,在此處是單詞,為String類型,同上用Text類型;
* VALUEOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的value,在此處是單詞數(shù)量,為Integer類型,同上用IntWritable類型;
* */
public class WordcountMapper extends Mapper
{
/**
* map階段的業(yè)務(wù)邏輯就寫在自定義的map()方法中,
* maptask會(huì)對(duì)每一行輸入數(shù)據(jù)調(diào)用一次我們自定義的map()方法;
* */
@Override //覆寫Mapper中的方法;
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
//將maptask傳給我們的文本內(nèi)容先轉(zhuǎn)換成String類型
String line = value.toString();
//根據(jù)空格將這一行切分成單詞;
String[] words = line.split(" ");
//將單詞輸出為<單詞,1>
for(String word:words)
{
//將單詞作為key,將次數(shù)1作為value,以便于后續(xù)的數(shù)據(jù)分發(fā),可以根據(jù)單詞分發(fā),以便于相同單詞會(huì)分到相同的reduce task中;
context.write(new Text(word),new IntWritable(1)); //進(jìn)行類型轉(zhuǎn)換一下;
}無(wú)錫×××醫(yī)院 https://yyk.familydoctor.com.cn/20612/
}
最后編寫WordCountReduceer:
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* KEYIN,VALUEIN應(yīng)該對(duì)應(yīng)mapper中的輸出的KEYOUT,VALUEOUT類型;
* KEYOUT是單詞
* VALUEOUT是總次數(shù)*/
public class WordcountReducerr extends Reducer
{
/**
* 例如:
*
* 輸入?yún)?shù)key,是一組相同單詞kv對(duì)的key
* */
@Override
protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException
{
int count= 0;
/* //采用迭代器的方式進(jìn)行統(tǒng)計(jì)單詞的數(shù)量;
Iterator iterator = values.iterator();
while(iterator.hasNext())
{
count+=iterator.next().get(); //獲取key對(duì)應(yīng)的value值
}
*/
//下面的for循環(huán)和上面注釋中的效果是一樣的;
for(IntWritable value:values)
{
count+=value.get();
}
//輸出統(tǒng)計(jì)結(jié)果
context.write(key, new IntWritable(count));
/**
* 默認(rèn)情況下reduce task會(huì)將輸出結(jié)果放到一個(gè)文件中(最好是HDFS文件系統(tǒng)上的一個(gè)文件)
* */
}
}
然而還可以編寫一個(gè)Combiner類:
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* 此處的這個(gè)combiner其實(shí)不用自己編寫,因?yàn)閏ombiner的工作原理同reducer的原理是一樣
* 的,故可以直接反射調(diào)用WordcountReducer類即可
* */
public class WordcountCombiner extends Reducer
{
@Override
protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException
{
}