Message Queue Selector如何實現(xiàn)順序消費,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。
讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:主機(jī)域名、網(wǎng)頁空間、營銷軟件、網(wǎng)站建設(shè)、三水網(wǎng)站維護(hù)、網(wǎng)站推廣。
順序消息是指消息的消費順序和生產(chǎn)順序相同,在某些場景下,必須保證順序消息。比如訂單的生成、付款、發(fā)貨.順序消息又分為全局順序消息和部分順序消息,全局順序消息指某一個topic下的所有消息都要保證順序;部分順序消息只要保證某一組消息被順序消費。對于訂單消息來說,只要保證同一個訂單ID的生成、付款、發(fā)貨消息按照順序消費即可。
1. 發(fā)送端:保證相同訂單ID的各種消息發(fā)往同一個MessageQueue(同一個Topic下的某一個queue)
2.消費端:保證同一個MessageQueue里面的消息不被并發(fā)處理 (同一個Topic的不同MessageQueue是可以同時消費的)
DefaultMQProducer producer = new DefaultMQProducer("local-test-producer"); producer.setNamesrvAddr("10.76.0.38:9876"); producer.start(); for (int i = 0; i < 1000; i++) { Order order = new Order(); order.orderId = i; order.status = "生成"; Message msg1 = new Message("local-test-producer", "TagA", JsonUtils.toJson(order).getBytes() ); SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() { @Override public MessageQueue select(Listmqs, Message msg, Object arg) { return null; } }, order.orderId); log.info("sendResult1={}",sendResult1); Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); order.status="付款"; Message msg2 = new Message("local-test-producer", "TagA", JsonUtils.toJson(order).getBytes() ); SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { return null; } }, order.orderId); log.info("sendResult2={}",sendResult2); Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); order.status="發(fā)貨"; Message msg3 = new Message("local-test-producer", "TagA", JsonUtils.toJson(order).getBytes() ); producer.send(msg2, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { return null; } }, order.orderId); Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); SendResult sendResult3 = producer.send(msg3, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } //MessageQueueSelector保證同一個orderId的消息都存儲在同一個MessageQueue。 }, order.orderId); log.info("sendResult3={}",sendResult1); }
消費端主要邏輯如下,主要MessageListenerOrderly回調(diào)實現(xiàn)同一個MessageQueue里面的消息不會被并發(fā)消費:
//同一個MessageQueue里面的消息要順序消費,不能并發(fā)消費。 //但是同一個Topic的不同MessageQueue是可以同時消費的 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("local-test-consumer2"); consumer.setNamesrvAddr("10.76.0.38:9876"); consumer.subscribe("test", ""); consumer.setPullBatchSize(1); consumer.setConsumeThreadMin(1); consumer.setConsumeThreadMax(1); // consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) { List messages = new ArrayList<>(); for (MessageExt msg : msgs) { messages.add(new String(msg.getBody()) +"\tbroker:"+msg.getStoreHost()); } System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), messages); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); Thread.currentThread().join();
源碼分析:
我們知道在RocketMQ中是可以給一個消費者實例設(shè)置多個線程并發(fā)消費的. consumer.setConsumeThreadMin 和 setConsumeThreadMax,
那MessageListenerOrderly是如何保證某一個時刻,只有一個消費者的某一個線程在消費某一個MessageQueue的呢?
就在Client模塊的 ConsumeMessageOrderlyService里面,消費者端并不是簡單的禁止并發(fā)處理,而是給每一個Consumer Queue加鎖,
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
在消費每個消息之前,需要先獲取這個消息對應(yīng)的Consumer Queue所對應(yīng)的鎖,保證同一個Consumer Queue的消息不會被并發(fā)消費,但是不同的Consumer Queue的消息是可以并發(fā)處理的。
看完上述內(nèi)容,你們掌握Message Queue Selector如何實現(xiàn)順序消費的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!