這篇文章主要為大家展示了“rocketmq消費(fèi)負(fù)載均衡之push消費(fèi)的示例分析”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“rocketmq消費(fèi)負(fù)載均衡之push消費(fèi)的示例分析”這篇文章吧。
主要從事網(wǎng)頁設(shè)計(jì)、PC網(wǎng)站建設(shè)(電腦版網(wǎng)站建設(shè))、wap網(wǎng)站建設(shè)(手機(jī)版網(wǎng)站建設(shè))、響應(yīng)式網(wǎng)站開發(fā)、程序開發(fā)、微網(wǎng)站、成都微信小程序等,憑借多年來在互聯(lián)網(wǎng)的打拼,我們在互聯(lián)網(wǎng)網(wǎng)站建設(shè)行業(yè)積累了豐富的成都網(wǎng)站制作、成都做網(wǎng)站、網(wǎng)絡(luò)營銷經(jīng)驗(yàn),集策劃、開發(fā)、設(shè)計(jì)、營銷、管理等多方位專業(yè)化運(yùn)作于一體,具備承接不同規(guī)模與類型的建設(shè)項(xiàng)目的能力。
介紹之前首先拋出幾個問題:
1. 要做負(fù)載均衡,首先要解決的一個問題是什么?
2. 負(fù)載均衡是Client端處理還是Broker端處理?
個人理解:
1. 要做負(fù)載均衡,首先要做的就是信號收集。
所謂信號收集,就是得知道每一個consumerGroup有哪些consumer,對應(yīng)的topic是誰。信號收集分為Client端信號收集與Broker端信號收集兩個部分。
2. 負(fù)載均衡放在Client端處理。
具體做法是:消費(fèi)者客戶端在啟動時完善rebalanceImpl實(shí)例,同時拷貝訂閱信息存放rebalanceImpl實(shí)例對象中,另外也是很重要的一個步驟 -- 通過心跳消息,不停的上報(bào)自己到所有Broker,注冊RegisterConsumer,等待上述過程準(zhǔn)備好之后在Client端不斷執(zhí)行的負(fù)載均衡服務(wù)線程從Broker端獲取一份全局信息(該consumerGroup下所有的消費(fèi)Client),然后分配這些全局信息,獲取當(dāng)前客戶端分配到的消費(fèi)隊(duì)列。
本文具體的內(nèi)容:
I. copySubscription
Client端信號收集,拷貝訂閱信息。
在DefaultMQPushConsumerImpl.start()時,會將消費(fèi)者的topic訂閱關(guān)系設(shè)置到rebalanceImpl的SubscriptionInner的map中用于負(fù)載:
private void copySubscription() throws MQClientException { try { //注:一個consumer對象可以訂閱多個topic Mapsub = this.defaultMQPushConsumer.getSubscription(); if (sub != null) { for (final Map.Entry entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),// topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } if (null == this.messageListenerInner) { this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),// retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } } catch (Exception e) { throw new MQClientException("subscription exception", e); } }
FilterAPI.buildSubscriptionData接口將訂閱關(guān)系轉(zhuǎn)換為SubscriptionData 數(shù)據(jù),其中subString包含訂閱tag等信息。另外,如果該消費(fèi)者的消費(fèi)模式為集群消費(fèi),則會將retry的topic一并放到。
II. 完善rebalanceImpl實(shí)例
Client繼續(xù)收集信息:
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer .getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
本文以DefaultMQPushConsumerImpl為例,因此this對象類型為DefaultMQPushConsumerImp。
III. this.rebalanceService.start()
開啟負(fù)載均衡服務(wù)。this.rebalanceService是一個RebalanceService實(shí)例對象,它繼承與ServiceThread,是一個線程類。 this.rebalanceService.start()執(zhí)行時,也即執(zhí)行RebalanceService線程體:
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStoped()) { this.waitForRunning(WaitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); }
IV. this.mqClientFactory.doRebalance
客戶端遍歷消費(fèi)組table,對該客戶端上所有消費(fèi)者獨(dú)立進(jìn)行負(fù)載均衡,分發(fā)消費(fèi)隊(duì)列:
public void doRebalance() { for (String group : this.consumerTable.keySet()) { MQConsumerInner impl = this.consumerTable.get(group); if (impl != null) { try { impl.doRebalance(); } catch (Exception e) { log.error("doRebalance exception", e); } } } }
V. MQConsumerInner.doRebalance
由于本文以DefaultMQPushConsumerImpl消費(fèi)過程為例,即DefaultMQPushConsumerImpl.doRebalance:
@Override public void doRebalance() { if (this.rebalanceImpl != null) { this.rebalanceImpl.doRebalance(); } }
步驟II 中完善了rebalanceImpl實(shí)例,為調(diào)用rebalanceImpl.doRebalance()提供了初始數(shù)據(jù)。
rebalanceImpl.doRebalance()過程如下:
public void doRebalance() { // 前文copySubscription中初始化了SubscriptionInner MapsubTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic); } catch (Exception e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }
VI. rebalanceByTopic -- 核心步驟之一
rebalanceByTopic方法中根據(jù)消費(fèi)者的消費(fèi)類型為BROADCASTING或CLUSTERING做不同的邏輯處理。CLUSTERING邏輯包括BROADCASTING邏輯,本部分只介紹集群消費(fèi)負(fù)載均衡的邏輯。
集群消費(fèi)負(fù)載均衡邏輯主要代碼如下(省略了log等代碼):
//1.從topicSubscribeInfoTable列表中獲取與該topic相關(guān)的所有消息隊(duì)列 SetmqSet = this.topicSubscribeInfoTable.get(topic); //2. 從broker端獲取消費(fèi)該消費(fèi)組的所有客戶端clientId List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); f (null == mqSet) { ... } if (null == cidAll) { ... } if (mqSet != null && cidAll != null) { List mqAll = new ArrayList (); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); // 3.創(chuàng)建DefaultMQPushConsumer對象時默認(rèn)設(shè)置為AllocateMessageQueueAveragely AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List allocateResult = null; try { // 4.調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當(dāng)前client分配消費(fèi)隊(duì)列 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { return; } // 5. 將分配得到的allocateResult 中的隊(duì)列放入allocateResultSet 集合 Set allocateResultSet = new HashSet (); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } 、 //6. 更新updateProcessQueue boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet); if (changed) { this.messageQueueChanged(topic, mqSet, allocateResultSet); } }
注:BROADCASTING邏輯只包含上述的1、6。
集群消費(fèi)負(fù)載均衡邏輯中的1、2、4這三個點(diǎn)相關(guān)知識為其核心過程,各個點(diǎn)相關(guān)知識如下:
第1點(diǎn):從topicSubscribeInfoTable列表中獲取與該topic相關(guān)的所有消息隊(duì)列
第2點(diǎn): 從broker端獲取消費(fèi)該消費(fèi)組的所有客戶端clientId
首先,消費(fèi)者對象不斷地向所有broker發(fā)送心跳包,上報(bào)自己,注冊并更新訂閱關(guān)系以及客戶端ChannelInfoTable;之后,客戶端在做消費(fèi)負(fù)載均衡時獲取那些消費(fèi)客戶端,對這些客戶端進(jìn)行負(fù)載均衡,分發(fā)消費(fèi)的隊(duì)列。具體過程如下圖所示:
第4點(diǎn):調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當(dāng)前client分配消費(fèi)隊(duì)列
注:上圖中cId1、cId2、...、cIdN通過 getConsumerIdListByGroup 獲取,它們在這個ConsumerGroup下所有在線客戶端列表中。
當(dāng)前消費(fèi)對進(jìn)行負(fù)載均衡策略后獲取對應(yīng)的消息消費(fèi)隊(duì)列。
以上是“rocketmq消費(fèi)負(fù)載均衡之push消費(fèi)的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!