真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

javarocketmq中與消息發(fā)送緊密相關(guān)的幾行代碼以及用法

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)java rocketmq中與消息發(fā)送緊密相關(guān)的幾行代碼以及用法,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

專注于為中小企業(yè)提供成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)永修免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了成百上千企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

前言

與消息發(fā)送緊密相關(guān)的幾行代碼:

1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

2. producer.start();

3. Message msg = new Message(...)

4. SendResult sendResult = producer.send(msg);

5. producer.shutdown();

那這幾行代碼執(zhí)行時(shí),背后都做了什么?

一. 首先是DefaultMQProducer.start

@Overridepublic void start() throws MQClientException {this.defaultMQProducerImpl.start();}

調(diào)用了默認(rèn)生成消息的實(shí)現(xiàn)類 -- DefaultMQProducerImpl

調(diào)用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()會(huì)初始化得到MQClientInstance實(shí)例對(duì)象,MQClientInstance實(shí)例對(duì)象調(diào)用它自己的start方法會(huì) ,啟動(dòng)一些服務(wù),如拉去消息服務(wù)PullMessageService.Start()、啟動(dòng)負(fù)載平衡服務(wù)RebalanceService.Start(),比如網(wǎng)絡(luò)通信服務(wù)MQClientAPIImpl.Start()

另外,還會(huì)執(zhí)行與生產(chǎn)消息相關(guān)的信息,如注冊(cè)produceGroup、new一個(gè)TopicPublishInfo對(duì)象并以默認(rèn)TopicKey為鍵值,構(gòu)成鍵值對(duì)存入DefaultMQProducerImpl的topicPublishInfoTable中。

efaultMQProducerImpl.start()后,獲取的MQClientInstance實(shí)例對(duì)象會(huì)調(diào)用sendHeartbeatToAllBroker()方法,不斷向broker發(fā)送心跳包,yin'b可以使用下面一幅圖大致描述DefaultMQProducerImpl.start()過程:

上圖中的三個(gè)部分中涉及的內(nèi)容:

1.1 初始化MQClientInstance

一個(gè)客戶端只能產(chǎn)生一個(gè)MQClientInstance實(shí)例對(duì)象,產(chǎn)生方式使用了工廠模式與單例模式。MQClientInstance.start()方法啟動(dòng)一些服務(wù),源碼如下:

public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}

1.2 注冊(cè)producer

該過程會(huì)將這個(gè)當(dāng)前producer對(duì)象注冊(cè)到MQClientInstance實(shí)例對(duì)象的的producerTable中。一個(gè)jvm(一個(gè)客戶端)中一個(gè)producerGroup只能有一個(gè)實(shí)例,MQClientInstance操作producerTable大概有如下幾個(gè)方法:

-- selectProducer  -- updateTopicRouteInfoFromNameServer  -- prepareHeartbeatData  -- isNeedUpdateTopicRouteInfo  -- shutdown

注:

根據(jù)不同的clientId,MQClientManager將給出不同的MQClientInstance;

根據(jù)不同的group,MQClientInstance將給出不同的MQProducer和MQConsumer

1.3 向路由信息表中添加路由

topicPublishInfoTable定義:

public class DefaultMQProducerImpl implements MQProducerInner {private final Logger log = ClientLogger.getLog();private final Random random = new Random();private final DefaultMQProducer defaultMQProducer;private final ConcurrentMap topicPublishInfoTable = new ConcurrentHashMap();

它是一個(gè)以topic為key的Map型數(shù)據(jù)結(jié)構(gòu),DefaultMQProducerImpl.start()時(shí)會(huì)默認(rèn)創(chuàng)建一個(gè)key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。

1.4 發(fā)送心跳包

MQClientInstance向broker發(fā)送心跳包時(shí),調(diào)用sendHeartbeatToAllBroker( ),以及從MQClientInstance實(shí)例對(duì)象的brokerAddrTable中拿到所有broker地址,向這些broker發(fā)送心跳包。

sendHeartbeatToAllBroker會(huì)涉及到prepareHeartbeatData()方法,該方法會(huì)生成heartbeatData數(shù)據(jù),發(fā)送心跳包時(shí),heartbeatData作為心跳包的body。與producer相關(guān)的部分代碼如下:

// Producerfor (Map.Entry entry : this.producerTable.entrySet()) {MQProducerInner impl = entry.getValue();if (impl != null) {ProducerData producerData = new ProducerData();producerData.setGroupName(entry.getKey());heartbeatData.getProducerDataSet().add(producerData);}

二、. SendResult sendResult = producer.send(msg)

首先會(huì)調(diào)用DefaultMQProducer.send(msg) ,繼而調(diào)用sendDefaultImpl:

public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);}

sendDefaultImpl做了啥?

2.1. 獲取topicPublishInfo

根據(jù)msg的topic從topicPublishInfoTable獲取對(duì)應(yīng)的topicPublishInfo,如果沒有則更新路由信息,從nameserver端拉取最新路由信息。從nameserver端拉取最新路由信息大致為:

首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

2.2 選擇消息發(fā)送的隊(duì)列

普通消息:默認(rèn)方式下,selectOneMessageQueue從topicPublishInfo中的messageQueueList中選擇一個(gè)隊(duì)列(MessageQueue)進(jìn)行發(fā)送消息,默認(rèn)采用長(zhǎng)輪詢的方式選擇隊(duì)列 。

它的機(jī)制如下:正常情況下,順序選擇queue進(jìn)行發(fā)送;如果某一個(gè)節(jié)點(diǎn)發(fā)生了超時(shí),則下次選擇queue時(shí),跳過相同的broker。不同的隊(duì)列選擇策略形成了生產(chǎn)消息的幾種模式,如順序消息,事務(wù)消息。

順序消息:將一組需要有序消費(fèi)的消息發(fā)往同一個(gè)broker的同一個(gè)隊(duì)列上即可實(shí)現(xiàn)順序消息,假設(shè)相同訂單號(hào)的支付,退款需要放到同一個(gè)隊(duì)列,那么就可以在send的時(shí)候,自己實(shí)現(xiàn)MessageQueueSelector,根據(jù)參數(shù)arg字段來選擇queue。

private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}

事務(wù)消息:只有在消息發(fā)送成功,并且本地操作執(zhí)行成功時(shí),才發(fā)送提交事務(wù)消息,做事務(wù)提交,消息發(fā)送失敗,直接發(fā)送回滾消息,進(jìn)行回滾,具體如何實(shí)現(xiàn)后面會(huì)單獨(dú)成文分析。

2.3 封裝消息體通信包,發(fā)送數(shù)據(jù)包

首先,根據(jù)獲取的MessageQueue中的getBrokerName,調(diào)用findBrokerAddressInPublish得到該消息存放對(duì)應(yīng)的broker地址,如果沒有找到則跟新路由信息,重新獲取地址 :

brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)

可知獲取的broker均為master(id=0)

然后, 將與該消息相關(guān)信息打包成RemotingCommand數(shù)據(jù)包,其RequestCode.SEND_MESSAGE

根據(jù)獲取的broke地址,將數(shù)據(jù)包到對(duì)應(yīng)的broker,默認(rèn)是發(fā)送超時(shí)時(shí)間為3s。

封裝消息請(qǐng)求包的包頭:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);

發(fā)送消息包(普通消息默認(rèn)為同步方式):

SendResult sendResult = null;switch (communicationMode) {   case SYNC:  sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(  brokerAddr,  mq.getBrokerName(),   msg,  requestHeader,   timeout,  communicationMode,  context,  this);break;

處理來自broker端的響應(yīng)數(shù)據(jù)包:

private SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;return this.processSendResponse(brokerName, msg, response);}

broker端處理request數(shù)據(jù)包后會(huì)將消息存儲(chǔ)到commitLog.

上述就是小編為大家分享的java rocketmq中與消息發(fā)送緊密相關(guān)的幾行代碼以及用法了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


當(dāng)前標(biāo)題:javarocketmq中與消息發(fā)送緊密相關(guān)的幾行代碼以及用法
分享URL:http://weahome.cn/article/goipji.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部