這篇文章將為大家詳細(xì)講解有關(guān)Kafka簡(jiǎn)單客戶端編程的示例分析,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
成都創(chuàng)新互聯(lián)專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、鄉(xiāng)寧網(wǎng)絡(luò)推廣、小程序設(shè)計(jì)、鄉(xiāng)寧網(wǎng)絡(luò)營(yíng)銷、鄉(xiāng)寧企業(yè)策劃、鄉(xiāng)寧品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們最大的嘉獎(jiǎng);成都創(chuàng)新互聯(lián)為所有大學(xué)生創(chuàng)業(yè)者提供鄉(xiāng)寧建站搭建服務(wù),24小時(shí)服務(wù)熱線:028-86922220,官方網(wǎng)址:www.cdcxhl.com
一、創(chuàng)建配置類Config
這個(gè)類很簡(jiǎn)單,只是存放了兩個(gè)常量,一個(gè)是話題TOPIC,一個(gè)是線程數(shù)THREADS
package com.lya.kafka; /** * 配置項(xiàng) * @author liuyazhuang * */ public class Config { /** * 話題 */ public static final String TOPIC = "wordcount"; /** * 線程數(shù) */ public static final Integer THREADS = 1; }
二、編程生產(chǎn)者類ProducerDemo
這個(gè)類的主要作用就是向Kafka寫入相應(yīng)的消息,并且將消息寫入wordcount話題。
package com.lya.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 生產(chǎn)者實(shí)例 * @author liuyazhuang * */ public class ProducerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("zk.connect", "192.168.209.121:2181"); props.put("metadata.broker.list","192.168.209.121:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("zk.connectiontimeout.ms", "15000"); ProducerConfig config = new ProducerConfig(props); Producerproducer = new Producer (config); // 發(fā)送業(yè)務(wù)消息 // 讀取文件 讀取內(nèi)存數(shù)據(jù)庫 讀socket端口 for (int i = 1; i <= 100; i++) { Thread.sleep(500); producer.send(new KeyedMessage (Config.TOPIC, "this number ===>>> " + i)); } } }
三、編寫消息者類ConsumerDemo
這個(gè)類的主要作用就是消費(fèi)Kafka中wordcount話題的消息。
package com.lya.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; /** * 消費(fèi)者實(shí)例 * @author liuyazhuang * */ public class ConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("zookeeper.connect", "192.168.209.121:2181"); props.put("group.id", "1111"); props.put("auto.offset.reset", "smallest"); props.put("zk.connectiontimeout.ms", "15000"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config); MaptopicCountMap = new HashMap (); topicCountMap.put(Config.TOPIC, Config.THREADS); Map >> consumerMap = consumer.createMessageStreams(topicCountMap); List > streams = consumerMap.get(Config.TOPIC); for(final KafkaStream kafkaStream : streams){ new Thread(new Runnable() { @Override public void run() { for(MessageAndMetadata mm : kafkaStream){ String msg = new String(mm.message()); System.out.println(msg); } } }).start(); } } }
四、運(yùn)行實(shí)例
首先,運(yùn)行消費(fèi)者類ConsumerDemo
運(yùn)行結(jié)果如下:
沒有打印任何信息。
此時(shí),我們運(yùn)行生產(chǎn)者類ProducerDemo
我們?cè)俅未蜷_消費(fèi)者的控制臺(tái)查看如下:
打印出了生產(chǎn)者生產(chǎn)的消息。
至此,Kafka簡(jiǎn)單客戶端編程實(shí)例結(jié)束。
關(guān)于“Kafka簡(jiǎn)單客戶端編程的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。