真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何用Python集成ActiveMQ

ActiveMQ是一個非常流行的消息隊列服務(wù)中間件,實現(xiàn)JMS規(guī)范,基于STOMP協(xié)議(端口為61613)支持Python訪問。

專注于為中小企業(yè)提供做網(wǎng)站、成都網(wǎng)站建設(shè)服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)田家庵免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了近1000家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。

JMS:Java Message Service

STOMP:Simple(or Streaming) Text Orientated Messaging Protocol,簡單(流)文本定向消息協(xié)議

JMS規(guī)范定義了2類消息發(fā)送接收模型:點對點queue,發(fā)布訂閱topic,區(qū)別是能夠重復(fù)消費和是否保存。

1,點對點queue:不可重復(fù)消費,消息被消費前一直保存。

生產(chǎn)者發(fā)送消息到queue,一個消費者取出并消費消息。

消息被消費后,queue中不再保存,所有只有一個消費者能夠取到消息。

queue支持多個消費者存在,但是一個消息只有一個消費者可以消費。

當前沒有消費者時,消息一直保存,直到被消費者消費。

如何用Python集成ActiveMQ

2,發(fā)布訂閱topic:可重復(fù)消費,發(fā)布給所有訂閱者。

生產(chǎn)者發(fā)布消息到topic中,多個訂閱者收到并消費消息。

和queue不同,發(fā)布到topic中的消息會被所有訂閱者消費。

當生產(chǎn)者發(fā)布消息時,不管是否有訂閱者,都不保存消息。

如何用Python集成ActiveMQ

JMS規(guī)范定義的2類消息傳輸模型queue和topic比較:


Queue

Topic

模型

點對點Point-to-Point

發(fā)布訂閱publish/subscribe

有無狀態(tài)

queue消息在消費前被一直保存在mq服務(wù)器上的文件或者配置DB

topic數(shù)據(jù)默認不保存,是無狀態(tài)的。

完整性保障

queue保證每條消息都被消費者接收到

topic不保證生產(chǎn)者發(fā)布的每條消息都被訂閱者接收到

消息是否會丟失

生產(chǎn)者發(fā)送消息到queue,消費者接收到消息。如果沒有消費者,將一直保存,不會丟失。

生產(chǎn)者發(fā)布消息到topic時,當前的訂閱者都能夠接收到消息。如果當前沒有訂閱者,該消息就丟失。

消息發(fā)布接收策略

一對一的消息發(fā)布接收策略,一個生產(chǎn)者發(fā)送的消息只被一個消費者接收。mq服務(wù)器收到回復(fù)后,將這個消息刪除。

一對多的消息發(fā)布接收策略,同一個topic的多個訂閱者都能收到生產(chǎn)者發(fā)布的消息。

Python集成ActiveMQ使用stomp.py,只需簡單配置,本文在Django框架下進一步封裝服務(wù)mq_service.py。典型系統(tǒng)架構(gòu)示意圖和消息隊列:

如何用Python集成ActiveMQ

時序圖如下:

如何用Python集成ActiveMQ

示例代碼:https://github.com/rickding/HelloPython/tree/master/hello_activemq

├── settings.py

├── mq

│   └── mq_service.py

│   └── mq_listener.py

├── tests

│   └── test_mq_service.py

├── management

│   └── commands

│        └── mq.py

一,Python集成ActiveMQ


代碼文件

功能要點

Python集成ActiveMQ

requirements.txt

安裝stomp.py:

stomp.py >= 5.0.1

封裝服務(wù)

mq_serivce.py

封裝ActiveMQ的消息發(fā)送和處理功能。在Django框架下,將地址等配置在settings.py中集中管理,注意端口為61613

接收處理消息

mq_listener.py

增加消息接收處理類,繼承stomp.ConnectionListener

啟動消息監(jiān)聽服務(wù)

mq.py

在Django框架下,將啟動服務(wù)代碼封裝成command,方便調(diào)用和維護。

單元測試

test_mq_serivce.py

測試封裝的功能函數(shù)

功能調(diào)用

views.py

增加REST接口/chk/mq,調(diào)用mq_service發(fā)送消息

1. 新建Django項目,運行:django-admin startproject hello_activemq

2. 進到目錄hello_activemq,增加應(yīng)用:python manage.py startapp app

如何用Python集成ActiveMQ

項目的目錄文件結(jié)構(gòu)如下:

如何用Python集成ActiveMQ

3. 安裝stomp.py,pip install stomp.py >= 5.0.1

二,封裝服務(wù)mq_service.py,調(diào)用ActiveMQ發(fā)送消息

1. 增加mq_service.py:

importjson
importlogging
importstomp
fromdjango.confimportsettings

log = logging.getLogger(__name__)

defsend_msg(msg_dict,queue_or_topic=settings.MQ_QUEUE):
    conn = stomp.Connection10([(settings.MQ_URL,settings.MQ_PORT)])
    conn.connect(settings.MQ_USER,settings.MQ_PASSWORD)

    msg_str = json.dumps(msg_dict)
    log.info('Send msg: %s, %s, %s'% (type(msg_dict),type(msg_str),msg_str))
    conn.send(queue_or_topic,msg_str)
    conn.disconnect()

2. 打開settings.py,配置ActiveMQ信息:

MQ_URL ='127.0.0.1'
MQ_PORT =61613
MQ_USER ='admin'
MQ_PASSWORD ='admin'
MQ_QUEUE ='/queue/SampleQueue'
MQ_TOPIC ='/topic/SampleTopic'

3. 為了增加代碼的兼容和容錯能力,封裝get_conn(), close_conn()等輔助函數(shù),詳見代碼文件mq_service.py。

三,接收處理消息mq_listener.py

1. 增加mq_listener.py,聲明消息處理類,繼承stomp.ConnectionListener:

importjson
importlogging
importstomp

log = logging.getLogger(__name__)

classMqListener(stomp.ConnectionListener):
    defon_message(self,headers,msg_str):
        log.info('Receive msg: %s, %s, %s'% (type(msg_str),msg_str,headers))

        msg_dict =None
        try:
            msg_dict = json.loads(msg_str)
        exceptExceptionase:
            log.warning('Exception when parse msg: %s'%str(e))

        log.info('Parsed msg: {}, {}'.format(type(msg_dict),msg_dict))

    defon_error(self,headers,msg_str):
        log.info('Error msg: %s, %s, %s'% (type(msg_str),msg_str,headers))

2. 在on_message()函數(shù)中,將消息字符串解析為json,方便業(yè)務(wù)處理。

3. 聲明on_error()函數(shù)處理錯誤信息。

四,啟動消息監(jiān)聽服務(wù)mq.py

1. 將循環(huán)接收消息代碼封裝成函數(shù)consume_msg(),增加在服務(wù)中mq_serivce.py:

importlogging
importtime
importstomp
fromdjango.confimportsettings

log = logging.getLogger(__name__)

defconsume_msg(listener,queue=settings.MQ_QUEUE,topic=settings.MQ_TOPIC):
    conn = stomp.Connection10([(settings.MQ_URL,settings.MQ_PORT)])
    conn.connect(settings.MQ_USER,settings.MQ_PASSWORD)
    
    conn.set_listener('',listener)
    conn.subscribe(queue)
    conn.subscribe(topic)

    while1:
        time.sleep(1000)  # secs

    conn.disconnect()

2. 調(diào)用set_listener()設(shè)置消息接收類實例,使用之前創(chuàng)建的MqListener

3. 調(diào)用subscribe()訂閱消息,啟動循環(huán)監(jiān)聽。

4. 我們將啟動服務(wù)代碼封裝成command,在目錄management/commands中增加mq.py

importlogging
fromdjango.core.management.baseimportBaseCommand
fromhello_activemq.mqimportmq_serviceasmq
fromhello_activemq.mq.mq_listenerimportMqListener

log = logging.getLogger(__name__)


classCommand(BaseCommand):
    help ='mq starts listener'

    defhandle(self,*args,**options):
        log.info("mq starts")
        returnmq.consume_msg(MqListener())

5. 運行命令python manage.py mq,看到消息提示,啟動監(jiān)聽服務(wù)成功。

如何用Python集成ActiveMQ

五,單元測試test_mq_service.py

增加測試函數(shù),發(fā)送消息:

importlogging
fromdjango.testimportTestCase
fromhello_activemq.mqimportmq_serviceasmq

log = logging.getLogger(__name__)

classMQServiceTest(TestCase):
    deftest_send_msg(self):
        msg_dict = {'content':'test msg dict','msg':'msg from python'}
        mq.send_msg_to_queue(msg_dict)
        mq.send_msg_to_topic({'msg':"test msg from python"})

運行python manage.py test,同時看到監(jiān)聽服務(wù)收到并處理消息:

如何用Python集成ActiveMQ

六,發(fā)送消息功能調(diào)用

1. 在views.py中發(fā)送消息,調(diào)用mq_servcie.py

importjson
fromdjango.httpimportHttpResponse
fromhello_activemq.mqimportmq_serviceasmq

defchk_mq(req):
    msg_dict = {
        'url': req.get_raw_uri(),
        'path': req.get_full_path(),
        'host': req.get_host(),
    }

    mq.send_msg_to_queue(msg_dict)
    mq.send_msg_to_topic(msg_dict)

    returnHttpResponse(json.dumps(msg_dict))

2. 在urls.py中配置路由

fromdjango.urlsimportpath
fromapp.viewsimportchk_mq

urlpatterns = [
    path('',chk_mq,name='chk'),
]

3. 運行命令啟動服務(wù):python manage.py runserver 0.0.0.0:8001

如何用Python集成ActiveMQ

4. REST接口發(fā)送消息

如何用Python集成ActiveMQ

七,常見問題和解決方法

1. 啟動服務(wù)錯誤:[transport.py: 787, attempt_connection] Could not connect to host 127.0.0.1, port 61613

解決:檢查ActiveMQ是否正常啟動,特別注意是否開啟STOMP協(xié)議端口61613

原因:Python連接ActiveMQ使用STOMP協(xié)議,端口默認61613

2. 發(fā)送消息時錯誤:TypeError: message should be a string or bytes, found

解決:將消息內(nèi)容序列化為JSON,發(fā)送時調(diào)用json.dumps(),接收時調(diào)用json.loads()

原因:Python連接ActiveMQ使用的是STOMP協(xié)議,消息格式為簡單文本。

注:JMS規(guī)范定義的5類消息:

字符串TextMessage,

鍵值對MapMessage,

序列化對象ObjectMessage

字節(jié)流BytesMessage

數(shù)據(jù)流StreamMessage

如何用Python集成ActiveMQ

ActiveMQ支持5類JMS消息,增加了二進制大文件消息BlobMessage:

如何用Python集成ActiveMQ


3. 跨系統(tǒng)對接時接收到的消息類型不是TextMessage

Python開發(fā)的業(yè)務(wù)處理服務(wù) -> Java開發(fā)的API服務(wù),接收到的消息類型為BytesMessage,Python發(fā)送時設(shè)置conn.send('xx', msg_str, content_type="text/plain")仍然接收不到期望的類型TextMessage

解決:stomp建立連接時配置參數(shù)conn = stomp.Connection10([("localhost", 61613)],auto_content_length=False)

原因:Python連接ActiveMQ使用STOMP協(xié)議,消息格式為簡單文本,不攜帶類型信息,只通過header中的content-length來判斷TextMessage和BytesMessage,所以發(fā)送消息時不在header中添加content-length就可以了。


網(wǎng)站欄目:如何用Python集成ActiveMQ
文章轉(zhuǎn)載:http://weahome.cn/article/ijodch.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部