這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)REST微服務(wù)中怎么利用消息中間件實現(xiàn)分布式事務(wù),文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
成都創(chuàng)新互聯(lián)從2013年創(chuàng)立,先為下冶等服務(wù)建站,下冶等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為下冶企業(yè)網(wǎng)站制作PC+手機+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。
我們還是使用之前的實例,一個訂票系統(tǒng)的購票邏輯:
這篇教程的源代碼可以從github上獲取。
使用消息中間件實現(xiàn)分布式事務(wù),也就是使用事件驅(qū)動實現(xiàn)。在這種方式下,Order服務(wù)不會直接調(diào)用User服務(wù),而是往MQ上發(fā)一個消息,說明有新訂單需要扣費;User服務(wù)會響應(yīng)這個消息,并處理,處理完成后再發(fā)一個消息,說明有新訂單需要轉(zhuǎn)移票;然后就會有Ticket服務(wù)來處理。而每個服務(wù)都是在一個事務(wù)里面處理讀消息、處理業(yè)務(wù)、寫消息的事情。大致流程如下:
在這種方式下,訂單的處理是異步的,用戶發(fā)起一個訂單的時候,只是生成一個正在處理的訂單,然后通過消息中間件一步步的進(jìn)行扣費、交票、完成訂單等邏輯。而每一個服務(wù)中相應(yīng)的操作,基本都是:
從一個隊列中讀取消息
操作相應(yīng)數(shù)據(jù)庫操作
往下一個隊列中發(fā)送消息
也就是說,需要在這個方法中需要操作數(shù)據(jù)庫和MQ兩個資源,這正好是上一篇文章中介紹Spring內(nèi)部事務(wù)和外部事務(wù)時使用的實例中的場景。下面就是大致的代碼:
12345678 | @JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);} |
它監(jiān)聽MQ的”order:new”隊列,處理訂單,往”order:need_to_pay”發(fā)送一個消息。然后用戶服務(wù)就會接收這個消息,觸發(fā)扣費流程。
在這個地方,我們可以使用JTA事務(wù),來使用兩階段提交來實現(xiàn)兩個資源的共同提交,但是這會影響系統(tǒng)的性能。而且,還需要使用的消息中間件實現(xiàn)了XA的規(guī)范,提供兩階段提交的功能。
這里也可以使用本地事務(wù),這時,每個事物都會有一個JMS的Session,并使用事務(wù)。如此一來,就存在一個數(shù)據(jù)庫的事物和一個JMS的事務(wù),兩個事務(wù)是相互獨立并依次提交的。這樣,就有可能在極少數(shù)情況下出錯,但是也能采取一些錯誤來盡量解決。我們對上面的事務(wù)處理展開(偽代碼,只是為了說明處理過程),來看看出錯的情況以及該如何處理:
1234567891011 | jmsTransaction.begin(); // get transactions from jms sessiondbTransaction.begin(); // get transactions from JDBC connectiontry {orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);dbTransaction.commit();jmsTransaction.commit();} catch(Exception e) {dbTransaction.rollback();jmsTransaction.rollback();} |
在上面的方法中,只要發(fā)生了錯誤,MQ消息的消費就算失敗,MQ的監(jiān)聽器就會重新觸發(fā)一次這個方法。
這時,如果錯誤發(fā)生在:
數(shù)據(jù)提交時或之前。這時,整個數(shù)據(jù)庫的操作都會被重置(也可能就根本還沒更新),重試的時候不需要考慮重復(fù)提交的問題,因為之前的提交都已經(jīng)被回滾。
數(shù)據(jù)庫提交成功,但是JMS提交失敗。這時就需要防止重復(fù)提交來避免數(shù)據(jù)庫的重復(fù)操作。
我們可以采用之前說過的token方式,在調(diào)用這個方法前,生成一個唯一的token。這里使用Java的UUID生成一個ID作為token。(如果這里的重復(fù)調(diào)用只是在這個服務(wù)內(nèi)部重新觸發(fā),就不需要考慮分布式系統(tǒng)的全局一致性ID的問題。這需要根據(jù)實際情況來判斷用什么樣的UUID生成方式)所以,Controller里面接受購票請求如下:
1234567 | @PostMapping(value = "/")@Transactionalpublic void create(@RequestBody OrderDTO orderDTO) {String uid = UUID.randomUUID().toString();orderDTO.setToken(uid);jmsTemplate.convertAndSend("order:new", orderDTO);} |
然后在Service里面監(jiān)聽這個隊列,處理購票:
123456789101112131415 | @JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {if (!this.processedUIDs.contains(orderDTO.getToken())) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);orderDTO.setStatus(order.getStatus());orderDTO.setId(order.getId());} else {LOG.info("Duplicate jms message:{}", orderDTO);}jmsTemplate.convertAndSend("order:need_to_pay", orderDTO);processedUIDs.add(orderDTO.getToken());} |
簡單來說,解決辦法就是,如果是重復(fù)觸發(fā)的,就略過數(shù)據(jù)庫相關(guān)的處理,直接往MQ的目標(biāo)隊列發(fā)送需要的數(shù)據(jù)。使得整個流程能夠往下走。
剛才說的是在一個服務(wù)內(nèi)出錯的情況,還有一種錯誤情況是,訂單服務(wù)和用戶服務(wù)已經(jīng)處理完訂單創(chuàng)建和扣費的操作,然后到了Ticket服務(wù)的時候,卻發(fā)現(xiàn)沒有票了。雖然我們可以通過合理的設(shè)計業(yè)務(wù)邏輯來避免這種問題,例如,在操作之前先檢查用戶余額,檢查并鎖票,然后進(jìn)行操作數(shù)據(jù)的事情。但是,在有些情況下,很難通過業(yè)務(wù)流程的設(shè)計來完全避免這種問題。如果出現(xiàn)了這種的問題,我們也可以通過撤銷的流程來實現(xiàn),業(yè)務(wù)流程如下:
在上面的解決方案中,使用JDK的UUID類生成一個ID,實際上這個ID只是在當(dāng)前的JVM內(nèi),才能夠保證是唯一的。其次,在JMS的標(biāo)準(zhǔn)中,沒有規(guī)定一個消息的Listener在讀取一個消息失敗后,重新讀取的問題。在微服務(wù)環(huán)境中,如果一個應(yīng)用部署了多個實例,那個這個消息有可能會被另一個實例讀到。所以在上面的方案中,使用JVM內(nèi)的唯一ID放在消息的內(nèi)容中,它有可能被任意一個實例處理,處理失敗后,又有可能被另一個實例處理。這就會出問題。所以我們需要一個分布式環(huán)境下的生成唯一ID的解決辦法。例如,先獲得JVM的唯一ID以后,再加上IP+端口等信息。而且,對已經(jīng)處理過的ID的緩存,也需要存在分布式環(huán)境中。
所以,我們完全可以不使用兩階段提交,就實現(xiàn)微服務(wù)架構(gòu)下的分布式事務(wù)。使用這種方式,它的優(yōu)點是:
實現(xiàn)簡單。結(jié)合Spring的事務(wù),幾乎不用寫額外的事務(wù)相關(guān)的代碼,就能夠?qū)崿F(xiàn)。我們只需要更好的服務(wù)的拆分和設(shè)計業(yè)務(wù)流程。
系統(tǒng)吞吐量高。因為數(shù)據(jù)庫或MQ不會被長期的鎖住,可以并發(fā)的處理更多的事務(wù)。
容錯性好。各個服務(wù)之間通過MQ來觸發(fā)協(xié)調(diào),即使在處理一個任務(wù)的時候有一個服務(wù)停了,消息還會一直保持,直到服務(wù)起來開始監(jiān)聽,然后繼續(xù)觸發(fā)這個任務(wù)。
當(dāng)然這種方式也有一些缺點,最大的問題就是異步處理的問題。用戶發(fā)出一個請求后,處理該業(yè)務(wù)的服務(wù)只是簡單處理,往MQ發(fā)送消息開始處理流程,然后就返回了。這時候這個任務(wù)還在處理。雖然有時候我們可以通過等的方式,等待最終處理完成的消息,然后在返回給用戶。但是這樣又得考慮響應(yīng)時間、超時、各種錯誤等情況。
有些人會覺得這種方式使得開發(fā)和調(diào)試都變得復(fù)雜,在我看來,恰恰相反,這使得開發(fā)和調(diào)試都簡單了。首先,根據(jù)微服務(wù)架構(gòu)的設(shè)計原則,就是每個服務(wù)只負(fù)責(zé)一個功能模塊;再者,根據(jù)面向?qū)ο蟮脑O(shè)計原則,一個方法只做一件事情。如果我們能夠合理的拆分服務(wù),和每一步的處理方法,這正是一個好的設(shè)計。在維護(hù)的時候,每個方法、每個步驟做什么事情,都很清楚。
說到調(diào)試,我的原則是,你應(yīng)該通過單元測試來發(fā)現(xiàn)和解決問題,而不是調(diào)試。以上面的購票流程為例,每一個服務(wù),通過MQ觸發(fā)一個方法的時候,它的參數(shù)應(yīng)該是什么、狀態(tài)是什么都應(yīng)該是明確的,這個方法執(zhí)行完成后,會產(chǎn)生什么新的數(shù)據(jù),狀態(tài)會更新成什么,都應(yīng)該是明確的。而這些都可以通過單元測試來很好的測試。如果你的復(fù)雜流程中的每一個都能通過單元測試進(jìn)行完善的測試,那么這些方法串聯(lián)到一起,不但能夠很好的工作,也能應(yīng)付各種異常的情況。
上述就是小編為大家分享的REST微服務(wù)中怎么利用消息中間件實現(xiàn)分布式事務(wù)了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。