真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何使用kafka技術(shù)

這篇文章主要講解了“如何使用kafka技術(shù)”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“如何使用kafka技術(shù)”吧!

恩施土家網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián),恩施土家網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為恩施土家成百上千提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站制作要多少錢,請找那個售后服務(wù)好的恩施土家做網(wǎng)站的公司定做!

環(huán)境準(zhǔn)備

1)在eclipse中創(chuàng)建一個java工程

2)在工程的根目錄創(chuàng)建一個lib文件夾

3)解壓kafka安裝包,將安裝包libs目錄下的jar包拷貝到工程的lib目錄下,并build path。

4)啟動zk和kafka集群,在kafka集群中打開一個消費者

[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first

Kafka生產(chǎn)者Java API

創(chuàng)建生產(chǎn)者(過時的API)

package com.root.kafka;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;public class OldProducer {    @SuppressWarnings("deprecation")    public static void main(String[] args) {                Properties properties = new Properties();        properties.put("metadata.broker.list", "hadoop102:9092");        properties.put("request.required.acks", "1");        properties.put("serializer.class", "kafka.serializer.StringEncoder");                Producer producer = new Producer(new ProducerConfig(properties));                KeyedMessage message = new KeyedMessage("first", "hello world");        producer.send(message );    }}

4.2.2 創(chuàng)建生產(chǎn)者(新API**)

package com.root.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;public class NewProducer {    public static void main(String[] args) {                Properties props = new Properties();        // Kafka服務(wù)端的主機名和端口號        props.put("bootstrap.servers", "hadoop103:9092");        // 等待所有副本節(jié)點的應(yīng)答        props.put("acks", "all");        // 消息發(fā)送最大嘗試次數(shù)        props.put("retries", 0);        // 一批消息處理大小        props.put("batch.size", 16384);        // 請求延時        props.put("linger.ms", 1);        // 發(fā)送緩存區(qū)內(nèi)存大小        props.put("buffer.memory", 33554432);        // key序列化        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        // value序列化        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        Producer producer = new KafkaProducer<>(props);        for (int i = 0; i < 50; i++) {            producer.send(new ProducerRecord("first", Integer.toString(i), "hello world-" + i));        }        producer.close();    }}

創(chuàng)建生產(chǎn)者帶回調(diào)函數(shù)(新API)

package com.root.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class CallBackProducer {    public static void main(String[] args) {Properties props = new Properties();        // Kafka服務(wù)端的主機名和端口號        props.put("bootstrap.servers", "hadoop103:9092");        // 等待所有副本節(jié)點的應(yīng)答        props.put("acks", "all");        // 消息發(fā)送最大嘗試次數(shù)        props.put("retries", 0);        // 一批消息處理大小        props.put("batch.size", 16384);        // 增加服務(wù)端請求延時        props.put("linger.ms", 1);// 發(fā)送緩存區(qū)內(nèi)存大小        props.put("buffer.memory", 33554432);        // key序列化        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        // value序列化        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        KafkaProducer kafkaProducer = new KafkaProducer<>(props);        for (int i = 0; i < 50; i++) {            kafkaProducer.send(new ProducerRecord("first", "hello" + i), new Callback() {                @Override                public void onCompletion(RecordMetadata metadata, Exception exception) {                    if (metadata != null) {                        System.err.println(metadata.partition() + "---" + metadata.offset());                    }                }            });        }        kafkaProducer.close();    }}

4.2.4 自定義分區(qū)生產(chǎn)者

0)需求:將所有數(shù)據(jù)存儲到topic的第0號分區(qū)上

1)定義一個類實現(xiàn)Partitioner接口,重寫里面的方法(過時API)

package com.root.kafka;import java.util.Map;import kafka.producer.Partitioner;public class CustomPartitioner implements Partitioner {    public CustomPartitioner() {        super();    }    @Override    public int partition(Object key, int numPartitions) {        // 控制分區(qū)        return 0;    }}

2)自定義分區(qū)(新API)

package com.root.kafka;import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;public class CustomPartitioner implements Partitioner {    @Override    public void configure(Map configs) {            }    @Override    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {        // 控制分區(qū)        return 0;    }    @Override    public void close() {            }}

3)在代碼中調(diào)用

package com.root.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;public class PartitionerProducer {    public static void main(String[] args) {                Properties props = new Properties();        // Kafka服務(wù)端的主機名和端口號        props.put("bootstrap.servers", "hadoop103:9092");        // 等待所有副本節(jié)點的應(yīng)答        props.put("acks", "all");        // 消息發(fā)送最大嘗試次數(shù)        props.put("retries", 0);        // 一批消息處理大小        props.put("batch.size", 16384);        // 增加服務(wù)端請求延時        props.put("linger.ms", 1);        // 發(fā)送緩存區(qū)內(nèi)存大小        props.put("buffer.memory", 33554432);        // key序列化        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        // value序列化        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        // 自定義分區(qū)        props.put("partitioner.class", "com.root.kafka.CustomPartitioner");        Producer producer = new KafkaProducer<>(props);        producer.send(new ProducerRecord("first", "1", "root"));        producer.close();    }}

4)測試

(1)在hadoop102上監(jiān)控/opt/module/kafka/logs/目錄下first主題3個分區(qū)的log日志動態(tài)變化情況

[root@hadoop102 first-0]$ tail -f 00000000000000000000.log[root@hadoop102 first-1]$ tail -f 00000000000000000000.log[root@hadoop102 first-2]$ tail -f 00000000000000000000.log

(2)發(fā)現(xiàn)數(shù)據(jù)都存儲到指定的分區(qū)了。

Kafka消費者Java API

0)在控制臺創(chuàng)建發(fā)送者

[root@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first

>hello world

1)創(chuàng)建消費者(過時API)

package com.root.kafka.consume;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;public class CustomConsumer {    @SuppressWarnings("deprecation")    public static void main(String[] args) {        Properties properties = new Properties();                properties.put("zookeeper.connect", "hadoop102:2181");        properties.put("group.id", "g1");        properties.put("zookeeper.session.timeout.ms", "500");        properties.put("zookeeper.sync.time.ms", "250");        properties.put("auto.commit.interval.ms", "1000");                // 創(chuàng)建消費者連接器        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));                HashMap topicCount = new HashMap<>();        topicCount.put("first", 1);                Map>> consumerMap = consumer.createMessageStreams(topicCount);                KafkaStream stream = consumerMap.get("first").get(0);                ConsumerIterator it = stream.iterator();                while (it.hasNext()) {            System.out.println(new String(it.next().message()));        }    }}

2)官方提供案例(自動維護消費情況)(新API)

package com.root.kafka.consume;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

	public static void main(String[] args) {

		Properties props = new Properties();
		// 定義kakfa 服務(wù)的地址,不需要將所有broker指定上 
		props.put("bootstrap.servers", "hadoop102:9092");
		// 制定consumer group 
		props.put("group.id", "test");
		// 是否自動確認offset 
		props.put("enable.auto.commit", "true");
		// 自動確認offset的時間間隔 
		props.put("auto.commit.interval.ms", "1000");
		// key的序列化類
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		// value的序列化類 
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		// 定義consumer 
		KafkaConsumer consumer = new KafkaConsumer<>(props);
		
		// 消費者訂閱的topic, 可同時訂閱多個 
		consumer.subscribe(Arrays.asList("first", "second","third"));

		while (true) {
			// 讀取數(shù)據(jù),讀取超時時間為100ms 
			ConsumerRecords records = consumer.poll(100);
			
			for (ConsumerRecord record : records)
				System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
		}
	}
}`

Kafka producer攔截器(interceptor)

攔截器原理

Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實現(xiàn)clients端的定制化控制邏輯。

對于producer而言,interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

(1)configure(configs)

獲取配置信息和初始化數(shù)據(jù)時調(diào)用。

(2)onSend(ProducerRecord):

該方法封裝進KafkaProducer.send方法中,即它運行在用戶主線程中。Producer確保在消息被序列化以及計算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標(biāo)分區(qū)的計算

(3)onAcknowledgement(RecordMetadata, Exception):

該方法會在消息被應(yīng)答或消息發(fā)送失敗時調(diào)用,并且通常都是在producer回調(diào)邏輯觸發(fā)之前。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發(fā)送效率

(4)close:

關(guān)閉interceptor,主要用于執(zhí)行一些資源清理工作

如前所述,interceptor可能被運行在多個線程中,因此在具體實現(xiàn)時用戶需要自行確保線程安全。另外倘若指定了多個interceptor,則producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特別留意。

攔截器案例

如何使用kafka技術(shù)

1)需求:

實現(xiàn)一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在消息發(fā)送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù)。

2)案例實操

(1)增加時間戳攔截器

package com.root.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeInterceptor implements ProducerInterceptor {

	@Override
	public void configure(Map configs) {

	}

	@Override
	public ProducerRecord onSend(ProducerRecord record) {
		// 創(chuàng)建一個新的record,把時間戳寫入消息體的最前部
		return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
				System.currentTimeMillis() + ",">(2)統(tǒng)計發(fā)送消息成功和發(fā)送失敗消息數(shù),并在producer關(guān)閉時打印這兩個計數(shù)器package com.root.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor{
    private int errorCounter = 0;
    private int successCounter = 0;

	@Override
	public void configure(Map configs) {
		
	}

	@Override
	public ProducerRecord onSend(ProducerRecord record) {
		 return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
		// 統(tǒng)計成功和失敗的次數(shù)
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
	}

	@Override
	public void close() {
        // 保存結(jié)果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
	}
} (3)producer主程序package com.root.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptorProducer {

	public static void main(String[] args) throws Exception {
		// 1 設(shè)置配置信息
		Properties props = new Properties();
		props.put("bootstrap.servers", "hadoop102:9092");
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("linger.ms", 1);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		// 2 構(gòu)建攔截鏈
		List interceptors = new ArrayList<>();
		interceptors.add("com.root.kafka.interceptor.TimeInterceptor"); 	interceptors.add("com.root.kafka.interceptor.CounterInterceptor"); 
		props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
		 
		String topic = "first";
		Producer producer = new KafkaProducer<>(props);
		
		// 3 發(fā)送消息
		for (int i = 0; i < 10; i++) {
			
		    ProducerRecord record = new ProducerRecord<>(topic, "message" + i);
		    producer.send(record);
		}
		 
		// 4 一定要關(guān)閉producer,這樣才會調(diào)用interceptor的close方法
		producer.close();
	}
} 3)測試(1)在kafka上啟動消費者,然后運行客戶端java程序。[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic first
1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9 (2)觀察java平臺控制臺輸出數(shù)據(jù)如下:Successful sent: 10
Failed sent: 0  kafka Streams 概述 Kafka StreamsKafka Streams。Apache Kafka開源項目的一個組成部分。是一個功能強大,易于使用的庫。用于在Kafka上構(gòu)建高可分布式、拓展性,容錯的應(yīng)用程序。 Kafka Streams特點1)功能強大高擴展性,彈性,容錯2)輕量級無需專門的集群一個庫,而不是框架3)完全集成100%的Kafka 0.10.0版本兼容易于集成到現(xiàn)有的應(yīng)用程序4)實時性毫秒級延遲并非微批處理窗口允許亂序數(shù)據(jù)允許遲到數(shù)據(jù) 為什么要有Kafka Stream當(dāng)前已經(jīng)有非常多的流式處理系統(tǒng),最知名且應(yīng)用最多的開源流式處理系統(tǒng)有Spark Streaming和Apache Storm。Apache Storm發(fā)展多年,應(yīng)用廣泛,提供記錄級別的處理能力,當(dāng)前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對于熟悉其它Spark應(yīng)用開發(fā)的用戶而言使用門檻低。另外,目前主流的Hadoop發(fā)行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。既然Apache Spark與Apache Storm擁有如此多的優(yōu)勢,那為何還需要Kafka Stream呢?主要有如下原因。第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基于Kafka的流式處理類庫??蚣芤箝_發(fā)者按照特定的方式去開發(fā)邏輯部分,供框架調(diào)用。開發(fā)者很難了解框架的具體運行方式,從而使得調(diào)試成本高,并且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發(fā)者調(diào)用,整個應(yīng)用的運行方式主要由開發(fā)者控制,方便使用和調(diào)試。 第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復(fù)雜。而Kafka Stream作為類庫,可以非常方便的嵌入應(yīng)用程序中,它對應(yīng)用的打包和部署基本沒有任何要求。第三,就流式處理系統(tǒng)而言,基本都支持Kafka作為數(shù)據(jù)源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統(tǒng)的標(biāo)準(zhǔn)數(shù)據(jù)源。換言之,大部分流式系統(tǒng)中都已部署了Kafka,此時使用Kafka Stream的成本非常低。第四,使用Storm或Spark Streaming時,需要為框架本身的進程預(yù)留資源,如Storm的supervisor和Spark on YARN的node manager。即使對于應(yīng)用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預(yù)留內(nèi)存。但是Kafka作為類庫不占用系統(tǒng)資源。第五,由于Kafka本身提供數(shù)據(jù)持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。第六,由于Kafka Consumer Rebalance機制,Kafka Stream可以在線動態(tài)調(diào)整并行度。 Kafka Stream數(shù)據(jù)清洗案例0)需求:實時處理單詞帶有”>>>”前綴的內(nèi)容。例如輸入”root>>>ximenqing”,最終處理成“ximenqing”1)需求分析: 2)案例實操(1)創(chuàng)建一個工程,并添加jar包(2)創(chuàng)建主類package com.root.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class Application {

	public static void main(String[] args) {

		// 定義輸入的topic
        String from = "first";
        // 定義輸出的topic
        String to = "second";

        // 設(shè)置參數(shù)
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        StreamsConfig config = new StreamsConfig(settings);

        // 構(gòu)建拓撲
        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource("SOURCE", from)
               .addProcessor("PROCESS", new ProcessorSupplier() {

					@Override
					public Processor get() {
						// 具體分析處理
						return new LogProcessor();
					}
				}, "SOURCE")
                .addSink("SINK", to, "PROCESS");

        // 創(chuàng)建kafka stream
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
	}
} (3)具體業(yè)務(wù)處理package com.root.kafka.stream;import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorContext;public class LogProcessor implements Processor {        private ProcessorContext context;        @Override    public void init(ProcessorContext context) {        this.context = context;    }    @Override    public void process(byte[] key, byte[] value) {        String input = new String(value);                // 如果包含“>>>”則只保留該標(biāo)記后面的內(nèi)容        if (input.contains(">>>")) {            input = input.split(">>>")[1].trim();            // 輸出到下一個topic            context.forward("logProcessor".getBytes(), input.getBytes());        }else{            context.forward("logProcessor".getBytes(), input.getBytes());        }    }    @Override    public void punctuate(long timestamp) {            }    @Override    public void close() {            }} (4)運行程序(5)在hadoop104上啟動生產(chǎn)者[root@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first>hello>>>world>h>>>root>hahaha (6)在hadoop103上啟動消費者[root@hadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic secondworldroothahaha

感謝各位的閱讀,以上就是“如何使用kafka技術(shù)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對如何使用kafka技術(shù)這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!


網(wǎng)站欄目:如何使用kafka技術(shù)
本文路徑:http://weahome.cn/article/iieejs.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部