引言:Mycat已經(jīng)成為了一個(gè)強(qiáng)大的開源分布式數(shù)據(jù)庫中間件產(chǎn)品。面對企業(yè)應(yīng)用的海量數(shù)據(jù)事務(wù)處理,是目前最好的開源解決方案。但是如果想讓多臺機(jī)器中的數(shù)據(jù)保存一致,比較常規(guī)的解決方法是引入“協(xié)調(diào)者”來統(tǒng)一調(diào)度所有節(jié)點(diǎn)的執(zhí)行。
本文選自《分布式數(shù)據(jù)庫架構(gòu)及企業(yè)實(shí)踐——基于Mycat中間件》。成都創(chuàng)新互聯(lián)公司專注于利川網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠為您提供利川營銷型網(wǎng)站建設(shè),利川網(wǎng)站制作、利川網(wǎng)頁設(shè)計(jì)、利川網(wǎng)站官網(wǎng)定制、成都小程序開發(fā)服務(wù),打造利川網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供利川網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。
隨著并發(fā)量、數(shù)據(jù)量越來越大及業(yè)務(wù)已經(jīng)細(xì)化到不能再按照業(yè)務(wù)劃分,我們不得不使用分布式數(shù)據(jù)庫提高系統(tǒng)的性能。在分布式系統(tǒng)中,各個(gè)節(jié)點(diǎn)在物理上都是相對獨(dú)立的,每個(gè)節(jié)點(diǎn)上的數(shù)據(jù)操作都可以滿足 ACID。但是,各獨(dú)立節(jié)點(diǎn)之間無法知道其他節(jié)點(diǎn)事務(wù)的執(zhí)行情況,如果想讓多臺機(jī)器中的數(shù)據(jù)保存一致,就必須保證所有節(jié)點(diǎn)上的數(shù)據(jù)操作要么全部執(zhí)行成功,要么全部不執(zhí)行,比較常規(guī)的解決方法是引入“協(xié)調(diào)者”來統(tǒng)一調(diào)度所有節(jié)點(diǎn)的執(zhí)行。
X/Open 組織(即現(xiàn)在的 Open Group)定義了分布式事務(wù)處理模型。X/Open DTP 模型(1994)包括應(yīng)用程序(AP)、事務(wù)管理器(TM)、資源管理器(RM)、通信資源管理器(CRM)四部分。事務(wù)管理器(TM)是交易中間件,資源管理器(RM)是數(shù)據(jù)庫,通信資源管理器(CRM)是消息中間件。通常把一個(gè)數(shù)據(jù)庫內(nèi)部的事務(wù)處理看作本地事務(wù),而分布式事務(wù)處理的對象是全局事務(wù)。全局事務(wù)是指在分布式事務(wù)處理環(huán)境中,多個(gè)數(shù)據(jù)庫可能需要共同完成一個(gè)工作,這個(gè)工作就是一個(gè)全局事務(wù)。在一個(gè)事務(wù)中可能更新幾個(gè)不同的數(shù)據(jù)庫,此時(shí)一個(gè)數(shù)據(jù)庫對自己內(nèi)部所做操作的提交不僅需要本身的操作成功,還需要全局事務(wù)相關(guān)的其他數(shù)據(jù)庫的操作成功。如果任一數(shù)據(jù)庫的任一操作失敗,則參與此事務(wù)的所有數(shù)據(jù)庫所做的所有操作都必須回滾。XA就是X/Open DTP 定義的交易中間件與數(shù)據(jù)庫之間的接口規(guī)范(即接口函數(shù)),交易中間件用它來通知數(shù)據(jù)庫事務(wù)的開始、結(jié)束、提交、回滾等,XA 接口函數(shù)由數(shù)據(jù)庫廠商提供,根據(jù)這一思想衍生出二階段提交協(xié)議和三階段提交協(xié)議。
所謂的兩個(gè)階段是指準(zhǔn)備階段和提交階段。
準(zhǔn)備階段指事務(wù)協(xié)調(diào)者(事務(wù)管理器)向每個(gè)參與者(資源管理器)發(fā)送準(zhǔn)備消息,每個(gè)參與者要么直接返回失敗消息(如權(quán)限驗(yàn)證失?。?,要么在本地執(zhí)行事務(wù),寫本地的 redo 和undo日志但不提交,可以進(jìn)一步將準(zhǔn)備階段分為以下三步。
?。?)協(xié)調(diào)者節(jié)點(diǎn)向所有參與者節(jié)點(diǎn)詢問是否可以執(zhí)行提交操作(vote),并開始等待各參與者節(jié)點(diǎn)的響應(yīng)。
?。?)參與者節(jié)點(diǎn)執(zhí)行詢問發(fā)起為止的所有事務(wù)操作,并將 undo 信息和 redo 信息寫入日志。
?。?)各參與者節(jié)點(diǎn)響應(yīng)協(xié)調(diào)者節(jié)點(diǎn)發(fā)起的詢問。如果參與者節(jié)點(diǎn)的事務(wù)操作實(shí)際執(zhí)行成功,則它返回一個(gè)“同意”消息;如果參與者節(jié)點(diǎn)的事務(wù)操作實(shí)際執(zhí)行失敗,則它返回一個(gè)“中止”消息。
提交階段指如果協(xié)調(diào)者收到了參與者的失敗消息或者超時(shí),則直接向每個(gè)參與者發(fā)送回滾(Rollback)消息,否則發(fā)送提交(Commit)消息,參與者根據(jù)協(xié)調(diào)者的指令執(zhí)行提交或者回滾操作,釋放所有事務(wù)在處理過程中使用的鎖資源。
二階段提交所存在的缺點(diǎn)如下。
?。?)同步阻塞問題,在執(zhí)行過程中所有參與節(jié)點(diǎn)都是事務(wù)阻塞型的,當(dāng)參與者占用公共資源時(shí),其他第三方節(jié)點(diǎn)訪問公共資源時(shí)不得不處于阻塞狀態(tài)。
(2)單點(diǎn)故障,由于協(xié)調(diào)者的重要性,一旦協(xié)調(diào)者發(fā)生故障,則參與者會一直阻塞下去。
?。?)數(shù)據(jù)不一致,在二階段提交的第 2 個(gè)階段中,當(dāng)協(xié)調(diào)者向參與者發(fā)送 commit 請求之后發(fā)生了局部網(wǎng)絡(luò)異?;蛘咴诎l(fā)送 commit 請求的過程中協(xié)調(diào)者發(fā)生了故障,則會導(dǎo)致只有一部分參與者接收到了 commit 請求,而在這部分參與者在接收到 commit 請求之后就會執(zhí)行commit操作,其他部分未接收到 commit 請求的機(jī)器則無法執(zhí)行事務(wù)提交,于是整個(gè)分布式系統(tǒng)便出現(xiàn)了數(shù)據(jù)不一致的現(xiàn)象。
由于二階段提交存在諸如同步阻塞、單點(diǎn)問題、數(shù)據(jù)不一致、宕機(jī)等缺陷,所以,研究者們在二階段提交的基礎(chǔ)上做了改進(jìn),提出了三階段提交。
三階段提交(Three-phase commit,3PC),也叫作三階段提交協(xié)議(Three-phase commitprotocol),是二階段提交(2PC)的改進(jìn)版本。三階段提交把二階段提交的準(zhǔn)備階段再次一分為二,這樣三階段提交就有 CanCommit、PreCommit、DoCommit 三個(gè)階段。
?。?)CanCommit 階段:三階段提交的 CanCommit 階段其實(shí)和二階段提交的準(zhǔn)備階段很像,協(xié)調(diào)者向參與者發(fā)送 commit 請求,參與者如果可以提交就返回 Yes 響應(yīng),否則返回 No 響應(yīng)。
(2)PreCommit 階段:協(xié)調(diào)者根據(jù)參與者的反應(yīng)情況來決定是否可以記錄事務(wù)的 PreCommit操作。根據(jù)響應(yīng)情況,有以下兩種可能。
假如協(xié)調(diào)者從所有參與者那里獲得的反饋都是 Yes 響應(yīng),則執(zhí)行事務(wù)。
假如有任何一個(gè)參與者向協(xié)調(diào)者發(fā)送了 No 響應(yīng),或者等待超時(shí)之后協(xié)調(diào)者都沒有接到參與者的響應(yīng),則執(zhí)行事務(wù)的中斷。
(3)DoCommit階段:該階段進(jìn)行真正的事務(wù)提交,也可以分為執(zhí)行提交、中斷事務(wù)兩種執(zhí)行情況。
執(zhí)行提交的過程如下。
協(xié)調(diào)者接收到參與者發(fā)送的ACK響應(yīng)后,將從預(yù)提交狀態(tài)進(jìn)入提交狀態(tài),并向所有參與者發(fā)送doCommit請求。
事務(wù)提交參與者接收到doCommit請求之后,執(zhí)行正式的事務(wù)提交,并在完成事務(wù)提交之后釋放所有的事務(wù)資源。
事務(wù)提交完之后,向協(xié)調(diào)者發(fā)送ACK響應(yīng)。
協(xié)調(diào)者接收到所有參與者的ACK響應(yīng)之后,完成事務(wù)。中斷事務(wù)的過程如下。
協(xié)調(diào)者向所有參與者發(fā)送abort請求。
參與者接收到 abort 請求之后,利用其在第 2 個(gè)階段記錄的 undo 信息來執(zhí)行事務(wù)的回滾操作,并在完成回滾之后釋放所有的事務(wù)資源。
參與者完成事務(wù)回滾之后,向協(xié)調(diào)者發(fā)送 ACK 消息。
協(xié)調(diào)者接收到參與者反饋的 ACK 消息之后,執(zhí)行事務(wù)的中斷。
Mycat在1.6版本以后已經(jīng)完全支持 XA 分布式強(qiáng)事務(wù)類型了,先通過一個(gè)簡單的示例來了解Mycat中XA的用法。
用戶應(yīng)用側(cè)(AP)的使用流程如下:
(1)set autocommit=0
在應(yīng)用層需要設(shè)置事務(wù)不能自動(dòng)提交;
?。?)set xa=on
在 SQL 中設(shè)置 XA 為開啟狀態(tài);
(3)執(zhí)行 SQL
insert into travelrecord(id,name) values(1,’N’),(6000000,’A’),(321,’D’),(13400000,’C’),(59,’E’);
?。?)commit 或者 rollback
對事務(wù)進(jìn)行提交(提交成功或者回滾異常)。
完整的流程圖如圖所示。
Mycat 內(nèi)部實(shí)現(xiàn)側(cè)的實(shí)現(xiàn)流程如下:
(1)set autocommit=0
將 MySQLConnection 中的 autocommit 設(shè)置為 false;
(2)set xa=on
在Mycat中開啟 XA 事務(wù)管理器,用 MycatServer.getInstance().genXATXID()生成 XID,用XA START XID 命令進(jìn)行 XA 事務(wù)開始標(biāo)記,繼續(xù)拼裝 SQL 業(yè)務(wù)(Mycat 會將上面的 insert 數(shù)據(jù)分片到不同的節(jié)點(diǎn)上),拼裝 XA END XID,XA PREPARE XID 最后進(jìn)行 1pc 提交并記錄日志到 tm.log 中,如果 1pc 階段有異常,則直接回滾事務(wù) XA ROLLBACK xid。
?。?)在多節(jié)點(diǎn) MySQL 中全部進(jìn)行 2pc 提交(XA COMMIT),提交成功后,事務(wù)結(jié)束;如果有異常,則對事務(wù)進(jìn)行重新提交或者回滾。
Mycat 中的 XA 分布式事務(wù)的異常處理流程如下:
?。?)一階段 commit 異常:如果 1pc 提交任意一個(gè) mysql 節(jié)點(diǎn)無法提交或者異常,則全部節(jié)點(diǎn)的事務(wù)進(jìn)行回滾,拋出異常給應(yīng)用側(cè)事務(wù)回滾。
?。?)Mycat Crash Recovery
Mycat 崩潰以后,根據(jù) tm.log 事務(wù)日志再進(jìn)行重啟恢復(fù),mycat 啟動(dòng)后執(zhí)行事務(wù)日志查找各個(gè)節(jié)點(diǎn)中已經(jīng) prepared 的 XA 事務(wù),進(jìn)行 commit 或者 rollback。
通過用戶應(yīng)用側(cè)發(fā)送 set xa = on ; SQL 開啟 Mycat 內(nèi)部 XA 事務(wù)管理器的功能,事務(wù)管理器將對 MySQL 數(shù)據(jù)庫進(jìn)行 XA 方式的事務(wù)管理,具體事務(wù)管理功能的實(shí)現(xiàn)代碼如下:
MySQLConnection:數(shù)據(jù)庫連接。
NonBlockingSession:用戶連接 Session。
MultiNodeCoordinator:協(xié)調(diào)者。
CommitNodeHandler:分片提交處理。
RollbackNodeHandler:分片回滾處理。
XA 事務(wù)啟動(dòng)的源碼如下:
public class MySQLConnection extends BackendAIOConnection { //設(shè)置開啟事務(wù) private void getAutocommitCommand(StringBuilder sb, boolean autoCommit) { if (autoCommit) { sb.append("SET autocommit=1;"); } else { sb.append("SET autocommit=0;"); } } public void execute(RouteResultsetNode rrn, ServerConnection sc,boolean autocommit) throws UnsupportedEncodingException { if(!modifiedSQLExecuted && rrn.isModifySQL()) { modifiedSQLExecuted = true; } //獲取當(dāng)前事務(wù) ID String xaTXID = sc.getSession2().getXaTXID(); synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),autocommit); } …… ……//省略此處代碼,建議讀者參考 GitHub 倉庫的 MyCAT-Server 項(xiàng)目的 MySQLConnection.java源碼}
用戶應(yīng)用側(cè)設(shè)置手動(dòng)提交以后,Mycat 會在當(dāng)前連接中加入
SET autocommit=0;
將該語句加入到 StringBuffer 中,等待提交到數(shù)據(jù)庫。
用戶連接 Session 的源碼如下:
public class NonBlockingSession implements Session { …… ……//省略此處代碼,建議讀者參考 GitHub 倉庫的 MyCAT-Server 項(xiàng)目的 NonBlockingSession.java 源碼} SET XA = ON ;語句分析
用戶應(yīng)用側(cè)發(fā)送該語句到 Mycat 中,由 SQL 語句解析器解析后交由 SetHandle 進(jìn)行處理c.getSession2().setXATXEnabled (true);
調(diào)用 NonBlockSession 中的 setXATXEnable d 方法設(shè)置 XA 開關(guān)啟動(dòng),并生成 XID,代碼如下:
public void setXATXEnabled(boolean xaTXEnabled) { LOGGER.info("XA Transaction enabled ,con " + this.getSource()); if (xaTXEnabled && this.xaTXID == null) { xaTXID = genXATXID(); } }
另外,NonBlockSession 會接收來自于用戶應(yīng)用側(cè)的 commit, 調(diào)用 commit 方法進(jìn)行處理事務(wù)提交的邏輯。
在 commit()方法中,首先會 check 節(jié)點(diǎn)個(gè)數(shù),一個(gè)節(jié)點(diǎn)和多個(gè)節(jié)點(diǎn)分為不同的處理過程,這里只講下多個(gè)節(jié)點(diǎn)的處理方法 checkDistriTransaxAndExecute();
該方法會對多個(gè)節(jié)點(diǎn)的事務(wù)進(jìn)行提交。
協(xié)調(diào)者的源碼如下:
public class MultiNodeCoordinator implements ResponseHandler { …… ……//省略此處代碼,建議讀者參考 GitHub 倉庫 MyCAT-Server 項(xiàng)目的 MultiNodeCoordinator.java 源碼}
在 NonBlockSession 的 checkDistriTransaxAndExecute()方法中, NonBlockSession 會話類會調(diào)用專門進(jìn)行多節(jié)點(diǎn)協(xié)同的 MultiNodeCoordinator 類進(jìn)行具體的處理,在 MultiNodeCoordinator類中,executeBatchNodeCmd 方法加入 XA 1PC 提交的處理,代碼片段如下:
for (RouteResultsetNode rrn : session.getTargetKeys()) { …… if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE){ //recovery Log participantLogEntry[started] = new ParticipantLogEntry(xaTxId,conn.getHost(),0,conn.getSchema(),((MySQLConnection) conn).getXaStatus()); String[] cmds = new String[]{"XA END " + xaTxId,"XA PREPARE " + xaTxId}; if (LOGGER.isDebugEnabled()) { LOGGER.debug("Start execute the batch cmd : "+ cmds[0] + ";" +cmds[1]+","+"current connection:"+conn.getHost()+":"+conn.getPort()); } mysqlCon.execBatchCmd(cmds); } …… }
在 MultiNodeCoordinator 類的 okResponse 方法中,則進(jìn)行 2pc 的事務(wù)提交
MySQLConnection mysqlCon = (MySQLConnection) conn;switch (mysqlCon.getXaStatus()){ case TxState.TX_STARTED_STATE: if (mysqlCon.batchCmdFinished()){ String xaTxId = session.getXaTXID(); String cmd = "XA COMMIT " + xaTxId; if (LOGGER.isDebugEnabled()) { LOGGER.debug("Start execute the cmd :"+cmd+",current host:"+mysqlCon.getHost()+":"+mysqlCon.getPort()); } //recovery log CoordinatorLogEntry coordinatorLogEntry =inMemoryRepository.get(xaTxId); for(int i=0; i分片事務(wù)提交處理的源碼如下:
public class CommitNodeHandler implements ResponseHandler { //結(jié)束 XA public void commit(BackendConnection conn) { …… ……//省略此處代碼,建議讀者參考 GitHub 倉庫 MyCAT-Server 項(xiàng)目的 CommitNodeHandler.java源碼 } //提交 XA @Override public void okResponse(byte[] ok, BackendConnection conn) { …… ……//省略此處代碼,建議讀者參考 GitHub 倉庫的 MyCAT-Server 項(xiàng)目的 CommitNodeHandler.java 源碼}在 Mycat 中同樣支持單節(jié)點(diǎn) MySQL 數(shù)據(jù)庫的 XA 事務(wù)處理,在 CommitNodeHandler 類中就是對單節(jié)點(diǎn)的 XA 二階段處理,處理方式與 MultiNodeCoordinator 類同,通過 commit 方法進(jìn)行 1pc 的提交,而通過 okResponse 的方法進(jìn)行 2pc 階段的事務(wù)提交。
分片事務(wù)回滾處理的源碼如下:public class RollbackNodeHandler extends MultiNodeHandler { …… ……//省略此處代碼,建議讀者參考 GitHub 倉庫的 MyCAT-Server 項(xiàng)目的 RollbackNodeHandler.java 源碼}在 RollbackNodeHandler 的 rollback 方法中加入了對 XA 事務(wù)的 rollback 處理,用戶應(yīng)用側(cè)發(fā)起的 rollback 會在這個(gè)方法中進(jìn)行處理。
for (final RouteResultsetNode node : session.getTargetKeys()) { …… //support the XA rollback MySQLConnection mysqlCon = (MySQLConnection) conn; if(session.getXaTXID()!=null) { String xaTxId = session.getXaTXID(); mysqlCon.execCmd("XA END " + xaTxId + ";"); mysqlCon.execCmd("XA ROLLBACK " + xaTxId + ";"); }else { conn.rollback(); } …… }同樣,該方法會對所有的 MySQL 數(shù)據(jù)庫節(jié)點(diǎn)發(fā)起 xa rollback 指令。
本文選自《分布式數(shù)據(jù)庫架構(gòu)及企業(yè)實(shí)踐——基于Mycat中間件》,點(diǎn)此鏈接可在博文視點(diǎn)官網(wǎng)查看。
想及時(shí)獲得更多精彩文章,可在微信中搜索“博文視點(diǎn)”或者掃描下方二維碼并關(guān)注。
網(wǎng)頁標(biāo)題:Mycat分布式事務(wù)的實(shí)現(xiàn)
文章起源:http://weahome.cn/article/pphdeg.html