這篇文章主要為大家展示了“Hadoop如何實現(xiàn)輔助排序”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學習一下“Hadoop如何實現(xiàn)輔助排序”這篇文章吧。
創(chuàng)新互聯(lián)公司是一家專業(yè)的成都網(wǎng)站建設公司,我們專注成都網(wǎng)站設計、網(wǎng)站制作、網(wǎng)絡營銷、企業(yè)網(wǎng)站建設,友情鏈接,廣告投放平臺為企業(yè)客戶提供一站式建站解決方案,能帶給客戶新的互聯(lián)網(wǎng)理念。從網(wǎng)站結(jié)構(gòu)的規(guī)劃UI設計到用戶體驗提高,創(chuàng)新互聯(lián)力求做到盡善盡美。
1. 樣例數(shù)據(jù)
011990-99999 SIHCCAJAVRI 012650-99999 TYNSET-HANSMOEN
012650-99999 194903241200 111 012650-99999 194903241800 78 011990-99999 195005150700 0 011990-99999 195005151200 22 011990-99999 195005151800 -11
2. 需求
3. 思路、代碼
將氣象站ID相同的氣象站信息和天氣信息交由同一個 Reducer 處理,并保證氣象站信息首先到達;然后 reduce() 函數(shù)從第一行中獲取氣象臺名稱,從第二行開始獲取天氣信息并輸出。
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 組合鍵,此例中用于輔助排序,包括氣象站ID和“標記”。 * “標記”是一個虛擬字段,其唯一目的是對記錄排序,使氣象站的記錄比天氣記錄先到達。 * 雖然可以不指定數(shù)據(jù)傳輸次序,并將待處理的記錄緩存在內(nèi)存之中,但應該盡量避免這種情況, * 因為其中任何一組的記錄數(shù)量都可能非常龐大,遠遠超出 reducer 的可用內(nèi)存量 */ public class TextPair implements WritableComparable{ private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object obj) { if (obj instanceof TextPair) { TextPair tp = (TextPair) obj; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString() { return first + "\t" + second; } public int compareTo(TextPair o) { int cmp = first.compareTo(o.first); if (cmp == 0) { cmp = second.compareTo(o.second); } return cmp; } // RawComparator 允許直接比較數(shù)據(jù)流中的記錄,無須先把數(shù)據(jù)流反序列化為對象,這樣避免了新建對象的額外開銷 // WritableComparator 是對繼承自 WritableComparable 類的 RawComparator 的一個通用實現(xiàn)。 public static class FirstComparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public FirstComparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { // firstL1、firstL2 表示每個字節(jié)流中第一個 Text 字段的長度 int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof TextPair && b instanceof TextPair) { return ((TextPair) a).first.compareTo(((TextPair) b).first); } return super.compare(a, b); } } }
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 標記氣象站記錄的 mapper */ public class JoinStationMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split("\\t"); if (val.length == 2) { context.write(new TextPair(val[0], "0"), new Text(val[1])); } } }
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 標記天氣記錄的 mapper */ public class JoinRecordMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split("\\t"); if (val.length == 3) { context.write(new TextPair(val[0], "1"), new Text(val[1] + "\t" + val[2])); } } }
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /** * 連接已標記的氣象站記錄和天氣記錄的 reducer */ public class JoinReducer extends Reducer{ @Override protected void reduce(TextPair key, Iterable values, Context context) throws IOException, InterruptedException { Iterator iter = values.iterator(); Text stationName = new Text(iter.next()); // reducer 會先接收氣象站記錄(這里千萬不能寫成 Text stationName = iter.next(); ) while (iter.hasNext()) { Text record = iter.next(); Text outValue = new Text(stationName.toString() + "\t" + record.toString()); context.write(key.getFirst(), outValue); } } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class JoinRecordWithStationName { static class KeyPartitioner extends Partitioner{ @Override public int getPartition(TextPair textPair, Text text, int numPartitions) { return (textPair.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 3) { System.err.println("Parameter number is wrong, please enter three parameters:
4. 運行結(jié)果
以上是“Hadoop如何實現(xiàn)輔助排序”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學習更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!