這篇文章主要介紹“MQ怎么將多消息合并為一條消息發(fā)送”,在日常操作中,相信很多人在MQ怎么將多消息合并為一條消息發(fā)送問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”MQ怎么將多消息合并為一條消息發(fā)送”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
創(chuàng)新互聯(lián)建站從2013年成立,是專業(yè)互聯(lián)網技術服務公司,擁有項目做網站、成都網站建設網站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元南縣做網站,已為上家服務,為南縣各地企業(yè)和個人服務,聯(lián)系電話:18982081108
為什么要將多消息合并為一個消息發(fā)送?
前面也說了,為了節(jié)約成本。以每分鐘50w的廣告點擊數來算,一個月將產生50*60*24*31w的點擊消息,再乘以3就是每個月的sqs請求數,3代表的是發(fā)送消息、拉取消息、刪除消息,按每100w請求0.4美刀的價格計算大概一個月要26784美刀。
由于sqs限制單條消息的大小最大為256k,根據業(yè)務場景估算每點擊消息也不可能達到1k,,所以我將256個請求合并為一個消息發(fā)送,或者1s內未達到256個消息也合并為一個消息發(fā)送,這樣每月的費用可以直接除以256,這不是一個小數目。
什么樣的業(yè)務場景下才適合這么干?
將大量消息合并為一個消息后會導致消息消費失去原子性。你無法保證原本是256個消息的合并為一個消息后,這256個消息能全部消費成功或者全部消費失敗,因此要求業(yè)務必須允許消息消費失敗直接丟棄的情況。無論多少個成功多少個失敗,都需要將整條消息從mq中刪除。筆者考慮過這個問題才決定是否要這樣做的,也考慮過失敗重試的問題,但我覺得沒必要為這種概率買單,因為一個點擊在非異步的情況下,失敗就是失敗了。
如何將大量消息合并為一條消息發(fā)送而不影響服務的高并發(fā)性能呢?
其實不影響是不存在的,只是讓影響變得微弱。經過長時間的觀察,我了解該高并發(fā)服務對內存的消耗并不高,最大qps下也就消耗1.5g左右的堆內存,而netty使用的直接內存大概在2g這樣,對于2核8g的機器,有足夠多的內存來實現隊列緩存消息。
我借簽Dubbo的客戶端與服務端配置多個連接時使用輪詢方式使用連接,同時也借簽了netty的EventLoop的設計,實現消息合并發(fā)送。我定義一個MesaageLoopGroup,一個MesaageLoopGroup可以配置有多少個MesaageLooper,而每個MesaageLooper就是一個線程,且維護一個阻塞隊列,默認隊列大小是102400,這個數字是我配置單個進程所能打開的最大文件句柄數。
當往MesaageLoopGroup push一個點擊消息時,先用原子類自增1與MesaageLooper數組的長度取余,選出一個MesaageLooper。然后再將消息push到這個MesaageLooper的阻塞隊列。
每個MesaageLooper的run方法實現的就是一個死循環(huán),從阻塞隊列中拿消息,當消息等于256時,或者阻塞超過1s就將拿到的消息合并成一個消息發(fā)送到mq。如果阻塞隊列滿,那么push會直接將消息發(fā)送到mq。因此,服務重啟時如果使用kill 9強行結束進程,至多只會有1s的數據丟失。設置1s還有一個原因就是控制消息的實時性。
灰度上線測試一天后也證明此方案對服務的影響并不大,無論是gc還是內存占用,都看不出加了這么一層邏輯。1s的平均請求按50w計算,四臺機器分擔,每個服務的每秒請求數平均是2000。
為何用golang實現消費者?
然而消息的消費并不順利。一個是因為消息消費我用了golang實現,我也是剛入門,寫起代碼來還感覺別扭,二是一個消息是由原本256個消息組合而成的問題。
使用golang其實是有原因的。原本計劃是讓消費者占用較小的內存,以實現將消費者寄生在其它服務所在的機器上,充分利用其它耗內存而cpu利用率低的服務所在的機器。同時利用docker實現快速部署,讓docker 的鏡像更小,不需要安裝jdk什么的。還有就是利用go的協(xié)程并發(fā)處理能力吧,讓消費者消費消息的速度能趕上消息的產生速度。
為入門golang買單
為了便于理解,我還是以java的線程池來說明。假設我配置的線程池線程數量是512。寄生在其它服務的機器上需要給主人點面子,不能把人家的cpu全部吃完,導致主服務不可用,所以線程的數量結合消息的消費情況綜合考慮,不能超過一半的cpu使用率,而選擇512這個數量。
Sqs支持一次拉取多條消息,并且有一個可見性超時的特性,當消息被消費者拉取到之后,在多長時間內未刪除,下次可能還會被拉取到,或者其它消費者還能拉取到。最初我設置的可見性超時是60s。
一開始我開啟5個線程拉取消息,每次最多拉取10條消息。那么很可能同一時間內會拉取到50條消息。由于一條消息是由原本256條消息合并而成的,所以512個線程同一時間段至多只能消費2條消息,而一條消息(合并后的)的消費平均耗時是10s,也就是說一分鐘內最多消費12條消息,其它38條消息在一分鐘后會被其它消費者拉取到,所以就會出現大量消息重復消費的情況,久而久之,消息越積累越多。
我用golang的channel實現生產者與消費者,channel的大小可設置,當channel滿時,拉取到的消息是放不進channel的,因此會將拉取線程阻塞住,只有消費者從 channel取數據才能繼續(xù)放入。但阻塞的那段時間要小于消息的可見性超時,因為消息只有在開始消費時我才會將其從mq中刪除。
后面的改進就是根據消費能力去調整消息的拉取線程數,以及每次拉取的消息數。還有一點要注意,為保證時刻有消息準備就緒開始消費,最好不要讓消息消費完再從mq中拉取。但這也會導致另一個問題,一些消息拉取到本地后,由于channel已滿,放不進,而其它空閑消費節(jié)點又拉不到,導致消息被消費到的時間延長。這就需要作出取舍。
到此,關于“MQ怎么將多消息合并為一條消息發(fā)送”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注創(chuàng)新互聯(lián)網站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
分享標題:MQ怎么將多消息合并為一條消息發(fā)送
轉載來于:http://weahome.cn/article/ggeiis.html