真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

使用Rabbitmq延遲隊列怎么實現(xiàn)定時任務

這篇文章給大家介紹使用Rabbitmq延遲隊列怎么實現(xiàn)定時任務,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

創(chuàng)新互聯(lián)是一家集網(wǎng)站建設,烏拉特中企業(yè)網(wǎng)站建設,烏拉特中品牌網(wǎng)站建設,網(wǎng)站定制,烏拉特中網(wǎng)站建設報價,網(wǎng)絡營銷,網(wǎng)絡優(yōu)化,烏拉特中網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強企業(yè)競爭力。可充分滿足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學習、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實用型網(wǎng)站。

Rabbitmq延遲隊列

Rabbitmq本身是沒有延遲隊列的,只能通過Rabbitmq本身隊列的特性來實現(xiàn),想要Rabbitmq實現(xiàn)延遲隊列,需要使用Rabbitmq的死信交換機(Exchange)和消息的存活時間TTL(Time To Live)

死信交換機

一個消息在滿足如下條件下,會進死信交換機,記住這里是交換機而不是隊列,一個交換機可以對應很多隊列。

  1. 一個消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。

  2. 上面的消息的TTL到了,消息過期了。

  3. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。

死信交換機就是普通的交換機,只是因為我們把過期的消息扔進去,所以叫死信交換機,并不是說死信交換機是某種特定的交換機

消息TTL(消息存活時間)

消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現(xiàn)延遲任務的關鍵。

byte[] messageBodyBytes = "Hello, world!".getBytes(); 
AMQP.BasicProperties properties = new AMQP.BasicProperties(); 
properties.setExpiration("60000"); 
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);

可以通過設置消息的expiration字段或者x-message-ttl屬性來設置時間,兩者是一樣的效果。只是expiration字段是字符串參數(shù),所以要寫個int類型的字符串: 當上面的消息扔到隊列中后,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中并不會被刪除和釋放,它會被統(tǒng)計到隊列的消息數(shù)中去

處理流程圖

使用Rabbitmq延遲隊列怎么實現(xiàn)定時任務

創(chuàng)建交換機(Exchanges)和隊列(Queues)

創(chuàng)建死信交換機

使用Rabbitmq延遲隊列怎么實現(xiàn)定時任務 

如圖所示,就是創(chuàng)建一個普通的交換機,這里為了方便區(qū)分,把交換機的名字取為:delay

創(chuàng)建自動過期消息隊列

這個隊列的主要作用是讓消息定時過期的,比如我們需要2小時候關閉訂單,我們就需要把消息放進這個隊列里面,把消息過期時間設置為2小時

使用Rabbitmq延遲隊列怎么實現(xiàn)定時任務 

創(chuàng)建一個一個名為delay_queue1的自動過期的隊列,當然圖片上面的參數(shù)并不會讓消息自動過期,因為我們并沒有設置x-message-ttl參數(shù),如果整個隊列的消息有消息都是相同的,可以設置,這里為了靈活,所以并沒有設置,另外兩個參數(shù)x-dead-letter-exchange代表消息過期后,消息要進入的交換機,這里配置的是delay,也就是死信交換機,x-dead-letter-routing-key是配置消息過期后,進入死信交換機的routing-key,跟發(fā)送消息的routing-key一個道理,根據(jù)這個key將消息放入不同的隊列

創(chuàng)建消息處理隊列

這個隊列才是真正處理消息的隊列,所有進入這個隊列的消息都會被處理

使用Rabbitmq延遲隊列怎么實現(xiàn)定時任務 

消息隊列的名字為delay_queue2

消息隊列綁定到交換機

進入交換機詳情頁面,將創(chuàng)建的2個隊列(delay queue1和delay queue2)綁定到交換機上面

使用Rabbitmq延遲隊列怎么實現(xiàn)定時任務 

自動過期消息隊列的routing key 設置為delay

綁定delay queue2

使用Rabbitmq延遲隊列怎么實現(xiàn)定時任務

delay queue2 的key要設置為創(chuàng)建自動過期的隊列的x-dead-letter-routing-key參數(shù),這樣當消息過期的時候就可以自動把消息放入delay_queue2這個隊列中了

綁定后的管理頁面如下圖:

使用Rabbitmq延遲隊列怎么實現(xiàn)定時任務

當然這個綁定也可以使用代碼來實現(xiàn),只是為了直觀表現(xiàn),所以本文使用的管理平臺來操作

發(fā)送消息

String msg = "hello word"; 
MessageProperties messageProperties = new MessageProperties(); 
  messageProperties.setExpiration("6000");
  messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
  Message message = new Message(msg.getBytes(), messageProperties);
  rabbitTemplate.convertAndSend("delay", "delay",message);

主要的代碼就是

messageProperties.setExpiration("6000");

設置了讓消息6秒后過期

注意:因為要讓消息自動過期,所以一定不能設置delay_queue1的監(jiān)聽,不能讓這個隊列里面的消息被接受到,否則消息一旦被消費,就不存在過期了

接收消息

接收消息配置好delay_queue2的監(jiān)聽就好了

package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode; 
import org.springframework.amqp.core.Binding; 
import org.springframework.amqp.core.BindingBuilder; 
import org.springframework.amqp.core.DirectExchange; 
import org.springframework.amqp.core.Message; 
import org.springframework.amqp.core.Queue; 
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; 
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayQueue { 
 /** 消息交換機的名字*/
 public static final String EXCHANGE = "delay";
 /** 隊列key1*/
 public static final String ROUTINGKEY1 = "delay";
 /** 隊列key2*/
 public static final String ROUTINGKEY2 = "delay_key";

 /**
  * 配置鏈接信息
  * @return
  */
 @Bean
 public ConnectionFactory connectionFactory() {
  CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);

  connectionFactory.setUsername("kberp");
  connectionFactory.setPassword("kberp");
  connectionFactory.setVirtualHost("/");
  connectionFactory.setPublisherConfirms(true); // 必須要設置
  return connectionFactory;
 }

 /** 
  * 配置消息交換機
  * 針對消費者配置 
  FanoutExchange: 將消息分發(fā)到所有的綁定隊列,無routingkey的概念 
  HeadersExchange :通過添加屬性key-value匹配 
  DirectExchange:按照routingkey分發(fā)到指定隊列 
  TopicExchange:多關鍵字匹配 
  */ 
 @Bean 
 public DirectExchange defaultExchange() { 
  return new DirectExchange(EXCHANGE, true, false);
 } 

 /**
  * 配置消息隊列2
  * 針對消費者配置 
  * @return
  */
 @Bean
 public Queue queue() { 
  return new Queue("delay_queue2", true); //隊列持久 

 }
 /**
  * 將消息隊列2與交換機綁定
  * 針對消費者配置 
  * @return
  */
 @Bean 
 @Autowired
 public Binding binding() { 
  return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2); 
 } 

 /**
  * 接受消息的監(jiān)聽,這個監(jiān)聽會接受消息隊列1的消息
  * 針對消費者配置 
  * @return
  */
 @Bean 
 @Autowired
 public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) { 
  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); 
  container.setQueues(queue()); 
  container.setExposeListenerChannel(true); 
  container.setMaxConcurrentConsumers(1); 
  container.setConcurrentConsumers(1); 
  container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認 
  container.setMessageListener(new ChannelAwareMessageListener() {

   public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
    byte[] body = message.getBody(); 
    System.out.println("delay_queue2 收到消息 : " + new String(body)); 
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費 
   } 
  }); 
  return container; 
 } 
}

關于使用Rabbitmq延遲隊列怎么實現(xiàn)定時任務就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。


網(wǎng)站欄目:使用Rabbitmq延遲隊列怎么實現(xiàn)定時任務
分享網(wǎng)址:http://weahome.cn/article/joojps.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部