新建一個(gè)maven項(xiàng)目,在pom.xml文件加入以下依賴(lài)
在息縣等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專(zhuān)注、極致的服務(wù)理念,為客戶(hù)提供網(wǎng)站建設(shè)、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作按需搭建網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站制作,營(yíng)銷(xiāo)型網(wǎng)站建設(shè),外貿(mào)營(yíng)銷(xiāo)網(wǎng)站建設(shè),息縣網(wǎng)站建設(shè)費(fèi)用合理。
com.rabbitmq
amqp-client
3.6.5
新建一個(gè)P1類(lèi)
package com.rabbitMQ.test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author mowen
* @create 2019/11/20-11:23
*/
public class P1 {
public static void main(String[] args) throws IOException, TimeoutException {
//消息隊(duì)列名字
String queueName="queue";
//實(shí)例連接工廠
ConnectionFactory connectionFactory=new ConnectionFactory();
//設(shè)置地址
connectionFactory.setHost("192.168.128.233");
//設(shè)置端口
connectionFactory.setPort(5672);
//設(shè)置用戶(hù)名
connectionFactory.setUsername("mowen");
//設(shè)置密碼
connectionFactory.setPassword("123456");
//獲取連接(跟jdbc很像)
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//聲明隊(duì)列。
//參數(shù)1:隊(duì)列名
//參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
//參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
//參數(shù)4:當(dāng)所有消費(fèi)者客戶(hù)端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
//參數(shù)5:隊(duì)列的其他參數(shù)
channel.queueDeclare(queueName,true,false,false,null);
for (int i = 0; i < 10; i++) {
String msg="msg"+i;
// 基本發(fā)布消息
// 第一個(gè)參數(shù)為交換機(jī)名稱(chēng)、
// 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
// 第三個(gè)參數(shù)為消息的其他屬性、
// 第四個(gè)參數(shù)為發(fā)送信息的主體
channel.basicPublish("",queueName,null,msg.getBytes());
}
channel.close();
connection.close();
}
}
運(yùn)行后再瀏覽器進(jìn)入RabbitMQ的控制臺(tái),切換到queue看到
新建一個(gè)C1類(lèi)
package com.rabbitMQ.test;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author mowen
* @create 2019/11/20-13:12
*/
public class C1 {
public static void main(String[] args) throws IOException, TimeoutException {
//消息隊(duì)列名字
String queueName="queue";
//實(shí)例連接工廠
ConnectionFactory connectionFactory=new ConnectionFactory();
//設(shè)置地址
connectionFactory.setHost("192.168.128.233");
//設(shè)置端口
connectionFactory.setPort(5672);
//設(shè)置用戶(hù)名
connectionFactory.setUsername("mowen");
//設(shè)置密碼
connectionFactory.setPassword("123456");
//獲取連接(跟jdbc很像)
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
// 創(chuàng)建一個(gè)消費(fèi)者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費(fèi)收到消息的時(shí)候調(diào)用的回調(diào)
System.out.println("C3接收到:" + new String(body));
}
};
//把消費(fèi)著綁定到指定隊(duì)列
//第一個(gè)是隊(duì)列名
//第二個(gè)是 是否自動(dòng)確認(rèn)
//第三個(gè)是消費(fèi)者
channel.basicConsume(queueName,true,consumer);
}
}
運(yùn)行后輸出為
消費(fèi)者一般都不會(huì)關(guān)閉,會(huì)一直等待隊(duì)列消息,可以手動(dòng)關(guān)閉程序。
channel.basicConsume(queueName,true,consumer);中的true為收到消息后自動(dòng)確認(rèn),改為false取消自動(dòng)確認(rèn)。
在handleDelivery方法最后面用
channel.basicAck(envelope.getDeliveryTag(),false);
來(lái)收到手動(dòng)確認(rèn)消息。消費(fèi)者可以有多個(gè)并且可以同時(shí)消費(fèi)一個(gè)隊(duì)列;
當(dāng)有多個(gè)消費(fèi)者同時(shí)消費(fèi)同一個(gè)隊(duì)列時(shí),收到的消息是平均分配的(消費(fèi)者沒(méi)收到之前已經(jīng)確認(rèn)每個(gè)消費(fèi)者受到的消息),
但當(dāng)其中一個(gè)消費(fèi)者性能差的話(huà),會(huì)影響其他的消費(fèi)者,因?yàn)檫€要等它收完消息,這樣會(huì)拖累其他消費(fèi)者。
可以設(shè)置channel 的basicQos方法
//設(shè)置最多接受消息數(shù)量
// 設(shè)置了這個(gè)參數(shù)之后要吧自動(dòng)確認(rèn)關(guān)掉
channel.basicQos(1);
扇形交換機(jī)是基本的交換機(jī)類(lèi)型,會(huì)把收到的消息以廣播的形式發(fā)送到綁定的隊(duì)列里,因?yàn)椴恍枰?jīng)過(guò)條件篩選,所以它的速度最快。
在生產(chǎn)者項(xiàng)目新建一個(gè)fanout類(lèi)
package com.rabbitMQ.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author mowen
* @date 2019/11/20-11:23
*/
public class fanout {
public static void main(String[] args) throws IOException, TimeoutException {
//交換機(jī)名字
String exchangeName="fanout";
//交換機(jī)名字類(lèi)型
String exchangeType="fanout";
//消息隊(duì)列名字
String queueName1="fanout.queue1";
String queueName2="fanout.queue2";
String queueName3="fanout.queue3";
//實(shí)例連接工廠
ConnectionFactory connectionFactory=new ConnectionFactory();
//設(shè)置地址
connectionFactory.setHost("192.168.128.233");
//設(shè)置端口
connectionFactory.setPort(5672);
//設(shè)置用戶(hù)名
connectionFactory.setUsername("mowen");
//設(shè)置密碼
connectionFactory.setPassword("123456");
//獲取連接(跟jdbc很像)
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//聲明隊(duì)列。
//參數(shù)1:隊(duì)列名
//參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
//參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
//參數(shù)4:當(dāng)所有消費(fèi)者客戶(hù)端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
//參數(shù)5:隊(duì)列的其他參數(shù)
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
channel.queueDeclare(queueName3,true,false,false,null);
//聲明交換機(jī)
channel.exchangeDeclare(exchangeName,exchangeType);
//隊(duì)列綁定到交換機(jī)
channel.queueBind(queueName1,exchangeName,"");
channel.queueBind(queueName2,exchangeName,"");
channel.queueBind(queueName3,exchangeName,"");
for (int i = 0; i < 10; i++) {
String msg="msg"+i;
// 基本發(fā)布消息
// 第一個(gè)參數(shù)為交換機(jī)名稱(chēng)、
// 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
// 第三個(gè)參數(shù)為消息的其他屬性、
// 第四個(gè)參數(shù)為發(fā)送信息的主體
channel.basicPublish(exchangeName,"",null,msg.getBytes());
}
channel.close();
connection.close();
}
}
運(yùn)行后在RabbitMQ網(wǎng)頁(yè)管理后臺(tái)的queue會(huì)看到
切換到Exchanges會(huì)看到一個(gè)
就是我們聲明的交換機(jī),點(diǎn)擊會(huì)看到我們綁定的隊(duì)列
直連交換機(jī)會(huì)帶路由功能,隊(duì)列通過(guò)routing_key與直連交換機(jī)綁定,發(fā)送消息需要指定routing_key,交換機(jī)收到消息時(shí),交換機(jī)會(huì)根據(jù)routing_key發(fā)送到指定隊(duì)列里,同樣的routing_key可以支持多個(gè)隊(duì)列。
在生產(chǎn)者項(xiàng)目新建direct類(lèi)
package com.rabbitMQ.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author mowen
* @date 2019/11/20-11:23
*/
public class direct {
public static void main(String[] args) throws IOException, TimeoutException {
String exchangeName="direct";
String exchangeType="direct";
//消息隊(duì)列名字
String queueName1="direct.queue1";
String queueName2="direct.queue2";
String queueName3="direct.queue3";
//實(shí)例連接工廠
ConnectionFactory connectionFactory=new ConnectionFactory();
//設(shè)置地址
connectionFactory.setHost("192.168.128.233");
//設(shè)置端口
connectionFactory.setPort(5672);
//設(shè)置用戶(hù)名
connectionFactory.setUsername("mowen");
//設(shè)置密碼
connectionFactory.setPassword("123456");
//獲取連接(跟jdbc很像)
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//聲明隊(duì)列。
//參數(shù)1:隊(duì)列名
//參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
//參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
//參數(shù)4:當(dāng)所有消費(fèi)者客戶(hù)端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
//參數(shù)5:隊(duì)列的其他參數(shù)
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
channel.queueDeclare(queueName3,true,false,false,null);
//聲明交換機(jī)
channel.exchangeDeclare(exchangeName,exchangeType);
//隊(duì)列綁定到交換機(jī)并指定rouing_key
channel.queueBind(queueName1,exchangeName,"key1");
channel.queueBind(queueName2,exchangeName,"key2");
channel.queueBind(queueName3,exchangeName,"key1");
for (int i = 0; i < 10; i++) {
String msg="msg"+i;
// 基本發(fā)布消息
// 第一個(gè)參數(shù)為交換機(jī)名稱(chēng)、
// 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
// 第三個(gè)參數(shù)為消息的其他屬性、
// 第四個(gè)參數(shù)為發(fā)送信息的主體
channel.basicPublish(exchangeName,"key1",null,msg.getBytes());
}
channel.close();
connection.close();
}
}
運(yùn)行后到后臺(tái)的queue會(huì)看到
切換到Exchanges會(huì)看到
點(diǎn)擊進(jìn)去
主題交換機(jī)的routing_key可以有一定的規(guī)則,交換機(jī)和隊(duì)列的routing_key需要采用.#.…..的格式
每個(gè)部分用.分開(kāi)
*代表一個(gè)單詞(不是字符)
#代表任意數(shù)量(0或n個(gè))單詞
在生產(chǎn)者項(xiàng)目新進(jìn)topic類(lèi)
package com.rabbitMQ.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author mowen
* @date 2019/11/20-11:23
*/
public class topic {
public static void main(String[] args) throws IOException, TimeoutException {
String exchangeName="topic";
String exchangeType="topic";
//消息隊(duì)列名字
String queueName1="topic.queue1";
String queueName2="topic.queue2";
String queueName3="topic.queue3";
//實(shí)例連接工廠
ConnectionFactory connectionFactory=new ConnectionFactory();
//設(shè)置地址
connectionFactory.setHost("192.168.128.233");
//設(shè)置端口
connectionFactory.setPort(5672);
//設(shè)置用戶(hù)名
connectionFactory.setUsername("mowen");
//設(shè)置密碼
connectionFactory.setPassword("123456");
//獲取連接(跟jdbc很像)
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//聲明隊(duì)列。
//參數(shù)1:隊(duì)列名
//參數(shù)2:持久化 (true表示是,隊(duì)列將在服務(wù)器重啟時(shí)依舊存在)
//參數(shù)3:獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開(kāi)后自動(dòng)刪除)
//參數(shù)4:當(dāng)所有消費(fèi)者客戶(hù)端連接斷開(kāi)時(shí)是否自動(dòng)刪除隊(duì)列
//參數(shù)5:隊(duì)列的其他參數(shù)
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
channel.queueDeclare(queueName3,true,false,false,null);
//聲明交換機(jī)
channel.exchangeDeclare(exchangeName,exchangeType);
//隊(duì)列綁定到交換機(jī)并指定rouing_key
channel.queueBind(queueName1,exchangeName,"com.aaa.*");
channel.queueBind(queueName2,exchangeName,"com.*.topic");
channel.queueBind(queueName3,exchangeName,"com.bbb.*");
for (int i = 0; i < 10; i++) {
String msg="msg"+i;
// 基本發(fā)布消息
// 第一個(gè)參數(shù)為交換機(jī)名稱(chēng)、
// 第二個(gè)參數(shù)為隊(duì)列映射的路由key、
// 第三個(gè)參數(shù)為消息的其他屬性、
// 第四個(gè)參數(shù)為發(fā)送信息的主體
channel.basicPublish(exchangeName,"com.aaa.topic",null,msg.getBytes());
}
channel.close();
connection.close();
}
}
運(yùn)行后,到后臺(tái)queue會(huì)看到
切換到Exchanges會(huì)看到
點(diǎn)擊進(jìn)入會(huì)看到