一 、RabbitMQ的介紹
創(chuàng)新互聯(lián)是專業(yè)的滑縣網(wǎng)站建設(shè)公司,滑縣接單;提供網(wǎng)站設(shè)計(jì)制作、成都網(wǎng)站建設(shè),網(wǎng)頁(yè)設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行滑縣網(wǎng)站開發(fā)網(wǎng)頁(yè)制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!
RabbitMQ是消息中間件的一種,消息中間件即分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎(chǔ)軟件,消息中間件的工作過程可以用生產(chǎn)者消費(fèi)者模型來表示.即,生產(chǎn)者不斷的向消息隊(duì)列發(fā)送信息,而消費(fèi)者從消息隊(duì)列中消費(fèi)信息.具體過程如下:
從上圖可看出,對(duì)于消息隊(duì)列來說,生產(chǎn)者、消息隊(duì)列、消費(fèi)者是最重要的三個(gè)概念,生產(chǎn)者發(fā)消息到消息隊(duì)列中去,消費(fèi)者監(jiān)聽指定的消息隊(duì)列,并且當(dāng)消息隊(duì)列收到消息之后,接收消息隊(duì)列傳來的消息,并且給予相應(yīng)的處理。消息隊(duì)列常用于分布式系統(tǒng)之間互相信息的傳遞。
對(duì)于RabbitMQ來說,除了這三個(gè)基本模塊以外,還添加了一個(gè)模塊,即交換機(jī)(Exchange)。它使得生產(chǎn)者和消息隊(duì)列之間產(chǎn)生了隔離,生產(chǎn)者將消息發(fā)送給交換機(jī),而交換機(jī)則根據(jù)調(diào)度策略把相應(yīng)的消息轉(zhuǎn)發(fā)給對(duì)應(yīng)的消息隊(duì)列。
交換機(jī)的主要作用是接收相應(yīng)的消息并且綁定到指定的隊(duì)列。交換機(jī)有四種類型,分別為Direct、topic、headers、Fanout。
Direct是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡(jiǎn)單的模式。即創(chuàng)建消息隊(duì)列的時(shí)候,指定一個(gè)BindingKey。當(dāng)發(fā)送者發(fā)送消息的時(shí)候,指定對(duì)應(yīng)的Key。當(dāng)Key和消息隊(duì)列的BindingKey一致的時(shí)候,消息將會(huì)被發(fā)送到該消息隊(duì)列中。
topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊(duì)列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時(shí)候,只有指定的Key和該模式相匹配的時(shí)候,消息才會(huì)被發(fā)送到該消息隊(duì)列中。
headers也是根據(jù)一個(gè)規(guī)則進(jìn)行匹配,在消息隊(duì)列和交換機(jī)綁定的時(shí)候會(huì)指定一組鍵值對(duì)規(guī)則,而發(fā)送消息的時(shí)候也會(huì)指定一組鍵值對(duì)規(guī)則,當(dāng)兩組鍵值對(duì)規(guī)則相匹配的時(shí)候,消息會(huì)被發(fā)送到匹配的消息隊(duì)列中。
Fanout是路由廣播的形式,將會(huì)把消息發(fā)給綁定它的全部隊(duì)列,即便設(shè)置了key,也會(huì)被忽略。
二 、SpringBoot整合RabbitMQ(Direct模式)
SpringBoot整合RabbitMQ非常簡(jiǎn)單,首先還是pom.xml引入依賴。
org.springframework.boot spring-boot-starter-amqp
在application.properties中配置RabbitMQ相關(guān)的信息,并首先啟動(dòng)了RabbitMQ實(shí)例,并創(chuàng)建兩個(gè)queue。
spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin
配置Queue(消息隊(duì)列),由于采用的是Direct模式,需要在配置Queue的時(shí)候指定一個(gè)鍵,使其和交換機(jī)綁定。
@Configuration public class RabbitConfig { @Bean public org.springframework.amqp.core.Queue Queue() { return new org.springframework.amqp.core.Queue("hello"); } }
接著就可以發(fā)送消息啦。在SpringBoot中,我們使用AmqpTemplate去發(fā)送消息。代碼如下:
@Component public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(int index) { String context = "hello Queue "+index + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context); } }
生產(chǎn)者發(fā)送消息之后就需要消費(fèi)者接收消息。這里定義了兩個(gè)消息消費(fèi)者,用來模擬生產(chǎn)者與消費(fèi)者一對(duì)多的關(guān)系。
@Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver1 : " + hello); } } @Component @RabbitListener(queues = "hello") public class HelloReceiver2 { @RabbitHandler public void process(String hello) { System.out.println("Receiver2 : " + hello); } }
在單元測(cè)試中模擬發(fā)送消息,批量發(fā)送10條消息,兩個(gè)接收者分別接收了5條消息。
@Autowired private HelloSender helloSender; @Test public void hello() throws Exception { for(int i=0;i<10;i++) { helloSender.send(i); } }
實(shí)際上RabbitMQ還可以支持發(fā)送對(duì)象,當(dāng)然由于涉及到序列化和反序列化,該對(duì)象要實(shí)現(xiàn)Serilizable接口。這里定義了User對(duì)象,用來做發(fā)送消息內(nèi)容。
import java.io.Serializable; public class User implements Serializable{ private String name; private String pwd; public String getPwd() { return pwd; } public void setPwd(String pwd) { this.pwd = pwd; } public String getName() { return name; } public void setName(String name) { this.name = name; } public User(String name, String pwd) { this.name = name; this.pwd = pwd; } @Override public String toString() { return "User{" +"name='" + name + '\'' +", pwd='" + pwd + '\'' +'}'; } }
在生產(chǎn)者中發(fā)送User對(duì)象。
@Component public class ModelSender { @Autowired private AmqpTemplate rabbitTemplate; public void sendModel(User user) { System.out.println("Sender object: " + user.toString()); this.rabbitTemplate.convertAndSend("object", user); } }
在消費(fèi)者中接收User對(duì)象。
@Component @RabbitListener(queues = "object") public class ModelRecevicer { @RabbitHandler public void process(User user) { System.out.println("Receiver object : " + user); } }
在單元測(cè)試中注入ModelSender 對(duì)象,實(shí)例化User對(duì)象,然后發(fā)送。
@Autowired private ModelSender modelSender; @Test public void model() throws Exception { User user=new User("abc","123"); modelSender.sendModel(user); }
三 、SpringBoot整合RabbitMQ(Topic轉(zhuǎn)發(fā)模式)
首先需要在RabbitMQ服務(wù)端創(chuàng)建交換機(jī)topicExchange,并綁定兩個(gè)queue:topic.message、topic.messages。
新建TopicRabbitConfig,設(shè)置對(duì)應(yīng)的queue與binding。
@Configuration public class TopicRabbitConfig { final static String message = "topic.message"; final static String messages = "topic.messages"; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
創(chuàng)建消息生產(chǎn)者,在TopicSender中發(fā)送3個(gè)消息。
@Component public class TopicSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, i am message all"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context); } public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context); } }
生產(chǎn)者發(fā)送消息,這里創(chuàng)建了兩個(gè)接收消息的消費(fèi)者。
@Component @RabbitListener(queues = "topic.message") public class TopicReceiver { @RabbitHandler public void process(String message) { System.out.println("Topic Receiver1 : " + message); } } @Component @RabbitListener(queues = "topic.messages") public class TopicReceiver2 { @RabbitHandler public void process(String message) { System.out.println("Topic Receiver2 : " + message); } }
在單元測(cè)試中注入TopicSender,利用topicSender 發(fā)送消息。
@Autowired private TopicSender topicSender; @Test public void topicSender() throws Exception { topicSender.send(); topicSender.send1(); topicSender.send2(); }
從上面的輸出結(jié)果可以看到,Topic Receiver2 匹配到了所有消息,Topic Receiver1只匹配到了1個(gè)消息。
四 、SpringBoot整合RabbitMQ(Fanout Exchange形式)
Fanout Exchange形式又叫廣播形式,因此我們發(fā)送到路由器的消息會(huì)使得綁定到該路由器的每一個(gè)Queue接收到消息。首先需要在RabbitMQ服務(wù)端創(chuàng)建交換機(jī)fanoutExchange,并綁定三個(gè)queue:fanout.A、fanout.B、fanout.C。
與Topic類似,新建FanoutRabbitConfig,綁定交換機(jī)和隊(duì)列。
@Configuration public class FanoutRabbitConfig { @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
創(chuàng)建消息生產(chǎn)者,在FanoutSender中發(fā)送消息。
@Component public class FanoutSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, fanout msg "; System.out.println("FanoutSender : " + context); this.rabbitTemplate.convertAndSend("fanoutExchange","", context); } }
然后創(chuàng)建了3個(gè)接收者FanoutReceiverA、FanoutReceiverB、FanoutReceiverC。
@Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA { @RabbitHandler public void process(String message) { System.out.println("fanout Receiver A : " + message); } } @Component @RabbitListener(queues = "fanout.B") public class FanoutReceiverB { @RabbitHandler public void process(String message) { System.out.println("fanout Receiver B: " + message); } } @Component @RabbitListener(queues = "fanout.C") public class FanoutReceiverC { @RabbitHandler public void process(String message) { System.out.println("fanout Receiver C: " + message); } }
在單元測(cè)試中注入消息發(fā)送者,發(fā)送消息。
@Autowired private FanoutSender fanoutSender; @Test public void fanoutSender() throws Exception { fanoutSender.send(); }
從下圖可以看到3個(gè)隊(duì)列都接收到了消息。
本章節(jié)創(chuàng)建的類比較多,下圖為本章節(jié)的結(jié)構(gòu),也可以直接查看demo源碼了解。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。