本篇文章給大家分享的是有關(guān)如何進(jìn)行storm1.1.3與kafka1.0.0整合,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
創(chuàng)新互聯(lián)堅持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都做網(wǎng)站、網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時代的會同網(wǎng)站設(shè)計、移動媒體設(shè)計的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
package hgs.core.sk; import java.util.Map; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.kafka.BrokerHosts; import org.apache.storm.kafka.KafkaSpout; import org.apache.storm.kafka.SpoutConfig; import org.apache.storm.kafka.ZkHosts; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; @SuppressWarnings("deprecation") public class StormKafkaMainTest { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); //zookeeper鏈接地址 BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181"); //KafkaSpout需要一個config,參數(shù)代表的意義1:zookeeper鏈接,2:消費kafka的topic,3,4:記錄消費offset的zookeeper地址 ,這里會保存在 zookeeper //集群的/test7/consume下面 SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume"); //消費的時候忽略offset從頭開始消費,這里可以注釋掉,因為消費的offset在zookeeper中可以找到 sconfig.ignoreZkOffsets=true; //sconfig.scheme = new SchemeAsMultiScheme( new StringScheme() ); builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1); builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout"); Config config = new Config(); config.setNumWorkers(1); try { StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } /* LocalCluster cu = new LocalCluster(); cu.submitTopology("test", config, builder.createTopology());*/ } } class MyboltO extends BaseRichBolt{ private static final long serialVersionUID = 1L; OutputCollector collector = null; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { //這里把消息大一出來,在對應(yīng)的woker下面的日志可以找到打印的內(nèi)容 //因為得到的內(nèi)容是byte數(shù)組,所以需要轉(zhuǎn)換 String out = new String((byte[])input.getValue(0)); System.out.println(out); collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
pom.xml文件的依賴
4.0.0 hgs core.sk 1.0.0-SNAPSHOT jar core.sk http://maven.apache.org UTF-8 junit junit 3.8.1 test org.apache.storm storm-kafka 1.1.3 org.apache.storm storm-core 1.1.3 provided org.apache.kafka kafka_2.11 1.0.0 org.slf4j slf4j-log4j12 org.apache.zookeeper zookeeper org.clojure clojure 1.7.0 org.apache.kafka kafka-clients 1.0.0 maven-assembly-plugin 2.2 hgs.core.sk.StormKafkaMainTest jar-with-dependencies make-assembly package single org.apache.maven.plugins maven-compiler-plugin 1.8
以上就是如何進(jìn)行storm1.1.3與kafka1.0.0整合,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降摹OM隳芡ㄟ^這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。