TTL是time to live 的簡稱,顧名思義指的是消息的存活時間。rabbitMq可以從兩種維度設置消息過期時間,分別是隊列和消息本身。 隊列消息過期時間-Per-Queue Message TTL: 通過設置隊列的x-message-ttl參數(shù)來設置指定隊列上消息的存活時間,其值是一個非負整數(shù),單位為微秒。不同隊列的過期時間互相之間沒有影響,即使是對于同一條消息。隊列中的消息存在隊列中的時間超過過期時間則成為死信。
創(chuàng)新互聯(lián)建站 - 服務器托管德陽,四川服務器租用,成都服務器租用,四川網(wǎng)通托管,綿陽服務器托管,德陽服務器托管,遂寧服務器托管,綿陽服務器托管,四川云主機,成都云主機,西南云主機,服務器托管德陽,西南服務器托管,四川/成都大帶寬,服務器機柜,四川老牌IDC服務商
隊列中的消息在以下三種情況下會變成死信 (1)消息被拒絕(basic.reject 或者 basic.nack),并且requeue=false; (2)消息的過期時間到期了; (3)隊列長度限制超過了。 當隊列中的消息成為死信以后,如果隊列設置了DLX那么消息會被發(fā)送到DLX。通過x-dead-letter-exchange設置DLX,通過這個x-dead-letter-routing-key設置消息發(fā)送到DLX所用的routing-key,如果不設置默認使用消息本身的routing-key.
@Bean
public Queue lindQueue() {
return QueueBuilder.durable(LIND_QUEUE)
.withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//設置死信交換機
.withArgument("x-message-ttl", makeCallExpire)
.withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//設置死信routingKey
.build();
}
@Component
public class AmqpConfig {
/**
* 主要測試一個死信隊列,功能主要實現(xiàn)延時消費,原理是先把消息發(fā)到正常隊列,
* 正常隊列有超時時間,當達到時間后自動發(fā)到死信隊列,然后由消費者去消費死信隊列里的消息.
*/
public static final String LIND_EXCHANGE = "lind.exchange";
public static final String LIND_DL_EXCHANGE = "lind.dl.exchange";
public static final String LIND_QUEUE = "lind.queue";
public static final String LIND_DEAD_QUEUE = "lind.queue.dead";
public static final String LIND_FANOUT_EXCHANGE = "lindFanoutExchange";
/**
* 單位為微秒.
*/
@Value("${tq.makecall.expire:60000}")
private long makeCallExpire;
/**
* 創(chuàng)建普通交換機.
*/
@Bean
public TopicExchange lindExchange() {
return (TopicExchange) ExchangeBuilder.topicExchange(LIND_EXCHANGE).durable(true)
.build();
}
/**
* 創(chuàng)建死信交換機.
*/
@Bean
public TopicExchange lindExchangeDl() {
return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true)
.build();
}
/**
* 創(chuàng)建普通隊列.
*/
@Bean
public Queue lindQueue() {
return QueueBuilder.durable(LIND_QUEUE)
.withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//設置死信交換機
.withArgument("x-message-ttl", makeCallExpire)
.withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//設置死信routingKey
.build();
}
/**
* 創(chuàng)建死信隊列.
*/
@Bean
public Queue lindDelayQueue() {
return QueueBuilder.durable(LIND_DEAD_QUEUE).build();
}
/**
* 綁定死信隊列.
*/
@Bean
public Binding bindDeadBuilders() {
return BindingBuilder.bind(lindDelayQueue())
.to(lindExchangeDl())
.with(LIND_DEAD_QUEUE);
}
/**
* 綁定普通隊列.
*
* @return
*/
@Bean
public Binding bindBuilders() {
return BindingBuilder.bind(lindQueue())
.to(lindExchange())
.with(LIND_QUEUE);
}
/**
* 廣播交換機.
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(LIND_FANOUT_EXCHANGE);
}
}
//-----------------
@Component
public class Publisher {
@Autowired
private RabbitTemplate rabbitTemplate;
public void publish(String message) {
try {
rabbitTemplate
.convertAndSend(AmqpConfig.LIND_EXCHANGE, AmqpConfig.LIND_DELAY_QUEUE,
message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
//-----------------
@Component
@Slf4j
public class Subscriber {
@RabbitListener(queues = AmqpConfig.LIND_QUEUE)
public void customerSign(String data) {
try {
log.info("從隊列拿到數(shù)據(jù) :{}", data);
} catch (Exception ex) {
e.printStackTrace();
}
}
}