

How to implement message receiving with spring-kafka in Spring Boot

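This article shows how to use spring-kafka in a Spring Boot application to receive messages. It covers the Maven dependencies, the consumer and producer configuration, and a listener that writes the received data to a database.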


Implementation

The pom.xml file is as follows

<?xml version="1.0" encoding="UTF-8"?>

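<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.linuxsogood.sync</groupId>
  <artifactId>linuxsogood-sync</artifactId>
  <version>1.0.0-SNAPSHOT</version>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.0.RELEASE</version>
  </parent>

  <properties>
    <java.version>1.8</java.version>
    <!-- dependency versions -->
    <mybatis.version>3.3.1</mybatis.version>
    <mybatis.spring.version>1.2.4</mybatis.spring.version>
    <mapper.version>3.3.6</mapper.version>
    <pagehelper.version>4.1.1</pagehelper.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-freemarker</artifactId>
    </dependency>
    <!-- Kafka -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>1.1.0.RELEASE</version>
    </dependency>
    <!-- testing -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.assertj</groupId>
      <artifactId>assertj-core</artifactId>
      <version>3.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-all</artifactId>
      <version>1.3</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.9.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>4.2.3.RELEASE</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- databases and connection pool -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
      <groupId>com.microsoft.sqlserver</groupId>
      <artifactId>sqljdbc4</artifactId>
      <version>4.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.0.11</version>
    </dependency>
    <!-- MyBatis -->
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis</artifactId>
      <version>${mybatis.version}</version>
    </dependency>
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis-spring</artifactId>
      <version>${mybatis.spring.version}</version>
    </dependency>
    <!-- MyBatis Generator -->
    <dependency>
      <groupId>org.mybatis.generator</groupId>
      <artifactId>mybatis-generator-core</artifactId>
      <version>1.3.2</version>
      <scope>compile</scope>
      <optional>true</optional>
    </dependency>
    <!-- pagination plugin -->
    <dependency>
      <groupId>com.github.pagehelper</groupId>
      <artifactId>pagehelper</artifactId>
      <version>${pagehelper.version}</version>
    </dependency>
    <!-- generic Mapper -->
    <dependency>
      <groupId>tk.mybatis</groupId>
      <artifactId>mapper</artifactId>
      <version>${mapper.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.17</version>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>repo.spring.io.milestone</id>
      <name>Spring Framework Maven Milestone Repository</name>
      <url>https://repo.spring.io/libs-milestone</url>
    </repository>
  </repositories>

  <build>
    <!-- The element names finalName, verbose and overwrite were lost in the
         source and are reconstructed here from the usual plugin setup. -->
    <finalName>mybatis_generator</finalName>
    <plugins>
      <plugin>
        <groupId>org.mybatis.generator</groupId>
        <artifactId>mybatis-generator-maven-plugin</artifactId>
        <version>1.3.2</version>
        <configuration>
          <verbose>true</verbose>
          <overwrite>true</overwrite>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <mainClass>org.linuxsogood.sync.Starter</mainClass>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>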

The ORM layer uses MyBatis, together with the generic Mapper (tk.mybatis) and the PageHelper pagination plugin.

Kafka consumer configuration

import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // Broker list, e.g. host:port, read from the application configuration.
    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Three consumer threads per listener container.
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        // Auto commit is off, so the commit interval below has no effect;
        // the listener container commits offsets instead.
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Kafka consumer group (group.id) used by consumers created from this factory.
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
        // Start from the earliest offset when the group has no committed offset.
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}

Producer configuration

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        // No retries: a failed send is reported to the caller immediately.
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

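The listener. This is where the business logic lives: it decides how to process the data once it has been read from Kafka. To have Kafka deliver messages in broadcast mode, the listeners must belong to different consumer groups; in this setup that is expressed through the group attribute of the @KafkaListener annotation on each method, which must differ between listeners. If several listeners share the same group, each message is handled by only one of them and the others never see it. That is Kafka's distributed message processing model. One caveat: in spring-kafka 1.1.x, the version used here, the group attribute of @KafkaListener appears to name a container group rather than set Kafka's group.id, and the consumer group actually comes from ConsumerConfig.GROUP_ID_CONFIG in the consumer factory (firehome-group above). It is worth checking this against the documentation of the version in use; later releases add a dedicated groupId attribute to the annotation.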

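Listeners in the same group share the incoming messages: Kafka assigns each partition of the topic to exactly one member of the group, so each message is processed by one listener. Listeners that are in different groups but subscribe to the same topic each receive every message, which gives broadcast behavior.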
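The difference between the two modes can be illustrated with a small plain-Java model. This is only a sketch under simplifying assumptions: it does not use the Kafka client, the round-robin assignment stands in for whatever assignor the broker and client actually negotiate, and the names ConsumerGroupSketch, listenerA, listenerB, group-a and group-b are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of consumer-group delivery; not the real Kafka client or its assignor. */
class ConsumerGroupSketch {

    /**
     * Spreads partitions 0..partitionCount-1 over the members of ONE group, round-robin.
     * Every partition ends up with exactly one owner inside that group.
     */
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String member : members) {
            owned.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return owned; // nobody to assign to
        }
        for (int p = 0; p < partitionCount; p++) {
            owned.get(members.get(p % members.size())).add(p);
        }
        return owned;
    }

    /**
     * Returns the listeners that receive a record written to the given partition:
     * one listener per group, because each group keeps its own assignment.
     */
    static List<String> receivers(Map<String, List<String>> groups, int partitionCount, int partition) {
        List<String> result = new ArrayList<>();
        for (List<String> members : groups.values()) {
            for (Map.Entry<String, List<Integer>> entry : assign(members, partitionCount).entrySet()) {
                if (entry.getValue().contains(partition)) {
                    result.add(entry.getKey());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Both listeners in one group: the record goes to a single listener.
        Map<String, List<String>> sameGroup = new LinkedHashMap<>();
        sameGroup.put("sync-group", Arrays.asList("listenerA", "listenerB"));
        System.out.println(receivers(sameGroup, 3, 0)); // [listenerA]

        // One listener per group: every group gets the record (broadcast).
        Map<String, List<String>> twoGroups = new LinkedHashMap<>();
        twoGroups.put("group-a", Arrays.asList("listenerA"));
        twoGroups.put("group-b", Arrays.asList("listenerB"));
        System.out.println(receivers(twoGroups, 3, 0)); // [listenerA, listenerB]
    }
}
```

The model also shows why adding listeners to a group beyond the number of partitions does not help: with three partitions, a fourth member of the same group would be assigned nothing.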

import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;
import java.util.Optional;

public class Listener {

    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

    @Autowired
    private StoreMapper storeMapper;

    /**
     * Listens for Kafka messages; when one arrives, consumes it and syncs
     * the data into the new Fenghuo database.
     * @param record the consumed Kafka record
     */
    @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            try {
                MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
                CupMessageType type = messageWrapper.getType();
                // Check the message type; different types go into different tables.
                if (CupMessageType.STORE == type) {
                    proceedStore(messageWrapper);
                }
            } catch (Exception e) {
                // The exception is passed last without its own placeholder so that
                // SLF4J logs it with the full stack trace.
                LOGGER.error("Failed to save the received message to the database, message: {}", message, e);
            }
        }
    }

    /**
     * Handles a message of type STORE and writes the store to the database.
     * @param messageWrapper the message received from Kafka
     */
    private void proceedStore(MessageWrapper messageWrapper) {
        Object data = messageWrapper.getData();
        Store cupStore = JSON.parseObject(data.toString(), Store.class);
        StoreExample storeExample = new StoreExample();
        // Look the store up by its old name when one is given, otherwise by its current name.
        String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName();
        storeExample.createCriteria().andStoreNameEqualTo(storeName);
        List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
        org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
        org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
        // Insert when no matching record exists, otherwise update the first match.
        if (stores.isEmpty()) {
            storeMapper.insert(store);
        } else {
            store.setStoreId(stores.get(0).getStoreId());
            storeMapper.updateByPrimaryKey(store);
        }
    }

}
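The select-then-insert-or-update flow in proceedStore can be tried out without a database. The following is a minimal in-memory sketch, not the article's mapper code: StoreUpsertSketch and StoreRow are hypothetical names, a map keyed by store name stands in for the table, and the blank check only approximates StringUtils.isBlank.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory stand-in for the lookup and insert-or-update logic; no MyBatis involved. */
class StoreUpsertSketch {

    /** Hypothetical, simplified row holding only the fields the lookup needs. */
    static final class StoreRow {
        final long storeId;
        final String storeName;

        StoreRow(long storeId, String storeName) {
            this.storeId = storeId;
            this.storeName = storeName;
        }
    }

    private final Map<String, StoreRow> rowsByName = new LinkedHashMap<>();
    private long nextId = 1;

    /** Chooses the lookup key: the old name when it is not blank, else the current name. */
    static String lookupName(String storeOldName, String storeName) {
        boolean blank = storeOldName == null || storeOldName.trim().isEmpty();
        return blank ? storeName : storeOldName;
    }

    /**
     * Inserts a new row when nothing matches the lookup name; otherwise replaces
     * the matching row, keeping its id. Returns the id of the stored row.
     */
    long upsert(String storeOldName, String storeName) {
        String key = lookupName(storeOldName, storeName);
        StoreRow existing = rowsByName.remove(key);
        long id = (existing == null) ? nextId++ : existing.storeId;
        rowsByName.put(storeName, new StoreRow(id, storeName));
        return id;
    }

    int size() {
        return rowsByName.size();
    }

    public static void main(String[] args) {
        StoreUpsertSketch table = new StoreUpsertSketch();
        System.out.println(table.upsert(null, "Shop A"));     // 1 (inserted)
        System.out.println(table.upsert("Shop A", "Shop B")); // 1 (renamed, same id)
        System.out.println(table.size());                     // 1
    }
}
```

Looking up by the old name first is what lets a rename arrive as an update of the existing row rather than as a second row. In the real listener the read and the write are separate statements, so two concurrent messages for the same store could both take the insert branch; a unique constraint on the name column would be one way to guard against that.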

That covers how to implement message receiving with spring-kafka in Spring Boot.


Source: http://weahome.cn/article/pcpojh.html
