真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

一、發(fā)送消息到隊(duì)列(生產(chǎn)者)

新建一個(gè)maven項(xiàng)目,在pom.xml文件加入以下依賴(lài)

在息縣等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專(zhuān)注、極致的服務(wù)理念,為客戶(hù)提供網(wǎng)站建設(shè)、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作按需搭建網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站制作,營(yíng)銷(xiāo)型網(wǎng)站建設(shè),外貿(mào)營(yíng)銷(xiāo)網(wǎng)站建設(shè),息縣網(wǎng)站建設(shè)費(fèi)用合理。


    
        com.rabbitmq
        amqp-client
        3.6.5
    

新建一個(gè)P1類(lèi)

package com.rabbitMQ.test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @create 2019/11/20-11:23
 */
public class P1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //消息隊(duì)列名字
        String queueName="queue";
        //實(shí)例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設(shè)置地址
        connectionFactory.setHost("192.168.128.233");
        //設(shè)置端口
        connectionFactory.setPort(5672);
        //設(shè)置用戶(hù)名
        connectionFactory.setUsername("mowen");
        //設(shè)置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明隊(duì)列。
        //參數(shù)1:隊(duì)列名
        //參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
        //參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
        //參數(shù)4:當(dāng)所有消費(fèi)者客戶(hù)端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
        //參數(shù)5:隊(duì)列的其他參數(shù)
        channel.queueDeclare(queueName,true,false,false,null);

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發(fā)布消息
            // 第一個(gè)參數(shù)為交換機(jī)名稱(chēng)、
            // 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
            // 第三個(gè)參數(shù)為消息的其他屬性、
            // 第四個(gè)參數(shù)為發(fā)送信息的主體
            channel.basicPublish("",queueName,null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運(yùn)行后再瀏覽器進(jìn)入RabbitMQ的控制臺(tái),切換到queue看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

二、獲取隊(duì)列消息(消費(fèi)者)

新建一個(gè)C1類(lèi)

package com.rabbitMQ.test;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @create 2019/11/20-13:12
 */
public class C1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //消息隊(duì)列名字
        String queueName="queue";
        //實(shí)例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設(shè)置地址
        connectionFactory.setHost("192.168.128.233");
        //設(shè)置端口
        connectionFactory.setPort(5672);
        //設(shè)置用戶(hù)名
        connectionFactory.setUsername("mowen");
        //設(shè)置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();

        // 創(chuàng)建一個(gè)消費(fèi)者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費(fèi)收到消息的時(shí)候調(diào)用的回調(diào)
                System.out.println("C3接收到:" + new String(body));
            }
        };

        //把消費(fèi)著綁定到指定隊(duì)列
        //第一個(gè)是隊(duì)列名
        //第二個(gè)是 是否自動(dòng)確認(rèn)
        //第三個(gè)是消費(fèi)者
        channel.basicConsume(queueName,true,consumer);

    }
}

運(yùn)行后輸出為

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

消費(fèi)者一般都不會(huì)關(guān)閉,會(huì)一直等待隊(duì)列消息,可以手動(dòng)關(guān)閉程序。

channel.basicConsume(queueName,true,consumer);中的true為收到消息后自動(dòng)確認(rèn),改為false取消自動(dòng)確認(rèn)。

在handleDelivery方法最后面用

channel.basicAck(envelope.getDeliveryTag(),false);

來(lái)收到手動(dòng)確認(rèn)消息。消費(fèi)者可以有多個(gè)并且可以同時(shí)消費(fèi)一個(gè)隊(duì)列;

當(dāng)有多個(gè)消費(fèi)者同時(shí)消費(fèi)同一個(gè)隊(duì)列時(shí),收到的消息是平均分配的(消費(fèi)者沒(méi)收到之前已經(jīng)確認(rèn)每個(gè)消費(fèi)者受到的消息),

但當(dāng)其中一個(gè)消費(fèi)者性能差的話(huà),會(huì)影響其他的消費(fèi)者,因?yàn)檫€要等它收完消息,這樣會(huì)拖累其他消費(fèi)者。

可以設(shè)置channel 的basicQos方法

//設(shè)置最多接受消息數(shù)量
// 設(shè)置了這個(gè)參數(shù)之后要吧自動(dòng)確認(rèn)關(guān)掉
channel.basicQos(1);

三、扇形(fanout)交換機(jī)

扇形交換機(jī)是基本的交換機(jī)類(lèi)型,會(huì)把收到的消息以廣播的形式發(fā)送到綁定的隊(duì)列里,因?yàn)椴恍枰?jīng)過(guò)條件篩選,所以它的速度最快。

在生產(chǎn)者項(xiàng)目新建一個(gè)fanout類(lèi)

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class fanout {
    public static void main(String[] args) throws IOException, TimeoutException {
        //交換機(jī)名字
        String exchangeName="fanout";
        //交換機(jī)名字類(lèi)型
        String exchangeType="fanout";
        //消息隊(duì)列名字
        String queueName1="fanout.queue1";
        String queueName2="fanout.queue2";
        String queueName3="fanout.queue3";
        //實(shí)例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設(shè)置地址
        connectionFactory.setHost("192.168.128.233");
        //設(shè)置端口
        connectionFactory.setPort(5672);
        //設(shè)置用戶(hù)名
        connectionFactory.setUsername("mowen");
        //設(shè)置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明隊(duì)列。
        //參數(shù)1:隊(duì)列名
        //參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
        //參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
        //參數(shù)4:當(dāng)所有消費(fèi)者客戶(hù)端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
        //參數(shù)5:隊(duì)列的其他參數(shù)
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //聲明交換機(jī)
        channel.exchangeDeclare(exchangeName,exchangeType);

        //隊(duì)列綁定到交換機(jī)
        channel.queueBind(queueName1,exchangeName,"");
        channel.queueBind(queueName2,exchangeName,"");
        channel.queueBind(queueName3,exchangeName,"");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發(fā)布消息
            // 第一個(gè)參數(shù)為交換機(jī)名稱(chēng)、
            // 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
            // 第三個(gè)參數(shù)為消息的其他屬性、
            // 第四個(gè)參數(shù)為發(fā)送信息的主體
            channel.basicPublish(exchangeName,"",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運(yùn)行后在RabbitMQ網(wǎng)頁(yè)管理后臺(tái)的queue會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

切換到Exchanges會(huì)看到一個(gè)

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

就是我們聲明的交換機(jī),點(diǎn)擊會(huì)看到我們綁定的隊(duì)列

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

四、直連(direct)交換機(jī)

直連交換機(jī)會(huì)帶路由功能,隊(duì)列通過(guò)routing_key與直連交換機(jī)綁定,發(fā)送消息需要指定routing_key,交換機(jī)收到消息時(shí),交換機(jī)會(huì)根據(jù)routing_key發(fā)送到指定隊(duì)列里,同樣的routing_key可以支持多個(gè)隊(duì)列。

在生產(chǎn)者項(xiàng)目新建direct類(lèi)

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class direct {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName="direct";
        String exchangeType="direct";
        //消息隊(duì)列名字
        String queueName1="direct.queue1";
        String queueName2="direct.queue2";
        String queueName3="direct.queue3";
        //實(shí)例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設(shè)置地址
        connectionFactory.setHost("192.168.128.233");
        //設(shè)置端口
        connectionFactory.setPort(5672);
        //設(shè)置用戶(hù)名
        connectionFactory.setUsername("mowen");
        //設(shè)置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明隊(duì)列。
        //參數(shù)1:隊(duì)列名
        //參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
        //參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
        //參數(shù)4:當(dāng)所有消費(fèi)者客戶(hù)端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
        //參數(shù)5:隊(duì)列的其他參數(shù)
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //聲明交換機(jī)
        channel.exchangeDeclare(exchangeName,exchangeType);

        //隊(duì)列綁定到交換機(jī)并指定rouing_key
        channel.queueBind(queueName1,exchangeName,"key1");
        channel.queueBind(queueName2,exchangeName,"key2");
        channel.queueBind(queueName3,exchangeName,"key1");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發(fā)布消息
            // 第一個(gè)參數(shù)為交換機(jī)名稱(chēng)、
            // 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
            // 第三個(gè)參數(shù)為消息的其他屬性、
            // 第四個(gè)參數(shù)為發(fā)送信息的主體
            channel.basicPublish(exchangeName,"key1",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運(yùn)行后到后臺(tái)的queue會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

切換到Exchanges會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

點(diǎn)擊進(jìn)去

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

五、主題(topic)交換機(jī)

主題交換機(jī)的routing_key可以有一定的規(guī)則,交換機(jī)和隊(duì)列的routing_key需要采用.#.…..的格式

每個(gè)部分用.分開(kāi)

*代表一個(gè)單詞(不是字符)

#代表任意數(shù)量(0或n個(gè))單詞

在生產(chǎn)者項(xiàng)目新進(jìn)topic類(lèi)

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class topic {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName="topic";
        String exchangeType="topic";
        //消息隊(duì)列名字
        String queueName1="topic.queue1";
        String queueName2="topic.queue2";
        String queueName3="topic.queue3";
        //實(shí)例連接工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設(shè)置地址
        connectionFactory.setHost("192.168.128.233");
        //設(shè)置端口
        connectionFactory.setPort(5672);
        //設(shè)置用戶(hù)名
        connectionFactory.setUsername("mowen");
        //設(shè)置密碼
        connectionFactory.setPassword("123456");
        //獲取連接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明隊(duì)列。
        //參數(shù)1:隊(duì)列名
        //參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
        //參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
        //參數(shù)4:當(dāng)所有消費(fèi)者客戶(hù)端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
        //參數(shù)5:隊(duì)列的其他參數(shù)
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //聲明交換機(jī)
        channel.exchangeDeclare(exchangeName,exchangeType);

        //隊(duì)列綁定到交換機(jī)并指定rouing_key
        channel.queueBind(queueName1,exchangeName,"com.aaa.*");
        channel.queueBind(queueName2,exchangeName,"com.*.topic");
        channel.queueBind(queueName3,exchangeName,"com.bbb.*");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本發(fā)布消息
            // 第一個(gè)參數(shù)為交換機(jī)名稱(chēng)、
            // 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
            // 第三個(gè)參數(shù)為消息的其他屬性、
            // 第四個(gè)參數(shù)為發(fā)送信息的主體
            channel.basicPublish(exchangeName,"com.aaa.topic",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

運(yùn)行后,到后臺(tái)queue會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

切換到Exchanges會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)

點(diǎn)擊進(jìn)入會(huì)看到

Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)


當(dāng)前標(biāo)題:Java操作RabbitMQ添加隊(duì)列、消費(fèi)隊(duì)列和三個(gè)交換機(jī)
網(wǎng)站路徑:http://weahome.cn/article/jpjjss.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部