本篇內(nèi)容主要講解“Rabbit MQ的廣播模式是什么意思”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Rabbit MQ的廣播模式是什么意思”吧!
為寧縣等地區(qū)用戶提供了全套網(wǎng)頁設計制作服務,及寧縣網(wǎng)站建設行業(yè)解決方案。主營業(yè)務為網(wǎng)站制作、成都網(wǎng)站制作、寧縣網(wǎng)站設計,以傳統(tǒng)方式定制建設網(wǎng)站,并提供域名空間備案等一條龍服務,秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務。我們深信只要達到每一位用戶的要求,就會得到認可,從而選擇與我們長期合作。這樣,我們也可以走得更遠!
最近做了一個需求,需要一個接口,可以給3萬多人發(fā)消息,因為人數(shù)較多,用一條線程同步發(fā)送肯定是不行的,如果是一條線程,同步發(fā)送的話,哪怕有一條數(shù)據(jù)出現(xiàn)了問題,都會導致本次發(fā)送的失敗,所以,想到了用Rabbit MQ的廣播模式來做。
廣播模式其實就是將你要發(fā)送的消息放到“交換機”中,然后對應的隊列去交換機中獲取消息給消費者(接收消息的一端)消費。如下圖所示:
(emmm 圖片是我在網(wǎng)上找的,自己畫的太丑了)
在上述過程中,主線程執(zhí)行到第一步之后就可以返回了,這是一個異步的過程,不需要等待隊列去交換機中取完數(shù)據(jù),“主線程返回“和“隊列取數(shù)據(jù)給消費者”這兩個過程是同時進行的。這樣的話,發(fā)送消息的時候就不必等到3萬多條消息都發(fā)送完成才能繼續(xù)進行下一步了。
功能大概介紹了一下,接下來就是代碼了,我只做一個簡單的記錄
首先是一個配置類
package com.apps.rabbit.FanoutMessage; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TestExchangeConfiguration { /** * 創(chuàng)建廣播形式的交換機 * * @return 交換機實例 */ @Bean public FanoutExchange fanoutExchanges() { System.out.println("【【【交換機實例創(chuàng)建成功】】】"); return new FanoutExchange(Constants.FANOUT_EXCHANGE_NAME); } /** * 測試隊列一 * * @return 隊列實例 */ @Bean public Queue queue1() { System.out.println("【【【測試隊列一實例創(chuàng)建成功】】】"); return new Queue(Constants.TEST_QUEUE1_NAME); } /** * 綁定隊列一到交換機 * * @return 綁定對象 */ @Bean public Binding bingQueue1ToExchange() { System.out.println("【【【綁定隊列一到交換機成功】】】"); return BindingBuilder.bind(queue1()).to(fanoutExchanges()); } }
上邊這個用來創(chuàng)建交換機和隊列,并將隊列和交換機進行綁定
下邊這個是消息生產(chǎn)者
package com.apps.rabbit.FanoutMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class Sender { @Autowired private RabbitTemplate rabbitTemplate; /** * 發(fā)送消息 * * @param message 消息內(nèi)容 * 說明: routingKey可以指定也可以不指定,這里我們給的是隊列名 */ public void sendMessage(Object message) { System.out.println("【消息發(fā)送者】發(fā)送消息到fanout交換機,消息內(nèi)容為: {}"+message); rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE_NAME, Constants.TEST_QUEUE1_NAME, message); } }
再添加一個輔助類,存常量(別問為啥,大家都是這么做的哈哈哈哈)
package com.apps.rabbit.FanoutMessage; public class Constants { /** * 交換機名稱 */ public static final String FANOUT_EXCHANGE_NAME = "message.fanout.exchange"; /** * 測試隊列名稱1 */ public static final String TEST_QUEUE1_NAME = "messageReceiverQueue"; }
最后是消息的消費者
package com.apps.rabbit.FanoutMessage; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import com.apps.bcodemsg.MsgResponse; import com.apps.model.basic.message.InfoMessageUser; import com.apps.model.basic.message.InfoMessageUserVO; import com.apps.service.basic.message.InfoMessageUserService; import com.rabbitmq.client.Channel; import org.apache.commons.lang.StringUtils; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.ArrayList; import java.util.List; @Component public class Receiver1 { @Autowired private InfoMessageUserService infoMessageUserService; private static String CONTENT_KEY="message"; private static String MESSAGE_ID_KEY="messageId"; private static String USER_ID_KEY="userId"; @RabbitListener(queues = "messageReceiverQueue") @RabbitHandler public void receiveMessage(String messageInfo, Channel channel, Message message) throws IOException { doSomething(); } }
到此就完事了,只需要在你的業(yè)務中調(diào)用生產(chǎn)者就行了,消息會被隊列自動接收,而接收到的消息需要怎么處理,就是業(yè)務邏輯的事了。
上邊只是一個最簡單的例子,也只用了一個隊列,如果想要添加多個隊列,需要在配置類中進行聲明,并進行綁定,此外,在調(diào)用生產(chǎn)者的時候,需要將rabbitTemplate.convertAndSend()方法的第二個參數(shù)去掉。
ok!
到此,相信大家對“Rabbit MQ的廣播模式是什么意思”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學習!