真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Storm-Hbase接口怎么用

這篇文章主要介紹Storm-Hbase接口怎么用,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

10余年的長洲網(wǎng)站建設(shè)經(jīng)驗(yàn),針對設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。營銷型網(wǎng)站的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動調(diào)整長洲建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)從事“長洲網(wǎng)站設(shè)計(jì)”,“長洲網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

package storm.contrib.hbase.bolts;

import static backtype.storm.utils.Utils.tuple;

import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import storm.contrib.hbase.utils.HBaseCommunicator;
import storm.contrib.hbase.utils.HBaseConnector;

/*

   一個(gè)讀取Hbase的Bolt,不斷的從Hbase中讀取表中的行KEY,和列,通過tuples來發(fā)送
 * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples
 */
public class HBaseColumnValueLookUpBolt implements IBasicBolt {
	
	private static final long serialVersionUID = 1L;
	
	private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null;
	
	private static transient HBaseConnector connector = null;
	private static transient HBaseConfiguration conf = null;
	private static transient HBaseCommunicator communicator = null;
	OutputCollector _collector;

	/*
	 * Constructor initializes the variables storing the hbase table information and connects to hbase
	 */
	public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {

		this.tableName = tableName;
		this.colFamilyName = colFamilyName;
		this.colName = colName;
		this.rowKeyField = rowKeyField;

		connector = new HBaseConnector();
		conf = connector.getHBaseConf(hbaseXmlLocation);
		communicator = new HBaseCommunicator(conf);
	}

	/*
	 * emits the value of the column with name @colName and rowkey @rowKey
	 * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
	 */
	public void execute(Tuple input, BasicOutputCollector collector) {

		String rowKey = input.getStringByField(this.rowKeyField);
		columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);
		collector.emit(tuple(rowKey, columnValue));
	}

	public void prepare(Map confMap, TopologyContext context,
			OutputCollector collector) {
		_collector = collector;
	}

	public void cleanup() {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("rowKey", "columnValue"));
	}

	public Map getComponentConfiguration() {
		Map map = null;
		return map;
	}

	public void prepare(Map stormConf, TopologyContext context) {
	}
}
package storm.contrib.hbase.bolts;

import static backtype.storm.utils.Utils.tuple;

import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import storm.contrib.hbase.utils.HBaseCommunicator;
import storm.contrib.hbase.utils.HBaseConnector;

/*
 * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples
 */
public class HBaseColumnValueLookUpBolt implements IBasicBolt {
	
	private static final long serialVersionUID = 1L;
	
	private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null;
	
	private static transient HBaseConnector connector = null;
	private static transient HBaseConfiguration conf = null;
	private static transient HBaseCommunicator communicator = null;
	OutputCollector _collector;

	/*
	 * Constructor initializes the variables storing the hbase table information and connects to hbase
	 */
	public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {

		this.tableName = tableName;
		this.colFamilyName = colFamilyName;
		this.colName = colName;
		this.rowKeyField = rowKeyField;

		connector = new HBaseConnector();
		conf = connector.getHBaseConf(hbaseXmlLocation);
		communicator = new HBaseCommunicator(conf);
	}

	/*
	 * emits the value of the column with name @colName and rowkey @rowKey
	 * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
	 */
	public void execute(Tuple input, BasicOutputCollector collector) {

		String rowKey = input.getStringByField(this.rowKeyField);
	
		//通過指定我們的 表名,行鍵,列族,列名,直接通過communitor拿到列的值。
		columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);
		collector.emit(tuple(rowKey, columnValue));
	}

	public void prepare(Map confMap, TopologyContext context,
			OutputCollector collector) {
		_collector = collector;
	}

	public void cleanup() {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("rowKey", "columnValue"));
	}

	public Map getComponentConfiguration() {
		Map map = null;
		return map;
	}

	public void prepare(Map stormConf, TopologyContext context) {
	}
}

   Rowkey 

package storm.contrib.hbase.spouts;

import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import java.util.UUID;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Random;
import org.apache.log4j.Logger;

/*

這個(gè)Spout主要是用來發(fā)射 Hbase的RowKey,rowkey的集合為自己設(shè)置的。
 * Spout emitting tuples containing the rowkey of the hbase table
 */
public class RowKeyEmitterSpout implements IRichSpout {
    
	private static final long serialVersionUID = 6814162766489261607L;
	public static Logger LOG = Logger.getLogger(RowKeyEmitterSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public RowKeyEmitterSpout() {
        this(true);
    }

    public RowKeyEmitterSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }
    
    public boolean isDistributed() {
        return _isDistributed;
    }
    
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
    
    public void close() {
    }
        
    public void nextTuple() {
        Utils.sleep(100);
        Thread.yield();
        final String[] words = new String[] {"rowKey1", "rowKey2", "rowKey3", "rowKey4"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word), UUID.randomUUID());
    }
    
    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

	public void activate() {
	}

	public void deactivate() {
	}

	public Map getComponentConfiguration() {
		return null;
	}
}

//  我們用來簡單的測試系統(tǒng)的代碼,測試接口是否正確

package storm.contrib.hbase.spouts;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class TestSpout implements IRichSpout {

	SpoutOutputCollector _collector;
	Random _rand;  
	int count = 0;


	public boolean isDistributed() {
		return true;
	}

	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		_collector = collector;
		_rand = new Random();
	}

	public void nextTuple() {
		Utils.sleep(1000);
		String[] words = new String[] { "hello", "tiwari", "indore", "jayati"};
		Integer[] numbers = new Integer[] {
				1,2,3,4,5
		};

		if(count == numbers.length -1) {
			count = 0;
		}
		count ++;
		int number = numbers[count];
		String word = words[count];
		int randomNum = (int) (Math.random()*1000);
		_collector.emit(new Values(word, number));
	}


	public void close() {        
	}


	public void ack(Object id) {
	}

	public void fail(Object id) {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "number"));
	}

	public void activate() {
	}

	public void deactivate() {
	}

	public Map getComponentConfiguration() {
		return null;
	}

}

    比較簡單,也就不做解釋了,Storm-hbase的接口并沒有像Storm-kafka的接口那樣,自身去處理輪詢,自身去處理連接的問題。只是簡單的構(gòu)造了一個(gè)Hbase的連接,在連接的過程之中,直接構(gòu)造了一個(gè)Connector就可以了。 

以上是“Storm-Hbase接口怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!


文章名稱:Storm-Hbase接口怎么用
標(biāo)題URL:http://weahome.cn/article/ihgjjs.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部