MQ,是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。在傳統(tǒng)的互聯(lián)網(wǎng)架構(gòu)中通常使用MQ來對(duì)上下游來做解耦合。
成都創(chuàng)新互聯(lián)主要從事網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)天山,十多年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18980820575
舉例:當(dāng)A系統(tǒng)對(duì)B系統(tǒng)進(jìn)行消息通訊,如A系統(tǒng)發(fā)布一條系統(tǒng)公告,B系統(tǒng)可以訂閱該頻道進(jìn)行系統(tǒng)公告同步,整個(gè)過程中A系統(tǒng)并不關(guān)系B系統(tǒng)會(huì)不會(huì)同步,由訂閱該頻道的系統(tǒng)自行處理。
官方說明:
隨著使用越來越多的隊(duì)列和虛擬主題,ActiveMQ IO模塊遇到了瓶頸。我們盡力通過節(jié)流,斷路器或降級(jí)來解決此問題,但效果不佳。因此,我們那時(shí)開始關(guān)注流行的消息傳遞解決方案Kafka。不幸的是,Kafka不能滿足我們的要求,特別是在低延遲和高可靠性方面。
看到這里可以很清楚的知道RcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。
具有以下特性:
下載地址:https://rocketmq.apache.org/dowloading/releases/
從官方下載二進(jìn)制或者源碼來進(jìn)行使用。源碼編譯需要Maven3.2x,JDK8
在根目錄進(jìn)行打包:
mvn -Prelease-all -DskipTests clean packager -U
distribution/target/apache-rocketmq
文件夾中會(huì)存在一個(gè)文件夾版,zip,tar三個(gè)可運(yùn)行的完整程序。
使用rocketmq-4.6.0.zip
:
SpringBoot 入門:https://www.cnblogs.com/SimpleWu/p/10027237.html
SpringBoot 常用start:https://www.cnblogs.com/SimpleWu/p/9798146.html
當(dāng)前環(huán)境版本為:
org.apache.rocketmq
rocketmq-client
${rocketmq.version}
由于我們這邊已經(jīng)有工程了所以就不在進(jìn)行創(chuàng)建這種過程了。主要是看看如何使用RocketMQ。
創(chuàng)建RocketMQProperties配置屬性類,類中內(nèi)容如下:
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
private boolean isEnable = false;
private String namesrvAddr = "localhost:9876";
private String groupName = "default";
private int producerMaxMessageSize = 1024;
private int producerSendMsgTimeout = 2000;
private int producerRetryTimesWhenSendFailed = 2;
private int consumerConsumeThreadMin = 5;
private int consumerConsumeThreadMax = 30;
private int consumerConsumeMessageBatchMaxSize = 1;
//省略get set
}
現(xiàn)在我們所有子系統(tǒng)中的生產(chǎn)者,消費(fèi)者對(duì)應(yīng):
isEnable 是否開啟mq
namesrvAddr 集群地址
groupName 分組名稱
設(shè)置為統(tǒng)一已方便系統(tǒng)對(duì)接,如有其它需求在進(jìn)行擴(kuò)展,類中我們已經(jīng)給了默認(rèn)值也可以在配置文件或配置中心中獲取配置,配置如下:
#發(fā)送同一類消息的設(shè)置為同一個(gè)group,保證唯一,默認(rèn)不需要設(shè)置,rocketmq會(huì)使用ip@pid(pid代表jvm名字)作為唯一標(biāo)示
rocketmq.groupName=please_rename_unique_group_name
#是否開啟自動(dòng)配置
rocketmq.isEnable=true
#mq的nameserver地址
rocketmq.namesrvAddr=127.0.0.1:9876
#消息最大長度 默認(rèn)1024*4(4M)
rocketmq.producer.maxMessageSize=4096
#發(fā)送消息超時(shí)時(shí)間,默認(rèn)3000
rocketmq.producer.sendMsgTimeout=3000
#發(fā)送消息失敗重試次數(shù),默認(rèn)2
rocketmq.producer.retryTimesWhenSendFailed=2
#消費(fèi)者線程數(shù)量
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
#設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)為1條
rocketmq.consumer.consumeMessageBatchMaxSize=1
創(chuàng)建消費(fèi)者接口 RocketConsumer.java 該接口用戶約束消費(fèi)者需要的核心步驟:
/**
* 消費(fèi)者接口
*
* @author SimpleWu
*
*/
public interface RocketConsumer {
/**
* 初始化消費(fèi)者
*/
public abstract void init();
/**
* 注冊(cè)監(jiān)聽
*
* @param messageListener
*/
public void registerMessageListener(MessageListener messageListener);
}
創(chuàng)建抽象消費(fèi)者 AbstractRocketConsumer.java:
/**
* 消費(fèi)者基本信息
*
* @author SimpelWu
*/
public abstract class AbstractRocketConsumer implements RocketConsumer {
protected String topics;
protected String tags;
protected MessageListener messageListener;
protected String consumerTitel;
protected MQPushConsumer mqPushConsumer;
/**
* 必要的信息
*
* @param topics
* @param tags
* @param consumerTitel
*/
public void necessary(String topics, String tags, String consumerTitel) {
this.topics = topics;
this.tags = tags;
this.consumerTitel = consumerTitel;
}
public abstract void init();
@Override
public void registerMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
}
在類中我們必須指定這個(gè)topics,tags與消息監(jiān)聽邏輯public abstract void init();
該方法是用于初始化消費(fèi)者,由子類實(shí)現(xiàn)。
接下來我們編寫自動(dòng)配置類RocketMQConfiguation.java,該類用戶初始化一個(gè)默認(rèn)的生產(chǎn)者連接,以及加載所有的消費(fèi)者。
@EnableConfigurationProperties({ RocketMQProperties.class }) 使用該配置文件
@Configuration 標(biāo)注為配置類
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") 只有當(dāng)配置中指定rocketmq.isEnable = true的時(shí)候才會(huì)生效
核心內(nèi)容如下:
/**
* mq配置
*
* @author SimpleWu
*/
@Configuration
@EnableConfigurationProperties({ RocketMQProperties.class })
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true")
public class RocketMQConfiguation {
private RocketMQProperties properties;
private ApplicationContext applicationContext;
private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class);
public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) {
this.properties = properties;
this.applicationContext = applicationContext;
}
/**
* 注入一個(gè)默認(rèn)的消費(fèi)者
* @return
* @throws MQClientException
*/
@Bean
public DefaultMQProducer getRocketMQProducer() throws MQClientException {
if (StringUtils.isEmpty(properties.getGroupName())) {
throw new MQClientException(-1, "groupName is blank");
}
if (StringUtils.isEmpty(properties.getNamesrvAddr())) {
throw new MQClientException(-1, "nameServerAddr is blank");
}
DefaultMQProducer producer;
producer = new DefaultMQProducer(properties.getGroupName());
producer.setNamesrvAddr(properties.getNamesrvAddr());
// producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
// 如果需要同一個(gè)jvm中不同的producer往不同的mq集群發(fā)送消息,需要設(shè)置不同的instanceName
// producer.setInstanceName(instanceName);
producer.setMaxMessageSize(properties.getProducerMaxMessageSize());
producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout());
// 如果發(fā)送消息失敗,設(shè)置重試次數(shù),默認(rèn)為2次
producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed());
try {
producer.start();
log.info("producer is start ! groupName:{},namesrvAddr:{}", properties.getGroupName(),
properties.getNamesrvAddr());
} catch (MQClientException e) {
log.error(String.format("producer is error {}", e.getMessage(), e));
throw e;
}
return producer;
}
/**
* SpringBoot啟動(dòng)時(shí)加載所有消費(fèi)者
*/
@PostConstruct
public void initConsumer() {
Map consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class);
if (consumers == null || consumers.size() == 0) {
log.info("init rocket consumer 0");
}
Iterator beans = consumers.keySet().iterator();
while (beans.hasNext()) {
String beanName = (String) beans.next();
AbstractRocketConsumer consumer = consumers.get(beanName);
consumer.init();
createConsumer(consumer);
log.info("init success consumer title {} , toips {} , tags {}", consumer.consumerTitel, consumer.tags,
consumer.topics);
}
}
/**
* 通過消費(fèi)者信心創(chuàng)建消費(fèi)者
*
* @param consumerPojo
*/
public void createConsumer(AbstractRocketConsumer arc) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName());
consumer.setNamesrvAddr(this.properties.getNamesrvAddr());
consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin());
consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax());
consumer.registerMessageListener(arc.messageListenerConcurrently);
/**
* 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi) 如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
*/
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/**
* 設(shè)置消費(fèi)模型,集群還是廣播,默認(rèn)為集群
*/
// consumer.setMessageModel(MessageModel.CLUSTERING);
/**
* 設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)為1條
*/
consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize());
try {
consumer.subscribe(arc.topics, arc.tags);
consumer.start();
arc.mqPushConsumer=consumer;
} catch (MQClientException e) {
log.error("info consumer title {}", arc.consumerTitel, e);
}
}
}
然后在src/main/resources文件夾中創(chuàng)建目錄與文件META-INF/spring.factories里面添加自動(dòng)配置類即可開啟啟動(dòng)配置,我們只需要導(dǎo)入依賴即可:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.xcloud.config.rocketmq.RocketMQConfiguation
接下來在服務(wù)中導(dǎo)入依賴,然后通過我們的抽象類獲取所有必要信息對(duì)消費(fèi)者進(jìn)行創(chuàng)建,該步驟會(huì)在所有消費(fèi)者初始化完成后進(jìn)行,且只會(huì)管理是Spring Bean的消費(fèi)者。下面我們看看如何創(chuàng)建一個(gè)消費(fèi)者,創(chuàng)建消費(fèi)者的步驟非常簡(jiǎn)單,只需要繼承AbstractRocketConsumer然后再加上Spring的@Component就能夠完成消費(fèi)者的創(chuàng)建,我們可以在類中自定義消費(fèi)的主題與標(biāo)簽。
br/>下面我們看看如何創(chuàng)建一個(gè)消費(fèi)者,創(chuàng)建消費(fèi)者的步驟非常簡(jiǎn)單,只需要繼承AbstractRocketConsumer然后再加上Spring的@Component就能夠完成消費(fèi)者的創(chuàng)建,我們可以在類中自定義消費(fèi)的主題與標(biāo)簽。
創(chuàng)建一個(gè)默認(rèn)的消費(fèi)者 DefaultConsumerMQ.java
@Component
public class DefaultConsumerMQ extends AbstractRocketConsumer {
/**
* 初始化消費(fèi)者
*/
@Override
public void init() {
// 設(shè)置主題,標(biāo)簽與消費(fèi)者標(biāo)題
super.necessary("TopicTest", "*", "這是標(biāo)題");
//消費(fèi)者具體執(zhí)行邏輯
registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
msgs.forEach(msg -> {
System.out.printf("consumer message boyd %s %n", new String(msg.getBody()));
});
// 標(biāo)記該消息已經(jīng)被成功消費(fèi)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
}
super.necessary("TopicTest", "*", "這是標(biāo)題"); 是必須要設(shè)置的,代表該消費(fèi)者監(jiān)聽TopicTest主題下所有tags,標(biāo)題那個(gè)字段是我自己定義的,所以對(duì)于該配置來說沒什么意義。
我們可以在這里注入Spring的Bean來進(jìn)行任意邏輯處理。
創(chuàng)建一個(gè)消息發(fā)送類進(jìn)行測(cè)試
@Override
public String qmtest(@PathVariable("name")String name) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 發(fā)送消息到一個(gè)Broker
SendResult sendResult = defaultMQProducer.send(msg);
// 通過sendResult返回消息是否成功送達(dá)
System.out.printf("%s%n", sendResult);
return null;
}
我們來通過Http請(qǐng)求測(cè)試:
http://localhost:10001/demo/base/mq/hello consumer message boyd hello
http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿 consumer message boyd 嘿嘿嘿嘿嘿
好了到這里簡(jiǎn)單的start算是設(shè)計(jì)完成了,后面還有一些:順序消息生產(chǎn),順序消費(fèi)消息,異步消息生產(chǎn)等一系列功能,官人可參照官方去自行處理。