真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何進(jìn)行storm1.1.3與kafka1.0.0整合

本篇文章給大家分享的是有關(guān)如何進(jìn)行storm1.1.3與kafka1.0.0整合,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

創(chuàng)新互聯(lián)堅持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都做網(wǎng)站、網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時代的會同網(wǎng)站設(shè)計、移動媒體設(shè)計的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!

package hgs.core.sk;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
@SuppressWarnings("deprecation")
public class StormKafkaMainTest {
	
	public static void main(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
		//zookeeper鏈接地址
		BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181");
		//KafkaSpout需要一個config,參數(shù)代表的意義1:zookeeper鏈接,2:消費kafka的topic,3,4:記錄消費offset的zookeeper地址 ,這里會保存在 zookeeper
		//集群的/test7/consume下面
		SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume");
		//消費的時候忽略offset從頭開始消費,這里可以注釋掉,因為消費的offset在zookeeper中可以找到
		sconfig.ignoreZkOffsets=true;
		//sconfig.scheme = new SchemeAsMultiScheme( new StringScheme() );
		builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1);
		builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout");
		
     	Config config = new Config();
     	config.setNumWorkers(1);
     	try {
			StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology());
		} catch (Exception e) {
			e.printStackTrace();
		}
     	
 /*    	LocalCluster cu  = new LocalCluster();
     	cu.submitTopology("test", config, builder.createTopology());*/
	}
}
class  MyboltO extends  BaseRichBolt{
	private static final long serialVersionUID = 1L;
	OutputCollector collector = null;
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}
	public void execute(Tuple input) {
		//這里把消息大一出來,在對應(yīng)的woker下面的日志可以找到打印的內(nèi)容
		//因為得到的內(nèi)容是byte數(shù)組,所以需要轉(zhuǎn)換
		String out = new String((byte[])input.getValue(0));
		System.out.println(out);
		collector.ack(input);
		
	}
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
	}
	
	
}
pom.xml文件的依賴

  4.0.0
  hgs
  core.sk
  1.0.0-SNAPSHOT
  jar
  core.sk
  http://maven.apache.org
  
    UTF-8
  
  
    
      junit
      junit
      3.8.1
      test
    
    
    
    	org.apache.storm
    	storm-kafka
    	1.1.3
	
	
  		org.apache.storm
 		 storm-core
  		1.1.3
  		provided
	
	
    	org.apache.kafka
    	kafka_2.11
    	1.0.0
    
    		
          		org.slf4j
          		slf4j-log4j12
        	
        	
            	org.apache.zookeeper
            	zookeeper
       		
    	
	
	


	
	
	    org.clojure
	    clojure
	    1.7.0
	
	
	
	    org.apache.kafka
	    kafka-clients
	    1.0.0
	
	
 
  
  
  
  
        
            
                maven-assembly-plugin
                2.2
                
                    
                        
                            
                            hgs.core.sk.StormKafkaMainTest
                        
                    
                    
                        
                            
                            jar-with-dependencies
                        
                    
                
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
            
             
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    1.8
                    1.8
                
            
        
    

以上就是如何進(jìn)行storm1.1.3與kafka1.0.0整合,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降摹OM隳芡ㄟ^這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


本文題目:如何進(jìn)行storm1.1.3與kafka1.0.0整合
標(biāo)題路徑:http://weahome.cn/article/ggeioe.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部