真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Java消息隊(duì)列什么時候使用-創(chuàng)新互聯(lián)

本篇內(nèi)容主要講解“Java消息隊(duì)列什么時候使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Java消息隊(duì)列什么時候使用”吧!

創(chuàng)新互聯(lián)為企業(yè)提供:成都品牌網(wǎng)站建設(shè)、網(wǎng)絡(luò)營銷策劃、微信平臺小程序開發(fā)、營銷型網(wǎng)站建設(shè)和網(wǎng)站運(yùn)營托管,一站式網(wǎng)絡(luò)營銷整體服務(wù)。實(shí)現(xiàn)不斷獲取潛在客戶之核心目標(biāo),建立了企業(yè)專屬的“營銷型網(wǎng)站建設(shè)”,就用不著再為了獲取潛在客戶而苦惱,相反,客戶會主動找您,生意就找上門來了!

何時需要消息隊(duì)列

當(dāng)你需要使用消息隊(duì)列時,首先需要考慮它的必要性??梢允褂胢q的場景有很多,最常用的幾種,是做業(yè)務(wù)解耦/最終一致性/廣播/錯峰流控等。反之,如果需要強(qiáng)一致性,關(guān)注業(yè)務(wù)邏輯的處理結(jié)果,則RPC顯得更為合適。

解耦

解耦是消息隊(duì)列要解決的最本質(zhì)問題。所謂解耦,簡單點(diǎn)講就是一個事務(wù),只關(guān)心核心的流程。而需要依賴其他系統(tǒng)但不那么重要的事情,有通知即可,無需等待結(jié)果。換句話說,基于消息的模型,關(guān)心的是“通知”,而非“處理”。
比如在美團(tuán)旅游,我們有一個產(chǎn)品中心,產(chǎn)品中心上游對接的是主站、移動后臺、旅游供應(yīng)鏈等各個數(shù)據(jù)源;下游對接的是篩選系統(tǒng)、API系統(tǒng)等展示系統(tǒng)。當(dāng)上游的數(shù)據(jù)發(fā)生變更的時候,如果不使用消息系統(tǒng),勢必要調(diào)用我們的接口來更新數(shù)據(jù),就特別依賴產(chǎn)品中心接口的穩(wěn)定性和處理能力。但其實(shí),作為旅游的產(chǎn)品中心,也許只有對于旅游自建供應(yīng)鏈,產(chǎn)品中心更新成功才是他們關(guān)心的事情。而對于團(tuán)購等外部系統(tǒng),產(chǎn)品中心更新成功也好、失敗也罷,并不是他們的職責(zé)所在。他們只需要保證在信息變更的時候通知到我們就好了。
而我們的下游,可能有更新索引、刷新緩存等一系列需求。對于產(chǎn)品中心來說,這也不是我們的職責(zé)所在。說白了,如果他們定時來拉取數(shù)據(jù),也能保證數(shù)據(jù)的更新,只是實(shí)時性沒有那么強(qiáng)。但使用接口方式去更新他們的數(shù)據(jù),顯然對于產(chǎn)品中心來說太過于“重量級”了,只需要發(fā)布一個產(chǎn)品ID變更的通知,由下游系統(tǒng)來處理,可能更為合理。
再舉一個例子,對于我們的訂單系統(tǒng),訂單最終支付成功之后可能需要給用戶發(fā)送短信積分什么的,但其實(shí)這已經(jīng)不是我們系統(tǒng)的核心流程了。如果外部系統(tǒng)速度偏慢(比如短信網(wǎng)關(guān)速度不好),那么主流程的時間會加長很多,用戶肯定不希望點(diǎn)擊支付過好幾分鐘才看到結(jié)果。那么我們只需要通知短信系統(tǒng)“我們支付成功了”,不一定非要等待它處理完成。

最終一致性

最終一致性指的是兩個系統(tǒng)的狀態(tài)保持一致,要么都成功,要么都失敗。當(dāng)然有個時間限制,理論上越快越好,但實(shí)際上在各種異常的情況下,可能會有一定延遲達(dá)到最終一致狀態(tài),但最后兩個系統(tǒng)的狀態(tài)是一樣的。
業(yè)界有一些為“最終一致性”而生的消息隊(duì)列,如Notify(阿里)、QMQ(去哪兒)等,其設(shè)計初衷,就是為了交易系統(tǒng)中的高可靠通知。
以一個銀行的轉(zhuǎn)賬過程來理解最終一致性,轉(zhuǎn)賬的需求很簡單,如果A系統(tǒng)扣錢成功,則B系統(tǒng)加錢一定成功。反之則一起回滾,像什么都沒發(fā)生一樣。
然而,這個過程中存在很多可能的意外:

  1. A扣錢成功,調(diào)用B加錢接口失敗。

  2. A扣錢成功,調(diào)用B加錢接口雖然成功,但獲取最終結(jié)果時網(wǎng)絡(luò)異常引起超時。

  3. A扣錢成功,B加錢失敗,A想回滾扣的錢,但A機(jī)器down機(jī)。

可見,想把這件看似簡單的事真正做成,真的不那么容易。所有跨VM的一致性問題,從技術(shù)的角度講通用的解決方案是:

  1. 強(qiáng)一致性,分布式事務(wù),但落地太難且成本太高,后文會具體提到。

  2. 最終一致性,主要是用“記錄”和“補(bǔ)償”的方式。在做所有的不確定的事情之前,先把事情記錄下來,然后去做不確定的事情,結(jié)果可能是:成功、失敗或是不確定,“不確定”(例如超時等)可以等價為失敗。成功就可以把記錄的東西清理掉了,對于失敗和不確定,可以依靠定時任務(wù)等方式把所有失敗的事情重新搞一遍,直到成功為止。
    回到剛才的例子,系統(tǒng)在A扣錢成功的情況下,把要給B“通知”這件事記錄在庫里(為了保證最高的可靠性可以把通知B系統(tǒng)加錢和扣錢成功這兩件事維護(hù)在一個本地事務(wù)里),通知成功則刪除這條記錄,通知失敗或不確定則依靠定時任務(wù)補(bǔ)償性地通知我們,直到我們把狀態(tài)更新成正確的為止。
    整個這個模型依然可以基于RPC來做,但可以抽象成一個統(tǒng)一的模型,基于消息隊(duì)列來做一個“企業(yè)總線”。
    具體來說,本地事務(wù)維護(hù)業(yè)務(wù)變化和通知消息,一起落地(失敗則一起回滾),然后RPC到達(dá)broker,在broker成功落地后,RPC返回成功,本地消息可以刪除。否則本地消息一直靠定時任務(wù)輪詢不斷重發(fā),這樣就保證了消息可靠落地broker。
    broker往consumer發(fā)送消息的過程類似,一直發(fā)送消息,直到consumer發(fā)送消費(fèi)成功確認(rèn)。
    我們先不理會重復(fù)消息的問題,通過兩次消息落地加補(bǔ)償,下游是一定可以收到消息的。然后依賴狀態(tài)機(jī)版本號等方式做判重,更新自己的業(yè)務(wù),就實(shí)現(xiàn)了最終一致性。

最終一致性不是消息隊(duì)列的必備特性,但確實(shí)可以依靠消息隊(duì)列來做最終一致性的事情。另外,所有不保證100%不丟消息的消息隊(duì)列,理論上無法實(shí)現(xiàn)最終一致性。好吧,應(yīng)該說理論上的100%,排除系統(tǒng)嚴(yán)重故障和bug。
像Kafka一類的設(shè)計,在設(shè)計層面上就有丟消息的可能(比如定時刷盤,如果掉電就會丟消息)。哪怕只丟千分之一的消息,業(yè)務(wù)也必須用其他的手段來保證結(jié)果正確。

廣播

消息隊(duì)列的基本功能之一是進(jìn)行廣播。如果沒有消息隊(duì)列,每當(dāng)一個新的業(yè)務(wù)方接入,我們都要聯(lián)調(diào)一次新接口。有了消息隊(duì)列,我們只需要關(guān)心消息是否送達(dá)了隊(duì)列,至于誰希望訂閱,是下游的事情,無疑極大地減少了開發(fā)和聯(lián)調(diào)的工作量。
比如本文開始提到的產(chǎn)品中心發(fā)布產(chǎn)品變更的消息,以及景點(diǎn)庫很多去重更新的消息,可能“關(guān)心”方有很多個,但產(chǎn)品中心和景點(diǎn)庫只需要發(fā)布變更消息即可,誰關(guān)心誰接入。

錯峰與流控

試想上下游對于事情的處理能力是不同的。比如,Web前端每秒承受上千萬的請求,并不是什么神奇的事情,只需要加多一點(diǎn)機(jī)器,再搭建一些LVS負(fù)載均衡設(shè)備和Nginx等即可。但數(shù)據(jù)庫的處理能力卻十分有限,即使使用SSD加分庫分表,單機(jī)的處理能力仍然在萬級。由于成本的考慮,我們不能奢求數(shù)據(jù)庫的機(jī)器數(shù)量追上前端。
這種問題同樣存在于系統(tǒng)和系統(tǒng)之間,如短信系統(tǒng)可能由于短板效應(yīng),速度卡在網(wǎng)關(guān)上(每秒幾百次請求),跟前端的并發(fā)量不是一個數(shù)量級。但用戶晚上個半分鐘左右收到短信,一般是不會有太大問題的。如果沒有消息隊(duì)列,兩個系統(tǒng)之間通過協(xié)商、滑動窗口等復(fù)雜的方案也不是說不能實(shí)現(xiàn)。但系統(tǒng)復(fù)雜性指數(shù)級增長,勢必在上游或者下游做存儲,并且要處理定時、擁塞等一系列問題。而且每當(dāng)有處理能力有差距的時候,都需要單獨(dú)開發(fā)一套邏輯來維護(hù)這套邏輯。所以,利用中間系統(tǒng)轉(zhuǎn)儲兩個系統(tǒng)的通信內(nèi)容,并在下游系統(tǒng)有能力處理這些消息的時候,再處理這些消息,是一套相對較通用的方式。

總而言之,消息隊(duì)列不是萬能的。對于需要強(qiáng)事務(wù)保證而且延遲敏感的,RPC是優(yōu)于消息隊(duì)列的。
對于一些無關(guān)痛癢,或者對于別人非常重要但是對于自己不是那么關(guān)心的事情,可以利用消息隊(duì)列去做。
支持最終一致性的消息隊(duì)列,能夠用來處理延遲不那么敏感的“分布式事務(wù)”場景,而且相對于笨重的分布式事務(wù),可能是更優(yōu)的處理方式。
當(dāng)上下游系統(tǒng)處理能力存在差距的時候,利用消息隊(duì)列做一個通用的“漏斗”。在下游有能力處理的時候,再進(jìn)行分發(fā)。

如果下游有很多系統(tǒng)關(guān)心你的系統(tǒng)發(fā)出的通知的時候,果斷地使用消息隊(duì)列吧。

消息隊(duì)列的流派之爭

這篇文章的標(biāo)題很難起,網(wǎng)上一翻全是各種MQ的性能比較,很容易讓人以為我也是這么“粗俗”的人(o(╯□╰)o)。我這篇文章想要表達(dá)的是——它們根本不是一個東西,有毛的性能好比較?

MQ是什么

Message Queue(MQ),消息隊(duì)列中間件。很多人都說:MQ通過將消息的發(fā)送和接收分離來實(shí)現(xiàn)應(yīng)用程序的異步和解偶,這個給人的直覺是——MQ是異步的,用來解耦的,但是這個只是MQ的效果而不是目的。MQ真正的目的是為了通訊,屏蔽底層復(fù)雜的通訊協(xié)議,定義了一套應(yīng)用層的、更加簡單的通訊協(xié)議。一個分布式系統(tǒng)中兩個模塊之間通訊要么是HTTP,要么是自己開發(fā)的TCP,但是這兩種協(xié)議其實(shí)都是原始的協(xié)議。HTTP協(xié)議很難實(shí)現(xiàn)兩端通訊——模塊A可以調(diào)用B,B也可以主動調(diào)用A,如果要做到這個兩端都要背上WebServer,而且還不支持長連接(HTTP 2.0的庫根本找不到)。TCP就更加原始了,粘包、心跳、私有的協(xié)議,想一想頭皮就發(fā)麻。MQ所要做的就是在這些協(xié)議之上構(gòu)建一個簡單的“協(xié)議”——生產(chǎn)者/消費(fèi)者模型。MQ帶給我的“協(xié)議”不是具體的通訊協(xié)議,而是更高層次通訊模型。它定義了兩個對象——發(fā)送數(shù)據(jù)的叫生產(chǎn)者;消費(fèi)數(shù)據(jù)的叫消費(fèi)者, 提供一個SDK讓我們可以定義自己的生產(chǎn)者和消費(fèi)者實(shí)現(xiàn)消息通訊而無視底層通訊協(xié)議。

MQ的流派

列出功能表來比較MQ差異或者來一場“MQ性能大比武”的做法都是比較扯的,首先要做的事情應(yīng)該是分類。我理解的MQ分為兩個流派

有broker

這個流派通常有一臺服務(wù)器作為Broker,所有的消息都通過它中轉(zhuǎn)。生產(chǎn)者把消息發(fā)送給它就結(jié)束自己的任務(wù)了,Broker則把消息主動推送給消費(fèi)者(或者消費(fèi)者主動輪詢)。

  • 重Topic流

    kafka、JMS就屬于這個流派,生產(chǎn)者會發(fā)送key和數(shù)據(jù)到Broker,由Broker比較key之后決定給那個消費(fèi)者。這種模式是我們最常見的模式,是我們對MQ最多的印象。在這種模式下一個topic往往是一個比較大的概念,甚至一個系統(tǒng)中就可能只有一個topic,topic某種意義上就是queue,生產(chǎn)者發(fā)送key相當(dāng)于說:“hi,把數(shù)據(jù)放到key的隊(duì)列中”。

Java消息隊(duì)列什么時候使用 如上圖所示,Broker定義了三個隊(duì)列,key1,key2,key3,生產(chǎn)者發(fā)送數(shù)據(jù)的時候會發(fā)送key1和data,Broker在推送數(shù)據(jù)的時候則推送data(也可能把key帶上)。雖然架構(gòu)一樣但是kafka的性能要比jms的性能不知道高到多少倍,所以基本這種類型的MQ只有kafka一種備選方案。如果你需要一條暴力的數(shù)據(jù)流(在乎性能而非靈活性)那么kafka是最好的選擇。

  • 輕Topic流

這種的代表是RabbitMQ(或者說是AMQP)。生產(chǎn)者發(fā)送key和數(shù)據(jù),消費(fèi)者定義訂閱的隊(duì)列,Broker收到數(shù)據(jù)之后會通過一定的邏輯計算出key對應(yīng)的隊(duì)列,然后把數(shù)據(jù)交給隊(duì)列。

Java消息隊(duì)列什么時候使用 注意到了嗎?這種模式下解耦了key和queue,在這種架構(gòu)中queue是非常輕量級的(在RabbitMQ中它的上限取決于你的內(nèi)存),消費(fèi)者關(guān)心的只是自己的queue;生產(chǎn)者不必關(guān)心數(shù)據(jù)最終給誰只要指定key就行了,中間的那層映射在AMQP中叫exchange(交換機(jī))。AMQP中有四種種exchange——Direct exchange:key就等于queue;Fanout exchange:無視key,給所有的queue都來一份;Topic exchange:key可以用“寬字符”模糊匹配queue;最后一個厲害了Headers exchange:無視key,通過查看消息的頭部元數(shù)據(jù)來決定發(fā)給那個queue(AMQP頭部元數(shù)據(jù)非常豐富而且可以自定義)。這種結(jié)構(gòu)的架構(gòu)給通訊帶來了很大的靈活性,我們能想到的通訊方式都可以用這四種exchange表達(dá)出來。如果你需要一個企業(yè)數(shù)據(jù)總線(在乎靈活性)那么RabbitMQ絕對的值得一用。

無broker

此門派是AMQP的“叛徒”,某位道友嫌棄AMQP太“重”(那是他沒看到用Erlang實(shí)現(xiàn)的時候是多么的行云流水) 所以設(shè)計了zeromq。這位道友非常睿智,他非常敏銳的意識到——MQ是更高級的Socket,它是解決通訊問題的。所以ZeroMQ被設(shè)計成了一個“庫”而不是一個中間件,這種實(shí)現(xiàn)也可以達(dá)到——沒有broker的目的。

Java消息隊(duì)列什么時候使用 節(jié)點(diǎn)之間通訊的消息都是發(fā)送到彼此的隊(duì)列中,每個節(jié)點(diǎn)都既是生產(chǎn)者又是消費(fèi)者。ZeroMQ做的事情就是封裝出一套類似于scoket的API可以完成發(fā)送數(shù)據(jù),讀取數(shù)據(jù)。如果你仔細(xì)想一下其實(shí)ZeroMQ是這樣的

Java消息隊(duì)列什么時候使用 頓悟了嗎?Actor模型,ZeroMQ其實(shí)就是一個跨語言的、重量級的Actor模型郵箱庫。你可以把自己的程序想象成一個actor,zeromq就是提供郵箱功能的庫;zeromq可以實(shí)現(xiàn)同一臺機(jī)器的IPC通訊也可以實(shí)現(xiàn)不同機(jī)器的TCP、UDP通訊。如果你需要一個強(qiáng)大的、靈活、野蠻的通訊能力,別猶豫zeromq。

MQ只能異步嗎

答案是否定了,首先ZeroMQ支持請求->應(yīng)答模式;其次RabbitMQ提供了RPC是地地道道的同步通訊,只有JMS、kafka這種架構(gòu)才只能做異步。我們很多人第一次接觸MQ都是JMS之類的這種所以才會產(chǎn)生這種錯覺。

總結(jié)

kafka,zeromq,rabbitmq代表了三種完全不同風(fēng)格的MQ架構(gòu);關(guān)注點(diǎn)完全不同:

  • kafka在乎的是性能,速度

  • rabbitmq追求的是靈活

  • zeromq追求的是輕量級、分布式

如果你拿zeromq來做大數(shù)據(jù)量的傳輸功能,不是生產(chǎn)者的內(nèi)存“爆掉”就是消費(fèi)者被“壓死”;如果你用kafka做通訊總線那絕對的不會快只能更慢;你想要rabbitmq實(shí)現(xiàn)分布式,那真的是難為它。

如何設(shè)計一個消息隊(duì)列

綜述

我們現(xiàn)在明確了消息隊(duì)列的使用場景,下一步就是如何設(shè)計實(shí)現(xiàn)一個消息隊(duì)列了。
Java消息隊(duì)列什么時候使用
基于消息的系統(tǒng)模型,不一定需要broker(消息隊(duì)列服務(wù)端)。市面上的的Akka(actor模型)、ZeroMQ等,其實(shí)都是基于消息的系統(tǒng)設(shè)計范式,但是沒有broker。
我們之所以要設(shè)計一個消息隊(duì)列,并且配備broker,無外乎要做兩件事情:

  1. 消息的轉(zhuǎn)儲,在更合適的時間點(diǎn)投遞,或者通過一系列手段輔助消息最終能送達(dá)消費(fèi)機(jī)。

  2. 規(guī)范一種范式和通用的模式,以滿足解耦、最終一致性、錯峰等需求。
    掰開了揉碎了看,最簡單的消息隊(duì)列可以做成一個消息轉(zhuǎn)發(fā)器,把一次RPC做成兩次RPC。發(fā)送者把消息投遞到服務(wù)端(以下簡稱broker),服務(wù)端再將消息轉(zhuǎn)發(fā)一手到接收端,就是這么簡單。

一般來講,設(shè)計消息隊(duì)列的整體思路是先build一個整體的數(shù)據(jù)流,例如producer發(fā)送給broker,broker發(fā)送給consumer,consumer回復(fù)消費(fèi)確認(rèn),broker刪除/備份消息等。
利用RPC將數(shù)據(jù)流串起來。然后考慮RPC的高可用性,盡量做到無狀態(tài),方便水平擴(kuò)展。
之后考慮如何承載消息堆積,然后在合適的時機(jī)投遞消息,而處理堆積的最佳方式,就是存儲,存儲的選型需要綜合考慮性能/可靠性和開發(fā)維護(hù)成本等諸多因素。
為了實(shí)現(xiàn)廣播功能,我們必須要維護(hù)消費(fèi)關(guān)系,可以利用zk/config server等保存消費(fèi)關(guān)系。
在完成了上述幾個功能后,消息隊(duì)列基本就實(shí)現(xiàn)了。然后我們可以考慮一些高級特性,如可靠投遞,事務(wù)特性,性能優(yōu)化等。
下面我們會以設(shè)計消息隊(duì)列時重點(diǎn)考慮的模塊為主線,穿插灌輸一些消息隊(duì)列的特性實(shí)現(xiàn)方法,來具體分析設(shè)計實(shí)現(xiàn)一個消息隊(duì)列時的方方面面。

實(shí)現(xiàn)隊(duì)列基本功能

RPC通信協(xié)議

剛才講到,所謂消息隊(duì)列,無外乎兩次RPC加一次轉(zhuǎn)儲,當(dāng)然需要消費(fèi)端最終做消費(fèi)確認(rèn)的情況是三次RPC。既然是RPC,就必然牽扯出一系列話題,什么負(fù)載均衡啊、服務(wù)發(fā)現(xiàn)啊、通信協(xié)議啊、序列化協(xié)議啊,等等。在這一塊,我的強(qiáng)烈建議是不要重復(fù)造輪子。利用公司現(xiàn)有的RPC框架:Thrift也好,Dubbo也好,或者是其他自定義的框架也好。因?yàn)橄㈥?duì)列的RPC,和普通的RPC沒有本質(zhì)區(qū)別。當(dāng)然了,自主利用Memchached或者Redis協(xié)議重新寫一套RPC框架并非不可(如MetaQ使用了自己封裝的Gecko NIO框架,卡夫卡也用了類似的協(xié)議)。但實(shí)現(xiàn)成本和難度無疑倍增。排除對效率的極端要求,都可以使用現(xiàn)成的RPC框架。
簡單來講,服務(wù)端提供兩個RPC服務(wù),一個用來接收消息,一個用來確認(rèn)消息收到。并且做到不管哪個server收到消息和確認(rèn)消息,結(jié)果一致即可。當(dāng)然這中間可能還涉及跨IDC的服務(wù)的問題。這里和RPC的原則是一致的,盡量優(yōu)先選擇本機(jī)房投遞。你可能會問,如果producer和consumer本身就在兩個機(jī)房了,怎么辦?首先,broker必須保證感知的到所有consumer的存在。其次,producer盡量選擇就近的機(jī)房就好了。

高可用

其實(shí)所有的高可用,是依賴于RPC和存儲的高可用來做的。先來看RPC的高可用,美團(tuán)的基于MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服務(wù)自動發(fā)現(xiàn),負(fù)載均衡等功能。而消息隊(duì)列的高可用,只要保證broker接受消息和確認(rèn)消息的接口是冪等的,并且consumer的幾臺機(jī)器處理消息是冪等的,這樣就把消息隊(duì)列的可用性,轉(zhuǎn)交給RPC框架來處理了。
那么怎么保證冪等呢?最簡單的方式莫過于共享存儲。broker多機(jī)器共享一個DB或者一個分布式文件/kv系統(tǒng),則處理消息自然是冪等的。就算有單點(diǎn)故障,其他節(jié)點(diǎn)可以立刻頂上。另外failover可以依賴定時任務(wù)的補(bǔ)償,這是消息隊(duì)列本身天然就可以支持的功能。存儲系統(tǒng)本身的可用性我們不需要操太多心,放心大膽的交給DBA們吧!
對于不共享存儲的隊(duì)列,如Kafka使用分區(qū)加主備模式,就略微麻煩一些。需要保證每一個分區(qū)內(nèi)的高可用性,也就是每一個分區(qū)至少要有一個主備且需要做數(shù)據(jù)的同步,關(guān)于這塊HA的細(xì)節(jié),可以參考下篇pull模型消息系統(tǒng)設(shè)計。

服務(wù)端承載消息堆積的能力

消息到達(dá)服務(wù)端如果不經(jīng)過任何處理就到接收者了,broker就失去了它的意義。為了滿足我們錯峰/流控/最終可達(dá)等一系列需求,把消息存儲下來,然后選擇時機(jī)投遞就顯得是順理成章的了。
只是這個存儲可以做成很多方式。比如存儲在內(nèi)存里,存儲在分布式KV里,存儲在磁盤里,存儲在數(shù)據(jù)庫里等等。但歸結(jié)起來,主要有持久化和非持久化兩種。
持久化的形式能更大程度地保證消息的可靠性(如斷電等不可抗外力),并且理論上能承載更大限度的消息堆積(外存的空間遠(yuǎn)大于內(nèi)存)。
但并不是每種消息都需要持久化存儲。很多消息對于投遞性能的要求大于可靠性的要求,且數(shù)量極大(如日志)。這時候,消息不落地直接暫存內(nèi)存,嘗試幾次failover,最終投遞出去也未嘗不可。
市面上的消息隊(duì)列普遍兩種形式都支持。當(dāng)然具體的場景還要具體結(jié)合公司的業(yè)務(wù)來看。

存儲子系統(tǒng)的選擇

我們來看看如果需要數(shù)據(jù)落地的情況下各種存儲子系統(tǒng)的選擇。理論上,從速度來看,文件系統(tǒng)>分布式KV(持久化)>分布式文件系統(tǒng)>數(shù)據(jù)庫,而可靠性卻截然相反。還是要從支持的業(yè)務(wù)場景出發(fā)作出最合理的選擇,如果你們的消息隊(duì)列是用來支持支付/交易等對可靠性要求非常高,但對性能和量的要求沒有這么高,而且沒有時間精力專門做文件存儲系統(tǒng)的研究,DB是最好的選擇。
但是DB受制于IOPS,如果要求單broker 5位數(shù)以上的QPS性能,基于文件的存儲是比較好的解決方案。整體上可以采用數(shù)據(jù)文件+索引文件的方式處理,具體這塊的設(shè)計比較復(fù)雜,可以參考下篇的存儲子系統(tǒng)設(shè)計。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其編程接口較友好,性能也比較可觀,如果在可靠性要求不是那么高的場景,也不失為一個不錯的選擇。

消費(fèi)關(guān)系解析

現(xiàn)在我們的消息隊(duì)列初步具備了轉(zhuǎn)儲消息的能力。下面一個重要的事情就是解析發(fā)送接收關(guān)系,進(jìn)行正確的消息投遞了。
市面上的消息隊(duì)列定義了一堆讓人暈頭轉(zhuǎn)向的名詞,如JMS 規(guī)范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。拋開現(xiàn)象看本質(zhì),無外乎是單播與廣播的區(qū)別。所謂單播,就是點(diǎn)到點(diǎn);而廣播,是一點(diǎn)對多點(diǎn)。當(dāng)然,對于互聯(lián)網(wǎng)的大部分應(yīng)用來說,組間廣播、組內(nèi)單播是最常見的情形。
消息需要通知到多個業(yè)務(wù)集群,而一個業(yè)務(wù)集群內(nèi)有很多臺機(jī)器,只要一臺機(jī)器消費(fèi)這個消息就可以了。
當(dāng)然這不是絕對的,很多時候組內(nèi)的廣播也是有適用場景的,如本地緩存的更新等等。另外,消費(fèi)關(guān)系除了組內(nèi)組間,可能會有多級樹狀關(guān)系。這種情況太過于復(fù)雜,一般不列入考慮范圍。所以,一般比較通用的設(shè)計是支持組間廣播,不同的組注冊不同的訂閱。組內(nèi)的不同機(jī)器,如果注冊一個相同的ID,則單播;如果注冊不同的ID(如IP地址+端口),則廣播。
至于廣播關(guān)系的維護(hù),一般由于消息隊(duì)列本身都是集群,所以都維護(hù)在公共存儲上,如config server、zookeeper等。維護(hù)廣播關(guān)系所要做的事情基本是一致的:

  1. 發(fā)送關(guān)系的維護(hù)。

  2. 發(fā)送關(guān)系變更時的通知。

隊(duì)列高級特性設(shè)計

上面都是些消息隊(duì)列基本功能的實(shí)現(xiàn),下面來看一些關(guān)于消息隊(duì)列特性相關(guān)的內(nèi)容,不管可靠投遞/消息丟失與重復(fù)以及事務(wù)乃至于性能,不是每個消息隊(duì)列都會照顧到,所以要依照業(yè)務(wù)的需求,來仔細(xì)衡量各種特性實(shí)現(xiàn)的成本,利弊,最終做出最為合理的設(shè)計。

可靠投遞(最終一致性)

這是個激動人心的話題,完全不丟消息,究竟可不可能?答案是,完全可能,前提是消息可能會重復(fù),并且,在異常情況下,要接受消息的延遲。
方案說簡單也簡單,就是每當(dāng)要發(fā)生不可靠的事情(RPC等)之前,先將消息落地,然后發(fā)送。當(dāng)失敗或者不知道成功失?。ū热绯瑫r)時,消息狀態(tài)是待發(fā)送,定時任務(wù)不停輪詢所有待發(fā)送消息,最終一定可以送達(dá)。
具體來說:

  1. producer往broker發(fā)送消息之前,需要做一次落地。

  2. 請求到server后,server確保數(shù)據(jù)落地后再告訴客戶端發(fā)送成功。

  3. 支持廣播的消息隊(duì)列需要對每個待發(fā)送的endpoint,持久化一個發(fā)送狀態(tài),直到所有endpoint狀態(tài)都OK才可刪除消息。

對于各種不確定(超時、down機(jī)、消息沒有送達(dá)、送達(dá)后數(shù)據(jù)沒落地、數(shù)據(jù)落地了回復(fù)沒收到),其實(shí)對于發(fā)送方來說,都是一件事情,就是消息沒有送達(dá)。
重推消息所面臨的問題就是消息重復(fù)。重復(fù)和丟失就像兩個噩夢,你必須要面對一個。好在消息重復(fù)還有處理的機(jī)會,消息丟失再想找回就難了。
Anyway,作為一個成熟的消息隊(duì)列,應(yīng)該盡量在各個環(huán)節(jié)減少重復(fù)投遞的可能性,不能因?yàn)橹貜?fù)有解決方案就放縱的亂投遞。
最后說一句,不是所有的系統(tǒng)都要求最終一致性或者可靠投遞,比如一個論壇系統(tǒng)、一個招聘系統(tǒng)。一個重復(fù)的簡歷或話題被發(fā)布,可能比丟失了一個發(fā)布顯得更讓用戶無法接受。不斷重復(fù)一句話,任何基礎(chǔ)組件要服務(wù)于業(yè)務(wù)場景。

消費(fèi)確認(rèn)

當(dāng)broker把消息投遞給消費(fèi)者后,消費(fèi)者可以立即響應(yīng)我收到了這個消息。但收到了這個消息只是第一步,我能不能處理這個消息卻不一定。或許因?yàn)橄M(fèi)能力的問題,系統(tǒng)的負(fù)荷已經(jīng)不能處理這個消息;或者是剛才狀態(tài)機(jī)里面提到的消息不是我想要接收的消息,主動要求重發(fā)。
把消息的送達(dá)和消息的處理分開,這樣才真正的實(shí)現(xiàn)了消息隊(duì)列的本質(zhì)-解耦。所以,允許消費(fèi)者主動進(jìn)行消費(fèi)確認(rèn)是必要的。當(dāng)然,對于沒有特殊邏輯的消息,默認(rèn)Auto Ack也是可以的,但一定要允許消費(fèi)方主動ack。
對于正確消費(fèi)ack的,沒什么特殊的。但是對于reject和error,需要特別說明。reject這件事情,往往業(yè)務(wù)方是無法感知到的,系統(tǒng)的流量和健康狀況的評估,以及處理能力的評估是一件非常復(fù)雜的事情。舉個極端的例子,收到一個消息開始build索引,可能這個消息要處理半個小時,但消息量卻是非常的小。所以reject這塊建議做成滑動窗口/線程池類似的模型來控制,
消費(fèi)能力不匹配的時候,直接拒絕,過一段時間重發(fā),減少業(yè)務(wù)的負(fù)擔(dān)。
但業(yè)務(wù)出錯這件事情是只有業(yè)務(wù)方自己知道的,就像上文提到的狀態(tài)機(jī)等等。這時應(yīng)該允許業(yè)務(wù)方主動ack error,并可以與broker約定下次投遞的時間。

重復(fù)消息和順序消息

上文談到重復(fù)消息是不可能100%避免的,除非可以允許丟失,那么,順序消息能否100%滿足呢? 答案是可以,但條件更為苛刻:

  1. 允許消息丟失。

  2. 從發(fā)送方到服務(wù)方到接受者都是單點(diǎn)單線程。

所以絕對的順序消息基本上是不能實(shí)現(xiàn)的,當(dāng)然在METAQ/Kafka等pull模型的消息隊(duì)列中,單線程生產(chǎn)/消費(fèi),排除消息丟失,也是一種順序消息的解決方案。
一般來講,一個主流消息隊(duì)列的設(shè)計范式里,應(yīng)該是不丟消息的前提下,盡量減少重復(fù)消息,不保證消息的投遞順序。
談到重復(fù)消息,主要是兩個話題:

  1. 如何鑒別消息重復(fù),并冪等的處理重復(fù)消息。

  2. 一個消息隊(duì)列如何盡量減少重復(fù)消息的投遞。

先來看看第一個話題,每一個消息應(yīng)該有它的唯一身份。不管是業(yè)務(wù)方自定義的,還是根據(jù)IP/PID/時間戳生成的MessageId,如果有地方記錄這個MessageId,消息到來是能夠進(jìn)行比對就
能完成重復(fù)的鑒定。數(shù)據(jù)庫的唯一鍵/bloom filter/分布式KV中的key,都是不錯的選擇。由于消息不能被永久存儲,所以理論上都存在消息從持久化存儲移除的瞬間上游還在投遞的可能(上游因種種原因投遞失敗,不停重試,都到了下游清理消息的時間)。這種事情都是異常情況下才會發(fā)生的,畢竟是小眾情況。兩分鐘消息都還沒送達(dá),多送一次又能怎樣呢?冪等的處理消息是一門藝術(shù),因?yàn)榉N種原因重復(fù)消息或者錯亂的消息還是來到了,說兩種通用的解決方案:

  1. 版本號。

  2. 狀態(tài)機(jī)。

事務(wù)

持久性是事務(wù)的一個特性,然而只滿足持久性卻不一定能滿足事務(wù)的特性。還是拿扣錢/加錢的例子講。滿足事務(wù)的一致性特征,則必須要么都不進(jìn)行,要么都能成功。
解決方案從大方向上有兩種:

  1. 兩階段提交,分布式事務(wù)。

  2. 本地事務(wù),本地落地,補(bǔ)償發(fā)送。

分布式事務(wù)存在的大問題是成本太高,兩階段提交協(xié)議,對于仲裁down機(jī)或者單點(diǎn)故障,幾乎是一個無解的黑洞。對于交易密集型或者I/O密集型的應(yīng)用,沒有辦法承受這么高的網(wǎng)絡(luò)延遲,系統(tǒng)復(fù)雜性。
并且成熟的分布式事務(wù)一定構(gòu)建與比較靠譜的商用DB和商用中間件上,成本也太高。
那如何使用本地事務(wù)解決分布式事務(wù)的問題呢?以本地和業(yè)務(wù)在一個數(shù)據(jù)庫實(shí)例中建表為例子,與扣錢的業(yè)務(wù)操作同一個事務(wù)里,將消息插入本地數(shù)據(jù)庫。如果消息入庫失敗,則業(yè)務(wù)回滾;如果消息入庫成功,事務(wù)提交。
然后發(fā)送消息(注意這里可以實(shí)時發(fā)送,不需要等定時任務(wù)檢出,以提高消息實(shí)時性)。以后的問題就是前文的最終一致性問題所提到的了,只要消息沒有發(fā)送成功,就一直靠定時任務(wù)重試。
這里有一個關(guān)鍵的點(diǎn),本地事務(wù)做的,是業(yè)務(wù)落地和消息落地的事務(wù),而不是業(yè)務(wù)落地和RPC成功的事務(wù)。這里很多人容易混淆,如果是后者,無疑是事務(wù)嵌套RPC,是大忌,會有長事務(wù)死鎖等各種風(fēng)險。
而消息只要成功落地,很大程度上就沒有丟失的風(fēng)險(磁盤物理損壞除外)。而消息只要投遞到服務(wù)端確認(rèn)后本地才做刪除,就完成了producer->broker的可靠投遞,并且當(dāng)消息存儲異常時,業(yè)務(wù)也是可以回滾的。
本地事務(wù)存在兩個大的使用障礙:

  1. 配置較為復(fù)雜,“綁架”業(yè)務(wù)方,必須本地數(shù)據(jù)庫實(shí)例提供一個庫表。

  2. 對于消息延遲高敏感的業(yè)務(wù)不適用。

話說回來,不是每個業(yè)務(wù)都需要強(qiáng)事務(wù)的。扣錢和加錢需要事務(wù)保證,但下單和生成短信卻不需要事務(wù),不能因?yàn)橐蟀l(fā)短信的消息存儲投遞失敗而要求下單業(yè)務(wù)回滾。所以,一個完整的消息隊(duì)列應(yīng)該定義清楚自己可以投遞的消息類型,如事務(wù)型消息,本地非持久型消息,以及服務(wù)端不落地的非可靠消息等。對不同的業(yè)務(wù)場景做不同的選擇。另外事務(wù)的使用應(yīng)該盡量低成本、透明化,可以依托于現(xiàn)有的成熟框架,如Spring的聲明式事務(wù)做擴(kuò)展。業(yè)務(wù)方只需要使用 @Transactional標(biāo)簽即可。

性能相關(guān)

異步/同步

首先澄清一個概念,異步,同步和oneway是三件事。異步,歸根結(jié)底你還是需要關(guān)心結(jié)果的,但可能不是當(dāng)時的時間點(diǎn)關(guān)心,可以用輪詢或者回調(diào)等方式處理結(jié)果;同步是需要當(dāng)時關(guān)心
的結(jié)果的;而oneway是發(fā)出去就不管死活的方式,這種對于某些完全對可靠性沒有要求的場景還是適用的,但不是我們重點(diǎn)討論的范疇。
回歸來看,任何的RPC都是存在客戶端異步與服務(wù)端異步的,而且是可以任意組合的:客戶端同步對服務(wù)端異步,客戶端異步對服務(wù)端異步,客戶端同步對服務(wù)端同步,客戶端異步對服務(wù)端同步。
對于客戶端來說,同步與異步主要是拿到一個Result,還是Future(Listenable)的區(qū)別。實(shí)現(xiàn)方式可以是線程池,NIO或者其他事件機(jī)制,這里先不展開講。
服務(wù)端異步可能稍微難理解一點(diǎn),這個是需要RPC協(xié)議支持的。參考servlet 3.0規(guī)范,服務(wù)端可以吐一個future給客戶端,并且在future done的時候通知客戶端。
整個過程可以參考下面的代碼:

客戶端同步服務(wù)端異步。

Future future = request(server);//server立刻返回futuresynchronized(future){while(!future.isDone()){   future.wait();//server處理結(jié)束后會notify這個future,并修改isdone標(biāo)志}}return future.get();

客戶端同步服務(wù)端同步。

Result result = request(server);

客戶端異步服務(wù)端同步(這里用線程池的方式)。

Future future = executor.submit(new Callable(){public void call(){    result = request(server);}})return future;

客戶端異步服務(wù)端異步。

Future future = request(server);//server立刻返回future return future

上面說了這么多,其實(shí)是想讓大家脫離兩個誤區(qū):

  1. RPC只有客戶端能做異步,服務(wù)端不能。

  2. 異步只能通過線程池。

那么,服務(wù)端使用異步大的好處是什么呢?說到底,是解放了線程和I/O。試想服務(wù)端有一堆I/O等待處理,如果每個請求都需要同步響應(yīng),每條消息都需要結(jié)果立刻返回,那么就幾乎沒法做I/O合并
(當(dāng)然接口可以設(shè)計成batch的,但可能batch發(fā)過來的仍然數(shù)量較少)。而如果用異步的方式返回給客戶端future,就可以有機(jī)會進(jìn)行I/O的合并,把幾個批次發(fā)過來的消息一起落地(這種合并對于MySQL等允許batch insert的數(shù)據(jù)庫效果尤其明顯),并且徹底釋放了線程。不至于說來多少請求開多少線程,能夠支持的并發(fā)量直線提高。
來看第二個誤區(qū),返回future的方式不一定只有線程池。換句話說,可以在線程池里面進(jìn)行同步操作,也可以進(jìn)行異步操作,也可以不使用線程池使用異步操作(NIO、事件)。
回到消息隊(duì)列的議題上,我們當(dāng)然不希望消息的發(fā)送阻塞主流程(前面提到了,server端如果使用異步模型,則可能因消息合并帶來一定程度上的消息延遲),所以可以先使用線程池提交一個發(fā)送請求,主流程繼續(xù)往下走。
但是線程池中的請求關(guān)心結(jié)果嗎?Of course,必須等待服務(wù)端消息成功落地,才算是消息發(fā)送成功。所以這里的模型,準(zhǔn)確地說事客戶端半同步半異步(使用線程池不阻塞主流程,但線程池中的任務(wù)需要等待server端的返回),server端是純異步??蛻舳说木€程池wait在server端吐回的future上,直到server端處理完畢,才解除阻塞繼續(xù)進(jìn)行。

總結(jié)一句,同步能夠保證結(jié)果,異步能夠保證效率,要合理的結(jié)合才能做到最好的效率。

push還是pull

上文提到的消息隊(duì)列,大多是針對push模型的設(shè)計?,F(xiàn)在市面上有很多經(jīng)典的也比較成熟的pull模型的消息隊(duì)列,如Kafka、MetaQ等。這跟JMS中傳統(tǒng)的push方式有很大的區(qū)別,可謂另辟蹊徑。
我們簡要分析下push和pull模型各自存在的利弊。

慢消費(fèi)

慢消費(fèi)無疑是push模型大的致命傷,穿成流水線來看,如果消費(fèi)者的速度比發(fā)送者的速度慢很多,勢必造成消息在broker的堆積。假設(shè)這些消息都是有用的無法丟棄的,消息就要一直在broker端保存。當(dāng)然這還不是最致命的,最致命的是broker給consumer推送一堆consumer無法處理的消息,consumer不是reject就是error,然后來回踢皮球。
反觀pull模式,consumer可以按需消費(fèi),不用擔(dān)心自己處理不了的消息來騷擾自己,而broker堆積消息也會相對簡單,無需記錄每一個要發(fā)送消息的狀態(tài),只需要維護(hù)所有消息的隊(duì)列和偏移量就可以了。所以對于建立索引等慢消費(fèi),消息量有限且到來的速度不均勻的情況,pull模式比較合適。

消息延遲與忙等

這是pull模式大的短板。由于主動權(quán)在消費(fèi)方,消費(fèi)方無法準(zhǔn)確地決定何時去拉取最新的消息。如果一次pull取到消息了還可以繼續(xù)去pull,如果沒有pull取到則需要等待一段時間重新pull。
但等待多久就很難判定了。你可能會說,我可以有xx動態(tài)pull取時間調(diào)整算法,但問題的本質(zhì)在于,有沒有消息到來這件事情決定權(quán)不在消費(fèi)方。也許1分鐘內(nèi)連續(xù)來了1000條消息,然后半個小時沒有新消息產(chǎn)生,
可能你的算法算出下次最有可能到來的時間點(diǎn)是31分鐘之后,或者60分鐘之后,結(jié)果下條消息10分鐘后到了,是不是很讓人沮喪?
當(dāng)然也不是說延遲就沒有解決方案了,業(yè)界較成熟的做法是從短時間開始(不會對broker有太大負(fù)擔(dān)),然后指數(shù)級增長等待。比如開始等5ms,然后10ms,然后20ms,然后40ms……直到有消息到來,然后再回到5ms。
即使這樣,依然存在延遲問題:假設(shè)40ms到80ms之間的50ms消息到來,消息就延遲了30ms,而且對于半個小時來一次的消息,這些開銷就是白白浪費(fèi)的。
在阿里的RocketMq里,有一種優(yōu)化的做法-長輪詢,來平衡推拉模型各自的缺點(diǎn)?;舅悸肥?消費(fèi)者如果嘗試?yán)∈?,不是直接return,而是把連接掛在那里wait,服務(wù)端如果有新的消息到來,把連接notify起來,這也是不錯的思路。但海量的長連接block對系統(tǒng)的開銷還是不容小覷的,還是要合理的評估時間間隔,給wait加一個時間上限比較好~

到此,相信大家對“Java消息隊(duì)列什么時候使用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!


本文題目:Java消息隊(duì)列什么時候使用-創(chuàng)新互聯(lián)
鏈接地址:http://weahome.cn/article/dgppji.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部