Apache Kafka框架是怎樣的呢,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。
網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、微信小程序、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了孝義免費(fèi)建站歡迎大家使用!
下面將對(duì)Kafka做一個(gè)簡(jiǎn)單的描述。
Kafka是Apache下的一個(gè)用于處理數(shù)據(jù)流的分布式消息框架,它擁有水平擴(kuò)展、容錯(cuò)、高效等特性,可以使用該框架來(lái)實(shí)現(xiàn)以下功能:
構(gòu)建在系統(tǒng)間進(jìn)行實(shí)時(shí)數(shù)據(jù)傳輸?shù)耐ǖ馈?/p>
構(gòu)建對(duì)數(shù)據(jù)流行轉(zhuǎn)換或響應(yīng)的實(shí)時(shí)應(yīng)用。
Kafka的整體結(jié)構(gòu)與RabbitMQ類似,消息生產(chǎn)者向Kafka服務(wù)器發(fā)送消息,Kafak接收消息后,再投遞給消費(fèi)者。在Kafka中,生產(chǎn)者消息會(huì)被發(fā)送到Topic中,Topic保存著各類的數(shù)據(jù),每一條數(shù)據(jù)都使用鍵、值進(jìn)行保存。每一個(gè)Topic下都包含一個(gè)或多個(gè)物理分區(qū)(Partition),這些分區(qū)維護(hù)著消息的內(nèi)容和索引,它們有可能被保存在不同的服務(wù)器上面。對(duì)于客戶端來(lái)說,無(wú)須關(guān)心數(shù)據(jù)如何被保存,只需要關(guān)心將消息發(fā)往哪個(gè)Topic。
Kafka依賴了ZooKeeper,啟動(dòng)Kafka服務(wù)器前,要先啟動(dòng)ZooKeeper。本章所使用的ZooKeeper版本為3.4.8,Kafka版本為2.11。下載兩個(gè)框架的壓縮包后解壓,分別得到zookeeper-3.4.8與kafka_2.11-0.11.0.0目錄。
先進(jìn)入zookeeper-3.4.8/conf目錄,將zoo_sample.cfg文件復(fù)制一份,并重命名為zoo.cfg。使用命令行工具,進(jìn)行zookeeper-3.4.8/bin目錄,運(yùn)行“zkServer”命令,如果正常啟動(dòng),將會(huì)占用2181端口,命令行窗口不必關(guān)閉,接下來(lái)啟動(dòng)Kafka。
使用命令行工具,進(jìn)行“kafka_2.11-0.11.0.0/bin/windows”目錄,運(yùn)行“kafka-server-start ../../config/server.properties”命令啟動(dòng)Kafka服務(wù)器,如果正常啟動(dòng),將會(huì)占用9092端口。此處的Kafka就相當(dāng)于前面章節(jié)中的RabbitMQ服務(wù)器,Kafka同樣提供了API讓我們編寫客戶端。接下來(lái),我們按照同樣的方式,使用Kafka的API來(lái)進(jìn)行測(cè)試。
新建一個(gè)名稱為“kafka-test”的Maven項(xiàng)目,加入以下依賴:
org.apache.kafka kafka-clients 0.11.0.0 org.slf4j slf4j-log4j12 1.7.9
新建生產(chǎn)者的運(yùn)行類,請(qǐng)見代碼清單8-3。
代碼清單8-3:codes\08\8.3\kafka-test\src\main\java\org\crazyit\cloud\ProducerMain.java
public class ProducerMain { public static void main(String[] args) throws Exception { // 配置信息 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // 設(shè)置數(shù)據(jù)key的序列化處理類 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 設(shè)置數(shù)據(jù)value的序列化處理類 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 創(chuàng)建生產(chǎn)者實(shí)例 Producerproducer = new KafkaProducer<>(props); // 創(chuàng)建一條新的記錄,第一個(gè)參數(shù)為Topic名稱 ProducerRecord record = new ProducerRecord ("my-topic", "userName", "Angus"); // 發(fā)送記錄 producer.send(record); producer.close(); } }
生產(chǎn)者的代碼較RabbitMQ的簡(jiǎn)單,創(chuàng)建屬性實(shí)例,直接使用配置實(shí)例創(chuàng)建Producer(生產(chǎn)者),再創(chuàng)建一個(gè)ProducerRecord(記錄),最后直接發(fā)送。在創(chuàng)建記錄時(shí),指定了向“my-topic”投遞消息,消息的key為“userName”,value為“Angus”。消息發(fā)送后,Kafka會(huì)在服務(wù)器上創(chuàng)建一個(gè)相應(yīng)的Topic。運(yùn)行代碼清單8-3,將消息投遞到Kafka服務(wù)器的Topic中,接下來(lái)可以使用命令查看服務(wù)器的Topic。
使用命令行工具進(jìn)入kafka_2.11-0.11.0.0/bin/windows目錄,輸入命令“kafka-topics --list --zookeeper localhost:2181”,看到當(dāng)前Kafka服務(wù)器的Topic,如圖8-8所示。
圖8-8 查看Topic
如果想刪除服務(wù)器上面的Topic,可使用“kafka-topics --delete --zookeeper localhost:2181 --topic my-topic”命令,但在默認(rèn)情況下,執(zhí)行該命令只是將Topic標(biāo)記為刪除,如果想真正刪除Topic,需要修改config/server.properties文件,加入“delete.topic.enable=true”配置。
本例中生產(chǎn)者與消費(fèi)同在一個(gè)項(xiàng)目,只是使用不同的啟動(dòng)類。前面小節(jié)在編寫生產(chǎn)者時(shí),指定消息發(fā)送到“my-topic”,消費(fèi)者訂閱該Topic,就可以獲取到消息,詳細(xì)請(qǐng)見代碼清單8-4。
代碼清單8-4:codes\08\8.3\kafka-test\src\main\java\org\crazyit\cloud\ConsumerMain.java
public class ConsumerMain { public static void main(String[] args) { // 配置信息 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // 必須指定消費(fèi)者組 props.put("group.id", "test"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer<>(props); // 訂閱 my-topic 的消息 consumer.subscribe(Arrays.asList("my-topic")); // 到服務(wù)器中讀取記錄 while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println("key: " + record.key() + ", value: " + record.value()); } } } }
設(shè)置了配置的信息后,創(chuàng)建一個(gè)KafkaConsumer實(shí)例,通過該實(shí)例訂閱“my-topic”的消息,最后使用KafkaConsumer的poll方法獲取服務(wù)器消息并輸出。運(yùn)地代碼清單8-4,再運(yùn)行代碼清單8-5,可以看到輸出如下:
key: userName, value: Angus
在編寫消費(fèi)者時(shí),需要指定消費(fèi)者組的id,關(guān)于消費(fèi)者組,由于Spring Cloud Stream中也涉及這個(gè)概念,因此需要特別說明一下。
消費(fèi)者會(huì)為自己添加一個(gè)消費(fèi)者組的標(biāo)識(shí),每一條發(fā)布到Topic的記錄,都會(huì)被交付給消費(fèi)者組的一個(gè)消費(fèi)者實(shí)例。如果多個(gè)消費(fèi)者實(shí)例擁有相同的消費(fèi)者組,那么這些記錄將會(huì)分配到各個(gè)消費(fèi)者實(shí)例上,以達(dá)到負(fù)載均衡的目的。如果所有的消費(fèi)者都有不同的消費(fèi)者組,那么每一條記錄都會(huì)被廣播到全部的消費(fèi)者進(jìn)行處理。如果理解不了這段文字,請(qǐng)見圖8-9。
圖8-9 消費(fèi)者組
如圖8-9,如果消費(fèi)者A與消費(fèi)者B屬于同一個(gè)“消費(fèi)者組”,那么當(dāng)生產(chǎn)者發(fā)送一條消息過來(lái)時(shí),僅會(huì)交給其中一個(gè)消費(fèi)者處理;如果兩個(gè)消費(fèi)者不屬于同一個(gè)消費(fèi)者組,那么該消息都會(huì)發(fā)給他們(廣播)進(jìn)行處理。
看完上述內(nèi)容,你們掌握Apache Kafka框架是怎樣的呢的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!