Apache ShardingSphere 是一套開源的分布式數據庫中間件解決方案組成的生態圈,它由 JDBC、Proxy 和 Sidecar(規劃中)這 3 款相互獨立,卻又能夠混合部署配合使用的產品組成。它們均提供標準化的數據分片、分布式事務和數據庫治理功能,可適用于如 Java 同構、異構語言、云原生等各種多樣化的應用場景。ShardingSphere 已于2020年4月16日成為 Apache 軟件基金會的頂級項目。
分布式系統CAP理論
一致性(Consistency) 可用性(Availability) 分區容錯性(Partition tolerance ) X/Open DTP模型與XA規范 X/Open,即現在的open group,是一個獨立的組織,主要負責制定各種行業技術標準。官網地址:http://www.opengroup.org/。X/Open組織主要由各大知名公司或者廠商進行支持,這些組織不光遵循X/Open組織定義的行業技術標準,也參與到標準的制定。下圖展示了open group目前主要成員(官網截圖):
DTP模型
應用程序(Application Program ,簡稱AP):用于定義事務邊界(即定義事務的開始和結束),并且在事務邊界內對資源進行操作。
資源管理器(Resource Manager,簡稱RM,一般也稱為事務參與者):如數據庫、文件系統等,并提供訪問資源的方式。
事務管理器(Transaction Manager ,簡稱TM,一般也稱為事務協調者):負責分配事務唯一標識,監控事務的執行進度,并負責事務的提交、回滾等。
XA規范
這里的接口規范特別多,我們只要來講講幾個最重要的。
xa_start : 在 RM端調用此接口開啟一個XA事務,后面需要接上XID 作為參數。
xa_end : 取消當前線程與事務的關聯, 與 xa_start是配對使用。
xa_prepare : 詢問RM 是否已經準備好了提交事務。
xa_commit : 通知RM 提交事務分支。
xa_rollback : 通知RM 提交回滾事務分支。
XA二階段提交 階段一 :TM通知各個RM準備提交它們的事務分支。如果RM判斷自己進行的工作可以被提交,那就就對工作內容進行持久化,再給TM肯定答復;要是發生了其他情況,那給TM的都是否定答復。在發送了否定答復并回滾了已經的工作后,RM就可以丟棄這個事務分支信息。
階段二 :TM根據階段1各個RM prepare的結果,決定是提交還是回滾事務。如果所有的RM都prepare成功,那么TM通知所有的RM進行提交;如果有RM prepare失敗的話,則TM通知所有RM回滾自己的事務分支。
MySQL對XA協議的支持 MySQL 從5.0.3開始支持XA分布式事務,且只有InnoDB存儲引擎支持XA事務。MySQL 在DTP模型中也是屬于資源管理器RM。
MySQL XA 事務的 SQL語法 XA?START?xid????//開啟XA事務,xid是一個唯一值,表示事務分支標識符
XA?END?xid??//結束一個XA事務,
XA?PREPARE?xid?準備提交
XA COMMIT xid [ONE PHASE]?//提交事務。兩階段提交協議中,如果只有一個RM參與,那么可以優化為一階段提交
XA?ROLLBACK?xid??//回滾
XA?RECOVER?[CONVERT?XID]??//列出所有處于PREPARE階段的XA事務
MySQL xid詳解 mysql中使用xid來作為一個事務分支的標識符。通過C語言進行描述,如下:
/?
??Transaction?branch?identification:?XID?and?NULLXID:
?/
#define?XIDDATASIZE?128??/??size?in?bytes??/
#define?MAXGTRIDSIZE?64??/??maximum?size?in?bytes?of?gtrid??/
#define?MAXBQUALSIZE?64??/??maximum?size?in?bytes?of?bqual??/
struct?xid_t?{long?formatID;?????/*?format?identifier?*/long?gtrid_length;?/*?value?1-64?*/long?bqual_length;?/*?value?1-64?*/char?data[XIDDATASIZE];};
/?
??A?value?of?-1?in?formatID?means?that?the?XID?is?null.
?/
typedef?struct?xid_t?XID;
/?
??Declarations?of?routines?by?which?RMs?call?TMs:
?/
extern?int?ax_reg(int,?XID??,?long);
extern?int?ax_unreg(int,?long);
gtrid :全局事務標識符(global transaction identifier),最大不能超過64字節。
bqual :分支限定符(branch qualifier),最大不能超過64字節。
formatId:記錄gtrid、bqual的格式,類似于memcached中flags字段的作用。
data :xid的值,其是 gtrid和bqual拼接后的內容。。
MySQL XA事務狀態
JTA規范 JTA(Java Transaction API):為J2EE平臺提供了分布式事務服務(distributed transaction)的能力。某種程度上,可以認為JTA規范是XA規范的Java版,其把XA規范中規定的DTP模型交互接口抽象成Java接口中的方法,并規定每個方法要實現什么樣的功能。
JTA 定義的接口 javax.transaction.TransactionManager : 事務管理器,負責事務的begin, commit,rollback 等命令。
javax.transaction.UserTransaction:用于聲明一個分布式事務。
javax.transaction.TransactionSynchronizationRegistry:事務同步注冊
javax.transaction.xa.XAResource:定義RM提供給TM操作的接口
javax.transaction.xa.Xid:事務xid接口。
TM provider: RM provider: ShardingSphere對XA分布式事務的支持 ShardingSphere針對XA分布式事務的接口以及JTA規范,提供了標準的,基于SPI實現的org.apache.shardingsphere.transaction.spi.ShardingTransactionManager。
public?interface?ShardingTransactionManager?extends?AutoCloseable?{/***?Initialize?sharding?transaction?manager.**?@param?databaseType?database?type*?@param?resourceDataSources?resource?data?sources*/void?init(DatabaseType?databaseType,?Collection<ResourceDataSource>?resourceDataSources);/***?Get?transaction?type.**?@return?transaction?type*/TransactionType?getTransactionType();/***?Judge?is?in?transaction?or?not.**?@return?in?transaction?or?not*/boolean?isInTransaction();/***?Get?transactional?connection.**?@param?dataSourceName?data?source?name*?@return?connection*?@throws?SQLException?SQL?exception*/Connection?getConnection(String?dataSourceName)?throws?SQLException;/***?Begin?transaction.*/void?begin();/***?Commit?transaction.*/void?commit();/***?Rollback?transaction.*/void?rollback();
}
對于XA分布式事務的支持的具體實現類為 :org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager。在此類中,會調用基于SPI實現的org.apache.shardingsphere.transaction.xa.spi.XATransactionManager,來進行XA事務的管理操作。
總結 我們了解了分布式事務的CAP理論,了解了X/Open的DTP模型,以及XA的接口規范,MySQL對XA協議的支持。最好我們講解了JTA的規范,以及ShardingSphere對XA事務進行整合的時候定義的SPI接口,這些都是很重要的理論基礎,接下來,我們將詳細來講解基于AtomkikosXATransactionManager的具體實現,以及源碼解析。
Shardingsphere整合Atomikos對XA分布式事務的源碼解析 Atomikos(https://www.atomikos.com/),其實是一家公司的名字,提供了基于JTA規范的XA分布式事務TM的實現。其旗下最著名的產品就是事務管理器。產品分兩個版本:
這兩個產品的關系如下圖所示:
ExtremeTransactions在TransactionEssentials的基礎上額外提供了以下功能(重要的):
org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager詳解 我們簡單的來回顧下org.apache.shardingsphere.transaction.spi.ShardingTransactionManager
public?interface?ShardingTransactionManager?extends?AutoCloseable?{/***?Initialize?sharding?transaction?manager.**?@param?databaseType?database?type*?@param?resourceDataSources?resource?data?sources*/void?init(DatabaseType?databaseType,?Collection<ResourceDataSource>?resourceDataSources);/***?Get?transaction?type.**?@return?transaction?type*/TransactionType?getTransactionType();/***?Judge?is?in?transaction?or?not.**?@return?in?transaction?or?not*/boolean?isInTransaction();/***?Get?transactional?connection.**?@param?dataSourceName?data?source?name*?@return?connection*?@throws?SQLException?SQL?exception*/Connection?getConnection(String?dataSourceName)?throws?SQLException;/***?Begin?transaction.*/void?begin();/***?Commit?transaction.*/void?commit();/***?Rollback?transaction.*/void?rollback();
}
我們重點先關注init方法,從它的命名,你就應該能夠看出來,這是整個框架的初始化方法,讓我們來看看它是如何進行初始化的。
private?final?Map<String,?XATransactionDataSource>?cachedDataSources?=?new?HashMap<>();private?final?XATransactionManager?xaTransactionManager?=?XATransactionManagerLoader.getInstance().getTransactionManager();@Overridepublic?void?init(final?DatabaseType?databaseType,?final?Collection<ResourceDataSource>?resourceDataSources)?{for?(ResourceDataSource?each?:?resourceDataSources)?{cachedDataSources.put(each.getOriginalName(),?new?XATransactionDataSource(databaseType,?each.getUniqueResourceName(),?each.getDataSource(),?xaTransactionManager));}xaTransactionManager.init();}
首先SPI的方式加載XATransactionManager的具體實現類,這里返回的就是org.apache.shardingsphere.transaction.xa.atomikos.manager.AtomikosTransactionManager。
我們在關注下 new XATransactionDataSource() , 進入 org.apache.shardingsphere.transaction.xa.jta.datasource。XATransactionDataSource類的構造方法。
public?XATransactionDataSource(final?DatabaseType?databaseType,?final?String?resourceName,?final?DataSource?dataSource,?final?XATransactionManager?xaTransactionManager)?{this.databaseType?=?databaseType;this.resourceName?=?resourceName;this.dataSource?=?dataSource;if?(!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName()))?{//?重點關注?1?,返回了xaDatasourcexaDataSource?=?XADataSourceFactory.build(databaseType,?dataSource);this.xaTransactionManager?=?xaTransactionManager;//?重點關注2?注冊資源xaTransactionManager.registerRecoveryResource(resourceName,?xaDataSource);}}
public?final?class?XADataSourceFactory?{public?static?XADataSource?build(final?DatabaseType?databaseType,?final?DataSource?dataSource)?{return?new?DataSourceSwapper(XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType)).swap(dataSource);}
}
?public?XADataSource?swap(final?DataSource?dataSource)?{XADataSource?result?=?createXADataSource();setProperties(result,?getDatabaseAccessConfiguration(dataSource));return?result;}
很簡明,第一步創建,XADataSource,第二步給它設置屬性(包含數據的連接,用戶名密碼等),然后返回。
返回 XATransactionDataSource 類,關注xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource); 從名字可以看出,這是注冊事務恢復資源。這個我們在事務恢復的時候詳解。
返回 XAShardingTransactionManager.init() ,我們重點來關注:xaTransactionManager.init();,最后進入AtomikosTransactionManager.init()。流程圖如下:
代碼:
public?final?class?AtomikosTransactionManager?implements?XATransactionManager?{private?final?UserTransactionManager?transactionManager?=?new?UserTransactionManager();private?final?UserTransactionService?userTransactionService?=?new?UserTransactionServiceImp();@Overridepublic?void?init()?{userTransactionService.init();}}
private?void?initialize()?{//添加恢復資源?不用關心for?(RecoverableResource?resource?:?resources_)?{Configuration.addResource?(?resource?);}for?(LogAdministrator?logAdministrator?:?logAdministrators_)?{Configuration.addLogAdministrator?(?logAdministrator?);}//注冊插件?不用關心for?(TransactionServicePlugin?nxt?:?tsListeners_)?{Configuration.registerTransactionServicePlugin?(?nxt?);}//獲取配置屬性?重點關心ConfigProperties?configProps?=?Configuration.getConfigProperties();configProps.applyUserSpecificProperties(properties_);//進行初始化Configuration.init();}
?@Overridepublic?ConfigProperties?initializeProperties()?{//讀取classpath下的默認配置transactions-defaults.propertiesProperties?defaults?=?new?Properties();loadPropertiesFromClasspath(defaults,?DEFAULT_PROPERTIES_FILE_NAME);//讀取classpath下,transactions.properties配置,覆蓋transactions-defaults.properties中相同key的值Properties?transactionsProperties?=?new?Properties(defaults);loadPropertiesFromClasspath(transactionsProperties,?TRANSACTIONS_PROPERTIES_FILE_NAME);//讀取classpath下,jta.properties,覆蓋transactions-defaults.properties、transactions.properties中相同key的值Properties?jtaProperties?=?new?Properties(transactionsProperties);loadPropertiesFromClasspath(jtaProperties,?JTA_PROPERTIES_FILE_NAME);//讀取通過java?-Dcom.atomikos.icatch.file方式指定的自定義配置文件路徑,覆蓋之前的同名配置Properties?customProperties?=?new?Properties(jtaProperties);loadPropertiesFromCustomFilePath(customProperties);//最終構造一個ConfigProperties對象,來表示實際要使用的配置Properties?finalProperties?=?new?Properties(customProperties);return?new?ConfigProperties(finalProperties);}
ublic?static?synchronized?boolean?init()?{boolean?startupInitiated?=?false;if?(service_?==?null)?{startupInitiated?=?true;//SPI方式加載插件注冊,無需過多關心addAllTransactionServicePluginServicesFromClasspath();ConfigProperties?configProperties?=?getConfigProperties();//調用插件的beforeInit方法進行初始化話,無需過多關心notifyBeforeInit(configProperties);//進行事務日志恢復的初始化,很重要,接下來詳解assembleSystemComponents(configProperties);//進入系統注解的初始化,一般重要initializeSystemComponents(configProperties);notifyAfterInit();if?(configProperties.getForceShutdownOnVmExit())?{addShutdownHook(new?ForceShutdownHook());}}return?startupInitiated;}
@Overridepublic?TransactionServiceProvider?assembleTransactionService(ConfigProperties?configProperties)?{RecoveryLog?recoveryLog?=null;//打印日志logProperties(configProperties.getCompletedProperties());//生成唯一名字String?tmUniqueName?=?configProperties.getTmUniqueName();long?maxTimeout?=?configProperties.getMaxTimeout();int?maxActives?=?configProperties.getMaxActives();boolean?threaded2pc?=?configProperties.getThreaded2pc();//SPI方式加載OltpLog?,這是最重要的擴展地方,如果用戶沒有SPI的方式去擴展那么就為nullOltpLog?oltpLog?=?createOltpLogFromClasspath();if?(oltpLog?==?null)?{LOGGER.logInfo("Using?default?(local)?logging?and?recovery...");//創建事務日志存儲資源Repository?repository?=?createRepository(configProperties);oltpLog?=?createOltpLog(repository);//????Assemble?recoveryLogrecoveryLog?=?createRecoveryLog(repository);}StateRecoveryManagerImp?recoveryManager?=?new?StateRecoveryManagerImp();recoveryManager.setOltpLog(oltpLog);//生成唯一id生成器,以后生成XID會用的到UniqueIdMgr?idMgr?=?new?UniqueIdMgr?(?tmUniqueName?);int?overflow?=?idMgr.getMaxIdLengthInBytes()?-?MAX_TID_LENGTH;if?(?overflow?>?0?)?{//?see?case?73086String?msg?=?"Value?too?long?:?"?+?tmUniqueName;LOGGER.logFatal?(?msg?);throw?new?SysException(msg);}return?new?TransactionServiceImp(tmUniqueName,?recoveryManager,?idMgr,?maxTimeout,?maxActives,?!threaded2pc,?recoveryLog);}
private?OltpLog?createOltpLogFromClasspath()?{OltpLog?ret?=?null;ServiceLoader<OltpLogFactory>?loader?=?ServiceLoader.load(OltpLogFactory.class,Configuration.class.getClassLoader());int?i?=?0;for?(OltpLogFactory?l?:?loader?)?{ret?=?l.createOltpLog();i++;}if?(i?>?1)?{String?msg?=?"More?than?one?OltpLogFactory?found?in?classpath?-?error?in?configuration!";LOGGER.logFatal(msg);throw?new?SysException(msg);}return?ret;}
?private?CachedRepository?createCoordinatorLogEntryRepository(ConfigProperties?configProperties)?throws?LogException?{//創建內存資源存儲InMemoryRepository?inMemoryCoordinatorLogEntryRepository?=?new?InMemoryRepository();//進行初始化inMemoryCoordinatorLogEntryRepository.init();//創建使用文件存儲資源作為backupFileSystemRepository?backupCoordinatorLogEntryRepository?=?new?FileSystemRepository();//進行初始化backupCoordinatorLogEntryRepository.init();//內存與file資源進行合并CachedRepository?repository?=?new?CachedRepository(inMemoryCoordinatorLogEntryRepository,?backupCoordinatorLogEntryRepository);repository.init();return?repository;}
這里就會創建出 CachedRepository,里面包含了 ?InMemoryRepository 與 FileSystemRepository
回到主線 com.atomikos.icatch.config.Configuration.init(), 最后來分析下notifyAfterInit();
?private?static?void?notifyAfterInit()?{//進行插件的初始化for?(TransactionServicePlugin?p?:?tsListenersList_)?{p.afterInit();}for?(LogAdministrator?a?:?logAdministrators_)?{a.registerLogControl(service_.getLogControl());}//設置事務恢復服務,進行事務的恢復for?(RecoverableResource?r?:?resourceList_?)?{r.setRecoveryService(recoveryService_);}}
?public?void?afterInit()?{TransactionManagerImp.installTransactionManager(Configuration.getCompositeTransactionManager(),?autoRegisterResources);//如果我們自定義擴展了?OltpLog?,這里就會返回null,如果是null,那么XaResourceRecoveryManager就是nullRecoveryLog?recoveryLog?=?Configuration.getRecoveryLog();long?maxTimeout?=?Configuration.getConfigProperties().getMaxTimeout();if?(recoveryLog?!=?null)?{XaResourceRecoveryManager.installXaResourceRecoveryManager(new?DefaultXaRecoveryLog(recoveryLog,?maxTimeout),Configuration.getConfigProperties().getTmUniqueName());}}
重點注意 RecoveryLog recoveryLog = Configuration.getRecoveryLog(); ,如果用戶采用SPI的方式,擴展了com.atomikos.recovery.OltpLog ,這里就會返回 null。如果是null,則不會對 XaResourceRecoveryManager 進行初始化。
回到 notifyAfterInit(), 我們來分析 setRecoveryService。
public?void?setRecoveryService?(?RecoveryService?recoveryService?)throws?ResourceException{if?(?recoveryService?!=?null?)?{if?(?LOGGER.isTraceEnabled()?)?LOGGER.logTrace?(?"Installing?recovery?service?on?resource?"+?getName?()?);this.branchIdentifier=recoveryService.getName();recover();}}
?public?void?recover()?{XaResourceRecoveryManager?xaResourceRecoveryManager?=?XaResourceRecoveryManager.getInstance();//null?for?LogCloud?recoveryif?(xaResourceRecoveryManager?!=?null)?{try?{xaResourceRecoveryManager.recover(getXAResource());}?catch?(Exception?e)?{refreshXAResource();?//cf?case?156968}}}
到這里atomikos的基本的初始化已經完成。
atomikos事務begin流程 我們知道,本地的事務,都會有一個 trainsaction.begin, 對應XA分布式事務來說也不另外,我們再把思路切換回XAShardingTransactionManager.begin(), 會調用com.atomikos.icatch.jta.TransactionManagerImp.begin()。流程圖如下:
代碼:
??public?void?begin?(?int?timeout?)?throws?NotSupportedException,SystemException{CompositeTransaction?ct?=?null;ResumePreviousTransactionSubTxAwareParticipant?resumeParticipant?=?null;ct?=?compositeTransactionManager.getCompositeTransaction();if?(?ct?!=?null?&&?ct.getProperty?(??JTA_PROPERTY_NAME?)?==?null?)?{LOGGER.logWarning?(?"JTA:?temporarily?suspending?incompatible?transaction:?"?+?ct.getTid()?+"?(will?be?resumed?after?JTA?transaction?ends)"?);ct?=?compositeTransactionManager.suspend();resumeParticipant?=?new?ResumePreviousTransactionSubTxAwareParticipant?(?ct?);}try?{//創建事務補償點ct?=?compositeTransactionManager.createCompositeTransaction?(?(?(?long?)?timeout?)?*?1000?);if?(?resumeParticipant?!=?null?)?ct.addSubTxAwareParticipant?(?resumeParticipant?);if?(?ct.isRoot?()?&&?getDefaultSerial?()?)ct.setSerial?();ct.setProperty?(?JTA_PROPERTY_NAME?,?"true"?);}?catch?(?SysException?se?)?{String?msg?=?"Error?in?begin()";LOGGER.logError(?msg?,?se?);throw?new?ExtendedSystemException?(?msg?,?se?);}recreateCompositeTransactionAsJtaTransaction(ct);}
public?CompositeTransaction?createCompositeTransaction?(?long?timeout?)?throws?SysException{CompositeTransaction?ct?=?null?,?ret?=?null;ct?=?getCurrentTx?();if?(?ct?==?null?)?{ret?=?getTransactionService().createCompositeTransaction?(?timeout?);if(LOGGER.isDebugEnabled()){LOGGER.logDebug("createCompositeTransaction?(?"?+?timeout?+?"?):?"+?"created?new?ROOT?transaction?with?id?"?+?ret.getTid?());}}?else?{if(LOGGER.isDebugEnabled())?LOGGER.logDebug("createCompositeTransaction?(?"?+?timeout?+?"?)");ret?=?ct.createSubTransaction?();}Thread?thread?=?Thread.currentThread?();setThreadMappings?(?ret,?thread?);return?ret;}
到這里atomikos的事務begin流程已經完成。大家可能有些疑惑,begin好像什么都沒有做,XA start 也沒調用?別慌,下一節繼續來講。
XATransactionDataSource getConnection() 流程 我們都知道想要執行SQL語句,必須要獲取到數據庫的connection。讓我們再回到 XAShardingTransactionManager.getConnection() 最后會調用到org.apache.shardingsphere.transaction.xa.jta.datasourceXATransactionDataSource.getConnection()。流程圖如下:
代碼 :
?public?Connection?getConnection()?throws?SQLException,?SystemException,?RollbackException?{//先檢查是否已經有存在的connection,這一步很關心,也是XA的關鍵,因為XA事務,必須在同一個connectionif?(CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName()))?{return?dataSource.getConnection();}//獲取數據庫連接Connection?result?=?dataSource.getConnection();//轉成XAConnection,其實是同一個連接XAConnection?xaConnection?=?XAConnectionFactory.createXAConnection(databaseType,?xaDataSource,?result);//獲取JTA事務定義接口Transaction?transaction?=?xaTransactionManager.getTransactionManager().getTransaction();if?(!enlistedTransactions.get().contains(transaction))?{//進行資源注冊transaction.enlistResource(new?SingleXAResource(resourceName,?xaConnection.getXAResource()));transaction.registerSynchronization(new?Synchronization()?{@Overridepublic?void?beforeCompletion()?{enlistedTransactions.get().remove(transaction);}@Overridepublic?void?afterCompletion(final?int?status)?{enlistedTransactions.get().clear();}});enlistedTransactions.get().add(transaction);}return?result;}
首先第一步很關心,尤其是對shardingsphere來說,因為在一個事務里面,會有多個SQL語句,打到相同的數據庫,所以對相同的數據庫,必須獲取同一個XAConnection,這樣才能進行XA事務的提交與回滾。
我們接下來關心 transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));, 會進入com.atomikos.icatch.jta.TransactionImp.enlistResource(), 代碼太長,截取一部分。
try?{restx?=?(XAResourceTransaction)?res.getResourceTransaction(this.compositeTransaction);//?next,?we?MUST?set?the?xa?resource?again,//?because?ONLY?the?instance?we?got?as?argument//?is?available?for?use?now?!//?older?instances?(set?in?restx?from?previous?sibling)//?have?connections?that?may?be?in?reuse?already//?->old?xares?not?valid?except?for?2pc?operationsrestx.setXAResource(xares);restx.resume();}?catch?(ResourceException?re)?{throw?new?ExtendedSystemException("Unexpected?error?during?enlist",?re);}?catch?(RuntimeException?e)?{throw?e;}addXAResourceTransaction(restx,?xares);
public?synchronized?void?resume()?throws?ResourceException?{int?flag?=?0;String?logFlag?=?"";if?(this.state.equals(TxState.LOCALLY_DONE))?{//?reused?instanceflag?=?XAResource.TMJOIN;logFlag?=?"XAResource.TMJOIN";}?else?if?(!this.knownInResource)?{//?new?instanceflag?=?XAResource.TMNOFLAGS;logFlag?=?"XAResource.TMNOFLAGS";}?elsethrow?new?IllegalStateException("Wrong?state?for?resume:?"+?this.state);try?{if?(LOGGER.isDebugEnabled())?{LOGGER.logDebug("XAResource.start?(?"?+?this.xidToHexString+?"?,?"?+?logFlag?+?"?)?on?resource?"+?this.resourcename+?"?represented?by?XAResource?instance?"+?this.xaresource);}this.xaresource.start(this.xid,?flag);}?catch?(XAException?xaerr)?{String?msg?=?interpretErrorCode(this.resourcename,?"resume",this.xid,?xaerr.errorCode);LOGGER.logWarning(msg,?xaerr);throw?new?ResourceException(msg,?xaerr);}setState(TxState.ACTIVE);this.knownInResource?=?true;}
?public?void?start(Xid?xid,?int?flags)?throws?XAException?{StringBuilder?commandBuf?=?new?StringBuilder(300);commandBuf.append("XA?START?");appendXid(commandBuf,?xid);switch(flags)?{case?0:break;case?2097152:commandBuf.append("?JOIN");break;case?134217728:commandBuf.append("?RESUME");break;default:throw?new?XAException(-5);}this.dispatchCommand(commandBuf.toString());this.underlyingConnection.setInGlobalTx(true);}
到這里,我們總結下,在獲取數據庫連接的時候,我們執行了XA協議接口中的 XA start xid
atomikos事務commit流程 好了,上面我們已經開啟了事務,現在我們來分析下事務commit流程,我們再把視角切換回XAShardingTransactionManager.commit(),最后我們會進入com.atomikos.icatch.imp.CompositeTransactionImp.commit() 方法。流程圖如下:
代碼:
?public?void?commit?()?throws?HeurRollbackException,?HeurMixedException,HeurHazardException,?SysException,?SecurityException,RollbackException{//首先更新下事務日志的狀態doCommit?();setSiblingInfoForIncoming1pcRequestFromRemoteClient();if?(?isRoot?()?)?{//真正的commit操作coordinator.terminate?(?true?);}}
?protected?void?terminate?(?boolean?commit?)?throws?HeurRollbackException,HeurMixedException,?SysException,?java.lang.SecurityException,HeurCommitException,?HeurHazardException,?RollbackException,IllegalStateException{synchronized?(?fsm_?)?{if?(?commit?)?{//判斷有幾個參與者,如果只有一個,直接提交if?(?participants_.size?()?<=?1?)?{commit?(?true?);}?else?{//否則,走XA?2階段提交流程,先prepare,?再提交int?prepareResult?=?prepare?();//?make?sure?to?only?do?commit?if?NOT?read?onlyif?(?prepareResult?!=?Participant.READ_ONLY?)commit?(?false?);}}?else?{rollback?();}}}
首先會判斷參與者的個數,這里我們可以理解為MySQL的database數量,如果只有一個,退化成一階段,直接提交。如果有多個,則走標準的XA二階段提交流程。
我們來看 prepare (); 流程,最后會走到com.atomikos.icatch.imp.PrepareMessage.send() ---> com.atomikos.datasource.xa.XAResourceTransaction.prepare()
int?ret?=?0;terminateInResource();if?(TxState.ACTIVE?==?this.state)?{//?tolerate?non-delisting?apps/serverssuspend();}//?duplicate?prepares?can?happen?for?siblings?in?serial?subtxs!!!//?in?that?case,?the?second?prepare?just?returns?READONLYif?(this.state?==?TxState.IN_DOUBT)return?Participant.READ_ONLY;else?if?(!(this.state?==?TxState.LOCALLY_DONE))throw?new?SysException("Wrong?state?for?prepare:?"?+?this.state);try?{//?refresh?xaresource?for?MQSeries:?seems?to?close?XAResource?after//?suspend???testOrRefreshXAResourceFor2PC();if?(LOGGER.isTraceEnabled())?{LOGGER.logTrace("About?to?call?prepare?on?XAResource?instance:?"+?this.xaresource);}ret?=?this.xaresource.prepare(this.xid);}?catch?(XAException?xaerr)?{String?msg?=?interpretErrorCode(this.resourcename,?"prepare",this.xid,?xaerr.errorCode);if?(XAException.XA_RBBASE?<=?xaerr.errorCode&&?xaerr.errorCode?<=?XAException.XA_RBEND)?{LOGGER.logWarning(msg,?xaerr);?//?see?case?84253throw?new?RollbackException(msg);}?else?{LOGGER.logError(msg,?xaerr);throw?new?SysException(msg,?xaerr);}}setState(TxState.IN_DOUBT);if?(ret?==?XAResource.XA_RDONLY)?{if?(LOGGER.isDebugEnabled())?{LOGGER.logDebug("XAResource.prepare?(?"?+?this.xidToHexString+?"?)?returning?XAResource.XA_RDONLY?"?+?"on?resource?"+?this.resourcename+?"?represented?by?XAResource?instance?"+?this.xaresource);}return?Participant.READ_ONLY;}?else?{if?(LOGGER.isDebugEnabled())?{LOGGER.logDebug("XAResource.prepare?(?"?+?this.xidToHexString+?"?)?returning?OK?"?+?"on?resource?"+?this.resourcename+?"?represented?by?XAResource?instance?"+?this.xaresource);}return?Participant.READ_ONLY?+?1;}
public?synchronized?void?suspend()?throws?ResourceException?{//?BugzID:?20545//?State?may?be?IN_DOUBT?or?TERMINATED?when?a?connection?is?closed?AFTER//?commit!//?In?that?case,?don't?call?END?again,?and?also?don't?generate?any//?error!//?This?is?required?for?some?hibernate?connection?release?strategies.if?(this.state.equals(TxState.ACTIVE))?{try?{if?(LOGGER.isDebugEnabled())?{LOGGER.logDebug("XAResource.end?(?"?+?this.xidToHexString+?"?,?XAResource.TMSUCCESS?)?on?resource?"+?this.resourcename+?"?represented?by?XAResource?instance?"+?this.xaresource);}//執行了?xa?end?語句this.xaresource.end(this.xid,?XAResource.TMSUCCESS);}?catch?(XAException?xaerr)?{String?msg?=?interpretErrorCode(this.resourcename,?"end",this.xid,?xaerr.errorCode);if?(LOGGER.isTraceEnabled())LOGGER.logTrace(msg,?xaerr);//?don't?throw:?fix?for?case?102827}setState(TxState.LOCALLY_DONE);}}
到了這里,我們已經執行了 XA start xid -> XA end xid --> XA prepare xid, 接下來就是最后一步 commit
//繁雜代碼過多,就顯示核心的
this.xaresource.commit(this.xid,?onePhase);
思考:這里的參與者提交是在一個循環里面,一個一個提交的,如果之前的提交了,后面的參與者提交的時候,掛了,就會造成數據的不一致性。
Atomikos rollback() 流程
上面我們已經分析了commit流程,其實rollback流程和commit流程一樣,我們在把目光切換回 org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager.rollback() ,最后會執行到com.atomikos.icatch.imp.CompositeTransactionImp.rollback()。
????public?void?rollback?()?throws?IllegalStateException,?SysException{//清空資源,更新事務日志狀態等doRollback?();if?(?isRoot?()?)?{try?{coordinator.terminate?(?false?);}?catch?(?Exception?e?)?{throw?new?SysException?(?"Unexpected?error?in?rollback:?"?+?e.getMessage?(),?e?);}}}
?protected?void?terminate?(?boolean?commit?)?throws?HeurRollbackException,HeurMixedException,?SysException,?java.lang.SecurityException,HeurCommitException,?HeurHazardException,?RollbackException,IllegalStateException{synchronized?(?fsm_?)?{if?(?commit?)?{if?(?participants_.size?()?<=?1?)?{commit?(?true?);}?else?{int?prepareResult?=?prepare?();//?make?sure?to?only?do?commit?if?NOT?read?onlyif?(?prepareResult?!=?Participant.READ_ONLY?)commit?(?false?);}}?else?{//如果是false,走的是rollbackrollback?();}}}
public?synchronized?void?rollback()throws?HeurCommitException,?HeurMixedException,HeurHazardException,?SysException?{terminateInResource();if?(rollbackShouldDoNothing())?{return;}if?(this.state.equals(TxState.TERMINATED))?{return;}if?(this.state.equals(TxState.HEUR_MIXED))throw?new?HeurMixedException();if?(this.state.equals(TxState.HEUR_COMMITTED))throw?new?HeurCommitException();if?(this.xaresource?==?null)?{throw?new?HeurHazardException("XAResourceTransaction?"+?getXid()?+?":?no?XAResource?to?rollback?");}try?{if?(this.state.equals(TxState.ACTIVE))?{?//?first?suspend?xidsuspend();}//?refresh?xaresource?for?MQSeries:?seems?to?close?XAResource?after//?suspend???testOrRefreshXAResourceFor2PC();if?(LOGGER.isDebugEnabled())?{LOGGER.logDebug("XAResource.rollback?(?"?+?this.xidToHexString+?"?)?"?+?"on?resource?"?+?this.resourcename+?"?represented?by?XAResource?instance?"+?this.xaresource);}this.xaresource.rollback(this.xid);
先在supend()方法里面執行了 XA end xid 語句, 接下來執行 this.xaresource.rollback(this.xid); 進行數據的回滾。
Atomikos-recover 流程 說事務恢復流程之前,我們來討論下,會啥會出現事務恢復?XA二階段提交協議不是強一致性的嗎?要解答這個問題,我們就要來看看XA二階段協議有什么問題?
問題一 :單點故障 由于協調者的重要性,一旦協調者TM發生故障。參與者RM會一直阻塞下去。尤其在第二階段,協調者發生故障,那么所有的參與者還都處于鎖定事務資源的狀態中,而無法繼續完成事務操作。(如果是協調者掛掉,可以重新選舉一個協調者,但是無法解決因為協調者宕機導致的參與者處于阻塞狀態的問題)
問題二 :數據不一致 數據不一致。在二階段提交的階段二中,當協調者向參與者發送commit請求之后,發生了局部網絡異常或者在發送commit請求過程中協調者發生了故障,這會導致只有一部分參與者接受到了commit請求。而在這部分參與者接到commit請求之后就會執行commit操作。但是其他部分未接到commit請求的機器則無法執行事務提交。于是整個分布式系統便出現了數據不一致性的現象。
如何解決? 解決的方案簡單,就是我們在事務的操作的每一步,我們都需要對事務狀態的日志進行人為的記錄,我們可以把日志記錄存儲在我們想存儲的地方,可以是本地存儲,也可以中心化的存儲。atomikos的開源版本,我們之前也分析了,它是使用內存 + file的方式,存儲在本地,這樣的話,如果在一個集群系統里面,如果有節點宕機,日志又存儲在本地,所以事務不能及時的恢復(需要重啟服務)。
Atomikos 多場景下事務恢復。 Atomikos 提供了二種方式,來應對不同場景下的異常情況。
public?synchronized?void?init?(?Properties?properties?)?throws?SysException{shutdownInProgress_?=?false;control_?=?new?com.atomikos.icatch.admin.imp.LogControlImp?(?(AdminLog)?this.recoveryLog?);ConfigProperties?configProperties?=?new?ConfigProperties(properties);long?recoveryDelay?=?configProperties.getRecoveryDelay();recoveryTimer?=?new?PooledAlarmTimer(recoveryDelay);recoveryTimer.addAlarmTimerListener(new?AlarmTimerListener()?{@Overridepublic?void?alarm(AlarmTimer?timer)?{//進行事務恢復performRecovery();}});TaskManager.SINGLETON.executeTask(recoveryTimer);initialized_?=?true;}
???public?void?recover()?{XaResourceRecoveryManager?xaResourceRecoveryManager?=?XaResourceRecoveryManager.getInstance();if?(xaResourceRecoveryManager?!=?null)?{?//null?for?LogCloud?recoverytry?{xaResourceRecoveryManager.recover(getXAResource());}?catch?(Exception?e)?{refreshXAResource();?//cf?case?156968}}}
?@Overridepublic?void?setRecoveryService?(?RecoveryService?recoveryService?)throws?ResourceException{if?(?recoveryService?!=?null?)?{if?(?LOGGER.isTraceEnabled()?)?LOGGER.logTrace?(?"Installing?recovery?service?on?resource?"+?getName?()?);this.branchIdentifier=recoveryService.getName();//進行事務恢復recover();}}
com.atomikos.datasource.xa.XATransactionalResource.recover() 流程詳解。
主代碼:
?public?void?recover(XAResource?xaResource)?throws?XAException?{//?根據XA?recovery?協議獲取?xidList<XID>?xidsToRecover?=?retrievePreparedXidsFromXaResource(xaResource);Collection<XID>?xidsToCommit;try?{//?xid?與日志記錄的xid進行匹配xidsToCommit?=?retrieveExpiredCommittingXidsFromLog();for?(XID?xid?:?xidsToRecover)?{if?(xidsToCommit.contains(xid))?{//執行?XA?commit?xid?進行提交replayCommit(xid,?xaResource);}?else?{attemptPresumedAbort(xid,?xaResource);}}}?catch?(LogException?couldNotRetrieveCommittingXids)?{LOGGER.logWarning("Transient?error?while?recovering?-?will?retry?later...",?couldNotRetrieveCommittingXids);}}
public?static?List<XID>?recoverXids(XAResource?xaResource,?XidSelector?selector)?throws?XAException?{List<XID>?ret?=?new?ArrayList<XID>();boolean?done?=?false;int?flags?=?XAResource.TMSTARTRSCAN;Xid[]?xidsFromLastScan?=?null;List<XID>?allRecoveredXidsSoFar?=?new?ArrayList<XID>();do?{xidsFromLastScan?=?xaResource.recover(flags);flags?=?XAResource.TMNOFLAGS;done?=?(xidsFromLastScan?==?null?||?xidsFromLastScan.length?==?0);if?(!done)?{//?TEMPTATIVELY?SET?done?TO?TRUE//?TO?TOLERATE?ORACLE?8.1.7?INFINITE//?LOOP?(ALWAYS?RETURNS?SAME?RECOVER//?SET).?IF?A?NEW?SET?OF?XIDS?IS?RETURNED//?THEN?done?WILL?BE?RESET?TO?FALSEdone?=?true;for?(?int?i?=?0;?i?<?xidsFromLastScan.length;?i++?)?{XID?xid?=?new?XID?(?xidsFromLastScan[i]?);//?our?own?XID?implements?equals?and?hashCode?properlyif?(!allRecoveredXidsSoFar.contains(xid))?{//?a?new?xid?is?returned?->?we?can?not?be?in?a?recovery?loop?->?go?onallRecoveredXidsSoFar.add(xid);done?=?false;if?(selector.selects(xid))?{ret.add(xid);}}}}}?while?(!done);return?ret;}
?protected?static?Xid[]?recover(Connection?c,?int?flag)?throws?XAException?{/**?The?XA?RECOVER?statement?returns?information?for?those?XA?transactions?on?the?MySQL?server?that?are?in?the?PREPARED?state.?(See?Section?13.4.7.2,????XA*?Transaction?States???.)?The?output?includes?a?row?for?each?such?XA?transaction?on?the?server,?regardless?of?which?client?started?it.**?XA?RECOVER?output?rows?look?like?this?(for?an?example?xid?value?consisting?of?the?parts?'abc',?'def',?and?7):**?mysql>?XA?RECOVER;*?+----------+--------------+--------------+--------+*?|?formatID?|?gtrid_length?|?bqual_length?|?data?|*?+----------+--------------+--------------+--------+*?|?7?|?3?|?3?|?abcdef?|*?+----------+--------------+--------------+--------+**?The?output?columns?have?the?following?meanings:**?formatID?is?the?formatID?part?of?the?transaction?xid*?gtrid_length?is?the?length?in?bytes?of?the?gtrid?part?of?the?xid*?bqual_length?is?the?length?in?bytes?of?the?bqual?part?of?the?xid*?data?is?the?concatenation?of?the?gtrid?and?bqual?parts?of?the?xid*/boolean?startRscan?=?((flag?&?TMSTARTRSCAN)?>?0);boolean?endRscan?=?((flag?&?TMENDRSCAN)?>?0);if?(!startRscan?&&?!endRscan?&&?flag?!=?TMNOFLAGS)?{throw?new?MysqlXAException(XAException.XAER_INVAL,?Messages.getString("MysqlXAConnection.001"),?null);}////?We?return?all?recovered?XIDs?at?once,?so?if?not??TMSTARTRSCAN,?return?no?new?XIDs////?We?don't?attempt?to?maintain?state?to?check?for?TMNOFLAGS?"outside"?of?a?scan//if?(!startRscan)?{return?new?Xid[0];}ResultSet?rs?=?null;Statement?stmt?=?null;List<MysqlXid>?recoveredXidList?=?new?ArrayList<MysqlXid>();try?{//?TODO:?Cache?this?for?lifetime?of?XAConnectionstmt?=?c.createStatement();rs?=?stmt.executeQuery("XA?RECOVER");while?(rs.next())?{final?int?formatId?=?rs.getInt(1);int?gtridLength?=?rs.getInt(2);int?bqualLength?=?rs.getInt(3);byte[]?gtridAndBqual?=?rs.getBytes(4);final?byte[]?gtrid?=?new?byte[gtridLength];final?byte[]?bqual?=?new?byte[bqualLength];if?(gtridAndBqual.length?!=?(gtridLength?+?bqualLength))?{throw?new?MysqlXAException(XAException.XA_RBPROTO,?Messages.getString("MysqlXAConnection.002"),?null);}System.arraycopy(gtridAndBqual,?0,?gtrid,?0,?gtridLength);System.arraycopy(gtridAndBqual,?gtridLength,?bqual,?0,?bqualLength);recoveredXidList.add(new?MysqlXid(gtrid,?bqual,?formatId));}}?catch?(SQLException?sqlEx)?{throw?mapXAExceptionFromSQLException(sqlEx);}?finally?{if?(rs?!=?null)?{try?{rs.close();}?catch?(SQLException?sqlEx)?{throw?mapXAExceptionFromSQLException(sqlEx);}}if?(stmt?!=?null)?{try?{stmt.close();}?catch?(SQLException?sqlEx)?{throw?mapXAExceptionFromSQLException(sqlEx);}}}int?numXids?=?recoveredXidList.size();Xid[]?asXids?=?new?Xid[numXids];Object[]?asObjects?=?recoveredXidList.toArray();for?(int?i?=?0;?i?<?numXids;?i++)?{asXids[i]?=?(Xid)?asObjects[i];}return?asXids;}
MySQL 5.6版本在客戶端退出的時候,自動把已經prepare的事務回滾了,那么MySQL為什么要這樣做?這主要取決于MySQL的內部實現,MySQL 5.7以前的版本,對于prepare的事務,MySQL是不會記錄binlog的(官方說是減少fsync,起到了優化的作用)。只有當分布式事務提交的時候才會把前面的操作寫入binlog信息,所以對于binlog來說,分布式事務與普通的事務沒有區別,而prepare以前的操作信息都保存在連接的IO_CACHE中,如果這個時候客戶端退出了,以前的binlog信息都會被丟失,再次重連后允許提交的話,會造成Binlog丟失,從而造成主從數據的不一致,所以官方在客戶端退出的時候直接把已經prepare的事務都回滾了!
??Collection<XID>?xidsToCommit?=?retrieveExpiredCommittingXidsFromLog();
public?Collection<ParticipantLogEntry>?getCommittingParticipants()throws?LogReadException?{Collection<ParticipantLogEntry>?committingParticipants?=?new?HashSet<ParticipantLogEntry>();Collection<CoordinatorLogEntry>?committingCoordinatorLogEntries?=?repository.findAllCommittingCoordinatorLogEntries();for?(CoordinatorLogEntry?coordinatorLogEntry?:?committingCoordinatorLogEntries)?{for?(ParticipantLogEntry?participantLogEntry?:?coordinatorLogEntry.participants)?{committingParticipants.add(participantLogEntry);}}return?committingParticipants;}
到這里我們來簡單介紹一下,事務日志的存儲結構。首先是 CoordinatorLogEntry,這是一次XA事務的所有信息實體類。
public?class?CoordinatorLogEntry?implements?Serializable?{//全局事務idpublic?final?String?id;//是否已經提交public?final?boolean?wasCommitted;/***?Only?for?subtransactions,?null?otherwise.*/public?final?String?superiorCoordinatorId;//參與者集合public?final?ParticipantLogEntry[]?participants;
}
public?class?ParticipantLogEntry?implements?Serializable?{private?static?final?long?serialVersionUID?=?1728296701394899871L;/***?The?ID?of?the?global?transaction?as?known?by?the?transaction?core.*/public?final?String?coordinatorId;/***?Identifies?the?participant?within?the?global?transaction.*/public?final?String?uri;/***?When?does?this?participant?expire?(expressed?in?millis?since?Jan?1,?1970)?*/public?final?long?expires;/***?Best-known?state?of?the?participant.*/public?final?TxState?state;/***?For?diagnostic?purposes,?null?if?not?relevant.*/public?final?String?resourceName;
}
public?Set<XID>?getExpiredCommittingXids()?throws?LogReadException?{Set<XID>?ret?=?new?HashSet<XID>();Collection<ParticipantLogEntry>?entries?=?log.getCommittingParticipants();for?(ParticipantLogEntry?entry?:?entries)?{if?(expired(entry)?&&?!http(entry))?{XID?xid?=?new?XID(entry.coordinatorId,?entry.uri);ret.add(xid);}}return?ret;}
List<XID>?xidsToRecover?=?retrievePreparedXidsFromXaResource(xaResource);Collection<XID>?xidsToCommit;try?{xidsToCommit?=?retrieveExpiredCommittingXidsFromLog();for?(XID?xid?:?xidsToRecover)?{if?(xidsToCommit.contains(xid))?{replayCommit(xid,?xaResource);}?else?{attemptPresumedAbort(xid,?xaResource);}}}?catch?(LogException?couldNotRetrieveCommittingXids)?{LOGGER.logWarning("Transient?error?while?recovering?-?will?retry?later...",?couldNotRetrieveCommittingXids);}
private?void?replayCommit(XID?xid,?XAResource?xaResource)?{if?(LOGGER.isDebugEnabled())?LOGGER.logDebug("Replaying?commit?of?xid:?"?+?xid);try?{//進行事務提交xaResource.commit(xid,?false);//更新事務日志log.terminated(xid);}?catch?(XAException?e)?{if?(alreadyHeuristicallyTerminatedByResource(e))?{handleHeuristicTerminationByResource(xid,?xaResource,?e,?true);}?else?if?(xidTerminatedInResourceByConcurrentCommit(e))?{log.terminated(xid);}?else?{LOGGER.logWarning("Transient?error?while?replaying?commit?-?will?retry?later...",?e);}}}
private?void?attemptPresumedAbort(XID?xid,?XAResource?xaResource)?{try?{log.presumedAborting(xid);if?(LOGGER.isDebugEnabled())?LOGGER.logDebug("Presumed?abort?of?xid:?"?+?xid);try?{//進行回滾xaResource.rollback(xid);//更新日志狀態log.terminated(xid);}?catch?(XAException?e)?{if?(alreadyHeuristicallyTerminatedByResource(e))?{handleHeuristicTerminationByResource(xid,?xaResource,?e,?false);}?else?if?(xidTerminatedInResourceByConcurrentRollback(e))?{log.terminated(xid);}?else?{LOGGER.logWarning("Unexpected?exception?during?recovery?-?ignoring?to?retry?later...",?e);}}}?catch?(IllegalStateException?presumedAbortNotAllowedInCurrentLogState)?{//?ignore?to?retry?later?if?necessary}?catch?(LogException?logWriteException)?{LOGGER.logWarning("log?write?failed?for?Xid:?"+xid+",?ignoring?to?retry?later",?logWriteException);}}
總結 文章到此,已經寫的很長很多了,我們分析了ShardingSphere對于XA方案,提供了一套SPI解決方案,對Atomikos進行了整合,也分析了Atomikos初始化流程,開始事務流程,獲取連接流程,提交事務流程,回滾事務流程,事務恢復流程。希望對大家理解XA的原理有所幫助。
加入我們 Apache ShardingSphere 一直踐行Apache Way的開源之道,社區完全開放與平等,人人享受開源帶來的快樂。
地址: https://github.com/apache/shardingsphere
作者介紹:肖宇,Apache ShardingSphere Committer,開源hmily分布式事務框架作者, 開源soul網關作者,熱愛開源,追求寫優雅代碼。目前就職入京東數科,參與ShardingSphere的開源建設,以及分布式數據庫的研發工作。
想知道更多? 掃 描下面的二維碼關注我
后臺回復"技術",加入技術群
【精彩推薦】
原創|OpenAPI標準規范 如此簡單| ES最全詳細使用教程 ClickHouse到底是什么?為什么如此牛逼! 原來ElasticSearch還可以這么理解 面試官:InnoDB中一棵B+樹可以存放多少行數據? 微服務下如何解耦?對于已經緊耦合下如何重構? 如何構建一套高性能、高可用、低成本的視頻處理系統? 架構之道:分離業務邏輯和技術細節 星巴克不使用兩階段提交 點個贊+在看,少個 bug?????
總結
以上是生活随笔 為你收集整理的深度剖析Apache Shardingsphere对分布式事务的支持 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。