本文小編為大家詳細(xì)介紹“SpringBoot怎么整合Kafka”,內(nèi)容詳細(xì),步驟清晰,細(xì)節(jié)處理妥當(dāng),希望這篇“SpringBoot怎么整合Kafka”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學(xué)習(xí)新知識吧。
目前成都創(chuàng)新互聯(lián)已為數(shù)千家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)頁空間、網(wǎng)站托管、服務(wù)器租用、企業(yè)網(wǎng)站設(shè)計、霞山網(wǎng)站維護等服務(wù),公司將堅持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。
一、準(zhǔn)備工作
提前說明:如果你運行出問題,請檢查Kafka的版本與SpringBoot的版本是否與我文中的一致,本文中的環(huán)境已經(jīng)經(jīng)過測試。
Kafka服務(wù)版本為 kafka_2.11-1.1.0 (Scala), 也就是1.1.0
SpringBoot版本:1.5.10.RELEASE
提前啟動zk,kafka,并且創(chuàng)建一個Topic
[root@Basic kafka_2.11-1.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic
確保你的kafka能夠訪問,如果訪問不了,需要打開外網(wǎng)訪問。
config/server.properties
advertised.listeners=PLAINTEXT://192.168.239.128:9092
Maven 依賴
二、項目結(jié)構(gòu)
為了更加體現(xiàn)實際開發(fā)需求,一般生產(chǎn)者都是在調(diào)用某些接口的服務(wù)處理完邏輯之后然后往kafka里面扔數(shù)據(jù),然后有一個消費者不停的監(jiān)控這個Topic,然后處理數(shù)據(jù),所以這里把生產(chǎn)者作為一個接口,消費者放到kafka這個目錄下,注意@Component注解,不然掃描不到@KafkaListener
三、具體實現(xiàn)代碼
SpringBoot配置文件
application.yml
spring:
kafka:
bootstrap-servers: 192.168.239.128:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
生產(chǎn)者
package cn.saytime.web;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 測試kafka生產(chǎn)者
*/
@RestController
@RequestMapping("kafka")
public class TestKafkaProducerController {
@Autowired
private KafkaTemplate
@RequestMapping("send")
public String send(String msg){
kafkaTemplate.send("test_topic", msg);
return "success";
}
}
消費者
這里的消費者會監(jiān)聽這個主題,有消息就會執(zhí)行,不需要進行while(true)
package cn.saytime.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* kafka消費者測試
*/
@Component
public class TestConsumer {
@KafkaListener(topics = "test_topic")
public void listen (ConsumerRecord, ?> record) throws Exception {
System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}
項目啟動類
package cn.saytime;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TestApplication{
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
}
四、測試
運行項目,執(zhí)行:http://localhost:8080/kafka/send?msg=hello
控制臺輸出:
topic = test_topic, offset = 19, value = hello
1
為了體現(xiàn)消費者不止執(zhí)行一次就結(jié)束,再調(diào)用一次接口:
http://localhost:8080/kafka/send?msg=kafka
topic = test_topic, offset = 20, value = kafka
1
所以可以看到這里消費者實際上是不停的poll Topic數(shù)據(jù)的。
讀到這里,這篇“SpringBoot怎么整合Kafka”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領(lǐng)會,如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。