(一)基本概念
成都創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的安陸網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!RabbitMQ是流行的開源消息隊(duì)列系統(tǒng),用erlang語言開發(fā)。我曾經(jīng)對這門語言挺有興趣,學(xué)過一段時(shí)間,后來沒堅(jiān)持。RabbitMQ是AMQP(高級消息隊(duì)列協(xié)議)的標(biāo)準(zhǔn)實(shí)現(xiàn)。如果不熟悉AMQP,直接看RabbitMQ的文檔會比較困難。不過它也只有幾個(gè)關(guān)鍵概念,這里簡單介紹。
RabbitMQ的結(jié)構(gòu)圖如下:
幾個(gè)概念說明:
Broker:簡單來說就是消息隊(duì)列服務(wù)器實(shí)體。
Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
Queue:消息隊(duì)列載體,每個(gè)消息都會被投入到一個(gè)或多個(gè)隊(duì)列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。
vhost:虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費(fèi)者,就是接受消息的程序。
channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會話任務(wù)。
消息隊(duì)列的使用過程大概如下:
(1)客戶端連接到消息隊(duì)列服務(wù)器,打開一個(gè)channel。
(2)客戶端聲明一個(gè)exchange,并設(shè)置相關(guān)屬性。
(3)客戶端聲明一個(gè)queue,并設(shè)置相關(guān)屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系。
(5)客戶端投遞消息到exchange。
exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里。
exchange也有幾個(gè)類型,完全根據(jù)key進(jìn)行投遞的叫做Direct交換機(jī),例如,綁定時(shí)設(shè)置了routing key為”abc”,那么客戶端提交的消息,只有設(shè)置了key為”abc”的才會投遞到隊(duì)列。對key進(jìn)行模式匹配后進(jìn)行投遞的叫做Topic交換機(jī),符號”#”匹配一個(gè)或多個(gè)詞,符號”*”匹配正好一個(gè)詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機(jī),它采取廣播模式,一個(gè)消息進(jìn)來時(shí),投遞到與該交換機(jī)綁定的所有隊(duì)列。
RabbitMQ支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會選擇持久化。消息隊(duì)列持久化包括3個(gè)部分:
(1)exchange持久化,在聲明時(shí)指定durable => 1
(2)queue持久化,在聲明時(shí)指定durable => 1
(3)消息持久化,在投遞時(shí)指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個(gè)持久化,一個(gè)非持久化,就不允許建立綁定。
(二)應(yīng)用實(shí)際
我使用Linux服務(wù)器(ubuntu 9.10 64位),安裝RabbitMQ非常方便。
先運(yùn)行如下命令安裝erlang:
apt-get install erlang-nox
再到rabbitmq.com上下載RabbitMQ的安裝包,如下安裝:
dpkg -i rabbitmq-server_2.6.1-1_all.deb
安裝完后,使用
/etc/init.d/rabbitmq-server start|stop|restart
來啟動(dòng)、停止、重啟rabbitmq。
在正式應(yīng)用之前,我們先在RabbitMQ里創(chuàng)建一個(gè)vhost,加一個(gè)用戶,并設(shè)置該用戶的權(quán)限。
使用rabbitmqctl客戶端工具,在根目錄下創(chuàng)建”/pyhtest”這個(gè)vhost:
rabbitmqctl add_vhost /pyhtest
創(chuàng)建一個(gè)用戶名”pyh”,設(shè)置密碼”pyh1234″:
rabbitmqctl add_user pyh pyh1234
設(shè)置pyh用戶對/pyhtest這個(gè)vhost擁有全部權(quán)限:
rabbitmqctl set_permissions -p /pyhtest pyh “.*” “.*” “.*”
后面三個(gè)”*”代表pyh用戶擁有對/pyhtest的配置、寫、讀全部權(quán)限
設(shè)置好后,開始編程,我用Perl寫一個(gè)消息投遞程序(producer):
#!/usr/bin/perl
use strict;
use Net::RabbitMQ;
use UUID::Tiny;
my $channel = 1000; # channel ID,可以隨意指定,只要不沖突
my $queuename = “pyh_queue”; # 隊(duì)列名
my $exchange = “pyh_exchange”; # 交換機(jī)名
my $routing_key = “test”; # routing key
my $mq = Net::RabbitMQ->new(); # 創(chuàng)建一個(gè)RabbitMQ對象
$mq->connect(“localhost”, { vhost => “/pyhtest”, user => “pyh”, password => “pyh1234″ }); # 建立連接
$mq->channel_open($channel); # 打開一個(gè)channel
$mq->exchange_declare($channel, $exchange, {durable => 1}); # 聲明一個(gè)持久化的交換機(jī)
$mq->queue_declare($channel, $queuename, {durable => 1}); # 聲明一個(gè)持久化的隊(duì)列
$mq->queue_bind($channel, $queuename, $exchange, $routing_key); # 使用routing key在交換機(jī)和隊(duì)列間建立綁定
for (my $i=0;$i<10000000;$i++) { # 循環(huán)1000萬次
my $string = create_UUID_as_string(UUID_V1); # 產(chǎn)生一條UUID作為消息主體
$mq->publish($channel, $routing_key, $string, { exchange => $exchange }, { delivery_mode => 2 }); # 將消息結(jié)合key以持久化模式投遞到交換機(jī)
}
$mq->disconnect(); # 斷開連接
消息接受程序(consumer)大概如下:
#!/usr/bin/perl
use strict;
use Net::RabbitMQ;
my $channel = 1001;
my $queuename = “pyh_queue”;
my $mq = Net::RabbitMQ->new();
$mq->connect(“localhost”, { vhost=>”/pyhtest”, user => “pyh”, password => “pyh1234″ });
$mq->channel_open($channel);
while (1) {
my $hashref = $mq->get($channel, $queuename);
last unless defined $hashref;
print $hashref->{message_count}, “: “, $hashref->{body},”n”;
}
$mq->disconnect();
consumer連接后只要指定隊(duì)列就可獲取到消息。
上述程序共投遞1000萬條消息,每條消息36字節(jié)(UUID),打開持久化,共耗時(shí)17分多鐘(包括產(chǎn)生UUID的時(shí)間),每秒投遞消息約9500條。測試機(jī)器是8G內(nèi)存、8核志強(qiáng)CPU。
投遞完后,在/var/lib/rabbitmq/mnesia/rabbit@${hostname}/msg_store_persistent目錄,產(chǎn)生2G多的持久化消息數(shù)據(jù)。在運(yùn)行consumer程序后,這些數(shù)據(jù)都會消失,因?yàn)橄⒁呀?jīng)被消費(fèi)了。