事务回滚什么意思 try_分布式事务 TCC-Transaction 源码分析——事务恢复
1. 概述
本文分享 TCC 恢復。主要涉及如下二個 package 路徑下的類:
- org.mengyun.tcctransaction.recover
- RecoverConfig,事務恢復配置接口
- TransactionRecovery,事務恢復邏輯
- org.mengyun.tcctransaction.spring.recover :
- DefaultRecoverConfig,默認事務恢復配置實現
- RecoverScheduledJob,事務恢復定時任務
本文涉及到的類關系如下圖( 打開大圖 ):
在《TCC-Transaction 源碼分析 —— 事務存儲器》中,事務信息被持久化到外部的存儲器中。事務存儲是事務恢復的基礎。通過讀取外部存儲器中的異常事務,定時任務會按照一定頻率對事務進行重試,直到事務完成或超過最大重試次數。
你行好事會因為得到贊賞而愉悅同理,開源項目貢獻者會因為 Star 而更加有動力為 TCC-Transaction 點贊!傳送門ps:筆者假設你已經閱讀過《tcc-transaction 官方文檔 —— 使用指南1.2.x》。
2. 事務重試配置
org.mengyun.tcctransaction.recover.RecoverConfig,事務恢復配置接口,實現代碼如下:
public interface RecoverConfig {
/**
* @return 最大重試次數
*/
int getMaxRetryCount();
/**
* @return 恢復間隔時間,單位:秒
*/
int getRecoverDuration();
/**
* @return cron 表達式
*/
String getCronExpression();
/**
* @return 延遲取消異常集合
*/
Set> getDelayCancelExceptions();
/**
* 設置延遲取消異常集合
*
* @param delayRecoverExceptions 延遲取消異常集合
*/
void setDelayCancelExceptions(Set> delayRecoverExceptions);
}
- #getMaxRetryCount(),單個事務恢復最大重試次數。超過最大重試次數后,目前僅打出錯誤日志,下文會看到實現。
- #getRecoverDuration(),單個事務恢復重試的間隔時間,單位:秒。
- #getCronExpression(),定時任務 cron 表達式。
- #getDelayCancelExceptions(),延遲取消異常集合。
org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig,默認事務恢復配置實現,實現代碼如下:
public class DefaultRecoverConfig implements RecoverConfig {
public static final RecoverConfig INSTANCE = new DefaultRecoverConfig();
/**
* 最大重試次數
*/
private int maxRetryCount = 30;
/**
* 恢復間隔時間,單位:秒
*/
private int recoverDuration = 120;
/**
* cron 表達式
*/
private String cronExpression = "0 */1 * * * ?";
/**
* 延遲取消異常集合
*/
private Set> delayCancelExceptions = new HashSet>();
public DefaultRecoverConfig() {
delayCancelExceptions.add(OptimisticLockException.class);
delayCancelExceptions.add(SocketTimeoutException.class);
}
@Override
public void setDelayCancelExceptions(Set> delayCancelExceptions) {
this.delayCancelExceptions.addAll(delayCancelExceptions);
}
}
- maxRetryCount,單個事務恢復最大重試次數 為 30。
- recoverDuration,單個事務恢復重試的間隔時間為 120 秒。
- cronExpression,定時任務 cron 表達式為 "0 */1 * * * ?",每分鐘執行一次。如果你希望定時任務執行的更頻繁,可以修改 cron 表達式,例如 0/30 * * * * ?,每 30 秒執行一次。
- delayCancelExceptions,延遲取消異常集合。在 DefaultRecoverConfig 構造方法里,預先添加了 OptimisticLockException / SocketTimeoutException 。
- 官方解釋:事務恢復的疑問
- 這塊筆者還有一些疑問,如果有別的可能性導致這個情況,麻煩告知下筆者。謝謝。
- 官方解釋:為什么 tcc 事務切面中對樂觀鎖與socket超時異常不做回滾處理,只拋異常?
- 針對 SocketTimeoutException :try 階段,本地參與者調用遠程參與者( 遠程服務,例如 Dubbo,Http 服務),遠程參與者 try 階段的方法邏輯執行時間較長,超過 Socket 等待時長,發生 SocketTimeoutException,如果立刻執行事務回滾,遠程參與者 try 的方法未執行完成,可能導致 cancel 的方法實際未執行( try 的方法未執行完成,數據庫事務【非 TCC 事務】未提交,cancel 的方法讀取數據時發現未變更,導致方法實際未執行,最終 try 的方法執行完后,提交數據庫事務【非 TCC 事務】,較為極端 ),最終引起數據不一致。在事務恢復時,會對這種情況的事務進行取消回滾,如果此時遠程參與者的 try 的方法還未結束,還是可能發生數據不一致。
- 針對 OptimisticLockException :還是 SocketTimeoutException 的情況,事務恢復間隔時間小于 Socket 超時時間,此時事務恢復調用遠程參與者取消回滾事務,遠程參與者下次更新事務時,會因為樂觀鎖更新失敗,拋出 OptimisticLockException。如果 CompensableTransactionInterceptor 此時立刻取消回滾,可能會和定時任務的取消回滾沖突,因此統一交給定時任務處理。
3. 事務重試定時任務
org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob,事務恢復定時任務,基于 Quartz 實現調度,不斷不斷不斷執行事務恢復。實現代碼如下:
public class RecoverScheduledJob {
private TransactionRecovery transactionRecovery;
private TransactionConfigurator transactionConfigurator;
private Scheduler scheduler;
public void init() {
try {
// Quartz JobDetail
MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
jobDetail.setTargetObject(transactionRecovery);
jobDetail.setTargetMethod("startRecover");
jobDetail.setName("transactionRecoveryJob");
jobDetail.setConcurrent(false); // 禁止并發
jobDetail.afterPropertiesSet();
// Quartz CronTriggerFactoryBean
CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
cronTrigger.setBeanName("transactionRecoveryCronTrigger");
cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
cronTrigger.setJobDetail(jobDetail.getObject());
cronTrigger.afterPropertiesSet();
// 啟動任務調度
scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
// 啟動 Quartz Scheduler
scheduler.start();
} catch (Exception e) {
throw new SystemException(e);
}
}
}
- 調用 MethodInvokingJobDetailFactoryBean#setConcurrent(false) 方法,禁用任務并發執行。
- 調用 MethodInvokingJobDetailFactoryBean#setTargetObject(...) + MethodInvokingJobDetailFactoryBean#setTargetMethod(...) 方法,設置任務調用 TransactionRecovery#startRecover(...) 方法執行。
如果應用集群部署,會不會相同事務被多個定時任務同時重試?
答案是不會,事務在重試時會樂觀鎖更新,同時只有一個應用節點能更新成功。
官方解釋:多機部署下,所有機器都宕機,從異常中恢復時,所有的機器豈不是都可以查詢到所有的需要恢復的服務?
當然極端情況下,Socket 調用超時時間大于事務重試間隔,第一個節點在重試某個事務,一直未執行完成,第二個節點已經可以重試。
ps:建議,Socket 調用超時時間小于事務重試間隔。
是否定時任務和應用服務器解耦?
螞蟻金服的分布式事務服務 DTS 采用 client-server 模式:
- xts-client :負責事務的創建、提交、回滾、記錄。
- xts-server :負責異常事務的恢復。
4. 異常事務恢復
org.mengyun.tcctransaction.recover.TransactionRecovery,異常事務恢復,實現主體代碼如下:
```Java
public class TransactionRecovery {
/**
* 啟動恢復事務邏輯
*/
public void startRecover() {
// 加載異常事務集合
List transactions = loadErrorTransactions();
// 恢復異常事務集合
recoverErrorTransactions(transactions);
}
}
```
4.1 加載異常事務集合
調用 #loadErrorTransactions() 方法,加載異常事務集合。實現代碼如下:
private List loadErrorTransactions() {
TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();
return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));
}
- 異常事務的定義:當前時間超過 - 事務變更時間( 最后執行時間 ) >= 事務恢復間隔( RecoverConfig#getRecoverDuration() )。這里有一點要注意,已完成的事務會從事務存儲器刪除。
4.2 恢復異常事務集合
調用 #recoverErrorTransactions(...) 方法,恢復異常事務集合。實現代碼如下:
private void recoverErrorTransactions(List transactions) {
for (Transaction transaction : transactions) {
// 超過最大重試次數
if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s
總結
以上是生活随笔為你收集整理的事务回滚什么意思 try_分布式事务 TCC-Transaction 源码分析——事务恢复的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: openlayer右键菜单_使用Open
- 下一篇: 玩cf出现outofmemory_《穿越