真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

RabbitMQ中怎么實(shí)現(xiàn)延遲隊(duì)列

這篇文章給大家介紹RabbitMQ中怎么實(shí)現(xiàn)延遲隊(duì)列,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

成都創(chuàng)新互聯(lián),為您提供網(wǎng)站建設(shè)公司成都網(wǎng)站制作公司、網(wǎng)站營銷推廣、網(wǎng)站開發(fā)設(shè)計(jì),對服務(wù)三維植被網(wǎng)等多個(gè)行業(yè)擁有豐富的網(wǎng)站建設(shè)及推廣經(jīng)驗(yàn)。成都創(chuàng)新互聯(lián)網(wǎng)站建設(shè)公司成立于2013年,提供專業(yè)網(wǎng)站制作報(bào)價(jià)服務(wù),我們深知市場的競爭激烈,認(rèn)真對待每位客戶,為客戶提供賞心悅目的作品。 與客戶共同發(fā)展進(jìn)步,是我們永遠(yuǎn)的責(zé)任!

在 RabbitMQ 3.6.x 之前我們一般采用死信隊(duì)列+TTL過期時(shí)間來實(shí)現(xiàn)延遲隊(duì)列,我們這里不做過多介紹,可以參考之前文章來了解:TTL、死信隊(duì)列

在 RabbitMQ 3.6.x 開始,RabbitMQ 官方提供了延遲隊(duì)列的插件,可以下載放置到 RabbitMQ 根目錄下的 plugins 下。延遲隊(duì)列插件下載

首先我們創(chuàng)建交換機(jī)和消息隊(duì)列

import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MQConfig {  public static final String LAZY_EXCHANGE = "Ex.LazyExchange";  public static final String LAZY_QUEUE = "MQ.LazyQueue";  public static final String LAZY_KEY = "lazy.#";  @Bean  public TopicExchange lazyExchange(){    //Map pros = new HashMap<>();    //設(shè)置交換機(jī)支持延遲消息推送    //pros.put("x-delayed-message", "topic");    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);    exchange.setDelayed(true);    return exchange;  }  @Bean  public Queue lazyQueue(){    return new Queue(LAZY_QUEUE, true);  }  @Bean  public Binding lazyBinding(){    return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);  }}

我們在 Exchange 的聲明中可以設(shè)置exchange.setDelayed(true)來開啟延遲隊(duì)列,也可以設(shè)置為以下內(nèi)容傳入交換機(jī)聲明的方法中,因?yàn)榈谝环N方式的底層就是通過這種方式來實(shí)現(xiàn)的。

//Map pros = new HashMap<>();    //設(shè)置交換機(jī)支持延遲消息推送    //pros.put("x-delayed-message", "topic");    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

發(fā)送消息時(shí)我們需要指定延遲推送的時(shí)間,我們這里在發(fā)送消息的方法中傳入?yún)?shù) new MessagePostProcessor() 是為了獲得 Message對象,因?yàn)樾枰柚?Message對象的api 來設(shè)置延遲時(shí)間。

import com.anqi.mq.config.MQConfig;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageDeliveryMode;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;@Componentpublic class MQSender {  @Autowired  private RabbitTemplate rabbitTemplate;  //confirmCallback returnCallback 代碼省略,請參照上一篇   public void sendLazy(Object message){    rabbitTemplate.setMandatory(true);    rabbitTemplate.setConfirmCallback(confirmCallback);    rabbitTemplate.setReturnCallback(returnCallback);    //id + 時(shí)間戳 全局唯一    CorrelationData correlationData = new CorrelationData("12345678909"+new Date());    //發(fā)送消息時(shí)指定 header 延遲時(shí)間    rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,        new MessagePostProcessor() {      @Override      public Message postProcessMessage(Message message) throws AmqpException {        //設(shè)置消息持久化        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);        //message.getMessageProperties().setHeader("x-delay", "6000");        message.getMessageProperties().setDelay(6000);        return message;      }    }, correlationData);  }}

我們可以觀察 setDelay(Integer i)底層代碼,也是在 header 中設(shè)置 x-delay。等同于我們手動(dòng)設(shè)置 header

message.getMessageProperties().setHeader("x-delay", "6000");

/** * Set the x-delay header. * @param delay the delay. * @since 1.6 */public void setDelay(Integer delay) {  if (delay == null || delay < 0) {    this.headers.remove(X_DELAY);  }  else {    this.headers.put(X_DELAY, delay);  }}

消費(fèi)端進(jìn)行消費(fèi)

import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.Map;@Componentpublic class MQReceiver {  @RabbitListener(queues = "MQ.LazyQueue")  @RabbitHandler  public void onLazyMessage(Message msg, Channel channel) throws IOException{    long deliveryTag = msg.getMessageProperties().getDeliveryTag();    channel.basicAck(deliveryTag, true);    System.out.println("lazy receive " + new String(msg.getBody()));  }

測試結(jié)果

import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest@RunWith(SpringRunner.class)public class MQSenderTest {  @Autowired  private MQSender mqSender;  @Test  public void sendLazy() throws Exception {    String msg = "hello spring boot";    mqSender.sendLazy(msg + ":");  }}

果然在 6 秒后收到了消息 lazy receive hello spring boot:

關(guān)于RabbitMQ中怎么實(shí)現(xiàn)延遲隊(duì)列就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。


當(dāng)前標(biāo)題:RabbitMQ中怎么實(shí)現(xiàn)延遲隊(duì)列
網(wǎng)頁網(wǎng)址:http://weahome.cn/article/pechje.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部