真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Python38RabbitMQ消息隊列


title: Python38 RabbitMQ
tags: Python學習
grammar_cjkRuby: true


RabbitMQ 消息隊列介紹

RabbitMQ是一種消息隊列,與線程queue和進程QUEUE作用是一樣的。
RabbitMQ是一個中間程序,可以實現(xiàn)不同進程之間的通信(比如python和Java之間,QQ和Word之間等);
普通情況下A進程與B進程之間通信,兩者之間需要建立很多連接和單獨寫一些代碼,但是使用RabbitMQ的話就可以實現(xiàn)幫助不同進程之間的數(shù)據(jù)通信。
A進程交給RabbitMQ,RabbitMQ在交給B,同樣B交給RabbitMQ,RabbitMQ在交給A,RabbitMQ可以實現(xiàn)A與B進程之間的連接和信息轉換。
使用RabbitMQ可以實現(xiàn)很多個獨立進程之間的交互,所有其他獨立進程都可以用RabbitMQ作為中間程序。

創(chuàng)新互聯(lián)建站專注于淮南企業(yè)網(wǎng)站建設,成都響應式網(wǎng)站建設公司,商城建設。淮南網(wǎng)站建設公司,為淮南等地區(qū)提供建站服務。全流程定制開發(fā),專業(yè)設計,全程項目跟蹤,創(chuàng)新互聯(lián)建站專業(yè)和態(tài)度為您提供的服務

py 消息隊列:
線程 queue(同一進程下線程之間進行交互)
進程 Queue(父子進程進行交互 或者 同屬于同一進程下的多個子進程進行交互)

如果是兩個完全獨立的python程序,也是不能用上面兩個queue進行交互的,或者和其他語言交互有哪些實現(xiàn)方式呢。
【Disk、Socket、其他中間件】這里中間件不僅可以支持兩個程序之間交互,可以支持多個程序,可以維護好多個程序的隊列。
雖然可以通過硬盤的方式實現(xiàn)多個獨立進程交互,但是硬盤速度比較慢,而RabbitMQ則能夠很好的、快速的幫助兩個獨立進程實現(xiàn)交互。

像這種公共的中間件有好多成熟的產(chǎn)品:
RabbitMQ
ZeroMQ
ActiveMQ
……

RabbitMQ:erlang語言 開發(fā)的。
Python中連接RabbitMQ的模塊:pika 、Celery(分布式任務隊列) 、haigha
可以維護很多的隊列
其中pika是RabbitMQ常用的模塊

RabbitMQ 教程官網(wǎng):http://www.rabbitmq.com/getstarted.html

幾個概念說明:

Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關鍵字,exchange根據(jù)這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務

Python38 RabbitMQ 消息隊列
RabbitMQ不像之前學的python Queue都在一個隊列里實現(xiàn)交互,RabbitMQ有多個隊列(圖中紅色部分代表隊列),每個隊列都可以將消息發(fā)給多個接收端(C是接收端,P是生產(chǎn)消息端)

RabbitMQ基本示例.

1、Rabbitmq 安裝

Windos系統(tǒng)

pip install pika

ubuntu系統(tǒng)

install  rabbitmq-server  # 直接搞定

以下centos系統(tǒng)
1)Install Erlang

# For EL5:
rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
# For EL6:
rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# For EL7:
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm

yum install erlang

2)Install RabbitMQ Server

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpm

3)use RabbitMQ Server

chkconfig rabbitmq-server on
service rabbitmq-server stop/start
或者
rabbitmq-server start

Python38 RabbitMQ 消息隊列

rabbitmq已經(jīng)開啟,等待傳輸

2、基本示例

發(fā)送端 producer

import pika

# 建立一個實例;相當于建立一個socket。
#通過ctrl+ConnectionParameters可以看到能傳很多參數(shù),如果遠程還可以傳用戶名密碼。
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  # 默認端口5672,可不寫
    )
# 聲明一個管道,在管道里發(fā)消息
channel = connection.channel()
# 在管道里聲明一個叫hello的queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',  # queue名字,將消息發(fā)給hello這個queue
                      body='Hello World!')  # 消息內容
print(" [x] Sent 'Hello World!'")
connection.close()  # 發(fā)完消息后關閉隊列 

執(zhí)行結果:

[x] Sent 'Hello World!'

注意一定要開啟rabbitmq,否則會報錯

接收端 consumer

import pika
import time

# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 聲明管道
channel = connection.channel()

# 為什么又聲明了一個‘hello’隊列?
# 如果這個queue確定已經(jīng)聲明了,可以不聲明。但是你不知道是生產(chǎn)者還是消費者先運行,所以要聲明兩次。如果消費者沒聲明,且消費者先運行的話,就會報錯。
# 生產(chǎn)者先聲明queue,消費者不聲明,但是消費者后運行就不會報錯。
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):  # 四個參數(shù)為標準格式
    print(ch, method, properties)  # 打印看一下是什么
    # ch是管道內存對象地址;method是內容相關信息  properties后面講  body消息內容
    print(" [x] Received %r" % body)
    #time.sleep(15)
    #ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume(  # 消費消息
        'hello',  # 你要從哪個隊列里收消息 
        callback,  # 如果收到消息,就調用callback函數(shù)來處理消息  # 注意調用的函數(shù)(callback)以前在basic_consume模塊是放在形參第一個位置的,后面修改到第二個位置了,如果放錯位置會報錯
        # auto_ack=True  # 寫的話,如果接收消息,機器宕機消息就丟了
        # 一般不寫。宕機則生產(chǎn)者檢測到發(fā)給其他消費者
        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 開始消費消息(開始接收消息,一直收,如果沒消息就卡主在這里了)

執(zhí)行結果:
 [*] Waiting for messages. To exit press CTRL+C
 params=>>>  
 [x] Received b'Hello World!'

收到了bytes格式的 Hello World!

Python38 RabbitMQ 消息隊列

消費者(接收端)這邊看到已經(jīng)卡主了

Python38 RabbitMQ 消息隊列

如果此時單獨在運行一下生產(chǎn)者(發(fā)送端),直接可以從消費者看到新收到的消息


rabbitmq 消息分發(fā)輪詢

Python38 RabbitMQ 消息隊列
重新開啟rabbitmq

Python38 RabbitMQ 消息隊列
運行三個接收者(消費者)

Python38 RabbitMQ 消息隊列
運行發(fā)送者,可以看到被第一個接收者給收到信息了

Python38 RabbitMQ 消息隊列
第二次運行發(fā)送者,第二個接收者收到信息了

Python38 RabbitMQ 消息隊列
第三次運行發(fā)送者,第三個接收者收到信息了

上面幾次運行說明了,依次的將信息發(fā)送每一個接收者


接收端 consumer

import pika
import time

# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 聲明管道
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):  
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    # 正?;卣{函數(shù)(callback)執(zhí)行完成就表示信息接收完成,如果在還沒執(zhí)行完成時就出現(xiàn)異常就表示信息沒有正常接收,比如斷網(wǎng)、斷電等,會導致信息不能正常接收。
    # 下面sleep 60秒,在60秒之前就將該模塊終止執(zhí)行來模擬異常情況。
    time.sleep(60)  
    #ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume(  
        'hello',  
        callback, 
        # auto_ack=True 表示不管消息是否接收(處理)完成,都不會回復確認消息
        # 如果producer不關心 comsumer是否處理完,可以使用該參數(shù)
        # 但是一般都不會使用它

        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 

Python38 RabbitMQ 消息隊列
在centos中重新執(zhí)行rabbitmq-server start來清空隊列里的消息
然后在pycharm開啟三個comsumer,在去運行等待接收消息
再去執(zhí)行producer來發(fā)送消息,執(zhí)行producer后,立即關閉第一個comsumer,這樣消息就會因為第一個comsumer沒接收成功跑到第二個comsumer去,以此類推。

Python38 RabbitMQ 消息隊列
關閉第二個comsumer,第三個comsumer收到信息

Python38 RabbitMQ 消息隊列
這張圖是將三個comsumer同時都關閉了,這樣三個comsumer都收不到消息,說明producer的消息沒有被接收,此時再去開啟第一個comsumer,這時第一個comsumer會將消息給接收過來。

我們將sleep注釋掉,也是這種現(xiàn)象,這是因為comsumer并沒有發(fā)送確認消息給producer

import pika
import time

# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 聲明管道
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):  
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    time.sleep(30)  
    ch.basic_ack(delivery_tag = method.delivery_tag)   # 告訴生成者,消息處理完成

channel.basic_consume(  
        'hello',  
        callback, 
        # auto_ack=True  

        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 

此時的代碼:當其中一個comsumer執(zhí)行完成,并發(fā)送確認消息后再去中斷,下一個comsumer就不會收到消息;反之,如果還沒發(fā)送確認消息就中斷了,那么消息就會被下一個comsumer接收到。

rabbitmq 消息持久化

如果producer端宕機,那么隊列的數(shù)據(jù)也會消失;這樣就需要讓隊列消息持久化

# durable=True 該代碼只是將生成的隊列持久化(不是消息),如果producer宕機,隊列會存在,單消息會丟
# 要注意需要在producer端和 comsumer端都要 寫durable=True
channel.queue_declare(queue='hello',durable=True) 
在centos重新開啟 rabbitmq-server start

在producer端

將producer代碼執(zhí)行三次,將三個消息放入隊列

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  
    )
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',  
                      body='Hello World!',
                      # 下面的代碼是讓消息持久化
                      properties = pika.BasicProperties(delivery_mode=2)
                      )  
print(" [x] Sent 'Hello World!'")
connection.close()  

將producer代碼執(zhí)行三次,將三個消息放入隊列
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
def callback(ch, method, properties, body): 
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    # time.sleep(30) #注釋掉
    ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume( 
        'hello', 
        callback
        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 

Python38 RabbitMQ 消息隊列
可以看到因為producer執(zhí)行了三次,所以運行comsumer端收到了三條消息


  • 協(xié)商處理
producer端沒有改變

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  
    )
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',  
                      body='Hello World!',
                      properties = pika.BasicProperties(delivery_mode=2)
                      )  
print(" [x] Sent 'Hello World!'")
connection.close()  
comsumer 1(消費者:1)

import pika 
import time 

connection = pika.BlockingConnection(pika.ConnectionParameters( 
               'localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='hello',durable=True) 
def callback(ch, method, properties, body):  
    print(ch, method, properties)  

    print(" [x] Received %r" % body) 
    # time.sleep(30) #注釋掉 
    ch.basic_ack(delivery_tag = method.delivery_tag)   

# channel.basic_qos可以使其消費者最多同時多少個消息;如果其中一個消費者處理慢(如:CPU處理性能低下),達到了最多處理的限制的話 生產(chǎn)者就不會再發(fā)送給該消費者。

channel.basic_qos(prefetch_count=1)  #這里限制最多同時只處理1個消息

channel.basic_consume(  
        'hello',  
        callback 
        ) 

print(' [*] Waiting for messages. To exit press CTRL+C') 
channel.start_consuming()  #  

Python38 RabbitMQ 消息隊列

此時有兩個comsumer模塊,comsumer2比comsumer1多用了sleep 30秒來模擬性能處理慢的情況

comsumer 2(消費者:2)
復制一個comsumer模塊為comsumer2

import pika 
import time 

connection = pika.BlockingConnection(pika.ConnectionParameters( 
               'localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='hello',durable=True) 
def callback(ch, method, properties, body):  
    print(ch, method, properties)  

    print(" [x] Received %r" % body) 
    time.sleep(30) #comsumer2這里sleep30秒
    ch.basic_ack(delivery_tag = method.delivery_tag)   

channel.basic_qos(prefetch_count=1)  

channel.basic_consume(  
        'hello',  
        callback 
        ) 

print(' [*] Waiting for messages. To exit press CTRL+C') 
channel.start_consuming()  #  

Python38 RabbitMQ 消息隊列

Python38 RabbitMQ 消息隊列

我們運行兩個comsumer后,一直去運行producer。 可以看到comsumer 1接收到了3條信息,而comsumer 2只接收到了1條信息,這是因為comsumer 2 sleep了30秒來模擬信息處理慢的情況;
comsumer 1 和 comsumer 2都指定了同時只能處理1條信息,producer會與comsumer 2協(xié)商,因為comsumer2一直沒有處理完限制的1條信息,所以信息都被comsumer 1處理了。


Rabbitmq fanout廣播模式

Python38 RabbitMQ 消息隊列

新建fanout_publiser模塊,用于發(fā)送廣播的producer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# 定義一個轉發(fā)器叫l(wèi)ogs,屬于一個中間人的角色,用于將producer的消息轉發(fā)給消費者(comsumer)
# 定義廣播類型使用fanout
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# message = ''.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)   # routing_key為空即可,因為是廣播沒有定義隊列,所以也不需要指定隊列,但這里必須要定義為空

print(" [x] Send %r " % message)

connection.close()
新建fanout_consumer模塊,用于接收廣播的消費者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費斷開后,自動將queue刪除
# 就是這里會隨機生成一個隨機的唯一queue,用完之后會將生成的queue刪除
# 這里要寫queue='',如果不指定隊列名字,但也要寫一個空的字符串,不然會報錯缺少參數(shù)
result = channel.queue_declare(queue='',exclusive=True)

queue_name = result.method.queue  # 拿到隨機生成的queue名字

# producer綁定了logs轉發(fā)器
# 消費者將隨機生成的隊列也綁定了logs轉發(fā)器
# 這樣producer將消息交給logs轉發(fā)器,logs轉發(fā)器將消息交給對應綁定的隨機隊列,消費者從隊列里在拿消息
channel.queue_bind(exchange='logs',queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[x] %r" % body)

channel.basic_consume(

    queue=queue_name, on_message_callback=callback

    # auto_ack=True  # 寫的話,如果接收消息,機器宕機消息就丟了
                        )

channel.start_consuming()

Python38 RabbitMQ 消息隊列
因為是廣播,所以兩個consumer都收到了發(fā)送者發(fā)送的消息。
不過有一點要注意!?。。。。。。?!
要先運行consumer(接收者),在運行發(fā)送者。就好比收音機一樣,只有你先打開收音機,發(fā)送者才能將信息發(fā)給你。 如果發(fā)送者先發(fā)送,你卻沒有接收,之前發(fā)送的信息,你就不會再接收到了。


Rabbitmq direct廣播模式

Python38 RabbitMQ 消息隊列
direct 可以區(qū)分廣播,將指定的消息發(fā)送給指定的接收者;
圖中顯示了將error級別消息發(fā)送給C1,將info、error、warning級別消息發(fā)送給C2。

producer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 定義消息級別
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ''.join(sys.argv[2:])  or "direct info: Hello World!"
# message = "direct info: Hello World!"

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)   # routing_key為空即可,因為是廣播沒有定義隊列,所以也不需要指定隊列,但這里必須要定義為空

print(" [x] Send %r " % message)

connection.close()

consumer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='',exclusive=True)

queue_name = result.method.queue

# 獲取參數(shù)列表
log_levels = sys.argv[1:]

if not log_levels: # 如果沒有參數(shù),就報錯,提示要指定消息級別
    sys.stderr.write("Usage: %s [info] [warning] [error] \n" % sys.argv[0])
    sys.exit(1) # 沒有參數(shù)就退出程序

# print(log_levels)

for severity in log_levels:  # 循環(huán)參數(shù)列表并綁定
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
    ) #所有發(fā)送到severity的參數(shù),都接收

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print('[x] %r:%r' % (method.routing_key, body))

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

下面在centos中運行代碼

Python38 RabbitMQ 消息隊列
運行C1,只接收error的消息

Python38 RabbitMQ 消息隊列
運行C2,接收 info、warning、error的消息

Python38 RabbitMQ 消息隊列

Python38 RabbitMQ 消息隊列
producer運行,指定發(fā)送消息給error,可以看到兩個consumer都接收到了error的消息

Python38 RabbitMQ 消息隊列
只有C2接收到了warning的消息


RabbitMQ topic細致的消息過濾廣播模式

producer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 定義消息級別
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'  # 發(fā)送*.info的信息

message = ''.join(sys.argv[2:])  or "topic info: Hello World!"
# message = "direct info: Hello World!"

channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)   

print(" [x] Send %r:%r " % (routing_key,message))

connection.close()

consumer


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='',exclusive=True)

queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:
    sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )

print('[*] Waiting for logs. To exit press CTRL+c')

def callback(ch, method, properties, body):
    print('[x %r:%r' % (method.routing_key, body))

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

Python38 RabbitMQ 消息隊列
圖中顯示過濾中間有".orange."的數(shù)據(jù),過濾以rabbit為結尾的數(shù)據(jù),過濾以lazy開頭的數(shù)據(jù)。

Python38 RabbitMQ 消息隊列

Python38 RabbitMQ 消息隊列

運行了兩個consumer。C1接收.info為結尾的數(shù)據(jù),C2接收.error為結尾和MySQL為開頭的數(shù)據(jù)。

Python38 RabbitMQ 消息隊列
在運行publisher(已經(jīng)定義了發(fā)送anonymous.info,相當于以.info為結尾的信息)

Python38 RabbitMQ 消息隊列
C1接收到了信息

Python38 RabbitMQ 消息隊列
執(zhí)行publisher代碼時 后面加上 test.error,然后此時在去看C2

Python38 RabbitMQ 消息隊列
C2 看到test.error相關信息

Python38 RabbitMQ 消息隊列
執(zhí)行publisher代碼 加上 mysql.info,這樣C1和C2都可以收到消息了

Python38 RabbitMQ 消息隊列

Python38 RabbitMQ 消息隊列

Python38 RabbitMQ 消息隊列

運行C3,代碼后面加一個 '#' 符號,表示C3可以接收所有信息(注意#號要被引號括起來)

Python38 RabbitMQ 消息隊列
在publisher隨意發(fā)送信息,C3都能收到


網(wǎng)站欄目:Python38RabbitMQ消息隊列
文章位置:http://weahome.cn/article/ggooeg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部