這篇文章主要介紹了RocketMQ中如何實(shí)現(xiàn)producer消息發(fā)送,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
在阿合奇等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專(zhuān)注、極致的服務(wù)理念,為客戶(hù)提供成都網(wǎng)站建設(shè)、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作定制網(wǎng)站建設(shè),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),全網(wǎng)整合營(yíng)銷(xiāo)推廣,成都外貿(mào)網(wǎng)站建設(shè)公司,阿合奇網(wǎng)站建設(shè)費(fèi)用合理。
RocketMQ中一個(gè)topic可以分布在多個(gè)broker上,每個(gè)broker上又可以包含多個(gè)message queue。每個(gè)message具體發(fā)送到哪個(gè)broker的哪個(gè)message queue中,這個(gè)決策過(guò)程是在client端完成的。client會(huì)從nameserver訂閱topic的路由信息,按照一定的負(fù)載均衡算法為每個(gè)message選擇出一個(gè)queue,并完成投遞。producer的消息投遞過(guò)程大體如下圖:
producer的投遞出錯(cuò)重試分為兩種情況,同步出錯(cuò)重試(sync)以及異步出錯(cuò)重試(async)。同步發(fā)送出錯(cuò)重試是在DefaultMQProducerImpl.sendDefaultImpl 中完成:
// SYNC模式則開(kāi)啟重試,ASYNC模式重試在MQClientAPIImpl中實(shí)現(xiàn) int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { // 循環(huán)重試最多timesTotal次 String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { // .... sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); // ... } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); // ... exception = e; continue; // 出錯(cuò),進(jìn)入下一次重試 } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); // ... exception = e; continue; // 出錯(cuò),進(jìn)入下一次重試 } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); // ... exception = e; switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; // 出錯(cuò),進(jìn)入下一次重試 default: if (sendResult != null) { // 放棄重試 return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); // ... throw e; // 放棄重試 } } else { break; } }
異步發(fā)送的出錯(cuò)重試則是在更底層的MQClientAPIImpl.sendMessageAsync里實(shí)現(xiàn):
// 調(diào)用invokeAsync發(fā)送消息 this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { long cost = System.currentTimeMillis() - beginStartTime; RemotingCommand response = responseFuture.getResponseCommand(); // ... if (response != null) { try { SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); assert sendResult != null; if (context != null) { context.setSendResult(sendResult); context.getProducer().executeSendMessageHookAfter(context); } try { sendCallback.onSuccess(sendResult); } catch (Throwable e) { } producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); } catch (Exception e) { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); // 發(fā)送成功,processSendReponse異常重試 onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer); } } else { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); if (!responseFuture.isSendRequestOK()) { MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); // 消息發(fā)送失敗,重試 onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else if (responseFuture.isTimeout()) { MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause()); // 發(fā)送超時(shí),重試 onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else { MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); // 其他錯(cuò)誤,重試 onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } } } });
在異常處理的onExceptionImpl方法中會(huì)再次觸發(fā)sendMessageAsync方法:
private void onExceptionImpl(final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int timesTotal, final AtomicInteger curTimes, final Exception e, final SendMessageContext context, final boolean needRetry, final DefaultMQProducerImpl producer ) { int tmp = curTimes.incrementAndGet(); // 將已重試次數(shù)+1 if (needRetry && tmp <= timesTotal) { // 重試 String retryBrokerName = brokerName;//by default, it will send to the same broker if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName); retryBrokerName = mqChosen.getBrokerName(); } String addr = instance.findBrokerAddressInPublish(retryBrokerName); log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, retryBrokerName); try { request.setOpaque(RemotingCommand.createNewRequestId()); sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, context, producer); } catch (InterruptedException e1) { onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingConnectException e1) { producer.updateFaultItem(brokerName, 3000, true); onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer); } catch (RemotingTooMuchRequestException e1) { onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingException e1) { producer.updateFaultItem(brokerName, 3000, true); onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer); } } else { // ... } }
消息的負(fù)載均衡是通過(guò)MQFaultStrategy.selectOneMessageQueue方法來(lái)實(shí)現(xiàn)的:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { // 輪詢(xún)的方式選擇下一個(gè)queue int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 根據(jù)latency統(tǒng)計(jì)數(shù)據(jù)判斷是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 優(yōu)先選擇同一個(gè)broker上的queue if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“RocketMQ中如何實(shí)現(xiàn)producer消息發(fā)送”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!