RabbitMQ是一種消息隊列,與線程queue和進程QUEUE作用是一樣的。
RabbitMQ是一個中間程序,可以實現(xiàn)不同進程之間的通信(比如python和Java之間,QQ和Word之間等);
普通情況下A進程與B進程之間通信,兩者之間需要建立很多連接和單獨寫一些代碼,但是使用RabbitMQ的話就可以實現(xiàn)幫助不同進程之間的數(shù)據(jù)通信。
A進程交給RabbitMQ,RabbitMQ在交給B,同樣B交給RabbitMQ,RabbitMQ在交給A,RabbitMQ可以實現(xiàn)A與B進程之間的連接和信息轉換。
使用RabbitMQ可以實現(xiàn)很多個獨立進程之間的交互,所有其他獨立進程都可以用RabbitMQ作為中間程序。
創(chuàng)新互聯(lián)建站專注于淮南企業(yè)網(wǎng)站建設,成都響應式網(wǎng)站建設公司,商城建設。淮南網(wǎng)站建設公司,為淮南等地區(qū)提供建站服務。全流程定制開發(fā),專業(yè)設計,全程項目跟蹤,創(chuàng)新互聯(lián)建站專業(yè)和態(tài)度為您提供的服務
py 消息隊列:
線程 queue(同一進程下線程之間進行交互)
進程 Queue(父子進程進行交互 或者 同屬于同一進程下的多個子進程進行交互)
如果是兩個完全獨立的python程序,也是不能用上面兩個queue進行交互的,或者和其他語言交互有哪些實現(xiàn)方式呢。
【Disk、Socket、其他中間件】這里中間件不僅可以支持兩個程序之間交互,可以支持多個程序,可以維護好多個程序的隊列。
雖然可以通過硬盤的方式實現(xiàn)多個獨立進程交互,但是硬盤速度比較慢,而RabbitMQ則能夠很好的、快速的幫助兩個獨立進程實現(xiàn)交互。
像這種公共的中間件有好多成熟的產(chǎn)品:
RabbitMQ
ZeroMQ
ActiveMQ
……
RabbitMQ:erlang語言 開發(fā)的。
Python中連接RabbitMQ的模塊:pika 、Celery(分布式任務隊列) 、haigha
可以維護很多的隊列
其中pika是RabbitMQ常用的模塊
RabbitMQ 教程官網(wǎng):http://www.rabbitmq.com/getstarted.html
幾個概念說明:
Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關鍵字,exchange根據(jù)這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務
RabbitMQ不像之前學的python Queue都在一個隊列里實現(xiàn)交互,RabbitMQ有多個隊列(圖中紅色部分代表隊列),每個隊列都可以將消息發(fā)給多個接收端(C是接收端,P是生產(chǎn)消息端)
1、Rabbitmq 安裝
Windos系統(tǒng)
pip install pika
ubuntu系統(tǒng)
install rabbitmq-server # 直接搞定
以下centos系統(tǒng)
1)Install Erlang
# For EL5:
rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
# For EL6:
rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# For EL7:
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm
yum install erlang
2)Install RabbitMQ Server
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpm
3)use RabbitMQ Server
chkconfig rabbitmq-server on
service rabbitmq-server stop/start
或者
rabbitmq-server start
rabbitmq已經(jīng)開啟,等待傳輸
2、基本示例
發(fā)送端 producer
import pika
# 建立一個實例;相當于建立一個socket。
#通過ctrl+ConnectionParameters可以看到能傳很多參數(shù),如果遠程還可以傳用戶名密碼。
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost',5672) # 默認端口5672,可不寫
)
# 聲明一個管道,在管道里發(fā)消息
channel = connection.channel()
# 在管道里聲明一個叫hello的queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello', # queue名字,將消息發(fā)給hello這個queue
body='Hello World!') # 消息內容
print(" [x] Sent 'Hello World!'")
connection.close() # 發(fā)完消息后關閉隊列
執(zhí)行結果:
[x] Sent 'Hello World!'
注意一定要開啟rabbitmq,否則會報錯
接收端 consumer
import pika
import time
# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# 聲明管道
channel = connection.channel()
# 為什么又聲明了一個‘hello’隊列?
# 如果這個queue確定已經(jīng)聲明了,可以不聲明。但是你不知道是生產(chǎn)者還是消費者先運行,所以要聲明兩次。如果消費者沒聲明,且消費者先運行的話,就會報錯。
# 生產(chǎn)者先聲明queue,消費者不聲明,但是消費者后運行就不會報錯。
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body): # 四個參數(shù)為標準格式
print(ch, method, properties) # 打印看一下是什么
# ch是管道內存對象地址;method是內容相關信息 properties后面講 body消息內容
print(" [x] Received %r" % body)
#time.sleep(15)
#ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume( # 消費消息
'hello', # 你要從哪個隊列里收消息
callback, # 如果收到消息,就調用callback函數(shù)來處理消息 # 注意調用的函數(shù)(callback)以前在basic_consume模塊是放在形參第一個位置的,后面修改到第二個位置了,如果放錯位置會報錯
# auto_ack=True # 寫的話,如果接收消息,機器宕機消息就丟了
# 一般不寫。宕機則生產(chǎn)者檢測到發(fā)給其他消費者
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 開始消費消息(開始接收消息,一直收,如果沒消息就卡主在這里了)
執(zhí)行結果:
[*] Waiting for messages. To exit press CTRL+C
params=>>>
[x] Received b'Hello World!'
收到了bytes格式的 Hello World!
消費者(接收端)這邊看到已經(jīng)卡主了
如果此時單獨在運行一下生產(chǎn)者(發(fā)送端),直接可以從消費者看到新收到的消息
重新開啟rabbitmq
運行三個接收者(消費者)
運行發(fā)送者,可以看到被第一個接收者給收到信息了
第二次運行發(fā)送者,第二個接收者收到信息了
第三次運行發(fā)送者,第三個接收者收到信息了
上面幾次運行說明了,依次的將信息發(fā)送每一個接收者
接收端 consumer
import pika
import time
# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# 聲明管道
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
# 正?;卣{函數(shù)(callback)執(zhí)行完成就表示信息接收完成,如果在還沒執(zhí)行完成時就出現(xiàn)異常就表示信息沒有正常接收,比如斷網(wǎng)、斷電等,會導致信息不能正常接收。
# 下面sleep 60秒,在60秒之前就將該模塊終止執(zhí)行來模擬異常情況。
time.sleep(60)
#ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
'hello',
callback,
# auto_ack=True 表示不管消息是否接收(處理)完成,都不會回復確認消息
# 如果producer不關心 comsumer是否處理完,可以使用該參數(shù)
# 但是一般都不會使用它
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #
在centos中重新執(zhí)行rabbitmq-server start來清空隊列里的消息
然后在pycharm開啟三個comsumer,在去運行等待接收消息
再去執(zhí)行producer來發(fā)送消息,執(zhí)行producer后,立即關閉第一個comsumer,這樣消息就會因為第一個comsumer沒接收成功跑到第二個comsumer去,以此類推。
關閉第二個comsumer,第三個comsumer收到信息
這張圖是將三個comsumer同時都關閉了,這樣三個comsumer都收不到消息,說明producer的消息沒有被接收,此時再去開啟第一個comsumer,這時第一個comsumer會將消息給接收過來。
我們將sleep注釋掉,也是這種現(xiàn)象,這是因為comsumer并沒有發(fā)送確認消息給producer
import pika
import time
# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# 聲明管道
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
time.sleep(30)
ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生成者,消息處理完成
channel.basic_consume(
'hello',
callback,
# auto_ack=True
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #
此時的代碼:當其中一個comsumer執(zhí)行完成,并發(fā)送確認消息后再去中斷,下一個comsumer就不會收到消息;反之,如果還沒發(fā)送確認消息就中斷了,那么消息就會被下一個comsumer接收到。
如果producer端宕機,那么隊列的數(shù)據(jù)也會消失;這樣就需要讓隊列消息持久化
# durable=True 該代碼只是將生成的隊列持久化(不是消息),如果producer宕機,隊列會存在,單消息會丟
# 要注意需要在producer端和 comsumer端都要 寫durable=True
channel.queue_declare(queue='hello',durable=True)
在centos重新開啟 rabbitmq-server start
在producer端
將producer代碼執(zhí)行三次,將三個消息放入隊列
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost',5672)
)
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
# 下面的代碼是讓消息持久化
properties = pika.BasicProperties(delivery_mode=2)
)
print(" [x] Sent 'Hello World!'")
connection.close()
將producer代碼執(zhí)行三次,將三個消息放入隊列
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
# time.sleep(30) #注釋掉
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(
'hello',
callback
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #
可以看到因為producer執(zhí)行了三次,所以運行comsumer端收到了三條消息
producer端沒有改變
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost',5672)
)
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties = pika.BasicProperties(delivery_mode=2)
)
print(" [x] Sent 'Hello World!'")
connection.close()
comsumer 1(消費者:1)
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
# time.sleep(30) #注釋掉
ch.basic_ack(delivery_tag = method.delivery_tag)
# channel.basic_qos可以使其消費者最多同時多少個消息;如果其中一個消費者處理慢(如:CPU處理性能低下),達到了最多處理的限制的話 生產(chǎn)者就不會再發(fā)送給該消費者。
channel.basic_qos(prefetch_count=1) #這里限制最多同時只處理1個消息
channel.basic_consume(
'hello',
callback
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #
此時有兩個comsumer模塊,comsumer2比comsumer1多用了sleep 30秒來模擬性能處理慢的情況
comsumer 2(消費者:2)
復制一個comsumer模塊為comsumer2
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
def callback(ch, method, properties, body):
print(ch, method, properties)
print(" [x] Received %r" % body)
time.sleep(30) #comsumer2這里sleep30秒
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
'hello',
callback
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #
我們運行兩個comsumer后,一直去運行producer。 可以看到comsumer 1接收到了3條信息,而comsumer 2只接收到了1條信息,這是因為comsumer 2 sleep了30秒來模擬信息處理慢的情況;
comsumer 1 和 comsumer 2都指定了同時只能處理1條信息,producer會與comsumer 2協(xié)商,因為comsumer2一直沒有處理完限制的1條信息,所以信息都被comsumer 1處理了。
新建fanout_publiser模塊,用于發(fā)送廣播的producer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
# 定義一個轉發(fā)器叫l(wèi)ogs,屬于一個中間人的角色,用于將producer的消息轉發(fā)給消費者(comsumer)
# 定義廣播類型使用fanout
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# message = ''.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message) # routing_key為空即可,因為是廣播沒有定義隊列,所以也不需要指定隊列,但這里必須要定義為空
print(" [x] Send %r " % message)
connection.close()
新建fanout_consumer模塊,用于接收廣播的消費者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費斷開后,自動將queue刪除
# 就是這里會隨機生成一個隨機的唯一queue,用完之后會將生成的queue刪除
# 這里要寫queue='',如果不指定隊列名字,但也要寫一個空的字符串,不然會報錯缺少參數(shù)
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue # 拿到隨機生成的queue名字
# producer綁定了logs轉發(fā)器
# 消費者將隨機生成的隊列也綁定了logs轉發(fā)器
# 這樣producer將消息交給logs轉發(fā)器,logs轉發(fā)器將消息交給對應綁定的隨機隊列,消費者從隊列里在拿消息
channel.queue_bind(exchange='logs',queue=queue_name)
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print("[x] %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback
# auto_ack=True # 寫的話,如果接收消息,機器宕機消息就丟了
)
channel.start_consuming()
因為是廣播,所以兩個consumer都收到了發(fā)送者發(fā)送的消息。
不過有一點要注意!?。。。。。。?!
要先運行consumer(接收者),在運行發(fā)送者。就好比收音機一樣,只有你先打開收音機,發(fā)送者才能將信息發(fā)給你。 如果發(fā)送者先發(fā)送,你卻沒有接收,之前發(fā)送的信息,你就不會再接收到了。
direct 可以區(qū)分廣播,將指定的消息發(fā)送給指定的接收者;
圖中顯示了將error級別消息發(fā)送給C1,將info、error、warning級別消息發(fā)送給C2。
producer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 定義消息級別
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ''.join(sys.argv[2:]) or "direct info: Hello World!"
# message = "direct info: Hello World!"
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message) # routing_key為空即可,因為是廣播沒有定義隊列,所以也不需要指定隊列,但這里必須要定義為空
print(" [x] Send %r " % message)
connection.close()
consumer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
# 獲取參數(shù)列表
log_levels = sys.argv[1:]
if not log_levels: # 如果沒有參數(shù),就報錯,提示要指定消息級別
sys.stderr.write("Usage: %s [info] [warning] [error] \n" % sys.argv[0])
sys.exit(1) # 沒有參數(shù)就退出程序
# print(log_levels)
for severity in log_levels: # 循環(huán)參數(shù)列表并綁定
channel.queue_bind(
exchange='direct_logs',
queue=queue_name,
routing_key=severity
) #所有發(fā)送到severity的參數(shù),都接收
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print('[x] %r:%r' % (method.routing_key, body))
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
channel.start_consuming()
下面在centos中運行代碼
運行C1,只接收error的消息
運行C2,接收 info、warning、error的消息
producer運行,指定發(fā)送消息給error,可以看到兩個consumer都接收到了error的消息
只有C2接收到了warning的消息
producer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 定義消息級別
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' # 發(fā)送*.info的信息
message = ''.join(sys.argv[2:]) or "topic info: Hello World!"
# message = "direct info: Hello World!"
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Send %r:%r " % (routing_key,message))
connection.close()
consumer
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs',
queue=queue_name,
routing_key=binding_key
)
print('[*] Waiting for logs. To exit press CTRL+c')
def callback(ch, method, properties, body):
print('[x %r:%r' % (method.routing_key, body))
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
channel.start_consuming()
圖中顯示過濾中間有".orange."的數(shù)據(jù),過濾以rabbit為結尾的數(shù)據(jù),過濾以lazy開頭的數(shù)據(jù)。
運行了兩個consumer。C1接收.info為結尾的數(shù)據(jù),C2接收.error為結尾和MySQL為開頭的數(shù)據(jù)。
在運行publisher(已經(jīng)定義了發(fā)送anonymous.info,相當于以.info為結尾的信息)
C1接收到了信息
執(zhí)行publisher代碼時 后面加上 test.error,然后此時在去看C2
C2 看到test.error相關信息
執(zhí)行publisher代碼 加上 mysql.info,這樣C1和C2都可以收到消息了
運行C3,代碼后面加一個 '#' 符號,表示C3可以接收所有信息(注意#號要被引號括起來)
在publisher隨意發(fā)送信息,C3都能收到