這篇文章將為大家詳細(xì)講解有關(guān)如何解決RocketMQ主從同步若干問題,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
創(chuàng)新互聯(lián)專注于企業(yè)成都全網(wǎng)營(yíng)銷、網(wǎng)站重做改版、永修網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、成都h5網(wǎng)站建設(shè)、商城建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)營(yíng)銷網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為永修等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。
主從同步基本實(shí)現(xiàn)過程如下圖所示:
RocketMQ 的主從同步機(jī)制如下:
首先啟動(dòng)Master并在指定端口監(jiān)聽;
客戶端啟動(dòng),主動(dòng)連接Master,建立TCP連接;
客戶端以每隔5s的間隔時(shí)間向服務(wù)端拉取消息,如果是第一次拉取的話,先獲取本地commitlog文件中最大的偏移量,以該偏移量向服務(wù)端拉取消息;
服務(wù)端解析請(qǐng)求,并返回一批數(shù)據(jù)給客戶端;
客戶端收到一批消息后,將消息寫入本地commitlog文件中,然后向Master匯報(bào)拉取進(jìn)度,并更新下一次待拉取偏移量;
然后重復(fù)第3步;
RocketMQ主從同步一個(gè)重要的特征:主從同步不具備主從切換功能,即當(dāng)主節(jié)點(diǎn)宕機(jī)后,從不會(huì)接管消息發(fā)送,但可以提供消息讀取。
> 溫馨提示:本文并不會(huì)詳細(xì)分析RocketMQ主從同步的實(shí)現(xiàn)細(xì)節(jié),如大家對(duì)其感興趣,可以查閱筆者所著的《RocketMQ技術(shù)內(nèi)幕》或查看筆者博文:RocketMQ主從同步詳解
主,從服務(wù)器都在運(yùn)行過程中,消息消費(fèi)者是從主拉取消息還是從從拉取?
RocketMQ主從同步架構(gòu)中,如果主服務(wù)器宕機(jī),從服務(wù)器會(huì)接管消息消費(fèi),此時(shí)消息消費(fèi)進(jìn)度如何保持,當(dāng)主服務(wù)器恢復(fù)后,消息消費(fèi)者是從主拉取消息還是從從服務(wù)器拉取,主從服務(wù)器之間的消息消費(fèi)進(jìn)度如何同步?
接下來帶著上述問題,一起來探究其實(shí)現(xiàn)原理。
RocketMQ的主從同步,在默認(rèn)情況下RocketMQ會(huì)優(yōu)先選擇從主服務(wù)器進(jìn)行拉取消息,并不是通常意義的上的讀寫分離,那什么時(shí)候會(huì)從拉取呢?
> 溫馨提示:本節(jié)同樣不會(huì)詳細(xì)整個(gè)流程,只會(huì)點(diǎn)出其關(guān)鍵點(diǎn),如果想詳細(xì)了解消息拉取、消息消費(fèi)等核心流程,建議大家查閱筆者所著的《RocketMQ技術(shù)內(nèi)幕》。
在RocketMQ中判斷是從主拉取,還是從從拉取的核心代碼如下: DefaultMessageStore#getMessage
long diff = maxOffsetPy - maxPhyOffsetPulling; // [@1](https://my.oschina.net/u/1198) long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); // @2 getResult.setSuggestPullingFromSlave(diff > memory); // [@3](https://my.oschina.net/u/2648711)
代碼@1:首先介紹一下幾個(gè)局部變量的含義:
maxOffsetPy 當(dāng)前最大的物理偏移量。返回的偏移量為已存入到操作系統(tǒng)的PageCache中的內(nèi)容。
maxPhyOffsetPulling 本次消息拉取最大物理偏移量,按照消息順序拉取的基本原則,可以基本預(yù)測(cè)下次開始拉取的物理偏移量將大于該值,并且就在其附近。
diff maxOffsetPy與maxPhyOffsetPulling之間的間隔,getMessage通常用于消息消費(fèi)時(shí),即這個(gè)間隔可以理解為目前未處理的消息總大小。
代碼@2:獲取RocketMQ消息存儲(chǔ)在PageCache中的總大小,如果當(dāng)RocketMQ容量超過該闊值,將會(huì)將被置換出內(nèi)存,如果要訪問不在PageCache中的消息,則需要從磁盤讀取。
StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE 返回當(dāng)前系統(tǒng)的總物理內(nèi)存。參數(shù)
accessMessageInMemoryMaxRatio 設(shè)置消息存儲(chǔ)在內(nèi)存中的閥值,默認(rèn)為40。 結(jié)合代碼@2這兩個(gè)參數(shù)的含義,算出RocketMQ消息能映射到內(nèi)存中最大值為40% * (機(jī)器物理內(nèi)存)。
代碼@3:設(shè)置下次拉起是否從從拉取標(biāo)記,觸發(fā)下次從從服務(wù)器拉取的條件為:當(dāng)前所有可用消息數(shù)據(jù)(所有commitlog)文件的大小已經(jīng)超過了其闊值,默認(rèn)為物理內(nèi)存的40%。
那GetResult的suggestPullingFromSlave屬性在哪里使用呢?
PullMessageProcessor#processRequest
if (getMessageResult.isSuggestPullingFromSlave()) { // @1 responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } else { responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); } switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) { // @2 case ASYNC_MASTER: case SYNC_MASTER: break; case SLAVE: if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) { response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); } break; } if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) { // @3 // consume too slow ,redirect to another machine if (getMessageResult.isSuggestPullingFromSlave()) { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } // consume ok else { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); } } else { responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); }
代碼@1:如果從commitlog文件查找消息時(shí),發(fā)現(xiàn)消息堆積太多,默認(rèn)超過物理內(nèi)存的40%后,會(huì)建議從從服務(wù)器讀取。
代碼@2:如果當(dāng)前服務(wù)器的角色為從服務(wù)器:并且slaveReadEnable=true,則忽略代碼@1設(shè)置的值,下次拉取切換為從主拉取。
代碼@3:如果slaveReadEnable=true(從允許讀),并且建議從從服務(wù)器讀取,則從消息消費(fèi)組建議當(dāng)消息消費(fèi)緩慢時(shí)建議的拉取brokerId,由訂閱組配置屬性whichBrokerWhenConsumeSlowly決定;如果消息消費(fèi)速度正常,則使用訂閱組建議的brokerId拉取消息進(jìn)行消費(fèi),默認(rèn)為主服務(wù)器。如果不允許從可讀,則固定使用從主拉取。
> 溫馨提示:請(qǐng)注意broker服務(wù)參數(shù)slaveReadEnable,與訂閱組配置信息:whichBrokerWhenConsumeSlowly、brokerId的值,在生產(chǎn)環(huán)境中,可以通過updateSubGroup命令動(dòng)態(tài)改變訂閱組的配置信息。
如果訂閱組的配置保持默認(rèn)值的話,拉取消息請(qǐng)求發(fā)送到從服務(wù)器后,下一次消息拉取,無論是否開啟slaveReadEnable,下一次拉取,還是會(huì)發(fā)往主服務(wù)器。
上面的步驟,在消息拉取命令的返回字段中,會(huì)將下次建議拉取Broker返回給客戶端,根據(jù)其值從指定的broker拉取。
消息拉取實(shí)現(xiàn)PullAPIWrapper在處理拉取結(jié)果時(shí)會(huì)將服務(wù)端建議的brokerId更新到broker拉取緩存表中。
在發(fā)起拉取請(qǐng)求之前,首先根據(jù)如下代碼,選擇待拉取消息的Broker。
從上面內(nèi)容可知,主從同步引入的主要目的就是消息堆積的內(nèi)容默認(rèn)超過物理內(nèi)存的40%,則消息讀取則由從服務(wù)器來接管,實(shí)現(xiàn)消息的讀寫分離,避免主服務(wù)IO抖動(dòng)嚴(yán)重。那問題來了,主服務(wù)器宕機(jī)后,從服務(wù)器接管消息消費(fèi)后,那消息消費(fèi)進(jìn)度存儲(chǔ)在哪里?當(dāng)主服務(wù)器恢復(fù)正常后,消息是從主服務(wù)器拉取還是從從服務(wù)器拉?。恐鞣?wù)器如何得知最新的消息消費(fèi)進(jìn)度呢?
RocketMQ消息消費(fèi)進(jìn)度管理(集群模式): 集群模式下消息消費(fèi)進(jìn)度存儲(chǔ)文件位于服務(wù)端${ROCKETMQ_HOME}/store/config/consumerOffset.json。消息消費(fèi)者從服務(wù)器拉取一批消息后提交到消費(fèi)組特定的線程池中處理消息,當(dāng)消息消費(fèi)成功后會(huì)向Broker發(fā)送ACK消息,告知消費(fèi)端已成功消費(fèi)到哪條消息,Broker收到消息消費(fèi)進(jìn)度反饋后,首先存儲(chǔ)在內(nèi)存中,然后定時(shí)持久化到consumeOffset.json文件中。備注:關(guān)于消息消費(fèi)進(jìn)度管理更多的實(shí)現(xiàn)細(xì)節(jié),建議查閱筆者所著的《RocketMQ技術(shù)內(nèi)幕》。
我們先看一下客戶端向服務(wù)端反饋消息消費(fèi)進(jìn)度時(shí)如何選擇Broker。 因?yàn)橹鞣?wù)的brokerId為0,默認(rèn)情況下當(dāng)主服務(wù)器存活的時(shí)候,優(yōu)先會(huì)選擇主服務(wù)器,只有當(dāng)主服務(wù)器宕機(jī)的情況下,才會(huì)選擇從服務(wù)器。
既然集群模式下消息消費(fèi)進(jìn)度存儲(chǔ)在Broker端,當(dāng)主服務(wù)器正常時(shí),消息消費(fèi)進(jìn)度文件存儲(chǔ)在主服務(wù)器,那提出如下兩個(gè)問題: 1)消息消費(fèi)端在主服務(wù)器存活的情況下,會(huì)優(yōu)先向主服務(wù)器反饋消息消費(fèi)進(jìn)度,那從服務(wù)器是如何同步消息消費(fèi)進(jìn)度的。 2)當(dāng)主服務(wù)器宕機(jī)后則消息消費(fèi)端會(huì)向從服務(wù)器反饋消息消費(fèi)進(jìn)度,此時(shí)消息消費(fèi)進(jìn)度如何存儲(chǔ),當(dāng)主服務(wù)器恢復(fù)正常后,主服務(wù)器如何得知最新的消息消費(fèi)進(jìn)度。
為了解開上述兩個(gè)疑問,我們優(yōu)先來看一下Broker服務(wù)器在收到提交消息消費(fèi)進(jìn)度反饋命令后的處理邏輯:
客戶端定時(shí)向Broker端發(fā)送更新消息消費(fèi)進(jìn)度的請(qǐng)求,其入口為:RemoteBrokerOffsetStore#updateConsumeOffsetToBroker,該方法中一個(gè)非常關(guān)鍵的點(diǎn)是:選擇broker的邏輯,如下所示:
如果主服務(wù)器存活,則選擇主服務(wù)器,如果主服務(wù)器宕機(jī),則選擇從服務(wù)器。也就是說,不管消息是從主服務(wù)器拉取的還是從從服務(wù)器拉取的,提交消息消費(fèi)進(jìn)度請(qǐng)求,優(yōu)先選擇主服務(wù)器。服務(wù)端就是接收其偏移量,更新到服務(wù)端的內(nèi)存中,然后定時(shí)持久化到${ROCKETMQ_HOME}/store/config/consumerOffset.json。
經(jīng)過上面的分析,我們來討論一下這個(gè)場(chǎng)景: 消息消費(fèi)者首先從主服務(wù)器拉取消息,并向其提交消息消費(fèi)進(jìn)度,如果當(dāng)主服務(wù)器宕機(jī)后,從服務(wù)器會(huì)接管消息拉取服務(wù),此時(shí)消息消費(fèi)進(jìn)度存儲(chǔ)在從服務(wù)器,主從服務(wù)器的消息消費(fèi)進(jìn)度會(huì)出現(xiàn)不一致?那當(dāng)主服務(wù)器恢復(fù)正常后,兩者之間的消息消費(fèi)進(jìn)度如何同步?
如果Broker角色為從服務(wù)器,會(huì)通過定時(shí)任務(wù)調(diào)用syncAll,從主服務(wù)器定時(shí)同步topic路由信息、消息消費(fèi)進(jìn)度、延遲隊(duì)列處理進(jìn)度、消費(fèi)組訂閱信息。
那問題來了,如果主服務(wù)器啟動(dòng)后,從服務(wù)器馬上從主服務(wù)器同步消息消息進(jìn)度,那豈不是又要重新消費(fèi)?
其實(shí)在絕大部分情況下,就算從服務(wù)從主服務(wù)器同步了很久之前的消費(fèi)進(jìn)度,只要消息者沒有重新啟動(dòng),就不需要重新消費(fèi),在這種情況下,RocketMQ提供了兩種機(jī)制來確保不丟失消息消費(fèi)進(jìn)度。
第一種,消息消費(fèi)者在內(nèi)存中存在最新的消息消費(fèi)進(jìn)度,繼續(xù)以該進(jìn)度去服務(wù)器拉取消息后,消息處理完后,會(huì)定時(shí)向Broker服務(wù)器反饋消息消費(fèi)進(jìn)度,在上面也提到過,在反饋消息消費(fèi)進(jìn)度時(shí),會(huì)優(yōu)先選擇主服務(wù)器,此時(shí)主服務(wù)器的消息消費(fèi)進(jìn)度就立馬更新了,從服務(wù)器此時(shí)只需定時(shí)同步主服務(wù)器的消息消費(fèi)進(jìn)度即可。
第二種是,消息消費(fèi)者在向主服務(wù)器拉取消息時(shí),如果是是主服務(wù)器,在處理消息拉取時(shí),也會(huì)更新消息消費(fèi)進(jìn)度。
主服務(wù)器在處理消息拉取命令時(shí),會(huì)觸發(fā)消息消費(fèi)進(jìn)度的更新,其代碼入口為:PullMessageProcessor#processRequest
boolean storeOffsetEnable = brokerAllowSuspend; // @1 storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; // @2 if (storeOffsetEnable) { this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); }
代碼@1:首先介紹幾個(gè)局部變量的含義:
brokerAllowSuspend:broker是否允許掛起,在消息拉取時(shí),該值默認(rèn)為true。
hasCommitOffsetFlag:消息消費(fèi)者在內(nèi)存中是否緩存了消息消費(fèi)進(jìn)度,如果緩存了,該標(biāo)記設(shè)置為true。 如果Broker的角色為主服務(wù)器,并且上面兩個(gè)變量都為true,則首先使用commitOffset更新消息消費(fèi)進(jìn)度。
看到這里,主從同步消息消費(fèi)進(jìn)度的相關(guān)問題,應(yīng)該就有了答案了。
上述實(shí)現(xiàn)原理的講解有點(diǎn)枯燥無味,我們先來回答如下幾個(gè)問題:
1、主,從服務(wù)器都在運(yùn)行過程中,消息消費(fèi)者是從主拉取消息還是從從拉??? 答:默認(rèn)情況下,RocketMQ消息消費(fèi)者從主服務(wù)器拉取,當(dāng)主服務(wù)器積壓的消息超過了物理內(nèi)存的40%,則建議從從服務(wù)器拉取。但如果slaveReadEnable為false,表示從服務(wù)器不可讀,從服務(wù)器也不會(huì)接管消息拉取。
2、當(dāng)消息消費(fèi)者向從服務(wù)器拉取消息后,會(huì)一直從從服務(wù)器拉??? 答:不是的。分如下情況: 1)如果從服務(wù)器的slaveReadEnable設(shè)置為false,則下次拉取,從主服務(wù)器拉取。 2)如果從服務(wù)器允許讀取并且從服務(wù)器積壓的消息未超過其物理內(nèi)存的30%,下次拉取使用的Broker為訂閱組的brokerId指定的Broker服務(wù)器,該值默認(rèn)為0,代表主服務(wù)器。 3)如果從服務(wù)器允許讀取并且從服務(wù)器積壓的消息超過了其物理內(nèi)存的30%,下次拉取使用的Broker為訂閱組的whichBrokerWhenConsumeSlowly指定的Broker服務(wù)器,該值默認(rèn)為1,代表從服務(wù)器。
3、主從服務(wù)消息消費(fèi)進(jìn)是如何同步的? 答:消息消費(fèi)進(jìn)度的同步時(shí)單向的,從服務(wù)器開啟一個(gè)定時(shí)任務(wù),定時(shí)從主服務(wù)器同步消息消費(fèi)進(jìn)度;無論消息消費(fèi)者是從主服務(wù)器拉的消息還是從從服務(wù)器拉取的消息,在向Broker反饋消息消費(fèi)進(jìn)度時(shí),優(yōu)先向主服務(wù)器匯報(bào);消息消費(fèi)者向主服務(wù)器拉取消息時(shí),如果消息消費(fèi)者內(nèi)存中存在消息消費(fèi)進(jìn)度時(shí),主會(huì)嘗試跟新消息消費(fèi)進(jìn)度。
讀寫分離的正確使用姿勢(shì): 1、主從Broker服務(wù)器的slaveReadEnable設(shè)置為true。 2、通過updateSubGroup命令更新消息組whichBrokerWhenConsumeSlowly、brokerId,特別是其brokerId不要設(shè)置為0,不然從從服務(wù)器拉取一次后,下一次拉取就會(huì)從主去拉取。
關(guān)于如何解決RocketMQ主從同步若干問題就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。