小編給大家分享一下spring kakfa如何集成,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
為企業(yè)提供成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作、外貿(mào)營(yíng)銷網(wǎng)站建設(shè)、網(wǎng)站優(yōu)化、成都全網(wǎng)營(yíng)銷推廣、競(jìng)價(jià)托管、品牌運(yùn)營(yíng)等營(yíng)銷獲客服務(wù)。創(chuàng)新互聯(lián)建站擁有網(wǎng)絡(luò)營(yíng)銷運(yùn)營(yíng)團(tuán)隊(duì),以豐富的互聯(lián)網(wǎng)營(yíng)銷經(jīng)驗(yàn)助力企業(yè)精準(zhǔn)獲客,真正落地解決中小企業(yè)營(yíng)銷獲客難題,做到“讓獲客更簡(jiǎn)單”。自創(chuàng)立至今,成功用技術(shù)實(shí)力解決了企業(yè)“網(wǎng)站建設(shè)、網(wǎng)絡(luò)品牌塑造、網(wǎng)絡(luò)營(yíng)銷”三大難題,同時(shí)降低了營(yíng)銷成本,提高了有效客戶轉(zhuǎn)化率,獲得了眾多企業(yè)客戶的高度認(rèn)可!
1.1 kafka-producer.xml配置說明
classpath:/properties/kafka-producer.properties
//topic名稱
1.2 kafka-producer.properties屬性文件
bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094 group.id=testGroup retries=1 batch.size=16384 linger.ms=1 buffer.memory=33554432 defaultTopic=topic-test
1.3 生產(chǎn)端接口封裝說明:
1)類名:
com.rkhd.ienterprise.kafka.producer.KafkaProducerServer
2)方法:
/** * 發(fā)送信息(不分區(qū)) * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個(gè)map。如果成功code為0,其他則為失敗 */ public MapsendDefault(Object data);
/** * 發(fā)送信息(不分區(qū)) * @param key 要發(fā)送的鍵 * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個(gè)map。如果成功code為0,其他則為失敗 */ public MapsendDefault(Object key, Object data);
/** * 發(fā)送信息(分區(qū)) * @param partitionNum 分區(qū)數(shù)(大于1),請(qǐng)注意分區(qū)數(shù)是在topic創(chuàng)建的時(shí)候就指定了,不能改變了 * @param key 要發(fā)送的鍵 * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個(gè)map。如果成功code為0,其他則為失敗 */ public MapsendDefault(int partitionNum, Object key, Object data);
/** * 發(fā)送信息(不分區(qū)) * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會(huì)使用xml中配置的defaultTopic * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個(gè)map。如果成功code為0,其他則為失敗 */ public MapsendMessage(String topic, Object data);
/** * 發(fā)送信息(不分區(qū)) * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會(huì)使用xml中配置的defaultTopic * @param key 要發(fā)送的鍵 * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個(gè)map。如果成功code為0,其他則為失敗 * */ public MapsendMessage(String topic, Object key, Object data);
/** * 發(fā)送信息(分區(qū)) * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會(huì)使用xml中配置的defaultTopic * @param partitionNum 分區(qū)數(shù)(大于1),請(qǐng)注意分區(qū)數(shù)是在topic創(chuàng)建的時(shí)候就指定了,不能改變了 * @param data 要發(fā)送的數(shù)據(jù) * @return 返回一個(gè)map。如果成功code為0,其他則為失敗 */ public MapsendMessage(String topic, Integer partitionNum, Object data);
/** * 發(fā)送信息(分區(qū)) * @param topic 發(fā)送目的topic名稱,如果topic為null或者是為"",則會(huì)使用xml中配置的defaultTopic * @param key 要發(fā)送的鍵 * @param value 要發(fā)送的數(shù)據(jù) * @param partitionNum 分區(qū)數(shù)(大于1),請(qǐng)注意分區(qū)數(shù)是在topic創(chuàng)建的時(shí)候就指定了,不能改變了 * @return 返回一個(gè)map。如果成功code為0,其他則為失敗 * */ public MapsendMessage(String topic, int partitionNum, Object key, Object value);
2.1 kafka-consumer.xml配置說明
//配置消費(fèi)端數(shù)量
2.2 kafka-consumer.properties屬性文件
bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094 group.id=testGroup enable.auto.commit=false auto.commit.interval.ms=1000 session.timeout.ms=15000 topicName=ahao-test
2.3 消費(fèi)端接口封裝說明
1)類名:com.rkhd.ienterprise.mq.client.consumer.client.KafkaConsumerClient
2)對(duì)外提供抽象方法(根據(jù)不同的業(yè)務(wù)實(shí)現(xiàn)):
public abstract void onConsumer(ConsumerRecordrecord);
3)實(shí)現(xiàn)說明:各業(yè)務(wù)線通過繼承該類實(shí)現(xiàn)該抽象方法;
3.1 Kafka的特性
高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒
可擴(kuò)展性:kafka集群支持熱擴(kuò)展
持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
容錯(cuò)性:允許集群中節(jié)點(diǎn)失敗(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失?。?/p>
高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫
3.2 Kafka架構(gòu)組件
Kafka中發(fā)布訂閱的對(duì)象是topic。我們可以為每類數(shù)據(jù)創(chuàng)建一個(gè)topic,把向topic發(fā)布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和 consumers可以同時(shí)從多個(gè)topic讀寫數(shù)據(jù)。一個(gè)kafka集群由一個(gè)或多個(gè)broker服務(wù)器組成,它負(fù)責(zé)持久化和備份具體的kafka消息。
topic:消息存放的目錄即主題
Producer:生產(chǎn)消息到topic的一方
Consumer:訂閱topic消費(fèi)消息的一方
Broker:Kafka的服務(wù)實(shí)例就是一個(gè)broker
3.3 kafka 應(yīng)用場(chǎng)景
日志收集:一個(gè)公司可以用Kafka可以收集各種服務(wù)的log,通過kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者、緩存消息等。
用戶活動(dòng)跟蹤:Kafka經(jīng)常被用來記錄web用戶或者app用戶的各種活動(dòng),如瀏覽網(wǎng)頁、搜索、點(diǎn)擊等活動(dòng),這些活動(dòng)信息被各個(gè)服務(wù)器發(fā)布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實(shí)時(shí)的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉庫中做離線分析和挖掘。
運(yùn)營(yíng)指標(biāo):Kafka也經(jīng)常用來記錄運(yùn)營(yíng)監(jiān)控?cái)?shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報(bào)警和報(bào)告。
流式處理:比如spark streaming和storm
事件源
以上是“spring kakfa如何集成”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!