RabbitMq(一)走進RabbitMq
讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:國際域名空間、雅安服務(wù)器托管、營銷軟件、網(wǎng)站建設(shè)、南川網(wǎng)站維護、網(wǎng)站推廣。目錄
RabbitMQ 概念 exchange交換機機制 什么是交換機 binding? Direct Exchange交換機 Topic Exchange交換機 Fanout Exchange交換機 Header Exchange交換機 RabbitMQ 的 Hello - Demo(springboot實現(xiàn)) RabbitMQ 的 Hello Demo(spring xml實現(xiàn)) RabbitMQ 在生產(chǎn)環(huán)境下運用和出現(xiàn)的問題 Spring RabbitMQ 注解 消息的 JSON 傳輸 消息持久化,斷線重連,ACK。RabbitMQ 概念
RabbitMQ 即一個消息隊列,主要是用來實現(xiàn)應(yīng)用程序的異步和解耦,同時也能起到消息緩沖,消息分發(fā)的作用。RabbitMQ使用的是AMQP協(xié)議,它是一種二進制協(xié)議。默認啟動端口 5672。
在 RabbitMQ 中,如下圖結(jié)構(gòu):
rabbitmq
左側(cè) P 代表 生產(chǎn)者,也就是往 RabbitMQ 發(fā)消息的程序。 中間即是 RabbitMQ,其中包括了 交換機 和 隊列。 右側(cè) C 代表 消費者,也就是往 RabbitMQ 拿消息的程序。 那么,其中比較重要的概念有 4 個,分別為:虛擬主機,交換機,隊列,和綁定。
虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定。為什么需要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權(quán)限控制。 因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創(chuàng)建一個虛擬主機。每一個RabbitMQ服務(wù)器都有一個默認的虛擬主機“/”。 交換機:Exchange 用于轉(zhuǎn)發(fā)消息,但是它不會做存儲 ,如果沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發(fā)送過來的消息。 這里有一個比較重要的概念:路由鍵 。消息到交換機的時候,交互機會轉(zhuǎn)發(fā)到對應(yīng)的隊列中,那么究竟轉(zhuǎn)發(fā)到哪個隊列,就要根據(jù)該路由鍵。 綁定:也就是交換機需要和隊列相綁定,這其中如上圖所示,是多對多的關(guān)系。 exchange交換機機制
什么是交換機
rabbitmq的message model實際上消息不直接發(fā)送到queue中,中間有一個exchange是做消息分發(fā),producer甚至不知道消息發(fā)送到那個隊列中去。因此,當exchange收到message時,必須準確知道該如何分發(fā)。是append到一定規(guī)則的queue,還是append到多個queue中,還是被丟棄?這些規(guī)則都是通過exchagne的4種type去定義的。
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn\'t even know if a message will be delivered to any queue at all.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type. exchange是一個消息的agent,每一個虛擬的host中都有定義。它的職責是把message路由到不同的queue中。
binding?
exchange和queue通過routing-key關(guān)聯(lián),這兩者之間的關(guān)系是就是binding。如下圖所示,X表示交換機,紅色表示隊列,交換機通過一個routing-key去binding一個queue,routing-key有什么作用呢?看Direct exchange類型交換機。
Directed Exchange
路由鍵exchange,該交換機收到消息后會把消息發(fā)送到指定routing-key的queue中。那消息交換機是怎么知道的呢?其實,producer deliver消息的時候會把routing-key add到 message header中。routing-key只是一個messgae的attribute。
A direct exchange delivers messages to queues based on a message routing key. The routing key is a message attribute added into the message header by the producer. The routing key can be seen as an "address" that the exchange use to decide how to route the message. A message goes to the queue(s) whose binding key exactly matches the routing key of the message. Default Exchange 這種是特殊的Direct Exchange,是rabbitmq內(nèi)部默認的一個交換機。該交換機的name是空字符串,所有queue都默認binding 到該交換機上。所有binding到該交換機上的queue,routing-key都和queue的name一樣。 ** Topic Exchange**
通配符交換機,exchange會把消息發(fā)送到一個或者多個滿足通配符規(guī)則的routing-key的queue。其中表號匹配一個word,#匹配多個word和路徑,路徑之間通過.隔開。如滿足a..c的routing-key有a.hello.c;滿足#.hello的routing-key有a.b.c.helo。 ** Fanout Exchange**
扇形交換機,該交換機會把消息發(fā)送到所有binding到該交換機上的queue。這種是publisher/subcribe模式。用來做廣播最好。 所有該exchagne上指定的routing-key都會被ignore掉。
The fanout copies and routes a received message to all queues that are bound to it regardless of routing keys or pattern matching as with direct and topic exchanges. Keys provided will simply be ignored. Header Exchange
設(shè)置header attribute參數(shù)類型的交換機。
RabbitMQ 的 Hello Demo
安裝就不說了,建議按照官方文檔上做。先貼代碼,稍后解釋,代碼如下:
配置 交換機,隊列,交換機與隊列的綁定,消息監(jiān)視容器:
@Configuration @Data public class RabbitMQConfig {
final static String queueName = "spring-boot"; [@Bean](https://my.oschina.net/bean) Queue queue() { return new Queue(queueName, false); } [@Bean](https://my.oschina.net/bean) TopicExchange exchange() { return new TopicExchange("spring-boot-exchange"); } [@Bean](https://my.oschina.net/bean) Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(queueName); } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; } @Bean Receiver receiver() { return new Receiver(); } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); }
} 配置接收信息者(即消費者):
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1); public void receiveMessage(String message) { System.out.println("Received <" + message + ">"); latch.countDown(); } public CountDownLatch getLatch() { return latch; }
} 配置發(fā)送信息者(即生產(chǎn)者):
@RestController public class Test { @Autowired RabbitTemplate rabbitTemplate;
@RequestMapping(value = "/test/{abc}",method = RequestMethod.GET) public String test(@PathVariable(value = "abc") String abc){ rabbitTemplate.convertAndSend("spring-boot", abc + " from RabbitMQ!"); return "abc"; }
} 以上便可實現(xiàn)一個簡單的 RabbitMQ Demo,具體代碼在:點這里
那么,這里,分為三個部分分析:發(fā)消息,交換機隊列,收消息。
對于發(fā)送消息:我們一般可以使用 RabbitTemplate,這個是 Spring 封裝給了我們,便于我們發(fā)送信息,我們調(diào)用 rabbitTemplate.convertAndSend("spring-boot", xxx); 即可發(fā)送信息。 對于交換機隊列:如上代碼,我們需要配置交換機 TopicExchange,配置隊列 Queue,并且配置他們之間的綁定 Binding 對于接受消息:首先需要創(chuàng)建一個消息監(jiān)聽容器,然后把我們的接受者注冊到該容器中,這樣,隊列中有信息,那么就會調(diào)用接收者的對應(yīng)的方法。如上代碼 container.setMessageListener(listenerAdapter); 其中,MessageListenerAdapter 可以看做是 我們接收者的一個包裝類,new MessageListenerAdapter(receiver, "receiveMessage"); 指明了如果有消息來,那么調(diào)用接收者哪個方法進行處理。 RabbitMQ 的 Hello Demo(spring xml實現(xiàn))
spring xml方式實現(xiàn)RabbitMQ簡單,可讀性較好,配置簡單,配置和實現(xiàn)如下所示。
上文已經(jīng)講述了rabbitmq的配置,xml方式通過properites文件存放用戶配置信息:
mq.host=127.0.0.1 mq.username=guest mq.password=guest mq.port=5672 配置application-mq.xml配置文件,聲明連接、交換機、queue以及consumer監(jiān)聽。
上述代碼中,引入properties文件就不多說了。
rabbit:connection-factory標簽聲明創(chuàng)建connection的factory工廠。
rabbit:queue聲明一個queue并設(shè)置queue的配置項,直接看標簽屬性就可以明白queue的配置項。
rabbit:direct-exchange聲明交換機并綁定queue。
rabbit:listener-container申明監(jiān)聽container并配置consumer和監(jiān)聽routing-key。
剩下就簡單了,application-context.xml中把rabbitmq配置import進去。
Producer實現(xiàn),發(fā)送消息還是使用template的convertAndSend() deliver消息。
@Service public class Producer {
@Autowired private AmqpTemplate amqpTemplate; private final static Logger logger = LoggerFactory.getLogger(Producer.class); public void sendDataToQueue(String queueKey, Object object) { try { amqpTemplate.convertAndSend(queueKey, object); } catch (Exception e) { e.printStackTrace(); logger.error("exeception={}",e); } }
} 配置consumer
package com.demo.mq.receive;
import org.springframework.stereotype.Service; import java.util.concurrent.CountDownLatch;
@Service public class Reveiver { private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(String message) { System.out.println("reveice msg=" + message.toString()); latch.countDown(); }
} 測試deliver消息
Controller @RequestMapping("/demo/") public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class); @Resource private Producer producer;
@RequestMapping("/test/{msg}") public String send(@PathVariable("msg") String msg){ logger.info("#TestController.send#abc={msg}", msg); System.out.println("msg="+msg); producer.sendDataToQueue("test_queue_key",msg); return "index"; }
} RabbitMQ 在生產(chǎn)環(huán)境下運用和出現(xiàn)的問題
在生產(chǎn)環(huán)境中,由于 Spring 對 RabbitMQ 提供了一些方便的注解,所以首先可以使用這些注解。例如:
@EnableRabbit:@EnableRabbit 和 @Configuration 注解在一個類中結(jié)合使用,如果該類能夠返回一個 RabbitListenerContainerFactory 類型的 bean,那么就相當于能夠把該終端(消費端)和 RabbitMQ 進行連接。Ps:(生成端不是通過 RabbitListenerContainerFactory 來和 RabbitMQ 連接,而是通過 RabbitTemplate ) @RabbitListener:當對應(yīng)的隊列中有消息的時候,該注解修飾下的方法會被執(zhí)行。 @RabbitHandler:接收者可以監(jiān)聽多個隊列,不同的隊列消息的類型可能不同,該注解可以使得不同的消息讓不同方法來響應(yīng)。 具體這些注解的使用,可以參考這里的代碼:點這里
首先,生產(chǎn)環(huán)境下的 RabbitMQ 可能不會在生產(chǎn)者或者消費者本機上,所以需要重新定義 ConnectionFactory,即:
@Bean ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(userName); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); return connectionFactory; } 這里,可以重新設(shè)置需要連接的 RabbitMQ 的 ip,端口,虛擬主機,用戶名,密碼。
然后,可以先從生產(chǎn)端考慮,生產(chǎn)端需要連接 RabbitMQ,那么可以通過 RabbitTemplate 進行連接。 Ps:(RabbitTemplate 用于生產(chǎn)端發(fā)送消息到交換機中),如下代碼:
@Bean(name="myTemplate") RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(integrationEventMessageConverter()); template.setExchange(exchangeName); return template; } 在該代碼中,new RabbitTemplate(connectionFactory); 設(shè)置了生產(chǎn)端連接到RabbitMQ,template.setMessageConverter(integrationEventMessageConverter()); 設(shè)置了 生產(chǎn)端發(fā)送給交換機的消息是以什么格式的,在 integrationEventMessageConverter() 代碼中:
public MessageConverter integrationEventMessageConverter() { Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(); return messageConverter; } 如上 Jackson2JsonMessageConverter 指明了 JSON。上述代碼的最后 template.setExchange(exchangeName); 指明了 要把生產(chǎn)者要把消息發(fā)送到哪個交換機上。
有了上述,那么,我們即可使用 rabbitTemplate.convertAndSend("spring-boot", xxx); 發(fā)送消息,xxx 表示任意類型,因為上述的設(shè)置會幫我們把這些類型轉(zhuǎn)化成 JSON 傳輸。
接著,生產(chǎn)端發(fā)送我們說過了,那么現(xiàn)在可以看看消費端:
對于消費端,我們可以只創(chuàng)建 SimpleRabbitListenerContainerFactory,它能夠幫我們生成 RabbitListenerContainer,然后我們再使用 @RabbitListener 指定接收者收到信息時處理的方法。
@Bean(name="myListenContainer") public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setMessageConverter(integrationEventMessageConverter()); factory.setConnectionFactory(connectionFactory()); return factory; } 這其中 factory.setMessageConverter(integrationEventMessageConverter()); 指定了我們接受消息的時候,以 JSON 傳輸?shù)南⒖梢赞D(zhuǎn)換成對應(yīng)的類型傳入到方法中。例如:
@Slf4j @Component @RabbitListener(containerFactory = "helloRabbitListenerContainer",queues = "spring-boot") public class Receiver { @RabbitHandler public void receiveTeacher(Teacher teacher) { log.info("##### = {}",teacher); } } 可能出現(xiàn)的問題:
消息持久化
在生產(chǎn)環(huán)境中,我們需要考慮萬一生產(chǎn)者掛了,消費者掛了,或者 rabbitmq 掛了怎么樣。一般來說,如果生產(chǎn)者掛了或者消費者掛了,其實是沒有影響,因為消息就在隊列里面。那么萬一 rabbitmq 掛了,之前在隊列里面的消息怎么辦,其實可以做消息持久化,RabbitMQ 會把信息保存在磁盤上。
做法是可以先從 Connection 對象中拿到一個 Channel 信道對象,然后再可以通過該對象設(shè)置 消息持久化。
生產(chǎn)者或者消費者斷線重連
這里 Spring 有自動重連機制。
ACK 確認機制
每個Consumer可能需要一段時間才能處理完收到的數(shù)據(jù)。如果在這個過程中,Consumer出錯了,異常退出了,而數(shù)據(jù)還沒有處理完成,那么 非常不幸,這段數(shù)據(jù)就丟失了。因為我們采用no-ack的方式進行確認,也就是說,每次Consumer接到數(shù)據(jù)后,而不管是否處理完 成,RabbitMQ Server會立即把這個Message標記為完成,然后從queue中刪除了。
如果一個Consumer異常退出了,它處理的數(shù)據(jù)能夠被另外的Consumer處理,這樣數(shù)據(jù)在這種情況下就不會丟失了(注意是這種情況下)。 為了保證數(shù)據(jù)不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。為了保證數(shù)據(jù)能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack。而應(yīng)該是在處理完數(shù)據(jù)后發(fā)送ack。
在處理數(shù)據(jù)后發(fā)送的ack,就是告訴RabbitMQ數(shù)據(jù)已經(jīng)被接收,處理完成,RabbitMQ可以去安全的刪除它了。 如果Consumer退出了但是沒有發(fā)送ack,那么RabbitMQ就會把這個Message發(fā)送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下數(shù)據(jù)也不會丟失。
個人對 RabbitMQ ACK 的一些疑問,求助:點這里
總結(jié)
RabbitMQ 作用:異步,解耦,緩沖,消息分發(fā)。 RabbitMQ 主要分為3個部分,生產(chǎn)者,交換機和隊列,消費者。 需要注意消息持久化,目的為了防止 RabbitMQ 宕機;考慮 ACK 機制,目的為了如果消費者對消息的處理失敗了,那么后續(xù)要如何處理。 寫在最后
寫出來,說出來才知道對不對,知道不對才能改正,改正了才能成長。 在技術(shù)方面,希望大家眼里都容不得沙子。如果有不對的地方或者需要改進的地方希望可以指出,萬分感謝。