從AMQP協(xié)議可以看出,MessageQueue、Exchange和Binding構(gòu)成了AMQP協(xié)議的核心,下面我們就圍繞這三個主要組件 從應(yīng)用使用的角度全面的介紹如何利用Rabbit MQ構(gòu)建消息隊列以及使用過程中的注意事項。
創(chuàng)新互聯(lián)服務(wù)項目包括芒市網(wǎng)站建設(shè)、芒市網(wǎng)站制作、芒市網(wǎng)頁制作以及芒市網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,芒市網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到芒市省份的部分城市,未來相信會繼續(xù)擴大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!1.聲明MessageQueue
在Rabbit MQ中,無論是生產(chǎn)者發(fā)送消息還是消費者接受消息,都首先需要聲明一個MessageQueue。這就存在一個問題,是生產(chǎn)者聲明還是消費者聲明呢?要解決這個問題,首先需要明確:
a)消費者是無法訂閱或者獲取不存在的MessageQueue中信息。
b)消息被Exchange接受以后,如果沒有匹配的Queue,則會被丟棄。
在明白了上述兩點以后,就容易理解如果是消費者去聲明Queue,就有可能會出現(xiàn)在聲明Queue之前,生產(chǎn)者已發(fā)送的消息被丟棄的隱患。如果應(yīng)用能夠通過消息重發(fā)的機制允許消息丟失,則使用此方案沒有任何問題。但是如果不能接受該方案,這就需要無論是生產(chǎn)者還是消費者,在發(fā)送或者接受消息前,都需要去嘗試建立消息隊列。這里有一點需要明確,如果客戶端嘗試建立一個已經(jīng)存在的消息隊列,Rabbit MQ不會做任何事情,并返回客戶端建立成功的。
如果一個消費者在一個信道中正在監(jiān)聽某一個隊列的消息,Rabbit MQ是不允許該消費者在同一個channel去聲明其他隊列的。Rabbit MQ中,可以通過queue.declare命令聲明一個隊列,可以設(shè)置該隊列以下屬性:
a) Exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,并在連接斷開時自動刪除。這里需要注意三點:其一,排他隊列是基于連接可見的,同一連接的不同信道是可以同時訪問同一個連接創(chuàng)建的排他隊列的。其二,“首次”,如果一個連接已經(jīng)聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。其三,即使該隊列是持久化的,一旦連接關(guān)閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用于只限于一個客戶端發(fā)送讀取消息的應(yīng)用場景。
b) Auto-delete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用于臨時隊列。
c) Durable:持久化,這個會在后面作為專門一個章節(jié)討論。
d)其他選項,例如如果用戶僅僅想查詢某一個隊列是否已存在,如果不存在,不想建立該隊列,仍然可以調(diào)用queue.declare,只不過需要將參數(shù)passive設(shè)為true,傳給queue.declare,如果該隊列已存在,則會返回true;如果不存在,則會返回Error,但是不會創(chuàng)建新的隊列。
2. 生產(chǎn)者發(fā)送消息
在AMQP模型中,Exchange是接受生產(chǎn)者消息并將消息路由到消息隊列的關(guān)鍵組件。ExchangeType和Binding決定了消息的路由規(guī)則。所以生產(chǎn)者想要發(fā)送消息,首先必須要聲明一個Exchange和該Exchange對應(yīng)的Binding??梢酝ㄟ^ ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,聲明一個Exchange需要三個參數(shù):ExchangeName,ExchangeType和Durable。ExchangeName是該Exchange的名字,該屬性在創(chuàng)建Binding和生產(chǎn)者通過publish推送消息時需要指定。ExchangeType,指Exchange的類型,在RabbitMQ中,有三種類型的Exchange:direct ,fanout和topic,不同的Exchange會表現(xiàn)出不同路由行為。Durable是該Exchange的持久化屬性,這個會在消息持久化章節(jié)討論。聲明一個Binding需要提供一個QueueName,ExchangeName和BindingKey。下面我們就分析一下不同的ExchangeType表現(xiàn)出的不同路由規(guī)則。
生產(chǎn)者在發(fā)送消息時,都需要指定一個RoutingKey和Exchange,Exchange在接到該RoutingKey以后,會判斷該ExchangeType:
a) 如果是Direct類型,則會將消息中的RoutingKey與該Exchange關(guān)聯(lián)的所有Binding中的BindingKey進行比較,如果相等,則發(fā)送到該Binding對應(yīng)的Queue中。
b) 如果是Fanout 類型,則會將消息發(fā)送給所有與該Exchange 定義過Binding 的所有Queues 中去,其實是一種廣播行為。
c)如果是Topic類型,則會按照正則表達式,對RoutingKey與BindingKey進行匹配,如果匹配成功,則發(fā)送到對應(yīng)的Queue中。
3. 消費者訂閱消息
在RabbitMQ中消費者有2種方式獲取隊列中的消息:
a)一種是通過basic.consume命令,訂閱某一個隊列中的消息,channel會自動在處理完上一條消息之后,接收下一條消息。(同一個channel消息處理是串行的)。除非關(guān)閉channel或者取消訂閱,否則客戶端將會一直接收隊列的消息。
b)另外一種方式是通過basic.get命令主動獲取隊列中的消息,但是絕對不可以通過循環(huán)調(diào)用basic.get來代替basic.consume,這是因為basic.get RabbitMQ在實際執(zhí)行的時候,是首先consume某一個隊列,然后檢索第一條消息,然后再取消訂閱。如果是高吞吐率的消費者,最好還是建議使用basic.consume。
如果有多個消費者同時訂閱同一個隊列的話,RabbitMQ是采用循環(huán)的方式分發(fā)消息的,每一條消息只能被一個訂閱者接收。例如,有隊列Queue,其中ClientA和ClientB都Consume了該隊列,MessageA到達隊列后,被分派到ClientA,ClientA回復(fù)服務(wù)器收到響應(yīng),服務(wù)器刪除MessageA;再有一條消息MessageB抵達隊列,服務(wù)器根據(jù)“循環(huán)推送”原則,將消息會發(fā)給ClientB,然后收到ClientB的確認后,刪除MessageB;等到再下一條消息時,服務(wù)器會再將消息發(fā)送給ClientA。
這里我們可以看出,消費者再接到消息以后,都需要給服務(wù)器發(fā)送一條確認命令,這個即可以在handleDelivery里顯示的調(diào)用basic.ack實現(xiàn),也可以在Consume某個隊列的時候,設(shè)置autoACK屬性為true實現(xiàn)。這個ACK僅僅是通知服務(wù)器可以安全的刪除該消息,而不是通知生產(chǎn)者,與RPC不同。如果消費者在接到消息以后還沒來得及返回ACK就斷開了連接,消息服務(wù)器會重傳該消息給下一個訂閱者,如果沒有訂閱者就會存儲該消息。
既然RabbitMQ提供了ACK某一個消息的命令,當然也提供了Reject某一個消息的命令。當客戶端發(fā)生錯誤,調(diào)用basic.reject命令拒絕某一個消息時,可以設(shè)置一個requeue的屬性,如果為true,則消息服務(wù)器會重傳該消息給下一個訂閱者;如果為false,則會直接刪除該消息。當然,也可以通過ack,讓消息服務(wù)器直接刪除該消息并且不會重傳。
4.持久化:
Rabbit MQ默認是不持久隊列、Exchange、Binding以及隊列中的消息的,這意味著一旦消息服務(wù)器重啟,所有已聲明的隊列,Exchange,Binding以及隊列中的消息都會丟失。通過設(shè)置Exchange和MessageQueue的durable屬性為true,可以使得隊列和Exchange持久化,但是這還不能使得隊列中的消息持久化,這需要生產(chǎn)者在發(fā)送消息的時候,將delivery mode設(shè)置為2,只有這3個全部設(shè)置完成后,才能保證服務(wù)器重啟不會對現(xiàn)有的隊列造成影響。這里需要注意的是,只有durable為true的Exchange和durable為ture的Queues才能綁定,否則在綁定時,RabbitMQ都會拋錯的。持久化會對RabbitMQ的性能造成比較大的影響,可能會下降10倍不止。
5.事務(wù):
對事務(wù)的支持是AMQP協(xié)議的一個重要特性。假設(shè)當生產(chǎn)者將一個持久化消息發(fā)送給服務(wù)器時,因為consume命令本身沒有任何Response返回,所以即使服務(wù)器崩潰,沒有持久化該消息,生產(chǎn)者也無法獲知該消息已經(jīng)丟失。如果此時使用事務(wù),即通過txSelect()開啟一個事務(wù),然后發(fā)送消息給服務(wù)器,然后通過txCommit()提交該事務(wù),即可以保證,如果txCommit()提交了,則該消息一定會持久化,如果txCommit()還未提交即服務(wù)器崩潰,則該消息不會服務(wù)器就收。當然Rabbit MQ也提供了txRollback()命令用于回滾某一個事務(wù)。
6.Confirm機制:
使用事務(wù)固然可以保證只有提交的事務(wù),才會被服務(wù)器執(zhí)行。但是這樣同時也將客戶端與消息服務(wù)器同步起來,這背離了消息隊列解耦的本質(zhì)。Rabbit MQ提供了一個更加輕量級的機制來保證生產(chǎn)者可以感知服務(wù)器消息是否已被路由到正確的隊列中——Confirm。如果設(shè)置channel為confirm狀態(tài),則通過該channel發(fā)送的消息都會被分配一個唯一的ID,然后一旦該消息被正確的路由到匹配的隊列中后,服務(wù)器會返回給生產(chǎn)者一個Confirm,該Confirm包含該消息的ID,這樣生產(chǎn)者就會知道該消息已被正確分發(fā)。對于持久化消息,只有該消息被持久化后,才會返回Confirm。Confirm機制的優(yōu)點在于異步,生產(chǎn)者在發(fā)送消息以后,即可繼續(xù)執(zhí)行其他任務(wù)。而服務(wù)器返回Confirm后,會觸發(fā)生產(chǎn)者的回調(diào)函數(shù),生產(chǎn)者在回調(diào)函數(shù)中處理Confirm信息。如果消息服務(wù)器發(fā)生異常,導(dǎo)致該消息丟失,會返回給生產(chǎn)者一個nack,表示消息已經(jīng)丟失,這樣生產(chǎn)者就可以通過重發(fā)消息,保證消息不丟失。Confirm機制在性能上要比事務(wù)優(yōu)越很多。但是Confirm機制,無法進行回滾,就是一旦服務(wù)器崩潰,生產(chǎn)者無法得到Confirm信息,生產(chǎn)者其實本身也不知道該消息吃否已經(jīng)被持久化,只有繼續(xù)重發(fā)來保證消息不丟失,但是如果原先已經(jīng)持久化的消息,并不會被回滾,這樣隊列中就會存在兩條相同的消息,系統(tǒng)需要支持去重。
其他:
Broker:簡單來說就是消息隊列服務(wù)器實體。
Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個關(guān)鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設(shè)多個vhost,用作不同用戶的權(quán)限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務(wù)。
消息隊列的使用過程大概如下:
(1)客戶端連接到消息隊列服務(wù)器,打開一個channel。
(2)客戶端聲明一個exchange,并設(shè)置相關(guān)屬性。
(3)客戶端聲明一個queue,并設(shè)置相關(guān)屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系。
(5)客戶端投遞消息到exchange。