這篇文章主要介紹RocketMQ中事務(wù)消息狀態(tài)回查的示例分析,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!
創(chuàng)新互聯(lián)專(zhuān)業(yè)為企業(yè)提供永昌網(wǎng)站建設(shè)、永昌做網(wǎng)站、永昌網(wǎng)站設(shè)計(jì)、永昌網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)與制作、永昌企業(yè)網(wǎng)站模板建站服務(wù),十多年永昌做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。學(xué)習(xí)事務(wù)狀態(tài)消息回查,我們知道,第一次提交到消息服務(wù)器時(shí)消息的主題被替換為RMQ_SYS_TRANS_HALF_TOPIC,本地事務(wù)執(zhí)行完后如果返回本地事務(wù)狀態(tài)為UN_KNOW時(shí),第二次提交到服務(wù)器時(shí)將不會(huì)做任何操作,也就是說(shuō)此時(shí)消息還存在與RMQ_SYS_TRANS_HALF_TOPIC主題中,并不能被消息消費(fèi)者消費(fèi),那這些消息最終如何被提交或回滾呢?
原來(lái)RocketMQ使用TransactionalMessageCheckService線程定時(shí)去檢測(cè)
RMQ_SYS_TRANS_HALF_TOPIC主題中的消息,回查消息的事務(wù)狀態(tài)。TransactionalMessageCheckService的檢測(cè)頻率默認(rèn)1分鐘,可通過(guò)在broker.conf文件中設(shè)置transactionCheckInterval的值來(lái)改變默認(rèn)值,單位為毫秒。
接下來(lái)將深入分析該線程的實(shí)現(xiàn)原理,從而解開(kāi)事務(wù)消息回查機(jī)制。
TransactionalMessageCheckService#onWaitEndprotected void onWaitEnd() { long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); // @1 int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); // @2 long begin = System.currentTimeMillis(); log.info("Begin to check prepare message, begin time:{}", begin); this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); // @3 log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin); }
代碼@1:從broker配置文件中獲取transactionTimeOut參數(shù)值。
代碼@2:從broker配置文件中獲取transactionCheckMax參數(shù)值,表示事務(wù)的大檢測(cè)次數(shù),如果超過(guò)檢測(cè)次數(shù),消息會(huì)默認(rèn)為丟棄,即回滾消息。
接下來(lái)重點(diǎn)分析TransactionalMessageService#check的實(shí)現(xiàn)邏輯:
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl TransactionalMessageServiceImpl#check String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC; SetmsgQueues = transactionalMessageBridge.fetchMessageQueues(topic);if (msgQueues == null || msgQueues.size() == 0) { log.warn("The queue of topic is empty :" + topic); return; }
step1:根據(jù)主題名稱(chēng),獲取該主題下所有的消息隊(duì)列。
TransactionalMessageServiceImpl#checkfor (MessageQueue messageQueue : msgQueues) { // ...}
Step2:循環(huán)遍歷消息隊(duì)列,從單個(gè)消息消費(fèi)隊(duì)列去獲取消息。
TransactionalMessageServiceImpl#checklong startTime = System.currentTimeMillis(); MessageQueue opQueue = getOpQueue(messageQueue);long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);if (halfOffset < 0 || opOffset < 0) { log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset); continue; }
Step3:獲取對(duì)應(yīng)的操作隊(duì)列,其主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC,然后獲取操作隊(duì)列的消費(fèi)進(jìn)度、待操作的消費(fèi)隊(duì)列的消費(fèi)進(jìn)度,如果任意一小于0,忽略該消息隊(duì)列,繼續(xù)處理下一個(gè)隊(duì)列。
TransactionalMessageServiceImpl#check ListdoneOpOffset = new ArrayList<>(); HashMap removeMap = new HashMap<>(); PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);if (null == pullResult) { log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset); continue; }
Step4:調(diào)用fillOpRemoveMap主題填充removeMap、doneOpOffset數(shù)據(jù)結(jié)構(gòu),這里主要的目的是避免重復(fù)調(diào)用事務(wù)回查接口,這里說(shuō)一下RMQ_SYS_TRANS_HALF_TOPIC、RMQ_SYS_TRANS_OP_HALF_TOPIC這兩個(gè)主題的作用。
RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主題,事務(wù)消息首先先進(jìn)入到該主題。
RMQ_SYS_TRANS_OP_HALF_TOPIC:當(dāng)消息服務(wù)器收到事務(wù)消息的提交或回滾請(qǐng)求后,會(huì)將消息存儲(chǔ)在該主題下。
TransactionalMessageServiceImpl#check// single threadint getMessageNullCount = 1;long newOffset = halfOffset;long i = halfOffset; // @1 while (true) { if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { // @2 log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); break; } if (removeMap.containsKey(i)) { // @3 log.info("Half offset {} has been committed/rolled back", i); removeMap.remove(i); } else { GetResult getResult = getHalfMsg(messageQueue, i); // @4 MessageExt msgExt = getResult.getMsg(); if (msgExt == null) { // @5 if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) { break; } if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) { log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); break; } else { log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); i = getResult.getPullResult().getNextBeginOffset(); newOffset = i; continue; } } if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { // @6 listener.resolveDiscardMsg(msgExt); newOffset = i + 1; i++; continue; } if (msgExt.getStoreTimestamp() >= startTime) { log.info("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp())); break; } long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); // @7 long checkImmunityTime = transactionTimeout; String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); if (null != checkImmunityTimeStr) { // @8 checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) { newOffset = i + 1; i++; continue; } } } else { // @9 if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) { log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp())); break; } } ListopMsg = pullResult.getMsgFoundList(); boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1); // @10 if (isNeedCheck) { if (!putBackHalfMsgQueue(msgExt, i)) { // @11 continue; } listener.resolveHalfMsg(msgExt); } else { pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); // @12 log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult); continue; } } newOffset = i + 1; i++; }if (newOffset != halfOffset) { // @13 transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); }long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);if (newOpOffset != opOffset) { // @14 transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset); }
本段代碼比較長(zhǎng),卻是事務(wù)狀態(tài)回查的重點(diǎn)實(shí)現(xiàn)。
代碼@1:先解釋幾個(gè)局部變量的含義。
getMessageNullCount :獲取空消息的次數(shù)
newOffset :當(dāng)前處理RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新進(jìn)度
i:當(dāng)前處理消息的隊(duì)列偏移量,其主題依然為RMQ_SYS_TRANS_HALF_TOPIC。
代碼@2:這段代碼應(yīng)該不陌生,這是RocketMQ處理任務(wù)的一個(gè)通用處理邏輯,就是一個(gè)任務(wù)處理,可以限制每次最多處理的時(shí)間,RocketMQ為待檢測(cè)主題RMQ_SYS_TRANS_HALF_TOPIC的每個(gè)隊(duì)列,做事務(wù)狀態(tài)回查,一次最多不超過(guò)60S,目前該值不可配置。
代碼@3:如果removeMap中包含當(dāng)前處理的消息,則繼續(xù)下一條,removeMap中的值是通過(guò)Step3中填充的,具體實(shí)現(xiàn)邏輯是從RMQ_SYS_TRANS_OP_HALF_TOPIC主題中拉取32條,如果拉取的消息隊(duì)列偏移量大于等于RMQ_SYS_TRANS_HALF_TOPIC#queueId當(dāng)前的處理進(jìn)度時(shí),會(huì)添加到removeMap中,表示已處理過(guò)。
代碼@4:根據(jù)消息隊(duì)列偏移量i從消費(fèi)隊(duì)列中獲取消息。
代碼@5:如果消息為空,則根據(jù)允許重復(fù)次數(shù)進(jìn)行操作,默認(rèn)重試一次,目前不可配置。其具體實(shí)現(xiàn)為:
如果超過(guò)重試次數(shù),直接跳出,結(jié)束該消息隊(duì)列的事務(wù)狀態(tài)回查。
如果是由于沒(méi)有新的消息而返回為空(拉取狀態(tài)為:PullStatus.NO_NEW_MSG),則結(jié)束該消息隊(duì)列的事務(wù)狀態(tài)回查。
1.其他原因,則將偏移量i設(shè)置為: getResult.getPullResult().getNextBeginOffset(),重新拉取。
代碼@6:判斷該消息是否需要discard(吞沒(méi),丟棄,不處理)、或skip(跳過(guò)),其依據(jù)如下:
needDiscard 依據(jù):如果該消息回查的次數(shù)超過(guò)允許的大回查次數(shù),則該消息將被丟棄,即事務(wù)消息提交失敗,不能被消費(fèi)者消費(fèi),其做法,主要是每回查一次,在消息屬性TRANSACTION_CHECK_TIMES中增1,默認(rèn)大回查次數(shù)為5次。
needSkip依據(jù):如果事務(wù)消息超過(guò)文件的過(guò)期時(shí)間,默認(rèn)72小時(shí)(具體請(qǐng)查看RocketMQ過(guò)期文件相關(guān)內(nèi)容),則跳過(guò)該消息。
代碼@7:處理事務(wù)超時(shí)相關(guān)概念,先解釋幾個(gè)局部變量:、
valueOfCurrentMinusBorn :該消息已存儲(chǔ)的時(shí)間,等于系統(tǒng)當(dāng)前時(shí)間減去消息存儲(chǔ)的時(shí)間戳。
checkImmunityTime :立即檢測(cè)事務(wù)消息的時(shí)間。
transactionTimeout:事務(wù)消息的超時(shí)時(shí)間,其設(shè)計(jì)的意義是,應(yīng)用程序在發(fā)送事務(wù)消息后,事務(wù)不會(huì)馬上提交,該時(shí)間就是假設(shè)事務(wù)消息發(fā)送成功后,應(yīng)用程序事務(wù)提交的時(shí)間,在這段時(shí)間內(nèi),RocketMQ任務(wù)事務(wù)未提交,故不應(yīng)該在這個(gè)時(shí)間段向應(yīng)用程序發(fā)送回查請(qǐng)求。
代碼@8:如果消息指定了事務(wù)消息過(guò)期時(shí)間屬性(PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS),如果當(dāng)前時(shí)間已超過(guò)該值。
代碼@9:如果當(dāng)前時(shí)間還未過(guò)(應(yīng)用程序事務(wù)結(jié)束時(shí)間),則跳出本次回查處理的,等下一次再試。
代碼@10:判斷是否需要發(fā)送事務(wù)回查消息,具體邏輯:
如果從操作隊(duì)列(RMQ_SYS_TRANS_OP_HALF_TOPIC)中沒(méi)有已處理消息并且已經(jīng)超過(guò)(應(yīng)用程序事務(wù)結(jié)束時(shí)間),參數(shù)transactionTimeOut值。
如果操作隊(duì)列不為空,并且最后一天條消息的存儲(chǔ)時(shí)間已經(jīng)超過(guò)transactionTimeOut值。
代碼@11:如果需要發(fā)送事務(wù)狀態(tài)回查消息,則先將消息再次發(fā)送到RMQ_SYS_TRANS_HALF_TOPIC主題中,發(fā)送成功則返回true,否則返回false,這里還有一個(gè)實(shí)現(xiàn)關(guān)鍵點(diǎn):
if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { msgExt.setQueueOffset( putMessageResult.getAppendMessageResult().getLogicsOffset()); msgExt.setCommitLogOffset( putMessageResult.getAppendMessageResult().getWroteOffset()); msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); }
如果發(fā)送成功,會(huì)將該消息的queueOffset、commitLogOffset設(shè)置為重新存入的偏移量,為什么需要這樣呢,答案在listener.resolveHalfMsg(msgExt)中。
AbstractTransactionalMessageCheckListener#resolveHalfMsgpublic void resolveHalfMsg(final MessageExt msgExt) { executorService.execute(new Runnable() { @Override public void run() { try { sendCheckMessage(msgExt); } catch (Exception e) { LOGGER.error("Send check message error!", e); } } }); }
發(fā)送具體的事務(wù)回查機(jī)制,這里用一個(gè)線程池來(lái)異步發(fā)送回查消息,為了回查進(jìn)度保存的簡(jiǎn)化,這里只要發(fā)送了回查消息,當(dāng)前回查進(jìn)度會(huì)向前推動(dòng),如果回查失敗,上一步驟新增的消息將可以再次發(fā)送回查消息,那如果回查消息發(fā)送成功,那會(huì)不會(huì)下一次又重復(fù)發(fā)送回查消息呢?這個(gè)可以根據(jù)OP隊(duì)列中的消息來(lái)判斷是否重復(fù),如果回查消息發(fā)送成功并且消息服務(wù)器完成提交或回滾操作,這條消息會(huì)發(fā)送到OP隊(duì)列中,然后fillOpRemoveMap根據(jù)處理進(jìn)度獲取一批已處理的消息,來(lái)與消息判斷是否重復(fù),由于fillopRemoveMap一次只拉32條消息,那又如何保證一定能拉取到與當(dāng)前消息的處理記錄呢?其實(shí)就是通過(guò)代碼@10來(lái)實(shí)現(xiàn)的,如果此批消息最后一條未超過(guò)事務(wù)延遲消息,則繼續(xù)拉取更多消息進(jìn)行判斷(@12)和(@14),op隊(duì)列也會(huì)隨著回查進(jìn)度的推進(jìn)而推進(jìn)。
代碼@12:如果無(wú)法判斷是否發(fā)送回查消息,則加載更多的已處理消息進(jìn)行刷選。
代碼@13:保存(Prepare)消息隊(duì)列的回查進(jìn)度。
代碼@14:保存處理隊(duì)列(op)的進(jìn)度。
上述講解了TransactionalMessageCheckService回查定時(shí)線程的發(fā)送回查消息的整體流程與實(shí)現(xiàn)細(xì)節(jié),接下來(lái)重點(diǎn)分析一下上述步驟@11,通過(guò)異步方式發(fā)送消息回查的實(shí)現(xiàn)過(guò)程。
AbstractTransactionalMessageCheckListener#sendCheckMessagepublic void sendCheckMessage(MessageExt msgExt) throws Exception { CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader(); checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId()); checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId()); checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); // @1 msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0); // @2 String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); // @3 Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId); if (channel != null) { brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt); // @4 } else { LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId); } }
代碼@1:首先構(gòu)建回查事務(wù)狀態(tài)請(qǐng)求消息,請(qǐng)求核心參數(shù)包括:消息offsetId、消息ID(索引)、消息事務(wù)ID、事務(wù)消息隊(duì)列中的偏移量(RMQ_SYS_TRANS_HALF_TOPIC)。
代碼@2:恢復(fù)原消息的主題、隊(duì)列,并設(shè)置storeSize為0。
代碼@3:獲取生產(chǎn)者組名稱(chēng)。
代碼@4:根據(jù)生產(chǎn)者組獲取任意一個(gè)生產(chǎn)者,通過(guò)與其連接發(fā)送事務(wù)回查消息,回查消息的請(qǐng)求者為【Broker服務(wù)器】,接收者為(client,具體為消息生產(chǎn)者)。
其處理類(lèi)為:org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest,其詳細(xì)邏輯實(shí)現(xiàn)方法為:
ClientRemotingProcessor#checkTransactionStatepublic RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class); final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody()); final MessageExt messageExt = MessageDecoder.decode(byteBuffer); if (messageExt != null) { String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { messageExt.setTransactionId(transactionId); } final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); if (group != null) { MQProducerInner producer = this.mqClientFactory.selectProducer(group); if (producer != null) { final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); producer.checkTransactionState(addr, messageExt, requestHeader); // @1 } else { log.debug("checkTransactionState, pick producer by group[{}] failed", group); } } else { log.warn("checkTransactionState, pick producer group failed"); } } else { log.warn("checkTransactionState, decode message failed"); } return null; }
代碼@1:最終調(diào)用生產(chǎn)者的checkTransactionState方法。
DefaultMQProducerImpl#checkTransactionStatepublic void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) { Runnable request = new Runnable() { // @1 private final String brokerAddr = addr; private final MessageExt message = msg; private final CheckTransactionStateRequestHeader checkRequestHeader = header; private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup(); @Override public void run() { TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); // @1 if (transactionCheckListener != null) { LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable exception = null; try { localTransactionState = transactionCheckListener.checkLocalTransaction(message); // @2 } catch (Throwable e) { log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); exception = e; } this.processTransactionState( // @3 localTransactionState, group, exception); } else { log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group); } } private void processTransactionState( final LocalTransactionState localTransactionState, final String producerGroup, final Throwable exception) { final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); thisHeader.setProducerGroup(producerGroup); thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); thisHeader.setFromTransactionCheck(true); String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (uniqueKey == null) { uniqueKey = message.getMsgId(); } thisHeader.setMsgId(uniqueKey); thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); switch (localTransactionState) { case COMMIT_MESSAGE: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); log.warn("when broker check, client rollback this transaction, {}", thisHeader); break; case UNKNOW: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); log.warn("when broker check, client does not know this transaction state, {}", thisHeader); break; default: break; } String remark = null; if (exception != null) { remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception); } try { DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000); } catch (Exception e) { log.error("endTransactionOneway exception", e); } } }; this.checkExecutor.submit(request); }
上述代碼雖多,其實(shí)實(shí)現(xiàn)思路非常清晰,先使用一個(gè)匿名類(lèi)( Runnable )構(gòu)建一個(gè)運(yùn)行任務(wù),然后提交到checkExecutor線程池中執(zhí)行,這與我第一篇文章的猜測(cè)是吻合的,那重點(diǎn)分析一下該任務(wù)的允許邏輯,對(duì)應(yīng)在run方法中。
代碼@1:獲取消息發(fā)送者的TransactionListener。
代碼@2:執(zhí)行TransactionListener#checkLocalTransaction,檢測(cè)本地事務(wù)狀態(tài),也就是應(yīng)用程序需要實(shí)現(xiàn)TransactionListener#checkLocalTransaction,告知RocketMQ該事務(wù)的事務(wù)狀態(tài),然后返回COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW中的一個(gè),然后向Broker發(fā)送END_TRANSACTION命令即可,
代碼@3:發(fā)送END_TRANSACTION到Broker,其具體實(shí)現(xiàn),已經(jīng)在
https://blog.csdn.net/prestigeding/article/details/81263833
中詳細(xì)講解過(guò),在此不重復(fù)分析。
到這里,事務(wù)消息狀態(tài)回查流程就講解完畢,接下來(lái)以一張流程圖結(jié)束本篇的講解。
以上是“RocketMQ中事務(wù)消息狀態(tài)回查的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司行業(yè)資訊頻道!