真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網站制作重慶分公司

RabbitMQ學習筆記

RabbitMQ

整合RabbitMQ

/**
 * 使用RabbitMQ
 *  1、引入ampq場景,RabbitAutoConfiguration 就會自動生效
 *  2、給容器中自動配置了
 *      RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
 *      所有的屬性都是在
 *          @EnableConfigurationProperties(RabbitProperties.class)
 *          @ConfigurationProperties(prefix = "spring.rabbitmq")
 *          public class RabbitProperties
 *  3、給配置文件中配置 spring.rabbitmq 信息
 *  4、@EnableRabbit 開啟功能
 *  5、監(jiān)聽消息:使用 @RabbitListener,必須有 @EnableRabbit
 *      @RabbitListener:類 + 方法上
 *      @RabbitHandler: 只能標在方法上
 */

    org.springframework.boot
    spring-boot-starter-amqp

# rabbit 配置文件
spring.rabbitmq.host=192.168.106.101
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

測試

創(chuàng)新互聯(lián)建站網站建設公司,提供成都網站設計、成都網站制作,網頁設計,建網站,PHP網站建設等專業(yè)做網站服務;可快速的進行網站開發(fā)網頁制作和功能擴展;專業(yè)做搜索引擎喜愛的網站,是專業(yè)的做網站團隊,希望更多企業(yè)前來合作!

package com.atguigu.gulimall.order;

import com.atguigu.gulimall.order.entity.OrderReturnApplyEntity;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Date;


@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {

    @Autowired
    AmqpAdmin amqpAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;
    /**
     * 1、創(chuàng)建Exchange[hello.java.exchange]、Queue、Binding
     *      - 使用 AmqpAdmin 進行創(chuàng)建
     *
     * 2、如何收發(fā)消息 -> RabbitTemplate
     *      如果發(fā)送的消息是個對象,使用序列化機制,將對象寫出去,對象實現(xiàn) Serializable 接口
     *      自定義序列化添加配置
     *      @Configuration
     *      public class MyRabbitConfig {
     *          @Bean
     *          public MessageConverter messageConverter() {
     *              return new Jackson2JsonMessageConverter();
     *           }
     *      }
     */

    @Test
    public void sendMessageTest() {
        String msg = "Hello World";
        OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
        orderReturnApplyEntity.setId(1L);
        orderReturnApplyEntity.setSkuName("華為");
        orderReturnApplyEntity.setCreateTime(new Date());
        rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java", orderReturnApplyEntity);
        log.info("消息發(fā)送完成:{}", orderReturnApplyEntity);
    }

    @Test
    public void createExchange() {
        //amqpAdmin
        /**
         * DirectExchange
         * public DirectExchange(String name, boolean durable, boolean autoDelete, Map arguments)
         */
        DirectExchange exchange = new DirectExchange("hello.java.exchange", true,false);
        amqpAdmin.declareExchange(exchange);
        log.info("Exchange[{}]創(chuàng)建成功", "hello.java.exchange");
    }

    @Test
    public void createQueue() {
        /**
         * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
         */
        Queue queue = new Queue("hello-java-queue", true, false,true);
        amqpAdmin.declareQueue(queue);
        log.info("Queue[{}]創(chuàng)建成功", "hello-java-queue");
    }

    @Test
    public void createBinding() {
        /**
         * public Binding(String destination【目的地】,
         * DestinationType destinationType【目的地類型】,
         * String exchange【交換機】,
         * String routingKey【路由鍵】,
         * Map arguments)【參數】
         * 將 exchange 指定交換機和 destination目的地進行綁定,使用routingKey作為指定路由鍵
         */
        Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello.java.exchange","hello.java",null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding == 創(chuàng)建成功");
    }
}

測試監(jiān)聽消息

/**
 * queues:聲明需要監(jiān)聽的所欲隊列
 *
 * org.springframework.amqp.core.Message;
 *
 * 參數可以寫以下類型
 *  1、Message message;原生消息詳細信息,頭 + 體
 *  2、T<發(fā)送的消息的類型> OrderReturnApplyEntity content
 *  3、Channel channel:當前傳輸數據的通道
 *
 *  Queue:可以很多人都來監(jiān)聽,只要收到消息,隊列刪除消息,而且只有一個人收到此消息
 *      1、訂單服務啟動多個:同一個消息,只能有一個客戶端收到
 *      2、只有一個消息完全處理完,方法運行結束,我們就可以接受到下一個消息
 */
//@RabbitListener(queues = {"hello-java-queue"})
@RabbitHander
public void receiveMessage(Message message, OrderReturnReasonEntity content) {
    System.out.println("接收到消息....:"+ message + "===>內容;" + content + "類型是:" + message.getClass());
    byte[] body = message.getBody();
    //消息頭屬性信息
    MessageProperties properties = message.getMessageProperties();
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("消息處理完成=》" + content.getName());
}

@RabbitListener

簡介:

1.用于標注在監(jiān)聽類或監(jiān)聽方法上,接收消息,需要指定監(jiān)聽的隊列(數組)
2.使用該注解之前,需要在啟動類加上該注解:@EnableRabbit
3.@RabbitListener即可以標注在方法上又可以標注在類上
	標注在類上:表示該類是監(jiān)聽類,使得@RabbitHandler注解生效
	標注在方法上:表示該方法時監(jiān)聽方法,會監(jiān)聽指定隊列獲得消息
4.一般只標注在方法上,并配合@RabbitHandler使用,重載的方式接收不同消息對象

@RabbitHandler

作用:

配合@RabbitListener,使用方法重載的方法接收不同的消息類型

簡介:

1.用于標注在監(jiān)聽方法上,接收消息,不需要指定監(jiān)聽的隊列
2.使用該注解之前,需要在啟動類加上該注解:@EnableRabbit
3.@RabbitListener只可以標注在方法,重載的方式接收不同消息對象

發(fā)送端消息確認配置

1、配置

2、定制 RabbitTemplate,設置確認回調

# rabbit 配置文件
spring.rabbitmq.host=192.168.106.101
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

# 開啟發(fā)送端確認
spring.rabbitmq.publisher-confirms=true
#開啟發(fā)送端消息抵達確認
spring.rabbitmq.publisher-returns=true
#只要抵達隊列。以異步發(fā)送優(yōu)先回調returnconfirm
spring.rabbitmq.template.mandatory=true

# 手動ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

package com.atguigu.gulimall.order.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;

@Configuration
public class MyRabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制 rabbitTemplate
     * 1、服務收到消息就回調
     *      1、spring.rabbitmq.publisher-confirms=true
     *      2、設置確認回調ConfirmCallback
     * 2、消息正確地打隊列進行回調
     *      1、spring.rabbitmq.publisher-returns=true
     *         spring.rabbitmq.template.mandatory=true
     *      2、設置消息抵達隊列的回調
     * 3、消費端確認【保證每一個消息被正確消費,此時才可以讓broker刪除】
     *      1、默認是自動確認,只要消息接受到,自動確認,服務端就會移除這個消息
     *      2、手動確認默認,只要沒有明確告訴MQ,貨物被簽收,沒有ACK,消息一直是unacked狀態(tài)。
     *          即使Cosumer宕機,消息也不會丟失,會重新變成Ready,等待下一次新的consumer鏈接發(fā)給他
     *      3、如果手動確認:Channel channel -> long deliveryTag = properties.getDeliveryTag(); -> channel.basicAck(deliveryTag, false);
     *          channel.basicAck(deliveryTag, false);           簽收
     *          channel.basicNack(deliveryTag, false, true);    拒簽
     */
    @PostConstruct // MyRabbitConfig 對象創(chuàng)建完成以后執(zhí)行這個方法
    public void initRabbitTemplate(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要抵達服務器,ack就確認為true
             * @param correlationData 當前消息的唯一關聯(lián)數據(消息的唯一id)
             * @param ack 是否成功或者失敗
             * @param cause 失敗的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm..." + correlationData + "==> ack:" + ack + "==> cause:" + cause);
            }
        });

        //設置消息抵達隊列的回調
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息沒有投遞給指定的隊列,就觸發(fā)失敗回調
             * @param message   投遞失敗的消息詳細信息
             * @param replyCode 回復的狀態(tài)碼
             * @param replyText 回復的文本內容
             * @param exchange  消息發(fā)給那個交換機
             * @param routingKey 當時這個消息使用哪個路由鍵
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Fail Message:" + message + "==> replyTest:" + replyText + "==>exchange" + exchange + "==>routingKey:" + routingKey);
            }
        });
    }

}

/**
 * queues:聲明需要監(jiān)聽的所欲隊列
 * 

* org.springframework.amqp.core.Message; *

* 參數可以寫以下類型 * 1、Message message;原生消息詳細信息,頭 + 體 * 2、T<發(fā)送的消息的類型> OrderReturnApplyEntity content * 3、Channel channel:當前傳輸數據的通道 *

* Queue:可以很多人都來監(jiān)聽,只要收到消息,隊列刪除消息,而且只有一個人收到此消息 * 1、訂單服務啟動多個:同一個消息,只能有一個客戶端收到 * 2、只有一個消息完全處理完,方法運行結束,我們就可以接受到下一個消息 */ @RabbitListener(queues = {"hello-java-queue"}) public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws IOException { //System.out.println("接收到消息....:"+ message + "===>內容;" + content + "類型是:" + message.getClass()); System.out.println("接收到消息....:" + content); byte[] body = message.getBody(); //消息頭屬性信息 MessageProperties properties = message.getMessageProperties(); /*try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }*/ System.out.println("消息處理完成=》" + content.getName()); long deliveryTag = properties.getDeliveryTag(); System.out.println("deliverTag: " + deliveryTag); if (deliveryTag % 2 == 0) { //收貨 // 簽收獲取,非批量模式 channel.basicAck(deliveryTag, false); } else { //requeue 重新入隊 //basicNack(long deliveryTag, boolean multiple, boolean requeue) channel.basicNack(deliveryTag, false, true); System.out.println("沒有簽收的貨物....." + deliveryTag); } }

最終整合

1.導入mq依賴


    org.springframework.boot
    spring-boot-starter-amqp


2.ware模塊導入配置
spring:
  rabbitmq:
    host: 192.168.56.10
    port: 5672
    # 虛擬主機
    virtual-host: /
    # 開啟發(fā)送端發(fā)送確認,無論是否到達broker都會觸發(fā)回調【發(fā)送端確認機制+本地事務表】
    publisher-confirm-type: correlated
    # 開啟發(fā)送端抵達隊列確認,消息未被隊列接收時觸發(fā)回調【發(fā)送端確認機制+本地事務表】
    publisher-returns: true
    # 消息在沒有被隊列接收時是否強行退回
    template:
      mandatory: true
    # 消費者手動確認模式,關閉自動確認,否則會消息丟失
    listener:
      simple:
        acknowledge-mode: manual

3.添加注解
// 開啟rabbit
@EnableRabbit

4.創(chuàng)建配置類
/**
 * @Author: wanzenghui
 * @Date: 2021/12/15 0:04
 */
@Configuration
public class MyRabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        // 使用json序列化器來序列化消息,發(fā)送消息時,消息對象會被序列化成json格式
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     * 1、服務收到消息就會回調
     * 1、spring.rabbitmq.publisher-confirms: true
     * 2、設置確認回調
     * 2、消息正確抵達隊列就會進行回調
     * 1、spring.rabbitmq.publisher-returns: true
     * spring.rabbitmq.template.mandatory: true
     * 2、設置確認回調ReturnCallback
     * 

* 3、消費端確認(保證每個消息都被正確消費,此時才可以broker刪除這個消息) */ @PostConstruct // (MyRabbitConfig對象創(chuàng)建完成以后,執(zhí)行這個方法) public void initRabbitTemplate() { /** * 發(fā)送消息觸發(fā)confirmCallback回調 * @param correlationData:當前消息的唯一關聯(lián)數據(如果發(fā)送消息時未指定此值,則回調時返回null) * @param ack:消息是否成功收到(ack=true,消息抵達Broker) * @param cause:失敗的原因 */ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { System.out.println("發(fā)送消息觸發(fā)confirmCallback回調" + "\ncorrelationData ===> " + correlationData + "\nack ===> " + ack + "" + "\ncause ===> " + cause); System.out.println("================================================="); }); /** * 消息未到達隊列觸發(fā)returnCallback回調 * 只要消息沒有投遞給指定的隊列,就觸發(fā)這個失敗回調 * @param message:投遞失敗的消息詳細信息 * @param replyCode:回復的狀態(tài)碼 * @param replyText:回復的文本內容 * @param exchange:接收消息的交換機 * @param routingKey:接收消息的路由鍵 */ rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 需要修改數據庫 消息的狀態(tài)【后期定期重發(fā)消息】 System.out.println("消息未到達隊列觸發(fā)returnCallback回調" + "\nmessage ===> " + message + "\nreplyCode ===> " + replyCode + "\nreplyText ===> " + replyText + "\nexchange ===> " + exchange + "\nroutingKey ===> " + routingKey); System.out.println("=================================================="); }); } } 5.創(chuàng)建ware解鎖庫存的延時隊列、死信隊列、交換機、綁定關系 /** * 創(chuàng)建隊列,交換機,延時隊列,綁定關系 的configuration * 1.Broker中的Queue、Exchange、Binding不存在的情況下,會自動創(chuàng)建(在RabbitMQ),不會重復創(chuàng)建覆蓋 * 2.懶加載,只有第一次使用的時候才會創(chuàng)建(例如監(jiān)聽隊列) */ @Configuration public class MyRabbitMQConfig { /** * 用于首次創(chuàng)建隊列、交換機、綁定關系的監(jiān)聽 * @param message */ @RabbitListener(queues = "stock.release.stock.queue") public void handle(Message message) { } /** * 交換機 * Topic,可以綁定多個隊列 */ @Bean public Exchange stockEventExchange() { //String name, boolean durable, boolean autoDelete, Map arguments return new TopicExchange("stock-event-exchange", true, false); } /** * 死信隊列 */ @Bean public Queue stockReleaseStockQueue() { //String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments return new Queue("stock.release.stock.queue", true, false, false); } /** * 延時隊列 */ @Bean public Queue stockDelay() { HashMap arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "stock-event-exchange"); arguments.put("x-dead-letter-routing-key", "stock.release"); // 消息過期時間 2分鐘 arguments.put("x-message-ttl", 120000); return new Queue("stock.delay.queue", true, false, false,arguments); } /** * 綁定:交換機與死信隊列 */ @Bean public Binding stockLocked() { //String destination, DestinationType destinationType, String exchange, String routingKey, // Map arguments return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null); } /** * 綁定:交換機與延時隊列 */ @Bean public Binding stockLockedBinding() { return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null); } }


分享題目:RabbitMQ學習筆記
標題鏈接:http://weahome.cn/article/dsoioed.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部