這篇文章將為大家詳細講解有關RabbitMQ怎么用,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
專注于為中小企業(yè)提供成都網(wǎng)站建設、成都網(wǎng)站設計服務,電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)內(nèi)丘免費做網(wǎng)站提供優(yōu)質(zhì)的服務。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了超過千家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。
由于項目原因,之后會和RabbitMQ比較多的打交道,所以讓我們來好好整理下RabbitMQ的應用實戰(zhàn)技巧,盡量避免日后的采坑
RabbitMQ有幾個重要的概念:虛擬主機,交換機,隊列和綁定
虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定,我們可以從虛擬主機層面的顆粒度進行權限控制
交換機:Exchange用于轉(zhuǎn)發(fā)消息,它并不存儲消息,如果沒有Queue隊列綁定到Exchange,它會直接丟棄掉生產(chǎn)者發(fā)來的數(shù)據(jù)。
交換機還有個關聯(lián)的重要概念:路由鍵,消息轉(zhuǎn)發(fā)到哪個隊列根據(jù)路由鍵決定
綁定:就是綁定交換機和隊列,它是多對多的關系,也就是說多個交換機可以綁同一個隊列,也可以一個交換機綁多個隊列
交換機有四種類型的模式Direct, topic, Headers and Fanout
Direct模式使用的是RabbitMQ的默認交換機,也是最簡單的模式,適合比較簡單的場景
如下圖所示,使用Direct模式,我們需要創(chuàng)建不同的隊列,而默認交換機則通過Routing key
路由鍵的值來決定轉(zhuǎn)發(fā)到哪個隊列,可以看到,路由鍵綁定隊列是可以指定多個的
Topic模式主要是根據(jù)通配符匹配,也就類似于模糊匹配,當這種匹配模式和路由鍵匹配后交換機就能轉(zhuǎn)發(fā)消息到指定隊列
路由鍵為一串字符串,由句號(.
)隔開,比如a.b.c
(*
)代表指定位置一個單詞,(#
)代表零個或者多個單詞,比如a.*.b.#
,表示a和b中間隨意填個單詞,b后面可以跟n個單詞,比如a.x.b.c.d.e
Topic模式和Direct模式的區(qū)別在于交換機需要自己指定,路由鍵支持模糊匹配,例如:
rabbitTemplate.convertAndSend("topicExchange","a.x.b.d", " hello world!");
Headers也是根據(jù)規(guī)則匹配,但它不是根據(jù)路由鍵了,headers有個自定義匹配規(guī)則,它將匹配鍵值設在了消息的headers屬性上,當這些鍵值對有一對或者全部匹配時,消息才會被投遞到對應隊列,這種模式效率相對較低,一般不推薦使用
Fanout即為大名鼎鼎的廣播模式了,它不需要管路由鍵,會把消息發(fā)給綁定它的全部隊列,就算配置了路由鍵也會被忽略
首先我們Direct模式,一個生產(chǎn)者一個消費者的情況,也就對應了一個發(fā)送者和一個隊列A接收,這是沒有疑問的,發(fā)送什么接收什么
當Direct模式,一個生產(chǎn)者發(fā)消息,開啟多個消費者也就是多個相同queue,此時消息由多個消費者均勻分攤,不會重復消費(前提ack正常)
當Topic模式,一個交換機綁定兩個隊列,路由鍵有重疊關系,如下代碼,此時指定路由鍵topic.message
發(fā)送消息,隊列queueMessage
和queueMessages
都能接收到相同消息,也就是說,topic模式可以實現(xiàn)類似于廣播模式的形式,甚至更加靈活,它能否轉(zhuǎn)發(fā)到消息由路由鍵決定。
相比于Fanout模式,我們?nèi)绻獙οM者隊列分組發(fā)送,我們需要指定不同的路由鍵;而Fanout模式則需要指定不同的交換機和隊列綁定,實際使用結(jié)合實際情況
@Configuration public class TopicRabbitConfig { final static String message = "topic.message"; final static String messages = "topic.messages"; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
我們的常用配置如下
spring.rabbitmq.addresses=localhost:5672 spring.rabbitmq.username=user spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=1000 ##設置監(jiān)聽限制:最大10,默認5 spring.rabbitmq.listener.simple.concurrency=5 spring.rabbitmq.listener.simple.max-concurrency=10 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true spring.rabbitmq.listener.simple.acknowledge-mode=manual
其中最后四條配置需要著重解釋:
spring.rabbitmq.publisher-confirms
為true,表示生產(chǎn)者消息發(fā)出后,MQ的broker接收到了消息,發(fā)送回執(zhí)表示確認接收,不設置則可能導致消息丟失
spring.rabbitmq.publisher-returns
為true,表示當消息不能到達MQ的Broker端,,則使用監(jiān)聽器對不可達的消息做后續(xù)處理,這種一般是路由鍵沒配好,或MQ宕機才可能發(fā)生
spring.rabbitmq.template.mandatory
當上面兩個為true時,這個一定要配true,否則上面兩個不起作用
spring.rabbitmq.listener.simple.acknowledge-mode
這個為manual
表示手工確認,實際生產(chǎn)應該設為手工,才能保證你的業(yè)務是處理完成的,注意業(yè)務的冪等性,可重復調(diào)用,手工確認代碼如下例子
@Component public class RabbitReceiver { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queue-1", durable="true"), exchange = @Exchange(value = "exchange-1", durable="true", type= "topic", ignoreDeclarationExceptions = "true"), key = "springboot.*" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { System.err.println("--------------------------------------"); System.err.println("消費端Payload: " + message.getPayload()); Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); //手工ACK,獲取deliveryTag channel.basicAck(deliveryTag, false); } }
queue: 隊列的名字
durable: 為true表示隊列中數(shù)據(jù)持久化到磁盤,可以防止mq宕機重啟數(shù)據(jù)丟失
exclusive: 為true表示排他性,只允許一個當前連接訪問該隊列,當前已連接就不允許新的連接進入否則報錯,當連接斷開當前隊列會銷毀
autoDelete: 為true表示自動刪除,當沒有Connection連接到隊列的時候,會自動刪除
arguments: 這個參數(shù)用來添加一些額外參數(shù)的,如下圖片
比如添加x-message-ttl
為5000,則表示消息超過5秒沒被處理就會超時過期;
x-expires
設置120000表示隊列在2分鐘內(nèi)沒被消費則被刪除;
x-max-length
,x-max-length-bytes
表示傳送數(shù)據(jù)的最大長度和字節(jié)數(shù)
x-dead-letter-exchange
,x-dead-letter-routing-key
表示死信交換機和死信路由,放在需要過期或處理失敗的隊列屬性中,這些數(shù)據(jù)會轉(zhuǎn)發(fā)到死信隊列存儲起來,創(chuàng)建普通的交換機和隊列綁定,把交換機名填到x-dead-letter-exchange
的值,填寫路由鍵要符合死信隊列的路由鍵
x-max-priority
,表示設置優(yōu)先級,范圍為0~255,只有當消息堆積的時候,這個優(yōu)先級才有意義,數(shù)字越大優(yōu)先級越高
x-queue-mode
當為lazy
,表示惰性隊列,3.6.0之后才被引入的概念,相比默認的模式,惰性隊列模式會將生產(chǎn)者產(chǎn)生的消息直接存到磁盤中,這當然會增加IO開銷,但適合應對大量消息堆積的情況;因為當大量消息堆積時,內(nèi)存也不夠存放,會將消息轉(zhuǎn)存到磁盤,這個過程也是比較耗時且過程中不能接收新的消息。如果需要將普通隊列轉(zhuǎn)換成惰性隊列需要將原來的隊列刪除,重新創(chuàng)建個惰性隊列綁定。
exchange : 交換機名稱
type : 交換機類型
durable : 持久化,同隊列
autoDelete : 是否自動刪除,同隊列
internal : 若為true,表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定。
arguments : 額外參數(shù),目前只有個alternate-exchange
,表示當生產(chǎn)者發(fā)送消息到這個交換機,路由不到該交換機的隊列,則會嘗試這個參數(shù)指定的交換機進行路由,若路由鍵匹配,則路由到alternate-exchange
指定的隊列,相當于轉(zhuǎn)發(fā)了,剛好和上一個參數(shù)internal
配合,若不想本交換機起到路由隊列的作用,可以設置internal
為true,把消息都轉(zhuǎn)發(fā)到alternate-exchange
指定的交換機,由該交換機來路由指定隊列,
如下圖:exchange0
設置了alternate-exchange
交換機為exchange1
,生產(chǎn)者發(fā)送數(shù)據(jù)到exchange0
路由鍵為test1
,在exchange0
路由不到,則轉(zhuǎn)發(fā)到exchange1
判斷路由符合,發(fā)送到隊列queue1
在RabbitMQ的管理界面,當我們集群部署時可以看到Nodes節(jié)點中Info字段可能為**disc
也可能ram
**,表示了磁盤存儲或內(nèi)存儲存。事實上,在集群部署的時候,我們至少要一個磁盤儲存,它代表了將交換機,隊列,綁定,用戶等元數(shù)據(jù)持久化保存到磁盤,一遍重啟RabbitMQ也能恢復到原先的狀態(tài),當只有一個節(jié)點時,必定是磁盤存儲;而內(nèi)存儲存也有它的優(yōu)勢,就是效率更高速度更快
當報下列錯誤,表示你一定存在排他性隊列,也就是設置了exclusive
屬性的隊列,由于同一個連接創(chuàng)建的不同通道可以訪問同一個隊列,此時由于這個排他屬性會得到資源被鎖定錯誤,也就是下列的錯誤。
由此我們可以知道,若你把隊列設置成了exclusive
屬性的,那么就別創(chuàng)建新的連接去訪問同一個隊列
ESOURCE_LOCKED - cannot obtain exclusive access to locked queue xxxxxx
關于“RabbitMQ怎么用”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。