Lu Chunli's work notes. Who says programmers can't have an artistic side?
Environment:
hadoop-2.6.0
hbase-1.0.1
zookeeper-3.4.6
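1. Hadoop cluster configuration (omitted);
2. ZooKeeper cluster configuration (omitted);
3. HBase cluster configuration (omitted);
4. Example: HBase as the input source
View the data currently stored in the HBase table m_domain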
[hadoop@dnode1 conf]$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.0.1, r66a93c09df3b12ff7b86c39bc8475c60e15af82d, Fri Apr 17 22:14:06 PDT 2015

hbase(main):001:0> list
TABLE
m_domain
t_domain
2 row(s) in 0.9270 seconds

=> ["m_domain", "t_domain"]
hbase(main):002:0> scan 'm_domain'
ROW                             COLUMN+CELL
 alibaba.com_19990415_20220523  column=cf:access_server, timestamp=1440947490018, value=\xE6\x9D\xAD\xE5\xB7\x9E
 alibaba.com_19990415_20220523  column=cf:exp_date, timestamp=1440947490018, value=2022\xE5\xB9\xB405\xE6\x9C\x8823\xE6\x97\xA5
 alibaba.com_19990415_20220523  column=cf:ipstr, timestamp=1440947490018, value=205.204.101.42
 alibaba.com_19990415_20220523  column=cf:owner, timestamp=1440947490018, value=Hangzhou Alibaba Advertising Co.
 alibaba.com_19990415_20220523  column=cf:reg_date, timestamp=1440947490018, value=1999\xE5\xB9\xB404\xE6\x9C\x8815\xE6\x97\xA5
 baidu.com_19991011_20151011    column=cf:access_server, timestamp=1440947489956, value=\xE5\x8C\x97\xE4\xBA\xAC
 baidu.com_19991011_20151011    column=cf:exp_date, timestamp=1440947489956, value=2015\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
 baidu.com_19991011_20151011    column=cf:ipstr, timestamp=1440947489956, value=220.181.57.217
 baidu.com_19991011_20151011    column=cf:reg_date, timestamp=1440947489956, value=1999\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
2 row(s) in 1.4560 seconds

hbase(main):003:0> quit
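The shell prints non-ASCII cell values as `\xNN` byte escapes. As a reading aid, here is a minimal sketch that turns such a string back into text, assuming the bytes are UTF-8 (which is how the mapper in these notes decodes them). The class name `ShellEscapeDecoder` is made up for illustration and is not part of HBase.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class ShellEscapeDecoder {
    /** Turns HBase-shell style text such as "\xE6\x9D\xAD" back into a string, assuming UTF-8 bytes. */
    static String decode(String escaped) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int i = 0;
        while (i < escaped.length()) {
            if (escaped.startsWith("\\x", i) && i + 4 <= escaped.length()) {
                // "\xNN" stands for one raw byte with hex value NN
                out.write(Integer.parseInt(escaped.substring(i + 2, i + 4), 16));
                i += 4;
            } else {
                // printable ASCII characters are shown unchanged by the shell
                out.write(escaped.charAt(i));
                i++;
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // access_server and exp_date of the alibaba.com row above
        System.out.println(decode("\\xE6\\x9D\\xAD\\xE5\\xB7\\x9E"));
        System.out.println(decode("2022\\xE5\\xB9\\xB405\\xE6\\x9C\\x8823\\xE6\\x97\\xA5"));
    }
}
```

Run against the rows above, the access_server values decode to the city names 杭州 and 北京, and the date columns to strings of the form 2022年05月23日.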
Implementing the Mapper
package com.invic.mapreduce.hbase.source;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

/**
 * @author lucl
 * TableMapper extends Mapper; every Mapper that reads from HBase must extend it.
 */
public class HBaseReaderMapper extends TableMapper<Text, Text> {
    private Text key = new Text();
    private Text value = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    @Override
    protected void map(ImmutableBytesWritable row, Result result, Context context)
            throws IOException, InterruptedException {
        // Option 1: the column family is known in advance
        {
            NavigableMap<byte[], byte[]> map = result.getFamilyMap(Bytes.toBytes("cf"));
            for (Entry<byte[], byte[]> entry : map.entrySet()) {
                // Bytes.toString decodes as UTF-8 instead of the platform default charset
                String columnQualifier = Bytes.toString(entry.getKey());
                String cellValue = Bytes.toString(entry.getValue());
                System.out.println(columnQualifier + "\t" + cellValue);
            }
        }

        // Option 2: several column families, or family names not known in advance
        {
            String rowKey = Bytes.toString(row.get());
            StringBuilder sbf = new StringBuilder(1024);
            for (Cell cell : result.listCells()) {
                byte[] columnFamily = CellUtil.cloneFamily(cell);
                byte[] columnQualifier = CellUtil.cloneQualifier(cell);
                byte[] cellValue = CellUtil.cloneValue(cell);

                // Appends "family.qualifier:value" for each cell
                sbf.append(Bytes.toString(columnFamily));
                sbf.append(".");
                sbf.append(Bytes.toString(columnQualifier));
                sbf.append(":");
                sbf.append(Bytes.toString(cellValue));
            }

            key.set(rowKey);
            value.set(sbf.toString());
            context.write(key, value);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }
}
Implementing the MapReduce Driver class
package com.invic.mapreduce.hbase.source;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author lucl
 * Example: HBase as the input source
 */
public class HBaseASDataSourceDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\");
        int exit = ToolRunner.run(new HBaseASDataSourceDriver(), args);
        System.out.println("receive exit : " + exit);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Build the HBase configuration on top of getConf() so that a single
        // Configuration object carries every setting into the Job.
        Configuration conf = HBaseConfiguration.create(getConf());

        // Hadoop settings
        /*conf.set("fs.defaultFS", "hdfs://cluster");
        conf.set("dfs.nameservices", "cluster");
        conf.set("dfs.ha.namenodes.cluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020");
        conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020");
        conf.set("dfs.client.failover.proxy.provider.cluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");*/

        // hbase master
        // property "hbase.master" has been deprecated since 0.90
        // Just passing the ZK configuration makes your client auto-discover the master
        // conf.set("hbase.master", "nnode:60000");

        // ZooKeeper quorum. These must be set on conf, the object handed to Job.getInstance();
        // values set on a different Configuration never reach the job.
        // The key is case-sensitive: "clientPort", not "clientport".
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");
        // Whether to enable speculative execution for map tasks
        conf.setBoolean("mapreduce.map.speculative", false);
        // Whether to enable speculative execution for reduce tasks
        conf.setBoolean("mapreduce.reduce.speculative", false);

        Job job = Job.getInstance(conf);
        job.setJobName("MyBaseReaderFromHBase");
        job.setJarByClass(HBaseASDataSourceDriver.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        /**
         * Rows read from HBase are passed to the Mapper defined below, which processes them.
         * No Reducer class is set on the job, so the default Reducer writes the Mapper
         * output through unchanged. If further processing is needed on the reduce side,
         * define a custom Reducer class.
         */
        Scan scan = new Scan();
        // scan.addFamily(family);
        // scan.addColumn(family, qualifier);

        byte[] tableName = Bytes.toBytes("m_domain");

        TableMapReduceUtil.initTableMapperJob(tableName, scan, HBaseReaderMapper.class, Text.class, Text.class, job);

        Path path = new Path("/" + System.currentTimeMillis());
        FileOutputFormat.setOutputPath(job, path);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
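View the results:
Problems encountered:
a. Running the job from Eclipse fails with an error; the cause has not been analyzed yet.
b. When the job runs on the cluster and the Mapper class is defined inside the Driver class, it fails with: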
ClassNotFound for HBaseASDataSourceDriver$HBaseReaderMapper init()
c. The ZooKeeper connection string always shows 127.0.0.1 instead of the configured zookeeper.quorum.
It is unclear whether this would cause problems if the ZooKeeper cluster and HBase ran on different machines.
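Regarding problem b: the notes do not record the full stack trace, so the following is only one plausible explanation, not a confirmed diagnosis. Frameworks such as Hadoop create Mapper instances reflectively through a no-argument constructor. A non-static inner class has no such constructor, because its constructor implicitly takes the enclosing instance. The plain-Java sketch below (hypothetical class names, no Hadoop dependency) shows the difference between an inner class and a static nested class.

```java
import java.lang.reflect.Constructor;

class DriverLike {
    // Non-static inner class: its constructor implicitly takes a DriverLike instance.
    class InnerMapper { }

    // Static nested class: has a genuine no-argument constructor.
    static class NestedMapper { }

    /** Returns true if the class can be instantiated reflectively with no arguments. */
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            ctor.newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // NoSuchMethodException is thrown for the inner class
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("InnerMapper:  " + hasNoArgConstructor(InnerMapper.class));
        System.out.println("NestedMapper: " + hasNoArgConstructor(NestedMapper.class));
    }
}
```

If this is indeed the cause, declaring the nested Mapper as `public static class` inside the Driver, or keeping it in its own file as done in these notes, would avoid it.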
5. Example: HBase as the output target
The text file contains the following:
2013-09-13 16:04:08 www.subnetc1.com 192.168.1.7 80 192.168.1.139 18863 HTTP www.subnetc1.com/index.html
2013-09-13 16:04:08 www.subnetc2.com 192.168.1.7 80 192.168.1.159 14100 HTTP www.subnetc2.com/index.html
2013-09-13 16:04:08 www.subnetc3.com 192.168.1.7 80 192.168.1.130 4927 HTTP www.subnetc3.com/index.html
2013-09-13 16:04:08 www.subnetc4.com 192.168.1.7 80 192.168.1.154 39044 HTTP www.subnetc4.com/index.html
Map-side code:
package com.invic.mapreduce.hbase.target;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper
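The body of MyMapper is not preserved in these notes. Judging from the final scan of t_inter_log, where each row key is a domain and the cf:detail value is the whole input line, the multi-column variant presumably emits (domain, line) pairs. The sketch below shows only that extraction step in plain Java, without Hadoop types. The class name `LogLineParser`, the field meanings in the comment, and the choice of the third whitespace-separated field are assumptions drawn from the sample data, not the author's code.

```java
class LogLineParser {
    /** Returns {rowKey, value} for one log line, or null if the line has too few fields. */
    static String[] toKeyValue(String line) {
        // Assumed layout, separated by whitespace:
        // date, time, domain, server IP, server port, client IP, client port, protocol, URL
        String[] fields = line.trim().split("\\s+");
        if (fields.length < 9) {
            return null; // skip malformed lines
        }
        String domain = fields[2]; // third field becomes the row key
        return new String[] { domain, line };
    }

    public static void main(String[] args) {
        String line = "2013-09-13 16:04:08 www.subnetc1.com 192.168.1.7 80 "
                + "192.168.1.139 18863 HTTP www.subnetc1.com/index.html";
        String[] kv = toKeyValue(line);
        System.out.println(kv[0] + " -> " + kv[1]);
    }
}
```

Inside a real Mapper, the two strings would be wrapped in Text objects and passed to context.write.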
Reducer-side code:
package com.invic.mapreduce.hbase.target;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * @author lucl
 */
public class MyReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        // Text.getBytes() returns the backing buffer, which can hold stale bytes
        // beyond getLength(), so the row key is built from the string form instead.
        byte[] rowKey = Bytes.toBytes(key.toString());

        // for wordcount
        // TableReducer<Text, IntWritable, ImmutableBytesWritable>
        // Iterable<IntWritable> value
        /*{
            int sum = 0;
            for (Iterator<IntWritable> it = value.iterator(); it.hasNext(); ) {
                IntWritable val = it.next();
                sum += val.get();
            }

            Put put = new Put(rowKey);
            // sum is an integer; convert it to a String before taking the bytes,
            // otherwise the value is not readable when the table is scanned
            byte[] datas = Bytes.toBytes(String.valueOf(sum));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), datas);
            context.write(new ImmutableBytesWritable(rowKey), put);
        }*/

        // Writing the whole record into HBase
        // TableReducer<Text, Text, ImmutableBytesWritable>
        // Iterable<Text> value
        {
            byte[] family = Bytes.toBytes("cf");
            Put put = new Put(rowKey);

            StringBuilder sbf = new StringBuilder();
            for (Text text : value) {
                sbf.append(text.toString());
            }

            put.addColumn(family, Bytes.toBytes("detail"), Bytes.toBytes(sbf.toString()));
            context.write(new ImmutableBytesWritable(rowKey), put);
        }
    }
}
Driver class:
package com.invic.mapreduce.hbase.target;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author lucl
 * Example: HBase as the output target
 */
public class HBaseASDataTargetDriver extends Configured implements Tool {
    private static final String TABLE_NAME = "t_inter_log";
    private static final String COLUMN_FAMILY_NAME = "cf";

    public static void main(String[] args) throws Exception {
        // for eclipse
        // System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\");
        int exit = ToolRunner.run(new HBaseASDataTargetDriver(), args);
        System.out.println("receive exit : " + exit);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(getConf());

        // Hadoop settings
        conf.set("fs.defaultFS", "hdfs://cluster");
        conf.set("dfs.nameservices", "cluster");
        conf.set("dfs.ha.namenodes.cluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020");
        conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020");
        conf.set("dfs.client.failover.proxy.provider.cluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        // hbase master
        // property "hbase.master" has been deprecated since 0.90
        // Just passing the ZK configuration makes your client auto-discover the master
        // conf.set("hbase.master", "nnode:60000");

        // ZooKeeper quorum (the key is case-sensitive: "clientPort", not "clientport")
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");
        // Whether to enable speculative execution for map tasks
        conf.setBoolean("mapreduce.map.speculative", false);
        // Whether to enable speculative execution for reduce tasks
        conf.setBoolean("mapreduce.reduce.speculative", false);

        /**
         * Create the HBase table (dropping it first if it already exists)
         */
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf(TABLE_NAME);
            if (admin.tableExists(tableName)) {
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }

            HTableDescriptor tableDesc = new HTableDescriptor(tableName);
            HColumnDescriptor columnDesc = new HColumnDescriptor(COLUMN_FAMILY_NAME);
            tableDesc.addFamily(columnDesc);
            admin.createTable(tableDesc);
        }

        /**
         * Read the input file
         */
        String fileName = "http_interceptor_20130913.txt";

        Job job = Job.getInstance(conf);
        job.setJobName("MyBaseWriterToHBase");
        job.setJarByClass(HBaseASDataTargetDriver.class);

        job.setMapperClass(MyMapper.class);

        /**
         * The four type parameters when MapReduce reads a text file (KeyIn, ValueIn, KeyOut, ValueOut).
         * Notes:
         * By default KeyIn is LongWritable, the offset within the text file, and ValueIn is one line of text.
         * First test: the map output key-value types were not set, and the program ran normally.
         * Second test: the map output key-value types were set with
         *     job.setMapOutputKeyClass
         *     job.setMapOutputValueClass
         * The book Hadoop应用开发技术详解 (1st edition, January 2015, p. 191) states that
         *     the default map output key-value types are LongWritable and Text,
         * but the map output actually produced by the example MyMapper is Text and IntWritable.
         *
            // job.setMapOutputKeyClass(LongWritable.class);
            // job.setMapOutputValueClass(Text.class);
            // With these settings the run fails as follows:
            15/09/04 22:19:06 INFO mapreduce.Job: Task Id : attempt_1441346242717_0014_m_000000_0, Status : FAILED
            Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text
                at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069)
                at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
                at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
                at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
                at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:21)
                at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1)
                at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
                at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
                at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
                at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
                at java.security.AccessController.doPrivileged(Native Method)
                at javax.security.auth.Subject.doAs(Subject.java:415)
                at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
                at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

           Third test: the types were set to match the Mapper class, and the program ran correctly.
         */
        job.setMapOutputKeyClass(Text.class);
        // Text for the multi-column variant; use IntWritable.class for the wordcount variant
        job.setMapOutputValueClass(Text.class);

        // Do not add the following line: in testing, adding it led to an error
        // saying that the MyReducer class could not be found.
        // job.setReducerClass(MyReducer.class);

        Path path = new Path(fileName);
        FileInputFormat.addInputPath(job, path);

        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, MyReducer.class, job);

        // for wordcount
        // job.setOutputKeyClass(Text.class);
        // job.setOutputValueClass(IntWritable.class);

        // for multi columns
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
When the map output key-value types are not set, the job fails as shown below. The wordcount example did not fail. The book Hadoop应用开发技术详解 says the default map output key-value types are LongWritable.class and Text.class, yet the map output types in the wordcount example are Text.class and IntWritable.class:
15/09/04 21:15:54 INFO mapreduce.Job: map 0% reduce 0%
15/09/04 21:16:27 INFO mapreduce.Job: Task Id : attempt_1441346242717_0011_m_000000_0, Status : FAILED
Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:29)
    at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
# The map phase was still at 0% when the error occurred, so the problem lies on the map side. The message shows that the job expected IntWritable as the map output value type. (When the map output types are not set explicitly, Hadoop falls back to the job's output key and value types, which at that point were presumably still the wordcount settings.) The main difference between my second example and wordcount is that the map output value changed from IntWritable to Text. Setting the following parameters solved the problem.
# job.setMapOutputKeyClass(Text.class);
# job.setMapOutputValueClass(Text.class);
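The apparent contradiction with the book can be resolved by the lookup order Hadoop uses for the map output types: an explicitly set map output class wins, otherwise the job output class is used, and only if that is also unset does the built-in default (LongWritable for keys, Text for values) apply. The following is a toy model of that order for the value type only, written in plain Java. The class `OutputTypeModel` and its setting names are invented for illustration and are not Hadoop API.

```java
import java.util.HashMap;
import java.util.Map;

class OutputTypeModel {
    private final Map<String, String> settings = new HashMap<>();

    void set(String name, String type) {
        settings.put(name, type);
    }

    /** Job output value type: explicit setting, else the built-in default "Text". */
    String outputValueClass() {
        return settings.getOrDefault("outputValue", "Text");
    }

    /** Map output value type: explicit setting, else whatever the job output value type is. */
    String mapOutputValueClass() {
        String explicit = settings.get("mapOutputValue");
        return explicit != null ? explicit : outputValueClass();
    }

    public static void main(String[] args) {
        OutputTypeModel job = new OutputTypeModel();
        System.out.println(job.mapOutputValueClass());   // nothing set: built-in default

        job.set("outputValue", "IntWritable");            // like job.setOutputValueClass(IntWritable.class)
        System.out.println(job.mapOutputValueClass());   // falls back to the job output type

        job.set("mapOutputValue", "Text");                // like job.setMapOutputValueClass(Text.class)
        System.out.println(job.mapOutputValueClass());   // explicit map output type wins
    }
}
```

Under this model, a mapper that emits Text values fails in the second state and succeeds in the third, which matches the sequence of tests recorded in the notes.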
Verifying the results of the wordcount and data-loading example programs:
hbase(main):005:0> scan 't_inter_log'
ROW                           COLUMN+CELL
 14100                        column=cf:count, timestamp=1441370812728, value=1
 16:04:08                     column=cf:count, timestamp=1441370812728, value=4
 18863:08                     column=cf:count, timestamp=1441370812728, value=1
 192.168.1.130                column=cf:count, timestamp=1441370812728, value=1
 192.168.1.139                column=cf:count, timestamp=1441370812728, value=1
 192.168.1.154                column=cf:count, timestamp=1441370812728, value=1
 192.168.1.159                column=cf:count, timestamp=1441370812728, value=1
 192.168.1.759                column=cf:count, timestamp=1441370812728, value=4
 2013-09-13759                column=cf:count, timestamp=1441370812728, value=4
 3904409-13759                column=cf:count, timestamp=1441370812728, value=1
 4927409-13759                column=cf:count, timestamp=1441370812728, value=1
 8027409-13759                column=cf:count, timestamp=1441370812728, value=4
 HTTP409-13759                column=cf:count, timestamp=1441370812728, value=4
 www.subnetc1.com             column=cf:count, timestamp=1441370812728, value=1
 www.subnetc1.com/index.html  column=cf:count, timestamp=1441370812728, value=1
 www.subnetc2.com/index.html  column=cf:count, timestamp=1441370812728, value=1
 www.subnetc3.com/index.html  column=cf:count, timestamp=1441370812728, value=1
 www.subnetc4.com/index.html  column=cf:count, timestamp=1441370812728, value=1
18 row(s) in 1.2290 seconds

# The t_inter_log table is dropped and recreated on every run
hbase(main):007:0> scan 't_inter_log'
ROW                           COLUMN+CELL
 www.subnetc1.com             column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc1.com\x09192.168.1.7\x0980\x09192.168.1.139\x0918863\x09HTTP\x09www.subnetc1.com/index.html
 www.subnetc2.com             column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc2.com\x09192.168.1.7\x0980\x09192.168.1.159\x0914100\x09HTTP\x09www.subnetc2.com/index.html
 www.subnetc3.com             column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc3.com\x09192.168.1.7\x0980\x09192.168.1.130\x094927\x09HTTP\x09www.subnetc3.com/index.html
 www.subnetc4.com             column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc4.com\x09192.168.1.7\x0980\x09192.168.1.154\x0939044\x09HTTP\x09www.subnetc4.com/index.html
4 row(s) in 3.3280 seconds
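Several row keys in the wordcount scan, such as 18863:08 and 192.168.1.759, do not occur in the input file. A likely explanation (not stated in the notes) is that a row key was built from a reused key object's backing byte array without trimming it to the valid length. Hadoop's Text keeps its buffer when a shorter value is stored, so bytes left over from the previous, longer key remain behind the new one. The plain-Java class below, `ReusableText`, is a hypothetical stand-in that imitates this buffer behaviour; it is not the Hadoop class itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class ReusableText {
    private byte[] bytes = new byte[0];
    private int length = 0;

    /** Stores a new value, growing the buffer only when it is too small. */
    void set(String s) {
        byte[] src = s.getBytes(StandardCharsets.UTF_8);
        if (bytes.length < src.length) {
            bytes = new byte[src.length];
        }
        System.arraycopy(src, 0, bytes, 0, src.length);
        length = src.length;
    }

    /** Returns the raw buffer; only the first getLength() bytes are valid. */
    byte[] getBytes() {
        return bytes;
    }

    int getLength() {
        return length;
    }

    /** Returns a copy containing only the valid bytes. */
    byte[] copyBytes() {
        return Arrays.copyOf(bytes, length);
    }

    public static void main(String[] args) {
        ReusableText key = new ReusableText();
        key.set("16:04:08");   // 8 bytes
        key.set("18863");      // 5 bytes; the last 3 bytes of the old value stay in the buffer
        System.out.println(new String(key.getBytes(), StandardCharsets.UTF_8));   // raw buffer
        System.out.println(new String(key.copyBytes(), StandardCharsets.UTF_8));  // valid bytes only
    }
}
```

Replaying the sorted keys of the wordcount run through this model gives 18863:08 after 16:04:08 and 192.168.1.759 after 192.168.1.159, the same corrupted keys that appear in the first scan. Building the row key from key.toString(), or from a copy trimmed to getLength(), avoids the stale bytes.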
6. Example: HBase as a shared source