本篇內(nèi)容介紹了“RabbitMQ如何實(shí)現(xiàn)延時隊(duì)列”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
創(chuàng)新互聯(lián)是一家專業(yè)提供惠安企業(yè)網(wǎng)站建設(shè),專注與成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作、HTML5建站、小程序制作等業(yè)務(wù)。10年已為惠安眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。
延時隊(duì)列:顧名思義,是一個用于做消息延時消費(fèi)的隊(duì)列。但是它也是一個普通隊(duì)列,所以它具備普通隊(duì)列的特性,相比之下,延時的特性就是它最大的特點(diǎn)。所謂的延時就是將我們需要的消息,延遲多久之后被消費(fèi)。普通隊(duì)列是即時消費(fèi)的,延時隊(duì)列是根據(jù)延時時間,多久之后才能消費(fèi)的。
訂單在十分鐘之內(nèi)未支付則自動取消。
會員續(xù)費(fèi)的定時推送
用戶注冊成功后,如果三天內(nèi)沒有登陸則進(jìn)行短信提醒。
預(yù)定會議后,需要在預(yù)定的時間點(diǎn)前十分鐘通知各個與會人員參加會議。
優(yōu)惠券過期提醒
核心的應(yīng)用內(nèi)容基本都是基于需要設(shè)定過期時間的
方式1、通過RabbitMQ的高級特性TTL和配合死信隊(duì)列
方式2、安裝rabbitmq_delayed_message_exchange插件
TTL是什么呢?TTL是RabbitMQ中一個消息或者隊(duì)列的屬性,表明一條消息或者該隊(duì)列中的所有消息的最大存活時間,單位是毫秒,為什么延時隊(duì)列要介紹它?TTL就是一種消息過期策略。給我們的消息做過期處理,當(dāng)消息在隊(duì)列中存活了指定時候之后,改隊(duì)列就會將這個消息直接丟棄。在RabbitMQ中并沒有直接實(shí)現(xiàn)好的延時隊(duì)列,我們可以使用TTL這種高級特性,然后配合死信隊(duì)列,即可實(shí)現(xiàn)延時隊(duì)列的功能。
那么,如何設(shè)置這個TTL值呢?有兩種方式,第一種是在創(chuàng)建隊(duì)列的時候設(shè)置隊(duì)列的“x-message-ttl”屬性,如下: 方式一:
Mapargs = new HashMap (); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
使用這種方式,消息被設(shè)定TTL,一旦消息過期,就會被隊(duì)列丟棄
方式二:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
使用這種方式,消息即使過期,也不一定會被馬上丟棄,因?yàn)橄⑹欠襁^期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況,則已過期的消息也許還能存活較長時間。
另外,還需要注意的一點(diǎn)是,如果不設(shè)置TTL,表示消息永遠(yuǎn)不會過期,如果將TTL設(shè)置為0,則表示除非此時可以直接投遞該消息到消費(fèi)者,否則該消息將會被丟棄。
步驟一:創(chuàng)建一個正常的隊(duì)列,指定消息過期時間,并且指定消息過期后需要投遞的死信交換器和死信交換隊(duì)列。
步驟二:創(chuàng)建死信隊(duì)列和死信交換器
package com.example.demo; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; /** * @author echo * @date 2021-01-14 14:35 */ public class TopicDealProductTest { /** * 延時隊(duì)列交換機(jī) */ private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay"; /** * 死信隊(duì)列交換機(jī) */ private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead"; /** * 延時隊(duì)列 */ private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay"; /** * 死信隊(duì)列 */ private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead"; /** * 延時隊(duì)列ROUTING_KEY */ private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey"; /** * 延時隊(duì)列ROUTING_KEY */ private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey"; private static final String IP_ADDRESS = "192.168.230.131"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = createConnection(); // 創(chuàng)建一個頻道 Channel channel = connection.createChannel(); sendMsg(channel); Thread.sleep(10000); closeConnection(connection, channel); } private static void sendMsg(Channel channel) throws IOException { // 創(chuàng)建延時隊(duì)列和延時交換器 channel.exchangeDeclare(DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT); Mapmap = new HashMap<>(16); // 在延時交換器上指定死信交換器 map.put("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD); // 在延時交換器上指定死信隊(duì)列的routing-key map.put("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY); // 設(shè)定延時隊(duì)列的延長時長 10s map.put("x-message-ttl", 10000); // 創(chuàng)建延時隊(duì)列 channel.queueDeclare(DIRECT_QUEUE_DELAY, true, false, false, map); // 在延時交換器上綁定延時隊(duì)列 channel.queueBind(DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY); // 創(chuàng)建死信隊(duì)列和死信交換器 channel.exchangeDeclare(DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true, false, null); // 創(chuàng)建死信隊(duì)列 channel.queueDeclare(DIRECT_QUEUE_DEAD, true, false, false, null); // 在死信交換器上綁定死信隊(duì)列 channel.queueBind(DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY); channel.basicPublish(DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, null, "hello world".getBytes()); } private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException { // 關(guān)閉資源 channel.close(); connection.close(); } private static Connection createConnection() throws IOException, TimeoutException { // 創(chuàng)建連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 設(shè)置RabbitMQ的鏈接參數(shù) factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("echo"); factory.setPassword("123456"); // 和RabbitMQ建立一個鏈接 return factory.newConnection(); } }
到這里,其實(shí)我們不難發(fā)現(xiàn),我們無非是利用了TTL這個特性,讓消息在過期的時候丟棄到指定隊(duì)列,死信隊(duì)列其實(shí)也是一個普通隊(duì)列。
執(zhí)行之后,我們來看看結(jié)果,在Exchange里面,我們創(chuàng)建了兩個交換器和兩個隊(duì)列,但是兩個隊(duì)列和交換器還是有區(qū)別的,我們來看圖片
我們可以看到兩個隊(duì)列的Features標(biāo)志是不一樣的
TTL: 消息在隊(duì)列中的過期時間
DLX: 該隊(duì)列綁定了死信交換器
DLK: 該隊(duì)列綁定的死信隊(duì)列的ROUTING_KEY
在我們執(zhí)行完成只有,我們可以看到,消息先被投遞到了delay,該隊(duì)列里面的消息,到達(dá)過期時間之后就被投遞到了dead隊(duì)列中去了。
那么我們上面介紹了TTL和設(shè)置AMQP.BasicProperties,這兩種有一定的區(qū)別,前一個是設(shè)置隊(duì)列消息過期時間,后一個是設(shè)定每條消息的過期時間。那他們的區(qū)別在哪里?
其實(shí)這兩種方式的區(qū)別就在于怎么判斷該消息是否要被丟棄。TTL設(shè)定的隊(duì)列,只要消息到達(dá)過期時間,立馬就會將消息丟棄。如果是后者,可能我們隊(duì)列里面有很多的消息,然后每條消息的過期時間又不一致,這個時候,如果隊(duì)列出口處堵了很多沒有設(shè)定過期時間的消息又不被消費(fèi)的時候,隊(duì)列后面的消息及時設(shè)定了過期時間也不會被丟棄,只有在設(shè)定了過期時間的消息到了隊(duì)列該消費(fèi)的位置,才會判定
package com.example.demo; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; /** * @author echo * @date 2021-01-14 14:35 */ public class TopicDealProductTest { /** * 延時隊(duì)列交換機(jī) */ private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay"; /** * 死信隊(duì)列交換機(jī) */ private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead"; /** * 延時隊(duì)列 */ private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay"; /** * 死信隊(duì)列 */ private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead"; /** * 延時隊(duì)列ROUTING_KEY */ private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey"; /** * 延時隊(duì)列ROUTING_KEY */ private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey"; private static final String IP_ADDRESS = "192.168.230.131"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = createConnection(); // 創(chuàng)建一個頻道 Channel channel = connection.createChannel(); sendMsg(channel); Thread.sleep(10000); closeConnection(connection, channel); } private static void sendMsg(Channel channel) throws IOException { // 創(chuàng)建延時隊(duì)列和延時交換器 channel.exchangeDeclare(DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT); Mapmap = new HashMap<>(16); // 在延時交換器上指定死信交換器 map.put("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD); map.put("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY); // 設(shè)定延時隊(duì)列的延長時長 10s // map.put("x-message-ttl", 10000); // 創(chuàng)建延時隊(duì)列 channel.queueDeclare(DIRECT_QUEUE_DELAY, true, false, false, map); // 在延時交換器上綁定延時隊(duì)列 channel.queueBind(DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY); // 創(chuàng)建死信隊(duì)列和死信交換器 channel.exchangeDeclare(DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true, false, null); // 創(chuàng)建死信隊(duì)列 channel.queueDeclare(DIRECT_QUEUE_DEAD, true, false, false, null); // 在死信交換器上綁定死信隊(duì)列 channel.queueBind(DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("10000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, false, properties, "hello world".getBytes()); } private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException { // 關(guān)閉資源 channel.close(); connection.close(); } private static Connection createConnection() throws IOException, TimeoutException { // 創(chuàng)建連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 設(shè)置RabbitMQ的鏈接參數(shù) factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("echo"); factory.setPassword("123456"); // 和RabbitMQ建立一個鏈接 return factory.newConnection(); } }
我們運(yùn)行完成成之后,可以看到和我們之前那一種方式的效果是一樣的
兩種設(shè)定過期時間的方式其實(shí)區(qū)別就在于一個統(tǒng)一設(shè)定了過期時間,一個指定每條過期時間。但是并不影響我們延時隊(duì)列的實(shí)現(xiàn),那我們怎么選擇呢?
根據(jù)兩種方式的特性來選定使用場景才是最合理的。我們?nèi)绻脕碜鲅訒r隊(duì)列的,想將延時隊(duì)列的特性應(yīng)用到實(shí)際場景的,并且對時時性要求比較高的,建議選擇第一種方式。
延時隊(duì)列的實(shí)現(xiàn)并不難,關(guān)鍵是我們要知道他的一個原理,了解RabbitMQ他的TTL和死信對了。掌握了它的這些特性之后,我們就可以很好的應(yīng)用延時隊(duì)列。延時隊(duì)列在工作中對我們的幫組也非常大,不過RabbiTMQ沒有原生延時隊(duì)列,我們用這種方式實(shí)現(xiàn)了它并不意味著我們一定要選擇它。其實(shí)還有很多的方式,比如Java中的DelayQueu、kafka的時間輪等。
“RabbitMQ如何實(shí)現(xiàn)延時隊(duì)列”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!