本篇文章為大家展示了RabbitMQ消息丟失如何解決 ,內(nèi)容簡明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。
我們一直強(qiáng)調(diào)成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)對(duì)于企業(yè)的重要性,如果您也覺得重要,那么就需要我們慎重對(duì)待,選擇一個(gè)安全靠譜的網(wǎng)站建設(shè)公司,企業(yè)網(wǎng)站我們建議是要么不做,要么就做好,讓網(wǎng)站能真正成為企業(yè)發(fā)展過程中的有力推手。專業(yè)網(wǎng)站建設(shè)公司不一定是大公司,成都創(chuàng)新互聯(lián)作為專業(yè)的網(wǎng)絡(luò)公司選擇我們就是放心。
首先我們看下消息周期投遞過程:
我們把該圖分三部分,左中右,每部分都會(huì)導(dǎo)致消息丟失情況,下面就詳細(xì)聊聊每個(gè)階段消息是如何丟的:
1) 外界環(huán)境問題導(dǎo)致:發(fā)生網(wǎng)絡(luò)丟包、網(wǎng)絡(luò)故障等造成RabbitMQ Server端收不到消息,因?yàn)樯a(chǎn)環(huán)境的網(wǎng)絡(luò)是很復(fù)雜的,網(wǎng)絡(luò)抖動(dòng),丟包現(xiàn)象很常見,下面會(huì)講到針對(duì)這個(gè)問題是如何解決的。
2) 代碼層面,配置層面,考慮不全導(dǎo)致消息丟失
事例1:
一般情況下,生產(chǎn)者使用Confirm模式投遞消息,如果方案不夠嚴(yán)謹(jǐn),比如RabbitMQ Server 接收消息失敗后會(huì)發(fā)送nack消息通知生產(chǎn)者,生產(chǎn)者監(jiān)聽消息失敗或者沒做任何事情,消息存在丟失風(fēng)險(xiǎn);
事例2:
生產(chǎn)者發(fā)送消息到exchange后,發(fā)送的路由和queue沒有綁定,消息會(huì)存在丟失情況,下面會(huì)講到具體的例子,保證意外情況的發(fā)生,即使發(fā)生,也在可控范圍內(nèi)。
1)消息未完全持久化,當(dāng)機(jī)器重啟后,消息會(huì)全部丟失,甚至Queue也不見了
假如:你僅僅持久化了Message,而Exchange,Queue沒有持久化,這個(gè)持久化是無效的。 記得之前公司有一哥們忘記持久化Queue導(dǎo)致機(jī)器重啟后,Queue不見了,自然Message也丟失了。
2)單節(jié)點(diǎn)模式問題,如果某個(gè)節(jié)點(diǎn)掛了,消息就不能用了,業(yè)務(wù)可能癱瘓,只能等待
如果做了消息持久化方案,消息會(huì)持久化硬盤,機(jī)器重啟后消息不會(huì)丟失;但是還有一個(gè)極端情況,這臺(tái)服務(wù)器磁盤突然壞了(公司遇到過磁盤問題還是很多的),消息持久化不了,非高可用狀態(tài),這個(gè)模式生產(chǎn)環(huán)境慎重考慮。
3)普通集群模式:某個(gè)節(jié)點(diǎn)掛了,該節(jié)點(diǎn)上的消息不能用,有影響的業(yè)務(wù)癱瘓,只能等待節(jié)點(diǎn)恢復(fù)重啟可用(建立在消息持久化)
雖然這個(gè)模式進(jìn)步了一點(diǎn)點(diǎn),多個(gè)節(jié)點(diǎn),但是消息還是不能保證可靠,為什么呢?
因?yàn)镽abbitMQ 集群模式有點(diǎn)特殊,隊(duì)列的內(nèi)容僅僅存在某一個(gè)節(jié)點(diǎn)上面,不會(huì)存在所有節(jié)點(diǎn)上面,所有節(jié)點(diǎn)僅僅存放消息結(jié)構(gòu)和元數(shù)據(jù)(可以理解為索引,這也是為了提高性能,如果每次把所有內(nèi)容同步到所有節(jié)點(diǎn)是有開銷代價(jià)的)。 下面自己畫了一張圖介紹普通集群丟失消息情況:
這里有三個(gè)節(jié)點(diǎn),通常情況下一個(gè)磁盤節(jié)點(diǎn),兩個(gè)內(nèi)存節(jié)點(diǎn),首先先說明下, Queue1 內(nèi)容僅僅存在節(jié)點(diǎn)note1上面,在創(chuàng)建隊(duì)列的時(shí)候已經(jīng)固定了,note2,note3 僅僅存放的是元數(shù)據(jù),這個(gè)一定要清楚,Producer發(fā)送消息到note2,note2 會(huì)同步元數(shù)據(jù)到其他節(jié)點(diǎn),內(nèi)容會(huì)同步note1。
那我們想下,圖中的Q1問題,note1掛了,這個(gè)節(jié)點(diǎn)的Queues全部暫時(shí)不可用,節(jié)點(diǎn)恢復(fù)后可用。
我們說下圖片中備注2中的問題,Producer發(fā)送消息到note2,note2在同步note1前note1掛了,此時(shí)你的心情是怎么樣的。。。后面會(huì)講具體的策略
4)鏡像模式:可以解決上面的問題,但是還是有意外情況發(fā)生
比如:持久化的消息,保存到硬盤過程中,當(dāng)前隊(duì)列節(jié)點(diǎn)掛了,存儲(chǔ)節(jié)點(diǎn)硬盤又壞了,消息丟了,怎么辦?下面會(huì)詳細(xì)介紹
消費(fèi)端接收到相關(guān)消息之后,消費(fèi)端還沒來得及處理消息,消費(fèi)端機(jī)器就宕機(jī)了,此時(shí)消息如果處理不當(dāng)會(huì)有丟失風(fēng)險(xiǎn),后面會(huì)講到如何處理這個(gè)情況,消費(fèi)端也有ack機(jī)制
下面也是從三個(gè)方面介紹:
1.生產(chǎn)者生產(chǎn)消息到RabbitMQ Server 可靠性保證
2.RabbitMQ Server中存儲(chǔ)的消息如何保證
3.RabbitMQ Server到消費(fèi)者消息如何不丟
這個(gè)過程,消息可能會(huì)丟,比如發(fā)生網(wǎng)絡(luò)丟包、網(wǎng)絡(luò)故障等造成消息丟失,一般情況下如果不采取措施,生產(chǎn)者無法感知消息是否已經(jīng)正確無誤的發(fā)送到exchange中,如果生產(chǎn)者能感知到的話,它可以進(jìn)行進(jìn)一步的處理動(dòng)作,比如重新投遞相關(guān)消息以確保消息的可靠性。
1.1 通常有一種方案可以解決:就是 AMQP協(xié)議提供的一個(gè)事務(wù)機(jī)制
RabbitMQ客戶端中Channel 接口提供了幾個(gè)事務(wù)機(jī)制相關(guān)的方法:
channel.txSelect
channel.txCommit
channel.txRollback
源碼截圖如下:com.rabbitmq.client 包中public interface Channel extendsShutdownNotifier {}接口
在生產(chǎn)者發(fā)送消息之前,通過channel.txSelect開啟一個(gè)事務(wù),接著發(fā)送消息, 如果消息投遞server失敗,進(jìn)行事務(wù)回滾channel.txRollback,然后重新發(fā)送, 如果server收到消息,就提交事務(wù)channel.txCommit
但是,很少有人這么干,因?yàn)檫@是同步操作,一條消息發(fā)送之后會(huì)使發(fā)送端阻塞,以等待RabbitMQ Server的回應(yīng),之后才能繼續(xù)發(fā)送下一條消息,生產(chǎn)者生產(chǎn)消息的吞吐量和性能都會(huì)大大降低。
1.2 幸運(yùn)的是RabbitMQ提供了一個(gè)改進(jìn)方案,即發(fā)送方確認(rèn)機(jī)制(publisher confirm)
首先生產(chǎn)者通過調(diào)用channel.confirmSelect方法將信道設(shè)置為confirm模式,一旦信道進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊(duì)列之后,RabbitMQ就會(huì)發(fā)送一個(gè)確認(rèn)(Basic.Ack)給生產(chǎn)者(包含消息的唯一deliveryTag和multiple參數(shù)),這就使得生產(chǎn)者知曉消息已經(jīng)正確到達(dá)了目的地了。
其實(shí)Confirm模式有三種方式實(shí)現(xiàn):
串行confirm模式:producer每發(fā)送一條消息后,調(diào)用waitForConfirms()方法,等待broker端confirm,如果服務(wù)器端返回false或者在超時(shí)時(shí)間內(nèi)未返回,客戶端進(jìn)行消息重傳。
批量confirm模式:producer每發(fā)送一批消息后,調(diào)用waitForConfirms()方法,等待broker端confirm。
異步confirm模式:提供一個(gè)回調(diào)方法,broker confirm了一條或者多條消息后producer端會(huì)回調(diào)這個(gè)方法。 我們分別來看看這三種confirm模式
串行confirm
for(int i = 0;i<50;i++){ channel.basicPublish( exchange, routingKey, mandatory, immediate, messageProperties, message.getContent() ); if (channel.waitForConfirms()) { System.out.println("發(fā)送成功"); } else { //發(fā)送失敗這里可進(jìn)行消息重新投遞的邏輯 System.out.println("發(fā)送失敗"); } }
批量confirm模式
for(int i = 0;i<50;i++){ channel.basicPublish( exchange, routingKey, mandatory, immediate, messageProperties, message.getContent() ); } if (channel.waitForConfirms()) { System.out.println("發(fā)送成功"); } else { System.out.println("發(fā)送失敗"); }
上面代碼是簡單版本的,生產(chǎn)環(huán)境絕對(duì)不是循環(huán)發(fā)送的,而是根據(jù)業(yè)務(wù)情況, 各個(gè)客戶端程序需要定期(每x秒)或定量(每x條)或者兩者結(jié)合來publish消息,然后等待服務(wù)器端confirm。相比普通confirm模式,批量可以極大提升confirm效率。
但是有沒有發(fā)現(xiàn)什么問題?
問題1: 批量發(fā)送的邏輯復(fù)雜化了。
問題2: 一旦出現(xiàn)confirm返回false或者超時(shí)的情況時(shí),客戶端需要將這一批次的消息全部重發(fā),這會(huì)帶來明顯的重復(fù)消息數(shù)量,并且當(dāng)消息經(jīng)常丟失時(shí),批量confirm性能應(yīng)該是不升反降的。
異步confirm模式
Channel channel = channelManager.getPublisherChannel(namespaceName); ProxiedConfirmListener confirmListener = new ProxiedConfirmListener();//監(jiān)聽類 confirmListener.setChannelManager(channelManager); confirmListener.setChannel(channel); confirmListener.setNamespace(namespaceName); confirmListener.addSuccessCallbacks(successCallbacks); channel.addConfirmListener(confirmListener); channel.confirmSelect();//開啟confirm模式 AMQP.BasicProperties messageProperties = null; if (message.getProperty() instanceof AMQP.BasicProperties) { messageProperties = (AMQP.BasicProperties) message.getProperty(); } confirmListener.toConfirm(channel.getNextPublishSeqNo(), rawMsg); for(int i = 0;i<50;i++){ channel.basicPublish( exchange, routingKey, mandatory, immediate, messageProperties, message.getContent() ); }
異步模式需要自己多寫一部分復(fù)雜的代碼實(shí)現(xiàn),異步監(jiān)聽類,監(jiān)聽server端的通知消息,異步的好處性能會(huì)大幅度提升,發(fā)送完畢之后,可以繼續(xù)發(fā)送其他消息。 MQServer通知生產(chǎn)端ConfirmListener監(jiān)聽類:用戶可以繼承接口實(shí)現(xiàn)自己的實(shí)現(xiàn)類,處理消息確認(rèn)機(jī)制,此處繼承類代碼省略,就是上面 ProxiedConfirmListener 類: 下面貼下要實(shí)現(xiàn)的接口:
package com.rabbitmq.client; import java.io.IOException; /** * Implement this interface in order to be notified of Confirm events. * Acks represent messages handled successfully; Nacks represent * messages lost by the broker. Note, the lost messages could still * have been delivered to consumers, but the broker cannot guarantee * this. */ public interface ConfirmListener { /** ** handleAck RabbitMQ消息接收成功的方法,成功后業(yè)務(wù)可以做的事情 ** 發(fā)送端投遞消息前,需要把消息先存起來,比如用KV存儲(chǔ),接收到ack后刪除 **/ void handleAck(long deliveryTag, boolean multiple) throws IOException; //handleNack RabbitMQ消息接收失敗的通知方法,用戶可以在這里重新投遞消息 void handleNack(long deliveryTag, boolean multiple) throws IOException; }
上面的接口很有意思,如果是你的話,怎么實(shí)現(xiàn)? 消息投遞前如何存儲(chǔ)消息,ack 和 nack 如何處理消息?
下面看下異步confirm的消息投遞流程:
解釋下這張圖片:
channel1 連續(xù)發(fā)類1,2,3條消息到RabbitMQ-Server,RabbitMQ-Server通知返回一條通知,里面包含回傳給生產(chǎn)者的確認(rèn)消息中的deliveryTag包含了確認(rèn)消息的序號(hào),此外還有一個(gè)參數(shù)multiple=true,表示到這個(gè)序號(hào)之前的所有消息都已經(jīng)得到了處理。這樣客戶端和服務(wù)端通知的次數(shù)就減少類,提升類性能。
channel3 發(fā)送的消息失敗了,生產(chǎn)端需要對(duì)投遞消息重新投遞,需要額外處理代碼。 那么生產(chǎn)端需要做什么事情呢?因?yàn)槭钱惒降?,生產(chǎn)端需要存儲(chǔ)消息然后根據(jù)server通知的消息,確認(rèn)如何處理,于是我們面臨的問題是:
第一:發(fā)送消息之前把消息存起來
第二:監(jiān)聽ack 和 nack 并做響應(yīng)處理
那么怎么存儲(chǔ)呢?
我們分析下,可以使用SortedMap 存儲(chǔ),保證有序,但是有個(gè)問題高并發(fā)情況下, 每秒可能幾千甚至上萬的消息投遞出去,消息的ack要等幾百毫秒的話,放內(nèi)存可能有內(nèi)存溢出的風(fēng)險(xiǎn)。所以建議采用KV存儲(chǔ),KV存儲(chǔ)承載高并發(fā)能力高,性能好,但是要保證KV 高可用,單個(gè)有個(gè)缺點(diǎn)就是又引入了第三方中間件,復(fù)雜度升高。
解決了上面的問題,下面還會(huì)遇到一個(gè)問題,消息丟失的另一個(gè)情況?
事務(wù)機(jī)制和publisher confirm機(jī)制確保的是消息能夠正確的發(fā)送至RabbitMQ,這里的“發(fā)送至RabbitMQ”的含義是指消息被正確的發(fā)往至RabbitMQ的交換器,如果此交換器沒有匹配的隊(duì)列的話,那么消息也將會(huì)丟失,怎么辦?
這里有兩個(gè)解決方案,
我們看下RabbitMQ客戶端代碼方法
Channel 類中 發(fā)布消息方法
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
解釋下:basicPublish 方法中的,mandatory和immediate
/** * 當(dāng)mandatory標(biāo)志位設(shè)置為true時(shí),如果exchange根據(jù)自身類型和消息routeKey無法找到一個(gè)符合條件的queue, 那么會(huì)調(diào)用basic.return方法將消息返回給生產(chǎn)者
* 當(dāng)mandatory設(shè)置為false時(shí),出現(xiàn)上述情形broker會(huì)直接將消息扔掉。 */ @Setter(AccessLevel.PACKAGE) private boolean mandatory = false; /** * 當(dāng)immediate標(biāo)志位設(shè)置為true時(shí),如果exchange在將消息路由到queue(s)時(shí)發(fā)現(xiàn)對(duì)于的queue上沒有消費(fèi)者, 那么這條消息不會(huì)放入隊(duì)列中。 當(dāng)immediate標(biāo)志位設(shè)置為false時(shí),exchange路由的隊(duì)列沒有消費(fèi)者時(shí),該消息會(huì)通過basic.return方法返還給生產(chǎn)者。 * RabbitMQ 3.0版本開始去掉了對(duì)于immediate參數(shù)的支持,對(duì)此RabbitMQ官方解釋是:這個(gè)關(guān)鍵字違背了生產(chǎn)者和消費(fèi)者之間解耦的特性,因?yàn)樯a(chǎn)者不關(guān)心消息是否被消費(fèi)者消費(fèi)掉 */ @Setter(AccessLevel.PACKAGE) private boolean immediate;
所以為了保證消息的可靠性,需要設(shè)置發(fā)送消息代碼邏輯。如果不單獨(dú)形式設(shè)置mandatory=false
使用mandatory 設(shè)置true的時(shí)候有個(gè)關(guān)鍵點(diǎn)要調(diào)整,生產(chǎn)者如何獲取到?jīng)]有被正確路由到合適隊(duì)列的消息呢?通過調(diào)用channel.addReturnListener來添加ReturnListener監(jiān)聽器實(shí)現(xiàn),只要發(fā)送的消息,沒有路由到具體的隊(duì)列,ReturnListener就會(huì)收到監(jiān)聽消息。
channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP .BasicProperties basicProperties, byte[] body) throws IOException { String message = new String(body); //進(jìn)入該方法表示,沒路由到具體的隊(duì)列 //監(jiān)聽到消息,可以重新投遞或者其它方案來提高消息的可靠性。 System.out.println("Basic.Return返回的結(jié)果是:" + message); } });
此時(shí)有人問了,不想復(fù)雜化生產(chǎn)者的編程邏輯,又不想消息丟失,那么怎么辦? 還好RabbitMQ提供了一個(gè)叫做alternate-exchange東西,翻譯下就是備份交換器,這個(gè)干什么用呢?很簡單,它可以將未被路由的消息存儲(chǔ)在另一個(gè)exchange隊(duì)列中,再在需要的時(shí)候去處理這些消息。
那如何實(shí)現(xiàn)呢?
簡單一點(diǎn)可以通過webui管理后臺(tái)設(shè)置,當(dāng)你新建一個(gè)exchange業(yè)務(wù)的時(shí)候,可以給它設(shè)置Arguments,這個(gè)參數(shù)就是 alternate-exchange,其實(shí)alternate-exchange就是一個(gè)普通的exchange,類型最好是fanout 方便管理
當(dāng)你發(fā)送消息到你自己的exchange時(shí)候,對(duì)應(yīng)key沒有路由到queue,就會(huì)自動(dòng)轉(zhuǎn)移到alternate-exchange對(duì)應(yīng)的queue,起碼消息不會(huì)丟失。
下面一張圖看下投遞過程:
上述內(nèi)容就是RabbitMQ消息丟失如何解決 ,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。