真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網站制作重慶分公司

consumer數(shù)量變化會怎樣

本篇文章給大家分享的是有關consumer數(shù)量變化會怎樣,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領域值得信任、有價值的長期合作伙伴,公司提供的服務項目有:域名申請、網頁空間、營銷軟件、網站建設、景縣網站維護、網站推廣。

consumer數(shù)量變化會怎樣

ConsumerManager
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set subList, boolean isNotifyConsumerIdsChangedEnable) {

    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }

    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            //通知同組內的其他consumer
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }

    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

    return r1 || r2;
}

public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    boolean isNotifyConsumerIdsChangedEnable) {
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null != consumerGroupInfo) {
        consumerGroupInfo.unregisterChannel(clientChannelInfo);
        if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
            ConsumerGroupInfo remove = this.consumerTable.remove(group);
            if (remove != null) {
                log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);

                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
            }
        }
        if (isNotifyConsumerIdsChangedEnable) {
            //單向通知channel
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
}
DefaultConsumerIdsChangeListener
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
    case CHANGE:
        if (args == null || args.length < 1) {
            return;
        }
        List channels = (List) args[0];
        if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
            //對組內的其他consumer的channel連接發(fā)送單向通知(不管對方有木有收到)
            for (Channel chl : channels) {
                this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
            }
        }
        break;
}
Broker2Client
public void notifyConsumerIdsChanged(
    final Channel channel,
    final String consumerGroup) {
    if (null == consumerGroup) {
        log.error("notifyConsumerIdsChanged consumerGroup is null");
        return;
    }

    NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);

    try {
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        //發(fā)送異常,只是打印log
        log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
    }
}

通知channel是單向的,也就是不管對方有沒有答復,都認為發(fā)送成功了,這樣會有兩種情況發(fā)生:

  1. channel收到消息:收到消息后,channel會觸發(fā)rebalance,正常邏輯

  2. channel沒收到消息:該consumer不會觸發(fā)rebalance,存在問題!

    1. register:該consumer不知道已經有新的consumer加入,造成同一個mq會有多個consumer進行消費

    2. unregister:該consumer不知道有consumer下線,造成部分mq沒有consumer負責消費

我們先看unregister這種情況

在consumer啟動時,會同時啟動一個RebalanceService線程,這個線程做的事就是每隔20秒主動進行一次rebalance,這樣就能把unregister這種影響降低,最多導致該mq的消息會延遲20秒之后才有consumer負責消費

RebalanceService
private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

接下來分析比較大條的Register

同一個mq在同一組內有不同的consumer消費,這種情況在clustering模式下是有大問題的,會造成重復消費,消費進度錯誤等問題,帶著rocketmq應該不至于犯如此低級錯誤的想法再繼續(xù)看代碼,果然別有洞天

RebalanceImpl
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    //rebalance過程
    //關鍵點在這,在上面rebalance完之后, 就能知道自己該負責哪些mq的消費   
    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
}

private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet, final boolean isOrder) {
    for (MessageQueue mq : mqSet) {
        //如果是新增的mq,會嘗試調用遠程broker lock mq,獲取鎖失敗,則說明有其他consumer獲取了鎖,自己應該放棄消費該mq
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }   
        }
    }
}

以上就是consumer數(shù)量變化會怎樣,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


文章名稱:consumer數(shù)量變化會怎樣
標題URL:http://weahome.cn/article/jigeph.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部