真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

RabbitMQ學(xué)習(xí)

一、概要

成都創(chuàng)新互聯(lián)專業(yè)為企業(yè)提供丹棱網(wǎng)站建設(shè)、丹棱做網(wǎng)站、丹棱網(wǎng)站設(shè)計、丹棱網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計與制作、丹棱企業(yè)網(wǎng)站模板建站服務(wù),十載丹棱做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡(luò)服務(wù)。

官網(wǎng): http://www.rabbitmq.com/

一個不錯的入門教程: http://blog.csdn.net/linvo/article/details/5750987 寫的挺好的,只是剛開始看可能不太懂,模模糊糊,多看幾遍,試著寫點代碼之后,再看。就比較清晰了。

官方文檔使用了 using the pika 0.9.8 Python client 。本文使用 http://github.com/celery/py-amqp amqp 1.4.6

至于安裝,自己找下教程吧。不難,先安裝Erlang,再安裝RabbitMQ。然后配置一下,有個web控制臺。之后就是python編程使用了。

再加一個不錯的中文資料: http://blog.chinaunix.net/topic/surpershi/


MQ全稱為Message Queue, 消息隊列(MQ)是一種應(yīng)用程序?qū)?yīng)用程序的通信方法。RabbitMQ是流行的開源消息隊列系統(tǒng),用erlang語言開發(fā)。RabbitMQ是AMQP(高級消息隊列協(xié)議)的標準實現(xiàn)。

Broker:簡單來說就是消息隊列服務(wù)器實體。
Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個關(guān)鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設(shè)多個vhost,用作不同用戶的權(quán)限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務(wù)。

消息隊列的使用過程大概如下:

(1)客戶端連接到消息隊列服務(wù)器,打開一個channel。
(2)客戶端聲明一個exchange,并設(shè)置相關(guān)屬性。
(3)客戶端聲明一個queue,并設(shè)置相關(guān)屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系。
(5)客戶端投遞消息到exchange。

exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding,進行消息路由,將消息投遞到一個或多個隊列里。

exchange也有幾個類型,完全根據(jù)key進行投遞的叫做Direct交換機,例如,綁定時設(shè)置了 routing key為”abc”,那么客戶端提交的消息,只有設(shè)置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機,符 號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還 有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。

RabbitMQ支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會選擇持久化。消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。


二、基本使用

入門教程看會之后,就差不多了。

下面示例代碼:

consumer 消費者

# amqp_consumer.py

#-*-coding:utf-8-*- __author__=\'lpe234\' __date__=\'2014-12-15\' importamqp conn=amqp.Connection(host="localhost:5672",userid="guest",password="guest",virtual_host="/",insist=False) chan=conn.channel() chan.queue_declare(queue="po_box",durable=True,exclusive=False,auto_delete=False) chan.exchange_declare(exchange="sorting_room",type="direct",durable=True,auto_delete=False,) chan.queue_bind(queue="po_box",exchange="sorting_room",routing_key="1111") defreceive_callback(msg): print\'Received:\'+msg.body+\'fromchannel#\'+str(msg.channel.channel_id) chan.basic_consume(queue=\'po_box\',no_ack=True,callback=receive_callback,consumer_tag="consumer") whileTrue: chan.wait() chan.basic_cancel("consumer") chan.close() conn.close()

producer生產(chǎn)者

# amqp_publisher.py

#-*-coding:utf-8-*- __author__=\'lpe234\' __date__=\'2014-12-15\' importamqp importjson conn=amqp.Connection(host="localhost:5672",userid="guest",password="guest",virtual_host="/",insist=False) chan=conn.channel() forxinxrange(10): msg=json.dumps({\'id\':str(x)+\'111\',\'lists\':[{\'id\':12345},{\'id\':12345},{\'id\':15656},{\'id\':\'4545\'},]}) printmsg msg=amqp.Message(msg) msg.properties["delivery_mode"]=2 chan.basic_publish(msg,exchange="sorting_room",routing_key="1111") chan.close() conn.close()

代碼基本都是在csdn那個博客里面弄下來的。稍微的修改了以下。

啟動時,先運行consumer消費者進程,它會先連接,并創(chuàng)建Queue和Exchange ,然后一直等待隊列中的消息。

然后,啟動publisher ,它會先連接,然后向指定 Exchange 交換機推送帶有特定 routing_key路由鍵的消息。

如果消費者對應(yīng)的Queue隊列與Exchange交換機的routing_key路由鍵 相對應(yīng)的話。那么消費者就會接收到相應(yīng)消息。至此,整個傳遞過程結(jié)束。


三、補充

注釋代碼

#-*-coding:utf-8-*- __author__=\'lpe234\' __date__=\'2014-12-15\' importamqp """ amqprabbitmqDEMO測試 先啟動amqp_consumer.py消費者,創(chuàng)建 """ conn=amqp.Connection(host=\'localhost:5672\',userid=\'guest\',password=\'guest\',virtual_host=\'/\',insist=False) #每個channel都被分配了一個整數(shù)標識,自動由Connection()類的.channel()方法維護。可以使用.channel(x)來指定channel標識。 chan=conn.channel(channel_id=1) #當(dāng)多個channel_id相同時,實際為同一channel #現(xiàn)在已經(jīng)有了一個可用的連接和channel。 #現(xiàn)在將代碼分為兩類,生產(chǎn)者(producer)和消費者(consumer)。 #創(chuàng)建一個消費者程序,會創(chuàng)建一個"po_box"的隊列和一個叫"sorting_room"的交換機。 chan.queue_declare(queue=\'po_box\',durable=True,exclusive=False,auto_delete=False) chan.exchange_declare(exchange=\'sorting_room\',type=\'direct\',durable=True,auto_delete=False) #創(chuàng)建了"po_box"的隊列,durable重啟之后會重新建立,auto_delete=False最后一個消費者斷開之后不會自動刪除,exclusive私有隊列 #創(chuàng)建了"sorting_room"的交換機,type指定交換機類型, #現(xiàn)在已經(jīng)有了一個可以接收消息的隊列和一個可以發(fā)送消息的交換機。不過還需要創(chuàng)建一個綁定 chan.queue_bind(queue=\'po_box\',exchange=\'sorting_room\',routing_key=\'jason\') #這個綁定非常直接,任何送到交換機"sorting_room"的具有路由鍵"jason"的消息都被路由到"po_box"隊列 #現(xiàn)在有兩個方法,從隊列中取出消息。 #第一個是調(diào)用chan.basic_get(),主動從隊列中拉出下一條消息(若沒有則返回None) #msg=chan.basic_get(queue=\'po_box\') #ifmsg: #printmsg.body #chan.basic_ack(msg.delivery_tag) #第二種 defreceive_callback(msg): printmsg.body chan.basic_consume(queue=\'po_box\',no_ack=True,callback=receive_callback,consumer_tag=\'testtag\') whileTrue: chan.wait() chan.basic_cancel(\'testtag\') #chan.wait()放在無限循環(huán)里面,這個函數(shù)會等待在隊列上,知道下一個消息到達隊列。 #chan.basic_cancel()用來注銷該回調(diào)函數(shù) #no_ack這個參數(shù),可以傳給chan.basic_get(),chan.basic_consume。是否等待回饋,

其他的后續(xù)再補充吧


網(wǎng)頁題目:RabbitMQ學(xué)習(xí)
URL分享:http://weahome.cn/article/cpjpgi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部