這篇文章主要講解了“如何使用kafka技術(shù)”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“如何使用kafka技術(shù)”吧!
恩施土家網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián),恩施土家網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為恩施土家成百上千提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站制作要多少錢,請找那個售后服務(wù)好的恩施土家做網(wǎng)站的公司定做!
1)在eclipse中創(chuàng)建一個java工程
2)在工程的根目錄創(chuàng)建一個lib文件夾
3)解壓kafka安裝包,將安裝包libs目錄下的jar包拷貝到工程的lib目錄下,并build path。
4)啟動zk和kafka集群,在kafka集群中打開一個消費者
[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
package com.root.kafka;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;public class OldProducer { @SuppressWarnings("deprecation") public static void main(String[] args) { Properties properties = new Properties(); properties.put("metadata.broker.list", "hadoop102:9092"); properties.put("request.required.acks", "1"); properties.put("serializer.class", "kafka.serializer.StringEncoder"); Producerproducer = new Producer (new ProducerConfig(properties)); KeyedMessage message = new KeyedMessage ("first", "hello world"); producer.send(message ); }}
package com.root.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;public class NewProducer { public static void main(String[] args) { Properties props = new Properties(); // Kafka服務(wù)端的主機名和端口號 props.put("bootstrap.servers", "hadoop103:9092"); // 等待所有副本節(jié)點的應(yīng)答 props.put("acks", "all"); // 消息發(fā)送最大嘗試次數(shù) props.put("retries", 0); // 一批消息處理大小 props.put("batch.size", 16384); // 請求延時 props.put("linger.ms", 1); // 發(fā)送緩存區(qū)內(nèi)存大小 props.put("buffer.memory", 33554432); // key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); for (int i = 0; i < 50; i++) { producer.send(new ProducerRecord ("first", Integer.toString(i), "hello world-" + i)); } producer.close(); }}
package com.root.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class CallBackProducer { public static void main(String[] args) {Properties props = new Properties(); // Kafka服務(wù)端的主機名和端口號 props.put("bootstrap.servers", "hadoop103:9092"); // 等待所有副本節(jié)點的應(yīng)答 props.put("acks", "all"); // 消息發(fā)送最大嘗試次數(shù) props.put("retries", 0); // 一批消息處理大小 props.put("batch.size", 16384); // 增加服務(wù)端請求延時 props.put("linger.ms", 1);// 發(fā)送緩存區(qū)內(nèi)存大小 props.put("buffer.memory", 33554432); // key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducerkafkaProducer = new KafkaProducer<>(props); for (int i = 0; i < 50; i++) { kafkaProducer.send(new ProducerRecord ("first", "hello" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (metadata != null) { System.err.println(metadata.partition() + "---" + metadata.offset()); } } }); } kafkaProducer.close(); }}
0)需求:將所有數(shù)據(jù)存儲到topic的第0號分區(qū)上
1)定義一個類實現(xiàn)Partitioner接口,重寫里面的方法(過時API)
package com.root.kafka;import java.util.Map;import kafka.producer.Partitioner;public class CustomPartitioner implements Partitioner { public CustomPartitioner() { super(); } @Override public int partition(Object key, int numPartitions) { // 控制分區(qū) return 0; }}
2)自定義分區(qū)(新API)
package com.root.kafka;import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;public class CustomPartitioner implements Partitioner { @Override public void configure(Mapconfigs) { } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 控制分區(qū) return 0; } @Override public void close() { }}
3)在代碼中調(diào)用
package com.root.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;public class PartitionerProducer { public static void main(String[] args) { Properties props = new Properties(); // Kafka服務(wù)端的主機名和端口號 props.put("bootstrap.servers", "hadoop103:9092"); // 等待所有副本節(jié)點的應(yīng)答 props.put("acks", "all"); // 消息發(fā)送最大嘗試次數(shù) props.put("retries", 0); // 一批消息處理大小 props.put("batch.size", 16384); // 增加服務(wù)端請求延時 props.put("linger.ms", 1); // 發(fā)送緩存區(qū)內(nèi)存大小 props.put("buffer.memory", 33554432); // key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 自定義分區(qū) props.put("partitioner.class", "com.root.kafka.CustomPartitioner"); Producerproducer = new KafkaProducer<>(props); producer.send(new ProducerRecord ("first", "1", "root")); producer.close(); }}
4)測試
(1)在hadoop102上監(jiān)控/opt/module/kafka/logs/目錄下first主題3個分區(qū)的log日志動態(tài)變化情況
[root@hadoop102 first-0]$ tail -f 00000000000000000000.log[root@hadoop102 first-1]$ tail -f 00000000000000000000.log[root@hadoop102 first-2]$ tail -f 00000000000000000000.log
(2)發(fā)現(xiàn)數(shù)據(jù)都存儲到指定的分區(qū)了。
0)在控制臺創(chuàng)建發(fā)送者
[root@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
1)創(chuàng)建消費者(過時API)
package com.root.kafka.consume;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;public class CustomConsumer { @SuppressWarnings("deprecation") public static void main(String[] args) { Properties properties = new Properties(); properties.put("zookeeper.connect", "hadoop102:2181"); properties.put("group.id", "g1"); properties.put("zookeeper.session.timeout.ms", "500"); properties.put("zookeeper.sync.time.ms", "250"); properties.put("auto.commit.interval.ms", "1000"); // 創(chuàng)建消費者連接器 ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); HashMaptopicCount = new HashMap<>(); topicCount.put("first", 1); Map >> consumerMap = consumer.createMessageStreams(topicCount); KafkaStream stream = consumerMap.get("first").get(0); ConsumerIterator it = stream.iterator(); while (it.hasNext()) { System.out.println(new String(it.next().message())); } }}
2)官方提供案例(自動維護消費情況)(新API)
package com.root.kafka.consume; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class CustomNewConsumer { public static void main(String[] args) { Properties props = new Properties(); // 定義kakfa 服務(wù)的地址,不需要將所有broker指定上 props.put("bootstrap.servers", "hadoop102:9092"); // 制定consumer group props.put("group.id", "test"); // 是否自動確認offset props.put("enable.auto.commit", "true"); // 自動確認offset的時間間隔 props.put("auto.commit.interval.ms", "1000"); // key的序列化類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的序列化類 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 定義consumer KafkaConsumerconsumer = new KafkaConsumer<>(props); // 消費者訂閱的topic, 可同時訂閱多個 consumer.subscribe(Arrays.asList("first", "second","third")); while (true) { // 讀取數(shù)據(jù),讀取超時時間為100ms ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }`
Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實現(xiàn)clients端的定制化控制邏輯。
對于producer而言,interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:
(1)configure(configs)
獲取配置信息和初始化數(shù)據(jù)時調(diào)用。
(2)onSend(ProducerRecord):
該方法封裝進KafkaProducer.send方法中,即它運行在用戶主線程中。Producer確保在消息被序列化以及計算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標(biāo)分區(qū)的計算
(3)onAcknowledgement(RecordMetadata, Exception):
該方法會在消息被應(yīng)答或消息發(fā)送失敗時調(diào)用,并且通常都是在producer回調(diào)邏輯觸發(fā)之前。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發(fā)送效率
(4)close:
關(guān)閉interceptor,主要用于執(zhí)行一些資源清理工作
如前所述,interceptor可能被運行在多個線程中,因此在具體實現(xiàn)時用戶需要自行確保線程安全。另外倘若指定了多個interceptor,則producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特別留意。
1)需求:
實現(xiàn)一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在消息發(fā)送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù)。
2)案例實操
(1)增加時間戳攔截器
package com.root.kafka.interceptor; import java.util.Map; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class TimeInterceptor implements ProducerInterceptor{ @Override public void configure(Map configs) { } @Override public ProducerRecord onSend(ProducerRecord record) { // 創(chuàng)建一個新的record,把時間戳寫入消息體的最前部 return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + ",">(2)統(tǒng)計發(fā)送消息成功和發(fā)送失敗消息數(shù),并在producer關(guān)閉時打印這兩個計數(shù)器package com.root.kafka.interceptor; import java.util.Map; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class CounterInterceptor implements ProducerInterceptor { private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map configs) { } @Override public ProducerRecord onSend(ProducerRecord record) { return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 統(tǒng)計成功和失敗的次數(shù) if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存結(jié)果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } } (3)producer主程序package com.root.kafka.interceptor; import java.util.ArrayList; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; public class InterceptorProducer { public static void main(String[] args) throws Exception { // 1 設(shè)置配置信息 Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2 構(gòu)建攔截鏈 List interceptors = new ArrayList<>(); interceptors.add("com.root.kafka.interceptor.TimeInterceptor"); interceptors.add("com.root.kafka.interceptor.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); String topic = "first"; Producer producer = new KafkaProducer<>(props); // 3 發(fā)送消息 for (int i = 0; i < 10; i++) { ProducerRecord record = new ProducerRecord<>(topic, "message" + i); producer.send(record); } // 4 一定要關(guān)閉producer,這樣才會調(diào)用interceptor的close方法 producer.close(); } } 3)測試(1)在kafka上啟動消費者,然后運行客戶端java程序。[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic first 1501904047034,message0 1501904047225,message1 1501904047230,message2 1501904047234,message3 1501904047236,message4 1501904047240,message5 1501904047243,message6 1501904047246,message7 1501904047249,message8 1501904047252,message9 (2)觀察java平臺控制臺輸出數(shù)據(jù)如下:Successful sent: 10 Failed sent: 0 kafka Streams 概述 Kafka StreamsKafka Streams。Apache Kafka開源項目的一個組成部分。是一個功能強大,易于使用的庫。用于在Kafka上構(gòu)建高可分布式、拓展性,容錯的應(yīng)用程序。 Kafka Streams特點1)功能強大高擴展性,彈性,容錯2)輕量級無需專門的集群一個庫,而不是框架3)完全集成100%的Kafka 0.10.0版本兼容易于集成到現(xiàn)有的應(yīng)用程序4)實時性毫秒級延遲并非微批處理窗口允許亂序數(shù)據(jù)允許遲到數(shù)據(jù) 為什么要有Kafka Stream當(dāng)前已經(jīng)有非常多的流式處理系統(tǒng),最知名且應(yīng)用最多的開源流式處理系統(tǒng)有Spark Streaming和Apache Storm。Apache Storm發(fā)展多年,應(yīng)用廣泛,提供記錄級別的處理能力,當(dāng)前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對于熟悉其它Spark應(yīng)用開發(fā)的用戶而言使用門檻低。另外,目前主流的Hadoop發(fā)行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。既然Apache Spark與Apache Storm擁有如此多的優(yōu)勢,那為何還需要Kafka Stream呢?主要有如下原因。第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基于Kafka的流式處理類庫??蚣芤箝_發(fā)者按照特定的方式去開發(fā)邏輯部分,供框架調(diào)用。開發(fā)者很難了解框架的具體運行方式,從而使得調(diào)試成本高,并且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發(fā)者調(diào)用,整個應(yīng)用的運行方式主要由開發(fā)者控制,方便使用和調(diào)試。 第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復(fù)雜。而Kafka Stream作為類庫,可以非常方便的嵌入應(yīng)用程序中,它對應(yīng)用的打包和部署基本沒有任何要求。第三,就流式處理系統(tǒng)而言,基本都支持Kafka作為數(shù)據(jù)源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統(tǒng)的標(biāo)準(zhǔn)數(shù)據(jù)源。換言之,大部分流式系統(tǒng)中都已部署了Kafka,此時使用Kafka Stream的成本非常低。第四,使用Storm或Spark Streaming時,需要為框架本身的進程預(yù)留資源,如Storm的supervisor和Spark on YARN的node manager。即使對于應(yīng)用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預(yù)留內(nèi)存。但是Kafka作為類庫不占用系統(tǒng)資源。第五,由于Kafka本身提供數(shù)據(jù)持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。第六,由于Kafka Consumer Rebalance機制,Kafka Stream可以在線動態(tài)調(diào)整并行度。 Kafka Stream數(shù)據(jù)清洗案例0)需求:實時處理單詞帶有”>>>”前綴的內(nèi)容。例如輸入”root>>>ximenqing”,最終處理成“ximenqing”1)需求分析: 2)案例實操(1)創(chuàng)建一個工程,并添加jar包(2)創(chuàng)建主類package com.root.kafka.stream; import java.util.Properties; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; public class Application { public static void main(String[] args) { // 定義輸入的topic String from = "first"; // 定義輸出的topic String to = "second"; // 設(shè)置參數(shù) Properties settings = new Properties(); settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter"); settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); StreamsConfig config = new StreamsConfig(settings); // 構(gòu)建拓撲 TopologyBuilder builder = new TopologyBuilder(); builder.addSource("SOURCE", from) .addProcessor("PROCESS", new ProcessorSupplier () { @Override public Processor get() { // 具體分析處理 return new LogProcessor(); } }, "SOURCE") .addSink("SINK", to, "PROCESS"); // 創(chuàng)建kafka stream KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); } } (3)具體業(yè)務(wù)處理package com.root.kafka.stream;import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorContext;public class LogProcessor implements Processor { private ProcessorContext context; @Override public void init(ProcessorContext context) { this.context = context; } @Override public void process(byte[] key, byte[] value) { String input = new String(value); // 如果包含“>>>”則只保留該標(biāo)記后面的內(nèi)容 if (input.contains(">>>")) { input = input.split(">>>")[1].trim(); // 輸出到下一個topic context.forward("logProcessor".getBytes(), input.getBytes()); }else{ context.forward("logProcessor".getBytes(), input.getBytes()); } } @Override public void punctuate(long timestamp) { } @Override public void close() { }} (4)運行程序(5)在hadoop104上啟動生產(chǎn)者[root@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first>hello>>>world>h>>>root>hahaha (6)在hadoop103上啟動消費者[root@hadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic secondworldroothahaha
感謝各位的閱讀,以上就是“如何使用kafka技術(shù)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對如何使用kafka技術(shù)這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!