公司主營業(yè)務(wù):成都網(wǎng)站建設(shè)、網(wǎng)站制作、移動(dòng)網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)推出龍鳳免費(fèi)做網(wǎng)站回饋大家。
消息隊(duì)列對(duì)比參照表:
RocketMQ vs. ActiveMQ vs. Kafka:
參考至:
環(huán)境要求:
1、下載RocketMQ的二進(jìn)制包,我這里使用的是4.5.1版本,下載地址如下:
http://rocketmq.apache.org/release_notes/release-notes-4.5.1/
使用wget命令下載:
[root@study-01 ~]# cd /usr/local/src
[root@study-01 /usr/local/src]# wget http://mirror.bit.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
2、解壓下載好的壓縮包,并移動(dòng)到合適的目錄下:
[root@study-01 /usr/local/src]# unzip rocketmq-all-4.5.1-bin-release.zip
[root@study-01 /usr/local/src]# mv rocketmq-all-4.5.1-bin-release /usr/local/rocketmq-4.5.1
注:若沒有安裝unzip命令則使用如下命令安裝:
yum install -y unzip
3、進(jìn)入rocketmq的根目錄并查看是否包含如下目錄及文件:
[root@study-01 /usr/local/src]# cd /usr/local/rocketmq-4.5.1
[root@study-01 /usr/local/rocketmq-4.5.1]# ls
benchmark bin conf lib LICENSE NOTICE README.md
4、沒問題后,使用如下命令啟動(dòng)Name Server:
[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqnamesrv &
[1] 2448
[root@study-01 /usr/local/rocketmq-4.5.1]#
5、查看默認(rèn)的9876端口是否被監(jiān)聽,以驗(yàn)證Name Server是否啟動(dòng)成功:
[root@study-01 /usr/local/rocketmq-4.5.1]# netstat -lntp |grep java
tcp6 0 0 :::9876 :::* LISTEN 2454/java
[root@study-01 /usr/local/rocketmq-4.5.1]#
6、啟動(dòng)Broker:
[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqbroker -n localhost:9876 &
[2] 2485
[root@study-01 /usr/local/rocketmq-4.5.1]#
7、驗(yàn)證Broker是否啟動(dòng)成功,如果啟動(dòng)成功,能看到類似如下的日志::
[root@study-01 /usr/local/rocketmq-4.5.1]# cat ~/logs/rocketmqlogs/broker.log |grep "boot success"
2019-08-04 01:27:38 INFO main - The broker[study-01, 192.168.190.129:10911] boot success. serializeType=JSON and name server is localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]#
若想停止Name Server和Broker,則依次執(zhí)行以下兩條命令即可:
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown broker
The mqbroker(2492) is running...
Send shutdown request to mqbroker(2492) OK # 輸出該信息說明停止成功
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown namesrv
The mqnamesrv(2454) is running...
Send shutdown request to mqnamesrv(2454) OK # 輸出該信息說明停止成功
[2]+ 退出 143 nohup sh bin/mqbroker -n localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]#
1、驗(yàn)證生產(chǎn)消息正常,執(zhí)行如下命令:
[root@study-01 /usr/local/rocketmq-4.5.1]# export NAMESRV_ADDR=localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
正常的情況下,會(huì)看到一堆的類似于如下的輸出,這是生產(chǎn)消息后成功的result:
SendResult [sendStatus=SEND_OK, msgId=C0A8BE810A690D7163610FCC253B03E7, offsetMsgId=C0A8BE8100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=study-01, queueId=3], queueOffset=249]
2、驗(yàn)證消費(fèi)消息正常,執(zhí)行如下命令:
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
正常的情況下,會(huì)看到一堆的類似于如下的輸出,這是消費(fèi)的消息內(nèi)容:
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=242, sysFlag=0, bornTimestamp=1564853837073, bornHost=/192.168.190.129:34708, storeTimestamp=1564853837074, storeHost=/192.168.190.129:10911, msgId=C0A8BE8100002A9F000000000002AA4E, commitLogOffset=174670, bodyCRC=911284903, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1564854006859, UNIQ_KEY=C0A8BE810A690D7163610FCC251103CB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 49], transactionId='null'}]]
RocketMQ官方提供了一個(gè)基于Spring Boot開發(fā)的可視化控制臺(tái),可以方便我們查看RocketMQ的運(yùn)行情況以及提升運(yùn)維效率。所以本小節(jié)將介紹一下如何搭建搭建RocketMQ的控制臺(tái),由于我們使用的RocketMQ版本是4.5.1,所以需要對(duì)控制臺(tái)的源碼進(jìn)行一些改動(dòng)以適配RocketMQ的4.5.1版本。
1、首先需要下載源碼,有兩種方式,一是使用git克隆代碼倉庫,二是直接下載rocketmq-externals的zip包,我這里使用git方式,執(zhí)行如下命令:
git clone https://github.com/apache/rocketmq-externals.git
2、修改控制臺(tái)代碼,使用IDE打開rocketmq-console
項(xiàng)目,如下圖所示:
2.1、修改項(xiàng)目中的application.properties
配置文件,我這里主要是修改了監(jiān)聽端口和Name Server的連接地址,至于其他配置項(xiàng)有需要的話可按照說明自行修改:
# console的監(jiān)聽端口,默認(rèn)是8080
server.port=8011
# Name Server的連接地址;非必須,可以在啟動(dòng)了console后,在控制臺(tái)導(dǎo)航欄 - 運(yùn)維 - NameSvrAddrList一欄設(shè)置
rocketmq.config.namesrvAddr=192.168.190.129:9876
2.2、修改依賴,由于console項(xiàng)目默認(rèn)使用的rocketmq版本是4.4.0,與我們這里使用的是4.5.1不完全兼容,所以需要修改一下依賴版本,找到這一行:
4.4.0
修改為:
4.5.1
2.3、修改代碼,由于修改了rocketmq的版本,會(huì)導(dǎo)致org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic
方法編譯報(bào)錯(cuò),所以需要改動(dòng)一下此處代碼 ,將:
@Override
public List queryMessageByTopic(String topic, final long begin, final long end) {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
...
修改為:
@Override
public List queryMessageByTopic(String topic, final long begin, final long end) {
RPCHook rpcHook = null;
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
...
3、打包構(gòu)建并啟動(dòng),打開idea的terminal,執(zhí)行如下命令:
# 在rocketmq-console目錄下執(zhí)行
mvn clean package -DskipTests
# 進(jìn)入jar包存放目錄
cd target
# 啟動(dòng)rocketmq console
java -jar rocketmq-console-ng-1.0.1.jar
4、使用瀏覽器訪問控制臺(tái),我這里由于修改了端口,所以訪問地址是:http://localhost:8011
,正常的情況下能看到如下界面:
不習(xí)慣英文的話可以在右上角切換語言:
由于控制臺(tái)是可視化界面并且支持中文,這里就不過多介紹了,可以參考官方的控制臺(tái)使用說明文檔:
我這里將基本的術(shù)語與概念簡(jiǎn)單總結(jié)成了思維導(dǎo)圖:
官方文檔:
在以上小節(jié)搭建完RocketMQ之后,我們來使用Spring的消息編程模型,編寫一個(gè)簡(jiǎn)單的示例。首先需要在項(xiàng)目中添加相關(guān)依賴如下:
org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.3
在配置文件中添加rocketmq相關(guān)的配置如下:
rocketmq:
name-server: 192.168.190.129:9876
producer:
# 小坑:必須指定group
group: test-group
編寫生產(chǎn)者的代碼,這里以Controller做示例,具體代碼如下:
package com.zj.node.contentcenter.controller.content;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* 生產(chǎn)者
*
* @author 01
* @date 2019-08-03
**/
@RestController
@RequiredArgsConstructor
public class TestProducerController {
/**
* 用于發(fā)送消息到 RocketMQ 的api
*/
private final RocketMQTemplate rocketMQTemplate;
@GetMapping("/test-rocketmq/sendMsg")
public String testSendMsg() {
String topic = "test-topic";
// 發(fā)送消息
rocketMQTemplate.convertAndSend(topic, MyMessage.getInstance());
return "send message success";
}
}
@Data
class MyMessage {
private Integer id;
private String name;
private String status;
private Date createTime;
static MyMessage getInstance() {
MyMessage message = new Message();
message.id = 1;
message.name = "×××";
message.status = "default";
message.createTime = new Date();
return message;
}
}
編寫完成后,啟動(dòng)項(xiàng)目,訪問該接口:
消息發(fā)送成功后,可以到RocketMQ的控制臺(tái)中進(jìn)行查看:
消息體可以在消息詳情中查看,如下:
從生產(chǎn)者的代碼來看,可以說是十分的簡(jiǎn)單了,只需要使用一個(gè)RocketMQTemplate就可以實(shí)現(xiàn)將對(duì)象轉(zhuǎn)換成消息體并發(fā)送消息。實(shí)際上除了RocketMQ外,其他的MQ也有對(duì)應(yīng)的Template,如下:
在消費(fèi)者項(xiàng)目中,也需要添加rocketmq的依賴:
org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.3
同樣需要配置Name Server的連接地址:
rocketmq:
name-server: 192.168.190.129:9876
編寫消費(fèi)者的代碼,具體代碼如下:
package com.zj.node.usercenter.rocketmq;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 消費(fèi)者監(jiān)聽器
*
* @author 01
* @date 2019-08-03
**/
@Slf4j
@Component
// topic需要和生產(chǎn)者的topic一致,consumerGroup屬性是必須指定的,內(nèi)容可以隨意
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group")
public class TestConsumerListener implements RocketMQListener {
/**
* 監(jiān)聽到消息的時(shí)候就會(huì)調(diào)用該方法
*
* @param message 消息體
*/
@Override
public void onMessage(MyMessage message) {
log.info("從test-topic中監(jiān)聽到消息");
log.info(JSON.toJSONString(message));
}
}
/**
* 消息體結(jié)構(gòu)需要一致
*/
@Data
class MyMessage {
private Integer id;
private String name;
private String status;
private Date createTime;
}
編寫完成后啟動(dòng)項(xiàng)目,由于之前我們已經(jīng)往隊(duì)列里發(fā)送了消息,所以此時(shí)消費(fèi)者項(xiàng)目一啟動(dòng),就可以監(jiān)聽到消息并消費(fèi),控制臺(tái)就會(huì)輸出如下日志:
眾所周知RocketMQ是支持事務(wù)消息的,這也是很多人選擇使用RocketMQ作為消息中間件的一大原因,也是RocketMQ的一大特定。RocketMQ事務(wù)消息的流程如下圖所示:
由于原圖是英文的,所以進(jìn)行了一個(gè)大致的翻譯。如下:
簡(jiǎn)單剖析一下流程:
1、生產(chǎn)者向MQ Server發(fā)送半消息,半消息是一種特殊的消息,這種消息會(huì)被存儲(chǔ)到MQ Server里,但是會(huì)標(biāo)記為暫時(shí)不能投遞的狀態(tài),所以此時(shí)消費(fèi)者不會(huì)消費(fèi)該消息
2、當(dāng)半消息發(fā)送成功后,生產(chǎn)者就會(huì)去執(zhí)行本地事務(wù)
3、生產(chǎn)者根據(jù)本地事務(wù)的執(zhí)行結(jié)果,向MQ Server發(fā)送commit或rollback消息進(jìn)行二次確認(rèn)。如果MQ Server接收到的是commit則會(huì)將半消息標(biāo)記為可投遞狀態(tài),那么消費(fèi)者就可以進(jìn)行消費(fèi)。反之,MQ Server接收到的是rollback則會(huì)將半消息丟棄掉,消費(fèi)者就無法進(jìn)行消費(fèi)
4、若MQ Server未接收到二次確認(rèn)的消息或生產(chǎn)者暫停了本地事務(wù)的執(zhí)行,MQ Server則會(huì)定時(shí)(默認(rèn)1分鐘)向生產(chǎn)者發(fā)送回查消息,檢查生產(chǎn)者的本地事務(wù)狀態(tài)。然后生產(chǎn)者會(huì)根據(jù)回查的本地事務(wù)執(zhí)行結(jié)果向MQ Server再次發(fā)送commit或rollback消息
概念術(shù)語:
消息三態(tài):
要想實(shí)現(xiàn)RocketMQ事務(wù)消息的話,需要按照流程圖編寫一些代碼。在開始編碼之前,先在數(shù)據(jù)庫中創(chuàng)建一張RocketMQ的事務(wù)日志表,用作于本地事務(wù)回查的依據(jù),表結(jié)構(gòu)如下:
然后再建一張表,作為事務(wù)方法操作的數(shù)據(jù)表,表結(jié)構(gòu)如下:
接著開始寫代碼,首先定義一個(gè)service,里面有帶有事務(wù)注解的方法以及發(fā)送事務(wù)消息的方法。具體代碼如下:
package com.zj.node.contentcenter.service.test;
import com.zj.node.contentcenter.dao.content.NoticeMapper;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.UUID;
/**
* @author 01
* @date 2019-08-08
**/
@Service
@RequiredArgsConstructor
public class TestProducerService {
private final RocketMQTemplate rocketMQTemplate;
private final NoticeMapper noticeMapper;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
public String testSendMsg(Notice notice) {
// topic
String topic = "test-topic";
// 生產(chǎn)者所在的事務(wù)組
String txProducerGroup = "tx-test-producer-group";
// 生產(chǎn)事務(wù)id
String transactionId = UUID.randomUUID().toString();
// 發(fā)送半消息
rocketMQTemplate.sendMessageInTransaction(
txProducerGroup, topic,
// 消息體
MessageBuilder.withPayload("事務(wù)消息")
// header是消息的頭部分,可以用作傳參
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("notice_id", notice.getId())
.build(),
// 傳遞到executeLocalTransaction的參數(shù)
notice);
return "send message success";
}
@Transactional(rollbackFor = Exception.class)
public void updateNotice(Integer noticeId, Notice notice) {
Notice newNotice = new Notice();
newNotice.setId(noticeId);
newNotice.setContent(notice.getContent());
noticeMapper.updateByPrimaryKeySelective(newNotice);
}
@Transactional(rollbackFor = Exception.class)
public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) {
updateNotice(noticeId, notice);
// 寫入事務(wù)日志
rocketmqTransactionLogMapper.insertSelective(
RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("updateNotice")
.build()
);
}
}
實(shí)現(xiàn)一個(gè)本地事務(wù)監(jiān)聽器,用于執(zhí)行事務(wù)方法及提供本地事務(wù)狀態(tài)的回查方法。具體代碼如下:
package com.zj.node.contentcenter.rocketmq;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import com.zj.node.contentcenter.service.test.TestProducerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
/**
* 本地事務(wù)監(jiān)聽器
*
* @author 01
* @date 2019-08-08
**/
@Slf4j
@RequiredArgsConstructor
// 這里的txProducerGroup需要與sendMessageInTransaction里設(shè)置的一致
@RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group")
public class TestTransactionListener implements RocketMQLocalTransactionListener {
private final TestProducerService service;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
* 用于執(zhí)行本地事務(wù)的方法
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("執(zhí)行本地事務(wù)方法. 事務(wù)id: {}", transactionId);
// header里拿出來的都是String類型
Integer noticeId = Integer.parseInt((String) headers.get("notice_id"));
try {
// 執(zhí)行帶有事務(wù)注解的方法
service.updateNoticeWithRocketMQLog(noticeId, (Notice) arg, transactionId);
// 正常執(zhí)行,向MQ Server發(fā)送commit消息
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事務(wù)方法發(fā)生異常,消息將被回滾", e);
// 發(fā)生異常向MQ Server發(fā)送rollback消息
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 用于回查本地事務(wù)的執(zhí)行結(jié)果
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.warn("回查本地事務(wù)狀態(tài). 事務(wù)id: {}", transactionId);
// 按事務(wù)id查詢?nèi)罩緮?shù)據(jù)
RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
RocketmqTransactionLog.builder()
.transactionId(transactionId)
.build()
);
// 如果能按事務(wù)id查詢出來數(shù)據(jù)表示本地事務(wù)執(zhí)行成功,沒有數(shù)據(jù)則表示本地事務(wù)執(zhí)行失敗
if (transactionLog == null) {
log.warn("本地事務(wù)執(zhí)行失敗,事務(wù)日志不存在,消息將被回滾. 事務(wù)id: {}", transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
}
簡(jiǎn)單說明一下這些方法的執(zhí)行流程:
首先調(diào)用
TestProducerService.testSendMsg
向MQ Server發(fā)送半消息,從代碼也可以看到該方法里不會(huì)執(zhí)行本地事務(wù)方法。當(dāng)MQ Server接收半消息成功后,會(huì)告訴生產(chǎn)者接收成功,接著就會(huì)執(zhí)行本地事務(wù)監(jiān)聽器里的executeLocalTransaction
方法,該方法里會(huì)調(diào)用TestProducerService
里帶有事務(wù)注解的方法updateNoticeWithRocketMQLog
,并在事務(wù)方法執(zhí)行完畢后返回本地事務(wù)狀態(tài)給MQ Server。若executeLocalTransaction
方法返回的事務(wù)狀態(tài)是UNKNOWN
或者該方法出于某種原因沒有被執(zhí)行完畢,那么MQ Server就接收不到二次確認(rèn)消息,默認(rèn)會(huì)在一分鐘后向生產(chǎn)者發(fā)送回查消息,生產(chǎn)者接收到回查消息的話就會(huì)執(zhí)行本地事務(wù)監(jiān)聽器里的checkLocalTransaction
方法,通過事務(wù)日志記錄表的數(shù)據(jù)來確認(rèn)該事務(wù)狀態(tài)并返回。
由于rocketmq有自己內(nèi)部的日志體系,所以默認(rèn)不會(huì)使用Slf4j。體現(xiàn)到executeLocalTransaction
方法的話,就是如果該方法的執(zhí)行過程中拋出了異常的話,異常信息不會(huì)被打印到控制臺(tái),而是輸出到rocketmq_client.log日志文件中。相關(guān)源碼:org.apache.rocketmq.client.log.ClientLogger
如果希望rocketmq的日志輸出到控制臺(tái)的話,需要在啟動(dòng)類的main方法中增加如下代碼:
// 讓rocketmq使用slf4j日志
System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");