真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

RabbitMQ如何實(shí)現(xiàn)延時隊(duì)列

本篇內(nèi)容介紹了“RabbitMQ如何實(shí)現(xiàn)延時隊(duì)列”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

創(chuàng)新互聯(lián)是一家專業(yè)提供惠安企業(yè)網(wǎng)站建設(shè),專注與成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作、HTML5建站、小程序制作等業(yè)務(wù)。10年已為惠安眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。

什么是延時隊(duì)列

延時隊(duì)列:顧名思義,是一個用于做消息延時消費(fèi)的隊(duì)列。但是它也是一個普通隊(duì)列,所以它具備普通隊(duì)列的特性,相比之下,延時的特性就是它最大的特點(diǎn)。所謂的延時就是將我們需要的消息,延遲多久之后被消費(fèi)。普通隊(duì)列是即時消費(fèi)的,延時隊(duì)列是根據(jù)延時時間,多久之后才能消費(fèi)的。

RabbitMQ如何實(shí)現(xiàn)延時隊(duì)列

延時隊(duì)列使用場景

  • 訂單在十分鐘之內(nèi)未支付則自動取消。

  • 會員續(xù)費(fèi)的定時推送

  • 用戶注冊成功后,如果三天內(nèi)沒有登陸則進(jìn)行短信提醒。

  • 預(yù)定會議后,需要在預(yù)定的時間點(diǎn)前十分鐘通知各個與會人員參加會議。

  • 優(yōu)惠券過期提醒

核心的應(yīng)用內(nèi)容基本都是基于需要設(shè)定過期時間的

RabbitMQ如何實(shí)現(xiàn)延時隊(duì)列

  • 方式1、通過RabbitMQ的高級特性TTL和配合死信隊(duì)列

  • 方式2、安裝rabbitmq_delayed_message_exchange插件

RabbitMQ中的高級特性TTL

TTL是什么呢?TTL是RabbitMQ中一個消息或者隊(duì)列的屬性,表明一條消息或者該隊(duì)列中的所有消息的最大存活時間,單位是毫秒,為什么延時隊(duì)列要介紹它?TTL就是一種消息過期策略。給我們的消息做過期處理,當(dāng)消息在隊(duì)列中存活了指定時候之后,改隊(duì)列就會將這個消息直接丟棄。在RabbitMQ中并沒有直接實(shí)現(xiàn)好的延時隊(duì)列,我們可以使用TTL這種高級特性,然后配合死信隊(duì)列,即可實(shí)現(xiàn)延時隊(duì)列的功能。

那么,如何設(shè)置這個TTL值呢?有兩種方式,第一種是在創(chuàng)建隊(duì)列的時候設(shè)置隊(duì)列的“x-message-ttl”屬性,如下: 方式一:

Map args = new HashMap();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

使用這種方式,消息被設(shè)定TTL,一旦消息過期,就會被隊(duì)列丟棄

方式二:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

使用這種方式,消息即使過期,也不一定會被馬上丟棄,因?yàn)橄⑹欠襁^期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況,則已過期的消息也許還能存活較長時間。

另外,還需要注意的一點(diǎn)是,如果不設(shè)置TTL,表示消息永遠(yuǎn)不會過期,如果將TTL設(shè)置為0,則表示除非此時可以直接投遞該消息到消費(fèi)者,否則該消息將會被丟棄。

RabbitMQ到底怎么實(shí)現(xiàn)延時隊(duì)列

  • 步驟一:創(chuàng)建一個正常的隊(duì)列,指定消息過期時間,并且指定消息過期后需要投遞的死信交換器和死信交換隊(duì)列。

  • 步驟二:創(chuàng)建死信隊(duì)列和死信交換器

RabbitMQ實(shí)現(xiàn)延時隊(duì)列實(shí)例

package com.example.demo;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @author echo
 * @date 2021-01-14 14:35
 */
public class TopicDealProductTest {

    /**
     * 延時隊(duì)列交換機(jī)
     */
    private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay";
    /**
     * 死信隊(duì)列交換機(jī)
     */
    private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead";
    /**
     * 延時隊(duì)列
     */
    private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay";
    /**
     * 死信隊(duì)列
     */
    private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead";
    /**
     * 延時隊(duì)列ROUTING_KEY
     */
    private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey";
    /**
     * 延時隊(duì)列ROUTING_KEY
     */
    private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = createConnection();
        // 創(chuàng)建一個頻道
        Channel channel = connection.createChannel();
        sendMsg(channel);
        Thread.sleep(10000);
        closeConnection(connection, channel);
    }

    private static void sendMsg(Channel channel) throws IOException {

        // 創(chuàng)建延時隊(duì)列和延時交換器
        channel.exchangeDeclare(DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT);
        Map map = new HashMap<>(16);
        // 在延時交換器上指定死信交換器
        map.put("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD);
        // 在延時交換器上指定死信隊(duì)列的routing-key
        map.put("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY);
        // 設(shè)定延時隊(duì)列的延長時長 10s
        map.put("x-message-ttl", 10000);
        // 創(chuàng)建延時隊(duì)列
        channel.queueDeclare(DIRECT_QUEUE_DELAY, true, false, false, map);
        // 在延時交換器上綁定延時隊(duì)列
        channel.queueBind(DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY);

        // 創(chuàng)建死信隊(duì)列和死信交換器
        channel.exchangeDeclare(DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true, false, null);
        // 創(chuàng)建死信隊(duì)列
        channel.queueDeclare(DIRECT_QUEUE_DEAD, true, false, false, null);
        // 在死信交換器上綁定死信隊(duì)列
        channel.queueBind(DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY);

        channel.basicPublish(DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, null, "hello world".getBytes());

    }

    private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
        // 關(guān)閉資源
        channel.close();
        connection.close();
    }

    private static Connection createConnection() throws IOException, TimeoutException {
        // 創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置RabbitMQ的鏈接參數(shù)
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("echo");
        factory.setPassword("123456");
        // 和RabbitMQ建立一個鏈接
        return factory.newConnection();
    }

}

到這里,其實(shí)我們不難發(fā)現(xiàn),我們無非是利用了TTL這個特性,讓消息在過期的時候丟棄到指定隊(duì)列,死信隊(duì)列其實(shí)也是一個普通隊(duì)列。

執(zhí)行之后,我們來看看結(jié)果,在Exchange里面,我們創(chuàng)建了兩個交換器和兩個隊(duì)列,但是兩個隊(duì)列和交換器還是有區(qū)別的,我們來看圖片

RabbitMQ如何實(shí)現(xiàn)延時隊(duì)列

我們可以看到兩個隊(duì)列的Features標(biāo)志是不一樣的

  • TTL: 消息在隊(duì)列中的過期時間

  • DLX: 該隊(duì)列綁定了死信交換器

  • DLK: 該隊(duì)列綁定的死信隊(duì)列的ROUTING_KEY

在我們執(zhí)行完成只有,我們可以看到,消息先被投遞到了delay,該隊(duì)列里面的消息,到達(dá)過期時間之后就被投遞到了dead隊(duì)列中去了。

那么我們上面介紹了TTL和設(shè)置AMQP.BasicProperties,這兩種有一定的區(qū)別,前一個是設(shè)置隊(duì)列消息過期時間,后一個是設(shè)定每條消息的過期時間。那他們的區(qū)別在哪里?

設(shè)置每條消息和設(shè)置TTL的區(qū)別

其實(shí)這兩種方式的區(qū)別就在于怎么判斷該消息是否要被丟棄。TTL設(shè)定的隊(duì)列,只要消息到達(dá)過期時間,立馬就會將消息丟棄。如果是后者,可能我們隊(duì)列里面有很多的消息,然后每條消息的過期時間又不一致,這個時候,如果隊(duì)列出口處堵了很多沒有設(shè)定過期時間的消息又不被消費(fèi)的時候,隊(duì)列后面的消息及時設(shè)定了過期時間也不會被丟棄,只有在設(shè)定了過期時間的消息到了隊(duì)列該消費(fèi)的位置,才會判定

怎么使用AMQP.BasicProperties?

package com.example.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @author echo
 * @date 2021-01-14 14:35
 */
public class TopicDealProductTest {

    /**
     * 延時隊(duì)列交換機(jī)
     */
    private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay";
    /**
     * 死信隊(duì)列交換機(jī)
     */
    private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead";
    /**
     * 延時隊(duì)列
     */
    private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay";
    /**
     * 死信隊(duì)列
     */
    private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead";
    /**
     * 延時隊(duì)列ROUTING_KEY
     */
    private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey";
    /**
     * 延時隊(duì)列ROUTING_KEY
     */
    private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = createConnection();
        // 創(chuàng)建一個頻道
        Channel channel = connection.createChannel();
        sendMsg(channel);
        Thread.sleep(10000);
        closeConnection(connection, channel);
    }

    private static void sendMsg(Channel channel) throws IOException {

        // 創(chuàng)建延時隊(duì)列和延時交換器
        channel.exchangeDeclare(DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT);
        Map map = new HashMap<>(16);
        // 在延時交換器上指定死信交換器
        map.put("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD);
        map.put("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY);
        // 設(shè)定延時隊(duì)列的延長時長 10s
//        map.put("x-message-ttl", 10000);
        // 創(chuàng)建延時隊(duì)列
        channel.queueDeclare(DIRECT_QUEUE_DELAY, true, false, false, map);
        // 在延時交換器上綁定延時隊(duì)列
        channel.queueBind(DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY);

        // 創(chuàng)建死信隊(duì)列和死信交換器
        channel.exchangeDeclare(DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true, false, null);
        // 創(chuàng)建死信隊(duì)列
        channel.queueDeclare(DIRECT_QUEUE_DEAD, true, false, false, null);
        // 在死信交換器上綁定死信隊(duì)列
        channel.queueBind(DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY);

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.expiration("10000");
        AMQP.BasicProperties properties = builder.build();
        channel.basicPublish(DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, false, properties,  "hello world".getBytes());

    }

    private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
        // 關(guān)閉資源
        channel.close();
        connection.close();
    }

    private static Connection createConnection() throws IOException, TimeoutException {
        // 創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置RabbitMQ的鏈接參數(shù)
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("echo");
        factory.setPassword("123456");
        // 和RabbitMQ建立一個鏈接
        return factory.newConnection();
    }

}

我們運(yùn)行完成成之后,可以看到和我們之前那一種方式的效果是一樣的

RabbitMQ如何實(shí)現(xiàn)延時隊(duì)列

兩種設(shè)定過期時間的方式其實(shí)區(qū)別就在于一個統(tǒng)一設(shè)定了過期時間,一個指定每條過期時間。但是并不影響我們延時隊(duì)列的實(shí)現(xiàn),那我們怎么選擇呢?

怎么選擇TTL設(shè)定方式?

根據(jù)兩種方式的特性來選定使用場景才是最合理的。我們?nèi)绻脕碜鲅訒r隊(duì)列的,想將延時隊(duì)列的特性應(yīng)用到實(shí)際場景的,并且對時時性要求比較高的,建議選擇第一種方式。

總結(jié)

延時隊(duì)列的實(shí)現(xiàn)并不難,關(guān)鍵是我們要知道他的一個原理,了解RabbitMQ他的TTL和死信對了。掌握了它的這些特性之后,我們就可以很好的應(yīng)用延時隊(duì)列。延時隊(duì)列在工作中對我們的幫組也非常大,不過RabbiTMQ沒有原生延時隊(duì)列,我們用這種方式實(shí)現(xiàn)了它并不意味著我們一定要選擇它。其實(shí)還有很多的方式,比如Java中的DelayQueu、kafka的時間輪等。

“RabbitMQ如何實(shí)現(xiàn)延時隊(duì)列”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!


網(wǎng)站標(biāo)題:RabbitMQ如何實(shí)現(xiàn)延時隊(duì)列
轉(zhuǎn)載來源:http://weahome.cn/article/jsddph.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部