小編給大家分享一下RocketMQ事務消息的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
創(chuàng)新互聯(lián)是專業(yè)的朝陽網(wǎng)站建設公司,朝陽接單;提供成都網(wǎng)站建設、做網(wǎng)站,網(wǎng)頁設計,網(wǎng)站設計,建網(wǎng)站,PHP網(wǎng)站建設等專業(yè)做網(wǎng)站服務;采用PHP框架,可快速的進行朝陽網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團隊,希望更多企業(yè)前來合作!
一、大事務 =小事務 + 異步
我們以一個轉(zhuǎn)帳的場景為例來說明這個問題,Bob向Smith轉(zhuǎn)賬100塊。這個列子在瓜子也有很多實際場景映射,如:車源狀態(tài)變化,訂單狀態(tài)變化,金融放款,物流運輸……
在單機環(huán)境下,執(zhí)行事務的情況,大概是下面這個樣子
當用戶增長到一定程度,Bob和Smith的賬戶及余額信息已經(jīng)不在同一臺服務器上了,那么上面的流程就變成了這樣
這時候你會發(fā)現(xiàn),同樣是一個轉(zhuǎn)賬的業(yè)務,在集群環(huán)境下,耗時居然成倍的增長,這顯然是不能夠接受的。而且跨網(wǎng)絡調(diào)用的事務需要解決網(wǎng)絡不穩(wěn)定的因素,直接放到業(yè)務代碼里控制,成本很高。那如何來規(guī)避這個問題?
大事務 =小事務 + 異步
將大事務拆分成多個小事務異步執(zhí)行。這樣基本上能夠?qū)⒖鐧C事務的執(zhí)行效率優(yōu)化到與單機一致。轉(zhuǎn)賬的事務就可以分解成如下兩個小事務
圖中執(zhí)行本地事務(Bob賬戶扣款)和發(fā)送異步消息應該保證同時成功或者同時失敗,也就是扣款成功了,發(fā)送消息一定要成功,如果扣款失敗了,就不能再發(fā)送消息。
二、什么是事務消息(Transactional message)
RocketMQ官方是這樣定義的??梢詫⑵湟暈閮呻A段提交消息實現(xiàn),以確保分布式系統(tǒng)中的最終一致性。事務性消息確保可以原子方式執(zhí)行本地事務的執(zhí)行和消息的發(fā)送。
1、事務狀態(tài)
事務消息有3種狀態(tài)
(1)TransactionStatus.CommitTransaction,提交事務,表示允許消費者消費(使用)這條消息
(2)TransactionStatus.RollbackTransaction,回滾事務,表示消息將被刪除,不允許使用
(3)TransactionStatus.Unknown,中間狀態(tài),表示需要MQ向消息發(fā)送方進行檢查以確定狀態(tài)
2、如何發(fā)送事務消息
RocketMQ(4.5.1版本)已經(jīng)把事務消息的發(fā)送方式封裝得非常優(yōu)雅,只需要兩個大的環(huán)節(jié)就能夠完成,創(chuàng)建事務消息生產(chǎn)者和實現(xiàn)TransactionListener接口??匆幌鹿俜降睦哟a
(1)創(chuàng)建事務消息生產(chǎn)者
使用TransactionMqProducer類創(chuàng)建消息生產(chǎn)客戶端,并指定唯一的ProducerGroup
設置自定義線程池來處理檢查請求
執(zhí)行本地事務之后,需要根據(jù)執(zhí)行結(jié)果回復MQ,回復上一小節(jié)中描述的狀態(tài)
(2)實現(xiàn)TransactionListener接口
“executeLocalTransaction”方法用于在發(fā)送半條消息成功時執(zhí)行本地事務。它返回上一節(jié)中提到的三個事務狀態(tài)之一。
“check local transaction”方法用于檢查本地事務狀態(tài)并響應MQ檢查請求。它還返回前一節(jié)中提到的三個事務狀態(tài)之一。
3、事務消息的執(zhí)行流程
代碼寫起來非常簡單,以至于光看代碼,并不能知道事務消息具體的執(zhí)行過程。
RocketMQ 事務消息的設計流程借鑒了兩階段提交理論,整體交互流程如下圖所示
事務發(fā)起方(即消息發(fā)送者)首先發(fā)送 prepare 消息到 MQ。
事務發(fā)起方(即消息發(fā)送者)在發(fā)送 prepare 消息成功后執(zhí)行本地事務。
根據(jù)本地事務執(zhí)行結(jié)果發(fā)送 commit 或者是 rollback 給 MQ。
如果消息是 rollback,MQ 將刪除該 prepare 消息不進行下發(fā)。
如果消息是 commit,MQ 將會把這個消息發(fā)送給 consumer 端。
如果執(zhí)行本地事務過程中,執(zhí)行端掛掉,或者超時,導致 MQ 收不到任何的消息(不知道是該 commit 還是該 rollback),RocketMQ 會定期掃描消息集群中的事務消息,這時候發(fā)現(xiàn)了某個 prepare 消息還不知道該怎么處理,它會向消息發(fā)送者確認,所以消息發(fā)送者需要實現(xiàn)一個 check 接口,RocketMQ 會根據(jù)消息發(fā)送者設置的策略來決定是 rollback 還是繼續(xù) commit。這樣就保證了消息發(fā)送與本地事務同時成功或同時失敗。
Consumer 端的消費成功機制由 MQ 保證。
4、事務消息的存儲模型
在具體實現(xiàn)上,RocketMQ 通過使用 Half Topic 以及 Operation Topic 兩個內(nèi)部隊列來存儲事務消息推進狀態(tài),如下圖所示
其中,Half Topic 對應隊列中存放著 prepare 消息,Operation Topic 對應的隊列則存放了 prepare message 對應的 commit/rollback 消息,消息體中則是 prepare message 對應的 offset,服務端通過比對兩個隊列的差值來找到尚未提交的超時事務,進行回查。
從用戶側(cè)來說,用戶需要分別實現(xiàn)本地事務執(zhí)行以及本地事務回查方法,因此只需關注本地事務的執(zhí)行狀態(tài)即可;而在 service 層,則對事務消息的兩階段提交進行了抽象,同時針對超時事務實現(xiàn)了回查邏輯,通過不斷掃描當前事務推進狀態(tài),來不斷反向請求 Producer 端獲取超時事務的執(zhí)行狀態(tài),在避免事務掛起的同時,也避免了 Producer 端的單點故障。
而在存儲層,RocketMQ 通過 Bridge 封裝了與底層隊列存儲的相關操作,用以操作兩個對應的內(nèi)部隊列,用戶也可以依賴其他存儲介質(zhì)實現(xiàn)自己的 service,RocketMQ 會通過 ServiceProvider 加載進來。
三、Notify的異曲同工
Notify和MetaQ是阿里的兩個消息中間件。MetaQ是一個高性能的存儲隊列;Notify是淘寶自主研發(fā)的一套消息服務引擎。貼兩個圖就什么都明白了
整體方案跟RocketMQ是完全相同的,只是兩者的Storage不同。
四、瓜子該怎么做事務一致性這塊工作
針對這個典型場景,有很多解決方案
1、Kafka換成RocketMQ
不行。有太多的業(yè)務跑在了Kafka上,替換消息中間件的成本基本不能接受。
2、類似去哪兒qmq的方案
這個方案研發(fā)簡單,但是侵入具體業(yè)務的數(shù)據(jù)庫,而且增加了部署運維的成本。
3、有人提出binlog+TCC的方案
沒有仔細研究,但是業(yè)務會經(jīng)常調(diào)整,想想負責配置數(shù)據(jù)庫日志的同學肯定會抓狂(DBA沒有那么了解業(yè)務)。
4、為Kafka配一個類似Notify的消息引擎
這個方案有一定的可行性
(1)把Kafka定位為MetaQ,研制一個Notify,為prepare message提供單獨的存儲
(2)現(xiàn)在各業(yè)務系統(tǒng)所采用的Kafka客戶端已經(jīng)是瓜子定制化開發(fā)的,可以模仿RocketMQ的客戶端進行改造。已有代碼的邏輯完全不受影響;需要事務一致性的功能,只需要換個接口,實現(xiàn)check邏輯即可,而原有消費方毫無感覺。
(3)似乎有可能結(jié)合spring的@Transactional標簽,在完全不改業(yè)務代碼(只升級自研Kafka客戶端)的情況下,也能緩解一些不一致問題
以上是“RocketMQ事務消息的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學習更多知識,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道!