這篇文章主要為大家展示了“分布式系統(tǒng)消息中間件RabbitMQ怎么用”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“分布式系統(tǒng)消息中間件RabbitMQ怎么用”這篇文章吧。
創(chuàng)新互聯(lián)公司是一家專注于成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)與策劃設(shè)計(jì),鐵嶺縣網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)10多年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:鐵嶺縣等地區(qū)。鐵嶺縣做網(wǎng)站價(jià)格咨詢:18982081108
前言:
這篇文章主要總結(jié)一下RabbitMQ在日常項(xiàng)目開發(fā)中比較常用的幾個(gè)特性。
一。 mandatory 參數(shù)
上一篇文章中我們知道,生產(chǎn)者將消息發(fā)送到RabbitMQ的交換器中通過RoutingKey與BindingKey的匹配將之路由到具體的隊(duì)列中以供消費(fèi)者消費(fèi)。那么當(dāng)我們通過匹配規(guī)則找不到隊(duì)列的時(shí)候,消息將何去何從呢?Rabbit給我們提供了兩種方式。mandatory與備份交換器。
mandatory參數(shù)是channel.BasicPublish方法中的參數(shù)。其主要功能是消息傳遞過程中不可達(dá)目的地時(shí)將消息返回給生產(chǎn)者。當(dāng)mandatory 參數(shù)設(shè)為true 時(shí),交換器無法根據(jù)自身的類型和路由鍵找到一個(gè)符合條件的隊(duì)列,那么RabbitMQ 會(huì)調(diào)用BasicReturn 命令將消息返回給生產(chǎn)者。當(dāng)mandatory 參數(shù)設(shè)置為false 時(shí)。則消息直接被丟棄。其運(yùn)轉(zhuǎn)流程與實(shí)現(xiàn)代碼如下(以C# RabbitMQ.Client 3.6.9為例):
//連接與創(chuàng)建信道--后續(xù)的示例代碼我們會(huì)省略掉這部分代碼和釋放連接 ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "admin"; factory.Password = "admin"; factory.HostName = "192.168.121.205"; IConnection conn = factory.CreateConnection();//連接Rabbit IModel channel = conn.CreateModel();//創(chuàng)建信道 channel.ExchangeDeclare("exchangeName", "direct", true);//定義交換器 String queueName = channel.QueueDeclare("TestQueue", true, false, false, null).QueueName;//定義 隊(duì)列 隊(duì)列名TestQueue,持久化的,非排它的,非自動(dòng)刪除的。 channel.QueueBind(queueName, "exchangeName", "routingKey");//隊(duì)列綁定交換器 var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("exchangeName", "routingKey", true, null, message);//發(fā)布一個(gè)可以路由到隊(duì)列的消息,mandatory參數(shù)設(shè)置為true var message1 = Encoding.UTF8.GetBytes("TestMsg1"); channel.BasicPublish("exchangeName", "routingKey1", true, null, message);//發(fā)布一個(gè)不可以路由到隊(duì)列的消息,mandatory參數(shù)設(shè)置為true //生產(chǎn)者回調(diào)函數(shù) channel.BasicReturn += (model, ea) => { //do something... 消息若不能路由到隊(duì)列則會(huì)調(diào)用此回調(diào)函數(shù)。 }; //關(guān)閉信道與連接 channel.close(); conn.close() ;
二。 備份交換器
當(dāng)消息不能路由到隊(duì)列時(shí),通過mandatory設(shè)置參數(shù),我們可以將消息返回給生產(chǎn)者處理。但這樣會(huì)有一個(gè)問題,就是生產(chǎn)者需要開一個(gè)回調(diào)的函數(shù)來處理不能路由到的消息,這無疑會(huì)增加生產(chǎn)者的處理邏輯。備份交換器(Altemate Exchange)則提供了另一種方式來處理不能路由的消息。備份交換器可以將未被路由的消息存儲(chǔ)在RabbitMQ中,在需要的時(shí)候去處理這些消息。其主要實(shí)現(xiàn)代碼如下:
IDictionaryargs = new Dictionary (); args.Add("alternate-exchange", "altExchange"); channel.ExchangeDeclare("normalExchange", "direct", true, false, args);//定義普通交換器并添加備份交換器參數(shù) channel.ExchangeDeclare("altExchange", "fanout", true, false, null); //定義備份交換器,并聲明為扇形交換器 channel.QueueDeclare("normalQueue", true, false, false, null);//定義普通隊(duì)列 channel.QueueBind("normalQueue", "normalExchange", "NormalRoutingKey1");//普通隊(duì)列隊(duì)列綁定普通交換器 channel.QueueDeclare("altQueue", true, false, false, null);//定義備份隊(duì)列 channel.QueueBind("altQueue", "altExchange", "");//綁定備份隊(duì)列與交換器 var msg1 = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("normalExchange", "NormalRoutingKey1", false, null, msg1);//發(fā)布一個(gè)可以路由到隊(duì)列的消息,消息最終會(huì)路由到normalQueue var msg2 = Encoding.UTF8.GetBytes("TestMsg1"); channel.BasicPublish("normalExchange", "NormalRoutingKey2", false, null, msg2);//發(fā)布一個(gè)不可以被路由的消息,消息最終會(huì)進(jìn)入altQueue
備份交換器其實(shí)和普通的交換器沒有太大的區(qū)別,為了方便使用,建議設(shè)置為fanout類型,若設(shè)置為direct 或者topic的類型。需要注意的是,消息被重新發(fā)送到備份交換器時(shí)的路由鍵和從生產(chǎn)者發(fā)出的路由鍵是一樣的??紤]這樣一種情況,如果備份交換器的類型是direct,并且有一個(gè)與其綁定的隊(duì)列,假設(shè)綁定的路由鍵是key1,當(dāng)某條攜帶路由鍵為key2 的消息被轉(zhuǎn)發(fā)到這個(gè)備份交換器的時(shí)候,備份交換器沒有匹配到合適的隊(duì)列,則消息丟失。如果消息攜帶的路由鍵為keyl,則可以存儲(chǔ)到隊(duì)列中。
對(duì)于備份交換器,有以下幾種特殊情況:
如果設(shè)置的備份交換器不存在,客戶端和RabbitMQ 服務(wù)端都不會(huì)有異常出現(xiàn),此時(shí)消息會(huì)丟失。
如果備份交換器沒有綁定任何隊(duì)列,客戶端和RabbitMQ 服務(wù)端都不會(huì)有異常出現(xiàn),此時(shí)消息會(huì)丟失。
如果備份交換器沒有任何匹配的隊(duì)列,客戶端和RabbitMQ 服務(wù)端都不會(huì)有異常出現(xiàn),此時(shí)消息會(huì)丟失。
如果備份交換器和mandatory參數(shù)一起使用,那么mandatory參數(shù)無效。
三。 過期時(shí)間(TTL)
3.1 設(shè)置消息的TTL
目前有兩種方法可以設(shè)置消息的TTL。第一種方法是通過隊(duì)列屬性設(shè)置,隊(duì)列中所有消息都有相同的過期時(shí)間。第二種方法是對(duì)消息本身進(jìn)行單獨(dú)設(shè)置,每條消息的TTL可以不同。如果兩種方法一起使用,則消息的TTL 以兩者之間較小的那個(gè)數(shù)值為準(zhǔn)。消息在隊(duì)列中的生存時(shí)間一旦超過設(shè)置的TTL值時(shí),就會(huì)變成"死信" (Dead Message) ,消費(fèi)者將無法再收到該消息。(有關(guān)死信隊(duì)列請(qǐng)往下看)
通過隊(duì)列屬性設(shè)置消息TTL的方法是在channel.QueueDeclare方法中加入x-message-ttl參數(shù)實(shí)現(xiàn)的,這個(gè)參數(shù)的單位是毫秒。示例代碼下:
IDictionaryargs = new Dictionary (); args.Add("x-message-ttl", 6000); channel.QueueDeclare("ttlQueue", true, false, false, args);
如果不設(shè)置TTL.則表示此消息不會(huì)過期;如果將TTL設(shè)置為0 ,則表示除非此時(shí)可以直接將消息投遞到消費(fèi)者,否則該消息會(huì)被立即丟棄(或由死信隊(duì)列來處理)。
針對(duì)每條消息設(shè)置TTL的方法是在channel.BasicPublish方法中加入Expiration的屬性參數(shù),單位為毫秒。關(guān)鍵代碼如下:
BasicProperties properties = new BasicProperties() { Expiration = "20000",//設(shè)置TTL為20000毫秒 }; var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("normalExchange", "NormalRoutingKey", true, properties, message);
注意:對(duì)于第一種設(shè)置隊(duì)列TTL屬性的方法,一旦消息過期,就會(huì)從隊(duì)列中抹去,而在第二種方法中,即使消息過期,也不會(huì)馬上從隊(duì)列中抹去,因?yàn)槊織l消息是否過期是在即將投遞到消費(fèi)者之前判定的。Why?在第一種方法里,隊(duì)列中己過期的消息肯定在隊(duì)列頭部, RabbitMQ 只要定期從隊(duì)頭開始掃描是否有過期的消息即可。而第二種方法里,每條消息的過期時(shí)間不同,如果要?jiǎng)h除所有過期消息勢(shì)必要掃描整個(gè)隊(duì)列,所以不如等到此消息即將被消費(fèi)時(shí)再判定是否過期,如果過期再進(jìn)行刪除即可。
3.2 設(shè)置隊(duì)列的TTL
注意,這里和上述通過隊(duì)列設(shè)置消息的TTL不同。上面刪除的是消息,而這里刪除的是隊(duì)列。通過channel.QueueDeclare 方法中的x-expires參數(shù)可以控制隊(duì)列被自動(dòng)刪除前處于未使用狀態(tài)的時(shí)間。這個(gè)未使用的意思是隊(duì)列上沒有任何的消費(fèi)者,隊(duì)列也沒有被重新聲明,并且在過期時(shí)間段內(nèi)也未調(diào)用過channel.BasicGet命令。
設(shè)置隊(duì)列里的TTL可以應(yīng)用于類似RPC方式的回復(fù)隊(duì)列,在RPC中,許多隊(duì)列會(huì)被創(chuàng)建出來,但是卻是未被使用的(有關(guān)RabbitMQ實(shí)現(xiàn)RPC請(qǐng)往下看)。RabbitMQ會(huì)確保在過期時(shí)間到達(dá)后將隊(duì)列刪除,但是不保障刪除的動(dòng)作有多及時(shí)。在RabbitMQ 重啟后, 持久化的隊(duì)列的過期時(shí)間會(huì)被重新計(jì)算。用于表示過期時(shí)間的x-expires參數(shù)以毫秒為單位, 井且服從和x-message-ttl一樣的約束條件,不同的是它不能設(shè)置為0(會(huì)報(bào)錯(cuò))。
示例代碼如下:
IDictionaryargs = new Dictionary (); args.Add("x-expires", 6000); channel.QueueDeclare("ttlQueue", false, false, false, args);
四。 死信隊(duì)列
DLX(Dead-Letter-Exchange)死信交換器,當(dāng)消息在一個(gè)隊(duì)列中變成死信之后,它能被重新被發(fā)送到另一個(gè)交換器中,這個(gè)交換器就是DLX ,綁定DLX的隊(duì)列就稱之為死信隊(duì)列。
消息變成死信主要有以下幾種情況:
消息被拒絕(BasicReject/BasicNack) ,井且設(shè)置requeue 參數(shù)為false;(消費(fèi)者確認(rèn)機(jī)制將會(huì)在下一篇文章中涉及)
消息過期;
隊(duì)列達(dá)到最大長(zhǎng)度。
DLX也是一個(gè)正常的交換器,和一般的交換器沒有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。當(dāng)這個(gè)隊(duì)列中存在死信時(shí),RabbitMQ 就會(huì)自動(dòng)地將這個(gè)消息重新發(fā)布到設(shè)置的DLX上去,進(jìn)而被路由到另一個(gè)隊(duì)列,即死信隊(duì)列。可以監(jiān)聽這個(gè)隊(duì)列中的消息、以進(jìn)行相應(yīng)的處理。
通過在channel.QueueDeclare 方法中設(shè)置x-dead-letter-exchange參數(shù)來為這個(gè)隊(duì)列添加DLX。其示例代碼如下:
channel.ExchangeDeclare("exchange.dlx", "direct", true);//定義死信交換器 channel.ExchangeDeclare("exchange.normal", "direct", true);//定義普通交換器 IDictionaryargs = new Dictionary (); args.Add("x-message-ttl",10000);//定義消息過期時(shí)間為10000毫秒 args.Add("x-dead-letter-exchange", "exchange.dlx");//定義exchange.dlx為死信交換器 args.Add("x-dead-letter-routing-key", "routingkey");//定義死信交換器的綁定key,這里也可以不指定,則默認(rèn)使用原隊(duì)列的路由key channel.QueueDeclare("queue.normal", true, false, false, args);//定義普通隊(duì)列 channel.QueueBind("queue.normal", "exchange.normal", "normalKey");//普通隊(duì)列交換器綁定 channel.QueueDeclare("queue.dlx", true, false, false, null);//定義死信隊(duì)列 channel.QueueBind("queue.dlx", "exchange.dlx", "routingkey");//死信隊(duì)列交換器綁定,若上方為制定死信隊(duì)列路由key則這里需要使用原隊(duì)列的路由key //發(fā)布消息 var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("exchange.normal", "normalKey", null, message) ;
以下為死信隊(duì)列的運(yùn)轉(zhuǎn)流程:
五。 延遲隊(duì)列
RabbitMQ本身并未提供延遲隊(duì)列的功能。延遲隊(duì)列是一個(gè)邏輯上的概念,可以通過過期時(shí)間+死信隊(duì)列來模擬它的實(shí)現(xiàn)。延遲隊(duì)列的邏輯架構(gòu)大致如下:
生產(chǎn)者將消息發(fā)送到過期時(shí)間為n的隊(duì)列中,這個(gè)隊(duì)列并未有消費(fèi)者來消費(fèi)消息,當(dāng)過期時(shí)間到達(dá)時(shí),消息會(huì)通過死信交換器被轉(zhuǎn)發(fā)到死信隊(duì)列中。而消費(fèi)者從死信隊(duì)列中消費(fèi)消息。這個(gè)時(shí)候就達(dá)到了生產(chǎn)者發(fā)布了消息在講過了n時(shí)間后消費(fèi)者消費(fèi)了消息,起到了延遲消費(fèi)的作用。
延遲隊(duì)列在我們的項(xiàng)目中可以應(yīng)用于很多場(chǎng)景,如:下單后兩個(gè)消息取消訂單,七天自動(dòng)收貨,七天自動(dòng)好評(píng),密碼凍結(jié)后24小時(shí)解凍,以及在分布式系統(tǒng)中消息補(bǔ)償機(jī)制(1s后補(bǔ)償,10s后補(bǔ)償,5m后補(bǔ)償......)。
六。 優(yōu)先級(jí)隊(duì)列
就像我們生活中的“特殊”人士一樣,我們的業(yè)務(wù)上也存在一些“特殊”消息,可能需要優(yōu)先進(jìn)行處理,在生活上我們可能會(huì)對(duì)這部分特殊人士開辟一套VIP通道,而Rabbit同樣也有這樣的VIP通道(前提是在3.5的版本以后),即優(yōu)先級(jí)隊(duì)列,隊(duì)列中的消息會(huì)有優(yōu)先級(jí)優(yōu)先級(jí)高的消息具備優(yōu)先被消費(fèi)的特權(quán)。針對(duì)這些VIP消息,我們只需做兩件事:
我們只需做兩件事情:
將隊(duì)列聲明為優(yōu)先級(jí)隊(duì)列,即在創(chuàng)建隊(duì)列的時(shí)候添加參數(shù) x-max-priority 以指定最大的優(yōu)先級(jí),值為0-255(整數(shù))。
為優(yōu)先級(jí)消息添加優(yōu)先級(jí)。
其示例代碼如下:
channel.ExchangeDeclare("exchange.priority", "direct", true);//定義交換器 IDictionaryargs = new Dictionary (); args.Add("x-max-priority", 10);//定義優(yōu)先級(jí)隊(duì)列的最大優(yōu)先級(jí)為10 channel.QueueDeclare("queue.priority", true, false, false, args);//定義優(yōu)先級(jí)隊(duì)列 channel.QueueBind("queue.priority", "exchange.priority", "priorityKey");//隊(duì)列交換器綁定 BasicProperties properties = new BasicProperties() { Priority =8,//設(shè)置消息優(yōu)先級(jí)為8 }; var message = Encoding.UTF8.GetBytes("TestMsg8"); //發(fā)布消息 channel.BasicPublish("exchange.priority", "priorityKey", properties, message);
注意:沒有指定優(yōu)先級(jí)的消息會(huì)將優(yōu)先級(jí)以0對(duì)待。 對(duì)于超過優(yōu)先級(jí)隊(duì)列所定最大優(yōu)先級(jí)的消息,優(yōu)先級(jí)以最大優(yōu)先級(jí)對(duì)待。對(duì)于相同優(yōu)先級(jí)的消息,后進(jìn)的排在前面。如果在消費(fèi)者的消費(fèi)速度大于生產(chǎn)者的速度且Broker 中沒有消息堆積的情況下, 對(duì)發(fā)送的消息設(shè)置優(yōu)先級(jí)也就沒有什么實(shí)際意義。因?yàn)樯a(chǎn)者剛發(fā)送完一條消息就被消費(fèi)者消費(fèi)了,那么就相當(dāng)于Broker 中至多只有一條消息,對(duì)于單條消息來說優(yōu)先級(jí)是沒有什么意義的。
關(guān)于優(yōu)先級(jí)隊(duì)列,好像違背了隊(duì)列這種數(shù)據(jù)結(jié)構(gòu)先進(jìn)先出的原則,其具體是怎么實(shí)現(xiàn)的在這里就不過多討論。有興趣的可以自己研究研究。后續(xù)可能也會(huì)有相關(guān)的文章來分析其原理。
七。 RPC 實(shí)現(xiàn)
RPC,是Remote Procedure Call 的簡(jiǎn)稱,即遠(yuǎn)程過程調(diào)用。它是一種通過網(wǎng)絡(luò)從遠(yuǎn)程計(jì)算機(jī)上請(qǐng)求服務(wù),而不需要了解底層網(wǎng)絡(luò)的技術(shù)。RPC 的主要功用是讓構(gòu)建分布式計(jì)算更容易,在提供強(qiáng)大的遠(yuǎn)程調(diào)用能力時(shí)不損失本地調(diào)用的語義簡(jiǎn)潔性。
有關(guān)RPC不多介紹,這里我們主要介紹RabbitMQ如何實(shí)現(xiàn)RPC。RabbitMQ 可以實(shí)現(xiàn)很簡(jiǎn)單的RPC??蛻舳税l(fā)送請(qǐng)求消息,服務(wù)端回復(fù)響應(yīng)的消息,為了接收響應(yīng)的消息,我們需要在請(qǐng)求消息中發(fā)送一個(gè)回調(diào)隊(duì)列(可以使用默認(rèn)的隊(duì)列)。其 以上是Rabbit客戶端自己幫我們封裝好的Rpc客戶端與服務(wù)端的邏輯。當(dāng)然我們也可以自己實(shí)現(xiàn),主要是借助于BasicProperties的兩個(gè)參數(shù)。 ReplyTo: 通常用來設(shè)置一個(gè)回調(diào)隊(duì)列。 CorrelationId : 用來關(guān)聯(lián)請(qǐng)求(request) 和其調(diào)用RPC 之后的回復(fù)(response) 。 其處理流程如下: 當(dāng)客戶端啟動(dòng)時(shí),創(chuàng)建一個(gè)匿名的回調(diào)隊(duì)列。 客戶端為RPC 請(qǐng)求設(shè)置2個(gè)屬性: ReplyTo用來告知RPC 服務(wù)端回復(fù)請(qǐng)求時(shí)的目的隊(duì)列,即回調(diào)隊(duì)列; Correlationld 用來標(biāo)記一個(gè)請(qǐng)求。 請(qǐng)求被發(fā)送到RpcQueue隊(duì)列中。 RPC 服務(wù)端監(jiān)聽RpcQueue隊(duì)列中的請(qǐng)求,當(dāng)請(qǐng)求到來時(shí),服務(wù)端會(huì)處理并且把帶有結(jié)果的消息發(fā)送給客戶端。接收的隊(duì)列就是ReplyTo設(shè)定的回調(diào)隊(duì)列。 客戶端監(jiān)昕回調(diào)隊(duì)列,當(dāng)有消息時(shí),檢查Correlationld 屬性,如果與請(qǐng)求匹配,那就是結(jié)果了。 以上是“分布式系統(tǒng)消息中間件RabbitMQ怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
當(dāng)前名稱:分布式系統(tǒng)消息中間件RabbitMQ怎么用
標(biāo)題來源:http://weahome.cn/article/jechjh.html