背景
Seata 四種事務(wù)模式中,AT 事務(wù)模式是阿里體系獨(dú)創(chuàng)的事務(wù)模式,對(duì)業(yè)務(wù)無侵入,也是 Seata 用戶最多的一種事務(wù)模式,兼具易用性與高性能。
目前,Seata 社區(qū)正大力推進(jìn)其多語言版本建設(shè),Go、PHP、JS 和 Python 四個(gè)語言版本基本完成了 TCC 事務(wù)模式的實(shí)現(xiàn)。參照 Seata v1.5.2 版本的 AT 模式的實(shí)現(xiàn),并結(jié)合 Seata 官方文檔,本文嘗試從代碼角度詳解 Seata AT 事務(wù)模式的詳細(xì)流程,目的是梳理 Seata Java 版本 AT 模式的實(shí)現(xiàn)細(xì)節(jié)后,在多語言版本后續(xù)開發(fā)中,優(yōu)先實(shí)現(xiàn) AT 事務(wù)模式。
1、什么是 AT 模式?
AT 模式是一種二階段提交的分布式事務(wù)模式,它采用了本地 undo log 的方式來數(shù)據(jù)在修改前后的狀態(tài),并用它來實(shí)現(xiàn)回滾。從性能上來說,AT 模式由于有 undo log 的存在,一階段執(zhí)行完可以立即釋放鎖和連接資源,吞吐量比 XA 模式高。用戶在使用 AT 模式的時(shí)候,只需要配置好對(duì)應(yīng)的數(shù)據(jù)源即可,事務(wù)提交、回滾的流程都由 Seata 自動(dòng)完成,對(duì)用戶業(yè)務(wù)幾乎沒有入侵,使用便利。
2、AT 模式與 ACID 和 CAP
談?wù)摂?shù)據(jù)庫的事務(wù)模式,一般都會(huì)先談?wù)撌聞?wù)相關(guān)的 ACID 特性,但在分布式場(chǎng)景下,還需要考慮其 CAP 性質(zhì)。
2.1 AT 與 ACID
數(shù)據(jù)庫事務(wù)要滿足原子性、一致性、持久性以及隔離性四個(gè)性質(zhì),即 ACID 。在分布式事務(wù)場(chǎng)景下,一般地,首先保證原子性和持久性,其次保證一致性,隔離性則因?yàn)槠涫褂玫牟煌瑪?shù)據(jù)庫的鎖、數(shù)據(jù) MVCC 機(jī)制以及相關(guān)事務(wù)模式的差異, 具有多種隔離級(jí)別,如 MySQL 自身事務(wù)就有讀未提交(Read Uncommitted)、讀已提交(Read Committed)、可重復(fù)讀(Repeatable Read)、序列化(Serializable)等四種隔離級(jí)別。
2.1.1 AT 模式的讀隔離
在數(shù)據(jù)庫本地事務(wù)隔離級(jí)別讀已提交(Read Committed)或以上的基礎(chǔ)上,Seata(AT 模式)的默認(rèn)全局隔離級(jí)別是讀未提交(Read Uncommitted)。
如果應(yīng)用在特定場(chǎng)景下,必須要求全局的讀已提交,目前 Seata 的方式是通過 SELECT FOR UPDATE 語句的代理。
SELECT FOR UPDATE 語句的執(zhí)行會(huì)查詢?nèi)宙i,如果全局鎖被其他事務(wù)持有,則釋放本地鎖(回滾 SELECT FOR UPDATE 語句的本地執(zhí)行)并重試。這個(gè)過程中,查詢是被 block 住的,直到全局鎖拿到,即讀取的相關(guān)數(shù)據(jù)是已提交的,才返回。
出于總體性能上的考慮,Seata 目前的方案并沒有對(duì)所有 SELECT 語句都進(jìn)行代理,僅針對(duì) FOR UPDATE 的 SELECT 語句。
2.1.2 AT 模式的寫隔離
AT 會(huì)對(duì)寫操作的 SQL 進(jìn)行攔截,提交本地事務(wù)前,會(huì)向 TC 獲取全局鎖,未獲取到全局鎖的情況下,不能進(jìn)行寫,以此來保證不會(huì)發(fā)生寫沖突:
-一階段本地事務(wù)提交前,需要確保先拿到全局鎖;
-拿不到全局鎖,不能提交本地事務(wù);
-拿全局鎖的嘗試被限制在一定范圍內(nèi),超出范圍將放棄,并回滾本地事務(wù),釋放本地鎖。
2.2 AT 與 CAP
Seata 所有的事務(wù)模式在一般情況下,是需要保證 CP,即一致性和分區(qū)容錯(cuò)性,因?yàn)榉植际绞聞?wù)的核心就是要保證數(shù)據(jù)的一致性(包括弱一致性)。比如,在一些交易場(chǎng)景下,涉及到多個(gè)系統(tǒng)的金額的變化,保證一致性可以避免系統(tǒng)產(chǎn)生資損。
分布式系統(tǒng)不可避免地會(huì)出現(xiàn)服務(wù)不可用的情況,如 Seata 的 TC 出現(xiàn)不可用時(shí),用戶可能希望通過服務(wù)降級(jí),優(yōu)先保證整個(gè)服務(wù)的可用性,此時(shí) Seata 需要從 CP 系統(tǒng)轉(zhuǎn)換為一個(gè)保證 AP 的系統(tǒng)。
比如,有一個(gè)服務(wù)是給用戶端提供用戶修改信息的功能,假如此時(shí) TC 服務(wù)出現(xiàn)問題,為了不影響用戶的使用體驗(yàn),我們希望服務(wù)仍然可用,只不過所有的 SQL 的執(zhí)行降級(jí)為不走全局事務(wù),而是當(dāng)做本地事務(wù)執(zhí)行。
AT 模式默認(rèn)優(yōu)先保證 CP,但提供了配置通道讓用戶在 CP 和 AP 兩種模式下進(jìn)行切換:
-配置文件的 tm.degrade-check 參數(shù),其值為 true 則分支事務(wù)保證 AP,反之保證 CP;
-手動(dòng)修改配置中心的 service.disableGlobalTransaction 屬性為 true,則關(guān)閉全局事務(wù)實(shí)現(xiàn) AP。
3、AT 數(shù)據(jù)源代理
在 AT 模式中,用戶只需要配置好 AT 的代理數(shù)據(jù)源即可, AT 的所有流程都在代理數(shù)據(jù)源中完成,對(duì)用戶無感知。
AT 數(shù)據(jù)源代理的整體類結(jié)構(gòu)如下圖:
AT 事務(wù)數(shù)據(jù)源代理類結(jié)構(gòu)圖
AT 的數(shù)據(jù)源代理中,分別對(duì)目標(biāo)數(shù)據(jù)庫的 DataSource 、 Connection 和 Statement 進(jìn)行了代理,在執(zhí)行目標(biāo) SQL 動(dòng)作之前,完成了 RM 資源注冊(cè)、 undo log 生成、分支事務(wù)注冊(cè)、分支事務(wù)提交 / 回滾等操作,而這些操作對(duì)用戶并無感知。
下面的時(shí)序圖中,展示了 AT 模式在執(zhí)行過程中,這幾個(gè)代理類的動(dòng)作細(xì)節(jié):
注:圖片建議在 PC 端查看
4、AT 模式流程
以下是 AT 模式的整體流程,從這里可以看到分布式事務(wù)各個(gè)關(guān)鍵動(dòng)作的執(zhí)行時(shí)機(jī),每個(gè)動(dòng)作細(xì)節(jié),我們后面來討論:
注:圖片建議在 PC 端查看
4.1 一階段
在 AT 模式的第一階段, Seata 會(huì)通過代理數(shù)據(jù)源,攔截用戶執(zhí)行的業(yè)務(wù) SQL ,假如用戶沒有開啟事務(wù),會(huì)自動(dòng)開啟一個(gè)新事務(wù)。如果業(yè)務(wù) SQL 是寫操作(增、刪、改操作)類型,會(huì)解析業(yè)務(wù) SQL 的語法,生成 SELECT SQL 語句,把要被修改的記錄查出來,保存為 “before image” 。然后執(zhí)行業(yè)務(wù) SQL ,執(zhí)行完后用同樣的原理,將已經(jīng)被修改的記錄查出來,保存為 “after image” ,至此一個(gè) undo log 記錄就完整了。
隨后 RM 會(huì)向 TC 注冊(cè)分支事務(wù), TC 側(cè)會(huì)新加鎖記錄,鎖可以保證 AT 模式的讀、寫隔離。RM 再將 undo log 和業(yè)務(wù) SQL 的本地事務(wù)提交,保證業(yè)務(wù) SQL 和保存 undo log 記錄 SQL 的原子性。
4.2 二階段提交
AT 模式的二階段提交,TC 側(cè)會(huì)將該事務(wù)的鎖刪除,然后通知 RM 異步刪除 undo log 記錄即可。
4.3 二階段回滾
如果 AT 模式的二階段是回滾,那么 RM 側(cè)需要根據(jù)一階段保存的 undo log 數(shù)據(jù)中的 before image 記錄,通過逆向 SQL 的方式,對(duì)在一階段修改過的業(yè)務(wù)數(shù)據(jù)進(jìn)行還原即可。
但是在還原數(shù)據(jù)之前,需要進(jìn)行臟數(shù)據(jù)校驗(yàn)。因?yàn)樵谝浑A段提交后,到現(xiàn)在進(jìn)行回滾的中間這段時(shí)間,該記錄有可能被別的業(yè)務(wù)改動(dòng)過。校驗(yàn)的方式,就是用 undo log 的 after image 和現(xiàn)在數(shù)據(jù)庫的數(shù)據(jù)做比較,假如數(shù)據(jù)一致,說明沒有臟數(shù)據(jù);不一致則說明有臟數(shù)據(jù),出現(xiàn)臟數(shù)據(jù)就需要人工進(jìn)行處理了。
5、關(guān)鍵代碼模塊
如下是 AT 模式整個(gè)流程的主要模塊,我們從中可以了解開發(fā) AT 模式需要做哪些事情:
5.1 Undo log 數(shù)據(jù)格式
undo log 存在表 undo_log 表中,undo_log 表的表結(jié)構(gòu)如下:
rollback_info 存放了業(yè)務(wù)數(shù)據(jù)修改前后的內(nèi)容,數(shù)據(jù)表存放的是經(jīng)過壓縮后的格式,他的明文格式如下:
{ "branchId":2828558179596595558, "sqlUndoLogs":[ { "afterImage":{ "rows":[ { "fields":[ { "keyType":"PRIMARY_KEY", "name":"id", "type":4, "value":3 }, { "keyType":"NULL", "name":"count", "type":4, "value":70 } ] } ], "tableName":"stock_tbl" }, "beforeImage":{ "rows":[ { "fields":[ { "keyType":"PRIMARY_KEY", "name":"id", "type":4, "value":3 }, { "keyType":"NULL", "name":"count", "type":4, "value":100 } ] } ], "tableName":"stock_tbl" }, "sqlType":"UPDATE", "tableName":"stock_tbl" } ], "xid":"192.168.51.1022828558179596595550" }
5.2 UndoLogManager
UndoLogManager 負(fù)責(zé) undo log 的新加、刪除、回滾操作,不同的數(shù)據(jù)庫有不同的實(shí)現(xiàn)(不同數(shù)據(jù)庫的 SQL 語法會(huì)不同),公共邏輯放在了 AbstractUndoLogManager 抽象類中,整體的類繼承關(guān)系如下圖:
注:圖片建議在 PC 端查看
插入和刪除 undo log 的邏輯都比較簡(jiǎn)單,直接操作數(shù)據(jù)表就行。這里重點(diǎn)看下回滾 undo log 的邏輯:
源碼分析如下:
@Override public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException { Connection conn = null;b ResultSet rs = null; PreparedStatement selectPST = null; boolean originalAutoCommit = true; for (; ; ) { try { conn = dataSourceProxy.getPlainConnection(); // The entire undo process should run in a local transaction. // 開啟本地事務(wù),確保刪除undo log和恢復(fù)業(yè)務(wù)數(shù)據(jù)的SQL在一個(gè)事務(wù)中commit if (originalAutoCommit = conn.getAutoCommit()) { conn.setAutoCommit(false); } // Find UNDO LOG selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL); selectPST.setLong(1, branchId); selectPST.setString(2, xid); // 查出branchId的所有undo log記錄,用來恢復(fù)業(yè)務(wù)數(shù)據(jù) rs = selectPST.executeQuery(); boolean exists = false; while (rs.next()) { exists = true; // It is possible that the server repeatedly sends a rollback request to roll back // the same branch transaction to multiple processes, // ensuring that only the undo_log in the normal state is processed. int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS); // 如果state=1,說明可以回滾;state=1說明不能回滾 if (!canUndo(state)) { if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state); } return; } String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT); Mapcontext = parseContext(contextString); byte[] rollbackInfo = getRollbackInfo(rs); String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY); // 根據(jù)serializer獲取序列化工具類 UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer); // 反序列化undo log,得到業(yè)務(wù)記錄修改前后的明文 BranchUndoLog branchUndoLog = parser.decode(rollbackInfo); try { // put serializer name to local setCurrentSerializer(parser.getName()); List sqlUndoLogs = branchUndoLog.getSqlUndoLogs(); if (sqlUndoLogs.size() > 1) { Collections.reverse(sqlUndoLogs); } for (SQLUndoLog sqlUndoLog : sqlUndoLogs) { TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta( conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId()); sqlUndoLog.setTableMeta(tableMeta); AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor( dataSourceProxy.getDbType(), sqlUndoLog); undoExecutor.executeOn(conn); } } finally { // remove serializer name removeCurrentSerializer(); } } // If undo_log exists, it means that the branch transaction has completed the first phase, // we can directly roll back and clean the undo_log // Otherwise, it indicates that there is an exception in the branch transaction, // causing undo_log not to be written to the database. // For example, the business processing timeout, the global transaction is the initiator rolls back. // To ensure data consistency, we can insert an undo_log with GlobalFinished state // to prevent the local transaction of the first phase of other programs from being correctly submitted. // See https://github.com/seata/seata/issues/489 if (exists) { deleteUndoLog(xid, branchId, conn); conn.commit(); if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId, State.GlobalFinished.name()); } } else { // 如果不存在undo log,可能是因?yàn)榉种聞?wù)還未執(zhí)行完成(比如,分支事務(wù)執(zhí)行超時(shí)),TM發(fā)起了回滾全局事務(wù)的請(qǐng)求。 // 這個(gè)時(shí)候,往undo_log表插入一條記錄,可以使分支事務(wù)提交的時(shí)候失敗(undo log) insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn); conn.commit(); if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId, State.GlobalFinished.name()); } } return; } catch (SQLIntegrityConstraintViolationException e) { // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId); } } catch (Throwable e) { if (conn != null) { try { conn.rollback(); } catch (SQLException rollbackEx) { LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx); } } throw new BranchTransactionException(BranchRollbackFailed_Retriable, String .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid, branchId, e.getMessage()), e); } finally { try { if (rs != null) { rs.close(); } if (selectPST != null) { selectPST.close(); } if (conn != null) { if (originalAutoCommit) { conn.setAutoCommit(true); } conn.close(); } } catch (SQLException closeEx) { LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx); } } } }
備注:需要特別注意下,當(dāng)回滾的時(shí)候,發(fā)現(xiàn) undo log 不存在,需要往 undo_log 表新加一條記錄,避免因?yàn)?RM 在 TM 發(fā)出回滾請(qǐng)求后,又成功提交分支事務(wù)的場(chǎng)景。
5.3 Compressor 壓縮算法
Compressor 接口定義了壓縮算法的規(guī)范,用來壓縮文本,節(jié)省存儲(chǔ)空間:
public interface Compressor { /** * compress byte[] to byte[]. * @param bytes the bytes * @return the byte[] */ byte[] compress(byte[] bytes); /** * decompress byte[] to byte[]. * @param bytes the bytes * @return the byte[] */ byte[] decompress(byte[] bytes); }
目前已經(jīng)實(shí)現(xiàn)的壓縮算法有如下這些:
5.4 UndoLogParser 序列化算法
Serializer 接口定義了序列化算法的規(guī)范,用來序列化代碼:
public interface UndoLogParser { /** * Get the name of parser; * * @return the name of parser */ String getName(); /** * Get default context of this parser * * @return the default content if undo log is empty */ byte[] getDefaultContent(); /** * Encode branch undo log to byte array. * * @param branchUndoLog the branch undo log * @return the byte array */ byte[] encode(BranchUndoLog branchUndoLog); /** * Decode byte array to branch undo log. * * @param bytes the byte array * @return the branch undo log */ BranchUndoLog decode(byte[] bytes); }
目前已經(jīng)實(shí)現(xiàn)的序列化算法有如下這些:
5.5 Executor 執(zhí)行器
Executor 是 SQL 執(zhí)行的入口類, AT 在執(zhí)行 SQL 前后,需要管理 undo log 的 image 記錄,主要是構(gòu)建 undo log ,包括根據(jù)不同的業(yè)務(wù) SQL ,來組裝查詢 undo log 的 SQL 語句;執(zhí)行查詢 undo log 的 SQL ,獲取到鏡像記錄數(shù)據(jù);執(zhí)行插入 undo log 的邏輯(未提交事務(wù))。
public interface Executor{ /** * Execute t. * * @param args the args * @return the t * @throws Throwable the throwable */ T execute(Object... args) throws Throwable;}
針對(duì)不同的業(yè)務(wù) SQL ,有不同的 Executor 實(shí)現(xiàn),主要是因?yàn)椴煌僮?/ 不同數(shù)據(jù)庫類型的業(yè)務(wù) SQL ,生成 undo log 的 SQL 的邏輯不同,所以都分別重寫了 beforeImage () 和 afterImage () 方法。整體的繼承關(guān)系如下圖所示:
注:圖片建議在 PC 端查看
為了直觀地看到不同類型的 SQL 生成的 before image SQL 和 after iamge SQL ,這里做個(gè)梳理。假如目標(biāo)數(shù)據(jù)表的結(jié)構(gòu)如下:
public interface Executor{ /** * Execute t. * * @param args the args * @return the t * @throws Throwable the throwable */ T execute(Object... args) throws Throwable; }
注:圖片建議在 PC 端查看
5.6 AsyncWorker
AsyncWorker 是用來做異步執(zhí)行的,用來做分支事務(wù)提交和 undo log 記錄刪除等操作。
6、關(guān)于性能
并不存在某一種完美的分布式事務(wù)機(jī)制可以適應(yīng)所有場(chǎng)景,完美滿足所有需求。無論 AT 模式、TCC 模式還是 Saga 模式,本質(zhì)上都是對(duì) XA 規(guī)范在各種場(chǎng)景下安全性或者性能的不足的改進(jìn)。Seata 不同的事務(wù)模式是在一致性、可靠性、易用性、性能四個(gè)特性之間進(jìn)行不同的取舍。
近期 Seata 社區(qū)發(fā)現(xiàn)有同行,在未詳細(xì)分析 Java 版本 AT 模式的代碼的詳細(xì)實(shí)現(xiàn)的情況下,僅對(duì)某個(gè)早期的 Go 版本的 Seata 進(jìn)行短鏈接壓測(cè)后,質(zhì)疑 AT 模型的性能及其數(shù)據(jù)安全性,請(qǐng)具有一定思辨能力的用戶朋友們?cè)诮邮苓@個(gè)結(jié)論前仔細(xì)查閱其測(cè)試方法與測(cè)試對(duì)象,區(qū)分好 “李鬼” 與 “李逵”。
實(shí)際上,這個(gè)早期的 Go 版本實(shí)現(xiàn)僅參照了 Seata v1.4.0,且未嚴(yán)格把 Seata AT 模式的所有功能都予以實(shí)現(xiàn)。話說回來,即便其推崇的 Seata XA 模式,其也依賴于單 DB 的 XA 模式。而當(dāng)下最新版本的 MySQL XA 事務(wù)模式的 BUG 依然很多,這個(gè)地基并沒有其想象中的那樣百分百穩(wěn)固。
由阿里與螞蟻集團(tuán)共建的 Seata,是我們多年內(nèi)部分布式事務(wù)工程實(shí)踐與技術(shù)經(jīng)驗(yàn)的結(jié)晶,開源出來后得到了多達(dá) 150+ 以上行業(yè)同行生產(chǎn)環(huán)境的驗(yàn)證。開源大道既長(zhǎng)且寬,這個(gè)道路上可以有機(jī)動(dòng)車道也有非機(jī)動(dòng)車道,還可以有人行道,大家攜手把道路拓寬延長(zhǎng),而非站在人行道上宣傳機(jī)動(dòng)車道危險(xiǎn)性高且車速慢。
7、總結(jié)
Seata AT 模式依賴于各個(gè) DB 廠商的不同版本的 DB Driver(數(shù)據(jù)庫驅(qū)動(dòng)),每種數(shù)據(jù)庫發(fā)布新版本后,其 SQL 語義及其使用模式都可能發(fā)生改變。隨著近年 Seata 被其用戶們廣泛應(yīng)用于多種業(yè)務(wù)場(chǎng)景,在開發(fā)者們的努力下,Seata AT 模式保持了編程接口與其 XA 模式幾乎一致,適配了幾乎所有的主流數(shù)據(jù)庫,并覆蓋了這些數(shù)據(jù)庫的主要流行版本的 Driver:真正做到了把分布式系統(tǒng)的 “復(fù)雜性” 留在了框架層面,把易用性和高性能交給了用戶。
當(dāng)然,Seata Java 版本的 XA 和 AT 模式還有許多需要完善與改進(jìn)的地方,遑論其它多語言版本的實(shí)現(xiàn)。歡迎對(duì) Seata 及其多語言版本建設(shè)感興趣的同行參與到 Seata 的建設(shè)中來,共同努力把 Seata 打造成一個(gè)標(biāo)準(zhǔn)化分布式事務(wù)平臺(tái)。
審核編輯:劉清
-
PHP
+關(guān)注
關(guān)注
0文章
452瀏覽量
26690 -
JAVA語言
+關(guān)注
關(guān)注
0文章
138瀏覽量
20095 -
python
+關(guān)注
關(guān)注
56文章
4797瀏覽量
84691 -
CAP
+關(guān)注
關(guān)注
0文章
16瀏覽量
2092
原文標(biāo)題:Seata AT模式代碼級(jí)詳解
文章出處:【微信號(hào):OSC開源社區(qū),微信公眾號(hào):OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論