這篇文章給大家介紹RabbitMQ中怎么實(shí)現(xiàn)延遲隊(duì)列,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
成都創(chuàng)新互聯(lián),為您提供網(wǎng)站建設(shè)公司、成都網(wǎng)站制作公司、網(wǎng)站營銷推廣、網(wǎng)站開發(fā)設(shè)計(jì),對服務(wù)三維植被網(wǎng)等多個(gè)行業(yè)擁有豐富的網(wǎng)站建設(shè)及推廣經(jīng)驗(yàn)。成都創(chuàng)新互聯(lián)網(wǎng)站建設(shè)公司成立于2013年,提供專業(yè)網(wǎng)站制作報(bào)價(jià)服務(wù),我們深知市場的競爭激烈,認(rèn)真對待每位客戶,為客戶提供賞心悅目的作品。 與客戶共同發(fā)展進(jìn)步,是我們永遠(yuǎn)的責(zé)任!
在 RabbitMQ 3.6.x 之前我們一般采用死信隊(duì)列+TTL過期時(shí)間來實(shí)現(xiàn)延遲隊(duì)列,我們這里不做過多介紹,可以參考之前文章來了解:TTL、死信隊(duì)列
在 RabbitMQ 3.6.x 開始,RabbitMQ 官方提供了延遲隊(duì)列的插件,可以下載放置到 RabbitMQ 根目錄下的 plugins 下。延遲隊(duì)列插件下載
首先我們創(chuàng)建交換機(jī)和消息隊(duì)列
import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MQConfig { public static final String LAZY_EXCHANGE = "Ex.LazyExchange"; public static final String LAZY_QUEUE = "MQ.LazyQueue"; public static final String LAZY_KEY = "lazy.#"; @Bean public TopicExchange lazyExchange(){ //Map
我們在 Exchange 的聲明中可以設(shè)置exchange.setDelayed(true)來開啟延遲隊(duì)列,也可以設(shè)置為以下內(nèi)容傳入交換機(jī)聲明的方法中,因?yàn)榈谝环N方式的底層就是通過這種方式來實(shí)現(xiàn)的。
//Map
發(fā)送消息時(shí)我們需要指定延遲推送的時(shí)間,我們這里在發(fā)送消息的方法中傳入?yún)?shù) new MessagePostProcessor() 是為了獲得 Message對象,因?yàn)樾枰柚?Message對象的api 來設(shè)置延遲時(shí)間。
import com.anqi.mq.config.MQConfig;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageDeliveryMode;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;@Componentpublic class MQSender { @Autowired private RabbitTemplate rabbitTemplate; //confirmCallback returnCallback 代碼省略,請參照上一篇 public void sendLazy(Object message){ rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 時(shí)間戳 全局唯一 CorrelationData correlationData = new CorrelationData("12345678909"+new Date()); //發(fā)送消息時(shí)指定 header 延遲時(shí)間 rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //設(shè)置消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); //message.getMessageProperties().setHeader("x-delay", "6000"); message.getMessageProperties().setDelay(6000); return message; } }, correlationData); }}
我們可以觀察 setDelay(Integer i)底層代碼,也是在 header 中設(shè)置 x-delay。等同于我們手動(dòng)設(shè)置 header
message.getMessageProperties().setHeader("x-delay", "6000");
/** * Set the x-delay header. * @param delay the delay. * @since 1.6 */public void setDelay(Integer delay) { if (delay == null || delay < 0) { this.headers.remove(X_DELAY); } else { this.headers.put(X_DELAY, delay); }}
消費(fèi)端進(jìn)行消費(fèi)
import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.Map;@Componentpublic class MQReceiver { @RabbitListener(queues = "MQ.LazyQueue") @RabbitHandler public void onLazyMessage(Message msg, Channel channel) throws IOException{ long deliveryTag = msg.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, true); System.out.println("lazy receive " + new String(msg.getBody())); }
測試結(jié)果
import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest@RunWith(SpringRunner.class)public class MQSenderTest { @Autowired private MQSender mqSender; @Test public void sendLazy() throws Exception { String msg = "hello spring boot"; mqSender.sendLazy(msg + ":"); }}
果然在 6 秒后收到了消息 lazy receive hello spring boot:
關(guān)于RabbitMQ中怎么實(shí)現(xiàn)延遲隊(duì)列就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。