前面我也跟大家講述了 RocketMQ 讀寫分離的規(guī)則,但是你可能會問,主從服務(wù)器之間的消費(fèi)進(jìn)度是如何保持同步的?下面我來給大家解答一下。
成都創(chuàng)新互聯(lián)公司-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價比金城江網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式金城江網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋金城江地區(qū)。費(fèi)用合理售后完善,十多年實(shí)體公司更值得信賴。如果消費(fèi)者消費(fèi)模式不同,也會有不同的保存方式,消費(fèi)者端的消息消費(fèi)進(jìn)度保存到 OffsetStore 中,他有兩個實(shí)現(xiàn)類:
org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore?//?本地消費(fèi)進(jìn)度保存實(shí)現(xiàn)org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore?//?遠(yuǎn)程消費(fèi)進(jìn)度保存實(shí)現(xiàn)
其中,如果是廣播模式消費(fèi),消息的消費(fèi)進(jìn)度是保存到本地,如果是集群消費(fèi)模式,消息的消費(fèi)進(jìn)度則是保存到 Broker,但無論是保存到本地,還是保存到 Broker,消費(fèi)者都會在本地留一份緩存,我們暫且看看集群消費(fèi)模式下,消息消費(fèi)進(jìn)度的緩存是如何保存的:http://m.qd8.com.cn/yiyao/xinxi21_3710012.html
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateOffset:
public?void?updateOffset(MessageQueue?mq,?long?offset,?boolean?increaseOnly)?{??if?(mq?!=?null)?{ ????AtomicLong?offsetOld?=?this.offsetTable.get(mq);????if?(null?==?offsetOld)?{ ??????offsetOld?=?this.offsetTable.putIfAbsent(mq,?new?AtomicLong(offset)); ????}????if?(null?!=?offsetOld)?{??????if?(increaseOnly)?{ ????????MixAll.compareAndIncreaseOnly(offsetOld,?offset); ??????}?else?{ ????????offsetOld.set(offset); ??????} ????} ??} }
消息者在消費(fèi)完消息后,會調(diào)用以上方法,講消費(fèi)進(jìn)度放入 offsetTable 緩存中,當(dāng) Rebalance 負(fù)載重新分配生成 PullRequest 對象時,會調(diào)用 RemoteBrokerOffsetStore.readOffset 方法從 offsetTable 緩存中取出對應(yīng)的消費(fèi)進(jìn)度緩存值,再將該值放進(jìn) PullRequest 對象中,接下來消息拉取時就很將消息消費(fèi)進(jìn)度緩存發(fā)送到 Broker 端,所以我們繼續(xù)看 Broker 端的處理邏輯。
之前整理 Broker 啟動流程時,發(fā)現(xiàn) Broker 啟動時會開啟一個定時任務(wù):
org.apache.rocketmq.broker.BrokerController#initialize:
this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{????@Override ????public?void?run()?{????????try?{ ????????????BrokerController.this.slaveSynchronize.syncAll(); ????????}?catch?(Throwable?e)?{ ????????????log.error("ScheduledTask?syncAll?slave?exception",?e); ????????} ????} },?1000?*?10,?1000?*?60,?TimeUnit.MILLISECONDS);
如果 Broker 是從服務(wù)器,則會開啟以上定時任務(wù)。
org.apache.rocketmq.broker.slave.SlaveSynchronize#syncAll:
public?void?syncAll()?{??this.syncTopicConfig();??this.syncConsumerOffset();??this.syncDelayOffset();??this.syncSubscriptionGroupConfig(); }
在主服務(wù)器沒有宕機(jī)的情況下,從服務(wù)器會定時從主服務(wù)器中同步消息消費(fèi)進(jìn)度等信息,那現(xiàn)在問題來了,由于這個同步是單方面同步,即只會從服務(wù)器同步主服務(wù)器,那如果主服務(wù)器宕機(jī)了之后,消費(fèi)者切換成從服務(wù)器拉取消息進(jìn)行消費(fèi),如果之后主服務(wù)器啟動了,從服務(wù)器在把已經(jīng)消費(fèi)過的偏移量同步過來,那豈不是造成同步消費(fèi)了?
其實(shí)消費(fèi)者取在拉取消息的時候,如果消費(fèi)者的緩存中存在消費(fèi)進(jìn)度,也會向 Broker 更新消息消費(fèi)進(jìn)度,所以即使是主服務(wù)器掛了,在它重新啟動之后,消費(fèi)者的消費(fèi)進(jìn)度沒有丟失,依然會更新主服務(wù)器的消息消費(fèi)進(jìn)度,這樣一來,消費(fèi)端與主服務(wù)器只掛了器中一個,并不會導(dǎo)致消息重新被消費(fèi),具體代碼邏輯如下:
org.apache.rocketmq.broker.processor.PullMessageProcessor#proce***equest:
boolean?storeOffsetEnable?=?brokerAllowSuspend; storeOffsetEnable?=?storeOffsetEnable?&&?hasCommitOffsetFlag; storeOffsetEnable?=?storeOffsetEnable ????&&?this.brokerController.getMessageStoreConfig().getBrokerRole()?!=?BrokerRole.SLAVE;if?(storeOffsetEnable)?{?this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),?requestHeader.getConsumerGroup(),?requestHeader.getTopic(),?requestHeader.getQueueId(),?requestHeader.getCommitOffset()); }
其中 brokerAllowSuspend 表示 broker 是否允許掛起,該值默認(rèn)為 true,hasCommitOffsetFlag 表示息消費(fèi)者在內(nèi)存中是否緩存了消息消費(fèi)進(jìn)度,從代碼邏輯可看出,如果 Broker 為主服務(wù)器,并且 brokerAllowSuspend 和 hasCommitOffsetFlag 都為true,那么就會將消費(fèi)者消費(fèi)進(jìn)度更新到本地。焦作國醫(yī)胃腸醫(yī)院評價怎么樣:http://jz.lieju.com/zhuankeyiyuan/37325143.htm
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。