本篇文章為大家展示了如何在spring boot中使用spring-kafka實現(xiàn)一個接收消息功能,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。
創(chuàng)新互聯(lián)公司主營鐵西網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,app開發(fā)定制,鐵西h5小程序開發(fā)搭建,鐵西網(wǎng)站營銷推廣歡迎鐵西等地區(qū)企業(yè)咨詢
實現(xiàn)方法
pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>4.0.0 org.linuxsogood.sync linuxsogood-sync 1.0.0-SNAPSHOT org.springframework.boot spring-boot-starter-parent 1.4.0.RELEASE 1.8 3.3.1 1.2.4 3.3.6 4.1.1 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-jdbc org.springframework.boot spring-boot-starter-aop org.springframework.boot spring-boot-starter-freemarker org.springframework.kafka spring-kafka 1.1.0.RELEASE junit junit 4.12 test org.assertj assertj-core 3.5.2 org.hamcrest hamcrest-all 1.3 test org.mockito mockito-all 1.9.5 test org.springframework spring-test 4.2.3.RELEASE test org.springframework.boot spring-boot-starter-test test MySQL mysql-connector-java com.microsoft.sqlserver sqljdbc4 4.0.0 com.alibaba druid 1.0.11 org.mybatis mybatis ${mybatis.version} org.mybatis mybatis-spring ${mybatis.spring.version} org.mybatis.generator mybatis-generator-core 1.3.2 compile true com.github.pagehelper pagehelper ${pagehelper.version} tk.mybatis mapper ${mapper.version} com.alibaba fastjson 1.2.17 repo.spring.io.milestone Spring Framework Maven Milestone Repository https://repo.spring.io/libs-milestone mybatis_generator org.mybatis.generator mybatis-generator-maven-plugin 1.3.2 true true org.springframework.boot spring-boot-maven-plugin org.linuxsogood.sync.Starter
orm層使用了MyBatis,又使用了通用Mapper和分頁插件.
kafka消費(fèi)端配置
import org.linuxsogood.sync.listener.Listener; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.broker.address}") private String brokerAddress; @Bean KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map consumerConfigs() { Map propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return propsMap; } @Bean public Listener listener() { return new Listener(); } }
生產(chǎn)者的配置.
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.broker.address}") private String brokerAddress; @Bean public ProducerFactoryproducerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map producerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate (producerFactory()); } }
監(jiān)聽,監(jiān)聽里面,寫的就是業(yè)務(wù)邏輯了,從kafka里面得到數(shù)據(jù)后,具體怎么去處理. 如果需要開啟kafka處理消息的廣播模式,多個監(jiān)聽要監(jiān)聽不同的group,即方法上的注解@KafkaListener里的group一定要不一樣.如果多個監(jiān)聽里的group寫的一樣,就會造成只有一個監(jiān)聽能處理其中的消息,另外監(jiān)聽就不能處理消息了.也即是kafka的分布式消息處理方式.
在同一個group里的監(jiān)聽,共同處理接收到的消息,會根據(jù)一定的算法來處理.如果不在一個組,但是監(jiān)聽的是同一個topic的話,就會形成廣播模式
import com.alibaba.fastjson.JSON; import org.linuxsogood.qilian.enums.CupMessageType; import org.linuxsogood.qilian.kafka.MessageWrapper; import org.linuxsogood.qilian.model.store.Store; import org.linuxsogood.sync.mapper.StoreMapper; import org.linuxsogood.sync.model.StoreExample; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import java.util.List; import java.util.Optional; public class Listener { private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class); @Autowired private StoreMapper storeMapper; /** * 監(jiān)聽kafka消息,如果有消息則消費(fèi),同步數(shù)據(jù)到新烽火的庫 * @param record 消息實體bean */ @KafkaListener(topics = "linuxsogood-topic", group = "sync-group") public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); try { MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class); CupMessageType type = messageWrapper.getType(); //判斷消息的數(shù)據(jù)類型,不同的數(shù)據(jù)入不同的表 if (CupMessageType.STORE == type) { proceedStore(messageWrapper); } } catch (Exception e) { LOGGER.error("將接收到的消息保存到數(shù)據(jù)庫時異常, 消息:{}, 異常:{}",message.toString(),e); } } } /** * 消息是店鋪類型,店鋪消息處理入庫 * @param messageWrapper 從kafka中得到的消息 */ private void proceedStore(MessageWrapper messageWrapper) { Object data = messageWrapper.getData(); Store cupStore = JSON.parseObject(data.toString(), Store.class); StoreExample storeExample = new StoreExample(); String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName(); storeExample.createCriteria().andStoreNameEqualTo(storeName); Liststores = storeMapper.selectByExample(storeExample); org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store(); org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore); //如果查詢不到記錄則新增 if (stores.size() == 0) { storeMapper.insert(store); } else { store.setStoreId(stores.get(0).getStoreId()); storeMapper.updateByPrimaryKey(store); } } }
上述內(nèi)容就是如何在spring boot中使用spring-kafka實現(xiàn)一個接收消息功能,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。