This article walks through how to implement a custom sort order in MapReduce. The technique comes up regularly in day-to-day Hadoop work, so let's get straight into it.
The test data used in this article:
tom 20 8000
nancy 22 8000
ketty 22 9000
stone 19 10000
green 19 11000
white 39 29000
socrates 30 40000
In MapReduce, records are partitioned, sorted, and grouped by key.
MapReduce sorts keys according to their Writable type, e.g. IntWritable for int, LongWritable for long, or Text, in ascending order by default.
Why define a custom sort rule? Suppose we need a custom key type with its own ordering: sort people by salary in descending order and, for equal salaries, by age in ascending order.
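Before building the Hadoop key class, the target ordering can be previewed in plain Java. This is a minimal sketch (the record `P` and sample values are illustrative only; the real Hadoop key class comes later):

```java
import java.util.Arrays;

public class OrderPreview {
    // Hypothetical plain record, for illustration only
    record P(String name, int age, int salary) {}

    static P[] sorted() {
        P[] people = {
            new P("nancy", 22, 8000),
            new P("tom", 20, 8000),
            new P("stone", 19, 10000),
        };
        // Salary descending; ties broken by age ascending --
        // the same rule the custom key will encode in compareTo()
        Arrays.sort(people, (a, b) -> a.salary() != b.salary()
                ? Integer.compare(b.salary(), a.salary())
                : Integer.compare(a.age(), b.age()));
        return people;
    }

    public static void main(String[] args) {
        for (P p : sorted()) {
            System.out.println(p.salary() + " " + p.age() + " " + p.name());
        }
    }
}
```

Running this prints stone (10000) first, then tom before nancy, since their salaries tie at 8000 and tom is younger.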
Take the Text type as an example:
The Text class implements the WritableComparable interface, providing write(), readFields(), and a comparison method.
readFields(): deserializes the object, reading its fields from the input stream.
write(): serializes the object, writing its fields to the output stream.
So any custom type that is to be used as a sortable key must implement these same methods.
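The write()/readFields() contract can be exercised in isolation with plain java.io streams, which expose the same primitives as Hadoop's DataOutput/DataInput. A minimal round-trip sketch (field names and values are just sample data):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTrip {
    // Serialize then deserialize one record, mirroring what write()/readFields() must do
    static String roundTrip() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF("nancy");   // write() side: fields go out in a fixed order
        out.writeInt(22);
        out.writeInt(8000);

        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        // readFields() side: fields must come back in exactly the same order
        return in.readUTF() + " " + in.readInt() + " " + in.readInt();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip());  // prints "nancy 22 8000"
    }
}
```

If the read order ever diverges from the write order, the bytes are reinterpreted as the wrong fields, which is why the comment in readFields() below insists on matching orders.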
The custom key class:
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Person implements WritableComparable<Person> {

    private String name;
    private int age;
    private int salary;

    public Person() {
    }

    public Person(String name, int age, int salary) {
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public int getSalary() { return salary; }
    public void setSalary(int salary) { this.salary = salary; }

    @Override
    public String toString() {
        return this.salary + " " + this.age + " " + this.name;
    }

    // Compare salary first: the higher salary sorts first.
    // If salaries are equal, the smaller age sorts first.
    @Override
    public int compareTo(Person o) {
        int bySalary = Integer.compare(o.salary, this.salary); // descending, overflow-safe
        if (bySalary != 0) {
            return bySalary;
        }
        return Integer.compare(this.age, o.age);               // ascending
    }

    // Serialization: convert the key into the binary form sent over the stream
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeInt(age);
        dataOutput.writeInt(salary);
    }

    // Deserialization: fields must be read in the same order write() wrote them
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF();
        this.age = dataInput.readInt();
        this.salary = dataInput.readInt();
    }
}
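One caveat worth knowing when writing compareTo: comparing ints by subtraction (`this.salary - o.salary`) returns the wrong sign whenever the difference overflows int, whereas Integer.compare is overflow-safe. A quick standalone demonstration (the extreme values are chosen only to trigger the overflow):

```java
public class CompareOverflow {
    // Comparison by subtraction: fine for small values, but can overflow
    static int bySubtraction(int a, int b) {
        return a - b;
    }

    public static void main(String[] args) {
        int big = 2_000_000_000, small = -2_000_000_000;
        // big > small, yet the subtraction wraps around to a negative number
        System.out.println(bySubtraction(big, small) < 0);   // true: wrong sign
        System.out.println(Integer.compare(big, small) > 0); // true: correct
    }
}
```

Salaries in the test data are nowhere near Integer.MAX_VALUE, so subtraction happens to work here, but Integer.compare costs nothing and never surprises.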
The MapReduce program:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class SecondarySort {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hadoop2.7");
        Configuration configuration = new Configuration();
        // Point the locally launched MapReduce job at the program's jar
        configuration.set("mapreduce.job.jar",
                "C:\\Users\\tanglei1\\IdeaProjects\\Hadooptang\\target\\com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(configuration, SecondarySort.class.getSimpleName());

        // Delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(URI.create(args[1]), configuration);
        if (fileSystem.exists(new Path(args[1]))) {
            fileSystem.delete(new Path(args[1]), true);
        }

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Person.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Number of reduce tasks
        job.setNumReduceTasks(1);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Person.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }

    // Input: LongWritable key (the line's byte offset), Text value (the line itself)
    // Output: Person key, NullWritable value -- we only care about the key
    public static class MyMap extends Mapper<LongWritable, Text, Person, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Hadoop reads the input line by line; fields holds one line's data,
            // e.g. "nancy 22 8000"
            String[] fields = value.toString().split(" ");
            String name = fields[0];
            int age = Integer.parseInt(fields[1]);
            int salary = Integer.parseInt(fields[2]);
            // The comparison is performed by the custom key class
            Person person = new Person(name, age, salary);
            context.write(person, NullWritable.get());
        }
    }

    public static class MyReduce extends Reducer<Person, NullWritable, Person, NullWritable> {
        @Override
        protected void reduce(Person key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
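The mapper's parsing step can also be tried on one line of the test data in isolation, with no Hadoop on the classpath. A small sketch (the helper `parse` is purely illustrative):

```java
public class ParseLine {
    // Split one input line into the three fields the mapper extracts
    static Object[] parse(String line) {
        String[] fields = line.split(" ");
        String name = fields[0];
        int age = Integer.parseInt(fields[1]);
        int salary = Integer.parseInt(fields[2]);
        return new Object[] { name, age, salary };
    }

    public static void main(String[] args) {
        // One line, as the mapper sees it in value.toString()
        Object[] r = parse("nancy 22 8000");
        System.out.println(r[0] + " / " + r[1] + " / " + r[2]);
    }
}
```

In production you would also guard against blank or malformed lines (`fields.length < 3`) before indexing, to avoid an ArrayIndexOutOfBoundsException killing the task.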
The result:
40000 30 socrates
29000 39 white
11000 19 green
10000 19 stone
9000 22 ketty
8000 20 tom
8000 22 nancy
That is how to implement a custom sort in MapReduce: define a key type that implements WritableComparable, encode the ordering in compareTo(), and keep write() and readFields() in matching field order.