本篇內(nèi)容主要講解“rabbitmq多個消費者同時接收_RabbitMQ實現(xiàn)公平派遣任務(wù)的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“rabbitmq多個消費者同時接收_RabbitMQ實現(xiàn)公平派遣任務(wù)的方法”吧!
創(chuàng)新互聯(lián)是一家專業(yè)提供炎陵企業(yè)網(wǎng)站建設(shè),專注與成都網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、H5建站、小程序制作等業(yè)務(wù)。10年已為炎陵眾多企業(yè)、政府機構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站制作公司優(yōu)惠進行中。
先來看一下 RabbitMQ 工作隊列和競爭消費者簡化模型:
P:(producer/ publisher):生產(chǎn)者,一個發(fā)送消息的用戶應(yīng)用程序。
C1:消費者1,一個主要用來等待接收消息的用戶應(yīng)用程序1。
C2:消費者2,一個主要用來等待接收消息的用戶應(yīng)用程序2。
(C1 和 C2 指代多個消費者(工作人員))
工作隊列(任務(wù)隊列):主要用于在多個工作人員(消費者)之間分配耗時的任務(wù),主要思想是避免必須等待某些立即執(zhí)行的資源密集型任務(wù)完成的尷尬局面發(fā)生。將任務(wù)封裝為消息并將其發(fā)送到隊列,這樣,安排的任務(wù)可以在以后完成。當(dāng)有很多消費者時,任務(wù)將在他們之間共享,一個消息只能被一個消費者獲取。
這個概念在Web應(yīng)用程序中特別有用,因為在Web應(yīng)用程序中,不可能在較短的HTTP請求窗口內(nèi)處理復(fù)雜的任務(wù)。
生產(chǎn)者發(fā)送新任務(wù)消息:
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//循環(huán)發(fā)送 50 條信息
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
for(int i = 0; i < 50; i++) {
String message = "task..." + i;
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println(" [x]Sent:" + message);
try {
Thread.sleep(i * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
channel.close();
connection.close();
}
}
消費者1(消費者1設(shè)置消費耗時):
假設(shè)消耗 1 秒來處理消費過程
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 設(shè)置每個消費者同時只能處理一條消息
//channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[消費者1] Received '" + msg + "'");
try{
Thread.sleep(1000); // 模擬消費者耗時
}catch (InterruptedException e){
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
}
消費者2(消費者2不設(shè)置消費耗時):
public class Worker2 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 設(shè)置每個消費者同時只能處理一條消息
//channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[消費者2] Received '" + msg + "'");
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
}
首先啟動消費者1(Worker)和消費者2(Worker2)應(yīng)用程序,再啟動生產(chǎn)者(NewTask)。
運行結(jié)果如下:
消費者1:
消費者2:
由結(jié)果可見,兩個消費者各自消費了 25 條信息,消息各不相同,從而實現(xiàn)了任務(wù)的分發(fā)。
公平派遣任務(wù)
由結(jié)果,我們注意到,工作者1(消費者1)的處理任務(wù)比較忙碌(消費耗時),而消費者2卻有大量時間處于空閑狀態(tài)不做任何任務(wù)。而 RabbitMQ 對此一無所知,進行平均分配消息,這是因為 RabbitMQ 在消息進入隊列才進行信息調(diào)度,不會進行未確認消息數(shù),只是盲目地將消息發(fā)送給消費者。
為了解決這一點,可以使用 basicQos 方法并將入?yún)?shù)設(shè)置為 1,這樣做的意義是:RabbitMQ 一次不要給消費者一個以上的消息。換句話即是在消費者處理并確認上一條信息之前,不要向其發(fā)送新的消息,而是分派給不繁忙的消費者進行消費。
在消費者1和消費者2加入一段代碼:
// 設(shè)置每個消費者同時只能處理一條消息
channel.basicQos(1);
再次運行結(jié)果:
消費者1:
消費者2:
因此,通過設(shè)置 basicQos ,可以讓 RabbitMQ 實現(xiàn)分輕重地對任務(wù)進行分派。
到此,相信大家對“rabbitmq多個消費者同時接收_RabbitMQ實現(xiàn)公平派遣任務(wù)的方法”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!