RocketMQ消息隊列,專業(yè)消息中間件,既可為分布式應(yīng)用系統(tǒng)提供異步解耦和削峰填谷的能力,同時也具備互聯(lián)網(wǎng)應(yīng)用所需的海量消息堆積、高吞吐、可靠重試等特性,是應(yīng)對企業(yè)業(yè)務(wù)峰值時刻必備的技術(shù)。
目前創(chuàng)新互聯(lián)建站已為上1000家的企業(yè)提供了網(wǎng)站建設(shè)、域名、虛擬主機、網(wǎng)站托管、服務(wù)器租用、企業(yè)網(wǎng)站設(shè)計、儋州網(wǎng)站維護等服務(wù),公司將堅持客戶導向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。
云片由于業(yè)務(wù)特點,對消息隊列的使用十分頻繁,由此云片服務(wù)號從本期推文開始將發(fā)布“云片RocketMQ實戰(zhàn)”系列文章,講述云片根據(jù)短信業(yè)務(wù)的特點,運用RocketMQ消息隊列實戰(zhàn)經(jīng)驗。
本期推文《Stargate的前世今生》,由云片資深Java開發(fā)工程師周凱帆提供。云片由于其業(yè)務(wù)特點對應(yīng)消息隊列的使用十分頻繁,這里以云片短信業(yè)務(wù)為例,短信業(yè)務(wù)的邏輯十分簡單,我們只看主流程,本質(zhì)就是接受用戶請求,尋找合適的通道,使用cmpp/smpp協(xié)議提交給運營商。
我們可以發(fā)現(xiàn),用戶的每次請求要一直等待運營商的響應(yīng),這樣主要的問題是:
云片的服務(wù)器再運營商返回時需要一直維護http連接
運營商的處理速率直接限制了云片的處理速率
云片的最大并發(fā)為所有供應(yīng)商并發(fā)之和
這種情況下我們無法提供穩(wěn)定的服務(wù)。
實際上對于用戶來說并不關(guān)心具體流程他們只需將短信提交給云片即可,所以我們可以異步的處理這些發(fā)送過程,確認收到短信后就可以給用戶返回結(jié)果以提高響應(yīng)速度。
日常情況我們的系統(tǒng)流量會是一個比較平穩(wěn)的值X,所以我們提供能滿足的當前流量的消費能力,這樣消息就不會積壓。
不過隨著流量的增加最終實踐流量會超過我們的消費能力,這樣就會出現(xiàn)短信送達延遲,圖中虛線右邊是我們不希望看到的。
所以我們在流量到達虛線前提高系統(tǒng)的消費能力。
我們可以看到云片對應(yīng)消息隊列的重度依賴,使得在微服務(wù)化的時候沒有找到一個適合云片的好用的annotation組件,當時SpringCloud框架并沒有對RocketMQ支持的相關(guān)組件,而RocketMQ官方github上僅有一個不成熟的項目。
對于目前重度依賴RocketMQ的短信業(yè)務(wù)我們需要一個簡單易用并且能夠與我們老項目中代碼兼容的注解,同時還要滿足各團隊的不同需求,于是我們開始一個名為Stargate的組件來支持我們后續(xù)的服務(wù)化推進。
@StargateProducer
public
?interface
?TestProducter {
????
@StargateMapper
(
"testaaa"
)
????
SendResult test(
@StargateBody
?TestVO message);
}
@StargateConsumer public ?class ?TestConsumer { ???? @StargateMapper ( "testaaa" ) ???? public ?void ?test( @StargateBody ?TestVO message){ ???????? //TODO ???? } } |
Stargate組件為什么出現(xiàn)?
事實上在有Stargate之前我們的項目中有一個對RocketMQ SDK的封裝,它確實解決的許多問題,但是面對越來越多的生產(chǎn)者和消費者我們越來越難以維護,甚至于碰到不熟悉的代碼我曾經(jīng)花了半小時去找一個生產(chǎn)者的消費者在哪里,這個消費者被各種繼承重寫,生產(chǎn)者的topic是以一種極其復雜的規(guī)則生成的,而且各種配置文件散落在代碼的各各角落。
然后舊的組件也十分難以遷移到新的微服務(wù)項目中,以至于有的業(yè)務(wù)線開始自己從新封裝一套組件,而且他們生產(chǎn)者的消息難以被其他團隊的消費。于是我希望做一套能夠避免這些不良使用習慣的組件,并且提供強大的兼容性讓大家遷移過來。
設(shè)計Stargate的目標是?
所以在設(shè)計Stargate的時候主要考慮以下幾點:
簡單易用
注解的方式相比直接使用RocketMQ更加清晰,上手更快,更易用,避免各種不良寫法。
擴展性強
SG1提供的擴展插件能夠豐富Stargate的功能,而且這個插件的開發(fā)能力是開放的,后續(xù)會提到。實際上插件功能是在2.0增加應(yīng)為我發(fā)現(xiàn)在限制大家的使用方式后,我需要定制許多的注解來兼容各種使用場景,于是我開放了這部分能力讓大家選擇性的開發(fā)和使用自己需要的功能。
兼容各種老項目
通過編解碼器我們可以兼容各種不同的老項目,不需要修改老項目的代碼。
單元測試更方便
另外再單元測試和開發(fā)階段,無需對外部依賴可以方便的進行mock。
Stargate組件的價值
最初的時候我簡單的認為這個組件的價值在于提供了一個更方便使用RocketMQ的方式,但后續(xù)的開發(fā)中慢慢的我發(fā)現(xiàn)并不是這樣,目前來說我認為最有價值的兩點在于:
服務(wù)間異步調(diào)用的“規(guī)范”
由于它的擴展性和兼容性被各各業(yè)務(wù)線團隊采用,似乎成了一個“規(guī)范”,服務(wù)之間的通信多了一種可選項。我們可以把自己的StargateProducer接口定義放在一個jar包中提供給其他人
“使用心得”的分享中心
插件的開發(fā)能力使得大家會把自己的使用模式封裝成一個注解發(fā)布出來,這樣許多十分巧妙的使用方法會被發(fā)布出來,而且這些都是開箱即用的,使用者只需要知道這個注解能實現(xiàn)什么,在發(fā)布要求上讓開發(fā)者符上文檔,這樣就能形成一個生態(tài),把大家的經(jīng)驗使用沉淀下來。
另外,Stargate對于代碼結(jié)構(gòu)方面的幫助也是巨大的,現(xiàn)在我們可以很快速的找到一個生產(chǎn)者的消費者在什么地方,后續(xù)甚至考慮提供IDE插件來更好的維護這些代碼。
Stargate的初始化
那么我們接下來,看一下Stargate的這些設(shè)計目標是如何實現(xiàn)的,首先我們來看一下組件的入口,我們?nèi)绾纬跏蓟疭targate的。
在應(yīng)用啟動時我們處理部分注解獲得一個Bean的配置信息,然后向Spring注冊這些Bean,我們?yōu)镻roducer生成代理類,創(chuàng)建Consumer客戶端監(jiān)聽消息并且調(diào)用StargateConsumer處理這些消息。
生成這些的Bean的入口是從一個工廠類開始的,通常一個StargateProducer|StargateConsumer的創(chuàng)建會經(jīng)過以下幾個步驟:
事實上Stargate并不負責初始化這些生產(chǎn)者消費者bean,Stargate僅僅提供了創(chuàng)建的過程,我們把這些bean注冊到Spring中然后提供一個工廠方法,由spring在適當?shù)臅r候創(chuàng)建這些bean,維護這些bean。
這樣我們在spring中就有了這些生產(chǎn)者接口的實例,我們可以把他注入到任何地方然后使用它們發(fā)送消息。監(jiān)聽他的消費者就會調(diào)用事先配置好的StargateConsumer。
編解碼器
每個發(fā)送者和消費者都會在收發(fā)消息前進行編解碼,這也是兼容原有項目的關(guān)鍵。大家可以思考一下,所有項目的都是使用RocketMQ的,本質(zhì)的直接調(diào)用SDK的Send方法就能發(fā)送消息,但是老項目對于如何將一個消息變成二進制數(shù)組這是不一樣的,所以我們提供編解碼器的接口讓大家可以替換這些轉(zhuǎn)換過程的實現(xiàn)。
擴展接口
但是消息的編解碼只能實現(xiàn)兼容性但是對于擴展能力的需求無法滿足,所以我們再初始化過程和發(fā)送消費過程中抽象出了6個接口,讓用戶可以擴展自己的邏輯。
這個些接口主要用于處理”注解解析“,”RocketMQ Client創(chuàng)建“,”消息加工“,我們可以從下圖中看到,工廠在返回一個bean前會調(diào)用這些接口的實現(xiàn)。消息收發(fā)階段也會調(diào)用相應(yīng)的實現(xiàn)。
通常情況下我們實現(xiàn)一個新的注解@DemoModel有這么幾個流程:
實現(xiàn)注解處理器處理注解數(shù)據(jù)
實現(xiàn)Client處理器,根據(jù)注解解析的數(shù)據(jù)加工Client
實現(xiàn)消息處理器,根據(jù)注解解析的數(shù)據(jù)加工消息
上下文
這時會有另一個問題,我們通常的流程在解析注解獲得的數(shù)據(jù)需要保存給另外兩個處理器使用,我們當然不希望讓用戶自己處理這些數(shù)據(jù),這會增加用戶的使用成本。
于是我們定義了一個上下文的概念,上下文有這幾個特點:
每個生成消費者有自己的上下文
上下文是會被繼承的
舉一個例子,假如我們現(xiàn)在所有的生產(chǎn)者的topic加上一個公共前綴,那么我們只需要在生產(chǎn)者根上下文中的topic加上這個前綴的內(nèi)容,所有的生產(chǎn)者都會有這個前綴。
事實上在Stargate2.0開始提供擴展功能后,我停止了對core項目的功能迭代,所有的新功能以插件方式在一個名為SG1的項目的發(fā)布,而每個用戶也可以擴展自己的插件上傳到SG1中,以此期望形成一個生態(tài)。
另外在云片國際版YCloud上線之后我們又有一些新的挑戰(zhàn),YCloud主要服務(wù)海外客戶我們的服務(wù)器并不在國內(nèi),但是我們的消息需要提交回國內(nèi)節(jié)點消費,而且MQ都部署在國內(nèi),從香港節(jié)點到國內(nèi)節(jié)點的延時讓人無法接受,因為這個延時是用戶能感知到的,所以我的目標是把這個延時交給消費者來承擔。
如果簡單的在部署一套集群或許實施起來是最快的,但是這樣成本非常大,而且消費者1和消費者2的負載通常是不均衡的,如果有了第三的機房,難道每個消費者都要部署3套?
所以我們的方向是在香港節(jié)點部署一個broker,重寫客戶端的隊列選擇器,讓生產(chǎn)者找出離自己最近的broker中的隊列,而消費者消費所有隊列。
其實這里對應(yīng)消費者也是一樣的,我們可以通過改變消費者的客戶端負載均衡器來消費指定的隊列,這樣我們就能實現(xiàn)一個隔離的環(huán)境。
在后續(xù)的發(fā)展中我們將消費者使用同樣的方式去指定消費指定的broker上的隊列,于是就形成了圖中這樣的隔離的環(huán)境,這樣做的好處是:
多個環(huán)境隔離正常消息不互通,達到隔離的目標
多個環(huán)境的broker仍然是一體的,如果一個消費者出現(xiàn)故障,另一個消費者可以代替
如果發(fā)現(xiàn)消費者1出現(xiàn)異常的話,可以臨時讓消費者2代替消費者1的工作保證功能正常,但是目前來說這部分功能仍然無法實現(xiàn),應(yīng)為我們需求有一個指揮中心告訴消費者2去消費環(huán)境1的消息。
為了實現(xiàn)生產(chǎn)者消費者之間的協(xié)作,我們需要一個指揮中心,去收集和協(xié)調(diào)全部Stargate應(yīng)用的工作。
StargateCommand會充當一個指揮中心,但是它只是一個協(xié)調(diào)機制,如果沒有它Stargate應(yīng)用仍然會按照其原型設(shè)定的方式去運行。