這篇“SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息”文章的知識點大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息”文章吧。
創(chuàng)新互聯(lián)建站是一家集網(wǎng)站建設(shè),閻良企業(yè)網(wǎng)站建設(shè),閻良品牌網(wǎng)站建設(shè),網(wǎng)站定制,閻良網(wǎng)站建設(shè)報價,網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,閻良網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強企業(yè)競爭力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實用型網(wǎng)站。
環(huán)境:springboot2.3.9RELEASE + RocketMQ4.8.0
org.springframework.boot spring-boot-starter-web org.apache.rocketmq rocketmq-spring-boot-starter 2.2.0
server: port: 8080 --- rocketmq: nameServer: localhost:9876 producer: group: demo-mq
發(fā)送
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String message) { rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }
接受
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener{ @Override public void onMessage(String message) { System.out.println("接收到消息:" + message) ; } }
發(fā)送
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendOrder(String topic, String message, String tags, int id) { rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(), "order-" + id, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ; } @Override public void onException(Throwable e) { e.printStackTrace() ; } }); }
這里是根據(jù)hashkey將消息發(fā)送到不同的隊列中
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener{ @Override public void onMessage(String message) { System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ; } }
consumeMode = ConsumeMode.ORDERLY,指明了消息模式為順序模式,一個隊列,一個線程。
結(jié)果
當(dāng)consumeMode = ConsumeMode.CONCURRENTLY執(zhí)行結(jié)果如下:
發(fā)送端
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String topic, String message, String tags) { rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }
消費端
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener{ @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }
messageModel = MessageModel.CLUSTERING
測試
啟動兩個服務(wù)分別端口是8080,8081
8080服務(wù)
8081服務(wù)
集群消息模式下,每個服務(wù)分別接收一部分消息,實現(xiàn)了 messageModel = MessageModel.BROADCASTING 測試 啟動兩個服務(wù)分別端口是8080,8081 8080服務(wù) 8081服務(wù) 集群消息模式下,每個服務(wù)分別都接受了同樣的消息。 RocketMQ事務(wù)的3個狀態(tài) TransactionStatus.CommitTransaction:提交事務(wù)消息,消費者可以消費此消息 TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費。 TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查消息隊列來確定狀態(tài)。 RocketMQ實現(xiàn)事務(wù)消息主要分為兩個階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補償流程 整體流程為: 正常事務(wù)發(fā)送與提交階段 1、生產(chǎn)者發(fā)送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息) 2、服務(wù)端響應(yīng)消息寫入結(jié)果,半消息發(fā)送成功 3、開始執(zhí)行本地事務(wù) 4、根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作 事務(wù)信息的補償流程 1、如果MQServer長時間沒收到本地事務(wù)的執(zhí)行狀態(tài)會向生產(chǎn)者發(fā)起一個確認(rèn)回查的操作請求 2、生產(chǎn)者收到確認(rèn)回查請求后,檢查本地事務(wù)的執(zhí)行狀態(tài) 3、根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作 補償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時發(fā)生超時或失敗的情況。 發(fā)送端 生產(chǎn)者對應(yīng)的監(jiān)聽器 消費端 Service Controller 測試 調(diào)用接口后,控制臺輸出: 從打印日志看出來都保存完畢了后 消費端才接受到消息。 刪除數(shù)據(jù),再測試ID為1會報錯的。 數(shù)據(jù)庫中沒有數(shù)據(jù)。。。 是不是也不是很復(fù)雜,2個階段來處理。 以上就是關(guān)于“SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對大家有幫助,若想了解更多相關(guān)的知識內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。事務(wù)消息
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendTx(String topic, Long id, String tags) { rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload( new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))). setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(), UUID.randomUUID().toString().replaceAll("-", "")) ; }
@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener { @Resource private BusinessService bs ; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 這里執(zhí)行本地的事務(wù)操作,比如保存數(shù)據(jù)。 try { // 創(chuàng)建一個日志記錄表,將這唯一的ID存入數(shù)據(jù)庫中,在下面的check方法中可以根據(jù)這個id查詢是否有數(shù)據(jù) String id = (String) msg.getHeaders().get("BID") ; Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ; System.out.println("消息內(nèi)容:" + users + "\t參與數(shù)據(jù):" + arg + "\t本次事務(wù)的唯一編號:" + id) ; bs.save(users, new UsersLog(users.getId(), id)) ; } catch (Exception e) { e.printStackTrace() ; return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 這里檢查本地事務(wù)是否執(zhí)行成功 String id = (String) msg.getHeaders().get("BID") ; System.out.println("執(zhí)行查詢ID為:" + id + " 的數(shù)據(jù)是否存在") ; UsersLog usersLog = bs.queryUsersLog(id) ; if (usersLog == null) { return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } }
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener
@Transactional public boolean save(Users users, UsersLog usersLog) { usersRepository.save(users) ; usersLogRepository.save(usersLog) ; if (users.getId() == 1) { throw new RuntimeException("數(shù)據(jù)錯誤") ; } return true ; } public UsersLog queryUsersLog(String bid) { return usersLogRepository.findByBid(bid) ; }
@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) { ps.sendTx("tx-topic", id, "tag10") ; return "send transaction success" ; }
網(wǎng)站標(biāo)題:SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息
文章轉(zhuǎn)載:http://weahome.cn/article/pjijjg.html