這篇文章主要介紹了RocketMQ中如何實(shí)現(xiàn)并行模式,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
為武川等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計(jì)制作服務(wù),及武川網(wǎng)站建設(shè)行業(yè)解決方案。主營(yíng)業(yè)務(wù)為網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、武川網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!
DefaultMQPushConsumerImpl.pullMessage中的PullCallback在接收到拉取的message之后,會(huì)調(diào)用ConsumeMessageService.submitConsumeRequest方法將消息“推”給listener來執(zhí)行業(yè)務(wù)處理。RocketMQ支持并行和順序兩種消費(fèi)模式,本文主要講解并行模式ConsumeMessageConcurrentlyService的實(shí)現(xiàn)。該類包含一下關(guān)鍵屬性:
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { // ... private final MessageListenerConcurrently messageListener; // 業(yè)務(wù)處理回掉 private final BlockingQueueconsumeRequestQueue; // consumerExecutor的并發(fā)隊(duì)列 private final ThreadPoolExecutor consumeExecutor; private final String consumerGroup; // ... public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue (); this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_")); } //... }
ConsumeMessageConcurrentlyService.submitConsumeRequest會(huì)將拉取到的message列表按配置的分批策略做分割,提交執(zhí)行:
public void submitConsumeRequest( final Listmsgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); // 配置的批次大小 if (msgs.size() <= consumeBatchSize) { // 拉取的消息列表長(zhǎng)度小于配置的批次大小,一次性提交處理 ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { // 否則,將message分割后提交 for (int total = 0; total < msgs.size(); ) { List msgThis = new ArrayList (consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } }
實(shí)際處理邏輯在ConsumeRequest.run方法中
// ConsumeRequest public void run() { if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); ConsumeMessageContext consumeMessageContext = null; // .... long beginTimestamp = System.currentTimeMillis(); boolean hasException = false; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } // 1. 調(diào)用listener status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } long consumeRT = System.currentTimeMillis() - beginTimestamp; // ... if (!processQueue.isDropped()) { // 2. 處理業(yè)務(wù)調(diào)用結(jié)果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { // ... } } }
processConsumeResult將處理消費(fèi)結(jié)果:
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); // 消費(fèi)成功的index if (consumeRequest.getMsgs().isEmpty()) return; // ... // 1. 消費(fèi)失敗msg處理 switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { // 廣播模式,僅打印消費(fèi)失敗的msg MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: // 集群模式,首先嘗試將消費(fèi)失敗的msg發(fā)回到broker,若失敗則在本地嘗試reconsume ListmsgBackFailed = new ArrayList (consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } // 2. 更新offset store long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“RocketMQ中如何實(shí)現(xiàn)并行模式”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來學(xué)習(xí)!