(一)基本概念
成都創(chuàng)新互聯(lián)公司專注于瀏陽網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供瀏陽營銷型網(wǎng)站建設(shè),瀏陽網(wǎng)站制作、瀏陽網(wǎng)頁設(shè)計、瀏陽網(wǎng)站官網(wǎng)定制、重慶小程序開發(fā)公司服務(wù),打造瀏陽網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供瀏陽網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。RabbitMQ 是流行的開源消息隊列系統(tǒng),用 erlang 語言開發(fā)。我曾經(jīng)對這門語言挺有興趣,學(xué)過一段時間,后來沒堅持。RabbitMQ 是 AMQP(高級消息隊列協(xié)議)的標(biāo)準(zhǔn)實現(xiàn)。如果不熟悉 AMQP,直接看 RabbitMQ 的文檔會比較困難。不過它也只有幾個關(guān)鍵概念,這里簡單介紹。
RabbitMQ 的結(jié)構(gòu)圖如下:
幾個概念說明:
Broker:簡單來說就是消息隊列服務(wù)器實體。
Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把 exchange 和 queue 按照路由規(guī)則綁定起來。
Routing Key:路由關(guān)鍵字,exchange 根據(jù)這個關(guān)鍵字進行消息投遞。
vhost:虛擬主機,一個 broker 里可以開設(shè)多個 vhost,用作不同用戶的權(quán)限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個 channel,每個 channel 代表一個會話任務(wù)。
消息隊列的使用過程大概如下:
(1)客戶端連接到消息隊列服務(wù)器,打開一個 channel 。
(2)客戶端聲明一個 exchange,并設(shè)置相關(guān)屬性。
(3)客戶端聲明一個 queue,并設(shè)置相關(guān)屬性。
(4)客戶端使用 routing key(本質(zhì)上是 binding key,后文已做相應(yīng)糾正),在 exchange 和 queue 之間建立好綁定關(guān)系。
(5)客戶端投遞消息到 exchange 。
exchange 接收到消息后,就根據(jù)消息的 routing key 和已經(jīng)設(shè)置的 binding 關(guān)系,進行消息路由,將消息投遞到一個或多個隊列里。
exchange 也有幾個類型:
完全根據(jù) routing key 進行投遞的叫做 direct 交換機。例如,綁定時設(shè)置了 binding key 為 "abc",那么客戶端提交的消息,只有設(shè)置了routing key 為 "abc" 的才會被投遞到該隊列。 對 routing key 進行模式匹配后進行投遞的叫做 topic 交換機。符號 "#" 匹配一個或多個詞,符號 "*" 匹配正好一個詞。例如 "abc.#" 可以匹配 "abc.def.ghi" ,"abc.*" 只能匹配 "abc.def" 。 還有一種不需要 routing key 的,叫做 fanout 交換機。它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。RabbitMQ 支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會選擇持久化。消息隊列持久化包括 3 個部分:
(1)exchange 持久化,在聲明時指定 durable => 1
(2)queue 持久化,在聲明時指定 durable => 1
(3)消息持久化,在投遞時指定 delivery_mode => 2(1 是非持久化)
如果 exchange 和 queue 都是持久化的,那么它們之間的 binding 也是持久化的。
(二)應(yīng)用實際
使用 Linux 服務(wù)器(ubuntu 9.10 64位),安裝 RabbitMQ 非常方便。
先運行如下命令安裝 erlang:
# apt-get install erlang-nox
再到rabbitmq.com上下載RabbitMQ的安裝包,如下安裝:
# dpkg -i rabbitmq-server_2.6.1-1_all.deb
安裝完后,使用
# /etc/init.d/rabbitmq-server start|stop|restart
來啟動、停止、重啟 rabbitmq。
在正式應(yīng)用之前,我們先在 RabbitMQ 里創(chuàng)建一個 vhost ,加一個用戶,并設(shè)置該用戶的權(quán)限。
使用 rabbitmqctl 客戶端工具,在根目錄下創(chuàng)建 "/pyhtest" 這個 vhost :
# rabbitmqctl add_vhost /pyhtest
創(chuàng)建一個用戶名 "pyh" ,設(shè)置密碼 "pyh1234" :
# rabbitmqctl add_user pyh pyh1234
設(shè)置pyh用戶對/pyhtest這個vhost擁有全部權(quán)限:
# rabbitmqctl set_permissions -p /pyhtest pyh “.*” “.*” “.*”
后面三個”*”代表pyh用戶擁有對/pyhtest的配置、寫、讀全部權(quán)限
設(shè)置好后,開始編程,我用 Perl 寫一個消息投遞程序(producer):
#!/usr/bin/perl use strict; use Net::RabbitMQ; use UUID::Tiny; my $channel = 1000; # channel ID,可以隨意指定,只要不沖突 my $queuename = “pyh_queue”; # 隊列名 my $exchange = “pyh_exchange”; # 交換機名 my $routing_key = “test”; # routing key my $mq = Net::RabbitMQ->new(); # 創(chuàng)建一個RabbitMQ對象 $mq->connect(“localhost”, { vhost => “/pyhtest”, user => “pyh”, password => “pyh1234″ }); # 建立連接 $mq->channel_open($channel); # 打開一個channel $mq->exchange_declare($channel, $exchange, {durable => 1}); # 聲明一個持久化的交換機 $mq->queue_declare($channel, $queuename, {durable => 1}); # 聲明一個持久化的隊列 $mq->queue_bind($channel, $queuename, $exchange, $routing_key); # 使用routing key在交換機和隊列間建立綁定 for (my $i=0;$i<10000000;$i++) { # 循環(huán)1000萬次 my $string = create_UUID_as_string(UUID_V1); # 產(chǎn)生一條UUID作為消息主體 $mq->publish($channel, $routing_key, $string, { exchange => $exchange }, { delivery_mode => 2 }); # 將消息結(jié)合key以持久化模式投遞到交換機 } $mq->disconnect(); # 斷開連接
消息接受程序(consumer)大概如下:
#!/usr/bin/perl use strict; use Net::RabbitMQ; my $channel = 1001; my $queuename = “pyh_queue”; my $mq = Net::RabbitMQ->new(); $mq->connect(“localhost”, { vhost=>”/pyhtest”, user => “pyh”, password => “pyh1234″ }); $mq->channel_open($channel); while (1) { my $hashref = $mq->get($channel, $queuename); last unless defined $hashref; print $hashref->{message_count}, “: “, $hashref->{body},”n”; } $mq->disconnect();
consumer 連接后只要指定隊列就可獲取到消息。
上述程序共投遞 1000 萬條消息,每條消息 36 字節(jié)(UUID),打開持久化,共耗時 17 分多鐘(包括產(chǎn)生 UUID 的時間),每秒投遞消息約 9500 條。測試機器是 8G 內(nèi)存、8 核志強 CPU 。
投遞完后,在 /var/lib/rabbitmq/mnesia/rabbit@${hostname}/msg_store_persistent 目錄,產(chǎn)生 2G 多的持久化消息數(shù)據(jù)。在運行 consumer 程序后,這些數(shù)據(jù)都會消失,因為消息已經(jīng)被消費了。