This article shows how to extend Hadoop streaming so that a single job can write its results to multiple outputs. It covers both how to use the extension and how it is implemented.
PrefixMultipleOutputFormat provides two features:

- Write records to different directories according to the prefix of the key
- Remove the tab from the final output
### Usage

#### Writing to different directories by key prefix

    $maserati_hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
        -libjars ./adts.jar \
        -D mapred.job.name=$name \
        -D mapred.reduce.tasks=5 \
        -inputformat org.apache.hadoop.mapred.TextInputFormat \
        -outputformat com.sogou.adt.adts.PrefixMultipleOutputFormat \
        -input $input \
        -output $output \
        -mapper ./m_mapper.sh \
        -reducer ./m_reducer.sh \
        -file m_mapper.sh \
        -file m_reducer.sh
Here `-outputformat` names the class we implemented ourselves, and `-libjars ./adts.jar` ships our own jar, which contains that class, with the job.
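### The mapper and reducer scripts

m_mapper.sh:

    #!/bin/bash
    # Emit every space-separated field of the input on its own line.
    awk -F " " '{
        for (i = 1; i <= NF; i++) print $i;
    }'

m_reducer.sh:

    #!/bin/bash
    # Tag each record with a one-character prefix followed by "#",
    # cycling through A, B and C by record number.
    awk -F "\t" '{
        if (NR % 3 == 0) print "A#"$1;
        if (NR % 3 == 1) print "B#"$1;
        if (NR % 3 == 2) print "C#"$1;
    }'

With these scripts the numbers end up in separate subdirectories of `$output`, one per prefix (`A`, `B`, `C`).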
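To make the reducer's tagging rule concrete, here is a minimal Java sketch that mimics the awk logic of m_reducer.sh on an in-memory list. The class and method names (`ReducerTagging`, `tag`) are hypothetical and exist only for illustration; the real job runs the awk script.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of m_reducer.sh: the record number (awk's NR,
// which starts at 1) decides which one-character prefix each key receives.
final class ReducerTagging {

    // Returns the first tab-separated field of every line with "A#", "B#"
    // or "C#" prepended, in the order the awk script would print them.
    static List<String> tag(List<String> lines) {
        List<String> tagged = new ArrayList<String>();
        for (int i = 0; i < lines.size(); i++) {
            int nr = i + 1; // awk's NR is 1-based
            String firstField = lines.get(i).split("\t", -1)[0];
            String prefix;
            switch (nr % 3) {
                case 0:  prefix = "A#"; break;
                case 1:  prefix = "B#"; break;
                default: prefix = "C#"; break;
            }
            tagged.add(prefix + firstField);
        }
        return tagged;
    }

    public static void main(String[] args) {
        // Prints [B#1, C#2, A#3, B#4]
        System.out.println(tag(Arrays.asList("1", "2", "3", "4")));
    }
}
```

Because the prefix depends only on the record number within each reducer, the split across A, B and C is a simple round-robin and carries no meaning beyond demonstrating the output format.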
#### Removing the trailing tab

Add `-D com.sogou.adt.adts.ignoreseparator=true` to drop the tab at the end of each line. With this option the output format uses an empty string as the key/value separator.

    $maserati_hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
        -libjars ./adts.jar \
        -D mapred.job.name=$name \
        -D mapred.reduce.tasks=5 \
        -D com.sogou.adt.adts.ignoreseparator=true \
        -inputformat org.apache.hadoop.mapred.TextInputFormat \
        -outputformat com.sogou.adt.adts.PrefixMultipleOutputFormat \
        -input $input \
        -output $output \
        -mapper ./m_mapper.sh \
        -reducer ./m_reducer.sh \
        -file m_mapper.sh \
        -file m_reducer.sh
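The effect of the flag can be sketched without Hadoop. The following mirrors, on plain strings, the write logic of the LineRecordWriter listed in the source section of this article. The class `SeparatorDemo` and its method are hypothetical, and the sketch assumes that streaming passes an empty (not null) value when a reducer line contains no tab, which is why a tab would otherwise trail the key.

```java
// Hypothetical, Hadoop-free sketch of how one output line is assembled.
final class SeparatorDemo {

    // Joins key and value the way the record writer does: the separator is
    // written only when both parts are present, followed by a newline.
    static String formatLine(String key, String value, String separator) {
        if (key == null && value == null) {
            return ""; // nothing is written for an empty record
        }
        StringBuilder sb = new StringBuilder();
        if (key != null) sb.append(key);
        if (key != null && value != null) sb.append(separator);
        if (value != null) sb.append(value);
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        // Assumption: the value is an empty string, not null.
        String withTab = formatLine("1", "", "\t");   // default separator
        String withoutTab = formatLine("1", "", "");  // ignoreseparator=true
        System.out.print(withTab.replace("\t", "<TAB>"));
        System.out.print(withoutTab);
    }
}
```

Note that with an empty separator a non-empty value would be glued directly onto the key, so the option fits jobs whose reducer emits only a key per line.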
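### How PrefixMultipleOutputFormat is implemented

I am not very familiar with Java, and what little I learned at university has long since been handed back to my teachers ^v^. Setting up a build environment took a while, but fortunately I had a ready-made Eclipse Java environment and a Hadoop environment I had set up two years earlier. It needed only minor repairs before it could run jobs again, which was a real stroke of luck.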
### My environment

- eclipse
- jdk1.6.0
- jar files
  - hadoop-common-2.6.0.jar
  - hadoop-mapreduce-client-core-2.6.0.jar
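A brief note on the jars: before compiling, I was worried about where to find the jars that Hadoop streaming depends on, and whether I would have to build them myself (compiling the whole Hadoop source tree is a bit of a headache). It turned out that all of them can be found in the Hadoop runtime environment, which was a relief.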
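### Source code

The code is fairly easy to follow. It contains a LineRecordWriter class, mostly lifted from the existing TextOutputFormat class and changed only slightly to read the configuration and switch off the tab separator, and a generateFileNameForKeyValue method, which reads the prefix from the key and routes the record to the matching directory.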
    package com.sogou.adt.adts;

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
    import org.apache.hadoop.util.Progressable;
    import org.apache.hadoop.util.ReflectionUtils;

    public class PrefixMultipleOutputFormat extends MultipleTextOutputFormat<Text, Text> {

        @Override
        protected Text generateActualKey(Text key, Text value) {
            return super.generateActualKey(key, value);
        }

        // Adapted from TextOutputFormat.LineRecordWriter; the separator is configurable.
        protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
            private static final String utf8 = "UTF-8";
            private static final byte[] newline;
            static {
                try {
                    newline = "\n".getBytes(utf8);
                } catch (UnsupportedEncodingException uee) {
                    throw new IllegalArgumentException("can't find " + utf8 + " encoding");
                }
            }

            protected DataOutputStream out;
            private final byte[] keyValueSeparator;

            public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
                this.out = out;
                try {
                    this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
                } catch (UnsupportedEncodingException uee) {
                    throw new IllegalArgumentException("can't find " + utf8 + " encoding");
                }
            }

            public LineRecordWriter(DataOutputStream out) {
                this(out, "\t");
            }

            /**
             * Write the object to the byte stream, handling Text as a special
             * case.
             * @param o the object to print
             * @throws IOException if the write throws, we pass it on
             */
            private void writeObject(Object o) throws IOException {
                if (o instanceof Text) {
                    Text to = (Text) o;
                    out.write(to.getBytes(), 0, to.getLength());
                } else {
                    out.write(o.toString().getBytes(utf8));
                }
            }

            public synchronized void write(K key, V value) throws IOException {
                boolean nullKey = key == null || key instanceof NullWritable;
                boolean nullValue = value == null || value instanceof NullWritable;
                if (nullKey && nullValue) {
                    return;
                }
                if (!nullKey) {
                    writeObject(key);
                }
                if (!(nullKey || nullValue)) {
                    out.write(keyValueSeparator);
                }
                if (!nullValue) {
                    writeObject(value);
                }
                out.write(newline);
            }

            public synchronized void close(Reporter reporter) throws IOException {
                out.close();
            }
        }

        @Override
        protected RecordWriter<Text, Text> getBaseRecordWriter(FileSystem fs, JobConf job,
                String name, Progressable arg3) throws IOException {
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
            // When the flag is set, write key and value with no separator at all.
            boolean ignoreseparator = job.getBoolean("com.sogou.adt.adts.ignoreseparator", false);
            if (ignoreseparator) {
                keyValueSeparator = "";
            }
            if (!isCompressed) {
                Path file = FileOutputFormat.getTaskOutputPath(job, name);
                fs = file.getFileSystem(job);
                FSDataOutputStream fileOut = fs.create(file, arg3);
                return new LineRecordWriter<Text, Text>(fileOut, keyValueSeparator);
            } else {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                // create the named codec
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
                // build the filename including the extension
                Path file = FileOutputFormat.getTaskOutputPath(job,
                        name + codec.getDefaultExtension());
                fs = file.getFileSystem(job);
                FSDataOutputStream fileOut = fs.create(file, arg3);
                return new LineRecordWriter<Text, Text>(
                        new DataOutputStream(codec.createOutputStream(fileOut)),
                        keyValueSeparator);
            }
        }

        @Override
        protected String generateFileNameForKeyValue(Text key, Text value, String name) {
            int keyLength = key.getLength();
            String outputName = name;
            if (keyLength < 2)
                return outputName;
            // A key of the form "X#rest" carries a one-byte prefix X.
            Text sep = new Text();
            sep.append(key.getBytes(), 1, 1);
            if (sep.find("#") != -1) {
                Text newFlag = new Text();
                newFlag.append(key.getBytes(), 0, 1);
                String flag = newFlag.toString();
                //outputName = name+"-"+flag;
                outputName = flag + "/" + name + "-" + flag;
                // Strip the "X#" prefix in place so that it is not written to the output.
                Text newValue = new Text();
                newValue.append(key.getBytes(), 2, keyLength - 2);
                key.set(newValue);
            }
            // Debug output, written once per record.
            System.out.printf("[shishuai]System[key [%s]][value:[%s]] output[%s]\n",
                    key.toString(), value.toString(), outputName);
            return outputName;
        }
    }
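The routing rule in generateFileNameForKeyValue can be tried outside a cluster. Below is a minimal sketch on plain Java strings rather than Hadoop Text objects. The class `PrefixRouting` and its `Route` holder are hypothetical names introduced for illustration, and the sketch works on characters where the original works on bytes, so it only matches for single-byte prefixes.

```java
// Hypothetical, Hadoop-free sketch of the prefix routing rule.
final class PrefixRouting {

    // Holds the two results: the relative output file name and the key
    // with its "X#" prefix stripped.
    static final class Route {
        final String fileName;
        final String key;

        Route(String fileName, String key) {
            this.fileName = fileName;
            this.key = key;
        }
    }

    // name is the default leaf file name, for example "part-00000".
    static Route route(String key, String name) {
        // Keys without a "X#" prefix keep the default file name untouched.
        if (key.length() < 2 || key.charAt(1) != '#') {
            return new Route(name, key);
        }
        String flag = key.substring(0, 1);
        // The record goes to <flag>/<name>-<flag>, and the prefix is removed.
        return new Route(flag + "/" + name + "-" + flag, key.substring(2));
    }

    public static void main(String[] args) {
        Route r = route("A#42", "part-00000");
        // Prints: A/part-00000-A <- 42
        System.out.println(r.fileName + " <- " + r.key);
    }
}
```

Returning a name that contains a slash is what places each prefix in its own subdirectory of the job output path, while keys without a prefix fall through to the ordinary part file.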