Fescar 全局锁介绍
生活随笔
收集整理的這篇文章主要介紹了
Fescar 全局锁介绍
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
開篇
?這篇文章的目的主要是講解TC的在處理分支事務(wù)注冊過程中對全局鎖的處理流程,理解了全局鎖以后才能明白對DB同一個記錄進行多次變更是如何解決的。
?如上圖所示,問最終全局事務(wù)A對資源R1應(yīng)該回滾到哪種狀態(tài)?很明顯,如果再根據(jù)UndoLog去做回滾,就會發(fā)生嚴重問題:覆蓋了全局事務(wù)B對資源R1的變更。
?那Fescar是如何解決這個問題呢?答案就是?Fescar的全局寫排它鎖解決方案,在全局事務(wù)A執(zhí)行過程中全局事務(wù)B會因為獲取不到全局鎖而處于等待狀態(tài)。
Fescar 全局鎖處理流程
RM 嘗試獲取全局鎖
public class ConnectionProxy extends AbstractConnectionProxy {public void commit() throws SQLException {if (context.inGlobalTransaction()) {try {// 1、向TC發(fā)起注冊操作并檢查是否能夠獲取全局鎖register();} catch (TransactionException e) {recognizeLockKeyConflictException(e);}try {if (context.hasUndoLog()) {UndoLogManager.flushUndoLogs(this);}// 2、執(zhí)行本地的事務(wù)的commit操作targetConnection.commit();} catch (Throwable ex) {report(false);if (ex instanceof SQLException) {throw (SQLException) ex;} else {throw new SQLException(ex);}}report(true);context.reset();} else {targetConnection.commit();}}private void register() throws TransactionException {Long branchId = DataSourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),null, context.getXid(), context.buildLockKeys());context.setBranchId(branchId);} }說明:
- RM 執(zhí)行本地事務(wù)提交操作在ConnectionProxy的commit()完成。
- commit()的過程當(dāng)中按照register->commit的流程執(zhí)行。
- register()過程RM會向TC發(fā)起注冊請求判斷是否能夠獲取全局鎖。
- register()通過DataSourceManager的branchRegister()操作完成。
TC處理全局鎖申請流程
public class DefaultCore implements Core {protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,RpcContext rpcContext) throws TransactionException {response.setTransactionId(request.getTransactionId());response.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),XID.generateXID(request.getTransactionId()), request.getLockKey()));}public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException {GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);BranchSession branchSession = new BranchSession();branchSession.setTransactionId(XID.getTransactionId(xid));branchSession.setBranchId(UUIDGenerator.generateUUID());branchSession.setApplicationId(globalSession.getApplicationId());branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());branchSession.setBranchType(branchType);branchSession.setResourceId(resourceId);branchSession.setLockKey(lockKeys);branchSession.setClientId(clientId);// 判斷branchSession是否能夠獲取鎖if (!branchSession.lock()) {throw new TransactionException(LockKeyConflict);}try {globalSession.addBranch(branchSession);} catch (RuntimeException ex) {throw new TransactionException(FailedToAddBranch);}return branchSession.getBranchId();}public boolean lock() throws TransactionException {return LockManagerFactory.get().acquireLock(this);}}說明:
- TC 在處理branchRegister()的過程中會判斷branchResiter請求攜帶的session信息能否獲取全局鎖。
- branchSession.lock()判斷能否獲取鎖,如果獲取失敗則拋出TransactionException(LockKeyConflict)異常。
- branchSession.lock()判斷能否獲取鎖,如果獲取成功則將branchSession添加到全局鎖當(dāng)中。
TC 判斷全局鎖分配流程
public class DefaultLockManagerImpl implements LockManager {public boolean acquireLock(BranchSession branchSession) throws TransactionException {String resourceId = branchSession.getResourceId();long transactionId = branchSession.getTransactionId();//1、根據(jù)resourceId去LOCK_MAP獲取,獲取失敗則新增一個空的對象。ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId);if (dbLockMap == null) {LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>());dbLockMap = LOCK_MAP.get(resourceId);}ConcurrentHashMap<Map<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder();// 2、獲取branchSession的全局鎖的key對象String lockKey = branchSession.getLockKey();if(StringUtils.isEmpty(lockKey)) {return true;}// 3、按照分號“;”切割多個LockKey,每個LockKey按照table:pk1;pk2;pk3格式組裝。String[] tableGroupedLockKeys = lockKey.split(";");for (String tableGroupedLockKey : tableGroupedLockKeys) {int idx = tableGroupedLockKey.indexOf(":");if (idx < 0) {branchSession.unlock();throw new ShouldNeverHappenException("Wrong format of LOCK KEYS: " + branchSession.getLockKey());}// 4、分割獲取branchRegister請求的表名和pks。String tableName = tableGroupedLockKey.substring(0, idx);String mergedPKs = tableGroupedLockKey.substring(idx + 1);// 5、獲取表下的已經(jīng)加鎖的記錄tableLockMap ConcurrentHashMap<Integer, Map<String, Long>> tableLockMap = dbLockMap.get(tableName);if (tableLockMap == null) {dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, Map<String, Long>>());tableLockMap = dbLockMap.get(tableName);}// 6、遍歷該表所有pks判斷是否已加鎖。String[] pks = mergedPKs.split(",");for (String pk : pks) {// 7、同一個表的pk按照hash值進行hash分配到tableLockMap當(dāng)中。int bucketId = pk.hashCode() % BUCKET_PER_TABLE;Map<String, Long> bucketLockMap = tableLockMap.get(bucketId);if (bucketLockMap == null) {tableLockMap.putIfAbsent(bucketId, new HashMap<String, Long>());bucketLockMap = tableLockMap.get(bucketId);}// 8、根據(jù)pk去獲取bucketLockMap當(dāng)中獲取鎖對象。synchronized (bucketLockMap) {Long lockingTransactionId = bucketLockMap.get(pk);if (lockingTransactionId == null) {// No existing lock// 9、將鎖添加到branchSession當(dāng)中bucketLockMap.put(pk, transactionId);Set<String> keysInHolder = bucketHolder.get(bucketLockMap);if (keysInHolder == null) {bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>());keysInHolder = bucketHolder.get(bucketLockMap);}keysInHolder.add(pk);} else if (lockingTransactionId.longValue() == transactionId) {// Locked by mecontinue;} else {// 直接返回異常LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId);branchSession.unlock(); // Release all acquired locks.return false;}}}}return true;} }說明:
- TC 判斷branchRegister操作能否獲取鎖按照維度層層遞進判斷。
- 1、先從ResourceId的維度進行判斷, LOCK_MAP.get(resourceId)。
- 2、再從tableName的維度進行判斷, dbLockMap.get(tableName)。
- 3、再從pk的hashcode的維度進行判斷,tableLockMap.get(bucketId)。
- 4、在從pk的維度進行判斷,bucketLockMap.get(pk)。
- 5、如果按照上述維度判斷后未存在加鎖的branchSession就返回能夠加鎖,否則返回不能加鎖。
- 6、判斷加鎖過程中處理了冪等,如果自己已經(jīng)加鎖了可以再次獲取鎖。
TM 全局鎖維護數(shù)據(jù)結(jié)構(gòu)
private static final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>> LOCK_MAP = new ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>>();說明:
- LOCK_MAP 作為TC全局鎖的保存結(jié)構(gòu)。
- 第一層ConcurrentHashMap的key為resourceId。
- 第二層ConcurrentHashMap的key為tableName。
- 第三層ConcurrentHashMap的key為pk的hashCode。
- 第四層的Map的key為pk,value為transactionId(RM攜帶過來)。
參考文章
- 分布式事務(wù)中間件Fescar—全局寫排它鎖解讀
Fescar源碼分析連載
- Fescar 源碼解析系列
總結(jié)
以上是生活随笔為你收集整理的Fescar 全局锁介绍的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何在基于Bytom开发过程中集成IPF
- 下一篇: 寒假作业一编程总结