一、消息確認(rèn)機(jī)制
創(chuàng)新互聯(lián)公司專注于平順企業(yè)網(wǎng)站建設(shè),自適應(yīng)網(wǎng)站建設(shè),商城網(wǎng)站建設(shè)。平順網(wǎng)站建設(shè)公司,為平順等地區(qū)提供建站服務(wù)。全流程按需策劃,專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務(wù)rabbitmq在發(fā)送消息后立即從內(nèi)存中刪除消息,因此如果消費(fèi)者處理消息耗時(shí)較長,在處理過程中消費(fèi)者被kill,則處理中的消息、以及其他發(fā)往該消費(fèi)者的消息都將丟失。
為了保證消息不丟失,rabbitmq支持消息確認(rèn)機(jī)制,消費(fèi)者可以發(fā)送ack告訴rabbitm指定消息已經(jīng)收到并處理,因此rabbitmq可以刪除該消息。
如果消費(fèi)者死掉(channel關(guān)閉、connection關(guān)閉、或者TCP connection丟失),導(dǎo)致rabbitmq沒有收到ack,rabbitmq將把消息重入隊(duì)列。
不存在消息超時(shí),這意味著處理一個(gè)消息非常長的時(shí)間也是ok的。
消息確認(rèn)機(jī)制默認(rèn)是開啟的,通過在channel.basic_consume中設(shè)置no_ack=True關(guān)閉。
注意消費(fèi)者在處理消息后,不要忘記調(diào)用channel.basic_ack進(jìn)行消息確認(rèn),否則rabbitmq將不斷消耗內(nèi)存把消息重入隊(duì)列。
二、隊(duì)列/消息持久化
為了防止rabbitmq服務(wù)終止導(dǎo)致隊(duì)列和消息丟失,需要將隊(duì)列和消息標(biāo)記為持久化的:
確保rabbitmq永遠(yuǎn)不丟失隊(duì)列,需要將隊(duì)列 聲明為持久化的:
將消息聲明為持久化的:
注意:盡管已經(jīng)很健壯了,但是仍然無法完全保證消息不會(huì)丟失,例如rabbitmq接收消息但是還沒有保存到硬盤的情況。
三、exchange
簡單的說,exchange的一端接收消息,另一端把消息放進(jìn)隊(duì)列。
在rabbitmq中生產(chǎn)者不會(huì)將請求直接發(fā)送給消費(fèi)者,生產(chǎn)值只會(huì)把消息發(fā)給exchange,exchange收到消息后需要知道怎么做:添加到特定隊(duì)列、添加到多個(gè)隊(duì)列、還是丟棄。
exchange的類型包括direct,topic,headers,fanout
四、綁定
exchange和queue之間的聯(lián)系被稱為綁定(binding),可以簡單的看:隊(duì)列對(duì)于特定exchange上的消息感興趣
channel.queue_bind(exchange='logs', queue=result.method.queue)此時(shí)'logs' exchange將添加消息到指定queue
綁定可以使用一個(gè)額外的routing_key參數(shù),例如:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')對(duì)于fanout類型的exchange來說,routing_key參數(shù)是被忽略的
五、topic exchange
發(fā)往topic exchange的消息不能攜帶任意的routing_key,必須是以點(diǎn)隔開的一串字符,大255個(gè)字節(jié)
binding key也必須是相同的形式,注意binding key有兩個(gè)重要的特殊情況:
* 可以替代一個(gè)單詞
#可以替代零個(gè)或多個(gè)單詞
例如,如果binding key是*.orange.*,則可以匹配所有
如果binding key是lazy.#,則類似于帶有l(wèi)azy.orange.male.rabbit的key的消息可以匹配。
topic exchange非常強(qiáng)大,通過匹配routing_key可以表現(xiàn)的像存在多個(gè)exchange
六、RPC
為了接收響應(yīng),客戶端需要在發(fā)送請求時(shí)附加發(fā)送回調(diào)隊(duì)列地址:
result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request) # ... and some code to read a response message from the callback_queue ...另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。