這篇文章主要介紹Storm-HBase接口怎麼用,文中介紹得非常詳細,具有一定的參考價值,感興趣的小夥伴們一定要看完!
10餘年的長洲網站建設經驗,針對設計、前端、開發、售後、文案、推廣等六對一服務,響應快,48小時及時工作處理。營銷型網站的優勢是能夠根據用戶設備顯示端的尺寸不同,自動調整長洲建站的顯示方式,使網站能夠適用不同顯示終端,在瀏覽器中調整網站的寬度,無論在任何一種瀏覽器上瀏覽網站,都能展現優雅布局與設計,從而最大程度地提升瀏覽體驗。創新互聯從事“長洲網站設計”,“長洲網站推廣”以來,每個客戶項目都認真落實執行。
package storm.contrib.hbase.bolts; import static backtype.storm.utils.Utils.tuple; import java.util.Map; import org.apache.hadoop.hbase.HBaseConfiguration; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import storm.contrib.hbase.utils.HBaseCommunicator; import storm.contrib.hbase.utils.HBaseConnector; /* 一個(gè)讀取Hbase的Bolt,不斷的從Hbase中讀取表中的行KEY,和列,通過tuples來發(fā)送 * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples */ public class HBaseColumnValueLookUpBolt implements IBasicBolt { private static final long serialVersionUID = 1L; private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null; private static transient HBaseConnector connector = null; private static transient HBaseConfiguration conf = null; private static transient HBaseCommunicator communicator = null; OutputCollector _collector; /* * Constructor initializes the variables storing the hbase table information and connects to hbase */ public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) { this.tableName = tableName; this.colFamilyName = colFamilyName; this.colName = colName; this.rowKeyField = rowKeyField; connector = new HBaseConnector(); conf = connector.getHBaseConf(hbaseXmlLocation); communicator = new HBaseCommunicator(conf); } /* * emits the value of the column with name @colName and rowkey @rowKey * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector) */ public void execute(Tuple input, BasicOutputCollector collector) { String rowKey = 
input.getStringByField(this.rowKeyField); columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName); collector.emit(tuple(rowKey, columnValue)); } public void prepare(Map confMap, TopologyContext context, OutputCollector collector) { _collector = collector; } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("rowKey", "columnValue")); } public MapgetComponentConfiguration() { Map map = null; return map; } public void prepare(Map stormConf, TopologyContext context) { } }
package storm.contrib.hbase.bolts; import static backtype.storm.utils.Utils.tuple; import java.util.Map; import org.apache.hadoop.hbase.HBaseConfiguration; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import storm.contrib.hbase.utils.HBaseCommunicator; import storm.contrib.hbase.utils.HBaseConnector; /* * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples */ public class HBaseColumnValueLookUpBolt implements IBasicBolt { private static final long serialVersionUID = 1L; private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null; private static transient HBaseConnector connector = null; private static transient HBaseConfiguration conf = null; private static transient HBaseCommunicator communicator = null; OutputCollector _collector; /* * Constructor initializes the variables storing the hbase table information and connects to hbase */ public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) { this.tableName = tableName; this.colFamilyName = colFamilyName; this.colName = colName; this.rowKeyField = rowKeyField; connector = new HBaseConnector(); conf = connector.getHBaseConf(hbaseXmlLocation); communicator = new HBaseCommunicator(conf); } /* * emits the value of the column with name @colName and rowkey @rowKey * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector) */ public void execute(Tuple input, BasicOutputCollector collector) { String rowKey = input.getStringByField(this.rowKeyField); //通過指定我們的 表名,行鍵,列族,列名,直接通過communitor拿到列的值。 
columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName); collector.emit(tuple(rowKey, columnValue)); } public void prepare(Map confMap, TopologyContext context, OutputCollector collector) { _collector = collector; } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("rowKey", "columnValue")); } public MapgetComponentConfiguration() { Map map = null; return map; } public void prepare(Map stormConf, TopologyContext context) { } }
Rowkey
package storm.contrib.hbase.spouts; import backtype.storm.topology.OutputFieldsDeclarer; import java.util.Map; import java.util.UUID; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.Random; import org.apache.log4j.Logger; /* 這個(gè)Spout主要是用來發(fā)射 Hbase的RowKey,rowkey的集合為自己設(shè)置的。 * Spout emitting tuples containing the rowkey of the hbase table */ public class RowKeyEmitterSpout implements IRichSpout { private static final long serialVersionUID = 6814162766489261607L; public static Logger LOG = Logger.getLogger(RowKeyEmitterSpout.class); boolean _isDistributed; SpoutOutputCollector _collector; public RowKeyEmitterSpout() { this(true); } public RowKeyEmitterSpout(boolean isDistributed) { _isDistributed = isDistributed; } public boolean isDistributed() { return _isDistributed; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void close() { } public void nextTuple() { Utils.sleep(100); Thread.yield(); final String[] words = new String[] {"rowKey1", "rowKey2", "rowKey3", "rowKey4"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word), UUID.randomUUID()); } public void ack(Object msgId) { } public void fail(Object msgId) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } public void activate() { } public void deactivate() { } public MapgetComponentConfiguration() { return null; } }
// 我們用來簡單測試系統的代碼,測試接口是否正確
package storm.contrib.hbase.spouts; import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class TestSpout implements IRichSpout { SpoutOutputCollector _collector; Random _rand; int count = 0; public boolean isDistributed() { return true; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); } public void nextTuple() { Utils.sleep(1000); String[] words = new String[] { "hello", "tiwari", "indore", "jayati"}; Integer[] numbers = new Integer[] { 1,2,3,4,5 }; if(count == numbers.length -1) { count = 0; } count ++; int number = numbers[count]; String word = words[count]; int randomNum = (int) (Math.random()*1000); _collector.emit(new Values(word, number)); } public void close() { } public void ack(Object id) { } public void fail(Object id) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "number")); } public void activate() { } public void deactivate() { } public MapgetComponentConfiguration() { return null; } }
比較簡單,也就不做解釋了,Storm-HBase的接口並沒有像Storm-Kafka的接口那樣,自身去處理輪詢,自身去處理連接的問題。只是簡單地構造了一個HBase的連接,在連接的過程之中,直接構造了一個Connector就可以了。
以上是“Storm-HBase接口怎麼用”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注創新互聯行業資訊頻道!