RabbitMQ是流行的開源消息隊(duì)列系統(tǒng),用erlang語言開發(fā),完整的實(shí)現(xiàn)了AMQP(高級消息隊(duì)列協(xié)議)。網(wǎng)站在:http://www.rabbitmq.com/上面有教程和實(shí)例代碼(Python和Java的)。
鹽湖網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)公司!從網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、響應(yīng)式網(wǎng)站開發(fā)等網(wǎng)站項(xiàng)目制作,到程序開發(fā),運(yùn)營維護(hù)。創(chuàng)新互聯(lián)公司從2013年創(chuàng)立到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)公司。AMPQ協(xié)議為了能夠滿足各種消息隊(duì)列需求,在概念上比較復(fù)雜。首先,rabbitMQ啟動(dòng)默認(rèn)是沒有任何配置的,需要客戶端連接上去,設(shè)置交換機(jī)等才能工作。不把這些基礎(chǔ)概念弄清楚,后面程序設(shè)計(jì)就容易產(chǎn)生問題。
1.vhosts:虛擬主機(jī)。
一個(gè)RabbitMQ的實(shí)體上可以有多個(gè)vhosts,用戶與權(quán)限設(shè)置就是依附于vhosts。對一般PHP應(yīng)用,不需要用戶權(quán)限設(shè)定,直接使用默認(rèn)就存在的"/"就可以了,用戶可以使用默認(rèn)就存在的"guest"。一個(gè)簡單的配置示例:
$conn_args = array( \'host\' => \'127.0.0.1\', \'port\' => \'5672\', \'login\' => \'guest\', \'password\' => \'guest\', \'vhost\'=>\'/\' );
2.connection與channel:連接與信道
connection是指物理的連接,一個(gè)client與一個(gè)server之間有一個(gè)連接;一個(gè)連接上可以建立多個(gè)channel,可以理解為邏輯上的連接。一般應(yīng)用的情況下,有一個(gè)channel就夠用了,不需要?jiǎng)?chuàng)建更多的channel。示例代碼:
//創(chuàng)建連接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!n"); } $channel = new AMQPChannel($conn);
3.exchange與routingkey:交換機(jī)與路由鍵
為了將不同類型的消息進(jìn)行區(qū)分,設(shè)置了交換機(jī)與路由兩個(gè)概念。比如,將A類型的消息發(fā)送到名為‘C1’的交換機(jī),將類型為B的發(fā)送到\'C2\'的交換 機(jī)。當(dāng)客戶端連接C1處理隊(duì)列消息時(shí),取到的就只是A類型消息。進(jìn)一步的,如果A類型消息也非常多,需要進(jìn)一步細(xì)化區(qū)分,比如某個(gè)客戶端只處理A類型消息 中針對K用戶的消息,routingkey就是來做這個(gè)用途的。
$e_name = \'e_linvo\'; //交換機(jī)名 $k_route = array(0=> \'key_1\', 1=> \'key_2\'); //路由key //創(chuàng)建交換機(jī) $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()."n"; for($i=0; $i<5; ++$i){ echo "Send Message:".$ex->publish($message . date(\'H:i:s\'), $k_route[i%2])."n"; }
由以上代碼可以看到,發(fā)送消息時(shí),只要有“交換機(jī)”就夠了。至于交換機(jī)后面有沒有對應(yīng)的處理隊(duì)列,發(fā)送方是不用管的。routingkey可以是空的字符串。在示例中,我使用了兩個(gè)key交替發(fā)送消息,是為了下面更便于理解routingkey的作用。
對于交換機(jī),有兩個(gè)重要的概念:
A,類型。有三種類型:Fanout類型最簡單,這種模型忽略routingkey;Direct類型是使用最多的,使用確定的 routingkey。這種模型下,接收消息時(shí)綁定\'key_1\'則只接收key_1的消息;最后一種是Topic,這種模式與Direct類似,但是支 持通配符進(jìn)行匹配,比如:\'key_*\',就會(huì)接受key_1和key_2。Topic貌似美好,但是有可能導(dǎo)致不嚴(yán)謹(jǐn),所以還是推薦使用 Direct。
B,持久化。指定了持久化的交換機(jī),在重新啟動(dòng)時(shí)才能重建,否則需要客戶端重新聲明生成才行。
需要特別明確的概念:交換機(jī)的持久化,并不等于消息的持久化。只有在持久化隊(duì)列中的消息,才能持久化;如果沒有隊(duì)列,消息是沒有地方存儲(chǔ)的;消息本身在投遞時(shí)也有一個(gè)持久化標(biāo)志的,PHP中默認(rèn)投遞到持久化交換機(jī)就是持久的消息,不用特別指定。
4.queue:隊(duì)列
講了這么多,才講到隊(duì)列呀。事實(shí)上,隊(duì)列僅是針對接收方(consumer)的,由接收方根據(jù)需求創(chuàng)建的。只有隊(duì)列創(chuàng)建了,交換機(jī)才會(huì)將新接受到的 消息送到隊(duì)列中,交換機(jī)是不會(huì)在隊(duì)列創(chuàng)建之前的消息放進(jìn)來的。換句話說,在建立隊(duì)列之前,發(fā)出的所有消息都被丟棄了。下面這個(gè)圖比RabbitMQ官方的 圖更清楚——Queue是屬于ReceiveMessage的一部分。
接下來看一下創(chuàng)建隊(duì)列及接收消息的示例:
$e_name = \'e_linvo\'; //交換機(jī)名 $q_name = \'q_linvo\'; //隊(duì)列名 $k_route = \'\'; //路由key //創(chuàng)建連接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!n"); } $channel = new AMQPChannel($conn); //創(chuàng)建交換機(jī) $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()."n"; //創(chuàng)建隊(duì)列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 //綁定交換機(jī)與隊(duì)列,并指定路由鍵 echo \'Queue Bind: \'.$q->bind($e_name, $k_route)."n"; //阻塞模式接收消息 echo "Message:n"; $q->consume(\'processMessage\', AMQP_AUTOACK); //自動(dòng)ACK應(yīng)答 $conn->disconnect(); /** * 消費(fèi)回調(diào)函數(shù) * 處理消息 */ function processMessage($envelope, $queue) { var_dump($envelope->getRoutingKey); $msg = $envelope->getBody(); echo $msg."n"; //處理消息 }
從上述示例中可以看到,交換機(jī)既可以由消息發(fā)送端創(chuàng)建,也可以由消息消費(fèi)者創(chuàng)建。
創(chuàng)建一個(gè)隊(duì)列(line:20)后,需要將隊(duì)列綁定到交換機(jī)上(line:25)隊(duì)列才能工作,routingkey也是在這里指定的。有的資料上寫成bindingkey,其實(shí)一回事兒,弄兩個(gè)名詞反倒容易混淆。
消息的處理,是有兩種方式:
A,一次性。用$q->get([...]),不管取到取不到消息都會(huì)立即返回,一般情況下使用輪詢處理消息隊(duì)列就要用這種方式;
B,阻塞。用$q->consum(callback,[...])程序會(huì)進(jìn)入持續(xù)偵聽狀態(tài),每收到一個(gè)消息就會(huì)調(diào)用callback指定的函數(shù)一次,直到某個(gè)callback函數(shù)返回FALSE才結(jié)束。
關(guān)于callback,這里多說幾句:PHP的call_back是支持使用數(shù)組的,比如:$c=newMyClass();$c-& gt;counter=100;$q->consume(array($c,\'myfunc\'))這樣就可以調(diào)用自己寫的處理類。 MyClass中myfunc的參數(shù)定義,與上例中processMessage一樣就行。
在上述示例中,使用的$routingkey=\'\',意味著接收全部的消息。我們可以將其改為$routingkey=\'key_1\',可以看到結(jié)果中僅有設(shè)置routingkey為key_1的內(nèi)容了。
注意:routingkey=\'key_1\'與routingkey=\'key_2\'是兩個(gè) 不同的隊(duì)列。假設(shè):client1與client2都連接到key_1的隊(duì)列上,一個(gè)消息被client1處理之后,就不會(huì)被client2 處理。而routingkey=\'\'是另類,client_all綁定到\'\'上,將消息全都處理后,client1和client2上也就沒 有消息了。
在程序設(shè)計(jì)上,需要規(guī)劃好exchange的名稱,以及如何使用key區(qū)分開不同類型的標(biāo)記,在消息產(chǎn)生的地方插入發(fā)送消息代碼。后端處理,可以針 對每一個(gè)key啟動(dòng)一個(gè)或多個(gè)client,以提高消息處理的實(shí)時(shí)性。如何使用PHP進(jìn)行多線程的消息處理,將在下一節(jié)中講述。