真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

RabbitMQ延遲隊(duì)列及消息延遲推送實(shí)現(xiàn)的示例分析

小編給大家分享一下RabbitMQ延遲隊(duì)列及消息延遲推送實(shí)現(xiàn)的示例分析,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

我們一直強(qiáng)調(diào)網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)對(duì)于企業(yè)的重要性,如果您也覺得重要,那么就需要我們慎重對(duì)待,選擇一個(gè)安全靠譜的網(wǎng)站建設(shè)公司,企業(yè)網(wǎng)站我們建議是要么不做,要么就做好,讓網(wǎng)站能真正成為企業(yè)發(fā)展過程中的有力推手。專業(yè)網(wǎng)站設(shè)計(jì)公司不一定是大公司,成都創(chuàng)新互聯(lián)公司作為專業(yè)的網(wǎng)絡(luò)公司選擇我們就是放心。

應(yīng)用場(chǎng)景

目前常見的應(yīng)用軟件都有消息的延遲推送的影子,應(yīng)用也極為廣泛,例如:

  • 淘寶七天自動(dòng)確認(rèn)收貨。在我們簽收商品后,物流系統(tǒng)會(huì)在七天后延時(shí)發(fā)送一個(gè)消息給支付系統(tǒng),通知支付系統(tǒng)將款打給商家,這個(gè)過程持續(xù)七天,就是使用了消息中間件的延遲推送功能。

  • 12306 購(gòu)票支付確認(rèn)頁(yè)面。我們?cè)谶x好票點(diǎn)擊確定跳轉(zhuǎn)的頁(yè)面中往往都會(huì)有倒計(jì)時(shí),代表著 30 分鐘內(nèi)訂單不確認(rèn)的話將會(huì)自動(dòng)取消訂單。其實(shí)在下訂單那一刻開始購(gòu)票業(yè)務(wù)系統(tǒng)就會(huì)發(fā)送一個(gè)延時(shí)消息給訂單系統(tǒng),延時(shí)30分鐘,告訴訂單系統(tǒng)訂單未完成,如果我們?cè)?0分鐘內(nèi)完成了訂單,則可以通過邏輯代碼判斷來忽略掉收到的消息。

在上面兩種場(chǎng)景中,如果我們使用下面兩種傳統(tǒng)解決方案無(wú)疑大大降低了系統(tǒng)的整體性能和吞吐量:

  • 使用 redis 給訂單設(shè)置過期時(shí)間,最后通過判斷 redis 中是否還有該訂單來決定訂單是否已經(jīng)完成。這種解決方案相較于消息的延遲推送性能較低,因?yàn)槲覀冎?redis 都是存儲(chǔ)于內(nèi)存中,我們遇到惡意下單或者刷單的將會(huì)給內(nèi)存帶來巨大壓力。

  • 使用傳統(tǒng)的數(shù)據(jù)庫(kù)輪詢來判斷數(shù)據(jù)庫(kù)表中訂單的狀態(tài),這無(wú)疑增加了IO次數(shù),性能極低。

  • 使用 jvm 原生的 DelayQueue ,也是大量占用內(nèi)存,而且沒有持久化策略,系統(tǒng)宕機(jī)或者重啟都會(huì)丟失訂單信息。

消息延遲推送的實(shí)現(xiàn)

在 RabbitMQ 3.6.x 之前我們一般采用死信隊(duì)列+TTL過期時(shí)間來實(shí)現(xiàn)延遲隊(duì)列,我們這里不做過多介紹,可以參考之前文章來了解:TTL、死信隊(duì)列

在 RabbitMQ 3.6.x 開始,RabbitMQ 官方提供了延遲隊(duì)列的插件,可以下載放置到 RabbitMQ 根目錄下的 plugins 下。延遲隊(duì)列插件下載

RabbitMQ延遲隊(duì)列及消息延遲推送實(shí)現(xiàn)的示例分析

首先我們創(chuàng)建交換機(jī)和消息隊(duì)列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

  public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
  public static final String LAZY_QUEUE = "MQ.LazyQueue";
  public static final String LAZY_KEY = "lazy.#";

  @Bean
  public TopicExchange lazyExchange(){
    //Map pros = new HashMap<>();
    //設(shè)置交換機(jī)支持延遲消息推送
    //pros.put("x-delayed-message", "topic");
    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
    exchange.setDelayed(true);
    return exchange;
  }

  @Bean
  public Queue lazyQueue(){
    return new Queue(LAZY_QUEUE, true);
  }

  @Bean
  public Binding lazyBinding(){
    return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
  }
}

我們?cè)?Exchange 的聲明中可以設(shè)置exchange.setDelayed(true)來開啟延遲隊(duì)列,也可以設(shè)置為以下內(nèi)容傳入交換機(jī)聲明的方法中,因?yàn)榈谝环N方式的底層就是通過這種方式來實(shí)現(xiàn)的。

//Map pros = new HashMap<>();
    //設(shè)置交換機(jī)支持延遲消息推送
    //pros.put("x-delayed-message", "topic");
    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

發(fā)送消息時(shí)我們需要指定延遲推送的時(shí)間,我們這里在發(fā)送消息的方法中傳入?yún)?shù) new MessagePostProcessor() 是為了獲得 Message對(duì)象,因?yàn)樾枰柚?Message對(duì)象的api 來設(shè)置延遲時(shí)間。

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class MQSender {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  //confirmCallback returnCallback 代碼省略,請(qǐng)參照上一篇
 
  public void sendLazy(Object message){
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setReturnCallback(returnCallback);
    //id + 時(shí)間戳 全局唯一
    CorrelationData correlationData = new CorrelationData("12345678909"+new Date());

    //發(fā)送消息時(shí)指定 header 延遲時(shí)間
    rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
        new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
        //設(shè)置消息持久化
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        //message.getMessageProperties().setHeader("x-delay", "6000");
        message.getMessageProperties().setDelay(6000);
        return message;
      }
    }, correlationData);
  }
}

我們可以觀察 setDelay(Integer i)底層代碼,也是在 header 中設(shè)置 x-delay。等同于我們手動(dòng)設(shè)置 header

message.getMessageProperties().setHeader("x-delay", "6000");

/**
 * Set the x-delay header.
 * @param delay the delay.
 * @since 1.6
 */
public void setDelay(Integer delay) {
  if (delay == null || delay < 0) {
    this.headers.remove(X_DELAY);
  }
  else {
    this.headers.put(X_DELAY, delay);
  }
}

消費(fèi)端進(jìn)行消費(fèi)

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class MQReceiver {

  @RabbitListener(queues = "MQ.LazyQueue")
  @RabbitHandler
  public void onLazyMessage(Message msg, Channel channel) throws IOException{
    long deliveryTag = msg.getMessageProperties().getDeliveryTag();
    channel.basicAck(deliveryTag, true);
    System.out.println("lazy receive " + new String(msg.getBody()));

  }

測(cè)試結(jié)果

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {

  @Autowired
  private MQSender mqSender;

  @Test
  public void sendLazy() throws Exception {
    String msg = "hello spring boot";

    mqSender.sendLazy(msg + ":");
  }
}

果然在 6 秒后收到了消息 lazy receive hello spring boot:

看完了這篇文章,相信你對(duì)“RabbitMQ延遲隊(duì)列及消息延遲推送實(shí)現(xiàn)的示例分析”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!


文章題目:RabbitMQ延遲隊(duì)列及消息延遲推送實(shí)現(xiàn)的示例分析
網(wǎng)站URL:http://weahome.cn/article/ggdish.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部