幾個概念說明:
創(chuàng)新互聯(lián)公司是一家集網(wǎng)站建設,云安企業(yè)網(wǎng)站建設,云安品牌網(wǎng)站建設,網(wǎng)站定制,云安網(wǎng)站建設報價,網(wǎng)絡營銷,網(wǎng)絡優(yōu)化,云安網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強企業(yè)競爭力。可充分滿足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學習、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實用型網(wǎng)站。Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關鍵字,exchange根據(jù)這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
消息隊列的使用過程大概如下:
(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,并設置相關屬性。
(3)客戶端聲明一個queue,并設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。
exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。
exchange也有幾個類型,完全根據(jù)key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。
RabbitMQ支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會選擇持久化。消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。
1.必需掌握的指令
添加用戶:
rabbitmqctl add_user rainbird password
添加權限:
rabbitmqctl set_permissions -p "/" rainbird ".*" ".*" ".*"
刪除測試用戶:
rabbitmqctl delete_user guest
所有指令列表(很簡單的英文):
add_user
delete_user
change_password
list_users
add_vhost
delete_vhost
list_vhosts
set_permissions [-p
clear_permissions [-p
list_permissions [-p
list_user_permissions
list_queues [-p
list_exchanges [-p
list_bindings [-p
list_connections [
2.vhost / 不能刪除
刪除/以后,新建立的vhost不能正常使用(即便不刪除/,新建立的vhost也是不能正常使用).不知道為什么,有待研究.
3.關于持久化
示例里沒有一點兒和持久化相關的東東,而這卻是筆者最關心的,想想作為消息服務器如果不能保證消息一定被接收到,算什么事兒啊?比著網(wǎng)上狂轉的python版本從php-amqp的庫里一點一點兒翻,找到了如下持久化的設置:
接收端聲明隊列和交換機自動建立:
$ch->queue_declare($_QUEUE,false,true,false,false);
第三個參數(shù)設置true保證服務器重啟后,自動建立隊列
第五個參數(shù)設置成false防止接收端沒連接的時候丟失消息
$ch->exchange_declare($EXCHANGE, \'direct\', false, true, false);
第四個參數(shù)設置true保證重啟后,自動建立交換機
第五個參數(shù)設置false防止接收端斷開后,交換機被刪除
發(fā)布端聲明消息持久:
$message = new AMQPMessage(serialize($object), array(\'content_type\' => \'text/plain\', \'delivery_mode\' => 2));
同時滿足了上面三個條件,就可以保證未接收的消息在服務器意外重啟以后依然存在了.
4.持久化的后遺癥
比如說你初始化了一個隊列msgs.你會發(fā)現(xiàn)它真的持久了!每次服務器端重啟后,通過list_queues命令查看的時候都存在.但是時間久了,這個msgs我們并不需要了,怎么辦呢?筆者發(fā)現(xiàn),想清除這個隊列只能刪除它所在的vhost,然后再重建vhost,再設置vhost的權限.
rabbitmqctl delete_vhost /
rabbitmqctl add_vhost /
rabbitmqctl set_permissions -p / rainbird \'.*\' \'.*\' \'.*\'
要注意,如果這個操作過程中有接收端處于連接狀態(tài)它們不會自動斷開,但也不會再收到消息,需要手動重新連接一下.
5.關于修改監(jiān)聽ip和監(jiān)聽端口
出于一些需要,比如我們有多個ip,我們希望rabbitmq僅運行在指定的ip上.或者考慮到安全問題,我們希望修改一下rabbitmq的監(jiān)聽端口.默認安裝完成以后,在/etc下面會有一個rabbitmq的空目錄,這時候我們需要手工創(chuàng)建rabbitmq.conf,并寫入相關內(nèi)容.
vi /etc/rabbitmq/rabbitmq.conf
RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
RABBITMQ_NODE_PORT=2222
保存以后重啟服務就生效了.
這個東東網(wǎng)上又沒介紹,翻了半天+無限嘗試才搞出來.
6.關于運行接收端cpu100%問題
第一眼看到接收端會運行一個while等待消息的時候,筆者就知道這個進程肯定cpu占用會100%.在代碼里幾處while嘗試添加usleep無效后,筆者最后還是在官方的問題列表里找到了答案:
vi +286 amqp_wire.inc
293 while ($read < $n && (false !== ($buf = fread($this->sock, $n - $read))))
294 {
295 usleep(50000);
296 $read += strlen($buf);
297 $res .= $buf;
298 }
筆者的出發(fā)點是對的,只是沒找對while.可能有人會奇怪為什么要用usleep(50000)呢?實際上筆者有遇到運行php起來的daemon導致cpu100%的情況.當時筆者加的是usleep(500000)也就是半秒鐘.這樣就可以使進程看上去cpu占用為0.沒想到再降一個數(shù)量級也是可以正常的,這次算賺到了.
7.學到了error_log函數(shù)
以前有見過這個函數(shù),以為是向系統(tǒng)日志里寫log的時候才用得到呢,沒想到還可以像下面這樣用:
function debug_msg($s)
{
//error_log($s);
}
在不同的地方寫上debug_msg,最后不用的時候時候,直接注釋掉error_log,不錯的小技巧!
RabbitMQ將消息投遞到客戶端后,客戶端如果沒處理完這個消息就死掉了,這個消息還會不會存在?這取決于RabbitMQ的消息確認機制(Message acknowledgment)是否打開。
為了確保消息不會丟失,RabbitMQ支持消息確認機制。客戶端在接受到消息并處理完后,可以發(fā)送一個ack消息給RabbitMQ,告訴它該消息可以安全的刪除了。假如客戶端在發(fā)送ack之前意外死掉了,那么RabbitMQ會將消息投遞到下一個consumer客戶端。如果有多個consumer客戶端,RabbitMQ在投遞消息時是輪詢的。
RabbitMQ如何判斷客戶端死掉了?唯一根據(jù)是客戶端連接是否斷開。這里沒有超時機制,也就是說客戶端可以處理一個消息很長時間,只要沒斷開連接,RabbitMQ就一直等待ack消息。
消息確認機制默認是打開的,除非你設置no_ack=True標記來手工關閉它。
通過如下命令查看系統(tǒng)里的未確認消息:
# rabbitmqctl list_queues -p /path name messages_unacknowledged
Listing queues …
queue_storm_actionlog 0
dev_queue_storm_actionlog 0
…done.