這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)如何進(jìn)行RabbitMq的簡(jiǎn)單使用,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
創(chuàng)新互聯(lián)是一家從事企業(yè)網(wǎng)站建設(shè)、做網(wǎng)站、成都網(wǎng)站建設(shè)、行業(yè)門戶網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)制作的專業(yè)的建站公司,擁有經(jīng)驗(yàn)豐富的網(wǎng)站建設(shè)工程師和網(wǎng)頁(yè)設(shè)計(jì)人員,具備各種規(guī)模與類型網(wǎng)站建設(shè)的實(shí)力,在網(wǎng)站建設(shè)領(lǐng)域樹立了自己獨(dú)特的設(shè)計(jì)風(fēng)格。自公司成立以來(lái)曾獨(dú)立設(shè)計(jì)制作的站點(diǎn)超過(guò)千家。
1.pom文件中加入依賴
org.springframework.boot spring-boot-starter-amqp 2.3.3.RELEASE
2.配置文件,配置mq
自動(dòng)配置信息 這里我開啟ACK消息確認(rèn)server.port=8088#服務(wù)器配置spring.application.name=rabbitmq-test-sending#rabbitmq連接參數(shù)spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guest spring.rabbitmq.password=guest# 開啟發(fā)送確認(rèn)spring.rabbitmq.publisher-confirms=true# 開啟發(fā)送失敗退回spring.rabbitmq.publisher-returns=true# 開啟ACKspring.rabbitmq.listener.direct.acknowledge-mode=manual
3.Rabbit配置類,使用topic交換器,使用通配符,一個(gè)交換器對(duì)應(yīng)多個(gè)queue
import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class RabbitmqConfig {//隊(duì)列 @Bean public Queue queueTest1(){return new Queue("queueTest1",true); }/* * 設(shè)置消息隊(duì)列的TTL(過(guò)期時(shí)間) * */ @Bean public Queue queueTest2(){/** * 隊(duì)列的名稱,是否持久化,是否獨(dú)享、排外的,是否自動(dòng)刪除,隊(duì)列的其他屬性參數(shù) * (1)x-message-ttl:消息的過(guò)期時(shí)間,單位:毫秒; * (2)x-expires:隊(duì)列過(guò)期時(shí)間,隊(duì)列在多長(zhǎng)時(shí)間未被訪問將被刪除,單位:毫秒; * (3)x-max-length:隊(duì)列最大長(zhǎng)度,超過(guò)該最大值,則將從隊(duì)列頭部開始刪除消息; * (4)x-max-length-bytes:隊(duì)列消息內(nèi)容占用最大空間,受限于內(nèi)存大小,超過(guò)該閾值則從隊(duì)列頭部開始刪除消息; * (5)x-overflow:設(shè)置隊(duì)列溢出行為。這決定了當(dāng)達(dá)到隊(duì)列的最大長(zhǎng)度時(shí)消息會(huì)發(fā)生什么。有效值是drop-head、reject-publish或reject-publish-dlx。 * (6)x-dead-letter-exchange:死信交換器名稱,過(guò)期或被刪除(因隊(duì)列長(zhǎng)度超長(zhǎng)或因空間超出閾值)的消息可指定發(fā)送到該交換器中; * (7)x-dead-letter-routing-key:死信消息路由鍵,在消息發(fā)送到死信交換器時(shí)會(huì)使用該路由鍵,如果不設(shè)置,則使用消息的原來(lái)的路由鍵值 * (8)x-single-active-consumer:表示隊(duì)列是否是單一活動(dòng)消費(fèi)者,true時(shí),注冊(cè)的消費(fèi)組內(nèi)只有一個(gè)消費(fèi)者消費(fèi)消息,其他被忽略,false時(shí)消息循環(huán)分發(fā)給所有消費(fèi)者(默認(rèn)false) * (9)x-max-priority:隊(duì)列要支持的最大優(yōu)先級(jí)數(shù);如果未設(shè)置,隊(duì)列將不支持消息優(yōu)先級(jí); * (10)x-queue-mode(Lazy mode):將隊(duì)列設(shè)置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使用;如果未設(shè)置,隊(duì)列將保留內(nèi)存緩存以盡可能快地傳遞消息; * (11)x-queue-master-locator:在集群模式下設(shè)置鏡像隊(duì)列的主節(jié)點(diǎn)信息。 */ Maparguments = new HashMap<>(); arguments.put("x-message-ttl", 5000);return new Queue("queueTest2", true, false, false, arguments); }//交換機(jī) @Bean public TopicExchange exchangeTest(){//可以傳exchange名字,是否支持持久化,是否可以自動(dòng)刪除 return new TopicExchange("exchangeTest",true,false); }@Bean public Binding bindQueueTest1AndExchange(){return BindingBuilder.bind(queueTest1()).to(exchangeTest()).with("phone.routing.*"); }@Bean public Binding bindQueueTest2AndExchange(){return BindingBuilder.bind(queueTest2()).to(exchangeTest()).with("web.routing.*"); } }
4.生產(chǎn)者
import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.Date;import java.util.UUID;/** 生產(chǎn)者,帶消息確認(rèn)* */@Servicepublic class PruSender implements RabbitTemplate.ReturnCallback {@Autowired private RabbitTemplate rabbitTemplate;//routing_key,把消息發(fā)送到相應(yīng)的隊(duì)列中 public void sendMessage(String routing_key){//發(fā)送內(nèi)容 String context = "你好現(xiàn)在是 " + new Date();this.rabbitTemplate.setReturnCallback(this);//發(fā)送失敗退回 this.rabbitTemplate.setConfirmCallback((correlationData,ack,message)->{//手動(dòng)發(fā)送消息確認(rèn) if(!ack){ System.out.println("消息發(fā)送失敗" + message + correlationData.toString()); }else{ System.out.println("消息發(fā)送成功" + correlationData.toString()); } }); CorrelationData correlationData = new CorrelationData(); correlationData.setId(UUID.randomUUID().toString());//交換機(jī)名稱、routingKey、內(nèi)容、消息Id this.rabbitTemplate.convertAndSend("exchangeTest",routing_key, context,correlationData); }@Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("sender return success" + message.toString() + "===" + i + "===" + s1 + "===" + s2); } }
5.消費(fèi)者
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;import java.io.IOException;/** 消費(fèi)者,帶消息確認(rèn)** */@Service@RabbitListener(queues = "queueTest")public class Receiver {//消息內(nèi)容,通道,消息的屬性信息 @RabbitHandler public void immediateProcess(String text,Channel channel,Message message) throws IOException {try { System.out.println("Receiver" + text);/** * 手動(dòng)確認(rèn),通知mq已經(jīng)成功消費(fèi)改條信息,可以刪除了 * //消息的標(biāo)識(shí),false只確認(rèn)當(dāng)前一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息 */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace();/* *消費(fèi)消息失敗 * 第二個(gè)參數(shù)是否應(yīng)用于多消息,第三個(gè)參數(shù)是否從新計(jì)入隊(duì)列 * */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true); } } }
交換機(jī)類型:
Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列
Direct:定向,把消息交給符合指定routing key 的隊(duì)列
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列
上述就是小編為大家分享的如何進(jìn)行RabbitMq的簡(jiǎn)單使用了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。