真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

RabbitMQ如何通過(guò)持久化保證消息99.99%不丟失?-創(chuàng)新互聯(lián)

1. 本篇概要

要解決該問(wèn)題,就要用到RabbitMQ中持久化的概念,所謂持久化,就是RabbitMQ會(huì)將內(nèi)存中的數(shù)據(jù)(Exchange 交換器,Queue 隊(duì)列,Message 消息)固化到磁盤(pán),以防異常情況發(fā)生時(shí),數(shù)據(jù)丟失。

玉泉街道網(wǎng)站制作公司哪家好,找成都創(chuàng)新互聯(lián)!從網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、APP開(kāi)發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項(xiàng)目制作,到程序開(kāi)發(fā),運(yùn)營(yíng)維護(hù)。成都創(chuàng)新互聯(lián)自2013年起到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來(lái)保證我們的工作的順利進(jìn)行。專(zhuān)注于網(wǎng)站建設(shè)就選成都創(chuàng)新互聯(lián)

其中,RabblitMQ的持久化分為三個(gè)部分:

  1. 交換器(Exchange)的持久化
  2. 隊(duì)列(Queue)的持久化
  3. 消息(Message)的持久化

2. 交換器(Exchange)的持久化

在上篇博客中,我們聲明Exchange的代碼是這樣的:

private final static String EXCHANGE_NAME = "normal-confirm-exchange";

// 創(chuàng)建一個(gè)Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

這種情況下聲明的Exchange是非持久化的,在RabbitMQ出現(xiàn)異常情況(重啟,宕機(jī))時(shí),該Exchange會(huì)丟失,會(huì)影響后續(xù)的消息寫(xiě)入該Exchange,那么如何設(shè)置Exchange為持久化的呢?答案是設(shè)置durable參數(shù)。

durable:設(shè)置是否持久化。durable設(shè)置為true表示持久化,反之是非持久化。

持久化可以將交換器存盤(pán),在服務(wù)器重啟的時(shí)候不會(huì)丟失相關(guān)信息。

設(shè)置Exchange持久化:

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

此時(shí)調(diào)用的重載方法為:

public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException {
    return this.exchangeDeclare(exchange, (String)type, durable, false, (Map)null);
}

為了能更好的理解,我們新建個(gè)生產(chǎn)類(lèi)如下:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置 RabbitMQ 的主機(jī)名
        factory.setHost("localhost");
        // 創(chuàng)建一個(gè)連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建一個(gè)通道
        Channel channel = connection.createChannel();
        // 創(chuàng)建一個(gè)Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 發(fā)送消息
        String message = "durable exchange test";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

        // 關(guān)閉頻道和連接
        channel.close();
        connection.close();
    }
}

示例代碼中,我們新建了1個(gè)非持久化的Exchange,1個(gè)非持久化的Queue,并將它們做了綁定,此時(shí)運(yùn)行代碼,Exchange和Queue新建成功,消息‘durable exchange test’也被正確地投遞到了隊(duì)列中:

RabbitMQ如何通過(guò)持久化保證消息99.99%不丟失?

RabbitMQ如何通過(guò)持久化保證消息99.99%不丟失?

此時(shí)重啟下RabbitMQ服務(wù),會(huì)發(fā)現(xiàn)Exchange丟失了:

RabbitMQ如何通過(guò)持久化保證消息99.99%不丟失?

修改下代碼,將durable參數(shù)設(shè)置為ture:

// 創(chuàng)建一個(gè)Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

此時(shí)運(yùn)行完代碼,然后重啟下RabbitMQ服務(wù),會(huì)發(fā)現(xiàn)Exchange不再丟失:

RabbitMQ如何通過(guò)持久化保證消息99.99%不丟失?

3. 隊(duì)列(Queue)的持久化

細(xì)心的網(wǎng)友可能會(huì)發(fā)現(xiàn),雖然現(xiàn)在重啟RabbitMQ服務(wù)后,Exchange不丟失了,但是隊(duì)列和消息丟失了,那么如何解決隊(duì)列不丟失呢?答案也是設(shè)置durable參數(shù)。

durable:設(shè)置是否持久化。為true則設(shè)置隊(duì)列為持久化。

持久化的隊(duì)列會(huì)存盤(pán),在服務(wù)器重啟的時(shí)候可以保證不丟失相關(guān)信息。

簡(jiǎn)單修改下上面聲明Queue的代碼,將durable參數(shù)設(shè)置為true:

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

此時(shí)調(diào)用的重載方法如下:

public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException {
    validateQueueNameLength(queue);
    return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
}

運(yùn)行代碼,然后重啟RabbitMQ服務(wù),會(huì)發(fā)現(xiàn)隊(duì)列現(xiàn)在不丟失了:

RabbitMQ如何通過(guò)持久化保證消息99.99%不丟失?

4. 消息(Message)的持久化

雖然現(xiàn)在RabbitMQ重啟后,Exchange和Queue都不丟失了,但是存儲(chǔ)在Queue里的消息卻仍然會(huì)丟失,那么如何保證消息不丟失呢?答案是設(shè)置消息的投遞模式為2,即代表持久化。

修改發(fā)送消息的代碼為:

// 發(fā)送消息
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

調(diào)用的重載方法為:

public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
    this.basicPublish(exchange, routingKey, false, props, body);
}

運(yùn)行代碼,然后重啟RabbitMQ服務(wù),發(fā)現(xiàn)此時(shí)Exchange,Queue,消息都不丟失了:

RabbitMQ如何通過(guò)持久化保證消息99.99%不丟失?

RabbitMQ如何通過(guò)持久化保證消息99.99%不丟失?

至此,我們完美的解決了RabbitMQ重啟后,消息丟失的問(wèn)題。

最終的代碼如下,你也可以通過(guò)文末的源碼鏈接下載本文用到的所有源碼:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置 RabbitMQ 的主機(jī)名
        factory.setHost("localhost");
        // 創(chuàng)建一個(gè)連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建一個(gè)通道
        Channel channel = connection.createChannel();
        // 創(chuàng)建一個(gè)Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 發(fā)送消息
        String message = "durable exchange test";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

        // 關(guān)閉頻道和連接
        channel.close();
        connection.close();
    }
}

5. 注意事項(xiàng)

1)理論上可以將所有的消息都設(shè)置為持久化,但是這樣會(huì)嚴(yán)重影響RabbitMQ的性能。因?yàn)閷?xiě)入磁盤(pán)的速度比寫(xiě)入內(nèi)存的速度慢得不止一點(diǎn)點(diǎn)。對(duì)于可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐量。在選擇是否要將消息持久化時(shí),需要在可靠性和吞吐量之間做一個(gè)權(quán)衡。

2)將交換器、隊(duì)列、消息都設(shè)置了持久化之后仍然不能百分之百保證數(shù)據(jù)不丟失,因?yàn)楫?dāng)持久化的消息正確存入RabbitMQ之后,還需要一段時(shí)間(雖然很短,但是不可忽視)才能存入磁盤(pán)之中。如果在這段時(shí)間內(nèi)RabbitMQ服務(wù)節(jié)點(diǎn)發(fā)生了宕機(jī)、重啟等異常情況,消息還沒(méi)來(lái)得及落盤(pán),那么這些消息將會(huì)丟失。

3)單單只設(shè)置隊(duì)列持久化,重啟之后消息會(huì)丟失;單單只設(shè)置消息的持久化,重啟之后隊(duì)列消失,繼而消息也丟失。單單設(shè)置消息持久化而不設(shè)置隊(duì)列的持久化顯得毫無(wú)意義。

文末彩蛋

Java學(xué)習(xí)、面試;文檔、視頻資源免費(fèi)獲取

RabbitMQ如何通過(guò)持久化保證消息99.99%不丟失?

創(chuàng)新互聯(lián)www.cdcxhl.cn,專(zhuān)業(yè)提供香港、美國(guó)云服務(wù)器,動(dòng)態(tài)BGP最優(yōu)骨干路由自動(dòng)選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動(dòng)現(xiàn)已開(kāi)啟,新人活動(dòng)云服務(wù)器買(mǎi)多久送多久。


文章名稱(chēng):RabbitMQ如何通過(guò)持久化保證消息99.99%不丟失?-創(chuàng)新互聯(lián)
本文鏈接:http://weahome.cn/article/desogp.html

其他資訊

在線咨詢(xún)

微信咨詢(xún)

電話咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部